
[SPARK-56222][PYTHON] Create ArrowStreamGroupSerializer and ArrowStreamCoGroupSerializer#55026

Open
Yicong-Huang wants to merge 3 commits into apache:master from Yicong-Huang:SPARK-56222

Conversation

Contributor

@Yicong-Huang Yicong-Huang commented Mar 26, 2026

What changes were proposed in this pull request?

Refactors ArrowStreamSerializer by extracting group and cogroup loading logic into dedicated subclasses:

ArrowStreamSerializer              (plain Arrow stream I/O)
  ├── ArrowStreamGroupSerializer    (grouped loading, 1 df/group)
  ├── ArrowStreamCoGroupSerializer  (cogrouped loading, 2 dfs/group)
  ├── ArrowStreamUDFSerializer
  ├── ArrowStreamPandasSerializer
  └── ArrowStreamArrowUDFSerializer

Key changes:

  • ArrowStreamSerializer: simplified to only handle plain Arrow stream read/write. Removed num_dfs parameter.
  • ArrowStreamGroupSerializer(ArrowStreamSerializer): new class that overrides load_stream with group-count protocol for single-dataframe groups.
  • ArrowStreamCoGroupSerializer(ArrowStreamSerializer): new class that overrides load_stream with group-count protocol for two-dataframe cogroups.
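The split described above can be sketched as follows. This is a minimal, dependency-free illustration, not the PR's actual code: the `read_int`/`write_int` framing and the use of plain ints in place of `pa.RecordBatch` payloads are assumptions made to keep the sketch runnable without pyarrow.

```python
import struct
from io import BytesIO
from typing import IO, Iterator, Tuple

def read_int(stream: IO[bytes]) -> int:
    # Big-endian 4-byte int framing (an assumption for this sketch).
    return struct.unpack("!i", stream.read(4))[0]

def write_int(value: int, stream: IO[bytes]) -> None:
    stream.write(struct.pack("!i", value))

class ArrowStreamSerializer:
    """Plain stream I/O. The real class yields pa.RecordBatch objects from
    an Arrow IPC stream; here each "batch" is stubbed as a single int."""
    def load_stream(self, stream: IO[bytes]) -> Iterator[int]:
        yield read_int(stream)

class ArrowStreamGroupSerializer(ArrowStreamSerializer):
    """Group-count protocol, one dataframe per group; a 0 count ends the stream."""
    def load_stream(self, stream: IO[bytes]) -> Iterator[int]:
        while (dataframes_in_group := read_int(stream)) > 0:
            assert dataframes_in_group == 1
            yield from super().load_stream(stream)

class ArrowStreamCoGroupSerializer(ArrowStreamSerializer):
    """Group-count protocol, two dataframes per group, yielded as a tuple."""
    def load_stream(self, stream: IO[bytes]) -> Iterator[Tuple[int, int]]:
        while (dataframes_in_group := read_int(stream)) > 0:
            assert dataframes_in_group == 2
            left = next(super().load_stream(stream))
            right = next(super().load_stream(stream))
            yield (left, right)

# Encode two single-df groups followed by the 0 terminator, then decode.
buf = BytesIO()
for count, payload in [(1, 10), (1, 20)]:
    write_int(count, buf)
    write_int(payload, buf)
write_int(0, buf)
buf.seek(0)
print(list(ArrowStreamGroupSerializer().load_stream(buf)))  # [10, 20]
```

With this shape, each subclass's `load_stream` has a single, unambiguous return type, which is the point of the refactor.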

Why are the changes needed?

This is part of the ongoing serializer simplification effort (SPARK-55384). The previous ArrowStreamSerializer mixed plain stream I/O with group-count protocol logic via a num_dfs parameter and a multi-purpose _load_group_dataframes method. This made the return types ambiguous — callers couldn't tell from the type signature whether they'd get pa.RecordBatch, Iterator[pa.RecordBatch], or Tuple[...]. By splitting group and cogroup into separate classes, each load_stream has a clear, precise return type, improving readability and enabling better static analysis.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@Yicong-Huang Yicong-Huang changed the title [SPARK-56222][PYTHON] Extract ArrowStreamGroupSerializer and ArrowStreamCoGroupSerializer from ArrowStreamSerializer [SPARK-56222][PYTHON] Create ArrowStreamGroupSerializer and ArrowStreamCoGroupSerializer Mar 26, 2026
Comment on lines +197 to +200
dataframes_in_group: Optional[int] = None

while dataframes_in_group is None or dataframes_in_group > 0:
    dataframes_in_group = read_int(stream)
Contributor


Suggested change
-    dataframes_in_group: Optional[int] = None
-
-    while dataframes_in_group is None or dataframes_in_group > 0:
-        dataframes_in_group = read_int(stream)
+    while dataframes_in_group := read_int(stream):
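The walrus form works because the stream's 0 terminator is falsy: the assignment expression reads the next count and tests it in one step. A minimal sketch with a stubbed `read_int` (the framing is an assumption; the real helper lives in pyspark's serializers module):

```python
import struct
from io import BytesIO

def read_int(stream) -> int:
    # Big-endian 4-byte int framing (assumption for this sketch).
    return struct.unpack("!i", stream.read(4))[0]

def drain_counts(stream) -> list:
    # Walrus form from the suggestion: assign the next group count and test
    # it in one expression; the loop exits when read_int returns 0 (falsy).
    counts = []
    while dataframes_in_group := read_int(stream):
        counts.append(dataframes_in_group)
    return counts

buf = BytesIO(struct.pack("!iiii", 1, 1, 2, 0))
print(drain_counts(buf))  # [1, 1, 2]
```

One subtlety: the original `> 0` condition also stops on a negative count, while the walrus form treats any nonzero value as truthy, so the two are equivalent only if counts are never negative.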

self._num_dfs: int = num_dfs

-    def dump_stream(self, iterator, stream):
+    def dump_stream(self, iterator: Iterator["pa.RecordBatch"], stream: IO[bytes]) -> None:
Contributor


Normally we want to make inputs less restrictive and outputs more restrictive. Do we have to restrict the input to Iterator rather than Iterable? I saw we had to call iter() on lists to make tests work. What if we take an Iterable as input and call iter() inside the function to get the actual iterator? If an Iterator is passed in, iter() is essentially an identity function that returns the iterator itself, so we lose nothing.
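The reviewer's point in miniature: `iter()` returns its argument unchanged when the argument is already an iterator, so widening the parameter to Iterable costs nothing. A sketch with the `dump_stream` body stubbed out (the list sink is a stand-in, not the real Arrow writer):

```python
from typing import Iterable

def dump_stream(batches: Iterable[int], out: list) -> None:
    # Accept any Iterable; iter() returns the object itself when it is
    # already an iterator, so generator-passing callers lose nothing,
    # while tests can now pass plain lists without wrapping in iter().
    iterator = iter(batches)
    for batch in iterator:
        out.append(batch)

gen = (x for x in [1, 2])
assert iter(gen) is gen  # iter() is the identity on an existing iterator

sink: list = []
dump_stream([3, 4], sink)  # a plain list now works too, no iter() needed
print(sink)  # [3, 4]
```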

dataframes_in_group = read_int(stream)

if dataframes_in_group == 1:
    yield self._read_arrow_stream(stream)
Contributor


Now that the base class has a clear load_stream, we no longer have the recursion issue. Would it make sense to call super().load_stream() here and get rid of _read_arrow_stream, since _read_arrow_stream is literally load_stream in the trivial case?
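The suggested refactor in miniature: the subclass delegates the trivial single-stream read to `super().load_stream()` instead of duplicating it in a private helper. The list-backed "stream" below is a stand-in so the sketch runs without pyarrow; the real methods operate on an Arrow IPC byte stream.

```python
class ArrowStreamSerializer:
    def load_stream(self, stream):
        # Trivial case: read one "batch" (stubbed as popping from a list).
        yield stream.pop(0)

class ArrowStreamGroupSerializer(ArrowStreamSerializer):
    def load_stream(self, stream):
        while (dataframes_in_group := stream.pop(0)) > 0:
            # Delegate to the base class instead of keeping a private
            # _read_arrow_stream helper that duplicates its body.
            yield from super().load_stream(stream)

stream = [1, "a", 1, "b", 0]
print(list(ArrowStreamGroupSerializer().load_stream(stream)))  # ['a', 'b']
```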
