173 lines
6.2 KiB
JavaScript
173 lines
6.2 KiB
JavaScript
"use strict";
|
|
Object.defineProperty(exports, "__esModule", { value: true });
|
|
exports.SQLiteJobQueue = void 0;
|
|
/**
 * SQLiteJobQueue — SQLite-backed job queue with exponential backoff retry.
 * No external dependencies beyond Kysely + better-sqlite3.
 */
|
|
const kysely_1 = require("kysely");
|
|
const crypto_1 = require("crypto");
|
|
/**
 * Polling job queue persisted in a `jobs` table through a Kysely database handle.
 *
 * Jobs are inserted as `pending` rows, claimed one at a time by a timer-driven
 * poll loop, and dispatched to the handler registered for their `type`.
 * A failed job is put back to `pending` with an exponential backoff delay until
 * its `max_attempts` is reached, after which it is marked `failed`.
 */
class SQLiteJobQueue {
    /**
     * @param db Kysely database handle used to read and write the `jobs` table.
     * @param logger Logger exposing `debug`, `info`, `warn` and `error`.
     * @param {number} [pollIntervalMs=1000] Delay in milliseconds before each poll.
     */
    constructor(db, logger, pollIntervalMs = 1000) {
        this.db = db;
        this.logger = logger;
        this.pollIntervalMs = pollIntervalMs;
        this.running = false;
        this.activeJobs = 0;
        this.pollTimer = null;
        // True while a pollOnce() started by the poll loop has not settled yet.
        this.polling = false;
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        this.handlers = new Map();
    }

    /**
     * Registers the handler invoked for jobs of the given type.
     * Registering a type again replaces the previous handler.
     *
     * @param {string} type Job type the handler is responsible for.
     * @param {Function} handler Called with the parsed payload; its awaited
     *     return value is stored as the job result.
     */
    registerHandler(type, handler) {
        this.handlers.set(type, handler);
    }

    /**
     * Inserts a new `pending` job row.
     *
     * @param {string} type Job type; selects the handler at execution time.
     * @param payload Value serialized with `JSON.stringify` into the row.
     * @param {object} [opts]
     * @param {Date} [opts.runAt] Earliest execution time (defaults to now).
     * @param {number} [opts.maxAttempts=3] Attempts allowed before the job is failed.
     * @param {number} [opts.priority=0] Higher values are polled first.
     * @returns {Promise<string>} The generated job id (a random UUID).
     */
    async enqueue(type, payload, opts) {
        const id = (0, crypto_1.randomUUID)();
        const now = new Date().toISOString();
        const runAt = (opts?.runAt ?? new Date()).toISOString();
        await this.db
            .insertInto('jobs')
            .values({
                id,
                type,
                status: 'pending',
                payload: JSON.stringify(payload),
                result: null,
                error: null,
                attempts: 0,
                max_attempts: opts?.maxAttempts ?? 3,
                priority: opts?.priority ?? 0,
                run_at: runAt,
                started_at: null,
                completed_at: null,
                created_at: now,
                updated_at: now,
            })
            .execute();
        this.logger.debug({ jobId: id, type, runAt }, 'Job enqueued');
        return id;
    }

    /**
     * Starts the poll loop. Calling it while already running is a no-op.
     */
    start() {
        if (this.running) {
            return;
        }
        this.running = true;
        this.logger.info('Job queue started');
        this.scheduleNextPoll();
    }

    /**
     * Stops scheduling further polls and cancels the pending timer, if any.
     * A job that is already executing is not interrupted; use
     * `waitForActive` to wait for it.
     */
    pause() {
        this.running = false;
        if (this.pollTimer !== null) {
            clearTimeout(this.pollTimer);
            this.pollTimer = null;
        }
        this.logger.info('Job queue paused');
    }

    /**
     * Resolves once no job is executing or once `timeoutMs` has elapsed,
     * whichever comes first. The active-job counter is re-checked every 100 ms.
     *
     * @param {number} timeoutMs Maximum time to wait, in milliseconds.
     * @returns {Promise<void>}
     */
    async waitForActive(timeoutMs) {
        const deadline = Date.now() + timeoutMs;
        while (this.activeJobs > 0 && Date.now() < deadline) {
            await new Promise((resolve) => setTimeout(resolve, 100));
        }
    }

