Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ced42b5
Adds negative spark function
Jan 26, 2026
744ad95
Lint fix
Jan 26, 2026
897fbef
Update datafusion/spark/src/function/math/negative.rs
SubhamSinghal Jan 27, 2026
34819a0
Update datafusion/spark/src/function/math/negative.rs
SubhamSinghal Jan 27, 2026
76aa1d3
Apply suggestion
SubhamSinghal Jan 27, 2026
2901ca5
Adds slt file
Jan 27, 2026
e227109
Use as_primitive fn
Jan 27, 2026
0dfa277
Use inline macro
Jan 27, 2026
e4232d4
Remove scalar macro
Jan 27, 2026
78f0159
Adds decimal32 and 64 types
Jan 27, 2026
e86473a
Fix lint
Jan 27, 2026
f53316f
Fix UT
Jan 27, 2026
30f000b
Update datafusion/spark/src/function/math/negative.rs
SubhamSinghal Jan 27, 2026
89b3059
Update datafusion/spark/src/function/math/negative.rs
SubhamSinghal Jan 27, 2026
77f63af
Update datafusion/spark/src/function/math/negative.rs
SubhamSinghal Jan 27, 2026
ddd30d0
Adds interval type
Jan 27, 2026
9191b3a
Lint fix
Jan 27, 2026
da620bb
Update datafusion/spark/src/function/math/negative.rs
SubhamSinghal Jan 28, 2026
1b971c8
Update datafusion/spark/src/function/math/negative.rs
SubhamSinghal Jan 28, 2026
99cba0a
Adds arg
Jan 28, 2026
cb79d2c
Add unsigned values for scalar
Jan 28, 2026
aaed6d2
Adds UT for interval and unsigned type
Jan 28, 2026
6af34c4
Update datafusion/spark/src/function/math/negative.rs
SubhamSinghal Jan 29, 2026
b743145
Fix UT
Jan 29, 2026
67c4b40
Remove any signature type
Jan 29, 2026
431ff5a
Fix interval type test cases
Jan 29, 2026
11296c4
Throw error for unsigned int
Jan 30, 2026
2e9b72e
Fix UT
Jan 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions datafusion/spark/src/function/math/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod expm1;
pub mod factorial;
pub mod hex;
pub mod modulus;
pub mod negative;
pub mod rint;
pub mod trigonometry;
pub mod unhex;
Expand All @@ -40,6 +41,7 @@ make_udf_function!(unhex::SparkUnhex, unhex);
make_udf_function!(width_bucket::SparkWidthBucket, width_bucket);
make_udf_function!(trigonometry::SparkCsc, csc);
make_udf_function!(trigonometry::SparkSec, sec);
make_udf_function!(negative::SparkNegative, negative);

pub mod expr_fn {
use datafusion_functions::export_functions;
Expand All @@ -63,6 +65,11 @@ pub mod expr_fn {
export_functions!((width_bucket, "Returns the bucket number into which the value of this expression would fall after being evaluated.", arg1 arg2 arg3 arg4));
export_functions!((csc, "Returns the cosecant of expr.", arg1));
export_functions!((sec, "Returns the secant of expr.", arg1));
export_functions!((
negative,
"Returns the negation of expr (unary minus).",
arg1
));
}

pub fn functions() -> Vec<Arc<ScalarUDF>> {
Expand All @@ -78,5 +85,6 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
width_bucket(),
csc(),
sec(),
negative(),
]
}
293 changes: 293 additions & 0 deletions datafusion/spark/src/function/math/negative.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::types::*;
use arrow::array::*;
use arrow::datatypes::{DataType, IntervalDayTime, IntervalMonthDayNano, IntervalUnit};
use bigdecimal::num_traits::WrappingNeg;
use datafusion_common::utils::take_function_args;
use datafusion_common::{Result, ScalarValue, not_impl_err};
use datafusion_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
Volatility,
};
use std::any::Any;
use std::sync::Arc;

/// Spark-compatible `negative` expression
/// <https://spark.apache.org/docs/latest/api/sql/index.html#negative>
///
/// Returns the negation of input (equivalent to unary minus)
/// Returns NULL if input is NULL, returns NaN if input is NaN.
///
/// ANSI mode support see (<https://github.com/apache/datafusion/issues/20034>):
/// - Spark's ANSI-compliant dialect, when off (i.e. `spark.sql.ansi.enabled=false`),
/// negating the minimal value of a signed integer wraps around.
/// For example: negative(i32::MIN) returns i32::MIN (wraps instead of error).
/// This is the current implementation (legacy mode only).
/// - Spark's ANSI mode (when `spark.sql.ansi.enabled=true`) should throw an
/// ARITHMETIC_OVERFLOW error on integer overflow instead of wrapping.
/// This is not yet implemented - all operations currently use wrapping behavior.
///
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkNegative {
    // Accepted argument shapes: exactly one numeric argument, or exactly one
    // interval argument (YearMonth / DayTime / MonthDayNano). Built in `new()`.
    signature: Signature,
}

