Skip to content

Can't get Gemini FileSearch to work #4397

@johnkwaters

Description

@johnkwaters

Bug Description

See previous issue, which maybe was closed a little prematurely, so opening a new one:
#4376 (comment)

I have tried all kinds of prompts, but I don't know what to refer to the tool as, so I get errors like
unknown AI function search {"function": "search", "speech_id": "speech_ba73a234a6d1"}
or unknown AI function search {"function": "filesearch", "speech_id": "speech_ba73a234a6d1"}, etc.

How do I get the gemini llm to use file search and how can I see in the logs if it is trying?

Also - I am ALSO using my own MCP tools. Could this be the issue?

Expected Behavior

Gemini to use my store to answer questions

Reproduction Steps

See other issue

Operating System

Windows

Models Used

gemini-2.0-flash

Package Versions

livekit==1.0.23
livekit-agents==1.3.10
livekit-api==1.1.0
livekit-plugins-elevenlabs==1.3.10
livekit-plugins-google==1.3.10
livekit-plugins-liveavatar==1.3.7
livekit-plugins-openai==1.3.10
livekit-plugins-silero==1.3.10
livekit-plugins-turn-detector==1.3.10
livekit-protocol==1.1.1

Session/Room/Call IDs

• roomID: RM_Sy72PfckBrUM
• sessionID: poc-0d8557af02db474f8f2cca966a2af5f2
• jobID: AJ_NvgENohs9LL7

Proposed Solution

Additional Context

Here is my agent

import asyncio
import json
import logging
import os
from typing import Any, Dict, List

import httpx
from dotenv import load_dotenv

from livekit import rtc
from livekit.agents import Agent, AgentServer, AgentSession, JobContext, JobRequest, RoomIO, cli, mcp, metrics, MetricsCollectedEvent

from livekit.plugins import silero

# Optional turn-detector model; falls back to None (no turn detection) when the plugin isn't installed.
try:
    from livekit.plugins.turn_detector.multilingual import MultilingualModel
except Exception:
    MultilingualModel = None

# Optional ElevenLabs STT/TTS plugin — build_models() raises if it is missing.
try:
    from livekit.plugins import elevenlabs
except Exception:
    elevenlabs = None

# Optional OpenAI plugin — used as the LLM fallback when Gemini is not configured.
try:
    from livekit.plugins import openai
except Exception:
    openai = None

# Optional Google/Gemini plugin — preferred LLM and provider of the FileSearch tool.
try:
    from livekit.plugins import google
except Exception:
    google = None

# Optional LiveAvatar plugin (install if available in your environment)
try:
    from livekit.plugins import liveavatar
except Exception:
    liveavatar = None

# Load configuration from a local .env file before any os.environ reads below.
load_dotenv()

from datetime import datetime, timezone

logger = logging.getLogger("poc-agent")
agent_logger = logging.getLogger("agent")
logging.basicConfig(level=logging.DEBUG)
# Also set LiveAvatar logger to DEBUG
logging.getLogger("livekit.plugins.liveavatar").setLevel(logging.DEBUG)
# logging.getLogger("httpx").setLevel(logging.DEBUG)
# logging.getLogger("httpcore").setLevel(logging.DEBUG)

def create_mcp_servers():
    """Build the list of MCP server connections for the agent session.

    Returns an empty list unless ENABLE_MCP_TOOLS is truthy; otherwise returns
    a single MCPServerHTTP pointed at {MCP_TOOLS_URL}/mcp, authenticated with
    an X-Api-Key header when AGENT_API_KEY is set.
    """
    # MCP integration is opt-in via environment flag.
    if os.environ.get("ENABLE_MCP_TOOLS", "false").lower() not in ("1", "true", "yes"):
        return []

    base_url = os.environ.get("MCP_TOOLS_URL", "http://localhost:5000")
    endpoint = f"{base_url}/mcp"
    key = os.environ.get("AGENT_API_KEY")

    logger.info(f"Enabling MCP server at {endpoint}")
    logger.debug(f"API key present: {bool(key)}")

    # Add API key header for authentication when configured.
    request_headers = {}
    if key:
        request_headers["X-Api-Key"] = key
        logger.debug(f"Added X-Api-Key header to MCP client")

    server_conn = mcp.MCPServerHTTP(url=endpoint, headers=request_headers)
    logger.info("MCP server object: %r", server_conn)
    return [server_conn]

