diff --git a/backend/app/api/main.py b/backend/app/api/main.py
index 47cea3b1..bcd64eb5 100644
--- a/backend/app/api/main.py
+++ b/backend/app/api/main.py
@@ -20,11 +20,11 @@
     onboarding,
     credentials,
     cron,
-    evaluations,
     fine_tuning,
     model_evaluation,
     collection_job,
 )
+from app.api.routes.evaluations import dataset as evaluation_dataset, evaluation
 from app.core.config import settings
 
 api_router = APIRouter()
@@ -37,7 +37,8 @@
 api_router.include_router(cron.router)
 api_router.include_router(documents.router)
 api_router.include_router(doc_transformation_job.router)
-api_router.include_router(evaluations.router)
+api_router.include_router(evaluation_dataset.router)
+api_router.include_router(evaluation.router)
 api_router.include_router(llm.router)
 api_router.include_router(login.router)
 api_router.include_router(onboarding.router)
diff --git a/backend/app/api/routes/evaluations/__init__.py b/backend/app/api/routes/evaluations/__init__.py
deleted file mode 100644
index 3f7fe120..00000000
--- a/backend/app/api/routes/evaluations/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-"""Evaluation API routes."""
-
-from fastapi import APIRouter
-
-from app.api.routes.evaluations import dataset, evaluation
-
-router = APIRouter(prefix="/evaluations", tags=["evaluation"])
-
-# Include dataset routes under /evaluations/datasets
-router.include_router(dataset.router, prefix="/datasets")
-
-# Include evaluation routes directly under /evaluations
-router.include_router(evaluation.router)
diff --git a/backend/app/api/routes/evaluations/dataset.py b/backend/app/api/routes/evaluations/dataset.py
index 25ecacff..d66ff71c 100644
--- a/backend/app/api/routes/evaluations/dataset.py
+++ b/backend/app/api/routes/evaluations/dataset.py
@@ -31,7 +31,7 @@
 
 logger = logging.getLogger(__name__)
 
-router = APIRouter()
+router = APIRouter(prefix="/evaluations/datasets", tags=["Evaluation"])
 
 
 def _dataset_to_response(dataset: EvaluationDataset) -> DatasetUploadResponse:
diff --git a/backend/app/api/routes/evaluations/evaluation.py b/backend/app/api/routes/evaluations/evaluation.py
index b51a5948..d40a88a1 100644
--- a/backend/app/api/routes/evaluations/evaluation.py
+++ b/backend/app/api/routes/evaluations/evaluation.py
@@ -25,7 +25,7 @@
 
 logger = logging.getLogger(__name__)
 
