Subscription: implement IoTConsensus-based subscription #17238

Draft
DanielWang2035 wants to merge 1 commit into apache:master from DanielWang2035:consensus-subscription

@DanielWang2035
Collaborator

[Draft] Subscription: Implement IoTConsensus-based subscription

Description

Adds a consensus-based subscription path that reads directly from IoTConsensus WAL, bypassing the Pipe framework. Activated for live-mode, Tablet-format topics (mode=live AND format≠TsFileHandler). No changes to existing Pipe-based subscription behavior.

How It Works

  1. IoTConsensusServerImpl.write() offers IndexedConsensusRequest to registered in-memory queues (non-blocking, drop-on-full)
  2. Background prefetchLoop thread drains pending entries, fills WAL gaps, deserializes InsertNode, converts to Tablet via ConsensusLogToTabletConverter, and enqueues SubscriptionEvent
  3. Consumer polls events, commits; ConsensusSubscriptionCommitManager tracks per-(group, topic, region) progress with contiguous-advance semantics for at-least-once delivery
  4. WAL retention extended: safe-delete index = min(replication progress, subscription progress)
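
The contiguous-advance rule in step 3 can be sketched as follows. This is a minimal illustration, not the PR's ConsensusSubscriptionCommitManager: the class name `ContiguousCommitTracker` is hypothetical, and it assumes every search index is delivered and committed individually.

```java
import java.util.TreeSet;

/** Hypothetical helper (not from the PR): commit tracking for one (group, topic, region). */
final class ContiguousCommitTracker {
  // Commits received ahead of a gap; they wait here until the gap is closed.
  private final TreeSet<Long> outOfOrder = new TreeSet<>();
  // Highest search index with no uncommitted index below it.
  private long committedSearchIndex;

  ContiguousCommitTracker(long initialCommittedSearchIndex) {
    this.committedSearchIndex = initialCommittedSearchIndex;
  }

  /** Records a commit and returns the (possibly advanced) contiguous committed index. */
  synchronized long commit(long searchIndex) {
    if (searchIndex <= committedSearchIndex) {
      return committedSearchIndex; // duplicate or stale commit, nothing to do
    }
    outOfOrder.add(searchIndex);
    // Advance only while the next expected index has been committed.
    while (!outOfOrder.isEmpty() && outOfOrder.first() == committedSearchIndex + 1) {
      committedSearchIndex = outOfOrder.pollFirst();
    }
    return committedSearchIndex;
  }

  synchronized long getCommittedSearchIndex() {
    return committedSearchIndex;
  }
}
```

Because the persisted index never moves past a gap, a restart replays everything after the last contiguous commit, which gives at-least-once delivery at the cost of possible duplicates.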

New Files (5 core + 1 interface + 1 POJO)

  • ConsensusPrefetchingQueue — dual-path (memory + WAL) prefetch queue, one per (group, topic, region)
  • ConsensusLogToTabletConverter — InsertNode → Tablet conversion with Tree/Table pattern filtering
  • ConsensusSubscriptionCommitManager — commit state persistence & recovery
  • ConsensusSubscriptionSetupHandler — setup/teardown, auto-binds new regions via IoTConsensus.onNewPeerCreated
  • ConsensusSubscriptionBroker — per-group broker managing multi-region queues
  • ISubscriptionBroker — common interface for pipe/consensus brokers
  • SubscriptionConsensusProgress — (searchIndex, commitIndex) POJO

Modified Files

  • SubscriptionBrokerAgent — dual broker routing (pipe + consensus)
  • IoTConsensusServerImpl — subscription queue registration, WAL safe-delete adjustment
  • CreateSubscriptionProcedure / DropSubscriptionProcedure — skip Pipe for consensus topics
  • SubscriptionConsumerAgent, ConsumerGroupMeta, SubscriptionBroker, SubscriptionEvent

Known Limitations

  • No time-based filtering (WAL lacks time index)
  • Seek not yet implemented
  • Tests are manual (TODO: migrate to IT framework)

Contributor

Copilot AI left a comment

Pull request overview

Implements a new consensus-based subscription delivery path for live Tablet-format topics by reading directly from IoTConsensus WAL (bypassing the Pipe framework), and routes subscription polling/commit through either the existing Pipe broker or the new consensus broker based on topic mode/format.

Changes:

  • Add consensus subscription components (setup handler, prefetching queue, WAL→Tablet converter, commit/progress tracking, consensus broker).
  • Update subscription agents/brokers to route poll/commit to pipe vs. consensus queues and manage consensus queue lifecycle.
  • Extend IoTConsensus write path + WAL retention logic to feed subscription queues and attempt to pin WAL via subscription progress.
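
To make the write-path offering concrete, here is a minimal sketch of a non-blocking, drop-on-full fan-out. `DropOnFullFanout` and its methods are hypothetical names, not the registration API added to IoTConsensusServerImpl; the sketch assumes that a dropped entry is later recovered from the WAL by the reader side.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: offers each request to every registered bounded queue. */
final class DropOnFullFanout<T> {
  private final List<BlockingQueue<T>> queues = new CopyOnWriteArrayList<>();
  private final AtomicLong dropped = new AtomicLong();

  /** Registers a bounded in-memory queue and returns it to the subscriber. */
  BlockingQueue<T> register(int capacity) {
    final BlockingQueue<T> queue = new ArrayBlockingQueue<>(capacity);
    queues.add(queue);
    return queue;
  }

  /** Never blocks the writer: offer() returns false immediately when a queue is full. */
  void publish(T request) {
    for (final BlockingQueue<T> queue : queues) {
      if (!queue.offer(request)) {
        dropped.incrementAndGet(); // assumed to be refilled from the WAL later
      }
    }
  }

  long droppedCount() {
    return dropped.get();
  }
}
```

The point of `offer` over `put` is that a slow consumer can never stall the consensus write path; the trade-off is that the in-memory queue alone is lossy and needs the WAL as a fallback.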

Reviewed changes

Copilot reviewed 20 out of 20 changed files in this pull request and generated 7 comments.

| File | Description |
| --- | --- |
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java | Adds helpers to detect newly subscribed topics and expose subscribed topic names. |
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java | Flips subscription enabled flag to always-on. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java | Exposes nack counter for recycling logic. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/SubscriptionConsensusProgress.java | New POJO for persisted consensus subscription progress. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusSubscriptionSetupHandler.java | Binds/unbinds consensus queues per topic/region and registers new-region callback. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusSubscriptionCommitManager.java | New per-(group,topic,region) commit/progress persistence. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusPrefetchingQueue.java | Core hybrid pending-queue + WAL-reader prefetch loop producing SubscriptionEvents. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusLogToTabletConverter.java | Converts InsertNode variants into Tablet payloads with pattern filtering. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java | Implements new ISubscriptionBroker interface and adds queue helpers. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/ISubscriptionBroker.java | New broker abstraction shared by pipe and consensus brokers. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/ConsensusSubscriptionBroker.java | New broker coordinating multiple region queues per topic. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java | Detects newly subscribed topics and triggers consensus setup/teardown. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java | Routes poll/commit/prefetch across pipe and consensus brokers. |
| iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java | Adds subscription queue registration + WAL safe-delete adjustment + write-path offering. |
| iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java | Adds static callback hook for new peer creation to auto-bind subscription queues. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java | Skips pipe removal for consensus-based topics. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java | Skips pipe creation for consensus-based topics and serializes that topic set. |
| example/session/src/main/java/org/apache/iotdb/ConsensusSubscriptionTest.java | Adds manual (non-IT) tests for tree-model consensus subscription. |
| example/session/src/main/java/org/apache/iotdb/ConsensusSubscriptionTableTest.java | Adds manual (non-IT) tests for table-model consensus subscription. |
| example/session/pom.xml | Overrides compiler level for the session example module. |

}
try (final FileInputStream fis = new FileInputStream(file)) {
final byte[] bytes = new byte[(int) file.length()];
fis.read(bytes);

Copilot AI Mar 2, 2026

tryRecover reads the progress file with a single fis.read(bytes) call, which is not guaranteed to fill the buffer. This can lead to partial reads and corrupted/failed deserialization. Use a readFully loop / DataInputStream.readFully, and validate that the expected number of bytes was read before deserializing.

Suggested change
fis.read(bytes);
int offset = 0;
while (offset < bytes.length) {
final int bytesRead = fis.read(bytes, offset, bytes.length - offset);
if (bytesRead < 0) {
break;
}
offset += bytesRead;
}
if (offset != bytes.length) {
LOGGER.warn(
"Failed to fully read consensus subscription progress from {}: expected {} bytes, read {} bytes",
file,
bytes.length,
offset);
return null;
}
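
An alternative to the manual loop is `DataInputStream.readFully`, which the comment also mentions. The following is a minimal sketch under that approach; `ProgressFileReader` and `readAllOrNull` are hypothetical names, and returning null on a truncated file simply mirrors the `return null` in the suggestion above.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

/** Hypothetical helper: reads a progress file completely or reports truncation. */
final class ProgressFileReader {
  /** Returns the full file content, or null if the file ends before file.length() bytes. */
  static byte[] readAllOrNull(File file) throws IOException {
    final byte[] bytes = new byte[(int) file.length()];
    try (DataInputStream in = new DataInputStream(new FileInputStream(file))) {
      // readFully blocks until the buffer is filled or throws EOFException.
      in.readFully(bytes);
      return bytes;
    } catch (EOFException e) {
      return null; // file shrank or was truncated between length() and the read
    }
  }
}
```

Either form removes the partial-read risk; `readFully` is shorter, while the explicit loop makes it easier to log how many bytes were actually read.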

Comment on lines +319 to +320
final long startSearchIndex = serverImpl.getSearchIndex() + 1;


Copilot AI Mar 2, 2026

startSearchIndex is always initialized as serverImpl.getSearchIndex() + 1, which ignores any persisted subscription progress in ConsensusSubscriptionCommitManager. After a DataNode restart (or queue rebind), this will skip unconsumed WAL entries and break the at-least-once / recovery semantics described in the PR. The start index should be derived from the recovered committed search index when state exists (e.g., max(committed+1, subscribeStart) depending on desired semantics).

Suggested change
final long startSearchIndex = serverImpl.getSearchIndex() + 1;
long startSearchIndex = serverImpl.getSearchIndex() + 1;
// If we have recovered a committed search index for this group, advance the
// startSearchIndex to at least committed + 1 to honor at-least-once semantics.
if (commitManager != null) {
try {
final long committedSearchIndex = commitManager.getCommittedSearchIndex(groupId);
if (committedSearchIndex >= 0) {
startSearchIndex = Math.max(startSearchIndex, committedSearchIndex + 1);
}
} catch (Exception e) {
LOGGER.warn(
"Failed to load committed search index for consensus group {} when binding "
+ "subscription prefetching queue, fallback to startSearchIndex={}",
groupId,
startSearchIndex,
e);
}
}
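
One way to express the "resume from recovered progress" rule is sketched below. `StartIndexResolver` is a hypothetical helper, and the choice to resume from committed + 1 whenever persisted state exists is an assumption about the desired semantics, not something the PR confirms.

```java
import java.util.OptionalLong;

/** Hypothetical helper: picks the first search index a rebound queue should read. */
final class StartIndexResolver {
  /**
   * @param recoveredCommittedSearchIndex persisted progress, empty for a brand-new subscription
   * @param currentSearchIndex latest search index written by the region
   */
  static long resolve(OptionalLong recoveredCommittedSearchIndex, long currentSearchIndex) {
    if (recoveredCommittedSearchIndex.isPresent()) {
      // Existing subscription: replay everything after the last contiguous commit.
      return recoveredCommittedSearchIndex.getAsLong() + 1;
    }
    // New subscription: live mode starts at the next write.
    return currentSearchIndex + 1;
  }
}
```

If the committed index can never exceed the current write index, then `Math.max(currentSearchIndex + 1, committed + 1)` always evaluates to `currentSearchIndex + 1`, so this sketch prefers the recovered value outright. It only works if the WAL entries after the committed index are still retained.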

consumerGroupId,
(id, broker) -> {
if (Objects.isNull(broker)) {
dropped.set(true);

Copilot AI Mar 2, 2026

dropBroker sets dropped=true when the pipe broker entry is already null. This causes dropBroker to return true even when nothing was removed, and can also mask failure to drop a non-empty consensus broker (pipe broker null => dropped=true, consensus broker kept). Only mark dropped true when an existing broker entry is actually removed.

Suggested change
dropped.set(true);
// No existing broker to drop
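
The intended behavior (report true only when an entry is actually removed) can be sketched with `ConcurrentHashMap.compute`. `BrokerRegistry` is a hypothetical stand-in for the agent's broker map, and the emptiness predicate is an assumption about when a broker may be dropped.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

/** Hypothetical registry: drops a broker only if it exists and is empty. */
final class BrokerRegistry<B> {
  private final Map<String, B> brokers = new ConcurrentHashMap<>();
  private final Predicate<B> isEmpty;

  BrokerRegistry(Predicate<B> isEmpty) {
    this.isEmpty = isEmpty;
  }

  void put(String consumerGroupId, B broker) {
    brokers.put(consumerGroupId, broker);
  }

  /** Returns true only when an existing broker entry was removed by this call. */
  boolean drop(String consumerGroupId) {
    final AtomicBoolean dropped = new AtomicBoolean(false);
    brokers.compute(
        consumerGroupId,
        (id, broker) -> {
          if (broker == null) {
            return null; // no existing broker to drop, flag stays false
          }
          if (!isEmpty.test(broker)) {
            return broker; // non-empty broker is kept, flag stays false
          }
          dropped.set(true);
          return null; // returning null removes the mapping
        });
    return dropped.get();
  }
}
```

With two broker maps (pipe and consensus), each drop would report its own result, so a missing pipe broker could no longer hide a consensus broker that was kept.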

eventsToPoll.add(event);
totalSize += currentSize;

if (totalSize + currentSize > maxBytes) {

Copilot AI Mar 2, 2026

The maxBytes limiting logic is incorrect: the event is added and totalSize is incremented, then the code checks if (totalSize + currentSize > maxBytes) which double-counts currentSize and can still return a batch exceeding maxBytes. Check the size before adding the event (or adjust the condition) and decide whether to stop/push back the event when it would exceed the limit.

Suggested change
if (totalSize + currentSize > maxBytes) {
if (totalSize > maxBytes) {
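
The "check the size before adding the event" option from the comment could look like the sketch below. `SizeLimitedPoller` is a hypothetical helper; letting a single oversized event through when the batch is empty is an assumption made here so that one large event cannot block polling forever.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.ToLongFunction;

/** Hypothetical helper: drains events from the head of a queue up to a byte budget. */
final class SizeLimitedPoller {
  static <E> List<E> poll(Deque<E> pending, ToLongFunction<E> sizeOf, long maxBytes) {
    final List<E> batch = new ArrayList<>();
    long totalSize = 0;
    while (!pending.isEmpty()) {
      final long currentSize = sizeOf.applyAsLong(pending.peekFirst());
      // Check before adding, so currentSize is counted exactly once.
      if (!batch.isEmpty() && totalSize + currentSize > maxBytes) {
        break; // the event stays at the head of the queue for the next poll
      }
      batch.add(pending.pollFirst());
      totalSize += currentSize;
    }
    return batch;
  }
}
```

Compared with `if (totalSize > maxBytes)` after the add, this never returns a multi-event batch above the limit and needs no push-back step, because the event is only removed from the queue once it is known to fit.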

Comment on lines +978 to +993
// Compute the minimum search index that subscription consumers still need.
// WAL entries at or after this index must be preserved.
long minSubscriptionIndex = Long.MAX_VALUE;
for (final LongSupplier supplier : subscriptionSyncIndexSuppliers) {
minSubscriptionIndex = Math.min(minSubscriptionIndex, supplier.getAsLong());
}

if (configuration.size() == 1 && subscriptionSyncIndexSuppliers.isEmpty()) {
// Single replica, no subscription consumers => delete all WAL freely
consensusReqReader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
} else {
consensusReqReader.setSafelyDeletedSearchIndex(getMinFlushedSyncIndex());
// min(replication progress, subscription progress) — preserve WAL for both
final long replicationIndex =
configuration.size() > 1 ? getMinFlushedSyncIndex() : Long.MAX_VALUE;
consensusReqReader.setSafelyDeletedSearchIndex(
Math.min(replicationIndex, minSubscriptionIndex));

Copilot AI Mar 2, 2026

checkAndUpdateSafeDeletedSearchIndex() now computes a subscription-aware safe delete index, but other code paths (e.g., LogDispatcher periodically calling reader.setSafelyDeletedSearchIndex(impl.getMinFlushedSyncIndex())) can overwrite it with a replication-only value. That can allow WAL deletion past the slowest subscription consumer, causing data loss. Consider centralizing all updates to safelyDeletedSearchIndex through a single subscription-aware method (or make getMinFlushedSyncIndex() incorporate subscription progress).
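
A centralized calculation along those lines might be sketched as follows. `SafeDeleteIndexCalculator` is a hypothetical class that mirrors the arithmetic of the hunk above; the idea is that every caller, including the dispatcher path, would go through `compute` instead of passing a replication-only value.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.LongSupplier;

/** Hypothetical single source of truth for the WAL safe-delete index. */
final class SafeDeleteIndexCalculator {
  private final List<LongSupplier> subscriptionSuppliers = new CopyOnWriteArrayList<>();

  void register(LongSupplier subscriptionProgress) {
    subscriptionSuppliers.add(subscriptionProgress);
  }

  /** Returns min(replication progress, slowest subscription progress). */
  long compute(int replicaCount, long minFlushedSyncIndex) {
    long minSubscriptionIndex = Long.MAX_VALUE; // no subscribers => no constraint
    for (final LongSupplier supplier : subscriptionSuppliers) {
      minSubscriptionIndex = Math.min(minSubscriptionIndex, supplier.getAsLong());
    }
    // A single replica has no followers to wait for.
    final long replicationIndex = replicaCount > 1 ? minFlushedSyncIndex : Long.MAX_VALUE;
    return Math.min(replicationIndex, minSubscriptionIndex);
  }
}
```

With no subscribers and one replica the result is `Long.MAX_VALUE`, which matches the "delete all WAL freely" branch, so the special case falls out of the same formula.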

Comment on lines +43 to +55
<!-- TODO: remove below -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</build>

Copilot AI Mar 2, 2026

This module overrides the example parent’s compiler level (example/pom.xml sets source/target=8) to Java 11. That can break building examples under the project’s expected Java version and makes the module inconsistent with other examples. Prefer inheriting the parent compiler settings (or updating the parent if the whole examples tree should move to 11) instead of overriding in this module.

Suggested change
<!-- TODO: remove below -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</build>


public boolean getSubscriptionEnabled() {
return false;
return true; // TODO: make it configurable after subscription is stable

Copilot AI Mar 2, 2026

getSubscriptionEnabled() is now hard-coded to always return true, which changes the default behavior globally (multiple code paths gate subscription operations on this flag). This should remain configurable (or default to the previous disabled state) to avoid unexpectedly enabling an unstable feature in production deployments.

Suggested change
return true; // TODO: make it configurable after subscription is stable
return false; // TODO: make it configurable or enable by default after subscription is stable
