Skip to content

Commit bdefce0

Browse files
pranaygpclaude
andcommitted
perf: use Map for invocationsQueue (O(1) lookup/delete)
Replace array-based invocationsQueue with Map<string, QueueItem> for O(1) lookup and delete operations. This eliminates the O(n²) complexity from findIndex + splice patterns in step.ts, hook.ts, and sleep.ts. Changes: - private.ts: Change invocationsQueue type from QueueItem[] to Map - workflow.ts: Initialize with new Map() instead of [] - global.ts: Accept Map or array in WorkflowSuspension, add single-pass counting - step.ts: Use Map.set/has/delete instead of push/findIndex/splice - hook.ts: Use Map.set/delete instead of push/findIndex/splice - sleep.ts: Use Map.set/get/delete instead of push/find/findIndex/splice This is Phase 1 of the high-concurrency workflow optimization plan. See beads issue wrk-fyx for details. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 48b3a12 commit bdefce0

File tree

8 files changed

+64
-46
lines changed

8 files changed

+64
-46
lines changed

.changeset/slick-cooks-clean.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"@workflow/core": patch
3+
---
4+
5+
perf: use Map for invocationsQueue (O(1) lookup/delete)
6+
7+
Replace array-based invocationsQueue with Map for O(1) lookup and delete operations, eliminating O(n²) complexity in high-concurrency workflows.

