diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index d50404c8fc1e8..fb56170c2bb4e 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -48,7 +48,8 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; use datafusion_common::utils::transpose; use datafusion_common::{ - ColumnStatistics, DataFusionError, HashMap, assert_or_internal_err, internal_err, + ColumnStatistics, DataFusionError, HashMap, assert_or_internal_err, + internal_datafusion_err, internal_err, }; use datafusion_common::{Result, not_impl_err}; use datafusion_common_runtime::SpawnedTask; @@ -421,6 +422,7 @@ enum BatchPartitionerState { exprs: Vec>, num_partitions: usize, hash_buffer: Vec, + indices: Vec>, }, RoundRobin { num_partitions: usize, @@ -453,6 +455,7 @@ impl BatchPartitioner { exprs, num_partitions, hash_buffer: vec![], + indices: vec![vec![]; num_partitions], }, timer, } @@ -562,6 +565,7 @@ impl BatchPartitioner { exprs, num_partitions: partitions, hash_buffer, + indices, } => { // Tracking time required for distributing indexes across output partitions let timer = self.timer.timer(); @@ -578,9 +582,7 @@ impl BatchPartitioner { hash_buffer, )?; - let mut indices: Vec<_> = (0..*partitions) - .map(|_| Vec::with_capacity(batch.num_rows())) - .collect(); + indices.iter_mut().for_each(|v| v.clear()); for (index, hash) in hash_buffer.iter().enumerate() { indices[(*hash % *partitions as u64) as usize].push(index as u32); @@ -591,22 +593,23 @@ impl BatchPartitioner { // Borrowing partitioner timer to prevent moving `self` to closure let partitioner_timer = &self.timer; - let it = indices - .into_iter() - .enumerate() - .filter_map(|(partition, indices)| { - let indices: PrimitiveArray = indices.into(); - (!indices.is_empty()).then_some((partition, indices)) - }) - .map(move |(partition, indices)| { + + let mut partitioned_batches = vec![]; + for (partition, p_indices) in indices.iter_mut().enumerate() { + if !p_indices.is_empty() { + let taken_indices = std::mem::take(p_indices); + let indices_array: PrimitiveArray = + taken_indices.into(); + // Tracking time required for repartitioned batches construction let _timer = partitioner_timer.timer(); // Produce batches based on indices - let columns = take_arrays(batch.columns(), &indices, None)?; + let columns = + take_arrays(batch.columns(), &indices_array, None)?; let mut options = RecordBatchOptions::new(); - options = options.with_row_count(Some(indices.len())); + options = options.with_row_count(Some(indices_array.len())); let batch = RecordBatch::try_new_with_options( batch.schema(), columns, @@ -614,10 +617,22 @@ impl BatchPartitioner { ) .unwrap(); - Ok((partition, batch)) - }); + partitioned_batches.push(Ok((partition, batch))); + + // Return the taken vec + let (_, buffer, _) = indices_array.into_parts(); + let mut vec = + buffer.into_inner().into_vec::().map_err(|e| { + internal_datafusion_err!( + "Could not convert buffer to vec: {e:?}" + ) + })?; + vec.clear(); + *p_indices = vec; + } + } - Box::new(it) + Box::new(partitioned_batches.into_iter()) } };