Skip to content

Commit d2709e6

Browse files
committed
add graphile queue driver
1 parent baf9c70 commit d2709e6

File tree

6 files changed

+354
-39
lines changed

6 files changed

+354
-39
lines changed

packages/utils/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
"@types/ms": "2.1.0",
3838
"@types/node": "catalog:",
3939
"@workflow/tsconfig": "workspace:*",
40+
"execa": "9.6.1",
4041
"vitest": "catalog:"
4142
},
4243
"dependencies": {

packages/world-postgres/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
"cbor-x": "1.6.0",
5353
"dotenv": "16.4.5",
5454
"drizzle-orm": "0.44.7",
55+
"graphile-worker": "0.16.6",
5556
"pg-boss": "11.0.7",
5657
"postgres": "3.4.7",
5758
"ulid": "3.0.1",

packages/world-postgres/src/index.ts

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,14 @@ import { createFunctionProxy } from './proxies/function-proxy.js';
66
import { createHttpProxy } from './proxies/http-proxy.js';
77
import { createQueue } from './queue.js';
88
import {
9+
createGraphileWorkerFunctionProxyQueue,
10+
createGraphileWorkerHttpProxyQueue,
911
createPgBossFunctionProxyQueue,
1012
createPgBossHttpProxyQueue,
1113
} from './queue-drivers/factories.js';
14+
import { createGraphileWorkerQueue } from './queue-drivers/graphile.js';
1215
import { createPgBossQueue } from './queue-drivers/pgboss.js';
16+
import type { QueueDriver } from './queue-drivers/types.js';
1317
import {
1418
createEventsStorage,
1519
createHooksStorage,
@@ -18,14 +22,30 @@ import {
1822
} from './storage.js';
1923
import { createStreamer } from './streamer.js';
2024

25+
/**
26+
* Get the default queue factory based on WORKFLOW_QUEUE_DRIVER env var.
27+
* Defaults to pg-boss for backwards compatibility.
28+
*
29+
* Set WORKFLOW_QUEUE_DRIVER=graphile to use Graphile Worker.
30+
*/
31+
function getDefaultQueueFactory(): () => QueueDriver {
32+
const driver = process.env.WORKFLOW_QUEUE_DRIVER || 'pgboss';
33+
34+
if (driver === 'graphile') {
35+
return createGraphileWorkerHttpProxyQueue;
36+
}
37+
38+
return createPgBossHttpProxyQueue;
39+
}
40+
2141
export function createWorld(
2242
opts: PostgresWorldConfig = {}
2343
): World & { start(): Promise<void> } {
2444
const config = loadWorldConfig(opts);
2545

2646
const queueDriver = opts.queueFactory
2747
? opts.queueFactory()
28-
: createPgBossHttpProxyQueue();
48+
: getDefaultQueueFactory()();
2949

3050
const postgres = createPostgres(config.connectionString);
3151
const drizzle = createClient(postgres);
@@ -63,3 +83,8 @@ export {
6383
createPgBossFunctionProxyQueue,
6484
createPgBossHttpProxyQueue,
6585
};
86+
export {
87+
createGraphileWorkerQueue,
88+
createGraphileWorkerFunctionProxyQueue,
89+
createGraphileWorkerHttpProxyQueue,
90+
};

packages/world-postgres/src/queue-drivers/factories.ts

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { getQueueConfig, getWorldConfig } from '../config.js';
22
import { createFunctionProxy } from '../proxies/function-proxy.js';
33
import { createHttpProxy } from '../proxies/http-proxy.js';
4+
import { createGraphileWorkerQueue } from './graphile.js';
45
import { createPgBossQueue } from './pgboss.js';
56
import type { QueueDriver } from './types.js';
67

@@ -85,3 +86,91 @@ export function createPgBossHttpProxyQueue(
8586
})
8687
);
8788
}
89+
90+
/**
91+
* QueueDriver implementation using Graphile Worker for job management
92+
* and direct function calls for execution.
93+
*
94+
* Graphile Worker uses PostgreSQL LISTEN/NOTIFY for near-instant job pickup
95+
* (~3ms latency vs 500ms polling with pg-boss).
96+
*/
97+
export function createGraphileWorkerFunctionProxyQueue(opts: {
98+
jobPrefix?: string;
99+
securityToken?: string;
100+
connectionString?: string;
101+
queueConcurrency?: number;
102+
stepEntrypoint: (request: Request) => Promise<Response>;
103+
workflowEntrypoint: (request: Request) => Promise<Response>;
104+
}): QueueDriver {
105+
const worldDefaults = getWorldConfig();
106+
const queueDefaults = getQueueConfig();
107+
108+
const config = {
109+
connectionString: opts.connectionString ?? worldDefaults.connectionString,
110+
securityToken: opts.securityToken ?? worldDefaults.securityToken,
111+
jobPrefix: opts.jobPrefix ?? queueDefaults.jobPrefix,
112+
queueConcurrency: opts.queueConcurrency ?? queueDefaults.queueConcurrency,
113+
};
114+
115+
return createGraphileWorkerQueue(
116+
{
117+
jobPrefix: config.jobPrefix,
118+
connectionString: config.connectionString,
119+
queueConcurrency: config.queueConcurrency,
120+
},
121+
createFunctionProxy({
122+
securityToken: config.securityToken,
123+
stepEntrypoint: opts.stepEntrypoint,
124+
workflowEntrypoint: opts.workflowEntrypoint,
125+
})
126+
);
127+
}
128+
129+
/**
130+
* QueueDriver implementation using Graphile Worker for job management
131+
* and HTTP for execution.
132+
*
133+
* Graphile Worker uses PostgreSQL LISTEN/NOTIFY for near-instant job pickup
134+
* (~3ms latency vs 500ms polling with pg-boss).
135+
*/
136+
export function createGraphileWorkerHttpProxyQueue(
137+
opts: {
138+
port?: number;
139+
baseUrl?: string;
140+
jobPrefix?: string;
141+
securityToken?: string;
142+
connectionString?: string;
143+
queueConcurrency?: number;
144+
} = {}
145+
): QueueDriver {
146+
const worldDefaults = getWorldConfig();
147+
const queueDefaults = getQueueConfig();
148+
149+
const config = {
150+
connectionString: opts.connectionString ?? worldDefaults.connectionString,
151+
securityToken: opts.securityToken ?? worldDefaults.securityToken,
152+
jobPrefix: opts.jobPrefix ?? queueDefaults.jobPrefix,
153+
queueConcurrency: opts.queueConcurrency ?? queueDefaults.queueConcurrency,
154+
155+
port:
156+
opts.port ??
157+
(process.env.WORKFLOW_POSTGRES_APP_PORT
158+
? parseInt(process.env.WORKFLOW_POSTGRES_APP_PORT, 10)
159+
: undefined),
160+
161+
baseUrl: opts.baseUrl ?? process.env.WORKFLOW_POSTGRES_APP_URL,
162+
};
163+
164+
return createGraphileWorkerQueue(
165+
{
166+
jobPrefix: config.jobPrefix,
167+
connectionString: config.connectionString,
168+
queueConcurrency: config.queueConcurrency,
169+
},
170+
createHttpProxy({
171+
port: config.port,
172+
baseUrl: config.baseUrl,
173+
securityToken: config.securityToken,
174+
})
175+
);
176+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import {
2+
makeWorkerUtils,
3+
run,
4+
type Task,
5+
type WorkerUtils,
6+
} from 'graphile-worker';
7+
import type { WkfProxy } from '../proxies/types.js';
8+
import { MessageData, type QueueDriver } from './types.js';
9+
10+
/**
11+
* QueueDriver implementation using Graphile Worker for job management.
12+
* Uses PostgreSQL LISTEN/NOTIFY for near-instant job pickup (~3ms latency).
13+
* Takes in a proxy that will handle the actual step/flow execution.
14+
*/
15+
export function createGraphileWorkerQueue(
16+
opts: {
17+
jobPrefix: string;
18+
connectionString: string;
19+
queueConcurrency: number;
20+
},
21+
proxy: WkfProxy
22+
): QueueDriver {
23+
let workerUtils: WorkerUtils | null = null;
24+
25+
const stepTaskName = `${opts.jobPrefix}step`;
26+
const flowTaskName = `${opts.jobPrefix}flow`;
27+
28+
const ensureUtils = async (): Promise<WorkerUtils> => {
29+
if (!workerUtils) {
30+
workerUtils = await makeWorkerUtils({
31+
connectionString: opts.connectionString,
32+
});
33+
await workerUtils.migrate();
34+
}
35+
return workerUtils;
36+
};
37+
38+
const createTaskHandler = (
39+
proxyFn: WkfProxy[keyof WkfProxy],
40+
taskName: string
41+
): Task => {
42+
return async (payload, helpers) => {
43+
const message = MessageData.parse(payload);
44+
45+
helpers.logger.info(`Running: ${message.queueName}`);
46+
47+
try {
48+
const response = await proxyFn(message);
49+
50+
if (response.status === 503) {
51+
const body = (await response.json()) as { timeoutSeconds?: number };
52+
53+
if (body.timeoutSeconds) {
54+
// Requeue the job with a delay
55+
const utils = await ensureUtils();
56+
await utils.addJob(taskName, MessageData.encode(message), {
57+
jobKey: message.idempotencyKey ?? message.messageId,
58+
maxAttempts: 3,
59+
runAt: new Date(Date.now() + body.timeoutSeconds * 1000),
60+
});
61+
62+
helpers.logger.info(
63+
`Requeued: ${message.queueName} for ${body.timeoutSeconds}s`
64+
);
65+
66+
return;
67+
}
68+
}
69+
70+
if (!response.ok) {
71+
const text = await response.text();
72+
throw new Error(`Step failed: ${text}`);
73+
}
74+
} catch (error) {
75+
helpers.logger.error(`Error handling: ${message.queueName}`, { error });
76+
throw error;
77+
}
78+
};
79+
};
80+
81+
return {
82+
pushStep: async (message: MessageData) => {
83+
const utils = await ensureUtils();
84+
await utils.addJob(stepTaskName, MessageData.encode(message), {
85+
jobKey: message.idempotencyKey ?? message.messageId,
86+
maxAttempts: 3,
87+
});
88+
},
89+
90+
pushFlow: async (message: MessageData) => {
91+
const utils = await ensureUtils();
92+
await utils.addJob(flowTaskName, MessageData.encode(message), {
93+
jobKey: message.idempotencyKey ?? message.messageId,
94+
maxAttempts: 3,
95+
});
96+
},
97+
98+
start: async () => {
99+
await ensureUtils();
100+
101+
await run({
102+
connectionString: opts.connectionString,
103+
concurrency: opts.queueConcurrency,
104+
taskList: {
105+
[stepTaskName]: createTaskHandler(proxy.proxyStep, stepTaskName),
106+
[flowTaskName]: createTaskHandler(proxy.proxyWorkflow, flowTaskName),
107+
},
108+
});
109+
},
110+
};
111+
}

0 commit comments

Comments
 (0)