-router = APIRouter()
+router = APIRouter(prefix="/evaluations", tags=["Evaluation"])
 
 
 @router.post(
diff --git a/backend/app/crud/evaluations/core.py b/backend/app/crud/evaluations/core.py
index b2b118df..59dec8e2 100644
--- a/backend/app/crud/evaluations/core.py
+++ b/backend/app/crud/evaluations/core.py
@@ -207,7 +207,7 @@ def get_or_fetch_score(
 
     This function implements a cache-on-first-request pattern:
     - If score already has 'traces' key, return it
-    - Otherwise, fetch from Langfuse, update score column, and return
+    - Otherwise, fetch from Langfuse, merge with existing summary_scores, and return
    - If force_refetch is True, always fetch fresh data from Langfuse
 
     Args:
@@ -224,8 +224,8 @@
         Exception: If Langfuse API calls fail
     """
     # Check if score already exists with traces
-    has_score = eval_run.score is not None and "traces" in eval_run.score
-    if not force_refetch and has_score:
+    has_traces = eval_run.score is not None and "traces" in eval_run.score
+    if not force_refetch and has_traces:
         logger.info(
             f"[get_or_fetch_score] Returning existing score | evaluation_id={eval_run.id}"
         )
@@ -237,13 +237,31 @@
         f"run={eval_run.run_name} | force_refetch={force_refetch}"
     )
 
+    # Get existing summary_scores if any (e.g., cosine_similarity from cron job)
+    existing_summary_scores = []
+    if eval_run.score and "summary_scores" in eval_run.score:
+        existing_summary_scores = eval_run.score.get("summary_scores", [])
+
     # Fetch from Langfuse
-    score = fetch_trace_scores_from_langfuse(
+    langfuse_score = fetch_trace_scores_from_langfuse(
         langfuse=langfuse,
         dataset_name=eval_run.dataset_name,
         run_name=eval_run.run_name,
     )
 
+    # Merge summary_scores: existing scores + new scores from Langfuse
+    existing_scores_map = {s["name"]: s for s in existing_summary_scores}
+    for langfuse_summary in langfuse_score.get("summary_scores", []):
+        existing_scores_map[langfuse_summary["name"]] = langfuse_summary
+
+    merged_summary_scores = list(existing_scores_map.values())
+
+    # Build final score with merged summary_scores and traces
+    score: dict[str, Any] = {
+        "summary_scores": merged_summary_scores,
+        "traces": langfuse_score.get("traces", []),
+    }
+
     # Update score column using existing helper
     update_evaluation_run(session=session, eval_run=eval_run, score=score)
 
diff --git a/backend/app/crud/evaluations/processing.py b/backend/app/crud/evaluations/processing.py
index fbc2d231..076ac9f3 100644
--- a/backend/app/crud/evaluations/processing.py
+++ b/backend/app/crud/evaluations/processing.py
@@ -382,21 +382,19 @@ async def process_completed_embedding_batch(
     # Step 4: Calculate similarity scores
     similarity_stats = calculate_average_similarity(embedding_pairs=embedding_pairs)
 
-    # Step 5: Update evaluation_run with scores
-    if eval_run.score is None:
-        eval_run.score = {}
-
-    eval_run.score["cosine_similarity"] = {
-        "avg": similarity_stats["cosine_similarity_avg"],
-        "std": similarity_stats["cosine_similarity_std"],
-        "total_pairs": similarity_stats["total_pairs"],
-    }
-
-    # Optionally store per-item scores if not too large
-    if len(similarity_stats.get("per_item_scores", [])) <= 100:
-        eval_run.score["cosine_similarity"]["per_item_scores"] = similarity_stats[
-            "per_item_scores"
+    # Step 5: Update evaluation_run with scores in summary_scores format
+    # This format is consistent with what Langfuse returns when fetching traces
+    eval_run.score = {
+        "summary_scores": [
+            {
+                "name": "cosine_similarity",
+                "avg": round(float(similarity_stats["cosine_similarity_avg"]), 2),
+                "std": round(float(similarity_stats["cosine_similarity_std"]), 2),
+                "total_pairs": similarity_stats["total_pairs"],
+                "data_type": "NUMERIC",
+            }
         ]
+    }
 
     # Step 6: Update Langfuse traces with cosine similarity scores
     logger.info(
diff --git a/backend/app/services/evaluations/evaluation.py b/backend/app/services/evaluations/evaluation.py
index bf0d4dd0..4c1a5de7 100644
--- a/backend/app/services/evaluations/evaluation.py
+++ b/backend/app/services/evaluations/evaluation.py
@@ -263,19 +263,27 @@ def get_evaluation_with_scores(
     if not eval_run:
         return None, None
 
-    if not get_trace_info:
-        return eval_run, None
-
     # Only fetch trace info for completed evaluations
     if eval_run.status != "completed":
-        return eval_run, (
-            f"Trace info is only available for completed evaluations. "
-            f"Current status: {eval_run.status}"
-        )
+        if get_trace_info:
+            return eval_run, (
+                f"Trace info is only available for completed evaluations. "
+                f"Current status: {eval_run.status}"
+            )
+        return eval_run, None
+
+    # Check if we already have cached summary_scores
+    has_summary_scores = (
+        eval_run.score is not None and "summary_scores" in eval_run.score
+    )
 
-    # Check if we already have cached scores
-    has_cached_score = eval_run.score is not None and "traces" in eval_run.score
-    if not resync_score and has_cached_score:
+    # If not requesting trace info, return existing score (with summary_scores)
+    if not get_trace_info:
+        return eval_run, None
+
+    # Check if we already have cached traces
+    has_cached_traces = eval_run.score is not None and "traces" in eval_run.score
+    if not resync_score and has_cached_traces:
         return eval_run, None
 
     langfuse = get_langfuse_client(
@@ -288,9 +296,12 @@
     dataset_name = eval_run.dataset_name
     run_name = eval_run.run_name
     eval_run_id = eval_run.id
+    existing_summary_scores = (
+        eval_run.score.get("summary_scores", []) if has_summary_scores else []
+    )
 
     try:
-        score = fetch_trace_scores_from_langfuse(
+        langfuse_score = fetch_trace_scores_from_langfuse(
             langfuse=langfuse,
             dataset_name=dataset_name,
             run_name=run_name,
@@ -309,6 +320,23 @@
         )
         return eval_run, f"Failed to fetch trace info from Langfuse: {str(e)}"
 
+    # Merge summary_scores: existing scores + new scores from Langfuse
+    # Create a map of existing scores by name
+    existing_scores_map = {s["name"]: s for s in existing_summary_scores}
+    langfuse_summary_scores = langfuse_score.get("summary_scores", [])
+
+    # Merge: Langfuse scores take precedence (more up-to-date)
+    for langfuse_summary in langfuse_summary_scores:
+        existing_scores_map[langfuse_summary["name"]] = langfuse_summary
+
+    merged_summary_scores = list(existing_scores_map.values())
+
+    # Build final score with merged summary_scores and traces
+    score = {
+        "summary_scores": merged_summary_scores,
+        "traces": langfuse_score.get("traces", []),
+    }
+
     eval_run = save_score(
         eval_run_id=eval_run_id,
         organization_id=organization_id,
diff --git a/backend/app/tests/crud/evaluations/test_processing.py b/backend/app/tests/crud/evaluations/test_processing.py
index 95a91bbd..afb0ac0e 100644
--- a/backend/app/tests/crud/evaluations/test_processing.py
+++ b/backend/app/tests/crud/evaluations/test_processing.py
@@ -527,9 +527,13 @@ async def test_process_completed_embedding_batch_success(
     db.refresh(result)
     assert result.status == "completed"
     assert result.score is not None
-    assert "cosine_similarity" in result.score
-    assert result.score["cosine_similarity"]["avg"] == 0.95
-    mock_update_traces.assert_called_once()
+    assert "summary_scores" in result.score
+    summary_scores = result.score["summary_scores"]
+    cosine_score = next(
+        (s for s in summary_scores if s["name"] == "cosine_similarity"), None
+    )
+    assert cosine_score is not None
+    assert cosine_score["avg"] == 0.95
 
 @pytest.mark.asyncio
 @patch("app.crud.evaluations.processing.download_batch_results")