"""High-level report generation — collects domain data and delegates to ReportEngine."""

import logging
from datetime import datetime, timedelta
from uuid import UUID

from sqlalchemy import case, func
from sqlalchemy.orm import Session

from app.domain.exceptions import EntityNotFoundError
from app.models.campaign import Campaign, CampaignTest
from app.models.coverage_snapshot import CoverageSnapshot
from app.models.technique import Technique
from app.models.test import Test
from app.models.threat_actor import ThreatActor
from app.services.report_engine import report_engine

logger = logging.getLogger(__name__)

# Status buckets reported in the coverage / executive summaries.
_STATUS_KEYS = ("validated", "partial", "not_covered", "in_progress", "not_evaluated")


def generate_purple_campaign_report(
    db: Session,
    campaign_id: str,
    output_format: str = "pdf",
) -> str:
    """Generate the full Purple Team campaign report.

    Args:
        db: Session used for every query.
        campaign_id: Campaign identifier; a ``UUID`` instance is used as-is,
            anything else is parsed with ``UUID(str(...))``.
        output_format: ``"pdf"``, ``"docx"``; any other value renders HTML.

    Returns:
        Whatever the matching ``report_engine`` method returns.

    Raises:
        EntityNotFoundError: No campaign exists with the given id.
        ValueError: ``campaign_id`` is not a valid UUID string.
    """
    cid = campaign_id if isinstance(campaign_id, UUID) else UUID(str(campaign_id))

    campaign = db.query(Campaign).filter(Campaign.id == cid).first()
    if not campaign:
        raise EntityNotFoundError("Campaign", campaign_id)

    campaign_tests = (
        db.query(Test)
        .join(CampaignTest, CampaignTest.test_id == Test.id)
        .filter(CampaignTest.campaign_id == cid)
        .all()
    )

    # Load every referenced technique in one query instead of one per test.
    technique_ids = {t.technique_id for t in campaign_tests if t.technique_id is not None}
    techniques_by_id = {}
    if technique_ids:
        techniques_by_id = {
            technique.id: technique
            for technique in db.query(Technique)
            .filter(Technique.id.in_(list(technique_ids)))
            .all()
        }

    tests_data = []
    for test in campaign_tests:
        technique = techniques_by_id.get(test.technique_id)
        tests_data.append({
            "technique_mitre_id": technique.mitre_id if technique else "N/A",
            "name": test.name,
            "tactic": technique.tactic if technique else "N/A",
            "state": test.state.value if test.state else "draft",
            "detection_result": (
                test.detection_result.value if test.detection_result else "pending"
            ),
        })

    validated = [t for t in campaign_tests if t.state and t.state.value == "validated"]
    detected = [
        t for t in validated
        if t.detection_result and t.detection_result.value == "detected"
    ]
    not_detected = [
        t for t in validated
        if t.detection_result and t.detection_result.value == "not_detected"
    ]

    # Every undetected test is listed as a finding, whatever its state.
    critical_findings = [
        {
            "technique_id": t["technique_mitre_id"],
            "name": t["name"],
            "severity": "critical",
            "description": "Technique was not detected during campaign execution.",
            "recommendation": "Implement detection rule or review existing SIEM/EDR configuration.",
        }
        for t in tests_data
        if t["detection_result"] == "not_detected"
    ]

    org_score = _safe_org_score(db)

    threat_actors = []
    if campaign.threat_actor_id:
        actor = (
            db.query(ThreatActor)
            .filter(ThreatActor.id == campaign.threat_actor_id)
            .first()
        )
        if actor:
            threat_actors = [{"name": actor.name}]

    context = {
        "campaign": campaign,
        "tests": tests_data,
        "tests_validated": len(validated),
        "tests_detected": len(detected),
        "tests_not_detected": len(not_detected),
        "critical_findings": critical_findings,
        "org_score": org_score.get("overall", 0),
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # rendered tactic list is stable for a given query result.
        "tactics": list(dict.fromkeys(t["tactic"] for t in tests_data)),
        "threat_actors": threat_actors,
    }
    return _generate(output_format, "purple_campaign", context)


