Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/slick-cooks-clean.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@workflow/core": patch
---

perf: use Map for invocationsQueue (O(1) lookup/delete)

Replace array-based invocationsQueue with Map for O(1) lookup and delete operations, eliminating O(n²) complexity in high-concurrency workflows.
40 changes: 23 additions & 17 deletions packages/core/src/global.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ import { FatalError } from '@workflow/errors';
import { describe, expect, it } from 'vitest';
import {
type HookInvocationQueueItem,
type QueueItem,
type StepInvocationQueueItem,
WorkflowSuspension,
} from './global.js';

// Helper to convert array of queue items to Map keyed by correlationId
function toQueueMap(items: QueueItem[]): Map<string, QueueItem> {
return new Map(items.map((item) => [item.correlationId, item]));
}

describe('FatalError', () => {
it('should create a FatalError instance', () => {
const error = new FatalError('Test fatal error');
Expand Down Expand Up @@ -43,7 +49,7 @@ describe('WorkflowSuspension', () => {
correlationId: 'inv-1',
},
];
const error = new WorkflowSuspension(steps, globalThis);
const error = new WorkflowSuspension(toQueueMap(steps), globalThis);

expect(error).toBeInstanceOf(WorkflowSuspension);
expect(error).toBeInstanceOf(Error);
Expand All @@ -60,7 +66,7 @@ describe('WorkflowSuspension', () => {
correlationId: 'inv-1',
},
];
const error = new WorkflowSuspension(steps, globalThis);
const error = new WorkflowSuspension(toQueueMap(steps), globalThis);

expect(error.message).toBe('1 step has not been run yet');
});
Expand All @@ -80,14 +86,14 @@ describe('WorkflowSuspension', () => {
correlationId: 'inv-2',
},
];
const error = new WorkflowSuspension(steps, globalThis);
const error = new WorkflowSuspension(toQueueMap(steps), globalThis);

expect(error.message).toBe('2 steps have not been run yet');
});

