-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Open
Labels
bug: Something isn't working
Description
Bug Description
See the previous issue, which may have been closed a little prematurely, so I am opening a new one:
#4376 (comment)
I have tried all kinds of prompts but don't know what name to use when referring to the tool, so I get errors like
unknown AI function search {"function": "search", "speech_id": "speech_ba73a234a6d1"}
or unknown AI function search {"function": "filesearch", "speech_id": "speech_ba73a234a6d1"} etc
How do I get the gemini llm to use file search and how can I see in the logs if it is trying?
Also - I am ALSO using my own MCP tools. Could this be the issue?
Expected Behavior
Gemini to use my store to answer questions
Reproduction Steps
See other issue
Operating System
Windows
Models Used
gemini-2.0-flash
Package Versions
livekit==1.0.23
livekit-agents==1.3.10
livekit-api==1.1.0
livekit-plugins-elevenlabs==1.3.10
livekit-plugins-google==1.3.10
livekit-plugins-liveavatar==1.3.7
livekit-plugins-openai==1.3.10
livekit-plugins-silero==1.3.10
livekit-plugins-turn-detector==1.3.10
livekit-protocol==1.1.1
Session/Room/Call IDs
• roomID: RM_Sy72PfckBrUM
• sessionID: poc-0d8557af02db474f8f2cca966a2af5f2
• jobID: AJ_NvgENohs9LL7
Proposed Solution
Additional Context
Here is my agent
import asyncio
import json
import logging
import os
from typing import Any, Dict, List
import httpx
from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import Agent, AgentServer, AgentSession, JobContext, JobRequest, RoomIO, cli, mcp, metrics, MetricsCollectedEvent
from livekit.plugins import silero
try:
from livekit.plugins.turn_detector.multilingual import MultilingualModel
except Exception:
MultilingualModel = None
try:
from livekit.plugins import elevenlabs
except Exception:
elevenlabs = None
try:
from livekit.plugins import openai
except Exception:
openai = None
try:
from livekit.plugins import google
except Exception:
google = None
# Optional LiveAvatar plugin (install if available in your environment)
try:
from livekit.plugins import liveavatar
except Exception:
liveavatar = None
# Load environment configuration (python-dotenv) before any os.environ lookups below.
load_dotenv()
# NOTE(review): this import sits below load_dotenv(); conventionally it would live
# with the other imports at the top of the file.
from datetime import datetime, timezone
# "poc-agent" is the logger used throughout this module; "agent" is created here
# but not referenced anywhere else in the visible code.
logger = logging.getLogger("poc-agent")
agent_logger = logging.getLogger("agent")
# Root logger at DEBUG so framework and plugin debug output is visible.
logging.basicConfig(level=logging.DEBUG)
# Also set LiveAvatar logger to DEBUG
logging.getLogger("livekit.plugins.liveavatar").setLevel(logging.DEBUG)
# Uncomment to also trace HTTP client traffic:
# logging.getLogger("httpx").setLevel(logging.DEBUG)
# logging.getLogger("httpcore").setLevel(logging.DEBUG)
def create_mcp_servers():
    """Create MCP server connections for the agent session.

    Environment variables read:
        ENABLE_MCP_TOOLS: "1", "true" or "yes" (case-insensitive) enables MCP.
        MCP_TOOLS_URL: base URL of the MCP service (default "http://localhost:5000").
        AGENT_API_KEY: optional; sent as the X-Api-Key header when set.

    Returns:
        An empty list when MCP is disabled, otherwise a one-element list
        holding an ``mcp.MCPServerHTTP`` pointed at ``<MCP_TOOLS_URL>/mcp``.
    """
    enable_mcp = os.environ.get("ENABLE_MCP_TOOLS", "false").lower() in ("1", "true", "yes")
    if not enable_mcp:
        return []

    # Strip trailing slashes so a base URL such as "http://host:5000/" does not
    # produce the malformed endpoint "http://host:5000//mcp".
    mcp_url = os.environ.get("MCP_TOOLS_URL", "http://localhost:5000").rstrip("/")
    mcp_endpoint = f"{mcp_url}/mcp"
    api_key = os.environ.get("AGENT_API_KEY")
    logger.info("Enabling MCP server at %s", mcp_endpoint)
    # Only log whether a key exists, never the key itself.
    logger.debug("API key present: %s", bool(api_key))

    # Add API key header for authentication
    headers = {}
    if api_key:
        headers["X-Api-Key"] = api_key
        logger.debug("Added X-Api-Key header to MCP client")

    srv = mcp.MCPServerHTTP(url=mcp_endpoint, headers=headers)
    logger.info("MCP server object: %r", srv)
    return [srv]
