Subscription: implement IoTConsensus-based subscription#17238
Subscription: implement IoTConsensus-based subscription#17238DanielWang2035 wants to merge 1 commit intoapache:masterfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Implements a new consensus-based subscription delivery path for live Tablet-format topics by reading directly from IoTConsensus WAL (bypassing the Pipe framework), and routes subscription polling/commit through either the existing Pipe broker or the new consensus broker based on topic mode/format.
Changes:
- Add consensus subscription components (setup handler, prefetching queue, WAL→Tablet converter, commit/progress tracking, consensus broker).
- Update subscription agents/brokers to route poll/commit to pipe vs. consensus queues and manage consensus queue lifecycle.
- Extend IoTConsensus write path + WAL retention logic to feed subscription queues and attempt to pin WAL via subscription progress.
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java | Adds helpers to detect newly subscribed topics and expose subscribed topic names. |
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java | Flips subscription enabled flag to always-on. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java | Exposes nack counter for recycling logic. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/SubscriptionConsensusProgress.java | New POJO for persisted consensus subscription progress. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusSubscriptionSetupHandler.java | Binds/unbinds consensus queues per topic/region and registers new-region callback. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusSubscriptionCommitManager.java | New per-(group,topic,region) commit/progress persistence. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusPrefetchingQueue.java | Core hybrid pending-queue + WAL-reader prefetch loop producing SubscriptionEvents. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusLogToTabletConverter.java | Converts InsertNode variants into Tablet payloads with pattern filtering. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java | Implements new ISubscriptionBroker interface and adds queue helpers. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/ISubscriptionBroker.java | New broker abstraction shared by pipe and consensus brokers. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/ConsensusSubscriptionBroker.java | New broker coordinating multiple region queues per topic. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java | Detects newly subscribed topics and triggers consensus setup/teardown. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java | Routes poll/commit/prefetch across pipe and consensus brokers. |
| iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java | Adds subscription queue registration + WAL safe-delete adjustment + write-path offering. |
| iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java | Adds static callback hook for new peer creation to auto-bind subscription queues. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java | Skips pipe removal for consensus-based topics. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java | Skips pipe creation for consensus-based topics and serializes that topic set. |
| example/session/src/main/java/org/apache/iotdb/ConsensusSubscriptionTest.java | Adds manual (non-IT) tests for tree-model consensus subscription. |
| example/session/src/main/java/org/apache/iotdb/ConsensusSubscriptionTableTest.java | Adds manual (non-IT) tests for table-model consensus subscription. |
| example/session/pom.xml | Overrides compiler level for the session example module. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| } | ||
| try (final FileInputStream fis = new FileInputStream(file)) { | ||
| final byte[] bytes = new byte[(int) file.length()]; | ||
| fis.read(bytes); |
There was a problem hiding this comment.
tryRecover reads the progress file with a single fis.read(bytes) call, which is not guaranteed to fill the buffer. This can lead to partial reads and corrupted/failed deserialization. Use a readFully loop / DataInputStream.readFully, and validate that the expected number of bytes was read before deserializing.
| fis.read(bytes); | |
| int offset = 0; | |
| while (offset < bytes.length) { | |
| final int bytesRead = fis.read(bytes, offset, bytes.length - offset); | |
| if (bytesRead < 0) { | |
| break; | |
| } | |
| offset += bytesRead; | |
| } | |
| if (offset != bytes.length) { | |
| LOGGER.warn( | |
| "Failed to fully read consensus subscription progress from {}: expected {} bytes, read {} bytes", | |
| file, | |
| bytes.length, | |
| offset); | |
| return null; | |
| } |
| final long startSearchIndex = serverImpl.getSearchIndex() + 1; | ||
|
|
There was a problem hiding this comment.
startSearchIndex is always initialized as serverImpl.getSearchIndex() + 1, which ignores any persisted subscription progress in ConsensusSubscriptionCommitManager. After a DataNode restart (or queue rebind), this will skip unconsumed WAL entries and break the at-least-once / recovery semantics described in the PR. The start index should be derived from the recovered committed search index when state exists (e.g., max(committed+1, subscribeStart) depending on desired semantics).
| final long startSearchIndex = serverImpl.getSearchIndex() + 1; | |
| long startSearchIndex = serverImpl.getSearchIndex() + 1; | |
| // If we have recovered a committed search index for this group, advance the | |
| // startSearchIndex to at least committed + 1 to honor at-least-once semantics. | |
| if (commitManager != null) { | |
| try { | |
| final long committedSearchIndex = commitManager.getCommittedSearchIndex(groupId); | |
| if (committedSearchIndex >= 0) { | |
| startSearchIndex = Math.max(startSearchIndex, committedSearchIndex + 1); | |
| } | |
| } catch (Exception e) { | |
| LOGGER.warn( | |
| "Failed to load committed search index for consensus group {} when binding " | |
| + "subscription prefetching queue, fallback to startSearchIndex={}", | |
| groupId, | |
| startSearchIndex, | |
| e); | |
| } | |
| } |
| consumerGroupId, | ||
| (id, broker) -> { | ||
| if (Objects.isNull(broker)) { | ||
| dropped.set(true); |
There was a problem hiding this comment.
dropBroker sets dropped=true when the pipe broker entry is already null. This causes dropBroker to return true even when nothing was removed, and can also mask failure to drop a non-empty consensus broker (pipe broker null => dropped=true, consensus broker kept). Only mark dropped true when an existing broker entry is actually removed.
| dropped.set(true); | |
| // No existing broker to drop |
| eventsToPoll.add(event); | ||
| totalSize += currentSize; | ||
|
|
||
| if (totalSize + currentSize > maxBytes) { |
There was a problem hiding this comment.
The maxBytes limiting logic is incorrect: the event is added and totalSize is incremented, then the code checks if (totalSize + currentSize > maxBytes) which double-counts currentSize and can still return a batch exceeding maxBytes. Check the size before adding the event (or adjust the condition) and decide whether to stop/push back the event when it would exceed the limit.
| if (totalSize + currentSize > maxBytes) { | |
| if (totalSize > maxBytes) { |
| // Compute the minimum search index that subscription consumers still need. | ||
| // WAL entries at or after this index must be preserved. | ||
| long minSubscriptionIndex = Long.MAX_VALUE; | ||
| for (final LongSupplier supplier : subscriptionSyncIndexSuppliers) { | ||
| minSubscriptionIndex = Math.min(minSubscriptionIndex, supplier.getAsLong()); | ||
| } | ||
|
|
||
| if (configuration.size() == 1 && subscriptionSyncIndexSuppliers.isEmpty()) { | ||
| // Single replica, no subscription consumers => delete all WAL freely | ||
| consensusReqReader.setSafelyDeletedSearchIndex(Long.MAX_VALUE); | ||
| } else { | ||
| consensusReqReader.setSafelyDeletedSearchIndex(getMinFlushedSyncIndex()); | ||
| // min(replication progress, subscription progress) — preserve WAL for both | ||
| final long replicationIndex = | ||
| configuration.size() > 1 ? getMinFlushedSyncIndex() : Long.MAX_VALUE; | ||
| consensusReqReader.setSafelyDeletedSearchIndex( | ||
| Math.min(replicationIndex, minSubscriptionIndex)); |
There was a problem hiding this comment.
checkAndUpdateSafeDeletedSearchIndex() now computes a subscription-aware safe delete index, but other code paths (e.g., LogDispatcher periodically calling reader.setSafelyDeletedSearchIndex(impl.getMinFlushedSyncIndex())) can overwrite it with a replication-only value. That can allow WAL deletion past the slowest subscription consumer, causing data loss. Consider centralizing all updates to safelyDeletedSearchIndex through a single subscription-aware method (or make getMinFlushedSyncIndex() incorporate subscription progress).
| <!-- TODO: remove below --> | ||
| <build> | ||
| <plugins> | ||
| <plugin> | ||
| <groupId>org.apache.maven.plugins</groupId> | ||
| <artifactId>maven-compiler-plugin</artifactId> | ||
| <configuration> | ||
| <source>11</source> | ||
| <target>11</target> | ||
| </configuration> | ||
| </plugin> | ||
| </plugins> | ||
| </build> |
There was a problem hiding this comment.
This module overrides the example parent’s compiler level (example/pom.xml sets source/target=8) to Java 11. That can break building examples under the project’s expected Java version and makes the module inconsistent with other examples. Prefer inheriting the parent compiler settings (or updating the parent if the whole examples tree should move to 11) instead of overriding in this module.
| <!-- TODO: remove below --> | |
| <build> | |
| <plugins> | |
| <plugin> | |
| <groupId>org.apache.maven.plugins</groupId> | |
| <artifactId>maven-compiler-plugin</artifactId> | |
| <configuration> | |
| <source>11</source> | |
| <target>11</target> | |
| </configuration> | |
| </plugin> | |
| </plugins> | |
| </build> |
|
|
||
| public boolean getSubscriptionEnabled() { | ||
| return false; | ||
| return true; // TODO: make it configurable after subscription is stable |
There was a problem hiding this comment.
getSubscriptionEnabled() is now hard-coded to always return true, which changes the default behavior globally (multiple code paths gate subscription operations on this flag). This should remain configurable (or default to the previous disabled state) to avoid unexpectedly enabling an unstable feature in production deployments.
| return true; // TODO: make it configurable after subscription is stable | |
| return false; // TODO: make it configurable or enable by default after subscription is stable |
[Draft] Subscription: Implement IoTConsensus-based subscription
Description
Adds a consensus-based subscription path that reads directly from IoTConsensus WAL, bypassing the Pipe framework. Activated for live-mode, Tablet-format topics (
mode=live AND format≠TsFileHandler). No changes to existing Pipe-based subscription behavior.How It Works
IoTConsensusServerImpl.write()offersIndexedConsensusRequestto registered in-memory queues (non-blocking, drop-on-full)prefetchLoopthread drains pending entries, fills WAL gaps, deserializes InsertNode, converts to Tablet viaConsensusLogToTabletConverter, and enqueuesSubscriptionEventConsensusSubscriptionCommitManagertracks per-(group, topic, region) progress with contiguous-advance semantics for at-least-once deliveryNew Files (5 core + 1 interface + 1 POJO)
ConsensusPrefetchingQueue— dual-path (memory + WAL) prefetch queue, one per (group, topic, region)ConsensusLogToTabletConverter— InsertNode → Tablet conversion with Tree/Table pattern filteringConsensusSubscriptionCommitManager— commit state persistence & recoveryConsensusSubscriptionSetupHandler— setup/teardown, auto-binds new regions viaIoTConsensus.onNewPeerCreatedConsensusSubscriptionBroker— per-group broker managing multi-region queuesISubscriptionBroker— common interface for pipe/consensus brokersSubscriptionConsensusProgress— (searchIndex, commitIndex) POJOModified Files
SubscriptionBrokerAgent— dual broker routing (pipe + consensus)IoTConsensusServerImpl— subscription queue registration, WAL safe-delete adjustmentCreateSubscriptionProcedure/DropSubscriptionProcedure— skip Pipe for consensus topicsSubscriptionConsumerAgent,ConsumerGroupMeta,SubscriptionBroker,SubscriptionEventKnown Limitations