    /**
     * Arms the timer for the next poll, keeping at most one poll loop alive.
     *
     * Nothing is scheduled when the queue is stopped, when a timer is already
     * pending, or when a poll is still in flight. In the last case the poll's
     * own completion reschedules. Without that guard, a pause()/start() issued
     * while a handler is running would start a second, parallel poll loop.
     */
    scheduleNextPoll() {
        if (!this.running || this.pollTimer !== null || this.polling) {
            return;
        }
        this.pollTimer = setTimeout(() => {
            // The timer has fired: drop the stale handle before polling.
            this.pollTimer = null;
            this.polling = true;
            this.pollOnce()
                .catch((err) => {
                    this.logger.error({ err }, 'Job queue poll error');
                })
                .finally(() => {
                    this.polling = false;
                    this.scheduleNextPoll();
                });
        }, this.pollIntervalMs);
    }

    /**
     * Claims and executes at most one due job.
     *
     * Steps: select the due `pending` job with the highest priority (oldest
     * first among equals), claim it with a conditional update, run its handler,
     * then record the outcome. Handler errors are recorded on the row and do
     * not reject; database errors outside the handler call do.
     *
     * @returns {Promise<void>}
     */
    async pollOnce() {
        const now = new Date().toISOString();
        // Find one pending job that is due
        const row = await this.db
            .selectFrom('jobs')
            .selectAll()
            .where('status', '=', 'pending')
            .where('run_at', '<=', now)
            .orderBy('priority', 'desc')
            .orderBy('created_at', 'asc')
            .limit(1)
            .executeTakeFirst();
        if (!row) {
            return;
        }
        // Optimistic lock: the update only matches while the row is still
        // `pending`, so only one claimant can move it to `running`.
        const claimTime = new Date().toISOString();
        const updateResult = await this.db
            .updateTable('jobs')
            .set({
                status: 'running',
                started_at: claimTime,
                attempts: (0, kysely_1.sql) `attempts + 1`,
                updated_at: claimTime,
            })
            .where('id', '=', row.id)
            .where('status', '=', 'pending')
            .executeTakeFirst();
        if (!updateResult || Number(updateResult.numUpdatedRows) === 0) {
            return; // Another worker claimed this job
        }
        this.activeJobs++;
        this.logger.info({ jobId: row.id, type: row.type, attempt: row.attempts + 1 }, 'Job started');
        try {
            const handler = this.handlers.get(row.type);
            if (!handler) {
                // Treated like any handler failure: consumes an attempt and is retried.
                throw new Error(`No handler registered for job type: ${row.type}`);
            }
            const payload = JSON.parse(row.payload);
            const result = await handler(payload);
            const completedAt = new Date().toISOString();
            await this.db
                .updateTable('jobs')
                .set({
                    status: 'completed',
                    result: JSON.stringify(result),
                    completed_at: completedAt,
                    updated_at: completedAt,
                    error: null,
                })
                .where('id', '=', row.id)
                .execute();
            this.logger.info({ jobId: row.id, type: row.type }, 'Job completed');
        }
        catch (err) {
            const failedAt = new Date().toISOString();
            const errorMsg = err instanceof Error ? err.message : String(err);
            // Fetch current attempts count (was incremented above)
            const current = await this.db
                .selectFrom('jobs')
                .select(['attempts', 'max_attempts'])
                .where('id', '=', row.id)
                .executeTakeFirst();
            const attempts = current?.attempts ?? row.attempts + 1;
            const maxAttempts = current?.max_attempts ?? row.max_attempts;
            if (attempts >= maxAttempts) {
                await this.db
                    .updateTable('jobs')
                    .set({ status: 'failed', error: errorMsg, updated_at: failedAt })
                    .where('id', '=', row.id)
                    .execute();
                this.logger.error({ jobId: row.id, type: row.type, attempts, err }, 'Job failed permanently');
            }
            else {
                // Exponential backoff: 1000 ms * 2^attempts, capped at 60 s.
                const backoffMs = Math.min(1000 * Math.pow(2, attempts), 60000);
                const retryAt = new Date(Date.now() + backoffMs).toISOString();
                await this.db
                    .updateTable('jobs')
                    .set({ status: 'pending', run_at: retryAt, error: errorMsg, updated_at: failedAt })
                    .where('id', '=', row.id)
                    .execute();
                this.logger.warn({ jobId: row.id, type: row.type, attempts, backoffMs }, 'Job failed, will retry');
            }
        }
        finally {
            this.activeJobs--;
        }
    }
}
|
|
// Expose the SQLiteJobQueue class as a named export of this CommonJS module.
exports.SQLiteJobQueue = SQLiteJobQueue;
|