class PocAgent(Agent):
    """Voice agent that records user turns and primes the LLM with course context on entry."""

    def __init__(self, instructions: str, stt: Any, llm: Any, tts: Any, course_id: str = None, tools: list = None):
        super().__init__(instructions=instructions, stt=stt, llm=llm, tts=tts, tools=tools if tools is not None else [])
        # Accumulated final user utterances, uploaded when the session ends.
        self.transcript: List[Dict[str, Any]] = []
        self.course_id = course_id

    async def on_enter(self):
        """Called when the agent enters the session. Trigger immediate LLM turn to fetch course context."""
        if not self.course_id:
            logger.info("Agent entered session without course_id")
            return

        logger.info(f"Agent entered session with course_id: {self.course_id}")
        # Kick off an LLM turn immediately (before user speaks) that will call MCP tools
        preflight_prompt = (
            f"Preflight: courseId={self.course_id}. "
            "Call the GetCourseDetails MCP tool to fetch the course information. "
            "Then greet the student warmly and ask what they need help with today. "
            "When mentioning the course, use ONLY the Subject and Level fields from the MCP response (e.g., 'AP English' or 'Honors Physics'), "
            "NOT the full Title field. Keep it brief and natural."
        )
        try:
            await self.session.generate_reply(user_input=preflight_prompt, tool_choice="auto")
            logger.info("Generated initial reply with course context")
        except Exception as ex:
            logger.warning(f"Failed to generate initial reply: {ex}")

    async def on_user_turn_completed(self, turn_ctx, new_message):
        """Append the finalized user transcript text for this turn, if any."""
        spoken = getattr(new_message, "text_content", None)
        if spoken:
            self.transcript.append({"role": "user", "text": spoken, "utc": datetime.now(timezone.utc).isoformat()})

async def post_transcript(api_base: str, session_id: str, items: List[Dict[str, Any]]):
    """POST collected transcript items to the backend session API.

    Sends an X-Api-Key header when AGENT_API_KEY is set; raises
    httpx.HTTPStatusError on a non-2xx response.
    """
    request_headers = {}
    key = os.environ.get("AGENT_API_KEY")
    if key:
        request_headers["X-Api-Key"] = key

    url = f"{api_base}/api/sessions/{session_id}/transcript"
    async with httpx.AsyncClient(timeout=10) as http:
        resp = await http.post(url, headers=request_headers, json=items)
        resp.raise_for_status()


def build_models():
    """Construct the (stt, llm, tts) triple for the session.

    STT/TTS always come from ElevenLabs (hard requirement). The LLM is Gemini
    when configured, then OpenAI, then a no-op echo stub as the last resort.
    """
    # STT/TTS via ElevenLabs plugin — required.
    eleven_key = os.environ.get("ELEVENLABS_API_KEY")
    if not (elevenlabs and eleven_key):
        raise RuntimeError("Set ELEVENLABS_API_KEY (and install livekit-plugins-elevenlabs)")

    stt = elevenlabs.STT(api_key=eleven_key)

    tts_options = {
        "api_key": eleven_key,
        "language": "en"  # Force English to prevent language switching
    }
    chosen_voice = os.environ.get("ELEVENLABS_VOICE_ID") or None
    if chosen_voice:
        tts_options["voice_id"] = chosen_voice
    tts = elevenlabs.TTS(**tts_options)

    # LLM via Gemini plugin (preferred).
    if google and os.environ.get("GOOGLE_API_KEY"):
        logger.info("Creating Gemini LLM")
        return stt, google.LLM(model="gemini-2.0-flash"), tts

    # Fallback to OpenAI plugin (optional).
    if openai and os.environ.get("OPENAI_API_KEY"):
        logger.info("Creating OpenAI LLM")
        return stt, openai.LLM(api_key=os.environ["OPENAI_API_KEY"], model="gpt-4o-mini"), tts

    # Last resort: an echo stub so the pipeline still runs without any LLM key.
    logger.warning("No LLM configured, using echo mode")

    class EchoLLM:
        async def chat(self, *args, **kwargs):
            return "No LLM API key is set. Echo mode: I heard you."

    return stt, EchoLLM(), tts


