From 5f733b4061a17842d27b52b33d911e0257738edd Mon Sep 17 00:00:00 2001 From: Morgan Wowk Date: Mon, 2 Feb 2026 12:29:38 -0800 Subject: [PATCH] feat: Add execution outcome and cache metrics Tracks execution lifecycle outcomes and cache hit rates across all executions. Metrics added: - execution_nodes_total: Counter of executions by terminal status (succeeded/failed/skipped/system_error/cancelled) - execution_node_duration_seconds: Histogram of execution durations by status - execution_cache_hits_total: Counter of cache hits when executions reuse results These metrics provide visibility into execution success rates, performance patterns, and cache effectiveness. Durations measure total lifecycle from creation to terminal state for accurate performance tracking. --- .../instrumentation/metrics.py | 66 ++++++++++++++++ cloud_pipelines_backend/orchestrator_sql.py | 79 +++++++++++++++++++ 2 files changed, 145 insertions(+) diff --git a/cloud_pipelines_backend/instrumentation/metrics.py b/cloud_pipelines_backend/instrumentation/metrics.py index f24846c..916a0ea 100644 --- a/cloud_pipelines_backend/instrumentation/metrics.py +++ b/cloud_pipelines_backend/instrumentation/metrics.py @@ -225,6 +225,72 @@ def track_executions_processed(queue_type: str, found_work: bool): ) +# Execution Node Metrics +_execution_nodes_counter = None +_execution_node_duration_histogram = None +_execution_cache_hits_counter = None + + +def get_execution_node_metrics(): + """Get or create execution node metrics.""" + global _execution_nodes_counter, _execution_node_duration_histogram, _execution_cache_hits_counter + + meter = get_meter() + if meter is None: + return None, None, None + + if _execution_nodes_counter is None: + _execution_nodes_counter = meter.create_counter( + name="execution_nodes_total", + description="Total number of execution nodes by terminal status", + unit="1", + ) + + if _execution_node_duration_histogram is None: + _execution_node_duration_histogram = meter.create_histogram( + name="execution_node_duration_seconds", + description="Duration of execution nodes in seconds", + unit="s", + ) + + if _execution_cache_hits_counter is None: + _execution_cache_hits_counter = meter.create_counter( + name="execution_cache_hits_total", + description="Total number of execution cache hits", + unit="1", + ) + + return ( + _execution_nodes_counter, + _execution_node_duration_histogram, + _execution_cache_hits_counter, + ) + + +def track_execution_completed(status: str, duration_seconds: float | None = None): + """ + Track execution node completion. + + Args: + status: Terminal status (succeeded/failed/skipped/system_error/cancelled) + duration_seconds: Total execution duration from creation to terminal state + """ + counter, histogram, _ = get_execution_node_metrics() + + if counter: + counter.add(1, {"status": status}) + + if histogram and duration_seconds is not None: + histogram.record(duration_seconds, {"status": status}) + + +def track_cache_hit(): + """Track execution cache hit.""" + _, _, cache_counter = get_execution_node_metrics() + if cache_counter: + cache_counter.add(1) + + class HTTPMetricsMiddleware(BaseHTTPMiddleware): """ Middleware to track HTTP request metrics. diff --git a/cloud_pipelines_backend/orchestrator_sql.py b/cloud_pipelines_backend/orchestrator_sql.py index da89d01..487d5b0 100644 --- a/cloud_pipelines_backend/orchestrator_sql.py +++ b/cloud_pipelines_backend/orchestrator_sql.py @@ -118,6 +118,11 @@ def internal_process_queued_executions_queue(self, session: orm.Session): record_system_error_exception( execution=queued_execution, exception=ex ) + # Track execution completion metrics + _track_execution_completion( + execution_node=queued_execution, + container_execution=queued_execution.container_execution, + ) # Track pipeline completion if this is a root execution _track_pipeline_completion_if_root( session=session, execution_node=queued_execution @@ -332,6 +337,8 @@ def internal_process_one_queued_execution( f"Reusing cached execution node {old_execution.id} with " f"{old_execution.container_execution_id=}, {old_execution.container_execution_status=}" ) + # Track cache hit + metrics.track_cache_hit() # Reusing the execution: if not execution.extra_data: execution.extra_data = {} @@ -421,6 +428,11 @@ def internal_process_one_queued_execution( execution.container_execution_status = ( bts.ContainerExecutionStatus.CANCELLED ) + # Track execution completion metrics + _track_execution_completion( + execution_node=execution, + container_execution=execution.container_execution, + ) # Track pipeline completion if this is a root execution _track_pipeline_completion_if_root( session=session, execution_node=execution @@ -551,6 +563,11 @@ def generate_execution_log_uri( bts.ContainerExecutionStatus.SYSTEM_ERROR ) record_system_error_exception(execution=execution, exception=ex) + # Track execution completion metrics + _track_execution_completion( + execution_node=execution, + container_execution=None, + ) # Track pipeline completion if this is a root execution _track_pipeline_completion_if_root( session=session, execution_node=execution @@ -665,6 +682,11 @@ def internal_process_one_running_execution( execution_node.container_execution_status = ( bts.ContainerExecutionStatus.CANCELLED ) + # Track execution completion metrics + _track_execution_completion( + execution_node=execution_node, + container_execution=container_execution, + ) # Track pipeline completion if this is a root execution _track_pipeline_completion_if_root( session=session, execution_node=execution_node @@ -788,6 +810,11 @@ def _maybe_preload_value( execution_node.container_execution_status = ( bts.ContainerExecutionStatus.FAILED ) + # Track execution completion metrics + _track_execution_completion( + execution_node=execution_node, + container_execution=container_execution, + ) # Track pipeline completion if this is a root execution _track_pipeline_completion_if_root( session=session, execution_node=execution_node @@ -834,6 +861,11 @@ def _maybe_preload_value( execution_node.container_execution_status = ( bts.ContainerExecutionStatus.SUCCEEDED ) + # Track execution completion metrics + _track_execution_completion( + execution_node=execution_node, + container_execution=container_execution, + ) # Track pipeline completion if this is a root execution _track_pipeline_completion_if_root( session=session, execution_node=execution_node @@ -880,6 +912,11 @@ def _maybe_preload_value( execution_node.container_execution_status = ( bts.ContainerExecutionStatus.FAILED ) + # Track execution completion metrics + _track_execution_completion( + execution_node=execution_node, + container_execution=container_execution, + ) # Track pipeline completion if this is a root execution _track_pipeline_completion_if_root( session=session, execution_node=execution_node @@ -928,6 +965,11 @@ def _mark_all_downstream_executions_as_skipped( bts.ContainerExecutionStatus.QUEUED, }: execution.container_execution_status = bts.ContainerExecutionStatus.SKIPPED + # Track execution completion metrics + _track_execution_completion( + execution_node=execution, + container_execution=None, + ) # for artifact_node in execution.output_artifact_nodes: # for downstream_execution in artifact_node.downstream_executions: @@ -1029,6 +1071,43 @@ def record_system_error_exception(execution: bts.ExecutionNode, exception: Excep ] = traceback.format_exc() +def _track_execution_completion( + execution_node: bts.ExecutionNode, + container_execution: bts.ContainerExecution | None = None, +): + """ + Track execution node completion metrics. + + Args: + execution_node: Execution node that reached a terminal state + container_execution: Optional container execution for duration calculation + """ + status = execution_node.container_execution_status + + # Map execution status to metric status + status_map = { + bts.ContainerExecutionStatus.SUCCEEDED: "succeeded", + bts.ContainerExecutionStatus.FAILED: "failed", + bts.ContainerExecutionStatus.CANCELLED: "cancelled", + bts.ContainerExecutionStatus.SYSTEM_ERROR: "system_error", + bts.ContainerExecutionStatus.SKIPPED: "skipped", + } + + metric_status = status_map.get(status) + if metric_status: + # Calculate duration if container execution is available + duration_seconds = None + if container_execution and container_execution.created_at: + end_time = container_execution.ended_at or _get_current_time() + duration = end_time - container_execution.created_at + duration_seconds = duration.total_seconds() + + metrics.track_execution_completed( + status=metric_status, + duration_seconds=duration_seconds, + ) + + def _track_pipeline_completion_if_root( session: orm.Session, execution_node: bts.ExecutionNode,