Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions cloud_pipelines_backend/instrumentation/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,72 @@ def track_executions_processed(queue_type: str, found_work: bool):
)


# Execution Node Metrics
# Module-level instrument caches, lazily initialized by
# get_execution_node_metrics(). They stay None until a meter is available.
_execution_nodes_counter = None  # counter: terminal execution nodes by status
_execution_node_duration_histogram = None  # histogram: execution duration (s)
_execution_cache_hits_counter = None  # counter: execution cache hits


def get_execution_node_metrics():
    """Lazily create and return the execution-node metric instruments.

    Returns:
        A ``(nodes_counter, duration_histogram, cache_hits_counter)`` tuple.
        All three are ``None`` when no meter is configured yet; each
        instrument is created at most once and cached at module level.
    """
    global _execution_nodes_counter, _execution_node_duration_histogram, _execution_cache_hits_counter

    meter = get_meter()
    if meter is None:
        # Metrics are disabled / not yet configured; callers must tolerate Nones.
        return None, None, None

    counter = _execution_nodes_counter
    if counter is None:
        counter = meter.create_counter(
            name="execution_nodes_total",
            description="Total number of execution nodes by terminal status",
            unit="1",
        )
        _execution_nodes_counter = counter

    histogram = _execution_node_duration_histogram
    if histogram is None:
        histogram = meter.create_histogram(
            name="execution_node_duration_seconds",
            description="Duration of execution nodes in seconds",
            unit="s",
        )
        _execution_node_duration_histogram = histogram

    cache_hits = _execution_cache_hits_counter
    if cache_hits is None:
        cache_hits = meter.create_counter(
            name="execution_cache_hits_total",
            description="Total number of execution cache hits",
            unit="1",
        )
        _execution_cache_hits_counter = cache_hits

    return counter, histogram, cache_hits


def track_execution_completed(status: str, duration_seconds: float | None = None):
    """
    Track execution node completion.

    Args:
        status: Terminal status (succeeded/failed/skipped/system_error/cancelled)
        duration_seconds: Total execution duration from creation to terminal state
    """
    counter, histogram, _ = get_execution_node_metrics()
    attributes = {"status": status}

    if counter:
        counter.add(1, attributes)

    # Duration is optional; only record it when the caller supplied one.
    if duration_seconds is not None and histogram:
        histogram.record(duration_seconds, attributes)


def track_cache_hit():
    """Record a single execution cache hit (no-op when metrics are disabled)."""
    cache_counter = get_execution_node_metrics()[2]
    if cache_counter:
        cache_counter.add(1)


