feat: extract advanced_metrics, analytics, test_templates, and auth to services (Tier 1 complete)

This commit is contained in:
2026-02-20 14:28:52 +01:00
parent bbc2dddd86
commit 9e22fde746
8 changed files with 579 additions and 422 deletions

View File

@@ -0,0 +1,150 @@
"""Test template service — framework-agnostic CRUD and queries."""
from __future__ import annotations
import uuid
from sqlalchemy import func, or_
from sqlalchemy.orm import Session
from app.domain.errors import EntityNotFoundError
from app.models.test_template import TestTemplate
from app.utils import escape_like
def list_templates(
db: Session,
*,
source: str | None = None,
platform: str | None = None,
severity: str | None = None,
mitre_technique_id: str | None = None,
search: str | None = None,
is_active: bool | None = None,
offset: int = 0,
limit: int = 50,
) -> list:
"""Return paginated, filterable list of test templates."""
query = db.query(TestTemplate)
if is_active is not None:
query = query.filter(TestTemplate.is_active == is_active)
if source:
query = query.filter(TestTemplate.source == source)
if platform:
query = query.filter(TestTemplate.platform.ilike(f"%{escape_like(platform)}%"))
if severity:
query = query.filter(TestTemplate.severity == severity)
if mitre_technique_id:
query = query.filter(TestTemplate.mitre_technique_id == mitre_technique_id)
if search:
pattern = f"%{escape_like(search)}%"
query = query.filter(
or_(
TestTemplate.name.ilike(pattern),
TestTemplate.description.ilike(pattern),
)
)
templates = (
query
.order_by(TestTemplate.mitre_technique_id, TestTemplate.name)
.offset(offset)
.limit(limit)
.all()
)
return templates
def get_template_stats(db: Session) -> dict:
"""Return catalog statistics: totals by source, platform, active/inactive."""
total = db.query(func.count(TestTemplate.id)).scalar() or 0
active = (
db.query(func.count(TestTemplate.id))
.filter(TestTemplate.is_active == True) # noqa: E712
.scalar()
) or 0
inactive = total - active
source_rows = (
db.query(TestTemplate.source, func.count(TestTemplate.id))
.filter(TestTemplate.is_active == True) # noqa: E712
.group_by(TestTemplate.source)
.all()
)
by_source = {source: cnt for source, cnt in source_rows}
platform_rows = (
db.query(TestTemplate.platform, func.count(TestTemplate.id))
.filter(TestTemplate.is_active == True) # noqa: E712
.group_by(TestTemplate.platform)
.all()
)
by_platform = {(platform or "unspecified"): cnt for platform, cnt in platform_rows}
return {
"total": total,
"active": active,
"inactive": inactive,
"by_source": by_source,
"by_platform": by_platform,
}
def bulk_activate(db: Session, *, activate: bool) -> int:
"""Set all templates to active or inactive. Returns count of affected. Does NOT commit."""
count = (
db.query(TestTemplate)
.filter(TestTemplate.is_active != activate)
.update({TestTemplate.is_active: activate})
)
return count
def get_templates_by_technique(db: Session, mitre_id: str) -> list:
"""Return all active templates mapped to a specific MITRE technique."""
return (
db.query(TestTemplate)
.filter(
TestTemplate.mitre_technique_id == mitre_id,
TestTemplate.is_active == True, # noqa: E712
)
.order_by(TestTemplate.name)
.all()
)
def get_template_or_raise(db: Session, template_id: uuid.UUID) -> TestTemplate:
"""Return a template by ID. Raises EntityNotFoundError if not found."""
template = db.query(TestTemplate).filter(TestTemplate.id == template_id).first()
if template is None:
raise EntityNotFoundError("Test template", str(template_id))
return template
def create_template(db: Session, **fields: object) -> TestTemplate:
"""Create a test template from keyword args (e.g. payload.model_dump()). Does NOT commit."""
template = TestTemplate(**fields)
db.add(template)
return template
def update_template(db: Session, template_id: uuid.UUID, **fields: object) -> TestTemplate:
"""Update an existing template. Raises EntityNotFoundError if not found. Does NOT commit."""
template = get_template_or_raise(db, template_id)
for field, value in fields.items():
if hasattr(template, field):
setattr(template, field, value)
return template
def toggle_template_active(db: Session, template_id: uuid.UUID) -> TestTemplate:
"""Toggle template active/inactive. Does NOT commit."""
template = get_template_or_raise(db, template_id)
template.is_active = not template.is_active
return template
def soft_delete_template(db: Session, template_id: uuid.UUID) -> None:
"""Soft-delete a template by setting is_active=False. Does NOT commit."""
template = get_template_or_raise(db, template_id)
template.is_active = False