feat(phase-25): add detection rule associations, checklist UI and evaluation workflow (T-215, T-216)

This commit is contained in:
2026-02-09 16:44:35 +01:00
parent cd124b655b
commit f4de12d8ab
9 changed files with 970 additions and 0 deletions

View File

@@ -21,6 +21,7 @@ from app.routers import reports as reports_router
from app.routers import data_sources as data_sources_router
from app.routers import threat_actors as threat_actors_router
from app.routers import d3fend as d3fend_router
from app.routers import detection_rules as detection_rules_router
from app.storage import ensure_bucket_exists
from app.jobs.mitre_sync_job import start_scheduler, scheduler
@@ -66,6 +67,7 @@ app.include_router(reports_router.router, prefix="/api/v1")
app.include_router(data_sources_router.router, prefix="/api/v1")
app.include_router(threat_actors_router.router, prefix="/api/v1")
app.include_router(d3fend_router.router, prefix="/api/v1")
app.include_router(detection_rules_router.router, prefix="/api/v1")
@app.get("/health")

View File

@@ -11,6 +11,8 @@ from app.models.data_source import DataSource
from app.models.detection_rule import DetectionRule
from app.models.threat_actor import ThreatActor, ThreatActorTechnique
from app.models.defensive_technique import DefensiveTechnique, DefensiveTechniqueMapping
from app.models.test_template_detection_rule import TestTemplateDetectionRule
from app.models.test_detection_result import TestDetectionResult
from app.models.enums import TechniqueStatus, TestState, TestResult, TeamSide
__all__ = [
@@ -18,5 +20,6 @@ __all__ = [
"IntelItem", "AuditLog", "Notification", "DataSource",
"DetectionRule", "ThreatActor", "ThreatActorTechnique",
"DefensiveTechnique", "DefensiveTechniqueMapping",
"TestTemplateDetectionRule", "TestDetectionResult",
"TechniqueStatus", "TestState", "TestResult", "TeamSide",
]

View File

@@ -0,0 +1,55 @@
"""TestDetectionResult — tracks which detection rules triggered during a test.
When the Blue Team evaluates a test, they mark each associated detection
rule as triggered / not triggered / not applicable, along with notes.
"""
import uuid
from datetime import datetime
from sqlalchemy import Column, String, Text, Boolean, DateTime, ForeignKey, Index
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from app.database import Base
class TestDetectionResult(Base):
"""
Per-test, per-rule evaluation result.
- ``triggered`` = True: rule detected the attack
- ``triggered`` = False: rule did NOT detect the attack
- ``triggered`` = None: not yet evaluated
"""
__tablename__ = "test_detection_results"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
test_id = Column(
UUID(as_uuid=True),
ForeignKey("tests.id", ondelete="CASCADE"),
nullable=False,
)
detection_rule_id = Column(
UUID(as_uuid=True),
ForeignKey("detection_rules.id", ondelete="CASCADE"),
nullable=False,
)
triggered = Column(Boolean, nullable=True) # None = not evaluated
notes = Column(Text, nullable=True)
evaluated_by = Column(
UUID(as_uuid=True),
ForeignKey("users.id", ondelete="SET NULL"),
nullable=True,
)
evaluated_at = Column(DateTime, nullable=True)
# Relationships
test = relationship("Test")
detection_rule = relationship("DetectionRule")
evaluator = relationship("User", foreign_keys=[evaluated_by])
__table_args__ = (
Index('ix_tdr_test', 'test_id'),
Index('ix_tdr_rule', 'detection_rule_id'),
)

View File

@@ -0,0 +1,50 @@
"""TestTemplateDetectionRule — links test templates to detection rules.
Enables the Blue Team to see which detection rules should fire
for a given test template / attack procedure.
"""
import uuid
from datetime import datetime
from sqlalchemy import Column, Boolean, ForeignKey, Index, UniqueConstraint
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from app.database import Base
class TestTemplateDetectionRule(Base):
"""
Association between a test template and a detection rule.
Auto-generated by matching mitre_technique_id, or manually curated.
``is_primary`` marks rules with severity >= high as primary detections.
"""
__tablename__ = "test_template_detection_rules"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
test_template_id = Column(
UUID(as_uuid=True),
ForeignKey("test_templates.id", ondelete="CASCADE"),
nullable=True,
)
detection_rule_id = Column(
UUID(as_uuid=True),
ForeignKey("detection_rules.id", ondelete="CASCADE"),
nullable=False,
)
is_primary = Column(Boolean, default=False)
# Relationships
test_template = relationship("TestTemplate")
detection_rule = relationship("DetectionRule")
__table_args__ = (
Index('ix_ttdr_template', 'test_template_id'),
Index('ix_ttdr_rule', 'detection_rule_id'),
UniqueConstraint(
'test_template_id', 'detection_rule_id',
name='uq_template_detection_rule',
),
)

View File

@@ -0,0 +1,370 @@
"""Detection rules endpoints — listing, filtering, and template association.
Provides endpoints for browsing detection rules, querying rules by technique,
and managing the template ↔ detection rule associations.
"""
import logging
from typing import Optional
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user, require_role, require_any_role
from app.models.user import User
from app.models.detection_rule import DetectionRule
from app.models.test_template import TestTemplate
from app.models.test_template_detection_rule import TestTemplateDetectionRule
from app.models.test_detection_result import TestDetectionResult
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/detection-rules", tags=["detection-rules"])
# ---------------------------------------------------------------------------
# GET /detection-rules — List with filters
# ---------------------------------------------------------------------------
@router.get("")
def list_detection_rules(
technique: Optional[str] = Query(None, description="Filter by MITRE technique ID"),
source: Optional[str] = Query(None, description="Filter by source (sigma, elastic, splunk, custom)"),
severity: Optional[str] = Query(None),
search: Optional[str] = Query(None),
offset: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""List detection rules with optional filters and pagination."""
query = db.query(DetectionRule).filter(DetectionRule.is_active == True) # noqa: E712
if technique:
query = query.filter(DetectionRule.mitre_technique_id == technique)
if source:
query = query.filter(DetectionRule.source == source)
if severity:
query = query.filter(DetectionRule.severity == severity)
if search:
pattern = f"%{search}%"
query = query.filter(
DetectionRule.title.ilike(pattern)
| DetectionRule.description.ilike(pattern)
)
total = query.count()
items = query.order_by(DetectionRule.mitre_technique_id, DetectionRule.title).offset(offset).limit(limit).all()
return {
"total": total,
"offset": offset,
"limit": limit,
"items": [
{
"id": str(r.id),
"mitre_technique_id": r.mitre_technique_id,
"title": r.title,
"description": r.description,
"source": r.source,
"source_url": r.source_url,
"rule_format": r.rule_format,
"severity": r.severity,
"platforms": r.platforms or [],
"log_sources": r.log_sources,
"is_active": r.is_active,
}
for r in items
],
}
# ---------------------------------------------------------------------------
# GET /test-templates/{id}/detection-rules — Rules for a template
# ---------------------------------------------------------------------------
@router.get("/for-template/{template_id}")
def get_detection_rules_for_template(
template_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Get detection rules associated with a test template."""
template = db.query(TestTemplate).filter(TestTemplate.id == template_id).first()
if not template:
raise HTTPException(status_code=404, detail="Test template not found")
associations = (
db.query(TestTemplateDetectionRule)
.filter(TestTemplateDetectionRule.test_template_id == template_id)
.all()
)
rules = []
for assoc in associations:
r = assoc.detection_rule
rules.append({
"id": str(r.id),
"mitre_technique_id": r.mitre_technique_id,
"title": r.title,
"description": r.description,
"source": r.source,
"source_url": r.source_url,
"rule_content": r.rule_content,
"rule_format": r.rule_format,
"severity": r.severity,
"platforms": r.platforms or [],
"log_sources": r.log_sources,
"is_primary": assoc.is_primary,
})
return {
"template_id": str(template.id),
"template_name": template.name,
"mitre_technique_id": template.mitre_technique_id,
"rules": rules,
"total": len(rules),
}
# ---------------------------------------------------------------------------
# POST /detection-rules/auto-associate — Auto-link templates ↔ rules
# ---------------------------------------------------------------------------
@router.post("/auto-associate")
def auto_associate_detection_rules(
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Auto-associate test templates with detection rules by MITRE technique ID.
For each active template, find all active detection rules for the same
technique and create associations. Rules with severity >= high are marked
as primary.
"""
templates = db.query(TestTemplate).filter(TestTemplate.is_active == True).all() # noqa: E712
rules = db.query(DetectionRule).filter(DetectionRule.is_active == True).all() # noqa: E712
# Index rules by technique
rules_by_technique: dict[str, list] = {}
for rule in rules:
tid = rule.mitre_technique_id
if tid not in rules_by_technique:
rules_by_technique[tid] = []
rules_by_technique[tid].append(rule)
created = 0
skipped = 0
high_severities = {"high", "critical"}
for template in templates:
matching_rules = rules_by_technique.get(template.mitre_technique_id, [])
for rule in matching_rules:
# Check if association already exists
existing = (
db.query(TestTemplateDetectionRule)
.filter(
TestTemplateDetectionRule.test_template_id == template.id,
TestTemplateDetectionRule.detection_rule_id == rule.id,
)
.first()
)
if existing:
skipped += 1
continue
is_primary = (rule.severity or "").lower() in high_severities
assoc = TestTemplateDetectionRule(
test_template_id=template.id,
detection_rule_id=rule.id,
is_primary=is_primary,
)
db.add(assoc)
created += 1
db.commit()
total = db.query(TestTemplateDetectionRule).count()
return {
"created": created,
"skipped": skipped,
"total_associations": total,
}
# ---------------------------------------------------------------------------
# GET /detection-rules/for-test/{test_id} — Rules + results for a test
# ---------------------------------------------------------------------------
@router.get("/for-test/{test_id}")
def get_detection_rules_for_test(
test_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Get detection rules relevant to a test, along with their evaluation results.
Finds rules by matching the test's technique_id to detection rules,
and returns any existing evaluation results.
"""
from app.models.test import Test
from app.models.technique import Technique
test = db.query(Test).filter(Test.id == test_id).first()
if not test:
raise HTTPException(status_code=404, detail="Test not found")
technique = db.query(Technique).filter(Technique.id == test.technique_id).first()
if not technique:
raise HTTPException(status_code=404, detail="Technique not found")
# Get detection rules for this technique
rules = (
db.query(DetectionRule)
.filter(
DetectionRule.mitre_technique_id == technique.mitre_id,
DetectionRule.is_active == True, # noqa: E712
)
.order_by(DetectionRule.severity.desc(), DetectionRule.title)
.all()
)
# Get existing results for this test
existing_results = (
db.query(TestDetectionResult)
.filter(TestDetectionResult.test_id == test_id)
.all()
)
results_map = {str(r.detection_rule_id): r for r in existing_results}
items = []
triggered_count = 0
evaluated_count = 0
for rule in rules:
result = results_map.get(str(rule.id))
triggered = result.triggered if result else None
notes = result.notes if result else None
evaluated_at = result.evaluated_at.isoformat() if result and result.evaluated_at else None
if triggered is not None:
evaluated_count += 1
if triggered:
triggered_count += 1
items.append({
"id": str(rule.id),
"mitre_technique_id": rule.mitre_technique_id,
"title": rule.title,
"description": rule.description,
"source": rule.source,
"source_url": rule.source_url,
"rule_content": rule.rule_content,
"rule_format": rule.rule_format,
"severity": rule.severity,
"platforms": rule.platforms or [],
"log_sources": rule.log_sources,
"triggered": triggered,
"notes": notes,
"evaluated_at": evaluated_at,
"result_id": str(result.id) if result else None,
})
return {
"test_id": str(test.id),
"mitre_technique_id": technique.mitre_id,
"rules": items,
"total": len(items),
"evaluated": evaluated_count,
"triggered": triggered_count,
"detection_rate": round(triggered_count / evaluated_count * 100, 1) if evaluated_count > 0 else 0,
}
# ---------------------------------------------------------------------------
# POST /detection-rules/evaluate — Save detection result for a rule
# ---------------------------------------------------------------------------
@router.post("/evaluate")
def evaluate_detection_rule(
payload: dict,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("blue_tech", "blue_lead")),
):
"""Save or update the evaluation result for a detection rule on a test.
Body:
{
"test_id": "...",
"detection_rule_id": "...",
"triggered": true | false | null,
"notes": "optional notes"
}
"""
test_id = payload.get("test_id")
detection_rule_id = payload.get("detection_rule_id")
triggered = payload.get("triggered")
notes = payload.get("notes")
if not test_id or not detection_rule_id:
raise HTTPException(status_code=400, detail="test_id and detection_rule_id are required")
# Check test exists
from app.models.test import Test
test = db.query(Test).filter(Test.id == test_id).first()
if not test:
raise HTTPException(status_code=404, detail="Test not found")
# Check rule exists
rule = db.query(DetectionRule).filter(DetectionRule.id == detection_rule_id).first()
if not rule:
raise HTTPException(status_code=404, detail="Detection rule not found")
# Upsert result
existing = (
db.query(TestDetectionResult)
.filter(
TestDetectionResult.test_id == test_id,
TestDetectionResult.detection_rule_id == detection_rule_id,
)
.first()
)
if existing:
existing.triggered = triggered
existing.notes = notes
existing.evaluated_by = current_user.id
existing.evaluated_at = datetime.utcnow()
db.commit()
db.refresh(existing)
return {
"id": str(existing.id),
"triggered": existing.triggered,
"notes": existing.notes,
"evaluated_at": existing.evaluated_at.isoformat() if existing.evaluated_at else None,
}
else:
result = TestDetectionResult(
test_id=test_id,
detection_rule_id=detection_rule_id,
triggered=triggered,
notes=notes,
evaluated_by=current_user.id,
evaluated_at=datetime.utcnow(),
)
db.add(result)
db.commit()
db.refresh(result)
return {
"id": str(result.id),
"triggered": result.triggered,
"notes": result.notes,
"evaluated_at": result.evaluated_at.isoformat() if result.evaluated_at else None,
}