"""High-level report generation — collects domain data and delegates to ReportEngine."""

import logging
from datetime import datetime, timedelta
from uuid import UUID

from sqlalchemy import case, func
from sqlalchemy.orm import Session

from app.domain.exceptions import EntityNotFoundError
from app.models.campaign import Campaign, CampaignTest
from app.models.coverage_snapshot import CoverageSnapshot
from app.models.technique import Technique
from app.models.test import Test
from app.models.threat_actor import ThreatActor
from app.services.report_engine import report_engine

logger = logging.getLogger(__name__)

# Status buckets reported in coverage summaries; any other status value is
# left out of the per-status counts (but still counted in the total).
_STATUS_KEYS = ("validated", "partial", "not_covered", "in_progress", "not_evaluated")

# "Quarter" metrics use a rolling window of this many days, not a calendar quarter.
_QUARTER_DAYS = 90

# Number of lowest-coverage tactics listed as "top gaps".
_TOP_GAPS_LIMIT = 5

# Cap on the never-tested technique list passed to the coverage template.
_MAX_NEVER_TESTED = 50


def generate_purple_campaign_report(
    db: Session,
    campaign_id: str,
    output_format: str = "pdf",
) -> str:
    """Generate the full Purple Team campaign report.

    Args:
        db: SQLAlchemy session used for every lookup.
        campaign_id: Campaign identifier; a ``UUID`` instance is used as-is,
            anything else is parsed with ``UUID(str(...))``.
        output_format: ``"pdf"``, ``"docx"``, or anything else for HTML.

    Returns:
        Whatever the selected ``report_engine`` method returns (presumably
        the path of the generated file — confirm against ReportEngine).

    Raises:
        ValueError: If ``campaign_id`` is not a valid UUID string.
        EntityNotFoundError: If no campaign has that id.
    """
    cid = campaign_id if isinstance(campaign_id, UUID) else UUID(str(campaign_id))

    campaign = db.query(Campaign).filter(Campaign.id == cid).first()
    if not campaign:
        # str() keeps the argument consistent with the technique detail report.
        raise EntityNotFoundError("Campaign", str(campaign_id))

    campaign_tests = (
        db.query(Test)
        .join(CampaignTest, CampaignTest.test_id == Test.id)
        .filter(CampaignTest.campaign_id == cid)
        .all()
    )

    # Load every referenced technique in one query instead of one per test.
    technique_ids = {
        test.technique_id for test in campaign_tests if test.technique_id is not None
    }
    techniques_by_id = {}
    if technique_ids:
        techniques_by_id = {
            technique.id: technique
            for technique in (
                db.query(Technique).filter(Technique.id.in_(list(technique_ids))).all()
            )
        }

    tests_data = []
    for test in campaign_tests:
        technique = techniques_by_id.get(test.technique_id)
        tests_data.append({
            "technique_mitre_id": technique.mitre_id if technique else "N/A",
            "name": test.name,
            "tactic": technique.tactic if technique else "N/A",
            "state": _enum_value(test.state, "draft"),
            "detection_result": _enum_value(test.detection_result, "pending"),
        })

    # Detection counters only consider tests that reached the "validated" state.
    validated = [row for row in tests_data if row["state"] == "validated"]
    tests_detected = sum(1 for row in validated if row["detection_result"] == "detected")
    tests_not_detected = sum(
        1 for row in validated if row["detection_result"] == "not_detected"
    )

    # NOTE: unlike the counters above, findings are built from every
    # undetected test regardless of its state.
    critical_findings = [
        {
            "technique_id": row["technique_mitre_id"],
            "name": row["name"],
            "severity": "critical",
            "description": "Technique was not detected during campaign execution.",
            "recommendation": "Implement detection rule or review existing SIEM/EDR configuration.",
        }
        for row in tests_data
        if row["detection_result"] == "not_detected"
    ]

    threat_actors = []
    if campaign.threat_actor_id:
        actor = (
            db.query(ThreatActor)
            .filter(ThreatActor.id == campaign.threat_actor_id)
            .first()
        )
        if actor:
            threat_actors = [{"name": actor.name}]

    context = {
        "campaign": campaign,
        "tests": tests_data,
        "tests_validated": len(validated),
        "tests_detected": tests_detected,
        "tests_not_detected": tests_not_detected,
        "critical_findings": critical_findings,
        "org_score": _safe_org_score(db).get("overall", 0),
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # tactic list is stable between runs (a plain set is not).
        "tactics": list(dict.fromkeys(row["tactic"] for row in tests_data)),
        "threat_actors": threat_actors,
    }
    return _generate(output_format, "purple_campaign", context)


