Compare commits

...

6 Commits

Author SHA1 Message Date
kitos a8a24b5429 fix(metrics): correct never-tested technique query [FASE-2.6]
Aegis CI / lint-and-test (push) Has been cancelled
Use distinct technique_id list filtering so untested techniques are returned reliably on SQLite and Postgres.
2026-05-18 14:00:48 +02:00
kitos b6f23f385d fix(analytics): restrict operators endpoint to admin [FASE-2.5]
Align with BI security spec and add flat JSON API tests for coverage, tests, and operators.
2026-05-18 14:00:47 +02:00
kitos 6ab950ec42 feat(reports): add quarterly and technique download routes [FASE-2.4]
Expose GET endpoints for quarterly-summary and technique reports with PDF, DOCX, and HTML formats.
2026-05-18 14:00:46 +02:00
kitos ed2c34ef28 feat(reports): extend report generation service [FASE-2.3]
Add quarterly summary and technique detail builders with UUID-safe lookups and unit tests for purple campaign context.
2026-05-18 14:00:42 +02:00
kitos 96fdd9fa85 feat(reports): add quarterly and technique HTML templates [FASE-2.2]
Introduce quarterly_summary and technique_detail Jinja layouts; use SVG logo asset across report covers.
2026-05-18 14:00:40 +02:00
kitos c28a47c43b test(reports): add ReportEngine unit tests [FASE-2.1]
Stub WeasyPrint for CI-friendly PDF generation and verify HTML render, PDF path, and HTML file output.
2026-05-18 14:00:37 +02:00
14 changed files with 559 additions and 13 deletions
+2 -2
View File
@@ -8,7 +8,7 @@ from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user, require_any_role
from app.dependencies.auth import get_current_user, require_role
from app.models.user import User
from app.services import analytics_service
@@ -49,7 +49,7 @@ def analytics_trends(
@router.get("/operators")
def analytics_operators(
db: Session = Depends(get_db),
user: User = Depends(require_any_role("red_lead", "blue_lead")),
user: User = Depends(require_role("admin")),
):
"""Per-operator metrics — for workload management dashboards."""
return analytics_service.get_operators_analytics(db)
@@ -70,3 +70,38 @@ def generate_executive_report(
media_type=_MEDIA_TYPES[format],
filename=f"executive_summary.{format}",
)
@router.get("/quarterly-summary")
def generate_quarterly_report(
format: str = Query("pdf", pattern="^(pdf|docx|html)$"),
db: Session = Depends(get_db),
user: User = Depends(require_any_role("red_lead", "blue_lead", "viewer")),
):
"""Generate a quarterly security summary report."""
filepath = report_generation_service.generate_quarterly_summary(
db, output_format=format,
)
return FileResponse(
filepath,
media_type=_MEDIA_TYPES[format],
filename=f"quarterly_summary.{format}",
)
@router.get("/technique/{technique_id}")
def generate_technique_report(
technique_id: UUID,
format: str = Query("pdf", pattern="^(pdf|docx|html)$"),
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
"""Generate a detailed report for one MITRE technique."""
filepath = report_generation_service.generate_technique_detail_report(
db, str(technique_id), output_format=format,
)
return FileResponse(
filepath,
media_type=_MEDIA_TYPES[format],
filename=f"technique_{technique_id}.{format}",
)
@@ -51,13 +51,17 @@ def get_coverage_by_tactic(db: Session) -> list[dict]:
def get_never_tested_techniques(db: Session) -> list[dict]:
"""Techniques that have never had a test created."""
tested_technique_ids = db.query(Test.technique_id).distinct().subquery()
techniques = (
db.query(Technique)
.filter(~Technique.id.in_(db.query(tested_technique_ids)))
.order_by(Technique.mitre_id)
tested_ids = [
row[0]
for row in db.query(Test.technique_id)
.filter(Test.technique_id.isnot(None))
.distinct()
.all()
)
]
query = db.query(Technique)
if tested_ids:
query = query.filter(~Technique.id.in_(tested_ids))
techniques = query.order_by(Technique.mitre_id).all()
return [
{
"mitre_id": t.mitre_id,
+4
View File
@@ -76,6 +76,10 @@ class ReportEngine:
logger.info("DOCX generated: %s", output_path)
return output_path
def generate_html(self, template_name: str, context: dict) -> str:
"""Render and save a standalone HTML report (alias for spec compatibility)."""
return self.generate_html_file(template_name, context)
def generate_html_file(self, template_name: str, context: dict) -> str:
"""Render and save a standalone HTML report."""
html_content = self.render_html(template_name, context)
@@ -2,6 +2,7 @@
import logging
from datetime import datetime, timedelta
from uuid import UUID
from sqlalchemy.orm import Session
@@ -22,14 +23,15 @@ def generate_purple_campaign_report(
output_format: str = "pdf",
) -> str:
"""Generate the full Purple Team campaign report."""
campaign = db.query(Campaign).filter(Campaign.id == campaign_id).first()
cid = campaign_id if isinstance(campaign_id, UUID) else UUID(str(campaign_id))
campaign = db.query(Campaign).filter(Campaign.id == cid).first()
if not campaign:
raise EntityNotFoundError("Campaign", campaign_id)
campaign_tests = (
db.query(Test)
.join(CampaignTest, CampaignTest.test_id == Test.id)
.filter(CampaignTest.campaign_id == campaign_id)
.filter(CampaignTest.campaign_id == cid)
.all()
)
@@ -227,6 +229,123 @@ def generate_executive_summary(
return _generate(output_format, "executive_summary", context)
def generate_quarterly_summary(
db: Session,
output_format: str = "pdf",
) -> str:
"""Quarterly summary — reuses executive metrics plus snapshot trend rows."""
from sqlalchemy import case as sql_case, func
org_score = _safe_org_score(db)
quarter_ago = datetime.utcnow() - timedelta(days=90)
tests_this_quarter = (
db.query(func.count(Test.id)).filter(Test.created_at >= quarter_ago).scalar() or 0
)
techniques = db.query(Technique).all()
validated_count = sum(
1 for t in techniques if t.status_global and t.status_global.value == "validated"
)
detected_count = (
db.query(func.count(Test.id))
.filter(Test.state == "validated", Test.detection_result == "detected")
.scalar() or 0
)
detection_rate = (
round((detected_count / validated_count) * 100, 1) if validated_count > 0 else 0
)
tactic_rows = (
db.query(
Technique.tactic,
func.count(Technique.id).label("total"),
func.sum(sql_case((Technique.status_global == "validated", 1), else_=0)).label(
"validated",
),
)
.group_by(Technique.tactic)
.all()
)
top_gaps = sorted(
[
{
"tactic": r[0] or "Unknown",
"coverage_pct": round((int(r[2]) / r[1]) * 100, 1) if r[1] > 0 else 0,
}
for r in tactic_rows
],
key=lambda x: x["coverage_pct"],
)[:5]
snapshots = (
db.query(CoverageSnapshot)
.filter(CoverageSnapshot.created_at >= quarter_ago)
.order_by(CoverageSnapshot.created_at)
.all()
)
trend_rows = [
{
"date": s.created_at.strftime("%Y-%m-%d") if s.created_at else "",
"validated_count": s.validated_count,
"total_techniques": s.total_techniques,
"organization_score": round(s.organization_score, 1),
}
for s in snapshots
]
now = datetime.utcnow()
quarter_label = f"Q{((now.month - 1) // 3) + 1} {now.year}"
context = {
"quarter_label": quarter_label,
"org_score": org_score,
"tests_this_quarter": tests_this_quarter,
"detection_rate": detection_rate,
"trend_rows": trend_rows,
"top_gaps": top_gaps,
}
return _generate(output_format, "quarterly_summary", context)
def generate_technique_detail_report(
db: Session,
technique_id: str,
output_format: str = "pdf",
) -> str:
"""Detailed report for a single MITRE technique and its tests."""
tid = technique_id if isinstance(technique_id, UUID) else UUID(str(technique_id))
technique = db.query(Technique).filter(Technique.id == tid).first()
if not technique:
raise EntityNotFoundError("Technique", str(technique_id))
related_tests = (
db.query(Test)
.filter(Test.technique_id == tid)
.order_by(Test.created_at.desc())
.all()
)
tests_data = [
{
"name": t.name,
"state": t.state.value if t.state else "draft",
"detection_result": (
t.detection_result.value if t.detection_result else "pending"
),
"created_at": t.created_at.strftime("%Y-%m-%d") if t.created_at else "",
}
for t in related_tests
]
context = {
"technique": technique,
"technique_status": (
technique.status_global.value if technique.status_global else "not_evaluated"
),
"tests": tests_data,
}
return _generate(output_format, "technique_detail", context)
# ── Helpers ──────────────────────────────────────────────────────────
@@ -7,7 +7,7 @@
</head>
<body>
<section class="cover-page">
<img src="assets/logo.png" class="logo" alt="Logo">
<img src="assets/logo.svg" class="logo" alt="Logo">
<h1>MITRE ATT&amp;CK Coverage Report</h1>
<h2>{{ company_name }}</h2>
<p class="date">{{ generated_at }}</p>
@@ -7,7 +7,7 @@
</head>
<body>
<section class="cover-page">
<img src="assets/logo.png" class="logo" alt="Logo">
<img src="assets/logo.svg" class="logo" alt="Logo">
<h1>Executive Security Summary</h1>
<h2>{{ company_name }}</h2>
<p class="date">{{ generated_at }}</p>
@@ -7,7 +7,7 @@
</head>
<body>
<section class="cover-page">
<img src="assets/logo.png" class="logo" alt="Logo">
<img src="assets/logo.svg" class="logo" alt="Logo">
<h1>Purple Team Assessment Report</h1>
<h2>{{ campaign.name }}</h2>
<p class="date">{{ generated_at }}</p>
@@ -0,0 +1,72 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<link rel="stylesheet" href="styles/report.css">
<title>Quarterly Summary — {{ company_name }}</title>
</head>
<body>
<section class="cover-page">
<img src="assets/logo.svg" class="logo" alt="Logo">
<h1>Quarterly Security Summary</h1>
<h2>{{ quarter_label }}</h2>
<p class="date">{{ generated_at }}</p>
<p class="classification">{{ classification | default('INTERNAL') }}</p>
</section>
<section>
<h2>1. Quarter Overview</h2>
<div class="stats-grid">
<div class="stat">
<span class="number">{{ tests_this_quarter }}</span>
<span class="label">Tests Executed</span>
</div>
<div class="stat">
<span class="number">{{ org_score.overall | default(0) }}%</span>
<span class="label">Org Score</span>
</div>
<div class="stat">
<span class="number">{{ detection_rate }}%</span>
<span class="label">Detection Rate</span>
</div>
</div>
</section>
<section>
<h2>2. Coverage Trend</h2>
<table class="data-table">
<thead>
<tr>
<th>Date</th>
<th>Validated</th>
<th>Total Techniques</th>
<th>Org Score</th>
</tr>
</thead>
<tbody>
{% for row in trend_rows %}
<tr>
<td>{{ row.date }}</td>
<td>{{ row.validated_count }}</td>
<td>{{ row.total_techniques }}</td>
<td>{{ row.organization_score }}%</td>
</tr>
{% endfor %}
</tbody>
</table>
</section>
<section>
<h2>3. Top Gaps</h2>
<ul>
{% for gap in top_gaps %}
<li><strong>{{ gap.tactic }}</strong>: {{ gap.coverage_pct }}% coverage</li>
{% endfor %}
</ul>
</section>
<footer>
<p>{{ company_name }} — Confidential</p>
</footer>
</body>
</html>
@@ -0,0 +1,62 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<link rel="stylesheet" href="styles/report.css">
<title>Technique Detail — {{ technique.mitre_id }}</title>
</head>
<body>
<section class="cover-page">
<img src="assets/logo.svg" class="logo" alt="Logo">
<h1>Technique Assessment</h1>
<h2>{{ technique.mitre_id }} — {{ technique.name }}</h2>
<p class="date">{{ generated_at }}</p>
</section>
<section>
<h2>1. Technique Profile</h2>
<table class="data-table">
<tbody>
<tr><th>MITRE ID</th><td>{{ technique.mitre_id }}</td></tr>
<tr><th>Name</th><td>{{ technique.name }}</td></tr>
<tr><th>Tactic</th><td>{{ technique.tactic or 'N/A' }}</td></tr>
<tr><th>Status</th><td>{{ technique_status }}</td></tr>
<tr><th>Review required</th><td>{{ 'Yes' if technique.review_required else 'No' }}</td></tr>
</tbody>
</table>
{% if technique.description %}
<p>{{ technique.description }}</p>
{% endif %}
</section>
<section>
<h2>2. Test History</h2>
<table class="data-table">
<thead>
<tr>
<th>Test</th>
<th>State</th>
<th>Detection</th>
<th>Created</th>
</tr>
</thead>
<tbody>
{% for test in tests %}
<tr class="result-{{ test.detection_result }}">
<td>{{ test.name }}</td>
<td>{{ test.state }}</td>
<td>{{ test.detection_result }}</td>
<td>{{ test.created_at }}</td>
</tr>
{% else %}
<tr><td colspan="4">No tests recorded for this technique.</td></tr>
{% endfor %}
</tbody>
</table>
</section>
<footer>
<p>{{ company_name }} — Confidential</p>
</footer>
</body>
</html>
@@ -0,0 +1,87 @@
"""Analytics and advanced metrics API tests (FASE-2.5, 2.6)."""
import uuid
from app.models.technique import Technique
from app.models.test import Test
from app.models.enums import TestState
def test_analytics_coverage_flat_json(client, auth_headers, db):
t = Technique(mitre_id="T1001", name="Test Tech", tactic="discovery")
db.add(t)
db.commit()
r = client.get("/api/v1/analytics/coverage", headers=auth_headers)
assert r.status_code == 200
data = r.json()
assert isinstance(data, list)
assert any(row["mitre_id"] == "T1001" for row in data)
assert "tactic" in data[0]
assert "status" in data[0]
def test_analytics_tests_date_filter(client, auth_headers, db, admin_user):
technique = Technique(mitre_id="T1002", name="Filter Tech", tactic="impact")
db.add(technique)
db.flush()
db.add(
Test(
name="dated test",
technique_id=technique.id,
state=TestState.draft,
created_by=admin_user.id,
),
)
db.commit()
r = client.get(
"/api/v1/analytics/tests",
params={"date_from": "2020-01-01"},
headers=auth_headers,
)
assert r.status_code == 200
assert isinstance(r.json(), list)
def test_analytics_operators_requires_admin(client, auth_headers, red_tech_headers):
client.cookies.clear()
denied = client.get("/api/v1/analytics/operators", headers=red_tech_headers)
assert denied.status_code == 403
client.cookies.clear()
ok = client.get("/api/v1/analytics/operators", headers=auth_headers)
assert ok.status_code == 200
assert isinstance(ok.json(), list)
def test_coverage_by_tactic(client, auth_headers, db):
db.add(Technique(mitre_id="T2001", name="A", tactic="initial-access"))
db.add(Technique(mitre_id="T2002", name="B", tactic="initial-access"))
db.commit()
r = client.get("/api/v1/metrics/advanced/coverage-by-tactic", headers=auth_headers)
assert r.status_code == 200
rows = r.json()
assert isinstance(rows, list)
ia = next((x for x in rows if x["tactic"] == "initial-access"), None)
assert ia is not None
assert ia["total"] >= 2
assert "coverage_pct" in ia
def test_never_tested_lists_untested_technique(client, auth_headers, db):
db.add(Technique(mitre_id="T9999", name="Never Run", tactic="collection"))
db.commit()
r = client.get("/api/v1/metrics/advanced/never-tested", headers=auth_headers)
assert r.status_code == 200
keys = {row["mitre_id"] for row in r.json()}
assert "T9999" in keys
def test_avg_validation_time_empty(client, auth_headers):
r = client.get("/api/v1/metrics/advanced/avg-validation-time", headers=auth_headers)
assert r.status_code == 200
body = r.json()
assert body["total_validated"] == 0
@@ -0,0 +1,42 @@
"""Professional reports router tests (FASE-2.4)."""
from unittest.mock import patch
from app.models.campaign import Campaign
@patch("app.services.report_generation_service.generate_purple_campaign_report")
def test_purple_campaign_pdf_download(mock_gen, client, auth_headers, db):
mock_gen.return_value = __file__ # existing file for FileResponse
campaign = Campaign(name="Export Camp", status="active")
db.add(campaign)
db.commit()
r = client.get(
f"/api/v1/reports/generate/purple-campaign/{campaign.id}",
params={"format": "pdf"},
headers=auth_headers,
)
assert r.status_code == 200
assert r.headers["content-type"] == "application/pdf"
@patch("app.services.report_generation_service.generate_coverage_report")
def test_coverage_summary_html(mock_gen, client, auth_headers):
import tempfile
import os
fd, path = tempfile.mkstemp(suffix=".html")
os.write(fd, b"<html><body>ok</body></html>")
os.close(fd)
mock_gen.return_value = path
r = client.get(
"/api/v1/reports/generate/coverage-summary",
params={"format": "html"},
headers=auth_headers,
)
assert r.status_code == 200
assert "text/html" in r.headers["content-type"]
os.unlink(path)
+63
View File
@@ -0,0 +1,63 @@
"""Report engine unit tests (FASE-2.1)."""
import os
import sys
from unittest.mock import MagicMock
import pytest
from app.services.report_engine import ReportEngine
@pytest.fixture
def engine(tmp_path, monkeypatch):
templates = tmp_path / "templates"
styles = templates / "styles"
styles.mkdir(parents=True)
(styles / "report.css").write_text("body { color: #0e7490; }", encoding="utf-8")
(templates / "coverage_report.html").write_text(
"<html><body><h1>{{ company_name }}</h1><p>{{ score }}</p></body></html>",
encoding="utf-8",
)
out_dir = tmp_path / "out"
monkeypatch.setattr("app.services.report_engine.settings.REPORT_TEMPLATES_DIR", str(templates))
monkeypatch.setattr("app.services.report_engine.settings.REPORT_OUTPUT_DIR", str(out_dir))
monkeypatch.setattr("app.services.report_engine.settings.COMPANY_NAME", "Test Org")
return ReportEngine()
def test_render_html_injects_company_and_timestamp(engine):
html = engine.render_html("coverage_report", {"score": 42})
assert "Test Org" in html
assert "42" in html
def test_generate_pdf_writes_file(engine, monkeypatch, tmp_path):
"""WeasyPrint is stubbed so tests run without GTK/Pango system libraries."""
def _write_pdf(output_path, stylesheets=None):
with open(output_path, "wb") as f:
f.write(b"%PDF-1.4 mock")
mock_html_instance = MagicMock()
mock_html_instance.write_pdf.side_effect = _write_pdf
mock_wp = MagicMock()
mock_wp.HTML.return_value = mock_html_instance
mock_wp.CSS = MagicMock()
for key in list(sys.modules):
if key == "weasyprint" or key.startswith("weasyprint."):
monkeypatch.delitem(sys.modules, key, raising=False)
monkeypatch.setitem(sys.modules, "weasyprint", mock_wp)
path = engine.generate_pdf("coverage_report", {"score": 99})
assert path.endswith(".pdf")
assert os.path.isfile(path)
mock_html_instance.write_pdf.assert_called_once()
def test_generate_html_file_writes_file(engine):
path = engine.generate_html("coverage_report", {"score": 7})
assert path.endswith(".html")
assert os.path.isfile(path)
with open(path, encoding="utf-8") as f:
assert "Test Org" in f.read()
@@ -0,0 +1,58 @@
"""Report generation service tests (FASE-2.3)."""
import uuid
from unittest.mock import patch
import pytest
from app.domain.exceptions import EntityNotFoundError
from app.models.campaign import Campaign, CampaignTest
from app.models.enums import TestState
from app.models.technique import Technique
from app.models.test import Test
@patch("app.services.report_generation_service.report_engine.generate_pdf")
def test_generate_purple_campaign_report_pdf(mock_pdf, db, admin_user):
mock_pdf.return_value = "/tmp/fake.pdf"
technique = Technique(
mitre_id="T1059.001",
name="PowerShell",
tactic="execution",
)
db.add(technique)
db.flush()
campaign = Campaign(name="Q1 Purple", description="Scope", status="active")
db.add(campaign)
db.flush()
test = Test(
name="PS test",
technique_id=technique.id,
state=TestState.validated,
created_by=admin_user.id,
)
db.add(test)
db.flush()
db.add(CampaignTest(campaign_id=campaign.id, test_id=test.id))
db.commit()
path = __import__(
"app.services.report_generation_service",
fromlist=["generate_purple_campaign_report"],
).generate_purple_campaign_report(db, str(campaign.id), "pdf")
assert path == "/tmp/fake.pdf"
mock_pdf.assert_called_once()
context = mock_pdf.call_args[0][1]
assert context["tests_validated"] == 1
assert len(context["tests"]) == 1
def test_generate_technique_detail_not_found(db):
from app.services.report_generation_service import generate_technique_detail_report
with pytest.raises(EntityNotFoundError):
generate_technique_detail_report(db, str(uuid.uuid4()), "html")