[WIP][SPARK-56248][PYTHON][SS] Optimize python TWS stateful processor serialization calls#55039

Open
jiateoh wants to merge 3 commits into apache:master from jiateoh:tws_python_serialization_improvements

Conversation


@jiateoh jiateoh commented Mar 26, 2026

What changes were proposed in this pull request?

Skip unnecessary list+Row construction, add cache serializer, hoist normalize helpers.

  1. Eliminate per-call Row(**dict(zip(...))) construction in _serialize_to_bytes; pass normalized tuples directly to schema.toInternal, which handles them by index.
  2. Cache the serialize callable per schema id so closure creation and attribute lookups happen once per schema rather than per row. Although this cache is unbounded, it scales with the number of state store schemas provided, and the surrounding client is instantiated once per batch/partition rather than surviving indefinitely.
  3. Hoist _normalize_value/_normalize_tuple to module level to avoid re-creating closures and re-importing numpy on every call.
  4. Add has_numpy to avoid a circular dependency: the existing have_numpy depends on dataframe (hence the original deferred import). The previous have_numpy also lives in a testing file, whereas the new has_numpy is in a dedicated utils file.

Of the above changes, the first one is the only one with significant logic changes (removing Row/dict/list creation). The next two are primarily moving/caching function definitions, and the last is a new helper method for enabling the new function locations.

To better explain the removal of Row usage:

  • StructType.toInternal dispatches on type: for dict it looks up by field name, for tuple/list it zips by position. So functionally there is no need to convert the tuple to a list.
  • Row is a tuple subclass, so it always hits the positional branch.
  • Since 3.0.0 (types.py change notes), Row field names are insertion ordered. Python dictionaries (as of 3.7+) are also insertion ordered.
  • dict(zip(field_names, converted)) → Row(**...) adds extra hops: (1) fetch field names, (2) zip them with row values, (3) create an insertion-ordered dictionary of those field names, and (4) create an insertion-ordered Row (dropping the field names, which are no longer used). The end result is a Row (tuple subclass) that uses the same positional branch of StructType.toInternal as the original input tuple would.
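The dispatch argument above can be illustrated with a small pyspark-free mock. The function below is a simplified stand-in for StructType.toInternal's type checks (not pyspark's implementation), and FakeRow stands in for pyspark.sql.Row only in the one property that matters here: it is a tuple subclass.

```python
# Illustration of the dispatch described above: dict hits the by-name branch,
# tuple/list (including tuple subclasses like Row) hit the positional branch.
# This is a mock, not pyspark's actual StructType.toInternal.

def to_internal_dispatch(obj, field_names):
    """Simplified stand-in for StructType.toInternal's type dispatch."""
    if isinstance(obj, dict):
        return tuple(obj[name] for name in field_names)  # by-name branch
    if isinstance(obj, (tuple, list)):
        return tuple(obj)                                # positional branch
    raise TypeError(type(obj))

class FakeRow(tuple):
    """Stand-in for pyspark.sql.Row: what matters is that it subclasses tuple."""

names = ["a", "b"]
plain = (1, 2)
# The extra hops the PR removes: zip names with values, build a dict, build a Row.
via_row = FakeRow(dict(zip(names, plain)).values())
assert to_internal_dispatch(plain, names) == to_internal_dispatch(via_row, names)
```

Both inputs land in the positional branch and produce the same result, which is why the intermediate dict and Row add cost without changing behavior.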

AI-assisted + human reviewed/updated

Why are the changes needed?

This is a code cleanup/performance optimization. The original code performs unnecessary operations for every row: rebuilding closures, extracting field names, building intermediate lists and dicts, and constructing Row objects. These add minor overhead while having no effect on the serialized output.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.6)

return serializer

field_names = [f.name for f in schema.fields]
row_value = Row(**dict(zip(field_names, converted)))
Contributor Author
Copying from the PR description for reference on why this field_name/Row creation is no longer necessary:

  • StructType.toInternal dispatches on type: for dict it looks up by field name, for tuple/list it zips by position. So functionally there is no need to convert the tuple to a list. (L521 deletion)
  • Row is a tuple subclass, so it always hits the positional branch.
  • Since 3.0.0 (types.py change notes), Row field names are insertion ordered. Python dictionaries (as of 3.7+) are also insertion ordered.
  • dict(zip(field_names, converted)) → Row(**...) adds extra hops: (1) fetch field names, (2) zip them with row values, (3) create an insertion-ordered dictionary of those field names, and (4) create an insertion-ordered Row (dropping the field names, which are no longer used). The end result is a Row (tuple subclass) that uses the same positional branch of StructType.toInternal as the original input tuple would.

@jiateoh jiateoh changed the title [WIP] Optimize python TWS stateful processor serialization calls [SPARK-56248][PYTHON][SS] Optimize python TWS stateful processor serialization calls Mar 27, 2026
@jiateoh jiateoh changed the title [SPARK-56248][PYTHON][SS] Optimize python TWS stateful processor serialization calls [WIP][SPARK-56248][PYTHON][SS] Optimize python TWS stateful processor serialization calls Mar 27, 2026
…on, cache serializer, hoist normalize helpers.

- Eliminate per-call Row(**dict(zip(...))) construction in _serialize_to_bytes;
  pass normalized tuples directly to schema.toInternal which handles them by index
- Cache the serialize callable per schema id so field extraction and closure
  creation happen once per schema rather than per row
- Hoist _normalize_value/_normalize_tuple to module level to avoid re-creating
  closures and re-importing numpy on every call
- Add non-testing helper method has_numpy to avoid circular dependency.

AI-assisted + human reviewed/edited

Generated-by: Claude Code (Claude Opus 4.6)
@jiateoh jiateoh force-pushed the tws_python_serialization_improvements branch from 9d1665d to 243a49b on March 27, 2026 at 00:30
return v

def _normalize_tuple(data: Tuple) -> Tuple:
return tuple(_normalize_value(v) for v in data)
Contributor Author

@jiateoh jiateoh Mar 27, 2026


[edit: this prototype is now in the diff, latter discussion is still relevant though]

Another fastpath option being considered: prevalidate the schema; if it's all non-nested we can skip some checks. Prototype outlined below, with more analysis/tradeoffs in the follow-up comment:

    """True if schema has no nested structs, arrays, or maps."""
    return all(
        not isinstance(f.dataType, (StructType, ArrayType, MapType))
        for f in schema.fields
    )

   def _normalize_tuple_simple(data: Tuple) -> Tuple:
        """Fast path: 1 isinstance per element instead of 7 (no nested types)."""
        return tuple(v.tolist() if isinstance(v, np.generic) else v for v in data)

Contributor Author

@jiateoh jiateoh Mar 27, 2026


This gets a bit tricky with the last two timestamp checks though:

  1. As indicated in the existing comment "Address a couple of pandas dtypes too.", the target case is pandas objects. These already subclass datetime.datetime/datetime.timedelta, so StructType.toInternal would properly handle them today without the explicit conversions in the normal path. However, removing these checks does introduce an expectation that pandas types will always subclass the datetime classes (this is indeed the current expectation).
  2. The existing API inadvertently supports any python values so long as they can be converted with to_pytimedelta/to_pydatetime. So introducing a fastpath without these checks could be a breaking change for custom datetime values. It's worth pointing out that this is already within a branch where we have checked that numpy is present. Supporting this backwards compatibility increases the cost from 1 isinstance check to 1 isinstance + 2 hasattr checks, though.

For point 2, I ran a microbenchmark with [timestamp (datetime) + long (numeric)] struct on each of the normalize function variants. Averaging over 2 million iterations on an input (datetime(2024, 1, 1), 42):

  1. Full normalize (existing): 0.55 us/row
  2. Fastpath (1-check): 0.26 us/row [52% faster]
  3. Fastpath (3-check): 0.34 us/row [38% faster]

The broader speedup on a full TWS workload will of course vary depending on schemas and row values used though.
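A microbenchmark of this shape can be reproduced with stdlib timeit, as sketched below. The normalize variants here are simplified stand-ins (the real ones also handle numpy scalars and nested containers), so the absolute numbers will not match the figures above; only the harness shape is intended to carry over.

```python
# Stdlib-only sketch of the microbenchmark shape described above. The variant
# bodies are stand-ins, not the PR's code, so timings won't match the quoted
# 0.55/0.26/0.34 us/row figures.
import datetime
import timeit

def normalize_full(data):
    """Stand-in for the existing multi-check normalize path."""
    out = []
    for v in data:
        if isinstance(v, (list, dict, set)):                       # nested-type checks
            out.append(v)
        elif isinstance(v, (datetime.datetime, datetime.timedelta)):
            out.append(v)
        elif hasattr(v, "to_pydatetime"):
            out.append(v.to_pydatetime())
        elif hasattr(v, "to_pytimedelta"):
            out.append(v.to_pytimedelta())
        else:
            out.append(v)
    return tuple(out)

def normalize_fast(data):
    """Stand-in for the 1-check fast path (real check is isinstance(v, np.generic))."""
    return tuple(data)

row = (datetime.datetime(2024, 1, 1), 42)  # [timestamp, long] struct from the comment
n = 200_000
t_full = timeit.timeit(lambda: normalize_full(row), number=n) / n
t_fast = timeit.timeit(lambda: normalize_fast(row), number=n) / n
print(f"full: {t_full * 1e6:.2f} us/row, fast: {t_fast * 1e6:.2f} us/row")
```

As the comment notes, per-row savings of this size only matter in aggregate, and the realized speedup depends on the schemas and row values in the actual workload.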
