Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/fast-owls-flow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@workflow/core": patch
---

perf: parallelize suspension handler for high-concurrency workflows

- Process step, hook, and wait creation in parallel with `Promise.all` instead of a sequential `for` loop
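- Reduces suspension handling latency from O(n) sequential I/O round trips to roughly the latency of the slowest single item, since all items are now processed concurrently (the total number of I/O calls is unchanged)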
266 changes: 148 additions & 118 deletions packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -398,141 +398,171 @@ export function workflowEntrypoint(workflowCode: string) {
// Note: suspensionMessage logged only in debug mode to avoid production noise
// console.debug(suspensionMessage);
}
// Process each operation in the queue (steps and hooks)
let minTimeoutSeconds: number | null = null;
for (const queueItem of err.steps) {
if (queueItem.type === 'step') {
// Handle step operations
const ops: Promise<void>[] = [];
const dehydratedInput = dehydrateStepArguments(
// Process all operations in the queue in parallel for better performance
// Separate queue items by type for parallel processing
const stepItems = err.steps.filter(
(item): item is Extract<typeof item, { type: 'step' }> =>
item.type === 'step'
);
const hookItems = err.steps.filter(
(item): item is Extract<typeof item, { type: 'hook' }> =>
item.type === 'hook'
);
const waitItems = err.steps.filter(
(item): item is Extract<typeof item, { type: 'wait' }> =>
item.type === 'wait'
);

// Process all steps in parallel
const stepPromises = stepItems.map(async (queueItem) => {
const ops: Promise<void>[] = [];
const dehydratedInput = dehydrateStepArguments(
{
args: queueItem.args,
closureVars: queueItem.closureVars,
},
err.globalThis
);

try {
const step = await world.steps.create(runId, {
stepId: queueItem.correlationId,
stepName: queueItem.stepName,
input: dehydratedInput as Serializable,
});

waitUntil(
Promise.all(ops).catch((opErr) => {
// Ignore expected client disconnect errors (e.g., browser refresh during streaming)
const isAbortError =
opErr?.name === 'AbortError' ||
opErr?.name === 'ResponseAborted';
if (!isAbortError) throw opErr;
})
);

await queueMessage(
world,
`__wkf_step_${queueItem.stepName}`,
{
args: queueItem.args,
closureVars: queueItem.closureVars,
workflowName,
workflowRunId: runId,
workflowStartedAt,
stepId: step.stepId,
traceCarrier: await serializeTraceCarrier(),
requestedAt: new Date(),
},
err.globalThis
{
idempotencyKey: queueItem.correlationId,
}
);

try {
const step = await world.steps.create(runId, {
stepId: queueItem.correlationId,
stepName: queueItem.stepName,
input: dehydratedInput as Serializable,
});

waitUntil(
Promise.all(ops).catch((err) => {
// Ignore expected client disconnect errors (e.g., browser refresh during streaming)
const isAbortError =
err?.name === 'AbortError' ||
err?.name === 'ResponseAborted';
if (!isAbortError) throw err;
})
} catch (stepErr) {
if (
WorkflowAPIError.is(stepErr) &&
stepErr.status === 409
) {
// Step already exists, so we can skip it
console.warn(
`Step "${queueItem.stepName}" with correlation ID "${queueItem.correlationId}" already exists, skipping: ${stepErr.message}`
);
return;
}
throw stepErr;
}
});

await queueMessage(
world,
`__wkf_step_${queueItem.stepName}`,
{
workflowName,
workflowRunId: runId,
workflowStartedAt,
stepId: step.stepId,
traceCarrier: await serializeTraceCarrier(),
requestedAt: new Date(),
},
{
idempotencyKey: queueItem.correlationId,
}
);
} catch (err) {
if (WorkflowAPIError.is(err) && err.status === 409) {
// Step already exists, so we can skip it
// Process all hooks in parallel
const hookPromises = hookItems.map(async (queueItem) => {
try {
// Create hook in database
const hookMetadata =
typeof queueItem.metadata === 'undefined'
? undefined
: dehydrateStepArguments(
queueItem.metadata,
err.globalThis
);
await world.hooks.create(runId, {
hookId: queueItem.correlationId,
token: queueItem.token,
metadata: hookMetadata,
});

// Create hook_created event in event log
await world.events.create(runId, {
eventType: 'hook_created',
correlationId: queueItem.correlationId,
});
} catch (hookErr) {
if (WorkflowAPIError.is(hookErr)) {
if (hookErr.status === 409) {
// Hook already exists (duplicate hook_id constraint), so we can skip it
console.warn(
`Hook with correlation ID "${queueItem.correlationId}" already exists, skipping: ${hookErr.message}`
);
return;
} else if (hookErr.status === 410) {
// Workflow has already completed, so no-op
console.warn(
`Step "${queueItem.stepName}" with correlation ID "${queueItem.correlationId}" already exists, skipping: ${err.message}`
`Workflow run "${runId}" has already completed, skipping hook "${queueItem.correlationId}": ${hookErr.message}`
);
continue;
return;
}
throw err;
}
} else if (queueItem.type === 'hook') {
// Handle hook operations
try {
// Create hook in database
const hookMetadata =
typeof queueItem.metadata === 'undefined'
? undefined
: dehydrateStepArguments(
queueItem.metadata,
err.globalThis
);
await world.hooks.create(runId, {
hookId: queueItem.correlationId,
token: queueItem.token,
metadata: hookMetadata,
});
throw hookErr;
}
});

// Create hook_created event in event log
// Process all waits in parallel, collecting timeout values
const waitPromises = waitItems.map(async (queueItem) => {
try {
// Only create wait_created event if it hasn't been created yet
if (!queueItem.hasCreatedEvent) {
await world.events.create(runId, {
eventType: 'hook_created',
eventType: 'wait_created',
correlationId: queueItem.correlationId,
eventData: {
resumeAt: queueItem.resumeAt,
},
});
} catch (err) {
if (WorkflowAPIError.is(err)) {
if (err.status === 409) {
// Hook already exists (duplicate hook_id constraint), so we can skip it
console.warn(
`Hook with correlation ID "${queueItem.correlationId}" already exists, skipping: ${err.message}`
);
continue;
} else if (err.status === 410) {
// Workflow has already completed, so no-op
console.warn(
`Workflow run "${runId}" has already completed, skipping hook "${queueItem.correlationId}": ${err.message}`
);
continue;
}
}
throw err;
}
} else if (queueItem.type === 'wait') {
// Handle wait operations
try {
// Only create wait_created event if it hasn't been created yet
if (!queueItem.hasCreatedEvent) {
await world.events.create(runId, {
eventType: 'wait_created',
correlationId: queueItem.correlationId,
eventData: {
resumeAt: queueItem.resumeAt,
},
});
}

// Calculate how long to wait before resuming
const now = Date.now();
const resumeAtMs = queueItem.resumeAt.getTime();
const delayMs = Math.max(1000, resumeAtMs - now);
const timeoutSeconds = Math.ceil(delayMs / 1000);

// Track the minimum timeout across all waits
if (
minTimeoutSeconds === null ||
timeoutSeconds < minTimeoutSeconds
) {
minTimeoutSeconds = timeoutSeconds;
}
} catch (err) {
if (WorkflowAPIError.is(err) && err.status === 409) {
// Wait already exists, so we can skip it
console.warn(
`Wait with correlation ID "${queueItem.correlationId}" already exists, skipping: ${err.message}`
);
continue;
}
throw err;
// Calculate how long to wait before resuming
const now = Date.now();
const resumeAtMs = queueItem.resumeAt.getTime();
const delayMs = Math.max(1000, resumeAtMs - now);
return Math.ceil(delayMs / 1000);
} catch (waitErr) {
if (
WorkflowAPIError.is(waitErr) &&
waitErr.status === 409
) {
// Wait already exists, so we can skip it
console.warn(
`Wait with correlation ID "${queueItem.correlationId}" already exists, skipping: ${waitErr.message}`
);
return null;
}
throw waitErr;
}
}
});

// Wait for all operations to complete in parallel
const [, , waitTimeouts] = await Promise.all([
Promise.all(stepPromises),
Promise.all(hookPromises),
Promise.all(waitPromises),
]);

// Find minimum timeout from waits
const minTimeoutSeconds = waitTimeouts.reduce<number | null>(
(min, timeout) => {
if (timeout === null) return min;
if (min === null) return timeout;
return Math.min(min, timeout);
},
null
);

span?.setAttributes({
...Attribute.WorkflowRunStatus('pending_steps'),
Expand Down
Loading