Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
b9828ca
fix: optimize null predicate evaluation by early exit for non-restric…
kosiew Mar 16, 2026
88e4455
Add test case for a > b in join key predicates
kosiew Mar 16, 2026
df73001
Refactor column membership check into a helper function
kosiew Mar 16, 2026
144cab3
Clarify null-restricting behavior in filter check
kosiew Mar 16, 2026
9ef45f0
Simplify column check and null-restrict handling
kosiew Mar 16, 2026
3d3945c
refactor: streamline null predicate evaluation by introducing predica…
kosiew Mar 16, 2026
17009ae
Improve SQL boolean/null semantics handling
kosiew Mar 18, 2026
515da96
Add regression tests for SQL shape coverage
kosiew Mar 19, 2026
115b9f2
Refactor null predicate evaluation and add tests
kosiew Mar 19, 2026
38680f5
Refactor null handling in expression evaluation
kosiew Mar 19, 2026
ebd70ef
Test window scalar subquery optimizer delta
kosiew Mar 19, 2026
9665c63
Avoid join filter rewrite for scalar subqueries
kosiew Mar 19, 2026
00dcba4
new
kosiew Mar 19, 2026
12de382
Simplify null handling and filter push down logic
kosiew Mar 19, 2026
4fdeb7c
clippy fix
kosiew Mar 19, 2026
1d12172
Amend benchmark
kosiew Mar 20, 2026
2c5096d
Fix alias for scalar aggregate in push_down_filter
kosiew Mar 20, 2026
718cdf6
Refactor optimizer: remove test-only controls
kosiew Mar 20, 2026
deb6799
Add domain-specific evaluator for null restrictions
kosiew Mar 20, 2026
aa030ad
Improve null_restriction with documentation and evaluator
kosiew Mar 20, 2026
9cae403
Rename and clarify filter promotion function
kosiew Mar 20, 2026
11098c1
Refactor pushdown filter logic for clarity
kosiew Mar 20, 2026
84ca1b9
Add null restriction eval mode and test controls
kosiew Mar 20, 2026
fae294d
Refactor SQL planner for left join with filters
kosiew Mar 20, 2026
a7abb47
revert to main benchmark
kosiew Mar 20, 2026
69da463
Refactor null restriction logic in null_restriction.rs
kosiew Mar 20, 2026
72267e8
Make NullRestrictionEvalMode test-only
kosiew Mar 20, 2026
cb25006
Revert "revert to main benchmark"
kosiew Mar 20, 2026
0f15d15
amend benchmark
kosiew Mar 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 132 additions & 126 deletions datafusion/core/benches/sql_planner_extended.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
use arrow::array::{ArrayRef, RecordBatch};
use arrow_schema::DataType;
use arrow_schema::TimeUnit::Nanosecond;
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use criterion::{
BenchmarkGroup, BenchmarkId, Criterion, criterion_group, criterion_main,
measurement::WallTime,
};
use datafusion::prelude::{DataFrame, SessionContext};
use datafusion_catalog::MemTable;
use datafusion_common::ScalarValue;
Expand All @@ -27,12 +30,17 @@ use datafusion_expr::{cast, col, lit, not, try_cast, when};
use datafusion_functions::expr_fn::{
btrim, length, regexp_like, regexp_replace, to_timestamp, upper,
};
use std::env;
use std::fmt::Write;
use std::hint::black_box;
use std::ops::Rem;
use std::sync::Arc;
use tokio::runtime::Runtime;

const FULL_PREDICATE_SWEEP: [usize; 5] = [10, 20, 30, 40, 60];
const FULL_DEPTH_SWEEP: [usize; 3] = [1, 2, 3];
const DEFAULT_SWEEP_POINTS: [(usize, usize); 3] = [(10, 1), (30, 2), (60, 3)];

// This benchmark suite is designed to test the performance of
// logical planning with a large plan containing unions, many columns
// with a variety of operations in it.
Expand Down Expand Up @@ -252,26 +260,6 @@ fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) -
query
}

/// Plans the CASE-heavy LEFT JOIN query and returns the resulting `DataFrame`.
///
/// When `push_down_filter_enabled` is false the `push_down_filter` optimizer
/// rule is removed from the session first, giving the control arm of the A/B
/// benchmark; the removal is asserted so a silently missing rule cannot skew
/// the comparison.
fn build_case_heavy_left_join_df_with_push_down_filter(
    rt: &Runtime,
    predicate_count: usize,
    case_depth: usize,
    push_down_filter_enabled: bool,
) -> DataFrame {
    let ctx = SessionContext::new();
    register_string_table(&ctx, 100, 1000);

    // Control arm: strip the rule and verify it was actually present.
    if !push_down_filter_enabled {
        assert!(
            ctx.remove_optimizer_rule("push_down_filter"),
            "push_down_filter rule should be present in the default optimizer"
        );
    }

    let sql = build_case_heavy_left_join_query(predicate_count, case_depth);
    rt.block_on(async { ctx.sql(&sql).await.unwrap() })
}

