[SPARK-56222][PYTHON] Create ArrowStreamGroupSerializer and ArrowStreamCoGroupSerializer#55026
[SPARK-56222][PYTHON] Create ArrowStreamGroupSerializer and ArrowStreamCoGroupSerializer#55026Yicong-Huang wants to merge 3 commits intoapache:masterfrom
Conversation
| dataframes_in_group: Optional[int] = None | ||
|
|
||
| while dataframes_in_group is None or dataframes_in_group > 0: | ||
| dataframes_in_group = read_int(stream) |
There was a problem hiding this comment.
| dataframes_in_group: Optional[int] = None | |
| while dataframes_in_group is None or dataframes_in_group > 0: | |
| dataframes_in_group = read_int(stream) | |
| while dataframes_in_group := read_int(stream): |
| self._num_dfs: int = num_dfs | ||
|
|
||
| def dump_stream(self, iterator, stream): | ||
| def dump_stream(self, iterator: Iterator["pa.RecordBatch"], stream: IO[bytes]) -> None: |
There was a problem hiding this comment.
Normally we want to make input less restrictive and output more restrictive. Do we have to restrict the input iterator to Iterator rather than Iterable? I saw we had to do iter on lists to make tests work. What if we take an Iterable as an input and do a iter() inside the function to get the actual iterator? If an Iterator is passed it, iter() is basically an identity function that returns the iterator itself - we lose nothing.
| dataframes_in_group = read_int(stream) | ||
|
|
||
| if dataframes_in_group == 1: | ||
| yield self._read_arrow_stream(stream) |
There was a problem hiding this comment.
Now that the base class has a clear load_stream, we don't have the recursive issue anymore. Would it make sense to do super().load_stream() here and get rid of _read_arrow_stream? As _read_arrow_stream is literally load_stream in the trivial case.
What changes were proposed in this pull request?
Refactors
ArrowStreamSerializerby extracting group and cogroup loading logic into dedicated subclasses:Key changes:
ArrowStreamSerializer: simplified to only handle plain Arrow stream read/write. Removednum_dfsparameter.ArrowStreamGroupSerializer(ArrowStreamSerializer): new class that overridesload_streamwith group-count protocol for single-dataframe groups.ArrowStreamCoGroupSerializer(ArrowStreamSerializer): new class that overridesload_streamwith group-count protocol for two-dataframe cogroups.Why are the changes needed?
This is part of the ongoing serializer simplification effort (SPARK-55384). The previous
ArrowStreamSerializermixed plain stream I/O with group-count protocol logic via anum_dfsparameter and a multi-purpose_load_group_dataframesmethod. This made the return types ambiguous — callers couldn't tell from the type signature whether they'd getpa.RecordBatch,Iterator[pa.RecordBatch], orTuple[...]. By splitting group and cogroup into separate classes, eachload_streamhas a clear, precise return type, improving readability and enabling better static analysis.Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests.
Was this patch authored or co-authored using generative AI tooling?
No.