def enhance_system_prompt_for_gemini_file_search(base_prompt: str) -> str:
    """Enhance system prompt with file search instructions when Gemini file search is available.

    Returns *base_prompt* unchanged unless both GOOGLE_API_KEY and
    GEMINI_STORE_ID are set and the Gemini plugin imported successfully
    (i.e. the FileSearch tool will actually be attached to the agent).
    """
    # Check the cheap env vars first; the prompt stays untouched when file search is off.
    if not (os.environ.get("GOOGLE_API_KEY") and os.environ.get("GEMINI_STORE_ID") and google):
        return base_prompt

    # BUG FIX: the original concatenated "...file_search again." directly with
    # "Provide accurate answers..." with no separator, garbling the prompt text
    # the model sees ("again.Provide"). A trailing space restores the sentence break.
    file_search_instructions = (
        "\n\nFor any question that appears to be about the course curriculum, you MUST use the file_search tool before responding. Do not answer from memory. If file_search returns nothing relevant, ask one clarifying question, then use file_search again. "
        "Provide accurate answers based on the search results. "
        "When you use FileSearch or curriculum materials, add a Sources: section listing the file names you used "
        "(and cite the specific section titles if available). If you did not use FileSearch, say Sources: none."
    )
    return base_prompt + file_search_instructions


async def maybe_start_liveavatar(ctx: JobContext, agent_session: AgentSession):
    """Start a LiveAvatar session when ENABLE_LIVEAVATAR is truthy; no-op otherwise.

    Raises RuntimeError when the feature is enabled but the plugin or its
    credentials (LIVEAVATAR_API_KEY, LIVEAVATAR_AVATAR_ID) are missing.
    """
    flag = os.environ.get("ENABLE_LIVEAVATAR", "false").lower()
    if flag not in ("1", "true", "yes"):
        return

    if liveavatar is None:
        raise RuntimeError("ENABLE_LIVEAVATAR=true but liveavatar plugin isn't available in this environment.")

    api_key = os.environ.get("LIVEAVATAR_API_KEY")
    avatar_id = os.environ.get("LIVEAVATAR_AVATAR_ID")
    if not (api_key and avatar_id):
        raise RuntimeError("Set LIVEAVATAR_API_KEY and LIVEAVATAR_AVATAR_ID")

    try:
        avatar = liveavatar.AvatarSession(api_key=api_key, avatar_id=avatar_id)
        logger.info(f"Starting LiveAvatar with avatar_id={avatar_id}")
        await avatar.start(room=ctx.room, agent_session=agent_session)
        logger.info("LiveAvatar started successfully")
    except Exception as ex:
        # Log with full traceback, then propagate so the job fails loudly.
        logger.error(f"Failed to start LiveAvatar: {ex}", exc_info=True)
        raise


async def handle_request(request: JobRequest) -> None:
    """Accept an incoming job, joining the room as AGENT_NAME (default "poc-agent")."""
    identity = os.environ.get("AGENT_NAME", "poc-agent")
    await request.accept(identity=identity)


async def on_session_end(ctx: JobContext):
    """Called when the session ends. Generate a session report for debugging.

    Writes the report JSON to the system temp directory; failures are logged
    and swallowed so session teardown is never blocked.
    """
    try:
        import tempfile

        report = ctx.make_session_report().to_dict()
        report_path = os.path.join(
            tempfile.gettempdir(),
            f"session_report_{ctx.room.name}_{datetime.now():%Y%m%d_%H%M%S}.json",
        )
        with open(report_path, "w") as f:
            json.dump(report, f, indent=2)
        logger.info("Saved session report: %s", report_path)
    except Exception as ex:
        logger.error("Failed to save session report: %s", ex)


server = AgentServer()


