Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions cloud_pipelines_backend/instrumentation/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,40 @@ def track_database_query_duration(operation: str, duration_seconds: float):
histogram.record(duration_seconds, {"operation": operation.lower()})


# Orchestrator Error Tracking
_orchestrator_errors_counter = None


def get_orchestrator_errors_counter():
"""Get or create orchestrator errors counter."""
global _orchestrator_errors_counter

meter = get_meter()
if meter is None:
return None

if _orchestrator_errors_counter is None:
_orchestrator_errors_counter = meter.create_counter(
name="orchestrator_errors_total",
description="Total number of orchestrator errors by type",
unit="1",
)

return _orchestrator_errors_counter


def track_orchestrator_error(error_type: str):
"""
Track orchestrator error by type.

Args:
error_type: Type of error (system_error/launch_error/missing_outputs)
"""
counter = get_orchestrator_errors_counter()
if counter:
counter.add(1, {"error_type": error_type})


class HTTPMetricsMiddleware(BaseHTTPMiddleware):
"""
Middleware to track HTTP request metrics.
Expand Down
8 changes: 8 additions & 0 deletions cloud_pipelines_backend/orchestrator_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ def internal_process_queued_executions_queue(self, session: orm.Session):
_logger.exception("Error processing queued execution")
# Track processing error
metrics.track_queue_processing_error(queue_type="queued")
# Track system error
metrics.track_orchestrator_error(error_type="system_error")
session.rollback()
queued_execution.container_execution_status = (
bts.ContainerExecutionStatus.SYSTEM_ERROR
Expand Down Expand Up @@ -189,6 +191,8 @@ def internal_process_running_executions_queue(self, session: orm.Session):
_logger.exception("Error processing running container execution")
# Track processing error
metrics.track_queue_processing_error(queue_type="running")
# Track system error
metrics.track_orchestrator_error(error_type="system_error")
session.rollback()
running_container_execution.status = (
bts.ContainerExecutionStatus.SYSTEM_ERROR
Expand Down Expand Up @@ -566,6 +570,8 @@ def generate_execution_log_uri(
launcher_class_name=launcher_class_name,
success=False,
)
# Track launch error
metrics.track_orchestrator_error(error_type="launch_error")
session.rollback()
with session.begin():
# Logs whole exception
Expand Down Expand Up @@ -818,6 +824,8 @@ def _maybe_preload_value(
if missing_output_names:
# Marking the container execution as FAILED (even though the program itself has completed successfully)
container_execution.status = bts.ContainerExecutionStatus.FAILED
# Track missing outputs error
metrics.track_orchestrator_error(error_type="missing_outputs")
# Track container execution duration
if container_execution.started_at and container_execution.ended_at:
duration = (
Expand Down