Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
Backend:
- intel_service: remove 50-technique limit (scan all techniques), improve
pattern matching with word boundaries (\bT1059\b), raise min name length
to 8 chars to reduce false positives, skip entries with empty titles
- technique_query_service: add intel_items to get_technique_detail() so
the technique page now shows recent threat intel articles (last 20)
- New GET /intel/items endpoint with optional technique_id filter
Frontend:
- New api/intel.ts with listIntelItems()
- ReviewQueuePage: complete redesign
* Expandable rows — click a technique to see its intel articles inline
* IntelPanel component fetches articles per technique on expand
* 'Create Template from Intel' button opens a pre-filled modal with
name (from the article title), source_url (the article link) and technique_id;
the user then reads the article and fills in the attack procedure
* Updated explanation text: lists all 3 reasons a technique can be flagged
(MITRE update / intel scan / new template or detection rule)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
231 lines
10 KiB
Python
231 lines
10 KiB
Python
import logging
|
|
import os
|
|
from contextlib import asynccontextmanager
|
|
|
|
from fastapi import FastAPI, Request, status
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import JSONResponse
|
|
from fastapi.exceptions import RequestValidationError
|
|
from slowapi import _rate_limit_exceeded_handler
|
|
from slowapi.errors import RateLimitExceeded
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
|
|
from app.routers import auth as auth_router
|
|
from app.routers import techniques as techniques_router
|
|
from app.routers import tests as tests_router
|
|
from app.routers import evidence as evidence_router
|
|
from app.routers import test_templates as test_templates_router
|
|
from app.routers import system as system_router
|
|
from app.routers import metrics as metrics_router
|
|
from app.routers import users as users_router
|
|
from app.routers import audit as audit_router
|
|
from app.routers import notifications as notifications_router
|
|
from app.routers import reports as reports_router
|
|
from app.routers import data_sources as data_sources_router
|
|
from app.routers import threat_actors as threat_actors_router
|
|
from app.routers import d3fend as d3fend_router
|
|
from app.routers import detection_rules as detection_rules_router
|
|
from app.routers import campaigns as campaigns_router
|
|
from app.routers import heatmap as heatmap_router
|
|
from app.routers import scores as scores_router
|
|
from app.routers import operational_metrics as operational_metrics_router
|
|
from app.routers import compliance as compliance_router
|
|
from app.routers import snapshots as snapshots_router
|
|
from app.routers import jira as jira_router
|
|
from app.routers import worklogs as worklogs_router
|
|
from app.routers import professional_reports as professional_reports_router
|
|
from app.routers import analytics as analytics_router
|
|
from app.routers import advanced_metrics as advanced_metrics_router
|
|
from app.routers import osint as osint_router
|
|
from app.routers import webhooks as webhooks_router
|
|
from app.routers import detection_lifecycle as detection_lifecycle_router
|
|
from app.routers import intel as intel_router
|
|
from app.routers import ownership as ownership_router
|
|
from app.routers import attack_paths as attack_paths_router
|
|
from app.routers import knowledge as knowledge_router
|
|
from app.routers import risk_intelligence as risk_router
|
|
from app.routers import executive_dashboard as dashboard_router
|
|
from app.routers import api_keys as api_keys_router
|
|
from app.routers import sso as sso_router
|
|
from app.routers import operational_alerts as alerts_router
|
|
from app.domain.errors import DomainError
|
|
from app.middleware.error_handler import domain_exception_handler
|
|
from app.middleware.request_context import RequestContextMiddleware
|
|
from app.limiter import limiter
|
|
from app.storage import ensure_bucket_exists
|
|
from app.jobs.mitre_sync_job import start_scheduler, scheduler
|
|
|
|
# ── Environment detection ─────────────────────────────────────────────────
# True only when the AEGIS_ENV environment variable equals "production"
# (compared case-insensitively); an unset variable counts as non-production.
_IS_PRODUCTION = os.getenv("AEGIS_ENV", "").lower() == "production"
|
|
|
|
# ── Logging ───────────────────────────────────────────────────────────────
# Logging is configured at import time, before the application object and
# its handlers are created further down in this module.
from app.logging_config import setup_logging

