From cfff488eb1b0589286fc2cbe43ff991183bf5be0 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Fri, 13 Mar 2026 11:07:08 -0700 Subject: [PATCH 1/3] Add worker_control_task_queue to poll and respond protos Made-with: Cursor --- .../workerservice/v1/request_response.proto | 46 +++++++++++++++++++ .../workerservice/v1/service.yaml | 32 +++++++++++++ .../workflowservice/v1/request_response.proto | 15 ++++++ 3 files changed, 93 insertions(+) create mode 100644 temporal/api/nexusservices/workerservice/v1/request_response.proto create mode 100644 temporal/api/nexusservices/workerservice/v1/service.yaml diff --git a/temporal/api/nexusservices/workerservice/v1/request_response.proto b/temporal/api/nexusservices/workerservice/v1/request_response.proto new file mode 100644 index 000000000..ef14decbd --- /dev/null +++ b/temporal/api/nexusservices/workerservice/v1/request_response.proto @@ -0,0 +1,46 @@ +syntax = "proto3"; + +package temporal.api.nexusservices.workerservice.v1; + +option go_package = "go.temporal.io/api/nexusservices/workerservice/v1;workerservice"; +option java_package = "io.temporal.api.nexusservices.workerservice.v1"; +option java_multiple_files = true; +option java_outer_classname = "RequestResponseProto"; +option ruby_package = "Temporalio::Api::Nexusservices::Workerservice::V1"; +option csharp_namespace = "Temporalio.Api.Nexusservices.Workerservice.V1"; + +// (-- +// Internal Nexus service for server-to-worker communication. +// See service.yaml for the service definition. +// --) + +// Request payload for the "executeCommands" Nexus operation. +message WorkerCommandsRequest { + repeated WorkerCommand commands = 1; + + message WorkerCommand { + oneof type { + CancelActivity cancel_activity = 1; + } + } + + // Cancel an activity if it is still running. Otherwise, do nothing. + message CancelActivity { + bytes task_token = 1; + } +} + +// Response payload for the "executeCommands" Nexus operation. +message WorkerCommandsResponse { + repeated WorkerCommandResult results = 1; + + message WorkerCommandResult { + oneof type { + CancelActivity cancel_activity = 1; + } + } + + // Treat both successful cancellation and no-op (activity is no longer running) as success. + message CancelActivity { + } +} diff --git a/temporal/api/nexusservices/workerservice/v1/service.yaml b/temporal/api/nexusservices/workerservice/v1/service.yaml new file mode 100644 index 000000000..ad42b3bba --- /dev/null +++ b/temporal/api/nexusservices/workerservice/v1/service.yaml @@ -0,0 +1,32 @@ +# yaml-language-server: $schema=https://raw.githubusercontent.com/nexus-rpc/nexus-rpc-gen/main/schemas/nexus-rpc-gen.json +# +# Nexus service definition for server-to-worker communication. +# See request_response.proto for message definitions. +# +# Task queue format: /temporal-sys/worker-commands/{namespace}/{worker_grouping_key} + +nexusrpc: 1.0.0 + +services: + temporal.api.nexusservices.workerservice.v1.WorkerService: + description: > + Internal Nexus service for server-to-worker communication. + Used by the Temporal server to send commands to workers. + operations: + executeCommands: + description: Executes worker commands sent by the server. + input: + $goRef: "go.temporal.io/api/nexusservices/workerservice/v1.WorkerCommandsRequest" + $javaRef: "io.temporal.api.nexusservices.workerservice.v1.WorkerCommandsRequest" + $pythonRef: "temporalio.api.nexusservices.workerservice.v1.WorkerCommandsRequest" + $typescriptRef: "@temporalio/api/nexusservices/workerservice/v1.WorkerCommandsRequest" + $dotnetRef: "Temporalio.Api.Nexusservices.Workerservice.V1.WorkerCommandsRequest" + $rubyRef: "Temporalio::Api::Nexusservices::Workerservice::V1::WorkerCommandsRequest" + output: + $goRef: "go.temporal.io/api/nexusservices/workerservice/v1.WorkerCommandsResponse" + $javaRef: "io.temporal.api.nexusservices.workerservice.v1.WorkerCommandsResponse" + $pythonRef: "temporalio.api.nexusservices.workerservice.v1.WorkerCommandsResponse" + $typescriptRef: "@temporalio/api/nexusservices/workerservice/v1.WorkerCommandsResponse" + $dotnetRef: "Temporalio.Api.Nexusservices.Workerservice.V1.WorkerCommandsResponse" + $rubyRef: "Temporalio::Api::Nexusservices::Workerservice::V1::WorkerCommandsResponse" + diff --git a/temporal/api/workflowservice/v1/request_response.proto b/temporal/api/workflowservice/v1/request_response.proto index 651816e37..e95ac8ba4 100644 --- a/temporal/api/workflowservice/v1/request_response.proto +++ b/temporal/api/workflowservice/v1/request_response.proto @@ -262,6 +262,10 @@ message PollWorkflowTaskQueueRequest { // A unique key for this worker instance, used for tracking worker lifecycle. // This is guaranteed to be unique, whereas identity is not guaranteed to be unique. string worker_instance_key = 8; + + // The task queue on which the server will send control tasks to this worker. + string worker_control_task_queue = 9; + // Deprecated. Use deployment_options instead. // Each worker process should provide an ID unique to the specific set of code it is running // "checksum" in this field name isn't very accurate, it should be though of as an id. @@ -381,6 +385,13 @@ message RespondWorkflowTaskCompletedRequest { // Worker deployment options that user has set in the worker. temporal.api.deployment.v1.WorkerDeploymentOptions deployment_options = 17; + // A unique key for this worker instance, used for tracking worker lifecycle. + // This is guaranteed to be unique, whereas identity is not guaranteed to be unique. + string worker_instance_key = 18; + + // The task queue on which the server will send control tasks to this worker. + string worker_control_task_queue = 19; + // SDK capability details. message Capabilities { // True if the SDK can handle speculative workflow task with command events. If true, the @@ -444,6 +455,10 @@ message PollActivityTaskQueueRequest { // A unique key for this worker instance, used for tracking worker lifecycle. // This is guaranteed to be unique, whereas identity is not guaranteed to be unique. string worker_instance_key = 8; + + // The task queue on which the server will send control tasks to this worker. + string worker_control_task_queue = 9; + temporal.api.taskqueue.v1.TaskQueueMetadata task_queue_metadata = 4; // Information about this worker's build identifier and if it is choosing to use the versioning // feature. See the `WorkerVersionCapabilities` docstring for more. From 763e9f5ea668c65b78327b1f5f299a4ea1b97fc1 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Fri, 13 Mar 2026 13:16:53 -0700 Subject: [PATCH 2/3] Address review: rename to ExecuteCommands, add response doc - Rename WorkerCommandsRequest/Response to ExecuteCommandsRequest/Response to match the operation name (bergundy feedback) - Change operation name from executeCommands to ExecuteCommands (PascalCase) - Add doc: results list must be 1:1 with commands list (Sushisource feedback) Made-with: Cursor --- .../workerservice/v1/request_response.proto | 9 ++++--- .../workerservice/v1/service.yaml | 26 +++++++++---------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/temporal/api/nexusservices/workerservice/v1/request_response.proto b/temporal/api/nexusservices/workerservice/v1/request_response.proto index ef14decbd..d53c46419 100644 --- a/temporal/api/nexusservices/workerservice/v1/request_response.proto +++ b/temporal/api/nexusservices/workerservice/v1/request_response.proto @@ -14,8 +14,8 @@ option csharp_namespace = "Temporalio.Api.Nexusservices.Workerservice.V1"; // See service.yaml for the service definition. // --) -// Request payload for the "executeCommands" Nexus operation. -message WorkerCommandsRequest { +// Request payload for the "ExecuteCommands" Nexus operation. +message ExecuteCommandsRequest { repeated WorkerCommand commands = 1; message WorkerCommand { @@ -30,8 +30,9 @@ message WorkerCommandsRequest { } } -// Response payload for the "executeCommands" Nexus operation. -message WorkerCommandsResponse { +// Response payload for the "ExecuteCommands" Nexus operation. +// The results list must be 1:1 with the commands list in the request (same size and order). +message ExecuteCommandsResponse { repeated WorkerCommandResult results = 1; message WorkerCommandResult { diff --git a/temporal/api/nexusservices/workerservice/v1/service.yaml b/temporal/api/nexusservices/workerservice/v1/service.yaml index ad42b3bba..cb49cb118 100644 --- a/temporal/api/nexusservices/workerservice/v1/service.yaml +++ b/temporal/api/nexusservices/workerservice/v1/service.yaml @@ -13,20 +13,20 @@ services: Internal Nexus service for server-to-worker communication. Used by the Temporal server to send commands to workers. operations: - executeCommands: + ExecuteCommands: description: Executes worker commands sent by the server. input: - $goRef: "go.temporal.io/api/nexusservices/workerservice/v1.WorkerCommandsRequest" - $javaRef: "io.temporal.api.nexusservices.workerservice.v1.WorkerCommandsRequest" - $pythonRef: "temporalio.api.nexusservices.workerservice.v1.WorkerCommandsRequest" - $typescriptRef: "@temporalio/api/nexusservices/workerservice/v1.WorkerCommandsRequest" - $dotnetRef: "Temporalio.Api.Nexusservices.Workerservice.V1.WorkerCommandsRequest" - $rubyRef: "Temporalio::Api::Nexusservices::Workerservice::V1::WorkerCommandsRequest" + $goRef: "go.temporal.io/api/nexusservices/workerservice/v1.ExecuteCommandsRequest" + $javaRef: "io.temporal.api.nexusservices.workerservice.v1.ExecuteCommandsRequest" + $pythonRef: "temporalio.api.nexusservices.workerservice.v1.ExecuteCommandsRequest" + $typescriptRef: "@temporalio/api/nexusservices/workerservice/v1.ExecuteCommandsRequest" + $dotnetRef: "Temporalio.Api.Nexusservices.Workerservice.V1.ExecuteCommandsRequest" + $rubyRef: "Temporalio::Api::Nexusservices::Workerservice::V1::ExecuteCommandsRequest" output: - $goRef: "go.temporal.io/api/nexusservices/workerservice/v1.WorkerCommandsResponse" - $javaRef: "io.temporal.api.nexusservices.workerservice.v1.WorkerCommandsResponse" - $pythonRef: "temporalio.api.nexusservices.workerservice.v1.WorkerCommandsResponse" - $typescriptRef: "@temporalio/api/nexusservices/workerservice/v1.WorkerCommandsResponse" - $dotnetRef: "Temporalio.Api.Nexusservices.Workerservice.V1.WorkerCommandsResponse" - $rubyRef: "Temporalio::Api::Nexusservices::Workerservice::V1::WorkerCommandsResponse" + $goRef: "go.temporal.io/api/nexusservices/workerservice/v1.ExecuteCommandsResponse" + $javaRef: "io.temporal.api.nexusservices.workerservice.v1.ExecuteCommandsResponse" + $pythonRef: "temporalio.api.nexusservices.workerservice.v1.ExecuteCommandsResponse" + $typescriptRef: "@temporalio/api/nexusservices/workerservice/v1.ExecuteCommandsResponse" + $dotnetRef: "Temporalio.Api.Nexusservices.Workerservice.V1.ExecuteCommandsResponse" + $rubyRef: "Temporalio::Api::Nexusservices::Workerservice::V1::ExecuteCommandsResponse" From 63fca37c755f6616df16d2cf770f43a4ac39af9c Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Fri, 13 Mar 2026 14:24:11 -0700 Subject: [PATCH 3/3] Address review: extract command types to worker/v1/message.proto Move WorkerCommand, CancelActivityCommand, WorkerCommandResult, and CancelActivityResult to temporal.api.worker.v1 as top-level messages. request_response.proto now only contains ExecuteCommandsRequest and ExecuteCommandsResponse, following the repo convention. (bergundy feedback) Made-with: Cursor --- .../workerservice/v1/request_response.proto | 27 +++---------------- temporal/api/worker/v1/message.proto | 24 +++++++++++++++++ 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/temporal/api/nexusservices/workerservice/v1/request_response.proto b/temporal/api/nexusservices/workerservice/v1/request_response.proto index d53c46419..0daa67b64 100644 --- a/temporal/api/nexusservices/workerservice/v1/request_response.proto +++ b/temporal/api/nexusservices/workerservice/v1/request_response.proto @@ -9,6 +9,8 @@ option java_outer_classname = "RequestResponseProto"; option ruby_package = "Temporalio::Api::Nexusservices::Workerservice::V1"; option csharp_namespace = "Temporalio.Api.Nexusservices.Workerservice.V1"; +import "temporal/api/worker/v1/message.proto"; + // (-- // Internal Nexus service for server-to-worker communication. // See service.yaml for the service definition. @@ -16,32 +18,11 @@ option csharp_namespace = "Temporalio.Api.Nexusservices.Workerservice.V1"; // Request payload for the "ExecuteCommands" Nexus operation. message ExecuteCommandsRequest { - repeated WorkerCommand commands = 1; - - message WorkerCommand { - oneof type { - CancelActivity cancel_activity = 1; - } - } - - // Cancel an activity if it is still running. Otherwise, do nothing. - message CancelActivity { - bytes task_token = 1; - } + repeated temporal.api.worker.v1.WorkerCommand commands = 1; } // Response payload for the "ExecuteCommands" Nexus operation. // The results list must be 1:1 with the commands list in the request (same size and order). message ExecuteCommandsResponse { - repeated WorkerCommandResult results = 1; - - message WorkerCommandResult { - oneof type { - CancelActivity cancel_activity = 1; - } - } - - // Treat both successful cancellation and no-op (activity is no longer running) as success. - message CancelActivity { - } + repeated temporal.api.worker.v1.WorkerCommandResult results = 1; } diff --git a/temporal/api/worker/v1/message.proto b/temporal/api/worker/v1/message.proto index b65faeb29..a87142c78 100644 --- a/temporal/api/worker/v1/message.proto +++ b/temporal/api/worker/v1/message.proto @@ -193,3 +193,27 @@ message StorageDriverInfo { // The type of the driver, required. string type = 1; } + +// A command sent from the server to a worker. +message WorkerCommand { + oneof type { + CancelActivityCommand cancel_activity = 1; + } +} + +// Cancel an activity if it is still running. Otherwise, do nothing. +message CancelActivityCommand { + bytes task_token = 1; +} + +// The result of executing a WorkerCommand. +message WorkerCommandResult { + oneof type { + CancelActivityResult cancel_activity = 1; + } +} + +// Result of a CancelActivityCommand. +// Treat both successful cancellation and no-op (activity is no longer running) as success. +message CancelActivityResult { +}