From ff4d50f8cdfa22c15244c6c3110d41e876286587 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Thu, 19 Mar 2026 14:34:32 -0400 Subject: [PATCH 1/7] Add benchmark for approx_distinct with short strings --- .../benches/approx_distinct.rs | 61 +++++++++++-------- 1 file changed, 36 insertions(+), 25 deletions(-) diff --git a/datafusion/functions-aggregate/benches/approx_distinct.rs b/datafusion/functions-aggregate/benches/approx_distinct.rs index 538103d991f1f..9c22194e0384c 100644 --- a/datafusion/functions-aggregate/benches/approx_distinct.rs +++ b/datafusion/functions-aggregate/benches/approx_distinct.rs @@ -28,7 +28,8 @@ use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; const BATCH_SIZE: usize = 8192; -const STRING_LENGTH: usize = 20; +const SHORT_STRING_LENGTH: usize = 8; +const LONG_STRING_LENGTH: usize = 20; fn prepare_accumulator(data_type: DataType) -> Box { let schema = Arc::new(Schema::new(vec![Field::new("f", data_type, true)])); @@ -55,12 +56,12 @@ fn create_i64_array(n_distinct: usize) -> Int64Array { .collect() } -/// Creates a pool of `n_distinct` random strings. -fn create_string_pool(n_distinct: usize) -> Vec { +/// Creates a pool of `n_distinct` random strings of the given length. +fn create_string_pool(n_distinct: usize, string_length: usize) -> Vec { let mut rng = StdRng::seed_from_u64(42); (0..n_distinct) .map(|_| { - (0..STRING_LENGTH) + (0..string_length) .map(|_| rng.random_range(b'a'..=b'z') as char) .collect() }) @@ -98,29 +99,39 @@ fn approx_distinct_benchmark(c: &mut Criterion) { }) }); - let string_pool = create_string_pool(n_distinct); + for (label, str_len) in + [("short", SHORT_STRING_LENGTH), ("long", LONG_STRING_LENGTH)] + { + let string_pool = create_string_pool(n_distinct, str_len); - // --- Utf8 benchmarks --- - let values = Arc::new(create_string_array(&string_pool)) as ArrayRef; - c.bench_function(&format!("approx_distinct utf8 {pct}% distinct"), |b| { - b.iter(|| { - let mut accumulator = prepare_accumulator(DataType::Utf8); - accumulator - .update_batch(std::slice::from_ref(&values)) - .unwrap() - }) - }); + // --- Utf8 benchmarks --- + let values = Arc::new(create_string_array(&string_pool)) as ArrayRef; + c.bench_function( + &format!("approx_distinct utf8 {label} {pct}% distinct"), + |b| { + b.iter(|| { + let mut accumulator = prepare_accumulator(DataType::Utf8); + accumulator + .update_batch(std::slice::from_ref(&values)) + .unwrap() + }) + }, + ); - // --- Utf8View benchmarks --- - let values = Arc::new(create_string_view_array(&string_pool)) as ArrayRef; - c.bench_function(&format!("approx_distinct utf8view {pct}% distinct"), |b| { - b.iter(|| { - let mut accumulator = prepare_accumulator(DataType::Utf8View); - accumulator - .update_batch(std::slice::from_ref(&values)) - .unwrap() - }) - }); + // --- Utf8View benchmarks --- + let values = Arc::new(create_string_view_array(&string_pool)) as ArrayRef; + c.bench_function( + &format!("approx_distinct utf8view {label} {pct}% distinct"), + |b| { + b.iter(|| { + let mut accumulator = prepare_accumulator(DataType::Utf8View); + accumulator + .update_batch(std::slice::from_ref(&values)) + .unwrap() + }) + }, + ); + } } } From c1d30f906a138cc8b9773b70a40cfcca70eef952 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Thu, 19 Mar 2026 14:35:04 -0400 Subject: [PATCH 2/7] Implement `add_hashed` for HyperLogLog --- .../functions-aggregate/src/hyperloglog.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-aggregate/src/hyperloglog.rs b/datafusion/functions-aggregate/src/hyperloglog.rs index 3a467a811176d..3861800847edb 100644 --- a/datafusion/functions-aggregate/src/hyperloglog.rs +++ b/datafusion/functions-aggregate/src/hyperloglog.rs @@ -58,10 +58,11 @@ where /// Fixed seed for the hashing so that values are consistent across runs /// /// Note that when we later move on to have serialized HLL register binaries -/// shared across cluster, this SEED will have to be consistent across all +/// shared across cluster, this HLL_HASH_STATE will have to be consistent across all /// parties otherwise we might have corruption. So ideally for later this seed /// shall be part of the serialized form (or stay unchanged across versions). -const SEED: foldhash::quality::FixedState = foldhash::quality::FixedState::with_seed(0); +pub(crate) const HLL_HASH_STATE: foldhash::quality::FixedState = + foldhash::quality::FixedState::with_seed(0); impl Default for HyperLogLog where @@ -97,12 +98,21 @@ where /// reasonable performance. #[inline] fn hash_value(&self, obj: &T) -> u64 { - SEED.hash_one(obj) + HLL_HASH_STATE.hash_one(obj) } /// Adds an element to the HyperLogLog. pub fn add(&mut self, obj: &T) { let hash = self.hash_value(obj); + self.add_hashed(hash); + } + + /// Adds a pre-computed hash value directly to the HyperLogLog. + /// + /// The hash should be computed using [`HLL_HASH_STATE`], the same hasher used + /// by [`Self::add`]. + #[inline] + pub(crate) fn add_hashed(&mut self, hash: u64) { let index = (hash & HLL_P_MASK) as usize; let p = ((hash >> HLL_P) | (1_u64 << HLL_Q)).trailing_zeros() + 1; self.registers[index] = self.registers[index].max(p as u8); From cccd3624f9bb45887243804f186d3ced3ee7ca19 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Thu, 19 Mar 2026 14:35:17 -0400 Subject: [PATCH 3/7] Optimize approx_distinct to hash short strings directly --- .../src/approx_distinct.rs | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 9cebd3e8518a0..0a806b31cc787 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -17,8 +17,8 @@ //! Defines physical expressions that can evaluated at runtime during query execution -use crate::hyperloglog::HyperLogLog; -use arrow::array::{BinaryArray, StringViewArray}; +use crate::hyperloglog::{HLL_HASH_STATE, HyperLogLog}; +use arrow::array::{Array, BinaryArray, StringViewArray}; use arrow::array::{ GenericBinaryArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray, }; @@ -44,7 +44,7 @@ use datafusion_functions_aggregate_common::noop_accumulator::NoopAccumulator; use datafusion_macros::user_doc; use std::any::Any; use std::fmt::{Debug, Formatter}; -use std::hash::Hash; +use std::hash::{BuildHasher, Hash}; use std::marker::PhantomData; make_udaf_expr_and_func!( @@ -212,8 +212,19 @@ where impl Accumulator for StringViewHLLAccumulator { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let array: &StringViewArray = downcast_value!(values[0], StringViewArray); - // flatten because we would skip nulls - self.hll.extend(array.iter().flatten()); + + // When all strings are stored inline in the StringView (≤ 12 bytes), + // hash the raw u128 view directly instead of materializing a &str. + if array.data_buffers().is_empty() { + for (i, &view) in array.views().iter().enumerate() { + if !array.is_null(i) { + self.hll.add_hashed(HLL_HASH_STATE.hash_one(view)); + } + } + } else { + self.hll.extend(array.iter().flatten()); + } + Ok(()) } From ef1f173e416943af1464a4e7252c326e2734cf27 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Fri, 20 Mar 2026 10:48:27 -0400 Subject: [PATCH 4/7] Per-string short-string opt, access data buffer directly --- .../src/approx_distinct.rs | 35 ++++++++++++++++--- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 0a806b31cc787..6c34e5de58fe2 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -18,7 +18,7 @@ //! Defines physical expressions that can evaluated at runtime during query execution use crate::hyperloglog::{HLL_HASH_STATE, HyperLogLog}; -use arrow::array::{Array, BinaryArray, StringViewArray}; +use arrow::array::{Array, BinaryArray, ByteView, StringViewArray}; use arrow::array::{ GenericBinaryArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray, }; @@ -213,16 +213,43 @@ impl Accumulator for StringViewHLLAccumulator { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let array: &StringViewArray = downcast_value!(values[0], StringViewArray); - // When all strings are stored inline in the StringView (≤ 12 bytes), - // hash the raw u128 view directly instead of materializing a &str. + // When strings are stored inline in the StringView (≤ 12 bytes), hash + // the raw u128 view directly. Whether a string is inline is determined + // solely by its length, so the same value always takes the same code + // path and gets the same hash. if array.data_buffers().is_empty() { + // All strings are inline — skip per-element length check for (i, &view) in array.views().iter().enumerate() { if !array.is_null(i) { self.hll.add_hashed(HLL_HASH_STATE.hash_one(view)); } } } else { - self.hll.extend(array.iter().flatten()); + // At least some strings stored out-of-line + let buffers = array.data_buffers(); + for (i, &view) in array.views().iter().enumerate() { + if array.is_null(i) { + continue; + } + let view_len = view as u32; + if view_len <= 12 { + self.hll.add_hashed(HLL_HASH_STATE.hash_one(view)); + } else { + // For out-of-line strings, it is faster to hash the bytes + // in the appropriate data buffer directly, rather than + // constructing a &str to call self.hll.add() + let bv = ByteView::from(view); + let offset = bv.offset as usize; + // SAFETY: view came from the array so buffer_index + // and offset..offset+len are in bounds. + let bytes = unsafe { + buffers + .get_unchecked(bv.buffer_index as usize) + .get_unchecked(offset..offset + view_len as usize) + }; + self.hll.add_hashed(HLL_HASH_STATE.hash_one(bytes)); + } + } } Ok(()) From 893fc067a0c337b09a67d7ec662b5a218fb64095 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Fri, 20 Mar 2026 10:50:25 -0400 Subject: [PATCH 5/7] Tweak comment --- datafusion/functions-aggregate/src/approx_distinct.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 6c34e5de58fe2..3515380ff7c96 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -240,8 +240,9 @@ impl Accumulator for StringViewHLLAccumulator { // constructing a &str to call self.hll.add() let bv = ByteView::from(view); let offset = bv.offset as usize; - // SAFETY: view came from the array so buffer_index - // and offset..offset+len are in bounds. + // SAFETY: StringViewArray guarantees that each view's + // buffer_index is a valid index into data_buffers(), and + // offset..offset+len falls within that buffer. let bytes = unsafe { buffers .get_unchecked(bv.buffer_index as usize) From dcce1692dd1973abd4ac7bcdcbafeeb88b7cedb5 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Fri, 20 Mar 2026 11:26:04 -0400 Subject: [PATCH 6/7] Revert "Tweak comment" This reverts commit 893fc067a0c337b09a67d7ec662b5a218fb64095. --- datafusion/functions-aggregate/src/approx_distinct.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 3515380ff7c96..6c34e5de58fe2 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -240,9 +240,8 @@ impl Accumulator for StringViewHLLAccumulator { // constructing a &str to call self.hll.add() let bv = ByteView::from(view); let offset = bv.offset as usize; - // SAFETY: StringViewArray guarantees that each view's - // buffer_index is a valid index into data_buffers(), and - // offset..offset+len falls within that buffer. + // SAFETY: view came from the array so buffer_index + // and offset..offset+len are in bounds. let bytes = unsafe { buffers .get_unchecked(bv.buffer_index as usize) From cc44b1eef368bd387a781c69cf4addd110e94e46 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Fri, 20 Mar 2026 11:26:12 -0400 Subject: [PATCH 7/7] Revert "Per-string short-string opt, access data buffer directly" This reverts commit ef1f173e416943af1464a4e7252c326e2734cf27. --- .../src/approx_distinct.rs | 35 +++---------------- 1 file changed, 4 insertions(+), 31 deletions(-) diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 6c34e5de58fe2..0a806b31cc787 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -18,7 +18,7 @@ //! Defines physical expressions that can evaluated at runtime during query execution use crate::hyperloglog::{HLL_HASH_STATE, HyperLogLog}; -use arrow::array::{Array, BinaryArray, ByteView, StringViewArray}; +use arrow::array::{Array, BinaryArray, StringViewArray}; use arrow::array::{ GenericBinaryArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray, }; @@ -213,43 +213,16 @@ impl Accumulator for StringViewHLLAccumulator { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let array: &StringViewArray = downcast_value!(values[0], StringViewArray); - // When strings are stored inline in the StringView (≤ 12 bytes), hash - // the raw u128 view directly. Whether a string is inline is determined - // solely by its length, so the same value always takes the same code - // path and gets the same hash. + // When all strings are stored inline in the StringView (≤ 12 bytes), + // hash the raw u128 view directly instead of materializing a &str. if array.data_buffers().is_empty() { - // All strings are inline — skip per-element length check for (i, &view) in array.views().iter().enumerate() { if !array.is_null(i) { self.hll.add_hashed(HLL_HASH_STATE.hash_one(view)); } } } else { - // At least some strings stored out-of-line - let buffers = array.data_buffers(); - for (i, &view) in array.views().iter().enumerate() { - if array.is_null(i) { - continue; - } - let view_len = view as u32; - if view_len <= 12 { - self.hll.add_hashed(HLL_HASH_STATE.hash_one(view)); - } else { - // For out-of-line strings, it is faster to hash the bytes - // in the appropriate data buffer directly, rather than - // constructing a &str to call self.hll.add() - let bv = ByteView::from(view); - let offset = bv.offset as usize; - // SAFETY: view came from the array so buffer_index - // and offset..offset+len are in bounds. - let bytes = unsafe { - buffers - .get_unchecked(bv.buffer_index as usize) - .get_unchecked(offset..offset + view_len as usize) - }; - self.hll.add_hashed(HLL_HASH_STATE.hash_one(bytes)); - } - } + self.hll.extend(array.iter().flatten()); } Ok(())