Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
338 changes: 197 additions & 141 deletions datafusion/core/benches/topk_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ async fn aggregate_string(
ctx: SessionContext,
limit: usize,
use_topk: bool,
) -> Result<()> {
) -> Result<Vec<RecordBatch>> {
let sql = format!(
"select max(trace_id) from traces group by timestamp_ms order by max(trace_id) desc limit {limit};"
);
Expand All @@ -204,7 +204,7 @@ async fn aggregate_string(
let batch = batches.first().unwrap();
assert_eq!(batch.num_rows(), LIMIT);

Ok(())
Ok(batches)
}

async fn aggregate_distinct(
Expand Down Expand Up @@ -285,157 +285,213 @@ async fn aggregate_distinct(
Ok(())
}

fn criterion_benchmark(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let limit = LIMIT;
let partitions = 10;
let samples = 1_000_000;

let ctx = rt
.block_on(create_context(partitions, samples, false, false, false))
.unwrap();
c.bench_function(
format!("aggregate {} time-series rows", partitions * samples).as_str(),
|b| b.iter(|| run(&rt, ctx.clone(), limit, false, false)),
);

let ctx = rt
.block_on(create_context(partitions, samples, true, false, false))
.unwrap();
c.bench_function(
format!("aggregate {} worst-case rows", partitions * samples).as_str(),
|b| b.iter(|| run(&rt, ctx.clone(), limit, false, true)),
);
/// Parameters for one numeric aggregate benchmark registered by `criterion_benchmark`.
struct BenchCase<'a> {
/// Benchmark name template; the `{rows}` and `{limit}` placeholders are
/// substituted with the total row count and the limit before registration.
name_tpl: &'a str,
/// Forwarded to `create_context` and `run`. It is `true` for the cases
/// labelled "worst-case" and `false` for those labelled "time-series".
asc: bool,
/// Forwarded to `create_context` and `run`. It is `true` for the cases whose
/// name starts with "top k=".
use_topk: bool,
/// Forwarded to `create_context`. It is `true` for the cases tagged
/// "[Utf8View]".
use_view: bool,
}

let ctx = rt
.block_on(create_context(partitions, samples, false, true, false))
.unwrap();
c.bench_function(
format!(
"top k={limit} aggregate {} time-series rows",
partitions * samples
)
.as_str(),
|b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)),
);
/// Parameters for one string aggregate benchmark registered by `criterion_benchmark`.
struct StringCase {
/// Forwarded to `create_context`; also picks the "worst-case" (`true`) or
/// "time-series" (`false`) label used in the benchmark name.
asc: bool,
/// Forwarded to `create_context` and `run_string`; when `true` the benchmark
/// name is prefixed with "top k={limit}".
use_topk: bool,
/// Forwarded to `create_context`; also picks the "Utf8View" (`true`) or
/// "Utf8" (`false`) label used in the benchmark name.
use_view: bool,
}

let ctx = rt
.block_on(create_context(partitions, samples, true, true, false))
fn assert_utf8_utf8view_match(
rt: &Runtime,
partitions: i32,
samples: i32,
limit: usize,
asc: bool,
use_topk: bool,
) {
let ctx_utf8 = rt
.block_on(create_context(partitions, samples, asc, use_topk, false))
.unwrap();
c.bench_function(
format!(
"top k={limit} aggregate {} worst-case rows",
partitions * samples
)
.as_str(),
|b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)),
);

// Utf8View schema, time-series rows
let ctx = rt
.block_on(create_context(partitions, samples, false, true, true))
let ctx_view = rt
.block_on(create_context(partitions, samples, asc, use_topk, true))
.unwrap();
c.bench_function(
format!(
"top k={limit} aggregate {} time-series rows [Utf8View]",
partitions * samples
)
.as_str(),
|b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)),
);

// Utf8View schema, worst-case rows
let ctx = rt
.block_on(create_context(partitions, samples, true, true, true))
let batches_utf8 = rt
.block_on(aggregate_string(ctx_utf8, limit, use_topk))
.unwrap();
c.bench_function(
format!(
"top k={limit} aggregate {} worst-case rows [Utf8View]",
partitions * samples
)
.as_str(),
|b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)),
);

// String aggregate benchmarks - grouping by timestamp, aggregating string column
let ctx = rt
.block_on(create_context(partitions, samples, false, true, false))
let batches_view = rt
.block_on(aggregate_string(ctx_view, limit, use_topk))
.unwrap();
c.bench_function(
format!(
"top k={limit} string aggregate {} time-series rows [Utf8]",
partitions * samples
)
.as_str(),
|b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
let result_utf8 = pretty_format_batches(&batches_utf8).unwrap().to_string();
let result_view = pretty_format_batches(&batches_view).unwrap().to_string();
assert_eq!(
result_utf8, result_view,
"Utf8 vs Utf8View mismatch for asc={asc}, use_topk={use_topk}"
);
}

let ctx = rt
.block_on(create_context(partitions, samples, true, true, false))
.unwrap();
c.bench_function(
format!(
"top k={limit} string aggregate {} worst-case rows [Utf8]",
partitions * samples
)
.as_str(),
|b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
);
/// Runs `assert_utf8_utf8view_match` once for every combination of the `asc`
/// and `use_topk` flags.
///
/// The combinations are visited with `asc` as the outer flag and `use_topk`
/// as the inner one: `(false, false)`, `(false, true)`, `(true, false)`,
/// `(true, true)`. `rt`, `partitions`, `samples` and `limit` are passed
/// through unchanged to each call.
fn assert_string_results_match(
    rt: &Runtime,
    partitions: i32,
    samples: i32,
    limit: usize,
) {
    // Spelled out explicitly so the visiting order is obvious at a glance.
    let flag_combinations = [(false, false), (false, true), (true, false), (true, true)];
    for (asc, use_topk) in flag_combinations {
        assert_utf8_utf8view_match(rt, partitions, samples, limit, asc, use_topk);
    }
}

