"""Data sources management endpoints (admin only).

Provides a centralized panel for managing all external data sources
(Atomic Red Team, Sigma, LOLBAS, GTFOBins, CALDERA, Elastic, etc.)
including sync triggers, enable/disable toggles, and statistics.
"""

from typing import Optional

from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app.database import get_db
from app.dependencies.auth import require_role
from app.domain.unit_of_work import UnitOfWork
from app.models.user import User
from app.services.audit_service import log_action
from app.services.data_source_service import (
    get_source_stats,
    list_sources,
    sync_all_sources,
    sync_source,
    update_source,
)

# ---------------------------------------------------------------------------
# Pydantic schemas for request validation
# ---------------------------------------------------------------------------


class DataSourceUpdate(BaseModel):
    """Payload for updating a data source — only allowed fields.

    Every field defaults to ``None``.  The PATCH endpoint dumps this model
    with ``exclude_unset=True``, so a field the client leaves out is not
    forwarded to the service at all, while an explicit ``null`` is.
    """

    # Enable/disable toggle for the source.
    is_enabled: Optional[bool] = None
    # Sync schedule for the source.
    # NOTE(review): accepted values/format are defined by the service layer
    # and are not validated here — confirm against update_source.
    sync_frequency: Optional[str] = None
    # Free-form configuration mapping; its schema is not visible from this
    # module.
    config: Optional[dict] = None


router = APIRouter(prefix="/data-sources", tags=["data-sources"])


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _record_audit(
    db: Session,
    user: User,
    *,
    action: str,
    entity_id: Optional[str],
    details: dict,
) -> None:
    """Write one ``data_source`` audit entry and commit it in its own unit of work."""
    with UnitOfWork(db) as uow:
        log_action(
            db,
            user_id=user.id,
            action=action,
            entity_type="data_source",
            entity_id=entity_id,
            details=details,
        )
        uow.commit()


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------


@router.get("")
def list_data_sources(
    db: Session = Depends(get_db),
    current_user: User = Depends(require_role("admin")),
) -> list:
    """List all registered data sources.

    **Requires** the ``admin`` role.
    """
    # ``current_user`` is not read here; the dependency is declared so that
    # the role check runs for this route.
    return list_sources(db)


@router.patch("/{source_id}")
def update_data_source(
    source_id: str,
    body: DataSourceUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_role("admin")),
) -> dict:
    """Update a data source (enable/disable, change config).

    Only the fields explicitly present in the request body are forwarded to
    the service.  The update and its audit entry are written inside the same
    unit of work and committed together.

    **Requires** the ``admin`` role.
    """
    # exclude_unset keeps "field omitted" distinct from "field set to null".
    update_data = body.model_dump(exclude_unset=True)

    with UnitOfWork(db) as uow:
        update_source(db, source_id, **update_data)
        log_action(
            db,
            user_id=current_user.id,
            action="update_data_source",
            entity_type="data_source",
            entity_id=source_id,
            details={"updates": update_data},
        )
        uow.commit()

    return {"message": "Data source updated", "id": source_id}


@router.post("/{source_id}/sync")
def sync_data_source(
    source_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_role("admin")),
) -> dict:
    """Trigger sync/import for a specific data source.

    The service result is returned unchanged.  An audit entry is recorded
    after the sync returns, matching the other state-changing endpoints in
    this module; if the sync raises, the exception propagates and no entry
    is written.

    **Requires** the ``admin`` role.
    """
    result = sync_source(db, source_id)

    # Previously this was the only mutating admin action here without an
    # audit trail; mirror what the sync-all endpoint records.
    _record_audit(
        db,
        current_user,
        action="sync_data_source",
        entity_id=source_id,
        details={"result": result},
    )

    return result


@router.post("/sync-all")
def sync_all_data_sources(
    db: Session = Depends(get_db),
    current_user: User = Depends(require_role("admin")),
) -> dict:
    """Trigger sync for all enabled data sources (sequentially).

    The sync runs first; its results are then stored in an audit entry and
    returned to the caller.

    **Requires** the ``admin`` role.
    """
    results = sync_all_sources(db)

    _record_audit(
        db,
        current_user,
        action="sync_all_data_sources",
        entity_id=None,
        details={"results": results},
    )

    return {"message": "Sync all complete", "results": results}


@router.get("/{source_id}/stats")
def get_data_source_stats(
    source_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_role("admin")),
) -> dict:
    """Get detailed statistics for a specific data source.

    **Requires** the ``admin`` role.
    """
    return get_source_stats(db, source_id)