packages/core/src/global.ts

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,23 @@ export class WorkflowSuspension extends Error {
4040
hookCount: number;
4141
waitCount: number;
4242

43-
constructor(steps: QueueItem[], global: typeof globalThis) {
44-
const stepCount = steps.filter((s) => s.type === 'step').length;
45-
const hookCount = steps.filter((s) => s.type === 'hook').length;
46-
const waitCount = steps.filter((s) => s.type === 'wait').length;
43+
constructor(
44+
stepsInput: QueueItem[] | Map<string, QueueItem>,
45+
global: typeof globalThis
46+
) {
47+
// Convert Map to array for backward compatibility
48+
const steps =
49+
stepsInput instanceof Map ? [...stepsInput.values()] : stepsInput;
50+
51+
// Single-pass counting for efficiency
52+
let stepCount = 0;
53+
let hookCount = 0;
54+
let waitCount = 0;
55+
for (const item of steps) {
56+
if (item.type === 'step') stepCount++;
57+
else if (item.type === 'hook') hookCount++;
58+
else if (item.type === 'wait') waitCount++;
59+
}
4760

4861
// Build description parts
4962
const parts: string[] = [];

packages/core/src/private.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,11 @@ export { __private_getClosureVars } from './step/get-closure-vars.js';
3838
export interface WorkflowOrchestratorContext {
3939
globalThis: typeof globalThis;
4040
eventsConsumer: EventsConsumer;
41-
invocationsQueue: QueueItem[];
41+
/**
42+
* Map of pending invocations keyed by correlationId.
43+
* Using Map instead of Array for O(1) lookup/delete operations.
44+
*/
45+
invocationsQueue: Map<string, QueueItem>;
4246
onWorkflowError: (error: Error) => void;
4347
generateUlid: () => string;
4448
generateNanoid: () => string;

packages/core/src/step.test.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext {
2020
return {
2121
globalThis: context.globalThis,
2222
eventsConsumer: new EventsConsumer(events),
23-
invocationsQueue: [],
23+
invocationsQueue: new Map(),
2424
generateUlid: () => ulid(workflowStartedAt), // All generated ulids use the workflow's started at time
2525
generateNanoid: nanoid.customRandom(nanoid.urlAlphabet, 21, (size) =>
2626
new Uint8Array(size).map(() => 256 * context.globalThis.Math.random())
@@ -99,7 +99,10 @@ describe('createUseStep', () => {
9999
expect((error as WorkflowSuspension).message).toBe(
100100
'1 step has not been run yet'
101101
);
102-
expect(ctx.invocationsQueue).toEqual((error as WorkflowSuspension).steps);
102+
// Compare Map values with WorkflowSuspension.steps array
103+
expect([...ctx.invocationsQueue.values()]).toEqual(
104+
(error as WorkflowSuspension).steps
105+
);
103106
expect((error as WorkflowSuspension).steps).toMatchInlineSnapshot(`
104107
[
105108
{
@@ -142,7 +145,10 @@ describe('createUseStep', () => {
142145
expect((error as WorkflowSuspension).message).toBe(
143146
'3 steps have not been run yet'
144147
);
145-
expect(ctx.invocationsQueue).toEqual((error as WorkflowSuspension).steps);
148+
// Compare Map values with WorkflowSuspension.steps array
149+
expect([...ctx.invocationsQueue.values()]).toEqual(
150+
(error as WorkflowSuspension).steps
151+
);
146152
expect((error as WorkflowSuspension).steps).toMatchInlineSnapshot(`
147153
[
148154
{
@@ -228,8 +234,9 @@ describe('createUseStep', () => {
228234
expect(result).toBe('Result: 42');
229235

230236
// Verify closure variables were added to invocation queue
231-
expect(ctx.invocationsQueue).toHaveLength(1);
232-
expect(ctx.invocationsQueue[0]).toMatchObject({
237+
expect(ctx.invocationsQueue.size).toBe(1);
238+
const queueItem = [...ctx.invocationsQueue.values()][0];
239+
expect(queueItem).toMatchObject({
233240
type: 'step',
234241
stepName: 'calculate',
235242
args: [],
@@ -263,8 +270,9 @@ describe('createUseStep', () => {
263270
expect(result).toBe(5);
264271

265272
// Verify empty closure variables were added to invocation queue
266-
expect(ctx.invocationsQueue).toHaveLength(1);
267-
expect(ctx.invocationsQueue[0]).toMatchObject({
273+
expect(ctx.invocationsQueue.size).toBe(1);
274+
const queueItem = [...ctx.invocationsQueue.values()][0];
275+
expect(queueItem).toMatchObject({
268276
type: 'step',
269277
stepName: 'add',
270278
args: [2, 3],

packages/core/src/step.ts

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) {
3030
queueItem.closureVars = closureVars;
3131
}
3232

33-
ctx.invocationsQueue.push(queueItem);
33+
ctx.invocationsQueue.set(correlationId, queueItem);
3434

3535
// Track whether we've already seen a "step_started" event for this step.
3636
// This is important because after a retryable failure, the step moves back to
@@ -73,13 +73,9 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) {
7373
if (event.eventType === 'step_started') {
7474
// Step has started - so remove from the invocations queue (only on the first "step_started" event)
7575
if (!hasSeenStepStarted) {
76-
const invocationsQueueIndex = ctx.invocationsQueue.findIndex(
77-
(invocation) =>
78-
invocation.type === 'step' &&
79-
invocation.correlationId === correlationId
80-
);
81-
if (invocationsQueueIndex !== -1) {
82-
ctx.invocationsQueue.splice(invocationsQueueIndex, 1);
76+
// O(1) lookup and delete using Map
77+
if (ctx.invocationsQueue.has(correlationId)) {
78+
ctx.invocationsQueue.delete(correlationId);
8379
} else {
8480
setTimeout(() => {
8581
reject(

packages/core/src/workflow.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ export async function runWorkflow(
7676
eventsConsumer: new EventsConsumer(events),
7777
generateUlid: () => ulid(+startedAt),
7878
generateNanoid,
79-
invocationsQueue: [],
79+
invocationsQueue: new Map(),
8080
};
8181

8282
// Subscribe to the events log to update the timestamp in the vm context

packages/core/src/workflow/hook.ts

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) {
1313
const correlationId = `hook_${ctx.generateUlid()}`;
1414
const token = options.token ?? ctx.generateNanoid();
1515

16-
// Add hook creation to invocations queue
17-
ctx.invocationsQueue.push({
16+
// Add hook creation to invocations queue (using Map for O(1) operations)
17+
ctx.invocationsQueue.set(correlationId, {
1818
type: 'hook',
1919
correlationId,
2020
token,
@@ -52,13 +52,8 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) {
5252
event?.eventType === 'hook_created' &&
5353
event.correlationId === correlationId
5454
) {
55-
// Remove this hook from the invocations queue if it exists
56-
const index = ctx.invocationsQueue.findIndex(
57-
(item) => item.type === 'hook' && item.correlationId === correlationId
58-
);
59-
if (index !== -1) {
60-
ctx.invocationsQueue.splice(index, 1);
61-
}
55+
// Remove this hook from the invocations queue (O(1) delete using Map)
56+
ctx.invocationsQueue.delete(correlationId);
6257
return EventConsumerResult.Consumed;
6358
}
6459

packages/core/src/workflow/sleep.ts

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,13 @@ export function createSleep(ctx: WorkflowOrchestratorContext) {
1414
// Calculate the resume time
1515
const resumeAt = parseDurationToDate(param);
1616

17-
// Add wait to invocations queue
18-
ctx.invocationsQueue.push({
17+
// Add wait to invocations queue (using Map for O(1) operations)
18+
const waitItem: WaitInvocationQueueItem = {
1919
type: 'wait',
2020
correlationId,
2121
resumeAt,
22-
});
22+
};
23+
ctx.invocationsQueue.set(correlationId, waitItem);
2324

2425
ctx.eventsConsumer.subscribe((event) => {
2526
// If there are no events and we're waiting for wait_completed,
@@ -39,12 +40,11 @@ export function createSleep(ctx: WorkflowOrchestratorContext) {
3940
event.correlationId === correlationId
4041
) {
4142
// Mark this wait as having the created event, but keep it in the queue
42-
const waitItem = ctx.invocationsQueue.find(
43-
(item) => item.type === 'wait' && item.correlationId === correlationId
44-
) as WaitInvocationQueueItem | undefined;
45-
if (waitItem) {
46-
waitItem.hasCreatedEvent = true;
47-
waitItem.resumeAt = event.eventData.resumeAt;
43+
// O(1) lookup using Map
44+
const queueItem = ctx.invocationsQueue.get(correlationId);
45+
if (queueItem && queueItem.type === 'wait') {
46+
queueItem.hasCreatedEvent = true;
47+
queueItem.resumeAt = event.eventData.resumeAt;
4848
}
4949
return EventConsumerResult.Consumed;
5050
}
@@ -54,13 +54,8 @@ export function createSleep(ctx: WorkflowOrchestratorContext) {
5454
event?.eventType === 'wait_completed' &&
5555
event.correlationId === correlationId
5656
) {
57-
// Remove this wait from the invocations queue
58-
const index = ctx.invocationsQueue.findIndex(
59-
(item) => item.type === 'wait' && item.correlationId === correlationId
60-
);
61-
if (index !== -1) {
62-
ctx.invocationsQueue.splice(index, 1);
63-
}
57+
// Remove this wait from the invocations queue (O(1) delete using Map)
58+
ctx.invocationsQueue.delete(correlationId);
6459

6560
// Wait has elapsed, resolve the sleep
6661
setTimeout(() => {

0 commit comments

Comments
 (0)