fn build_non_case_left_join_query(
predicate_count: usize,
nesting_depth: usize,
Expand Down Expand Up @@ -304,10 +292,11 @@ fn build_non_case_left_join_query(
query
}

fn build_non_case_left_join_df_with_push_down_filter(
fn build_left_join_df_with_push_down_filter(
rt: &Runtime,
query_builder: impl Fn(usize, usize) -> String,
predicate_count: usize,
nesting_depth: usize,
depth: usize,
push_down_filter_enabled: bool,
) -> DataFrame {
let ctx = SessionContext::new();
Expand All @@ -320,10 +309,103 @@ fn build_non_case_left_join_df_with_push_down_filter(
);
}

let query = build_non_case_left_join_query(predicate_count, nesting_depth);
let query = query_builder(predicate_count, depth);
rt.block_on(async { ctx.sql(&query).await.unwrap() })
}

/// Builds the CASE-heavy LEFT JOIN `DataFrame` for the push-down-filter A/B
/// benchmark by delegating to the shared builder with the CASE-heavy query
/// generator. `push_down_filter_enabled` selects whether the optimizer rule
/// is kept (treatment arm) or removed (control arm).
fn build_case_heavy_left_join_df_with_push_down_filter(
    rt: &Runtime,
    predicate_count: usize,
    case_depth: usize,
    push_down_filter_enabled: bool,
) -> DataFrame {
    build_left_join_df_with_push_down_filter(
        rt,
        build_case_heavy_left_join_query,
        predicate_count,
        case_depth,
        push_down_filter_enabled,
    )
}

/// Builds the non-CASE LEFT JOIN `DataFrame` (the control query shape) for
/// the push-down-filter A/B benchmark by delegating to the shared builder
/// with the non-CASE query generator. `push_down_filter_enabled` selects
/// whether the optimizer rule is kept or removed.
fn build_non_case_left_join_df_with_push_down_filter(
    rt: &Runtime,
    predicate_count: usize,
    nesting_depth: usize,
    push_down_filter_enabled: bool,
) -> DataFrame {
    build_left_join_df_with_push_down_filter(
        rt,
        build_non_case_left_join_query,
        predicate_count,
        nesting_depth,
        push_down_filter_enabled,
    )
}

/// Returns true when the `DATAFUSION_PUSH_DOWN_FILTER_FULL_SWEEP` environment
/// variable is set to `"1"` or (case-insensitively) `"true"`.
///
/// Unset or any other value selects the small default sweep, keeping routine
/// benchmark runs fast while still allowing an opt-in exhaustive sweep.
fn include_full_push_down_filter_sweep() -> bool {
    env::var("DATAFUSION_PUSH_DOWN_FILTER_FULL_SWEEP")
        .is_ok_and(|value| value == "1" || value.eq_ignore_ascii_case("true"))
}

/// Returns the `(predicate_count, depth)` sweep points for the A/B benchmark.
///
/// By default a small representative sample (`DEFAULT_SWEEP_POINTS`) is used;
/// when the full-sweep environment flag is set, the full cross product of
/// `FULL_PREDICATE_SWEEP` x `FULL_DEPTH_SWEEP` is generated instead.
fn push_down_filter_sweep_points() -> Vec<(usize, usize)> {
    if !include_full_push_down_filter_sweep() {
        return DEFAULT_SWEEP_POINTS.to_vec();
    }

    // Full cross product, grouped by depth to match the original ordering.
    let mut points =
        Vec::with_capacity(FULL_DEPTH_SWEEP.len() * FULL_PREDICATE_SWEEP.len());
    for depth in FULL_DEPTH_SWEEP {
        for predicate_count in FULL_PREDICATE_SWEEP {
            points.push((predicate_count, depth));
        }
    }
    points
}

/// Registers the with/without `push_down_filter` A/B benchmarks for every
/// sweep point.
///
/// For each `(predicate_count, depth)` pair two benchmark IDs are emitted at
/// the same input label so the rule's planning cost can be isolated by
/// comparing them directly:
/// - `with_push_down_filter`: default optimizer path (rule enabled)
/// - `without_push_down_filter`: control path with the rule removed
///
/// `build_df` receives `(rt, predicate_count, depth, push_down_filter_enabled)`
/// and must return the planned-but-unoptimized `DataFrame`; each iteration
/// clones it and measures `into_optimized_plan`.
fn bench_push_down_filter_ab<BuildFn>(
    group: &mut BenchmarkGroup<'_, WallTime>,
    rt: &Runtime,
    sweep_points: &[(usize, usize)],
    build_df: BuildFn,
) where
    BuildFn: Fn(&Runtime, usize, usize, bool) -> DataFrame,
{
    for &(predicate_count, depth) in sweep_points {
        let input_label = format!("predicates={predicate_count},nesting_depth={depth}");

        // Single loop over the enabled flag replaces the previously duplicated
        // with/without registration blocks.
        for push_down_filter_enabled in [true, false] {
            let id = if push_down_filter_enabled {
                "with_push_down_filter"
            } else {
                "without_push_down_filter"
            };
            let df = build_df(rt, predicate_count, depth, push_down_filter_enabled);

            group.bench_with_input(BenchmarkId::new(id, &input_label), &df, |b, df| {
                b.iter(|| {
                    // Clone per iteration: into_optimized_plan consumes the DataFrame.
                    let df_clone = df.clone();
                    black_box(
                        rt.block_on(async { df_clone.into_optimized_plan().unwrap() }),
                    );
                })
            });
        }
    }
}

fn criterion_benchmark(c: &mut Criterion) {
let baseline_ctx = SessionContext::new();
let case_heavy_ctx = SessionContext::new();
Expand All @@ -349,116 +431,40 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});

