From 82235fc574b8e084e67ede5bc198b523697d2069 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Tue, 17 Mar 2026 21:28:20 -0700 Subject: [PATCH 1/3] Add upgrade-on-CAN support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add isTargetWorkerDeploymentVersionChanged() to WorkflowInfo, plumbed through WorkflowTaskStateMachine → WorkflowStateMachines → ReplayWorkflowContext → WorkflowInfoImpl → WorkflowInfo - Add initialVersioningBehavior on ContinueAsNewOptions, allowing workflows to CAN with AUTO_UPGRADE to move to the new version - Wire initialVersioningBehavior encoding in SyncWorkflowContext - Update API proto submodule to include api#709 and #721 changes - Fix NexusWorker deprecation warnings from proto update Co-Authored-By: Claude Opus 4.6 (1M context) --- .../common/InitialVersioningBehavior.java | 15 ++ .../common/SuggestContinueAsNewReason.java | 15 ++ .../replay/ReplayWorkflowContext.java | 13 ++ .../replay/ReplayWorkflowContextImpl.java | 11 ++ .../statemachines/WorkflowStateMachines.java | 20 ++- .../WorkflowTaskStateMachine.java | 47 +++++- .../internal/sync/SyncWorkflowContext.java | 9 ++ .../internal/sync/WorkflowInfoImpl.java | 12 ++ .../temporal/internal/worker/NexusWorker.java | 3 + .../workflow/ContinueAsNewOptions.java | 33 +++- .../io/temporal/workflow/WorkflowInfo.java | 18 +++ .../temporal/worker/WorkerVersioningTest.java | 149 ++++++++++++++++++ temporal-serviceclient/src/main/proto | 2 +- .../testservice/TestWorkflowService.java | 1 + .../sync/DummySyncWorkflowContext.java | 11 ++ 15 files changed, 353 insertions(+), 6 deletions(-) create mode 100644 temporal-sdk/src/main/java/io/temporal/common/InitialVersioningBehavior.java create mode 100644 temporal-sdk/src/main/java/io/temporal/common/SuggestContinueAsNewReason.java diff --git a/temporal-sdk/src/main/java/io/temporal/common/InitialVersioningBehavior.java b/temporal-sdk/src/main/java/io/temporal/common/InitialVersioningBehavior.java new file mode 100644 index 0000000000..70a2a975d5 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/InitialVersioningBehavior.java @@ -0,0 +1,15 @@ +package io.temporal.common; + +/** + * Specifies the versioning behavior for the first task of a new workflow run started via + * continue-as-new. + */ +@Experimental +public enum InitialVersioningBehavior { + /** + * Start the new run with {@link VersioningBehavior#AUTO_UPGRADE} behavior for the first task, + * upgrading to the latest version. After the first workflow task completes, the workflow uses + * whatever versioning behavior is specified in the workflow code. + */ + AUTO_UPGRADE +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/SuggestContinueAsNewReason.java b/temporal-sdk/src/main/java/io/temporal/common/SuggestContinueAsNewReason.java new file mode 100644 index 0000000000..3b1709a204 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/SuggestContinueAsNewReason.java @@ -0,0 +1,15 @@ +package io.temporal.common; + +/** + * Reason(s) why the server suggests a workflow should continue-as-new. Multiple reasons can be true + * at the same time. + */ +@Experimental +public enum SuggestContinueAsNewReason { + /** Workflow history size is getting too large. */ + HISTORY_SIZE_TOO_LARGE, + /** Workflow history has too many events. */ + TOO_MANY_HISTORY_EVENTS, + /** Workflow's count of completed plus in-flight updates is too large. */ + TOO_MANY_UPDATES +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java index 19a488e775..982737dbee 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java @@ -8,12 +8,14 @@ import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse; import io.temporal.common.RetryOptions; +import io.temporal.common.SuggestContinueAsNewReason; import io.temporal.internal.common.SdkFlag; import io.temporal.internal.statemachines.*; import io.temporal.workflow.Functions; import io.temporal.workflow.Functions.Func; import io.temporal.workflow.Functions.Func1; import java.time.Duration; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; @@ -357,6 +359,17 @@ Integer getVersion( */ boolean isContinueAsNewSuggested(); + /** + * @return the reasons why continue-as-new is suggested, or an empty list if not suggested. This + * value changes during the lifetime of a Workflow Execution. + */ + List getSuggestContinueAsNewReasons(); + + /** + * @return true if the target worker deployment version has changed for this workflow. + */ + boolean isTargetWorkerDeploymentVersionChanged(); + /** * @return true if cancellation of the workflow is requested. */ diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java index dd1844a316..cf704544bd 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java @@ -10,6 +10,7 @@ import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes; import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.common.RetryOptions; +import io.temporal.common.SuggestContinueAsNewReason; import io.temporal.failure.CanceledFailure; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.common.SdkFlag; @@ -416,6 +417,16 @@ public boolean isContinueAsNewSuggested() { return workflowStateMachines.isContinueAsNewSuggested(); } + @Override + public List getSuggestContinueAsNewReasons() { + return workflowStateMachines.getSuggestContinueAsNewReasons(); + } + + @Override + public boolean isTargetWorkerDeploymentVersionChanged() { + return workflowStateMachines.isTargetWorkerDeploymentVersionChanged(); + } + /* * MUTABLE STATE OPERATIONS */ diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java index 2f2c716f24..a39b7920b6 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java @@ -20,6 +20,7 @@ import io.temporal.api.protocol.v1.Message; import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.api.workflowservice.v1.GetSystemInfoResponse; +import io.temporal.common.SuggestContinueAsNewReason; import io.temporal.failure.CanceledFailure; import io.temporal.internal.common.*; import io.temporal.internal.history.LocalActivityMarkerUtils; @@ -88,6 +89,10 @@ enum HandleEventStatus { private boolean isContinueAsNewSuggested; + private List suggestContinueAsNewReasons = Collections.emptyList(); + + private boolean isTargetWorkerDeploymentVersionChanged; + /** * EventId of the last event seen by these state machines. Events earlier than this one will be * discarded. @@ -276,6 +281,14 @@ public boolean isContinueAsNewSuggested() { return isContinueAsNewSuggested; } + public List getSuggestContinueAsNewReasons() { + return suggestContinueAsNewReasons; + } + + public boolean isTargetWorkerDeploymentVersionChanged() { + return isTargetWorkerDeploymentVersionChanged; + } + public void setReplaying(boolean replaying) { this.replaying = replaying; } @@ -1493,7 +1506,9 @@ public void workflowTaskStarted( long currentTimeMillis, boolean nonProcessedWorkflowTask, long historySize, - boolean isContinueAsNewSuggested) { + boolean isContinueAsNewSuggested, + List suggestContinueAsNewReasons, + boolean isTargetWorkerDeploymentVersionChanged) { setCurrentTimeMillis(currentTimeMillis); for (CancellableCommand cancellableCommand : commands) { cancellableCommand.handleWorkflowTaskStarted(); @@ -1509,6 +1524,9 @@ public void workflowTaskStarted( WorkflowStateMachines.this.lastWFTStartedEventId = startedEventId; WorkflowStateMachines.this.historySize = historySize; WorkflowStateMachines.this.isContinueAsNewSuggested = isContinueAsNewSuggested; + WorkflowStateMachines.this.suggestContinueAsNewReasons = suggestContinueAsNewReasons; + WorkflowStateMachines.this.isTargetWorkerDeploymentVersionChanged = + isTargetWorkerDeploymentVersionChanged; eventLoop(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowTaskStateMachine.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowTaskStateMachine.java index f7f1ae88c8..6be5e24320 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowTaskStateMachine.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowTaskStateMachine.java @@ -4,6 +4,10 @@ import io.temporal.api.enums.v1.EventType; import io.temporal.api.enums.v1.WorkflowTaskFailedCause; import io.temporal.api.history.v1.WorkflowTaskFailedEventAttributes; +import io.temporal.common.SuggestContinueAsNewReason; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Objects; final class WorkflowTaskStateMachine @@ -32,7 +36,9 @@ void workflowTaskStarted( long currentTimeMillis, boolean nonProcessedWorkflowTask, long historySize, - boolean isContinueAsNewSuggested); + boolean isContinueAsNewSuggested, + List suggestContinueAsNewReasons, + boolean isTargetWorkerDeploymentVersionChanged); void updateRunId(String currentRunId); } @@ -46,6 +52,8 @@ void workflowTaskStarted( private long startedEventId; private long historySize; private boolean isContinueAsNewSuggested; + private List suggestContinueAsNewReasons = Collections.emptyList(); + private boolean isTargetWorkerDeploymentVersionChanged; public static WorkflowTaskStateMachine newInstance( long workflowTaskStartedEventId, Listener listener) { @@ -103,6 +111,15 @@ private void handleStarted() { historySize = currentEvent.getWorkflowTaskStartedEventAttributes().getHistorySizeBytes(); isContinueAsNewSuggested = currentEvent.getWorkflowTaskStartedEventAttributes().getSuggestContinueAsNew(); + suggestContinueAsNewReasons = + convertSuggestContinueAsNewReasons( + currentEvent + .getWorkflowTaskStartedEventAttributes() + .getSuggestContinueAsNewReasonsList()); + isTargetWorkerDeploymentVersionChanged = + currentEvent + .getWorkflowTaskStartedEventAttributes() + .getTargetWorkerDeploymentVersionChanged(); // The last started event in the history. So no completed is expected. if (currentEvent.getEventId() >= workflowTaskStartedEventId && !hasNextEvent) { @@ -121,7 +138,33 @@ private void handleCompleted() { eventTimeOfTheLastWorkflowStartTask, lastTaskInHistory, historySize, - isContinueAsNewSuggested); + isContinueAsNewSuggested, + suggestContinueAsNewReasons, + isTargetWorkerDeploymentVersionChanged); + } + + private static List convertSuggestContinueAsNewReasons( + List protoReasons) { + if (protoReasons.isEmpty()) { + return Collections.emptyList(); + } + List reasons = new ArrayList<>(protoReasons.size()); + for (io.temporal.api.enums.v1.SuggestContinueAsNewReason proto : protoReasons) { + switch (proto) { + case SUGGEST_CONTINUE_AS_NEW_REASON_HISTORY_SIZE_TOO_LARGE: + reasons.add(SuggestContinueAsNewReason.HISTORY_SIZE_TOO_LARGE); + break; + case SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_HISTORY_EVENTS: + reasons.add(SuggestContinueAsNewReason.TOO_MANY_HISTORY_EVENTS); + break; + case SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_UPDATES: + reasons.add(SuggestContinueAsNewReason.TOO_MANY_UPDATES); + break; + default: + break; + } + } + return Collections.unmodifiableList(reasons); } private void handleFailed() { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 7a956dc4b7..1c90397e04 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -1415,6 +1415,15 @@ public void continueAsNew(ContinueAsNewInput input) { .determineUseCompatibleFlag( replayContext.getTaskQueue().equals(options.getTaskQueue()))); } + if (options.getInitialVersioningBehavior() != null) { + switch (options.getInitialVersioningBehavior()) { + case AUTO_UPGRADE: + attributes.setInitialVersioningBehavior( + io.temporal.api.enums.v1.ContinueAsNewVersioningBehavior + .CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE); + break; + } + } } if (options == null && replayContext.getRetryOptions() != null) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java index 469cd6a13b..b30f13d458 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java @@ -4,10 +4,12 @@ import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.common.Priority; import io.temporal.common.RetryOptions; +import io.temporal.common.SuggestContinueAsNewReason; import io.temporal.internal.common.ProtoConverters; import io.temporal.internal.replay.ReplayWorkflowContext; import io.temporal.workflow.WorkflowInfo; import java.time.Duration; +import java.util.List; import java.util.Optional; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -147,6 +149,16 @@ public boolean isContinueAsNewSuggested() { return context.isContinueAsNewSuggested(); } + @Override + public List getSuggestContinueAsNewReasons() { + return context.getSuggestContinueAsNewReasons(); + } + + @Override + public boolean isTargetWorkerDeploymentVersionChanged() { + return context.isTargetWorkerDeploymentVersionChanged(); + } + @Override public Optional getCurrentBuildId() { return context.getCurrentBuildId(); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java index e179e12db9..0c9238e64a 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java @@ -282,6 +282,7 @@ public Throwable wrapFailure(NexusTask task, Throwable failure) { "Failure processing nexus response: " + response.getRequest().toString(), failure); } + @SuppressWarnings("deprecation") // Uses hasOperationError()/getOperationError() for compat private void handleNexusTask(NexusTask task, Scope metricsScope) { PollNexusTaskQueueResponseOrBuilder pollResponse = task.getResponse(); ByteString taskToken = pollResponse.getTaskToken(); @@ -374,6 +375,8 @@ private void logExceptionDuringResultReporting( } } + @SuppressWarnings( + "deprecation") // Uses setOperationError() for backward compat with old servers private Response getResponseForOldServer(Response response) { Response.Builder b = response.toBuilder(); Failure failure = response.getStartOperation().getFailure(); diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java b/temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java index d30850045f..ab9e2b7c58 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java @@ -45,6 +45,8 @@ public static final class Builder { @SuppressWarnings("deprecation") private VersioningIntent versioningIntent; + private InitialVersioningBehavior initialVersioningBehavior; + private Builder() {} private Builder(ContinueAsNewOptions options) { @@ -60,6 +62,7 @@ private Builder(ContinueAsNewOptions options) { this.typedSearchAttributes = options.getTypedSearchAttributes(); this.contextPropagators = options.getContextPropagators(); this.versioningIntent = options.versioningIntent; + this.initialVersioningBehavior = options.initialVersioningBehavior; } public Builder setWorkflowRunTimeout(Duration workflowRunTimeout) { @@ -131,6 +134,18 @@ public Builder setVersioningIntent(VersioningIntent versioningIntent) { return this; } + /** + * Specifies the versioning behavior for the first task of the new workflow run. For example, + * set to AUTO_UPGRADE to upgrade to the latest version on continue-as-new instead of inheriting + * the pinned version from the previous run. + */ + @Experimental + public Builder setInitialVersioningBehavior( + InitialVersioningBehavior initialVersioningBehavior) { + this.initialVersioningBehavior = initialVersioningBehavior; + return this; + } + public ContinueAsNewOptions build() { return new ContinueAsNewOptions( workflowRunTimeout, @@ -141,7 +156,8 @@ public ContinueAsNewOptions build() { searchAttributes, typedSearchAttributes, contextPropagators, - versioningIntent); + versioningIntent, + initialVersioningBehavior); } } @@ -157,6 +173,8 @@ public ContinueAsNewOptions build() { @SuppressWarnings("deprecation") private final @Nullable VersioningIntent versioningIntent; + private final @Nullable InitialVersioningBehavior initialVersioningBehavior; + public ContinueAsNewOptions( @Nullable Duration workflowRunTimeout, @Nullable String taskQueue, @@ -166,7 +184,8 @@ public ContinueAsNewOptions( @Nullable Map searchAttributes, @Nullable SearchAttributes typedSearchAttributes, @Nullable List contextPropagators, - @SuppressWarnings("deprecation") @Nullable VersioningIntent versioningIntent) { + @SuppressWarnings("deprecation") @Nullable VersioningIntent versioningIntent, + @Nullable InitialVersioningBehavior initialVersioningBehavior) { this.workflowRunTimeout = workflowRunTimeout; this.taskQueue = taskQueue; this.retryOptions = retryOptions; @@ -176,6 +195,7 @@ public ContinueAsNewOptions( this.typedSearchAttributes = typedSearchAttributes; this.contextPropagators = contextPropagators; this.versioningIntent = versioningIntent; + this.initialVersioningBehavior = initialVersioningBehavior; } public @Nullable Duration getWorkflowRunTimeout() { @@ -223,4 +243,13 @@ public RetryOptions getRetryOptions() { public @Nullable VersioningIntent getVersioningIntent() { return versioningIntent; } + + /** + * @return the initial versioning behavior for the first task of the new workflow run, or null if + * unset. + */ + @Experimental + public @Nullable InitialVersioningBehavior getInitialVersioningBehavior() { + return initialVersioningBehavior; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/WorkflowInfo.java b/temporal-sdk/src/main/java/io/temporal/workflow/WorkflowInfo.java index 04a997a53a..e49929e18d 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/WorkflowInfo.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/WorkflowInfo.java @@ -4,7 +4,9 @@ import io.temporal.common.Experimental; import io.temporal.common.Priority; import io.temporal.common.RetryOptions; +import io.temporal.common.SuggestContinueAsNewReason; import java.time.Duration; +import java.util.List; import java.util.Optional; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -161,6 +163,22 @@ public interface WorkflowInfo { */ boolean isContinueAsNewSuggested(); + /** + * @return the reasons why continue-as-new is suggested, or an empty list if not suggested. This + * value changes during the lifetime of a Workflow Execution. + */ + @Experimental + List getSuggestContinueAsNewReasons(); + + /** + * @return true if the target worker deployment version has changed for this workflow since the + * last workflow task. This is only relevant for workflows using the PINNED versioning + * behavior. When true, the workflow may want to continue-as-new with {@link + * ContinueAsNewOptions.Builder#setInitialVersioningBehavior} set to AUTO_UPGRADE. + */ + @Experimental + boolean isTargetWorkerDeploymentVersionChanged(); + /** * @return The Build ID of the worker which executed the current Workflow Task. May be empty the * task was completed by a worker without a Build ID. If this worker is the one executing this diff --git a/temporal-sdk/src/test/java/io/temporal/worker/WorkerVersioningTest.java b/temporal-sdk/src/test/java/io/temporal/worker/WorkerVersioningTest.java index a51f0fe1b2..4f27380a0a 100644 --- a/temporal-sdk/src/test/java/io/temporal/worker/WorkerVersioningTest.java +++ b/temporal-sdk/src/test/java/io/temporal/worker/WorkerVersioningTest.java @@ -10,6 +10,7 @@ import io.temporal.api.workflowservice.v1.*; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowStub; +import io.temporal.common.InitialVersioningBehavior; import io.temporal.common.VersioningBehavior; import io.temporal.common.VersioningOverride; import io.temporal.common.WorkerDeploymentVersion; @@ -431,6 +432,94 @@ public void testWorkflowsCanUseVersioningOverride() { == PINNED_OVERRIDE_BEHAVIOR_PINNED)); } + public static class TestWorkerVersioningCanV1 implements TestWorkflows.QueryableWorkflow { + @Override + @WorkflowVersioningBehavior(VersioningBehavior.PINNED) + public String execute() { + while (!Workflow.getInfo().isTargetWorkerDeploymentVersionChanged()) { + Workflow.sleep(java.time.Duration.ofMillis(10)); + } + ContinueAsNewOptions options = + ContinueAsNewOptions.newBuilder() + .setInitialVersioningBehavior(InitialVersioningBehavior.AUTO_UPGRADE) + .build(); + TestWorkflows.QueryableWorkflow next = + Workflow.newContinueAsNewStub(TestWorkflows.QueryableWorkflow.class, options); + next.execute(); + throw new RuntimeException("unreachable"); + } + + @Override + public void mySignal(String arg) {} + + @Override + public String getState() { + return "v1-can"; + } + } + + public static class TestWorkerVersioningCanV2 implements TestWorkflows.QueryableWorkflow { + @Override + @WorkflowVersioningBehavior(VersioningBehavior.PINNED) + public String execute() { + return "v2.0"; + } + + @Override + public void mySignal(String arg) {} + + @Override + public String getState() { + return "v2-can"; + } + } + + @Test + public void testContinueAsNewWithVersionUpgrade() { + assumeTrue("Test Server doesn't support versioning", SDKTestWorkflowRule.useExternalService); + + WorkerDeploymentVersion v1 = + new WorkerDeploymentVersion(testWorkflowRule.getDeploymentName(), "1.0"); + WorkerDeploymentVersion v2 = + new WorkerDeploymentVersion(testWorkflowRule.getDeploymentName(), "2.0"); + + Worker w1 = testWorkflowRule.newWorkerWithBuildID("1.0"); + w1.registerWorkflowImplementationTypes(TestWorkerVersioningCanV1.class); + w1.start(); + + Worker w2 = testWorkflowRule.newWorkerWithBuildID("2.0"); + w2.registerWorkflowImplementationTypes(TestWorkerVersioningCanV2.class); + w2.start(); + + // Set v1 as current + DescribeWorkerDeploymentResponse d1 = waitUntilWorkerDeploymentVisible(v1); + setCurrentVersion(v1, d1.getConflictToken()); + waitForRoutingConfigPropagation(v1); + + // Start workflow on v1 + TestWorkflows.QueryableWorkflow wf = + testWorkflowRule.newWorkflowStubTimeoutOptions( + TestWorkflows.QueryableWorkflow.class, "can-version-upgrade"); + WorkflowExecution we = WorkflowClient.start(wf::execute); + + // Verify workflow is running on v1 + Assert.assertEquals("v1-can", wf.getState()); + waitForWorkflowRunningOnVersion(we.getWorkflowId(), "1.0"); + + // Set v2 as current — triggers targetWorkerDeploymentVersionChanged + DescribeWorkerDeploymentResponse d2 = waitUntilWorkerDeploymentVisible(v2); + setCurrentVersion(v2, d2.getConflictToken()); + waitForRoutingConfigPropagation(v2); + + // V1 workflow should detect version change, CAN with AUTO_UPGRADE, v2 returns "v2.0" + String result = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub(we.getWorkflowId()) + .getResult(String.class); + Assert.assertEquals("v2.0", result); + } + @SuppressWarnings("deprecation") private DescribeWorkerDeploymentResponse waitUntilWorkerDeploymentVisible( WorkerDeploymentVersion v) { @@ -499,4 +588,64 @@ private SetWorkerDeploymentRampingVersionResponse setRampingVersion( .setPercentage(percent) .build()); } + + @SuppressWarnings("deprecation") + private void waitForRoutingConfigPropagation(WorkerDeploymentVersion v) { + Eventually.assertEventually( + Duration.ofSeconds(15), + () -> { + DescribeWorkerDeploymentResponse resp = + testWorkflowRule + .getWorkflowClient() + .getWorkflowServiceStubs() + .blockingStub() + .describeWorkerDeployment( + DescribeWorkerDeploymentRequest.newBuilder() + .setNamespace(testWorkflowRule.getTestEnvironment().getNamespace()) + .setDeploymentName(v.getDeploymentName()) + .build()); + Assert.assertEquals( + v.getBuildId(), + resp.getWorkerDeploymentInfo() + .getRoutingConfig() + .getCurrentDeploymentVersion() + .getBuildId()); + // Check routing config update is not in progress + int state = resp.getWorkerDeploymentInfo().getRoutingConfigUpdateStateValue(); + Assert.assertNotEquals( + io.temporal.api.enums.v1.RoutingConfigUpdateState + .ROUTING_CONFIG_UPDATE_STATE_IN_PROGRESS_VALUE, + state); + }); + } + + private void waitForWorkflowRunningOnVersion(String workflowId, String expectedBuildId) { + Eventually.assertEventually( + Duration.ofSeconds(15), + () -> { + io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse resp = + testWorkflowRule + .getWorkflowClient() + .getWorkflowServiceStubs() + .blockingStub() + .describeWorkflowExecution( + io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest + .newBuilder() + .setNamespace(testWorkflowRule.getTestEnvironment().getNamespace()) + .setExecution( + io.temporal.api.common.v1.WorkflowExecution.newBuilder() + .setWorkflowId(workflowId) + .build()) + .build()); + Assert.assertEquals( + io.temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING, + resp.getWorkflowExecutionInfo().getStatus()); + Assert.assertEquals( + expectedBuildId, + resp.getWorkflowExecutionInfo() + .getVersioningInfo() + .getDeploymentVersion() + .getBuildId()); + }); + } } diff --git a/temporal-serviceclient/src/main/proto b/temporal-serviceclient/src/main/proto index 188e309dee..db5f593060 160000 --- a/temporal-serviceclient/src/main/proto +++ b/temporal-serviceclient/src/main/proto @@ -1 +1 @@ -Subproject commit 188e309dee0acb3e3c84363d2d9f11be32df3bb8 +Subproject commit db5f593060ee579460e7f4e06d79b5e4f3af2387 diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java index 7183ade768..c95a7dd465 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java @@ -1023,6 +1023,7 @@ private static Failure wrapNexusOperationFailure(Failure cause) { .build(); } + @SuppressWarnings("deprecation") // Uses hasOperationError()/getOperationError() for compat @Override public void respondNexusTaskCompleted( RespondNexusTaskCompletedRequest request, diff --git a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java index 9f7aa44b6d..f89e61c64b 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java +++ b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java @@ -8,6 +8,7 @@ import io.temporal.api.failure.v1.Failure; import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.common.RetryOptions; +import io.temporal.common.SuggestContinueAsNewReason; import io.temporal.common.converter.DefaultDataConverter; import io.temporal.failure.CanceledFailure; import io.temporal.internal.common.SdkFlag; @@ -379,5 +380,15 @@ public long getHistorySize() { public boolean isContinueAsNewSuggested() { return false; } + + @Override + public List getSuggestContinueAsNewReasons() { + return Collections.emptyList(); + } + + @Override + public boolean isTargetWorkerDeploymentVersionChanged() { + return false; + } } } From 6705aa58f982d80a03b20d464b51de3782b5bfa5 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Wed, 18 Mar 2026 12:24:16 -0700 Subject: [PATCH 2/3] Replace QueryableWorkflow with dedicated CAN workflow interface in versioning test The CAN test workflows no longer inherit unnecessary mySignal/getState methods from QueryableWorkflow. Instead they implement a dedicated ContinueAsNewVersionUpgradeWorkflow interface with execute(int attempt), matching the pattern used by other SDKs (Python/Go/TS/.NET). Co-Authored-By: Claude Opus 4.6 (1M context) --- .../temporal/worker/WorkerVersioningTest.java | 46 ++++++++----------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/temporal-sdk/src/test/java/io/temporal/worker/WorkerVersioningTest.java b/temporal-sdk/src/test/java/io/temporal/worker/WorkerVersioningTest.java index 4f27380a0a..305d603352 100644 --- a/temporal-sdk/src/test/java/io/temporal/worker/WorkerVersioningTest.java +++ b/temporal-sdk/src/test/java/io/temporal/worker/WorkerVersioningTest.java @@ -432,10 +432,19 @@ public void testWorkflowsCanUseVersioningOverride() { == PINNED_OVERRIDE_BEHAVIOR_PINNED)); } - public static class TestWorkerVersioningCanV1 implements TestWorkflows.QueryableWorkflow { + @WorkflowInterface + public interface ContinueAsNewVersionUpgradeWorkflow { + @WorkflowMethod + String execute(int attempt); + } + + public static class TestWorkerVersioningCanV1 implements ContinueAsNewVersionUpgradeWorkflow { @Override @WorkflowVersioningBehavior(VersioningBehavior.PINNED) - public String execute() { + public String execute(int attempt) { + if (attempt > 0) { + return "v1.0"; + } while (!Workflow.getInfo().isTargetWorkerDeploymentVersionChanged()) { Workflow.sleep(java.time.Duration.ofMillis(10)); } @@ -443,35 +452,19 @@ public String execute() { ContinueAsNewOptions.newBuilder() .setInitialVersioningBehavior(InitialVersioningBehavior.AUTO_UPGRADE) .build(); - TestWorkflows.QueryableWorkflow next = - Workflow.newContinueAsNewStub(TestWorkflows.QueryableWorkflow.class, options); - next.execute(); + ContinueAsNewVersionUpgradeWorkflow next = + Workflow.newContinueAsNewStub(ContinueAsNewVersionUpgradeWorkflow.class, options); + next.execute(attempt + 1); throw new RuntimeException("unreachable"); } - - @Override - public void mySignal(String arg) {} - - @Override - public String getState() { - return "v1-can"; - } } - public static class TestWorkerVersioningCanV2 implements TestWorkflows.QueryableWorkflow { + public static class TestWorkerVersioningCanV2 implements ContinueAsNewVersionUpgradeWorkflow { @Override @WorkflowVersioningBehavior(VersioningBehavior.PINNED) - public String execute() { + public String execute(int attempt) { return "v2.0"; } - - @Override - public void mySignal(String arg) {} - - @Override - public String getState() { - return "v2-can"; - } } @Test @@ -497,13 +490,12 @@ public void testContinueAsNewWithVersionUpgrade() { waitForRoutingConfigPropagation(v1); // Start workflow on v1 - TestWorkflows.QueryableWorkflow wf = + ContinueAsNewVersionUpgradeWorkflow wf = testWorkflowRule.newWorkflowStubTimeoutOptions( - TestWorkflows.QueryableWorkflow.class, "can-version-upgrade"); - WorkflowExecution we = WorkflowClient.start(wf::execute); + ContinueAsNewVersionUpgradeWorkflow.class, "can-version-upgrade"); + WorkflowExecution we = WorkflowClient.start(wf::execute, 0); // Verify workflow is running on v1 - Assert.assertEquals("v1-can", wf.getState()); waitForWorkflowRunningOnVersion(we.getWorkflowId(), "1.0"); // Set v2 as current — triggers targetWorkerDeploymentVersionChanged From 53643b5d391a7ce4dcc9f4d78baae2f65d9eb570 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Wed, 18 Mar 2026 13:56:30 -0700 Subject: [PATCH 3/3] Suppress deprecation warnings in NexusWorkflowTest to fix CI Add @SuppressWarnings("deprecation") to test methods calling the deprecated setOperationError() API, preventing -Werror from failing the build. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../io/temporal/testserver/functional/NexusWorkflowTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java index 26b2691990..e913a38227 100644 --- a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java +++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java @@ -846,6 +846,7 @@ public void testNexusOperationStartToCloseTimeout() { } } + @SuppressWarnings("deprecation") @Test public void testNexusOperationError() { Response unsuccessfulResp = @@ -1158,6 +1159,7 @@ public void testNexusOperationCancelRequestAcknowledgeSchedulesWorkflowTask() { } } + @SuppressWarnings("deprecation") @Test(timeout = 30000) public void testNexusOperationCanceledErrorWithCauseChain() { // Verifies that a canceled OperationError with a JSON-encoded failure chain properly