def generate_coverage_report(
    db: Session,
    output_format: str = "pdf",
) -> str:
    """Generate an organization-wide MITRE ATT&CK coverage report.

    The context carries the organization score, per-status technique counts,
    per-tactic coverage, and up to 50 techniques that have no test at all.

    Args:
        db: Session used for every query.
        output_format: ``"pdf"``, ``"docx"``; any other value renders HTML.

    Returns:
        Whatever the matching ``report_engine`` method returns.
    """
    org_score = _safe_org_score(db)

    techniques = db.query(Technique).all()
    summary = {
        "total_techniques": len(techniques),
        **_count_statuses(techniques),
    }

    # Coverage by tactic
    tactics_coverage = _tactic_coverage(db)

    # Never-tested techniques
    tested_ids = {
        row.technique_id for row in db.query(Test.technique_id).distinct().all()
    }
    never_tested = [
        {"mitre_id": t.mitre_id, "name": t.name, "tactic": t.tactic}
        for t in techniques
        if t.id not in tested_ids
    ]

    context = {
        "org_score": org_score,
        "summary": summary,
        "tactics_coverage": tactics_coverage,
        "never_tested": never_tested[:50],
    }
    return _generate(output_format, "coverage_report", context)


def generate_executive_summary(
    db: Session,
    output_format: str = "pdf",
) -> str:
    """Generate an executive summary report.

    Args:
        db: Session used for every query.
        output_format: ``"pdf"``, ``"docx"``; any other value renders HTML.

    Returns:
        Whatever the matching ``report_engine`` method returns.
    """
    org_score = _safe_org_score(db)

    techniques = db.query(Technique).all()
    summary = {"total_techniques": len(techniques), **_count_statuses(techniques)}

    total_tests = db.query(func.count(Test.id)).scalar() or 0
    active_campaigns = (
        db.query(func.count(Campaign.id)).filter(Campaign.status == "active").scalar()
        or 0
    )

    quarter_ago = datetime.utcnow() - timedelta(days=90)
    tests_this_quarter = _tests_created_since(db, quarter_ago)

    open_remediations = (
        db.query(func.count(Test.id))
        .filter(Test.remediation_status.in_(["pending", "in_progress"]))
        .scalar()
        or 0
    )

    context = {
        "org_score": org_score,
        "summary": summary,
        "total_tests": total_tests,
        "active_campaigns": active_campaigns,
        "tests_this_quarter": tests_this_quarter,
        "open_remediations": open_remediations,
        # Detection rate among validated tests
        "detection_rate": _detection_rate(db),
        # Top gaps — lowest coverage tactics
        "top_gaps": _top_gaps(db),
    }
    return _generate(output_format, "executive_summary", context)


def generate_quarterly_summary(
    db: Session,
    output_format: str = "pdf",
) -> str:
    """Quarterly summary — reuses executive metrics plus snapshot trend rows.

    The "quarter" window is the trailing 90 days from the current UTC time;
    ``quarter_label`` is derived from the current calendar month.

    Args:
        db: Session used for every query.
        output_format: ``"pdf"``, ``"docx"``; any other value renders HTML.

    Returns:
        Whatever the matching ``report_engine`` method returns.
    """
    org_score = _safe_org_score(db)

    now = datetime.utcnow()
    quarter_ago = now - timedelta(days=90)

    snapshots = (
        db.query(CoverageSnapshot)
        .filter(CoverageSnapshot.created_at >= quarter_ago)
        .order_by(CoverageSnapshot.created_at)
        .all()
    )
    trend_rows = [
        {
            "date": s.created_at.strftime("%Y-%m-%d") if s.created_at else "",
            "validated_count": s.validated_count,
            "total_techniques": s.total_techniques,
            "organization_score": round(s.organization_score, 1),
        }
        for s in snapshots
    ]

    context = {
        "quarter_label": f"Q{((now.month - 1) // 3) + 1} {now.year}",
        "org_score": org_score,
        "tests_this_quarter": _tests_created_since(db, quarter_ago),
        "detection_rate": _detection_rate(db),
        "trend_rows": trend_rows,
        "top_gaps": _top_gaps(db),
    }
    return _generate(output_format, "quarterly_summary", context)


