Add orchestrator queue metrics (processing-error and executions-processed
counters) and record them from the queued and running orchestrator queue loops.

diff --git a/cloud_pipelines_backend/instrumentation/metrics.py b/cloud_pipelines_backend/instrumentation/metrics.py
index 1974bb9..f24846c 100644
--- a/cloud_pipelines_backend/instrumentation/metrics.py
+++ b/cloud_pipelines_backend/instrumentation/metrics.py
@@ -166,6 +166,65 @@ def track_pipeline_completed(
         )
 
 
+# Orchestrator Queue Metrics
+_orchestrator_processing_errors_counter = None
+_orchestrator_executions_processed_counter = None
+
+
+def get_orchestrator_queue_metrics():
+    """Get or create orchestrator queue metrics."""
+    global _orchestrator_processing_errors_counter, _orchestrator_executions_processed_counter
+
+    meter = get_meter()
+    if meter is None:
+        return None, None
+
+    if _orchestrator_processing_errors_counter is None:
+        _orchestrator_processing_errors_counter = meter.create_counter(
+            name="orchestrator_queue_processing_errors_total",
+            description="Total number of orchestrator queue processing errors",
+            unit="1",
+        )
+
+    if _orchestrator_executions_processed_counter is None:
+        _orchestrator_executions_processed_counter = meter.create_counter(
+            name="orchestrator_executions_processed_total",
+            description="Total number of executions processed by orchestrator queues",
+            unit="1",
+        )
+
+    return (
+        _orchestrator_processing_errors_counter,
+        _orchestrator_executions_processed_counter,
+    )
+
+
+def track_queue_processing_error(queue_type: str):
+    """Track orchestrator queue processing error."""
+    error_counter, _ = get_orchestrator_queue_metrics()
+    if error_counter:
+        error_counter.add(1, {"queue_type": queue_type})
+
+
+def track_executions_processed(queue_type: str, found_work: bool):
+    """
+    Track executions processed by orchestrator queue.
+
+    Args:
+        queue_type: Type of queue (queued/running)
+        found_work: Whether the queue found work to process
+    """
+    _, processed_counter = get_orchestrator_queue_metrics()
+    if processed_counter:
+        processed_counter.add(
+            1,
+            {
+                "queue_type": queue_type,
+                "found_work": str(found_work).lower(),
+            },
+        )
+
+
 class HTTPMetricsMiddleware(BaseHTTPMiddleware):
     """
     Middleware to track HTTP request metrics.
diff --git a/cloud_pipelines_backend/orchestrator_sql.py b/cloud_pipelines_backend/orchestrator_sql.py
index eb2c6f5..da89d01 100644
--- a/cloud_pipelines_backend/orchestrator_sql.py
+++ b/cloud_pipelines_backend/orchestrator_sql.py
@@ -103,8 +103,14 @@ def internal_process_queued_executions_queue(self, session: orm.Session):
                     self.internal_process_one_queued_execution(
                         session=session, execution=queued_execution
                     )
+                    # Track successful processing
+                    metrics.track_executions_processed(
+                        queue_type="queued", found_work=True
+                    )
                 except Exception as ex:
                     _logger.exception("Error processing queued execution")
+                    # Track processing error
+                    metrics.track_queue_processing_error(queue_type="queued")
                     session.rollback()
                     queued_execution.container_execution_status = (
                         bts.ContainerExecutionStatus.SYSTEM_ERROR
@@ -128,6 +134,8 @@ def internal_process_queued_executions_queue(self, session: orm.Session):
             if not self._queued_executions_queue_idle:
                 self._queued_executions_queue_idle = True
                 _logger.debug(f"No queued executions found")
+            # Track that queue sweep found no work
+            metrics.track_executions_processed(queue_type="queued", found_work=False)
             return False
 
     def internal_process_running_executions_queue(self, session: orm.Session):
@@ -168,8 +176,14 @@ def internal_process_running_executions_queue(self, session: orm.Session):
                         session=session,
                         container_execution=running_container_execution,
                     )
+                    # Track successful processing
+                    metrics.track_executions_processed(
+                        queue_type="running", found_work=True
+                    )
                 except Exception as ex:
                     _logger.exception("Error processing running container execution")
+                    # Track processing error
+                    metrics.track_queue_processing_error(queue_type="running")
                     session.rollback()
                     running_container_execution.status = (
                         bts.ContainerExecutionStatus.SYSTEM_ERROR
@@ -205,6 +219,8 @@ def internal_process_running_executions_queue(self, session: orm.Session):
             if not self._running_executions_queue_idle:
                 _logger.debug(f"No running container executions found")
                 self._running_executions_queue_idle = True
+            # Track that queue sweep found no work
+            metrics.track_executions_processed(queue_type="running", found_work=False)
             return False
 
     def internal_process_one_queued_execution(