Skip to content

Commit c4a92e0

Browse files
committed
perf: parallelize suspension handler for high-concurrency
1 parent 9220add commit c4a92e0

File tree

2 files changed

+156
-118
lines changed

2 files changed

+156
-118
lines changed

.changeset/fast-owls-flow.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
"@workflow/core": patch
3+
---
4+
5+
perf: parallelize suspension handler for high-concurrency workflows
6+
7+
- Process step/hook/wait creation in parallel with `Promise.all` instead of a sequential `for` loop
8+
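- Reduces suspension handling latency from the sum of all n sequential I/O operations to roughly the latency of the slowest single queue item (the total I/O work is still O(n), but it is now issued concurrently)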

packages/core/src/runtime.ts

Lines changed: 148 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -398,141 +398,171 @@ export function workflowEntrypoint(workflowCode: string) {
398398
// Note: suspensionMessage logged only in debug mode to avoid production noise
399399
// console.debug(suspensionMessage);
400400
}
401-
// Process each operation in the queue (steps and hooks)
402-
let minTimeoutSeconds: number | null = null;
403-
for (const queueItem of err.steps) {
404-
if (queueItem.type === 'step') {
405-
// Handle step operations
406-
const ops: Promise<void>[] = [];
407-
const dehydratedInput = dehydrateStepArguments(
401+
// Process all operations in the queue in parallel for better performance
402+
// Separate queue items by type for parallel processing
403+
const stepItems = err.steps.filter(
404+
(item): item is Extract<typeof item, { type: 'step' }> =>
405+
item.type === 'step'
406+
);
407+
const hookItems = err.steps.filter(
408+
(item): item is Extract<typeof item, { type: 'hook' }> =>
409+
item.type === 'hook'
410+
);
411+
const waitItems = err.steps.filter(
412+
(item): item is Extract<typeof item, { type: 'wait' }> =>
413+
item.type === 'wait'
414+
);
415+
416+
// Process all steps in parallel
417+
const stepPromises = stepItems.map(async (queueItem) => {
418+
const ops: Promise<void>[] = [];
419+
const dehydratedInput = dehydrateStepArguments(
420+
{
421+
args: queueItem.args,
422+
closureVars: queueItem.closureVars,
423+
},
424+
err.globalThis
425+
);
426+
427+
try {
428+
const step = await world.steps.create(runId, {
429+
stepId: queueItem.correlationId,
430+
stepName: queueItem.stepName,
431+
input: dehydratedInput as Serializable,
432+
});
433+
434+
waitUntil(
435+
Promise.all(ops).catch((opErr) => {
436+
// Ignore expected client disconnect errors (e.g., browser refresh during streaming)
437+
const isAbortError =
438+
opErr?.name === 'AbortError' ||
439+
opErr?.name === 'ResponseAborted';
440+
if (!isAbortError) throw opErr;
441+
})
442+
);
443+
444+
await queueMessage(
445+
world,
446+
`__wkf_step_${queueItem.stepName}`,
408447
{
409-
args: queueItem.args,
410-
closureVars: queueItem.closureVars,
448+
workflowName,
449+
workflowRunId: runId,
450+
workflowStartedAt,
451+
stepId: step.stepId,
452+
traceCarrier: await serializeTraceCarrier(),
453+
requestedAt: new Date(),
411454
},
412-
err.globalThis
455+
{
456+
idempotencyKey: queueItem.correlationId,
457+
}
413458
);
414-
415-
try {
416-
const step = await world.steps.create(runId, {
417-
stepId: queueItem.correlationId,
418-
stepName: queueItem.stepName,
419-
input: dehydratedInput as Serializable,
420-
});
421-
422-
waitUntil(
423-
Promise.all(ops).catch((err) => {
424-
// Ignore expected client disconnect errors (e.g., browser refresh during streaming)
425-
const isAbortError =
426-
err?.name === 'AbortError' ||
427-
err?.name === 'ResponseAborted';
428-
if (!isAbortError) throw err;
429-
})
459+
} catch (stepErr) {
460+
if (
461+
WorkflowAPIError.is(stepErr) &&
462+
stepErr.status === 409
463+
) {
464+
// Step already exists, so we can skip it
465+
console.warn(
466+
`Step "${queueItem.stepName}" with correlation ID "${queueItem.correlationId}" already exists, skipping: ${stepErr.message}`
430467
);
468+
return;
469+
}
470+
throw stepErr;
471+
}
472+
});
431473

432-
await queueMessage(
433-
world,
434-
`__wkf_step_${queueItem.stepName}`,
435-
{
436-
workflowName,
437-
workflowRunId: runId,
438-
workflowStartedAt,
439-
stepId: step.stepId,
440-
traceCarrier: await serializeTraceCarrier(),
441-
requestedAt: new Date(),
442-
},
443-
{
444-
idempotencyKey: queueItem.correlationId,
445-
}
446-
);
447-
} catch (err) {
448-
if (WorkflowAPIError.is(err) && err.status === 409) {
449-
// Step already exists, so we can skip it
474+
// Process all hooks in parallel
475+
const hookPromises = hookItems.map(async (queueItem) => {
476+
try {
477+
// Create hook in database
478+
const hookMetadata =
479+
typeof queueItem.metadata === 'undefined'
480+
? undefined
481+
: dehydrateStepArguments(
482+
queueItem.metadata,
483+
err.globalThis
484+
);
485+
await world.hooks.create(runId, {
486+
hookId: queueItem.correlationId,
487+
token: queueItem.token,
488+
metadata: hookMetadata,
489+
});
490+
491+
// Create hook_created event in event log
492+
await world.events.create(runId, {
493+
eventType: 'hook_created',
494+
correlationId: queueItem.correlationId,
495+
});
496+
} catch (hookErr) {
497+
if (WorkflowAPIError.is(hookErr)) {
498+
if (hookErr.status === 409) {
499+
// Hook already exists (duplicate hook_id constraint), so we can skip it
500+
console.warn(
501+
`Hook with correlation ID "${queueItem.correlationId}" already exists, skipping: ${hookErr.message}`
502+
);
503+
return;
504+
} else if (hookErr.status === 410) {
505+
// Workflow has already completed, so no-op
450506
console.warn(
451-
`Step "${queueItem.stepName}" with correlation ID "${queueItem.correlationId}" already exists, skipping: ${err.message}`
507+
`Workflow run "${runId}" has already completed, skipping hook "${queueItem.correlationId}": ${hookErr.message}`
452508
);
453-
continue;
509+
return;
454510
}
455-
throw err;
456511
}
457-
} else if (queueItem.type === 'hook') {
458-
// Handle hook operations
459-
try {
460-
// Create hook in database
461-
const hookMetadata =
462-
typeof queueItem.metadata === 'undefined'
463-
? undefined
464-
: dehydrateStepArguments(
465-
queueItem.metadata,
466-
err.globalThis
467-
);
468-
await world.hooks.create(runId, {
469-
hookId: queueItem.correlationId,
470-
token: queueItem.token,
471-
metadata: hookMetadata,
472-
});
512+
throw hookErr;
513+
}
514+
});
473515

474-
// Create hook_created event in event log
516+
// Process all waits in parallel, collecting timeout values
517+
const waitPromises = waitItems.map(async (queueItem) => {
518+
try {
519+
// Only create wait_created event if it hasn't been created yet
520+
if (!queueItem.hasCreatedEvent) {
475521
await world.events.create(runId, {
476-
eventType: 'hook_created',
522+
eventType: 'wait_created',
477523
correlationId: queueItem.correlationId,
524+
eventData: {
525+
resumeAt: queueItem.resumeAt,
526+
},
478527
});
479-
} catch (err) {
480-
if (WorkflowAPIError.is(err)) {
481-
if (err.status === 409) {
482-
// Hook already exists (duplicate hook_id constraint), so we can skip it
483-
console.warn(
484-
`Hook with correlation ID "${queueItem.correlationId}" already exists, skipping: ${err.message}`
485-
);
486-
continue;
487-
} else if (err.status === 410) {
488-
// Workflow has already completed, so no-op
489-
console.warn(
490-
`Workflow run "${runId}" has already completed, skipping hook "${queueItem.correlationId}": ${err.message}`
491-
);
492-
continue;
493-
}
494-
}
495-
throw err;
496528
}
497-
} else if (queueItem.type === 'wait') {
498-
// Handle wait operations
499-
try {
500-
// Only create wait_created event if it hasn't been created yet
501-
if (!queueItem.hasCreatedEvent) {
502-
await world.events.create(runId, {
503-
eventType: 'wait_created',
504-
correlationId: queueItem.correlationId,
505-
eventData: {
506-
resumeAt: queueItem.resumeAt,
507-
},
508-
});
509-
}
510529

511-
// Calculate how long to wait before resuming
512-
const now = Date.now();
513-
const resumeAtMs = queueItem.resumeAt.getTime();
514-
const delayMs = Math.max(1000, resumeAtMs - now);
515-
const timeoutSeconds = Math.ceil(delayMs / 1000);
516-
517-
// Track the minimum timeout across all waits
518-
if (
519-
minTimeoutSeconds === null ||
520-
timeoutSeconds < minTimeoutSeconds
521-
) {
522-
minTimeoutSeconds = timeoutSeconds;
523-
}
524-
} catch (err) {
525-
if (WorkflowAPIError.is(err) && err.status === 409) {
526-
// Wait already exists, so we can skip it
527-
console.warn(
528-
`Wait with correlation ID "${queueItem.correlationId}" already exists, skipping: ${err.message}`
529-
);
530-
continue;
531-
}
532-
throw err;
530+
// Calculate how long to wait before resuming
531+
const now = Date.now();
532+
const resumeAtMs = queueItem.resumeAt.getTime();
533+
const delayMs = Math.max(1000, resumeAtMs - now);
534+
return Math.ceil(delayMs / 1000);
535+
} catch (waitErr) {
536+
if (
537+
WorkflowAPIError.is(waitErr) &&
538+
waitErr.status === 409
539+
) {
540+
// Wait already exists, so we can skip it
541+
console.warn(
542+
`Wait with correlation ID "${queueItem.correlationId}" already exists, skipping: ${waitErr.message}`
543+
);
544+
return null;
533545
}
546+
throw waitErr;
534547
}
535-
}
548+
});
549+
550+
// Wait for all operations to complete in parallel
551+
const [, , waitTimeouts] = await Promise.all([
552+
Promise.all(stepPromises),
553+
Promise.all(hookPromises),
554+
Promise.all(waitPromises),
555+
]);
556+
557+
// Find minimum timeout from waits
558+
const minTimeoutSeconds = waitTimeouts.reduce<number | null>(
559+
(min, timeout) => {
560+
if (timeout === null) return min;
561+
if (min === null) return timeout;
562+
return Math.min(min, timeout);
563+
},
564+
null
565+
);
536566

537567
span?.setAttributes({
538568
...Attribute.WorkflowRunStatus('pending_steps'),

0 commit comments

Comments
 (0)