Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 36 additions & 25 deletions datafusion/functions-aggregate/benches/approx_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

const BATCH_SIZE: usize = 8192;
const STRING_LENGTH: usize = 20;
const SHORT_STRING_LENGTH: usize = 8;
const LONG_STRING_LENGTH: usize = 20;

fn prepare_accumulator(data_type: DataType) -> Box<dyn Accumulator> {
let schema = Arc::new(Schema::new(vec![Field::new("f", data_type, true)]));
Expand All @@ -55,12 +56,12 @@ fn create_i64_array(n_distinct: usize) -> Int64Array {
.collect()
}

/// Creates a pool of `n_distinct` random strings.
fn create_string_pool(n_distinct: usize) -> Vec<String> {
/// Creates a pool of `n_distinct` random strings of the given length.
fn create_string_pool(n_distinct: usize, string_length: usize) -> Vec<String> {
let mut rng = StdRng::seed_from_u64(42);
(0..n_distinct)
.map(|_| {
(0..STRING_LENGTH)
(0..string_length)
.map(|_| rng.random_range(b'a'..=b'z') as char)
.collect()
})
Expand Down Expand Up @@ -98,29 +99,39 @@ fn approx_distinct_benchmark(c: &mut Criterion) {
})
});

let string_pool = create_string_pool(n_distinct);
for (label, str_len) in
[("short", SHORT_STRING_LENGTH), ("long", LONG_STRING_LENGTH)]
{
let string_pool = create_string_pool(n_distinct, str_len);

// --- Utf8 benchmarks ---
let values = Arc::new(create_string_array(&string_pool)) as ArrayRef;
c.bench_function(&format!("approx_distinct utf8 {pct}% distinct"), |b| {
b.iter(|| {
let mut accumulator = prepare_accumulator(DataType::Utf8);
accumulator
.update_batch(std::slice::from_ref(&values))
.unwrap()
})
});
// --- Utf8 benchmarks ---
let values = Arc::new(create_string_array(&string_pool)) as ArrayRef;
c.bench_function(
&format!("approx_distinct utf8 {label} {pct}% distinct"),
|b| {
b.iter(|| {
let mut accumulator = prepare_accumulator(DataType::Utf8);
accumulator
.update_batch(std::slice::from_ref(&values))
.unwrap()
})
},
);

// --- Utf8View benchmarks ---
let values = Arc::new(create_string_view_array(&string_pool)) as ArrayRef;
c.bench_function(&format!("approx_distinct utf8view {pct}% distinct"), |b| {
b.iter(|| {
let mut accumulator = prepare_accumulator(DataType::Utf8View);
accumulator
.update_batch(std::slice::from_ref(&values))
.unwrap()
})
});
// --- Utf8View benchmarks ---
let values = Arc::new(create_string_view_array(&string_pool)) as ArrayRef;
c.bench_function(
&format!("approx_distinct utf8view {label} {pct}% distinct"),
|b| {
b.iter(|| {
let mut accumulator = prepare_accumulator(DataType::Utf8View);
accumulator
.update_batch(std::slice::from_ref(&values))
.unwrap()
})
},
);
}
}
}

Expand Down
21 changes: 16 additions & 5 deletions datafusion/functions-aggregate/src/approx_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

//! Defines physical expressions that can be evaluated at runtime during query execution

use crate::hyperloglog::HyperLogLog;
use arrow::array::{BinaryArray, StringViewArray};
use crate::hyperloglog::{HLL_HASH_STATE, HyperLogLog};
use arrow::array::{Array, BinaryArray, StringViewArray};
use arrow::array::{
GenericBinaryArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray,
};
Expand All @@ -44,7 +44,7 @@ use datafusion_functions_aggregate_common::noop_accumulator::NoopAccumulator;
use datafusion_macros::user_doc;
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::hash::Hash;
use std::hash::{BuildHasher, Hash};
use std::marker::PhantomData;

make_udaf_expr_and_func!(
Expand Down Expand Up @@ -212,8 +212,19 @@ where
impl Accumulator for StringViewHLLAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let array: &StringViewArray = downcast_value!(values[0], StringViewArray);
// flatten because we would skip nulls
self.hll.extend(array.iter().flatten());

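// When all strings are stored inline in the StringView (≤ 12 bytes),
// hash the raw u128 view directly instead of materializing a &str.
// NOTE(review): the fallback branch below hashes each value as a &str, so the
// same short string may hash differently depending on which branch a batch
// takes — confirm this cannot inflate the distinct-count estimate.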
if array.data_buffers().is_empty() {
for (i, &view) in array.views().iter().enumerate() {
if !array.is_null(i) {
Copy link
Contributor

@Dandandan Dandandan Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering, perhaps we can reuse/generalize hash_string_view_array_inner as well (passing a quality hash function instead)?
It has some more optimizations, such as specializing on non-nulls, etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a look at this, but I didn't see an obvious way to reuse/generalize this code.

I played around with accessing data_buffers directly (similar to how hash_string_view_array_inner does it) and computing the hash there for out-of-line strings (I pushed a commit for this), but it didn't seem like a huge win (so I pushed a revert for it).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it could be rewritten in terms of https://doc.rust-lang.org/std/hash/trait.BuildHasher.html 🤔

self.hll.add_hashed(HLL_HASH_STATE.hash_one(view));
}
}
} else {
self.hll.extend(array.iter().flatten());
}

Ok(())
}

Expand Down
16 changes: 13 additions & 3 deletions datafusion/functions-aggregate/src/hyperloglog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ where
/// Fixed seed for the hashing so that values are consistent across runs
///
/// Note that when we later move on to have serialized HLL register binaries
/// shared across cluster, this SEED will have to be consistent across all
/// shared across cluster, this HLL_HASH_STATE will have to be consistent across all
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

/// parties; otherwise we might have corruption. So ideally this seed should
/// later be part of the serialized form (or stay unchanged across versions).
const SEED: foldhash::quality::FixedState = foldhash::quality::FixedState::with_seed(0);
pub(crate) const HLL_HASH_STATE: foldhash::quality::FixedState =
foldhash::quality::FixedState::with_seed(0);

impl<T> Default for HyperLogLog<T>
where
Expand Down Expand Up @@ -97,12 +98,21 @@ where
/// reasonable performance.
#[inline]
fn hash_value(&self, obj: &T) -> u64 {
SEED.hash_one(obj)
HLL_HASH_STATE.hash_one(obj)
}

/// Inserts `obj` into the HyperLogLog sketch.
///
/// The element is first hashed with [`Self::hash_value`]; the resulting
/// 64-bit hash is then handed to [`Self::add_hashed`], which updates the
/// registers.
pub fn add(&mut self, obj: &T) {
    let digest = self.hash_value(obj);
    self.add_hashed(digest);
}

/// Adds a pre-computed hash value directly to the HyperLogLog.
///
/// The hash should be computed using [`HLL_HASH_STATE`], the same hasher used
/// by [`Self::add`].
#[inline]
pub(crate) fn add_hashed(&mut self, hash: u64) {
let index = (hash & HLL_P_MASK) as usize;
let p = ((hash >> HLL_P) | (1_u64 << HLL_Q)).trailing_zeros() + 1;
self.registers[index] = self.registers[index].max(p as u8);
Expand Down
Loading