fix(cdk): fix SubstreamPartitionRouter updating cursor value when no records were read in partition #889

Merged
Daryna Ishchenko (darynaishchenko) merged 12 commits into main from daryna/fix-substream-partition-router
Feb 16, 2026

Conversation

@darynaishchenko Daryna Ishchenko (darynaishchenko) commented Feb 2, 2026

Summary

This PR fixes an issue where SubstreamPartitionRouter would incorrectly update cursor values when no records were read in a partition. The fix adds defensive null-checking throughout stream_slices() to gracefully handle:

  1. Empty partition generators - iterate_with_last_flag now yields a (None, True) sentinel when the input generator is empty, and stream_slices() breaks out of the partition loop when partition is None (preserving multi-parent behavior)
  2. Empty record iterators - When a partition has no records, cursor observation and slice emission are skipped (guarded by if parent_record is not None)
  3. KeyError during partition value extraction - Uses skip_slice flag instead of continue to ensure close_partition() and ensure_at_least_one_state_emitted() are always called, even when dpath.get fails
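
The sentinel behavior described in item 1 can be sketched as follows. This is a minimal illustration, not the CDK's actual implementation: the name `iterate_with_last_flag_sketch` and the function body are assumptions based only on the description above.

```python
from typing import Iterable, Iterator, Optional, Tuple, TypeVar

T = TypeVar("T")


def iterate_with_last_flag_sketch(
    items: Iterable[T],
) -> Iterator[Tuple[Optional[T], bool]]:
    """Yield (item, is_last) pairs; yield (None, True) once if items is empty.

    Hypothetical sketch of the behavior described in the PR summary.
    """
    iterator = iter(items)
    try:
        current = next(iterator)
    except StopIteration:
        # Empty input: emit a single sentinel so callers still see a "last" event.
        yield None, True
        return

    # Look one element ahead so we know whether `current` is the final item.
    for upcoming in iterator:
        yield current, False
        current = upcoming
    yield current, True
```

With this shape, an empty input produces exactly one pair, `(None, True)`, so the caller can still run its end-of-iteration bookkeeping while checking for `None` before touching the item.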

Updates since last revision

  • Changed if partition is None: return to if partition is None: break per review feedback. Using break ensures that if one parent stream has no partitions, we continue processing subsequent parent_stream_configs rather than exiting the entire method.
  • Added test_substream_partition_router_closes_all_partitions_even_when_no_records to verify that cursor.close_partition() is called for ALL partitions, including those with no records.
  • Fixed MyPy type error: Updated iterate_with_last_flag return type from tuple[T, bool] to tuple[T | None, bool] to properly reflect that the function yields (None, True) as a sentinel for empty generators.
  • Fixed expected state in test_substream_slicer_parent_state_update_with_cursor to include proper lookback_window: 1 and state values.
  • NEW: Implemented skip_slice flag pattern per CodeRabbit review feedback. Replaced continue with a flag-based approach when dpath.get raises KeyError, ensuring partition closure always runs even when slice emission is skipped. Added test_substream_partition_router_closes_partition_even_when_parent_key_missing to verify this behavior.
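
A simplified version of the check that close_partition() runs for every partition, including empty ones, might look like the sketch below. The `route` helper, the `observe` method name, and the partition names are hypothetical stand-ins for the router and its parent stream cursor; only `unittest.mock` from the standard library is used.

```python
from unittest.mock import Mock, call


def route(partitions, read_records, cursor):
    """Hypothetical stand-in for the router loop.

    Emits one slice per record and closes every partition,
    whether or not it produced any records.
    """
    slices = []
    for partition in partitions:
        for record in read_records(partition):
            cursor.observe(record)  # only reached when a record exists
            slices.append({"parent_id": record["id"]})
        cursor.close_partition(partition)  # always reached
    return slices


def check_all_partitions_closed():
    cursor = Mock()
    # "p2" has no records, mirroring the edge case the PR targets.
    records_by_partition = {"p1": [{"id": 1}], "p2": [], "p3": [{"id": 3}]}

    slices = route(list(records_by_partition), records_by_partition.__getitem__, cursor)

    # No slice and no cursor observation for the empty partition...
    assert slices == [{"parent_id": 1}, {"parent_id": 3}]
    assert cursor.observe.call_count == 2
    # ...but every partition, including the empty one, is closed.
    assert cursor.close_partition.call_args_list == [call("p1"), call("p2"), call("p3")]
    return slices
```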

Review & Testing Checklist for Human

  • Verify skip_slice logic: The new skip_slice flag ensures close_partition() is called even when parent_key extraction fails with KeyError. Confirm this doesn't introduce regressions in normal slice emission.
  • Verify multi-parent behavior: The if partition is None: break exits only the current parent's partition loop. Test with a connector that has multiple parent_stream_configs where one parent has no partitions to confirm subsequent parents are still processed.
  • Test with real connector: Test with a connector using SubstreamPartitionRouter where parent records are missing the expected parent_key field.
  • Run full test suite: poetry run pytest unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py -v

Notes

Summary by CodeRabbit

  • New Features

    • Added an iterator utility that flags the final element and emits a None marker when the source is exhausted.
  • Bug Fixes

    • Prevented emitting slices for empty/None partitions and added early exit when parent partitions yield no records.
    • Guarded partition extraction and cursor/state updates so they run only when a valid parent record exists.
    • Ensured partition cursors are closed correctly even when partitions have no records.
  • Tests

    • Added unit tests covering the new iterator and edge-case partition behaviors.

github-actions bot commented Feb 2, 2026

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@daryna/fix-substream-partition-router#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch daryna/fix-substream-partition-router

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment

coderabbitai bot commented Feb 2, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Walkthrough

Changed SubstreamPartitionRouter to add iterate_with_last_flag (now yields (T | None, bool)), handle exhausted generators by yielding (None, True), and guard all parent_record-dependent processing (cursor observation, partition tracking, partition_value/extra_fields/lazy pointer extraction, and slice emission) behind non-None checks; outer loop now breaks early on None partitions.

Changes

Cohort: Substream partition router
File(s): airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py
Summary: Added iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T | None, bool]]; when input is exhausted it yields (None, True); guarded processing that depends on parent_record (cursor/state observation, partition association, partition_value extraction via dpath, extra_fields, lazy_read_pointer), preserved KeyError handling inside guarded path, emit StreamSlice only for non-None parent_record, and break outer loop early if a partition is None.

Cohort: Unit tests for edge cases
File(s): unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py
Summary: Added tests importing iterate_with_last_flag; new tests cover (item, is_last) semantics for empty/single/multiple inputs and SubstreamPartitionRouter behaviors: no slice/no cursor update for partitions with no records, handling empty parent partitions (early exit), and ensuring close_partition is called for all partitions even when no records are emitted.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Would you like me to generate a small focused patch-level checklist for reviewers (key lines/behaviors to verify), wdyt?

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name: Docstring Coverage
Status: ⚠️ Warning
Explanation: Docstring coverage is 71.43% which is insufficient. The required threshold is 80.00%.
Resolution: Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name: Title check
Status: ✅ Passed
Explanation: The title accurately describes the main fix: preventing cursor updates when no records are read in a partition, which aligns with the core behavioral changes in SubstreamPartitionRouter.

Check name: Description Check
Status: ✅ Passed
Explanation: Check skipped - CodeRabbit’s high-level summary is enabled.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (1)

42-49: ⚠️ Potential issue | 🔴 Critical

Type annotation mismatch causing pipeline failure.

The pipeline is flagging a type error here. Yielding (None, True) produces tuple[None, bool], but the return type expects tuple[T, bool]. Since the sentinel introduces None as a possible value, would updating the return type to Optional[T] work for you, wdyt?

🔧 Proposed fix
-def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T, bool]]:
+def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[Optional[T], bool]]:
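
For callers, a widened return type means the None case has to be handled before the item is used. A minimal sketch, assuming string items and a hypothetical consumer named `non_empty_items` (neither appears in the PR):

```python
from typing import Iterable, Iterator, Optional, Tuple


def non_empty_items(pairs: Iterable[Tuple[Optional[str], bool]]) -> Iterator[str]:
    """Hypothetical consumer of (item, is_last) pairs with an Optional item."""
    for item, _is_last in pairs:
        if item is None:
            # Sentinel for an empty source: nothing to process.
            continue
        # After the check above, a type checker treats `item` as `str`.
        yield item.upper()
```

Without the `is None` check, a type checker would reject `item.upper()` because `item` may be `None`, which is the same class of error the annotation change is meant to surface.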
🤖 Fix all issues with AI agents
In
`@airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py`:
- Around line 210-211: In stream_slices() in substream_partition_router.py the
check "if partition is None: return" prematurely exits the entire method when a
parent_stream.generate_partitions() yields the (None, True) sentinel; change
this to "break" (or otherwise exit the inner partition loop) so we stop
iterating that parent stream's partitions and continue to the next
parent_stream_config instead of returning from stream_slices(), leaving the rest
of the parent streams to be processed; locate the partition handling inside the
loop that consumes parent_stream.generate_partitions() and replace the return
with a break (or equivalent loop-control) so behavior matches the original
semantics.
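
The difference between the two statements can be reproduced with plain generators. The sketch below uses hypothetical parent names, and a list containing None stands in for the (None, True) sentinel; it is not the router code itself.

```python
from typing import Dict, Iterator, List, Optional

Parents = Dict[str, List[Optional[str]]]


def slices_with_return(parents: Parents) -> Iterator[str]:
    for parent_name, partitions in parents.items():
        for partition in partitions:
            if partition is None:
                return  # ends the whole generator; later parents are never visited
            yield f"{parent_name}:{partition}"


def slices_with_break(parents: Parents) -> Iterator[str]:
    for parent_name, partitions in parents.items():
        for partition in partitions:
            if partition is None:
                break  # leaves only this parent's loop; the next parent still runs
            yield f"{parent_name}:{partition}"


# First parent has no partitions (sentinel only); second parent has two.
example_parents: Parents = {"empty_parent": [None], "users": ["u1", "u2"]}
```

With `example_parents`, `slices_with_return` yields nothing, while `slices_with_break` yields `users:u1` and `users:u2`.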

github-actions bot commented Feb 2, 2026

PyTest Results (Fast)

3 869 tests  +9   3 857 ✅ +9   6m 34s ⏱️ -15s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 26b74b1. ± Comparison against base commit 3eb9ce8.

♻️ This comment has been updated with latest results.

github-actions bot commented Feb 2, 2026

PyTest Results (Full)

3 872 tests  +9   3 860 ✅ +9   11m 1s ⏱️ -6s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 26b74b1. ± Comparison against base commit 3eb9ce8.

♻️ This comment has been updated with latest results.

Daryna Ishchenko (darynaishchenko) commented Feb 2, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/21601466980

devin-ai-integration bot and others added 2 commits February 3, 2026 15:38
…titionRouter null handling

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
…ulti-parent behavior

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (2)

42-49: ⚠️ Potential issue | 🟠 Major

Fix mypy type mismatch in iterate_with_last_flag.
Line 48 yields (None, True), but the signature advertises tuple[T, bool], which is tripping mypy. Could we widen the return type to Optional[T] to match the sentinel, wdyt?

🔧 Proposed fix
-def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T, bool]]:
+def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[Optional[T], bool]]:

186-187: ⚠️ Potential issue | 🟡 Minor

Update docstring to match empty-slice behavior.
Lines 186–187 still say an empty parent slice emits parent_record=None, but the code now skips emission. Could we adjust the docstring to reflect the new behavior, wdyt?

📝 Suggested doc tweak
-        If a parent slice contains no record, emit a slice with parent_record=None.
+        If a parent slice contains no record, emit no slice.

devin-ai-integration bot and others added 3 commits February 5, 2026 10:36
… when no records

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
… empty generators

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In
`@airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py`:
- Around line 231-267: The try/except around dpath.get in
substream_partition_router should not use an early `continue` because that skips
the later partition closure logic; instead, introduce a boolean like `skip_slice
= True` when KeyError occurs (and only set `skip_slice = False` when extraction
succeeds), keep `extracted_extra_fields` in a safe default state, and let
execution fall through so the existing `if is_last_record_in_slice:
parent_stream.cursor.close_partition(partition)` / `if is_last_slice:
parent_stream.cursor.ensure_at_least_one_state_emitted()` blocks always run;
finally, guard the `yield StreamSlice(...)` (the emission that uses
`parent_record`, `partition_value`, `extracted_extra_fields`,
`parent_partition`, etc.) so it only occurs when `skip_slice` is False. This
ensures partition closure always executes while still skipping emission for
failed extractions.
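
The effect of replacing `continue` with a flag can be seen in a reduced example. This is a sketch under stated assumptions: `RecordingCursor` is a hypothetical stand-in for the parent stream cursor, and a plain dict lookup replaces `dpath.get`.

```python
from typing import Any, Dict, Iterator, List


class RecordingCursor:
    """Hypothetical cursor that only remembers which partitions were closed."""

    def __init__(self) -> None:
        self.closed: List[str] = []

    def close_partition(self, partition: str) -> None:
        self.closed.append(partition)


def slices_with_continue(
    partition: str, records: List[Dict[str, Any]], parent_key: str, cursor: RecordingCursor
) -> Iterator[Dict[str, Any]]:
    for index, record in enumerate(records):
        is_last_record = index == len(records) - 1
        try:
            partition_value = record[parent_key]
        except KeyError:
            continue  # also skips the close_partition call below
        if is_last_record:
            cursor.close_partition(partition)
        yield {"parent_id": partition_value}


def slices_with_flag(
    partition: str, records: List[Dict[str, Any]], parent_key: str, cursor: RecordingCursor
) -> Iterator[Dict[str, Any]]:
    for index, record in enumerate(records):
        is_last_record = index == len(records) - 1
        skip_slice = False
        partition_value = None
        try:
            partition_value = record[parent_key]
        except KeyError:
            skip_slice = True  # remember the failure, but keep going
        if is_last_record:
            cursor.close_partition(partition)  # always reached for the last record
        if not skip_slice:
            yield {"parent_id": partition_value}
```

When the last record lacks the parent key, both variants emit the same slices, but only the flag-based variant closes the partition.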

…_update_with_cursor

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
devin-ai-integration bot and others added 4 commits February 10, 2026 10:34
…Error

- Replace 'continue' with 'skip_slice' flag when dpath.get fails with KeyError
- This ensures close_partition() and ensure_at_least_one_state_emitted() are
  always called, even when partition value extraction fails
- Add test for partition closure when parent_key is missing from record

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
- Renamed skip_slice to emit_slice and inverted the logic
- Now emit_slice=False by default, set to True when extraction succeeds
- Makes the code more intuitive: we emit slices when conditions are met

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
… exists, False on KeyError

- emit_slice = parent_record is not None (default True when we have a record)
- Set emit_slice = False only when KeyError occurs during dpath.get
- This makes the logic clearer: we want to emit slices by default, skip only on error

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
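
The final emit_slice rule described in the last commit message reduces to three cases. The helper below is a hypothetical illustration (the name `should_emit_slice` is not from the PR, and a plain mapping lookup stands in for `dpath.get`):

```python
from typing import Any, Mapping, Optional


def should_emit_slice(parent_record: Optional[Mapping[str, Any]], parent_key: str) -> bool:
    """Hypothetical helper mirroring the described emit_slice logic."""
    # Default: emit whenever a parent record exists.
    emit_slice = parent_record is not None
    if parent_record is not None:
        try:
            parent_record[parent_key]
        except KeyError:
            # Only a failed key extraction turns emission off.
            emit_slice = False
    return emit_slice
```

No record gives False, a record containing the key gives True, and a record missing the key gives False.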
@darynaishchenko Daryna Ishchenko (darynaishchenko) merged commit e9144e2 into main Feb 16, 2026
28 checks passed
@darynaishchenko Daryna Ishchenko (darynaishchenko) deleted the daryna/fix-substream-partition-router branch February 16, 2026 16:00