Files
Aegis/backend/app/routers/system.py
kitos 802e8f862b
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
feat(evaluations): bulk approve evaluation tests with 4-step confirmation modal
Backend:
- POST /system/attck-evaluations/bulk-approve: finds all [EVAL R*] tests in
  in_review state, approves blue side, transitions to validated, recalculates
  technique statuses, audit logs each test
- GET /system/attck-evaluations/pending-count: returns count of pending eval tests

Frontend:
- BulkApproveModal: 4 mandatory checkboxes before confirm button enables
  (lab env / not org detection / metrics impact / spot-check recommendation)
- Bulk Approve button in header badge showing pending count
- Green result banner showing approved tests + techniques recalculated
- Invalidates techniques, metrics and review-queue queries on success

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 16:53:00 +02:00

770 lines
25 KiB
Python

"""System-level endpoints (admin only).
Provides manual triggers for background operations such as the MITRE
ATT&CK synchronisation, intel scanning, Atomic Red Team import, and
scheduler health introspection.
Also exposes email configuration CRUD (admin only) that writes to the
system_configs table so settings survive container restarts.
"""
import logging
from typing import Optional
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, status
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.database import SessionLocal, get_db
from app.dependencies.auth import get_current_user, require_role
from app.models.user import User
from app.services.mitre_sync_service import sync_mitre
from app.services.intel_service import scan_intel
from app.services.atomic_import_service import import_atomic_red_team
from app.jobs.mitre_sync_job import scheduler
from app.limiter import limiter
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/system", tags=["system"])
# ---------------------------------------------------------------------------
# Pydantic schemas for email config
# ---------------------------------------------------------------------------
class EmailConfigOut(BaseModel):
enabled: bool
host: str
port: int
username: str
from_email: str
use_tls: bool
# password is never returned
class EmailConfigUpdate(BaseModel):
enabled: Optional[bool] = None
host: Optional[str] = None
port: Optional[int] = None
username: Optional[str] = None
password: Optional[str] = None
from_email: Optional[str] = None
use_tls: Optional[bool] = None
class EmailTestRequest(BaseModel):
to: str
# ---------------------------------------------------------------------------
# Helpers for system_configs CRUD
# ---------------------------------------------------------------------------
_SMTP_KEYS = {
"enabled": "smtp.enabled",
"host": "smtp.host",
"port": "smtp.port",
"username": "smtp.username",
"password": "smtp.password",
"from_email": "smtp.from_email",
"use_tls": "smtp.use_tls",
}
def _upsert_config(db: Session, key: str, value: str) -> None:
from app.models.system_config import SystemConfig # lazy import avoids circular
row = db.query(SystemConfig).filter(SystemConfig.key == key).first()
if row:
row.value = value
else:
row = SystemConfig(key=key, value=value)
db.add(row)
def _read_email_config_from_db(db: Session) -> dict:
"""Return a dict with resolved email settings (DB overrides env)."""
from app.services.email_service import _get_smtp_config
return _get_smtp_config(db)
def _bg_mitre_sync() -> None:
"""Run MITRE sync in a background task with its own DB session."""
logger.info("Background MITRE sync task starting...")
db = SessionLocal()
try:
summary = sync_mitre(db)
logger.info("Background MITRE sync task finished — %s", summary)
except Exception:
logger.exception("Background MITRE sync task failed")
finally:
db.close()
@router.post("/sync-mitre")
@limiter.limit("2/hour")
def trigger_mitre_sync(
request: Request,
background_tasks: BackgroundTasks,
current_user: User = Depends(require_role("admin")),
):
"""Manually trigger a MITRE ATT&CK synchronisation in the background.
**Requires** the ``admin`` role.
Returns immediately — the sync runs asynchronously. Poll
``/system/scheduler-status`` for progress, or check server logs.
"""
background_tasks.add_task(_bg_mitre_sync)
return {
"message": "MITRE sync started in background",
"status": "started",
"new": 0,
"updated": 0,
}
@router.post("/run-intel-scan")
def trigger_intel_scan(
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Manually trigger a threat-intelligence scan.
**Requires** the ``admin`` role.
Returns a JSON object with the scan summary including the count of
new intel items found.
"""
summary = scan_intel(db)
return {
"message": "Intel scan completed",
"new_items": summary["new_items"],
}
@router.post("/import-atomic-tests")
@limiter.limit("2/hour")
def trigger_atomic_import(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Trigger an import of Atomic Red Team tests as TestTemplates.
**Requires** the ``admin`` role.
Downloads the Atomic Red Team repository ZIP from GitHub, parses the
YAML files, and creates/updates TestTemplate records. Running this
endpoint multiple times is idempotent — duplicates are skipped.
Returns a JSON object with import statistics.
"""
try:
summary = import_atomic_red_team(db)
except Exception as exc:
logger.error("Atomic Red Team import failed: %s", exc, exc_info=True)
return {
"message": "Import failed. Check server logs for details.",
}
return {
"message": "Import completed",
"imported": summary["created"],
"skipped": summary["skipped_existing"],
"total_parsed": summary["total_tests_parsed"],
}
@router.get("/scheduler-status")
def scheduler_status(
current_user: User = Depends(require_role("admin")),
):
"""Return the current state of the background scheduler.
**Requires** the ``admin`` role.
"""
jobs = scheduler.get_jobs()
return {
"running": scheduler.running,
"jobs": [
{
"id": job.id,
"name": job.name,
"next_run_time": str(job.next_run_time) if job.next_run_time else None,
}
for job in jobs
],
}
# ---------------------------------------------------------------------------
# Jira config endpoints (admin only)
# ---------------------------------------------------------------------------
class JiraConfigOut(BaseModel):
enabled: bool
url: str
project_key: str
parent_ticket: str
parent_ticket_standalone: str # parent for tests not in a campaign
# Credentials are never returned
class JiraConfigUpdate(BaseModel):
enabled: Optional[bool] = None
url: Optional[str] = None
project_key: Optional[str] = None
parent_ticket: Optional[str] = None
parent_ticket_standalone: Optional[str] = None
_JIRA_KEYS = {
"enabled": "jira.enabled",
"url": "jira.url",
"project_key": "jira.project_key",
"parent_ticket": "jira.parent_ticket",
"parent_ticket_standalone": "jira.parent_ticket_standalone",
}
@router.get("/jira-config", response_model=JiraConfigOut)
def get_jira_config(
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Return current Jira configuration (merged DB + env).
**Requires** the ``admin`` role. Credentials are never returned.
"""
from app.services.jira_service import (
get_jira_url, get_jira_project_key, is_jira_enabled,
get_jira_parent_ticket, get_jira_parent_ticket_standalone,
)
return JiraConfigOut(
enabled=is_jira_enabled(db),
url=get_jira_url(db) or "",
project_key=get_jira_project_key(db) or "",
parent_ticket=get_jira_parent_ticket(db) or "",
parent_ticket_standalone=get_jira_parent_ticket_standalone(db) or "",
)
@router.patch("/jira-config", response_model=JiraConfigOut)
def update_jira_config(
payload: JiraConfigUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Update Jira configuration and persist to DB.
**Requires** the ``admin`` role. Only provided fields are updated.
"""
from app.services.jira_service import (
upsert_jira_config, get_jira_url, get_jira_project_key, is_jira_enabled,
get_jira_parent_ticket, get_jira_parent_ticket_standalone,
)
update_data = payload.model_dump(exclude_unset=True)
for field, val in update_data.items():
db_key = _JIRA_KEYS.get(field)
if db_key:
upsert_jira_config(db, db_key, str(val))
db.commit()
return JiraConfigOut(
enabled=is_jira_enabled(db),
url=get_jira_url(db) or "",
project_key=get_jira_project_key(db) or "",
parent_ticket=get_jira_parent_ticket(db) or "",
parent_ticket_standalone=get_jira_parent_ticket_standalone(db) or "",
)
@router.post("/jira-test")
def test_jira_connection(
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Test the Jira connection using the current user's credentials.
Requires the admin to have a personal Jira API token configured in their
profile settings.
Always returns HTTP 200 with a ``status`` field so Cloudflare never
replaces the response with its own error page.
"""
from app.services.jira_service import get_user_jira_client, get_jira_url, _effective_jira_email
jira_url = get_jira_url(db)
if not jira_url:
return {"status": "error", "message": "Jira URL is not configured. Set it in System Settings → Jira Configuration.", "jira_url": ""}
auth_email = _effective_jira_email(current_user)
try:
jira = get_user_jira_client(current_user, db)
# 10-second timeout so we never block Cloudflare into a 524
try:
jira._session.timeout = 10 # type: ignore[attr-defined]
except Exception:
pass
myself = jira.myself()
logger.info("Jira myself() response keys: %s", list(myself.keys()) if isinstance(myself, dict) else type(myself))
# Use displayName → emailAddress → name → the auth email as fallback
connected_as = (
(myself.get("displayName") if isinstance(myself, dict) else None)
or (myself.get("emailAddress") if isinstance(myself, dict) else None)
or (myself.get("name") if isinstance(myself, dict) else None)
or auth_email
or "authenticated"
)
return {
"status": "ok",
"connected_as": connected_as,
"jira_url": jira_url,
}
except Exception as exc:
err = str(exc)
# Always return HTTP 200 with status="error" so Cloudflare never
# intercepts the response and the frontend always sees our message.
if "Expecting value" in err or "line 1 column 1" in err:
msg = (
"Jira returned a non-JSON response. "
"Verify the URL (e.g. https://company.atlassian.net), "
"email and API token."
)
elif "401" in err or "Unauthorized" in err:
msg = (
"Authentication failed (401). "
f"Check that the Atlassian email ({auth_email or 'not set'}) "
"and API token are correct. The token must be an Atlassian API token "
"(not your account password)."
)
elif "403" in err or "Forbidden" in err:
msg = "Access denied (403). The token may not have permission for this Jira project."
elif "timed out" in err.lower() or "timeout" in err.lower():
msg = "Connection timed out. Check that the Jira URL is reachable from the server."
elif "not configured" in err.lower():
msg = err
else:
msg = f"Jira connection failed: {err}"
logger.warning("Jira test connection failed: %s", err)
return {"status": "error", "message": msg, "jira_url": jira_url}
# ---------------------------------------------------------------------------
# POST /system/tempo-test
# ---------------------------------------------------------------------------
@router.post("/tempo-test")
def test_tempo_connection(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Test the current user's personal Tempo connection.
Uses the Tempo API token stored in the user's profile (not a global token).
Always returns HTTP 200 with a ``status`` field so Cloudflare never
intercepts the response.
"""
from app.services.tempo_service import has_tempo_configured
tempo_token = getattr(current_user, "tempo_api_token", None)
if not tempo_token:
return {
"status": "error",
"message": (
"No Tempo API token configured. "
"Add it in Settings → Profile → Tempo Integration."
),
}
jira_account_id = getattr(current_user, "jira_account_id", None)
if not jira_account_id:
return {
"status": "error",
"message": (
"No Jira Account ID configured. "
"Set it in Settings → Profile → Jira Integration → Account ID."
),
}
try:
from tempoapiclient import client_v4 as tempo_client
tempo = tempo_client.Tempo(auth_token=tempo_token)
# search_worklogs by authorId is the correct v4 method; use a tight
# date range so we fetch almost nothing but still verify connectivity.
worklogs = tempo.search_worklogs(
dateFrom="2024-01-01",
dateTo="2024-01-02",
authorIds=[jira_account_id],
)
count = len(worklogs) if isinstance(worklogs, list) else "n/a"
return {
"status": "ok",
"message": f"Tempo connected successfully. Account ID: {jira_account_id}",
"worklogs_found": count,
}
except Exception as exc:
err = str(exc)
if "401" in err or "Unauthorized" in err:
msg = (
f"Authentication failed (401). "
f"Check your Tempo API token — obtain it at "
f"Jira → Apps → Tempo → Settings → API Integration."
)
elif "403" in err or "Forbidden" in err:
msg = "Access denied (403). The Tempo token lacks the required permissions."
elif "404" in err or "not found" in err.lower():
msg = (
f"Account ID not found (404). "
f"The value '{jira_account_id}' may be wrong — see the instructions "
f"below to find your correct Atlassian Account ID."
)
else:
msg = f"Tempo connection failed: {err}"
logger.warning(
"Tempo test connection failed for user %s (account_id=%s): %s",
current_user.username, jira_account_id, err,
)
return {"status": "error", "message": msg}
# ---------------------------------------------------------------------------
# GET /system/email-config
# ---------------------------------------------------------------------------
@router.get("/email-config", response_model=EmailConfigOut)
def get_email_config(
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Return current SMTP email configuration (merged DB + env).
**Requires** the ``admin`` role. Password is never returned.
"""
cfg = _read_email_config_from_db(db)
return EmailConfigOut(
enabled=cfg["enabled"],
host=cfg["host"],
port=cfg["port"],
username=cfg["username"],
from_email=cfg["from_email"],
use_tls=cfg["use_tls"],
)
# ---------------------------------------------------------------------------
# PATCH /system/email-config
# ---------------------------------------------------------------------------
@router.patch("/email-config", response_model=EmailConfigOut)
def update_email_config(
payload: EmailConfigUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Update SMTP email configuration and persist to DB.
**Requires** the ``admin`` role.
Only provided fields are updated (partial update).
"""
update_data = payload.model_dump(exclude_unset=True)
for field, val in update_data.items():
db_key = _SMTP_KEYS.get(field)
if db_key:
_upsert_config(db, db_key, str(val))
db.commit()
cfg = _read_email_config_from_db(db)
return EmailConfigOut(
enabled=cfg["enabled"],
host=cfg["host"],
port=cfg["port"],
username=cfg["username"],
from_email=cfg["from_email"],
use_tls=cfg["use_tls"],
)
# ---------------------------------------------------------------------------
# POST /system/email-test
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# ATT&CK Evaluations endpoints (admin only)
# ---------------------------------------------------------------------------
@router.get("/attck-evaluations/rounds")
def list_evaluation_rounds(
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Return all public CrowdStrike ENTERPRISE evaluation rounds with import status.
Each entry includes whether it has already been imported into this platform.
"""
from app.services.attck_evaluations_service import fetch_rounds_with_status
from app.models.evaluation_import import EvaluationImport
status_info = fetch_rounds_with_status()
rounds = status_info["rounds"]
imported = {
row.adversary_name.lower(): row
for row in db.query(EvaluationImport).filter(EvaluationImport.status == "completed").all()
}
round_list = [
{
"name": r["name"],
"display_name": r.get("display_name", r["name"]),
"eval_round": r["eval_round"],
"imported": r["name"].lower() in imported,
"imported_at": imported[r["name"].lower()].imported_at.isoformat()
if r["name"].lower() in imported else None,
"tests_created": imported[r["name"].lower()].tests_created
if r["name"].lower() in imported else None,
"techniques_covered": imported[r["name"].lower()].techniques_covered
if r["name"].lower() in imported else None,
}
for r in rounds
]
return {
"rounds": round_list,
"api_reachable": status_info["api_reachable"],
"api_error": status_info.get("api_error"),
}
@router.post("/attck-evaluations/import")
def import_evaluation_round(
payload: dict,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Import a specific ATT&CK Evaluation round for CrowdStrike.
Body: { "adversary_name": "apt29", "adversary_display": "APT29", "eval_round": 2 }
Creates tests in ``in_review`` state — Blue Leads must validate each
result before it counts as real coverage.
"""
from app.services.attck_evaluations_service import import_evaluation_round as _import
adversary_name = payload.get("adversary_name", "")
adversary_display = payload.get("adversary_display", adversary_name)
eval_round = payload.get("eval_round", 0)
if not adversary_name or not eval_round:
raise HTTPException(status_code=400, detail="adversary_name and eval_round are required")
try:
summary = _import(db, adversary_name, adversary_display, eval_round, current_user)
except ValueError as exc:
raise HTTPException(status_code=409, detail=str(exc))
except Exception as exc:
logger.error("ATT&CK Evaluation import failed: %s", exc, exc_info=True)
raise HTTPException(status_code=502, detail=f"Import failed: {exc}")
return {
"message": f"Import complete — {summary['created']} tests created",
**summary,
}
@router.post("/attck-evaluations/import-latest")
def import_latest_evaluation(
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Import the latest available CrowdStrike evaluation round automatically.
Returns 409 if the latest round was already imported.
"""
from app.services.attck_evaluations_service import get_latest_round, import_evaluation_round as _import
try:
latest = get_latest_round()
except Exception as exc:
raise HTTPException(status_code=502, detail=f"Could not reach MITRE Evaluations API: {exc}")
try:
summary = _import(
db,
latest["name"],
latest.get("display_name", latest["name"]),
latest["eval_round"],
current_user,
)
except ValueError as exc:
raise HTTPException(status_code=409, detail=str(exc))
except Exception as exc:
logger.error("ATT&CK Evaluation import failed: %s", exc, exc_info=True)
raise HTTPException(status_code=502, detail=f"Import failed: {exc}")
return {
"message": f"Import complete — {summary['created']} tests created",
**summary,
}
@router.get("/attck-evaluations/check-new")
def check_new_evaluation_round(
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Check if a new ATT&CK Evaluation round is available that hasn't been imported yet."""
from app.services.attck_evaluations_service import check_for_new_round
return check_for_new_round(db)
@router.post("/attck-evaluations/bulk-approve")
def bulk_approve_evaluation_tests(
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Bulk-approve all Blue Team validation for ATT&CK Evaluation imported tests.
Finds every test in ``in_review`` state whose name starts with ``[EVAL R``
and approves the Blue Team side. Because all evaluation imports pre-approve
the Red Team side, this moves every matched test to ``validated`` state.
**Important caveats** (enforced by UI warnings before this is called):
- Results come from a controlled MITRE lab, NOT the organisation's env.
- Validated tests will immediately affect coverage metrics and dashboards.
- Blue Leads should still spot-check high-priority techniques individually.
"""
from datetime import datetime
from app.models.test import Test
from app.models.enums import TestState
from app.models.technique import Technique
from app.services.status_service import recalculate_technique_status
from app.services.audit_service import log_action
# Find all pending evaluation tests
pending = (
db.query(Test)
.filter(
Test.state == TestState.in_review,
Test.name.like("[EVAL R%"),
)
.all()
)
if not pending:
return {
"approved": 0,
"techniques_recalculated": 0,
"message": "No pending evaluation tests found — nothing to approve.",
}
now = datetime.utcnow()
affected_technique_ids: set = set()
for test in pending:
# Approve blue side
test.blue_validation_status = "approved"
test.blue_validated_by = current_user.id
test.blue_validated_at = now
test.blue_validation_notes = (
"Bulk-approved via ATT&CK Evaluations admin panel. "
"Source: MITRE lab environment — not organisational detection."
)
# Red side was pre-approved during import → move to validated
if test.red_validation_status == "approved":
test.state = TestState.validated
# else stays in_review (shouldn't happen for eval imports, but be safe)
if test.technique_id:
affected_technique_ids.add(test.technique_id)
log_action(
db,
user_id=current_user.id,
action="bulk_eval_approve",
entity_type="test",
entity_id=test.id,
details={"source": "attck_evaluation_bulk_approve"},
)
db.flush()
# Recalculate coverage for every touched technique
for tech_id in affected_technique_ids:
tech = db.query(Technique).filter(Technique.id == tech_id).first()
if tech:
recalculate_technique_status(db, tech)
db.commit()
logger.info(
"Bulk eval approval: %d tests validated, %d techniques recalculated (by %s)",
len(pending), len(affected_technique_ids), current_user.email,
)
return {
"approved": len(pending),
"techniques_recalculated": len(affected_technique_ids),
"message": (
f"{len(pending)} evaluation tests approved and moved to Validated. "
f"{len(affected_technique_ids)} technique statuses recalculated."
),
}
@router.get("/attck-evaluations/pending-count")
def get_pending_evaluation_count(
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Return the number of imported evaluation tests still awaiting Blue approval."""
from app.models.test import Test
from app.models.enums import TestState
count = (
db.query(Test)
.filter(
Test.state == TestState.in_review,
Test.name.like("[EVAL R%"),
)
.count()
)
return {"pending": count}
@router.post("/email-test")
def send_test_email(
payload: EmailTestRequest,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Send a test email to verify SMTP configuration.
**Requires** the ``admin`` role.
Returns 200 on success, 502 if sending fails.
"""
from app.services.email_service import send_test_email as _send_test
ok = _send_test(payload.to, db=db)
if not ok:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail="Failed to send test email. Check SMTP configuration and server logs.",
)
return {"detail": f"Test email sent to {payload.to}"}