diff --git a/src/storage/file.ts b/src/storage/file.ts index 455cf52..4fb1fdc 100644 --- a/src/storage/file.ts +++ b/src/storage/file.ts @@ -1,3 +1,4 @@ +import { randomUUID } from 'node:crypto' import { EventEmitter } from 'node:events' import { mkdir, @@ -7,6 +8,7 @@ import { rm, unlink, watch, + writeFile, } from 'node:fs/promises' import { join } from 'node:path' import type { Storage } from './types.ts' @@ -14,6 +16,11 @@ import { loadOptionalDependency } from './utils.ts' type WriteFileAtomic = (path: string, data: string | Buffer, options?: Record) => Promise +const CLEANUP_LOCK_KEY = 'cleanup-leader' +const CLEANUP_LOCK_TTL = 10000 +const CLEANUP_RENEWAL_INTERVAL = 3000 +const CLEANUP_ACQUIRE_RETRY = 5000 + interface FileStorageConfig { basePath: string; } @@ -49,6 +56,10 @@ export class FileStorage implements Storage { #connected = false #writeFileAtomic!: WriteFileAtomic + #instanceId = randomUUID() + #isCleanupLeader = false + #leadershipTimer: ReturnType | null = null + constructor (config: FileStorageConfig) { this.#basePath = config.basePath this.#queuePath = join(this.#basePath, 'queue') @@ -91,10 +102,21 @@ export class FileStorage implements Storage { this.#startQueueWatcher() this.#startNotifyWatcher() - // Start cleanup interval - this.#cleanupInterval = setInterval(() => { - this.#cleanupExpired().catch(() => {}) - }, 1000) + // Try to become cleanup leader + const acquired = await this.acquireLeaderLock( + CLEANUP_LOCK_KEY, + this.#instanceId, + CLEANUP_LOCK_TTL + ) + + if (acquired) { + this.#startCleanupAsLeader() + } + + // Start leadership timer + this.#leadershipTimer = setInterval(() => { + this.#leadershipTick().catch(() => {}) + }, this.#isCleanupLeader ? CLEANUP_RENEWAL_INTERVAL : CLEANUP_ACQUIRE_RETRY) this.#connected = true } @@ -102,18 +124,25 @@ export class FileStorage implements Storage { async disconnect (): Promise { if (!this.#connected) return + // Stop leadership timer + if (this.#leadershipTimer) { + clearInterval(this.#leadershipTimer) + this.#leadershipTimer = null + } + + // If leader, release lock and stop cleanup + if (this.#isCleanupLeader) { + this.#stopCleanup() + await this.releaseLeaderLock(CLEANUP_LOCK_KEY, this.#instanceId).catch(() => {}) + this.#isCleanupLeader = false + } + // Stop watchers if (this.#watchAbortController) { this.#watchAbortController.abort() this.#watchAbortController = null } - // Stop cleanup - if (this.#cleanupInterval) { - clearInterval(this.#cleanupInterval) - this.#cleanupInterval = null - } - // Clear dequeue waiters for (const waiter of this.#dequeueWaiters) { clearTimeout(waiter.timeoutId) @@ -721,10 +750,136 @@ export class FileStorage implements Storage { } } + #startCleanupAsLeader (): void { + this.#isCleanupLeader = true + // Start cleanup interval + this.#cleanupInterval = setInterval(() => { + this.#cleanupExpired().catch(() => {}) + }, 1000) + } + + #stopCleanup (): void { + if (this.#cleanupInterval) { + clearInterval(this.#cleanupInterval) + this.#cleanupInterval = null + } + } + + async #leadershipTick (): Promise { + if (this.#isCleanupLeader) { + // Renew the lock + const renewed = await this.renewLeaderLock( + CLEANUP_LOCK_KEY, + this.#instanceId, + CLEANUP_LOCK_TTL + ) + if (!renewed) { + // Lost leadership + this.#stopCleanup() + this.#isCleanupLeader = false + // Switch to follower interval + this.#restartLeadershipTimer(CLEANUP_ACQUIRE_RETRY) + } + } else { + // Try to acquire + const acquired = await this.acquireLeaderLock( + CLEANUP_LOCK_KEY, + this.#instanceId, + CLEANUP_LOCK_TTL + ) + if (acquired) { + this.#startCleanupAsLeader() + // Switch to leader interval + this.#restartLeadershipTimer(CLEANUP_RENEWAL_INTERVAL) + } + } + } + + #restartLeadershipTimer (intervalMs: number): void { + if (this.#leadershipTimer) { + clearInterval(this.#leadershipTimer) + } + this.#leadershipTimer = setInterval(() => { + this.#leadershipTick().catch(() => {}) + }, intervalMs) + } + + // ═══════════════════════════════════════════════════════════════════ + // LEADER ELECTION + // ═══════════════════════════════════════════════════════════════════ + + async acquireLeaderLock (lockKey: string, ownerId: string, ttlMs: number): Promise { + const lockPath = join(this.#basePath, `${lockKey}.lock`) + const data = JSON.stringify({ ownerId, expiresAt: Date.now() + ttlMs }) + + try { + // Exclusive create - fails if file exists + await writeFile(lockPath, data, { flag: 'wx' }) + return true + } catch (err: unknown) { + if (err instanceof Error && 'code' in err && (err as NodeJS.ErrnoException).code === 'EEXIST') { + // Lock file exists, check if expired + try { + const content = JSON.parse(await readFile(lockPath, 'utf8')) + if (Date.now() <= content.expiresAt) { + // Not expired, someone else holds it + return false + } + // Expired - try to take over with atomic overwrite + const newData = JSON.stringify({ ownerId, expiresAt: Date.now() + ttlMs }) + await this.#writeFileAtomic(lockPath, newData) + // Re-read to verify we won the race + const verify = JSON.parse(await readFile(lockPath, 'utf8')) + return verify.ownerId === ownerId + } catch { + return false + } + } + return false + } + } + + async renewLeaderLock (lockKey: string, ownerId: string, ttlMs: number): Promise { + const lockPath = join(this.#basePath, `${lockKey}.lock`) + try { + const content = JSON.parse(await readFile(lockPath, 'utf8')) + if (content.ownerId !== ownerId) return false + const data = JSON.stringify({ ownerId, expiresAt: Date.now() + ttlMs }) + await this.#writeFileAtomic(lockPath, data) + return true + } catch { + return false + } + } + + async releaseLeaderLock (lockKey: string, ownerId: string): Promise { + const lockPath = join(this.#basePath, `${lockKey}.lock`) + try { + const content = JSON.parse(await readFile(lockPath, 'utf8')) + if (content.ownerId !== ownerId) return false + await unlink(lockPath) + return true + } catch { + return false + } + } + /** * Clear all data (useful for testing) */ async clear (): Promise { + // Remove lock files + try { + const files = await readdir(this.#basePath) + await Promise.all( + files + .filter((f) => f.endsWith('.lock')) + .map((f) => unlink(join(this.#basePath, f)).catch(() => {})) + ) + } catch { + // Ignore errors + } + await Promise.all([ rm(this.#queuePath, { recursive: true, force: true }), rm(this.#processingPath, { recursive: true, force: true }), diff --git a/test/file-storage.test.ts b/test/file-storage.test.ts index 3fba1b2..dbe57d2 100644 --- a/test/file-storage.test.ts +++ b/test/file-storage.test.ts @@ -1,5 +1,8 @@ import { describe, it, beforeEach, afterEach } from 'node:test' import assert from 'node:assert' +import { mkdtemp, rm } from 'node:fs/promises' +import { join } from 'node:path' +import { tmpdir } from 'node:os' import { setTimeout as sleep } from 'node:timers/promises' import { createFileStorage } from './fixtures/file.ts' import { FileStorage } from '../src/storage/file.ts' @@ -391,4 +394,126 @@ describe('FileStorage', () => { assert.deepStrictEqual(await storage.getWorkers(), []) }) }) + + describe('leader election', () => { + let basePath: string + let leaderStorage: FileStorage + let leaderCleanup: () => Promise + + beforeEach(async () => { + basePath = await mkdtemp(join(tmpdir(), 'job-queue-leader-test-')) + const result = await createFileStorage({ basePath }) + leaderStorage = result.storage + leaderCleanup = result.cleanup + await leaderStorage.connect() + }) + + afterEach(async () => { + await leaderStorage.clear() + await leaderStorage.disconnect() + await leaderCleanup() + }) + + it('should acquire lock when no lock exists', async () => { + const acquired = await leaderStorage.acquireLeaderLock('test-lock', 'owner-1', 10000) + assert.strictEqual(acquired, true) + }) + + it('should fail to acquire lock held by another', async () => { + await leaderStorage.acquireLeaderLock('test-lock', 'owner-1', 10000) + const acquired = await leaderStorage.acquireLeaderLock('test-lock', 'owner-2', 10000) + assert.strictEqual(acquired, false) + }) + + it('should acquire lock when expired', async () => { + await leaderStorage.acquireLeaderLock('test-lock', 'owner-1', 10) + await sleep(50) + const acquired = await leaderStorage.acquireLeaderLock('test-lock', 'owner-2', 10000) + assert.strictEqual(acquired, true) + }) + + it('should renew lock by same owner', async () => { + await leaderStorage.acquireLeaderLock('test-lock', 'owner-1', 10000) + const renewed = await leaderStorage.renewLeaderLock('test-lock', 'owner-1', 10000) + assert.strictEqual(renewed, true) + }) + + it('should fail to renew lock by different owner', async () => { + await leaderStorage.acquireLeaderLock('test-lock', 'owner-1', 10000) + const renewed = await leaderStorage.renewLeaderLock('test-lock', 'owner-2', 10000) + assert.strictEqual(renewed, false) + }) + + it('should release lock by same owner and make it acquirable', async () => { + await leaderStorage.acquireLeaderLock('test-lock', 'owner-1', 10000) + const released = await leaderStorage.releaseLeaderLock('test-lock', 'owner-1') + assert.strictEqual(released, true) + + const acquired = await leaderStorage.acquireLeaderLock('test-lock', 'owner-2', 10000) + assert.strictEqual(acquired, true) + }) + + it('should fail to release lock by different owner', async () => { + await leaderStorage.acquireLeaderLock('test-lock', 'owner-1', 10000) + const released = await leaderStorage.releaseLeaderLock('test-lock', 'owner-2') + assert.strictEqual(released, false) + }) + }) + + describe('cleanup leader behavior', () => { + it('should clean up expired results with only one leader among two instances', async () => { + const basePath = await mkdtemp(join(tmpdir(), 'job-queue-cleanup-test-')) + + const { storage: storage1 } = await createFileStorage({ basePath }) + const { storage: storage2 } = await createFileStorage({ basePath }) + + await storage1.connect() + await storage2.connect() + + try { + // Store a result with very short TTL (50ms) + await storage1.setResult('job-1', Buffer.from('result-1'), 50) + + // Wait for result to expire and cleanup to run + await sleep(300) + + // Result should be cleaned up by whichever instance is leader + const result = await storage1.getResult('job-1') + assert.strictEqual(result, null, 'expired result should be cleaned up') + } finally { + await storage1.disconnect() + await storage2.disconnect() + await rm(basePath, { recursive: true, force: true }) + } + }) + + it('should failover cleanup when leader disconnects', async () => { + const basePath = await mkdtemp(join(tmpdir(), 'job-queue-failover-test-')) + + const { storage: storage1 } = await createFileStorage({ basePath }) + await storage1.connect() + + // storage1 is now leader. Disconnect it. + await storage1.disconnect() + + // Connect storage2 - it should become leader + const { storage: storage2 } = await createFileStorage({ basePath }) + await storage2.connect() + + try { + // Store a result with very short TTL (50ms) + await storage2.setResult('job-2', Buffer.from('result-2'), 50) + + // Wait for result to expire and cleanup to run + await sleep(300) + + // Result should be cleaned up by storage2 as the new leader + const result = await storage2.getResult('job-2') + assert.strictEqual(result, null, 'expired result should be cleaned up by new leader') + } finally { + await storage2.disconnect() + await rm(basePath, { recursive: true, force: true }) + } + }) + }) }) diff --git a/test/fixtures/file.ts b/test/fixtures/file.ts index ac4f74e..afce0f6 100644 --- a/test/fixtures/file.ts +++ b/test/fixtures/file.ts @@ -6,8 +6,8 @@ import { FileStorage } from '../../src/storage/file.ts' /** * Create a FileStorage instance for testing */ -export async function createFileStorage (): Promise<{ storage: FileStorage, cleanup: () => Promise }> { - const basePath = await mkdtemp(join(tmpdir(), 'job-queue-test-')) +export async function createFileStorage (config?: { basePath?: string }): Promise<{ storage: FileStorage, basePath: string, cleanup: () => Promise }> { + const basePath = config?.basePath ?? await mkdtemp(join(tmpdir(), 'job-queue-test-')) const storage = new FileStorage({ basePath }) @@ -15,5 +15,5 @@ export async function createFileStorage (): Promise<{ storage: FileStorage, clea await rm(basePath, { recursive: true, force: true }) } - return { storage, cleanup } + return { storage, basePath, cleanup } } diff --git a/test/reaper-leader-election-file.test.ts b/test/reaper-leader-election-file.test.ts new file mode 100644 index 0000000..c925c6a --- /dev/null +++ b/test/reaper-leader-election-file.test.ts @@ -0,0 +1,255 @@ +import { describe, it, afterEach } from 'node:test' +import assert from 'node:assert' +import { mkdtemp, rm } from 'node:fs/promises' +import { join } from 'node:path' +import { tmpdir } from 'node:os' +import { setTimeout as sleep } from 'node:timers/promises' +import { Queue, Reaper, FileStorage, type Job } from '../src/index.ts' +import { once } from './helpers/events.ts' + +describe('Reaper Leader Election (FileStorage)', () => { + const resources: Array<{ stop: () => Promise }> = [] + let basePath: string + + afterEach(async () => { + // Stop all resources in reverse order + for (const resource of resources.reverse()) { + await resource.stop().catch(() => {}) + } + resources.length = 0 + if (basePath) { + await rm(basePath, { recursive: true, force: true }).catch(() => {}) + } + }) + + function createStorage (): FileStorage { + const storage = new FileStorage({ basePath }) + resources.push({ stop: () => storage.disconnect() }) + return storage + } + + function createReaper (storage: FileStorage, opts?: { + visibilityTimeout?: number + lockTTL?: number + renewalInterval?: number + acquireRetryInterval?: number + }): Reaper { + const reaper = new Reaper({ + storage, + visibilityTimeout: opts?.visibilityTimeout ?? 100, + leaderElection: { + enabled: true, + lockTTL: opts?.lockTTL ?? 5000, + renewalInterval: opts?.renewalInterval ?? 1000, + acquireRetryInterval: opts?.acquireRetryInterval ?? 200 + } + }) + resources.push({ stop: () => reaper.stop() }) + return reaper + } + + describe('single reaper', () => { + it('should acquire leadership on start', async () => { + basePath = await mkdtemp(join(tmpdir(), 'reaper-leader-file-')) + const storage = createStorage() + await storage.connect() + + const reaper = createReaper(storage) + const leaderPromise = once(reaper, 'leadershipAcquired') + await reaper.start() + await leaderPromise + + assert.strictEqual(reaper.isLeader, true) + }) + + it('should release leadership on stop', async () => { + basePath = await mkdtemp(join(tmpdir(), 'reaper-leader-file-')) + const storage = createStorage() + await storage.connect() + + const reaper = createReaper(storage) + const leaderPromise = once(reaper, 'leadershipAcquired') + await reaper.start() + await leaderPromise + + assert.strictEqual(reaper.isLeader, true) + + await reaper.stop() + assert.strictEqual(reaper.isLeader, false) + }) + + it('should detect and recover stalled jobs', async () => { + basePath = await mkdtemp(join(tmpdir(), 'reaper-leader-file-')) + const storage = createStorage() + await storage.connect() + + const queue = new Queue<{ value: number }, { result: number }>({ + storage, + concurrency: 2, + visibilityTimeout: 500 + }) + resources.push({ stop: () => queue.stop() }) + + const reaper = createReaper(storage) + + let processCount = 0 + let abortFirstHandler: (() => void) | undefined + + queue.execute(async (job: Job<{ value: number }>) => { + processCount++ + if (processCount === 1) { + // First attempt: stall + await new Promise((resolve) => { + abortFirstHandler = resolve + job.signal.addEventListener('abort', () => resolve()) + }) + throw new Error('Aborted for cleanup') + } + return { result: job.payload.value * 2 } + }) + + try { + await queue.start() + const leaderPromise = once(reaper, 'leadershipAcquired') + await reaper.start() + await leaderPromise + + const resultPromise = queue.enqueueAndWait('job-1', { value: 21 }, { timeout: 5000 }) + + await once(reaper, 'stalled') + + const result = await resultPromise + assert.deepStrictEqual(result, { result: 42 }) + assert.strictEqual(processCount, 2) + } finally { + abortFirstHandler?.() + } + }) + }) + + describe('multiple reapers competing', () => { + it('only one reaper should become leader', async () => { + basePath = await mkdtemp(join(tmpdir(), 'reaper-leader-file-')) + const storage1 = createStorage() + const storage2 = createStorage() + await storage1.connect() + await storage2.connect() + + const reaper1 = createReaper(storage1) + const reaper2 = createReaper(storage2) + + // Start first reaper - should become leader + const leader1Promise = once(reaper1, 'leadershipAcquired') + await reaper1.start() + await leader1Promise + + assert.strictEqual(reaper1.isLeader, true) + + // Start second reaper - should NOT become leader + await reaper2.start() + await sleep(500) + + assert.strictEqual(reaper1.isLeader, true) + assert.strictEqual(reaper2.isLeader, false) + }) + + it('follower should take over when leader stops', async () => { + basePath = await mkdtemp(join(tmpdir(), 'reaper-leader-file-')) + const storage1 = createStorage() + const storage2 = createStorage() + await storage1.connect() + await storage2.connect() + + const reaper1 = createReaper(storage1) + const reaper2 = createReaper(storage2) + + // Start reaper1 as leader + const leader1Promise = once(reaper1, 'leadershipAcquired') + await reaper1.start() + await leader1Promise + + // Start reaper2 as follower + await reaper2.start() + await sleep(300) + + assert.strictEqual(reaper1.isLeader, true) + assert.strictEqual(reaper2.isLeader, false) + + // Set up listener for reaper2 acquiring leadership + const reaper2LeaderPromise = once(reaper2, 'leadershipAcquired') + + // Stop reaper1 - releases lock + await reaper1.stop() + + // Reaper2 should become leader + await reaper2LeaderPromise + + assert.strictEqual(reaper1.isLeader, false) + assert.strictEqual(reaper2.isLeader, true) + }) + + it('follower should take over when leader lock expires', async () => { + basePath = await mkdtemp(join(tmpdir(), 'reaper-leader-file-')) + const storage1 = createStorage() + const storage2 = createStorage() + await storage1.connect() + await storage2.connect() + + const reaper1 = createReaper(storage1, { + lockTTL: 500, + renewalInterval: 100, + acquireRetryInterval: 100 + }) + + const reaper2 = createReaper(storage2, { + lockTTL: 500, + renewalInterval: 100, + acquireRetryInterval: 100 + }) + + // Start reaper1 as leader + const leader1Promise = once(reaper1, 'leadershipAcquired') + await reaper1.start() + await leader1Promise + + // Start reaper2 as follower + await reaper2.start() + await sleep(200) + + assert.strictEqual(reaper1.isLeader, true) + assert.strictEqual(reaper2.isLeader, false) + + const reaper2LeaderPromise = once(reaper2, 'leadershipAcquired') + + // Stop reaper1 - this releases the lock, reaper2 can acquire + await reaper1.stop() + + await reaper2LeaderPromise + + assert.strictEqual(reaper2.isLeader, true) + }) + }) + + describe('lock renewal', () => { + it('should renew lock periodically and remain leader', async () => { + basePath = await mkdtemp(join(tmpdir(), 'reaper-leader-file-')) + const storage = createStorage() + await storage.connect() + + const reaper = createReaper(storage, { + lockTTL: 500, + renewalInterval: 100, + acquireRetryInterval: 100 + }) + + const leaderPromise = once(reaper, 'leadershipAcquired') + await reaper.start() + await leaderPromise + + // Wait longer than TTL - should still be leader due to renewal + await sleep(800) + + assert.strictEqual(reaper.isLeader, true) + }) + }) +})