5 changes: 3 additions & 2 deletions backend/app/api/main.py
@@ -20,11 +20,11 @@
onboarding,
credentials,
cron,
evaluations,
fine_tuning,
model_evaluation,
collection_job,
)
from app.api.routes.evaluations import dataset as evaluation_dataset, evaluation
from app.core.config import settings

api_router = APIRouter()
@@ -37,7 +37,8 @@
api_router.include_router(cron.router)
api_router.include_router(documents.router)
api_router.include_router(doc_transformation_job.router)
api_router.include_router(evaluations.router)
api_router.include_router(evaluation_dataset.router)
api_router.include_router(evaluation.router)
api_router.include_router(llm.router)
api_router.include_router(login.router)
api_router.include_router(onboarding.router)
13 changes: 0 additions & 13 deletions backend/app/api/routes/evaluations/__init__.py

This file was deleted.

2 changes: 1 addition & 1 deletion backend/app/api/routes/evaluations/dataset.py
@@ -31,7 +31,7 @@

logger = logging.getLogger(__name__)

router = APIRouter()
router = APIRouter(prefix="/evaluations/datasets", tags=["Evaluation"])


def _dataset_to_response(dataset: EvaluationDataset) -> DatasetUploadResponse:
2 changes: 1 addition & 1 deletion backend/app/api/routes/evaluations/evaluation.py
@@ -25,7 +25,7 @@

logger = logging.getLogger(__name__)

router = APIRouter()
router = APIRouter(prefix="/evaluations", tags=["Evaluation"])


@router.post(
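
With each router now declaring its own prefix, the change in `main.py` simply registers the two modules directly instead of a shared `evaluations` package router. A minimal sketch of how the mounted paths compose, assuming a `/api/v1` prefix (the real value comes from `settings`) and with hypothetical handlers standing in for the actual route modules:

```python
from fastapi import APIRouter, FastAPI

# Hypothetical stand-ins for the real dataset/evaluation route modules,
# shown only to illustrate how the prefixes compose after this change.
dataset_router = APIRouter(prefix="/evaluations/datasets", tags=["Evaluation"])
evaluation_router = APIRouter(prefix="/evaluations", tags=["Evaluation"])


@dataset_router.get("/")
def list_datasets() -> list[dict]:
    return []


@evaluation_router.get("/{evaluation_id}")
def get_evaluation(evaluation_id: int) -> dict:
    return {"id": evaluation_id}


api_router = APIRouter()
api_router.include_router(dataset_router)
api_router.include_router(evaluation_router)

app = FastAPI()
app.include_router(api_router, prefix="/api/v1")  # assumed prefix

# Resulting paths:
#   GET /api/v1/evaluations/datasets/
#   GET /api/v1/evaluations/{evaluation_id}
```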
26 changes: 22 additions & 4 deletions backend/app/crud/evaluations/core.py
@@ -207,7 +207,7 @@ def get_or_fetch_score(

This function implements a cache-on-first-request pattern:
- If score already has 'traces' key, return it
- Otherwise, fetch from Langfuse, update score column, and return
- Otherwise, fetch from Langfuse, merge with existing summary_scores, and return
- If force_refetch is True, always fetch fresh data from Langfuse

Args:
@@ -224,8 +224,8 @@
Exception: If Langfuse API calls fail
"""
# Check if score already exists with traces
has_score = eval_run.score is not None and "traces" in eval_run.score
if not force_refetch and has_score:
has_traces = eval_run.score is not None and "traces" in eval_run.score
if not force_refetch and has_traces:
logger.info(
f"[get_or_fetch_score] Returning existing score | evaluation_id={eval_run.id}"
)
@@ -237,13 +237,31 @@
f"run={eval_run.run_name} | force_refetch={force_refetch}"
)

# Get existing summary_scores if any (e.g., cosine_similarity from cron job)
existing_summary_scores = []
if eval_run.score and "summary_scores" in eval_run.score:
existing_summary_scores = eval_run.score.get("summary_scores", [])

# Fetch from Langfuse
score = fetch_trace_scores_from_langfuse(
langfuse_score = fetch_trace_scores_from_langfuse(
langfuse=langfuse,
dataset_name=eval_run.dataset_name,
run_name=eval_run.run_name,
)

# Merge summary_scores: existing scores + new scores from Langfuse
existing_scores_map = {s["name"]: s for s in existing_summary_scores}
for langfuse_summary in langfuse_score.get("summary_scores", []):
existing_scores_map[langfuse_summary["name"]] = langfuse_summary

merged_summary_scores = list(existing_scores_map.values())

# Build final score with merged summary_scores and traces
score: dict[str, Any] = {
"summary_scores": merged_summary_scores,
"traces": langfuse_score.get("traces", []),
}

# Update score column using existing helper
update_evaluation_run(session=session, eval_run=eval_run, score=score)

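
The merge in `get_or_fetch_score` is keyed on the score name: freshly fetched Langfuse entries overwrite any existing entry of the same name, while locally written scores (such as the cron job's `cosine_similarity`) are preserved. A standalone sketch of the same idea; the helper name and sample values are illustrative, not part of the PR:

```python
from typing import Any


def merge_summary_scores(
    existing: list[dict[str, Any]], fetched: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Merge two summary_scores lists by score name, letting fetched entries win."""
    merged = {score["name"]: score for score in existing}
    for score in fetched:
        merged[score["name"]] = score
    return list(merged.values())


# The cron-written cosine_similarity survives; latency_ms comes from Langfuse.
existing = [{"name": "cosine_similarity", "avg": 0.95, "data_type": "NUMERIC"}]
fetched = [{"name": "latency_ms", "avg": 120.0, "data_type": "NUMERIC"}]
print(merge_summary_scores(existing, fetched))
```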
26 changes: 12 additions & 14 deletions backend/app/crud/evaluations/processing.py
@@ -382,21 +382,19 @@ async def process_completed_embedding_batch(
# Step 4: Calculate similarity scores
similarity_stats = calculate_average_similarity(embedding_pairs=embedding_pairs)

# Step 5: Update evaluation_run with scores
if eval_run.score is None:
eval_run.score = {}

eval_run.score["cosine_similarity"] = {
"avg": similarity_stats["cosine_similarity_avg"],
"std": similarity_stats["cosine_similarity_std"],
"total_pairs": similarity_stats["total_pairs"],
}

# Optionally store per-item scores if not too large
if len(similarity_stats.get("per_item_scores", [])) <= 100:
eval_run.score["cosine_similarity"]["per_item_scores"] = similarity_stats[
"per_item_scores"
# Step 5: Update evaluation_run with scores in summary_scores format
# This format is consistent with what Langfuse returns when fetching traces
eval_run.score = {
"summary_scores": [
{
"name": "cosine_similarity",
"avg": round(float(similarity_stats["cosine_similarity_avg"]), 2),
"std": round(float(similarity_stats["cosine_similarity_std"]), 2),
"total_pairs": similarity_stats["total_pairs"],
"data_type": "NUMERIC",
}
]
}

# Step 6: Update Langfuse traces with cosine similarity scores
logger.info(
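
After this change the embedding-batch path writes the same `summary_scores` shape that the Langfuse fetch produces, so downstream readers only need to handle one format. A hedged sketch of that shape and how it is read back; the numbers are invented:

```python
# Illustrative only: the shape eval_run.score takes after Step 5.
score = {
    "summary_scores": [
        {
            "name": "cosine_similarity",
            "avg": 0.87,
            "std": 0.05,
            "total_pairs": 42,
            "data_type": "NUMERIC",
        }
    ]
}

# Reading it back mirrors how the tests (and any consumer) look up a score by name.
cosine = next(s for s in score["summary_scores"] if s["name"] == "cosine_similarity")
print(f"avg={cosine['avg']:.2f} over {cosine['total_pairs']} pairs")
```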
50 changes: 39 additions & 11 deletions backend/app/services/evaluations/evaluation.py
@@ -263,19 +263,27 @@ def get_evaluation_with_scores(
if not eval_run:
return None, None

if not get_trace_info:
return eval_run, None

# Only fetch trace info for completed evaluations
if eval_run.status != "completed":
return eval_run, (
f"Trace info is only available for completed evaluations. "
f"Current status: {eval_run.status}"
)
if get_trace_info:
return eval_run, (
f"Trace info is only available for completed evaluations. "
f"Current status: {eval_run.status}"
)
return eval_run, None

# Check if we already have cached summary_scores
has_summary_scores = (
eval_run.score is not None and "summary_scores" in eval_run.score
)

# Check if we already have cached scores
has_cached_score = eval_run.score is not None and "traces" in eval_run.score
if not resync_score and has_cached_score:
# If not requesting trace info, return existing score (with summary_scores)
if not get_trace_info:
return eval_run, None

# Check if we already have cached traces
has_cached_traces = eval_run.score is not None and "traces" in eval_run.score
if not resync_score and has_cached_traces:
return eval_run, None

langfuse = get_langfuse_client(
@@ -288,9 +296,12 @@
dataset_name = eval_run.dataset_name
run_name = eval_run.run_name
eval_run_id = eval_run.id
existing_summary_scores = (
eval_run.score.get("summary_scores", []) if has_summary_scores else []
)

try:
score = fetch_trace_scores_from_langfuse(
langfuse_score = fetch_trace_scores_from_langfuse(
langfuse=langfuse,
dataset_name=dataset_name,
run_name=run_name,
@@ -309,6 +320,23 @@
)
return eval_run, f"Failed to fetch trace info from Langfuse: {str(e)}"

# Merge summary_scores: existing scores + new scores from Langfuse
# Create a map of existing scores by name
existing_scores_map = {s["name"]: s for s in existing_summary_scores}
langfuse_summary_scores = langfuse_score.get("summary_scores", [])

# Merge: Langfuse scores take precedence (more up-to-date)
for langfuse_summary in langfuse_summary_scores:
existing_scores_map[langfuse_summary["name"]] = langfuse_summary

merged_summary_scores = list(existing_scores_map.values())

# Build final score with merged summary_scores and traces
score = {
"summary_scores": merged_summary_scores,
"traces": langfuse_score.get("traces", []),
}

eval_run = save_score(
eval_run_id=eval_run_id,
organization_id=organization_id,
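
The reordered checks in `get_evaluation_with_scores` boil down to: non-completed runs never trigger a fetch, callers that do not ask for trace info get whatever is cached, and a Langfuse round-trip happens only when traces are requested and either missing or explicitly resynced. A compressed sketch of that decision, with flag names mirroring the real parameters (the helper itself is not part of the PR):

```python
def should_fetch_traces(
    status: str,
    score: dict | None,
    get_trace_info: bool,
    resync_score: bool,
) -> bool:
    if status != "completed":
        return False  # caller receives an explanatory message instead
    if not get_trace_info:
        return False  # cached summary_scores (if any) are enough
    has_cached_traces = score is not None and "traces" in score
    return resync_score or not has_cached_traces
```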
10 changes: 7 additions & 3 deletions backend/app/tests/crud/evaluations/test_processing.py
@@ -527,9 +527,13 @@ async def test_process_completed_embedding_batch_success(
db.refresh(result)
assert result.status == "completed"
assert result.score is not None
assert "cosine_similarity" in result.score
assert result.score["cosine_similarity"]["avg"] == 0.95
mock_update_traces.assert_called_once()
assert "summary_scores" in result.score
summary_scores = result.score["summary_scores"]
cosine_score = next(
(s for s in summary_scores if s["name"] == "cosine_similarity"), None
)
assert cosine_score is not None
assert cosine_score["avg"] == 0.95

@pytest.mark.asyncio
@patch("app.crud.evaluations.processing.download_batch_results")