Compare commits

..

2 Commits

Author SHA1 Message Date
kitos 31e116b4ba feat(phase-37): timer pause/resume + professional reporting engine
Aegis CI / lint-and-test (push) Has been cancelled
Pause/Resume timer:
- Add paused_at, red_paused_seconds, blue_paused_seconds fields to Test model
- Add pause_timer/resume_timer workflow functions with accumulated pause tracking
- Auto-resume on phase submit; subtract paused time from worklog duration
- Add POST /tests/{id}/pause-timer and resume-timer endpoints
- Update LiveTimer component with pause/resume button and paused visual state
- Wire pause/resume mutations through TestDetailPage and TestDetailHeader

Professional Reporting Engine - Fase 2:
- Add ReportEngine service with Jinja2 HTML rendering, WeasyPrint PDF, and docxtpl DOCX
- Add corporate CSS stylesheet with cover page, data tables, stats grid, findings
- Create purple_campaign, coverage_report, and executive_summary HTML templates
- Add report_generation_service collecting domain data for each report type
- Add professional_reports router: GET /reports/generate/purple-campaign/{id}, coverage-summary, executive-summary
- Add analytics router with flat JSON endpoints for PowerBI: /coverage, /tests, /trends, /operators
- Add advanced_metrics router: /coverage-by-tactic, /never-tested, /avg-validation-time, /detection-rate-trend
- Add weasyprint and docxtpl to requirements.txt
- Add REPORT_TEMPLATES_DIR, REPORT_OUTPUT_DIR, COMPANY_NAME, COMPANY_LOGO_PATH to config
2026-02-17 17:20:45 +01:00
kitos febf460580 feat(phase-36): automatic Tempo time tracking via workflow buttons + fix campaign test management
- Add red_started_at/blue_started_at timing fields to Test model with Alembic migration

- Modify workflow transitions to auto-create integrity-hashed worklogs: Start Execution records red_started_at, Submit to Blue Team stops Red timer and creates worklog then starts Blue timer, Submit for Review stops Blue timer and creates worklog

- Auto-sync worklogs to Tempo when test has a Jira link

- Add LiveTimer component showing real-time elapsed counter during active phases

- Clear timing fields on test reopen