let ctx = rt
.block_on(create_context(partitions, samples, false, true, true))
.unwrap();
c.bench_function(
format!(
"top k={limit} string aggregate {} time-series rows [Utf8View]",
partitions * samples
)
.as_str(),
|b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
);
fn criterion_benchmark(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let limit = LIMIT;
let partitions = 10;
let samples = 1_000_000;
let total_rows = partitions * samples;

// Numeric aggregate benchmarks
let numeric_cases = &[
BenchCase {
name_tpl: "aggregate {rows} time-series rows",
asc: false,
use_topk: false,
use_view: false,
},
BenchCase {
name_tpl: "aggregate {rows} worst-case rows",
asc: true,
use_topk: false,
use_view: false,
},
BenchCase {
name_tpl: "top k={limit} aggregate {rows} time-series rows",
asc: false,
use_topk: true,
use_view: false,
},
BenchCase {
name_tpl: "top k={limit} aggregate {rows} worst-case rows",
asc: true,
use_topk: true,
use_view: false,
},
BenchCase {
name_tpl: "top k={limit} aggregate {rows} time-series rows [Utf8View]",
asc: false,
use_topk: true,
use_view: true,
},
BenchCase {
name_tpl: "top k={limit} aggregate {rows} worst-case rows [Utf8View]",
asc: true,
use_topk: true,
use_view: true,
},
];
for case in numeric_cases {
let name = case
.name_tpl
.replace("{rows}", &total_rows.to_string())
.replace("{limit}", &limit.to_string());
let ctx = rt
.block_on(create_context(
partitions,
samples,
case.asc,
case.use_topk,
case.use_view,
))
.unwrap();
c.bench_function(&name, |b| {
b.iter(|| run(&rt, ctx.clone(), limit, case.use_topk, case.asc))
});
}

let ctx = rt
.block_on(create_context(partitions, samples, true, true, true))
.unwrap();
c.bench_function(
format!(
"top k={limit} string aggregate {} worst-case rows [Utf8View]",
partitions * samples
)
.as_str(),
|b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
);
assert_string_results_match(&rt, partitions, samples, limit);

let string_cases = &[
StringCase {
asc: false,
use_topk: false,
use_view: false,
},
StringCase {
asc: true,
use_topk: false,
use_view: false,
},
StringCase {
asc: false,
use_topk: false,
use_view: true,
},
StringCase {
asc: true,
use_topk: false,
use_view: true,
},
StringCase {
asc: false,
use_topk: true,
use_view: false,
},
StringCase {
asc: true,
use_topk: true,
use_view: false,
},
StringCase {
asc: false,
use_topk: true,
use_view: true,
},
StringCase {
asc: true,
use_topk: true,
use_view: true,
},
];
for case in string_cases {
let scenario = if case.asc {
"worst-case"
} else {
"time-series"
};
let type_label = if case.use_view { "Utf8View" } else { "Utf8" };
let name = if case.use_topk {
format!(
"top k={limit} string aggregate {total_rows} {scenario} rows [{type_label}]"
)
} else {
format!("string aggregate {total_rows} {scenario} rows [{type_label}]")
};
let ctx = rt
.block_on(create_context(
partitions,
samples,
case.asc,
case.use_topk,
case.use_view,
))
.unwrap();
c.bench_function(&name, |b| {
b.iter(|| run_string(&rt, ctx.clone(), limit, case.use_topk))
});
}

// DISTINCT benchmarks
let ctx = rt.block_on(async {
create_context_distinct(partitions, samples, false)
.await
.unwrap()
});
c.bench_function(
format!("distinct {} rows desc [no TopK]", partitions * samples).as_str(),
|b| b.iter(|| run_distinct(&rt, ctx.clone(), limit, false, false)),
);

c.bench_function(
format!("distinct {} rows asc [no TopK]", partitions * samples).as_str(),
|b| b.iter(|| run_distinct(&rt, ctx.clone(), limit, false, true)),
);

let ctx_topk = rt.block_on(async {
create_context_distinct(partitions, samples, true)
.await
.unwrap()
});
c.bench_function(
format!("distinct {} rows desc [TopK]", partitions * samples).as_str(),
|b| b.iter(|| run_distinct(&rt, ctx_topk.clone(), limit, true, false)),
);

c.bench_function(
format!("distinct {} rows asc [TopK]", partitions * samples).as_str(),
|b| b.iter(|| run_distinct(&rt, ctx_topk.clone(), limit, true, true)),
);
for use_topk in [false, true] {
let ctx = rt.block_on(async {
create_context_distinct(partitions, samples, use_topk)
.await
.unwrap()
});
let topk_label = if use_topk { "TopK" } else { "no TopK" };
for asc in [false, true] {
let dir = if asc { "asc" } else { "desc" };
let name = format!("distinct {total_rows} rows {dir} [{topk_label}]");
c.bench_function(&name, |b| {
b.iter(|| run_distinct(&rt, ctx.clone(), limit, use_topk, asc))
});
}
}
}

criterion_group!(benches, criterion_benchmark);
Expand Down
Loading