Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 165 additions & 10 deletions src/storage/file.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { randomUUID } from 'node:crypto'
import { EventEmitter } from 'node:events'
import {
mkdir,
Expand All @@ -7,13 +8,19 @@ import {
rm,
unlink,
watch,
writeFile,
} from 'node:fs/promises'
import { join } from 'node:path'
import type { Storage } from './types.ts'
import { loadOptionalDependency } from './utils.ts'

type WriteFileAtomic = (path: string, data: string | Buffer, options?: Record<string, unknown>) => Promise<void>

const CLEANUP_LOCK_KEY = 'cleanup-leader'
const CLEANUP_LOCK_TTL = 10000
const CLEANUP_RENEWAL_INTERVAL = 3000
const CLEANUP_ACQUIRE_RETRY = 5000

interface FileStorageConfig {
basePath: string;
}
Expand Down Expand Up @@ -49,6 +56,10 @@ export class FileStorage implements Storage {
#connected = false
#writeFileAtomic!: WriteFileAtomic

#instanceId = randomUUID()
#isCleanupLeader = false
#leadershipTimer: ReturnType<typeof setInterval> | null = null

constructor (config: FileStorageConfig) {
this.#basePath = config.basePath
this.#queuePath = join(this.#basePath, 'queue')
Expand Down Expand Up @@ -91,29 +102,47 @@ export class FileStorage implements Storage {
this.#startQueueWatcher()
this.#startNotifyWatcher()

// Start cleanup interval
this.#cleanupInterval = setInterval(() => {
this.#cleanupExpired().catch(() => {})
}, 1000)
// Try to become cleanup leader
const acquired = await this.acquireLeaderLock(
CLEANUP_LOCK_KEY,
this.#instanceId,
CLEANUP_LOCK_TTL
)

if (acquired) {
this.#startCleanupAsLeader()
}

// Start leadership timer
this.#leadershipTimer = setInterval(() => {
this.#leadershipTick().catch(() => {})
}, this.#isCleanupLeader ? CLEANUP_RENEWAL_INTERVAL : CLEANUP_ACQUIRE_RETRY)

this.#connected = true
}

async disconnect (): Promise<void> {
if (!this.#connected) return

// Stop leadership timer
if (this.#leadershipTimer) {
clearInterval(this.#leadershipTimer)
this.#leadershipTimer = null
}

// If leader, release lock and stop cleanup
if (this.#isCleanupLeader) {
this.#stopCleanup()
await this.releaseLeaderLock(CLEANUP_LOCK_KEY, this.#instanceId).catch(() => {})
this.#isCleanupLeader = false
}

// Stop watchers
if (this.#watchAbortController) {
this.#watchAbortController.abort()
this.#watchAbortController = null
}

// Stop cleanup
if (this.#cleanupInterval) {
clearInterval(this.#cleanupInterval)
this.#cleanupInterval = null
}

// Clear dequeue waiters
for (const waiter of this.#dequeueWaiters) {
clearTimeout(waiter.timeoutId)
Expand Down Expand Up @@ -721,10 +750,136 @@ export class FileStorage implements Storage {
}
}

#startCleanupAsLeader (): void {
this.#isCleanupLeader = true
// Start cleanup interval
this.#cleanupInterval = setInterval(() => {
this.#cleanupExpired().catch(() => {})
}, 1000)
}

#stopCleanup (): void {
if (this.#cleanupInterval) {
clearInterval(this.#cleanupInterval)
this.#cleanupInterval = null
}
}

async #leadershipTick (): Promise<void> {
if (this.#isCleanupLeader) {
// Renew the lock
const renewed = await this.renewLeaderLock(
CLEANUP_LOCK_KEY,
this.#instanceId,
CLEANUP_LOCK_TTL
)
if (!renewed) {
// Lost leadership
this.#stopCleanup()
this.#isCleanupLeader = false
// Switch to follower interval
this.#restartLeadershipTimer(CLEANUP_ACQUIRE_RETRY)
}
} else {
// Try to acquire
const acquired = await this.acquireLeaderLock(
CLEANUP_LOCK_KEY,
this.#instanceId,
CLEANUP_LOCK_TTL
)
if (acquired) {
this.#startCleanupAsLeader()
// Switch to leader interval
this.#restartLeadershipTimer(CLEANUP_RENEWAL_INTERVAL)
}
}
}

#restartLeadershipTimer (intervalMs: number): void {
if (this.#leadershipTimer) {
clearInterval(this.#leadershipTimer)
}
this.#leadershipTimer = setInterval(() => {
this.#leadershipTick().catch(() => {})
}, intervalMs)
}

