Compare commits
6 Commits
0d4c404f08
...
a8a24b5429
| Author | SHA1 | Date | |
|---|---|---|---|
| a8a24b5429 | |||
| b6f23f385d | |||
| 6ab950ec42 | |||
| ed2c34ef28 | |||
| 96fdd9fa85 | |||
| c28a47c43b |
@@ -8,7 +8,7 @@ from fastapi import APIRouter, Depends, Query
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.database import get_db
|
||||
from app.dependencies.auth import get_current_user, require_any_role
|
||||
from app.dependencies.auth import get_current_user, require_role
|
||||
from app.models.user import User
|
||||
from app.services import analytics_service
|
||||
|
||||
@@ -49,7 +49,7 @@ def analytics_trends(
|
||||
@router.get("/operators")
|
||||
def analytics_operators(
|
||||
db: Session = Depends(get_db),
|
||||
user: User = Depends(require_any_role("red_lead", "blue_lead")),
|
||||
user: User = Depends(require_role("admin")),
|
||||
):
|
||||
"""Per-operator metrics — for workload management dashboards."""
|
||||
return analytics_service.get_operators_analytics(db)
|
||||
|
||||
@@ -70,3 +70,38 @@ def generate_executive_report(
|
||||
media_type=_MEDIA_TYPES[format],
|
||||
filename=f"executive_summary.{format}",
|
||||
)
|
||||
|
||||
|
||||
@router.get("/quarterly-summary")
|
||||
def generate_quarterly_report(
|
||||
format: str = Query("pdf", pattern="^(pdf|docx|html)$"),
|
||||
db: Session = Depends(get_db),
|
||||
user: User = Depends(require_any_role("red_lead", "blue_lead", "viewer")),
|
||||
):
|
||||
"""Generate a quarterly security summary report."""
|
||||
filepath = report_generation_service.generate_quarterly_summary(
|
||||
db, output_format=format,
|
||||
)
|
||||
return FileResponse(
|
||||
filepath,
|
||||
media_type=_MEDIA_TYPES[format],
|
||||
filename=f"quarterly_summary.{format}",
|
||||
)
|
||||
|
||||
|
||||
@router.get("/technique/{technique_id}")
|
||||
def generate_technique_report(
|
||||
technique_id: UUID,
|
||||
format: str = Query("pdf", pattern="^(pdf|docx|html)$"),
|
||||
db: Session = Depends(get_db),
|
||||
user: User = Depends(get_current_user),
|
||||
):
|
||||
"""Generate a detailed report for one MITRE technique."""
|
||||
filepath = report_generation_service.generate_technique_detail_report(
|
||||
db, str(technique_id), output_format=format,
|
||||
)
|
||||
return FileResponse(
|
||||
filepath,
|
||||
media_type=_MEDIA_TYPES[format],
|
||||
filename=f"technique_{technique_id}.{format}",
|
||||
)
|
||||
|
||||
@@ -51,13 +51,17 @@ def get_coverage_by_tactic(db: Session) -> list[dict]:
|
||||
|
||||
def get_never_tested_techniques(db: Session) -> list[dict]:
|
||||
"""Techniques that have never had a test created."""
|
||||
tested_technique_ids = db.query(Test.technique_id).distinct().subquery()
|
||||
techniques = (
|
||||
db.query(Technique)
|
||||
.filter(~Technique.id.in_(db.query(tested_technique_ids)))
|
||||
.order_by(Technique.mitre_id)
|
||||
tested_ids = [
|
||||
row[0]
|
||||
for row in db.query(Test.technique_id)
|
||||
.filter(Test.technique_id.isnot(None))
|
||||
.distinct()
|
||||
.all()
|
||||
)
|
||||
]
|
||||
query = db.query(Technique)
|
||||
if tested_ids:
|
||||
query = query.filter(~Technique.id.in_(tested_ids))
|
||||
techniques = query.order_by(Technique.mitre_id).all()
|
||||
return [
|
||||
{
|
||||
"mitre_id": t.mitre_id,
|
||||
|
||||
@@ -76,6 +76,10 @@ class ReportEngine:
|
||||
logger.info("DOCX generated: %s", output_path)
|
||||
return output_path
|
||||
|
||||
def generate_html(self, template_name: str, context: dict) -> str:
|
||||
"""Render and save a standalone HTML report (alias for spec compatibility)."""
|
||||
return self.generate_html_file(template_name, context)
|
||||
|
||||
def generate_html_file(self, template_name: str, context: dict) -> str:
|
||||
"""Render and save a standalone HTML report."""
|
||||
html_content = self.render_html(template_name, context)
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
import logging
|
||||
from datetime import datetime, timedelta
|
||||
from uuid import UUID
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
@@ -22,14 +23,15 @@ def generate_purple_campaign_report(
|
||||
output_format: str = "pdf",
|
||||
) -> str:
|
||||
"""Generate the full Purple Team campaign report."""
|
||||
campaign = db.query(Campaign).filter(Campaign.id == campaign_id).first()
|
||||
cid = campaign_id if isinstance(campaign_id, UUID) else UUID(str(campaign_id))
|
||||
campaign = db.query(Campaign).filter(Campaign.id == cid).first()
|
||||
if not campaign:
|
||||
raise EntityNotFoundError("Campaign", campaign_id)
|
||||
|
||||
campaign_tests = (
|
||||
db.query(Test)
|
||||
.join(CampaignTest, CampaignTest.test_id == Test.id)
|
||||
.filter(CampaignTest.campaign_id == campaign_id)
|
||||
.filter(CampaignTest.campaign_id == cid)
|
||||
.all()
|
||||
)
|
||||
|
||||
@@ -227,6 +229,123 @@ def generate_executive_summary(
|
||||
return _generate(output_format, "executive_summary", context)
|
||||
|
||||
|
||||
def generate_quarterly_summary(
|
||||
db: Session,
|
||||
output_format: str = "pdf",
|
||||
) -> str:
|
||||
"""Quarterly summary — reuses executive metrics plus snapshot trend rows."""
|
||||
from sqlalchemy import case as sql_case, func
|
||||
|
||||
org_score = _safe_org_score(db)
|
||||
quarter_ago = datetime.utcnow() - timedelta(days=90)
|
||||
tests_this_quarter = (
|
||||
db.query(func.count(Test.id)).filter(Test.created_at >= quarter_ago).scalar() or 0
|
||||
)
|
||||
|
||||
techniques = db.query(Technique).all()
|
||||
validated_count = sum(
|
||||
1 for t in techniques if t.status_global and t.status_global.value == "validated"
|
||||
)
|
||||
detected_count = (
|
||||
db.query(func.count(Test.id))
|
||||
.filter(Test.state == "validated", Test.detection_result == "detected")
|
||||
.scalar() or 0
|
||||
)
|
||||
detection_rate = (
|
||||
round((detected_count / validated_count) * 100, 1) if validated_count > 0 else 0
|
||||
)
|
||||
|
||||
tactic_rows = (
|
||||
db.query(
|
||||
Technique.tactic,
|
||||
func.count(Technique.id).label("total"),
|
||||
func.sum(sql_case((Technique.status_global == "validated", 1), else_=0)).label(
|
||||
"validated",
|
||||
),
|
||||
)
|
||||
.group_by(Technique.tactic)
|
||||
.all()
|
||||
)
|
||||
top_gaps = sorted(
|
||||
[
|
||||
{
|
||||
"tactic": r[0] or "Unknown",
|
||||
"coverage_pct": round((int(r[2]) / r[1]) * 100, 1) if r[1] > 0 else 0,
|
||||
}
|
||||
for r in tactic_rows
|
||||
],
|
||||
key=lambda x: x["coverage_pct"],
|
||||
)[:5]
|
||||
|
||||
snapshots = (
|
||||
db.query(CoverageSnapshot)
|
||||
.filter(CoverageSnapshot.created_at >= quarter_ago)
|
||||
.order_by(CoverageSnapshot.created_at)
|
||||
.all()
|
||||
)
|
||||
trend_rows = [
|
||||
{
|
||||
"date": s.created_at.strftime("%Y-%m-%d") if s.created_at else "",
|
||||
"validated_count": s.validated_count,
|
||||
"total_techniques": s.total_techniques,
|
||||
"organization_score": round(s.organization_score, 1),
|
||||
}
|
||||
for s in snapshots
|
||||
]
|
||||
|
||||
now = datetime.utcnow()
|
||||
quarter_label = f"Q{((now.month - 1) // 3) + 1} {now.year}"
|
||||
|
||||
context = {
|
||||
"quarter_label": quarter_label,
|
||||
"org_score": org_score,
|
||||
"tests_this_quarter": tests_this_quarter,
|
||||
"detection_rate": detection_rate,
|
||||
"trend_rows": trend_rows,
|
||||
"top_gaps": top_gaps,
|
||||
}
|
||||
return _generate(output_format, "quarterly_summary", context)
|
||||
|
||||
|
||||
def generate_technique_detail_report(
|
||||
db: Session,
|
||||
technique_id: str,
|
||||
output_format: str = "pdf",
|
||||
) -> str:
|
||||
"""Detailed report for a single MITRE technique and its tests."""
|
||||
tid = technique_id if isinstance(technique_id, UUID) else UUID(str(technique_id))
|
||||
technique = db.query(Technique).filter(Technique.id == tid).first()
|
||||
if not technique:
|
||||
raise EntityNotFoundError("Technique", str(technique_id))
|
||||
|
||||
related_tests = (
|
||||
db.query(Test)
|
||||
.filter(Test.technique_id == tid)
|
||||
.order_by(Test.created_at.desc())
|
||||
.all()
|
||||
)
|
||||
tests_data = [
|
||||
{
|
||||
"name": t.name,
|
||||
"state": t.state.value if t.state else "draft",
|
||||
"detection_result": (
|
||||
t.detection_result.value if t.detection_result else "pending"
|
||||
),
|
||||
"created_at": t.created_at.strftime("%Y-%m-%d") if t.created_at else "",
|
||||
}
|
||||
for t in related_tests
|
||||
]
|
||||
|
||||
context = {
|
||||
"technique": technique,
|
||||
"technique_status": (
|
||||
technique.status_global.value if technique.status_global else "not_evaluated"
|
||||
),
|
||||
"tests": tests_data,
|
||||
}
|
||||
return _generate(output_format, "technique_detail", context)
|
||||
|
||||
|
||||
# ── Helpers ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
</head>
|
||||
<body>
|
||||
<section class="cover-page">
|
||||
<img src="assets/logo.png" class="logo" alt="Logo">
|
||||
<img src="assets/logo.svg" class="logo" alt="Logo">
|
||||
<h1>MITRE ATT&CK Coverage Report</h1>
|
||||
<h2>{{ company_name }}</h2>
|
||||
<p class="date">{{ generated_at }}</p>
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
</head>
|
||||
<body>
|
||||
<section class="cover-page">
|
||||
<img src="assets/logo.png" class="logo" alt="Logo">
|
||||
<img src="assets/logo.svg" class="logo" alt="Logo">
|
||||
<h1>Executive Security Summary</h1>
|
||||
<h2>{{ company_name }}</h2>
|
||||
<p class="date">{{ generated_at }}</p>
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
</head>
|
||||
<body>
|
||||
<section class="cover-page">
|
||||
<img src="assets/logo.png" class="logo" alt="Logo">
|
||||
<img src="assets/logo.svg" class="logo" alt="Logo">
|
||||
<h1>Purple Team Assessment Report</h1>
|
||||
<h2>{{ campaign.name }}</h2>
|
||||
<p class="date">{{ generated_at }}</p>
|
||||
|
||||
@@ -0,0 +1,72 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="utf-8">
|
||||
<link rel="stylesheet" href="styles/report.css">
|
||||
<title>Quarterly Summary — {{ company_name }}</title>
|
||||
</head>
|
||||
<body>
|
||||
<section class="cover-page">
|
||||
<img src="assets/logo.svg" class="logo" alt="Logo">
|
||||
<h1>Quarterly Security Summary</h1>
|
||||
<h2>{{ quarter_label }}</h2>
|
||||
<p class="date">{{ generated_at }}</p>
|
||||
<p class="classification">{{ classification | default('INTERNAL') }}</p>
|
||||
</section>
|
||||
|
||||
<section>
|
||||
<h2>1. Quarter Overview</h2>
|
||||
<div class="stats-grid">
|
||||
<div class="stat">
|
||||
<span class="number">{{ tests_this_quarter }}</span>
|
||||
<span class="label">Tests Executed</span>
|
||||
</div>
|
||||
<div class="stat">
|
||||
<span class="number">{{ org_score.overall | default(0) }}%</span>
|
||||
<span class="label">Org Score</span>
|
||||
</div>
|
||||
<div class="stat">
|
||||
<span class="number">{{ detection_rate }}%</span>
|
||||
<span class="label">Detection Rate</span>
|
||||
</div>
|
||||
</div>
|
||||
</section>
|
||||
|
||||
<section>
|
||||
<h2>2. Coverage Trend</h2>
|
||||
<table class="data-table">
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Date</th>
|
||||
<th>Validated</th>
|
||||
<th>Total Techniques</th>
|
||||
<th>Org Score</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{% for row in trend_rows %}
|
||||
<tr>
|
||||
<td>{{ row.date }}</td>
|
||||
<td>{{ row.validated_count }}</td>
|
||||
<td>{{ row.total_techniques }}</td>
|
||||
<td>{{ row.organization_score }}%</td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
</tbody>
|
||||
</table>
|
||||
</section>
|
||||
|
||||
<section>
|
||||
<h2>3. Top Gaps</h2>
|
||||
<ul>
|
||||
{% for gap in top_gaps %}
|
||||
<li><strong>{{ gap.tactic }}</strong>: {{ gap.coverage_pct }}% coverage</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
</section>
|
||||
|
||||
<footer>
|
||||
<p>{{ company_name }} — Confidential</p>
|
||||
</footer>
|
||||
</body>
|
||||
</html>
|
||||
@@ -0,0 +1,62 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="utf-8">
|
||||
<link rel="stylesheet" href="styles/report.css">
|
||||
<title>Technique Detail — {{ technique.mitre_id }}</title>
|
||||
</head>
|
||||
<body>
|
||||
<section class="cover-page">
|
||||
<img src="assets/logo.svg" class="logo" alt="Logo">
|
||||
<h1>Technique Assessment</h1>
|
||||
<h2>{{ technique.mitre_id }} — {{ technique.name }}</h2>
|
||||
<p class="date">{{ generated_at }}</p>
|
||||
</section>
|
||||
|
||||
<section>
|
||||
<h2>1. Technique Profile</h2>
|
||||
<table class="data-table">
|
||||
<tbody>
|
||||
<tr><th>MITRE ID</th><td>{{ technique.mitre_id }}</td></tr>
|
||||
<tr><th>Name</th><td>{{ technique.name }}</td></tr>
|
||||
<tr><th>Tactic</th><td>{{ technique.tactic or 'N/A' }}</td></tr>
|
||||
<tr><th>Status</th><td>{{ technique_status }}</td></tr>
|
||||
<tr><th>Review required</th><td>{{ 'Yes' if technique.review_required else 'No' }}</td></tr>
|
||||
</tbody>
|
||||
</table>
|
||||
{% if technique.description %}
|
||||
<p>{{ technique.description }}</p>
|
||||
{% endif %}
|
||||
</section>
|
||||
|
||||
<section>
|
||||
<h2>2. Test History</h2>
|
||||
<table class="data-table">
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Test</th>
|
||||
<th>State</th>
|
||||
<th>Detection</th>
|
||||
<th>Created</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{% for test in tests %}
|
||||
<tr class="result-{{ test.detection_result }}">
|
||||
<td>{{ test.name }}</td>
|
||||
<td>{{ test.state }}</td>
|
||||
<td>{{ test.detection_result }}</td>
|
||||
<td>{{ test.created_at }}</td>
|
||||
</tr>
|
||||
{% else %}
|
||||
<tr><td colspan="4">No tests recorded for this technique.</td></tr>
|
||||
{% endfor %}
|
||||
</tbody>
|
||||
</table>
|
||||
</section>
|
||||
|
||||
<footer>
|
||||
<p>{{ company_name }} — Confidential</p>
|
||||
</footer>
|
||||
</body>
|
||||
</html>
|
||||
@@ -0,0 +1,87 @@
|
||||
"""Analytics and advanced metrics API tests (FASE-2.5, 2.6)."""
|
||||
|
||||
import uuid
|
||||
|
||||
from app.models.technique import Technique
|
||||
from app.models.test import Test
|
||||
from app.models.enums import TestState
|
||||
|
||||
|
||||
def test_analytics_coverage_flat_json(client, auth_headers, db):
|
||||
t = Technique(mitre_id="T1001", name="Test Tech", tactic="discovery")
|
||||
db.add(t)
|
||||
db.commit()
|
||||
|
||||
r = client.get("/api/v1/analytics/coverage", headers=auth_headers)
|
||||
assert r.status_code == 200
|
||||
data = r.json()
|
||||
assert isinstance(data, list)
|
||||
assert any(row["mitre_id"] == "T1001" for row in data)
|
||||
assert "tactic" in data[0]
|
||||
assert "status" in data[0]
|
||||
|
||||
|
||||
def test_analytics_tests_date_filter(client, auth_headers, db, admin_user):
|
||||
technique = Technique(mitre_id="T1002", name="Filter Tech", tactic="impact")
|
||||
db.add(technique)
|
||||
db.flush()
|
||||
db.add(
|
||||
Test(
|
||||
name="dated test",
|
||||
technique_id=technique.id,
|
||||
state=TestState.draft,
|
||||
created_by=admin_user.id,
|
||||
),
|
||||
)
|
||||
db.commit()
|
||||
|
||||
r = client.get(
|
||||
"/api/v1/analytics/tests",
|
||||
params={"date_from": "2020-01-01"},
|
||||
headers=auth_headers,
|
||||
)
|
||||
assert r.status_code == 200
|
||||
assert isinstance(r.json(), list)
|
||||
|
||||
|
||||
def test_analytics_operators_requires_admin(client, auth_headers, red_tech_headers):
|
||||
client.cookies.clear()
|
||||
denied = client.get("/api/v1/analytics/operators", headers=red_tech_headers)
|
||||
assert denied.status_code == 403
|
||||
|
||||
client.cookies.clear()
|
||||
ok = client.get("/api/v1/analytics/operators", headers=auth_headers)
|
||||
assert ok.status_code == 200
|
||||
assert isinstance(ok.json(), list)
|
||||
|
||||
|
||||
def test_coverage_by_tactic(client, auth_headers, db):
|
||||
db.add(Technique(mitre_id="T2001", name="A", tactic="initial-access"))
|
||||
db.add(Technique(mitre_id="T2002", name="B", tactic="initial-access"))
|
||||
db.commit()
|
||||
|
||||
r = client.get("/api/v1/metrics/advanced/coverage-by-tactic", headers=auth_headers)
|
||||
assert r.status_code == 200
|
||||
rows = r.json()
|
||||
assert isinstance(rows, list)
|
||||
ia = next((x for x in rows if x["tactic"] == "initial-access"), None)
|
||||
assert ia is not None
|
||||
assert ia["total"] >= 2
|
||||
assert "coverage_pct" in ia
|
||||
|
||||
|
||||
def test_never_tested_lists_untested_technique(client, auth_headers, db):
|
||||
db.add(Technique(mitre_id="T9999", name="Never Run", tactic="collection"))
|
||||
db.commit()
|
||||
|
||||
r = client.get("/api/v1/metrics/advanced/never-tested", headers=auth_headers)
|
||||
assert r.status_code == 200
|
||||
keys = {row["mitre_id"] for row in r.json()}
|
||||
assert "T9999" in keys
|
||||
|
||||
|
||||
def test_avg_validation_time_empty(client, auth_headers):
|
||||
r = client.get("/api/v1/metrics/advanced/avg-validation-time", headers=auth_headers)
|
||||
assert r.status_code == 200
|
||||
body = r.json()
|
||||
assert body["total_validated"] == 0
|
||||
@@ -0,0 +1,42 @@
|
||||
"""Professional reports router tests (FASE-2.4)."""
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
from app.models.campaign import Campaign
|
||||
|
||||
|
||||
@patch("app.services.report_generation_service.generate_purple_campaign_report")
|
||||
def test_purple_campaign_pdf_download(mock_gen, client, auth_headers, db):
|
||||
mock_gen.return_value = __file__ # existing file for FileResponse
|
||||
|
||||
campaign = Campaign(name="Export Camp", status="active")
|
||||
db.add(campaign)
|
||||
db.commit()
|
||||
|
||||
r = client.get(
|
||||
f"/api/v1/reports/generate/purple-campaign/{campaign.id}",
|
||||
params={"format": "pdf"},
|
||||
headers=auth_headers,
|
||||
)
|
||||
assert r.status_code == 200
|
||||
assert r.headers["content-type"] == "application/pdf"
|
||||
|
||||
|
||||
@patch("app.services.report_generation_service.generate_coverage_report")
|
||||
def test_coverage_summary_html(mock_gen, client, auth_headers):
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
fd, path = tempfile.mkstemp(suffix=".html")
|
||||
os.write(fd, b"<html><body>ok</body></html>")
|
||||
os.close(fd)
|
||||
mock_gen.return_value = path
|
||||
|
||||
r = client.get(
|
||||
"/api/v1/reports/generate/coverage-summary",
|
||||
params={"format": "html"},
|
||||
headers=auth_headers,
|
||||
)
|
||||
assert r.status_code == 200
|
||||
assert "text/html" in r.headers["content-type"]
|
||||
os.unlink(path)
|
||||
@@ -0,0 +1,63 @@
|
||||
"""Report engine unit tests (FASE-2.1)."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from app.services.report_engine import ReportEngine
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def engine(tmp_path, monkeypatch):
|
||||
templates = tmp_path / "templates"
|
||||
styles = templates / "styles"
|
||||
styles.mkdir(parents=True)
|
||||
(styles / "report.css").write_text("body { color: #0e7490; }", encoding="utf-8")
|
||||
(templates / "coverage_report.html").write_text(
|
||||
"<html><body><h1>{{ company_name }}</h1><p>{{ score }}</p></body></html>",
|
||||
encoding="utf-8",
|
||||
)
|
||||
out_dir = tmp_path / "out"
|
||||
monkeypatch.setattr("app.services.report_engine.settings.REPORT_TEMPLATES_DIR", str(templates))
|
||||
monkeypatch.setattr("app.services.report_engine.settings.REPORT_OUTPUT_DIR", str(out_dir))
|
||||
monkeypatch.setattr("app.services.report_engine.settings.COMPANY_NAME", "Test Org")
|
||||
return ReportEngine()
|
||||
|
||||
|
||||
def test_render_html_injects_company_and_timestamp(engine):
|
||||
html = engine.render_html("coverage_report", {"score": 42})
|
||||
assert "Test Org" in html
|
||||
assert "42" in html
|
||||
|
||||
|
||||
def test_generate_pdf_writes_file(engine, monkeypatch, tmp_path):
|
||||
"""WeasyPrint is stubbed so tests run without GTK/Pango system libraries."""
|
||||
def _write_pdf(output_path, stylesheets=None):
|
||||
with open(output_path, "wb") as f:
|
||||
f.write(b"%PDF-1.4 mock")
|
||||
|
||||
mock_html_instance = MagicMock()
|
||||
mock_html_instance.write_pdf.side_effect = _write_pdf
|
||||
mock_wp = MagicMock()
|
||||
mock_wp.HTML.return_value = mock_html_instance
|
||||
mock_wp.CSS = MagicMock()
|
||||
for key in list(sys.modules):
|
||||
if key == "weasyprint" or key.startswith("weasyprint."):
|
||||
monkeypatch.delitem(sys.modules, key, raising=False)
|
||||
monkeypatch.setitem(sys.modules, "weasyprint", mock_wp)
|
||||
|
||||
path = engine.generate_pdf("coverage_report", {"score": 99})
|
||||
|
||||
assert path.endswith(".pdf")
|
||||
assert os.path.isfile(path)
|
||||
mock_html_instance.write_pdf.assert_called_once()
|
||||
|
||||
|
||||
def test_generate_html_file_writes_file(engine):
|
||||
path = engine.generate_html("coverage_report", {"score": 7})
|
||||
assert path.endswith(".html")
|
||||
assert os.path.isfile(path)
|
||||
with open(path, encoding="utf-8") as f:
|
||||
assert "Test Org" in f.read()
|
||||
@@ -0,0 +1,58 @@
|
||||
"""Report generation service tests (FASE-2.3)."""
|
||||
|
||||
import uuid
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from app.domain.exceptions import EntityNotFoundError
|
||||
from app.models.campaign import Campaign, CampaignTest
|
||||
from app.models.enums import TestState
|
||||
from app.models.technique import Technique
|
||||
from app.models.test import Test
|
||||
|
||||
|
||||
@patch("app.services.report_generation_service.report_engine.generate_pdf")
|
||||
def test_generate_purple_campaign_report_pdf(mock_pdf, db, admin_user):
|
||||
mock_pdf.return_value = "/tmp/fake.pdf"
|
||||
|
||||
technique = Technique(
|
||||
mitre_id="T1059.001",
|
||||
name="PowerShell",
|
||||
tactic="execution",
|
||||
)
|
||||
db.add(technique)
|
||||
db.flush()
|
||||
|
||||
campaign = Campaign(name="Q1 Purple", description="Scope", status="active")
|
||||
db.add(campaign)
|
||||
db.flush()
|
||||
|
||||
test = Test(
|
||||
name="PS test",
|
||||
technique_id=technique.id,
|
||||
state=TestState.validated,
|
||||
created_by=admin_user.id,
|
||||
)
|
||||
db.add(test)
|
||||
db.flush()
|
||||
db.add(CampaignTest(campaign_id=campaign.id, test_id=test.id))
|
||||
db.commit()
|
||||
|
||||
path = __import__(
|
||||
"app.services.report_generation_service",
|
||||
fromlist=["generate_purple_campaign_report"],
|
||||
).generate_purple_campaign_report(db, str(campaign.id), "pdf")
|
||||
|
||||
assert path == "/tmp/fake.pdf"
|
||||
mock_pdf.assert_called_once()
|
||||
context = mock_pdf.call_args[0][1]
|
||||
assert context["tests_validated"] == 1
|
||||
assert len(context["tests"]) == 1
|
||||
|
||||
|
||||
def test_generate_technique_detail_not_found(db):
|
||||
from app.services.report_generation_service import generate_technique_detail_report
|
||||
|
||||
with pytest.raises(EntityNotFoundError):
|
||||
generate_technique_detail_report(db, str(uuid.uuid4()), "html")
|
||||
Reference in New Issue
Block a user