diff --git a/src/mistralai/_hooks/tracing.py b/src/mistralai/_hooks/tracing.py index f2ac9c86..fc4656fd 100644 --- a/src/mistralai/_hooks/tracing.py +++ b/src/mistralai/_hooks/tracing.py @@ -30,13 +30,30 @@ def __init__(self) -> None: def before_request( self, hook_ctx: BeforeRequestContext, request: httpx.Request ) -> Union[httpx.Request, Exception]: - request, self.request_span = get_traced_request_and_span(tracing_enabled=self.tracing_enabled, tracer=self.tracer, span=self.request_span, operation_id=hook_ctx.operation_id, request=request) + # Refresh tracer/provider per request so tracing can be enabled if the + # application configures OpenTelemetry after the client is instantiated. + self.tracing_enabled, self.tracer = get_or_create_otel_tracer() + self.request_span = None + request, self.request_span = get_traced_request_and_span( + tracing_enabled=self.tracing_enabled, + tracer=self.tracer, + span=self.request_span, + operation_id=hook_ctx.operation_id, + request=request, + ) return request def after_success( self, hook_ctx: AfterSuccessContext, response: httpx.Response ) -> Union[httpx.Response, Exception]: - response = get_traced_response(tracing_enabled=self.tracing_enabled, tracer=self.tracer, span=self.request_span, operation_id=hook_ctx.operation_id, response=response) + response = get_traced_response( + tracing_enabled=self.tracing_enabled, + tracer=self.tracer, + span=self.request_span, + operation_id=hook_ctx.operation_id, + response=response, + ) + self.request_span = None return response def after_error( @@ -46,5 +63,13 @@ def after_error( error: Optional[Exception], ) -> Union[Tuple[Optional[httpx.Response], Optional[Exception]], Exception]: if response: - response, error = get_response_and_error(tracing_enabled=self.tracing_enabled, tracer=self.tracer, span=self.request_span, operation_id=hook_ctx.operation_id, response=response, error=error) + response, error = get_response_and_error( + tracing_enabled=self.tracing_enabled, + tracer=self.tracer, + span=self.request_span, + operation_id=hook_ctx.operation_id, + response=response, + error=error, + ) + self.request_span = None return response, error diff --git a/src/mistralai/extra/observability/otel.py b/src/mistralai/extra/observability/otel.py index 8be0841d..4a8808ce 100644 --- a/src/mistralai/extra/observability/otel.py +++ b/src/mistralai/extra/observability/otel.py @@ -11,27 +11,17 @@ import opentelemetry.semconv._incubating.attributes.http_attributes as http_attributes import opentelemetry.semconv.attributes.server_attributes as server_attributes from opentelemetry import propagate, trace -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.resources import SERVICE_NAME, Resource -from opentelemetry.sdk.trace import SpanProcessor, TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExportResult +from opentelemetry.sdk.trace import SpanProcessor from opentelemetry.trace import Span, Status, StatusCode, Tracer, set_span_in_context logger = logging.getLogger(__name__) OTEL_SERVICE_NAME: str = "mistralai_sdk" -OTEL_EXPORTER_OTLP_ENDPOINT: str = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "") -OTEL_EXPORTER_OTLP_TIMEOUT: int = int(os.getenv("OTEL_EXPORTER_OTLP_TIMEOUT", "2")) -OTEL_EXPORTER_OTLP_MAX_EXPORT_BATCH_SIZE: int = int(os.getenv("OTEL_EXPORTER_OTLP_MAX_EXPORT_BATCH_SIZE", "512")) -OTEL_EXPORTER_OTLP_SCHEDULE_DELAY_MILLIS: int = int(os.getenv("OTEL_EXPORTER_OTLP_SCHEDULE_DELAY_MILLIS", "1000")) -OTEL_EXPORTER_OTLP_MAX_QUEUE_SIZE: int = int(os.getenv("OTEL_EXPORTER_OTLP_MAX_QUEUE_SIZE", "2048")) -OTEL_EXPORTER_OTLP_EXPORT_TIMEOUT_MILLIS: int = int(os.getenv("OTEL_EXPORTER_OTLP_EXPORT_TIMEOUT_MILLIS", "5000")) - MISTRAL_SDK_OTEL_TRACER_NAME: str = OTEL_SERVICE_NAME + "_tracer" MISTRAL_SDK_DEBUG_TRACING: bool = os.getenv("MISTRAL_SDK_DEBUG_TRACING", "false").lower() == "true" -DEBUG_HINT: str = "To see detailed exporter logs, set MISTRAL_SDK_DEBUG_TRACING=true." +DEBUG_HINT: str = "To see detailed tracing logs, set MISTRAL_SDK_DEBUG_TRACING=true." class MistralAIAttributes: @@ -51,13 +41,11 @@ class MistralAINameValues(Enum): OCR = "ocr" class TracingErrors(Exception, Enum): - FAILED_TO_EXPORT_OTEL_SPANS = "Failed to export OpenTelemetry (OTEL) spans." - FAILED_TO_INITIALIZE_OPENTELEMETRY_TRACING = "Failed to initialize OpenTelemetry tracing." FAILED_TO_CREATE_SPAN_FOR_REQUEST = "Failed to create span for request." FAILED_TO_ENRICH_SPAN_WITH_RESPONSE = "Failed to enrich span with response." FAILED_TO_HANDLE_ERROR_IN_SPAN = "Failed to handle error in span." FAILED_TO_END_SPAN = "Failed to end span." - + def __str__(self): return str(self.value) @@ -179,6 +167,7 @@ def enrich_span_from_response(tracer: trace.Tracer, span: Span, operation_id: st start_ns = parse_time_to_nanos(output["created_at"]) end_ns = parse_time_to_nanos(output["completed_at"]) child_span = tracer.start_span("Tool Execution", start_time=start_ns, context=parent_context) + child_span.set_attributes({"agent.trace.public": ""}) tool_attributes = { gen_ai_attributes.GEN_AI_OPERATION_NAME: gen_ai_attributes.GenAiOperationNameValues.EXECUTE_TOOL.value, gen_ai_attributes.GEN_AI_TOOL_CALL_ID: output.get("id", ""), @@ -191,6 +180,7 @@ def enrich_span_from_response(tracer: trace.Tracer, span: Span, operation_id: st start_ns = parse_time_to_nanos(output["created_at"]) end_ns = parse_time_to_nanos(output["completed_at"]) child_span = tracer.start_span("Message Output", start_time=start_ns, context=parent_context) + child_span.set_attributes({"agent.trace.public": ""}) message_attributes = { gen_ai_attributes.GEN_AI_OPERATION_NAME: gen_ai_attributes.GenAiOperationNameValues.CHAT.value, gen_ai_attributes.GEN_AI_PROVIDER_NAME: gen_ai_attributes.GenAiProviderNameValues.MISTRAL_AI.value, @@ -216,60 +206,30 @@ def on_start(self, span, parent_context = None): span.set_attributes({"agent.trace.public": ""}) -class QuietOTLPSpanExporter(OTLPSpanExporter): - def export(self, spans): - try: - return super().export(spans) - except Exception: - logger.warning(f"{TracingErrors.FAILED_TO_EXPORT_OTEL_SPANS} {(traceback.format_exc() if MISTRAL_SDK_DEBUG_TRACING else DEBUG_HINT)}") - return SpanExportResult.FAILURE - - def get_or_create_otel_tracer() -> tuple[bool, Tracer]: """ - 3 possible cases: + Get a tracer from the current TracerProvider. - -> [SDK in a Workflow / App] If there is already a tracer provider set -> use that one + The SDK does not set up its own TracerProvider - it relies on the application + to configure OpenTelemetry. This follows OTEL best practices where: + - Libraries/SDKs get tracers from the global provider + - Applications configure the TracerProvider - -> [SDK standalone] If no tracer provider is set but the OTEL_EXPORTER_OTLP_ENDPOINT is set -> create a new tracer provider that exports to the OTEL_EXPORTER_OTLP_ENDPOINT + If no TracerProvider is configured, the ProxyTracerProvider (default) will + return a NoOp tracer, effectively disabling tracing. Once the application + sets up a real TracerProvider, subsequent spans will be recorded. - -> Else tracing is disabled + Returns: + Tuple[bool, Tracer]: (tracing_enabled, tracer) + - tracing_enabled is True if a real TracerProvider is configured + - tracer is always valid (may be NoOp if no provider configured) """ - tracing_enabled = True tracer_provider = trace.get_tracer_provider() - - if isinstance(tracer_provider, trace.ProxyTracerProvider): - if OTEL_EXPORTER_OTLP_ENDPOINT: - # SDK standalone: No tracer provider but OTEL_EXPORTER_OTLP_ENDPOINT is set -> create a new tracer provider that exports to the OTEL_EXPORTER_OTLP_ENDPOINT - try: - exporter = QuietOTLPSpanExporter( - endpoint=OTEL_EXPORTER_OTLP_ENDPOINT, - timeout=OTEL_EXPORTER_OTLP_TIMEOUT - ) - resource = Resource.create(attributes={SERVICE_NAME: OTEL_SERVICE_NAME}) - tracer_provider = TracerProvider(resource=resource) - - span_processor = BatchSpanProcessor( - exporter, - export_timeout_millis=OTEL_EXPORTER_OTLP_EXPORT_TIMEOUT_MILLIS, - max_export_batch_size=OTEL_EXPORTER_OTLP_MAX_EXPORT_BATCH_SIZE, - schedule_delay_millis=OTEL_EXPORTER_OTLP_SCHEDULE_DELAY_MILLIS, - max_queue_size=OTEL_EXPORTER_OTLP_MAX_QUEUE_SIZE - ) - - tracer_provider.add_span_processor(span_processor) - tracer_provider.add_span_processor(GenAISpanProcessor()) - trace.set_tracer_provider(tracer_provider) - - except Exception: - logger.warning(f"{TracingErrors.FAILED_TO_INITIALIZE_OPENTELEMETRY_TRACING} {(traceback.format_exc() if MISTRAL_SDK_DEBUG_TRACING else DEBUG_HINT)}") - tracing_enabled = False - else: - # No tracer provider nor OTEL_EXPORTER_OTLP_ENDPOINT set -> tracing is disabled - tracing_enabled = False - tracer = tracer_provider.get_tracer(MISTRAL_SDK_OTEL_TRACER_NAME) + # Tracing is considered enabled if we have a real TracerProvider (not the default proxy) + tracing_enabled = not isinstance(tracer_provider, trace.ProxyTracerProvider) + return tracing_enabled, tracer def get_traced_request_and_span( @@ -284,8 +244,9 @@ def get_traced_request_and_span( try: span = tracer.start_span(name=operation_id) + span.set_attributes({"agent.trace.public": ""}) # Inject the span context into the request headers to be used by the backend service to continue the trace - propagate.inject(request.headers) + propagate.inject(request.headers, context=set_span_in_context(span)) span = enrich_span_from_request(span, request) except Exception: logger.warning(