class HTTPMetricsMiddleware(BaseHTTPMiddleware):
"""
Middleware to track HTTP request metrics.
Expand Down
79 changes: 79 additions & 0 deletions cloud_pipelines_backend/orchestrator_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ def internal_process_queued_executions_queue(self, session: orm.Session):
record_system_error_exception(
execution=queued_execution, exception=ex
)
# Track execution completion metrics
_track_execution_completion(
execution_node=queued_execution,
container_execution=queued_execution.container_execution,
)
# Track pipeline completion if this is a root execution
_track_pipeline_completion_if_root(
session=session, execution_node=queued_execution
Expand Down Expand Up @@ -332,6 +337,8 @@ def internal_process_one_queued_execution(
f"Reusing cached execution node {old_execution.id} with "
f"{old_execution.container_execution_id=}, {old_execution.container_execution_status=}"
)
# Track cache hit
metrics.track_cache_hit()
# Reusing the execution:
if not execution.extra_data:
execution.extra_data = {}
Expand Down Expand Up @@ -421,6 +428,11 @@ def internal_process_one_queued_execution(
execution.container_execution_status = (
bts.ContainerExecutionStatus.CANCELLED
)
# Track execution completion metrics
_track_execution_completion(
execution_node=execution,
container_execution=execution.container_execution,
)
# Track pipeline completion if this is a root execution
_track_pipeline_completion_if_root(
session=session, execution_node=execution
Expand Down Expand Up @@ -551,6 +563,11 @@ def generate_execution_log_uri(
bts.ContainerExecutionStatus.SYSTEM_ERROR
)
record_system_error_exception(execution=execution, exception=ex)
# Track execution completion metrics
_track_execution_completion(
execution_node=execution,
container_execution=None,
)
# Track pipeline completion if this is a root execution
_track_pipeline_completion_if_root(
session=session, execution_node=execution
Expand Down Expand Up @@ -665,6 +682,11 @@ def internal_process_one_running_execution(
execution_node.container_execution_status = (
bts.ContainerExecutionStatus.CANCELLED
)
# Track execution completion metrics
_track_execution_completion(
execution_node=execution_node,
container_execution=container_execution,
)
# Track pipeline completion if this is a root execution
_track_pipeline_completion_if_root(
session=session, execution_node=execution_node
Expand Down Expand Up @@ -788,6 +810,11 @@ def _maybe_preload_value(
execution_node.container_execution_status = (
bts.ContainerExecutionStatus.FAILED
)
# Track execution completion metrics
_track_execution_completion(
execution_node=execution_node,
container_execution=container_execution,
)
# Track pipeline completion if this is a root execution
_track_pipeline_completion_if_root(
session=session, execution_node=execution_node
Expand Down Expand Up @@ -834,6 +861,11 @@ def _maybe_preload_value(
execution_node.container_execution_status = (
bts.ContainerExecutionStatus.SUCCEEDED
)
# Track execution completion metrics
_track_execution_completion(
execution_node=execution_node,
container_execution=container_execution,
)
# Track pipeline completion if this is a root execution
_track_pipeline_completion_if_root(
session=session, execution_node=execution_node
Expand Down Expand Up @@ -880,6 +912,11 @@ def _maybe_preload_value(
execution_node.container_execution_status = (
bts.ContainerExecutionStatus.FAILED
)
# Track execution completion metrics
_track_execution_completion(
execution_node=execution_node,
container_execution=container_execution,
)
# Track pipeline completion if this is a root execution
_track_pipeline_completion_if_root(
session=session, execution_node=execution_node
Expand Down Expand Up @@ -928,6 +965,11 @@ def _mark_all_downstream_executions_as_skipped(
bts.ContainerExecutionStatus.QUEUED,
}:
execution.container_execution_status = bts.ContainerExecutionStatus.SKIPPED
# Track execution completion metrics
_track_execution_completion(
execution_node=execution,
container_execution=None,
)

# for artifact_node in execution.output_artifact_nodes:
# for downstream_execution in artifact_node.downstream_executions:
Expand Down Expand Up @@ -1029,6 +1071,43 @@ def record_system_error_exception(execution: bts.ExecutionNode, exception: Excep
] = traceback.format_exc()


def _track_execution_completion(
    execution_node: bts.ExecutionNode,
    container_execution: bts.ContainerExecution | None = None,
):
    """
    Track execution node completion metrics.

    Args:
        execution_node: Execution node that reached a terminal state
        container_execution: Optional container execution for duration calculation
    """
    # Translate the terminal execution status into a metric label.
    # Non-terminal (or unknown) statuses emit no metric at all.
    status_map = {
        bts.ContainerExecutionStatus.SUCCEEDED: "succeeded",
        bts.ContainerExecutionStatus.FAILED: "failed",
        bts.ContainerExecutionStatus.CANCELLED: "cancelled",
        bts.ContainerExecutionStatus.SYSTEM_ERROR: "system_error",
        bts.ContainerExecutionStatus.SKIPPED: "skipped",
    }
    metric_status = status_map.get(execution_node.container_execution_status)
    if not metric_status:
        return

    # Duration can only be computed when a container execution with a
    # creation timestamp is available (e.g. skipped nodes have none).
    duration_seconds = None
    if container_execution and container_execution.created_at:
        # If the container never recorded an end time, fall back to "now".
        end_time = container_execution.ended_at or _get_current_time()
        duration_seconds = (end_time - container_execution.created_at).total_seconds()

    metrics.track_execution_completed(
        status=metric_status,
        duration_seconds=duration_seconds,
    )


def _track_pipeline_completion_if_root(
session: orm.Session,
execution_node: bts.ExecutionNode,
Expand Down