Files
Aegis/backend/app/services/report_generation_service.py
Kitos ed2c34ef28 feat(reports): extend report generation service [FASE-2.3]
Add quarterly summary and technique detail builders with UUID-safe lookups and unit tests for purple campaign context.
2026-05-18 14:00:42 +02:00

370 lines
12 KiB
Python

"""High-level report generation — collects domain data and delegates to ReportEngine."""
import logging
from datetime import datetime, timedelta
from uuid import UUID
from sqlalchemy.orm import Session
from app.domain.exceptions import EntityNotFoundError
from app.models.campaign import Campaign, CampaignTest
from app.models.coverage_snapshot import CoverageSnapshot
from app.models.technique import Technique
from app.models.test import Test
from app.models.threat_actor import ThreatActor
from app.services.report_engine import report_engine
logger = logging.getLogger(__name__)
def generate_purple_campaign_report(
db: Session,
campaign_id: str,
output_format: str = "pdf",
) -> str:
"""Generate the full Purple Team campaign report."""
cid = campaign_id if isinstance(campaign_id, UUID) else UUID(str(campaign_id))
campaign = db.query(Campaign).filter(Campaign.id == cid).first()
if not campaign:
raise EntityNotFoundError("Campaign", campaign_id)
campaign_tests = (
db.query(Test)
.join(CampaignTest, CampaignTest.test_id == Test.id)
.filter(CampaignTest.campaign_id == cid)
.all()
)
tests_data = []
for test in campaign_tests:
technique = db.query(Technique).filter(Technique.id == test.technique_id).first()
tests_data.append({
"technique_mitre_id": technique.mitre_id if technique else "N/A",
"name": test.name,
"tactic": technique.tactic if technique else "N/A",
"state": test.state.value if test.state else "draft",
"detection_result": (
test.detection_result.value if test.detection_result else "pending"
),
})
validated = [t for t in campaign_tests if t.state and t.state.value == "validated"]
detected = [
t for t in validated
if t.detection_result and t.detection_result.value == "detected"
]
not_detected = [
t for t in validated
if t.detection_result and t.detection_result.value == "not_detected"
]
critical_findings = [
{
"technique_id": t["technique_mitre_id"],
"name": t["name"],
"severity": "critical",
"description": "Technique was not detected during campaign execution.",
"recommendation": "Implement detection rule or review existing SIEM/EDR configuration.",
}
for t in tests_data
if t["detection_result"] == "not_detected"
]
org_score = _safe_org_score(db)
threat_actors = []
if campaign.threat_actor_id:
actor = db.query(ThreatActor).filter(ThreatActor.id == campaign.threat_actor_id).first()
if actor:
threat_actors = [{"name": actor.name}]
context = {
"campaign": campaign,
"tests": tests_data,
"tests_validated": len(validated),
"tests_detected": len(detected),
"tests_not_detected": len(not_detected),
"critical_findings": critical_findings,
"org_score": org_score.get("overall", 0),
"tactics": list({t["tactic"] for t in tests_data}),
"threat_actors": threat_actors,
}
return _generate(output_format, "purple_campaign", context)
def generate_coverage_report(
db: Session,
output_format: str = "pdf",
) -> str:
"""Generate an organization-wide MITRE ATT&CK coverage report."""
from sqlalchemy import func, case
org_score = _safe_org_score(db)
techniques = db.query(Technique).all()
status_counts = {"validated": 0, "partial": 0, "not_covered": 0, "in_progress": 0, "not_evaluated": 0}
for t in techniques:
s = t.status_global.value if t.status_global else "not_evaluated"
if s in status_counts:
status_counts[s] += 1
summary = {
"total_techniques": len(techniques),
**status_counts,
}
# Coverage by tactic
tactic_rows = (
db.query(
Technique.tactic,
func.count(Technique.id).label("total"),
func.sum(case((Technique.status_global == "validated", 1), else_=0)).label("validated"),
)
.group_by(Technique.tactic)
.all()
)
tactics_coverage = [
{
"tactic": r[0] or "Unknown",
"total": r[1],
"validated": int(r[2]),
"coverage_pct": round((int(r[2]) / r[1]) * 100, 1) if r[1] > 0 else 0,
}
for r in tactic_rows
]
# Never-tested techniques
tested_ids = {t.technique_id for t in db.query(Test.technique_id).distinct().all()}
never_tested = [
{"mitre_id": t.mitre_id, "name": t.name, "tactic": t.tactic}
for t in techniques
if t.id not in tested_ids
]
context = {
"org_score": org_score,
"summary": summary,
"tactics_coverage": tactics_coverage,
"never_tested": never_tested[:50],
}
return _generate(output_format, "coverage_report", context)
def generate_executive_summary(
db: Session,
output_format: str = "pdf",
) -> str:
"""Generate an executive summary report."""
from sqlalchemy import func
org_score = _safe_org_score(db)
techniques = db.query(Technique).all()
status_counts = {"validated": 0, "partial": 0, "not_covered": 0, "in_progress": 0, "not_evaluated": 0}
for t in techniques:
s = t.status_global.value if t.status_global else "not_evaluated"
if s in status_counts:
status_counts[s] += 1
summary = {"total_techniques": len(techniques), **status_counts}
total_tests = db.query(func.count(Test.id)).scalar() or 0
active_campaigns = (
db.query(func.count(Campaign.id)).filter(Campaign.status == "active").scalar() or 0
)
quarter_ago = datetime.utcnow() - timedelta(days=90)
tests_this_quarter = (
db.query(func.count(Test.id)).filter(Test.created_at >= quarter_ago).scalar() or 0
)
open_remediations = (
db.query(func.count(Test.id))
.filter(Test.remediation_status.in_(["pending", "in_progress"]))
.scalar() or 0
)
# Detection rate among validated tests
validated_count = status_counts["validated"]
detected_count = (
db.query(func.count(Test.id))
.filter(Test.state == "validated", Test.detection_result == "detected")
.scalar() or 0
)
detection_rate = round((detected_count / validated_count) * 100, 1) if validated_count > 0 else 0
# Top gaps — lowest coverage tactics
from sqlalchemy import case as sql_case
tactic_rows = (
db.query(
Technique.tactic,
func.count(Technique.id).label("total"),
func.sum(sql_case((Technique.status_global == "validated", 1), else_=0)).label("validated"),
)
.group_by(Technique.tactic)
.all()
)
tactic_coverage = [
{
"tactic": r[0] or "Unknown",
"coverage_pct": round((int(r[2]) / r[1]) * 100, 1) if r[1] > 0 else 0,
}
for r in tactic_rows
]
top_gaps = sorted(tactic_coverage, key=lambda x: x["coverage_pct"])[:5]
context = {
"org_score": org_score,
"summary": summary,
"total_tests": total_tests,
"active_campaigns": active_campaigns,
"tests_this_quarter": tests_this_quarter,
"open_remediations": open_remediations,
"detection_rate": detection_rate,
"top_gaps": top_gaps,
}
return _generate(output_format, "executive_summary", context)
def generate_quarterly_summary(
db: Session,
output_format: str = "pdf",
) -> str:
"""Quarterly summary — reuses executive metrics plus snapshot trend rows."""
from sqlalchemy import case as sql_case, func
org_score = _safe_org_score(db)
quarter_ago = datetime.utcnow() - timedelta(days=90)
tests_this_quarter = (
db.query(func.count(Test.id)).filter(Test.created_at >= quarter_ago).scalar() or 0
)
techniques = db.query(Technique).all()
validated_count = sum(
1 for t in techniques if t.status_global and t.status_global.value == "validated"
)
detected_count = (
db.query(func.count(Test.id))
.filter(Test.state == "validated", Test.detection_result == "detected")
.scalar() or 0
)
detection_rate = (
round((detected_count / validated_count) * 100, 1) if validated_count > 0 else 0
)
tactic_rows = (
db.query(
Technique.tactic,
func.count(Technique.id).label("total"),
func.sum(sql_case((Technique.status_global == "validated", 1), else_=0)).label(
"validated",
),
)
.group_by(Technique.tactic)
.all()
)
top_gaps = sorted(
[
{
"tactic": r[0] or "Unknown",
"coverage_pct": round((int(r[2]) / r[1]) * 100, 1) if r[1] > 0 else 0,
}
for r in tactic_rows
],
key=lambda x: x["coverage_pct"],
)[:5]
snapshots = (
db.query(CoverageSnapshot)
.filter(CoverageSnapshot.created_at >= quarter_ago)
.order_by(CoverageSnapshot.created_at)
.all()
)
trend_rows = [
{
"date": s.created_at.strftime("%Y-%m-%d") if s.created_at else "",
"validated_count": s.validated_count,
"total_techniques": s.total_techniques,
"organization_score": round(s.organization_score, 1),
}
for s in snapshots
]
now = datetime.utcnow()
quarter_label = f"Q{((now.month - 1) // 3) + 1} {now.year}"
context = {
"quarter_label": quarter_label,
"org_score": org_score,
"tests_this_quarter": tests_this_quarter,
"detection_rate": detection_rate,
"trend_rows": trend_rows,
"top_gaps": top_gaps,
}
return _generate(output_format, "quarterly_summary", context)
def generate_technique_detail_report(
db: Session,
technique_id: str,
output_format: str = "pdf",
) -> str:
"""Detailed report for a single MITRE technique and its tests."""
tid = technique_id if isinstance(technique_id, UUID) else UUID(str(technique_id))
technique = db.query(Technique).filter(Technique.id == tid).first()
if not technique:
raise EntityNotFoundError("Technique", str(technique_id))
related_tests = (
db.query(Test)
.filter(Test.technique_id == tid)
.order_by(Test.created_at.desc())
.all()
)
tests_data = [
{
"name": t.name,
"state": t.state.value if t.state else "draft",
"detection_result": (
t.detection_result.value if t.detection_result else "pending"
),
"created_at": t.created_at.strftime("%Y-%m-%d") if t.created_at else "",
}
for t in related_tests
]
context = {
"technique": technique,
"technique_status": (
technique.status_global.value if technique.status_global else "not_evaluated"
),
"tests": tests_data,
}
return _generate(output_format, "technique_detail", context)
# ── Helpers ──────────────────────────────────────────────────────────
def _safe_org_score(db: Session) -> dict:
"""Safely call the scoring service; return empty dict on failure."""
try:
from app.services.scoring_service import calculate_organization_score
return calculate_organization_score(db)
except Exception as e:
logger.warning("Scoring service unavailable: %s", e)
return {"overall": 0, "coverage": 0, "detection_maturity": 0}
def _generate(output_format: str, template_name: str, context: dict) -> str:
"""Dispatch to the correct ReportEngine method."""
if output_format == "pdf":
return report_engine.generate_pdf(template_name, context)
elif output_format == "docx":
return report_engine.generate_docx(template_name, context)
else:
return report_engine.generate_html_file(template_name, context)