impl Default for SparkNegative {
fn default() -> Self {
Self::new()
}
}

impl SparkNegative {
pub fn new() -> Self {
Self {
signature: Signature {
type_signature: TypeSignature::OneOf(vec![
// Numeric types: signed integers, float, decimals
TypeSignature::Numeric(1),
// Interval types: YearMonth, DayTime, MonthDayNano
TypeSignature::Uniform(
1,
vec![
DataType::Interval(IntervalUnit::YearMonth),
DataType::Interval(IntervalUnit::DayTime),
DataType::Interval(IntervalUnit::MonthDayNano),
],
),
]),
volatility: Volatility::Immutable,
parameter_names: None,
},
}
}
}

impl ScalarUDFImpl for SparkNegative {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"negative"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
Ok(arg_types[0].clone())
}

fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
spark_negative(&args.args)
}
}

/// Core implementation of Spark's negative function.
///
/// Dispatches on the input's physical type and negates each value:
/// - signed integers, decimals, and intervals use *wrapping* negation
///   (Spark legacy / non-ANSI overflow semantics — see the type-level docs),
/// - floats use plain negation (cannot overflow; NaN negates to NaN),
/// - nulls propagate unchanged.
///
/// # Errors
/// Returns a `not_impl_err` for any other type (notably unsigned integers,
/// which Spark rejects for this function).
fn spark_negative(args: &[ColumnarValue]) -> Result<ColumnarValue> {
    let [arg] = take_function_args("negative", args)?;

    match arg {
        ColumnarValue::Array(array) => match array.data_type() {
            // An all-null array negates to itself.
            DataType::Null => Ok(arg.clone()),

            // Signed integers - use wrapping negation (Spark legacy mode behavior)
            DataType::Int8 => {
                let array = array.as_primitive::<Int8Type>();
                let result: PrimitiveArray<Int8Type> = array.unary(|x| x.wrapping_neg());
                Ok(ColumnarValue::Array(Arc::new(result)))
            }
            DataType::Int16 => {
                let array = array.as_primitive::<Int16Type>();
                let result: PrimitiveArray<Int16Type> = array.unary(|x| x.wrapping_neg());
                Ok(ColumnarValue::Array(Arc::new(result)))
            }
            DataType::Int32 => {
                let array = array.as_primitive::<Int32Type>();
                let result: PrimitiveArray<Int32Type> = array.unary(|x| x.wrapping_neg());
                Ok(ColumnarValue::Array(Arc::new(result)))
            }
            DataType::Int64 => {
                let array = array.as_primitive::<Int64Type>();
                let result: PrimitiveArray<Int64Type> = array.unary(|x| x.wrapping_neg());
                Ok(ColumnarValue::Array(Arc::new(result)))
            }

            // Floating point - simple negation (no overflow possible)
            DataType::Float16 => {
                let array = array.as_primitive::<Float16Type>();
                let result: PrimitiveArray<Float16Type> = array.unary(|x| -x);
                Ok(ColumnarValue::Array(Arc::new(result)))
            }
            DataType::Float32 => {
                let array = array.as_primitive::<Float32Type>();
                let result: PrimitiveArray<Float32Type> = array.unary(|x| -x);
                Ok(ColumnarValue::Array(Arc::new(result)))
            }
            DataType::Float64 => {
                let array = array.as_primitive::<Float64Type>();
                let result: PrimitiveArray<Float64Type> = array.unary(|x| -x);
                Ok(ColumnarValue::Array(Arc::new(result)))
            }

            // Decimal types - wrapping negation on the raw (unscaled) value;
            // precision/scale are carried unchanged by `unary`.
            DataType::Decimal32(_, _) => {
                let array = array.as_primitive::<Decimal32Type>();
                let result: PrimitiveArray<Decimal32Type> =
                    array.unary(|x| x.wrapping_neg());
                Ok(ColumnarValue::Array(Arc::new(result)))
            }
            DataType::Decimal64(_, _) => {
                let array = array.as_primitive::<Decimal64Type>();
                let result: PrimitiveArray<Decimal64Type> =
                    array.unary(|x| x.wrapping_neg());
                Ok(ColumnarValue::Array(Arc::new(result)))
            }
            DataType::Decimal128(_, _) => {
                let array = array.as_primitive::<Decimal128Type>();
                let result: PrimitiveArray<Decimal128Type> =
                    array.unary(|x| x.wrapping_neg());
                Ok(ColumnarValue::Array(Arc::new(result)))
            }
            DataType::Decimal256(_, _) => {
                let array = array.as_primitive::<Decimal256Type>();
                let result: PrimitiveArray<Decimal256Type> =
                    array.unary(|x| x.wrapping_neg());
                Ok(ColumnarValue::Array(Arc::new(result)))
            }

            // Interval types - negate each stored field independently.
            DataType::Interval(IntervalUnit::YearMonth) => {
                let array = array.as_primitive::<IntervalYearMonthType>();
                let result: PrimitiveArray<IntervalYearMonthType> =
                    array.unary(|x| x.wrapping_neg());
                Ok(ColumnarValue::Array(Arc::new(result)))
            }
            DataType::Interval(IntervalUnit::DayTime) => {
                let array = array.as_primitive::<IntervalDayTimeType>();
                let result: PrimitiveArray<IntervalDayTimeType> =
                    array.unary(|x| IntervalDayTime {
                        days: x.days.wrapping_neg(),
                        milliseconds: x.milliseconds.wrapping_neg(),
                    });
                Ok(ColumnarValue::Array(Arc::new(result)))
            }
            DataType::Interval(IntervalUnit::MonthDayNano) => {
                let array = array.as_primitive::<IntervalMonthDayNanoType>();
                let result: PrimitiveArray<IntervalMonthDayNanoType> =
                    array.unary(|x| IntervalMonthDayNano {
                        months: x.months.wrapping_neg(),
                        days: x.days.wrapping_neg(),
                        nanoseconds: x.nanoseconds.wrapping_neg(),
                    });
                Ok(ColumnarValue::Array(Arc::new(result)))
            }

            dt => not_impl_err!("Not supported datatype for Spark negative(): {dt}"),
        },
        ColumnarValue::Scalar(sv) => match sv {
            // Any null scalar — including `ScalarValue::Null` — negates to
            // itself. This single guard replaces the former redundant
            // `ScalarValue::Null` arm, which was fully covered by `is_null()`.
            _ if sv.is_null() => Ok(arg.clone()),

            // Signed integers - wrapping negation
            ScalarValue::Int8(Some(v)) => {
                let result = v.wrapping_neg();
                Ok(ColumnarValue::Scalar(ScalarValue::Int8(Some(result))))
            }
            ScalarValue::Int16(Some(v)) => {
                let result = v.wrapping_neg();
                Ok(ColumnarValue::Scalar(ScalarValue::Int16(Some(result))))
            }
            ScalarValue::Int32(Some(v)) => {
                let result = v.wrapping_neg();
                Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(result))))
            }
            ScalarValue::Int64(Some(v)) => {
                let result = v.wrapping_neg();
                Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(result))))
            }

            // Floating point - simple negation
            ScalarValue::Float16(Some(v)) => {
                Ok(ColumnarValue::Scalar(ScalarValue::Float16(Some(-v))))
            }
            ScalarValue::Float32(Some(v)) => {
                Ok(ColumnarValue::Scalar(ScalarValue::Float32(Some(-v))))
            }
            ScalarValue::Float64(Some(v)) => {
                Ok(ColumnarValue::Scalar(ScalarValue::Float64(Some(-v))))
            }

            // Decimal types - wrapping negation, precision/scale preserved
            ScalarValue::Decimal32(Some(v), precision, scale) => {
                let result = v.wrapping_neg();
                Ok(ColumnarValue::Scalar(ScalarValue::Decimal32(
                    Some(result),
                    *precision,
                    *scale,
                )))
            }
            ScalarValue::Decimal64(Some(v), precision, scale) => {
                let result = v.wrapping_neg();
                Ok(ColumnarValue::Scalar(ScalarValue::Decimal64(
                    Some(result),
                    *precision,
                    *scale,
                )))
            }
            ScalarValue::Decimal128(Some(v), precision, scale) => {
                let result = v.wrapping_neg();
                Ok(ColumnarValue::Scalar(ScalarValue::Decimal128(
                    Some(result),
                    *precision,
                    *scale,
                )))
            }
            ScalarValue::Decimal256(Some(v), precision, scale) => {
                let result = v.wrapping_neg();
                Ok(ColumnarValue::Scalar(ScalarValue::Decimal256(
                    Some(result),
                    *precision,
                    *scale,
                )))
            }

            // Interval types - negate each stored field independently
            ScalarValue::IntervalYearMonth(Some(v)) => Ok(ColumnarValue::Scalar(
                ScalarValue::IntervalYearMonth(Some(v.wrapping_neg())),
            )),
            ScalarValue::IntervalDayTime(Some(v)) => Ok(ColumnarValue::Scalar(
                ScalarValue::IntervalDayTime(Some(IntervalDayTime {
                    days: v.days.wrapping_neg(),
                    milliseconds: v.milliseconds.wrapping_neg(),
                })),
            )),
            ScalarValue::IntervalMonthDayNano(Some(v)) => Ok(ColumnarValue::Scalar(
                ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano {
                    months: v.months.wrapping_neg(),
                    days: v.days.wrapping_neg(),
                    nanoseconds: v.nanoseconds.wrapping_neg(),
                })),
            )),

            // Fix: report the unsupported *data type* (matching the array
            // branch) rather than interpolating the scalar's display value
            // into a message that claims to name a datatype.
            sv => not_impl_err!(
                "Not supported datatype for Spark negative(): {}",
                sv.data_type()
            ),
        },
    }
}
Loading