From 58b97b1dc1dd0cc7f9519bfa86d76550b69cb359 Mon Sep 17 00:00:00 2001 From: Jia Teoh Date: Wed, 25 Mar 2026 02:56:37 +0000 Subject: [PATCH] Optimize python stateful processor serialization to skip unnecessary list/dict/row construction. - Eliminate per-call Row(**dict(zip(...))) construction in _serialize_to_bytes; pass normalized tuples directly to schema.toInternal which handles them by index AI-assisted + human reviewed/edited Generated-by: Claude Code (Claude Opus 4.6) --- .../pyspark/sql/streaming/stateful_processor_api_client.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/python/pyspark/sql/streaming/stateful_processor_api_client.py b/python/pyspark/sql/streaming/stateful_processor_api_client.py index 490ae184c273..44de4533f661 100644 --- a/python/pyspark/sql/streaming/stateful_processor_api_client.py +++ b/python/pyspark/sql/streaming/stateful_processor_api_client.py @@ -518,12 +518,9 @@ def normalize_value(v: Any) -> Any: converted = [normalize_value(v) for v in data] else: - converted = list(data) + converted = data - field_names = [f.name for f in schema.fields] - row_value = Row(**dict(zip(field_names, converted))) - - return self.pickleSer.dumps(schema.toInternal(row_value)) + return self.pickleSer.dumps(schema.toInternal(converted)) def _deserialize_from_bytes(self, value: bytes) -> Any: return self.pickleSer.loads(value)