feat(review-queue): trigger review_required on new test templates
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled

Extends the review queue triggers to cover test template imports:
- atomic_import_service: flags techniques when new Atomic Red Team
  templates are imported
- caldera_import_service: same for Caldera templates
- lolbas_import_service: same for LOLBAS templates
- test_templates router (manual creation): flags the technique when
  an admin/lead creates a custom template via the API

Pattern is identical to the Sigma/Elastic detection rule approach:
collect new mitre_ids during the loop, bulk-update after commit.
Manual creation does a single technique lookup and sets the flag
inside the existing UnitOfWork.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
kitos
2026-05-29 11:26:09 +02:00
parent 1b513b050e
commit d7d11dfdf5
4 changed files with 34 additions and 0 deletions

View File

@@ -31,6 +31,7 @@ from sqlalchemy.orm import Session
from app.database import get_db from app.database import get_db
from app.dependencies.auth import get_current_user, require_any_role from app.dependencies.auth import get_current_user, require_any_role
from app.domain.unit_of_work import UnitOfWork from app.domain.unit_of_work import UnitOfWork
from app.models.technique import Technique
from app.models.user import User from app.models.user import User
from app.schemas.test_template import ( from app.schemas.test_template import (
TestTemplateCreate, TestTemplateCreate,
@@ -178,6 +179,15 @@ def create_template(
"""Create a custom test template.""" """Create a custom test template."""
template = create_template_svc(db, **payload.model_dump()) template = create_template_svc(db, **payload.model_dump())
with UnitOfWork(db) as uow: with UnitOfWork(db) as uow:
# Flag the associated technique for review — new template available
if template.mitre_technique_id:
technique = (
db.query(Technique)
.filter(Technique.mitre_id == template.mitre_technique_id)
.first()
)
if technique:
technique.review_required = True
log_action( log_action(
db, db,
user_id=current_user.id, user_id=current_user.id,

View File

@@ -35,6 +35,7 @@ import yaml
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.models.test_template import TestTemplate from app.models.test_template import TestTemplate
from app.models.technique import Technique
from app.services.audit_service import log_action from app.services.audit_service import log_action
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -218,6 +219,7 @@ def import_atomic_red_team(db: Session) -> dict:
created = 0 created = 0
skipped = 0 skipped = 0
new_technique_ids: set[str] = set()
for item in parsed_tests: for item in parsed_tests:
if item["atomic_test_id"] in existing_ids: if item["atomic_test_id"] in existing_ids:
@@ -238,8 +240,14 @@ def import_atomic_red_team(db: Session) -> dict:
) )
db.add(template) db.add(template)
existing_ids.add(item["atomic_test_id"]) existing_ids.add(item["atomic_test_id"])
new_technique_ids.add(item["technique_id"])
created += 1 created += 1
if new_technique_ids:
db.query(Technique).filter(
Technique.mitre_id.in_(new_technique_ids)
).update({"review_required": True}, synchronize_session=False)
db.commit() db.commit()
# Count distinct YAML files by technique_id # Count distinct YAML files by technique_id

View File

@@ -35,6 +35,7 @@ from sqlalchemy.orm import Session
from app.models.test_template import TestTemplate from app.models.test_template import TestTemplate
from app.models.data_source import DataSource from app.models.data_source import DataSource
from app.models.technique import Technique
from app.services.audit_service import log_action from app.services.audit_service import log_action
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -237,6 +238,7 @@ def sync(db: Session) -> dict:
created = 0 created = 0
skipped = 0 skipped = 0
new_technique_ids: set[str] = set()
for item in parsed: for item in parsed:
if item["atomic_test_id"] in existing_ids: if item["atomic_test_id"] in existing_ids:
@@ -257,8 +259,14 @@ def sync(db: Session) -> dict:
) )
db.add(template) db.add(template)
existing_ids.add(item["atomic_test_id"]) existing_ids.add(item["atomic_test_id"])
new_technique_ids.add(item["mitre_technique_id"])
created += 1 created += 1
if new_technique_ids:
db.query(Technique).filter(
Technique.mitre_id.in_(new_technique_ids)
).update({"review_required": True}, synchronize_session=False)
db.commit() db.commit()
summary = { summary = {

View File

@@ -39,6 +39,7 @@ from sqlalchemy.orm import Session
from app.models.test_template import TestTemplate from app.models.test_template import TestTemplate
from app.models.data_source import DataSource from app.models.data_source import DataSource
from app.models.technique import Technique
from app.services.audit_service import log_action from app.services.audit_service import log_action
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -295,6 +296,7 @@ def _upsert_templates(db: Session, items: list[dict], source_name: str) -> dict:
created = 0 created = 0
skipped = 0 skipped = 0
new_technique_ids: set[str] = set()
for item in items: for item in items:
if item["atomic_test_id"] in existing_ids: if item["atomic_test_id"] in existing_ids:
@@ -315,8 +317,14 @@ def _upsert_templates(db: Session, items: list[dict], source_name: str) -> dict:
) )
db.add(template) db.add(template)
existing_ids.add(item["atomic_test_id"]) existing_ids.add(item["atomic_test_id"])
new_technique_ids.add(item["mitre_technique_id"])
created += 1 created += 1
if new_technique_ids:
db.query(Technique).filter(
Technique.mitre_id.in_(new_technique_ids)
).update({"review_required": True}, synchronize_session=False)
db.commit() db.commit()
return {"created": created, "skipped_existing": skipped, "total_parsed": len(items)} return {"created": created, "skipped_existing": skipped, "total_parsed": len(items)}