def generate_technique_detail_report(
    db: Session,
    technique_id: str,
    output_format: str = "pdf",
) -> str:
    """Detailed report for a single MITRE technique and its tests.

    Args:
        db: Session used for every query.
        technique_id: Technique identifier; a ``UUID`` instance is used as-is,
            anything else is parsed with ``UUID(str(...))``.
        output_format: ``"pdf"``, ``"docx"``; any other value renders HTML.

    Returns:
        Whatever the matching ``report_engine`` method returns.

    Raises:
        EntityNotFoundError: No technique exists with the given id.
        ValueError: ``technique_id`` is not a valid UUID string.
    """
    tid = technique_id if isinstance(technique_id, UUID) else UUID(str(technique_id))

    technique = db.query(Technique).filter(Technique.id == tid).first()
    if not technique:
        raise EntityNotFoundError("Technique", str(technique_id))

    # Newest tests first.
    related_tests = (
        db.query(Test)
        .filter(Test.technique_id == tid)
        .order_by(Test.created_at.desc())
        .all()
    )
    tests_data = [
        {
            "name": t.name,
            "state": t.state.value if t.state else "draft",
            "detection_result": (
                t.detection_result.value if t.detection_result else "pending"
            ),
            "created_at": t.created_at.strftime("%Y-%m-%d") if t.created_at else "",
        }
        for t in related_tests
    ]

    context = {
        "technique": technique,
        "technique_status": (
            technique.status_global.value if technique.status_global else "not_evaluated"
        ),
        "tests": tests_data,
    }
    return _generate(output_format, "technique_detail", context)


# ── Helpers ──────────────────────────────────────────────────────────


def _count_statuses(techniques) -> dict:
    """Count techniques per status bucket; a missing status is "not_evaluated".

    Statuses outside ``_STATUS_KEYS`` are not counted.
    """
    counts = dict.fromkeys(_STATUS_KEYS, 0)
    for technique in techniques:
        status = (
            technique.status_global.value if technique.status_global else "not_evaluated"
        )
        if status in counts:
            counts[status] += 1
    return counts


def _tactic_coverage(db: Session) -> list:
    """Return one dict per tactic with total, validated and coverage_pct."""
    rows = (
        db.query(
            Technique.tactic,
            func.count(Technique.id).label("total"),
            func.sum(
                case((Technique.status_global == "validated", 1), else_=0)
            ).label("validated"),
        )
        .group_by(Technique.tactic)
        .all()
    )
    coverage = []
    for tactic, total, validated in rows:
        # Treat a NULL aggregate as zero rather than crashing on int(None).
        validated = int(validated or 0)
        coverage.append({
            "tactic": tactic or "Unknown",
            "total": total,
            "validated": validated,
            "coverage_pct": round((validated / total) * 100, 1) if total > 0 else 0,
        })
    return coverage


def _top_gaps(db: Session, limit: int = 5) -> list:
    """Return the ``limit`` tactics with the lowest validated coverage."""
    ranked = sorted(_tactic_coverage(db), key=lambda row: row["coverage_pct"])
    return [
        {"tactic": row["tactic"], "coverage_pct": row["coverage_pct"]}
        for row in ranked[:limit]
    ]


def _tests_created_since(db: Session, since: datetime) -> int:
    """Count tests whose ``created_at`` is at or after ``since``."""
    return db.query(func.count(Test.id)).filter(Test.created_at >= since).scalar() or 0


def _detection_rate(db: Session):
    """Return the percentage of validated tests whose result is "detected".

    Numerator and denominator are both counted over tests, so the result
    stays within 0–100. Returns 0 when there are no validated tests.
    """
    validated_tests = (
        db.query(func.count(Test.id)).filter(Test.state == "validated").scalar() or 0
    )
    if validated_tests <= 0:
        return 0
    detected_tests = (
        db.query(func.count(Test.id))
        .filter(Test.state == "validated", Test.detection_result == "detected")
        .scalar()
        or 0
    )
    return round((detected_tests / validated_tests) * 100, 1)


def _safe_org_score(db: Session) -> dict:
    """Call the scoring service; on any failure return a zeroed score dict.

    The scoring service is imported inside the ``try`` so that an import
    failure is handled the same way as a runtime failure: it is logged as a
    warning and the report is still produced with zero scores.
    """
    try:
        from app.services.scoring_service import calculate_organization_score

        return calculate_organization_score(db)
    except Exception as e:
        logger.warning("Scoring service unavailable: %s", e)
        return {"overall": 0, "coverage": 0, "detection_maturity": 0}


def _generate(output_format: str, template_name: str, context: dict) -> str:
    """Dispatch to the correct ReportEngine method.

    ``"pdf"`` and ``"docx"`` map to their dedicated generators; every other
    value (including unrecognized ones) falls back to the HTML file generator.
    """
    if output_format == "pdf":
        return report_engine.generate_pdf(template_name, context)
    elif output_format == "docx":
        return report_engine.generate_docx(template_name, context)
    else:
        return report_engine.generate_html_file(template_name, context)