9 changes: 9 additions & 0 deletions .changeset/brave-walls-trade.md
@@ -0,0 +1,9 @@
---
"@workflow/world-local": patch
---

perf: optimize for high-concurrency workflows

- Add in-memory cache for file existence checks to avoid expensive fs.access() calls
- Increase default concurrency limit from 20 to 100
- Improve HTTP connection pooling with undici Agent (100 connections, 30s keepalive)
85 changes: 85 additions & 0 deletions packages/core/e2e/bench.bench.ts
@@ -293,4 +293,89 @@ describe('Workflow Performance Benchmarks', () => {
},
{ time: 5000, warmupIterations: 1, teardown }
);

// Stress tests for large concurrent step counts
// These reproduce reported issues with Promise.race/Promise.all at scale

bench(
'stress test: Promise.all with 100 concurrent steps',
async () => {
const { runId } = await triggerWorkflow(
'promiseAllStressTestWorkflow',
[100]
);
const { run } = await getWorkflowReturnValue(runId);
stageTiming('stress test: Promise.all with 100 concurrent steps', run);
},
{ time: 30000, iterations: 1, warmupIterations: 0, teardown }
);

// TODO: Re-enable after performance optimizations (see beads issue wrk-fyx)
bench.skip(
'stress test: Promise.all with 500 concurrent steps',
async () => {
const { runId } = await triggerWorkflow(
'promiseAllStressTestWorkflow',
[500]
);
const { run } = await getWorkflowReturnValue(runId);
stageTiming('stress test: Promise.all with 500 concurrent steps', run);
},
{ time: 60000, iterations: 1, warmupIterations: 0, teardown }
);

// TODO: Re-enable after performance optimizations (see beads issue wrk-fyx)
bench.skip(
'stress test: Promise.all with 1000 concurrent steps',
async () => {
const { runId } = await triggerWorkflow(
'promiseAllStressTestWorkflow',
[1000]
);
const { run } = await getWorkflowReturnValue(runId);
stageTiming('stress test: Promise.all with 1000 concurrent steps', run);
},
{ time: 120000, iterations: 1, warmupIterations: 0, teardown }
);

bench(
'stress test: Promise.race with 100 concurrent steps',
async () => {
const { runId } = await triggerWorkflow(
'promiseRaceStressTestLargeWorkflow',
[100]
);
const { run } = await getWorkflowReturnValue(runId);
stageTiming('stress test: Promise.race with 100 concurrent steps', run);
},
{ time: 30000, iterations: 1, warmupIterations: 0, teardown }
);

// TODO: Re-enable after performance optimizations (see beads issue wrk-fyx)
bench.skip(
'stress test: Promise.race with 500 concurrent steps',
async () => {
const { runId } = await triggerWorkflow(
'promiseRaceStressTestLargeWorkflow',
[500]
);
const { run } = await getWorkflowReturnValue(runId);
stageTiming('stress test: Promise.race with 500 concurrent steps', run);
},
{ time: 60000, iterations: 1, warmupIterations: 0, teardown }
);

// TODO: Re-enable after performance optimizations (see beads issue wrk-fyx)
Contributor
Should we also be stress testing for Promise.allSettled?

Collaborator Author
Can add more in a future PR, but it should be no different from Promise.all where everything succeeds

Will need to add a new step that throws errors too, to stress test retrying steps that fail

Collaborator Author
I'm gonna make sure these tests are performant with the incoming PRs, and make sure they don't break (hence why I disabled 500 and 1k), and then will add more tests

bench.skip(
'stress test: Promise.race with 1000 concurrent steps',
async () => {
const { runId } = await triggerWorkflow(
'promiseRaceStressTestLargeWorkflow',
[1000]
);
const { run } = await getWorkflowReturnValue(runId);
stageTiming('stress test: Promise.race with 1000 concurrent steps', run);
},
{ time: 120000, iterations: 1, warmupIterations: 0, teardown }
);
});
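
For reference, a rough sketch of the Promise.allSettled stress test discussed in the review thread above. The failing step and both names (flakyStressTestStep, promiseAllSettledStressTestWorkflow) are hypothetical and not part of this PR; they just illustrate the "step that throws errors" the author mentions, in the same style as the workflows in 97_bench.ts.

// Hypothetical step that fails for a fraction of inputs, to exercise retries.
async function flakyStressTestStep(i: number) {
  'use step';
  if (i % 10 === 0) {
    throw new Error(`step ${i} failed`);
  }
  return i;
}

// Hypothetical stress test: Promise.allSettled with many concurrent steps,
// where some steps reject and the workflow counts the fulfilled ones.
export async function promiseAllSettledStressTestWorkflow(count: number) {
  'use workflow';
  const promises: Promise<number>[] = [];
  for (let i = 0; i < count; i++) {
    promises.push(flakyStressTestStep(i));
  }
  const results = await Promise.allSettled(promises);
  return results.filter((r) => r.status === 'fulfilled').length;
}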
25 changes: 25 additions & 0 deletions packages/world-local/src/fs.ts
@@ -9,6 +9,17 @@ const ulid = monotonicFactory(() => Math.random());

const Ulid = z.string().ulid();

// In-memory cache of created files to avoid expensive fs.access() calls
// This is safe because we only write once per file path (no overwrites without explicit flag)
const createdFilesCache = new Set<string>();

/**
* Clear the created files cache. Useful for testing or when files are deleted externally.
*/
export function clearCreatedFilesCache(): void {
createdFilesCache.clear();
}

export function ulidToDate(maybeUlid: string): Date | null {
const ulid = Ulid.safeParse(maybeUlid);
if (!ulid.success) {
@@ -53,8 +64,20 @@ export async function write(
opts?: WriteOptions
): Promise<void> {
if (!opts?.overwrite) {
// Fast path: check in-memory cache first to avoid expensive fs.access() calls
// This provides significant performance improvement when creating many files
if (createdFilesCache.has(filePath)) {
throw new WorkflowAPIError(
`File ${filePath} already exists and 'overwrite' is false`,
{ status: 409 }
);
}

// Slow path: check filesystem for files created before this process started
try {
await fs.access(filePath);
// File exists on disk, add to cache for future checks
createdFilesCache.add(filePath);
throw new WorkflowAPIError(
`File ${filePath} already exists and 'overwrite' is false`,
{ status: 409 }
@@ -74,6 +97,8 @@ export async function write(
await fs.writeFile(tempPath, data);
tempFileCreated = true;
await fs.rename(tempPath, filePath);
// Track this file in cache so future writes know it exists
createdFilesCache.add(filePath);
} catch (error) {
// Only try to clean up temp file if it was actually created
if (tempFileCreated) {
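
The hunk exports clearCreatedFilesCache but doesn't show a call site. As a minimal sketch of the "useful for testing" case from its doc comment, a test suite that deletes files on disk between cases might reset the cache like this (the vitest setup and the import path are assumptions, not code from this PR):

// Assumed import path and vitest-style setup, for illustration only.
import { beforeEach } from 'vitest';
import { clearCreatedFilesCache } from '@workflow/world-local/fs';

beforeEach(() => {
  // Reset the in-memory existence cache so files deleted on disk between
  // tests don't leave the fast path throwing stale 409 errors.
  clearCreatedFilesCache();
});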
10 changes: 8 additions & 2 deletions packages/world-local/src/queue.ts
@@ -16,14 +16,20 @@ const LOCAL_QUEUE_MAX_VISIBILITY =

// The local workers share the same Node.js process and event loop,
// so we need to limit concurrency to avoid overwhelming the system.
const DEFAULT_CONCURRENCY_LIMIT = 20;
const DEFAULT_CONCURRENCY_LIMIT = 100;
const WORKFLOW_LOCAL_QUEUE_CONCURRENCY =
parseInt(process.env.WORKFLOW_LOCAL_QUEUE_CONCURRENCY ?? '0', 10) ||
DEFAULT_CONCURRENCY_LIMIT;

// Create a custom agent with unlimited headers timeout for long-running steps
// Create a custom agent optimized for high-concurrency local workflows:
// - headersTimeout: 0 allows long-running steps
// - connections: 100 allows many parallel connections to the same host
// - pipelining: 1 (default) for HTTP/1.1 compatibility
// - keepAliveTimeout: 30s keeps connections warm for rapid step execution
const httpAgent = new Agent({
headersTimeout: 0,
connections: 100,
keepAliveTimeout: 30_000,
});

export function createQueue(config: Partial<Config>): Queue {
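
The hunk doesn't show how httpAgent is attached to outgoing requests in this repo. As a general illustration only (not this package's actual call site), undici accepts an Agent through its dispatcher option, so each call reuses the pooled, keep-alive connections:

import { Agent, request } from 'undici';

const httpAgent = new Agent({
  headersTimeout: 0,
  connections: 100,
  keepAliveTimeout: 30_000,
});

// Illustrative helper: POST JSON through the shared agent.
async function postStep(url: string, payload: unknown) {
  const { statusCode, body } = await request(url, {
    method: 'POST',
    dispatcher: httpAgent,
    headers: { 'content-type': 'application/json' },
    body: JSON.stringify(payload),
  });
  return { statusCode, result: await body.json() };
}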
39 changes: 39 additions & 0 deletions workbench/example/workflows/97_bench.ts
@@ -86,3 +86,42 @@ export async function streamWorkflow() {
const doubled = await doubleNumbers(stream);
return doubled;
}

//////////////////////////////////////////////////////////
// Stress test workflows for large concurrent step counts
//////////////////////////////////////////////////////////

async function stressTestStep(i: number) {
'use step';
// Minimal work to isolate the overhead of concurrent step tracking
return i;
}

// Stress test: Promise.all with many concurrent steps
export async function promiseAllStressTestWorkflow(count: number) {
'use workflow';
const promises: Promise<number>[] = [];
for (let i = 0; i < count; i++) {
promises.push(stressTestStep(i));
}
const results = await Promise.all(promises);
return results.length;
}

// Stress test: Promise.race with many concurrent steps (uses Map pattern from report)
export async function promiseRaceStressTestLargeWorkflow(count: number) {
'use workflow';
const runningTasks = new Map<number, Promise<number>>();
for (let i = 0; i < count; i++) {
runningTasks.set(i, stressTestStep(i));
}

const done: number[] = [];
while (runningTasks.size > 0) {
const result = await Promise.race(runningTasks.values());
done.push(result);
runningTasks.delete(result);
Comment on lines +121 to +123
Copilot AI Dec 5, 2025
The Promise.race pattern has a logical bug. The Map is keyed by index i, but runningTasks.delete(result) attempts to delete using the result value (which equals i since stressTestStep(i) returns i). While this happens to work when result equals the key, it creates a tight coupling and will fail if the step returns any value different from its input.

Additionally, this doesn't correctly track which promise completed. Since multiple tasks can have the same result value, deleting by result could remove the wrong task or fail to remove any task if the result doesn't match a key.

Consider using Promise.race with an array of objects that include both the promise and its key:

const runningTasks = new Map<number, Promise<number>>();
for (let i = 0; i < count; i++) {
  runningTasks.set(i, stressTestStep(i));
}

const done: number[] = [];
while (runningTasks.size > 0) {
  const entries = Array.from(runningTasks.entries());
  const racePromises = entries.map(([key, promise]) => 
    promise.then(result => ({ key, result }))
  );
  const { key, result } = await Promise.race(racePromises);
  done.push(result);
  runningTasks.delete(key);
}

}

return done.length;
}