Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions cloud_pipelines_backend/instrumentation/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,65 @@ def track_pipeline_completed(
)


# Orchestrator Queue Metrics
_orchestrator_processing_errors_counter = None
_orchestrator_executions_processed_counter = None


def get_orchestrator_queue_metrics():
"""Get or create orchestrator queue metrics."""
global _orchestrator_processing_errors_counter, _orchestrator_executions_processed_counter

meter = get_meter()
if meter is None:
return None, None

if _orchestrator_processing_errors_counter is None:
_orchestrator_processing_errors_counter = meter.create_counter(
name="orchestrator_queue_processing_errors_total",
description="Total number of orchestrator queue processing errors",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This resets after every restart/deploy, correct?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if I wanted to see a graph of number of errors in last few months to look at the trends?

unit="1",
)

if _orchestrator_executions_processed_counter is None:
_orchestrator_executions_processed_counter = meter.create_counter(
name="orchestrator_executions_processed_total",
description="Total number of executions processed by orchestrator queues",
unit="1",
)

return (
_orchestrator_processing_errors_counter,
_orchestrator_executions_processed_counter,
)


def track_queue_processing_error(queue_type: str):
"""Track orchestrator queue processing error."""
error_counter, _ = get_orchestrator_queue_metrics()
if error_counter:
error_counter.add(1, {"queue_type": queue_type})


def track_executions_processed(queue_type: str, found_work: bool):
"""
Track executions processed by orchestrator queue.

Args:
queue_type: Type of queue (queued/running)
found_work: Whether the queue found work to process
"""
_, processed_counter = get_orchestrator_queue_metrics()
if processed_counter:
processed_counter.add(
1,
{
"queue_type": queue_type,
"found_work": str(found_work).lower(),
},
)


class HTTPMetricsMiddleware(BaseHTTPMiddleware):
"""
Middleware to track HTTP request metrics.
Expand Down
16 changes: 16 additions & 0 deletions cloud_pipelines_backend/orchestrator_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,14 @@ def internal_process_queued_executions_queue(self, session: orm.Session):
self.internal_process_one_queued_execution(
session=session, execution=queued_execution
)
# Track successful processing
metrics.track_executions_processed(
queue_type="queued", found_work=True
)
except Exception as ex:
_logger.exception("Error processing queued execution")
# Track processing error
metrics.track_queue_processing_error(queue_type="queued")
session.rollback()
queued_execution.container_execution_status = (
bts.ContainerExecutionStatus.SYSTEM_ERROR
Expand All @@ -128,6 +134,8 @@ def internal_process_queued_executions_queue(self, session: orm.Session):
if not self._queued_executions_queue_idle:
self._queued_executions_queue_idle = True
_logger.debug(f"No queued executions found")
# Track that queue sweep found no work
metrics.track_executions_processed(queue_type="queued", found_work=False)
return False

def internal_process_running_executions_queue(self, session: orm.Session):
Expand Down Expand Up @@ -168,8 +176,14 @@ def internal_process_running_executions_queue(self, session: orm.Session):
session=session,
container_execution=running_container_execution,
)
# Track successful processing
metrics.track_executions_processed(
queue_type="running", found_work=True
)
except Exception as ex:
_logger.exception("Error processing running container execution")
# Track processing error
metrics.track_queue_processing_error(queue_type="running")
session.rollback()
running_container_execution.status = (
bts.ContainerExecutionStatus.SYSTEM_ERROR
Expand Down Expand Up @@ -205,6 +219,8 @@ def internal_process_running_executions_queue(self, session: orm.Session):
if not self._running_executions_queue_idle:
_logger.debug(f"No running container executions found")
self._running_executions_queue_idle = True
# Track that queue sweep found no work
metrics.track_executions_processed(queue_type="running", found_work=False)
return False

def internal_process_one_queued_execution(
Expand Down