"""Test template service — framework-agnostic CRUD and queries.""" from __future__ import annotations import uuid from sqlalchemy import func, or_ from sqlalchemy.orm import Session from app.domain.errors import EntityNotFoundError from app.models.test_template import TestTemplate from app.utils import escape_like def list_templates( db: Session, *, source: str | None = None, platform: str | None = None, severity: str | None = None, mitre_technique_id: str | None = None, search: str | None = None, is_active: bool | None = None, offset: int = 0, limit: int = 50, ) -> list: """Return paginated, filterable list of test templates.""" query = db.query(TestTemplate) if is_active is not None: query = query.filter(TestTemplate.is_active == is_active) if source: query = query.filter(TestTemplate.source == source) if platform: query = query.filter(TestTemplate.platform.ilike(f"%{escape_like(platform)}%")) if severity: query = query.filter(TestTemplate.severity == severity) if mitre_technique_id: query = query.filter(TestTemplate.mitre_technique_id == mitre_technique_id) if search: pattern = f"%{escape_like(search)}%" query = query.filter( or_( TestTemplate.name.ilike(pattern), TestTemplate.description.ilike(pattern), ) ) templates = ( query .order_by(TestTemplate.mitre_technique_id, TestTemplate.name) .offset(offset) .limit(limit) .all() ) return templates def get_template_stats(db: Session) -> dict: """Return catalog statistics: totals by source, platform, active/inactive.""" total = db.query(func.count(TestTemplate.id)).scalar() or 0 active = ( db.query(func.count(TestTemplate.id)) .filter(TestTemplate.is_active == True) # noqa: E712 .scalar() ) or 0 inactive = total - active source_rows = ( db.query(TestTemplate.source, func.count(TestTemplate.id)) .filter(TestTemplate.is_active == True) # noqa: E712 .group_by(TestTemplate.source) .all() ) by_source = {source: cnt for source, cnt in source_rows} platform_rows = ( db.query(TestTemplate.platform, func.count(TestTemplate.id)) .filter(TestTemplate.is_active == True) # noqa: E712 .group_by(TestTemplate.platform) .all() ) by_platform = {(platform or "unspecified"): cnt for platform, cnt in platform_rows} return { "total": total, "active": active, "inactive": inactive, "by_source": by_source, "by_platform": by_platform, } def bulk_activate(db: Session, *, activate: bool) -> int: """Set all templates to active or inactive. Returns count of affected. Does NOT commit.""" count = ( db.query(TestTemplate) .filter(TestTemplate.is_active != activate) .update({TestTemplate.is_active: activate}) ) return count def get_templates_by_technique(db: Session, mitre_id: str) -> list: """Return all active templates mapped to a specific MITRE technique.""" return ( db.query(TestTemplate) .filter( TestTemplate.mitre_technique_id == mitre_id, TestTemplate.is_active == True, # noqa: E712 ) .order_by(TestTemplate.name) .all() ) def get_template_or_raise(db: Session, template_id: uuid.UUID) -> TestTemplate: """Return a template by ID. Raises EntityNotFoundError if not found.""" template = db.query(TestTemplate).filter(TestTemplate.id == template_id).first() if template is None: raise EntityNotFoundError("Test template", str(template_id)) return template def create_template(db: Session, **fields: object) -> TestTemplate: """Create a test template from keyword args (e.g. payload.model_dump()). Does NOT commit.""" template = TestTemplate(**fields) db.add(template) return template def update_template(db: Session, template_id: uuid.UUID, **fields: object) -> TestTemplate: """Update an existing template. Raises EntityNotFoundError if not found. Does NOT commit.""" template = get_template_or_raise(db, template_id) for field, value in fields.items(): if hasattr(template, field): setattr(template, field, value) return template def toggle_template_active(db: Session, template_id: uuid.UUID) -> TestTemplate: """Toggle template active/inactive. Does NOT commit.""" template = get_template_or_raise(db, template_id) template.is_active = not template.is_active return template def soft_delete_template(db: Session, template_id: uuid.UUID) -> None: """Soft-delete a template by setting is_active=False. Does NOT commit.""" template = get_template_or_raise(db, template_id) template.is_active = False