// ═══════════════════════════════════════════════════════════════════
// LEADER ELECTION
// ═══════════════════════════════════════════════════════════════════

async acquireLeaderLock (lockKey: string, ownerId: string, ttlMs: number): Promise<boolean> {
const lockPath = join(this.#basePath, `${lockKey}.lock`)
const data = JSON.stringify({ ownerId, expiresAt: Date.now() + ttlMs })

try {
// Exclusive create - fails if file exists
await writeFile(lockPath, data, { flag: 'wx' })
return true
} catch (err: unknown) {
if (err instanceof Error && 'code' in err && (err as NodeJS.ErrnoException).code === 'EEXIST') {
// Lock file exists, check if expired
try {
const content = JSON.parse(await readFile(lockPath, 'utf8'))
if (Date.now() <= content.expiresAt) {
// Not expired, someone else holds it
return false
}
// Expired - try to take over with atomic overwrite
const newData = JSON.stringify({ ownerId, expiresAt: Date.now() + ttlMs })
await this.#writeFileAtomic(lockPath, newData)
// Re-read to verify we won the race
const verify = JSON.parse(await readFile(lockPath, 'utf8'))
return verify.ownerId === ownerId
} catch {
return false
}
}
return false
}
}

async renewLeaderLock (lockKey: string, ownerId: string, ttlMs: number): Promise<boolean> {
const lockPath = join(this.#basePath, `${lockKey}.lock`)
try {
const content = JSON.parse(await readFile(lockPath, 'utf8'))
if (content.ownerId !== ownerId) return false
const data = JSON.stringify({ ownerId, expiresAt: Date.now() + ttlMs })
await this.#writeFileAtomic(lockPath, data)
return true
} catch {
return false
}
}

async releaseLeaderLock (lockKey: string, ownerId: string): Promise<boolean> {
const lockPath = join(this.#basePath, `${lockKey}.lock`)
try {
const content = JSON.parse(await readFile(lockPath, 'utf8'))
if (content.ownerId !== ownerId) return false
await unlink(lockPath)
return true
} catch {
return false
}
}

/**
* Clear all data (useful for testing)
*/
async clear (): Promise<void> {
// Remove lock files
try {
const files = await readdir(this.#basePath)
await Promise.all(
files
.filter((f) => f.endsWith('.lock'))
.map((f) => unlink(join(this.#basePath, f)).catch(() => {}))
)
} catch {
// Ignore errors
}

await Promise.all([
rm(this.#queuePath, { recursive: true, force: true }),
rm(this.#processingPath, { recursive: true, force: true }),
Expand Down
125 changes: 125 additions & 0 deletions test/file-storage.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { describe, it, beforeEach, afterEach } from 'node:test'
import assert from 'node:assert'
import { mkdtemp, rm } from 'node:fs/promises'
import { join } from 'node:path'
import { tmpdir } from 'node:os'
import { setTimeout as sleep } from 'node:timers/promises'
import { createFileStorage } from './fixtures/file.ts'
import { FileStorage } from '../src/storage/file.ts'
Expand Down Expand Up @@ -391,4 +394,126 @@ describe('FileStorage', () => {
assert.deepStrictEqual(await storage.getWorkers(), [])
})
})

describe('leader election', () => {
let basePath: string
let leaderStorage: FileStorage
let leaderCleanup: () => Promise<void>

beforeEach(async () => {
basePath = await mkdtemp(join(tmpdir(), 'job-queue-leader-test-'))
const result = await createFileStorage({ basePath })
leaderStorage = result.storage
leaderCleanup = result.cleanup
await leaderStorage.connect()
})

afterEach(async () => {
await leaderStorage.clear()
await leaderStorage.disconnect()
await leaderCleanup()
})

it('should acquire lock when no lock exists', async () => {
const acquired = await leaderStorage.acquireLeaderLock('test-lock', 'owner-1', 10000)
assert.strictEqual(acquired, true)
})

it('should fail to acquire lock held by another', async () => {
await leaderStorage.acquireLeaderLock('test-lock', 'owner-1', 10000)
const acquired = await leaderStorage.acquireLeaderLock('test-lock', 'owner-2', 10000)
assert.strictEqual(acquired, false)
})

it('should acquire lock when expired', async () => {
await leaderStorage.acquireLeaderLock('test-lock', 'owner-1', 10)
await sleep(50)
const acquired = await leaderStorage.acquireLeaderLock('test-lock', 'owner-2', 10000)
assert.strictEqual(acquired, true)
})

it('should renew lock by same owner', async () => {
await leaderStorage.acquireLeaderLock('test-lock', 'owner-1', 10000)
const renewed = await leaderStorage.renewLeaderLock('test-lock', 'owner-1', 10000)
assert.strictEqual(renewed, true)
})

it('should fail to renew lock by different owner', async () => {
await leaderStorage.acquireLeaderLock('test-lock', 'owner-1', 10000)
const renewed = await leaderStorage.renewLeaderLock('test-lock', 'owner-2', 10000)
assert.strictEqual(renewed, false)
})

it('should release lock by same owner and make it acquirable', async () => {
await leaderStorage.acquireLeaderLock('test-lock', 'owner-1', 10000)
const released = await leaderStorage.releaseLeaderLock('test-lock', 'owner-1')
assert.strictEqual(released, true)

const acquired = await leaderStorage.acquireLeaderLock('test-lock', 'owner-2', 10000)
assert.strictEqual(acquired, true)
})

it('should fail to release lock by different owner', async () => {
await leaderStorage.acquireLeaderLock('test-lock', 'owner-1', 10000)
const released = await leaderStorage.releaseLeaderLock('test-lock', 'owner-2')
assert.strictEqual(released, false)
})
})

describe('cleanup leader behavior', () => {
it('should clean up expired results with only one leader among two instances', async () => {
const basePath = await mkdtemp(join(tmpdir(), 'job-queue-cleanup-test-'))

const { storage: storage1 } = await createFileStorage({ basePath })
const { storage: storage2 } = await createFileStorage({ basePath })

await storage1.connect()
await storage2.connect()

try {
// Store a result with very short TTL (50ms)
await storage1.setResult('job-1', Buffer.from('result-1'), 50)

// Wait for result to expire and cleanup to run
await sleep(300)

// Result should be cleaned up by whichever instance is leader
const result = await storage1.getResult('job-1')
assert.strictEqual(result, null, 'expired result should be cleaned up')
} finally {
await storage1.disconnect()
await storage2.disconnect()
await rm(basePath, { recursive: true, force: true })
}
})

it('should failover cleanup when leader disconnects', async () => {
const basePath = await mkdtemp(join(tmpdir(), 'job-queue-failover-test-'))

const { storage: storage1 } = await createFileStorage({ basePath })
await storage1.connect()

// storage1 is now leader. Disconnect it.
await storage1.disconnect()

// Connect storage2 - it should become leader
const { storage: storage2 } = await createFileStorage({ basePath })
await storage2.connect()

try {
// Store a result with very short TTL (50ms)
await storage2.setResult('job-2', Buffer.from('result-2'), 50)

// Wait for result to expire and cleanup to run
await sleep(300)

// Result should be cleaned up by storage2 as the new leader
const result = await storage2.getResult('job-2')
assert.strictEqual(result, null, 'expired result should be cleaned up by new leader')
} finally {
await storage2.disconnect()
await rm(basePath, { recursive: true, force: true })
}
})
})
})
6 changes: 3 additions & 3 deletions test/fixtures/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import { FileStorage } from '../../src/storage/file.ts'
/**
* Create a FileStorage instance for testing
*/
export async function createFileStorage (): Promise<{ storage: FileStorage, cleanup: () => Promise<void> }> {
const basePath = await mkdtemp(join(tmpdir(), 'job-queue-test-'))
export async function createFileStorage (config?: { basePath?: string }): Promise<{ storage: FileStorage, basePath: string, cleanup: () => Promise<void> }> {
const basePath = config?.basePath ?? await mkdtemp(join(tmpdir(), 'job-queue-test-'))

const storage = new FileStorage({ basePath })

const cleanup = async () => {
await rm(basePath, { recursive: true, force: true })
}

return { storage, cleanup }
return { storage, basePath, cleanup }
}
Loading