fix(sync-service): recover from dead consumers blocking WAL flush advancement #3975
When a consumer process crashes during `ConsumerRegistry.broadcast`, its dead PID remains in the ETS lookup table. All subsequent events for that shape are silently dropped, but the FlushTracker still records the shape as needing to flush. Since the dead consumer never calls `notify_flushed`, `last_global_flushed_offset` is permanently blocked, preventing `confirmed_flush_lsn` from advancing and causing unbounded WAL growth.

Three fixes:

1. Dead consumer recovery (`consumer_registry.ex`): On a non-suspend DOWN in `broadcast`, return the handle for retry instead of silently dropping it. In `publish`, detect dead PIDs via `Process.alive?`, clean the ETS entry, and let the recursive call start a fresh consumer.
2. Nil pending_txn guard (`consumer.ex`): A replacement consumer started after a crash may receive a mid-transaction fragment (`has_begin?: false`) with `pending_txn: nil`. Add a guard clause that skips the fragment and calls `consider_flushed` on commit fragments to unblock the FlushTracker.
3. Delivery failure reporting (`consumer_registry.ex`, `shape_log_collector.ex`): `publish` now returns a MapSet of handles where delivery permanently failed (shape removed). The SLC subtracts these from `affected_shapes` before updating the FlushTracker, preventing it from tracking shapes that will never flush.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
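A minimal sketch of the guard clause described in fix 2. The struct and field names here (`%State{}`, `has_begin?`, `has_commit?`, `consider_flushed/2`, the return shape) are assumptions for illustration, not the actual sync-service API:

```elixir
# Hypothetical sketch only; State fields, the fragment shape, and
# consider_flushed/2 are assumed names, not the real module code.
def process_txn_fragment(%{has_begin?: false} = fragment, %State{pending_txn: nil} = state) do
  # A replacement consumer started after a crash never saw the BEGIN for
  # this transaction, so the fragment cannot be applied; skip it.
  state =
    if fragment.has_commit? do
      # Report the commit as flushed so the FlushTracker is not blocked
      # waiting on a write that will never happen.
      consider_flushed(state, fragment)
    else
      state
    end

  {:ok, state}
end
```

The key design point is that skipping alone is not enough: without the `consider_flushed` call on commit fragments, the FlushTracker would keep waiting on this shape.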
Codecov Report

✅ All modified and coverable lines are covered by tests.

```
@@           Coverage Diff            @@
##            main    #3975      +/-  ##
==========================================
+ Coverage   75.75%   88.74%   +12.98%
==========================================
  Files          11       25       +14
  Lines         693     2425     +1732
  Branches      172      608      +436
==========================================
+ Hits          525     2152     +1627
- Misses        167      271      +104
- Partials        1        2        +1
```
I've put an agent working on hypothesis. Not sure if this is useful or not.
Claude Code Review

Summary

This PR fixes a production bug where crashed temporary consumers permanently block WAL flush advancement by leaving dead PIDs in the ETS lookup table, causing unbounded WAL growth (~24 GB at 300K shapes). The three-part fix is well-reasoned: dead consumer recovery in `consumer_registry.ex`, a nil `pending_txn` guard in `consumer.ex`, and delivery failure reporting back to the shape log collector.

What Is Working Well
Issues Found

Important (Should Fix)

1. Unbounded recursion in `publish/2`
   File: `consumer_registry.ex`
   Impact: Blocks the SLC mailbox-processing loop indefinitely. At 300K shapes with a pathological transaction, this stalls replication for the entire stack.
   Suggested fix: Add a retry counter parameter and cap retries at 3, returning handles as undeliverable if the limit is exceeded.

2. Missing changeset file
   Impact: The release tooling will not pick up this fix for version bumps and changelogs.

3. Missing unit test for the new guard clause
   File: `consumer.ex`
   Impact: A regression in this clause would not be caught by the test suite.

Suggestions (Nice to Have)

1. Add the shape handle and offset to the nil `pending_txn` warning
2. Use …
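The reviewer's capped-retry suggestion could be sketched roughly as follows. Every name here (`try_deliver/2`, `@max_retries`, the return shapes) is an assumption for illustration, not the actual registry API:

```elixir
# Illustrative only: cap publish retries so a pathological handle cannot
# recurse forever. try_deliver/2 stands in for the real delivery attempt.
@max_retries 3

def publish(handle, event), do: publish(handle, event, 0)

defp publish(handle, _event, retries) when retries >= @max_retries do
  # Give up and report the handle as undeliverable so the caller can drop
  # it from FlushTracker bookkeeping instead of blocking on it forever.
  {:undeliverable, handle}
end

defp publish(handle, event, retries) do
  case try_deliver(handle, event) do
    :ok -> :ok
    :retry -> publish(handle, event, retries + 1)
  end
end
```

Threading the counter through the recursive clause keeps the retry bound explicit without any extra process state.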
Issue Conformance

No linked issue. The PR description compensates well with the 5-step failure chain, the production reproduction context (300K shapes, ~24 GB WAL), and explicit deferral reasoning for the periodic sweep. A GitHub issue would allow tracking the deferred periodic liveness sweep as future work.

Review iteration: 1 | 2026-03-07
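For illustration, the delivery failure reporting this review discusses (Fix 3 of the PR) might look roughly like this on the collector side. Module, function, and field names here are assumptions, not the actual code:

```elixir
# Sketch under assumed names: publish/2 returns a MapSet of handles whose
# delivery permanently failed (e.g. the shape was removed mid-flight).
defp track_flushes(state, event, affected_shapes) do
  failed = ConsumerRegistry.publish(state.registry, event)

  # Never let the FlushTracker wait on a shape that can no longer flush;
  # a handle that will never call notify_flushed would pin
  # last_global_flushed_offset forever.
  trackable = MapSet.difference(affected_shapes, failed)

  %{state | flush_tracker: FlushTracker.track(state.flush_tracker, trackable, event)}
end
```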
Summary
Fixes a bug where crashed consumer processes permanently block WAL flush advancement (`confirmed_flush_lsn`), causing unbounded WAL growth on Postgres.

When a consumer crashes during `ConsumerRegistry.broadcast`:

- The consumer is started with `restart: :temporary`, so its supervisor never restarts it
- `broadcast` handles the `{:DOWN, ...}` message and returns `[]`, silently dropping the event
- The dead PID stays in the ETS lookup table, so subsequent events for the shape are dropped too
- The `FlushTracker` still records the shape as needing to flush (it is updated after publish)
- The dead consumer never calls `notify_flushed` → `last_global_flushed_offset` is permanently stuck

Observed in production with recurring WAL spikes up to ~24 GB at 300K concurrent shapes.
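A hedged sketch of the recovery path in Fix 1. The ETS table handle, `start_consumer!/1`, the send-based delivery, and the log message are assumptions for illustration, not the actual implementation:

```elixir
# Illustrative recovery path (requires `require Logger`): a registration
# whose PID is dead is deleted, and the retry then takes the "no
# registration" branch and starts a fresh consumer.
defp deliver(table, handle, event) do
  case :ets.lookup(table, handle) do
    [{^handle, pid}] ->
      if Process.alive?(pid) do
        send(pid, event)
        :ok
      else
        # The consumer crashed but its registration survived: clean it up
        # and retry so a fresh consumer can be started.
        Logger.warning("removing dead consumer #{inspect(pid)} for shape #{handle}")
        :ets.delete(table, handle)
        deliver(table, handle, event)
      end

    [] ->
      start_consumer!(handle)
      deliver(table, handle, event)
  end
end
```

Because `Process.alive?` is checked at delivery time, no background sweep is needed for this path: a dead registration is repaired the moment the next event for that shape arrives.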
Changes
Fix 1 — Dead consumer recovery (`consumer_registry.ex`)

- `broadcast/1`: A non-suspend `{:DOWN}` now returns the handle for retry instead of silently dropping it
- `publish/2`: Before retrying, checks `Process.alive?` on the PID in ETS, deletes dead entries, logs a warning, and lets the recursive call start a fresh consumer via `start_consumer!`

Fix 2 — Nil pending_txn guard (`consumer.ex`)

- New `process_txn_fragment/2` clause for `has_begin?: false` + `pending_txn: nil`
- Skips the fragment instead of crashing on `nil`
- On commit fragments (`consider_flushed?`), calls `consider_flushed` so the FlushTracker isn't blocked

Fix 3 — Delivery failure reporting (`consumer_registry.ex`, `shape_log_collector.ex`)

- `publish/2` now returns a `MapSet` of handles where delivery permanently failed (shape removed between event routing and delivery)
- The SLC subtracts these from `affected_shapes` before updating the FlushTracker, preventing it from tracking shapes that will never flush

Deferred: Periodic liveness sweep
A periodic sweep checking `Process.alive?` on all ETS entries would serve as a safety net for any edge cases not caught by Fix 1. This was deferred for human evaluation — a timeout-based staleness check is unsafe because legitimate consumers can be very slow:

- `broadcast` blocks `{:writer_flushed, ...}` cast processing for the duration of the fan-out (seconds to minutes at 300K shapes)
- New consumers buffer events (`buffering?: true`) until snapshot info arrives (minutes under pool contention)

A `Process.alive?` check has zero false positives — a dead process will never flush.

Test plan
- New tests verify that a dead consumer blocks `last_global_flushed_offset`, and that `handle_shape_removed` unblocks it

🤖 Generated with Claude Code