Skip to content

Commit 0bbd26f

Browse files
pranaygpclaudeTooTallNate
authored
perf: use Map for invocationsQueue (O(1) lookup/delete) (#541)
* perf: use Map for invocationsQueue (O(1) lookup/delete) Replace array-based invocationsQueue with Map<string, QueueItem> for O(1) lookup and delete operations. This eliminates the O(n²) complexity from findIndex + splice patterns in step.ts, hook.ts, and sleep.ts. Changes: - private.ts: Change invocationsQueue type from QueueItem[] to Map - workflow.ts: Initialize with new Map() instead of [] - global.ts: Accept Map or array in WorkflowSuspension, add single-pass counting - step.ts: Use Map.set/has/delete instead of push/findIndex/splice - hook.ts: Use Map.set/delete instead of push/findIndex/splice - sleep.ts: Use Map.set/get/delete instead of push/find/findIndex/splice This is Phase 1 of the high-concurrency workflow optimization plan. See beads issue wrk-fyx for details. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]> * Update packages/core/src/step.ts Co-authored-by: Nathan Rajlich <[email protected]> --------- Co-authored-by: Claude <[email protected]> Co-authored-by: Nathan Rajlich <[email protected]>
1 parent cbb03bc commit 0bbd26f

File tree

9 files changed

+82
-63
lines changed

9 files changed

+82
-63
lines changed

.changeset/slick-cooks-clean.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"@workflow/core": patch
3+
---
4+
5+
perf: use Map for invocationsQueue (O(1) lookup/delete)
6+
7+
Replace array-based invocationsQueue with Map for O(1) lookup and delete operations, eliminating O(n²) complexity in high-concurrency workflows.

packages/core/src/global.test.ts

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,16 @@ import { FatalError } from '@workflow/errors';
22
import { describe, expect, it } from 'vitest';
33
import {
44
type HookInvocationQueueItem,
5+
type QueueItem,
56
type StepInvocationQueueItem,
67
WorkflowSuspension,
78
} from './global.js';
89

10+
// Helper to convert array of queue items to Map keyed by correlationId
11+
function toQueueMap(items: QueueItem[]): Map<string, QueueItem> {
12+
return new Map(items.map((item) => [item.correlationId, item]));
13+
}
14+
915
describe('FatalError', () => {
1016
it('should create a FatalError instance', () => {
1117
const error = new FatalError('Test fatal error');
@@ -43,7 +49,7 @@ describe('WorkflowSuspension', () => {
4349
correlationId: 'inv-1',
4450
},
4551
];
46-
const error = new WorkflowSuspension(steps, globalThis);
52+
const error = new WorkflowSuspension(toQueueMap(steps), globalThis);
4753

4854
expect(error).toBeInstanceOf(WorkflowSuspension);
4955
expect(error).toBeInstanceOf(Error);
@@ -60,7 +66,7 @@ describe('WorkflowSuspension', () => {
6066
correlationId: 'inv-1',
6167
},
6268
];
63-
const error = new WorkflowSuspension(steps, globalThis);
69+
const error = new WorkflowSuspension(toQueueMap(steps), globalThis);
6470

6571
expect(error.message).toBe('1 step has not been run yet');
6672
});
@@ -80,14 +86,14 @@ describe('WorkflowSuspension', () => {
8086
correlationId: 'inv-2',
8187
},
8288
];
83-
const error = new WorkflowSuspension(steps, globalThis);
89+
const error = new WorkflowSuspension(toQueueMap(steps), globalThis);
8490

8591
expect(error.message).toBe('2 steps have not been run yet');
8692
});
8793