@server.rtc_session(on_request=handle_request, on_session_end=on_session_end)
async def entrypoint(ctx: JobContext):
    """Main RTC job handler: loads session metadata, builds models and tools,
    starts the agent session, and waits for room disconnect before uploading
    the transcript. Statement order matters (see inline notes)."""
    # Extract session ID from room name (format: poc-{sessionId})
    room_name = ctx.room.name
    session_id = room_name.replace("poc-", "") if room_name.startswith("poc-") else "unknown-session"

    api_base = os.environ.get("POC_API_BASE_URL", "http://localhost:5055")

    # Fetch session details from API
    system_prompt = "You are a helpful assistant."
    course_id = None
    course_code = None

    api_key = os.environ.get("AGENT_API_KEY")
    headers = {}
    if api_key:
        headers["X-Api-Key"] = api_key

    # Best-effort metadata fetch: any failure falls back to the defaults above.
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            r = await client.get(f"{api_base}/api/sessions/{session_id}", headers=headers)
            if r.status_code == 200:
                session_data = r.json()
                system_prompt = session_data.get("systemPrompt", system_prompt)
                course_id = session_data.get("courseId")
                course_code = session_data.get("courseCode")
                logger.info("Loaded session metadata for: %s (course: %s)", session_id, course_code or "none")
            else:
                logger.warning("Could not fetch session metadata: %s", r.status_code)
    except Exception as ex:
        logger.warning("Failed to fetch session metadata: %s", ex)

    # Enhance system prompt with file search instructions if using Gemini
    system_prompt = enhance_system_prompt_for_gemini_file_search(system_prompt)

    logger.info("Job started: session=%s room=%s course=%s", session_id, ctx.room.name, course_code or "none")

    await ctx.connect()

    stt, llm, tts = build_models()

    # ElevenLabs STT requires VAD for streaming
    vad = silero.VAD.load()

    # Optional end-of-turn detector; silently disabled if the model fails to load.
    td = None
    if MultilingualModel is not None:
        try:
            td = MultilingualModel()
        except Exception:
            td = None

    # Setup MCP servers for the session
    mcp_servers = create_mcp_servers()

    session = AgentSession(vad=vad, turn_detection=td, mcp_servers=mcp_servers)

    # Set up metrics logging for debugging tool calls
    @session.on("metrics_collected")
    def _on_metrics(ev: MetricsCollectedEvent):
        metrics.log_metrics(ev.metrics)

    room_io = RoomIO(session, room=ctx.room)
    await room_io.start()

    # Start LiveAvatar AFTER room_io.start() but BEFORE agent starts speaking
    await maybe_start_liveavatar(ctx, session)

    # Create agent tools
    # NOTE(review): the Gemini FileSearch provider tool is attached to the Agent's
    # `tools` list here, alongside MCP servers on the session — confirm against the
    # livekit-plugins-google docs that this is where provider tools are expected
    # (vs. passing them to google.LLM), since the LLM reports "unknown AI function".
    agent_tools = []
    if google and os.environ.get("GOOGLE_API_KEY") and os.environ.get("GEMINI_STORE_ID"):
        agent_tools.append(
            google.tools.FileSearch(
                file_search_store_names=[os.environ.get("GEMINI_STORE_ID")],
                top_k=5,
                metadata_filter=None
            )
        )
        logger.info("File Search tool added to agent")

    # Create agent with course context - the on_enter() hook will trigger immediate LLM turn
    agent = PocAgent(instructions=system_prompt, stt=stt, llm=llm, tts=tts, course_id=course_id, tools=agent_tools)
    await session.start(agent=agent)

    # Set up disconnect handler
    disconnect_event = asyncio.Event()

    @ctx.room.on("disconnected")
    def on_disconnected():
        disconnect_event.set()

    # Wait for disconnect
    await disconnect_event.wait()

    # Best-effort transcript upload
    try:
        await post_transcript(api_base, session_id, agent.transcript)
    except Exception as ex:
        logger.error("Failed to post transcript: %s", ex)


if __name__ == "__main__":
    # Run the agent worker CLI: connects to LiveKit and dispatches incoming jobs.
    cli.run_app(server)

Screenshots and Recordings

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug — Something isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions