Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions src/mistralai/_hooks/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,30 @@ def __init__(self) -> None:
def before_request(
self, hook_ctx: BeforeRequestContext, request: httpx.Request
) -> Union[httpx.Request, Exception]:
request, self.request_span = get_traced_request_and_span(tracing_enabled=self.tracing_enabled, tracer=self.tracer, span=self.request_span, operation_id=hook_ctx.operation_id, request=request)
# Refresh tracer/provider per request so tracing can be enabled if the
# application configures OpenTelemetry after the client is instantiated.
self.tracing_enabled, self.tracer = get_or_create_otel_tracer()
self.request_span = None
request, self.request_span = get_traced_request_and_span(
tracing_enabled=self.tracing_enabled,
tracer=self.tracer,
span=self.request_span,
operation_id=hook_ctx.operation_id,
request=request,
)
return request

def after_success(
self, hook_ctx: AfterSuccessContext, response: httpx.Response
) -> Union[httpx.Response, Exception]:
response = get_traced_response(tracing_enabled=self.tracing_enabled, tracer=self.tracer, span=self.request_span, operation_id=hook_ctx.operation_id, response=response)
response = get_traced_response(
tracing_enabled=self.tracing_enabled,
tracer=self.tracer,
span=self.request_span,
operation_id=hook_ctx.operation_id,
response=response,
)
self.request_span = None
return response

def after_error(
Expand All @@ -46,5 +63,13 @@ def after_error(
error: Optional[Exception],
) -> Union[Tuple[Optional[httpx.Response], Optional[Exception]], Exception]:
if response:
response, error = get_response_and_error(tracing_enabled=self.tracing_enabled, tracer=self.tracer, span=self.request_span, operation_id=hook_ctx.operation_id, response=response, error=error)
response, error = get_response_and_error(
tracing_enabled=self.tracing_enabled,
tracer=self.tracer,
span=self.request_span,
operation_id=hook_ctx.operation_id,
response=response,
error=error,
)
self.request_span = None
return response, error
83 changes: 22 additions & 61 deletions src/mistralai/extra/observability/otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,17 @@
import opentelemetry.semconv._incubating.attributes.http_attributes as http_attributes
import opentelemetry.semconv.attributes.server_attributes as server_attributes
from opentelemetry import propagate, trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import SpanProcessor, TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExportResult
from opentelemetry.sdk.trace import SpanProcessor
from opentelemetry.trace import Span, Status, StatusCode, Tracer, set_span_in_context

logger = logging.getLogger(__name__)


OTEL_SERVICE_NAME: str = "mistralai_sdk"
OTEL_EXPORTER_OTLP_ENDPOINT: str = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "")
OTEL_EXPORTER_OTLP_TIMEOUT: int = int(os.getenv("OTEL_EXPORTER_OTLP_TIMEOUT", "2"))
OTEL_EXPORTER_OTLP_MAX_EXPORT_BATCH_SIZE: int = int(os.getenv("OTEL_EXPORTER_OTLP_MAX_EXPORT_BATCH_SIZE", "512"))
OTEL_EXPORTER_OTLP_SCHEDULE_DELAY_MILLIS: int = int(os.getenv("OTEL_EXPORTER_OTLP_SCHEDULE_DELAY_MILLIS", "1000"))
OTEL_EXPORTER_OTLP_MAX_QUEUE_SIZE: int = int(os.getenv("OTEL_EXPORTER_OTLP_MAX_QUEUE_SIZE", "2048"))
OTEL_EXPORTER_OTLP_EXPORT_TIMEOUT_MILLIS: int = int(os.getenv("OTEL_EXPORTER_OTLP_EXPORT_TIMEOUT_MILLIS", "5000"))

MISTRAL_SDK_OTEL_TRACER_NAME: str = OTEL_SERVICE_NAME + "_tracer"

MISTRAL_SDK_DEBUG_TRACING: bool = os.getenv("MISTRAL_SDK_DEBUG_TRACING", "false").lower() == "true"
DEBUG_HINT: str = "To see detailed exporter logs, set MISTRAL_SDK_DEBUG_TRACING=true."
DEBUG_HINT: str = "To see detailed tracing logs, set MISTRAL_SDK_DEBUG_TRACING=true."


class MistralAIAttributes:
Expand All @@ -51,13 +41,11 @@ class MistralAINameValues(Enum):
OCR = "ocr"

class TracingErrors(Exception, Enum):
FAILED_TO_EXPORT_OTEL_SPANS = "Failed to export OpenTelemetry (OTEL) spans."
FAILED_TO_INITIALIZE_OPENTELEMETRY_TRACING = "Failed to initialize OpenTelemetry tracing."
FAILED_TO_CREATE_SPAN_FOR_REQUEST = "Failed to create span for request."
FAILED_TO_ENRICH_SPAN_WITH_RESPONSE = "Failed to enrich span with response."
FAILED_TO_HANDLE_ERROR_IN_SPAN = "Failed to handle error in span."
FAILED_TO_END_SPAN = "Failed to end span."

def __str__(self):
return str(self.value)

