Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,22 @@ jobs:
working-directory: ./python
run: make test

python-test-dapr:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4

- name: Install uv
uses: astral-sh/setup-uv@v5

- name: Install python
run: uv python install 3.13

- name: Run kagent-dapr tests
working-directory: ./python
run: make test-dapr

python-lint:
runs-on: ubuntu-latest

Expand Down
10 changes: 9 additions & 1 deletion python/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ lint:

.PHONY: test
test: update generate-test-certs
uv run pytest ./packages/**/tests/
uv run pytest $(shell find ./packages -path '*/tests' -type d | grep -v kagent-dapr)

.PHONY: test-dapr
test-dapr:
cd packages/kagent-dapr && uv sync --all-extras && uv run pytest tests/ -v

.PHONY: build
build: update format
Expand Down Expand Up @@ -77,3 +81,7 @@ generate-test-certs:
.PHONY: basic-openai-sample
basic-openai-sample:
docker build . -f samples/openai/basic_agent/Dockerfile --tag localhost:5001/basic-openai:latest --push

.PHONY: dapr-agent-sample
dapr-agent-sample:
docker build . -f samples/dapr/Dockerfile --tag localhost:5001/my-dapr-agent:latest --push
2 changes: 1 addition & 1 deletion python/packages/kagent-core/src/kagent/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@

configure_logging()

__all__ = ["KAgentConfig", "configure_tracing", "configure_logging"]
__all__ = ["KAgentConfig", "configure_tracing", "configure_logging"]
Empty file.
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
from ._utils import configure
from ._span_processor import (
KagentAttributesSpanProcessor,
clear_kagent_span_attributes,
set_kagent_span_attributes,
)

__all__ = ["configure"]
__all__ = ["configure", "KagentAttributesSpanProcessor", "clear_kagent_span_attributes", "set_kagent_span_attributes"]
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import logging
from contextvars import Token
from typing import Optional
from typing import Any, Dict, Optional

from opentelemetry import context as otel_context
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
Expand Down Expand Up @@ -41,7 +41,7 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
return True


def set_kagent_span_attributes(attributes: dict) -> Token[otel_context.Context]:
def set_kagent_span_attributes(attributes: Dict[Any, Any]) -> Token[otel_context.Context]:
"""Set kagent span attributes in the context.
Args:
attributes: Dictionary of kagent span attributes to store in context
Expand Down
50 changes: 50 additions & 0 deletions python/packages/kagent-dapr/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# kagent-dapr

Dapr-Agents integration for KAgent. Wraps Dapr-Agents `DurableAgent` and exposes it
via the A2A protocol through FastAPI with durable workflow execution.

## Features

- **DurableAgent Support**: Durable workflow orchestration via Dapr workflow runtime
- **A2A Protocol Compatibility**: Compatible with KAgent's Agent-to-Agent protocol
- **State Persistence**: Durable workflow state via Dapr state stores (Redis, etc.)
- **OpenTelemetry Tracing**: Automatic span attribute injection via kagent-core
- **Debug Endpoints**: Health check and thread dump endpoints for observability

## Quick Start

```python
from dapr_agents import DurableAgent
from kagent.core import KAgentConfig
from kagent.dapr import KAgentApp

agent = DurableAgent(name="my-agent", role="assistant", instructions=["Be helpful."])

agent_card = {
"name": "my-agent",
"description": "A Dapr-Agents based agent",
"url": "http://localhost:8080",
"version": "0.1.0",
"skills": [{"id": "chat", "name": "Chat", "description": "General chat"}],
"capabilities": {},
}

config = KAgentConfig()
app = KAgentApp(agent=agent, agent_card=agent_card, config=config)
fastapi_app = app.build()
```

## Architecture

- **KAgentApp**: FastAPI application builder with A2A route integration
- **DaprDurableAgentExecutor**: Schedules `agent_workflow`, waits for completion, emits A2A events
- **Tracing**: Automatic span attribute injection via kagent-core's span processor

## Configuration

- `KAGENT_URL`: KAgent controller URL (default: `http://kagent-controller.kagent:8083`)
- `PORT`: Server port (default: `8080`)

## Deployment

See `samples/dapr/` for a complete Kubernetes deployment example with Dapr components.
36 changes: 36 additions & 0 deletions python/packages/kagent-dapr/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[project]
name = "kagent-dapr"
version = "0.1.0"
description = "Dapr Agents integration for KAgent"
readme = "README.md"
requires-python = ">=3.11,<3.14"
dependencies = [
"dapr-agents>=0.11.0",
"kagent-core>=0.1.0",
"a2a-sdk[http-server]>=0.3.1",
"durabletask-dapr>=0.2.0a15",
]

[project.optional-dependencies]
dev = [
"pytest>=7.0.0",
"pytest-asyncio>=0.21.0",
"ruff>=0.1.0",
]

[tool.uv]
prerelease = "allow"

[tool.uv.sources]
kagent-core = { path = "../kagent-core", editable = true }

[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"

[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"

[tool.hatch.build.targets.wheel]
packages = ["src/kagent"]
6 changes: 6 additions & 0 deletions python/packages/kagent-dapr/src/kagent/dapr/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""KAgent Dapr-Agents Integration Package."""

