-
Notifications
You must be signed in to change notification settings - Fork 639
feat(go): added channel-based streaming flow API #4186
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Summary of ChangesHello @apascal07, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request enhances the Genkit framework by introducing a new, more Go-idiomatic channel-based API for defining streaming flows. This provides developers with a familiar and efficient way to handle streaming data, complementing the existing callback-based mechanism. Additionally, it includes a crucial bug fix to gracefully manage early termination by streaming consumers, preventing potential panics and improving overall stability. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a new channel-based streaming flow API, x.DefineStreamingFlow, which provides a more idiomatic Go approach for streaming use cases compared to the existing callback-based API. The implementation is well-structured, and the accompanying tests are thorough, covering various edge cases like early consumer termination. The fix in core/flow.go to prevent a panic on early loop exit is also a good improvement. I have one suggestion to prevent a potential goroutine leak in the non-streaming path. Overall, this is a valuable and well-implemented feature.
| output, err := fn(ctx, input, discardCh) | ||
| close(discardCh) | ||
| return output, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the non-streaming case (sendChunk == nil), if the user-provided function fn panics, close(discardCh) will not be called. This will cause the goroutine that ranges over discardCh to leak because it will block forever.
Using defer ensures that the channel is closed even if a panic occurs, thus preventing the goroutine leak.
| output, err := fn(ctx, input, discardCh) | |
| close(discardCh) | |
| return output, err | |
| defer close(discardCh) | |
| return fn(ctx, input, discardCh) |
Adds
genkitx.DefineStreamingFlow(experimental) as an alternative to the callback-basedgenkit.DefineStreamingFlow. The channel-based API is more idiomatic Go for streaming use cases and will replace the callback-based version in the next major version release.Usage
Changes
genkit/x/genkit.go: NewDefineStreamingFlowfunction with channel-based signaturecore/flow.go: Fix panic when consumer breaks out of streaming loop earlyChecklist (if applicable):