Skip to content

Conversation

@mattapperson
Copy link
Collaborator

@mattapperson mattapperson commented Dec 26, 2025

Summary

This PR enables real-time streaming of generator tool preliminary results - yields are streamed AS they happen during execution, instead of being batched after tool execution completes.

Key changes:

  • ToolEventBroadcaster: New push-based multi-consumer event broadcaster following the ReusableReadableStream pattern
  • Real-time streaming: getToolStream() and getFullResponsesStream() now yield preliminary results as generator tools emit them
  • Bug fix: Fixed toJSONSchema target from invalid 'openapi-3.0' to 'draft-7' (Zod v4 compatibility)

How it works

  1. When a generator tool executes, each yield is immediately pushed to ToolEventBroadcaster
  2. Consumers created via createConsumer() receive events in real-time as they're pushed
  3. Multiple consumers can be created and each gets all events from position 0
  4. The broadcaster buffers events for late-joining consumers

Example Usage

Define a generator tool with progress events

import { tool } from "@openrouter/sdk";
import { z } from "zod";

const searchTool = tool({
  name: "search_database",
  description: "Search with progress updates",
  inputSchema: z.object({ query: z.string() }),
  // eventSchema defines the type of intermediate yields
  eventSchema: z.object({
    progress: z.number(),
    message: z.string(),
  }),
  // outputSchema defines the final result sent to the model
  outputSchema: z.object({
    results: z.array(z.string()),
    totalFound: z.number(),
  }),
  // Generator function yields progress, final yield is the result
  execute: async function* (params) {
    yield { progress: 25, message: "Searching..." };
    yield { progress: 50, message: "Processing results..." };
    yield { progress: 75, message: "Almost done..." };
    // Final yield must match outputSchema
    yield { results: ["doc1", "doc2"], totalFound: 2 };
  },
});

Consume real-time events with getToolStream()

const result = client.callModel({
  model: "anthropic/claude-sonnet-4",
  input: [{ role: "user", content: "Search for documents about TypeScript" }],
  tools: [searchTool],
});

for await (const event of result.getToolStream()) {
  if (event.type === "preliminary_result") {
    // Real-time progress as generator yields
    console.log(`Progress: ${event.result.progress}% - ${event.result.message}`);
  } else if (event.type === "delta") {
    // Tool argument deltas from API
    process.stdout.write(event.content);
  }
}

Consume with getFullResponsesStream()

for await (const event of result.getFullResponsesStream()) {
  if (event.type === "tool.preliminary_result") {
    // Real-time tool progress events
    console.log(`Tool ${event.toolCallId}: ${JSON.stringify(event.result)}`);
  } else {
    // Handle other API stream events (response.created, response.delta, etc.)
    console.log(event.type);
  }
}

Files changed

File Changes
src/lib/tool-event-broadcaster.ts NEW - Push-based multi-consumer broadcaster
src/lib/model-result.ts Wire broadcaster into tool execution and streaming methods
src/lib/tool-executor.ts Fix toJSONSchema target, add typeguard for zod schema
src/index.ts Export ToolEventBroadcaster
tests/unit/tool-event-broadcaster.test.ts NEW - 13 unit tests
tests/e2e/call-model-tools.test.ts Fix test toJSONSchema targets

Test plan

  • Unit tests for ToolEventBroadcaster (13 tests)
  • E2E tests for tool execution (23 tests)
  • Build passes
  • Lint passes

This change enables generator tool yields to be streamed to consumers AS
they happen during execution, instead of being batched after completion.

Changes:
- Add ToolEventBroadcaster for push-based multi-consumer event streaming
- Wire onPreliminaryResult callback in executeToolRound to broadcast events
- Update getToolStream() and getFullResponsesStream() to consume broadcast
- Fix Zod v4 toJSONSchema target to 'draft-7' (openapi-3.0 was invalid)
- Add comprehensive unit tests for ToolEventBroadcaster
- Fix test files to use draft-7 target
- Add buffer cleanup after all consumers complete (memory leak fix)
- Use lazy initialization for broadcaster (race condition fix)
- Add try-catch in onPreliminaryResult callback (error handling)
- Add tests for completion between consumer iterations
@mattapperson mattapperson force-pushed the feat/real-time-generator-streaming branch from e15dc75 to de38fe8 Compare January 6, 2026 17:13
…push

Addresses review feedback from louisgv. The push() method doesn't throw,
so the try-catch was unnecessary. Using optional chaining (?.) instead of
non-null assertion (!) is safer and more idiomatic TypeScript.
@mattapperson mattapperson force-pushed the feat/real-time-generator-streaming branch from de38fe8 to 7236091 Compare January 6, 2026 17:23
louisgv
louisgv previously approved these changes Jan 6, 2026
Copy link
Contributor

@louisgv louisgv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise lgtm!

@mattapperson mattapperson merged commit 62dd6bb into main Jan 6, 2026
1 check passed
@mattapperson mattapperson deleted the feat/real-time-generator-streaming branch January 6, 2026 19:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants