Architecture
This document provides a comprehensive overview of the SO Campaign Manager architecture, design patterns, and internal workings.
System Overview
SO Campaign Manager is built on a modular architecture that separates concerns into distinct components.
Core Components
The system consists of five major components that work together to orchestrate HPC workflow campaigns:
Core Models - Data structures and validation
Bookkeeper - Main orchestration engine
Planner - Workflow scheduling and optimization
Enactor - Execution backends
Workflows - Task definitions and implementations
Component Diagram
┌─────────────────────────────────────────────────────────────┐
│                         Bookkeeper                          │
│      (Main Orchestrator - Coordinates All Components)       │
└──────────┬──────────────────────────────────┬───────────────┘
           │                                  │
           │                                  │
┌──────────▼──────────┐             ┌─────────▼──────────┐
│       Planner       │             │      Enactor       │
│  (HEFT Algorithm)   │             │  (RADICAL-Pilot)   │
└──────────┬──────────┘             └─────────┬──────────┘
           │                                  │
           │         ┌────────────────────────┘
           │         │
┌──────────▼─────────▼───────────┐
│          Core Models           │
│ (Campaign, Workflow, Resource) │
└───────────────┬────────────────┘
                │
     ┌──────────▼──────────┐
     │      Workflows      │
     │   (ML Mapmaking,    │
     │  Null Tests, etc.)  │
     └─────────────────────┘
Data Flow
The typical data flow through the system follows these stages:
Configuration → Planning → Execution → Monitoring
Stage 1: Configuration Parsing
User provides TOML configuration file
Configuration parser reads campaign settings
Workflow entries are extracted and validated
Workflow factory classes create instances
Campaign object is constructed with all workflows
Stage 2: Planning
Bookkeeper receives Campaign and Resource objects
Planner analyzes workflow dependencies
HEFT heuristic computes a schedule aimed at minimizing the overall makespan
Resource requirements are estimated (via Slurmise)
QoS policies are matched for each workflow
Execution plan is generated (list of PlanEntry objects)
Stage 3: Execution
Enactor receives execution plan
SLURM jobs are created for each workflow
Jobs are submitted to HPC scheduler
RADICAL-Pilot manages task execution
State callbacks update workflow status
Stage 4: Monitoring
Bookkeeper monitors workflow states
Enactor provides state updates via callbacks
Progress is logged and tracked
Completion or failure triggers next actions
Detailed Component Architecture
Core Models (src/socm/core/models.py)
Purpose: Define data structures with validation using Pydantic v2.
Key Classes:
from typing import Dict, List, Optional, Union

from pydantic import BaseModel


class QosPolicy(BaseModel):
    """SLURM Quality of Service policy definition."""
    name: str
    # Pydantic v2 treats Optional[...] fields without a default as required,
    # so explicit None defaults are what make these fields optional.
    max_walltime: Optional[int] = None  # minutes
    max_jobs: Optional[int] = None
    max_cores: Optional[int] = None


class Resource(BaseModel):
    """HPC resource specification."""
    name: str
    nodes: int
    cores_per_node: int
    memory_per_node: int
    qos: List[QosPolicy]


class Workflow(BaseModel):
    """Base class for all workflow types."""
    name: str
    executable: str
    context: str
    subcommand: str = ""
    environment: Optional[Dict[str, str]] = None
    resources: Optional[Dict[str, Union[int, float]]] = None

    # Abstract methods (must be implemented by subclasses)
    def get_command(self, **kwargs) -> str: ...
    def get_arguments(self, **kwargs) -> str: ...


class Campaign(BaseModel):
    """Container for workflow collection with policies."""
    id: int
    workflows: List[Workflow]
    campaign_policy: str
    deadline: Optional[int] = None  # minutes
Design Patterns:
Template Method: Workflow base class defines structure, subclasses implement specifics
Factory Pattern: Each workflow type has a get_workflows() class method
Strategy Pattern: Different campaign policies can be plugged in
Bookkeeper (src/socm/bookkeeper/bookkeeper.py)
Purpose: Main orchestration engine that coordinates the entire campaign lifecycle.
Responsibilities:
Initialize campaign from configuration
Set up resource management
Invoke planner for scheduling
Create and configure enactor
Monitor workflow execution
Handle state transitions
Manage cleanup and shutdown
Key Methods:
class Bookkeeper:
    def __init__(self, campaign, resources, policy, target_resource,
                 deadline=None, enactor_config=None):
        """Initialize bookkeeper with campaign and resources."""

    def _create_planner(self) -> BasePlanner:
        """Create planner instance based on policy."""

    def _create_enactor(self) -> BaseEnactor:
        """Create enactor instance for execution."""

    def _plan_campaign(self) -> Tuple[List[PlanEntry], nx.DiGraph]:
        """Generate execution plan using planner."""

    def _execute_plan(self, plan, dag):
        """Execute workflows according to plan."""

    def run(self):
        """Main entry point - runs entire campaign."""
Integration Points:
Integrates with Slurmise for SLURM job prediction
Uses RADICAL-Utils for logging and profiling
Communicates with Planner via defined interface
Manages Enactor lifecycle and callbacks
Planner (src/socm/planner/)
Purpose: Optimize workflow scheduling to meet campaign deadlines using the HEFT algorithm.
Base Interface (base.py):
from abc import ABC, abstractmethod
from typing import Dict, List, Tuple

import networkx as nx


class PlanEntry:
    """Represents a scheduled workflow execution."""
    workflow: Workflow
    resource_range: Tuple[int, int]  # (start_node, end_node)
    start_time: float  # minutes
    end_time: float  # minutes
    qos: str  # Selected QoS policy


class BasePlanner(ABC):
    @abstractmethod
    def plan(
        self, campaign: Campaign, resources: Dict[str, Resource]
    ) -> Tuple[List[PlanEntry], nx.DiGraph]:
        """Generate execution plan and dependency graph."""
HEFT Implementation (heft_planner.py):
The Heterogeneous Earliest Finish Time (HEFT) algorithm consists of:
Rank Computation Phase:
Calculate upward rank for each workflow
Rank = computation cost + max(communication cost + successor rank)
Workflows with higher rank have higher priority
Processor Selection Phase:
Sort workflows by descending rank
For each workflow, find processor that minimizes finish time
Consider data transfer costs from parent workflows
Resource Estimation:
Query Slurmise for walltime, CPU, and memory estimates
Match estimates against QoS policies
Select appropriate QoS tier for each workflow
Plan Generation:
Create PlanEntry for each workflow
Assign resource ranges (nodes)
Set start/end times
Build dependency DAG
Algorithm Complexity: O(|V|² × |P|) for dense dependency graphs, where V is the set of workflows and P the set of processors
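The rank computation phase can be illustrated with a small sketch. The function name upward_ranks, the three workflow names, and all cost values are hypothetical and chosen only for illustration; the actual code in heft_planner.py may be organized differently.

```python
from functools import lru_cache
from typing import Dict, List, Tuple


def upward_ranks(
    cost: Dict[str, float],
    successors: Dict[str, List[str]],
    comm: Dict[Tuple[str, str], float],
) -> Dict[str, float]:
    """Upward rank = own cost + max over successors of (comm cost + successor rank)."""

    @lru_cache(maxsize=None)
    def rank(node: str) -> float:
        # Exit workflows have no successors, so the max term defaults to 0.
        tail = max(
            (comm.get((node, succ), 0.0) + rank(succ)
             for succ in successors.get(node, ())),
            default=0.0,
        )
        return cost[node] + tail

    return {node: rank(node) for node in cost}


# Hypothetical chain: sims -> mapmaking -> null_tests (costs in minutes)
cost = {"sims": 30.0, "mapmaking": 120.0, "null_tests": 45.0}
successors = {"sims": ["mapmaking"], "mapmaking": ["null_tests"]}
comm = {("sims", "mapmaking"): 5.0, ("mapmaking", "null_tests"): 2.0}

ranks = upward_ranks(cost, successors, comm)
# Higher rank means higher scheduling priority.
order = sorted(ranks, key=ranks.get, reverse=True)
```

In this example null_tests has rank 45, mapmaking has rank 120 + (2 + 45) = 167, and sims has rank 30 + (5 + 167) = 202, so the processor selection phase would consider sims first.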
Enactor (src/socm/enactor/)
Purpose: Execute workflows on HPC systems via SLURM.
Base Interface (base.py):
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Callable, List

import networkx as nx

class BaseEnactor(ABC):
    def __init__(self):
        self._callbacks = defaultdict(list)

    def register_callback(self, state: str, callback: Callable):
        """Register callback for workflow state changes."""

    @abstractmethod
    def submit_workflows(self, plan: List[PlanEntry], dag: nx.DiGraph):
        """Submit workflows according to execution plan."""

    @abstractmethod
    def monitor(self):
        """Monitor workflow execution and trigger callbacks."""
RADICAL-Pilot Implementation (rp_enactor.py):
Uses RADICAL-Pilot framework for HPC task execution:
import radical.pilot as rp


class RPEnactor(BaseEnactor):
    def __init__(self, resource_config):
        super().__init__()  # Set up the callback registry from BaseEnactor
        self.session = rp.Session()
        self.pmgr = rp.PilotManager(session=self.session)
        self.tmgr = rp.TaskManager(session=self.session)

    def submit_workflows(self, plan, dag):
        # Create pilot job on HPC resource
        # (construction of pilot_description is not shown in this outline)
        pilot = self.pmgr.submit_pilots(pilot_description)
        # Create tasks for each workflow
        for entry in plan:
            task_desc = self._create_task_description(entry)
            self.tmgr.submit_tasks(task_desc)

    def _create_task_description(self, entry: PlanEntry):
        # Build RADICAL-Pilot TaskDescription
        return rp.TaskDescription({
            'executable': entry.workflow.get_command(),
            'arguments': entry.workflow.get_arguments(),
            'ranks': entry.workflow.resources['ranks'],
            'cores_per_rank': entry.workflow.resources['threads'],
            'environment': entry.workflow.environment,
        })
State Callbacks:
Enactor triggers callbacks for state transitions:
SUBMITTED - Workflow submitted to scheduler
RUNNING - Workflow execution started
COMPLETED - Workflow finished successfully
FAILED - Workflow encountered error
CANCELLED - Workflow was cancelled
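A minimal sketch of how such a per-state callback registry could behave is shown here. The class CallbackRegistry and its notify() helper are hypothetical stand-ins written for illustration; only register_callback() and the state names come from the interface described above.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List, Tuple

Callback = Callable[[str, str], None]


class CallbackRegistry:
    """Hypothetical stand-in for the callback handling in BaseEnactor."""

    def __init__(self) -> None:
        self._callbacks: DefaultDict[str, List[Callback]] = defaultdict(list)

    def register_callback(self, state: str, callback: Callback) -> None:
        # Several callbacks may be attached to the same state.
        self._callbacks[state].append(callback)

    def notify(self, workflow_name: str, state: str) -> None:
        # Hypothetical helper: invoke every callback registered for this state.
        for callback in self._callbacks.get(state, []):
            callback(workflow_name, state)


events: List[Tuple[str, str]] = []
registry = CallbackRegistry()
registry.register_callback("COMPLETED", lambda name, state: events.append((name, state)))
registry.register_callback("FAILED", lambda name, state: events.append((name, state)))

registry.notify("ml-mapmaking", "RUNNING")    # no callback registered for RUNNING
registry.notify("ml-mapmaking", "COMPLETED")  # recorded in events
```

Keying the registry by state name keeps the enactor unaware of what the Bookkeeper does with each transition.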
Dryrun Implementation (dryrun_enactor.py):
Mock implementation for testing without actual execution:
Simulates workflow execution
Updates states based on estimated durations
Useful for testing planning logic
Resources (src/socm/resources/)
Purpose: Define HPC resource characteristics and QoS policies.
Tiger Resource (tiger.py):
class TigerResource(Resource):
    """Tiger HPC cluster resource definition."""

    def __init__(self):
        super().__init__(
            name="tiger3",
            nodes=492,
            cores_per_node=112,
            memory_per_node=1000000,  # MB
            qos=self._get_qos_policies()
        )

    def _get_qos_policies(self) -> List[QosPolicy]:
        return [
            QosPolicy(name="test", max_walltime=60),
            QosPolicy(name="vshort", max_walltime=300),
            QosPolicy(name="short", max_walltime=1440),
            QosPolicy(name="medium", max_walltime=4320),
            QosPolicy(name="long", max_walltime=8640),
            QosPolicy(name="vlong", max_walltime=21600),
        ]

    def register_job(self, workflow: Workflow) -> str:
        """Select appropriate QoS based on workflow requirements."""
        # resources is optional, so fall back to an empty dict when it is None
        runtime = (workflow.resources or {}).get('runtime', 0)
        for qos in sorted(self.qos, key=lambda q: q.max_walltime):
            if runtime <= qos.max_walltime * 60:  # Convert to seconds
                return qos.name
        return self.qos[-1].name  # Default to longest QoS
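The unit handling in register_job() is easy to misread, so a standalone worked example may help. It assumes, as the "Convert to seconds" comment suggests, that the runtime is given in seconds while max_walltime is in minutes. The function select_qos and the tuple-based table are hypothetical simplifications of the QosPolicy objects above.

```python
from typing import List, Tuple

# (name, max_walltime in minutes), copied from the Tiger QoS list above
TIGER_QOS: List[Tuple[str, int]] = [
    ("test", 60),
    ("vshort", 300),
    ("short", 1440),
    ("medium", 4320),
    ("long", 8640),
    ("vlong", 21600),
]


def select_qos(runtime_seconds: float, policies: List[Tuple[str, int]]) -> str:
    """Return the shortest QoS whose walltime limit covers the runtime."""
    ordered = sorted(policies, key=lambda policy: policy[1])
    for name, max_minutes in ordered:
        if runtime_seconds <= max_minutes * 60:  # minutes -> seconds
            return name
    # Nothing fits: fall back to the longest QoS, as register_job() does.
    return ordered[-1][0]


one_hour = select_qos(3600, TIGER_QOS)        # exactly the 60-minute limit
ninety_min = select_qos(5400, TIGER_QOS)      # 90 minutes exceeds "test"
too_long = select_qos(2_000_000, TIGER_QOS)   # longer than any limit
```

A 3600-second job fits "test" exactly, a 5400-second job moves up to "vshort", and a job longer than 21600 minutes (1,296,000 seconds) falls back to "vlong" even though it exceeds that limit.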
Workflows (src/socm/workflows/)
Purpose: Define specific analysis tasks and their execution parameters.
Workflow Registry:
All workflows must be registered in workflows/__init__.py:
registered_workflows = {
    "ml-mapmaking": MLMapmakingWorkflow,
    "sat-sims": SATSimWorkflow,
    "ml-null-tests.mission-tests": TimeNullTestWorkflow,
    "ml-null-tests.wafer-tests": WaferNullTestWorkflow,
    "ml-null-tests.direction-tests": DirectionNullTestWorkflow,
    # ... more null tests
}

subcampaign_map = {
    "ml-null-tests": [
        "mission-tests", "wafer-tests", "direction-tests",
        "pwv-tests", "day-night-tests", "moonrise-set-tests",
        "elevation-tests", "sun-close-tests", "moon-close-tests"
    ]
}
Workflow Implementation Pattern:
Each workflow must:
Inherit from Workflow base class
Define workflow-specific parameters as Pydantic fields
Implement get_command() method
Implement get_arguments() method
Provide get_workflows() class method for factory pattern
Example: ML Mapmaking Workflow
from typing import Dict, List


class MLMapmakingWorkflow(Workflow):
    # Workflow-specific parameters
    area: str
    bands: str
    output_dir: str
    maxiter: str = "100"
    tiled: int = 0

    def get_command(self, **kwargs) -> str:
        return f"{self.executable} {self.subcommand}"

    def get_arguments(self, **kwargs) -> str:
        args = [
            f"--context {self.context}",
            f"--area {self.area}",
            f"--bands {self.bands}",
            f"--output-dir {self.output_dir}",
            f"--maxiter {self.maxiter}",
        ]
        # The flag is appended only when tiled is non-zero
        if self.tiled:
            args.append("--tiled")
        return " ".join(args)

    @classmethod
    def get_workflows(cls, descriptions: List[Dict]) -> List['MLMapmakingWorkflow']:
        """Factory method to create workflow instances."""
        return [cls(**desc) for desc in descriptions]
Configuration System
TOML-Based Configuration
The configuration system uses TOML for human-readable campaign definitions.
Hierarchical Structure:
Top-level campaign section - Global settings
Workflow sections - Workflow-specific configuration
Subcampaign sections - Groups of related workflows
Resource sections - Per-workflow resource requirements
Configuration Inheritance:
Subcampaign workflows inherit common configuration from parent:
[campaign.ml-null-tests]
# Common configuration for all null tests
context = "file:///path/to/context.yaml"
area = "file:///path/to/area.fits"
bands = "f090"
[campaign.ml-null-tests.mission-tests]
# Mission-test specific configuration
chunk_nobs = 10
nsplits = 4
The mission-tests workflow inherits context, area, and bands from its parent section, campaign.ml-null-tests.
Configuration Parsing:
The get_workflow_entries() utility in utils/misc.py handles:
Parsing TOML structure
Expanding subcampaign hierarchies
Merging inherited configuration
Creating workflow descriptions
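The merge step can be sketched as follows. This is an illustration under stated assumptions, not the implementation of get_workflow_entries(): the function expand_subcampaign is hypothetical, and it treats every nested table as a child workflow, which is a simplification. The input dictionary mirrors what a TOML parser would return for the [campaign.ml-null-tests] table shown earlier.

```python
from typing import Any, Dict


def expand_subcampaign(section: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Copy the parent's scalar settings into each nested sub-table."""
    common = {key: value for key, value in section.items()
              if not isinstance(value, dict)}
    children = {key: value for key, value in section.items()
                if isinstance(value, dict)}
    # Child keys come last in the merge, so they override inherited values.
    return {name: {**common, **child} for name, child in children.items()}


# Parsed form of the [campaign.ml-null-tests] table from the example above
parsed = {
    "context": "file:///path/to/context.yaml",
    "area": "file:///path/to/area.fits",
    "bands": "f090",
    "mission-tests": {"chunk_nobs": 10, "nsplits": 4},
}

merged = expand_subcampaign(parsed)
```

After the merge, merged["mission-tests"] contains the three inherited keys alongside chunk_nobs and nsplits, which is the shape a workflow factory needs to build an instance.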
Dependency Management
Workflow Dependencies
The system supports dependency relationships between workflows:
Explicit dependencies - Defined in configuration
Implicit dependencies - Inferred from data flow
Dependency DAG - Built by planner using NetworkX
Dependency Resolution:
Planner constructs directed acyclic graph (DAG)
Topological sort determines execution order
HEFT algorithm schedules within dependency constraints
Enactor enforces dependencies during submission
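The topological ordering step can be illustrated with the standard library. The planner is described as using NetworkX; this sketch swaps in graphlib.TopologicalSorter (Python 3.9+) so that it runs without third-party packages, and the dependency edges between workflows are hypothetical.

```python
from graphlib import TopologicalSorter

# Maps each workflow to the set of workflows it depends on (hypothetical edges).
dependencies = {
    "ml-mapmaking": {"sat-sims"},
    "ml-null-tests.mission-tests": {"ml-mapmaking"},
    "ml-null-tests.wafer-tests": {"ml-mapmaking"},
}

sorter = TopologicalSorter(dependencies)
# static_order() yields each workflow only after all of its dependencies.
order = list(sorter.static_order())
```

Here sat-sims comes first and ml-mapmaking second, while the two null tests may appear in either order because they do not depend on each other. If the dependencies contain a cycle, static_order() raises graphlib.CycleError, which is one way to reject an invalid configuration early.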
State Management
Workflow State Machine
Each workflow transitions through defined states:
INITIAL → SUBMITTED → RUNNING → COMPLETED
↘ FAILED
↘ CANCELLED
State Definitions (utils/states.py):
class WorkflowState:
    INITIAL = "INITIAL"
    SUBMITTED = "SUBMITTED"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
State Transitions:
Managed by Enactor via RADICAL-Pilot callbacks
Logged for monitoring and debugging
Trigger downstream workflow activation when dependencies complete
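Downstream activation can be sketched as a function that, given the current states, returns the workflows that are ready to submit. The helper ready_workflows and the example dependency edges are hypothetical; only the state names come from WorkflowState.

```python
from typing import Dict, List, Set


def ready_workflows(parents: Dict[str, Set[str]], states: Dict[str, str]) -> List[str]:
    """Return INITIAL workflows whose parents have all reached COMPLETED."""
    return sorted(
        name
        for name, state in states.items()
        if state == "INITIAL"
        and all(states[parent] == "COMPLETED" for parent in parents.get(name, set()))
    )


# Hypothetical dependency edges: child -> set of parents
parents = {
    "ml-mapmaking": {"sat-sims"},
    "ml-null-tests.mission-tests": {"ml-mapmaking"},
}
states = {
    "sat-sims": "COMPLETED",
    "ml-mapmaking": "INITIAL",
    "ml-null-tests.mission-tests": "INITIAL",
}

first_wave = ready_workflows(parents, states)
states["ml-mapmaking"] = "COMPLETED"  # e.g. set by a COMPLETED callback
second_wave = ready_workflows(parents, states)
```

The first call returns only ml-mapmaking, since the null test still waits on it. Once ml-mapmaking is marked COMPLETED, the second call returns the null test.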
Integration with External Systems
SLURM Integration
The system integrates with SLURM scheduler via two mechanisms:
RADICAL-Pilot: Submits and manages SLURM jobs
Slurmise: Predicts resource requirements
Slurmise Integration:
Slurmise provides ML-based prediction of:
Walltime estimation
CPU requirements
Memory requirements
Predictions are based on workflow characteristics (numeric and categorical features).
RADICAL-Pilot Integration
RADICAL-Pilot provides:
Pilot job management
Task scheduling within pilot
State monitoring and callbacks
Resource allocation within SLURM allocation
Session Management:
import radical.pilot as rp

session = rp.Session()
try:
    pmgr = rp.PilotManager(session=session)
    tmgr = rp.TaskManager(session=session)
    # Execute workflows
finally:
    session.close()
Error Handling and Recovery
Failure Scenarios
The system handles various failure modes:
Configuration Errors: Validation fails during parsing
Resource Allocation Failures: SLURM rejects job
Workflow Execution Failures: Task crashes or times out
System Failures: Node crashes, network issues
Error Handling Strategies:
Validation: Pydantic validates all input data
Graceful Degradation: Log errors and continue with remaining workflows
State Tracking: Failed workflows marked in state machine
Cleanup: Session cleanup in finally blocks
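The graceful degradation and cleanup strategies combine naturally in one loop. The sketch below is a simplified illustration: run_all, the task callables, and the cleanup hook are hypothetical and stand in for real workflow submission and session.close().

```python
import logging
from typing import Callable, Dict

logger = logging.getLogger("campaign")


def run_all(tasks: Dict[str, Callable[[], None]],
            cleanup: Callable[[], None]) -> Dict[str, str]:
    """Run every task, record its final state, and always run cleanup."""
    states: Dict[str, str] = {}
    try:
        for name, task in tasks.items():
            try:
                task()
                states[name] = "COMPLETED"
            except Exception:
                # Log the failure and keep going with the remaining tasks.
                logger.exception("workflow %s failed; continuing", name)
                states[name] = "FAILED"
    finally:
        cleanup()  # runs whether or not any task failed
    return states


def crashing_task() -> None:
    raise RuntimeError("task crashed")


closed = []
result = run_all(
    {"a": lambda: None, "b": crashing_task, "c": lambda: None},
    cleanup=lambda: closed.append(True),
)
```

The failure of task b is logged and recorded as FAILED, task c still runs, and the cleanup hook is called exactly once.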
Performance Considerations
Optimization Strategies
Efficient Scheduling: The HEFT heuristic aims to minimize makespan
Resource Packing: Maximize node utilization
QoS Selection: Automatic selection of appropriate queue
Parallel Execution: Independent workflows run concurrently
Scalability
The system scales to:
Hundreds of workflows in a single campaign
Thousands of nodes on large HPC systems
Long-running campaigns (days to weeks)
Extensibility
Adding New Components
The architecture supports extension through:
New Workflow Types:
Create workflow class inheriting from Workflow
Implement required methods
Register in registered_workflows dict
New Planners:
Create planner class inheriting from BasePlanner
Implement plan() method
Update Bookkeeper to instantiate new planner
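As a rough illustration of the planner extension point, the sketch below implements a trivial sequential planner. Entry and Planner are simplified stand-ins for PlanEntry and BasePlanner, and the plan() signature here takes a plain runtime mapping instead of Campaign and Resource objects; all of these are assumptions made to keep the example self-contained.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Entry:
    """Simplified stand-in for PlanEntry."""
    workflow: str
    start_time: float  # minutes
    end_time: float    # minutes


class Planner(ABC):
    """Simplified stand-in for BasePlanner."""

    @abstractmethod
    def plan(self, runtimes: Dict[str, float]) -> List[Entry]:
        """Return one Entry per workflow."""


class SequentialPlanner(Planner):
    """Hypothetical planner that runs workflows back to back in input order."""

    def plan(self, runtimes: Dict[str, float]) -> List[Entry]:
        entries: List[Entry] = []
        clock = 0.0
        for name, minutes in runtimes.items():
            entries.append(Entry(name, clock, clock + minutes))
            clock += minutes  # next workflow starts when this one ends
        return entries


plan = SequentialPlanner().plan({"sat-sims": 30.0, "ml-mapmaking": 120.0})
```

Because the abstract base class fixes the plan() contract, the orchestrator can swap such a planner in without changing how it consumes the resulting entries.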
New Enactors:
Create enactor class inheriting from BaseEnactor
Implement required methods
Configure Bookkeeper to use new enactor
New Resources:
Create resource class inheriting from Resource
Define QoS policies
Implement resource-specific logic
Design Principles
The architecture follows these key principles:
Separation of Concerns: Each component has single responsibility
Interface-based Design: Abstract base classes define contracts
Dependency Injection: Components receive dependencies via constructors
Configuration over Code: TOML configuration drives behavior
Fail-Fast Validation: Pydantic validates early
Logging and Observability: Comprehensive logging throughout
Testability: Modular design enables unit testing
Testing Architecture
The test suite mirrors the package structure:
Unit Tests: Test individual components in isolation
Integration Tests: Test component interactions
Mock Objects: DryrunEnactor for testing without HPC
Fixtures: Reusable test data in conftest.py
Property-Based Testing: Hypothesis for edge cases
Summary
The SO Campaign Manager architecture provides:
Modularity: Clean separation of concerns
Extensibility: Easy to add new workflows and backends
Robustness: Validation and error handling throughout
Scalability: Handles large campaigns on massive HPC systems
Maintainability: Clear interfaces and comprehensive tests
The design enables efficient orchestration of complex mapmaking campaigns while remaining flexible and maintainable.