refactor(status): consolidate status_service to delegate to TechniqueEntity.recalculate_status() eliminating duplicated business logic

This commit is contained in:
2026-02-19 15:23:01 +01:00
parent 2b6d9090c9
commit 42a9f4dcd4
2 changed files with 23 additions and 39 deletions

View File

@@ -79,11 +79,12 @@ class TechniqueEntity:
def from_orm(cls, model: Any) -> TechniqueEntity:
"""Build a TechniqueEntity from a SQLAlchemy Technique model."""
raw_status = model.status_global
status = (
raw_status
if isinstance(raw_status, TechniqueStatus)
else TechniqueStatus(raw_status)
)
if raw_status is None:
status = TechniqueStatus.not_evaluated
elif isinstance(raw_status, TechniqueStatus):
status = raw_status
else:
status = TechniqueStatus(raw_status)
return cls(
id=model.id,
mitre_id=model.mitre_id,

View File

@@ -1,47 +1,30 @@
"""Service for recalculating the global status of a Technique
based on the state and result of its associated tests.
"""Service for recalculating the global status of a Technique.
V2 rules account for dual Red/Blue validation and use
``detection_result`` (filled by Blue Team) instead of the legacy
``result`` field.
Delegates entirely to :meth:`TechniqueEntity.recalculate_status`
so that the business rules live in a single place (the domain entity).
This function mutates the technique but does **not** commit.
This thin adapter converts ORM objects into the format the entity
expects, then writes the result back onto the ORM model.
The function mutates the technique but does **not** commit.
The caller is responsible for committing the session.
"""
from sqlalchemy.orm import Session
from app.models.enums import TechniqueStatus, TestState
from app.domain.entities.technique import TechniqueEntity
from app.models.technique import Technique
def recalculate_technique_status(db: Session, technique: Technique) -> None:
"""Recompute ``technique.status_global`` from its tests and commit.
"""Recompute ``technique.status_global`` from its tests.
Rules (v2)
----------
1. No tests → ``not_evaluated``
2. All tests ``validated`` → look at detection results:
- All ``detected`` → ``validated``
- Any ``partially_detected`` → ``partial``
- Otherwise → ``not_covered``
3. Some tests ``validated``, others still in progress → ``partial``
4. All tests in intermediate states (no validated) → ``in_progress``
``db`` is accepted for backward compatibility but is not used
directly — test data comes from the ORM relationship.
"""
tests = technique.tests
if not tests:
technique.status_global = TechniqueStatus.not_evaluated
elif all(t.state == TestState.validated for t in tests):
# All validated — inspect detection results
results = [t.detection_result for t in tests if t.detection_result]
if results and all(str(r) == "detected" or r == "detected" for r in results):
technique.status_global = TechniqueStatus.validated
elif any(str(r) == "partially_detected" or r == "partially_detected" for r in results):
technique.status_global = TechniqueStatus.partial
else:
technique.status_global = TechniqueStatus.not_covered
elif any(t.state == TestState.validated for t in tests):
technique.status_global = TechniqueStatus.partial
else:
technique.status_global = TechniqueStatus.in_progress
entity = TechniqueEntity.from_orm(technique)
test_snapshots = [
(t.state, t.detection_result) for t in technique.tests
]
entity.recalculate_status(test_snapshots)
technique.status_global = entity.status_global