Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,6 +49,7 @@ final class ActivityWorker implements SuspendableWorker {
private final GrpcRetryer grpcRetryer;
private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
private final AtomicBoolean serverSupportsAutoscaling;

public ActivityWorker(
@Nonnull WorkflowServiceStubs service,
Expand All @@ -56,7 +58,8 @@ public ActivityWorker(
double taskQueueActivitiesPerSecond,
@Nonnull SingleWorkerOptions options,
@Nonnull ActivityTaskHandler handler,
@Nonnull SlotSupplier<ActivitySlotInfo> slotSupplier) {
@Nonnull SlotSupplier<ActivitySlotInfo> slotSupplier,
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
this.service = Objects.requireNonNull(service);
this.namespace = Objects.requireNonNull(namespace);
this.taskQueue = Objects.requireNonNull(taskQueue);
Expand All @@ -72,6 +75,7 @@ public ActivityWorker(
DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);

this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
}

@Override
Expand Down Expand Up @@ -106,6 +110,7 @@ public boolean start() {
service.getServerCapabilities()),
this.pollTaskExecutor,
pollerOptions,
serverSupportsAutoscaling.get(),
workerMetricsScope);

} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
private final List<PollTaskAsync<T>> asyncTaskPollers;
private final PollerOptions pollerOptions;
private final PollerBehaviorAutoscaling pollerBehavior;
private final boolean serverSupportsAutoscaling;
private final Scope workerMetricsScope;
private Throttler pollRateThrottler;
private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
Expand All @@ -42,13 +43,15 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
PollTaskAsync<T> asyncTaskPoller,
ShutdownableTaskExecutor<T> taskExecutor,
PollerOptions pollerOptions,
boolean serverSupportsAutoscaling,
Scope workerMetricsScope) {
this(
slotSupplier,
slotReservationData,
Collections.singletonList(asyncTaskPoller),
taskExecutor,
pollerOptions,
serverSupportsAutoscaling,
workerMetricsScope);
}