8894
it('should handle empty steps array', () => {
8995
const steps: StepInvocationQueueItem[] = [];
90-
const error = new WorkflowSuspension(steps, globalThis);
96+
const error = new WorkflowSuspension(toQueueMap(steps), globalThis);
9197

9298
expect(error.steps).toEqual([]);
9399
expect(error.message).toBe('0 steps have not been run yet');
@@ -115,7 +121,7 @@ describe('WorkflowSuspension', () => {
115121
correlationId: 'another-inv',
116122
},
117123
];
118-
const error = new WorkflowSuspension(complexSteps, globalThis);
124+
const error = new WorkflowSuspension(toQueueMap(complexSteps), globalThis);
119125

120126
expect(error.steps).toEqual(complexSteps);
121127
expect(error.message).toBe('2 steps have not been run yet');
@@ -142,7 +148,7 @@ describe('WorkflowSuspension', () => {
142148
correlationId: 'inv-1',
143149
},
144150
];
145-
const error = new WorkflowSuspension(steps, globalThis);
151+
const error = new WorkflowSuspension(toQueueMap(steps), globalThis);
146152

147153
expect(error instanceof Error).toBe(true);
148154
expect(error instanceof WorkflowSuspension).toBe(true);
@@ -158,7 +164,7 @@ describe('WorkflowSuspension', () => {
158164
correlationId: 'inv-1',
159165
},
160166
];
161-
const error = new WorkflowSuspension(steps, globalThis);
167+
const error = new WorkflowSuspension(toQueueMap(steps), globalThis);
162168

163169
expect(error.stack).toBeDefined();
164170
expect(error.stack).toContain('WorkflowSuspension');
@@ -179,7 +185,7 @@ describe('WorkflowSuspension', () => {
179185
correlationId: 'email-456',
180186
},
181187
];
182-
const error = new WorkflowSuspension(steps, globalThis);
188+
const error = new WorkflowSuspension(toQueueMap(steps), globalThis);
183189

184190
expect(error.steps).toHaveLength(2);
185191
expect((error.steps[0] as StepInvocationQueueItem).stepName).toBe(
@@ -212,7 +218,7 @@ describe('WorkflowSuspension', () => {
212218
token: 'webhook-token',
213219
},
214220
];
215-
const error = new WorkflowSuspension(hooks, globalThis);
221+
const error = new WorkflowSuspension(toQueueMap(hooks), globalThis);
216222

217223
expect(error.message).toBe('1 hook has not been created yet');
218224
expect(error.hookCount).toBe(1);
@@ -231,7 +237,7 @@ describe('WorkflowSuspension', () => {
231237
token: 'webhook-token-2',
232238
},
233239
];
234-
const error = new WorkflowSuspension(hooks, globalThis);
240+
const error = new WorkflowSuspension(toQueueMap(hooks), globalThis);
235241

236242
expect(error.message).toBe('2 hooks have not been created yet');
237243
expect(error.hookCount).toBe(2);
@@ -245,7 +251,7 @@ describe('WorkflowSuspension', () => {
245251
token: 'my-token',
246252
},
247253
];
248-
const error = new WorkflowSuspension(hooks, globalThis);
254+
const error = new WorkflowSuspension(toQueueMap(hooks), globalThis);
249255

250256
expect(error.message).toBe('1 hook has not been created yet');
251257
expect(error.hookCount).toBe(1);
@@ -264,7 +270,7 @@ describe('WorkflowSuspension', () => {
264270
token: 'token-2',
265271
},
266272
];
267-
const error = new WorkflowSuspension(hooks, globalThis);
273+
const error = new WorkflowSuspension(toQueueMap(hooks), globalThis);
268274

269275
expect(error.message).toBe('2 hooks have not been created yet');
270276
expect(error.hookCount).toBe(2);
@@ -289,7 +295,7 @@ describe('WorkflowSuspension', () => {
289295
token: 'my-token',
290296
},
291297
];
292-
const error = new WorkflowSuspension(items, globalThis);
298+
const error = new WorkflowSuspension(toQueueMap(items), globalThis);
293299

294300
expect(error.message).toBe('1 step and 2 hooks have not been run yet');
295301
expect(error.stepCount).toBe(1);
@@ -316,7 +322,7 @@ describe('WorkflowSuspension', () => {
316322
token: 'webhook-token',
317323
},
318324
];
319-
const error = new WorkflowSuspension(items, globalThis);
325+
const error = new WorkflowSuspension(toQueueMap(items), globalThis);
320326

321327
expect(error.message).toBe('2 steps and 1 hook have not been run yet');
322328
expect(error.stepCount).toBe(2);
@@ -337,7 +343,7 @@ describe('WorkflowSuspension', () => {
337343
token: 'my-token',
338344
},
339345
];
340-
const error = new WorkflowSuspension(items, globalThis);
346+
const error = new WorkflowSuspension(toQueueMap(items), globalThis);
341347

342348
// When there are steps, the action should be "run" not "created"
343349
expect(error.message).toBe('1 step and 1 hook have not been run yet');
@@ -351,7 +357,7 @@ describe('WorkflowSuspension', () => {
351357
token: 'webhook-token',
352358
},
353359
];
354-
const error = new WorkflowSuspension(hooks, globalThis);
360+
const error = new WorkflowSuspension(toQueueMap(hooks), globalThis);
355361

356362
expect(error.message).toBe('1 hook has not been created yet');
357363
});
@@ -364,7 +370,7 @@ describe('WorkflowSuspension', () => {
364370
token: 'my-token',
365371
},
366372
];
367-
const error = new WorkflowSuspension(hooks, globalThis);
373+
const error = new WorkflowSuspension(toQueueMap(hooks), globalThis);
368374

369375
expect(error.message).toBe('1 hook has not been created yet');
370376
});