- Fix campaign test management: replace broken navigate-to-tests flow with AddTestToCampaignModal that lets users search and add existing tests directly from the campaign detail page
2026-02-17 16:59:19 +01:00
26 changed files with 2000 additions and 5 deletions
@@ -0,0 +1,38 @@
"""add_phase_timing_fields
Revision ID: b021phasetiming
Revises: b020jiraworklogs
Create Date: 2026-02-17 18:00:00.000000
Add red_started_at and blue_started_at columns to the tests table
so that automatic worklogs can record real elapsed time per phase.
"""
from alembic import op
revision = "b021phasetiming"
down_revision = "b020jiraworklogs"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.execute("""
ALTER TABLE tests
ADD COLUMN IF NOT EXISTS red_started_at TIMESTAMP,
ADD COLUMN IF NOT EXISTS blue_started_at TIMESTAMP,
ADD COLUMN IF NOT EXISTS paused_at TIMESTAMP,
ADD COLUMN IF NOT EXISTS red_paused_seconds INTEGER DEFAULT 0,
ADD COLUMN IF NOT EXISTS blue_paused_seconds INTEGER DEFAULT 0;
""")
def downgrade() -> None:
op.execute("""
ALTER TABLE tests
DROP COLUMN IF EXISTS red_started_at,
DROP COLUMN IF EXISTS blue_started_at,
DROP COLUMN IF EXISTS paused_at,
DROP COLUMN IF EXISTS red_paused_seconds,
DROP COLUMN IF EXISTS blue_paused_seconds;
""")
+6
View File
@@ -58,6 +58,12 @@ class Settings(BaseSettings):
TEMPO_API_TOKEN: str = "" TEMPO_API_TOKEN: str = ""
TEMPO_DEFAULT_WORK_TYPE: str = "Red Team" TEMPO_DEFAULT_WORK_TYPE: str = "Red Team"
# ── Reporting ─────────────────────────────────────────────────────
REPORT_TEMPLATES_DIR: str = "app/templates/reports"
REPORT_OUTPUT_DIR: str = "/tmp/aegis_reports"
COMPANY_NAME: str = "Organization"
COMPANY_LOGO_PATH: str = "app/templates/reports/assets/logo.png"
# ── Scoring weights (must sum to 100) ──────────────────────────── # ── Scoring weights (must sum to 100) ────────────────────────────
SCORING_WEIGHT_TESTS: int = 40 SCORING_WEIGHT_TESTS: int = 40
SCORING_WEIGHT_DETECTION_RULES: int = 20 SCORING_WEIGHT_DETECTION_RULES: int = 20
+6
View File
@@ -34,6 +34,9 @@ from app.routers import compliance as compliance_router
from app.routers import snapshots as snapshots_router from app.routers import snapshots as snapshots_router
from app.routers import jira as jira_router from app.routers import jira as jira_router
from app.routers import worklogs as worklogs_router from app.routers import worklogs as worklogs_router
from app.routers import professional_reports as professional_reports_router
from app.routers import analytics as analytics_router
from app.routers import advanced_metrics as advanced_metrics_router
from app.domain.exceptions import DomainException from app.domain.exceptions import DomainException
from app.middleware.error_handler import domain_exception_handler from app.middleware.error_handler import domain_exception_handler
from app.storage import ensure_bucket_exists from app.storage import ensure_bucket_exists
@@ -114,6 +117,9 @@ app.include_router(compliance_router.router, prefix="/api/v1")
app.include_router(snapshots_router.router, prefix="/api/v1") app.include_router(snapshots_router.router, prefix="/api/v1")
app.include_router(jira_router.router, prefix="/api/v1") app.include_router(jira_router.router, prefix="/api/v1")
app.include_router(worklogs_router.router, prefix="/api/v1") app.include_router(worklogs_router.router, prefix="/api/v1")
app.include_router(professional_reports_router.router, prefix="/api/v1")
app.include_router(analytics_router.router, prefix="/api/v1")
app.include_router(advanced_metrics_router.router, prefix="/api/v1")
@app.get("/health", include_in_schema=False) @app.get("/health", include_in_schema=False)
+7
View File
@@ -49,6 +49,13 @@ class Test(Base):
blue_validation_status = Column(String, nullable=True) # pending / approved / rejected blue_validation_status = Column(String, nullable=True) # pending / approved / rejected
blue_validation_notes = Column(Text, nullable=True) blue_validation_notes = Column(Text, nullable=True)
# ── Phase timing fields (for automatic Tempo worklogs) ──────────
red_started_at = Column(DateTime, nullable=True)
blue_started_at = Column(DateTime, nullable=True)
paused_at = Column(DateTime, nullable=True)
red_paused_seconds = Column(Integer, default=0)
blue_paused_seconds = Column(Integer, default=0)
# ── Remediation fields ─────────────────────────────────────────── # ── Remediation fields ───────────────────────────────────────────
remediation_steps = Column(Text, nullable=True) remediation_steps = Column(Text, nullable=True)
remediation_status = Column(String, nullable=True) # pending / in_progress / completed / not_applicable remediation_status = Column(String, nullable=True) # pending / in_progress / completed / not_applicable
+184
View File
@@ -0,0 +1,184 @@
"""Advanced metrics endpoints — coverage by tactic, never-tested, avg validation time."""
from datetime import datetime
from fastapi import APIRouter, Depends
from sqlalchemy import func, case
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user
from app.models.audit import AuditLog
from app.models.technique import Technique
from app.models.test import Test
from app.models.user import User
router = APIRouter(prefix="/metrics/advanced", tags=["advanced-metrics"])
@router.get("/coverage-by-tactic")
def coverage_by_tactic(
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
"""Coverage percentage broken down by MITRE ATT&CK tactic."""
results = (
db.query(
Technique.tactic,
func.count(Technique.id).label("total"),
func.sum(
case((Technique.status_global == "validated", 1), else_=0)
).label("validated"),
func.sum(
case((Technique.status_global == "partial", 1), else_=0)
).label("partial"),
func.sum(
case((Technique.status_global == "not_covered", 1), else_=0)
).label("not_covered"),
func.sum(
case((Technique.status_global == "in_progress", 1), else_=0)
).label("in_progress"),
)
.group_by(Technique.tactic)
.order_by(Technique.tactic)
.all()
)
return [
{
"tactic": r[0] or "Unknown",
"total": r[1],
"validated": int(r[2]),
"partial": int(r[3]),
"not_covered": int(r[4]),
"in_progress": int(r[5]),
"coverage_pct": round((int(r[2]) / r[1]) * 100, 1) if r[1] > 0 else 0,
}
for r in results
]
@router.get("/never-tested")
def never_tested_techniques(
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
"""Techniques that have never had a test created."""
tested_technique_ids = (
db.query(Test.technique_id).distinct().subquery()
)
techniques = (
db.query(Technique)
.filter(~Technique.id.in_(db.query(tested_technique_ids)))
.order_by(Technique.mitre_id)
.all()
)
return [
{
"mitre_id": t.mitre_id,
"name": t.name,
"tactic": t.tactic,
"is_subtechnique": t.is_subtechnique,
}
for t in techniques
]
@router.get("/avg-validation-time")
def avg_validation_time(
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
"""Average time from test creation to validation, computed from audit logs.
Returns overall average and per-phase averages where data is available.
"""
validated_tests = (
db.query(Test)
.filter(Test.state == "validated")
.all()
)
if not validated_tests:
return {
"total_validated": 0,
"avg_total_hours": 0,
"avg_red_phase_hours": 0,
"avg_blue_phase_hours": 0,
}
total_durations = []
red_durations = []
blue_durations = []
for test in validated_tests:
if test.created_at and test.red_validated_at:
total_seconds = (test.red_validated_at - test.created_at).total_seconds()
total_durations.append(total_seconds)
if test.red_started_at and test.blue_started_at:
red_sec = (test.blue_started_at - test.red_started_at).total_seconds()
red_paused = test.red_paused_seconds or 0
red_durations.append(max(red_sec - red_paused, 0))
if test.blue_started_at and test.blue_validated_at:
blue_sec = (test.blue_validated_at - test.blue_started_at).total_seconds()
blue_paused = test.blue_paused_seconds or 0
blue_durations.append(max(blue_sec - blue_paused, 0))
def avg_hours(durations: list[float]) -> float:
if not durations:
return 0
return round(sum(durations) / len(durations) / 3600, 2)
return {
"total_validated": len(validated_tests),
"avg_total_hours": avg_hours(total_durations),
"avg_red_phase_hours": avg_hours(red_durations),
"avg_blue_phase_hours": avg_hours(blue_durations),
}
@router.get("/detection-rate-trend")
def detection_rate_trend(
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
"""Monthly detection rate trend for the last 12 months."""
from datetime import timedelta
now = datetime.utcnow()
months = []
for i in range(11, -1, -1):
month_start = datetime(now.year, now.month, 1) - timedelta(days=i * 30)
month_end = month_start + timedelta(days=30)
validated = (
db.query(func.count(Test.id))
.filter(
Test.state == "validated",
Test.created_at >= month_start,
Test.created_at < month_end,
)
.scalar() or 0
)
detected = (
db.query(func.count(Test.id))
.filter(
Test.state == "validated",
Test.detection_result == "detected",
Test.created_at >= month_start,
Test.created_at < month_end,
)
.scalar() or 0
)
months.append({
"month": month_start.strftime("%Y-%m"),
"validated": validated,
"detected": detected,
"detection_rate": round((detected / validated) * 100, 1) if validated > 0 else 0,
})
return months
+127
View File
@@ -0,0 +1,127 @@
"""Analytics endpoints — flat JSON optimized for PowerBI / BI tools.
Returns complete datasets without pagination so BI tools can ingest
directly from URL. All endpoints require authentication.
"""
from fastapi import APIRouter, Depends, Query
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user, require_any_role
from app.models.coverage_snapshot import CoverageSnapshot
from app.models.technique import Technique
from app.models.test import Test
from app.models.user import User
router = APIRouter(prefix="/analytics", tags=["analytics"])
@router.get("/coverage")
def analytics_coverage(
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
"""Coverage per technique — flat format for BI dashboards."""
techniques = db.query(Technique).all()
return [
{
"mitre_id": t.mitre_id,
"name": t.name,
"tactic": t.tactic,
"status": t.status_global.value if t.status_global else "not_evaluated",
"is_subtechnique": t.is_subtechnique,
"test_count": len(t.tests) if t.tests else 0,
"review_required": t.review_required,
"last_review_date": (
t.last_review_date.isoformat() if t.last_review_date else None
),
}
for t in techniques
]
@router.get("/tests")
def analytics_tests(
date_from: str = Query(None, description="ISO date filter (>=)"),
date_to: str = Query(None, description="ISO date filter (<=)"),
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
"""All tests with timestamps — flat format for BI dashboards."""
query = db.query(Test)
if date_from:
query = query.filter(Test.created_at >= date_from)
if date_to:
query = query.filter(Test.created_at <= date_to)
tests = query.all()
return [
{
"id": str(t.id),
"technique_id": str(t.technique_id),
"name": t.name,
"state": t.state.value if t.state else None,
"result": t.result.value if t.result else None,
"detection_result": (
t.detection_result.value if t.detection_result else None
),
"created_at": t.created_at.isoformat() if t.created_at else None,
"execution_date": (
t.execution_date.isoformat() if t.execution_date else None
),
"platform": t.platform,
"tool_used": t.tool_used,
"attack_success": t.attack_success,
"remediation_status": t.remediation_status,
}
for t in tests
]
@router.get("/trends")
def analytics_trends(
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
"""Historical coverage snapshots for trend visualization."""
snapshots = (
db.query(CoverageSnapshot)
.order_by(CoverageSnapshot.created_at)
.all()
)
return [
{
"date": s.created_at.isoformat() if s.created_at else None,
"name": s.name,
"total_techniques": s.total_techniques,
"validated_count": s.validated_count,
"partial_count": s.partial_count,
"not_covered_count": s.not_covered_count,
"organization_score": s.organization_score,
}
for s in snapshots
]
@router.get("/operators")
def analytics_operators(
db: Session = Depends(get_db),
user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Per-operator metrics — for workload management dashboards."""
results = (
db.query(
User.username,
User.role,
func.count(Test.id).label("test_count"),
)
.outerjoin(Test, Test.created_by == User.id)
.group_by(User.id, User.username, User.role)
.all()
)
return [
{"username": r[0], "role": r[1], "test_count": r[2]}
for r in results
]
@@ -0,0 +1,72 @@
"""Professional report generation endpoints — PDF, DOCX, HTML output."""
from uuid import UUID
from fastapi import APIRouter, Depends, Query
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user, require_any_role
from app.models.user import User
from app.services import report_generation_service
router = APIRouter(prefix="/reports/generate", tags=["professional-reports"])
_MEDIA_TYPES = {
"pdf": "application/pdf",
"docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"html": "text/html",
}
@router.get("/purple-campaign/{campaign_id}")
def generate_purple_report(
campaign_id: UUID,
format: str = Query("pdf", pattern="^(pdf|docx|html)$"),
db: Session = Depends(get_db),
user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Generate a Purple Team campaign assessment report."""
filepath = report_generation_service.generate_purple_campaign_report(
db, str(campaign_id), output_format=format,
)
return FileResponse(
filepath,
media_type=_MEDIA_TYPES[format],
filename=f"purple_report.{format}",
)
@router.get("/coverage-summary")
def generate_coverage_report(
format: str = Query("pdf", pattern="^(pdf|docx|html)$"),
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
"""Generate an organization-wide MITRE ATT&CK coverage report."""
filepath = report_generation_service.generate_coverage_report(
db, output_format=format,
)
return FileResponse(
filepath,
media_type=_MEDIA_TYPES[format],
filename=f"coverage_report.{format}",
)
@router.get("/executive-summary")
def generate_executive_report(
format: str = Query("pdf", pattern="^(pdf|docx|html)$"),
db: Session = Depends(get_db),
user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Generate an executive security summary report."""
filepath = report_generation_service.generate_executive_summary(
db, output_format=format,
)
return FileResponse(
filepath,
media_type=_MEDIA_TYPES[format],
filename=f"executive_summary.{format}",
)
+38
View File
@@ -54,6 +54,8 @@ from app.services.test_workflow_service import (
reopen_test as wf_reopen, reopen_test as wf_reopen,
handle_remediation_completed as wf_handle_remediation, handle_remediation_completed as wf_handle_remediation,
get_retest_chain as wf_get_retest_chain, get_retest_chain as wf_get_retest_chain,
pause_timer as wf_pause_timer,
resume_timer as wf_resume_timer,
) )
router = APIRouter(prefix="/tests", tags=["tests"]) router = APIRouter(prefix="/tests", tags=["tests"])
@@ -473,6 +475,42 @@ def submit_blue(
return test return test
# ---------------------------------------------------------------------------
# POST /tests/{id}/pause-timer — pause the active phase timer
# ---------------------------------------------------------------------------
@router.post("/{test_id}/pause-timer", response_model=TestOut)
def pause_timer(
test_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Pause the running timer for the current phase (red_executing or blue_evaluating)."""
test = _get_test_or_404(db, test_id)
test = wf_pause_timer(db, test, current_user)
db.refresh(test)
return test
# ---------------------------------------------------------------------------
# POST /tests/{id}/resume-timer — resume a paused phase timer
# ---------------------------------------------------------------------------
@router.post("/{test_id}/resume-timer", response_model=TestOut)
def resume_timer(
test_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Resume the paused timer for the current phase."""
test = _get_test_or_404(db, test_id)
test = wf_resume_timer(db, test, current_user)
db.refresh(test)
return test
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# POST /tests/{id}/validate-red — Red Lead validates # POST /tests/{id}/validate-red — Red Lead validates
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
+7
View File
@@ -137,6 +137,13 @@ class TestOut(BaseModel):
blue_validation_status: str | None = None blue_validation_status: str | None = None
blue_validation_notes: str | None = None blue_validation_notes: str | None = None
# Phase timing fields (for Tempo worklogs)
red_started_at: datetime | None = None
blue_started_at: datetime | None = None
paused_at: datetime | None = None
red_paused_seconds: int = 0
blue_paused_seconds: int = 0
# Remediation fields # Remediation fields
remediation_steps: str | None = None remediation_steps: str | None = None
remediation_status: str | None = None remediation_status: str | None = None
+93
View File
@@ -0,0 +1,93 @@
"""Report engine — renders Jinja2 HTML templates to PDF, DOCX, and HTML.
Uses WeasyPrint for PDF generation and docxtpl for DOCX.
"""
import os
import uuid
import logging
from datetime import datetime
from jinja2 import Environment, FileSystemLoader
from app.config import settings
logger = logging.getLogger(__name__)
class ReportEngine:
"""Template-based report generator supporting PDF, DOCX, and HTML output."""
def __init__(self) -> None:
self.jinja_env = Environment(
loader=FileSystemLoader(settings.REPORT_TEMPLATES_DIR),
autoescape=True,
)
os.makedirs(settings.REPORT_OUTPUT_DIR, exist_ok=True)
def render_html(self, template_name: str, context: dict) -> str:
"""Render a Jinja2 template to an HTML string."""
template = self.jinja_env.get_template(f"{template_name}.html")
context.setdefault("company_name", settings.COMPANY_NAME)
context.setdefault("generated_at", datetime.utcnow().strftime("%B %d, %Y %H:%M UTC"))
return template.render(context)
def generate_pdf(self, template_name: str, context: dict) -> str:
"""Render HTML and convert to PDF with WeasyPrint."""
from weasyprint import HTML, CSS
html_content = self.render_html(template_name, context)
css_path = os.path.join(settings.REPORT_TEMPLATES_DIR, "styles", "report.css")
output_path = os.path.join(
settings.REPORT_OUTPUT_DIR,
f"{template_name}_{uuid.uuid4().hex[:8]}.pdf",
)
stylesheets = []
if os.path.exists(css_path):
stylesheets.append(CSS(filename=css_path))
HTML(
string=html_content,
base_url=settings.REPORT_TEMPLATES_DIR,
).write_pdf(output_path, stylesheets=stylesheets)
logger.info("PDF generated: %s", output_path)
return output_path
def generate_docx(self, template_name: str, context: dict) -> str:
"""Render a .docx template with docxtpl."""
from docxtpl import DocxTemplate
template_path = os.path.join(
settings.REPORT_TEMPLATES_DIR, f"{template_name}.docx"
)
output_path = os.path.join(
settings.REPORT_OUTPUT_DIR,
f"{template_name}_{uuid.uuid4().hex[:8]}.docx",
)
doc = DocxTemplate(template_path)
context.setdefault("company_name", settings.COMPANY_NAME)
context.setdefault("generated_at", datetime.utcnow().strftime("%B %d, %Y"))
doc.render(context)
doc.save(output_path)
logger.info("DOCX generated: %s", output_path)
return output_path
def generate_html_file(self, template_name: str, context: dict) -> str:
"""Render and save a standalone HTML report."""
html_content = self.render_html(template_name, context)
output_path = os.path.join(
settings.REPORT_OUTPUT_DIR,
f"{template_name}_{uuid.uuid4().hex[:8]}.html",
)
with open(output_path, "w", encoding="utf-8") as f:
f.write(html_content)
logger.info("HTML report generated: %s", output_path)
return output_path
report_engine = ReportEngine()
@@ -0,0 +1,250 @@
"""High-level report generation — collects domain data and delegates to ReportEngine."""
import logging
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from app.domain.exceptions import EntityNotFoundError
from app.models.campaign import Campaign, CampaignTest
from app.models.coverage_snapshot import CoverageSnapshot
from app.models.technique import Technique
from app.models.test import Test
from app.models.threat_actor import ThreatActor
from app.services.report_engine import report_engine
logger = logging.getLogger(__name__)
def generate_purple_campaign_report(
db: Session,
campaign_id: str,
output_format: str = "pdf",
) -> str:
"""Generate the full Purple Team campaign report."""
campaign = db.query(Campaign).filter(Campaign.id == campaign_id).first()
if not campaign:
raise EntityNotFoundError("Campaign", campaign_id)
campaign_tests = (
db.query(Test)
.join(CampaignTest, CampaignTest.test_id == Test.id)
.filter(CampaignTest.campaign_id == campaign_id)
.all()
)
tests_data = []
for test in campaign_tests:
technique = db.query(Technique).filter(Technique.id == test.technique_id).first()
tests_data.append({
"technique_mitre_id": technique.mitre_id if technique else "N/A",
"name": test.name,
"tactic": technique.tactic if technique else "N/A",
"state": test.state.value if test.state else "draft",
"detection_result": (
test.detection_result.value if test.detection_result else "pending"
),
})
validated = [t for t in campaign_tests if t.state and t.state.value == "validated"]
detected = [
t for t in validated
if t.detection_result and t.detection_result.value == "detected"
]
not_detected = [
t for t in validated
if t.detection_result and t.detection_result.value == "not_detected"
]
critical_findings = [
{
"technique_id": t["technique_mitre_id"],
"name": t["name"],
"severity": "critical",
"description": "Technique was not detected during campaign execution.",
"recommendation": "Implement detection rule or review existing SIEM/EDR configuration.",
}
for t in tests_data
if t["detection_result"] == "not_detected"
]
org_score = _safe_org_score(db)
threat_actors = []
if campaign.threat_actor_id:
actor = db.query(ThreatActor).filter(ThreatActor.id == campaign.threat_actor_id).first()
if actor:
threat_actors = [{"name": actor.name}]
context = {
"campaign": campaign,
"tests": tests_data,
"tests_validated": len(validated),
"tests_detected": len(detected),
"tests_not_detected": len(not_detected),
"critical_findings": critical_findings,
"org_score": org_score.get("overall", 0),
"tactics": list({t["tactic"] for t in tests_data}),
"threat_actors": threat_actors,
}
return _generate(output_format, "purple_campaign", context)
def generate_coverage_report(
db: Session,
output_format: str = "pdf",
) -> str:
"""Generate an organization-wide MITRE ATT&CK coverage report."""
from sqlalchemy import func, case
org_score = _safe_org_score(db)
techniques = db.query(Technique).all()
status_counts = {"validated": 0, "partial": 0, "not_covered": 0, "in_progress": 0, "not_evaluated": 0}
for t in techniques:
s = t.status_global.value if t.status_global else "not_evaluated"
if s in status_counts:
status_counts[s] += 1
summary = {
"total_techniques": len(techniques),
**status_counts,
}
# Coverage by tactic
tactic_rows = (
db.query(
Technique.tactic,
func.count(Technique.id).label("total"),
func.sum(case((Technique.status_global == "validated", 1), else_=0)).label("validated"),
)
.group_by(Technique.tactic)
.all()
)
tactics_coverage = [
{
"tactic": r[0] or "Unknown",
"total": r[1],
"validated": int(r[2]),
"coverage_pct": round((int(r[2]) / r[1]) * 100, 1) if r[1] > 0 else 0,
}
for r in tactic_rows
]
# Never-tested techniques
tested_ids = {t.technique_id for t in db.query(Test.technique_id).distinct().all()}
never_tested = [
{"mitre_id": t.mitre_id, "name": t.name, "tactic": t.tactic}
for t in techniques
if t.id not in tested_ids
]
context = {
"org_score": org_score,
"summary": summary,
"tactics_coverage": tactics_coverage,
"never_tested": never_tested[:50],
}
return _generate(output_format, "coverage_report", context)
def generate_executive_summary(
db: Session,
output_format: str = "pdf",
) -> str:
"""Generate an executive summary report."""
from sqlalchemy import func
org_score = _safe_org_score(db)
techniques = db.query(Technique).all()
status_counts = {"validated": 0, "partial": 0, "not_covered": 0, "in_progress": 0, "not_evaluated": 0}
for t in techniques:
s = t.status_global.value if t.status_global else "not_evaluated"
if s in status_counts:
status_counts[s] += 1
summary = {"total_techniques": len(techniques), **status_counts}
total_tests = db.query(func.count(Test.id)).scalar() or 0
active_campaigns = (
db.query(func.count(Campaign.id)).filter(Campaign.status == "active").scalar() or 0
)
quarter_ago = datetime.utcnow() - timedelta(days=90)
tests_this_quarter = (
db.query(func.count(Test.id)).filter(Test.created_at >= quarter_ago).scalar() or 0
)
open_remediations = (
db.query(func.count(Test.id))
.filter(Test.remediation_status.in_(["pending", "in_progress"]))
.scalar() or 0
)
# Detection rate among validated tests
validated_count = status_counts["validated"]
detected_count = (
db.query(func.count(Test.id))
.filter(Test.state == "validated", Test.detection_result == "detected")
.scalar() or 0
)
detection_rate = round((detected_count / validated_count) * 100, 1) if validated_count > 0 else 0
# Top gaps — lowest coverage tactics
from sqlalchemy import case as sql_case
tactic_rows = (
db.query(
Technique.tactic,
func.count(Technique.id).label("total"),
func.sum(sql_case((Technique.status_global == "validated", 1), else_=0)).label("validated"),
)
.group_by(Technique.tactic)
.all()
)
tactic_coverage = [
{
"tactic": r[0] or "Unknown",
"coverage_pct": round((int(r[2]) / r[1]) * 100, 1) if r[1] > 0 else 0,
}
for r in tactic_rows
]
top_gaps = sorted(tactic_coverage, key=lambda x: x["coverage_pct"])[:5]
context = {
"org_score": org_score,
"summary": summary,
"total_tests": total_tests,
"active_campaigns": active_campaigns,
"tests_this_quarter": tests_this_quarter,
"open_remediations": open_remediations,
"detection_rate": detection_rate,
"top_gaps": top_gaps,
}
return _generate(output_format, "executive_summary", context)
# ── Helpers ──────────────────────────────────────────────────────────
def _safe_org_score(db: Session) -> dict:
"""Safely call the scoring service; return empty dict on failure."""
try:
from app.services.scoring_service import calculate_organization_score
return calculate_organization_score(db)
except Exception as e:
logger.warning("Scoring service unavailable: %s", e)
return {"overall": 0, "coverage": 0, "detection_maturity": 0}
def _generate(output_format: str, template_name: str, context: dict) -> str:
"""Dispatch to the correct ReportEngine method."""
if output_format == "pdf":
return report_engine.generate_pdf(template_name, context)
elif output_format == "docx":
return report_engine.generate_docx(template_name, context)
else:
return report_engine.generate_html_file(template_name, context)
+20 -2
View File
@@ -89,8 +89,26 @@ def auto_log_test_worklog(
def _calculate_duration(test, activity_type: str) -> int: def _calculate_duration(test, activity_type: str) -> int:
"""Estimate duration in seconds based on test timestamps and activity type.""" """Calculate real duration in seconds from the phase timing fields.
Uses the actual start/end timestamps recorded by the workflow buttons,
so the data cannot be falsified.
"""
from datetime import datetime
now = datetime.utcnow()
if activity_type == "red_team_execution" and test.red_started_at:
delta = now - test.red_started_at
return max(int(delta.total_seconds()), 1)
if activity_type == "blue_team_evaluation" and test.blue_started_at:
delta = now - test.blue_started_at
return max(int(delta.total_seconds()), 1)
# Fallback for legacy activity types
if activity_type == "execution" and test.execution_date and test.created_at: if activity_type == "execution" and test.execution_date and test.created_at:
delta = test.execution_date - test.created_at delta = test.execution_date - test.created_at
return max(int(delta.total_seconds()), 0) return max(int(delta.total_seconds()), 0)
return 3600 # default 1 hour if no timestamps available
return 0
+172 -1
View File
@@ -113,12 +113,15 @@ def start_execution(db: Session, test: Test, user: User) -> Test:
"""Move from ``draft`` → ``red_executing``. """Move from ``draft`` → ``red_executing``.
Typically called by a **red_tech** when they begin the attack. Typically called by a **red_tech** when they begin the attack.
Starts the Red Team timer by recording ``red_started_at``.
""" """
now = datetime.utcnow()
test = transition_state( test = transition_state(
db, test, TestState.red_executing, user, db, test, TestState.red_executing, user,
action_name="start_execution", action_name="start_execution",
) )
test.execution_date = datetime.utcnow() test.execution_date = now
test.red_started_at = now
db.commit() db.commit()
return test return test
@@ -127,11 +130,37 @@ def submit_red_evidence(db: Session, test: Test, user: User) -> Test:
"""Move from ``red_executing`` → ``blue_evaluating``. """Move from ``red_executing`` → ``blue_evaluating``.
Called by **red_tech** once they have finished documenting the attack. Called by **red_tech** once they have finished documenting the attack.
Stops the Red Team timer and creates an automatic worklog.
Starts the Blue Team timer by recording ``blue_started_at``.
""" """
now = datetime.utcnow()
# Auto-resume if paused
paused_extra = 0
if test.paused_at is not None:
paused_extra = max(int((now - test.paused_at).total_seconds()), 0)
test.paused_at = None
test = transition_state( test = transition_state(
db, test, TestState.blue_evaluating, user, db, test, TestState.blue_evaluating, user,
action_name="submit_red_evidence", action_name="submit_red_evidence",
) )
# Create automatic worklog for Red Team phase (subtract paused time)
_create_phase_worklog(
db,
test=test,
user=user,
phase_started_at=test.red_started_at,
phase_ended_at=now,
paused_seconds=(test.red_paused_seconds or 0) + paused_extra,
activity_type="red_team_execution",
description=f"Red Team execution: {test.name}",
)
# Start Blue Team timer
test.blue_started_at = now
test.blue_paused_seconds = 0
db.commit() db.commit()
return test return test
@@ -140,15 +169,150 @@ def submit_blue_evidence(db: Session, test: Test, user: User) -> Test:
"""Move from ``blue_evaluating`` → ``in_review``. """Move from ``blue_evaluating`` → ``in_review``.
Called by **blue_tech** once they have finished documenting detection. Called by **blue_tech** once they have finished documenting detection.
Stops the Blue Team timer and creates an automatic worklog.
""" """
now = datetime.utcnow()
# Auto-resume if paused
paused_extra = 0
if test.paused_at is not None:
paused_extra = max(int((now - test.paused_at).total_seconds()), 0)
test.paused_at = None
test = transition_state( test = transition_state(
db, test, TestState.in_review, user, db, test, TestState.in_review, user,
action_name="submit_blue_evidence", action_name="submit_blue_evidence",
) )
# Create automatic worklog for Blue Team phase (subtract paused time)
_create_phase_worklog(
db,
test=test,
user=user,
phase_started_at=test.blue_started_at,
phase_ended_at=now,
paused_seconds=(test.blue_paused_seconds or 0) + paused_extra,
activity_type="blue_team_evaluation",
description=f"Blue Team evaluation: {test.name}",
)
db.commit() db.commit()
return test return test
def pause_timer(db: Session, test: Test, user: User) -> Test:
"""Pause the active phase timer.
Can only be called when the test is in ``red_executing`` or
``blue_evaluating`` and is not already paused.
"""
if test.state not in (TestState.red_executing, TestState.blue_evaluating):
raise InvalidOperationError(
f"Cannot pause timer in '{test.state.value}' state"
)
if test.paused_at is not None:
raise InvalidOperationError("Timer is already paused")
test.paused_at = datetime.utcnow()
log_action(
db,
user_id=user.id,
action="pause_timer",
entity_type="test",
entity_id=test.id,
details={"state": test.state.value},
)
db.commit()
return test
def resume_timer(db: Session, test: Test, user: User) -> Test:
"""Resume a paused phase timer.
Accumulates the paused duration into the appropriate counter so
it is subtracted from the final worklog.
"""
if test.paused_at is None:
raise InvalidOperationError("Timer is not paused")
now = datetime.utcnow()
paused_seconds = max(int((now - test.paused_at).total_seconds()), 0)
if test.state == TestState.red_executing:
test.red_paused_seconds = (test.red_paused_seconds or 0) + paused_seconds
elif test.state == TestState.blue_evaluating:
test.blue_paused_seconds = (test.blue_paused_seconds or 0) + paused_seconds
test.paused_at = None
log_action(
db,
user_id=user.id,
action="resume_timer",
entity_type="test",
entity_id=test.id,
details={"paused_seconds": paused_seconds, "state": test.state.value},
)
db.commit()
return test
def _create_phase_worklog(
db: Session,
*,
test: Test,
user: User,
phase_started_at: datetime | None,
phase_ended_at: datetime,
paused_seconds: int = 0,
activity_type: str,
description: str,
) -> None:
"""Create an automatic, integrity-hashed worklog for a completed phase.
Subtracts accumulated *paused_seconds* from the gross elapsed time
so the worklog reflects only active working time.
Also triggers Tempo sync if the test has a Jira link.
"""
if not phase_started_at:
logger.warning(
"No phase start timestamp for test %s (%s), skipping worklog",
test.id, activity_type,
)
return
gross_seconds = int((phase_ended_at - phase_started_at).total_seconds())
duration_seconds = max(gross_seconds - paused_seconds, 1)
try:
from app.services.worklog_service import create_worklog
wl = create_worklog(
db,
entity_type="test",
entity_id=test.id,
user_id=user.id,
activity_type=activity_type,
started_at=phase_started_at,
ended_at=phase_ended_at,
duration_seconds=duration_seconds,
description=description,
)
logger.info(
"Auto-worklog created for test %s: %s, %ds (worklog %s)",
test.id, activity_type, duration_seconds, wl.id,
)
# Sync to Tempo if enabled
try:
from app.services.tempo_service import auto_log_test_worklog
auto_log_test_worklog(db, test, user, activity_type)
except Exception as e:
logger.warning("Tempo sync failed for worklog: %s", e, exc_info=True)
except Exception as e:
logger.error("Failed to create auto-worklog for test %s: %s", test.id, e, exc_info=True)
def validate_as_red_lead( def validate_as_red_lead(
db: Session, db: Session,
test: Test, test: Test,
@@ -428,5 +592,12 @@ def reopen_test(db: Session, test: Test, user: User) -> Test:
test.blue_validated_at = None test.blue_validated_at = None
test.blue_validation_notes = None test.blue_validation_notes = None
# Clear phase timing fields
test.red_started_at = None
test.blue_started_at = None
test.paused_at = None
test.red_paused_seconds = 0
test.blue_paused_seconds = 0
db.commit() db.commit()
return test return test
@@ -0,0 +1,4 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 200 60" width="200" height="60">
<rect width="200" height="60" rx="8" fill="#0e7490"/>
<text x="100" y="38" fill="white" font-family="Arial,sans-serif" font-size="28" font-weight="bold" text-anchor="middle">AEGIS</text>
</svg>

After

Width:  |  Height:  |  Size: 284 B

@@ -0,0 +1,119 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<link rel="stylesheet" href="styles/report.css">
<title>Coverage Report — {{ company_name }}</title>
</head>
<body>
<section class="cover-page">
<img src="assets/logo.png" class="logo" alt="Logo">
<h1>MITRE ATT&amp;CK Coverage Report</h1>
<h2>{{ company_name }}</h2>
<p class="date">{{ generated_at }}</p>
<p class="classification">{{ classification | default('INTERNAL') }}</p>
</section>
<section>
<h2>1. Organization Score</h2>
<div class="stats-grid">
<div class="stat">
<span class="number">{{ org_score.overall | default(0) }}%</span>
<span class="label">Overall Score</span>
</div>
<div class="stat">
<span class="number">{{ org_score.coverage | default(0) }}%</span>
<span class="label">Coverage</span>
</div>
<div class="stat">
<span class="number">{{ org_score.detection_maturity | default(0) }}%</span>
<span class="label">Detection Maturity</span>
</div>
</div>
</section>
<section>
<h2>2. Coverage Summary</h2>
<div class="metric-cards">
<div class="metric-card">
<div class="value">{{ summary.total_techniques }}</div>
<div class="label">Total Techniques</div>
</div>
<div class="metric-card">
<div class="value">{{ summary.validated }}</div>
<div class="label">Validated</div>
</div>
<div class="metric-card">
<div class="value">{{ summary.partial }}</div>
<div class="label">Partial</div>
</div>
<div class="metric-card">
<div class="value">{{ summary.not_covered }}</div>
<div class="label">Not Covered</div>
</div>
<div class="metric-card">
<div class="value">{{ summary.in_progress }}</div>
<div class="label">In Progress</div>
</div>
<div class="metric-card">
<div class="value">{{ summary.not_evaluated }}</div>
<div class="label">Not Evaluated</div>
</div>
</div>
</section>
<section>
<h2>3. Coverage by Tactic</h2>
<table class="data-table">
<thead>
<tr>
<th>Tactic</th>
<th>Total</th>
<th>Validated</th>
<th>Coverage %</th>
</tr>
</thead>
<tbody>
{% for tactic in tactics_coverage %}
<tr>
<td>{{ tactic.tactic }}</td>
<td>{{ tactic.total }}</td>
<td>{{ tactic.validated }}</td>
<td>{{ tactic.coverage_pct }}%</td>
</tr>
{% endfor %}
</tbody>
</table>
</section>
<section>
<h2>4. Never-Tested Techniques</h2>
{% if never_tested %}
<table class="data-table">
<thead>
<tr>
<th>MITRE ID</th>
<th>Name</th>
<th>Tactic</th>
</tr>
</thead>
<tbody>
{% for t in never_tested %}
<tr>
<td>{{ t.mitre_id }}</td>
<td>{{ t.name }}</td>
<td>{{ t.tactic }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p>All techniques have been tested at least once.</p>
{% endif %}
</section>
<footer>
<p>{{ company_name }} — Confidential</p>
</footer>
</body>
</html>
@@ -0,0 +1,74 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<link rel="stylesheet" href="styles/report.css">
<title>Executive Summary — {{ company_name }}</title>
</head>
<body>
<section class="cover-page">
<img src="assets/logo.png" class="logo" alt="Logo">
<h1>Executive Security Summary</h1>
<h2>{{ company_name }}</h2>
<p class="date">{{ generated_at }}</p>
</section>
<section>
<h2>Security Posture Overview</h2>
<div class="stats-grid">
<div class="stat">
<span class="number">{{ org_score.overall | default(0) }}%</span>
<span class="label">Overall Score</span>
</div>
<div class="stat">
<span class="number">{{ total_tests }}</span>
<span class="label">Tests Conducted</span>
</div>
<div class="stat">
<span class="number">{{ active_campaigns }}</span>
<span class="label">Active Campaigns</span>
</div>
</div>
</section>
<section>
<h2>Key Metrics</h2>
<table class="data-table">
<thead>
<tr><th>Metric</th><th>Value</th></tr>
</thead>
<tbody>
<tr><td>Techniques validated</td><td>{{ summary.validated }} / {{ summary.total_techniques }}</td></tr>
<tr><td>Detection rate</td><td>{{ detection_rate }}%</td></tr>
<tr><td>Tests this quarter</td><td>{{ tests_this_quarter }}</td></tr>
<tr><td>Open remediations</td><td>{{ open_remediations }}</td></tr>
</tbody>
</table>
</section>
<section>
<h2>Top Gaps</h2>
{% if top_gaps %}
<table class="data-table">
<thead>
<tr><th>Tactic</th><th>Coverage</th></tr>
</thead>
<tbody>
{% for gap in top_gaps %}
<tr>
<td>{{ gap.tactic }}</td>
<td>{{ gap.coverage_pct }}%</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p>No significant gaps identified.</p>
{% endif %}
</section>
<footer>
<p>{{ company_name }} — Confidential</p>
</footer>
</body>
</html>
@@ -0,0 +1,130 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<link rel="stylesheet" href="styles/report.css">
<title>Purple Team Assessment Report — {{ campaign.name }}</title>
</head>
<body>
<section class="cover-page">
<img src="assets/logo.png" class="logo" alt="Logo">
<h1>Purple Team Assessment Report</h1>
<h2>{{ campaign.name }}</h2>
<p class="date">{{ generated_at }}</p>
<p class="classification">{{ classification | default('INTERNAL') }}</p>
</section>
<section class="toc">
<h2>Table of Contents</h2>
<ul>
<li>1. Executive Summary</li>
<li>2. Scope &amp; Methodology</li>
<li>3. Techniques Tested</li>
<li>4. Critical Findings</li>
<li>5. Coverage Evolution</li>
<li>6. Recommendations</li>
</ul>
</section>
<section>
<h2>1. Executive Summary</h2>
<p>Campaign <strong>{{ campaign.name }}</strong> tested
{{ tests | length }} techniques across {{ tactics | length }} tactics.
Overall organization coverage score: <strong>{{ org_score }}%</strong>.</p>
<div class="stats-grid">
<div class="stat">
<span class="number">{{ tests_validated }}</span>
<span class="label">Validated</span>
</div>
<div class="stat">
<span class="number">{{ tests_detected }}</span>
<span class="label">Detected</span>
</div>
<div class="stat">
<span class="number">{{ tests_not_detected }}</span>
<span class="label">Not Detected</span>
</div>
</div>
</section>
<section>
<h2>2. Scope &amp; Methodology</h2>
<p>{{ campaign.description or 'No description provided.' }}</p>
{% if campaign.scheduled_at and campaign.completed_at %}
<p>Period: {{ campaign.scheduled_at }} — {{ campaign.completed_at }}</p>
{% endif %}
{% if threat_actors %}
<p>Threat actors modeled:
{% for actor in threat_actors %}{{ actor.name }}{% if not loop.last %}, {% endif %}{% endfor %}
</p>
{% endif %}
</section>
<section>
<h2>3. Techniques Tested</h2>
<table class="data-table">
<thead>
<tr>
<th>MITRE ID</th>
<th>Name</th>
<th>Tactic</th>
<th>State</th>
<th>Detection</th>
</tr>
</thead>
<tbody>
{% for test in tests %}
<tr class="result-{{ test.detection_result }}">
<td>{{ test.technique_mitre_id }}</td>
<td>{{ test.name }}</td>
<td>{{ test.tactic }}</td>
<td>{{ test.state }}</td>
<td>{{ test.detection_result }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</section>
<section>
<h2>4. Critical Findings</h2>
{% if critical_findings %}
{% for finding in critical_findings %}
<div class="finding {{ finding.severity }}">
<h3>{{ finding.technique_id }}: {{ finding.name }}</h3>
<p>{{ finding.description }}</p>
<p><strong>Recommendation:</strong> {{ finding.recommendation }}</p>
</div>
{% endfor %}
{% else %}
<p>No critical findings — all tested techniques were detected.</p>
{% endif %}
</section>
<section>
<h2>5. Coverage Evolution</h2>
{% if previous_campaign %}
<p>Compared to previous campaign (<em>{{ previous_campaign.name }}</em>):
Coverage changed from {{ previous_score }}% to {{ org_score }}%.</p>
{% else %}
<p>This is the first campaign run — no historical comparison available.</p>
{% endif %}
</section>
<section>
<h2>6. Recommendations</h2>
<ul>
{% for finding in critical_findings %}
<li><strong>{{ finding.technique_id }}</strong>: {{ finding.recommendation }}</li>
{% endfor %}
{% if not critical_findings %}
<li>Continue periodic purple team exercises to maintain coverage.</li>
{% endif %}
</ul>
</section>
<footer>
<p>{{ company_name }} — Confidential</p>
</footer>
</body>
</html>
@@ -0,0 +1,238 @@
/* ── Aegis Professional Report CSS ─────────────────────────────── */
@page {
size: A4;
margin: 2cm 2.5cm;
@bottom-center {
content: "Page " counter(page) " of " counter(pages);
font-size: 9px;
color: #6b7280;
}
}
* { box-sizing: border-box; }
body {
font-family: "Segoe UI", -apple-system, "Helvetica Neue", Arial, sans-serif;
font-size: 11pt;
color: #1f2937;
line-height: 1.6;
}
/* ── Cover Page ─────────────────────────────────────────────────── */
.cover-page {
page-break-after: always;
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
min-height: 80vh;
text-align: center;
}
.cover-page .logo {
max-width: 180px;
margin-bottom: 3rem;
}
.cover-page h1 {
font-size: 28pt;
color: #0e7490;
font-weight: 700;
margin: 0 0 0.5rem;
}
.cover-page h2 {
font-size: 18pt;
color: #374151;
font-weight: 400;
margin: 0 0 2rem;
}
.cover-page .date {
font-size: 12pt;
color: #6b7280;
}
.cover-page .classification {
margin-top: 2rem;
padding: 0.4rem 1.5rem;
border: 2px solid #ef4444;
color: #ef4444;
font-weight: 700;
font-size: 10pt;
text-transform: uppercase;
letter-spacing: 0.1em;
}
/* ── Section headings ───────────────────────────────────────────── */
section {
page-break-inside: avoid;
margin-bottom: 1.5rem;
}
h2 {
font-size: 16pt;
color: #0e7490;
border-bottom: 2px solid #0e7490;
padding-bottom: 0.3rem;
margin-top: 2rem;
page-break-after: avoid;
}
h3 {
font-size: 13pt;
color: #1f2937;
margin-top: 1.2rem;
}
/* ── Stats grid ─────────────────────────────────────────────────── */
.stats-grid {
display: flex;
gap: 1.5rem;
margin: 1.5rem 0;
}
.stat {
flex: 1;
text-align: center;
padding: 1rem;
border: 1px solid #d1d5db;
border-radius: 8px;
background: #f9fafb;
}
.stat .number {
display: block;
font-size: 28pt;
font-weight: 700;
color: #0e7490;
}
.stat .label {
display: block;
font-size: 10pt;
color: #6b7280;
text-transform: uppercase;
letter-spacing: 0.05em;
}
/* ── Data table ─────────────────────────────────────────────────── */
.data-table {
width: 100%;
border-collapse: collapse;
margin: 1rem 0;
font-size: 10pt;
}
.data-table th {
background: #0e7490;
color: white;
padding: 0.5rem 0.75rem;
text-align: left;
font-weight: 600;
}
.data-table td {
padding: 0.4rem 0.75rem;
border-bottom: 1px solid #e5e7eb;
}
.data-table tbody tr:nth-child(even) {
background: #f9fafb;
}
/* ── Detection result row colors ───────────────────────────────── */
tr.result-detected td:last-child { color: #059669; font-weight: 600; }
tr.result-not_detected td:last-child { color: #dc2626; font-weight: 600; }
tr.result-partially_detected td:last-child { color: #d97706; font-weight: 600; }
tr.result-pending td:last-child { color: #6b7280; }
/* ── Findings ───────────────────────────────────────────────────── */
.finding {
padding: 1rem;
border-left: 4px solid #d1d5db;
margin-bottom: 1rem;
background: #f9fafb;
border-radius: 0 6px 6px 0;
}
.finding.critical { border-left-color: #dc2626; }
.finding.high { border-left-color: #ea580c; }
.finding.medium { border-left-color: #d97706; }
.finding.low { border-left-color: #059669; }
.finding h3 {
margin-top: 0;
color: #1f2937;
}
/* ── Footer ─────────────────────────────────────────────────────── */
footer {
position: fixed;
bottom: 0;
left: 0;
right: 0;
text-align: center;
font-size: 9pt;
color: #9ca3af;
border-top: 1px solid #e5e7eb;
padding-top: 0.5rem;
}
/* ── Table of Contents ──────────────────────────────────────────── */
.toc {
page-break-after: always;
}
.toc h2 {
border-bottom: none;
}
.toc ul {
list-style: none;
padding: 0;
}
.toc li {
padding: 0.3rem 0;
border-bottom: 1px dotted #d1d5db;
}
/* ── Metric cards (for coverage reports) ────────────────────────── */
.metric-cards {
display: flex;
flex-wrap: wrap;
gap: 1rem;
margin: 1rem 0;
}
.metric-card {
flex: 1 1 calc(33% - 1rem);
min-width: 140px;
padding: 1rem;
border: 1px solid #e5e7eb;
border-radius: 8px;
text-align: center;
}
.metric-card .value {
font-size: 24pt;
font-weight: 700;
color: #0e7490;
}
.metric-card .label {
font-size: 9pt;
color: #6b7280;
text-transform: uppercase;
}
+2
View File
@@ -19,6 +19,8 @@ slowapi
defusedxml defusedxml
redis>=5.0.0 redis>=5.0.0
atlassian-python-api>=4.0.0 atlassian-python-api>=4.0.0
weasyprint>=62.0
docxtpl>=0.18.0
# Testing # Testing
pytest pytest
+14
View File
@@ -141,6 +141,20 @@ export async function submitRedEvidence(testId: string): Promise<Test> {
return data; return data;
} }
// ── Timer Controls ─────────────────────────────────────────────────
/** Pause the active phase timer. */
export async function pauseTimer(testId: string): Promise<Test> {
const { data } = await client.post<Test>(`/tests/${testId}/pause-timer`);
return data;
}
/** Resume a paused phase timer. */
export async function resumeTimer(testId: string): Promise<Test> {
const { data } = await client.post<Test>(`/tests/${testId}/resume-timer`);
return data;
}
// ── Blue Team ────────────────────────────────────────────────────── // ── Blue Team ──────────────────────────────────────────────────────
/** Blue Team updates their fields (blue_evaluating only). */ /** Blue Team updates their fields (blue_evaluating only). */
@@ -0,0 +1,197 @@
import { useState, useMemo } from "react";
import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query";
import {
X,
Search,
Plus,
Loader2,
CheckCircle,
FlaskConical,
} from "lucide-react";
import { getTests } from "../api/tests";
import { addTestToCampaign } from "../api/campaigns";
import type { Test, TestState } from "../types/models";
const stateBadge: Record<TestState, string> = {
draft: "bg-gray-800/50 text-gray-400 border-gray-600/30",
red_executing: "bg-orange-900/50 text-orange-400 border-orange-500/30",
blue_evaluating: "bg-indigo-900/50 text-indigo-400 border-indigo-500/30",
in_review: "bg-blue-900/50 text-blue-400 border-blue-500/30",
validated: "bg-green-900/50 text-green-400 border-green-500/30",
rejected: "bg-red-900/50 text-red-400 border-red-500/30",
};
interface AddTestToCampaignModalProps {
campaignId: string;
existingTestIds: string[];
open: boolean;
onClose: () => void;
onSuccess: () => void;
}
export default function AddTestToCampaignModal({
campaignId,
existingTestIds,
open,
onClose,
onSuccess,
}: AddTestToCampaignModalProps) {
const queryClient = useQueryClient();
const [searchText, setSearchText] = useState("");
const [addedIds, setAddedIds] = useState<Set<string>>(new Set());
const { data: allTests, isLoading } = useQuery({
queryKey: ["tests", "for-campaign-picker"],
queryFn: () => getTests({ limit: 200 }),
enabled: open,
});
const filteredTests = useMemo(() => {
if (!allTests) return [];
const alreadyIn = new Set([...existingTestIds, ...addedIds]);
let results = allTests.filter((t) => !alreadyIn.has(t.id));
if (searchText.trim()) {
const q = searchText.toLowerCase();
results = results.filter(
(t) =>
t.name.toLowerCase().includes(q) ||
(t.technique_mitre_id && t.technique_mitre_id.toLowerCase().includes(q)) ||
(t.technique_name && t.technique_name.toLowerCase().includes(q))
);
}
return results;
}, [allTests, searchText, existingTestIds, addedIds]);
const addMutation = useMutation({
mutationFn: (testId: string) =>
addTestToCampaign(campaignId, { test_id: testId }),
onSuccess: (_data, testId) => {
setAddedIds((prev) => new Set(prev).add(testId));
queryClient.invalidateQueries({ queryKey: ["campaign", campaignId] });
onSuccess();
},
});
if (!open) return null;
return (
<div className="fixed inset-0 z-50 flex items-center justify-center bg-black/60 backdrop-blur-sm">
<div className="w-full max-w-2xl rounded-xl border border-gray-700 bg-gray-900 shadow-2xl">
{/* Header */}
<div className="flex items-center justify-between border-b border-gray-800 px-6 py-4">
<h2 className="text-lg font-semibold text-white">
Add Tests to Campaign
</h2>
<button
onClick={onClose}
className="rounded-lg p-1.5 text-gray-400 hover:bg-gray-800 hover:text-white transition-colors"
>
<X className="h-5 w-5" />
</button>
</div>
{/* Search */}
<div className="border-b border-gray-800 px-6 py-3">
<div className="relative">
<Search className="absolute left-3 top-1/2 h-4 w-4 -translate-y-1/2 text-gray-500" />
<input
type="text"
value={searchText}
onChange={(e) => setSearchText(e.target.value)}
placeholder="Search tests by name or technique..."
autoFocus
className="w-full rounded-lg border border-gray-700 bg-gray-800 pl-9 pr-3 py-2.5 text-sm text-gray-300 placeholder-gray-500 focus:border-cyan-500 focus:outline-none"
/>
</div>
</div>
{/* Test list */}
<div className="max-h-[400px] overflow-y-auto px-6 py-3">
{isLoading ? (
<div className="flex items-center justify-center py-12">
<Loader2 className="h-6 w-6 animate-spin text-cyan-400" />
</div>
) : filteredTests.length === 0 ? (
<div className="flex flex-col items-center justify-center py-12 text-gray-400">
<FlaskConical className="mb-2 h-8 w-8 text-gray-600" />
<p className="text-sm">
{searchText
? "No tests match your search."
: "All available tests are already in this campaign."}
</p>
</div>
) : (
<div className="space-y-1">
{filteredTests.map((test: Test) => (
<div
key={test.id}
className="flex items-center justify-between rounded-lg border border-gray-800 px-4 py-3 hover:border-gray-700 hover:bg-gray-800/50 transition-colors"
>
<div className="flex-1 min-w-0">
<div className="flex items-center gap-2">
<span className="text-sm font-medium text-gray-200 truncate">
{test.name}
</span>
<span
className={`inline-flex shrink-0 rounded-full border px-2 py-0.5 text-[10px] font-medium ${
stateBadge[test.state]
}`}
>
{test.state.replace(/_/g, " ")}
</span>
</div>
<div className="mt-0.5 flex items-center gap-3 text-xs text-gray-500">
{test.technique_mitre_id && (
<span className="font-mono text-cyan-400/70">
{test.technique_mitre_id}
</span>
)}
{test.technique_name && (
<span className="truncate">{test.technique_name}</span>
)}
{test.platform && (
<span className="capitalize">{test.platform}</span>
)}
</div>
</div>
<button
onClick={() => addMutation.mutate(test.id)}
disabled={addMutation.isPending && addMutation.variables === test.id}
className="ml-3 flex shrink-0 items-center gap-1.5 rounded-lg border border-cyan-500/30 bg-cyan-500/10 px-3 py-1.5 text-xs font-medium text-cyan-400 hover:bg-cyan-500/20 disabled:opacity-50 transition-colors"
>
{addMutation.isPending && addMutation.variables === test.id ? (
<Loader2 className="h-3.5 w-3.5 animate-spin" />
) : (
<Plus className="h-3.5 w-3.5" />
)}
Add
</button>
</div>
))}
</div>
)}
</div>
{/* Footer */}
<div className="flex items-center justify-between border-t border-gray-800 px-6 py-4">
<div className="text-xs text-gray-500">
{addedIds.size > 0 && (
<span className="flex items-center gap-1 text-green-400">
<CheckCircle className="h-3.5 w-3.5" />
{addedIds.size} test{addedIds.size !== 1 ? "s" : ""} added
</span>
)}
</div>
<button
onClick={onClose}
className="rounded-lg border border-gray-700 bg-gray-800 px-4 py-2 text-sm font-medium text-gray-300 hover:bg-gray-700 transition-colors"
>
Done
</button>
</div>
</div>
</div>
);
}
@@ -0,0 +1,108 @@
import { useState, useEffect } from "react";
import { Timer, Pause, Play } from "lucide-react";
interface LiveTimerProps {
startedAt: string;
pausedAt: string | null;
pausedSeconds: number;
label: string;
variant: "red" | "blue";
onPause: () => void;
onResume: () => void;
canControl: boolean;
isToggling: boolean;
}
/**
* Real-time elapsed timer that counts up from a given start timestamp,
* subtracting accumulated pause time. Shows pause/resume controls.
*/
export default function LiveTimer({
startedAt,
pausedAt,
pausedSeconds,
label,
variant,
onPause,
onResume,
canControl,
isToggling,
}: LiveTimerProps) {
const [elapsed, setElapsed] = useState(0);
const isPaused = pausedAt !== null;
useEffect(() => {
const start = new Date(startedAt).getTime();
const tick = () => {
const now = Date.now();
const grossSeconds = Math.floor((now - start) / 1000);
let totalPaused = pausedSeconds;
if (isPaused) {
const pauseStart = new Date(pausedAt!).getTime();
totalPaused += Math.floor((now - pauseStart) / 1000);
}
setElapsed(Math.max(0, grossSeconds - totalPaused));
};
tick();
if (!isPaused) {
const interval = setInterval(tick, 1000);
return () => clearInterval(interval);
}
}, [startedAt, pausedAt, pausedSeconds, isPaused]);
const hours = Math.floor(elapsed / 3600);
const minutes = Math.floor((elapsed % 3600) / 60);
const seconds = elapsed % 60;
const pad = (n: number) => String(n).padStart(2, "0");
const colors = isPaused
? "border-yellow-500/40 bg-yellow-900/30 text-yellow-300"
: variant === "red"
? "border-orange-500/40 bg-orange-900/30 text-orange-300"
: "border-indigo-500/40 bg-indigo-900/30 text-indigo-300";
const dotColor = isPaused
? "bg-yellow-400"
: variant === "red"
? "bg-orange-400"
: "bg-indigo-400";
return (
<div className={`flex items-center gap-2.5 rounded-lg border px-3 py-2 ${colors}`}>
<div className="relative flex items-center">
<Timer className="h-4 w-4" />
<span
className={`absolute -right-0.5 -top-0.5 h-2 w-2 rounded-full ${dotColor} ${
isPaused ? "" : "animate-pulse"
}`}
/>
</div>
<div className="flex flex-col">
<span className="text-[10px] font-medium uppercase tracking-wider opacity-70">
{label}{isPaused ? " (Paused)" : ""}
</span>
<span className="font-mono text-sm font-bold tabular-nums">
{pad(hours)}:{pad(minutes)}:{pad(seconds)}
</span>
</div>
{canControl && (
<button
onClick={isPaused ? onResume : onPause}
disabled={isToggling}
className={`ml-1 rounded-md p-1.5 transition-colors disabled:opacity-50 ${
isPaused
? "bg-green-600/20 text-green-400 hover:bg-green-600/30"
: "bg-yellow-600/20 text-yellow-400 hover:bg-yellow-600/30"
}`}
title={isPaused ? "Resume timer" : "Pause timer"}
>
{isPaused ? <Play className="h-3.5 w-3.5" /> : <Pause className="h-3.5 w-3.5" />}
</button>
)}
</div>
);
}
@@ -10,6 +10,7 @@ import {
ShieldCheck, ShieldCheck,
} from "lucide-react"; } from "lucide-react";
import type { Test, TestState, User } from "../../types/models"; import type { Test, TestState, User } from "../../types/models";
import LiveTimer from "./LiveTimer";
// ── Progress steps ───────────────────────────────────────────────── // ── Progress steps ─────────────────────────────────────────────────
@@ -52,6 +53,9 @@ interface TestDetailHeaderProps {
onSubmitBlue: () => void; onSubmitBlue: () => void;
onOpenValidateModal: (side: "red" | "blue") => void; onOpenValidateModal: (side: "red" | "blue") => void;
onReopen: () => void; onReopen: () => void;
onPauseTimer: () => void;
onResumeTimer: () => void;
isTogglingTimer: boolean;
} }
// ── Component ────────────────────────────────────────────────────── // ── Component ──────────────────────────────────────────────────────
@@ -65,6 +69,9 @@ export default function TestDetailHeader({
onSubmitBlue, onSubmitBlue,
onOpenValidateModal, onOpenValidateModal,
onReopen, onReopen,
onPauseTimer,
onResumeTimer,
isTogglingTimer,
}: TestDetailHeaderProps) { }: TestDetailHeaderProps) {
const role = user?.role ?? ""; const role = user?.role ?? "";
const currentIdx = STATE_INDEX[test.state]; const currentIdx = STATE_INDEX[test.state];
@@ -235,6 +242,46 @@ export default function TestDetailHeader({
); );
}; };
// ── Live timer ───────────────────────────────────────────────────
const canControlTimer =
(test.state === "red_executing" && (role === "red_tech" || role === "admin")) ||
(test.state === "blue_evaluating" && (role === "blue_tech" || role === "admin"));
const renderLiveTimer = () => {
if (test.state === "red_executing" && test.red_started_at) {
return (
<LiveTimer
startedAt={test.red_started_at}
pausedAt={test.paused_at}
pausedSeconds={test.red_paused_seconds}
label="Red Team Timer"
variant="red"
onPause={onPauseTimer}
onResume={onResumeTimer}
canControl={canControlTimer}
isToggling={isTogglingTimer}
/>
);
}
if (test.state === "blue_evaluating" && test.blue_started_at) {
return (
<LiveTimer
startedAt={test.blue_started_at}
pausedAt={test.paused_at}
pausedSeconds={test.blue_paused_seconds}
label="Blue Team Timer"
variant="blue"
onPause={onPauseTimer}
onResume={onResumeTimer}
canControl={canControlTimer}
isToggling={isTogglingTimer}
/>
);
}
return null;
};
// ── Render ─────────────────────────────────────────────────────── // ── Render ───────────────────────────────────────────────────────
return ( return (
@@ -263,7 +310,10 @@ export default function TestDetailHeader({
</div> </div>
</div> </div>
{renderActions()} <div className="flex flex-col items-end gap-2">
{renderLiveTimer()}
{renderActions()}
</div>
</div> </div>
{/* Progress bar */} {/* Progress bar */}
+12 -1
View File
@@ -30,6 +30,7 @@ import { useAuth } from "../context/AuthContext";
import CampaignTimeline from "../components/CampaignTimeline"; import CampaignTimeline from "../components/CampaignTimeline";
import JiraLinkPanel from "../components/JiraLinkPanel"; import JiraLinkPanel from "../components/JiraLinkPanel";
import WorklogTimeline from "../components/WorklogTimeline"; import WorklogTimeline from "../components/WorklogTimeline";
import AddTestToCampaignModal from "../components/AddTestToCampaignModal";
const statusColors: Record<string, string> = { const statusColors: Record<string, string> = {
draft: "bg-gray-800/50 text-gray-400 border-gray-600/30", draft: "bg-gray-800/50 text-gray-400 border-gray-600/30",
@@ -61,6 +62,7 @@ export default function CampaignDetailPage() {
const { user } = useAuth(); const { user } = useAuth();
const [toast, setToast] = useState<{ message: string; type: "success" | "error" } | null>(null); const [toast, setToast] = useState<{ message: string; type: "success" | "error" } | null>(null);
const [showAddTestModal, setShowAddTestModal] = useState(false);
const showToast = (message: string, type: "success" | "error") => { const showToast = (message: string, type: "success" | "error") => {
setToast({ message, type }); setToast({ message, type });
@@ -500,7 +502,7 @@ export default function CampaignDetailPage() {
</h2> </h2>
{canManage && campaign.status === "draft" && ( {canManage && campaign.status === "draft" && (
<button <button
onClick={() => navigate(`/tests?campaign=${campaignId}`)} onClick={() => setShowAddTestModal(true)}
className="flex items-center gap-1.5 rounded-lg border border-cyan-500/30 bg-cyan-500/10 px-3 py-1.5 text-sm font-medium text-cyan-400 hover:bg-cyan-500/20 transition-colors" className="flex items-center gap-1.5 rounded-lg border border-cyan-500/30 bg-cyan-500/10 px-3 py-1.5 text-sm font-medium text-cyan-400 hover:bg-cyan-500/20 transition-colors"
> >
<Plus className="h-4 w-4" /> <Plus className="h-4 w-4" />
@@ -606,6 +608,15 @@ export default function CampaignDetailPage() {
<WorklogTimeline entityType="campaign" entityId={campaignId!} /> <WorklogTimeline entityType="campaign" entityId={campaignId!} />
</div> </div>
{/* Add Test to Campaign Modal */}
<AddTestToCampaignModal
campaignId={campaignId!}
existingTestIds={campaign.tests.map((ct) => ct.test_id)}
open={showAddTestModal}
onClose={() => setShowAddTestModal(false)}
onSuccess={() => showToast("Test added to campaign", "success")}
/>
{/* Toast notification */} {/* Toast notification */}
{toast && ( {toast && (
<div <div
+24
View File
@@ -13,6 +13,8 @@ import {
validateAsRedLead, validateAsRedLead,
validateAsBlueLead, validateAsBlueLead,
reopenTest, reopenTest,
pauseTimer,
resumeTimer,
getTestTimeline, getTestTimeline,
getRetestChain, getRetestChain,
} from "../api/tests"; } from "../api/tests";
@@ -223,6 +225,25 @@ export default function TestDetailPage() {
}, },
}); });
// Timer pause/resume
const pauseTimerMutation = useMutation({
mutationFn: () => pauseTimer(testId!),
onSuccess: () => {
invalidateAll();
showToast("Timer paused", "success");
},
onError: (err: unknown) => showToast(extractError(err), "error"),
});
const resumeTimerMutation = useMutation({
mutationFn: () => resumeTimer(testId!),
onSuccess: () => {
invalidateAll();
showToast("Timer resumed", "success");
},
onError: (err: unknown) => showToast(extractError(err), "error"),
});
// Evidence upload // Evidence upload
const uploadMutation = useMutation({ const uploadMutation = useMutation({
mutationFn: ({ file, team }: { file: File; team: TeamSide }) => mutationFn: ({ file, team }: { file: File; team: TeamSide }) =>
@@ -351,6 +372,9 @@ export default function TestDetailPage() {
onSubmitBlue={() => submitBlueMutation.mutate()} onSubmitBlue={() => submitBlueMutation.mutate()}
onOpenValidateModal={(side) => setValidationModal({ open: true, side })} onOpenValidateModal={(side) => setValidationModal({ open: true, side })}
onReopen={() => setConfirmReopen(true)} onReopen={() => setConfirmReopen(true)}
onPauseTimer={() => pauseTimerMutation.mutate()}
onResumeTimer={() => resumeTimerMutation.mutate()}
isTogglingTimer={pauseTimerMutation.isPending || resumeTimerMutation.isPending}
/> />
{/* Content: Tabs + Sidebar */} {/* Content: Tabs + Sidebar */}
+7
View File
@@ -86,6 +86,13 @@ export interface Test {
blue_validation_status: ValidationStatus | null; blue_validation_status: ValidationStatus | null;
blue_validation_notes: string | null; blue_validation_notes: string | null;
// Phase timing fields (for automatic Tempo worklogs)
red_started_at: string | null;
blue_started_at: string | null;
paused_at: string | null;
red_paused_seconds: number;
blue_paused_seconds: number;
// Remediation fields // Remediation fields
remediation_steps: string | null; remediation_steps: string | null;
remediation_status: string | null; remediation_status: string | null;