def generate_coverage_report(
    db: Session,
    output_format: str = "pdf",
) -> str:
    """Generate an organization-wide MITRE ATT&CK coverage report.

    Args:
        db: SQLAlchemy session used for every lookup.
        output_format: ``"pdf"``, ``"docx"``, or anything else for HTML.

    Returns:
        Whatever the selected ``report_engine`` method returns.
    """
    techniques = db.query(Technique).all()

    # Techniques that no test references at all.
    tested_ids = {
        row.technique_id for row in db.query(Test.technique_id).distinct().all()
    }
    never_tested = [
        {"mitre_id": t.mitre_id, "name": t.name, "tactic": t.tactic}
        for t in techniques
        if t.id not in tested_ids
    ]

    context = {
        "org_score": _safe_org_score(db),
        "summary": _status_summary(techniques),
        "tactics_coverage": _tactic_coverage(db),
        "never_tested": never_tested[:_MAX_NEVER_TESTED],
    }
    return _generate(output_format, "coverage_report", context)


def generate_executive_summary(
    db: Session,
    output_format: str = "pdf",
) -> str:
    """Generate an executive summary report.

    Args:
        db: SQLAlchemy session used for every lookup.
        output_format: ``"pdf"``, ``"docx"``, or anything else for HTML.

    Returns:
        Whatever the selected ``report_engine`` method returns.
    """
    techniques = db.query(Technique).all()

    total_tests = db.query(func.count(Test.id)).scalar() or 0
    active_campaigns = (
        db.query(func.count(Campaign.id)).filter(Campaign.status == "active").scalar()
        or 0
    )
    open_remediations = (
        db.query(func.count(Test.id))
        .filter(Test.remediation_status.in_(["pending", "in_progress"]))
        .scalar()
        or 0
    )
    window_start = datetime.utcnow() - timedelta(days=_QUARTER_DAYS)

    context = {
        "org_score": _safe_org_score(db),
        "summary": _status_summary(techniques),
        "total_tests": total_tests,
        "active_campaigns": active_campaigns,
        "tests_this_quarter": _count_tests_since(db, window_start),
        "open_remediations": open_remediations,
        "detection_rate": _detection_rate(db),
        "top_gaps": _top_gaps(_tactic_coverage(db)),
    }
    return _generate(output_format, "executive_summary", context)


def generate_quarterly_summary(
    db: Session,
    output_format: str = "pdf",
) -> str:
    """Quarterly summary — reuses executive metrics plus snapshot trend rows.

    The metrics cover a rolling 90-day window ending now, while
    ``quarter_label`` names the calendar quarter of the current date.

    Args:
        db: SQLAlchemy session used for every lookup.
        output_format: ``"pdf"``, ``"docx"``, or anything else for HTML.

    Returns:
        Whatever the selected ``report_engine`` method returns.
    """
    # NOTE(review): naive UTC is kept because the timezone-awareness of the
    # created_at columns is not visible from this module.
    now = datetime.utcnow()
    window_start = now - timedelta(days=_QUARTER_DAYS)

    snapshots = (
        db.query(CoverageSnapshot)
        .filter(CoverageSnapshot.created_at >= window_start)
        .order_by(CoverageSnapshot.created_at)
        .all()
    )
    trend_rows = [
        {
            "date": snap.created_at.strftime("%Y-%m-%d") if snap.created_at else "",
            "validated_count": snap.validated_count,
            "total_techniques": snap.total_techniques,
            # A missing score is reported as 0 rather than crashing round().
            "organization_score": round(snap.organization_score or 0, 1),
        }
        for snap in snapshots
    ]

    context = {
        "quarter_label": f"Q{((now.month - 1) // 3) + 1} {now.year}",
        "org_score": _safe_org_score(db),
        "tests_this_quarter": _count_tests_since(db, window_start),
        "detection_rate": _detection_rate(db),
        "trend_rows": trend_rows,
        "top_gaps": _top_gaps(_tactic_coverage(db)),
    }
    return _generate(output_format, "quarterly_summary", context)


