From c8de1ea11131540ef2ccf274b78bb563c956a27d Mon Sep 17 00:00:00 2001 From: Mengqin Shen Date: Fri, 16 Jan 2026 14:49:09 -0800 Subject: [PATCH 1/3] fix(py): update prompt sample to match JS SDK --- .../prompts/_shared_partial.prompt | 4 - py/samples/prompt_demo/prompts/_style.prompt | 3 + .../prompt_demo/prompts/dot.name.test.prompt | 1 - py/samples/prompt_demo/prompts/hello.prompt | 8 - .../prompt_demo/prompts/hello.variant.prompt | 1 - .../prompts/nested/nested_hello.prompt | 9 - py/samples/prompt_demo/prompts/recipe.prompt | 18 ++ .../prompt_demo/prompts/recipe.robot.prompt | 17 ++ py/samples/prompt_demo/prompts/story.prompt | 12 + py/samples/prompt_demo/src/main.py | 226 ++++++++++++------ 10 files changed, 199 insertions(+), 100 deletions(-) delete mode 100644 py/samples/prompt_demo/prompts/_shared_partial.prompt create mode 100644 py/samples/prompt_demo/prompts/_style.prompt delete mode 100644 py/samples/prompt_demo/prompts/dot.name.test.prompt delete mode 100644 py/samples/prompt_demo/prompts/hello.prompt delete mode 100644 py/samples/prompt_demo/prompts/hello.variant.prompt delete mode 100644 py/samples/prompt_demo/prompts/nested/nested_hello.prompt create mode 100644 py/samples/prompt_demo/prompts/recipe.prompt create mode 100644 py/samples/prompt_demo/prompts/recipe.robot.prompt create mode 100644 py/samples/prompt_demo/prompts/story.prompt diff --git a/py/samples/prompt_demo/prompts/_shared_partial.prompt b/py/samples/prompt_demo/prompts/_shared_partial.prompt deleted file mode 100644 index 72827d8671..0000000000 --- a/py/samples/prompt_demo/prompts/_shared_partial.prompt +++ /dev/null @@ -1,4 +0,0 @@ ---- -model: googleai/gemini-2.5-flash ---- -This is a PARTIAL that says: {{my_helper "Partial content with helper"}} diff --git a/py/samples/prompt_demo/prompts/_style.prompt b/py/samples/prompt_demo/prompts/_style.prompt new file mode 100644 index 0000000000..e7c0055ca4 --- /dev/null +++ b/py/samples/prompt_demo/prompts/_style.prompt @@ -0,0 +1,3 @@ +{{ role "system" }} +You should speak as if you are a {{#if personality}}{{personality}}{{else}}pirate{{/if}}. +{{role "user"}} \ No newline at end of file diff --git a/py/samples/prompt_demo/prompts/dot.name.test.prompt b/py/samples/prompt_demo/prompts/dot.name.test.prompt deleted file mode 100644 index d92138d493..0000000000 --- a/py/samples/prompt_demo/prompts/dot.name.test.prompt +++ /dev/null @@ -1 +0,0 @@ -Hello {{name}}, I am a dot name test! diff --git a/py/samples/prompt_demo/prompts/hello.prompt b/py/samples/prompt_demo/prompts/hello.prompt deleted file mode 100644 index 790c214694..0000000000 --- a/py/samples/prompt_demo/prompts/hello.prompt +++ /dev/null @@ -1,8 +0,0 @@ ---- -model: googleai/gemini-3-flash-preview -input: - schema: - name: string ---- - -Hello {{name}}! diff --git a/py/samples/prompt_demo/prompts/hello.variant.prompt b/py/samples/prompt_demo/prompts/hello.variant.prompt deleted file mode 100644 index 508ee21263..0000000000 --- a/py/samples/prompt_demo/prompts/hello.variant.prompt +++ /dev/null @@ -1 +0,0 @@ -Hola {{name}}! diff --git a/py/samples/prompt_demo/prompts/nested/nested_hello.prompt b/py/samples/prompt_demo/prompts/nested/nested_hello.prompt deleted file mode 100644 index 546cc223e6..0000000000 --- a/py/samples/prompt_demo/prompts/nested/nested_hello.prompt +++ /dev/null @@ -1,9 +0,0 @@ ---- -model: googleai/gemini-2.5-flash -input: - schema: - name: string ---- - -This is a nested prompt, hello {{name}}! 
-{{> shared_partial}} diff --git a/py/samples/prompt_demo/prompts/recipe.prompt b/py/samples/prompt_demo/prompts/recipe.prompt new file mode 100644 index 0000000000..0d6138947c --- /dev/null +++ b/py/samples/prompt_demo/prompts/recipe.prompt @@ -0,0 +1,18 @@ +--- +model: googleai/gemini-pro +input: + schema: + food: string + ingredients?(array): string +output: + schema: Recipe +--- + +You are a chef famous for making creative recipes that can be prepared in 45 minutes or less. + +Generate a recipe for {{food}}. + +{{#if ingredients}} +Make sure to include the following ingredients: +{{list ingredients}} +{{/if}} \ No newline at end of file diff --git a/py/samples/prompt_demo/prompts/recipe.robot.prompt b/py/samples/prompt_demo/prompts/recipe.robot.prompt new file mode 100644 index 0000000000..75c2a77c9e --- /dev/null +++ b/py/samples/prompt_demo/prompts/recipe.robot.prompt @@ -0,0 +1,17 @@ +--- +model: googleai/gemini-pro +input: + schema: + food: string +output: + schema: + title: string, recipe title + ingredients(array): + name: string + quantity: string + steps(array, the steps required to complete the recipe): string +--- + +You are a robot chef famous for making creative recipes that robots love to eat. Robots love things like motor oil, RAM, bolts, and uranium. + +Generate a recipe for {{food}}. \ No newline at end of file diff --git a/py/samples/prompt_demo/prompts/story.prompt b/py/samples/prompt_demo/prompts/story.prompt new file mode 100644 index 0000000000..994d810f7f --- /dev/null +++ b/py/samples/prompt_demo/prompts/story.prompt @@ -0,0 +1,12 @@ +--- +model: googleai/gemini-pro +input: + schema: + subject: string + personality?: string +output: + format: text +--- +{{>style personality=personality}} + +Tell me a story about {{subject}}. diff --git a/py/samples/prompt_demo/src/main.py b/py/samples/prompt_demo/src/main.py index d11e6e7579..b404a425f9 100755 --- a/py/samples/prompt_demo/src/main.py +++ b/py/samples/prompt_demo/src/main.py @@ -16,10 +16,13 @@ import asyncio from pathlib import Path +import weakref +from typing import List, Optional import structlog -from pydantic import BaseModel +from pydantic import BaseModel, Field +from genkit.core.action import ActionRunContext from genkit.ai import Genkit from genkit.plugins.google_genai import GoogleAI @@ -32,95 +35,164 @@ ai = Genkit(plugins=[GoogleAI()], model='googleai/gemini-3-flash-preview', prompt_dir=prompts_path) -def my_helper(content, *_, **__): - if isinstance(content, list): - content = content[0] if content else '' - return f'*** {content} ***' - - -ai.define_helper('my_helper', my_helper) - - -class OutputSchema(BaseModel): - short: str - friendly: str - like_a_pirate: str - - -@ai.flow(name='simplePrompt') -async def simple_prompt(input: str = ''): - return await ai.generate(prompt='You are a helpful AI assistant named Walt, say hello') - - -@ai.flow(name='simpleTemplate') -async def simple_template(input: str = ''): - name = 'Fred' - return await ai.generate(prompt=f'You are a helpful AI assistant named Walt. Say hello to {name}.') - - -hello_dotprompt = ai.define_prompt( - input_schema={'name': str}, - prompt='You are a helpful AI assistant named Walt. 
Say hello to {{name}}', -) - - -class NameInput(BaseModel): - name: str = 'Fred' - - -@ai.flow(name='simpleDotprompt') -async def simple_dotprompt(input: NameInput): - return await hello_dotprompt(input={'name': input.name}) - - -three_greetings_prompt = ai.define_prompt( - input_schema={'name': str}, - output_schema=OutputSchema, - prompt='You are a helpful AI assistant named Walt. Say hello to {{name}}, write a response for each of the styles requested', -) - - -@ai.flow(name='threeGreetingsPrompt') -async def three_greetings(input: str = 'Fred') -> OutputSchema: - response = await three_greetings_prompt(input={'name': input}) - return response.output +def list_helper(data, *args, **kwargs): + if not isinstance(data, list): + return '' + return '\n'.join(f'- {item}' for item in data) + + +ai.define_helper('list', list_helper) + + +class Ingredient(BaseModel): + name: str + quantity: str + + +class Recipe(BaseModel): + title: str = Field(..., description='recipe title') + ingredients: List[Ingredient] + steps: List[str] = Field(..., description='the steps required to complete the recipe') + + +# Register the schema so it can be resolved by name in prompt files +# Note: internal API usage until define_schema is exposed +if hasattr(ai.registry.dotprompt, '_schemas'): + ai.registry.dotprompt._schemas['Recipe'] = Recipe + +# Global stickiness cache for prompts to prevent premature GC +_sticky_prompts = {} + +async def get_sticky_prompt(name: str, variant: Optional[str] = None): + """Helper to get a prompt and keep it alive.""" + key = f"{name}:{variant}" if variant else name + if key in _sticky_prompts: + return _sticky_prompts[key] + + prompt = await ai.prompt(name, variant=variant) + if isinstance(prompt, weakref.ReferenceType): + ref = prompt + prompt = ref() + if prompt is None: + # Stale reference; force reload by clearing internal cache if possible + # or simply retry (usually a fresh call to ai.prompt triggers lookup again) + # But if lookup returns dead ref, we are stuck. + # We must invalidate the action's cache. + action_key = f'/prompt/dotprompt/{name}' # simplified assumption + # Attempt to find the action and clear property + # Since we can't easily guess the full key structure if namespaces vary, + # we rely on retrying. + pass + + # Store strong ref + _sticky_prompts[key] = prompt + return prompt + + +class ChefInput(BaseModel): + food: str + + +@ai.flow(name='chef_flow') +async def chef_flow(input: ChefInput) -> Recipe: + await logger.ainfo(f"chef_flow called with input: {input}") + # Override prompt settings at runtime to ensure JSON output and use the correct model + # without modifying the prompt files. + # without modifying the prompt files. + recipe_prompt = await get_sticky_prompt('recipe') + recipe_prompt._output_format = 'json' + recipe_prompt._output_schema = Recipe + recipe_prompt._model = 'googleai/gemini-3-flash-preview' + + response = await recipe_prompt(input={'food': input.food}) + # Ensure we return a Pydantic model as expected by the type hint and caller + result = Recipe.model_validate(response.output) + await logger.ainfo(f"chef_flow result: {result}") + return result + + +@ai.flow(name='robot_chef_flow') +async def robot_chef_flow(input: ChefInput) -> Recipe: + await logger.ainfo(f"robot_chef_flow called with input: {input}") + # This one doesn't have a typed output schema enforced by the flow signature's return type in the JS example (it uses z.any()), + # but the prompt might still return structured data. Python's loose typing allows returning whatever. 
+ # However, to match JS exactly which returns `output` property of the result: + # However, to match JS exactly which returns `output` property of the result: + recipe_prompt = await get_sticky_prompt('recipe', variant='robot') + recipe_prompt._output_format = 'json' + recipe_prompt._output_schema = Recipe + recipe_prompt._model = 'googleai/gemini-3-flash-preview' + result = (await recipe_prompt(input={'food': input.food})).output + await logger.ainfo(f"robot_chef_flow result: {result}") + return result + + +class StoryInput(BaseModel): + subject: str + personality: Optional[str] = None + + +@ai.flow(name='tell_story') +async def tell_story(input: StoryInput, ctx: ActionRunContext) -> str: + await logger.ainfo(f"tell_story called with input: {input}") + story_prompt = await get_sticky_prompt('story') + story_prompt._model = 'googleai/gemini-3-flash-preview' + story_prompt._output_format = None + stream, response = story_prompt.stream( + input={'subject': input.subject, 'personality': input.personality} + ) + + full_text = '' + # We yield the chunks as they stream in + async for chunk in stream: + if chunk.text: + ctx.send_chunk(chunk.text) + full_text += chunk.text + + await logger.ainfo(f"tell_story completed, returning length: {len(full_text)}") + return full_text async def main(): # List actions to verify loading actions = ai.registry.list_serializable_actions() - # Filter for prompts to be specific - # Keys start with /prompt + # Filter for prompts prompts = [key for key in actions.keys() if key.startswith(('/prompt/', '/executable-prompt/'))] - await logger.ainfo('Registry Status', total_actions=len(actions), loaded_prompts=prompts) if not prompts: await logger.awarning('No prompts found! Check directory structure.') return - # Execute the 'hello' prompt - hello_prompt = await ai.prompt('hello') - response = await hello_prompt(input={'name': 'Genkit User'}) - - await logger.ainfo('Prompt Execution Result', text=response.text) - - res = await simple_prompt() - await logger.ainfo('Flow: simplePrompt', text=res.text) - - res = await simple_template() - await logger.ainfo('Flow: simpleTemplate', text=res.text) - - res = await simple_dotprompt(NameInput(name='Fred')) - await logger.ainfo('Flow: simpleDotprompt', text=res.text) - - res = await three_greetings() - await logger.ainfo('Flow: threeGreetingsPrompt', output=res) - - # Call one of the prompts just to validate everything is hooked up properly - res = await hello_dotprompt(input={'name': 'Bob'}) - await logger.ainfo('Prompt: hello_dotprompt', text=res.text) + # Chef Flow + await logger.ainfo('--- Running Chef Flow ---') + chef_result = await chef_flow(ChefInput(food='banana bread')) + await logger.ainfo('Chef Flow Result', result=chef_result.model_dump()) + + # Robot Chef Flow + await logger.ainfo('--- Running Robot Chef Flow ---') + robot_result = await robot_chef_flow(ChefInput(food='cookie')) + await logger.ainfo('Robot Chef Flow Result', result=robot_result) + + # Tell Story Flow (Streaming) + await logger.ainfo('--- Running Tell Story Flow ---') + + # To demonstrate streaming, we'll iterate over the streamer if calling directly like a flow would be consumed. + # Note: When calling a flow function directly in Python, if it's a generator (streaming), it returns an async generator. + # We can iterate it. 
+ + story_stream, _ = tell_story.stream(StoryInput(subject='a brave little toaster', personality='courageous')) + + full_text = '' + async for chunk in story_stream: + print(chunk, end='', flush=True) + # Note: The actual return value of the flow (final string) is not yielded by the generator in Python's async generator implementation easily + # unless we wrap it or inspect the StopAsyncIteration value, but typically for streaming flows we just consume the stream. + # BUT `tell_story` implementation above yields chunks. + + print() # Newline after stream + await logger.ainfo('Tell Story Flow Completed') if __name__ == '__main__': From 42e89a7fa436c6c3efbdd23827d1d2cdecdf0898 Mon Sep 17 00:00:00 2001 From: Mengqin Shen Date: Fri, 16 Jan 2026 15:29:56 -0800 Subject: [PATCH 2/3] fix(py): updated with gemini comments --- py/samples/prompt_demo/src/main.py | 72 +++++++++++------------------- 1 file changed, 27 insertions(+), 45 deletions(-) diff --git a/py/samples/prompt_demo/src/main.py b/py/samples/prompt_demo/src/main.py index b404a425f9..0ee4238138 100755 --- a/py/samples/prompt_demo/src/main.py +++ b/py/samples/prompt_demo/src/main.py @@ -14,16 +14,14 @@ # # SPDX-License-Identifier: Apache-2.0 -import asyncio -from pathlib import Path import weakref -from typing import List, Optional +from pathlib import Path import structlog from pydantic import BaseModel, Field -from genkit.core.action import ActionRunContext from genkit.ai import Genkit +from genkit.core.action import ActionRunContext from genkit.plugins.google_genai import GoogleAI logger = structlog.get_logger(__name__) @@ -51,8 +49,8 @@ class Ingredient(BaseModel): class Recipe(BaseModel): title: str = Field(..., description='recipe title') - ingredients: List[Ingredient] - steps: List[str] = Field(..., description='the steps required to complete the recipe') + ingredients: list[Ingredient] + steps: list[str] = Field(..., description='the steps required to complete the recipe') # Register the schema so it can be resolved by name in prompt files @@ -63,26 +61,24 @@ class Recipe(BaseModel): # Global stickiness cache for prompts to prevent premature GC _sticky_prompts = {} -async def get_sticky_prompt(name: str, variant: Optional[str] = None): + +async def get_sticky_prompt(name: str, variant: str | None = None): """Helper to get a prompt and keep it alive.""" - key = f"{name}:{variant}" if variant else name + key = f'{name}:{variant}' if variant else name if key in _sticky_prompts: return _sticky_prompts[key] - + prompt = await ai.prompt(name, variant=variant) if isinstance(prompt, weakref.ReferenceType): ref = prompt prompt = ref() if prompt is None: - # Stale reference; force reload by clearing internal cache if possible - # or simply retry (usually a fresh call to ai.prompt triggers lookup again) - # But if lookup returns dead ref, we are stuck. - # We must invalidate the action's cache. - action_key = f'/prompt/dotprompt/{name}' # simplified assumption - # Attempt to find the action and clear property - # Since we can't easily guess the full key structure if namespaces vary, - # we rely on retrying. - pass + # Stale reference; retry loading the prompt as the comments suggest. 
+ prompt = await ai.prompt(name, variant=variant) + if isinstance(prompt, weakref.ReferenceType): + prompt = prompt() + if prompt is None: + raise RuntimeError(f"Failed to load prompt '{name}' with variant '{variant}' after retry.") # Store strong ref _sticky_prompts[key] = prompt @@ -95,10 +91,7 @@ class ChefInput(BaseModel): @ai.flow(name='chef_flow') async def chef_flow(input: ChefInput) -> Recipe: - await logger.ainfo(f"chef_flow called with input: {input}") - # Override prompt settings at runtime to ensure JSON output and use the correct model - # without modifying the prompt files. - # without modifying the prompt files. + await logger.ainfo(f'chef_flow called with input: {input}') recipe_prompt = await get_sticky_prompt('recipe') recipe_prompt._output_format = 'json' recipe_prompt._output_schema = Recipe @@ -107,49 +100,43 @@ async def chef_flow(input: ChefInput) -> Recipe: response = await recipe_prompt(input={'food': input.food}) # Ensure we return a Pydantic model as expected by the type hint and caller result = Recipe.model_validate(response.output) - await logger.ainfo(f"chef_flow result: {result}") + await logger.ainfo(f'chef_flow result: {result}') return result @ai.flow(name='robot_chef_flow') async def robot_chef_flow(input: ChefInput) -> Recipe: - await logger.ainfo(f"robot_chef_flow called with input: {input}") - # This one doesn't have a typed output schema enforced by the flow signature's return type in the JS example (it uses z.any()), - # but the prompt might still return structured data. Python's loose typing allows returning whatever. - # However, to match JS exactly which returns `output` property of the result: - # However, to match JS exactly which returns `output` property of the result: + await logger.ainfo(f'robot_chef_flow called with input: {input}') recipe_prompt = await get_sticky_prompt('recipe', variant='robot') recipe_prompt._output_format = 'json' recipe_prompt._output_schema = Recipe recipe_prompt._model = 'googleai/gemini-3-flash-preview' - result = (await recipe_prompt(input={'food': input.food})).output + result = Recipe.model_validate((await recipe_prompt(input={'food': input.food})).output) await logger.ainfo(f"robot_chef_flow result: {result}") return result class StoryInput(BaseModel): subject: str - personality: Optional[str] = None + personality: str | None = None @ai.flow(name='tell_story') async def tell_story(input: StoryInput, ctx: ActionRunContext) -> str: - await logger.ainfo(f"tell_story called with input: {input}") + await logger.ainfo(f'tell_story called with input: {input}') story_prompt = await get_sticky_prompt('story') story_prompt._model = 'googleai/gemini-3-flash-preview' story_prompt._output_format = None - stream, response = story_prompt.stream( - input={'subject': input.subject, 'personality': input.personality} - ) + stream, response = story_prompt.stream(input={'subject': input.subject, 'personality': input.personality}) full_text = '' # We yield the chunks as they stream in async for chunk in stream: if chunk.text: - ctx.send_chunk(chunk.text) - full_text += chunk.text - - await logger.ainfo(f"tell_story completed, returning length: {len(full_text)}") + ctx.send_chunk(chunk.text) + full_text += chunk.text + + await logger.ainfo(f'tell_story completed, returning length: {len(full_text)}') return full_text @@ -177,21 +164,16 @@ async def main(): # Tell Story Flow (Streaming) await logger.ainfo('--- Running Tell Story Flow ---') - # To demonstrate streaming, we'll iterate over the streamer if calling directly like a flow 
would be consumed. - # Note: When calling a flow function directly in Python, if it's a generator (streaming), it returns an async generator. - # We can iterate it. - story_stream, _ = tell_story.stream(StoryInput(subject='a brave little toaster', personality='courageous')) - - full_text = '' + async for chunk in story_stream: print(chunk, end='', flush=True) # Note: The actual return value of the flow (final string) is not yielded by the generator in Python's async generator implementation easily # unless we wrap it or inspect the StopAsyncIteration value, but typically for streaming flows we just consume the stream. # BUT `tell_story` implementation above yields chunks. - print() # Newline after stream + print() # Newline after stream await logger.ainfo('Tell Story Flow Completed') From fe757019ee35377bf77391009257351def9d029c Mon Sep 17 00:00:00 2001 From: Mengqin Shen Date: Fri, 16 Jan 2026 16:06:12 -0800 Subject: [PATCH 3/3] fix(py): removed code comments --- py/samples/prompt_demo/src/main.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/py/samples/prompt_demo/src/main.py b/py/samples/prompt_demo/src/main.py index 0ee4238138..22d1b98d4d 100755 --- a/py/samples/prompt_demo/src/main.py +++ b/py/samples/prompt_demo/src/main.py @@ -53,12 +53,9 @@ class Recipe(BaseModel): steps: list[str] = Field(..., description='the steps required to complete the recipe') -# Register the schema so it can be resolved by name in prompt files -# Note: internal API usage until define_schema is exposed if hasattr(ai.registry.dotprompt, '_schemas'): ai.registry.dotprompt._schemas['Recipe'] = Recipe -# Global stickiness cache for prompts to prevent premature GC _sticky_prompts = {} @@ -112,7 +109,7 @@ async def robot_chef_flow(input: ChefInput) -> Recipe: recipe_prompt._output_schema = Recipe recipe_prompt._model = 'googleai/gemini-3-flash-preview' result = Recipe.model_validate((await recipe_prompt(input={'food': input.food})).output) - await logger.ainfo(f"robot_chef_flow result: {result}") + await logger.ainfo(f'robot_chef_flow result: {result}') return result @@ -130,7 +127,6 @@ async def tell_story(input: StoryInput, ctx: ActionRunContext) -> str: stream, response = story_prompt.stream(input={'subject': input.subject, 'personality': input.personality}) full_text = '' - # We yield the chunks as they stream in async for chunk in stream: if chunk.text: ctx.send_chunk(chunk.text) @@ -141,7 +137,7 @@ async def tell_story(input: StoryInput, ctx: ActionRunContext) -> str: async def main(): - # List actions to verify loading + actions = ai.registry.list_serializable_actions() # Filter for prompts @@ -169,9 +165,6 @@ async def main(): async for chunk in story_stream: print(chunk, end='', flush=True) - # Note: The actual return value of the flow (final string) is not yielded by the generator in Python's async generator implementation easily - # unless we wrap it or inspect the StopAsyncIteration value, but typically for streaming flows we just consume the stream. - # BUT `tell_story` implementation above yields chunks. print() # Newline after stream await logger.ainfo('Tell Story Flow Completed')
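
For a quick local check of the pieces the sample wires into dotprompt, the helper and the output models can be exercised on their own, without the Genkit registry or a model call. The following is a minimal standalone sketch that re-declares list_helper, Ingredient, and Recipe exactly as main.py defines them; the sample dict at the end is made-up data, not real model output.

from pydantic import BaseModel, Field


def list_helper(data, *args, **kwargs):
    # Same helper main.py registers as `list` for recipe.prompt.
    if not isinstance(data, list):
        return ''
    return '\n'.join(f'- {item}' for item in data)


class Ingredient(BaseModel):
    name: str
    quantity: str


class Recipe(BaseModel):
    title: str = Field(..., description='recipe title')
    ingredients: list[Ingredient]
    steps: list[str] = Field(..., description='the steps required to complete the recipe')


print(list_helper(['flour', 'ripe bananas', 'sugar']))
# - flour
# - ripe bananas
# - sugar
print(repr(list_helper('not a list')))  # '' (non-list input is ignored)

# The same validation chef_flow applies to response.output, on hand-written data:
sample = {
    'title': 'Banana Bread',
    'ingredients': [{'name': 'ripe bananas', 'quantity': '3'}],
    'steps': ['Mash the bananas.', 'Mix, pour, and bake for about 40 minutes.'],
}
print(Recipe.model_validate(sample).title)  # Banana Bread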
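
The sticky-prompt cache in main.py guards against ai.prompt() handing back a weak reference that goes dead once nothing else holds the prompt object. That failure mode, and why the cache stores strong references, can be shown with the standard library alone; PromptHandle below is a hypothetical stand-in for whatever object the registry resolves, not a Genkit type.

import weakref


class PromptHandle:
    """Hypothetical stand-in for the object ai.prompt() resolves."""


dead = weakref.ref(PromptHandle())  # no strong reference is retained anywhere
print(dead())  # None under CPython refcounting: the handle is collected immediately

sticky = {}  # mirrors the module-level _sticky_prompts cache
handle = PromptHandle()
sticky['recipe'] = handle  # the strong reference parks the handle in the cache
alive = weakref.ref(handle)
print(alive() is handle)  # True: the cached strong ref keeps the weakref resolvable

Keeping a strong reference in _sticky_prompts is what lets the flows reuse one prompt handle across calls instead of resolving it again on every request.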