Expand Down Expand Up @@ -179,6 +167,7 @@ def enrich_span_from_response(tracer: trace.Tracer, span: Span, operation_id: st
start_ns = parse_time_to_nanos(output["created_at"])
end_ns = parse_time_to_nanos(output["completed_at"])
child_span = tracer.start_span("Tool Execution", start_time=start_ns, context=parent_context)
child_span.set_attributes({"agent.trace.public": ""})
tool_attributes = {
gen_ai_attributes.GEN_AI_OPERATION_NAME: gen_ai_attributes.GenAiOperationNameValues.EXECUTE_TOOL.value,
gen_ai_attributes.GEN_AI_TOOL_CALL_ID: output.get("id", ""),
Expand All @@ -191,6 +180,7 @@ def enrich_span_from_response(tracer: trace.Tracer, span: Span, operation_id: st
start_ns = parse_time_to_nanos(output["created_at"])
end_ns = parse_time_to_nanos(output["completed_at"])
child_span = tracer.start_span("Message Output", start_time=start_ns, context=parent_context)
child_span.set_attributes({"agent.trace.public": ""})
message_attributes = {
gen_ai_attributes.GEN_AI_OPERATION_NAME: gen_ai_attributes.GenAiOperationNameValues.CHAT.value,
gen_ai_attributes.GEN_AI_PROVIDER_NAME: gen_ai_attributes.GenAiProviderNameValues.MISTRAL_AI.value,
Expand All @@ -216,60 +206,30 @@ def on_start(self, span, parent_context = None):
span.set_attributes({"agent.trace.public": ""})


class QuietOTLPSpanExporter(OTLPSpanExporter):
def export(self, spans):
try:
return super().export(spans)
except Exception:
logger.warning(f"{TracingErrors.FAILED_TO_EXPORT_OTEL_SPANS} {(traceback.format_exc() if MISTRAL_SDK_DEBUG_TRACING else DEBUG_HINT)}")
return SpanExportResult.FAILURE


def get_or_create_otel_tracer() -> tuple[bool, Tracer]:
"""
3 possible cases:
Get a tracer from the current TracerProvider.

-> [SDK in a Workflow / App] If there is already a tracer provider set -> use that one
The SDK does not set up its own TracerProvider - it relies on the application
to configure OpenTelemetry. This follows OTEL best practices where:
- Libraries/SDKs get tracers from the global provider
- Applications configure the TracerProvider

-> [SDK standalone] If no tracer provider is set but the OTEL_EXPORTER_OTLP_ENDPOINT is set -> create a new tracer provider that exports to the OTEL_EXPORTER_OTLP_ENDPOINT
If no TracerProvider is configured, the ProxyTracerProvider (default) will
return a NoOp tracer, effectively disabling tracing. Once the application
sets up a real TracerProvider, subsequent spans will be recorded.

-> Else tracing is disabled
Returns:
Tuple[bool, Tracer]: (tracing_enabled, tracer)
- tracing_enabled is True if a real TracerProvider is configured
- tracer is always valid (may be NoOp if no provider configured)
"""
tracing_enabled = True
tracer_provider = trace.get_tracer_provider()

if isinstance(tracer_provider, trace.ProxyTracerProvider):
if OTEL_EXPORTER_OTLP_ENDPOINT:
# SDK standalone: No tracer provider but OTEL_EXPORTER_OTLP_ENDPOINT is set -> create a new tracer provider that exports to the OTEL_EXPORTER_OTLP_ENDPOINT
try:
exporter = QuietOTLPSpanExporter(
endpoint=OTEL_EXPORTER_OTLP_ENDPOINT,
timeout=OTEL_EXPORTER_OTLP_TIMEOUT
)
resource = Resource.create(attributes={SERVICE_NAME: OTEL_SERVICE_NAME})
tracer_provider = TracerProvider(resource=resource)

span_processor = BatchSpanProcessor(
exporter,
export_timeout_millis=OTEL_EXPORTER_OTLP_EXPORT_TIMEOUT_MILLIS,
max_export_batch_size=OTEL_EXPORTER_OTLP_MAX_EXPORT_BATCH_SIZE,
schedule_delay_millis=OTEL_EXPORTER_OTLP_SCHEDULE_DELAY_MILLIS,
max_queue_size=OTEL_EXPORTER_OTLP_MAX_QUEUE_SIZE
)

tracer_provider.add_span_processor(span_processor)
tracer_provider.add_span_processor(GenAISpanProcessor())
trace.set_tracer_provider(tracer_provider)

except Exception:
logger.warning(f"{TracingErrors.FAILED_TO_INITIALIZE_OPENTELEMETRY_TRACING} {(traceback.format_exc() if MISTRAL_SDK_DEBUG_TRACING else DEBUG_HINT)}")
tracing_enabled = False
else:
# No tracer provider nor OTEL_EXPORTER_OTLP_ENDPOINT set -> tracing is disabled
tracing_enabled = False

tracer = tracer_provider.get_tracer(MISTRAL_SDK_OTEL_TRACER_NAME)

# Tracing is considered enabled if we have a real TracerProvider (not the default proxy)
tracing_enabled = not isinstance(tracer_provider, trace.ProxyTracerProvider)

return tracing_enabled, tracer

def get_traced_request_and_span(
Expand All @@ -284,8 +244,9 @@ def get_traced_request_and_span(

try:
span = tracer.start_span(name=operation_id)
span.set_attributes({"agent.trace.public": ""})
# Inject the span context into the request headers to be used by the backend service to continue the trace
propagate.inject(request.headers)
propagate.inject(request.headers, context=set_span_in_context(span))
span = enrich_span_from_request(span, request)
except Exception:
logger.warning(
Expand Down