Expand All @@ -58,6 +61,7 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
List<PollTaskAsync<T>> asyncTaskPollers,
ShutdownableTaskExecutor<T> taskExecutor,
PollerOptions pollerOptions,
boolean serverSupportsAutoscaling,
Scope workerMetricsScope) {
super(taskExecutor);
Objects.requireNonNull(slotSupplier, "slotSupplier cannot be null");
Expand All @@ -78,6 +82,7 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
+ " is not supported for AsyncPoller. Only PollerBehaviorAutoscaling is supported.");
}
this.pollerBehavior = (PollerBehaviorAutoscaling) pollerOptions.getPollerBehavior();
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
this.pollerOptions = pollerOptions;
this.workerMetricsScope = workerMetricsScope;
}
Expand Down Expand Up @@ -109,6 +114,7 @@ public boolean start() {
pollerBehavior.getMinConcurrentTaskPollers(),
pollerBehavior.getMaxConcurrentTaskPollers(),
pollerBehavior.getInitialConcurrentTaskPollers(),
serverSupportsAutoscaling,
(newTarget) -> {
log.debug(
"Updating maximum number of pollers for {} to: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -52,6 +53,7 @@ final class NexusWorker implements SuspendableWorker {
private final GrpcRetryer grpcRetryer;
private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
private final TrackingSlotSupplier<NexusSlotInfo> slotSupplier;
private final AtomicBoolean serverSupportsAutoscaling;
private final boolean forceOldFailureFormat;

public NexusWorker(
Expand All @@ -61,7 +63,8 @@ public NexusWorker(
@Nonnull SingleWorkerOptions options,
@Nonnull NexusTaskHandler handler,
@Nonnull DataConverter dataConverter,
@Nonnull SlotSupplier<NexusSlotInfo> slotSupplier) {
@Nonnull SlotSupplier<NexusSlotInfo> slotSupplier,
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
this.service = Objects.requireNonNull(service);
this.namespace = Objects.requireNonNull(namespace);
this.taskQueue = Objects.requireNonNull(taskQueue);
Expand All @@ -77,6 +80,7 @@ public NexusWorker(
DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);

this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
// Allow tests to force old format for backward compatibility testing
String forceOldFormat = System.getProperty("temporal.nexus.forceOldFailureFormat");
this.forceOldFailureFormat = "true".equalsIgnoreCase(forceOldFormat);
Expand Down Expand Up @@ -112,6 +116,7 @@ public boolean start() {
this.slotSupplier),
this.pollTaskExecutor,
pollerOptions,
serverSupportsAutoscaling.get(),
workerMetricsScope);
} else {
poller =
Expand Down Expand Up @@ -282,6 +287,7 @@ public Throwable wrapFailure(NexusTask task, Throwable failure) {
"Failure processing nexus response: " + response.getRequest().toString(), failure);
}

@SuppressWarnings("deprecation") // Uses deprecated operationError
private void handleNexusTask(NexusTask task, Scope metricsScope) {
PollNexusTaskQueueResponseOrBuilder pollResponse = task.getResponse();
ByteString taskToken = pollResponse.getTaskToken();
Expand Down Expand Up @@ -374,6 +380,7 @@ private void logExceptionDuringResultReporting(
}
}

@SuppressWarnings("deprecation") // Uses deprecated setOperationError
private Response getResponseForOldServer(Response response) {
Response.Builder b = response.toBuilder();
Failure failure = response.getStartOperation().getFailure();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class PollScaleReportHandle<T extends ScalingTask> implements Runnable {
private final int maxPollerCount;
private int targetPollerCount;
private final Functions.Proc1<Integer> scaleCallback;
private final boolean serverSupportsAutoscaling;
private boolean everSawScalingDecision;
private int ingestedThisPeriod;
private int ingestedLastPeriod;
Expand All @@ -27,18 +28,20 @@ public PollScaleReportHandle(
int minPollerCount,
int maxPollerCount,
int initialPollerCount,
boolean serverSupportsAutoscaling,
Functions.Proc1<Integer> scaleCallback) {
this.minPollerCount = minPollerCount;
this.maxPollerCount = maxPollerCount;
this.targetPollerCount = initialPollerCount;
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
this.scaleCallback = scaleCallback;
}

public synchronized void report(T task, Throwable e) {
if (e != null) {
// We want to avoid scaling down on errors if we have never seen a scaling decision
// since we might never scale up again.
if (!everSawScalingDecision) {
// and the server doesn't support autoscaling - otherwise we might never scale up again.
if (!everSawScalingDecision && !serverSupportsAutoscaling) {
return;
}
if ((e instanceof StatusRuntimeException)) {
Expand Down Expand Up @@ -68,9 +71,10 @@ public synchronized void report(T task, Throwable e) {
updateTarget((t -> t + deltaSuggestion));
}

} else if (task == null && everSawScalingDecision) {
} else if (task == null && (everSawScalingDecision || serverSupportsAutoscaling)) {
// We want to avoid scaling down on empty polls if the server has never made any
// scaling decisions - otherwise we might never scale up again.
// scaling decisions and doesn't support autoscaling - otherwise we might never scale
// up again.
updateTarget((t) -> t - 1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -32,7 +34,8 @@ public SyncActivityWorker(
String taskQueue,
double taskQueueActivitiesPerSecond,
SingleWorkerOptions options,
SlotSupplier<ActivitySlotInfo> slotSupplier) {
SlotSupplier<ActivitySlotInfo> slotSupplier,
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
this.identity = options.getIdentity();
this.namespace = namespace;
this.taskQueue = taskQueue;
Expand Down Expand Up @@ -72,7 +75,8 @@ public SyncActivityWorker(
taskQueueActivitiesPerSecond,
options,
taskHandler,
slotSupplier);
slotSupplier,
serverSupportsAutoscaling);
}

public void registerActivityImplementations(Object... activitiesImplementation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import io.temporal.worker.tuning.SlotSupplier;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -23,7 +25,8 @@ public SyncNexusWorker(
String namespace,
String taskQueue,
SingleWorkerOptions options,
SlotSupplier<NexusSlotInfo> slotSupplier) {
SlotSupplier<NexusSlotInfo> slotSupplier,
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
this.identity = options.getIdentity();
this.namespace = namespace;
this.taskQueue = taskQueue;
Expand All @@ -43,7 +46,8 @@ public SyncNexusWorker(
options,
taskHandler,
options.getDataConverter(),
slotSupplier);
slotSupplier,
serverSupportsAutoscaling);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -68,7 +69,8 @@ public SyncWorkflowWorker(
@Nonnull WorkflowThreadExecutor workflowThreadExecutor,
@Nonnull EagerActivityDispatcher eagerActivityDispatcher,
@Nonnull SlotSupplier<WorkflowSlotInfo> slotSupplier,
@Nonnull SlotSupplier<LocalActivitySlotInfo> laSlotSupplier) {
@Nonnull SlotSupplier<LocalActivitySlotInfo> laSlotSupplier,
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
this.identity = singleWorkerOptions.getIdentity();
this.namespace = namespace;
this.taskQueue = taskQueue;
Expand Down Expand Up @@ -122,7 +124,8 @@ public SyncWorkflowWorker(
cache,
taskHandler,
eagerActivityDispatcher,
slotSupplier);
slotSupplier,
serverSupportsAutoscaling);

// Exists to support Worker#replayWorkflowExecution functionality.
// This handler has to be non-sticky to avoid evicting actual executions from the cache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
Expand All @@ -54,6 +55,7 @@ final class WorkflowWorker implements SuspendableWorker {
private final GrpcRetryer grpcRetryer;
private final EagerActivityDispatcher eagerActivityDispatcher;
private final TrackingSlotSupplier<WorkflowSlotInfo> slotSupplier;
private final AtomicBoolean serverSupportsAutoscaling;

private PollTaskExecutor<WorkflowTask> pollTaskExecutor;

Expand All @@ -73,7 +75,8 @@ public WorkflowWorker(
@Nonnull WorkflowExecutorCache cache,
@Nonnull WorkflowTaskHandler handler,
@Nonnull EagerActivityDispatcher eagerActivityDispatcher,
@Nonnull SlotSupplier<WorkflowSlotInfo> slotSupplier) {
@Nonnull SlotSupplier<WorkflowSlotInfo> slotSupplier,
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
this.service = Objects.requireNonNull(service);
this.namespace = Objects.requireNonNull(namespace);
this.taskQueue = Objects.requireNonNull(taskQueue);
Expand All @@ -88,6 +91,7 @@ public WorkflowWorker(
this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities());
this.eagerActivityDispatcher = eagerActivityDispatcher;
this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
}

@Override
Expand Down Expand Up @@ -154,6 +158,7 @@ public boolean start() {
pollers,
this.pollTaskExecutor,
pollerOptions,
serverSupportsAutoscaling.get(),
workerMetricsScope);
} else {
PollerBehaviorSimpleMaximum pollerBehavior =
Expand Down
17 changes: 13 additions & 4 deletions temporal-sdk/src/main/java/io/temporal/worker/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public final class Worker {
boolean useStickyTaskQueue,
WorkflowThreadExecutor workflowThreadExecutor,
List<ContextPropagator> contextPropagators,
@Nonnull List<WorkerPlugin> plugins) {
@Nonnull List<WorkerPlugin> plugins,
@Nonnull AtomicBoolean serverSupportsAutoscaling) {

Objects.requireNonNull(client, "client should not be null");
this.plugins = Objects.requireNonNull(plugins, "plugins should not be null");
Expand Down Expand Up @@ -102,7 +103,8 @@ public final class Worker {
taskQueue,
this.options.getMaxTaskQueueActivitiesPerSecond(),
activityOptions,
activitySlotSupplier);
activitySlotSupplier,
serverSupportsAutoscaling);
}

EagerActivityDispatcher eagerActivityDispatcher =
Expand All @@ -120,7 +122,13 @@ public final class Worker {
attachMetricsToResourceController(taggedScope, nexusSlotSupplier);

nexusWorker =
new SyncNexusWorker(client, namespace, taskQueue, nexusOptions, nexusSlotSupplier);
new SyncNexusWorker(
client,
namespace,
taskQueue,
nexusOptions,
nexusSlotSupplier,
serverSupportsAutoscaling);

SingleWorkerOptions singleWorkerOptions =
toWorkflowWorkerOptions(
Expand Down Expand Up @@ -158,7 +166,8 @@ public final class Worker {
workflowThreadExecutor,
eagerActivityDispatcher,
workflowSlotSupplier,
localActivitySlotSupplier);
localActivitySlotSupplier,
serverSupportsAutoscaling);
}

/**
Expand Down
Loading
Loading