Conversation

Findings with the first version, where there were no improvements in benchmarks:

Got improvements:
```scala
override def convert(expr: T, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = {
  val childExpr = expr.children.map(exprToProtoInternal(_, inputs, binding))
  val optExpr = scalarFunctionExprToProto(name, childExpr: _*)
  // Pass return type to avoid native lookup in DataFusion registry
```
When expr.return_type is None, it tries to look up the function in DataFusion's built-in UDF registry, which doesn't have aes_encrypt because it's a Comet-specific function registered later via create_comet_physical_fun.
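The failure mode can be sketched in isolation. The names below are illustrative, not Comet's actual API: when the serialized expression carries a return type, the native side can construct the Comet function directly; when it doesn't, it must fall back to the registry, which only knows DataFusion's built-in UDFs.

```rust
// Hypothetical sketch of the dispatch decision; `Resolution` and
// `resolve` are stand-ins, not Comet's real types.
#[derive(Debug, PartialEq)]
enum Resolution {
    CometFun(String),       // built directly (create_comet_physical_fun path)
    RegistryLookup(String), // consults DataFusion's built-in UDF registry
}

fn resolve(name: &str, return_type: Option<&str>) -> Resolution {
    match return_type {
        // Return type present in the protobuf: no registry involved.
        Some(_) => Resolution::CometFun(name.to_string()),
        // No return type: registry lookup, which fails for Comet-only
        // functions like aes_encrypt.
        None => Resolution::RegistryLookup(name.to_string()),
    }
}
```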
Benchmark result:
Force-pushed from 643c1ba to 2401225.
Please trigger the workflow
Pull request overview
This pull request adds support for the aes_encrypt function to Comet, enabling native execution of AES encryption operations that would otherwise fall back to Spark's JVM implementation. The PR also includes a critical fix to CometScalarFunction serialization that was preventing proper registration of Comet-specific scalar functions.
Changes:
- Fixed `CometScalarFunction` to use `scalarFunctionExprToProtoWithReturnType()` instead of `scalarFunctionExprToProto()`, passing the return type to avoid native registry lookup failures
- Implemented comprehensive AES encryption support for ECB, CBC, and GCM cipher modes with PKCS7 padding in Rust
- Added test suites (`CometStaticInvokeSuite` and `CometEncryptionSuite`) and a benchmark suite (`CometEncryptionBenchmark`) for validation
Reviewed changes
Copilot reviewed 15 out of 16 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| spark/src/main/scala/org/apache/comet/serde/CometScalarFunction.scala | Critical serialization fix to pass return type for Comet scalar functions |
| spark/src/main/scala/org/apache/comet/serde/statics.scala | Registration of aes_encrypt StaticInvoke mapping to CometScalarFunction |
| spark/src/test/scala/org/apache/comet/CometStaticInvokeSuite.scala | Test suite validating native execution of aes_encrypt with various parameter combinations |
| spark/src/test/scala/org/apache/comet/CometEncryptionSuite.scala | Additional encryption test cases |
| spark/src/test/scala/org/apache/spark/sql/benchmark/CometEncryptionBenchmark.scala | Performance benchmark suite for encryption expressions |
| native/spark-expr/src/encryption_funcs/aes_encrypt.rs | Main AES encryption implementation supporting scalar and batch processing |
| native/spark-expr/src/encryption_funcs/cipher_modes.rs | ECB, CBC, and GCM cipher mode implementations |
| native/spark-expr/src/encryption_funcs/crypto_utils.rs | Cryptographic utility functions for key/IV validation and random IV generation |
| native/spark-expr/src/encryption_funcs/mod.rs | Module organization and public exports |
| native/spark-expr/src/comet_scalar_funcs.rs | Registration of aes_encrypt UDF in native function registry |
| native/spark-expr/src/lib.rs | Module declaration for encryption_funcs |
| native/spark-expr/Cargo.toml | Added cryptography dependencies (aes, aes-gcm, cbc, cipher, ecb) |
| native/Cargo.lock | Dependency lock file updates for new cryptography crates |
| spark/.gitignore | Added spark-warehouse directory to gitignore (redundant with root .gitignore) |
| docs/source/user-guide/latest/configs.md | Auto-generated configuration documentation updates |
| docs/source/user-guide/latest/compatibility.md | Auto-generated compatibility matrix updates |
```scala
class CometStaticInvokeSuite extends CometTestBase {

  test("aes_encrypt basic - verify native execution") {
    withTable("t1") {
      sql("CREATE TABLE t1(data STRING, key STRING) USING parquet")
      sql("""INSERT INTO t1 VALUES
        ('Spark', '0000111122223333'),
        ('SQL', 'abcdefghijklmnop')""")

      val query = """
        SELECT
          data,
          hex(aes_encrypt(cast(data as binary), cast(key as binary))) as encrypted
        FROM t1
      """

      checkSparkAnswerAndOperator(query)

      val df = sql(query)
      val plan = df.queryExecution.executedPlan.toString
      assert(
        plan.contains("CometProject") || plan.contains("CometNative"),
        s"Expected native execution but got Spark fallback:\n$plan")
    }
  }

  test("aes_encrypt with mode") {
    withTable("t1") {
      sql("CREATE TABLE t1(data STRING, key STRING) USING parquet")
      sql("INSERT INTO t1 VALUES ('test', '1234567890123456')")

      val query = """
        SELECT hex(aes_encrypt(cast(data as binary), cast(key as binary), 'GCM'))
        FROM t1
      """

      checkSparkAnswerAndOperator(query)
    }
  }

  test("aes_encrypt with all parameters") {
    withTable("t1") {
      sql("CREATE TABLE t1(data STRING, key STRING) USING parquet")
      sql("INSERT INTO t1 VALUES ('test', '1234567890123456')")

      val query = """
        SELECT hex(aes_encrypt(
          cast(data as binary),
          cast(key as binary),
          'GCM',
          'DEFAULT',
          cast('initialization' as binary),
          cast('additional' as binary)
        ))
        FROM t1
      """

      checkSparkAnswerAndOperator(query)
    }
  }

  test("aes_encrypt wrapped in multiple functions") {
    withTable("t1") {
      sql("CREATE TABLE t1(data STRING, key STRING) USING parquet")
      sql("INSERT INTO t1 VALUES ('test', '1234567890123456')")

      val query = """
        SELECT
          upper(hex(aes_encrypt(cast(data as binary), cast(key as binary)))) as encrypted,
          length(hex(aes_encrypt(cast(data as binary), cast(key as binary)))) as len
        FROM t1
      """

      checkSparkAnswerAndOperator(query)
    }
  }
}
```
Test coverage is missing important edge cases for the aes_encrypt function. Consider adding tests for: null input/key handling, invalid key lengths (e.g., 8 bytes, 15 bytes, 17 bytes), invalid IV lengths for different modes, unsupported mode/padding combinations, empty input data, and very large input data. These test cases would help ensure robust error handling and compatibility with Spark's behavior.
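The key-length portion of those edge cases reduces to a simple predicate; this stand-alone helper is illustrative, not the PR's actual validation code. AES accepts exactly 16-, 24-, or 32-byte keys, so lengths like 8, 15, or 17 bytes must be rejected.

```rust
// Stand-in validator for the invalid-key-length cases the review asks
// to cover: only AES-128/192/256 key sizes (16, 24, 32 bytes) are valid.
fn is_valid_aes_key_len(len: usize) -> bool {
    matches!(len, 16 | 24 | 32)
}
```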
```rust
match get_cipher_mode(mode, padding) {
    Ok(cipher) => match cipher.encrypt(input, key, iv, aad) {
        Ok(encrypted) => builder.append_value(&encrypted),
        Err(_) => builder.append_null(),
    },
    Err(_) => builder.append_null(),
}
```
Error handling in batch processing silently converts encryption errors to null values. This may mask important errors such as invalid key lengths or unsupported cipher modes. Consider logging errors or propagating them in a way that provides better visibility to users, particularly for cryptographic operations where failure reasons are critical for debugging.
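One sketch of that suggestion: keep the null-producing output behavior but log the failure reason first. `encrypt_or_null` and the string error type are illustrative; real code would route through the `log` crate rather than `eprintln!`.

```rust
// Illustrative only: surface the failure reason before degrading to null.
fn encrypt_or_null(result: Result<Vec<u8>, String>, row: usize) -> Option<Vec<u8>> {
    match result {
        Ok(bytes) => Some(bytes),
        Err(e) => {
            // Stand-in for a proper logger call in the real implementation.
            eprintln!("aes_encrypt failed at row {row}: {e}");
            None // null-on-error semantics for the output array are preserved
        }
    }
}
```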
```rust
use rand::Rng;
let mut iv = vec![0u8; length];
rand::rng().fill(&mut iv[..]);
```
The generate_random_iv function uses rand::rng() which generates cryptographically insecure random numbers. For AES encryption, especially GCM mode, initialization vectors must be cryptographically secure random values. Using non-secure random IVs can compromise the security of the encryption. Consider using rand::rngs::OsRng or rand::thread_rng() with a cryptographically secure RNG to generate IVs.
Suggested change:

```rust
use rand::rngs::OsRng;
use rand::RngCore;
let mut iv = vec![0u8; length];
OsRng.fill_bytes(&mut iv[..]);
```
```rust
24 | 32 => {
    let cipher = Aes256Gcm::new(key.into());
    let payload = match aad {
        Some(aad_data) => Payload {
            msg: input,
            aad: aad_data,
        },
        None => Payload {
            msg: input,
            aad: &[],
        },
    };
    cipher
        .encrypt(nonce, payload)
        .map_err(|e| CryptoError::EncryptionFailed(e.to_string()))?
}
_ => unreachable!("Key length validated above"),
```
The GCM mode implementation maps both 24-byte and 32-byte keys to Aes256Gcm. However, the standard AES-GCM supports AES-128, AES-192, and AES-256, which require 16, 24, and 32-byte keys respectively. Using Aes256Gcm for a 24-byte key is incorrect. The aes-gcm crate provides Aes192Gcm for 24-byte keys. This should be updated to use the correct cipher implementation for each key size to ensure proper encryption standards compliance.
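The dispatch the comment calls for, reduced to a pure function so the sketch needs no crypto dependencies; the returned labels name the cipher types a fix would construct, per the comment's reference to the `aes-gcm` crate.

```rust
// Map AES key length to the matching GCM cipher type name. The snippet
// above wrongly collapses 24- and 32-byte keys onto Aes256Gcm.
fn gcm_variant(key_len: usize) -> Option<&'static str> {
    match key_len {
        16 => Some("Aes128Gcm"),
        24 => Some("Aes192Gcm"), // AES-192, not AES-256
        32 => Some("Aes256Gcm"),
        _ => None, // invalid key length; real code errors out earlier
    }
}
```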
Codecov Report

✅ All modified and coverable lines are covered by tests.

```
@@             Coverage Diff              @@
##               main    #3211      +/-   ##
============================================
+ Coverage     56.12%   59.86%    +3.73%
- Complexity      976     1460      +484
============================================
  Files           119      175       +56
  Lines         11743    16166     +4423
  Branches       2251     2681      +430
============================================
+ Hits           6591     9678     +3087
- Misses         4012     5132     +1120
- Partials       1140     1356      +216
```

☔ View full report in Codecov by Sentry.
Which issue does this PR close?
Closes #3187
Rationale for this change
The aes_encrypt function itself was fully implemented, but CometScalarFunction serialized expressions to protobuf without including the return type. This caused the native planner to attempt looking up the function in DataFusion's built-in UDF registry, which failed with "There is no UDF named 'aes_encrypt' in the registry."
What changes are included in this PR?
- Changed to use scalarFunctionExprToProtoWithReturnType() instead of scalarFunctionExprToProto()
- Now passes expr.dataType to avoid native registry lookup
- Enables native execution for all Comet-specific scalar functions
How are these changes tested?