fase(19): scheduling module refactor
This commit is contained in:
@@ -7,6 +7,7 @@ import { createFindingsRouter } from '../modules/findings/infrastructure/http/Fi
|
||||
import { createFuzzingRouter } from '../modules/fuzzing/infrastructure/http/FuzzingController';
|
||||
import { createReportingRouter } from '../modules/reporting/infrastructure/http/ReportingController';
|
||||
import { createIntegrationsRouter } from '../modules/integrations/infrastructure/http/IntegrationsController';
|
||||
import { createSchedulingRouter } from '../modules/scheduling/infrastructure/http/SchedulingController';
|
||||
import { LicensingController } from '../modules/licensing/infrastructure/http/LicensingController';
|
||||
import { LicenseService } from '../modules/licensing/application/LicenseService';
|
||||
import { requireFeature } from '../modules/licensing/infrastructure/middleware/FeatureGateMiddleware';
|
||||
@@ -71,6 +72,7 @@ export function createRouter(deps: ServerDependencies): Router {
|
||||
router.use('/fuzz', createFuzzingRouter(deps.fuzzingDeps));
|
||||
router.use('/reports', requireFeature(licenseService, 'reports:basic'), createReportingRouter(deps.reportingDeps));
|
||||
router.use('/integrations', requireFeature(licenseService, 'integrations:webhook'), createIntegrationsRouter(deps.integrationsDeps));
|
||||
router.use('/schedules', createSchedulingRouter(deps.schedulingDeps));
|
||||
|
||||
// Licensing routes (public-ish — only status and activate, no sensitive data)
|
||||
const licensingController = new LicensingController(licenseService);
|
||||
|
||||
@@ -22,6 +22,7 @@ import { ReportingControllerDeps } from '../modules/reporting/infrastructure/htt
|
||||
import { IntegrationsDeps } from '../modules/integrations/infrastructure/http/IntegrationsController';
|
||||
import { AuthControllerDeps } from './router';
|
||||
import { LicenseService } from '../modules/licensing/application/LicenseService';
|
||||
import { SchedulingControllerDeps } from '../modules/scheduling/infrastructure/http/SchedulingController';
|
||||
|
||||
export interface ServerDependencies {
|
||||
config: AppConfig;
|
||||
@@ -32,6 +33,7 @@ export interface ServerDependencies {
|
||||
fuzzingDeps: FuzzingControllerDeps;
|
||||
reportingDeps: ReportingControllerDeps;
|
||||
integrationsDeps: IntegrationsDeps;
|
||||
schedulingDeps: SchedulingControllerDeps;
|
||||
authDeps: AuthControllerDeps;
|
||||
licenseService: LicenseService;
|
||||
}
|
||||
|
||||
23
src/main.ts
23
src/main.ts
@@ -65,6 +65,14 @@ import { OnFindingCreated } from './modules/integrations/application/event-handl
|
||||
import { RSALicenseValidator } from './modules/licensing/infrastructure/validators/RSALicenseValidator';
|
||||
import { LicenseService } from './modules/licensing/application/LicenseService';
|
||||
|
||||
// Scheduling module
|
||||
import { KyselyScheduleRepository } from './modules/scheduling/infrastructure/repositories/KyselyScheduleRepository';
|
||||
import { CreateScheduleCommand } from './modules/scheduling/application/commands/CreateScheduleCommand';
|
||||
import { ToggleScheduleCommand } from './modules/scheduling/application/commands/ToggleScheduleCommand';
|
||||
import { DeleteScheduleCommand } from './modules/scheduling/application/commands/DeleteScheduleCommand';
|
||||
import { ListSchedulesQuery } from './modules/scheduling/application/queries/ListSchedulesQuery';
|
||||
import { SchedulingService } from './modules/scheduling/application/SchedulingService';
|
||||
|
||||
// Job queue
|
||||
import { SQLiteJobQueue } from './jobs/SQLiteJobQueue';
|
||||
import { createExplorationJobHandler, EXPLORATION_JOB_TYPE } from './jobs/workers/ExplorationWorker';
|
||||
@@ -147,7 +155,7 @@ async function bootstrap(): Promise<void> {
|
||||
const licenseValidator = new RSALicenseValidator();
|
||||
const licenseService = new LicenseService(licenseValidator);
|
||||
|
||||
// 11c. Integrations
|
||||
// 11c. Integrations (moved from 11d)
|
||||
const integrationRepo = new KyselyIntegrationRepository(db);
|
||||
const webhookRepo = new KyselyWebhookEndpointRepository(db);
|
||||
const webhookDispatcher = new WebhookDispatcher(webhookRepo, logger);
|
||||
@@ -163,6 +171,15 @@ async function bootstrap(): Promise<void> {
|
||||
jobQueue.registerHandler(REPORT_JOB_TYPE, createReportJobHandler({ logger, reportRepository: reportRepo, findingRepository: findingRepo }));
|
||||
jobQueue.start();
|
||||
|
||||
// 12b. Scheduling module (after job queue, since it enqueues jobs)
|
||||
const scheduleRepo = new KyselyScheduleRepository(db);
|
||||
const createSchedule = new CreateScheduleCommand(scheduleRepo, eventBus);
|
||||
const toggleSchedule = new ToggleScheduleCommand(scheduleRepo, eventBus);
|
||||
const deleteSchedule = new DeleteScheduleCommand(scheduleRepo, eventBus);
|
||||
const listSchedules = new ListSchedulesQuery(scheduleRepo);
|
||||
const schedulingService = new SchedulingService(scheduleRepo, jobQueue, eventBus, logger);
|
||||
await schedulingService.start();
|
||||
|
||||
// 13. HTTP server
|
||||
const app = createServer({
|
||||
config,
|
||||
@@ -173,6 +190,7 @@ async function bootstrap(): Promise<void> {
|
||||
fuzzingDeps: { runFuzz, repository: fuzzRepo },
|
||||
reportingDeps: { generateReport, reportRepository: reportRepo, jobQueue },
|
||||
integrationsDeps: { integrationRepo, webhookRepo },
|
||||
schedulingDeps: { createSchedule, toggleSchedule, deleteSchedule, listSchedules, schedulingService, scheduleRepo },
|
||||
licenseService,
|
||||
authDeps: {
|
||||
registerCommand,
|
||||
@@ -218,6 +236,9 @@ async function bootstrap(): Promise<void> {
|
||||
// Close socket.io
|
||||
io.close();
|
||||
|
||||
// Stop scheduling service
|
||||
schedulingService.stop();
|
||||
|
||||
// Stop job queue and wait for active jobs
|
||||
jobQueue.pause();
|
||||
await jobQueue.waitForActive(30_000);
|
||||
|
||||
100
src/modules/scheduling/application/SchedulingService.ts
Normal file
100
src/modules/scheduling/application/SchedulingService.ts
Normal file
@@ -0,0 +1,100 @@
|
||||
/**
|
||||
* SchedulingService — manages cron jobs for scheduled explorations.
|
||||
* On startup, loads all enabled schedules and registers cron tasks.
|
||||
* When a schedule fires, it enqueues an exploration job via IJobQueue.
|
||||
*/
|
||||
import * as cron from 'node-cron';
|
||||
import { IScheduleRepository } from '../domain/ports/IScheduleRepository';
|
||||
import { IJobQueue } from '../../../jobs/JobQueue';
|
||||
import { EventBus } from '../../../shared/application/EventBus';
|
||||
import { Logger } from '../../../shared/infrastructure/Logger';
|
||||
import { EXPLORATION_JOB_TYPE, ExplorationJobPayload } from '../../../jobs/workers/ExplorationWorker';
|
||||
import { ScheduleFired } from '../domain/events/ScheduleFired';
|
||||
import { UniqueId } from '../../../shared/domain/UniqueId';
|
||||
import { Schedule } from '../domain/entities/Schedule';
|
||||
|
||||
export class SchedulingService {
|
||||
private readonly jobs = new Map<string, cron.ScheduledTask>();
|
||||
|
||||
constructor(
|
||||
private readonly scheduleRepo: IScheduleRepository,
|
||||
private readonly jobQueue: IJobQueue,
|
||||
private readonly eventBus: EventBus,
|
||||
private readonly logger: Logger
|
||||
) {}
|
||||
|
||||
async start(): Promise<void> {
|
||||
const schedules = await this.scheduleRepo.findAll(true);
|
||||
for (const schedule of schedules) {
|
||||
this.registerCron(schedule);
|
||||
}
|
||||
this.logger.info({ count: schedules.length }, 'SchedulingService started');
|
||||
}
|
||||
|
||||
stop(): void {
|
||||
for (const [id, task] of this.jobs) {
|
||||
task.stop();
|
||||
this.logger.debug({ scheduleId: id }, 'Cron job stopped');
|
||||
}
|
||||
this.jobs.clear();
|
||||
this.logger.info('SchedulingService stopped');
|
||||
}
|
||||
|
||||
registerCron(schedule: Schedule): void {
|
||||
this.unregisterCron(schedule.id.toString());
|
||||
|
||||
if (!schedule.enabled) return;
|
||||
if (!cron.validate(schedule.cronExpression.value)) {
|
||||
this.logger.warn({ scheduleId: schedule.id.toString(), cron: schedule.cronExpression.value }, 'Invalid cron, skipping');
|
||||
return;
|
||||
}
|
||||
|
||||
const task = cron.schedule(schedule.cronExpression.value, () => {
|
||||
void this.fire(schedule.id.toString());
|
||||
});
|
||||
|
||||
this.jobs.set(schedule.id.toString(), task);
|
||||
this.logger.info({ scheduleId: schedule.id.toString(), cron: schedule.cronExpression.value }, 'Cron job registered');
|
||||
}
|
||||
|
||||
unregisterCron(scheduleId: string): void {
|
||||
const existing = this.jobs.get(scheduleId);
|
||||
if (existing) {
|
||||
existing.stop();
|
||||
this.jobs.delete(scheduleId);
|
||||
}
|
||||
}
|
||||
|
||||
private async fire(scheduleId: string): Promise<void> {
|
||||
const schedule = await this.scheduleRepo.findById(UniqueId.from(scheduleId));
|
||||
if (!schedule || !schedule.enabled) return;
|
||||
|
||||
this.logger.info({ scheduleId, url: schedule.url }, 'Firing scheduled exploration');
|
||||
|
||||
const payload: ExplorationJobPayload = {
|
||||
sessionId: UniqueId.create().toString(),
|
||||
url: schedule.url,
|
||||
seed: Math.floor(Math.random() * 0x7fffffff),
|
||||
maxStates: (schedule.config['maxStates'] as number | undefined) ?? 50,
|
||||
config: schedule.config,
|
||||
};
|
||||
|
||||
try {
|
||||
const jobId = await this.jobQueue.enqueue<ExplorationJobPayload>(EXPLORATION_JOB_TYPE, payload);
|
||||
|
||||
schedule.markFired(Date.now());
|
||||
await this.scheduleRepo.update(schedule);
|
||||
|
||||
await this.eventBus.publish(
|
||||
new ScheduleFired(scheduleId, { scheduleId, url: schedule.url, jobId })
|
||||
);
|
||||
|
||||
this.logger.info({ scheduleId, jobId, url: schedule.url }, 'Scheduled exploration enqueued');
|
||||
} catch (err: unknown) {
|
||||
this.logger.error(
|
||||
{ scheduleId, err: err instanceof Error ? err.message : String(err) },
|
||||
'Failed to enqueue scheduled exploration'
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
import { UseCase } from '../../../../shared/application/UseCase';
|
||||
import { Result, Ok, Err } from '../../../../shared/domain/Result';
|
||||
import { EventBus } from '../../../../shared/application/EventBus';
|
||||
import { IScheduleRepository } from '../../domain/ports/IScheduleRepository';
|
||||
import { Schedule, CreateScheduleProps } from '../../domain/entities/Schedule';
|
||||
|
||||
export type CreateScheduleRequest = CreateScheduleProps;
|
||||
|
||||
export interface CreateScheduleResponse {
|
||||
id: string;
|
||||
name: string;
|
||||
url: string;
|
||||
cronExpression: string;
|
||||
enabled: boolean;
|
||||
nextRunAt: number | null;
|
||||
createdAt: number;
|
||||
}
|
||||
|
||||
export class CreateScheduleCommand implements UseCase<CreateScheduleRequest, CreateScheduleResponse, string> {
|
||||
constructor(
|
||||
private readonly scheduleRepo: IScheduleRepository,
|
||||
private readonly eventBus: EventBus
|
||||
) {}
|
||||
|
||||
async execute(req: CreateScheduleRequest): Promise<Result<CreateScheduleResponse, string>> {
|
||||
const result = Schedule.create(req);
|
||||
if (!result.ok) return Err(result.error);
|
||||
|
||||
const schedule = result.value;
|
||||
await this.scheduleRepo.save(schedule);
|
||||
|
||||
for (const event of schedule.domainEvents) {
|
||||
await this.eventBus.publish(event);
|
||||
}
|
||||
schedule.clearEvents();
|
||||
|
||||
return Ok({
|
||||
id: schedule.id.toString(),
|
||||
name: schedule.name,
|
||||
url: schedule.url,
|
||||
cronExpression: schedule.cronExpression.value,
|
||||
enabled: schedule.enabled,
|
||||
nextRunAt: schedule.nextRunAt,
|
||||
createdAt: schedule.createdAt,
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
import { UseCase } from '../../../../shared/application/UseCase';
|
||||
import { Result, Ok, Err } from '../../../../shared/domain/Result';
|
||||
import { EventBus } from '../../../../shared/application/EventBus';
|
||||
import { UniqueId } from '../../../../shared/domain/UniqueId';
|
||||
import { IScheduleRepository } from '../../domain/ports/IScheduleRepository';
|
||||
|
||||
export interface DeleteScheduleRequest { id: string }
|
||||
export type DeleteScheduleResponse = void;
|
||||
|
||||
export class DeleteScheduleCommand implements UseCase<DeleteScheduleRequest, DeleteScheduleResponse, string> {
|
||||
constructor(
|
||||
private readonly scheduleRepo: IScheduleRepository,
|
||||
private readonly eventBus: EventBus
|
||||
) {}
|
||||
|
||||
async execute(req: DeleteScheduleRequest): Promise<Result<DeleteScheduleResponse, string>> {
|
||||
const id = UniqueId.from(req.id);
|
||||
const schedule = await this.scheduleRepo.findById(id);
|
||||
if (!schedule) return Err('Schedule not found');
|
||||
|
||||
await this.scheduleRepo.delete(id);
|
||||
void this.eventBus;
|
||||
return Ok(undefined);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
import { UseCase } from '../../../../shared/application/UseCase';
|
||||
import { Result, Ok, Err } from '../../../../shared/domain/Result';
|
||||
import { EventBus } from '../../../../shared/application/EventBus';
|
||||
import { UniqueId } from '../../../../shared/domain/UniqueId';
|
||||
import { IScheduleRepository } from '../../domain/ports/IScheduleRepository';
|
||||
|
||||
export interface ToggleScheduleRequest {
|
||||
id: string;
|
||||
enabled: boolean;
|
||||
}
|
||||
|
||||
export interface ToggleScheduleResponse {
|
||||
id: string;
|
||||
enabled: boolean;
|
||||
}
|
||||
|
||||
export class ToggleScheduleCommand implements UseCase<ToggleScheduleRequest, ToggleScheduleResponse, string> {
|
||||
constructor(
|
||||
private readonly scheduleRepo: IScheduleRepository,
|
||||
private readonly eventBus: EventBus
|
||||
) {}
|
||||
|
||||
async execute(req: ToggleScheduleRequest): Promise<Result<ToggleScheduleResponse, string>> {
|
||||
const schedule = await this.scheduleRepo.findById(UniqueId.from(req.id));
|
||||
if (!schedule) return Err('Schedule not found');
|
||||
|
||||
schedule.toggle(req.enabled);
|
||||
await this.scheduleRepo.update(schedule);
|
||||
|
||||
for (const event of schedule.domainEvents) {
|
||||
await this.eventBus.publish(event);
|
||||
}
|
||||
schedule.clearEvents();
|
||||
|
||||
return Ok({ id: req.id, enabled: req.enabled });
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
import { UseCase } from '../../../../shared/application/UseCase';
|
||||
import { Result, Ok } from '../../../../shared/domain/Result';
|
||||
import { IScheduleRepository } from '../../domain/ports/IScheduleRepository';
|
||||
|
||||
export interface ListSchedulesRequest { enabledOnly?: boolean }
|
||||
|
||||
export interface ScheduleDTO {
|
||||
id: string;
|
||||
name: string;
|
||||
url: string;
|
||||
cronExpression: string;
|
||||
config: Record<string, unknown>;
|
||||
enabled: boolean;
|
||||
lastRunAt: number | null;
|
||||
nextRunAt: number | null;
|
||||
createdAt: number;
|
||||
}
|
||||
|
||||
export class ListSchedulesQuery implements UseCase<ListSchedulesRequest, ScheduleDTO[], string> {
|
||||
constructor(private readonly scheduleRepo: IScheduleRepository) {}
|
||||
|
||||
async execute(req: ListSchedulesRequest): Promise<Result<ScheduleDTO[], string>> {
|
||||
const schedules = await this.scheduleRepo.findAll(req.enabledOnly);
|
||||
return Ok(
|
||||
schedules.map((s) => ({
|
||||
id: s.id.toString(),
|
||||
name: s.name,
|
||||
url: s.url,
|
||||
cronExpression: s.cronExpression.value,
|
||||
config: s.config,
|
||||
enabled: s.enabled,
|
||||
lastRunAt: s.lastRunAt,
|
||||
nextRunAt: s.nextRunAt,
|
||||
createdAt: s.createdAt,
|
||||
}))
|
||||
);
|
||||
}
|
||||
}
|
||||
136
src/modules/scheduling/domain/entities/Schedule.ts
Normal file
136
src/modules/scheduling/domain/entities/Schedule.ts
Normal file
@@ -0,0 +1,136 @@
|
||||
import { AggregateRoot } from '../../../../shared/domain/AggregateRoot';
|
||||
import { UniqueId } from '../../../../shared/domain/UniqueId';
|
||||
import { Result, Ok, Err, isOk } from '../../../../shared/domain/Result';
|
||||
import { CronExpression } from '../value-objects/CronExpression';
|
||||
import { ScheduleCreated } from '../events/ScheduleCreated';
|
||||
import { ScheduleToggled } from '../events/ScheduleToggled';
|
||||
import { z } from 'zod';
|
||||
|
||||
export const CreateScheduleSchema = z.object({
|
||||
name: z.string().min(1).max(100),
|
||||
url: z.string().url(),
|
||||
cronExpression: z.string().min(1),
|
||||
config: z.record(z.string(), z.unknown()).optional().default({}),
|
||||
enabled: z.boolean().optional().default(true),
|
||||
});
|
||||
|
||||
export type CreateScheduleProps = z.infer<typeof CreateScheduleSchema>;
|
||||
|
||||
interface ScheduleProps {
|
||||
name: string;
|
||||
url: string;
|
||||
cronExpression: CronExpression;
|
||||
config: Record<string, unknown>;
|
||||
enabled: boolean;
|
||||
lastRunAt: number | null;
|
||||
nextRunAt: number | null;
|
||||
createdAt: number;
|
||||
}
|
||||
|
||||
export class Schedule extends AggregateRoot<ScheduleProps> {
|
||||
get name(): string { return this.props.name; }
|
||||
get url(): string { return this.props.url; }
|
||||
get cronExpression(): CronExpression { return this.props.cronExpression; }
|
||||
get config(): Record<string, unknown> { return this.props.config; }
|
||||
get enabled(): boolean { return this.props.enabled; }
|
||||
get lastRunAt(): number | null { return this.props.lastRunAt; }
|
||||
get nextRunAt(): number | null { return this.props.nextRunAt; }
|
||||
get createdAt(): number { return this.props.createdAt; }
|
||||
|
||||
static create(input: CreateScheduleProps): Result<Schedule, string> {
|
||||
const parsed = CreateScheduleSchema.safeParse(input);
|
||||
if (!parsed.success) {
|
||||
return Err(parsed.error.issues.map((e) => e.message).join(', '));
|
||||
}
|
||||
|
||||
const cronResult = CronExpression.create(parsed.data.cronExpression);
|
||||
if (!cronResult.ok) {
|
||||
return Err(cronResult.error);
|
||||
}
|
||||
|
||||
const id = UniqueId.create();
|
||||
const now = Date.now();
|
||||
|
||||
const schedule = new Schedule(
|
||||
{
|
||||
name: parsed.data.name,
|
||||
url: parsed.data.url,
|
||||
cronExpression: cronResult.value,
|
||||
config: parsed.data.config,
|
||||
enabled: parsed.data.enabled,
|
||||
lastRunAt: null,
|
||||
nextRunAt: now + 60_000, // approximate next run
|
||||
createdAt: now,
|
||||
},
|
||||
id
|
||||
);
|
||||
|
||||
schedule.addDomainEvent(
|
||||
new ScheduleCreated(id.toString(), {
|
||||
name: parsed.data.name,
|
||||
url: parsed.data.url,
|
||||
cronExpression: parsed.data.cronExpression,
|
||||
})
|
||||
);
|
||||
|
||||
return Ok(schedule);
|
||||
}
|
||||
|
||||
static reconstitute(
|
||||
id: string,
|
||||
props: {
|
||||
name: string;
|
||||
url: string;
|
||||
cronExpression: string;
|
||||
config: Record<string, unknown>;
|
||||
enabled: boolean;
|
||||
lastRunAt: number | null;
|
||||
nextRunAt: number | null;
|
||||
createdAt: number;
|
||||
}
|
||||
): Schedule {
|
||||
const cronResult = CronExpression.create(props.cronExpression);
|
||||
// If stored cron is invalid, store raw value — shouldn't happen in practice
|
||||
const cronExpr = isOk(cronResult)
|
||||
? cronResult.value
|
||||
: ({ props: { value: props.cronExpression }, value: props.cronExpression } as unknown as CronExpression);
|
||||
|
||||
return new Schedule(
|
||||
{
|
||||
name: props.name,
|
||||
url: props.url,
|
||||
cronExpression: cronExpr,
|
||||
config: props.config,
|
||||
enabled: props.enabled,
|
||||
lastRunAt: props.lastRunAt,
|
||||
nextRunAt: props.nextRunAt,
|
||||
createdAt: props.createdAt,
|
||||
},
|
||||
UniqueId.from(id)
|
||||
);
|
||||
}
|
||||
|
||||
toggle(enabled: boolean): void {
|
||||
this.props.enabled = enabled;
|
||||
this.addDomainEvent(
|
||||
new ScheduleToggled(this.id.toString(), { enabled })
|
||||
);
|
||||
}
|
||||
|
||||
markFired(now: number): void {
|
||||
this.props.lastRunAt = now;
|
||||
this.props.nextRunAt = now + 60_000; // approximate
|
||||
}
|
||||
|
||||
update(fields: { name?: string; url?: string; cronExpression?: string; config?: Record<string, unknown> }): Result<void, string> {
|
||||
if (fields.cronExpression !== undefined) {
|
||||
const cronResult = CronExpression.create(fields.cronExpression);
|
||||
if (!cronResult.ok) return Err(cronResult.error);
|
||||
this.props.cronExpression = cronResult.value;
|
||||
}
|
||||
if (fields.name !== undefined) this.props.name = fields.name;
|
||||
if (fields.url !== undefined) this.props.url = fields.url;
|
||||
if (fields.config !== undefined) this.props.config = fields.config;
|
||||
return Ok(undefined);
|
||||
}
|
||||
}
|
||||
13
src/modules/scheduling/domain/events/ScheduleCreated.ts
Normal file
13
src/modules/scheduling/domain/events/ScheduleCreated.ts
Normal file
@@ -0,0 +1,13 @@
|
||||
import { DomainEvent } from '../../../../shared/domain/DomainEvent';
|
||||
import { randomUUID } from 'crypto';
|
||||
|
||||
export class ScheduleCreated implements DomainEvent {
|
||||
readonly eventId = randomUUID();
|
||||
readonly eventName = 'scheduling.schedule_created';
|
||||
readonly occurredOn = new Date();
|
||||
|
||||
constructor(
|
||||
readonly aggregateId: string,
|
||||
readonly payload: { name: string; url: string; cronExpression: string }
|
||||
) {}
|
||||
}
|
||||
13
src/modules/scheduling/domain/events/ScheduleFired.ts
Normal file
13
src/modules/scheduling/domain/events/ScheduleFired.ts
Normal file
@@ -0,0 +1,13 @@
|
||||
import { DomainEvent } from '../../../../shared/domain/DomainEvent';
|
||||
import { randomUUID } from 'crypto';
|
||||
|
||||
export class ScheduleFired implements DomainEvent {
|
||||
readonly eventId = randomUUID();
|
||||
readonly eventName = 'scheduling.schedule_fired';
|
||||
readonly occurredOn = new Date();
|
||||
|
||||
constructor(
|
||||
readonly aggregateId: string,
|
||||
readonly payload: { scheduleId: string; url: string; jobId: string }
|
||||
) {}
|
||||
}
|
||||
13
src/modules/scheduling/domain/events/ScheduleToggled.ts
Normal file
13
src/modules/scheduling/domain/events/ScheduleToggled.ts
Normal file
@@ -0,0 +1,13 @@
|
||||
import { DomainEvent } from '../../../../shared/domain/DomainEvent';
|
||||
import { randomUUID } from 'crypto';
|
||||
|
||||
export class ScheduleToggled implements DomainEvent {
|
||||
readonly eventId = randomUUID();
|
||||
readonly eventName = 'scheduling.schedule_toggled';
|
||||
readonly occurredOn = new Date();
|
||||
|
||||
constructor(
|
||||
readonly aggregateId: string,
|
||||
readonly payload: { enabled: boolean }
|
||||
) {}
|
||||
}
|
||||
10
src/modules/scheduling/domain/ports/IScheduleRepository.ts
Normal file
10
src/modules/scheduling/domain/ports/IScheduleRepository.ts
Normal file
@@ -0,0 +1,10 @@
|
||||
import { Schedule } from '../entities/Schedule';
|
||||
import { UniqueId } from '../../../../shared/domain/UniqueId';
|
||||
|
||||
export interface IScheduleRepository {
|
||||
save(schedule: Schedule): Promise<void>;
|
||||
findById(id: UniqueId): Promise<Schedule | null>;
|
||||
findAll(enabledOnly?: boolean): Promise<Schedule[]>;
|
||||
update(schedule: Schedule): Promise<void>;
|
||||
delete(id: UniqueId): Promise<void>;
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
import { ValueObject } from '../../../../shared/domain/ValueObject';
|
||||
import { Result, Ok, Err } from '../../../../shared/domain/Result';
|
||||
import * as cron from 'node-cron';
|
||||
|
||||
interface CronExpressionProps {
|
||||
value: string;
|
||||
}
|
||||
|
||||
export class CronExpression extends ValueObject<CronExpressionProps> {
|
||||
get value(): string {
|
||||
return this.props.value;
|
||||
}
|
||||
|
||||
static create(expression: string): Result<CronExpression, string> {
|
||||
if (!expression || expression.trim().length === 0) {
|
||||
return Err('Cron expression cannot be empty');
|
||||
}
|
||||
if (!cron.validate(expression)) {
|
||||
return Err(`Invalid cron expression: "${expression}"`);
|
||||
}
|
||||
return Ok(new CronExpression({ value: expression }));
|
||||
}
|
||||
}
|
||||
11
src/modules/scheduling/index.ts
Normal file
11
src/modules/scheduling/index.ts
Normal file
@@ -0,0 +1,11 @@
|
||||
// Scheduling module public API
|
||||
export { Schedule } from './domain/entities/Schedule';
|
||||
export { CronExpression } from './domain/value-objects/CronExpression';
|
||||
export { IScheduleRepository } from './domain/ports/IScheduleRepository';
|
||||
export { CreateScheduleCommand } from './application/commands/CreateScheduleCommand';
|
||||
export { ToggleScheduleCommand } from './application/commands/ToggleScheduleCommand';
|
||||
export { DeleteScheduleCommand } from './application/commands/DeleteScheduleCommand';
|
||||
export { ListSchedulesQuery } from './application/queries/ListSchedulesQuery';
|
||||
export { SchedulingService } from './application/SchedulingService';
|
||||
export { KyselyScheduleRepository } from './infrastructure/repositories/KyselyScheduleRepository';
|
||||
export { createSchedulingRouter, SchedulingControllerDeps } from './infrastructure/http/SchedulingController';
|
||||
@@ -0,0 +1,110 @@
|
||||
import { Router, Request, Response } from 'express';
|
||||
import { CreateScheduleCommand } from '../../application/commands/CreateScheduleCommand';
|
||||
import { ToggleScheduleCommand } from '../../application/commands/ToggleScheduleCommand';
|
||||
import { DeleteScheduleCommand } from '../../application/commands/DeleteScheduleCommand';
|
||||
import { ListSchedulesQuery } from '../../application/queries/ListSchedulesQuery';
|
||||
import { SchedulingService } from '../../application/SchedulingService';
|
||||
import { Schedule } from '../../domain/entities/Schedule';
|
||||
import { UniqueId } from '../../../../shared/domain/UniqueId';
|
||||
import { IScheduleRepository } from '../../domain/ports/IScheduleRepository';
|
||||
|
||||
export interface SchedulingControllerDeps {
|
||||
createSchedule: CreateScheduleCommand;
|
||||
toggleSchedule: ToggleScheduleCommand;
|
||||
deleteSchedule: DeleteScheduleCommand;
|
||||
listSchedules: ListSchedulesQuery;
|
||||
schedulingService: SchedulingService;
|
||||
scheduleRepo: IScheduleRepository;
|
||||
}
|
||||
|
||||
export function createSchedulingRouter(deps: SchedulingControllerDeps): Router {
|
||||
const router = Router();
|
||||
const { createSchedule, toggleSchedule, deleteSchedule, listSchedules, schedulingService, scheduleRepo } = deps;
|
||||
|
||||
// GET /api/schedules
|
||||
router.get('/', async (_req: Request, res: Response) => {
|
||||
const result = await listSchedules.execute({});
|
||||
if (!result.ok) {
|
||||
res.status(500).json({ error: result.error });
|
||||
return;
|
||||
}
|
||||
res.json(result.value);
|
||||
});
|
||||
|
||||
// POST /api/schedules
|
||||
router.post('/', async (req: Request, res: Response) => {
|
||||
const body = req.body as {
|
||||
name?: string;
|
||||
url?: string;
|
||||
cronExpression?: string;
|
||||
config?: Record<string, unknown>;
|
||||
enabled?: boolean;
|
||||
};
|
||||
|
||||
const result = await createSchedule.execute({
|
||||
name: body.name ?? '',
|
||||
url: body.url ?? '',
|
||||
cronExpression: body.cronExpression ?? '',
|
||||
config: body.config ?? {},
|
||||
enabled: body.enabled !== false,
|
||||
});
|
||||
|
||||
if (!result.ok) {
|
||||
res.status(400).json({ error: result.error });
|
||||
return;
|
||||
}
|
||||
|
||||
// Register cron after creation
|
||||
const schedule = await scheduleRepo.findById(UniqueId.from(result.value.id));
|
||||
if (schedule) {
|
||||
schedulingService.registerCron(schedule as Schedule);
|
||||
}
|
||||
|
||||
res.status(201).json(result.value);
|
||||
});
|
||||
|
||||
// PATCH /api/schedules/:id/toggle
|
||||
router.patch('/:id/toggle', async (req: Request, res: Response) => {
|
||||
const id = String(req.params['id']);
|
||||
const { enabled } = req.body as { enabled?: boolean };
|
||||
|
||||
if (enabled === undefined) {
|
||||
res.status(400).json({ error: 'enabled is required' });
|
||||
return;
|
||||
}
|
||||
|
||||
const result = await toggleSchedule.execute({ id, enabled });
|
||||
if (!result.ok) {
|
||||
res.status(result.error === 'Schedule not found' ? 404 : 400).json({ error: result.error });
|
||||
return;
|
||||
}
|
||||
|
||||
// Update cron registration
|
||||
const schedule = await scheduleRepo.findById(UniqueId.from(id));
|
||||
if (schedule) {
|
||||
if (enabled) {
|
||||
schedulingService.registerCron(schedule as Schedule);
|
||||
} else {
|
||||
schedulingService.unregisterCron(id);
|
||||
}
|
||||
}
|
||||
|
||||
res.json(result.value);
|
||||
});
|
||||
|
||||
// DELETE /api/schedules/:id
|
||||
router.delete('/:id', async (req: Request, res: Response) => {
|
||||
const id = String(req.params['id']);
|
||||
schedulingService.unregisterCron(id);
|
||||
|
||||
const result = await deleteSchedule.execute({ id });
|
||||
if (!result.ok) {
|
||||
res.status(result.error === 'Schedule not found' ? 404 : 400).json({ error: result.error });
|
||||
return;
|
||||
}
|
||||
|
||||
res.status(204).send();
|
||||
});
|
||||
|
||||
return router;
|
||||
}
|
||||
@@ -0,0 +1,89 @@
|
||||
import { Kysely } from 'kysely';
|
||||
import { Database } from '../../../../shared/infrastructure/DatabaseConnection';
|
||||
import { IScheduleRepository } from '../../domain/ports/IScheduleRepository';
|
||||
import { Schedule } from '../../domain/entities/Schedule';
|
||||
import { UniqueId } from '../../../../shared/domain/UniqueId';
|
||||
|
||||
export class KyselyScheduleRepository implements IScheduleRepository {
|
||||
constructor(private readonly db: Kysely<Database>) {}
|
||||
|
||||
async save(schedule: Schedule): Promise<void> {
|
||||
await this.db
|
||||
.insertInto('schedules')
|
||||
.values({
|
||||
id: schedule.id.toString(),
|
||||
name: schedule.name,
|
||||
url: schedule.url,
|
||||
config_json: JSON.stringify(schedule.config),
|
||||
cron_expression: schedule.cronExpression.value,
|
||||
enabled: schedule.enabled ? 1 : 0,
|
||||
last_run_at: schedule.lastRunAt,
|
||||
next_run_at: schedule.nextRunAt,
|
||||
created_at: schedule.createdAt,
|
||||
})
|
||||
.execute();
|
||||
}
|
||||
|
||||
async findById(id: UniqueId): Promise<Schedule | null> {
|
||||
const row = await this.db
|
||||
.selectFrom('schedules')
|
||||
.selectAll()
|
||||
.where('id', '=', id.toString())
|
||||
.executeTakeFirst();
|
||||
|
||||
if (!row) return null;
|
||||
return this.toEntity(row);
|
||||
}
|
||||
|
||||
async findAll(enabledOnly = false): Promise<Schedule[]> {
|
||||
let query = this.db.selectFrom('schedules').selectAll().orderBy('created_at', 'desc');
|
||||
if (enabledOnly) {
|
||||
query = query.where('enabled', '=', 1);
|
||||
}
|
||||
const rows = await query.execute();
|
||||
return rows.map((r) => this.toEntity(r));
|
||||
}
|
||||
|
||||
async update(schedule: Schedule): Promise<void> {
|
||||
await this.db
|
||||
.updateTable('schedules')
|
||||
.set({
|
||||
name: schedule.name,
|
||||
url: schedule.url,
|
||||
config_json: JSON.stringify(schedule.config),
|
||||
cron_expression: schedule.cronExpression.value,
|
||||
enabled: schedule.enabled ? 1 : 0,
|
||||
last_run_at: schedule.lastRunAt,
|
||||
next_run_at: schedule.nextRunAt,
|
||||
})
|
||||
.where('id', '=', schedule.id.toString())
|
||||
.execute();
|
||||
}
|
||||
|
||||
async delete(id: UniqueId): Promise<void> {
|
||||
await this.db.deleteFrom('schedules').where('id', '=', id.toString()).execute();
|
||||
}
|
||||
|
||||
private toEntity(row: {
|
||||
id: string;
|
||||
name: string;
|
||||
url: string;
|
||||
config_json: string;
|
||||
cron_expression: string;
|
||||
enabled: number;
|
||||
last_run_at: number | null;
|
||||
next_run_at: number | null;
|
||||
created_at: number;
|
||||
}): Schedule {
|
||||
return Schedule.reconstitute(row.id, {
|
||||
name: row.name,
|
||||
url: row.url,
|
||||
cronExpression: row.cron_expression,
|
||||
config: JSON.parse(row.config_json) as Record<string, unknown>,
|
||||
enabled: row.enabled === 1,
|
||||
lastRunAt: row.last_run_at,
|
||||
nextRunAt: row.next_run_at,
|
||||
createdAt: row.created_at,
|
||||
});
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user