it('should handle empty steps array', () => {
const steps: StepInvocationQueueItem[] = [];
const error = new WorkflowSuspension(steps, globalThis);
const error = new WorkflowSuspension(toQueueMap(steps), globalThis);

expect(error.steps).toEqual([]);
expect(error.message).toBe('0 steps have not been run yet');
Expand Down Expand Up @@ -115,7 +121,7 @@ describe('WorkflowSuspension', () => {
correlationId: 'another-inv',
},
];
const error = new WorkflowSuspension(complexSteps, globalThis);
const error = new WorkflowSuspension(toQueueMap(complexSteps), globalThis);

expect(error.steps).toEqual(complexSteps);
expect(error.message).toBe('2 steps have not been run yet');
Expand All @@ -142,7 +148,7 @@ describe('WorkflowSuspension', () => {
correlationId: 'inv-1',
},
];
const error = new WorkflowSuspension(steps, globalThis);
const error = new WorkflowSuspension(toQueueMap(steps), globalThis);

expect(error instanceof Error).toBe(true);
expect(error instanceof WorkflowSuspension).toBe(true);
Expand All @@ -158,7 +164,7 @@ describe('WorkflowSuspension', () => {
correlationId: 'inv-1',
},
];
const error = new WorkflowSuspension(steps, globalThis);
const error = new WorkflowSuspension(toQueueMap(steps), globalThis);

expect(error.stack).toBeDefined();
expect(error.stack).toContain('WorkflowSuspension');
Expand All @@ -179,7 +185,7 @@ describe('WorkflowSuspension', () => {
correlationId: 'email-456',
},
];
const error = new WorkflowSuspension(steps, globalThis);
const error = new WorkflowSuspension(toQueueMap(steps), globalThis);

expect(error.steps).toHaveLength(2);
expect((error.steps[0] as StepInvocationQueueItem).stepName).toBe(
Expand Down Expand Up @@ -212,7 +218,7 @@ describe('WorkflowSuspension', () => {
token: 'webhook-token',
},
];
const error = new WorkflowSuspension(hooks, globalThis);
const error = new WorkflowSuspension(toQueueMap(hooks), globalThis);

expect(error.message).toBe('1 hook has not been created yet');
expect(error.hookCount).toBe(1);
Expand All @@ -231,7 +237,7 @@ describe('WorkflowSuspension', () => {
token: 'webhook-token-2',
},
];
const error = new WorkflowSuspension(hooks, globalThis);
const error = new WorkflowSuspension(toQueueMap(hooks), globalThis);

expect(error.message).toBe('2 hooks have not been created yet');
expect(error.hookCount).toBe(2);
Expand All @@ -245,7 +251,7 @@ describe('WorkflowSuspension', () => {
token: 'my-token',
},
];
const error = new WorkflowSuspension(hooks, globalThis);
const error = new WorkflowSuspension(toQueueMap(hooks), globalThis);

expect(error.message).toBe('1 hook has not been created yet');
expect(error.hookCount).toBe(1);
Expand All @@ -264,7 +270,7 @@ describe('WorkflowSuspension', () => {
token: 'token-2',
},
];
const error = new WorkflowSuspension(hooks, globalThis);
const error = new WorkflowSuspension(toQueueMap(hooks), globalThis);

expect(error.message).toBe('2 hooks have not been created yet');
expect(error.hookCount).toBe(2);
Expand All @@ -289,7 +295,7 @@ describe('WorkflowSuspension', () => {
token: 'my-token',
},
];
const error = new WorkflowSuspension(items, globalThis);
const error = new WorkflowSuspension(toQueueMap(items), globalThis);

expect(error.message).toBe('1 step and 2 hooks have not been run yet');
expect(error.stepCount).toBe(1);
Expand All @@ -316,7 +322,7 @@ describe('WorkflowSuspension', () => {
token: 'webhook-token',
},
];
const error = new WorkflowSuspension(items, globalThis);
const error = new WorkflowSuspension(toQueueMap(items), globalThis);

expect(error.message).toBe('2 steps and 1 hook have not been run yet');
expect(error.stepCount).toBe(2);
Expand All @@ -337,7 +343,7 @@ describe('WorkflowSuspension', () => {
token: 'my-token',
},
];
const error = new WorkflowSuspension(items, globalThis);
const error = new WorkflowSuspension(toQueueMap(items), globalThis);

// When there are steps, the action should be "run" not "created"
expect(error.message).toBe('1 step and 1 hook have not been run yet');
Expand All @@ -351,7 +357,7 @@ describe('WorkflowSuspension', () => {
token: 'webhook-token',
},
];
const error = new WorkflowSuspension(hooks, globalThis);
const error = new WorkflowSuspension(toQueueMap(hooks), globalThis);

expect(error.message).toBe('1 hook has not been created yet');
});
Expand All @@ -364,7 +370,7 @@ describe('WorkflowSuspension', () => {
token: 'my-token',
},
];
const error = new WorkflowSuspension(hooks, globalThis);
const error = new WorkflowSuspension(toQueueMap(hooks), globalThis);

expect(error.message).toBe('1 hook has not been created yet');
});
Expand Down
17 changes: 13 additions & 4 deletions packages/core/src/global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,19 @@ export class WorkflowSuspension extends Error {
hookCount: number;
waitCount: number;

constructor(steps: QueueItem[], global: typeof globalThis) {
const stepCount = steps.filter((s) => s.type === 'step').length;
const hookCount = steps.filter((s) => s.type === 'hook').length;
const waitCount = steps.filter((s) => s.type === 'wait').length;
constructor(stepsInput: Map<string, QueueItem>, global: typeof globalThis) {
// Convert Map to array for iteration and storage
const steps = [...stepsInput.values()];

// Single-pass counting for efficiency
let stepCount = 0;
let hookCount = 0;
let waitCount = 0;
for (const item of steps) {
if (item.type === 'step') stepCount++;
else if (item.type === 'hook') hookCount++;
else if (item.type === 'wait') waitCount++;
}

// Build description parts
const parts: string[] = [];
Expand Down
6 changes: 5 additions & 1 deletion packages/core/src/private.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ export { __private_getClosureVars } from './step/get-closure-vars.js';
export interface WorkflowOrchestratorContext {
globalThis: typeof globalThis;
eventsConsumer: EventsConsumer;
invocationsQueue: QueueItem[];
/**
* Map of pending invocations keyed by correlationId.
* Using Map instead of Array for O(1) lookup/delete operations.
*/
invocationsQueue: Map<string, QueueItem>;
onWorkflowError: (error: Error) => void;
generateUlid: () => string;
generateNanoid: () => string;
Expand Down
22 changes: 15 additions & 7 deletions packages/core/src/step.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext {
return {
globalThis: context.globalThis,
eventsConsumer: new EventsConsumer(events),
invocationsQueue: [],
invocationsQueue: new Map(),
generateUlid: () => ulid(workflowStartedAt), // All generated ulids use the workflow's started at time
generateNanoid: nanoid.customRandom(nanoid.urlAlphabet, 21, (size) =>
new Uint8Array(size).map(() => 256 * context.globalThis.Math.random())
Expand Down Expand Up @@ -99,7 +99,10 @@ describe('createUseStep', () => {
expect((error as WorkflowSuspension).message).toBe(
'1 step has not been run yet'
);
expect(ctx.invocationsQueue).toEqual((error as WorkflowSuspension).steps);
// Compare Map values with WorkflowSuspension.steps array
expect([...ctx.invocationsQueue.values()]).toEqual(
(error as WorkflowSuspension).steps
);
expect((error as WorkflowSuspension).steps).toMatchInlineSnapshot(`
[
{
Expand Down Expand Up @@ -142,7 +145,10 @@ describe('createUseStep', () => {
expect((error as WorkflowSuspension).message).toBe(
'3 steps have not been run yet'
);
expect(ctx.invocationsQueue).toEqual((error as WorkflowSuspension).steps);
// Compare Map values with WorkflowSuspension.steps array
expect([...ctx.invocationsQueue.values()]).toEqual(
(error as WorkflowSuspension).steps
);
expect((error as WorkflowSuspension).steps).toMatchInlineSnapshot(`
[
{
Expand Down Expand Up @@ -228,8 +234,9 @@ describe('createUseStep', () => {
expect(result).toBe('Result: 42');

// Verify closure variables were added to invocation queue
expect(ctx.invocationsQueue).toHaveLength(1);
expect(ctx.invocationsQueue[0]).toMatchObject({
expect(ctx.invocationsQueue.size).toBe(1);
const queueItem = [...ctx.invocationsQueue.values()][0];
expect(queueItem).toMatchObject({
type: 'step',
stepName: 'calculate',
args: [],
Expand Down Expand Up @@ -263,8 +270,9 @@ describe('createUseStep', () => {
expect(result).toBe(5);

// Verify empty closure variables were added to invocation queue
expect(ctx.invocationsQueue).toHaveLength(1);
expect(ctx.invocationsQueue[0]).toMatchObject({
expect(ctx.invocationsQueue.size).toBe(1);
const queueItem = [...ctx.invocationsQueue.values()][0];
expect(queueItem).toMatchObject({
type: 'step',
stepName: 'add',
args: [2, 3],
Expand Down
11 changes: 3 additions & 8 deletions packages/core/src/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) {
queueItem.closureVars = closureVars;
}

ctx.invocationsQueue.push(queueItem);
ctx.invocationsQueue.set(correlationId, queueItem);

// Track whether we've already seen a "step_started" event for this step.
// This is important because after a retryable failure, the step moves back to
Expand Down Expand Up @@ -73,13 +73,8 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) {
if (event.eventType === 'step_started') {
// Step has started - so remove from the invocations queue (only on the first "step_started" event)
if (!hasSeenStepStarted) {
const invocationsQueueIndex = ctx.invocationsQueue.findIndex(
(invocation) =>
invocation.type === 'step' &&
invocation.correlationId === correlationId
);
if (invocationsQueueIndex !== -1) {
ctx.invocationsQueue.splice(invocationsQueueIndex, 1);
// O(1) lookup and delete using Map
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a syntax error in the step handler: an orphaned else clause on line 78 appears after a statement rather than an if block. Additionally, the logic is broken - the error handling code (lines 79-86) is now unreachable because the delete() return value is not checked.

View Details
📝 Patch Details
diff --git a/packages/core/src/step.ts b/packages/core/src/step.ts
index 10238dc..f76f438 100644
--- a/packages/core/src/step.ts
+++ b/packages/core/src/step.ts
@@ -74,8 +74,8 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) {
           // Step has started - so remove from the invocations queue (only on the first "step_started" event)
           if (!hasSeenStepStarted) {
             // O(1) lookup and delete using Map
-            ctx.invocationsQueue.delete(correlationId);
-            } else {
+            const wasDeleted = ctx.invocationsQueue.delete(correlationId);
+            if (!wasDeleted) {
               setTimeout(() => {
                 reject(
                   new WorkflowRuntimeError(

Analysis

Syntax error in step handler due to incomplete refactoring of Map.delete()

What fails: The code in packages/core/src/step.ts (lines 75-89) contains a syntax error where an orphaned else clause appears after a statement rather than an if block, preventing the entire module from compiling.

How to reproduce:

cd packages/core
pnpm typecheck

Result: TypeScript compiler reports:

src/step.ts(95,9): error TS1005: ',' expected.
src/step.ts(128,8): error TS1005: ',' expected.
src/step.ts(160,1): error TS1128: Declaration or statement expected.

Root cause: A refactoring commit (fe0a6b0) changed from checking if (ctx.invocationsQueue.has(correlationId)) before deleting to using Map.delete() directly, but the refactoring was incomplete - the inner if check was removed while leaving the corresponding else clause orphaned:

// BROKEN - orphaned else with no matching if
if (!hasSeenStepStarted) {
  ctx.invocationsQueue.delete(correlationId);   // Statement, not an if
  } else {                                        // Syntax error: else without if
    // error handling
  }
}

Fix: Check the return value of Map.delete() (which returns boolean) to determine if the deletion succeeded:

const wasDeleted = ctx.invocationsQueue.delete(correlationId);
if (!wasDeleted) {
  // error handling
}

This preserves the original logic while maintaining O(1) complexity with Map operations.

ctx.invocationsQueue.delete(correlationId);
} else {
setTimeout(() => {
reject(
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export async function runWorkflow(
eventsConsumer: new EventsConsumer(events),
generateUlid: () => ulid(+startedAt),
generateNanoid,
invocationsQueue: [],
invocationsQueue: new Map(),
};

// Subscribe to the events log to update the timestamp in the vm context
Expand Down
13 changes: 4 additions & 9 deletions packages/core/src/workflow/hook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) {
const correlationId = `hook_${ctx.generateUlid()}`;
const token = options.token ?? ctx.generateNanoid();

// Add hook creation to invocations queue
ctx.invocationsQueue.push({
// Add hook creation to invocations queue (using Map for O(1) operations)
ctx.invocationsQueue.set(correlationId, {
type: 'hook',
correlationId,
token,
Expand Down Expand Up @@ -52,13 +52,8 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) {
event?.eventType === 'hook_created' &&
event.correlationId === correlationId
) {
// Remove this hook from the invocations queue if it exists
const index = ctx.invocationsQueue.findIndex(
(item) => item.type === 'hook' && item.correlationId === correlationId
);
if (index !== -1) {
ctx.invocationsQueue.splice(index, 1);
}
// Remove this hook from the invocations queue (O(1) delete using Map)
ctx.invocationsQueue.delete(correlationId);
return EventConsumerResult.Consumed;
}

Expand Down
27 changes: 11 additions & 16 deletions packages/core/src/workflow/sleep.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ export function createSleep(ctx: WorkflowOrchestratorContext) {
// Calculate the resume time
const resumeAt = parseDurationToDate(param);

// Add wait to invocations queue
ctx.invocationsQueue.push({
// Add wait to invocations queue (using Map for O(1) operations)
const waitItem: WaitInvocationQueueItem = {
type: 'wait',
correlationId,
resumeAt,
});
};
ctx.invocationsQueue.set(correlationId, waitItem);

ctx.eventsConsumer.subscribe((event) => {
// If there are no events and we're waiting for wait_completed,
Expand All @@ -39,12 +40,11 @@ export function createSleep(ctx: WorkflowOrchestratorContext) {
event.correlationId === correlationId
) {
// Mark this wait as having the created event, but keep it in the queue
const waitItem = ctx.invocationsQueue.find(
(item) => item.type === 'wait' && item.correlationId === correlationId
) as WaitInvocationQueueItem | undefined;
if (waitItem) {
waitItem.hasCreatedEvent = true;
waitItem.resumeAt = event.eventData.resumeAt;
// O(1) lookup using Map
const queueItem = ctx.invocationsQueue.get(correlationId);
if (queueItem && queueItem.type === 'wait') {
queueItem.hasCreatedEvent = true;
queueItem.resumeAt = event.eventData.resumeAt;
}
return EventConsumerResult.Consumed;
}
Expand All @@ -54,13 +54,8 @@ export function createSleep(ctx: WorkflowOrchestratorContext) {
event?.eventType === 'wait_completed' &&
event.correlationId === correlationId
) {
// Remove this wait from the invocations queue
const index = ctx.invocationsQueue.findIndex(
(item) => item.type === 'wait' && item.correlationId === correlationId
);
if (index !== -1) {
ctx.invocationsQueue.splice(index, 1);
}
// Remove this wait from the invocations queue (O(1) delete using Map)
ctx.invocationsQueue.delete(correlationId);

// Wait has elapsed, resolve the sleep
setTimeout(() => {
Expand Down
Loading