Files
Aegis/backend/app/routers/campaigns.py
kitos c62dafbc1f
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
feat(campaigns): campaign start date — scheduled activation, Jira start_date
DB: migration b047 adds start_date (DateTime nullable) + index to campaigns.

Backend:
- Campaign model: start_date field
- CampaignCreate/Update schemas: accept start_date (ISO string)
- CRUD service: persist + serialize start_date in both serializers
- Activation endpoint: blocks manual activation if start_date is in the future
  (campaign will auto-activate via scheduler)
- Scheduler: new hourly job _run_scheduled_campaign_activation — finds draft
  campaigns with start_date <= now, activates them, creates Jira tickets,
  notifies red_tech team
- Jira: campaign + test tickets now include JIRA_START_DATE_FIELD (configurable,
  default customfield_10015). Campaign uses start_date if set, else created_at.
  Tests inherit campaign start_date.
- config.py: JIRA_START_DATE_FIELD setting

Frontend:
- Campaign type: start_date field on Campaign + CampaignSummary
- CampaignCreatePayload: start_date optional field
- Create form: date picker with min=today, warning message explaining behavior
- Campaign detail header: start_date badge showing days remaining or started date

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-03 16:57:06 +02:00

578 lines
20 KiB
Python

"""Campaign endpoints — CRUD, test management, activation, and auto-generation.
Provides comprehensive campaign lifecycle management including
test ordering, progress tracking, and threat actor integration.
"""
import logging
import uuid
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from pydantic import BaseModel, Field
from app.database import get_db
from app.dependencies.auth import get_current_user, require_any_role
from app.models.user import User
from app.models.campaign import Campaign, CampaignTest
from app.models.test import Test
from app.services.campaign_service import generate_campaign_from_threat_actor
from app.services.campaign_crud_service import (
add_test_to_campaign as crud_add_test,
activate_campaign as crud_activate,
complete_campaign as crud_complete,
create_campaign as crud_create,
delete_campaign as crud_delete,
get_campaign_detail as crud_get_detail,
get_campaign_history as crud_get_history,
get_campaign_progress_data as crud_get_progress,
list_campaigns as crud_list,
remove_test_from_campaign as crud_remove_test,
schedule_campaign as crud_schedule,
serialize_campaign,
update_campaign as crud_update,
)
from app.domain.unit_of_work import UnitOfWork
from app.services.audit_service import log_action
from app.services.notification_service import notify_role
from app.services.webhook_service import dispatch_webhook
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/campaigns", tags=["campaigns"])
# ── Pydantic schemas ─────────────────────────────────────────────────
class CampaignCreate(BaseModel):
name: str
description: Optional[str] = None
type: str = "custom"
threat_actor_id: Optional[str] = None
target_platform: Optional[str] = None
tags: Optional[list[str]] = Field(default_factory=list)
scheduled_at: Optional[str] = None
start_date: Optional[str] = None # ISO date — campaign won't activate before this
class CampaignUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
type: Optional[str] = None
target_platform: Optional[str] = None
tags: Optional[list[str]] = None
scheduled_at: Optional[str] = None
start_date: Optional[str] = None # ISO date — can be updated while still in draft
class AddTestPayload(BaseModel):
test_id: str
order_index: Optional[int] = None
depends_on: Optional[str] = None
phase: Optional[str] = None
class SchedulePayload(BaseModel):
is_recurring: bool
recurrence_pattern: Optional[str] = None # weekly, monthly, quarterly
next_run_at: Optional[str] = None
# ---------------------------------------------------------------------------
# GET /campaigns — List campaigns with filters
# ---------------------------------------------------------------------------
@router.get("")
def list_campaigns(
type: Optional[str] = Query(None),
status: Optional[str] = Query(None),
threat_actor_id: Optional[str] = Query(None),
search: Optional[str] = Query(None),
offset: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""List campaigns with optional filters and pagination."""
return crud_list(
db,
type=type,
status=status,
threat_actor_id=threat_actor_id,
search=search,
offset=offset,
limit=limit,
)
# ---------------------------------------------------------------------------
# POST /campaigns — Create campaign
# ---------------------------------------------------------------------------
@router.post("", status_code=201)
def create_campaign(
payload: CampaignCreate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Create a new campaign."""
with UnitOfWork(db) as uow:
result = crud_create(
db,
creator_id=current_user.id,
name=payload.name,
description=payload.description,
type=payload.type,
threat_actor_id=payload.threat_actor_id,
target_platform=payload.target_platform,
tags=payload.tags,
scheduled_at=payload.scheduled_at,
start_date=payload.start_date,
)
campaign_id = result["id"]
log_action(
db,
user_id=current_user.id,
action="create_campaign",
entity_type="campaign",
entity_id=campaign_id,
details={"name": payload.name, "type": payload.type},
)
uow.commit()
return result
# ---------------------------------------------------------------------------
# GET /campaigns/{id} — Detail with tests and progress
# ---------------------------------------------------------------------------
@router.get("/{campaign_id}")
def get_campaign(
campaign_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Get detailed campaign info including tests and progress."""
return crud_get_detail(db, campaign_id)
# ---------------------------------------------------------------------------
# PATCH /campaigns/{id} — Update campaign
# ---------------------------------------------------------------------------
@router.patch("/{campaign_id}")
def update_campaign(
campaign_id: str,
payload: CampaignUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Update a campaign. Only allowed in draft or active state."""
update_data = payload.model_dump(exclude_unset=True)
with UnitOfWork(db) as uow:
result = crud_update(
db,
campaign_id,
updater_id=current_user.id,
updater_role=current_user.role,
**update_data,
)
log_action(
db,
user_id=current_user.id,
action="update_campaign",
entity_type="campaign",
entity_id=campaign_id,
details={"updated_fields": list(update_data.keys())},
)
uow.commit()
return result
# ---------------------------------------------------------------------------
# DELETE /campaigns/{id} — Delete campaign
# ---------------------------------------------------------------------------
@router.delete("/{campaign_id}", status_code=204)
def delete_campaign(
campaign_id: str,
delete_tests: bool = Query(False, description="Also delete associated tests"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Delete a campaign. Only draft campaigns can be deleted (admins can delete any)."""
with UnitOfWork(db) as uow:
crud_delete(
db,
campaign_id,
deleter_id=current_user.id,
deleter_role=current_user.role,
delete_tests=delete_tests,
)
log_action(
db,
user_id=current_user.id,
action="delete_campaign",
entity_type="campaign",
entity_id=campaign_id,
details={"delete_tests": delete_tests},
)
uow.commit()
# ---------------------------------------------------------------------------
# POST /campaigns/{id}/tests — Add test to campaign
# ---------------------------------------------------------------------------
@router.post("/{campaign_id}/tests")
def add_test_to_campaign(
campaign_id: str,
payload: AddTestPayload,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Add a test to a campaign with optional ordering and dependency."""
with UnitOfWork(db) as uow:
result = crud_add_test(
db,
campaign_id,
test_id=payload.test_id,
order_index=payload.order_index,
depends_on=payload.depends_on,
phase=payload.phase,
)
uow.commit()
return result
# ---------------------------------------------------------------------------
# DELETE /campaigns/{id}/tests/{campaign_test_id} — Remove test from campaign
# ---------------------------------------------------------------------------
@router.delete("/{campaign_id}/tests/{campaign_test_id}")
def remove_test_from_campaign(
campaign_id: str,
campaign_test_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Remove a test from a campaign."""
with UnitOfWork(db) as uow:
crud_remove_test(db, campaign_id, campaign_test_id)
uow.commit()
return {"detail": "Test removed from campaign"}
# ---------------------------------------------------------------------------
# POST /campaigns/{id}/activate — Activate campaign
# ---------------------------------------------------------------------------
@router.post("/{campaign_id}/activate")
def activate_campaign(
campaign_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Activate a campaign, moving it from draft to active.
If the campaign has a start_date in the future, manual activation is blocked —
the campaign will be auto-activated by the scheduler when the date arrives.
"""
# Guard: start_date must have been reached before manual activation
campaign_obj = db.query(Campaign).filter(Campaign.id == campaign_id).first()
if campaign_obj and campaign_obj.start_date:
now = datetime.utcnow()
if campaign_obj.start_date > now:
from fastapi import HTTPException
raise HTTPException(
status_code=422,
detail=(
f"This campaign is scheduled to start on "
f"{campaign_obj.start_date.strftime('%Y-%m-%d')}. "
f"It will be activated automatically on that date. "
f"To activate it now, remove the start date first."
),
)
with UnitOfWork(db) as uow:
campaign = crud_activate(db, campaign_id)
notify_role(
db,
role="red_tech",
type="campaign_activated",
title="Campaign activated",
message=f'Campaign "{campaign.name}" has been activated.',
entity_type="campaign",
entity_id=campaign.id,
)
log_action(
db,
user_id=current_user.id,
action="activate_campaign",
entity_type="campaign",
entity_id=campaign.id,
details={"name": campaign.name},
)
uow.commit()
db.refresh(campaign)
# Create Jira tickets for campaign and tests at activation time (non-fatal).
# Campaign ticket is created here if it doesn't already exist (deferred from creation).
try:
from app.services.jira_service import (
auto_create_campaign_issue,
auto_create_test_issue,
get_campaign_jira_key,
get_test_jira_key,
)
campaign_jira_key = get_campaign_jira_key(db, campaign_id)
if not campaign_jira_key:
campaign_jira_key = auto_create_campaign_issue(db, campaign, current_user)
if campaign_jira_key:
for ct in campaign.campaign_tests:
if ct.test and not get_test_jira_key(db, ct.test.id):
auto_create_test_issue(
db, ct.test, current_user,
parent_ticket_override=campaign_jira_key,
campaign_start_date=campaign.start_date,
)
db.commit()
except Exception:
logger.exception(
"Jira ticket creation failed during activation of campaign %s",
campaign_id,
)
return serialize_campaign(db, campaign)
# ---------------------------------------------------------------------------
# POST /campaigns/{id}/complete — Mark campaign as completed
# ---------------------------------------------------------------------------
@router.post("/{campaign_id}/complete")
def complete_campaign(
campaign_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "admin")),
):
"""Mark a campaign as completed."""
with UnitOfWork(db) as uow:
campaign = crud_complete(db, campaign_id)
log_action(
db,
user_id=current_user.id,
action="complete_campaign",
entity_type="campaign",
entity_id=campaign.id,
details={"name": campaign.name},
)
uow.commit()
db.refresh(campaign)
dispatch_webhook("campaign.completed", {"campaign_id": str(campaign.id), "name": campaign.name})
return serialize_campaign(db, campaign)
# ---------------------------------------------------------------------------
# GET /campaigns/{id}/progress — Campaign progress
# ---------------------------------------------------------------------------
@router.get("/{campaign_id}/progress")
def get_campaign_progress_endpoint(
campaign_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Get progress statistics for a campaign."""
return crud_get_progress(db, campaign_id)
# ---------------------------------------------------------------------------
# POST /campaigns/from-threat-actor/{actor_id} — Auto-generate campaign
# ---------------------------------------------------------------------------
@router.post("/from-threat-actor/{actor_id}", status_code=201)
def generate_campaign_from_actor(
actor_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Auto-generate a campaign from a threat actor's uncovered techniques.
Creates tests from the best available templates and orders them
by kill chain phase.
"""
campaign = generate_campaign_from_threat_actor(
db,
uuid.UUID(actor_id),
current_user,
)
with UnitOfWork(db) as uow:
log_action(
db,
user_id=current_user.id,
action="generate_campaign",
entity_type="campaign",
entity_id=campaign.id,
details={"actor_id": actor_id, "campaign_name": campaign.name},
)
uow.commit()
return serialize_campaign(db, campaign)
# ---------------------------------------------------------------------------
# PATCH /campaigns/{id}/schedule — Configure recurrence
# ---------------------------------------------------------------------------
@router.patch("/{campaign_id}/schedule")
def schedule_campaign(
campaign_id: str,
payload: SchedulePayload,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Configure or update the recurrence schedule for a campaign.
Only the campaign creator or admin can change scheduling.
"""
with UnitOfWork(db) as uow:
campaign = crud_schedule(
db,
campaign_id,
owner_id=current_user.id,
owner_role=current_user.role,
is_recurring=payload.is_recurring,
recurrence_pattern=payload.recurrence_pattern,
next_run_at=payload.next_run_at,
)
log_action(
db,
user_id=current_user.id,
action="schedule_campaign",
entity_type="campaign",
entity_id=campaign.id,
details={
"is_recurring": campaign.is_recurring,
"recurrence_pattern": campaign.recurrence_pattern,
"next_run_at": campaign.next_run_at.isoformat() if campaign.next_run_at else None,
},
)
uow.commit()
db.refresh(campaign)
return serialize_campaign(db, campaign)
# ---------------------------------------------------------------------------
# GET /campaigns/{id}/history — Execution history (child campaigns)
# ---------------------------------------------------------------------------
@router.get("/{campaign_id}/history")
def get_campaign_history(
campaign_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""List all child campaigns (execution history) of a recurring campaign."""
return crud_get_history(db, campaign_id)
# ---------------------------------------------------------------------------
# GET /campaigns/{id}/timing-summary — Aggregated timing across campaign tests
# ---------------------------------------------------------------------------
def _seconds_between(start: datetime | None, end: datetime | None) -> int:
"""Return elapsed seconds between two datetimes; 0 if either is None."""
if not start or not end:
return 0
diff = (end - start).total_seconds()
return max(0, int(diff))
@router.get("/{campaign_id}/timing-summary")
def get_campaign_timing_summary(
campaign_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return aggregated Red/Blue timing metrics for all tests in a campaign.
For each test we calculate:
- red_execution_secs : red_started_at → blue_started_at (minus red_paused_seconds)
- blue_queue_secs : blue_started_at → blue_work_started_at (waiting for Blue pick-up)
- blue_evaluation_secs: blue_work_started_at → first validation timestamp (minus blue_paused_seconds)
- total_secs : sum of the three phases
Returns totals + per-test breakdown.
"""
# Load campaign
campaign = db.query(Campaign).filter(Campaign.id == campaign_id).first()
if not campaign:
from fastapi import HTTPException
raise HTTPException(status_code=404, detail="Campaign not found")
# Load all tests for this campaign
test_ids = [
ct.test_id
for ct in db.query(CampaignTest).filter(CampaignTest.campaign_id == campaign.id).all()
]
tests = db.query(Test).filter(Test.id.in_(test_ids)).all() if test_ids else []
breakdown = []
total_red = 0
total_queue = 0
total_blue = 0
for t in tests:
# Red execution: from start-execution to submit-to-blue, minus paused time
red_secs = max(
0,
_seconds_between(t.red_started_at, t.blue_started_at) - (t.red_paused_seconds or 0),
)
# Blue queue: from receiving the test to actually starting evaluation
queue_secs = _seconds_between(t.blue_started_at, t.blue_work_started_at)
# Blue evaluation: from starting evaluation to first validation, minus paused time
eval_end = t.red_validated_at or t.blue_validated_at
blue_secs = max(
0,
_seconds_between(t.blue_work_started_at, eval_end) - (t.blue_paused_seconds or 0),
)
total_red += red_secs
total_queue += queue_secs
total_blue += blue_secs
breakdown.append({
"test_id": str(t.id),
"test_name": t.name,
"state": t.state.value if t.state else None,
"red_execution_secs": red_secs,
"blue_queue_secs": queue_secs,
"blue_evaluation_secs": blue_secs,
"total_secs": red_secs + queue_secs + blue_secs,
"has_timing": bool(t.red_started_at),
})
total_secs = total_red + total_queue + total_blue
return {
"campaign_id": campaign_id,
"campaign_name": campaign.name,
"tests_total": len(tests),
"tests_with_timing": sum(1 for b in breakdown if b["has_timing"]),
"red_execution_secs": total_red,
"blue_queue_secs": total_queue,
"blue_evaluation_secs": total_blue,
"total_secs": total_secs,
"breakdown": sorted(breakdown, key=lambda x: -(x["total_secs"])),
}