packages/core/src/global.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,19 @@ export class WorkflowSuspension extends Error {
4040
hookCount: number;
4141
waitCount: number;
4242

43-
constructor(steps: QueueItem[], global: typeof globalThis) {
44-
const stepCount = steps.filter((s) => s.type === 'step').length;
45-
const hookCount = steps.filter((s) => s.type === 'hook').length;
46-
const waitCount = steps.filter((s) => s.type === 'wait').length;
43+
constructor(stepsInput: Map<string, QueueItem>, global: typeof globalThis) {
44+
// Convert Map to array for iteration and storage
45+
const steps = [...stepsInput.values()];
46+
47+
// Single-pass counting for efficiency
48+
let stepCount = 0;
49+
let hookCount = 0;
50+
let waitCount = 0;
51+
for (const item of steps) {
52+
if (item.type === 'step') stepCount++;
53+
else if (item.type === 'hook') hookCount++;
54+
else if (item.type === 'wait') waitCount++;
55+
}
4756

4857
// Build description parts
4958
const parts: string[] = [];

packages/core/src/private.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,11 @@ export { __private_getClosureVars } from './step/get-closure-vars.js';
3838
export interface WorkflowOrchestratorContext {
3939
globalThis: typeof globalThis;
4040
eventsConsumer: EventsConsumer;
41-
invocationsQueue: QueueItem[];
41+
/**
42+
* Map of pending invocations keyed by correlationId.
43+
* Using Map instead of Array for O(1) lookup/delete operations.
44+
*/
45+
invocationsQueue: Map<string, QueueItem>;
4246
onWorkflowError: (error: Error) => void;
4347
generateUlid: () => string;
4448
generateNanoid: () => string;

packages/core/src/step.test.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext {
2020
return {
2121
globalThis: context.globalThis,
2222
eventsConsumer: new EventsConsumer(events),
23-
invocationsQueue: [],
23+
invocationsQueue: new Map(),
2424
generateUlid: () => ulid(workflowStartedAt), // All generated ulids use the workflow's started at time
2525
generateNanoid: nanoid.customRandom(nanoid.urlAlphabet, 21, (size) =>
2626
new Uint8Array(size).map(() => 256 * context.globalThis.Math.random())
@@ -99,7 +99,10 @@ describe('createUseStep', () => {
9999
expect((error as WorkflowSuspension).message).toBe(
100100
'1 step has not been run yet'
101101
);
102-
expect(ctx.invocationsQueue).toEqual((error as WorkflowSuspension).steps);
102+
// Compare Map values with WorkflowSuspension.steps array
103+
expect([...ctx.invocationsQueue.values()]).toEqual(
104+
(error as WorkflowSuspension).steps
105+
);
103106
expect((error as WorkflowSuspension).steps).toMatchInlineSnapshot(`
104107
[
105108
{
@@ -142,7 +145,10 @@ describe('createUseStep', () => {
142145
expect((error as WorkflowSuspension).message).toBe(
143146
'3 steps have not been run yet'
144147
);
145-
expect(ctx.invocationsQueue).toEqual((error as WorkflowSuspension).steps);
148+
// Compare Map values with WorkflowSuspension.steps array
149+
expect([...ctx.invocationsQueue.values()]).toEqual(
150+
(error as WorkflowSuspension).steps
151+
);
146152
expect((error as WorkflowSuspension).steps).toMatchInlineSnapshot(`
147153
[
148154
{
@@ -228,8 +234,9 @@ describe('createUseStep', () => {
228234
expect(result).toBe('Result: 42');
229235

230236
// Verify closure variables were added to invocation queue
231-
expect(ctx.invocationsQueue).toHaveLength(1);
232-
expect(ctx.invocationsQueue[0]).toMatchObject({
237+
expect(ctx.invocationsQueue.size).toBe(1);
238+
const queueItem = [...ctx.invocationsQueue.values()][0];
239+
expect(queueItem).toMatchObject({
233240
type: 'step',
234241
stepName: 'calculate',
235242
args: [],
@@ -263,8 +270,9 @@ describe('createUseStep', () => {
263270
expect(result).toBe(5);
264271

265272
// Verify empty closure variables were added to invocation queue
266-
expect(ctx.invocationsQueue).toHaveLength(1);
267-
expect(ctx.invocationsQueue[0]).toMatchObject({
273+
expect(ctx.invocationsQueue.size).toBe(1);
274+
const queueItem = [...ctx.invocationsQueue.values()][0];
275+
expect(queueItem).toMatchObject({
268276
type: 'step',
269277
stepName: 'add',
270278
args: [2, 3],

packages/core/src/step.ts

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) {
3030
queueItem.closureVars = closureVars;
3131
}
3232

33-
ctx.invocationsQueue.push(queueItem);
33+
ctx.invocationsQueue.set(correlationId, queueItem);
3434

3535
// Track whether we've already seen a "step_started" event for this step.
3636
// This is important because after a retryable failure, the step moves back to
@@ -73,13 +73,8 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) {
7373
if (event.eventType === 'step_started') {
7474
// Step has started - so remove from the invocations queue (only on the first "step_started" event)
7575
if (!hasSeenStepStarted) {
76-
const invocationsQueueIndex = ctx.invocationsQueue.findIndex(
77-
(invocation) =>
78-
invocation.type === 'step' &&
79-
invocation.correlationId === correlationId
80-
);
81-
if (invocationsQueueIndex !== -1) {
82-
ctx.invocationsQueue.splice(invocationsQueueIndex, 1);
76+
// O(1) lookup and delete using Map
77+
ctx.invocationsQueue.delete(correlationId);
8378
} else {
8479
setTimeout(() => {
8580
reject(

packages/core/src/workflow.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ export async function runWorkflow(
7676
eventsConsumer: new EventsConsumer(events),
7777
generateUlid: () => ulid(+startedAt),
7878
generateNanoid,
79-
invocationsQueue: [],
79+
invocationsQueue: new Map(),
8080
};
8181

8282
// Subscribe to the events log to update the timestamp in the vm context

packages/core/src/workflow/hook.ts

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) {
1313
const correlationId = `hook_${ctx.generateUlid()}`;
1414
const token = options.token ?? ctx.generateNanoid();
1515

16-
// Add hook creation to invocations queue
17-
ctx.invocationsQueue.push({
16+
// Add hook creation to invocations queue (using Map for O(1) operations)
17+
ctx.invocationsQueue.set(correlationId, {
1818
type: 'hook',
1919
correlationId,
2020
token,
@@ -52,13 +52,8 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) {
5252
event?.eventType === 'hook_created' &&
5353
event.correlationId === correlationId
5454
) {
55-
// Remove this hook from the invocations queue if it exists
56-
const index = ctx.invocationsQueue.findIndex(
57-
(item) => item.type === 'hook' && item.correlationId === correlationId
58-
);
59-
if (index !== -1) {
60-
ctx.invocationsQueue.splice(index, 1);
61-
}
55+
// Remove this hook from the invocations queue (O(1) delete using Map)
56+
ctx.invocationsQueue.delete(correlationId);
6257
return EventConsumerResult.Consumed;
6358
}
6459

packages/core/src/workflow/sleep.ts

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,13 @@ export function createSleep(ctx: WorkflowOrchestratorContext) {
1414
// Calculate the resume time
1515
const resumeAt = parseDurationToDate(param);
1616

17-
// Add wait to invocations queue
18-
ctx.invocationsQueue.push({
17+
// Add wait to invocations queue (using Map for O(1) operations)
18+
const waitItem: WaitInvocationQueueItem = {
1919
type: 'wait',
2020
correlationId,
2121
resumeAt,
22-
});
22+
};
23+
ctx.invocationsQueue.set(correlationId, waitItem);
2324

2425
ctx.eventsConsumer.subscribe((event) => {
2526
// If there are no events and we're waiting for wait_completed,
@@ -39,12 +40,11 @@ export function createSleep(ctx: WorkflowOrchestratorContext) {
3940
event.correlationId === correlationId
4041
) {
4142
// Mark this wait as having the created event, but keep it in the queue
42-
const waitItem = ctx.invocationsQueue.find(
43-
(item) => item.type === 'wait' && item.correlationId === correlationId
44-
) as WaitInvocationQueueItem | undefined;
45-
if (waitItem) {
46-
waitItem.hasCreatedEvent = true;
47-
waitItem.resumeAt = event.eventData.resumeAt;
43+
// O(1) lookup using Map
44+
const queueItem = ctx.invocationsQueue.get(correlationId);
45+
if (queueItem && queueItem.type === 'wait') {
46+
queueItem.hasCreatedEvent = true;
47+
queueItem.resumeAt = event.eventData.resumeAt;
4848
}
4949
return EventConsumerResult.Consumed;
5050
}
@@ -54,13 +54,8 @@ export function createSleep(ctx: WorkflowOrchestratorContext) {
5454
event?.eventType === 'wait_completed' &&
5555
event.correlationId === correlationId
5656
) {
57-
// Remove this wait from the invocations queue
58-
const index = ctx.invocationsQueue.findIndex(
59-
(item) => item.type === 'wait' && item.correlationId === correlationId
60-
);
61-
if (index !== -1) {
62-
ctx.invocationsQueue.splice(index, 1);
63-
}
57+
// Remove this wait from the invocations queue (O(1) delete using Map)
58+
ctx.invocationsQueue.delete(correlationId);
6459

6560
// Wait has elapsed, resolve the sleep
6661
setTimeout(() => {

0 commit comments

Comments
 (0)