/** * ABE API Server * Express + socket.io on port 3001. * Manages exploration sessions and serves REST + WebSocket API. */ import express, { Request, Response, NextFunction } from 'express'; import cors from 'cors'; import http from 'http'; import rateLimit from 'express-rate-limit'; import { Server as SocketIOServer } from 'socket.io'; import { createSessionRouter } from './routes/sessions'; import { createAnomalyRouter } from './routes/anomalies'; import { createConfigRouter } from './routes/config'; import { createScheduleRouter } from './routes/schedules'; import { createVisualRouter } from './routes/visual'; import { SessionStore } from './SessionStore'; import { apiKeyAuth } from './middleware/auth'; import { log } from './logger'; import { SchedulerService } from './scheduler/SchedulerService'; import { ScheduleRepository } from '../db/ScheduleRepository'; import { VisualBaselineRepository } from '../db/VisualBaselineRepository'; import { AIEnrichmentService } from './enrichment/AIEnrichmentService'; const PORT = process.env['ABE_PORT'] ? parseInt(process.env['ABE_PORT'], 10) : process.env['PORT'] ? parseInt(process.env['PORT'], 10) : 3001; export function createApp( store: SessionStore, dbCheck?: () => boolean, scheduleRepo?: ScheduleRepository, scheduler?: SchedulerService, visualRepo?: VisualBaselineRepository, enrichmentService?: AIEnrichmentService ) { const corsOrigin = process.env['ABE_CORS_ORIGIN'] ?? 'http://localhost:5173'; const app = express(); app.use(cors({ origin: corsOrigin })); app.use(express.json()); // Health endpoints — no auth required app.get('/health', (_req: Request, res: Response) => { const uptime = Math.floor(process.uptime()); res.json({ status: 'ok', version: '0.1.0', uptime_seconds: uptime }); }); app.get('/ready', (_req: Request, res: Response) => { const stats = store.getStats(); if (dbCheck && !dbCheck()) { res.status(503).json({ status: 'not_ready', db: 'disconnected', active_sessions: stats.runningSessions }); return; } res.json({ status: 'ready', db: 'connected', active_sessions: stats.runningSessions }); }); // Apply API key auth to all /api/* routes app.use('/api', apiKeyAuth); // Global rate limit: 200 req/min const globalLimiter = rateLimit({ windowMs: 60 * 1000, max: 200, standardHeaders: true, legacyHeaders: false, }); app.use('/api', globalLimiter); // POST /api/sessions rate limit: 20/hour const sessionCreateLimiter = rateLimit({ windowMs: 60 * 60 * 1000, max: 20, standardHeaders: true, legacyHeaders: false, }); app.post('/api/sessions', sessionCreateLimiter); app.get('/api/stats', (_req: Request, res: Response) => { res.json(store.getStats()); }); app.use('/api/sessions', createSessionRouter(store)); app.use('/api/anomalies', createAnomalyRouter(store, enrichmentService)); app.use('/api/config', createConfigRouter()); if (scheduleRepo && scheduler) { app.use('/api/schedules', createScheduleRouter(scheduleRepo, scheduler)); } if (visualRepo) { app.use('/api/visual', createVisualRouter(visualRepo)); } // Global error handler app.use((err: unknown, _req: Request, res: Response, _next: NextFunction) => { const isDev = process.env['NODE_ENV'] !== 'production'; const message = isDev && err instanceof Error ? err.message : 'Internal server error'; res.status(500).json({ error: message, code: 'INTERNAL_ERROR', timestamp: Date.now(), }); }); return app; } export function createServer( store: SessionStore, dbCheck?: () => boolean, scheduleRepo?: ScheduleRepository, scheduler?: SchedulerService, visualRepo?: VisualBaselineRepository ) { const corsOrigin = process.env['ABE_CORS_ORIGIN'] ?? 'http://localhost:5173'; // Deferred emitter: AIEnrichmentService is created before io, using a closure let ioEmit: (event: string, payload: unknown) => void = () => undefined; const enrichmentService = new AIEnrichmentService((event, payload) => ioEmit(event, payload)); const app = createApp(store, dbCheck, scheduleRepo, scheduler, visualRepo, enrichmentService); const httpServer = http.createServer(app); const io = new SocketIOServer(httpServer, { cors: { origin: corsOrigin }, }); // Now wire the real io emitter ioEmit = (event, payload) => io.emit(event, payload); io.on('connection', (socket) => { socket.on('session:stop', (data: { sessionId: string }) => { store.stopSession(data.sessionId); }); }); store.setEmitter((event, payload) => { io.emit(event, payload); // Auto-enrich high/critical anomalies if (event === 'anomaly:detected') { const p = payload as { anomalyId?: string }; if (p?.anomalyId) { const anomaly = store.getAnomaly(p.anomalyId); if (anomaly && enrichmentService.shouldAutoEnrich(anomaly)) { const context = { domSnapshot: '', httpLog: anomaly.evidence.httpLog ?? [], consoleErrors: anomaly.evidence.rawErrors ?? [], actionTrace: anomaly.actionTrace, pageTitle: '', url: anomaly.actionTrace[anomaly.actionTrace.length - 1]?.url ?? '', }; void enrichmentService.enrich(anomaly, context); } } } }); return httpServer; } if (require.main === module) { // eslint-disable-next-line @typescript-eslint/no-require-imports const { getDb } = require('../db/connection') as typeof import('../db/connection'); // eslint-disable-next-line @typescript-eslint/no-require-imports const { SessionRepository } = require('../db/SessionRepository') as typeof import('../db/SessionRepository'); // eslint-disable-next-line @typescript-eslint/no-require-imports const { AnomalyRepository } = require('../db/AnomalyRepository') as typeof import('../db/AnomalyRepository'); // eslint-disable-next-line @typescript-eslint/no-require-imports const { NotificationService } = require('./notifications/NotificationService') as typeof import('./notifications/NotificationService'); const db = getDb(); const sessionRepo = new SessionRepository(db); const anomalyRepo = new AnomalyRepository(db); const notificationService = new NotificationService({ persister: (record) => { db.prepare( `INSERT OR REPLACE INTO notifications (id, anomaly_id, channel, status, sent_at, error) VALUES (?, ?, ?, ?, ?, ?)` ).run( record.id, record.anomalyId, record.channel, record.status, record.sentAt ?? null, record.error ?? null ); }, }); const outputDir = process.env['ABE_REPORTS_DIR'] ?? './reports'; const maxConcurrent = process.env['ABE_MAX_CONCURRENT_SESSIONS'] ? parseInt(process.env['ABE_MAX_CONCURRENT_SESSIONS'], 10) : 3; // eslint-disable-next-line @typescript-eslint/no-require-imports const { VisualBaselineRepository: VisualRepo } = require('../db/VisualBaselineRepository') as typeof import('../db/VisualBaselineRepository'); const visualRepo = new VisualRepo(db); const store = new SessionStore(outputDir, sessionRepo, anomalyRepo, maxConcurrent, notificationService, visualRepo); const dbCheck = (): boolean => { try { db.prepare('SELECT 1').run(); return true; } catch { return false; } }; // Scheduler // eslint-disable-next-line @typescript-eslint/no-require-imports const { ScheduleRepository: SchedRepo } = require('../db/ScheduleRepository') as typeof import('../db/ScheduleRepository'); // eslint-disable-next-line @typescript-eslint/no-require-imports const { SchedulerService: SchedSvc } = require('./scheduler/SchedulerService') as typeof import('./scheduler/SchedulerService'); const scheduleRepo = new SchedRepo(db); const scheduler = new SchedSvc(scheduleRepo, store); scheduler.start(); const server = createServer(store, dbCheck, scheduleRepo, scheduler, visualRepo); // Graceful shutdown let shuttingDown = false; function shutdown(signal: string): void { if (shuttingDown) return; shuttingDown = true; log.info({ signal }, 'Graceful shutdown initiated'); scheduler.stop(); server.close(() => { try { db.close(); } catch { /* ignore */ } process.exit(0); }); setTimeout(() => { log.error('Forced shutdown after 30s'); process.exit(1); }, 30000); } process.on('SIGTERM', () => shutdown('SIGTERM')); process.on('SIGINT', () => shutdown('SIGINT')); server.listen(PORT, () => { log.info({ port: PORT }, 'ABE API server listening'); }); }