def generate_technique_detail_report(
    db: Session,
    technique_id: str,
    output_format: str = "pdf",
) -> str:
    """Detailed report for a single MITRE technique and its tests.

    Args:
        db: SQLAlchemy session used for every lookup.
        technique_id: Technique identifier; a ``UUID`` instance is used
            as-is, anything else is parsed with ``UUID(str(...))``.
        output_format: ``"pdf"``, ``"docx"``, or anything else for HTML.

    Returns:
        Whatever the selected ``report_engine`` method returns.

    Raises:
        ValueError: If ``technique_id`` is not a valid UUID string.
        EntityNotFoundError: If no technique has that id.
    """
    tid = technique_id if isinstance(technique_id, UUID) else UUID(str(technique_id))

    technique = db.query(Technique).filter(Technique.id == tid).first()
    if not technique:
        raise EntityNotFoundError("Technique", str(technique_id))

    # Newest tests first.
    related_tests = (
        db.query(Test)
        .filter(Test.technique_id == tid)
        .order_by(Test.created_at.desc())
        .all()
    )
    tests_data = [
        {
            "name": test.name,
            "state": _enum_value(test.state, "draft"),
            "detection_result": _enum_value(test.detection_result, "pending"),
            "created_at": test.created_at.strftime("%Y-%m-%d") if test.created_at else "",
        }
        for test in related_tests
    ]

    context = {
        "technique": technique,
        "technique_status": _enum_value(technique.status_global, "not_evaluated"),
        "tests": tests_data,
    }
    return _generate(output_format, "technique_detail", context)


# ── Helpers ──────────────────────────────────────────────────────────


def _enum_value(member, default: str) -> str:
    """Return ``member.value``, or ``default`` when ``member`` is falsy."""
    return member.value if member else default


def _status_summary(techniques) -> dict:
    """Count techniques per global status and include the overall total.

    Techniques without a status count as ``"not_evaluated"``; statuses
    outside ``_STATUS_KEYS`` only contribute to ``"total_techniques"``.
    """
    counts = dict.fromkeys(_STATUS_KEYS, 0)
    for technique in techniques:
        status = _enum_value(technique.status_global, "not_evaluated")
        if status in counts:
            counts[status] += 1
    return {"total_techniques": len(techniques), **counts}


def _tactic_coverage(db: Session) -> list:
    """Return per-tactic totals, validated counts and coverage percentage."""
    rows = (
        db.query(
            Technique.tactic,
            func.count(Technique.id).label("total"),
            func.sum(
                case((Technique.status_global == "validated", 1), else_=0)
            ).label("validated"),
        )
        .group_by(Technique.tactic)
        .all()
    )
    coverage = []
    for tactic, total, validated_sum in rows:
        # SUM may come back as Decimal (or NULL) depending on the backend.
        validated = int(validated_sum or 0)
        coverage.append({
            "tactic": tactic or "Unknown",
            "total": total,
            "validated": validated,
            "coverage_pct": round((validated / total) * 100, 1) if total > 0 else 0,
        })
    return coverage


def _top_gaps(coverage: list, limit: int = _TOP_GAPS_LIMIT) -> list:
    """Return the ``limit`` lowest-coverage tactics as tactic/percentage pairs."""
    ranked = sorted(coverage, key=lambda row: row["coverage_pct"])[:limit]
    return [
        {"tactic": row["tactic"], "coverage_pct": row["coverage_pct"]}
        for row in ranked
    ]


def _count_tests_since(db: Session, since: datetime) -> int:
    """Count tests whose ``created_at`` is at or after ``since``."""
    return db.query(func.count(Test.id)).filter(Test.created_at >= since).scalar() or 0


def _detection_rate(db: Session) -> float:
    """Return the percentage of validated tests whose result is "detected".

    Numerator and denominator are both counts of tests, so the rate stays
    within 0–100. Returns 0 when there are no validated tests.
    """
    validated_tests = (
        db.query(func.count(Test.id)).filter(Test.state == "validated").scalar() or 0
    )
    if validated_tests <= 0:
        return 0
    detected_tests = (
        db.query(func.count(Test.id))
        .filter(Test.state == "validated", Test.detection_result == "detected")
        .scalar()
        or 0
    )
    return round((detected_tests / validated_tests) * 100, 1)


def _safe_org_score(db: Session) -> dict:
    """Call the scoring service; return an all-zero score dict on failure.

    The import is done lazily inside the ``try`` so that a failure to import
    the scoring service is handled the same way as a failure to run it.
    """
    try:
        from app.services.scoring_service import calculate_organization_score

        return calculate_organization_score(db)
    except Exception as e:
        # Deliberate best-effort: a report is still produced without scores.
        logger.warning("Scoring service unavailable: %s", e)
        return {"overall": 0, "coverage": 0, "detection_maturity": 0}


def _generate(output_format: str, template_name: str, context: dict) -> str:
    """Dispatch to the correct ReportEngine method.

    ``"pdf"`` and ``"docx"`` select their dedicated generators; every other
    value (including ``"html"``) falls back to the HTML file generator.
    """
    if output_format == "pdf":
        return report_engine.generate_pdf(template_name, context)
    if output_format == "docx":
        return report_engine.generate_docx(template_name, context)
    return report_engine.generate_html_file(template_name, context)