diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java index 2a90c7780..3387ee760 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java @@ -26,6 +26,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +49,7 @@ final class ActivityWorker implements SuspendableWorker { private final GrpcRetryer grpcRetryer; private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions; private final TrackingSlotSupplier slotSupplier; + private final AtomicBoolean serverSupportsAutoscaling; public ActivityWorker( @Nonnull WorkflowServiceStubs service, @@ -56,7 +58,8 @@ public ActivityWorker( double taskQueueActivitiesPerSecond, @Nonnull SingleWorkerOptions options, @Nonnull ActivityTaskHandler handler, - @Nonnull SlotSupplier slotSupplier) { + @Nonnull SlotSupplier slotSupplier, + @Nonnull AtomicBoolean serverSupportsAutoscaling) { this.service = Objects.requireNonNull(service); this.namespace = Objects.requireNonNull(namespace); this.taskQueue = Objects.requireNonNull(taskQueue); @@ -72,6 +75,7 @@ public ActivityWorker( DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null); this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope); + this.serverSupportsAutoscaling = serverSupportsAutoscaling; } @Override @@ -106,6 +110,7 @@ public boolean start() { service.getServerCapabilities()), this.pollTaskExecutor, pollerOptions, + serverSupportsAutoscaling.get(), workerMetricsScope); } else { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java index d7a62d3f5..510634379 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java @@ -29,6 +29,7 @@ final class AsyncPoller extends BasePoller { private final List> asyncTaskPollers; private final PollerOptions pollerOptions; private final PollerBehaviorAutoscaling pollerBehavior; + private final boolean serverSupportsAutoscaling; private final Scope workerMetricsScope; private Throttler pollRateThrottler; private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = @@ -42,6 +43,7 @@ final class AsyncPoller extends BasePoller { PollTaskAsync asyncTaskPoller, ShutdownableTaskExecutor taskExecutor, PollerOptions pollerOptions, + boolean serverSupportsAutoscaling, Scope workerMetricsScope) { this( slotSupplier, @@ -49,6 +51,7 @@ final class AsyncPoller extends BasePoller { Collections.singletonList(asyncTaskPoller), taskExecutor, pollerOptions, + serverSupportsAutoscaling, workerMetricsScope); } @@ -58,6 +61,7 @@ final class AsyncPoller extends BasePoller { List> asyncTaskPollers, ShutdownableTaskExecutor taskExecutor, PollerOptions pollerOptions, + boolean serverSupportsAutoscaling, Scope workerMetricsScope) { super(taskExecutor); Objects.requireNonNull(slotSupplier, "slotSupplier cannot be null"); @@ -78,6 +82,7 @@ final class AsyncPoller extends BasePoller { + " is not supported for AsyncPoller. Only PollerBehaviorAutoscaling is supported."); } this.pollerBehavior = (PollerBehaviorAutoscaling) pollerOptions.getPollerBehavior(); + this.serverSupportsAutoscaling = serverSupportsAutoscaling; this.pollerOptions = pollerOptions; this.workerMetricsScope = workerMetricsScope; } @@ -109,6 +114,7 @@ public boolean start() { pollerBehavior.getMinConcurrentTaskPollers(), pollerBehavior.getMaxConcurrentTaskPollers(), pollerBehavior.getInitialConcurrentTaskPollers(), + serverSupportsAutoscaling, (newTarget) -> { log.debug( "Updating maximum number of pollers for {} to: {}", diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java index e179e12db..f5291cb16 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java @@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +53,7 @@ final class NexusWorker implements SuspendableWorker { private final GrpcRetryer grpcRetryer; private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions; private final TrackingSlotSupplier slotSupplier; + private final AtomicBoolean serverSupportsAutoscaling; private final boolean forceOldFailureFormat; public NexusWorker( @@ -61,7 +63,8 @@ public NexusWorker( @Nonnull SingleWorkerOptions options, @Nonnull NexusTaskHandler handler, @Nonnull DataConverter dataConverter, - @Nonnull SlotSupplier slotSupplier) { + @Nonnull SlotSupplier slotSupplier, + @Nonnull AtomicBoolean serverSupportsAutoscaling) { this.service = Objects.requireNonNull(service); this.namespace = Objects.requireNonNull(namespace); this.taskQueue = Objects.requireNonNull(taskQueue); @@ -77,6 +80,7 @@ public NexusWorker( DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null); this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope); + this.serverSupportsAutoscaling = serverSupportsAutoscaling; // Allow tests to force old format for backward compatibility testing String forceOldFormat = System.getProperty("temporal.nexus.forceOldFailureFormat"); this.forceOldFailureFormat = "true".equalsIgnoreCase(forceOldFormat); @@ -112,6 +116,7 @@ public boolean start() { this.slotSupplier), this.pollTaskExecutor, pollerOptions, + serverSupportsAutoscaling.get(), workerMetricsScope); } else { poller = @@ -282,6 +287,7 @@ public Throwable wrapFailure(NexusTask task, Throwable failure) { "Failure processing nexus response: " + response.getRequest().toString(), failure); } + @SuppressWarnings("deprecation") // Uses deprecated operationError private void handleNexusTask(NexusTask task, Scope metricsScope) { PollNexusTaskQueueResponseOrBuilder pollResponse = task.getResponse(); ByteString taskToken = pollResponse.getTaskToken(); @@ -374,6 +380,7 @@ private void logExceptionDuringResultReporting( } } + @SuppressWarnings("deprecation") // Uses deprecated setOperationError private Response getResponseForOldServer(Response response) { Response.Builder b = response.toBuilder(); Failure failure = response.getStartOperation().getFailure(); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/PollScaleReportHandle.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/PollScaleReportHandle.java index 925c60e0b..d56ab9d89 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/PollScaleReportHandle.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/PollScaleReportHandle.java @@ -18,6 +18,7 @@ public class PollScaleReportHandle implements Runnable { private final int maxPollerCount; private int targetPollerCount; private final Functions.Proc1 scaleCallback; + private final boolean serverSupportsAutoscaling; private boolean everSawScalingDecision; private int ingestedThisPeriod; private int ingestedLastPeriod; @@ -27,18 +28,20 @@ public PollScaleReportHandle( int minPollerCount, int maxPollerCount, int initialPollerCount, + boolean serverSupportsAutoscaling, Functions.Proc1 scaleCallback) { this.minPollerCount = minPollerCount; this.maxPollerCount = maxPollerCount; this.targetPollerCount = initialPollerCount; + this.serverSupportsAutoscaling = serverSupportsAutoscaling; this.scaleCallback = scaleCallback; } public synchronized void report(T task, Throwable e) { if (e != null) { // We want to avoid scaling down on errors if we have never seen a scaling decision - // since we might never scale up again. - if (!everSawScalingDecision) { + // and the server doesn't support autoscaling - otherwise we might never scale up again. + if (!everSawScalingDecision && !serverSupportsAutoscaling) { return; } if ((e instanceof StatusRuntimeException)) { @@ -68,9 +71,10 @@ public synchronized void report(T task, Throwable e) { updateTarget((t -> t + deltaSuggestion)); } - } else if (task == null && everSawScalingDecision) { + } else if (task == null && (everSawScalingDecision || serverSupportsAutoscaling)) { // We want to avoid scaling down on empty polls if the server has never made any - // scaling decisions - otherwise we might never scale up again. + // scaling decisions and doesn't support autoscaling - otherwise we might never scale + // up again. updateTarget((t) -> t - 1); } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java index 55ff1a32c..87ddc7c4a 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java @@ -11,6 +11,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +34,8 @@ public SyncActivityWorker( String taskQueue, double taskQueueActivitiesPerSecond, SingleWorkerOptions options, - SlotSupplier slotSupplier) { + SlotSupplier slotSupplier, + @Nonnull AtomicBoolean serverSupportsAutoscaling) { this.identity = options.getIdentity(); this.namespace = namespace; this.taskQueue = taskQueue; @@ -72,7 +75,8 @@ public SyncActivityWorker( taskQueueActivitiesPerSecond, options, taskHandler, - slotSupplier); + slotSupplier, + serverSupportsAutoscaling); } public void registerActivityImplementations(Object... activitiesImplementation) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java index bc089644d..23471f86e 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java @@ -6,6 +6,8 @@ import io.temporal.worker.tuning.SlotSupplier; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,7 +25,8 @@ public SyncNexusWorker( String namespace, String taskQueue, SingleWorkerOptions options, - SlotSupplier slotSupplier) { + SlotSupplier slotSupplier, + @Nonnull AtomicBoolean serverSupportsAutoscaling) { this.identity = options.getIdentity(); this.namespace = namespace; this.taskQueue = taskQueue; @@ -43,7 +46,8 @@ public SyncNexusWorker( options, taskHandler, options.getDataConverter(), - slotSupplier); + slotSupplier, + serverSupportsAutoscaling); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java index 37c977572..2b94effb6 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java @@ -25,6 +25,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -68,7 +69,8 @@ public SyncWorkflowWorker( @Nonnull WorkflowThreadExecutor workflowThreadExecutor, @Nonnull EagerActivityDispatcher eagerActivityDispatcher, @Nonnull SlotSupplier slotSupplier, - @Nonnull SlotSupplier laSlotSupplier) { + @Nonnull SlotSupplier laSlotSupplier, + @Nonnull AtomicBoolean serverSupportsAutoscaling) { this.identity = singleWorkerOptions.getIdentity(); this.namespace = namespace; this.taskQueue = taskQueue; @@ -122,7 +124,8 @@ public SyncWorkflowWorker( cache, taskHandler, eagerActivityDispatcher, - slotSupplier); + slotSupplier, + serverSupportsAutoscaling); // Exists to support Worker#replayWorkflowExecution functionality. // This handler has to be non-sticky to avoid evicting actual executions from the cache diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java index 2193acf59..4986808c4 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java @@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -54,6 +55,7 @@ final class WorkflowWorker implements SuspendableWorker { private final GrpcRetryer grpcRetryer; private final EagerActivityDispatcher eagerActivityDispatcher; private final TrackingSlotSupplier slotSupplier; + private final AtomicBoolean serverSupportsAutoscaling; private PollTaskExecutor pollTaskExecutor; @@ -73,7 +75,8 @@ public WorkflowWorker( @Nonnull WorkflowExecutorCache cache, @Nonnull WorkflowTaskHandler handler, @Nonnull EagerActivityDispatcher eagerActivityDispatcher, - @Nonnull SlotSupplier slotSupplier) { + @Nonnull SlotSupplier slotSupplier, + @Nonnull AtomicBoolean serverSupportsAutoscaling) { this.service = Objects.requireNonNull(service); this.namespace = Objects.requireNonNull(namespace); this.taskQueue = Objects.requireNonNull(taskQueue); @@ -88,6 +91,7 @@ public WorkflowWorker( this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities()); this.eagerActivityDispatcher = eagerActivityDispatcher; this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope); + this.serverSupportsAutoscaling = serverSupportsAutoscaling; } @Override @@ -154,6 +158,7 @@ public boolean start() { pollers, this.pollTaskExecutor, pollerOptions, + serverSupportsAutoscaling.get(), workerMetricsScope); } else { PollerBehaviorSimpleMaximum pollerBehavior = diff --git a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java index ed761558b..5ffd300e9 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java @@ -69,7 +69,8 @@ public final class Worker { boolean useStickyTaskQueue, WorkflowThreadExecutor workflowThreadExecutor, List contextPropagators, - @Nonnull List plugins) { + @Nonnull List plugins, + @Nonnull AtomicBoolean serverSupportsAutoscaling) { Objects.requireNonNull(client, "client should not be null"); this.plugins = Objects.requireNonNull(plugins, "plugins should not be null"); @@ -102,7 +103,8 @@ public final class Worker { taskQueue, this.options.getMaxTaskQueueActivitiesPerSecond(), activityOptions, - activitySlotSupplier); + activitySlotSupplier, + serverSupportsAutoscaling); } EagerActivityDispatcher eagerActivityDispatcher = @@ -120,7 +122,13 @@ public final class Worker { attachMetricsToResourceController(taggedScope, nexusSlotSupplier); nexusWorker = - new SyncNexusWorker(client, namespace, taskQueue, nexusOptions, nexusSlotSupplier); + new SyncNexusWorker( + client, + namespace, + taskQueue, + nexusOptions, + nexusSlotSupplier, + serverSupportsAutoscaling); SingleWorkerOptions singleWorkerOptions = toWorkflowWorkerOptions( @@ -158,7 +166,8 @@ public final class Worker { workflowThreadExecutor, eagerActivityDispatcher, workflowSlotSupplier, - localActivitySlotSupplier); + localActivitySlotSupplier, + serverSupportsAutoscaling); } /** diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java index 16f7305b7..3a22b7351 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java @@ -5,6 +5,7 @@ import com.google.common.base.Strings; import com.uber.m3.tally.Scope; import io.temporal.api.workflowservice.v1.DescribeNamespaceRequest; +import io.temporal.api.workflowservice.v1.DescribeNamespaceResponse; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowClientOptions; import io.temporal.common.converter.DataConverter; @@ -28,6 +29,7 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -56,6 +58,9 @@ public final class WorkerFactory { /** Plugins propagated from the client and applied to this factory. */ private final List plugins; + /** Set during start() if the namespace has the poller_autoscaling capability. */ + private final AtomicBoolean pollerAutoscaling = new AtomicBoolean(false); + private State state = State.Initial; private final String statusErrorMessage = @@ -192,7 +197,8 @@ public synchronized Worker newWorker(String taskQueue, WorkerOptions options) { true, workflowThreadExecutor, workflowClient.getOptions().getContextPropagators(), - plugins); + plugins, + pollerAutoscaling); workers.put(taskQueue, worker); // Go through the plugins to call plugin initializeWorker hooks (e.g. register workflows, @@ -251,13 +257,17 @@ public synchronized void start() { // Workers check and require that Temporal Server is available during start to fail-fast in case // of configuration issues. - workflowClient - .getWorkflowServiceStubs() - .blockingStub() - .describeNamespace( - DescribeNamespaceRequest.newBuilder() - .setNamespace(workflowClient.getOptions().getNamespace()) - .build()); + DescribeNamespaceResponse describeNamespaceResponse = + workflowClient + .getWorkflowServiceStubs() + .blockingStub() + .describeNamespace( + DescribeNamespaceRequest.newBuilder() + .setNamespace(workflowClient.getOptions().getNamespace()) + .build()); + if (describeNamespaceResponse.getNamespaceInfo().getCapabilities().getPollerAutoscaling()) { + pollerAutoscaling.set(true); + } // Build plugin execution chain (reverse order for proper nesting) Consumer startChain = WorkerFactory::doStart; diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/AsyncPollerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/AsyncPollerTest.java index e0c2443b5..2ade97762 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/AsyncPollerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/AsyncPollerTest.java @@ -133,6 +133,7 @@ private AsyncPoller newPoller( pollTask, taskExecutor, options, + false, new NoopScope()); } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/PollScaleReportHandleTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/PollScaleReportHandleTest.java index 7edc3f69f..745818ae6 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/PollScaleReportHandleTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/PollScaleReportHandleTest.java @@ -17,7 +17,7 @@ public void handleResourceExhaustedError() { Mockito.when(mockTask.getScalingDecision()).thenReturn(mockDecision); Mockito.when(mockDecision.getPollRequestDeltaSuggestion()).thenReturn(0); PollScaleReportHandle handle = - new PollScaleReportHandle<>(1, 10, 8, mockScaleCallback); + new PollScaleReportHandle<>(1, 10, 8, false, mockScaleCallback); // Simulate RESOURCE_EXHAUSTED error StatusRuntimeException exception = new StatusRuntimeException(Status.RESOURCE_EXHAUSTED); @@ -37,7 +37,7 @@ public void handleGenericError() { Mockito.when(mockTask.getScalingDecision()).thenReturn(mockDecision); Mockito.when(mockDecision.getPollRequestDeltaSuggestion()).thenReturn(0); PollScaleReportHandle handle = - new PollScaleReportHandle<>(1, 10, 5, mockScaleCallback); + new PollScaleReportHandle<>(1, 10, 5, false, mockScaleCallback); // Simulate a generic error handle.report(mockTask, null); @@ -57,7 +57,7 @@ public void applyScalingDecisionDeltaWhenAllowed() { Mockito.when(mockDecision.getPollRequestDeltaSuggestion()).thenReturn(3); PollScaleReportHandle handle = - new PollScaleReportHandle<>(1, 10, 5, mockScaleCallback); + new PollScaleReportHandle<>(1, 10, 5, false, mockScaleCallback); handle.run(); // Enable scale-up // Report a task with a scaling decision @@ -66,4 +66,71 @@ public void applyScalingDecisionDeltaWhenAllowed() { // Verify target poller count is updated and callback is invoked Mockito.verify(mockScaleCallback).apply(8); } + + @Test + public void scaleDownOnEmptyPollWhenServerSupportsAutoscaling() { + Functions.Proc1 mockScaleCallback = Mockito.mock(Functions.Proc1.class); + PollScaleReportHandle handle = + new PollScaleReportHandle<>(1, 10, 5, true, mockScaleCallback); + + // Empty poll (null task, no error) should scale down when server supports autoscaling, + // even without having seen a scaling decision. + handle.report(null, null); + + Mockito.verify(mockScaleCallback).apply(4); + } + + @Test + public void noScaleDownOnEmptyPollWithoutAutoscalingOrDecision() { + Functions.Proc1 mockScaleCallback = Mockito.mock(Functions.Proc1.class); + PollScaleReportHandle handle = + new PollScaleReportHandle<>(1, 10, 5, false, mockScaleCallback); + + // Empty poll should NOT scale down when server doesn't support autoscaling + // and we haven't seen a scaling decision. + handle.report(null, null); + + Mockito.verifyNoInteractions(mockScaleCallback); + } + + @Test + public void scaleDownOnErrorWhenServerSupportsAutoscaling() { + Functions.Proc1 mockScaleCallback = Mockito.mock(Functions.Proc1.class); + PollScaleReportHandle handle = + new PollScaleReportHandle<>(1, 10, 5, true, mockScaleCallback); + + // Error should scale down when server supports autoscaling, + // even without a prior scaling decision. + handle.report(null, new RuntimeException("error")); + + Mockito.verify(mockScaleCallback).apply(4); + } + + @Test + public void noScaleDownOnErrorWithoutAutoscalingOrDecision() { + Functions.Proc1 mockScaleCallback = Mockito.mock(Functions.Proc1.class); + PollScaleReportHandle handle = + new PollScaleReportHandle<>(1, 10, 5, false, mockScaleCallback); + + // Error should NOT scale down without autoscaling support or prior scaling decision. + handle.report(null, new RuntimeException("error")); + + Mockito.verifyNoInteractions(mockScaleCallback); + } + + @Test + public void scaleDownOnEmptyPollRespectsMinPollerCount() { + Functions.Proc1 mockScaleCallback = Mockito.mock(Functions.Proc1.class); + // min=3, initial=3, so scale down should be clamped + PollScaleReportHandle handle = + new PollScaleReportHandle<>(3, 10, 3, true, mockScaleCallback); + + // Empty polls should not scale below minimum + for (int i = 0; i < 5; i++) { + handle.report(null, null); + } + + // Should never have been called since target can't go below min + Mockito.verifyNoInteractions(mockScaleCallback); + } } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java index 7888f6cf8..cbe194091 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java @@ -31,6 +31,7 @@ import java.time.Duration; import java.util.UUID; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; import org.mockito.stubbing.Answer; import org.slf4j.Logger; @@ -83,7 +84,8 @@ public void concurrentPollRequestLockTest() throws Exception { cache, taskHandler, eagerActivityDispatcher, - slotSupplier); + slotSupplier, + new AtomicBoolean(false)); WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub = mock(WorkflowServiceGrpc.WorkflowServiceFutureStub.class); @@ -252,7 +254,8 @@ public void respondWorkflowTaskFailureMetricTest() throws Exception { cache, taskHandler, eagerActivityDispatcher, - slotSupplier); + slotSupplier, + new AtomicBoolean(false)); WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub = mock(WorkflowServiceGrpc.WorkflowServiceFutureStub.class); @@ -394,7 +397,8 @@ public boolean isAnyTypeSupported() { cache, taskHandler, eagerActivityDispatcher, - slotSupplier); + slotSupplier, + new AtomicBoolean(false)); WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub = mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class); diff --git a/temporal-serviceclient/src/main/proto b/temporal-serviceclient/src/main/proto index 188e309de..db5f59306 160000 --- a/temporal-serviceclient/src/main/proto +++ b/temporal-serviceclient/src/main/proto @@ -1 +1 @@ -Subproject commit 188e309dee0acb3e3c84363d2d9f11be32df3bb8 +Subproject commit db5f593060ee579460e7f4e06d79b5e4f3af2387 diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java index 7183ade76..e73e6e7a8 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java @@ -1024,6 +1024,7 @@ private static Failure wrapNexusOperationFailure(Failure cause) { } @Override + @SuppressWarnings("deprecation") // Uses deprecated operationError public void respondNexusTaskCompleted( RespondNexusTaskCompletedRequest request, StreamObserver responseObserver) { diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java index 26b269199..7db34c346 100644 --- a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java +++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java @@ -846,6 +846,7 @@ public void testNexusOperationStartToCloseTimeout() { } } + @SuppressWarnings("deprecation") // Uses deprecated operationError @Test public void testNexusOperationError() { Response unsuccessfulResp = @@ -1159,6 +1160,7 @@ public void testNexusOperationCancelRequestAcknowledgeSchedulesWorkflowTask() { } @Test(timeout = 30000) + @SuppressWarnings("deprecation") // Uses deprecated operationError public void testNexusOperationCanceledErrorWithCauseChain() { // Verifies that a canceled OperationError with a JSON-encoded failure chain properly // unwraps the CanceledFailureInfo and preserves the cause.