from ._a2a import KAgentApp

__all__ = ["KAgentApp"]
__version__ = "0.1.0"
107 changes: 107 additions & 0 deletions python/packages/kagent-dapr/src/kagent/dapr/_a2a.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""KAgent Dapr-Agents A2A Server Integration.

Provides the KAgentApp class that builds a FastAPI application
with A2A protocol support for Dapr-Agents DurableAgent.
"""

import faulthandler
import logging

import httpx
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.types import AgentCard
from dapr_agents import DurableAgent
from fastapi import FastAPI, Request
from fastapi.responses import PlainTextResponse
from kagent.core import KAgentConfig, configure_tracing
from kagent.core.a2a import KAgentRequestContextBuilder, KAgentTaskStore

from ._durable import DaprDurableAgentExecutor

logger = logging.getLogger(__name__)


def health_check(request: Request) -> PlainTextResponse:
"""Health check endpoint."""
return PlainTextResponse("OK")


def thread_dump(request: Request) -> PlainTextResponse:
"""Thread dump endpoint for debugging."""
import io

buf = io.StringIO()
faulthandler.dump_traceback(file=buf)
buf.seek(0)
return PlainTextResponse(buf.read())


class KAgentApp:
"""Main application class for Dapr-Agents + KAgent integration.

Builds a FastAPI application with A2A protocol support,
using Dapr-Agents DurableAgent for execution and KAgent for state persistence.
"""

def __init__(
self,
*,
agent: DurableAgent,
agent_card: AgentCard,
config: KAgentConfig,
tracing: bool = True,
):
self._agent = agent
self.agent_card = AgentCard.model_validate(agent_card)
self.config = config
self._enable_tracing = tracing

def build(self) -> FastAPI:
"""Build the FastAPI application with A2A integration.

Returns:
Configured FastAPI application ready for deployment.
"""
http_client = httpx.AsyncClient(base_url=self.config.url)

agent_executor = DaprDurableAgentExecutor(
durable_agent=self._agent,
app_name=self.config.app_name,
)

task_store = KAgentTaskStore(http_client)
request_context_builder = KAgentRequestContextBuilder(task_store=task_store)

request_handler = DefaultRequestHandler(
agent_executor=agent_executor,
task_store=task_store,
request_context_builder=request_context_builder,
)

a2a_app = A2AStarletteApplication(
agent_card=self.agent_card,
http_handler=request_handler,
)

faulthandler.enable()

app = FastAPI(
title=f"KAgent Dapr-Agents: {self.config.app_name}",
description=f"Dapr-Agents agent with KAgent integration: {self.agent_card.description}",
version=self.agent_card.version,
)

if self._enable_tracing:
try:
configure_tracing(app)
logger.info("Tracing configured for KAgent Dapr-Agents app")
except Exception:
logger.exception("Failed to configure tracing")

app.add_route("/health", methods=["GET"], route=health_check)
app.add_route("/thread_dump", methods=["GET"], route=thread_dump)

a2a_app.add_routes_to_app(app)

return app
Loading