From e99612c5c188b0aab9c63a34a7016a794ea077fb Mon Sep 17 00:00:00 2001 From: Selam Waktola Date: Wed, 11 Feb 2026 14:49:51 -0800 Subject: [PATCH 1/3] fix(agent): Autofix trailing commas in LLM-generated JSON revert sample-agentand add fix to adk-agent for malformed JSON Add robust JSON autofix and tests Add robust JSON autofix logic Implement robust JSON autofix with tests --- .../extension/send_a2ui_to_client_toolset.py | 44 ++++++++++++++----- .../test_send_a2ui_to_client_toolset.py | 27 ++++++++++++ samples/agent/adk/restaurant_finder/agent.py | 4 ++ 3 files changed, 65 insertions(+), 10 deletions(-) diff --git a/a2a_agents/python/a2ui_agent/src/a2ui/extension/send_a2ui_to_client_toolset.py b/a2a_agents/python/a2ui_agent/src/a2ui/extension/send_a2ui_to_client_toolset.py index 1101869cf..9d637a08f 100644 --- a/a2a_agents/python/a2ui_agent/src/a2ui/extension/send_a2ui_to_client_toolset.py +++ b/a2a_agents/python/a2ui_agent/src/a2ui/extension/send_a2ui_to_client_toolset.py @@ -80,6 +80,7 @@ async def get_schema(ctx: ReadonlyContext) -> dict[str, Any]: import inspect import json import logging +import re from typing import Any, Awaitable, Callable, Optional, TypeAlias, Union import jsonschema @@ -252,17 +253,40 @@ async def run_async( f" arg {self.A2UI_JSON_ARG_NAME} " ) - a2ui_json_payload = json.loads(a2ui_json) + a2ui_schema = await self.get_a2ui_schema(tool_context) - # Auto-wrap single object in list - if not isinstance(a2ui_json_payload, list): - logger.info( - "Received a single JSON object, wrapping in a list for validation." - ) - a2ui_json_payload = [a2ui_json_payload] + try: + # Attempt to parse and validate + a2ui_json_payload = json.loads(a2ui_json) - a2ui_schema = await self.get_a2ui_schema(tool_context) - jsonschema.validate(instance=a2ui_json_payload, schema=a2ui_schema) + # Auto-wrap single object in list + if not isinstance(a2ui_json_payload, list): + logger.info("Received a single JSON object, wrapping in a list for validation.") + a2ui_json_payload = [a2ui_json_payload] + + jsonschema.validate(instance=a2ui_json_payload, schema=a2ui_schema) + + except (jsonschema.exceptions.ValidationError, json.JSONDecodeError) as e: + logger.warning(f"Initial A2UI JSON validation failed: {e}") + + # Run Fixer + fixed_a2ui_json = re.sub(r",(?=\s*[\]}])", "", a2ui_json) + + if fixed_a2ui_json != a2ui_json: + # Emit Warning + logger.warning("Detected trailing commas in LLM output; applied autofix.") + + # Re-parse and Re-validate + a2ui_json_payload = json.loads(fixed_a2ui_json) + + # Auto-wrap single object in list + if not isinstance(a2ui_json_payload, list): + logger.info("Received a single JSON object, wrapping in a list for validation.") + a2ui_json_payload = [a2ui_json_payload] + + jsonschema.validate(instance=a2ui_json_payload, schema=a2ui_schema) + else: + raise e logger.info( f"Validated call to tool {self.TOOL_NAME} with {self.A2UI_JSON_ARG_NAME}" @@ -328,4 +352,4 @@ def convert_send_a2ui_to_client_genai_part_to_a2a_part( converted_part = part_converter.convert_genai_part_to_a2a_part(part) logger.info(f"Returning converted part: {converted_part}") - return [converted_part] if converted_part else [] + return [converted_part] if converted_part else [] \ No newline at end of file diff --git a/a2a_agents/python/a2ui_agent/tests/extension/test_send_a2ui_to_client_toolset.py b/a2a_agents/python/a2ui_agent/tests/extension/test_send_a2ui_to_client_toolset.py index 03fd74748..c023fd780 100644 --- a/a2a_agents/python/a2ui_agent/tests/extension/test_send_a2ui_to_client_toolset.py +++ b/a2a_agents/python/a2ui_agent/tests/extension/test_send_a2ui_to_client_toolset.py @@ -243,6 +243,33 @@ async def test_send_tool_run_async_schema_validation_fail(): assert "'text' is a required property" in result["error"] +@pytest.mark.asyncio +async def test_send_tool_run_async_handles_trailing_comma(caplog): + """Tests that the tool's run_async can handle and fix a trailing comma in the JSON.""" + tool = SendA2uiToClientToolset._SendA2uiJsonToClientTool(TEST_A2UI_SCHEMA) + tool_context_mock = MagicMock(spec=ToolContext) + tool_context_mock.state = {} + tool_context_mock.actions = MagicMock(skip_summarization=False) + + # Malformed JSON with a trailing comma in the list + malformed_a2ui_str = '[{"type": "Text", "text": "Hello"},]' + + args = { + SendA2uiToClientToolset._SendA2uiJsonToClientTool.A2UI_JSON_ARG_NAME: malformed_a2ui_str + } + + result = await tool.run_async(args=args, tool_context=tool_context_mock) + + # Assert that the fix was successful and the result is correct + expected_a2ui = [{"type": "Text", "text": "Hello"}] + assert result == { + SendA2uiToClientToolset._SendA2uiJsonToClientTool.VALIDATED_A2UI_JSON_KEY: expected_a2ui + } + + # Assert that the warning was logged + assert "Detected trailing commas in LLM output; applied autofix." in caplog.text + + # endregion # region send_a2ui_to_client_part_converter Tests diff --git a/samples/agent/adk/restaurant_finder/agent.py b/samples/agent/adk/restaurant_finder/agent.py index 5283cd882..49c30b021 100644 --- a/samples/agent/adk/restaurant_finder/agent.py +++ b/samples/agent/adk/restaurant_finder/agent.py @@ -15,6 +15,7 @@ import json import logging import os +import re from collections.abc import AsyncIterable from typing import Any @@ -226,6 +227,9 @@ async def stream(self, query, session_id) -> AsyncIterable[dict[str, Any]]: if not json_string_cleaned: raise ValueError("Cleaned JSON string is empty.") + # Autofix: Remove trailing commas from arrays and objects + json_string_cleaned = re.sub(r",\s*([\]}])", r"\1", json_string_cleaned) + # --- New Validation Steps --- # 1. Check if it's parsable JSON parsed_json_data = json.loads(json_string_cleaned) From 886b10b3e08340f42d3f0435695ce099c29a431c Mon Sep 17 00:00:00 2001 From: Selam Waktola Date: Fri, 13 Feb 2026 12:23:10 -0800 Subject: [PATCH 2/3] back to original --- samples/agent/adk/restaurant_finder/agent.py | 478 +++++++++---------- 1 file changed, 235 insertions(+), 243 deletions(-) diff --git a/samples/agent/adk/restaurant_finder/agent.py b/samples/agent/adk/restaurant_finder/agent.py index 49c30b021..e4f9e3fef 100644 --- a/samples/agent/adk/restaurant_finder/agent.py +++ b/samples/agent/adk/restaurant_finder/agent.py @@ -15,7 +15,6 @@ import json import logging import os -import re from collections.abc import AsyncIterable from typing import Any @@ -55,253 +54,246 @@ class RestaurantAgent: - """An agent that finds restaurants based on user criteria.""" - - SUPPORTED_CONTENT_TYPES = ["text", "text/plain"] - - def __init__(self, base_url: str, use_ui: bool = False): - self.base_url = base_url - self.use_ui = use_ui - self._agent = self._build_agent(use_ui) - self._user_id = "remote_agent" - self._runner = Runner( - app_name=self._agent.name, - agent=self._agent, - artifact_service=InMemoryArtifactService(), - session_service=InMemorySessionService(), - memory_service=InMemoryMemoryService(), - ) - - # --- MODIFICATION: Wrap the schema --- - # Load the A2UI_SCHEMA string into a Python object for validation - try: - # First, load the schema for a *single message* - single_message_schema = json.loads(A2UI_SCHEMA) - - # The prompt instructs the LLM to return a *list* of messages. - # Therefore, our validation schema must be an *array* of the single message schema. - self.a2ui_schema_object = {"type": "array", "items": single_message_schema} - logger.info( - "A2UI_SCHEMA successfully loaded and wrapped in an array validator." - ) - except json.JSONDecodeError as e: - logger.error(f"CRITICAL: Failed to parse A2UI_SCHEMA: {e}") - self.a2ui_schema_object = None - # --- END MODIFICATION --- - - def get_processing_message(self) -> str: - return "Finding restaurants that match your criteria..." - - def _build_agent(self, use_ui: bool) -> LlmAgent: - """Builds the LLM agent for the restaurant agent.""" - LITELLM_MODEL = os.getenv("LITELLM_MODEL", "gemini/gemini-2.5-flash") - - if use_ui: - # Construct the full prompt with UI instructions, examples, and schema - instruction = AGENT_INSTRUCTION + get_ui_prompt( - self.base_url, RESTAURANT_UI_EXAMPLES + """An agent that finds restaurants based on user criteria.""" + + SUPPORTED_CONTENT_TYPES = ["text", "text/plain"] + + def __init__(self, base_url: str, use_ui: bool = False): + self.base_url = base_url + self.use_ui = use_ui + self._agent = self._build_agent(use_ui) + self._user_id = "remote_agent" + self._runner = Runner( + app_name=self._agent.name, + agent=self._agent, + artifact_service=InMemoryArtifactService(), + session_service=InMemorySessionService(), + memory_service=InMemoryMemoryService(), + ) + + # --- MODIFICATION: Wrap the schema --- + # Load the A2UI_SCHEMA string into a Python object for validation + try: + # First, load the schema for a *single message* + single_message_schema = json.loads(A2UI_SCHEMA) + + # The prompt instructs the LLM to return a *list* of messages. + # Therefore, our validation schema must be an *array* of the single message schema. + self.a2ui_schema_object = {"type": "array", "items": single_message_schema} + logger.info("A2UI_SCHEMA successfully loaded and wrapped in an array validator.") + except json.JSONDecodeError as e: + logger.error(f"CRITICAL: Failed to parse A2UI_SCHEMA: {e}") + self.a2ui_schema_object = None + # --- END MODIFICATION --- + + def get_processing_message(self) -> str: + return "Finding restaurants that match your criteria..." + + def _build_agent(self, use_ui: bool) -> LlmAgent: + """Builds the LLM agent for the restaurant agent.""" + LITELLM_MODEL = os.getenv("LITELLM_MODEL", "gemini/gemini-2.5-flash") + + if use_ui: + # Construct the full prompt with UI instructions, examples, and schema + instruction = AGENT_INSTRUCTION + get_ui_prompt( + self.base_url, RESTAURANT_UI_EXAMPLES + ) + else: + instruction = get_text_prompt() + + return LlmAgent( + model=LiteLlm(model=LITELLM_MODEL), + name="restaurant_agent", + description="An agent that finds restaurants and helps book tables.", + instruction=instruction, + tools=[get_restaurants], + ) + + async def stream(self, query, session_id) -> AsyncIterable[dict[str, Any]]: + session_state = {"base_url": self.base_url} + + session = await self._runner.session_service.get_session( + app_name=self._agent.name, + user_id=self._user_id, + session_id=session_id, + ) + if session is None: + session = await self._runner.session_service.create_session( + app_name=self._agent.name, + user_id=self._user_id, + state=session_state, + session_id=session_id, + ) + elif "base_url" not in session.state: + session.state["base_url"] = self.base_url + + # --- Begin: UI Validation and Retry Logic --- + max_retries = 1 # Total 2 attempts + attempt = 0 + current_query_text = query + + # Ensure schema was loaded + if self.use_ui and self.a2ui_schema_object is None: + logger.error( + "--- RestaurantAgent.stream: A2UI_SCHEMA is not loaded. " + "Cannot perform UI validation. ---" + ) + yield { + "is_task_complete": True, + "content": ( + "I'm sorry, I'm facing an internal configuration error with my UI" + " components. Please contact support." + ), + } + return + + while attempt <= max_retries: + attempt += 1 + logger.info( + f"--- RestaurantAgent.stream: Attempt {attempt}/{max_retries + 1} " + f"for session {session_id} ---" + ) + + current_message = types.Content( + role="user", parts=[types.Part.from_text(text=current_query_text)] + ) + final_response_content = None + + async for event in self._runner.run_async( + user_id=self._user_id, + session_id=session.id, + new_message=current_message, + ): + logger.info(f"Event from runner: {event}") + if event.is_final_response(): + if event.content and event.content.parts and event.content.parts[0].text: + final_response_content = "\n".join( + [p.text for p in event.content.parts if p.text] ) + break # Got the final response, stop consuming events else: - instruction = get_text_prompt() - - return LlmAgent( - model=LiteLlm(model=LITELLM_MODEL), - name="restaurant_agent", - description="An agent that finds restaurants and helps book tables.", - instruction=instruction, - tools=[get_restaurants], + logger.info(f"Intermediate event: {event}") + # Yield intermediate updates on every attempt + yield { + "is_task_complete": False, + "updates": self.get_processing_message(), + } + + if final_response_content is None: + logger.warning( + "--- RestaurantAgent.stream: Received no final response content from" + f" runner (Attempt {attempt}). ---" ) - - async def stream(self, query, session_id) -> AsyncIterable[dict[str, Any]]: - session_state = {"base_url": self.base_url} - - session = await self._runner.session_service.get_session( - app_name=self._agent.name, - user_id=self._user_id, - session_id=session_id, + if attempt <= max_retries: + current_query_text = ( + "I received no response. Please try again." + f"Please retry the original request: '{query}'" + ) + continue # Go to next retry + else: + # Retries exhausted on no-response + final_response_content = ( + "I'm sorry, I encountered an error and couldn't process your request." + ) + # Fall through to send this as a text-only error + + is_valid = False + error_message = "" + + if self.use_ui: + logger.info( + "--- RestaurantAgent.stream: Validating UI response (Attempt" + f" {attempt})... ---" ) - if session is None: - session = await self._runner.session_service.create_session( - app_name=self._agent.name, - user_id=self._user_id, - state=session_state, - session_id=session_id, - ) - elif "base_url" not in session.state: - session.state["base_url"] = self.base_url - - # --- Begin: UI Validation and Retry Logic --- - max_retries = 1 # Total 2 attempts - attempt = 0 - current_query_text = query - - # Ensure schema was loaded - if self.use_ui and self.a2ui_schema_object is None: - logger.error( - "--- RestaurantAgent.stream: A2UI_SCHEMA is not loaded. " - "Cannot perform UI validation. ---" - ) - yield { - "is_task_complete": True, - "content": ( - "I'm sorry, I'm facing an internal configuration error with my UI components. " - "Please contact support." - ), - } - return - - while attempt <= max_retries: - attempt += 1 - logger.info( - f"--- RestaurantAgent.stream: Attempt {attempt}/{max_retries + 1} " - f"for session {session_id} ---" - ) - - current_message = types.Content( - role="user", parts=[types.Part.from_text(text=current_query_text)] - ) - final_response_content = None - - async for event in self._runner.run_async( - user_id=self._user_id, - session_id=session.id, - new_message=current_message, - ): - logger.info(f"Event from runner: {event}") - if event.is_final_response(): - if ( - event.content - and event.content.parts - and event.content.parts[0].text - ): - final_response_content = "\n".join( - [p.text for p in event.content.parts if p.text] - ) - break # Got the final response, stop consuming events - else: - logger.info(f"Intermediate event: {event}") - # Yield intermediate updates on every attempt - yield { - "is_task_complete": False, - "updates": self.get_processing_message(), - } - - if final_response_content is None: - logger.warning( - f"--- RestaurantAgent.stream: Received no final response content from runner " - f"(Attempt {attempt}). ---" - ) - if attempt <= max_retries: - current_query_text = ( - "I received no response. Please try again." - f"Please retry the original request: '{query}'" - ) - continue # Go to next retry - else: - # Retries exhausted on no-response - final_response_content = "I'm sorry, I encountered an error and couldn't process your request." - # Fall through to send this as a text-only error - - is_valid = False - error_message = "" - - if self.use_ui: - logger.info( - f"--- RestaurantAgent.stream: Validating UI response (Attempt {attempt})... ---" - ) - try: - if "---a2ui_JSON---" not in final_response_content: - raise ValueError("Delimiter '---a2ui_JSON---' not found.") - - text_part, json_string = final_response_content.split( - "---a2ui_JSON---", 1 - ) - - if not json_string.strip(): - raise ValueError("JSON part is empty.") - - json_string_cleaned = ( - json_string.strip().lstrip("```json").rstrip("```").strip() - ) - - if not json_string_cleaned: - raise ValueError("Cleaned JSON string is empty.") - - # Autofix: Remove trailing commas from arrays and objects - json_string_cleaned = re.sub(r",\s*([\]}])", r"\1", json_string_cleaned) - - # --- New Validation Steps --- - # 1. Check if it's parsable JSON - parsed_json_data = json.loads(json_string_cleaned) - - # 2. Check if it validates against the A2UI_SCHEMA - # This will raise jsonschema.exceptions.ValidationError if it fails - logger.info( - "--- RestaurantAgent.stream: Validating against A2UI_SCHEMA... ---" - ) - jsonschema.validate( - instance=parsed_json_data, schema=self.a2ui_schema_object - ) - # --- End New Validation Steps --- - - logger.info( - f"--- RestaurantAgent.stream: UI JSON successfully parsed AND validated against schema. " - f"Validation OK (Attempt {attempt}). ---" - ) - is_valid = True - - except ( - ValueError, - json.JSONDecodeError, - jsonschema.exceptions.ValidationError, - ) as e: - logger.warning( - f"--- RestaurantAgent.stream: A2UI validation failed: {e} (Attempt {attempt}) ---" - ) - logger.warning( - f"--- Failed response content: {final_response_content[:500]}... ---" - ) - error_message = f"Validation failed: {e}." - - else: # Not using UI, so text is always "valid" - is_valid = True - - if is_valid: - logger.info( - f"--- RestaurantAgent.stream: Response is valid. Sending final response (Attempt {attempt}). ---" - ) - logger.info(f"Final response: {final_response_content}") - yield { - "is_task_complete": True, - "content": final_response_content, - } - return # We're done, exit the generator - - # --- If we're here, it means validation failed --- - - if attempt <= max_retries: - logger.warning( - f"--- RestaurantAgent.stream: Retrying... ({attempt}/{max_retries + 1}) ---" - ) - # Prepare the query for the retry - current_query_text = ( - f"Your previous response was invalid. {error_message} " - "You MUST generate a valid response that strictly follows the A2UI JSON SCHEMA. " - "The response MUST be a JSON list of A2UI messages. " - "Ensure the response is split by '---a2ui_JSON---' and the JSON part is well-formed. " - f"Please retry the original request: '{query}'" - ) - # Loop continues... - - # --- If we're here, it means we've exhausted retries --- - logger.error( - "--- RestaurantAgent.stream: Max retries exhausted. Sending text-only error. ---" + try: + if "---a2ui_JSON---" not in final_response_content: + raise ValueError("Delimiter '---a2ui_JSON---' not found.") + + text_part, json_string = final_response_content.split("---a2ui_JSON---", 1) + + if not json_string.strip(): + raise ValueError("JSON part is empty.") + + json_string_cleaned = ( + json_string.strip().lstrip("```json").rstrip("```").strip() + ) + + if not json_string_cleaned: + raise ValueError("Cleaned JSON string is empty.") + + # --- New Validation Steps --- + # 1. Check if it's parsable JSON + parsed_json_data = json.loads(json_string_cleaned) + + # 2. Check if it validates against the A2UI_SCHEMA + # This will raise jsonschema.exceptions.ValidationError if it fails + logger.info( + "--- RestaurantAgent.stream: Validating against A2UI_SCHEMA... ---" + ) + jsonschema.validate(instance=parsed_json_data, schema=self.a2ui_schema_object) + # --- End New Validation Steps --- + + logger.info( + "--- RestaurantAgent.stream: UI JSON successfully parsed AND validated" + f" against schema. Validation OK (Attempt {attempt}). ---" + ) + is_valid = True + + except ( + ValueError, + json.JSONDecodeError, + jsonschema.exceptions.ValidationError, + ) as e: + logger.warning( + f"--- RestaurantAgent.stream: A2UI validation failed: {e} (Attempt" + f" {attempt}) ---" + ) + logger.warning( + f"--- Failed response content: {final_response_content[:500]}... ---" + ) + error_message = f"Validation failed: {e}." + + else: # Not using UI, so text is always "valid" + is_valid = True + + if is_valid: + logger.info( + "--- RestaurantAgent.stream: Response is valid. Sending final response" + f" (Attempt {attempt}). ---" ) + logger.info(f"Final response: {final_response_content}") yield { "is_task_complete": True, - "content": ( - "I'm sorry, I'm having trouble generating the interface for that request right now. " - "Please try again in a moment." - ), + "content": final_response_content, } - # --- End: UI Validation and Retry Logic --- + return # We're done, exit the generator + + # --- If we're here, it means validation failed --- + + if attempt <= max_retries: + logger.warning( + f"--- RestaurantAgent.stream: Retrying... ({attempt}/{max_retries + 1}) ---" + ) + # Prepare the query for the retry + current_query_text = ( + f"Your previous response was invalid. {error_message} You MUST generate a" + " valid response that strictly follows the A2UI JSON SCHEMA. The response" + " MUST be a JSON list of A2UI messages. Ensure the response is split by" + " '---a2ui_JSON---' and the JSON part is well-formed. Please retry the" + f" original request: '{query}'" + ) + # Loop continues... + + # --- If we're here, it means we've exhausted retries --- + logger.error( + "--- RestaurantAgent.stream: Max retries exhausted. Sending text-only" + " error. ---" + ) + yield { + "is_task_complete": True, + "content": ( + "I'm sorry, I'm having trouble generating the interface for that request" + " right now. Please try again in a moment." + ), + } + # --- End: UI Validation and Retry Logic --- From a70cb0371653c1c9def2cb8ecac9de72c9f4ce55 Mon Sep 17 00:00:00 2001 From: Selam Waktola Date: Fri, 13 Feb 2026 12:31:37 -0800 Subject: [PATCH 3/3] back to original --- samples/agent/adk/restaurant_finder/agent.py | 474 ++++++++++--------- 1 file changed, 239 insertions(+), 235 deletions(-) diff --git a/samples/agent/adk/restaurant_finder/agent.py b/samples/agent/adk/restaurant_finder/agent.py index e4f9e3fef..5283cd882 100644 --- a/samples/agent/adk/restaurant_finder/agent.py +++ b/samples/agent/adk/restaurant_finder/agent.py @@ -54,246 +54,250 @@ class RestaurantAgent: - """An agent that finds restaurants based on user criteria.""" - - SUPPORTED_CONTENT_TYPES = ["text", "text/plain"] - - def __init__(self, base_url: str, use_ui: bool = False): - self.base_url = base_url - self.use_ui = use_ui - self._agent = self._build_agent(use_ui) - self._user_id = "remote_agent" - self._runner = Runner( - app_name=self._agent.name, - agent=self._agent, - artifact_service=InMemoryArtifactService(), - session_service=InMemorySessionService(), - memory_service=InMemoryMemoryService(), - ) - - # --- MODIFICATION: Wrap the schema --- - # Load the A2UI_SCHEMA string into a Python object for validation - try: - # First, load the schema for a *single message* - single_message_schema = json.loads(A2UI_SCHEMA) - - # The prompt instructs the LLM to return a *list* of messages. - # Therefore, our validation schema must be an *array* of the single message schema. - self.a2ui_schema_object = {"type": "array", "items": single_message_schema} - logger.info("A2UI_SCHEMA successfully loaded and wrapped in an array validator.") - except json.JSONDecodeError as e: - logger.error(f"CRITICAL: Failed to parse A2UI_SCHEMA: {e}") - self.a2ui_schema_object = None - # --- END MODIFICATION --- - - def get_processing_message(self) -> str: - return "Finding restaurants that match your criteria..." - - def _build_agent(self, use_ui: bool) -> LlmAgent: - """Builds the LLM agent for the restaurant agent.""" - LITELLM_MODEL = os.getenv("LITELLM_MODEL", "gemini/gemini-2.5-flash") - - if use_ui: - # Construct the full prompt with UI instructions, examples, and schema - instruction = AGENT_INSTRUCTION + get_ui_prompt( - self.base_url, RESTAURANT_UI_EXAMPLES - ) - else: - instruction = get_text_prompt() - - return LlmAgent( - model=LiteLlm(model=LITELLM_MODEL), - name="restaurant_agent", - description="An agent that finds restaurants and helps book tables.", - instruction=instruction, - tools=[get_restaurants], - ) - - async def stream(self, query, session_id) -> AsyncIterable[dict[str, Any]]: - session_state = {"base_url": self.base_url} - - session = await self._runner.session_service.get_session( - app_name=self._agent.name, - user_id=self._user_id, - session_id=session_id, - ) - if session is None: - session = await self._runner.session_service.create_session( - app_name=self._agent.name, - user_id=self._user_id, - state=session_state, - session_id=session_id, - ) - elif "base_url" not in session.state: - session.state["base_url"] = self.base_url - - # --- Begin: UI Validation and Retry Logic --- - max_retries = 1 # Total 2 attempts - attempt = 0 - current_query_text = query - - # Ensure schema was loaded - if self.use_ui and self.a2ui_schema_object is None: - logger.error( - "--- RestaurantAgent.stream: A2UI_SCHEMA is not loaded. " - "Cannot perform UI validation. ---" - ) - yield { - "is_task_complete": True, - "content": ( - "I'm sorry, I'm facing an internal configuration error with my UI" - " components. Please contact support." - ), - } - return - - while attempt <= max_retries: - attempt += 1 - logger.info( - f"--- RestaurantAgent.stream: Attempt {attempt}/{max_retries + 1} " - f"for session {session_id} ---" - ) - - current_message = types.Content( - role="user", parts=[types.Part.from_text(text=current_query_text)] - ) - final_response_content = None - - async for event in self._runner.run_async( - user_id=self._user_id, - session_id=session.id, - new_message=current_message, - ): - logger.info(f"Event from runner: {event}") - if event.is_final_response(): - if event.content and event.content.parts and event.content.parts[0].text: - final_response_content = "\n".join( - [p.text for p in event.content.parts if p.text] + """An agent that finds restaurants based on user criteria.""" + + SUPPORTED_CONTENT_TYPES = ["text", "text/plain"] + + def __init__(self, base_url: str, use_ui: bool = False): + self.base_url = base_url + self.use_ui = use_ui + self._agent = self._build_agent(use_ui) + self._user_id = "remote_agent" + self._runner = Runner( + app_name=self._agent.name, + agent=self._agent, + artifact_service=InMemoryArtifactService(), + session_service=InMemorySessionService(), + memory_service=InMemoryMemoryService(), + ) + + # --- MODIFICATION: Wrap the schema --- + # Load the A2UI_SCHEMA string into a Python object for validation + try: + # First, load the schema for a *single message* + single_message_schema = json.loads(A2UI_SCHEMA) + + # The prompt instructs the LLM to return a *list* of messages. + # Therefore, our validation schema must be an *array* of the single message schema. + self.a2ui_schema_object = {"type": "array", "items": single_message_schema} + logger.info( + "A2UI_SCHEMA successfully loaded and wrapped in an array validator." + ) + except json.JSONDecodeError as e: + logger.error(f"CRITICAL: Failed to parse A2UI_SCHEMA: {e}") + self.a2ui_schema_object = None + # --- END MODIFICATION --- + + def get_processing_message(self) -> str: + return "Finding restaurants that match your criteria..." + + def _build_agent(self, use_ui: bool) -> LlmAgent: + """Builds the LLM agent for the restaurant agent.""" + LITELLM_MODEL = os.getenv("LITELLM_MODEL", "gemini/gemini-2.5-flash") + + if use_ui: + # Construct the full prompt with UI instructions, examples, and schema + instruction = AGENT_INSTRUCTION + get_ui_prompt( + self.base_url, RESTAURANT_UI_EXAMPLES ) - break # Got the final response, stop consuming events else: - logger.info(f"Intermediate event: {event}") - # Yield intermediate updates on every attempt - yield { - "is_task_complete": False, - "updates": self.get_processing_message(), - } - - if final_response_content is None: - logger.warning( - "--- RestaurantAgent.stream: Received no final response content from" - f" runner (Attempt {attempt}). ---" + instruction = get_text_prompt() + + return LlmAgent( + model=LiteLlm(model=LITELLM_MODEL), + name="restaurant_agent", + description="An agent that finds restaurants and helps book tables.", + instruction=instruction, + tools=[get_restaurants], ) - if attempt <= max_retries: - current_query_text = ( - "I received no response. Please try again." - f"Please retry the original request: '{query}'" - ) - continue # Go to next retry - else: - # Retries exhausted on no-response - final_response_content = ( - "I'm sorry, I encountered an error and couldn't process your request." - ) - # Fall through to send this as a text-only error - - is_valid = False - error_message = "" - - if self.use_ui: - logger.info( - "--- RestaurantAgent.stream: Validating UI response (Attempt" - f" {attempt})... ---" + + async def stream(self, query, session_id) -> AsyncIterable[dict[str, Any]]: + session_state = {"base_url": self.base_url} + + session = await self._runner.session_service.get_session( + app_name=self._agent.name, + user_id=self._user_id, + session_id=session_id, ) - try: - if "---a2ui_JSON---" not in final_response_content: - raise ValueError("Delimiter '---a2ui_JSON---' not found.") - - text_part, json_string = final_response_content.split("---a2ui_JSON---", 1) - - if not json_string.strip(): - raise ValueError("JSON part is empty.") - - json_string_cleaned = ( - json_string.strip().lstrip("```json").rstrip("```").strip() - ) - - if not json_string_cleaned: - raise ValueError("Cleaned JSON string is empty.") - - # --- New Validation Steps --- - # 1. Check if it's parsable JSON - parsed_json_data = json.loads(json_string_cleaned) - - # 2. Check if it validates against the A2UI_SCHEMA - # This will raise jsonschema.exceptions.ValidationError if it fails - logger.info( - "--- RestaurantAgent.stream: Validating against A2UI_SCHEMA... ---" - ) - jsonschema.validate(instance=parsed_json_data, schema=self.a2ui_schema_object) - # --- End New Validation Steps --- - - logger.info( - "--- RestaurantAgent.stream: UI JSON successfully parsed AND validated" - f" against schema. Validation OK (Attempt {attempt}). ---" - ) - is_valid = True - - except ( - ValueError, - json.JSONDecodeError, - jsonschema.exceptions.ValidationError, - ) as e: - logger.warning( - f"--- RestaurantAgent.stream: A2UI validation failed: {e} (Attempt" - f" {attempt}) ---" - ) - logger.warning( - f"--- Failed response content: {final_response_content[:500]}... ---" - ) - error_message = f"Validation failed: {e}." - - else: # Not using UI, so text is always "valid" - is_valid = True - - if is_valid: - logger.info( - "--- RestaurantAgent.stream: Response is valid. Sending final response" - f" (Attempt {attempt}). ---" + if session is None: + session = await self._runner.session_service.create_session( + app_name=self._agent.name, + user_id=self._user_id, + state=session_state, + session_id=session_id, + ) + elif "base_url" not in session.state: + session.state["base_url"] = self.base_url + + # --- Begin: UI Validation and Retry Logic --- + max_retries = 1 # Total 2 attempts + attempt = 0 + current_query_text = query + + # Ensure schema was loaded + if self.use_ui and self.a2ui_schema_object is None: + logger.error( + "--- RestaurantAgent.stream: A2UI_SCHEMA is not loaded. " + "Cannot perform UI validation. ---" + ) + yield { + "is_task_complete": True, + "content": ( + "I'm sorry, I'm facing an internal configuration error with my UI components. " + "Please contact support." + ), + } + return + + while attempt <= max_retries: + attempt += 1 + logger.info( + f"--- RestaurantAgent.stream: Attempt {attempt}/{max_retries + 1} " + f"for session {session_id} ---" + ) + + current_message = types.Content( + role="user", parts=[types.Part.from_text(text=current_query_text)] + ) + final_response_content = None + + async for event in self._runner.run_async( + user_id=self._user_id, + session_id=session.id, + new_message=current_message, + ): + logger.info(f"Event from runner: {event}") + if event.is_final_response(): + if ( + event.content + and event.content.parts + and event.content.parts[0].text + ): + final_response_content = "\n".join( + [p.text for p in event.content.parts if p.text] + ) + break # Got the final response, stop consuming events + else: + logger.info(f"Intermediate event: {event}") + # Yield intermediate updates on every attempt + yield { + "is_task_complete": False, + "updates": self.get_processing_message(), + } + + if final_response_content is None: + logger.warning( + f"--- RestaurantAgent.stream: Received no final response content from runner " + f"(Attempt {attempt}). ---" + ) + if attempt <= max_retries: + current_query_text = ( + "I received no response. Please try again." + f"Please retry the original request: '{query}'" + ) + continue # Go to next retry + else: + # Retries exhausted on no-response + final_response_content = "I'm sorry, I encountered an error and couldn't process your request." + # Fall through to send this as a text-only error + + is_valid = False + error_message = "" + + if self.use_ui: + logger.info( + f"--- RestaurantAgent.stream: Validating UI response (Attempt {attempt})... ---" + ) + try: + if "---a2ui_JSON---" not in final_response_content: + raise ValueError("Delimiter '---a2ui_JSON---' not found.") + + text_part, json_string = final_response_content.split( + "---a2ui_JSON---", 1 + ) + + if not json_string.strip(): + raise ValueError("JSON part is empty.") + + json_string_cleaned = ( + json_string.strip().lstrip("```json").rstrip("```").strip() + ) + + if not json_string_cleaned: + raise ValueError("Cleaned JSON string is empty.") + + # --- New Validation Steps --- + # 1. Check if it's parsable JSON + parsed_json_data = json.loads(json_string_cleaned) + + # 2. Check if it validates against the A2UI_SCHEMA + # This will raise jsonschema.exceptions.ValidationError if it fails + logger.info( + "--- RestaurantAgent.stream: Validating against A2UI_SCHEMA... ---" + ) + jsonschema.validate( + instance=parsed_json_data, schema=self.a2ui_schema_object + ) + # --- End New Validation Steps --- + + logger.info( + f"--- RestaurantAgent.stream: UI JSON successfully parsed AND validated against schema. " + f"Validation OK (Attempt {attempt}). ---" + ) + is_valid = True + + except ( + ValueError, + json.JSONDecodeError, + jsonschema.exceptions.ValidationError, + ) as e: + logger.warning( + f"--- RestaurantAgent.stream: A2UI validation failed: {e} (Attempt {attempt}) ---" + ) + logger.warning( + f"--- Failed response content: {final_response_content[:500]}... ---" + ) + error_message = f"Validation failed: {e}." + + else: # Not using UI, so text is always "valid" + is_valid = True + + if is_valid: + logger.info( + f"--- RestaurantAgent.stream: Response is valid. Sending final response (Attempt {attempt}). ---" + ) + logger.info(f"Final response: {final_response_content}") + yield { + "is_task_complete": True, + "content": final_response_content, + } + return # We're done, exit the generator + + # --- If we're here, it means validation failed --- + + if attempt <= max_retries: + logger.warning( + f"--- RestaurantAgent.stream: Retrying... ({attempt}/{max_retries + 1}) ---" + ) + # Prepare the query for the retry + current_query_text = ( + f"Your previous response was invalid. {error_message} " + "You MUST generate a valid response that strictly follows the A2UI JSON SCHEMA. " + "The response MUST be a JSON list of A2UI messages. " + "Ensure the response is split by '---a2ui_JSON---' and the JSON part is well-formed. " + f"Please retry the original request: '{query}'" + ) + # Loop continues... + + # --- If we're here, it means we've exhausted retries --- + logger.error( + "--- RestaurantAgent.stream: Max retries exhausted. Sending text-only error. ---" ) - logger.info(f"Final response: {final_response_content}") yield { "is_task_complete": True, - "content": final_response_content, + "content": ( + "I'm sorry, I'm having trouble generating the interface for that request right now. " + "Please try again in a moment." + ), } - return # We're done, exit the generator - - # --- If we're here, it means validation failed --- - - if attempt <= max_retries: - logger.warning( - f"--- RestaurantAgent.stream: Retrying... ({attempt}/{max_retries + 1}) ---" - ) - # Prepare the query for the retry - current_query_text = ( - f"Your previous response was invalid. {error_message} You MUST generate a" - " valid response that strictly follows the A2UI JSON SCHEMA. The response" - " MUST be a JSON list of A2UI messages. Ensure the response is split by" - " '---a2ui_JSON---' and the JSON part is well-formed. Please retry the" - f" original request: '{query}'" - ) - # Loop continues... - - # --- If we're here, it means we've exhausted retries --- - logger.error( - "--- RestaurantAgent.stream: Max retries exhausted. Sending text-only" - " error. ---" - ) - yield { - "is_task_complete": True, - "content": ( - "I'm sorry, I'm having trouble generating the interface for that request" - " right now. Please try again in a moment." - ), - } - # --- End: UI Validation and Retry Logic --- + # --- End: UI Validation and Retry Logic ---