class PocAgent(Agent):
    """Agent that keeps a transcript of user turns and can open the session with a course-aware greeting."""

    def __init__(self, instructions: str, stt: Any, llm: Any, tts: Any, course_id: str = None, tools: list = None):
        # Use a fresh list per instance when the caller supplies no tools.
        agent_tools = tools if tools is not None else []
        super().__init__(instructions=instructions, stt=stt, llm=llm, tts=tts, tools=agent_tools)
        self.transcript: List[Dict[str, Any]] = []
        self.course_id = course_id

    async def on_enter(self):
        """Called when the agent enters the session; with a course_id set, request an LLM turn right away."""
        if not self.course_id:
            logger.info("Agent entered session without course_id")
            return

        logger.info(f"Agent entered session with course_id: {self.course_id}")
        # Start an LLM turn before the user speaks; the prompt asks the model to call MCP tools.
        # Failures are logged and swallowed so the session continues without the greeting.
        try:
            preflight_prompt = (
                f"Preflight: courseId={self.course_id}. "
                "Call the GetCourseDetails MCP tool to fetch the course information. "
                "Then greet the student warmly and ask what they need help with today. "
                "When mentioning the course, use ONLY the Subject and Level fields from the MCP response (e.g., 'AP English' or 'Honors Physics'), "
                "NOT the full Title field. Keep it brief and natural."
            )
            await self.session.generate_reply(user_input=preflight_prompt, tool_choice="auto")
            logger.info("Generated initial reply with course context")
        except Exception as ex:
            logger.warning(f"Failed to generate initial reply: {ex}")

    async def on_user_turn_completed(self, turn_ctx, new_message):
        """Append the user's text for this turn to the transcript, stamped with the current UTC time."""
        text = getattr(new_message, "text_content", None)
        if not text:
            # Nothing to record for messages without text content.
            return
        entry = {"role": "user", "text": text, "utc": datetime.now(timezone.utc).isoformat()}
        self.transcript.append(entry)
async def post_transcript(api_base: str, session_id: str, items: List[Dict[str, Any]]):
    """POST the transcript items as JSON to the session's transcript endpoint.

    Sends the AGENT_API_KEY environment value as the X-Api-Key header when it
    is set. Raises whatever ``raise_for_status`` raises on an error response.
    """
    api_key = os.environ.get("AGENT_API_KEY")
    request_headers = {"X-Api-Key": api_key} if api_key else {}
    endpoint = f"{api_base}/api/sessions/{session_id}/transcript"

    async with httpx.AsyncClient(timeout=10) as http:
        response = await http.post(endpoint, headers=request_headers, json=items)
        response.raise_for_status()
def build_models():
    """Build the STT, LLM and TTS components from environment configuration.

    STT and TTS always come from the ElevenLabs plugin. The LLM is chosen in
    order: Gemini (google plugin + GOOGLE_API_KEY), then OpenAI (openai plugin
    + OPENAI_API_KEY), then a local echo stub.

    Environment variables read:
        ELEVENLABS_API_KEY: required.
        ELEVENLABS_VOICE_ID: optional TTS voice.
        GOOGLE_API_KEY: enables the Gemini LLM.
        GEMINI_MODEL: optional Gemini model name (default "gemini-2.0-flash").
        OPENAI_API_KEY: enables the OpenAI fallback LLM.

    Returns:
        A ``(stt, llm, tts)`` tuple.

    Raises:
        RuntimeError: if the ElevenLabs plugin or its API key is missing.
    """
    # STT/TTS via ElevenLabs plugin
    elevenlabs_key = os.environ.get("ELEVENLABS_API_KEY")
    if not (elevenlabs and elevenlabs_key):
        raise RuntimeError("Set ELEVENLABS_API_KEY (and install livekit-plugins-elevenlabs)")

    stt = elevenlabs.STT(api_key=elevenlabs_key)

    tts_kwargs = {
        "api_key": elevenlabs_key,
        "language": "en",  # Force English to prevent language switching
    }
    voice_id = os.environ.get("ELEVENLABS_VOICE_ID")
    if voice_id:
        tts_kwargs["voice_id"] = voice_id
    tts = elevenlabs.TTS(**tts_kwargs)

    # LLM via Gemini plugin
    if google and os.environ.get("GOOGLE_API_KEY"):
        # The model is overridable so it can be changed without a code edit.
        # NOTE(review): Gemini File Search appears to be limited to specific
        # model families; if GEMINI_STORE_ID is set, confirm the chosen model
        # supports it in the Gemini API documentation.
        gemini_model = os.environ.get("GEMINI_MODEL") or "gemini-2.0-flash"
        logger.info("Creating Gemini LLM (model=%s)", gemini_model)
        return stt, google.LLM(model=gemini_model), tts

    # Fallback to OpenAI plugin (optional)
    if openai and os.environ.get("OPENAI_API_KEY"):
        logger.info("Creating OpenAI LLM")
        return stt, openai.LLM(api_key=os.environ["OPENAI_API_KEY"], model="gpt-4o-mini"), tts

    # Last resort echo LLM
    logger.warning("No LLM configured, using echo mode")

    class EchoLLM:
        # NOTE(review): minimal stand-in; it is not shown here whether the
        # agent framework accepts an object with only a chat() coroutine.
        async def chat(self, *args, **kwargs):
            return "No LLM API key is set. Echo mode: I heard you."

    return stt, EchoLLM(), tts
