diff --git a/.changeset/slick-cooks-clean.md b/.changeset/slick-cooks-clean.md new file mode 100644 index 000000000..9f69ec1c2 --- /dev/null +++ b/.changeset/slick-cooks-clean.md @@ -0,0 +1,7 @@ +--- +"@workflow/core": patch +--- + +perf: use Map for invocationsQueue (O(1) lookup/delete) + +Replace array-based invocationsQueue with Map for O(1) lookup and delete operations, eliminating O(n²) complexity in high-concurrency workflows. diff --git a/packages/core/src/global.test.ts b/packages/core/src/global.test.ts index 3759a0310..fda309f4f 100644 --- a/packages/core/src/global.test.ts +++ b/packages/core/src/global.test.ts @@ -2,10 +2,16 @@ import { FatalError } from '@workflow/errors'; import { describe, expect, it } from 'vitest'; import { type HookInvocationQueueItem, + type QueueItem, type StepInvocationQueueItem, WorkflowSuspension, } from './global.js'; +// Helper to convert array of queue items to Map keyed by correlationId +function toQueueMap(items: QueueItem[]): Map { + return new Map(items.map((item) => [item.correlationId, item])); +} + describe('FatalError', () => { it('should create a FatalError instance', () => { const error = new FatalError('Test fatal error'); @@ -43,7 +49,7 @@ describe('WorkflowSuspension', () => { correlationId: 'inv-1', }, ]; - const error = new WorkflowSuspension(steps, globalThis); + const error = new WorkflowSuspension(toQueueMap(steps), globalThis); expect(error).toBeInstanceOf(WorkflowSuspension); expect(error).toBeInstanceOf(Error); @@ -60,7 +66,7 @@ describe('WorkflowSuspension', () => { correlationId: 'inv-1', }, ]; - const error = new WorkflowSuspension(steps, globalThis); + const error = new WorkflowSuspension(toQueueMap(steps), globalThis); expect(error.message).toBe('1 step has not been run yet'); }); @@ -80,14 +86,14 @@ describe('WorkflowSuspension', () => { correlationId: 'inv-2', }, ]; - const error = new WorkflowSuspension(steps, globalThis); + const error = new WorkflowSuspension(toQueueMap(steps), globalThis); expect(error.message).toBe('2 steps have not been run yet'); }); it('should handle empty steps array', () => { const steps: StepInvocationQueueItem[] = []; - const error = new WorkflowSuspension(steps, globalThis); + const error = new WorkflowSuspension(toQueueMap(steps), globalThis); expect(error.steps).toEqual([]); expect(error.message).toBe('0 steps have not been run yet'); @@ -115,7 +121,7 @@ describe('WorkflowSuspension', () => { correlationId: 'another-inv', }, ]; - const error = new WorkflowSuspension(complexSteps, globalThis); + const error = new WorkflowSuspension(toQueueMap(complexSteps), globalThis); expect(error.steps).toEqual(complexSteps); expect(error.message).toBe('2 steps have not been run yet'); @@ -142,7 +148,7 @@ describe('WorkflowSuspension', () => { correlationId: 'inv-1', }, ]; - const error = new WorkflowSuspension(steps, globalThis); + const error = new WorkflowSuspension(toQueueMap(steps), globalThis); expect(error instanceof Error).toBe(true); expect(error instanceof WorkflowSuspension).toBe(true); @@ -158,7 +164,7 @@ describe('WorkflowSuspension', () => { correlationId: 'inv-1', }, ]; - const error = new WorkflowSuspension(steps, globalThis); + const error = new WorkflowSuspension(toQueueMap(steps), globalThis); expect(error.stack).toBeDefined(); expect(error.stack).toContain('WorkflowSuspension'); @@ -179,7 +185,7 @@ describe('WorkflowSuspension', () => { correlationId: 'email-456', }, ]; - const error = new WorkflowSuspension(steps, globalThis); + const error = new WorkflowSuspension(toQueueMap(steps), globalThis); expect(error.steps).toHaveLength(2); expect((error.steps[0] as StepInvocationQueueItem).stepName).toBe( @@ -212,7 +218,7 @@ describe('WorkflowSuspension', () => { token: 'webhook-token', }, ]; - const error = new WorkflowSuspension(hooks, globalThis); + const error = new WorkflowSuspension(toQueueMap(hooks), globalThis); expect(error.message).toBe('1 hook has not been created yet'); expect(error.hookCount).toBe(1); @@ -231,7 +237,7 @@ describe('WorkflowSuspension', () => { token: 'webhook-token-2', }, ]; - const error = new WorkflowSuspension(hooks, globalThis); + const error = new WorkflowSuspension(toQueueMap(hooks), globalThis); expect(error.message).toBe('2 hooks have not been created yet'); expect(error.hookCount).toBe(2); @@ -245,7 +251,7 @@ describe('WorkflowSuspension', () => { token: 'my-token', }, ]; - const error = new WorkflowSuspension(hooks, globalThis); + const error = new WorkflowSuspension(toQueueMap(hooks), globalThis); expect(error.message).toBe('1 hook has not been created yet'); expect(error.hookCount).toBe(1); @@ -264,7 +270,7 @@ describe('WorkflowSuspension', () => { token: 'token-2', }, ]; - const error = new WorkflowSuspension(hooks, globalThis); + const error = new WorkflowSuspension(toQueueMap(hooks), globalThis); expect(error.message).toBe('2 hooks have not been created yet'); expect(error.hookCount).toBe(2); @@ -289,7 +295,7 @@ describe('WorkflowSuspension', () => { token: 'my-token', }, ]; - const error = new WorkflowSuspension(items, globalThis); + const error = new WorkflowSuspension(toQueueMap(items), globalThis); expect(error.message).toBe('1 step and 2 hooks have not been run yet'); expect(error.stepCount).toBe(1); @@ -316,7 +322,7 @@ describe('WorkflowSuspension', () => { token: 'webhook-token', }, ]; - const error = new WorkflowSuspension(items, globalThis); + const error = new WorkflowSuspension(toQueueMap(items), globalThis); expect(error.message).toBe('2 steps and 1 hook have not been run yet'); expect(error.stepCount).toBe(2); @@ -337,7 +343,7 @@ describe('WorkflowSuspension', () => { token: 'my-token', }, ]; - const error = new WorkflowSuspension(items, globalThis); + const error = new WorkflowSuspension(toQueueMap(items), globalThis); // When there are steps, the action should be "run" not "created" expect(error.message).toBe('1 step and 1 hook have not been run yet'); @@ -351,7 +357,7 @@ describe('WorkflowSuspension', () => { token: 'webhook-token', }, ]; - const error = new WorkflowSuspension(hooks, globalThis); + const error = new WorkflowSuspension(toQueueMap(hooks), globalThis); expect(error.message).toBe('1 hook has not been created yet'); }); @@ -364,7 +370,7 @@ describe('WorkflowSuspension', () => { token: 'my-token', }, ]; - const error = new WorkflowSuspension(hooks, globalThis); + const error = new WorkflowSuspension(toQueueMap(hooks), globalThis); expect(error.message).toBe('1 hook has not been created yet'); }); diff --git a/packages/core/src/global.ts b/packages/core/src/global.ts index 3d4d0eda5..17a680dc4 100644 --- a/packages/core/src/global.ts +++ b/packages/core/src/global.ts @@ -40,10 +40,19 @@ export class WorkflowSuspension extends Error { hookCount: number; waitCount: number; - constructor(steps: QueueItem[], global: typeof globalThis) { - const stepCount = steps.filter((s) => s.type === 'step').length; - const hookCount = steps.filter((s) => s.type === 'hook').length; - const waitCount = steps.filter((s) => s.type === 'wait').length; + constructor(stepsInput: Map, global: typeof globalThis) { + // Convert Map to array for iteration and storage + const steps = [...stepsInput.values()]; + + // Single-pass counting for efficiency + let stepCount = 0; + let hookCount = 0; + let waitCount = 0; + for (const item of steps) { + if (item.type === 'step') stepCount++; + else if (item.type === 'hook') hookCount++; + else if (item.type === 'wait') waitCount++; + } // Build description parts const parts: string[] = []; diff --git a/packages/core/src/private.ts b/packages/core/src/private.ts index 2d284e467..d893b459f 100644 --- a/packages/core/src/private.ts +++ b/packages/core/src/private.ts @@ -38,7 +38,11 @@ export { __private_getClosureVars } from './step/get-closure-vars.js'; export interface WorkflowOrchestratorContext { globalThis: typeof globalThis; eventsConsumer: EventsConsumer; - invocationsQueue: QueueItem[]; + /** + * Map of pending invocations keyed by correlationId. + * Using Map instead of Array for O(1) lookup/delete operations. + */ + invocationsQueue: Map; onWorkflowError: (error: Error) => void; generateUlid: () => string; generateNanoid: () => string; diff --git a/packages/core/src/step.test.ts b/packages/core/src/step.test.ts index 8d2bdd6d3..cd49e9c6c 100644 --- a/packages/core/src/step.test.ts +++ b/packages/core/src/step.test.ts @@ -20,7 +20,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext { return { globalThis: context.globalThis, eventsConsumer: new EventsConsumer(events), - invocationsQueue: [], + invocationsQueue: new Map(), generateUlid: () => ulid(workflowStartedAt), // All generated ulids use the workflow's started at time generateNanoid: nanoid.customRandom(nanoid.urlAlphabet, 21, (size) => new Uint8Array(size).map(() => 256 * context.globalThis.Math.random()) @@ -99,7 +99,10 @@ describe('createUseStep', () => { expect((error as WorkflowSuspension).message).toBe( '1 step has not been run yet' ); - expect(ctx.invocationsQueue).toEqual((error as WorkflowSuspension).steps); + // Compare Map values with WorkflowSuspension.steps array + expect([...ctx.invocationsQueue.values()]).toEqual( + (error as WorkflowSuspension).steps + ); expect((error as WorkflowSuspension).steps).toMatchInlineSnapshot(` [ { @@ -142,7 +145,10 @@ describe('createUseStep', () => { expect((error as WorkflowSuspension).message).toBe( '3 steps have not been run yet' ); - expect(ctx.invocationsQueue).toEqual((error as WorkflowSuspension).steps); + // Compare Map values with WorkflowSuspension.steps array + expect([...ctx.invocationsQueue.values()]).toEqual( + (error as WorkflowSuspension).steps + ); expect((error as WorkflowSuspension).steps).toMatchInlineSnapshot(` [ { @@ -228,8 +234,9 @@ describe('createUseStep', () => { expect(result).toBe('Result: 42'); // Verify closure variables were added to invocation queue - expect(ctx.invocationsQueue).toHaveLength(1); - expect(ctx.invocationsQueue[0]).toMatchObject({ + expect(ctx.invocationsQueue.size).toBe(1); + const queueItem = [...ctx.invocationsQueue.values()][0]; + expect(queueItem).toMatchObject({ type: 'step', stepName: 'calculate', args: [], @@ -263,8 +270,9 @@ describe('createUseStep', () => { expect(result).toBe(5); // Verify empty closure variables were added to invocation queue - expect(ctx.invocationsQueue).toHaveLength(1); - expect(ctx.invocationsQueue[0]).toMatchObject({ + expect(ctx.invocationsQueue.size).toBe(1); + const queueItem = [...ctx.invocationsQueue.values()][0]; + expect(queueItem).toMatchObject({ type: 'step', stepName: 'add', args: [2, 3], diff --git a/packages/core/src/step.ts b/packages/core/src/step.ts index f5d0240cb..10238dc4f 100644 --- a/packages/core/src/step.ts +++ b/packages/core/src/step.ts @@ -30,7 +30,7 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) { queueItem.closureVars = closureVars; } - ctx.invocationsQueue.push(queueItem); + ctx.invocationsQueue.set(correlationId, queueItem); // Track whether we've already seen a "step_started" event for this step. // This is important because after a retryable failure, the step moves back to @@ -73,13 +73,8 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) { if (event.eventType === 'step_started') { // Step has started - so remove from the invocations queue (only on the first "step_started" event) if (!hasSeenStepStarted) { - const invocationsQueueIndex = ctx.invocationsQueue.findIndex( - (invocation) => - invocation.type === 'step' && - invocation.correlationId === correlationId - ); - if (invocationsQueueIndex !== -1) { - ctx.invocationsQueue.splice(invocationsQueueIndex, 1); + // O(1) lookup and delete using Map + ctx.invocationsQueue.delete(correlationId); } else { setTimeout(() => { reject( diff --git a/packages/core/src/workflow.ts b/packages/core/src/workflow.ts index 542d97a9c..f2d125425 100644 --- a/packages/core/src/workflow.ts +++ b/packages/core/src/workflow.ts @@ -76,7 +76,7 @@ export async function runWorkflow( eventsConsumer: new EventsConsumer(events), generateUlid: () => ulid(+startedAt), generateNanoid, - invocationsQueue: [], + invocationsQueue: new Map(), }; // Subscribe to the events log to update the timestamp in the vm context diff --git a/packages/core/src/workflow/hook.ts b/packages/core/src/workflow/hook.ts index aeaf64016..5219c6d58 100644 --- a/packages/core/src/workflow/hook.ts +++ b/packages/core/src/workflow/hook.ts @@ -13,8 +13,8 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { const correlationId = `hook_${ctx.generateUlid()}`; const token = options.token ?? ctx.generateNanoid(); - // Add hook creation to invocations queue - ctx.invocationsQueue.push({ + // Add hook creation to invocations queue (using Map for O(1) operations) + ctx.invocationsQueue.set(correlationId, { type: 'hook', correlationId, token, @@ -52,13 +52,8 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { event?.eventType === 'hook_created' && event.correlationId === correlationId ) { - // Remove this hook from the invocations queue if it exists - const index = ctx.invocationsQueue.findIndex( - (item) => item.type === 'hook' && item.correlationId === correlationId - ); - if (index !== -1) { - ctx.invocationsQueue.splice(index, 1); - } + // Remove this hook from the invocations queue (O(1) delete using Map) + ctx.invocationsQueue.delete(correlationId); return EventConsumerResult.Consumed; } diff --git a/packages/core/src/workflow/sleep.ts b/packages/core/src/workflow/sleep.ts index 0b16fd371..dc9df7734 100644 --- a/packages/core/src/workflow/sleep.ts +++ b/packages/core/src/workflow/sleep.ts @@ -14,12 +14,13 @@ export function createSleep(ctx: WorkflowOrchestratorContext) { // Calculate the resume time const resumeAt = parseDurationToDate(param); - // Add wait to invocations queue - ctx.invocationsQueue.push({ + // Add wait to invocations queue (using Map for O(1) operations) + const waitItem: WaitInvocationQueueItem = { type: 'wait', correlationId, resumeAt, - }); + }; + ctx.invocationsQueue.set(correlationId, waitItem); ctx.eventsConsumer.subscribe((event) => { // If there are no events and we're waiting for wait_completed, @@ -39,12 +40,11 @@ export function createSleep(ctx: WorkflowOrchestratorContext) { event.correlationId === correlationId ) { // Mark this wait as having the created event, but keep it in the queue - const waitItem = ctx.invocationsQueue.find( - (item) => item.type === 'wait' && item.correlationId === correlationId - ) as WaitInvocationQueueItem | undefined; - if (waitItem) { - waitItem.hasCreatedEvent = true; - waitItem.resumeAt = event.eventData.resumeAt; + // O(1) lookup using Map + const queueItem = ctx.invocationsQueue.get(correlationId); + if (queueItem && queueItem.type === 'wait') { + queueItem.hasCreatedEvent = true; + queueItem.resumeAt = event.eventData.resumeAt; } return EventConsumerResult.Consumed; } @@ -54,13 +54,8 @@ export function createSleep(ctx: WorkflowOrchestratorContext) { event?.eventType === 'wait_completed' && event.correlationId === correlationId ) { - // Remove this wait from the invocations queue - const index = ctx.invocationsQueue.findIndex( - (item) => item.type === 'wait' && item.correlationId === correlationId - ); - if (index !== -1) { - ctx.invocationsQueue.splice(index, 1); - } + // Remove this wait from the invocations queue (O(1) delete using Map) + ctx.invocationsQueue.delete(correlationId); // Wait has elapsed, resolve the sleep setTimeout(() => {