diff --git a/.changeset/fast-owls-flow.md b/.changeset/fast-owls-flow.md new file mode 100644 index 000000000..deeb344e4 --- /dev/null +++ b/.changeset/fast-owls-flow.md @@ -0,0 +1,10 @@ +--- +"@workflow/core": patch +--- + +perf: parallelize suspension handler and refactor runtime + +- Process hooks first, then steps and waits in parallel to prevent race conditions +- Refactor runtime.ts into modular files: `suspension-handler.ts`, `step-handler.ts`, `helpers.ts` +- Add otel attributes for hooks created (`workflow.hooks.created`) and waits created (`workflow.waits.created`) +- Update suspension status from `pending_steps` to `workflow_suspended` diff --git a/packages/core/e2e/bench.bench.ts b/packages/core/e2e/bench.bench.ts index 4c7d2ad99..ac06bee72 100644 --- a/packages/core/e2e/bench.bench.ts +++ b/packages/core/e2e/bench.bench.ts @@ -74,6 +74,9 @@ async function triggerWorkflow( async function getWorkflowReturnValue( runId: string ): Promise<{ run: any; value: any }> { + const MAX_UNEXPECTED_CONTENT_RETRIES = 3; + let unexpectedContentRetries = 0; + // We need to poll the GET endpoint until the workflow run is completed. while (true) { const url = new URL('/api/trigger', deploymentUrl); @@ -105,7 +108,24 @@ async function getWorkflowReturnValue( return { run, value: res.body }; } - throw new Error(`Unexpected content type: ${contentType}`); + // Unexpected content type - log details and retry + unexpectedContentRetries++; + const responseText = await res.text().catch(() => ''); + console.warn( + `[bench] Unexpected content type for runId=${runId} (attempt ${unexpectedContentRetries}/${MAX_UNEXPECTED_CONTENT_RETRIES}):\n` + + ` Status: ${res.status}\n` + + ` Content-Type: ${contentType}\n` + + ` Response: ${responseText.slice(0, 500)}${responseText.length > 500 ? '...' 
: ''}` + ); + + if (unexpectedContentRetries >= MAX_UNEXPECTED_CONTENT_RETRIES) { + throw new Error( + `Unexpected content type after ${MAX_UNEXPECTED_CONTENT_RETRIES} retries: ${contentType} (status=${res.status})` + ); + } + + // Wait before retrying + await new Promise((resolve) => setTimeout(resolve, 500)); } } @@ -293,16 +313,6 @@ describe('Workflow Performance Benchmarks', () => { { time: 5000, iterations: 5, warmupIterations: 1, teardown } ); - bench( - 'workflow with 10 parallel steps', - async () => { - const { runId } = await triggerWorkflow('tenParallelStepsWorkflow', []); - const { run } = await getWorkflowReturnValue(runId); - stageTiming('workflow with 10 parallel steps', run); - }, - { time: 5000, iterations: 5, warmupIterations: 1, teardown } - ); - bench( 'workflow with stream', async () => { @@ -341,88 +351,34 @@ describe('Workflow Performance Benchmarks', () => { { time: 5000, warmupIterations: 1, teardown } ); - // Stress tests for large concurrent step counts - // These reproduce reported issues with Promise.race/Promise.all at scale - - bench( - 'stress test: Promise.all with 100 concurrent steps', - async () => { - const { runId } = await triggerWorkflow( - 'promiseAllStressTestWorkflow', - [100] - ); - const { run } = await getWorkflowReturnValue(runId); - stageTiming('stress test: Promise.all with 100 concurrent steps', run); - }, - { time: 30000, iterations: 1, warmupIterations: 0, teardown } - ); - - // TODO: Re-enable after performance optimizations (see beads issue wrk-fyx) - bench.skip( - 'stress test: Promise.all with 500 concurrent steps', - async () => { - const { runId } = await triggerWorkflow( - 'promiseAllStressTestWorkflow', - [500] - ); - const { run } = await getWorkflowReturnValue(runId); - stageTiming('stress test: Promise.all with 500 concurrent steps', run); - }, - { time: 60000, iterations: 1, warmupIterations: 0, teardown } - ); - - // TODO: Re-enable after performance optimizations (see beads issue wrk-fyx) - bench.skip( - 'stress test: Promise.all with 1000 concurrent steps', - async () => { - const { runId } = await triggerWorkflow( - 'promiseAllStressTestWorkflow', - [1000] - ); - const { run } = await getWorkflowReturnValue(runId); - stageTiming('stress test: Promise.all with 1000 concurrent steps', run); - }, - { time: 120000, iterations: 1, warmupIterations: 0, teardown } - ); - - bench( - 'stress test: Promise.race with 100 concurrent steps', - async () => { - const { runId } = await triggerWorkflow( - 'promiseRaceStressTestLargeWorkflow', - [100] - ); - const { run } = await getWorkflowReturnValue(runId); - stageTiming('stress test: Promise.race with 100 concurrent steps', run); - }, - { time: 30000, iterations: 1, warmupIterations: 0, teardown } - ); - - // TODO: Re-enable after performance optimizations (see beads issue wrk-fyx) - bench.skip( - 'stress test: Promise.race with 500 concurrent steps', - async () => { - const { runId } = await triggerWorkflow( - 'promiseRaceStressTestLargeWorkflow', - [500] + // Concurrent step benchmarks for Promise.all/Promise.race at various scales + const concurrentStepCounts = [ + { count: 10, skip: false, time: 30000 }, + { count: 25, skip: false, time: 30000 }, + { count: 100, skip: true, time: 60000 }, + { count: 500, skip: true, time: 120000 }, + { count: 1000, skip: true, time: 180000 }, + ] as const; + + const concurrentStepTypes = [ + { type: 'Promise.all', workflow: 'promiseAllStressTestWorkflow' }, + { type: 'Promise.race', workflow: 'promiseRaceStressTestLargeWorkflow' }, + ] as const; + + 
for (const { type, workflow } of concurrentStepTypes) { + for (const { count, skip, time } of concurrentStepCounts) { + const name = `${type} with ${count} concurrent steps`; + const benchFn = skip ? bench.skip : bench; + + benchFn( + name, + async () => { + const { runId } = await triggerWorkflow(workflow, [count]); + const { run } = await getWorkflowReturnValue(runId); + stageTiming(name, run); + }, + { time, iterations: 1, warmupIterations: 0, teardown } ); - const { run } = await getWorkflowReturnValue(runId); - stageTiming('stress test: Promise.race with 500 concurrent steps', run); - }, - { time: 60000, iterations: 1, warmupIterations: 0, teardown } - ); - - // TODO: Re-enable after performance optimizations (see beads issue wrk-fyx) - bench.skip( - 'stress test: Promise.race with 1000 concurrent steps', - async () => { - const { runId } = await triggerWorkflow( - 'promiseRaceStressTestLargeWorkflow', - [1000] - ); - const { run } = await getWorkflowReturnValue(runId); - stageTiming('stress test: Promise.race with 1000 concurrent steps', run); - }, - { time: 120000, iterations: 1, warmupIterations: 0, teardown } - ); + } + } }); diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index eec3d9588..cf38e3bcf 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -1,17 +1,10 @@ -import { waitUntil } from '@vercel/functions'; import { - FatalError, - RetryableError, - WorkflowAPIError, WorkflowRunCancelledError, WorkflowRunFailedError, WorkflowRunNotCompletedError, - WorkflowRuntimeError, } from '@workflow/errors'; -import { getPort } from '@workflow/utils/get-port'; import { type Event, - StepInvokePayloadSchema, WorkflowInvokePayloadSchema, type WorkflowRun, type WorkflowRunStatus, @@ -20,27 +13,21 @@ import { import { WorkflowSuspension } from './global.js'; import { runtimeLogger } from './logger.js'; import { parseWorkflowName } from './parse-name.js'; -import { getStepFunction } from './private.js'; +import { + getAllWorkflowRunEvents, + getQueueOverhead, + withHealthCheck, +} from './runtime/helpers.js'; +import { handleSuspension } from './runtime/suspension-handler.js'; import { getWorld, getWorldHandlers } from './runtime/world.js'; import type { Serializable } from './schemas.js'; import { - dehydrateStepArguments, - dehydrateStepReturnValue, getExternalRevivers, - hydrateStepArguments, hydrateWorkflowReturnValue, } from './serialization.js'; import { remapErrorStack } from './source-map.js'; -// TODO: move step handler out to a separate file -import { contextStorage } from './step/context-storage.js'; import * as Attribute from './telemetry/semantic-conventions.js'; -import { - getSpanKind, - linkToCurrentContext, - serializeTraceCarrier, - trace, - withTraceContext, -} from './telemetry.js'; +import { linkToCurrentContext, trace, withTraceContext } from './telemetry.js'; import { getErrorName, getErrorStack } from './types.js'; import { buildWorkflowSuspensionMessage, @@ -48,8 +35,6 @@ import { } from './util.js'; import { runWorkflow } from './workflow.js'; -const DEFAULT_STEP_MAX_RETRIES = 3; - export type { Event, WorkflowRun }; export { WorkflowSuspension } from './global.js'; export { @@ -66,6 +51,8 @@ export { setWorld, } from './runtime/world.js'; +export { stepEntrypoint } from './runtime/step-handler.js'; + /** * Options for configuring a workflow's readable stream. 
*/ @@ -235,54 +222,6 @@ export function getRun(runId: string): Run { return new Run(runId); } -/** - * Loads all workflow run events by iterating through all pages of paginated results. - * This ensures that *all* events are loaded into memory before running the workflow. - * Events must be in chronological order (ascending) for proper workflow replay. - */ -async function getAllWorkflowRunEvents(runId: string): Promise { - const allEvents: Event[] = []; - let cursor: string | null = null; - let hasMore = true; - - const world = getWorld(); - while (hasMore) { - const response = await world.events.list({ - runId, - pagination: { - sortOrder: 'asc', // Required: events must be in chronological order for replay - cursor: cursor ?? undefined, - }, - }); - - allEvents.push(...response.data); - hasMore = response.hasMore; - cursor = response.cursor; - } - - return allEvents; -} - -/** - * Wraps a request/response handler and adds a health check "mode" - * based on the presence of a `__health` query parameter. - */ -function withHealthCheck( - handler: (req: Request) => Promise -): (req: Request) => Promise { - return async (req) => { - const url = new URL(req.url); - const isHealthCheck = url.searchParams.has('__health'); - if (isHealthCheck) { - return new Response( - `Workflow DevKit "${url.pathname}" endpoint is healthy`, - { status: 200, headers: { 'Content-Type': 'text/plain' } } - ); - } - return await handler(req); - }; -} - /** * Function that creates a single route which handles any workflow execution * request and routes to the appropriate workflow function. @@ -419,153 +358,20 @@ export function workflowEntrypoint( err.waitCount ); if (suspensionMessage) { - // Note: suspensionMessage logged only in debug mode to avoid production noise - // console.debug(suspensionMessage); - } - // Process each operation in the queue (steps and hooks) - let minTimeoutSeconds: number | null = null; - for (const queueItem of err.steps) { - if (queueItem.type === 'step') { - // Handle step operations - const ops: Promise[] = []; - const dehydratedInput = dehydrateStepArguments( - { - args: queueItem.args, - closureVars: queueItem.closureVars, - }, - err.globalThis - ); - - try { - const step = await world.steps.create(runId, { - stepId: queueItem.correlationId, - stepName: queueItem.stepName, - input: dehydratedInput as Serializable, - }); - - waitUntil( - Promise.all(ops).catch((err) => { - // Ignore expected client disconnect errors (e.g., browser refresh during streaming) - const isAbortError = - err?.name === 'AbortError' || - err?.name === 'ResponseAborted'; - if (!isAbortError) throw err; - }) - ); - - await queueMessage( - world, - `__wkf_step_${queueItem.stepName}`, - { - workflowName, - workflowRunId: runId, - workflowStartedAt, - stepId: step.stepId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }, - { - idempotencyKey: queueItem.correlationId, - } - ); - } catch (err) { - if (WorkflowAPIError.is(err) && err.status === 409) { - // Step already exists, so we can skip it - console.warn( - `Step "${queueItem.stepName}" with correlation ID "${queueItem.correlationId}" already exists, skipping: ${err.message}` - ); - continue; - } - throw err; - } - } else if (queueItem.type === 'hook') { - // Handle hook operations - try { - // Create hook in database - const hookMetadata = - typeof queueItem.metadata === 'undefined' - ? 
undefined - : dehydrateStepArguments( - queueItem.metadata, - err.globalThis - ); - await world.hooks.create(runId, { - hookId: queueItem.correlationId, - token: queueItem.token, - metadata: hookMetadata, - }); - - // Create hook_created event in event log - await world.events.create(runId, { - eventType: 'hook_created', - correlationId: queueItem.correlationId, - }); - } catch (err) { - if (WorkflowAPIError.is(err)) { - if (err.status === 409) { - // Hook already exists (duplicate hook_id constraint), so we can skip it - console.warn( - `Hook with correlation ID "${queueItem.correlationId}" already exists, skipping: ${err.message}` - ); - continue; - } else if (err.status === 410) { - // Workflow has already completed, so no-op - console.warn( - `Workflow run "${runId}" has already completed, skipping hook "${queueItem.correlationId}": ${err.message}` - ); - continue; - } - } - throw err; - } - } else if (queueItem.type === 'wait') { - // Handle wait operations - try { - // Only create wait_created event if it hasn't been created yet - if (!queueItem.hasCreatedEvent) { - await world.events.create(runId, { - eventType: 'wait_created', - correlationId: queueItem.correlationId, - eventData: { - resumeAt: queueItem.resumeAt, - }, - }); - } - - // Calculate how long to wait before resuming - const now = Date.now(); - const resumeAtMs = queueItem.resumeAt.getTime(); - const delayMs = Math.max(1000, resumeAtMs - now); - const timeoutSeconds = Math.ceil(delayMs / 1000); - - // Track the minimum timeout across all waits - if ( - minTimeoutSeconds === null || - timeoutSeconds < minTimeoutSeconds - ) { - minTimeoutSeconds = timeoutSeconds; - } - } catch (err) { - if (WorkflowAPIError.is(err) && err.status === 409) { - // Wait already exists, so we can skip it - console.warn( - `Wait with correlation ID "${queueItem.correlationId}" already exists, skipping: ${err.message}` - ); - continue; - } - throw err; - } - } + runtimeLogger.debug(suspensionMessage); } - span?.setAttributes({ - ...Attribute.WorkflowRunStatus('pending_steps'), - ...Attribute.WorkflowStepsCreated(err.steps.length), + const result = await handleSuspension({ + suspension: err, + world, + runId, + workflowName, + workflowStartedAt, + span, }); - // If we encountered any waits, return the minimum timeout - if (minTimeoutSeconds !== null) { - return { timeoutSeconds: minTimeoutSeconds }; + if (result.timeoutSeconds !== undefined) { + return { timeoutSeconds: result.timeoutSeconds }; } } else { const errorName = getErrorName(err); @@ -611,439 +417,6 @@ export function workflowEntrypoint( return withHealthCheck(handler); } -const stepHandler = getWorldHandlers().createQueueHandler( - '__wkf_step_', - async (message_, metadata) => { - const { - workflowName, - workflowRunId, - workflowStartedAt, - stepId, - traceCarrier: traceContext, - requestedAt, - } = StepInvokePayloadSchema.parse(message_); - const spanLinks = await linkToCurrentContext(); - // Execute step within the propagated trace context - return await withTraceContext(traceContext, async () => { - // Extract the step name from the topic name - const stepName = metadata.queueName.slice('__wkf_step_'.length); - const world = getWorld(); - - // Get the port early to avoid async operations during step execution - const port = await getPort(); - - return trace( - `STEP ${stepName}`, - { kind: await getSpanKind('CONSUMER'), links: spanLinks }, - async (span) => { - span?.setAttributes({ - ...Attribute.StepName(stepName), - ...Attribute.StepAttempt(metadata.attempt), - 
...Attribute.QueueName(metadata.queueName), - ...Attribute.QueueMessageId(metadata.messageId), - ...getQueueOverhead({ requestedAt }), - }); - - const stepFn = getStepFunction(stepName); - if (!stepFn) { - throw new Error(`Step "${stepName}" not found`); - } - if (typeof stepFn !== 'function') { - throw new Error( - `Step "${stepName}" is not a function (got ${typeof stepFn})` - ); - } - - const maxRetries = stepFn.maxRetries ?? DEFAULT_STEP_MAX_RETRIES; - - span?.setAttributes({ - ...Attribute.WorkflowName(workflowName), - ...Attribute.WorkflowRunId(workflowRunId), - ...Attribute.StepId(stepId), - ...Attribute.StepMaxRetries(maxRetries), - ...Attribute.StepTracePropagated(!!traceContext), - }); - - let step = await world.steps.get(workflowRunId, stepId); - - runtimeLogger.debug('Step execution details', { - stepName, - stepId: step.stepId, - status: step.status, - attempt: step.attempt, - }); - - span?.setAttributes({ - ...Attribute.StepStatus(step.status), - }); - - // Check if the step has a `retryAfter` timestamp that hasn't been reached yet - const now = Date.now(); - if (step.retryAfter && step.retryAfter.getTime() > now) { - const timeoutSeconds = Math.ceil( - (step.retryAfter.getTime() - now) / 1000 - ); - span?.setAttributes({ - ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), - }); - runtimeLogger.debug('Step retryAfter timestamp not yet reached', { - stepName, - stepId: step.stepId, - retryAfter: step.retryAfter, - timeoutSeconds, - }); - return { timeoutSeconds }; - } - - let result: unknown; - const attempt = step.attempt + 1; - - // Check max retries FIRST before any state changes. - // This handles edge cases where the step handler is invoked after max retries have been exceeded - // (e.g., when the step repeatedly times out or fails before reaching the catch handler at line 822). - // Without this check, the step would retry forever. - if (attempt > maxRetries) { - const errorMessage = `Step "${stepName}" exceeded max retries (${attempt} attempts)`; - console.error(`[Workflows] "${workflowRunId}" - ${errorMessage}`); - // Update step status first (idempotent), then create event - await world.steps.update(workflowRunId, stepId, { - status: 'failed', - error: { - message: errorMessage, - stack: undefined, - }, - }); - await world.events.create(workflowRunId, { - eventType: 'step_failed', - correlationId: stepId, - eventData: { - error: errorMessage, - fatal: true, - }, - }); - - span?.setAttributes({ - ...Attribute.StepStatus('failed'), - ...Attribute.StepRetryExhausted(true), - }); - - // Re-invoke the workflow to handle the failed step - await queueMessage(world, `__wkf_workflow_${workflowName}`, { - runId: workflowRunId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }); - return; - } - - try { - if (!['pending', 'running'].includes(step.status)) { - // We should only be running the step if it's either - // a) pending - initial state, or state set on re-try - // b) running - if a step fails mid-execution, like a function timeout - // otherwise, the step has been invoked erroneously - console.error( - `[Workflows] "${workflowRunId}" - Step invoked erroneously, expected status "pending" or "running", got "${step.status}" instead, skipping execution` - ); - span?.setAttributes({ - ...Attribute.StepSkipped(true), - ...Attribute.StepSkipReason(step.status), - }); - // There's a chance that a step terminates correctly, but the underlying process - // fails or gets killed before the stepEntrypoint has a chance to re-enqueue the run. 
- // The queue lease expires and stepEntrypoint again, which leads us here, so - // we optimistically re-enqueue the workflow if the step is in a terminal state, - // under the assumption that this edge case happened. - // Until we move to atomic entity/event updates (World V2), there _could_ be an edge case - // where the we execute this code based on the `step` entity status, but the runtime - // failed to create the `step_completed` event (due to failing between step and event update), - // in which case, this might lead to an infinite loop. - // https://vercel.slack.com/archives/C09125LC4AX/p1765313809066679 - const isTerminalStep = [ - 'completed', - 'failed', - 'cancelled', - ].includes(step.status); - if (isTerminalStep) { - await queueMessage(world, `__wkf_workflow_${workflowName}`, { - runId: workflowRunId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }); - } - return; - } - - await world.events.create(workflowRunId, { - eventType: 'step_started', // TODO: Replace with 'step_retrying' when appropriate - correlationId: stepId, - }); - - step = await world.steps.update(workflowRunId, stepId, { - attempt, - status: 'running', - }); - - if (!step.startedAt) { - throw new WorkflowRuntimeError( - `Step "${stepId}" has no "startedAt" timestamp` - ); - } - // Hydrate the step input arguments and closure variables - const ops: Promise[] = []; - const hydratedInput = hydrateStepArguments( - step.input, - ops, - workflowRunId - ); - - const args = hydratedInput.args; - - span?.setAttributes({ - ...Attribute.StepArgumentsCount(args.length), - }); - - result = await contextStorage.run( - { - stepMetadata: { - stepId, - stepStartedAt: new Date(+step.startedAt), - attempt, - }, - workflowMetadata: { - workflowRunId, - workflowStartedAt: new Date(+workflowStartedAt), - // TODO: there should be a getUrl method on the world interface itself. This - // solution only works for vercel + local worlds. - url: process.env.VERCEL_URL - ? `https://${process.env.VERCEL_URL}` - : `http://localhost:${port ?? 3000}`, - }, - ops, - closureVars: hydratedInput.closureVars, - }, - () => stepFn.apply(null, args) - ); - - // NOTE: None of the code from this point is guaranteed to run - // Since the step might fail or cause a function timeout and the process might be SIGKILL'd - // The workflow runtime must be resilient to the below code not executing on a failed step - result = dehydrateStepReturnValue(result, ops, workflowRunId); - - waitUntil( - Promise.all(ops).catch((err) => { - // Ignore expected client disconnect errors (e.g., browser refresh during streaming) - const isAbortError = - err?.name === 'AbortError' || err?.name === 'ResponseAborted'; - if (!isAbortError) throw err; - }) - ); - - // Mark the step as completed first. This order is important. 
If a concurrent - // execution marked the step as complete, this request should throw, and - // this prevent the step_completed event in the event log - // TODO: this should really be atomic and handled by the world - await world.steps.update(workflowRunId, stepId, { - status: 'completed', - output: result as Serializable, - }); - - // Then, append the event log with the step result - await world.events.create(workflowRunId, { - eventType: 'step_completed', - correlationId: stepId, - eventData: { - result: result as Serializable, - }, - }); - - span?.setAttributes({ - ...Attribute.StepStatus('completed'), - ...Attribute.StepResultType(typeof result), - }); - } catch (err: unknown) { - span?.setAttributes({ - ...Attribute.StepErrorName(getErrorName(err)), - ...Attribute.StepErrorMessage(String(err)), - }); - - if (WorkflowAPIError.is(err)) { - if (err.status === 410) { - // Workflow has already completed, so no-op - console.warn( - `Workflow run "${workflowRunId}" has already completed, skipping step "${stepId}": ${err.message}` - ); - return; - } - } - - if (FatalError.is(err)) { - const errorStack = getErrorStack(err); - const stackLines = errorStack.split('\n').slice(0, 4); - console.error( - `[Workflows] "${workflowRunId}" - Encountered \`FatalError\` while executing step "${stepName}":\n > ${stackLines.join('\n > ')}\n\nBubbling up error to parent workflow` - ); - // Fatal error - store the error in the event log and re-invoke the workflow - await world.events.create(workflowRunId, { - eventType: 'step_failed', - correlationId: stepId, - eventData: { - error: String(err), - stack: errorStack, - fatal: true, - }, - }); - await world.steps.update(workflowRunId, stepId, { - status: 'failed', - error: { - message: err.message || String(err), - stack: errorStack, - // TODO: include error codes when we define them - }, - }); - - span?.setAttributes({ - ...Attribute.StepStatus('failed'), - ...Attribute.StepFatalError(true), - }); - } else { - const maxRetries = stepFn.maxRetries ?? 
DEFAULT_STEP_MAX_RETRIES; - - span?.setAttributes({ - ...Attribute.StepAttempt(attempt), - ...Attribute.StepMaxRetries(maxRetries), - }); - - if (attempt > maxRetries) { - // Max retries reached - const errorStack = getErrorStack(err); - const stackLines = errorStack.split('\n').slice(0, 4); - console.error( - `[Workflows] "${workflowRunId}" - Encountered \`Error\` while executing step "${stepName}" (attempt ${attempt}):\n > ${stackLines.join('\n > ')}\n\n Max retries reached\n Bubbling error to parent workflow` - ); - const errorMessage = `Step "${stepName}" failed after max retries: ${String(err)}`; - await world.events.create(workflowRunId, { - eventType: 'step_failed', - correlationId: stepId, - eventData: { - error: errorMessage, - stack: errorStack, - fatal: true, - }, - }); - await world.steps.update(workflowRunId, stepId, { - status: 'failed', - error: { - message: errorMessage, - stack: errorStack, - }, - }); - - span?.setAttributes({ - ...Attribute.StepStatus('failed'), - ...Attribute.StepRetryExhausted(true), - }); - } else { - // Not at max retries yet - log as a retryable error - if (RetryableError.is(err)) { - console.warn( - `[Workflows] "${workflowRunId}" - Encountered \`RetryableError\` while executing step "${stepName}" (attempt ${attempt}):\n > ${String(err.message)}\n\n This step has failed but will be retried` - ); - } else { - const stackLines = getErrorStack(err).split('\n').slice(0, 4); - console.error( - `[Workflows] "${workflowRunId}" - Encountered \`Error\` while executing step "${stepName}" (attempt ${attempt}):\n > ${stackLines.join('\n > ')}\n\n This step has failed but will be retried` - ); - } - await world.events.create(workflowRunId, { - eventType: 'step_failed', - correlationId: stepId, - eventData: { - error: String(err), - stack: getErrorStack(err), - }, - }); - - await world.steps.update(workflowRunId, stepId, { - status: 'pending', // TODO: Should be "retrying" once we have that status - ...(RetryableError.is(err) && { - retryAfter: err.retryAfter, - }), - }); - - const timeoutSeconds = Math.max( - 1, - RetryableError.is(err) - ? Math.ceil((+err.retryAfter.getTime() - Date.now()) / 1000) - : 1 - ); - - span?.setAttributes({ - ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), - ...Attribute.StepRetryWillRetry(true), - }); - - // It's a retryable error - so have the queue keep the message visible - // so that it gets retried. - return { timeoutSeconds }; - } - } - } - - await queueMessage(world, `__wkf_workflow_${workflowName}`, { - runId: workflowRunId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }); - } - ); - }); - } -); - -/** - * A single route that handles any step execution request and routes to the - * appropriate step function. We may eventually want to create different bundles - * for each step, this is temporary. - */ -export const stepEntrypoint: (req: Request) => Promise = - /* @__PURE__ */ withHealthCheck(stepHandler); - -/** - * Queues a message to the specified queue with tracing. - */ -async function queueMessage( - world: World, - ...args: Parameters -) { - const queueName = args[0]; - await trace( - 'queueMessage', - { - attributes: Attribute.QueueName(queueName), - kind: await getSpanKind('PRODUCER'), - }, - async (span) => { - const { messageId } = await world.queue(...args); - span?.setAttributes(Attribute.QueueMessageId(messageId)); - } - ); -} - -/** - * Calculates the queue overhead time in milliseconds for a given message. 
- */ -function getQueueOverhead(message: { requestedAt?: Date }) { - if (!message.requestedAt) return; - try { - return Attribute.QueueOverheadMs( - Date.now() - message.requestedAt.getTime() - ); - } catch { - return; - } -} - // this is a no-op placeholder as the client is // expecting this to be present but we aren't actually using it export function runStep() {} diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts new file mode 100644 index 000000000..25b510c65 --- /dev/null +++ b/packages/core/src/runtime/helpers.ts @@ -0,0 +1,90 @@ +import type { Event, World } from '@workflow/world'; +import * as Attribute from '../telemetry/semantic-conventions.js'; +import { getSpanKind, trace } from '../telemetry.js'; +import { getWorld } from './world.js'; + +/** + * Loads all workflow run events by iterating through all pages of paginated results. + * This ensures that *all* events are loaded into memory before running the workflow. + * Events must be in chronological order (ascending) for proper workflow replay. + */ +export async function getAllWorkflowRunEvents(runId: string): Promise { + const allEvents: Event[] = []; + let cursor: string | null = null; + let hasMore = true; + + const world = getWorld(); + while (hasMore) { + // TODO: we're currently loading all the data with resolveRef behaviour. We need to update this + // to lazyload the data from the world instead so that we can optimize and make the event log loading + // much faster and memory efficient + const response = await world.events.list({ + runId, + pagination: { + sortOrder: 'asc', // Required: events must be in chronological order for replay + cursor: cursor ?? undefined, + }, + }); + + allEvents.push(...response.data); + hasMore = response.hasMore; + cursor = response.cursor; + } + + return allEvents; +} + +/** + * Wraps a request/response handler and adds a health check "mode" + * based on the presence of a `__health` query parameter. + */ +export function withHealthCheck( + handler: (req: Request) => Promise +): (req: Request) => Promise { + return async (req: Request) => { + const url = new URL(req.url); + const isHealthCheck = url.searchParams.has('__health'); + if (isHealthCheck) { + return new Response( + `Workflow DevKit "${url.pathname}" endpoint is healthy`, + { status: 200, headers: { 'Content-Type': 'text/plain' } } + ); + } + return await handler(req); + }; +} + +/** + * Queues a message to the specified queue with tracing. + */ +export async function queueMessage( + world: World, + ...args: Parameters +) { + const queueName = args[0]; + await trace( + 'queueMessage', + { + attributes: Attribute.QueueName(queueName), + kind: await getSpanKind('PRODUCER'), + }, + async (span) => { + const { messageId } = await world.queue(...args); + span?.setAttributes(Attribute.QueueMessageId(messageId)); + } + ); +} + +/** + * Calculates the queue overhead time in milliseconds for a given message. 
+ */ +export function getQueueOverhead(message: { requestedAt?: Date }) { + if (!message.requestedAt) return; + try { + return Attribute.QueueOverheadMs( + Date.now() - message.requestedAt.getTime() + ); + } catch { + return; + } +} diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts new file mode 100644 index 000000000..5390ccada --- /dev/null +++ b/packages/core/src/runtime/step-handler.ts @@ -0,0 +1,428 @@ +import { waitUntil } from '@vercel/functions'; +import { + FatalError, + RetryableError, + WorkflowAPIError, + WorkflowRuntimeError, +} from '@workflow/errors'; +import { getPort } from '@workflow/utils/get-port'; +import { StepInvokePayloadSchema } from '@workflow/world'; +import { runtimeLogger } from '../logger.js'; +import { getStepFunction } from '../private.js'; +import type { Serializable } from '../schemas.js'; +import { + dehydrateStepReturnValue, + hydrateStepArguments, +} from '../serialization.js'; +import { contextStorage } from '../step/context-storage.js'; +import * as Attribute from '../telemetry/semantic-conventions.js'; +import { + getSpanKind, + linkToCurrentContext, + serializeTraceCarrier, + trace, + withTraceContext, +} from '../telemetry.js'; +import { getErrorName, getErrorStack } from '../types.js'; +import { getQueueOverhead, queueMessage, withHealthCheck } from './helpers.js'; +import { getWorld, getWorldHandlers } from './world.js'; + +const DEFAULT_STEP_MAX_RETRIES = 3; + +const stepHandler = getWorldHandlers().createQueueHandler( + '__wkf_step_', + async (message_, metadata) => { + const { + workflowName, + workflowRunId, + workflowStartedAt, + stepId, + traceCarrier: traceContext, + requestedAt, + } = StepInvokePayloadSchema.parse(message_); + const spanLinks = await linkToCurrentContext(); + // Execute step within the propagated trace context + return await withTraceContext(traceContext, async () => { + // Extract the step name from the topic name + const stepName = metadata.queueName.slice('__wkf_step_'.length); + const world = getWorld(); + + // Get the port early to avoid async operations during step execution + const port = await getPort(); + + return trace( + `STEP ${stepName}`, + { kind: await getSpanKind('CONSUMER'), links: spanLinks }, + async (span) => { + span?.setAttributes({ + ...Attribute.StepName(stepName), + ...Attribute.StepAttempt(metadata.attempt), + ...Attribute.QueueName(metadata.queueName), + ...Attribute.QueueMessageId(metadata.messageId), + ...getQueueOverhead({ requestedAt }), + }); + + const stepFn = getStepFunction(stepName); + if (!stepFn) { + throw new Error(`Step "${stepName}" not found`); + } + if (typeof stepFn !== 'function') { + throw new Error( + `Step "${stepName}" is not a function (got ${typeof stepFn})` + ); + } + + const maxRetries = stepFn.maxRetries ?? 
DEFAULT_STEP_MAX_RETRIES; + + span?.setAttributes({ + ...Attribute.WorkflowName(workflowName), + ...Attribute.WorkflowRunId(workflowRunId), + ...Attribute.StepId(stepId), + ...Attribute.StepMaxRetries(maxRetries), + ...Attribute.StepTracePropagated(!!traceContext), + }); + + let step = await world.steps.get(workflowRunId, stepId); + + runtimeLogger.debug('Step execution details', { + stepName, + stepId: step.stepId, + status: step.status, + attempt: step.attempt, + }); + + span?.setAttributes({ + ...Attribute.StepStatus(step.status), + }); + + // Check if the step has a `retryAfter` timestamp that hasn't been reached yet + const now = Date.now(); + if (step.retryAfter && step.retryAfter.getTime() > now) { + const timeoutSeconds = Math.ceil( + (step.retryAfter.getTime() - now) / 1000 + ); + span?.setAttributes({ + ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), + }); + runtimeLogger.debug('Step retryAfter timestamp not yet reached', { + stepName, + stepId: step.stepId, + retryAfter: step.retryAfter, + timeoutSeconds, + }); + return { timeoutSeconds }; + } + + let result: unknown; + const attempt = step.attempt + 1; + + // Check max retries FIRST before any state changes. + // This handles edge cases where the step handler is invoked after max retries have been exceeded + // (e.g., when the step repeatedly times out or fails before reaching the catch handler at line 822). + // Without this check, the step would retry forever. + if (attempt > maxRetries) { + const errorMessage = `Step "${stepName}" exceeded max retries (${attempt} attempts)`; + console.error(`[Workflows] "${workflowRunId}" - ${errorMessage}`); + // Update step status first (idempotent), then create event + await world.steps.update(workflowRunId, stepId, { + status: 'failed', + error: { + message: errorMessage, + stack: undefined, + }, + }); + await world.events.create(workflowRunId, { + eventType: 'step_failed', + correlationId: stepId, + eventData: { + error: errorMessage, + fatal: true, + }, + }); + + span?.setAttributes({ + ...Attribute.StepStatus('failed'), + ...Attribute.StepRetryExhausted(true), + }); + + // Re-invoke the workflow to handle the failed step + await queueMessage(world, `__wkf_workflow_${workflowName}`, { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + }); + return; + } + + try { + if (!['pending', 'running'].includes(step.status)) { + // We should only be running the step if it's either + // a) pending - initial state, or state set on re-try + // b) running - if a step fails mid-execution, like a function timeout + // otherwise, the step has been invoked erroneously + console.error( + `[Workflows] "${workflowRunId}" - Step invoked erroneously, expected status "pending" or "running", got "${step.status}" instead, skipping execution` + ); + span?.setAttributes({ + ...Attribute.StepSkipped(true), + ...Attribute.StepSkipReason(step.status), + }); + // There's a chance that a step terminates correctly, but the underlying process + // fails or gets killed before the stepEntrypoint has a chance to re-enqueue the run. + // The queue lease expires and stepEntrypoint again, which leads us here, so + // we optimistically re-enqueue the workflow if the step is in a terminal state, + // under the assumption that this edge case happened. 
+ // Until we move to atomic entity/event updates (World V2), there _could_ be an edge case + // where the we execute this code based on the `step` entity status, but the runtime + // failed to create the `step_completed` event (due to failing between step and event update), + // in which case, this might lead to an infinite loop. + // https://vercel.slack.com/archives/C09125LC4AX/p1765313809066679 + const isTerminalStep = [ + 'completed', + 'failed', + 'cancelled', + ].includes(step.status); + if (isTerminalStep) { + await queueMessage(world, `__wkf_workflow_${workflowName}`, { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + }); + } + return; + } + + await world.events.create(workflowRunId, { + eventType: 'step_started', // TODO: Replace with 'step_retrying' + correlationId: stepId, + }); + + step = await world.steps.update(workflowRunId, stepId, { + attempt, + status: 'running', + }); + + if (!step.startedAt) { + throw new WorkflowRuntimeError( + `Step "${stepId}" has no "startedAt" timestamp` + ); + } + // Hydrate the step input arguments and closure variables + const ops: Promise[] = []; + const hydratedInput = hydrateStepArguments( + step.input, + ops, + workflowRunId + ); + + const args = hydratedInput.args; + + span?.setAttributes({ + ...Attribute.StepArgumentsCount(args.length), + }); + + result = await contextStorage.run( + { + stepMetadata: { + stepId, + stepStartedAt: new Date(+step.startedAt), + attempt, + }, + workflowMetadata: { + workflowRunId, + workflowStartedAt: new Date(+workflowStartedAt), + // TODO: there should be a getUrl method on the world interface itself. This + // solution only works for vercel + local worlds. + url: process.env.VERCEL_URL + ? `https://${process.env.VERCEL_URL}` + : `http://localhost:${port ?? 3000}`, + }, + ops, + closureVars: hydratedInput.closureVars, + }, + () => stepFn.apply(null, args) + ); + + // NOTE: None of the code from this point is guaranteed to run + // Since the step might fail or cause a function timeout and the process might be SIGKILL'd + // The workflow runtime must be resilient to the below code not executing on a failed step + result = dehydrateStepReturnValue(result, ops, workflowRunId); + + waitUntil( + Promise.all(ops).catch((err) => { + // Ignore expected client disconnect errors (e.g., browser refresh during streaming) + const isAbortError = + err?.name === 'AbortError' || err?.name === 'ResponseAborted'; + if (!isAbortError) throw err; + }) + ); + + // Mark the step as completed first. This order is important. 
If a concurrent + // execution marked the step as complete, this request should throw, and + // this prevent the step_completed event in the event log + // TODO: this should really be atomic and handled by the world + await world.steps.update(workflowRunId, stepId, { + status: 'completed', + output: result as Serializable, + }); + + // Then, append the event log with the step result + await world.events.create(workflowRunId, { + eventType: 'step_completed', + correlationId: stepId, + eventData: { + result: result as Serializable, + }, + }); + + span?.setAttributes({ + ...Attribute.StepStatus('completed'), + ...Attribute.StepResultType(typeof result), + }); + } catch (err: unknown) { + span?.setAttributes({ + ...Attribute.StepErrorName(getErrorName(err)), + ...Attribute.StepErrorMessage(String(err)), + }); + + if (WorkflowAPIError.is(err)) { + if (err.status === 410) { + // Workflow has already completed, so no-op + console.warn( + `Workflow run "${workflowRunId}" has already completed, skipping step "${stepId}": ${err.message}` + ); + return; + } + } + + if (FatalError.is(err)) { + const errorStack = getErrorStack(err); + const stackLines = errorStack.split('\n').slice(0, 4); + console.error( + `[Workflows] "${workflowRunId}" - Encountered \`FatalError\` while executing step "${stepName}":\n > ${stackLines.join('\n > ')}\n\nBubbling up error to parent workflow` + ); + // Fatal error - store the error in the event log and re-invoke the workflow + await world.events.create(workflowRunId, { + eventType: 'step_failed', + correlationId: stepId, + eventData: { + error: String(err), + stack: errorStack, + fatal: true, + }, + }); + await world.steps.update(workflowRunId, stepId, { + status: 'failed', + error: { + message: err.message || String(err), + stack: errorStack, + // TODO: include error codes when we define them + }, + }); + + span?.setAttributes({ + ...Attribute.StepStatus('failed'), + ...Attribute.StepFatalError(true), + }); + } else { + const maxRetries = stepFn.maxRetries ?? 
DEFAULT_STEP_MAX_RETRIES; + + span?.setAttributes({ + ...Attribute.StepAttempt(attempt), + ...Attribute.StepMaxRetries(maxRetries), + }); + + if (attempt > maxRetries) { + // Max retries reached + const errorStack = getErrorStack(err); + const stackLines = errorStack.split('\n').slice(0, 4); + console.error( + `[Workflows] "${workflowRunId}" - Encountered \`Error\` while executing step "${stepName}" (attempt ${attempt}):\n > ${stackLines.join('\n > ')}\n\n Max retries reached\n Bubbling error to parent workflow` + ); + const errorMessage = `Step "${stepName}" failed after max retries: ${String(err)}`; + await world.events.create(workflowRunId, { + eventType: 'step_failed', + correlationId: stepId, + eventData: { + error: errorMessage, + stack: errorStack, + fatal: true, + }, + }); + await world.steps.update(workflowRunId, stepId, { + status: 'failed', + error: { + message: errorMessage, + stack: errorStack, + }, + }); + + span?.setAttributes({ + ...Attribute.StepStatus('failed'), + ...Attribute.StepRetryExhausted(true), + }); + } else { + // Not at max retries yet - log as a retryable error + if (RetryableError.is(err)) { + console.warn( + `[Workflows] "${workflowRunId}" - Encountered \`RetryableError\` while executing step "${stepName}" (attempt ${attempt}):\n > ${String(err.message)}\n\n This step has failed but will be retried` + ); + } else { + const stackLines = getErrorStack(err).split('\n').slice(0, 4); + console.error( + `[Workflows] "${workflowRunId}" - Encountered \`Error\` while executing step "${stepName}" (attempt ${attempt}):\n > ${stackLines.join('\n > ')}\n\n This step has failed but will be retried` + ); + } + await world.events.create(workflowRunId, { + eventType: 'step_failed', + correlationId: stepId, + eventData: { + error: String(err), + stack: getErrorStack(err), + }, + }); + + await world.steps.update(workflowRunId, stepId, { + status: 'pending', // TODO: Should be "retrying" once we have that status + ...(RetryableError.is(err) && { + retryAfter: err.retryAfter, + }), + }); + + const timeoutSeconds = Math.max( + 1, + RetryableError.is(err) + ? Math.ceil((+err.retryAfter.getTime() - Date.now()) / 1000) + : 1 + ); + + span?.setAttributes({ + ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), + ...Attribute.StepRetryWillRetry(true), + }); + + // It's a retryable error - so have the queue keep the message visible + // so that it gets retried. + return { timeoutSeconds }; + } + } + } + + await queueMessage(world, `__wkf_workflow_${workflowName}`, { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + }); + } + ); + }); + } +); + +/** + * A single route that handles any step execution request and routes to the + * appropriate step function. We may eventually want to create different bundles + * for each step, this is temporary. 
+ */ +export const stepEntrypoint: (req: Request) => Promise = + /* @__PURE__ */ withHealthCheck(stepHandler); diff --git a/packages/core/src/runtime/suspension-handler.ts b/packages/core/src/runtime/suspension-handler.ts new file mode 100644 index 000000000..a2a8e3930 --- /dev/null +++ b/packages/core/src/runtime/suspension-handler.ts @@ -0,0 +1,282 @@ +import { waitUntil } from '@vercel/functions'; +import { WorkflowAPIError } from '@workflow/errors'; +import type { World } from '@workflow/world'; +import type { + HookInvocationQueueItem, + StepInvocationQueueItem, + WaitInvocationQueueItem, + WorkflowSuspension, +} from '../global.js'; +import type { Serializable } from '../schemas.js'; +import { dehydrateStepArguments } from '../serialization.js'; +import * as Attribute from '../telemetry/semantic-conventions.js'; +import { serializeTraceCarrier } from '../telemetry.js'; +import type { Span } from '@opentelemetry/api'; +import { queueMessage } from './helpers.js'; + +export interface SuspensionHandlerParams { + suspension: WorkflowSuspension; + world: World; + runId: string; + workflowName: string; + workflowStartedAt: number; + span?: Span; +} + +export interface SuspensionHandlerResult { + timeoutSeconds?: number; +} + +interface ProcessHookParams { + queueItem: HookInvocationQueueItem; + world: World; + runId: string; + globalThis: typeof globalThis; +} + +/** + * Processes a single hook by creating it in the database and event log. + */ +async function processHook({ + queueItem, + world, + runId, + globalThis, +}: ProcessHookParams): Promise { + try { + // Create hook in database + const hookMetadata = + typeof queueItem.metadata === 'undefined' + ? undefined + : dehydrateStepArguments(queueItem.metadata, globalThis); + await world.hooks.create(runId, { + hookId: queueItem.correlationId, + token: queueItem.token, + metadata: hookMetadata, + }); + + // Create hook_created event in event log + await world.events.create(runId, { + eventType: 'hook_created', + correlationId: queueItem.correlationId, + }); + } catch (err) { + if (WorkflowAPIError.is(err)) { + if (err.status === 409) { + // Hook already exists (duplicate hook_id constraint), so we can skip it + console.warn( + `Hook with correlation ID "${queueItem.correlationId}" already exists, skipping: ${err.message}` + ); + return; + } else if (err.status === 410) { + // Workflow has already completed, so no-op + console.warn( + `Workflow run "${runId}" has already completed, skipping hook "${queueItem.correlationId}": ${err.message}` + ); + return; + } + } + throw err; + } +} + +interface ProcessStepParams { + queueItem: StepInvocationQueueItem; + world: World; + runId: string; + workflowName: string; + workflowStartedAt: number; + globalThis: typeof globalThis; +} + +/** + * Processes a single step by creating it in the database and queueing execution. 
+ */ +async function processStep({ + queueItem, + world, + runId, + workflowName, + workflowStartedAt, + globalThis, +}: ProcessStepParams): Promise { + const ops: Promise[] = []; + const dehydratedInput = dehydrateStepArguments( + { + args: queueItem.args, + closureVars: queueItem.closureVars, + }, + globalThis + ); + + try { + const step = await world.steps.create(runId, { + stepId: queueItem.correlationId, + stepName: queueItem.stepName, + input: dehydratedInput as Serializable, + }); + + waitUntil( + Promise.all(ops).catch((opErr) => { + // Ignore expected client disconnect errors (e.g., browser refresh during streaming) + const isAbortError = + opErr?.name === 'AbortError' || opErr?.name === 'ResponseAborted'; + if (!isAbortError) throw opErr; + }) + ); + + await queueMessage( + world, + `__wkf_step_${queueItem.stepName}`, + { + workflowName, + workflowRunId: runId, + workflowStartedAt, + stepId: step.stepId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + }, + { + idempotencyKey: queueItem.correlationId, + } + ); + } catch (err) { + if (WorkflowAPIError.is(err) && err.status === 409) { + // Step already exists, so we can skip it + console.warn( + `Step "${queueItem.stepName}" with correlation ID "${queueItem.correlationId}" already exists, skipping: ${err.message}` + ); + return; + } + throw err; + } +} + +interface ProcessWaitParams { + queueItem: WaitInvocationQueueItem; + world: World; + runId: string; +} + +/** + * Processes a single wait by creating the event and calculating timeout. + * @returns The timeout in seconds, or null if the wait already exists. + */ +async function processWait({ + queueItem, + world, + runId, +}: ProcessWaitParams): Promise { + try { + // Only create wait_created event if it hasn't been created yet + if (!queueItem.hasCreatedEvent) { + await world.events.create(runId, { + eventType: 'wait_created', + correlationId: queueItem.correlationId, + eventData: { + resumeAt: queueItem.resumeAt, + }, + }); + } + + // Calculate how long to wait before resuming + const now = Date.now(); + const resumeAtMs = queueItem.resumeAt.getTime(); + const delayMs = Math.max(1000, resumeAtMs - now); + return Math.ceil(delayMs / 1000); + } catch (err) { + if (WorkflowAPIError.is(err) && err.status === 409) { + // Wait already exists, so we can skip it + console.warn( + `Wait with correlation ID "${queueItem.correlationId}" already exists, skipping: ${err.message}` + ); + return null; + } + throw err; + } +} + +/** + * Handles a workflow suspension by processing all pending operations (hooks, steps, waits). + * Hooks are processed first to prevent race conditions, then steps and waits in parallel. 
+ */ +export async function handleSuspension({ + suspension, + world, + runId, + workflowName, + workflowStartedAt, + span, +}: SuspensionHandlerParams): Promise { + // Separate queue items by type for parallel processing + const stepItems = suspension.steps.filter( + (item): item is StepInvocationQueueItem => item.type === 'step' + ); + const hookItems = suspension.steps.filter( + (item): item is HookInvocationQueueItem => item.type === 'hook' + ); + const waitItems = suspension.steps.filter( + (item): item is WaitInvocationQueueItem => item.type === 'wait' + ); + + // Process all hooks first to prevent race conditions + await Promise.all( + hookItems.map((queueItem) => + processHook({ + queueItem, + world, + runId, + globalThis: suspension.globalThis, + }) + ) + ); + + // Then process steps and waits in parallel + const [, waitTimeouts] = await Promise.all([ + Promise.all( + stepItems.map((queueItem) => + processStep({ + queueItem, + world, + runId, + workflowName, + workflowStartedAt, + globalThis: suspension.globalThis, + }) + ) + ), + Promise.all( + waitItems.map((queueItem) => + processWait({ + queueItem, + world, + runId, + }) + ) + ), + ]); + + // Find minimum timeout from waits + const minTimeoutSeconds = waitTimeouts.reduce( + (min, timeout) => { + if (timeout === null) return min; + if (min === null) return timeout; + return Math.min(min, timeout); + }, + null + ); + + span?.setAttributes({ + ...Attribute.WorkflowRunStatus('workflow_suspended'), + ...Attribute.WorkflowStepsCreated(stepItems.length), + ...Attribute.WorkflowHooksCreated(hookItems.length), + ...Attribute.WorkflowWaitsCreated(waitItems.length), + }); + + // If we encountered any waits, return the minimum timeout + if (minTimeoutSeconds !== null) { + return { timeoutSeconds: minTimeoutSeconds }; + } + + return {}; +} diff --git a/packages/core/src/telemetry/semantic-conventions.ts b/packages/core/src/telemetry/semantic-conventions.ts index 68297d724..c996ced0e 100644 --- a/packages/core/src/telemetry/semantic-conventions.ts +++ b/packages/core/src/telemetry/semantic-conventions.ts @@ -64,7 +64,7 @@ export const WorkflowRunId = SemanticConvention('workflow.run.id'); /** Current status of the workflow run */ export const WorkflowRunStatus = SemanticConvention< - WorkflowRun['status'] | 'pending_steps' + WorkflowRun['status'] | 'workflow_suspended' >('workflow.run.status'); /** Timestamp when the workflow execution started (Unix timestamp) */ @@ -107,6 +107,16 @@ export const WorkflowStepsCreated = SemanticConvention( 'workflow.steps.created' ); +/** Number of hooks created during workflow execution */ +export const WorkflowHooksCreated = SemanticConvention( + 'workflow.hooks.created' +); + +/** Number of waits created during workflow execution */ +export const WorkflowWaitsCreated = SemanticConvention( + 'workflow.waits.created' +); + // Step attributes /** Name of the step function being executed */
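
For readers skimming the diff, the changeset's headline change ("process hooks first, then steps and waits in parallel") can be distilled into a short sketch. The `QueueItem` type, the `processSuspensionSketch` name, and the `handle` callback below are simplified stand-ins for illustration only, not the actual module API in `suspension-handler.ts`:

```ts
// A minimal sketch (simplified, stand-in types) of the ordering introduced by
// this change: hooks are persisted and awaited before steps and waits are
// fanned out in parallel, and the smallest wait timeout is returned so the
// queue knows when to wake the suspended run.
type QueueItem =
  | { type: 'hook'; correlationId: string }
  | { type: 'step'; correlationId: string }
  | { type: 'wait'; correlationId: string; resumeAt: Date };

async function processSuspensionSketch(
  items: QueueItem[],
  handle: (item: QueueItem) => Promise<number | null>
): Promise<{ timeoutSeconds?: number }> {
  const hooks = items.filter((i) => i.type === 'hook');
  const steps = items.filter((i) => i.type === 'step');
  const waits = items.filter((i) => i.type === 'wait');

  // 1. Hooks first, awaited as a group, so later work can rely on them existing.
  await Promise.all(hooks.map(handle));

  // 2. Steps and waits together; each wait reports its timeout in seconds.
  const [, waitTimeouts] = await Promise.all([
    Promise.all(steps.map(handle)),
    Promise.all(waits.map(handle)),
  ]);

  // 3. Return the smallest wait timeout, if any waits were created.
  const minTimeout = waitTimeouts.reduce<number | null>(
    (min, t) => (t === null ? min : min === null ? t : Math.min(min, t)),
    null
  );
  return minTimeout === null ? {} : { timeoutSeconds: minTimeout };
}
```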
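
The changeset's telemetry bullet then translates to span attributes along these lines for a suspended run; the keys come from `semantic-conventions.ts` in this diff, while the counts shown are illustrative values, not real output:

```ts
// Illustrative attribute set for a run suspended with two steps, one hook,
// and one wait pending (counts are made up for the example):
const exampleSuspensionAttributes = {
  'workflow.run.status': 'workflow_suspended', // previously 'pending_steps'
  'workflow.steps.created': 2,
  'workflow.hooks.created': 1,
  'workflow.waits.created': 1,
};
```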