def enhance_system_prompt_for_gemini_file_search(base_prompt: str) -> str:
    """Enhance system prompt with file search instructions when Gemini file search is available.

    File search counts as available when the google plugin imported and both
    GOOGLE_API_KEY and GEMINI_STORE_ID are set in the environment.

    Args:
        base_prompt: The system prompt to extend.

    Returns:
        ``base_prompt`` unchanged when file search is unavailable, otherwise
        ``base_prompt`` with the file search instructions appended.
    """
    if not (google and os.environ.get("GOOGLE_API_KEY") and os.environ.get("GEMINI_STORE_ID")):
        return base_prompt

    # NOTE(review): the text below names the tool both "file_search" and
    # "FileSearch"; confirm which name, if any, the model should be given.
    file_search_instructions = (
        # Trailing space added: the adjacent literals previously joined as "again.Provide".
        "\n\nFor any question that appears to be about the course curriculum, you MUST use the file_search tool before responding. Do not answer from memory. If file_search returns nothing relevant, ask one clarifying question, then use file_search again. "
        "Provide accurate answers based on the search results. "
        "When you use FileSearch or curriculum materials, add a Sources: section listing the file names you used "
        "(and cite the specific section titles if available). If you did not use FileSearch, say Sources: none."
    )
    return base_prompt + file_search_instructions
async def maybe_start_liveavatar(ctx: JobContext, agent_session: AgentSession):
    """Start a LiveAvatar session in the job's room when ENABLE_LIVEAVATAR is turned on.

    Does nothing unless ENABLE_LIVEAVATAR is "1", "true" or "yes"
    (case-insensitive).

    Raises:
        RuntimeError: if the feature is enabled but the liveavatar plugin is
            unavailable, or LIVEAVATAR_API_KEY / LIVEAVATAR_AVATAR_ID is unset.
        Exception: any error from creating or starting the avatar is logged
            and re-raised.
    """
    flag = os.environ.get("ENABLE_LIVEAVATAR", "false").lower()
    if flag not in ("1", "true", "yes"):
        return

    if liveavatar is None:
        raise RuntimeError("ENABLE_LIVEAVATAR=true but liveavatar plugin isn't available in this environment.")

    api_key = os.environ.get("LIVEAVATAR_API_KEY")
    avatar_id = os.environ.get("LIVEAVATAR_AVATAR_ID")
    if not (api_key and avatar_id):
        raise RuntimeError("Set LIVEAVATAR_API_KEY and LIVEAVATAR_AVATAR_ID")

    try:
        avatar_session = liveavatar.AvatarSession(api_key=api_key, avatar_id=avatar_id)
        logger.info(f"Starting LiveAvatar with avatar_id={avatar_id}")
        await avatar_session.start(room=ctx.room, agent_session=agent_session)
        logger.info("LiveAvatar started successfully")
    except Exception as ex:
        # Log with traceback, then propagate to the caller.
        logger.error(f"Failed to start LiveAvatar: {ex}", exc_info=True)
        raise
async def handle_request(request: JobRequest) -> None:
    """Accept the job request under the identity from AGENT_NAME (default "poc-agent")."""
    agent_identity = os.environ.get("AGENT_NAME", "poc-agent")
    await request.accept(identity=agent_identity)
async def on_session_end(ctx: JobContext):
    """Called when the session ends; write a JSON session report to the temp directory for debugging.

    Any failure is logged and swallowed.
    """
    try:
        import tempfile

        report = ctx.make_session_report().to_dict()
        out_dir = tempfile.gettempdir()
        # File name carries the room name and a local timestamp.
        report_name = f"session_report_{ctx.room.name}_{datetime.now():%Y%m%d_%H%M%S}.json"
        filename = os.path.join(out_dir, report_name)
        with open(filename, "w") as out:
            json.dump(report, out, indent=2)
        logger.info("Saved session report: %s", filename)
    except Exception as ex:
        logger.error("Failed to save session report: %s", ex)
