[WIP][SPARK-56248][PYTHON][SS] Optimize python TWS stateful processor serialization calls #55039
jiateoh wants to merge 3 commits into apache:master
Conversation
```python
return serializer
```

```python
field_names = [f.name for f in schema.fields]
row_value = Row(**dict(zip(field_names, converted)))
```
Copying from the PR description for reference on why this field_name/Row creation is no longer necessary:

- `StructType.toInternal` dispatches on type: for dict it looks up by field name; for tuple/list it zips by position. So functionally there is no need to convert the tuple to a list. (L521 deletion)
- `Row` is a tuple subclass, so it always hit the positional branch.
- Since 3.0.0 (per the `types.py` change notes), Row field names are insertion ordered. Python dictionaries (as of 3.7+) are also insertion ordered.
- `dict(zip(field_names, converted)) → Row(**...)` ends up adding extra hops to (1) fetch field names, (2) zip them with row values, (3) create an insertion-ordered dictionary of those field names, and (4) create an insertion-ordered Row (dropping the field names, which are no longer used). The end result is a `Row` (tuple subclass) that uses the same positional branch of `StructType.toInternal` as the original input tuple would.
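The dispatch behavior described above can be sketched in plain Python. This is a hedged, minimal mock: `MiniStructType` and `to_internal` are illustrative stand-ins, not the actual `pyspark.sql.types.StructType.toInternal` implementation.

```python
class MiniStructType:
    """Illustrative mock of StructType's toInternal type dispatch."""

    def __init__(self, field_names):
        self.names = field_names

    def to_internal(self, obj):
        if isinstance(obj, dict):
            # dict branch: look up each value by field name
            return tuple(obj.get(n) for n in self.names)
        if isinstance(obj, (tuple, list)):
            # positional branch: values taken by position.
            # Row is a tuple subclass, so it lands here too.
            return tuple(obj)
        raise TypeError(f"Unexpected type: {type(obj)}")


schema = MiniStructType(["id", "value"])
# A plain tuple and a dict of the same fields produce the same internal row,
# so wrapping the tuple in dict/Row before conversion gains nothing.
assert schema.to_internal((1, "a")) == schema.to_internal({"id": 1, "value": "a"})
```

Since both the original tuple and the constructed `Row` hit the positional branch, the dict/Row round trip is pure overhead.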
…on, cache serializer, hoist normalize helpers.

- Eliminate per-call `Row(**dict(zip(...)))` construction in `_serialize_to_bytes`; pass normalized tuples directly to `schema.toInternal`, which handles them by index
- Cache the serialize callable per schema id so field extraction and closure creation happen once per schema rather than per row
- Hoist `_normalize_value`/`_normalize_tuple` to module level to avoid re-creating closures and re-importing numpy on every call
- Add non-testing helper method `has_numpy` to avoid circular dependency

AI-assisted + human reviewed/edited
Generated-by: Claude Code (Claude Opus 4.6)
9d1665d to 243a49b
```python
return v
```

```python
def _normalize_tuple(data: Tuple) -> Tuple:
    return tuple(_normalize_value(v) for v in data)
```
[edit: this prototype is now in the diff; the later discussion is still relevant though]
Another fast-path option being considered: prevalidate the schema — if it's all non-nested, we can skip some checks. Prototype outlined below, with more analysis/tradeoffs in the follow-up comment:
"""True if schema has no nested structs, arrays, or maps."""
return all(
not isinstance(f.dataType, (StructType, ArrayType, MapType))
for f in schema.fields
)
def _normalize_tuple_simple(data: Tuple) -> Tuple:
"""Fast path: 1 isinstance per element instead of 7 (no nested types)."""
return tuple(v.tolist() if isinstance(v, np.generic) else v for v in data)
This gets a bit tricky with the last two timestamp checks though:

- As indicated in the existing comment ("Address a couple of pandas dtypes too."), the target case is pandas objects. These already subclass `datetime.datetime`/`timedelta`, so `StructType.toInternal` would properly handle them today without the explicit conversions in the normal path. However, removing these checks does introduce an expectation that pandas types will always subclass the datetime classes (this is indeed the current expectation).
- The existing API inadvertently supports any Python values so long as they can be converted with `to_pytimedelta`/`to_pydatetime`. So introducing a fast path without these checks could be a breaking change for custom datetime values. It's worth pointing out that this is already within a branch where we have checked that numpy is present, though.

Supporting this backwards compatibility increases the cost from 1 `isinstance` check to 1 `isinstance` + 2 `hasattr` checks, though.
For point 2, I ran a microbenchmark with a [timestamp (datetime) + long (numeric)] struct on each of the normalize-function variants, averaging over 2 million iterations on the input (datetime(2024, 1, 1), 42):
- Full normalize (existing): 0.55 us/row
- Fastpath (1-check): 0.26 us/row [52% faster]
- Fastpath (3-check): 0.34 us/row [38% faster]
The broader speedup on a full TWS workload will of course vary depending on the schemas and row values used.
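For reference, a harness along these lines can reproduce the old-vs-new per-row cost comparison. This is a hedged sketch: the helper names and field list are made up for illustration, and the bodies are simplified stand-ins for the real normalize/serialize paths rather than the PR's code.

```python
import timeit
from datetime import datetime

field_names = ["ts", "count"]          # illustrative schema field names
row = (datetime(2024, 1, 1), 42)       # the benchmarked input from the comment


def with_dict_hop(data):
    # Old style: tuple -> zip with field names -> dict -> back to a
    # positional tuple (standing in for Row construction).
    return tuple(dict(zip(field_names, data)).values())


def direct(data):
    # New style: pass the tuple straight through positionally.
    return tuple(data)


n = 200_000
t_old = timeit.timeit(lambda: with_dict_hop(row), number=n)
t_new = timeit.timeit(lambda: direct(row), number=n)
print(f"old: {t_old / n * 1e6:.2f} us/row, new: {t_new / n * 1e6:.2f} us/row")
```

Both paths produce identical output tuples (dicts are insertion ordered on 3.7+), which is exactly why the dict hop can be dropped.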
What changes were proposed in this pull request?
Skip unnecessary list + `Row` construction, cache the serializer, hoist the normalize helpers.

- Eliminate the per-call `Row(**dict(zip(...)))` construction in `_serialize_to_bytes`; pass normalized tuples directly to `schema.toInternal`, which handles them by index
- Cache the serialize callable per schema id so field extraction and closure creation happen once per schema rather than per row
- Hoist `_normalize_value`/`_normalize_tuple` to module level to avoid re-creating closures and re-importing numpy on every call
- Add `has_numpy` to avoid a circular dependency, as the existing `have_numpy` depends on dataframe (hence the original deferred import). The previous `have_numpy` is also in a testing file, whereas the new `has_numpy` is in a dedicated utils file.

Of the above changes, the first is the only one with significant logic changes (removing Row/dict/list creation). The next two are primarily moving/caching function definitions, and the last is a new helper method enabling the new function locations.
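The per-schema caching described above can be sketched as follows. This is a hedged illustration of the pattern, not the actual pyspark internals: `get_serializer` and `_serializer_cache` are hypothetical names, and the inner `serialize` body is a stand-in for the real `schema.toInternal` + pickling work.

```python
_serializer_cache = {}


def get_serializer(schema):
    # Key on schema identity so field extraction and closure creation
    # happen once per schema rather than once per row. (Keying on id()
    # assumes the schema object outlives the cache entry.)
    key = id(schema)
    cached = _serializer_cache.get(key)
    if cached is not None:
        return cached

    field_count = len(schema)  # one-time work, hoisted out of the per-row path

    def serialize(row_tuple):
        assert len(row_tuple) == field_count
        return tuple(row_tuple)  # stand-in for schema.toInternal + pickling

    _serializer_cache[key] = serialize
    return serialize


fields = ["a", "b"]                # stand-in for a StructType
s1 = get_serializer(fields)
s2 = get_serializer(fields)
assert s1 is s2                    # built once, reused per row
assert s1((1, 2)) == (1, 2)
```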
To better explain the removal of `Row` usage:

- `StructType.toInternal` dispatches on type: for dict it looks up by field name; for tuple/list it zips by position. So functionally there is no need to convert the tuple to a list.
- `Row` is a tuple subclass, so it always hit the positional branch.
- Since 3.0.0 (per the `types.py` change notes), Row field names are insertion ordered. Python dictionaries (as of 3.7+) are also insertion ordered.
- `dict(zip(field_names, converted)) → Row(**...)` ends up adding extra hops to (1) fetch field names, (2) zip them with row values, (3) create an insertion-ordered dictionary of those field names, and (4) create an insertion-ordered Row (dropping the field names, which are no longer used). The end result is a `Row` (tuple subclass) that uses the same positional branch of `StructType.toInternal` as the original input tuple would.

AI-assisted + human reviewed/updated
Why are the changes needed?
This is a code cleanup/performance optimization. The original code performs unnecessary operations for every row, including rebuilding closures, extracting field names, building intermediate lists and dicts, and constructing Row objects (which sort by field unnecessarily). These all add minor overhead while having no effect on the underlying behavior.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tests
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.6)