let predicate_sweep = [10, 20, 30, 40, 60];
let case_depth_sweep = [1, 2, 3];
let sweep_points = push_down_filter_sweep_points();

let mut hotspot_group =
c.benchmark_group("push_down_filter_hotspot_case_heavy_left_join_ab");
for case_depth in case_depth_sweep {
for predicate_count in predicate_sweep {
let with_push_down_filter =
build_case_heavy_left_join_df_with_push_down_filter(
&rt,
predicate_count,
case_depth,
true,
);
let without_push_down_filter =
build_case_heavy_left_join_df_with_push_down_filter(
&rt,
predicate_count,
case_depth,
false,
);

let input_label =
format!("predicates={predicate_count},case_depth={case_depth}");
// A/B interpretation:
// - with_push_down_filter: default optimizer path (rule enabled)
// - without_push_down_filter: control path with the rule removed
// Compare both IDs at the same sweep point to isolate rule impact.
hotspot_group.bench_with_input(
BenchmarkId::new("with_push_down_filter", &input_label),
&with_push_down_filter,
|b, df| {
b.iter(|| {
let df_clone = df.clone();
black_box(
rt.block_on(async {
df_clone.into_optimized_plan().unwrap()
}),
);
})
},
);
hotspot_group.bench_with_input(
BenchmarkId::new("without_push_down_filter", &input_label),
&without_push_down_filter,
|b, df| {
b.iter(|| {
let df_clone = df.clone();
black_box(
rt.block_on(async {
df_clone.into_optimized_plan().unwrap()
}),
);
})
},
);
}
}
bench_push_down_filter_ab(
&mut hotspot_group,
&rt,
&sweep_points,
|rt, predicate_count, depth, enable| {
build_case_heavy_left_join_df_with_push_down_filter(
rt,
predicate_count,
depth,
enable,
)
},
);
hotspot_group.finish();

let mut control_group =
c.benchmark_group("push_down_filter_control_non_case_left_join_ab");
for nesting_depth in case_depth_sweep {
for predicate_count in predicate_sweep {
let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter(
&rt,
bench_push_down_filter_ab(
&mut control_group,
&rt,
&sweep_points,
|rt, predicate_count, depth, enable| {
build_non_case_left_join_df_with_push_down_filter(
rt,
predicate_count,
nesting_depth,
true,
);
let without_push_down_filter =
build_non_case_left_join_df_with_push_down_filter(
&rt,
predicate_count,
nesting_depth,
false,
);

let input_label =
format!("predicates={predicate_count},nesting_depth={nesting_depth}");
control_group.bench_with_input(
BenchmarkId::new("with_push_down_filter", &input_label),
&with_push_down_filter,
|b, df| {
b.iter(|| {
let df_clone = df.clone();
black_box(
rt.block_on(async {
df_clone.into_optimized_plan().unwrap()
}),
);
})
},
);
control_group.bench_with_input(
BenchmarkId::new("without_push_down_filter", &input_label),
&without_push_down_filter,
|b, df| {
b.iter(|| {
let df_clone = df.clone();
black_box(
rt.block_on(async {
df_clone.into_optimized_plan().unwrap()
}),
);
})
},
);
}
}
depth,
enable,
)
},
);
control_group.finish();
}

Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub mod create_drop;
pub mod explain_analyze;
pub mod joins;
mod path_partition;
mod push_down_filter_regressions;
mod runtime_config;
pub mod select;
mod sql_api;
Expand Down
Loading
Loading