Files
Aegis/backend/app/routers/reports.py
T
kitos 394d5d9056 refactor(types): add comprehensive type annotations across backend Python codebase
Enable ANN rules in ruff.toml (flake8-annotations) and resolve all 221 violations:

ANN201/ANN202 — return types on 168 public/private functions:
- All 28 FastAPI routers: endpoints annotated with dict/list/specific schema/
  StreamingResponse/FileResponse/JSONResponse as appropriate
- main.py: lifespan→AsyncGenerator[None,None], exception handlers→JSONResponse
- database.py: get_db→Generator[Session,None,None], proxy methods→correct types
- middleware/request_context.py: dispatch→Response with Callable call_next type

ANN001/ANN002/ANN003 — 32 missing argument types:
- seed_demo.py: all db parameters typed as Session
- domain/unit_of_work.py: __aexit__ exc_type/exc_val/exc_tb typed with TracebackType
- services: audit_service user_id→UUID|None, heatmap_service query/model/builder,
  notification_service test→Test, tempo_service test→Test/user→User,
  test_workflow_service test_id→UUID, campaign_crud **fields→object,
  test_crud **fields→object (4 sites)

ANN401 — 16 Any usages resolved:
- Domain entities (campaign/technique/threat_actor/test_entity): replaced Any with
  actual ORM types via TYPE_CHECKING guards to avoid circular imports
- detection_rule_service: test_id/detection_rule_id/evaluator_id→UUID
- score_cache: kept Any with # noqa: ANN401 (genuinely generic cache)
- jira_service/tempo_service: kept Any with # noqa: ANN401 (lazy optional deps)
- d3fend_import_service: _to_str(v: Any) kept with # noqa: ANN401

ANN204/ANN205/ANN206 — special/static/class methods:
- database.py proxy __call__/__getattr__: *args: object/**kwargs: object
- schemas/test.py model_validate: obj→object, **kwargs→object
- sa_technique_repository._int_type→type

All 439 unit tests pass. ruff check app/ → All checks passed!

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-09 17:04:51 +02:00

90 lines
3.0 KiB
Python

"""Reports endpoints — export coverage summaries and test results.
Thin HTTP adapter: delegates all data logic to coverage_report_service.
Endpoints
---------
GET /reports/coverage-summary — full coverage JSON report
GET /reports/coverage-csv — CSV export of coverage
GET /reports/test-results — test results report (JSON)
GET /reports/remediation-status — remediation status report (JSON)
"""
import csv
import io
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, Query
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user
from app.models.user import User
from app.services.coverage_report_service import (
build_coverage_csv_rows,
build_coverage_summary,
build_remediation_status_report,
build_test_results_report,
)
router = APIRouter(prefix="/reports", tags=["reports"])
@router.get("/coverage-summary")
def coverage_summary(
tactic: Optional[str] = Query(None, description="Filter by tactic"),
platform: Optional[str] = Query(None, description="Filter by platform (in techniques)"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> dict:
"""Full coverage report as JSON — technique-by-technique with test counts."""
return build_coverage_summary(db, tactic=tactic, platform=platform)
@router.get("/coverage-csv")
def coverage_csv(
tactic: Optional[str] = Query(None),
platform: Optional[str] = Query(None),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> StreamingResponse:
"""Export coverage as a downloadable CSV."""
rows = build_coverage_csv_rows(db, tactic=tactic, platform=platform)
output = io.StringIO()
writer = csv.writer(output)
for row in rows:
writer.writerow(row)
output.seek(0)
filename = f"aegis_coverage_{datetime.utcnow().strftime('%Y%m%d')}.csv"
return StreamingResponse(
iter([output.getvalue()]),
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename={filename}"},
)
@router.get("/test-results")
def test_results(
state: Optional[str] = Query(None),
date_from: Optional[str] = Query(None, description="ISO date string YYYY-MM-DD"),
date_to: Optional[str] = Query(None, description="ISO date string YYYY-MM-DD"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> dict:
"""Report of test results with optional filters."""
return build_test_results_report(db, state=state, date_from=date_from, date_to=date_to)
@router.get("/remediation-status")
def remediation_status(
status: Optional[str] = Query(None, description="Filter by remediation status"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> dict:
"""Report of remediation status across all tests."""
return build_remediation_status_report(db, status=status)