diff --git a/backend/alembic/versions/b021_add_phase_timing_fields.py b/backend/alembic/versions/b021_add_phase_timing_fields.py index 0f57105..745d8a8 100644 --- a/backend/alembic/versions/b021_add_phase_timing_fields.py +++ b/backend/alembic/versions/b021_add_phase_timing_fields.py @@ -19,8 +19,11 @@ depends_on = None def upgrade() -> None: op.execute(""" ALTER TABLE tests - ADD COLUMN IF NOT EXISTS red_started_at TIMESTAMP, - ADD COLUMN IF NOT EXISTS blue_started_at TIMESTAMP; + ADD COLUMN IF NOT EXISTS red_started_at TIMESTAMP, + ADD COLUMN IF NOT EXISTS blue_started_at TIMESTAMP, + ADD COLUMN IF NOT EXISTS paused_at TIMESTAMP, + ADD COLUMN IF NOT EXISTS red_paused_seconds INTEGER DEFAULT 0, + ADD COLUMN IF NOT EXISTS blue_paused_seconds INTEGER DEFAULT 0; """) @@ -28,5 +31,8 @@ def downgrade() -> None: op.execute(""" ALTER TABLE tests DROP COLUMN IF EXISTS red_started_at, - DROP COLUMN IF EXISTS blue_started_at; + DROP COLUMN IF EXISTS blue_started_at, + DROP COLUMN IF EXISTS paused_at, + DROP COLUMN IF EXISTS red_paused_seconds, + DROP COLUMN IF EXISTS blue_paused_seconds; """) diff --git a/backend/app/config.py b/backend/app/config.py index c81e22b..9f93577 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -58,6 +58,12 @@ class Settings(BaseSettings): TEMPO_API_TOKEN: str = "" TEMPO_DEFAULT_WORK_TYPE: str = "Red Team" + # ── Reporting ───────────────────────────────────────────────────── + REPORT_TEMPLATES_DIR: str = "app/templates/reports" + REPORT_OUTPUT_DIR: str = "/tmp/aegis_reports" + COMPANY_NAME: str = "Organization" + COMPANY_LOGO_PATH: str = "app/templates/reports/assets/logo.png" + # ── Scoring weights (must sum to 100) ──────────────────────────── SCORING_WEIGHT_TESTS: int = 40 SCORING_WEIGHT_DETECTION_RULES: int = 20 diff --git a/backend/app/main.py b/backend/app/main.py index e9beacd..d5f9e97 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -34,6 +34,9 @@ from app.routers import compliance as compliance_router from app.routers import snapshots as snapshots_router from app.routers import jira as jira_router from app.routers import worklogs as worklogs_router +from app.routers import professional_reports as professional_reports_router +from app.routers import analytics as analytics_router +from app.routers import advanced_metrics as advanced_metrics_router from app.domain.exceptions import DomainException from app.middleware.error_handler import domain_exception_handler from app.storage import ensure_bucket_exists @@ -114,6 +117,9 @@ app.include_router(compliance_router.router, prefix="/api/v1") app.include_router(snapshots_router.router, prefix="/api/v1") app.include_router(jira_router.router, prefix="/api/v1") app.include_router(worklogs_router.router, prefix="/api/v1") +app.include_router(professional_reports_router.router, prefix="/api/v1") +app.include_router(analytics_router.router, prefix="/api/v1") +app.include_router(advanced_metrics_router.router, prefix="/api/v1") @app.get("/health", include_in_schema=False) diff --git a/backend/app/models/test.py b/backend/app/models/test.py index e66fe2a..5c423ff 100644 --- a/backend/app/models/test.py +++ b/backend/app/models/test.py @@ -52,6 +52,9 @@ class Test(Base): # ── Phase timing fields (for automatic Tempo worklogs) ────────── red_started_at = Column(DateTime, nullable=True) blue_started_at = Column(DateTime, nullable=True) + paused_at = Column(DateTime, nullable=True) + red_paused_seconds = Column(Integer, default=0) + blue_paused_seconds = Column(Integer, default=0) # ── Remediation fields ─────────────────────────────────────────── remediation_steps = Column(Text, nullable=True) diff --git a/backend/app/routers/advanced_metrics.py b/backend/app/routers/advanced_metrics.py new file mode 100644 index 0000000..5f1634c --- /dev/null +++ b/backend/app/routers/advanced_metrics.py @@ -0,0 +1,184 @@ +"""Advanced metrics endpoints — coverage by tactic, never-tested, avg validation time.""" + +from datetime import datetime + +from fastapi import APIRouter, Depends +from sqlalchemy import func, case +from sqlalchemy.orm import Session + +from app.database import get_db +from app.dependencies.auth import get_current_user +from app.models.audit import AuditLog +from app.models.technique import Technique +from app.models.test import Test +from app.models.user import User + +router = APIRouter(prefix="/metrics/advanced", tags=["advanced-metrics"]) + + +@router.get("/coverage-by-tactic") +def coverage_by_tactic( + db: Session = Depends(get_db), + user: User = Depends(get_current_user), +): + """Coverage percentage broken down by MITRE ATT&CK tactic.""" + results = ( + db.query( + Technique.tactic, + func.count(Technique.id).label("total"), + func.sum( + case((Technique.status_global == "validated", 1), else_=0) + ).label("validated"), + func.sum( + case((Technique.status_global == "partial", 1), else_=0) + ).label("partial"), + func.sum( + case((Technique.status_global == "not_covered", 1), else_=0) + ).label("not_covered"), + func.sum( + case((Technique.status_global == "in_progress", 1), else_=0) + ).label("in_progress"), + ) + .group_by(Technique.tactic) + .order_by(Technique.tactic) + .all() + ) + return [ + { + "tactic": r[0] or "Unknown", + "total": r[1], + "validated": int(r[2]), + "partial": int(r[3]), + "not_covered": int(r[4]), + "in_progress": int(r[5]), + "coverage_pct": round((int(r[2]) / r[1]) * 100, 1) if r[1] > 0 else 0, + } + for r in results + ] + + +@router.get("/never-tested") +def never_tested_techniques( + db: Session = Depends(get_db), + user: User = Depends(get_current_user), +): + """Techniques that have never had a test created.""" + tested_technique_ids = ( + db.query(Test.technique_id).distinct().subquery() + ) + techniques = ( + db.query(Technique) + .filter(~Technique.id.in_(db.query(tested_technique_ids))) + .order_by(Technique.mitre_id) + .all() + ) + return [ + { + "mitre_id": t.mitre_id, + "name": t.name, + "tactic": t.tactic, + "is_subtechnique": t.is_subtechnique, + } + for t in techniques + ] + + +@router.get("/avg-validation-time") +def avg_validation_time( + db: Session = Depends(get_db), + user: User = Depends(get_current_user), +): + """Average time from test creation to validation, computed from audit logs. + + Returns overall average and per-phase averages where data is available. + """ + validated_tests = ( + db.query(Test) + .filter(Test.state == "validated") + .all() + ) + + if not validated_tests: + return { + "total_validated": 0, + "avg_total_hours": 0, + "avg_red_phase_hours": 0, + "avg_blue_phase_hours": 0, + } + + total_durations = [] + red_durations = [] + blue_durations = [] + + for test in validated_tests: + if test.created_at and test.red_validated_at: + total_seconds = (test.red_validated_at - test.created_at).total_seconds() + total_durations.append(total_seconds) + + if test.red_started_at and test.blue_started_at: + red_sec = (test.blue_started_at - test.red_started_at).total_seconds() + red_paused = test.red_paused_seconds or 0 + red_durations.append(max(red_sec - red_paused, 0)) + + if test.blue_started_at and test.blue_validated_at: + blue_sec = (test.blue_validated_at - test.blue_started_at).total_seconds() + blue_paused = test.blue_paused_seconds or 0 + blue_durations.append(max(blue_sec - blue_paused, 0)) + + def avg_hours(durations: list[float]) -> float: + if not durations: + return 0 + return round(sum(durations) / len(durations) / 3600, 2) + + return { + "total_validated": len(validated_tests), + "avg_total_hours": avg_hours(total_durations), + "avg_red_phase_hours": avg_hours(red_durations), + "avg_blue_phase_hours": avg_hours(blue_durations), + } + + +@router.get("/detection-rate-trend") +def detection_rate_trend( + db: Session = Depends(get_db), + user: User = Depends(get_current_user), +): + """Monthly detection rate trend for the last 12 months.""" + from datetime import timedelta + + now = datetime.utcnow() + months = [] + + for i in range(11, -1, -1): + month_start = datetime(now.year, now.month, 1) - timedelta(days=i * 30) + month_end = month_start + timedelta(days=30) + + validated = ( + db.query(func.count(Test.id)) + .filter( + Test.state == "validated", + Test.created_at >= month_start, + Test.created_at < month_end, + ) + .scalar() or 0 + ) + + detected = ( + db.query(func.count(Test.id)) + .filter( + Test.state == "validated", + Test.detection_result == "detected", + Test.created_at >= month_start, + Test.created_at < month_end, + ) + .scalar() or 0 + ) + + months.append({ + "month": month_start.strftime("%Y-%m"), + "validated": validated, + "detected": detected, + "detection_rate": round((detected / validated) * 100, 1) if validated > 0 else 0, + }) + + return months diff --git a/backend/app/routers/analytics.py b/backend/app/routers/analytics.py new file mode 100644 index 0000000..e2cc1d8 --- /dev/null +++ b/backend/app/routers/analytics.py @@ -0,0 +1,127 @@ +"""Analytics endpoints — flat JSON optimized for PowerBI / BI tools. + +Returns complete datasets without pagination so BI tools can ingest +directly from URL. All endpoints require authentication. +""" + +from fastapi import APIRouter, Depends, Query +from sqlalchemy import func +from sqlalchemy.orm import Session + +from app.database import get_db +from app.dependencies.auth import get_current_user, require_any_role +from app.models.coverage_snapshot import CoverageSnapshot +from app.models.technique import Technique +from app.models.test import Test +from app.models.user import User + +router = APIRouter(prefix="/analytics", tags=["analytics"]) + + +@router.get("/coverage") +def analytics_coverage( + db: Session = Depends(get_db), + user: User = Depends(get_current_user), +): + """Coverage per technique — flat format for BI dashboards.""" + techniques = db.query(Technique).all() + return [ + { + "mitre_id": t.mitre_id, + "name": t.name, + "tactic": t.tactic, + "status": t.status_global.value if t.status_global else "not_evaluated", + "is_subtechnique": t.is_subtechnique, + "test_count": len(t.tests) if t.tests else 0, + "review_required": t.review_required, + "last_review_date": ( + t.last_review_date.isoformat() if t.last_review_date else None + ), + } + for t in techniques + ] + + +@router.get("/tests") +def analytics_tests( + date_from: str = Query(None, description="ISO date filter (>=)"), + date_to: str = Query(None, description="ISO date filter (<=)"), + db: Session = Depends(get_db), + user: User = Depends(get_current_user), +): + """All tests with timestamps — flat format for BI dashboards.""" + query = db.query(Test) + if date_from: + query = query.filter(Test.created_at >= date_from) + if date_to: + query = query.filter(Test.created_at <= date_to) + tests = query.all() + + return [ + { + "id": str(t.id), + "technique_id": str(t.technique_id), + "name": t.name, + "state": t.state.value if t.state else None, + "result": t.result.value if t.result else None, + "detection_result": ( + t.detection_result.value if t.detection_result else None + ), + "created_at": t.created_at.isoformat() if t.created_at else None, + "execution_date": ( + t.execution_date.isoformat() if t.execution_date else None + ), + "platform": t.platform, + "tool_used": t.tool_used, + "attack_success": t.attack_success, + "remediation_status": t.remediation_status, + } + for t in tests + ] + + +@router.get("/trends") +def analytics_trends( + db: Session = Depends(get_db), + user: User = Depends(get_current_user), +): + """Historical coverage snapshots for trend visualization.""" + snapshots = ( + db.query(CoverageSnapshot) + .order_by(CoverageSnapshot.created_at) + .all() + ) + return [ + { + "date": s.created_at.isoformat() if s.created_at else None, + "name": s.name, + "total_techniques": s.total_techniques, + "validated_count": s.validated_count, + "partial_count": s.partial_count, + "not_covered_count": s.not_covered_count, + "organization_score": s.organization_score, + } + for s in snapshots + ] + + +@router.get("/operators") +def analytics_operators( + db: Session = Depends(get_db), + user: User = Depends(require_any_role("red_lead", "blue_lead")), +): + """Per-operator metrics — for workload management dashboards.""" + results = ( + db.query( + User.username, + User.role, + func.count(Test.id).label("test_count"), + ) + .outerjoin(Test, Test.created_by == User.id) + .group_by(User.id, User.username, User.role) + .all() + ) + return [ + {"username": r[0], "role": r[1], "test_count": r[2]} + for r in results + ] diff --git a/backend/app/routers/professional_reports.py b/backend/app/routers/professional_reports.py new file mode 100644 index 0000000..438ff53 --- /dev/null +++ b/backend/app/routers/professional_reports.py @@ -0,0 +1,72 @@ +"""Professional report generation endpoints — PDF, DOCX, HTML output.""" + +from uuid import UUID + +from fastapi import APIRouter, Depends, Query +from fastapi.responses import FileResponse +from sqlalchemy.orm import Session + +from app.database import get_db +from app.dependencies.auth import get_current_user, require_any_role +from app.models.user import User +from app.services import report_generation_service + +router = APIRouter(prefix="/reports/generate", tags=["professional-reports"]) + +_MEDIA_TYPES = { + "pdf": "application/pdf", + "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + "html": "text/html", +} + + +@router.get("/purple-campaign/{campaign_id}") +def generate_purple_report( + campaign_id: UUID, + format: str = Query("pdf", pattern="^(pdf|docx|html)$"), + db: Session = Depends(get_db), + user: User = Depends(require_any_role("red_lead", "blue_lead")), +): + """Generate a Purple Team campaign assessment report.""" + filepath = report_generation_service.generate_purple_campaign_report( + db, str(campaign_id), output_format=format, + ) + return FileResponse( + filepath, + media_type=_MEDIA_TYPES[format], + filename=f"purple_report.{format}", + ) + + +@router.get("/coverage-summary") +def generate_coverage_report( + format: str = Query("pdf", pattern="^(pdf|docx|html)$"), + db: Session = Depends(get_db), + user: User = Depends(get_current_user), +): + """Generate an organization-wide MITRE ATT&CK coverage report.""" + filepath = report_generation_service.generate_coverage_report( + db, output_format=format, + ) + return FileResponse( + filepath, + media_type=_MEDIA_TYPES[format], + filename=f"coverage_report.{format}", + ) + + +@router.get("/executive-summary") +def generate_executive_report( + format: str = Query("pdf", pattern="^(pdf|docx|html)$"), + db: Session = Depends(get_db), + user: User = Depends(require_any_role("red_lead", "blue_lead")), +): + """Generate an executive security summary report.""" + filepath = report_generation_service.generate_executive_summary( + db, output_format=format, + ) + return FileResponse( + filepath, + media_type=_MEDIA_TYPES[format], + filename=f"executive_summary.{format}", + ) diff --git a/backend/app/routers/tests.py b/backend/app/routers/tests.py index 79e8574..1363bd2 100644 --- a/backend/app/routers/tests.py +++ b/backend/app/routers/tests.py @@ -54,6 +54,8 @@ from app.services.test_workflow_service import ( reopen_test as wf_reopen, handle_remediation_completed as wf_handle_remediation, get_retest_chain as wf_get_retest_chain, + pause_timer as wf_pause_timer, + resume_timer as wf_resume_timer, ) router = APIRouter(prefix="/tests", tags=["tests"]) @@ -473,6 +475,42 @@ def submit_blue( return test +# --------------------------------------------------------------------------- +# POST /tests/{id}/pause-timer — pause the active phase timer +# --------------------------------------------------------------------------- + + +@router.post("/{test_id}/pause-timer", response_model=TestOut) +def pause_timer( + test_id: uuid.UUID, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Pause the running timer for the current phase (red_executing or blue_evaluating).""" + test = _get_test_or_404(db, test_id) + test = wf_pause_timer(db, test, current_user) + db.refresh(test) + return test + + +# --------------------------------------------------------------------------- +# POST /tests/{id}/resume-timer — resume a paused phase timer +# --------------------------------------------------------------------------- + + +@router.post("/{test_id}/resume-timer", response_model=TestOut) +def resume_timer( + test_id: uuid.UUID, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Resume the paused timer for the current phase.""" + test = _get_test_or_404(db, test_id) + test = wf_resume_timer(db, test, current_user) + db.refresh(test) + return test + + # --------------------------------------------------------------------------- # POST /tests/{id}/validate-red — Red Lead validates # --------------------------------------------------------------------------- diff --git a/backend/app/schemas/test.py b/backend/app/schemas/test.py index b759a03..34d5d92 100644 --- a/backend/app/schemas/test.py +++ b/backend/app/schemas/test.py @@ -140,6 +140,9 @@ class TestOut(BaseModel): # Phase timing fields (for Tempo worklogs) red_started_at: datetime | None = None blue_started_at: datetime | None = None + paused_at: datetime | None = None + red_paused_seconds: int = 0 + blue_paused_seconds: int = 0 # Remediation fields remediation_steps: str | None = None diff --git a/backend/app/services/report_engine.py b/backend/app/services/report_engine.py new file mode 100644 index 0000000..b1a2197 --- /dev/null +++ b/backend/app/services/report_engine.py @@ -0,0 +1,93 @@ +"""Report engine — renders Jinja2 HTML templates to PDF, DOCX, and HTML. + +Uses WeasyPrint for PDF generation and docxtpl for DOCX. +""" + +import os +import uuid +import logging +from datetime import datetime + +from jinja2 import Environment, FileSystemLoader + +from app.config import settings + +logger = logging.getLogger(__name__) + + +class ReportEngine: + """Template-based report generator supporting PDF, DOCX, and HTML output.""" + + def __init__(self) -> None: + self.jinja_env = Environment( + loader=FileSystemLoader(settings.REPORT_TEMPLATES_DIR), + autoescape=True, + ) + os.makedirs(settings.REPORT_OUTPUT_DIR, exist_ok=True) + + def render_html(self, template_name: str, context: dict) -> str: + """Render a Jinja2 template to an HTML string.""" + template = self.jinja_env.get_template(f"{template_name}.html") + context.setdefault("company_name", settings.COMPANY_NAME) + context.setdefault("generated_at", datetime.utcnow().strftime("%B %d, %Y %H:%M UTC")) + return template.render(context) + + def generate_pdf(self, template_name: str, context: dict) -> str: + """Render HTML and convert to PDF with WeasyPrint.""" + from weasyprint import HTML, CSS + + html_content = self.render_html(template_name, context) + css_path = os.path.join(settings.REPORT_TEMPLATES_DIR, "styles", "report.css") + output_path = os.path.join( + settings.REPORT_OUTPUT_DIR, + f"{template_name}_{uuid.uuid4().hex[:8]}.pdf", + ) + + stylesheets = [] + if os.path.exists(css_path): + stylesheets.append(CSS(filename=css_path)) + + HTML( + string=html_content, + base_url=settings.REPORT_TEMPLATES_DIR, + ).write_pdf(output_path, stylesheets=stylesheets) + + logger.info("PDF generated: %s", output_path) + return output_path + + def generate_docx(self, template_name: str, context: dict) -> str: + """Render a .docx template with docxtpl.""" + from docxtpl import DocxTemplate + + template_path = os.path.join( + settings.REPORT_TEMPLATES_DIR, f"{template_name}.docx" + ) + output_path = os.path.join( + settings.REPORT_OUTPUT_DIR, + f"{template_name}_{uuid.uuid4().hex[:8]}.docx", + ) + + doc = DocxTemplate(template_path) + context.setdefault("company_name", settings.COMPANY_NAME) + context.setdefault("generated_at", datetime.utcnow().strftime("%B %d, %Y")) + doc.render(context) + doc.save(output_path) + + logger.info("DOCX generated: %s", output_path) + return output_path + + def generate_html_file(self, template_name: str, context: dict) -> str: + """Render and save a standalone HTML report.""" + html_content = self.render_html(template_name, context) + output_path = os.path.join( + settings.REPORT_OUTPUT_DIR, + f"{template_name}_{uuid.uuid4().hex[:8]}.html", + ) + with open(output_path, "w", encoding="utf-8") as f: + f.write(html_content) + + logger.info("HTML report generated: %s", output_path) + return output_path + + +report_engine = ReportEngine() diff --git a/backend/app/services/report_generation_service.py b/backend/app/services/report_generation_service.py new file mode 100644 index 0000000..447443b --- /dev/null +++ b/backend/app/services/report_generation_service.py @@ -0,0 +1,250 @@ +"""High-level report generation — collects domain data and delegates to ReportEngine.""" + +import logging +from datetime import datetime, timedelta + +from sqlalchemy.orm import Session + +from app.domain.exceptions import EntityNotFoundError +from app.models.campaign import Campaign, CampaignTest +from app.models.coverage_snapshot import CoverageSnapshot +from app.models.technique import Technique +from app.models.test import Test +from app.models.threat_actor import ThreatActor +from app.services.report_engine import report_engine + +logger = logging.getLogger(__name__) + + +def generate_purple_campaign_report( + db: Session, + campaign_id: str, + output_format: str = "pdf", +) -> str: + """Generate the full Purple Team campaign report.""" + campaign = db.query(Campaign).filter(Campaign.id == campaign_id).first() + if not campaign: + raise EntityNotFoundError("Campaign", campaign_id) + + campaign_tests = ( + db.query(Test) + .join(CampaignTest, CampaignTest.test_id == Test.id) + .filter(CampaignTest.campaign_id == campaign_id) + .all() + ) + + tests_data = [] + for test in campaign_tests: + technique = db.query(Technique).filter(Technique.id == test.technique_id).first() + tests_data.append({ + "technique_mitre_id": technique.mitre_id if technique else "N/A", + "name": test.name, + "tactic": technique.tactic if technique else "N/A", + "state": test.state.value if test.state else "draft", + "detection_result": ( + test.detection_result.value if test.detection_result else "pending" + ), + }) + + validated = [t for t in campaign_tests if t.state and t.state.value == "validated"] + detected = [ + t for t in validated + if t.detection_result and t.detection_result.value == "detected" + ] + not_detected = [ + t for t in validated + if t.detection_result and t.detection_result.value == "not_detected" + ] + + critical_findings = [ + { + "technique_id": t["technique_mitre_id"], + "name": t["name"], + "severity": "critical", + "description": "Technique was not detected during campaign execution.", + "recommendation": "Implement detection rule or review existing SIEM/EDR configuration.", + } + for t in tests_data + if t["detection_result"] == "not_detected" + ] + + org_score = _safe_org_score(db) + + threat_actors = [] + if campaign.threat_actor_id: + actor = db.query(ThreatActor).filter(ThreatActor.id == campaign.threat_actor_id).first() + if actor: + threat_actors = [{"name": actor.name}] + + context = { + "campaign": campaign, + "tests": tests_data, + "tests_validated": len(validated), + "tests_detected": len(detected), + "tests_not_detected": len(not_detected), + "critical_findings": critical_findings, + "org_score": org_score.get("overall", 0), + "tactics": list({t["tactic"] for t in tests_data}), + "threat_actors": threat_actors, + } + + return _generate(output_format, "purple_campaign", context) + + +def generate_coverage_report( + db: Session, + output_format: str = "pdf", +) -> str: + """Generate an organization-wide MITRE ATT&CK coverage report.""" + from sqlalchemy import func, case + + org_score = _safe_org_score(db) + + techniques = db.query(Technique).all() + status_counts = {"validated": 0, "partial": 0, "not_covered": 0, "in_progress": 0, "not_evaluated": 0} + for t in techniques: + s = t.status_global.value if t.status_global else "not_evaluated" + if s in status_counts: + status_counts[s] += 1 + + summary = { + "total_techniques": len(techniques), + **status_counts, + } + + # Coverage by tactic + tactic_rows = ( + db.query( + Technique.tactic, + func.count(Technique.id).label("total"), + func.sum(case((Technique.status_global == "validated", 1), else_=0)).label("validated"), + ) + .group_by(Technique.tactic) + .all() + ) + tactics_coverage = [ + { + "tactic": r[0] or "Unknown", + "total": r[1], + "validated": int(r[2]), + "coverage_pct": round((int(r[2]) / r[1]) * 100, 1) if r[1] > 0 else 0, + } + for r in tactic_rows + ] + + # Never-tested techniques + tested_ids = {t.technique_id for t in db.query(Test.technique_id).distinct().all()} + never_tested = [ + {"mitre_id": t.mitre_id, "name": t.name, "tactic": t.tactic} + for t in techniques + if t.id not in tested_ids + ] + + context = { + "org_score": org_score, + "summary": summary, + "tactics_coverage": tactics_coverage, + "never_tested": never_tested[:50], + } + + return _generate(output_format, "coverage_report", context) + + +def generate_executive_summary( + db: Session, + output_format: str = "pdf", +) -> str: + """Generate an executive summary report.""" + from sqlalchemy import func + + org_score = _safe_org_score(db) + techniques = db.query(Technique).all() + + status_counts = {"validated": 0, "partial": 0, "not_covered": 0, "in_progress": 0, "not_evaluated": 0} + for t in techniques: + s = t.status_global.value if t.status_global else "not_evaluated" + if s in status_counts: + status_counts[s] += 1 + + summary = {"total_techniques": len(techniques), **status_counts} + + total_tests = db.query(func.count(Test.id)).scalar() or 0 + active_campaigns = ( + db.query(func.count(Campaign.id)).filter(Campaign.status == "active").scalar() or 0 + ) + + quarter_ago = datetime.utcnow() - timedelta(days=90) + tests_this_quarter = ( + db.query(func.count(Test.id)).filter(Test.created_at >= quarter_ago).scalar() or 0 + ) + + open_remediations = ( + db.query(func.count(Test.id)) + .filter(Test.remediation_status.in_(["pending", "in_progress"])) + .scalar() or 0 + ) + + # Detection rate among validated tests + validated_count = status_counts["validated"] + detected_count = ( + db.query(func.count(Test.id)) + .filter(Test.state == "validated", Test.detection_result == "detected") + .scalar() or 0 + ) + detection_rate = round((detected_count / validated_count) * 100, 1) if validated_count > 0 else 0 + + # Top gaps — lowest coverage tactics + from sqlalchemy import case as sql_case + tactic_rows = ( + db.query( + Technique.tactic, + func.count(Technique.id).label("total"), + func.sum(sql_case((Technique.status_global == "validated", 1), else_=0)).label("validated"), + ) + .group_by(Technique.tactic) + .all() + ) + tactic_coverage = [ + { + "tactic": r[0] or "Unknown", + "coverage_pct": round((int(r[2]) / r[1]) * 100, 1) if r[1] > 0 else 0, + } + for r in tactic_rows + ] + top_gaps = sorted(tactic_coverage, key=lambda x: x["coverage_pct"])[:5] + + context = { + "org_score": org_score, + "summary": summary, + "total_tests": total_tests, + "active_campaigns": active_campaigns, + "tests_this_quarter": tests_this_quarter, + "open_remediations": open_remediations, + "detection_rate": detection_rate, + "top_gaps": top_gaps, + } + + return _generate(output_format, "executive_summary", context) + + +# ── Helpers ────────────────────────────────────────────────────────── + + +def _safe_org_score(db: Session) -> dict: + """Safely call the scoring service; return empty dict on failure.""" + try: + from app.services.scoring_service import calculate_organization_score + return calculate_organization_score(db) + except Exception as e: + logger.warning("Scoring service unavailable: %s", e) + return {"overall": 0, "coverage": 0, "detection_maturity": 0} + + +def _generate(output_format: str, template_name: str, context: dict) -> str: + """Dispatch to the correct ReportEngine method.""" + if output_format == "pdf": + return report_engine.generate_pdf(template_name, context) + elif output_format == "docx": + return report_engine.generate_docx(template_name, context) + else: + return report_engine.generate_html_file(template_name, context) diff --git a/backend/app/services/test_workflow_service.py b/backend/app/services/test_workflow_service.py index 841fe02..cd89eb9 100644 --- a/backend/app/services/test_workflow_service.py +++ b/backend/app/services/test_workflow_service.py @@ -135,24 +135,32 @@ def submit_red_evidence(db: Session, test: Test, user: User) -> Test: """ now = datetime.utcnow() + # Auto-resume if paused + paused_extra = 0 + if test.paused_at is not None: + paused_extra = max(int((now - test.paused_at).total_seconds()), 0) + test.paused_at = None + test = transition_state( db, test, TestState.blue_evaluating, user, action_name="submit_red_evidence", ) - # Create automatic worklog for Red Team phase + # Create automatic worklog for Red Team phase (subtract paused time) _create_phase_worklog( db, test=test, user=user, phase_started_at=test.red_started_at, phase_ended_at=now, + paused_seconds=(test.red_paused_seconds or 0) + paused_extra, activity_type="red_team_execution", description=f"Red Team execution: {test.name}", ) # Start Blue Team timer test.blue_started_at = now + test.blue_paused_seconds = 0 db.commit() return test @@ -165,18 +173,25 @@ def submit_blue_evidence(db: Session, test: Test, user: User) -> Test: """ now = datetime.utcnow() + # Auto-resume if paused + paused_extra = 0 + if test.paused_at is not None: + paused_extra = max(int((now - test.paused_at).total_seconds()), 0) + test.paused_at = None + test = transition_state( db, test, TestState.in_review, user, action_name="submit_blue_evidence", ) - # Create automatic worklog for Blue Team phase + # Create automatic worklog for Blue Team phase (subtract paused time) _create_phase_worklog( db, test=test, user=user, phase_started_at=test.blue_started_at, phase_ended_at=now, + paused_seconds=(test.blue_paused_seconds or 0) + paused_extra, activity_type="blue_team_evaluation", description=f"Blue Team evaluation: {test.name}", ) @@ -185,6 +200,62 @@ def submit_blue_evidence(db: Session, test: Test, user: User) -> Test: return test +def pause_timer(db: Session, test: Test, user: User) -> Test: + """Pause the active phase timer. + + Can only be called when the test is in ``red_executing`` or + ``blue_evaluating`` and is not already paused. + """ + if test.state not in (TestState.red_executing, TestState.blue_evaluating): + raise InvalidOperationError( + f"Cannot pause timer in '{test.state.value}' state" + ) + if test.paused_at is not None: + raise InvalidOperationError("Timer is already paused") + + test.paused_at = datetime.utcnow() + log_action( + db, + user_id=user.id, + action="pause_timer", + entity_type="test", + entity_id=test.id, + details={"state": test.state.value}, + ) + db.commit() + return test + + +def resume_timer(db: Session, test: Test, user: User) -> Test: + """Resume a paused phase timer. + + Accumulates the paused duration into the appropriate counter so + it is subtracted from the final worklog. + """ + if test.paused_at is None: + raise InvalidOperationError("Timer is not paused") + + now = datetime.utcnow() + paused_seconds = max(int((now - test.paused_at).total_seconds()), 0) + + if test.state == TestState.red_executing: + test.red_paused_seconds = (test.red_paused_seconds or 0) + paused_seconds + elif test.state == TestState.blue_evaluating: + test.blue_paused_seconds = (test.blue_paused_seconds or 0) + paused_seconds + + test.paused_at = None + log_action( + db, + user_id=user.id, + action="resume_timer", + entity_type="test", + entity_id=test.id, + details={"paused_seconds": paused_seconds, "state": test.state.value}, + ) + db.commit() + return test + + def _create_phase_worklog( db: Session, *, @@ -192,11 +263,14 @@ def _create_phase_worklog( user: User, phase_started_at: datetime | None, phase_ended_at: datetime, + paused_seconds: int = 0, activity_type: str, description: str, ) -> None: """Create an automatic, integrity-hashed worklog for a completed phase. + Subtracts accumulated *paused_seconds* from the gross elapsed time + so the worklog reflects only active working time. Also triggers Tempo sync if the test has a Jira link. """ if not phase_started_at: @@ -206,7 +280,8 @@ def _create_phase_worklog( ) return - duration_seconds = max(int((phase_ended_at - phase_started_at).total_seconds()), 1) + gross_seconds = int((phase_ended_at - phase_started_at).total_seconds()) + duration_seconds = max(gross_seconds - paused_seconds, 1) try: from app.services.worklog_service import create_worklog @@ -520,6 +595,9 @@ def reopen_test(db: Session, test: Test, user: User) -> Test: # Clear phase timing fields test.red_started_at = None test.blue_started_at = None + test.paused_at = None + test.red_paused_seconds = 0 + test.blue_paused_seconds = 0 db.commit() return test diff --git a/backend/app/templates/reports/assets/logo.svg b/backend/app/templates/reports/assets/logo.svg new file mode 100644 index 0000000..2232b0f --- /dev/null +++ b/backend/app/templates/reports/assets/logo.svg @@ -0,0 +1,4 @@ + diff --git a/backend/app/templates/reports/coverage_report.html b/backend/app/templates/reports/coverage_report.html new file mode 100644 index 0000000..228abb0 --- /dev/null +++ b/backend/app/templates/reports/coverage_report.html @@ -0,0 +1,119 @@ + + +
+ + +
+ {{ generated_at }}
+{{ classification | default('INTERNAL') }}
+| Tactic | +Total | +Validated | +Coverage % | +
|---|---|---|---|
| {{ tactic.tactic }} | +{{ tactic.total }} | +{{ tactic.validated }} | +{{ tactic.coverage_pct }}% | +
| MITRE ID | +Name | +Tactic | +
|---|---|---|
| {{ t.mitre_id }} | +{{ t.name }} | +{{ t.tactic }} | +
All techniques have been tested at least once.
+ {% endif %} +
+ {{ generated_at }}
+| Metric | Value |
|---|---|
| Techniques validated | {{ summary.validated }} / {{ summary.total_techniques }} |
| Detection rate | {{ detection_rate }}% |
| Tests this quarter | {{ tests_this_quarter }} |
| Open remediations | {{ open_remediations }} |
| Tactic | Coverage |
|---|---|
| {{ gap.tactic }} | +{{ gap.coverage_pct }}% | +
No significant gaps identified.
+ {% endif %} +
+ {{ generated_at }}
+{{ classification | default('INTERNAL') }}
+Campaign {{ campaign.name }} tested + {{ tests | length }} techniques across {{ tactics | length }} tactics. + Overall organization coverage score: {{ org_score }}%.
+{{ campaign.description or 'No description provided.' }}
+ {% if campaign.scheduled_at and campaign.completed_at %} +Period: {{ campaign.scheduled_at }} — {{ campaign.completed_at }}
+ {% endif %} + {% if threat_actors %} +Threat actors modeled: + {% for actor in threat_actors %}{{ actor.name }}{% if not loop.last %}, {% endif %}{% endfor %} +
+ {% endif %} +| MITRE ID | +Name | +Tactic | +State | +Detection | +
|---|---|---|---|---|
| {{ test.technique_mitre_id }} | +{{ test.name }} | +{{ test.tactic }} | +{{ test.state }} | +{{ test.detection_result }} | +
{{ finding.description }}
+Recommendation: {{ finding.recommendation }}
+No critical findings — all tested techniques were detected.
+ {% endif %} +Compared to previous campaign ({{ previous_campaign.name }}): + Coverage changed from {{ previous_score }}% to {{ org_score }}%.
+ {% else %} +This is the first campaign run — no historical comparison available.
+ {% endif %} +