diff --git a/cloud_pipelines_backend/instrumentation/metrics.py b/cloud_pipelines_backend/instrumentation/metrics.py
index f24846c..916a0ea 100644
--- a/cloud_pipelines_backend/instrumentation/metrics.py
+++ b/cloud_pipelines_backend/instrumentation/metrics.py
@@ -225,6 +225,72 @@ def track_executions_processed(queue_type: str, found_work: bool):
     )
 
 
+# Execution Node Metrics
+_execution_nodes_counter = None
+_execution_node_duration_histogram = None
+_execution_cache_hits_counter = None
+
+
+def get_execution_node_metrics():
+    """Get or create execution node metrics."""
+    global _execution_nodes_counter, _execution_node_duration_histogram, _execution_cache_hits_counter
+
+    meter = get_meter()
+    if meter is None:
+        return None, None, None
+
+    if _execution_nodes_counter is None:
+        _execution_nodes_counter = meter.create_counter(
+            name="execution_nodes_total",
+            description="Total number of execution nodes by terminal status",
+            unit="1",
+        )
+
+    if _execution_node_duration_histogram is None:
+        _execution_node_duration_histogram = meter.create_histogram(
+            name="execution_node_duration_seconds",
+            description="Duration of execution nodes in seconds",
+            unit="s",
+        )
+
+    if _execution_cache_hits_counter is None:
+        _execution_cache_hits_counter = meter.create_counter(
+            name="execution_cache_hits_total",
+            description="Total number of execution cache hits",
+            unit="1",
+        )
+
+    return (
+        _execution_nodes_counter,
+        _execution_node_duration_histogram,
+        _execution_cache_hits_counter,
+    )
+
+
+def track_execution_completed(status: str, duration_seconds: float | None = None):
+    """
+    Track execution node completion.
+
+    Args:
+        status: Terminal status (succeeded/failed/skipped/system_error/cancelled)
+        duration_seconds: Total execution duration from creation to terminal state
+    """
+    counter, histogram, _ = get_execution_node_metrics()
+
+    if counter:
+        counter.add(1, {"status": status})
+
+    if histogram and duration_seconds is not None:
+        histogram.record(duration_seconds, {"status": status})
+
+
+def track_cache_hit():
+    """Track execution cache hit."""
+    _, _, cache_counter = get_execution_node_metrics()
+    if cache_counter:
+        cache_counter.add(1)
+
+
 class HTTPMetricsMiddleware(BaseHTTPMiddleware):
     """
     Middleware to track HTTP request metrics.
diff --git a/cloud_pipelines_backend/orchestrator_sql.py b/cloud_pipelines_backend/orchestrator_sql.py
index da89d01..487d5b0 100644
--- a/cloud_pipelines_backend/orchestrator_sql.py
+++ b/cloud_pipelines_backend/orchestrator_sql.py
@@ -118,6 +118,11 @@ def internal_process_queued_executions_queue(self, session: orm.Session):
             record_system_error_exception(
                 execution=queued_execution, exception=ex
             )
+            # Track execution completion metrics
+            _track_execution_completion(
+                execution_node=queued_execution,
+                container_execution=queued_execution.container_execution,
+            )
             # Track pipeline completion if this is a root execution
             _track_pipeline_completion_if_root(
                 session=session, execution_node=queued_execution
@@ -332,6 +337,8 @@ def internal_process_one_queued_execution(
             f"Reusing cached execution node {old_execution.id} with "
             f"{old_execution.container_execution_id=}, {old_execution.container_execution_status=}"
         )
+        # Track cache hit
+        metrics.track_cache_hit()
         # Reusing the execution:
         if not execution.extra_data:
             execution.extra_data = {}
@@ -421,6 +428,11 @@ def internal_process_one_queued_execution(
         execution.container_execution_status = (
             bts.ContainerExecutionStatus.CANCELLED
         )
+        # Track execution completion metrics
+        _track_execution_completion(
+            execution_node=execution,
+            container_execution=execution.container_execution,
+        )
         # Track pipeline completion if this is a root execution
         _track_pipeline_completion_if_root(
             session=session, execution_node=execution
@@ -551,6 +563,11 @@ def generate_execution_log_uri(
             bts.ContainerExecutionStatus.SYSTEM_ERROR
         )
         record_system_error_exception(execution=execution, exception=ex)
+        # Track execution completion metrics
+        _track_execution_completion(
+            execution_node=execution,
+            container_execution=None,
+        )
         # Track pipeline completion if this is a root execution
         _track_pipeline_completion_if_root(
             session=session, execution_node=execution
@@ -665,6 +682,11 @@ def internal_process_one_running_execution(
         execution_node.container_execution_status = (
             bts.ContainerExecutionStatus.CANCELLED
         )
+        # Track execution completion metrics
+        _track_execution_completion(
+            execution_node=execution_node,
+            container_execution=container_execution,
+        )
         # Track pipeline completion if this is a root execution
         _track_pipeline_completion_if_root(
             session=session, execution_node=execution_node
@@ -788,6 +810,11 @@ def _maybe_preload_value(
         execution_node.container_execution_status = (
             bts.ContainerExecutionStatus.FAILED
         )
+        # Track execution completion metrics
+        _track_execution_completion(
+            execution_node=execution_node,
+            container_execution=container_execution,
+        )
         # Track pipeline completion if this is a root execution
         _track_pipeline_completion_if_root(
             session=session, execution_node=execution_node
@@ -834,6 +861,11 @@ def _maybe_preload_value(
         execution_node.container_execution_status = (
             bts.ContainerExecutionStatus.SUCCEEDED
         )
+        # Track execution completion metrics
+        _track_execution_completion(
+            execution_node=execution_node,
+            container_execution=container_execution,
+        )
         # Track pipeline completion if this is a root execution
         _track_pipeline_completion_if_root(
             session=session, execution_node=execution_node
@@ -880,6 +912,11 @@ def _maybe_preload_value(
         execution_node.container_execution_status = (
             bts.ContainerExecutionStatus.FAILED
         )
+        # Track execution completion metrics
+        _track_execution_completion(
+            execution_node=execution_node,
+            container_execution=container_execution,
+        )
         # Track pipeline completion if this is a root execution
         _track_pipeline_completion_if_root(
             session=session, execution_node=execution_node
@@ -928,6 +965,11 @@ def _mark_all_downstream_executions_as_skipped(
             bts.ContainerExecutionStatus.QUEUED,
         }:
             execution.container_execution_status = bts.ContainerExecutionStatus.SKIPPED
+            # Track execution completion metrics
+            _track_execution_completion(
+                execution_node=execution,
+                container_execution=None,
+            )
 
         # for artifact_node in execution.output_artifact_nodes:
         #     for downstream_execution in artifact_node.downstream_executions:
@@ -1029,6 +1071,43 @@ def record_system_error_exception(execution: bts.ExecutionNode, exception: Excep
         ] = traceback.format_exc()
 
 
+def _track_execution_completion(
+    execution_node: bts.ExecutionNode,
+    container_execution: bts.ContainerExecution | None = None,
+):
+    """
+    Track execution node completion metrics.
+
+    Args:
+        execution_node: Execution node that reached a terminal state
+        container_execution: Optional container execution for duration calculation
+    """
+    status = execution_node.container_execution_status
+
+    # Map execution status to metric status
+    status_map = {
+        bts.ContainerExecutionStatus.SUCCEEDED: "succeeded",
+        bts.ContainerExecutionStatus.FAILED: "failed",
+        bts.ContainerExecutionStatus.CANCELLED: "cancelled",
+        bts.ContainerExecutionStatus.SYSTEM_ERROR: "system_error",
+        bts.ContainerExecutionStatus.SKIPPED: "skipped",
+    }
+
+    metric_status = status_map.get(status)
+    if metric_status:
+        # Calculate duration if container execution is available
+        duration_seconds = None
+        if container_execution and container_execution.created_at:
+            end_time = container_execution.ended_at or _get_current_time()
+            duration = end_time - container_execution.created_at
+            duration_seconds = duration.total_seconds()
+
+        metrics.track_execution_completed(
+            status=metric_status,
+            duration_seconds=duration_seconds,
+        )
+
+
 def _track_pipeline_completion_if_root(
     session: orm.Session,
     execution_node: bts.ExecutionNode,