Python
The Orca Python SDK provides a framework for building, registering, and executing algorithms that integrate with the Orca gRPC service. Algorithms are triggered by time-windowed events and can depend on other algorithms; Orca Core manages those dependencies.
Overview
Orca enables you to:
- Define algorithms as Python functions with declarative metadata
- Specify dependencies between algorithms
- Register algorithms with the Orca Core service
- Execute algorithms in response to time-windowed triggers
- Emit windows to trigger algorithm execution
Installation
```shell
pip install orca-python
```
Then import the classes you need from the `orca_python` package:
```python
from orca_python import Processor, WindowType, MetadataField, Window
from orca_python import ValueResult, ArrayResult, StructResult, NoneResult
from orca_python import ExecutionParams, EmitWindow
```
Core Concepts
Processor
The Processor class is the main entry point for the SDK. It manages algorithm registration, execution, and communication with Orca Core.
```python
processor = Processor(name="MyProcessor", max_workers=10)
```
Parameters:
- name (str): Unique identifier for this processor
- max_workers (int, optional): Maximum number of worker threads for concurrent execution. Default: 10
Methods:
- algorithm(): Decorator for registering algorithm functions
- Register(): Registers all algorithms with Orca Core
- Start(): Starts the gRPC server to handle execution requests
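The max_workers parameter caps how many executions run at the same time. The SDK's internals are not described here, so the sketch below uses only the standard library's `concurrent.futures.ThreadPoolExecutor` to illustrate what a worker-thread cap means. It is an analogy, not the SDK's implementation; `run_job` is a hypothetical stand-in for one algorithm execution.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Illustration only: a bounded thread pool, analogous to Processor(max_workers=...).
_lock = threading.Lock()
_active = 0  # executions currently running
peak = 0     # highest number of simultaneous executions observed


def run_job(job_id: int) -> int:
    """Hypothetical stand-in for one algorithm execution."""
    global _active, peak
    with _lock:
        _active += 1
        peak = max(peak, _active)
    time.sleep(0.01)  # simulate work
    with _lock:
        _active -= 1
    return job_id * 2


# Ten jobs are submitted, but at most three threads run them at once
with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(run_job, range(10)))

print(results)
print("peak concurrent executions:", peak)
```

Extra work queues up instead of spawning more threads, which is the usual trade-off a worker cap makes between throughput and resource use.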
WindowType
Defines the type of time window that triggers algorithm execution.
```python
hourly_window = WindowType(
    name="HourlyWindow",
    version="1.0.0",
    description="Processes data on an hourly basis",
    metadataFields=[
        MetadataField(name="region", description="Geographic region"),
        MetadataField(name="sensor_id", description="Sensor identifier"),
    ],
)
```
Parameters:
- name (str): Window type name in PascalCase
- version (str): Semantic version (e.g., "1.0.0")
- description (str): Description of the window type
- metadataFields (List[MetadataField], optional): Additional metadata fields
MetadataField
Defines metadata that can be attached to windows.
```python
field = MetadataField(
    name="region",
    description="Geographic region for data processing",
)
```
Window
Represents a time window instance that triggers algorithm execution.
```python
from datetime import datetime

from orca_python import Window

window = Window(
    time_from=datetime(2024, 1, 1, 0, 0),
    time_to=datetime(2024, 1, 1, 1, 0),
    name="HourlyWindow",
    version="1.0.0",
    origin="sensor-123",
    metadata={"region": "us-west", "sensor_id": "abc-456"},
)
```
Attributes:
- time_from (datetime): Start of the time window
- time_to (datetime): End of the time window
- name (str): Window type name
- version (str): Window type version
- origin (str): Source identifier
- metadata (Dict[str, Any], optional): Additional metadata
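This document does not say whether the SDK checks a window's metadata keys against the MetadataField names declared on its window type. If you want that guarantee in your own code before emitting, a small check might look like the following. `check_metadata` is a hypothetical helper that works on plain strings and dicts, not an SDK function, and treating missing keys as a problem is an assumption you may want to relax.

```python
from typing import Any, Dict, Iterable, List


def check_metadata(declared_fields: Iterable[str], metadata: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: compare a window's metadata keys with declared field names.

    Returns a list of problems; an empty list means the keys match exactly.
    """
    declared = set(declared_fields)
    problems = []
    for key in sorted(set(metadata) - declared):
        problems.append(f"undeclared metadata key: {key!r}")
    for key in sorted(declared - set(metadata)):
        problems.append(f"missing metadata key: {key!r}")
    return problems


# Field names from the HourlyWindow WindowType example above
fields = ["region", "sensor_id"]
print(check_metadata(fields, {"region": "us-west", "sensor_id": "abc-456"}))  # []
print(check_metadata(fields, {"region": "us-west", "site": "x"}))
```

The second call reports both the unexpected `site` key and the absent `sensor_id` key.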
Result Types
Algorithms must return one of four result types:
ValueResult
For single numeric or boolean values.
```python
from orca_python import ExecutionParams, ValueResult

def my_algorithm(params: ExecutionParams) -> ValueResult:
    return ValueResult(42.0)
```
ArrayResult
For arrays of numeric or boolean values.
```python
from orca_python import ArrayResult, ExecutionParams

def my_algorithm(params: ExecutionParams) -> ArrayResult:
    return ArrayResult([1, 2, 3, 4, 5])
```
StructResult
For dictionary-based structured results.
```python
from orca_python import ExecutionParams, StructResult

def my_algorithm(params: ExecutionParams) -> StructResult:
    return StructResult({
        "min": -1.1,
        "median": 4.2,
        "max": 5.0,
    })
```
NoneResult
For algorithms that produce no output (side effects only).
```python
from orca_python import ExecutionParams, NoneResult

def my_algorithm(params: ExecutionParams) -> NoneResult:
    # Perform side effects
    return NoneResult()
```
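The four rules above amount to a mapping from a plain Python return value to the result type that fits it. `suggest_result_type` is a hypothetical helper, not part of `orca_python`. It returns the class name as a string so that it runs without the SDK installed.

```python
from numbers import Real
from typing import Any


def suggest_result_type(value: Any) -> str:
    """Hypothetical helper: name the Orca result type that fits `value`."""
    if value is None:
        return "NoneResult"
    # bool is a subclass of int, so booleans are covered by Real as well
    if isinstance(value, Real):
        return "ValueResult"
    if isinstance(value, dict):
        return "StructResult"
    if isinstance(value, (list, tuple)) and all(isinstance(v, Real) for v in value):
        return "ArrayResult"
    raise TypeError(f"no Orca result type fits {type(value).__name__}")


print(suggest_result_type(42.0))            # ValueResult
print(suggest_result_type([1, 2, 3]))       # ArrayResult
print(suggest_result_type({"min": -1.1}))   # StructResult
print(suggest_result_type(None))            # NoneResult
```

A string or a list of strings raises `TypeError`, because none of the four result types covers non-numeric, non-boolean scalars.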
Defining Algorithms
Use the @processor.algorithm() decorator to register functions as algorithms.
```python
@processor.algorithm(
    name="CalculateAverage",
    version="1.0.0",
    window_type=hourly_window,
    description="Calculates the average of sensor readings",
    depends_on=[],
)
def calculate_average(params: ExecutionParams) -> ValueResult:
    """Computes the average value from sensor data."""
    # Access the triggering window
    window = params.window
    # Your algorithm logic here; compute_average stands in for your own code
    result = compute_average(window)
    return ValueResult(result)
```
Decorator Parameters:
- name (str): Algorithm name in PascalCase
- version (str): Semantic version (e.g., "1.0.0")
- window_type (WindowType): The window type that triggers this algorithm
- description (str, optional): Description of the algorithm. Uses the function's docstring if not provided
- depends_on (List[Callable], optional): List of algorithm functions this algorithm depends on
Function Parameters:
- params (ExecutionParams): Execution context containing the triggering window and dependency results
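To make the decorator's documented behaviour concrete, here is a toy registry that mimics two of its rules: falling back to the function's docstring when `description` is omitted, and accepting `depends_on` as references to already-registered functions. It is an illustration under those assumptions, not the SDK's code. `ToyProcessor` and its attributes are hypothetical, and `window_type` is omitted for brevity.

```python
import inspect
from typing import Any, Callable, Dict, List, Optional


class ToyProcessor:
    """Hypothetical stand-in for the bookkeeping behind @processor.algorithm()."""

    def __init__(self) -> None:
        self.algorithms: Dict[str, Dict[str, Any]] = {}
        self._names: Dict[Callable, str] = {}  # function -> registered name

    def algorithm(
        self,
        name: str,
        version: str,
        description: Optional[str] = None,
        depends_on: Optional[List[Callable]] = None,
    ) -> Callable[[Callable], Callable]:
        def decorator(func: Callable) -> Callable:
            deps = []
            for dep in depends_on or []:
                if dep not in self._names:
                    # Analogous to the SDK's InvalidDependency error
                    raise ValueError(f"{dep.__name__} is not a registered algorithm")
                deps.append(self._names[dep])
            self.algorithms[name] = {
                "version": version,
                # Fall back to the docstring when no description is given
                "description": description or inspect.getdoc(func) or "",
                "depends_on": deps,
                "func": func,
            }
            self._names[func] = name
            return func  # the decorated function stays directly callable

        return decorator


toy = ToyProcessor()


@toy.algorithm(name="FetchData", version="1.0.0")
def fetch_data(params):
    """Fetches raw readings."""
    return [1.0, 2.0]


@toy.algorithm(name="ProcessData", version="1.0.0", depends_on=[fetch_data])
def process_data(params):
    return None


print(toy.algorithms["FetchData"]["description"])  # Fetches raw readings.
print(toy.algorithms["ProcessData"]["depends_on"])  # ['FetchData']
```

Passing function objects rather than name strings in `depends_on` lets a registry detect a missing or misspelled dependency at decoration time.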
ExecutionParams
The ExecutionParams object provides context for algorithm execution.
```python
@dataclass
class ExecutionParams:
    window: Window  # The triggering window
    dependencies: Optional[Iterable[AlgorithmResult]]  # Results from dependencies
```
Attributes:
- window: The time window that triggered this execution
- dependencies: Results from dependency algorithms (if any)
Algorithm Dependencies
Algorithms can depend on other algorithms. Dependencies are automatically resolved and executed before the dependent algorithm.
```python
from google.protobuf import json_format

# Base algorithm
@processor.algorithm(
    name="FetchData",
    version="1.0.0",
    window_type=hourly_window,
)
def fetch_data(params: ExecutionParams) -> ArrayResult:
    # fetch_from_source is a placeholder for your own data access code
    data = fetch_from_source(params.window)
    return ArrayResult(data)


# Dependent algorithm
@processor.algorithm(
    name="ProcessData",
    version="1.0.0",
    window_type=hourly_window,
    depends_on=[fetch_data],
)
def process_data(params: ExecutionParams) -> StructResult:
    data = None  # stays None if no dependency result is available
    # Access dependency results
    if params.dependencies:
        for dep in params.dependencies:
            # Extract values based on which result field is set. Assumes
            # dep.result is a protobuf message: hasattr() is True for every
            # declared field, so HasField() is used to find the populated one.
            if dep.result.HasField("float_values"):
                data = list(dep.result.float_values.values)
            elif dep.result.HasField("single_value"):
                data = dep.result.single_value
            elif dep.result.HasField("struct_value"):
                data = json_format.MessageToDict(dep.result.struct_value)
    # analyze is a placeholder for your own processing logic
    processed = analyze(data)
    return StructResult(processed)
```
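Orca Core resolves dependencies for you. To show what "executed before the dependent algorithm" means across several levels, the standard library's `graphlib.TopologicalSorter` (Python 3.9+) produces an order in which every algorithm comes after all of its dependencies. This only illustrates the ordering constraint; how Orca Core actually schedules work is not documented here, and the third algorithm, `PublishReport`, is hypothetical.

```python
from graphlib import CycleError, TopologicalSorter

# Each key depends on the algorithms in its set
graph = {
    "FetchData": set(),
    "ProcessData": {"FetchData"},
    "PublishReport": {"ProcessData", "FetchData"},  # hypothetical third step
}

order = list(TopologicalSorter(graph).static_order())
print(order)  # ['FetchData', 'ProcessData', 'PublishReport']


def has_cycle(g: dict) -> bool:
    """Return True if no valid execution order exists for the graph."""
    try:
        list(TopologicalSorter(g).static_order())
    except CycleError:
        return True
    return False


print(has_cycle({"A": {"B"}, "B": {"A"}}))  # True
```

A cyclic dependency has no valid order at all, which is why dependency graphs between algorithms need to stay acyclic.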
Emitting Windows
Emit windows to trigger algorithm execution across the Orca system.
```python
import datetime as dt

from orca_python import EmitWindow, Window

window = Window(
    time_from=dt.datetime(2024, 1, 1, 0, 0),
    time_to=dt.datetime(2024, 1, 1, 1, 0),
    name="HourlyWindow",
    version="1.0.0",
    origin="data-pipeline",
    metadata={"region": "us-east"},
)
EmitWindow(window)
```
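Windows are often emitted for an interval that has just finished. `previous_hour_bounds` is a hypothetical helper, built only on the standard library, that returns the start and end of the last complete hour. The result can be passed to Window as `time_from` and `time_to`. This document does not say whether Orca expects timezone-aware datetimes, so using UTC-aware values here is an assumption.

```python
import datetime as dt
from typing import Optional, Tuple


def previous_hour_bounds(now: Optional[dt.datetime] = None) -> Tuple[dt.datetime, dt.datetime]:
    """Hypothetical helper: (time_from, time_to) for the last complete hour before `now`."""
    if now is None:
        now = dt.datetime.now(dt.timezone.utc)
    # Truncate to the top of the current hour; that is the window's end
    time_to = now.replace(minute=0, second=0, microsecond=0)
    time_from = time_to - dt.timedelta(hours=1)
    return time_from, time_to


start, end = previous_hour_bounds(dt.datetime(2024, 1, 1, 1, 37, tzinfo=dt.timezone.utc))
print(start, end)  # 2024-01-01 00:00:00+00:00 2024-01-01 01:00:00+00:00
```

Aligning windows to hour boundaries, rather than using `now - 1 hour`, means repeated runs produce identical, non-overlapping windows.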
Complete Example
```python
import datetime as dt

from orca_python import (
    Processor, WindowType, MetadataField, ExecutionParams,
    ArrayResult, StructResult, EmitWindow, Window,
)

# Initialize processor
processor = Processor(name="DataProcessor", max_workers=10)

# Define window type
hourly_window = WindowType(
    name="HourlyWindow",
    version="1.0.0",
    description="Hourly data processing window",
    metadataFields=[
        MetadataField(name="sensor_id", description="Sensor identifier"),
    ],
)

# Define base algorithm
@processor.algorithm(
    name="CollectMetrics",
    version="1.0.0",
    window_type=hourly_window,
    description="Collects raw metrics from sensors",
)
def collect_metrics(params: ExecutionParams) -> ArrayResult:
    sensor_id = params.window.metadata.get("sensor_id")
    # fetch_sensor_data is a placeholder for your own data access code
    metrics = fetch_sensor_data(sensor_id, params.window.time_from, params.window.time_to)
    return ArrayResult(metrics)

# Define dependent algorithm
@processor.algorithm(
    name="AnalyzeMetrics",
    version="1.0.0",
    window_type=hourly_window,
    depends_on=[collect_metrics],
    description="Analyzes collected metrics",
)
def analyze_metrics(params: ExecutionParams) -> StructResult:
    # Extract dependency data
    metrics = []
    if params.dependencies:
        for dep in params.dependencies:
            # Assumes dep.result is a protobuf message; HasField() reports
            # whether the field is set, whereas hasattr() is always True
            if dep.result.HasField("float_values"):
                metrics = list(dep.result.float_values.values)
    # Guard against an empty window so the mean does not divide by zero
    if not metrics:
        return StructResult({"count": 0})
    analysis = {
        "count": len(metrics),
        "mean": sum(metrics) / len(metrics),
        "min": min(metrics),
        "max": max(metrics),
    }
    return StructResult(analysis)

# Register with Orca Core
processor.Register()

# Start the processor
processor.Start()

# In another part of your system, emit windows to trigger execution
now = dt.datetime.now()  # read the clock once so both bounds agree
window = Window(
    time_from=now - dt.timedelta(hours=1),
    time_to=now,
    name="HourlyWindow",
    version="1.0.0",
    origin="scheduler",
    metadata={"sensor_id": "sensor-001"},
)
EmitWindow(window)
```
Validation Rules
Algorithm Names
- Must be in PascalCase
- Pattern: `^[A-Z][a-zA-Z0-9]*$`
Window Names
- Must be in PascalCase
- Pattern: `^[A-Z][a-zA-Z0-9]*$`
Versions
- Must follow semantic versioning in MAJOR.MINOR.PATCH form, with no leading zeros and no pre-release or build suffixes
- Pattern: `^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)$`
- Examples: "1.0.0", "2.1.3"
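To catch naming mistakes before calling Register(), you can check names and versions locally against the same patterns. This is a sketch using Python's `re` module; the helper names are hypothetical. The SDK performs its own validation and reports problems through the exceptions listed under Error Handling.

```python
import re

# Patterns copied from the validation rules above
NAME_PATTERN = re.compile(r"^[A-Z][a-zA-Z0-9]*$")
VERSION_PATTERN = re.compile(r"^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)$")


def is_valid_name(name: str) -> bool:
    """Hypothetical helper: PascalCase check for algorithm and window names."""
    # fullmatch rejects a trailing newline, which re.match with `$` would accept
    return NAME_PATTERN.fullmatch(name) is not None


def is_valid_version(version: str) -> bool:
    """Hypothetical helper: MAJOR.MINOR.PATCH check with no leading zeros."""
    return VERSION_PATTERN.fullmatch(version) is not None


print(is_valid_name("CalculateAverage"))  # True
print(is_valid_name("Hourly_Window"))     # False
print(is_valid_version("2.1.3"))          # True
print(is_valid_version("1.0.0-beta"))     # False
```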
Error Handling
The SDK includes several exception types:
- InvalidAlgorithmArgument: Raised for invalid algorithm configuration
- InvalidWindowArgument: Raised for invalid window configuration
- InvalidDependency: Raised when a dependency is not properly registered
- InvalidAlgorithmReturnType: Raised when an algorithm's return type doesn't match its annotation
- InvalidMetadataFieldArgument: Raised for invalid metadata field configuration
- BrokenRemoteAlgorithmStubs: Raised when remote algorithm stubs are corrupted
Environment Configuration
The SDK uses environment variables for configuration:
- ORCA_CORE: Address of the Orca Core service
- PROCESSOR_PORT: Port for the processor gRPC server
- PROCESSOR_HOST: Host address for the processor
- PROCESSOR_EXTERNAL_PORT: External port for processor registration
- PROJECT_NAME: Optional project name for multi-project deployments
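In deployment scripts it can help to check these variables up front, before the processor starts. `load_orca_env` and `OrcaEnv` are hypothetical. Treating every variable except PROJECT_NAME as required, and parsing the ports as integers, are assumptions on top of the list above rather than documented SDK behaviour. The example values in the usage lines are made up.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

REQUIRED = ("ORCA_CORE", "PROCESSOR_HOST", "PROCESSOR_PORT", "PROCESSOR_EXTERNAL_PORT")


@dataclass(frozen=True)
class OrcaEnv:
    orca_core: str
    processor_host: str
    processor_port: int
    processor_external_port: int
    project_name: Optional[str]


def load_orca_env(env: Mapping[str, str] = os.environ) -> OrcaEnv:
    """Hypothetical helper: read the Orca variables, failing fast if any are missing."""
    missing = [key for key in REQUIRED if not env.get(key)]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    return OrcaEnv(
        orca_core=env["ORCA_CORE"],
        processor_host=env["PROCESSOR_HOST"],
        processor_port=int(env["PROCESSOR_PORT"]),
        processor_external_port=int(env["PROCESSOR_EXTERNAL_PORT"]),
        project_name=env.get("PROJECT_NAME"),  # optional
    )


# Made-up example values
cfg = load_orca_env({
    "ORCA_CORE": "orca-core:50051",
    "PROCESSOR_HOST": "0.0.0.0",
    "PROCESSOR_PORT": "5377",
    "PROCESSOR_EXTERNAL_PORT": "5377",
})
print(cfg)
```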
Logging
The SDK uses Python's standard logging module. Configure logging level as needed:
```python
import logging

logging.basicConfig(level=logging.INFO)
```
Log levels used:
- INFO: Algorithm execution and registration events
- DEBUG: Detailed execution flow and dependency resolution
- ERROR: Execution failures and registration errors
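To see dependency-resolution detail without turning on DEBUG output for every library, you can raise the level of a single logger. This sketch assumes the SDK logs through loggers named after its package (`orca_python`), which is common practice but not confirmed here. If the SDK uses different logger names, adjust the name accordingly.

```python
import logging

# Timestamps and logger names make execution and registration events easier to trace
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)

# Assumption: SDK loggers live under the "orca_python" namespace.
# Only loggers in that namespace emit DEBUG records; everything else stays at INFO.
logging.getLogger("orca_python").setLevel(logging.DEBUG)
```

Because basicConfig installs its handler on the root logger without a level filter, DEBUG records from the `orca_python` namespace propagate up and are printed.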