setup_logging()
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup / shutdown logic for the application.

    Startup calls ``ensure_bucket_exists()`` and ``start_scheduler()``, then
    runs two seeding steps (decay policies, operational alert system rules),
    each with its own database session that is always closed afterwards.
    Seeding is best-effort: a failure is logged with its traceback and
    startup continues.

    After the ``yield`` (application shutdown) the background scheduler is
    asked to shut down with ``wait=False``.
    """
    ensure_bucket_exists()
    start_scheduler()

    # Seed decay policies
    from app.database import SessionLocal
    from app.seed_decay_policies import seed_decay_policies

    db = SessionLocal()
    try:
        seed_decay_policies(db)
    except Exception:
        # Best-effort: do not block startup, but never fail silently.
        logging.exception("Startup seeding of decay policies failed")
    finally:
        db.close()

    # Seed operational alert system rules
    db2 = SessionLocal()
    try:
        # Imported inside the try so an import failure is also non-fatal.
        from app.services.operational_alert_service import seed_system_rules

        seed_system_rules(db2)
    except Exception:
        logging.exception(
            "Startup seeding of operational alert system rules failed"
        )
    finally:
        db2.close()

    yield

    # Graceful shutdown of the background scheduler
    scheduler.shutdown(wait=False)
|
|
|
|
|
|
# ── In production, disable Swagger UI and ReDoc to hide API surface ──────
# Outside production the three documentation endpoints keep their usual
# paths; in production each one is set to None.
app = FastAPI(
    title="Attack Coverage Platform",
    lifespan=lifespan,
    docs_url="/docs" if not _IS_PRODUCTION else None,
    redoc_url="/redoc" if not _IS_PRODUCTION else None,
    openapi_url="/openapi.json" if not _IS_PRODUCTION else None,
)
|
|
|
|
# ── Rate Limiter ──────────────────────────────────────────────────────────
# The shared limiter instance is stored on the application state, and
# slowapi's handler is registered for RateLimitExceeded.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Project middleware from app.middleware.request_context.
app.add_middleware(RequestContextMiddleware)

# ── Domain exception → HTTP mapping ──────────────────────────────────────
# DomainError is handled by the project's domain_exception_handler.
app.add_exception_handler(DomainError, domain_exception_handler)
|
|
|
|
# ── CORS ──────────────────────────────────────────────────────────────────
from app.config import settings as _settings

# CORS_ORIGINS is split on commas; each entry is trimmed and blank entries
# are dropped.
_cors_origins: list[str] = [
    origin
    for origin in (raw.strip() for raw in _settings.CORS_ORIGINS.split(","))
    if origin
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"],
    allow_headers=["Authorization", "Content-Type"],
)
|
|
|
|
# ── Routers ──────────────────────────────────────────────────────────────
# Every router module is mounted under the same "/api/v1" prefix. The tuple
# order below is the registration order.
for _router_module in (
    auth_router,
    techniques_router,
    tests_router,
    evidence_router,
    test_templates_router,
    system_router,
    metrics_router,
    users_router,
    audit_router,
    notifications_router,
    reports_router,
    data_sources_router,
    threat_actors_router,
    d3fend_router,
    detection_rules_router,
    campaigns_router,
    heatmap_router,
    scores_router,
    operational_metrics_router,
    compliance_router,
    intel_router,
    snapshots_router,
    jira_router,
    worklogs_router,
    professional_reports_router,
    analytics_router,
    advanced_metrics_router,
    osint_router,
    webhooks_router,
    detection_lifecycle_router,
    ownership_router,
    attack_paths_router,
    knowledge_router,
    risk_router,
    dashboard_router,
    api_keys_router,
    sso_router,
    alerts_router,
):
    app.include_router(_router_module.router, prefix="/api/v1")
|
|
|
|
|
|
@app.get("/health", include_in_schema=False)
def health():
    """Health probe: answer with a fixed body and no service metadata.

    The route is excluded from the generated API schema. Restricting access
    to internal networks is left to Nginx (see ``frontend/nginx.conf``);
    this handler performs no checks of its own.
    """
    return dict(status="ok")
|
|
|
|
|
|
# ── Exception Handlers ────────────────────────────────────────────────────
|
|
|
|
|
|
def _to_json_safe(value):
    """Return *value* reduced to JSON-native types, stringifying anything else."""
    if value is None or isinstance(value, (str, bool, int)):
        return value
    if isinstance(value, float):
        # NaN and infinities are not valid JSON, so fall back to their text form.
        is_finite = value == value and abs(value) != float("inf")
        return value if is_finite else str(value)
    if isinstance(value, dict):
        return {str(key): _to_json_safe(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_to_json_safe(item) for item in value]
    return str(value)


def _serialize_validation_errors(exc: RequestValidationError) -> list[dict]:
    """Return validation errors safe for JSON (no raw exception objects).

    Args:
        exc: The validation exception; only its ``errors()`` method is used.

    Returns:
        One dict per reported error, in the same order. When ``ctx`` is a
        dict, its values are converted with ``str()``. Every other field is
        reduced to JSON-native types (objects without a JSON form become
        their ``str()``), so one unusual value, such as an object echoed back
        in ``input``, cannot make the error response itself unserializable.
    """
    serialized: list[dict] = []
    for err in exc.errors():
        item: dict = {}
        for key, value in dict(err).items():
            if key == "ctx" and isinstance(value, dict):
                # ctx may carry raw exception objects; keep it as plain strings.
                item[key] = {name: str(entry) for name, entry in value.items()}
            else:
                item[key] = _to_json_safe(value)
        serialized.append(item)
    return serialized
|
|
|
|
|
|
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Answer request-validation failures with a uniform HTTP 422 JSON body.

    The body has a fixed ``detail`` and ``code`` plus an ``errors`` list
    produced by ``_serialize_validation_errors``.
    """
    payload = {
        "detail": "Validation error",
        "code": "VALIDATION_ERROR",
        "errors": _serialize_validation_errors(exc),
    }
    return JSONResponse(
        content=payload,
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
    )
|
|
|
|
|
|
@app.exception_handler(SQLAlchemyError)
async def sqlalchemy_exception_handler(request: Request, exc: SQLAlchemyError):
    """Handle database errors with a generic HTTP 500 JSON response.

    The error is logged server-side together with its traceback. The
    response body contains only a fixed message and code, never the
    database error text.
    """
    # exc_info is passed explicitly so the traceback is recorded from the
    # exception object itself rather than from the interpreter's current
    # exception state.
    logging.error("Database error: %s", exc, exc_info=exc)
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content={
            "detail": "Database error occurred",
            "code": "DATABASE_ERROR",
        },
    )
|
|
|
|
|
|
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    """Handle all otherwise-unhandled exceptions with a generic HTTP 500.

    The exception is logged server-side together with its traceback. The
    response body contains only a fixed message and code, so no internal
    detail is returned to the client.
    """
    # exc_info is passed explicitly so the traceback is recorded from the
    # exception object itself rather than from the interpreter's current
    # exception state.
    logging.error("Unhandled exception: %s", exc, exc_info=exc)
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content={
            "detail": "An internal server error occurred",
            "code": "INTERNAL_ERROR",
        },
    )
|