[AKKA-31844] dynamic scaling of ShardedDaemonProcess instances#2755
[AKKA-31844] dynamic scaling of ShardedDaemonProcess instances#2755pjfanning merged 10 commits intoapache:mainfrom
Conversation
|
We can have |
Co-authored-by: pjfanning <11783444+pjfanning@users.noreply.github.com>
Co-authored-by: pjfanning <11783444+pjfanning@users.noreply.github.com>
Co-authored-by: pjfanning <11783444+pjfanning@users.noreply.github.com>
Co-authored-by: pjfanning <11783444+pjfanning@users.noreply.github.com>
886d7b9 to
650a39f
Compare
There was a problem hiding this comment.
Pull request overview
Ports Akka’s dynamic scaling workflow for ShardedDaemonProcess into Apache Pekko, enabling runtime changes to the number of daemon process entities and coordinating shard stop/start during rescaling.
Changes:
- Adds classic sharding coordinator support for explicitly stopping a set of shards (
StopShards) and forwards coordinator commands viaShardRegion. - Introduces typed sharding rescaling control API (
ShardedDaemonProcessCommand) and implements a singleton-based rescaling coordinator backed by Distributed Data state. - Updates typed sharding serialization bindings and protobuf definitions to cover the new rescaling messages/state.
Reviewed changes
Copilot reviewed 21 out of 22 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| distributed-data/src/main/java/org/apache/pekko/cluster/ddata/protobuf/msg/ReplicatorMessages.java | Regenerated protobuf Java to include usedTimestamp fields in relevant messages. |
| cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardRegion.scala | Forwards CoordinatorCommand messages from region to coordinator. |
| cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala | Adds StopShards command and tracking for shard-stop acknowledgements. |
| cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala | Updates tests to validate coordinator-based keepalive behavior and ID revision scheme. |
| cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/internal/ShardedDaemonProcessStateSpec.scala | Adds unit tests for merge behavior of the new ddata state. |
| cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/internal/ShardedDaemonProcessIdSpec.scala | Adds unit tests for the new entity ID encoding/decoding scheme. |
| cluster-sharding-typed/src/test/java/org/apache/pekko/cluster/sharding/typed/javadsl/ShardedDaemonProcessCompileOnlyTest.java | Expands Java compile-only coverage for new factories and control API return type. |
| cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/scaladsl/ShardedDaemonProcess.scala | Adds initWithContext Scala APIs returning a rescaling control ActorRef. |
| cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/javadsl/ShardedDaemonProcess.scala | Adds initWithContext Java APIs returning a rescaling control ActorRef. |
| cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ShardingSerializer.scala | Extends typed sharding serializer to cover rescaling messages and state. |
| cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ShardedDaemonProcessState.scala | Adds ddata-backed state model and helper behavior for revision verification. |
| cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala | Refactors implementation to use singleton coordinator rather than per-node keepalive pinger. |
| cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ShardedDaemonProcessId.scala | Adds revision-aware ID encoding/decoding and shard-id extraction. |
| cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ShardedDaemonProcessCoordinator.scala | Implements singleton coordinator that runs keepalive + rescaling workflow. |
| cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ClusterShardingTypedSerializable.scala | Adds internal marker trait for typed sharding serialization binding. |
| cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/ShardingMessageExtractor.scala | Makes ShardingEnvelope extend the new serialization marker trait. |
| cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/ShardedDaemonProcessSettings.scala | Adds public ShardedDaemonProcessContext for behavior factories. |
| cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/ShardedDaemonProcessCommand.scala | Introduces public rescaling/query commands and reply API. |
| cluster-sharding-typed/src/main/resources/reference.conf | Switches serializer binding from ShardingEnvelope to the marker trait to cover new messages. |
| cluster-sharding-typed/src/main/protobuf/ShardingMessages.proto | Adds protobuf messages required for rescaling serialization. |
| cluster-sharding-typed/src/main/mima-filters/2.0.x.backwards.excludes/dynamic-scaling.excludes | Adds MiMa filters for new API surface and internal refactor impacts. |
| cluster-sharding-typed/src/main/java/org/apache/pekko/cluster/sharding/typed/internal/protobuf/ShardingMessages.java | Regenerated protobuf Java for new typed sharding rescaling messages. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| shard, | ||
| waitingForShardsToStop.getOrElse(shard, Set.empty) + ((sender(), requestId))) | ||
| } | ||
| shutdownShards(self, shards.filter(state.shards.contains)) |
There was a problem hiding this comment.
StopShards currently calls shutdownShards(self, ...), which makes the rebalance worker send HandOff to the coordinator itself instead of to the region that actually hosts the shard. This will prevent shards from stopping and the caller will never receive the expected ShardStopped acks. Consider looking up each shard’s current region from state.shards and using that as the from/shuttingDownRegion when initiating the handoff.
| shutdownShards(self, shards.filter(state.shards.contains)) | |
| // Initiate handoff from the actual hosting regions instead of from the coordinator itself | |
| val shardsWithRegion: Iterable[(ActorRef, ShardId)] = | |
| shards.flatMap { shardId => | |
| state.shards.get(shardId).map(regionRef => (regionRef, shardId)) | |
| } | |
| shardsWithRegion | |
| .groupBy(_._1) | |
| .foreach { | |
| case (regionRef, regionShards) => | |
| shutdownShards(regionRef, regionShards.map(_._2)) | |
| } |
Ports Akka PR 31844 — dynamic scaling of
ShardedDaemonProcessinstances — to the Apache Pekko codebase.part of #2730
Changes Made
cluster-sharding (classic)
StopShardstoShardCoordinator.Internalwith supporting state (waitingForShardsToStop) and handler in theactivereceive methodCoordinatorCommanddelivery toShardRegionvia a newdeliverCoordinatorCommandmethod that forwards commands to the coordinatorcluster-sharding-typed (new files)
ClusterShardingTypedSerializable— internal marker trait for typed cluster sharding serializable messagesShardedDaemonProcessCommand— public API trait withChangeNumberOfProcesses,GetNumberOfProcesses, andNumberOfProcessesShardedDaemonProcessId— internal entity ID encoding/decoding with revision support for rolling upgradesShardedDaemonProcessState— distributed data (ddata) state using a customReplicatedDatatypeShardedDaemonProcessCoordinator— singleton coordinator actor implementing the rescaling workflow (stop pinging → write ddata → stop shards → mark complete → resume pinging)cluster-sharding-typed (modified files)
ShardedDaemonProcessSettings— addedShardedDaemonProcessContexttraitShardingMessageExtractor—ShardingEnvelopenow extendsClusterShardingTypedSerializablescaladsl/ShardedDaemonProcess— addedinitWithContextmethods returningActorRef[ShardedDaemonProcessCommand]javadsl/ShardedDaemonProcess— addedinitWithContextmethods returningActorRef[ShardedDaemonProcessCommand]ShardedDaemonProcessImpl— refactored to use the newShardedDaemonProcessCoordinatorsingleton instead of the oldKeepAlivePingerShardingSerializer— added serialization forShardedDaemonProcessState,ChangeNumberOfProcesses,GetNumberOfProcesses,GetNumberOfProcessesReplyreference.conf— changed serialization binding fromShardingEnvelopetoClusterShardingTypedSerializableStill Remaining
ShardingMessages.proto— needs new proto message definitions (requires protoc to regenerate Java)ShardingMessages.java— needs regeneration from updated protoShardedDaemonProcessSpec, addShardedDaemonProcessIdSpec,ShardedDaemonProcessStateSpec,ShardedDaemonProcessRescaleSpec