# Module-level server instance: the entrypoint is registered on it via
# @server.rtc_session, and it is passed to cli.run_app() under __main__.
server = AgentServer()
@server.rtc_session(on_request=handle_request, on_session_end=on_session_end)
async def entrypoint(ctx: JobContext):
    """Run one agent job for a room.

    Steps, in order:
      1. Derive the session ID from the room name (format: poc-{sessionId}).
      2. Fetch session metadata (system prompt, course ID/code) from the API
         at POC_API_BASE_URL; on failure, fall back to defaults.
      3. Connect to the room and build the STT/LLM/TTS models, VAD, optional
         turn detector and MCP servers.
      4. Start room I/O, the optional LiveAvatar, then the agent session.
      5. Wait for the room to disconnect, then upload the transcript
         (best effort).

    Args:
        ctx: Job context supplied by the agent server.
    """
    # Extract session ID from room name (format: poc-{sessionId}).
    # Slice off only the leading prefix: str.replace would also delete any
    # later "poc-" occurrences inside the session ID itself.
    room_name = ctx.room.name
    room_prefix = "poc-"
    if room_name.startswith(room_prefix):
        session_id = room_name[len(room_prefix):]
    else:
        session_id = "unknown-session"
    api_base = os.environ.get("POC_API_BASE_URL", "http://localhost:5055")

    # Fetch session details from API
    system_prompt = "You are a helpful assistant."
    course_id = None
    course_code = None

    api_key = os.environ.get("AGENT_API_KEY")
    headers = {}
    if api_key:
        headers["X-Api-Key"] = api_key

    try:
        async with httpx.AsyncClient(timeout=10) as client:
            r = await client.get(f"{api_base}/api/sessions/{session_id}", headers=headers)
            if r.status_code == 200:
                session_data = r.json()
                # Keep the default when the API returns an explicit null; a None
                # prompt would break the string concatenation further down.
                fetched_prompt = session_data.get("systemPrompt")
                if fetched_prompt is not None:
                    system_prompt = fetched_prompt
                course_id = session_data.get("courseId")
                course_code = session_data.get("courseCode")
                logger.info("Loaded session metadata for: %s (course: %s)", session_id, course_code or "none")
            else:
                logger.warning("Could not fetch session metadata: %s", r.status_code)
    except Exception as ex:
        # Metadata is optional: continue with the defaults set above.
        logger.warning("Failed to fetch session metadata: %s", ex)

    # Enhance system prompt with file search instructions if using Gemini
    system_prompt = enhance_system_prompt_for_gemini_file_search(system_prompt)

    logger.info("Job started: session=%s room=%s course=%s", session_id, ctx.room.name, course_code or "none")

    await ctx.connect()

    stt, llm, tts = build_models()

    # ElevenLabs STT requires VAD for streaming
    vad = silero.VAD.load()

    td = None
    if MultilingualModel is not None:
        try:
            td = MultilingualModel()
        except Exception as ex:
            # Turn detection is optional, but record why it was skipped
            # instead of failing silently.
            logger.warning("Turn detector unavailable, continuing without it: %s", ex)
            td = None

    # Setup MCP servers for the session
    mcp_servers = create_mcp_servers()

    session = AgentSession(vad=vad, turn_detection=td, mcp_servers=mcp_servers)

    # Set up metrics logging for debugging tool calls
    @session.on("metrics_collected")
    def _on_metrics(ev: MetricsCollectedEvent):
        metrics.log_metrics(ev.metrics)

    room_io = RoomIO(session, room=ctx.room)
    await room_io.start()

    # Start LiveAvatar AFTER room_io.start() but BEFORE agent starts speaking
    await maybe_start_liveavatar(ctx, session)

    # Create agent tools
    # NOTE(review): the Gemini API may not allow a built-in tool such as File
    # Search in the same request as function tools (the MCP tools above).
    # Confirm against the Gemini docs if "unknown AI function" errors appear.
    agent_tools = []
    store_id = os.environ.get("GEMINI_STORE_ID")
    if google and os.environ.get("GOOGLE_API_KEY") and store_id:
        agent_tools.append(
            google.tools.FileSearch(
                file_search_store_names=[store_id],
                top_k=5,
                metadata_filter=None,
            )
        )
        logger.info("File Search tool added to agent")

    # Create agent with course context - the on_enter() hook will trigger immediate LLM turn
    agent = PocAgent(instructions=system_prompt, stt=stt, llm=llm, tts=tts, course_id=course_id, tools=agent_tools)

    await session.start(agent=agent)

    # Set up disconnect handler
    disconnect_event = asyncio.Event()

    @ctx.room.on("disconnected")
    def on_disconnected():
        disconnect_event.set()

    # Wait for disconnect
    await disconnect_event.wait()

    # Best-effort transcript upload
    try:
        await post_transcript(api_base, session_id, agent.transcript)
    except Exception as ex:
        logger.error("Failed to post transcript: %s", ex)
# Script entry point: pass the module-level server to the LiveKit agents CLI runner.
if __name__ == "__main__":
    cli.run_app(server)
Screenshots and Recordings
No response
Metadata
Metadata
Assignees
Labels
bug: Something isn't working