From 771fbd45d4ba497cd1666aa2979f88ec3bcc9086 Mon Sep 17 00:00:00 2001 From: Jet Chiang Date: Wed, 14 Jan 2026 12:55:32 -0500 Subject: [PATCH 1/5] expose agents via mcp in controller http server and add test Signed-off-by: Jet Chiang --- go/internal/httpserver/server.go | 8 + go/internal/mcp/mcp_handler.go | 255 +++++++++++++++++++++++++++++++ go/pkg/app/app.go | 13 ++ go/test/e2e/invoke_mcp_test.go | 202 ++++++++++++++++++++++++ 4 files changed, 478 insertions(+) create mode 100644 go/internal/mcp/mcp_handler.go create mode 100644 go/test/e2e/invoke_mcp_test.go diff --git a/go/internal/httpserver/server.go b/go/internal/httpserver/server.go index 1e7896d57..9793afee9 100644 --- a/go/internal/httpserver/server.go +++ b/go/internal/httpserver/server.go @@ -9,6 +9,7 @@ import ( "github.com/kagent-dev/kagent/go/internal/a2a" "github.com/kagent-dev/kagent/go/internal/database" "github.com/kagent-dev/kagent/go/internal/httpserver/handlers" + "github.com/kagent-dev/kagent/go/internal/mcp" common "github.com/kagent-dev/kagent/go/internal/utils" "github.com/kagent-dev/kagent/go/internal/version" "github.com/kagent-dev/kagent/go/pkg/auth" @@ -35,6 +36,7 @@ const ( APIPathMemories = "/api/memories" APIPathNamespaces = "/api/namespaces" APIPathA2A = "/api/a2a" + APIPathMCP = "/api/mcp" APIPathFeedback = "/api/feedback" APIPathLangGraph = "/api/langgraph" APIPathCrewAI = "/api/crewai" @@ -51,6 +53,7 @@ type ServerConfig struct { BindAddr string KubeClient ctrl_client.Client A2AHandler a2a.A2AHandlerMux + MCPHandler *mcp.MCPHandler WatchedNamespaces []string DbClient database.Client Authenticator auth.AuthProvider @@ -225,6 +228,11 @@ func (s *HTTPServer) setupRoutes() { // A2A s.router.PathPrefix(APIPathA2A + "/{namespace}/{name}").Handler(s.config.A2AHandler) + // MCP + if s.config.MCPHandler != nil { + s.router.PathPrefix(APIPathMCP).Handler(s.config.MCPHandler) + } + // Use middleware for common functionality s.router.Use(auth.AuthnMiddleware(s.authenticator)) s.router.Use(contentTypeMiddleware) diff --git a/go/internal/mcp/mcp_handler.go b/go/internal/mcp/mcp_handler.go new file mode 100644 index 000000000..09ba44372 --- /dev/null +++ b/go/internal/mcp/mcp_handler.go @@ -0,0 +1,255 @@ +package mcp + +import ( + "context" + "fmt" + "net/http" + "strings" + "sync" + "time" + + "github.com/google/uuid" + "github.com/kagent-dev/kagent/go/api/v1alpha2" + "github.com/kagent-dev/kagent/go/internal/a2a" + "github.com/kagent-dev/kagent/go/internal/version" + "github.com/kagent-dev/kagent/go/pkg/auth" + "github.com/mark3labs/mcp-go/mcp" + mcpserver "github.com/mark3labs/mcp-go/server" + "sigs.k8s.io/controller-runtime/pkg/client" + ctrllog "sigs.k8s.io/controller-runtime/pkg/log" + a2aclient "trpc.group/trpc-go/trpc-a2a-go/client" + "trpc.group/trpc-go/trpc-a2a-go/protocol" +) + +// MCPHandler handles MCP requests and bridges them to A2A endpoints +type MCPHandler struct { + kubeClient client.Client + a2aBaseURL string + authenticator auth.AuthProvider + httpServer *mcpserver.StreamableHTTPServer + lock sync.RWMutex + // Map to store context IDs per session and agent + contextBySessionAndAgent sync.Map +} + +// NewMCPHandler creates a new MCP handler +// Wraps the StreamableHTTPServer handler adds A2A bridging and context management. +func NewMCPHandler(kubeClient client.Client, a2aBaseURL string, authenticator auth.AuthProvider) (*MCPHandler, error) { + handler := &MCPHandler{ + kubeClient: kubeClient, + a2aBaseURL: a2aBaseURL, + authenticator: authenticator, + } + + // Create MCP server with tools and session cleanup hooks + hooks := &mcpserver.Hooks{} + hooks.AddOnUnregisterSession(func(ctx context.Context, session mcpserver.ClientSession) { + sessionID := session.SessionID() + handler.contextBySessionAndAgent.Range(func(key, _ any) bool { + keyStr, ok := key.(string) + if !ok { + return true + } + if strings.HasPrefix(keyStr, sessionID+"|") { + handler.contextBySessionAndAgent.Delete(key) + } + return true + }) + }) + + s := mcpserver.NewMCPServer( + "kagent-agents", + version.Version, + mcpserver.WithToolCapabilities(false), + mcpserver.WithHooks(hooks), + ) + + // Add list_agents tool + s.AddTool(mcp.NewTool("list_agents", + mcp.WithDescription("List invokable kagent agents (accepted + deploymentReady)"), + ), handler.handleListAgents) + + // Add invoke_agent tool + s.AddTool(mcp.NewTool("invoke_agent", + mcp.WithDescription("Invoke a kagent agent via A2A"), + mcp.WithString("agent", mcp.Description("Agent name (or namespace/name)"), mcp.Required()), + mcp.WithString("task", mcp.Description("Task to run"), mcp.Required()), + ), handler.handleInvokeAgent) + + // Create HTTP server + handler.httpServer = mcpserver.NewStreamableHTTPServer(s) + + return handler, nil +} + +// handleListAgents handles the list_agents MCP tool +func (h *MCPHandler) handleListAgents(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { + log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "list_agents") + + agentList := &v1alpha2.AgentList{} + if err := h.kubeClient.List(ctx, agentList); err != nil { + return mcp.NewToolResultErrorFromErr("list agents", err), nil + } + + type agentSummary struct { + Ref string `json:"ref"` + Description string `json:"description,omitempty"` + } + + agents := make([]agentSummary, 0) + for _, agent := range agentList.Items { + // Check if agent is accepted and deployment ready + deploymentReady := false + accepted := false + for _, condition := range agent.Status.Conditions { + if condition.Type == "Ready" && condition.Reason == "DeploymentReady" && condition.Status == "True" { + deploymentReady = true + } + if condition.Type == "Accepted" && condition.Status == "True" { + accepted = true + } + } + + if !accepted || !deploymentReady { + continue + } + + ref := agent.Namespace + "/" + agent.Name + description := agent.Spec.Description + agents = append(agents, agentSummary{ + Ref: ref, + Description: description, + }) + } + + log.Info("Listed agents", "count", len(agents)) + if len(agents) == 0 { + return mcp.NewToolResultStructured(map[string]any{"agents": agents}, "No invokable agents found."), nil + } + + var fallbackText strings.Builder + for i, agent := range agents { + if i > 0 { + fallbackText.WriteByte('\n') + } + fallbackText.WriteString(agent.Ref) + if agent.Description != "" { + fallbackText.WriteString(" - ") + fallbackText.WriteString(agent.Description) + } + } + + return mcp.NewToolResultStructured(map[string]any{"agents": agents}, fallbackText.String()), nil +} + +// handleInvokeAgent handles the invoke_agent MCP tool +func (h *MCPHandler) handleInvokeAgent(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { + log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "invoke_agent") + + agentRef, err := request.RequireString("agent") + if err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + + task, err := request.RequireString("task") + if err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + + // Parse agent reference (namespace/name or just name) + agentNS, agentName, ok := strings.Cut(agentRef, "/") + if !ok { + return mcp.NewToolResultError("agent must be in format 'namespace/name'"), nil + } + agentRef = agentNS + "/" + agentName + + // Get session ID from context if available + sessionID := "" + if session := mcpserver.ClientSessionFromContext(ctx); session != nil { + sessionID = session.SessionID() + } else if headerSessionID := request.Header.Get(mcpserver.HeaderKeySessionID); headerSessionID != "" { + sessionID = headerSessionID + } + if sessionID == "" { + sessionID = uuid.New().String() + } + + // Get or create context ID for this session and agent + contextKey := sessionID + "|" + agentRef + var contextIDPtr *string + if prior, ok := h.contextBySessionAndAgent.Load(contextKey); ok { + if priorStr, ok := prior.(string); ok && priorStr != "" { + contextIDPtr = &priorStr + } + } + + // Create A2A client + a2aURL := fmt.Sprintf("%s/%s/", h.a2aBaseURL, agentRef) + a2aClient, err := a2aclient.NewA2AClient(a2aURL, a2aclient.WithTimeout(30*time.Second)) + if err != nil { + log.Error(err, "Failed to create A2A client", "agent", agentRef) + return mcp.NewToolResultErrorFromErr("a2a client", err), nil + } + + // Send message via A2A + result, err := a2aClient.SendMessage(ctx, protocol.SendMessageParams{ + Message: protocol.Message{ + Kind: protocol.KindMessage, + Role: protocol.MessageRoleUser, + ContextID: contextIDPtr, + Parts: []protocol.Part{protocol.NewTextPart(task)}, + }, + }) + if err != nil { + log.Error(err, "Failed to send A2A message", "agent", agentRef) + return mcp.NewToolResultErrorFromErr("a2a send", err), nil + } + + // Extract response text and context ID + var responseText, newContextID string + switch a2aResult := result.Result.(type) { + case *protocol.Message: + responseText = a2a.ExtractText(*a2aResult) + if a2aResult.ContextID != nil { + newContextID = *a2aResult.ContextID + } + case *protocol.Task: + newContextID = a2aResult.ContextID + if a2aResult.Status.Message != nil { + responseText = a2a.ExtractText(*a2aResult.Status.Message) + } + for _, artifact := range a2aResult.Artifacts { + responseText += a2a.ExtractText(protocol.Message{Parts: artifact.Parts}) + } + } + + if responseText == "" { + raw, err := result.MarshalJSON() + if err != nil { + return mcp.NewToolResultErrorFromErr("marshal result", err), nil + } + responseText = string(raw) + } + + // Store new context ID if available + if newContextID != "" { + h.contextBySessionAndAgent.Store(contextKey, newContextID) + } + + log.Info("Invoked agent", "agent", agentRef, "hasContextID", newContextID != "") + return mcp.NewToolResultStructured(map[string]any{ + "agent": agentRef, + "text": responseText, + }, responseText), nil +} + +// ServeHTTP implements http.Handler interface +func (h *MCPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // The MCP HTTP server handles all the routing internally + h.httpServer.ServeHTTP(w, r) +} + +// Shutdown gracefully shuts down the MCP handler +func (h *MCPHandler) Shutdown(ctx context.Context) error { + return h.httpServer.Shutdown(ctx) +} diff --git a/go/pkg/app/app.go b/go/pkg/app/app.go index 8fa6762ee..813b3b1d3 100644 --- a/go/pkg/app/app.go +++ b/go/pkg/app/app.go @@ -38,6 +38,7 @@ import ( "github.com/kagent-dev/kagent/go/internal/a2a" "github.com/kagent-dev/kagent/go/internal/database" + "github.com/kagent-dev/kagent/go/internal/mcp" versionmetrics "github.com/kagent-dev/kagent/go/internal/metrics" "github.com/kagent-dev/kagent/go/internal/controller/reconciler" @@ -450,6 +451,17 @@ func Start(getExtensionConfig GetExtensionConfig) { os.Exit(1) } + // Create MCP handler that bridges to A2A + mcpHandler, err := mcp.NewMCPHandler( + mgr.GetClient(), + cfg.A2ABaseUrl+httpserver.APIPathA2A, + extensionCfg.Authenticator, + ) + if err != nil { + setupLog.Error(err, "unable to create MCP handler") + os.Exit(1) + } + // +kubebuilder:scaffold:builder if metricsCertWatcher != nil { setupLog.Info("Adding metrics certificate watcher to manager") @@ -486,6 +498,7 @@ func Start(getExtensionConfig GetExtensionConfig) { BindAddr: cfg.HttpServerAddr, KubeClient: mgr.GetClient(), A2AHandler: a2aHandler, + MCPHandler: mcpHandler, WatchedNamespaces: watchNamespacesList, DbClient: dbClient, Authorizer: extensionCfg.Authorizer, diff --git a/go/test/e2e/invoke_mcp_test.go b/go/test/e2e/invoke_mcp_test.go new file mode 100644 index 000000000..977d954d9 --- /dev/null +++ b/go/test/e2e/invoke_mcp_test.go @@ -0,0 +1,202 @@ +package e2e_test + +import ( + "context" + "encoding/json" + "os" + "strings" + "testing" + "time" + + mcp_client "github.com/mark3labs/mcp-go/client" + "github.com/mark3labs/mcp-go/client/transport" + "github.com/mark3labs/mcp-go/mcp" + "github.com/stretchr/testify/require" +) + +// mcpEndpointURL returns the URL for the MCP endpoint +func mcpEndpointURL() string { + kagentURL := os.Getenv("KAGENT_URL") + if kagentURL == "" { + // if running locally on kind, do "kubectl port-forward -n kagent deployments/kagent-controller 8083" + kagentURL = "http://localhost:8083" + } + return kagentURL + "/api/mcp" +} + +// setupMCPClient creates and initializes an MCP client for testing +func setupMCPClient(t *testing.T) *mcp_client.Client { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + url := mcpEndpointURL() + tsp, err := transport.NewStreamableHTTP(url) + require.NoError(t, err, "Failed to create transport") + client := mcp_client.NewClient(tsp) + + err = client.Start(ctx) + require.NoError(t, err, "Failed to start MCP client") + + t.Cleanup(func() { + client.Close() + }) + + _, err = client.Initialize(ctx, mcp.InitializeRequest{ + Params: mcp.InitializeParams{ + ProtocolVersion: mcp.LATEST_PROTOCOL_VERSION, + Capabilities: mcp.ClientCapabilities{}, + ClientInfo: mcp.Implementation{ + Name: "e2e-test", + Version: "0.0.0", + }, + }, + }) + require.NoError(t, err, "Failed to initialize MCP client") + + return client +} + +// TestE2EMCPEndpointListAgents tests the list_agents tool via the controller's MCP endpoint +// These tests use the kebab-agent deployed via push-test-agent in CI. +func TestE2EMCPEndpointListAgents(t *testing.T) { + ctx := context.Background() + client := setupMCPClient(t) + + // List tools + toolsResult, err := client.ListTools(ctx, mcp.ListToolsRequest{}) + require.NoError(t, err, "Should list tools") + + // Verify expected tools exist + toolNames := make([]string, 0, len(toolsResult.Tools)) + for _, tool := range toolsResult.Tools { + toolNames = append(toolNames, tool.Name) + } + require.Contains(t, toolNames, "list_agents", "Should have list_agents tool") + require.Contains(t, toolNames, "invoke_agent", "Should have invoke_agent tool") + + // Call list_agents tool + listAgentsResult, err := client.CallTool(ctx, mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "list_agents", + }, + }) + require.NoError(t, err, "Should call list_agents tool") + require.NotEmpty(t, listAgentsResult.Content, "Should have content in response") + require.False(t, listAgentsResult.IsError, "Should not be an error") + + agentRef := "kagent/kebab-agent" + found := false + + // First check StructuredContent (preferred) + if listAgentsResult.StructuredContent != nil { + structuredBytes, err := json.Marshal(listAgentsResult.StructuredContent) + require.NoError(t, err, "Should marshal structured content") + var structuredData struct { + Agents []struct { + Ref string `json:"ref"` + Description string `json:"description,omitempty"` + } `json:"agents"` + } + if err := json.Unmarshal(structuredBytes, &structuredData); err == nil { + for _, a := range structuredData.Agents { + if a.Ref == agentRef { + found = true + break + } + } + } + } + + // Check text format for fallback + if !found { + for _, content := range listAgentsResult.Content { + if textContent, ok := content.(*mcp.TextContent); ok { + if strings.Contains(textContent.Text, agentRef) { + found = true + break + } + } + } + } + + require.True(t, found, "Should find agent %s in list", agentRef) +} + +// TestE2EMCPEndpointInvokeAgent tests the invoke_agent tool via the controller's MCP endpoint +func TestE2EMCPEndpointInvokeAgent(t *testing.T) { + ctx := context.Background() + client := setupMCPClient(t) + + // Invoke kebab-agent + agentRef := "kagent/kebab-agent" + invokeResult, err := client.CallTool(ctx, mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "invoke_agent", + Arguments: map[string]interface{}{ + "agent": agentRef, + "task": "What can you do?", + }, + }, + }) + require.NoError(t, err, "Should call invoke_agent tool") + require.NotEmpty(t, invokeResult.Content, "Should have content in response") + require.False(t, invokeResult.IsError, "Should not be an error") + + foundText := false + + if invokeResult.StructuredContent != nil { + structuredBytes, err := json.Marshal(invokeResult.StructuredContent) + require.NoError(t, err, "Should marshal structured content") + var structuredData struct { + Agent string `json:"agent"` + Text string `json:"text"` + } + if err := json.Unmarshal(structuredBytes, &structuredData); err == nil { + if strings.Contains(strings.ToLower(structuredData.Text), "kebab") { + foundText = true + } + } + } + + if !foundText { + for _, content := range invokeResult.Content { + if textContent, ok := content.(*mcp.TextContent); ok && textContent.Text != "" { + if strings.Contains(strings.ToLower(textContent.Text), "kebab") { + foundText = true + break + } + } + } + } + + require.True(t, foundText, "Should have text content containing 'kebab' in response") +} + +// TestE2EMCPEndpointErrorHandling tests error handling in the MCP endpoint +func TestE2EMCPEndpointErrorHandling(t *testing.T) { + ctx := context.Background() + client := setupMCPClient(t) + + // Try to invoke a non-existent agent + result, err := client.CallTool(ctx, mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "invoke_agent", + Arguments: map[string]interface{}{ + "agent": "nonexistent/agent", + "task": "test", + }, + }, + }) + require.True(t, result.IsError, "Should return error") + // This content is the error text for the LLM to know what went wrong + require.NotEmpty(t, result.Content, "Should have error content") + + // Try to call a non-existent tool + _, err = client.CallTool(ctx, mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Name: "nonexistent_tool", + }, + }) + // Should return an error + require.Error(t, err, "Should return error for non-existent tool") +} From 61a09784b430c5449997bf32782e948aaf04b408 Mon Sep 17 00:00:00 2001 From: Jet Chiang Date: Fri, 16 Jan 2026 17:34:03 -0500 Subject: [PATCH 2/5] migrate to go-sdk Signed-off-by: Jet Chiang --- go/go.mod | 7 +- go/go.sum | 14 +- .../controller/reconciler/reconciler.go | 79 +++-- go/internal/httpserver/server.go | 2 +- go/internal/mcp/mcp_handler.go | 282 +++++++++++------- go/test/e2e/invoke_mcp_test.go | 85 +++--- 6 files changed, 269 insertions(+), 200 deletions(-) diff --git a/go/go.mod b/go/go.mod index 611f2461e..cdc8d2081 100644 --- a/go/go.mod +++ b/go/go.mod @@ -16,7 +16,7 @@ require ( github.com/jedib0t/go-pretty/v6 v6.6.8 github.com/kagent-dev/kmcp v0.2.2 github.com/kagent-dev/mockllm v0.0.3 - github.com/mark3labs/mcp-go v0.40.0 + github.com/modelcontextprotocol/go-sdk v1.2.0 github.com/muesli/reflow v0.3.0 github.com/prometheus/client_golang v1.23.2 github.com/spf13/cobra v1.10.1 @@ -43,8 +43,6 @@ require ( github.com/atotto/clipboard v0.1.4 // indirect github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect github.com/aymanbagabas/go-udiff v0.3.1 // indirect - github.com/bahlo/generic-list-go v0.2.0 // indirect - github.com/buger/jsonparser v1.1.1 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/charmbracelet/colorprofile v0.3.2 // indirect github.com/charmbracelet/x/ansi v0.10.1 // indirect @@ -53,7 +51,7 @@ require ( github.com/charmbracelet/x/term v0.2.1 // indirect github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect github.com/evanphx/json-patch v5.9.11+incompatible // indirect - github.com/invopop/jsonschema v0.13.0 // indirect + github.com/google/jsonschema-go v0.3.0 // indirect github.com/lucasb-eyer/go-colorful v1.3.0 // indirect github.com/mattn/go-localereader v0.0.2-0.20220822084749-2491eb6c1c75 // indirect github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 // indirect @@ -66,7 +64,6 @@ require ( github.com/tidwall/match v1.2.0 // indirect github.com/tidwall/pretty v1.2.1 // indirect github.com/tidwall/sjson v1.2.5 // indirect - github.com/wk8/go-ordered-map/v2 v2.1.9-0.20240815153524-6ea36470d1bd // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect diff --git a/go/go.sum b/go/go.sum index 8158577f6..e7c83f7f7 100644 --- a/go/go.sum +++ b/go/go.sum @@ -22,16 +22,12 @@ github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiE github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= github.com/aymanbagabas/go-udiff v0.3.1 h1:LV+qyBQ2pqe0u42ZsUEtPiCaUoqgA9gYRDs3vj1nolY= github.com/aymanbagabas/go-udiff v0.3.1/go.mod h1:G0fsKmG+P6ylD0r6N/KgQD/nWzgfnl8ZBcNLgcbrw8E= -github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= -github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= github.com/briandowns/spinner v1.23.2 h1:Zc6ecUnI+YzLmJniCfDNaMbW0Wid1d5+qcTq4L2FW8w= github.com/briandowns/spinner v1.23.2/go.mod h1:LaZeM4wm2Ywy6vO571mvhQNRcWfRUnXOs0RcKV0wYKM= -github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= -github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -128,6 +124,8 @@ github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/jsonschema-go v0.3.0 h1:6AH2TxVNtk3IlvkkhjrtbUc4S8AvO0Xii0DxIygDg+Q= +github.com/google/jsonschema-go v0.3.0/go.mod h1:r5quNTdLOYEz95Ru18zA0ydNbBuYoo9tgaYcxEYhJVE= github.com/google/pprof v0.0.0-20250923004556-9e5a51aed1e8 h1:ZI8gCoCjGzPsum4L21jHdQs8shFBIQih1TM9Rd/c+EQ= github.com/google/pprof v0.0.0-20250923004556-9e5a51aed1e8/go.mod h1:I6V7YzU0XDpsHqbsyrghnFZLO1gwK6NPTNvmetQIk9U= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -143,8 +141,6 @@ github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+l github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= -github.com/invopop/jsonschema v0.13.0 h1:KvpoAJWEjR3uD9Kbm2HWJmqsEaHt8lBUpd0qHcIi21E= -github.com/invopop/jsonschema v0.13.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= @@ -193,8 +189,6 @@ github.com/lucasb-eyer/go-colorful v1.3.0 h1:2/yBRLdWBZKrf7gB40FoiKfAWYQ0lqNcbuQ github.com/lucasb-eyer/go-colorful v1.3.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= github.com/mailru/easyjson v0.9.1 h1:LbtsOm5WAswyWbvTEOqhypdPeZzHavpZx96/n553mR8= github.com/mailru/easyjson v0.9.1/go.mod h1:1+xMtQp2MRNVL/V1bOzuP3aP8VNwRW55fQUto+XFtTU= -github.com/mark3labs/mcp-go v0.40.0 h1:M0oqK412OHBKut9JwXSsj4KanSmEKpzoW8TcxoPOkAU= -github.com/mark3labs/mcp-go v0.40.0/go.mod h1:T7tUa2jO6MavG+3P25Oy/jR7iCeJPHImCZHRymCn39g= github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= @@ -206,6 +200,8 @@ github.com/mattn/go-localereader v0.0.2-0.20220822084749-2491eb6c1c75/go.mod h1: github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= github.com/mattn/go-runewidth v0.0.17 h1:78v8ZlW0bP43XfmAfPsdXcoNCelfMHsDmd/pkENfrjQ= github.com/mattn/go-runewidth v0.0.17/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/modelcontextprotocol/go-sdk v1.2.0 h1:Y23co09300CEk8iZ/tMxIX1dVmKZkzoSBZOpJwUnc/s= +github.com/modelcontextprotocol/go-sdk v1.2.0/go.mod h1:6fM3LCm3yV7pAs8isnKLn07oKtB0MP9LHd3DfAcKw10= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -299,8 +295,6 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= -github.com/wk8/go-ordered-map/v2 v2.1.9-0.20240815153524-6ea36470d1bd h1:dLuIF2kX9c+KknGJUdJi1Il1SDiTSK158/BB9kdgAew= -github.com/wk8/go-ordered-map/v2 v2.1.9-0.20240815153524-6ea36470d1bd/go.mod h1:DbzwytT4g/odXquuOCqroKvtxxldI4nb3nuesHF/Exo= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no= diff --git a/go/internal/controller/reconciler/reconciler.go b/go/internal/controller/reconciler/reconciler.go index 258fd62a9..04d8aacf7 100644 --- a/go/internal/controller/reconciler/reconciler.go +++ b/go/internal/controller/reconciler/reconciler.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "errors" "fmt" + "net/http" "reflect" "slices" "strings" @@ -27,9 +28,7 @@ import ( "github.com/kagent-dev/kagent/go/internal/database" "github.com/kagent-dev/kagent/go/internal/utils" "github.com/kagent-dev/kagent/go/internal/version" - mcp_client "github.com/mark3labs/mcp-go/client" - "github.com/mark3labs/mcp-go/client/transport" - "github.com/mark3labs/mcp-go/mcp" + "github.com/modelcontextprotocol/go-sdk/mcp" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" @@ -665,41 +664,73 @@ func (a *kagentReconciler) upsertToolServerForRemoteMCPServer(ctx context.Contex return tools, nil } -func (a *kagentReconciler) createMcpTransport(ctx context.Context, s *v1alpha2.RemoteMCPServerSpec, namespace string) (transport.Interface, error) { +func (a *kagentReconciler) createMcpTransport(ctx context.Context, s *v1alpha2.RemoteMCPServerSpec, namespace string) (mcp.Transport, error) { headers, err := s.ResolveHeaders(ctx, a.kube, namespace) if err != nil { return nil, err } + httpClient := newHTTPClient(headers) + switch s.Protocol { case v1alpha2.RemoteMCPServerProtocolSse: - return transport.NewSSE(s.URL, transport.WithHeaders(headers)) + return &mcp.SSEClientTransport{ + Endpoint: s.URL, + HTTPClient: httpClient, + }, nil default: - return transport.NewStreamableHTTP(s.URL, transport.WithHTTPHeaders(headers)) + return &mcp.StreamableClientTransport{ + Endpoint: s.URL, + HTTPClient: httpClient, + }, nil } } -func (a *kagentReconciler) listTools(ctx context.Context, tsp transport.Interface, toolServer *database.ToolServer) ([]*v1alpha2.MCPTool, error) { - client := mcp_client.NewClient(tsp) - err := client.Start(ctx) - if err != nil { - return nil, fmt.Errorf("failed to start client for toolServer %s: %v", toolServer.Name, err) - } - defer client.Close() - _, err = client.Initialize(ctx, mcp.InitializeRequest{ - Params: mcp.InitializeParams{ - ProtocolVersion: mcp.LATEST_PROTOCOL_VERSION, - Capabilities: mcp.ClientCapabilities{}, - ClientInfo: mcp.Implementation{ - Name: "kagent-controller", - Version: version.Version, - }, +// go-sdk does not have a WithHeaders option when initializing transport +// so we need to create a custom HTTP client that adds headers to all requests. +func newHTTPClient(headers map[string]string) *http.Client { + if len(headers) == 0 { + return nil + } + return &http.Client{ + Transport: &headerTransport{ + headers: headers, + base: http.DefaultTransport, }, - }) + } +} + +// headerTransport is an http.RoundTripper that adds custom headers to requests. +type headerTransport struct { + headers map[string]string + base http.RoundTripper +} + +func (t *headerTransport) RoundTrip(req *http.Request) (*http.Response, error) { + req = req.Clone(req.Context()) + for k, v := range t.headers { + req.Header.Set(k, v) + } + if t.base == nil { + t.base = http.DefaultTransport + } + return t.base.RoundTrip(req) +} + +func (a *kagentReconciler) listTools(ctx context.Context, tsp mcp.Transport, toolServer *database.ToolServer) ([]*v1alpha2.MCPTool, error) { + impl := &mcp.Implementation{ + Name: "kagent-controller", + Version: version.Version, + } + client := mcp.NewClient(impl, nil) + + session, err := client.Connect(ctx, tsp, nil) if err != nil { - return nil, fmt.Errorf("failed to initialize client for toolServer %s: %v", toolServer.Name, err) + return nil, fmt.Errorf("failed to connect client for toolServer %s: %v", toolServer.Name, err) } - result, err := client.ListTools(ctx, mcp.ListToolsRequest{}) + defer session.Close() + + result, err := session.ListTools(ctx, &mcp.ListToolsParams{}) if err != nil { return nil, fmt.Errorf("failed to list tools for toolServer %s: %v", toolServer.Name, err) } diff --git a/go/internal/httpserver/server.go b/go/internal/httpserver/server.go index 9793afee9..5d992ec56 100644 --- a/go/internal/httpserver/server.go +++ b/go/internal/httpserver/server.go @@ -36,7 +36,7 @@ const ( APIPathMemories = "/api/memories" APIPathNamespaces = "/api/namespaces" APIPathA2A = "/api/a2a" - APIPathMCP = "/api/mcp" + APIPathMCP = "/mcp" APIPathFeedback = "/api/feedback" APIPathLangGraph = "/api/langgraph" APIPathCrewAI = "/api/crewai" diff --git a/go/internal/mcp/mcp_handler.go b/go/internal/mcp/mcp_handler.go index 09ba44372..e14bd6744 100644 --- a/go/internal/mcp/mcp_handler.go +++ b/go/internal/mcp/mcp_handler.go @@ -8,13 +8,13 @@ import ( "sync" "time" - "github.com/google/uuid" "github.com/kagent-dev/kagent/go/api/v1alpha2" "github.com/kagent-dev/kagent/go/internal/a2a" + authimpl "github.com/kagent-dev/kagent/go/internal/httpserver/auth" "github.com/kagent-dev/kagent/go/internal/version" "github.com/kagent-dev/kagent/go/pkg/auth" - "github.com/mark3labs/mcp-go/mcp" - mcpserver "github.com/mark3labs/mcp-go/server" + mcpsdk "github.com/modelcontextprotocol/go-sdk/mcp" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" ctrllog "sigs.k8s.io/controller-runtime/pkg/log" a2aclient "trpc.group/trpc-go/trpc-a2a-go/client" @@ -26,14 +26,37 @@ type MCPHandler struct { kubeClient client.Client a2aBaseURL string authenticator auth.AuthProvider - httpServer *mcpserver.StreamableHTTPServer - lock sync.RWMutex - // Map to store context IDs per session and agent - contextBySessionAndAgent sync.Map + httpHandler *mcpsdk.StreamableHTTPHandler + server *mcpsdk.Server + a2aClients sync.Map +} + +// Input types for MCP tools +type ListAgentsInput struct{} + +type ListAgentsOutput struct { + Agents []AgentSummary `json:"agents"` +} + +type AgentSummary struct { + Ref string `json:"ref"` + Description string `json:"description,omitempty"` +} + +type InvokeAgentInput struct { + Agent string `json:"agent" jsonschema:"Agent name (or namespace/name)"` + Task string `json:"task" jsonschema:"Task to run"` + ContextID string `json:"context_id,omitempty" jsonschema:"Optional A2A context ID to continue a conversation"` +} + +type InvokeAgentOutput struct { + Agent string `json:"agent"` + Text string `json:"text"` + ContextID string `json:"context_id,omitempty"` } // NewMCPHandler creates a new MCP handler -// Wraps the StreamableHTTPServer handler adds A2A bridging and context management. +// Wraps the StreamableHTTPHandler and adds A2A bridging and context management. func NewMCPHandler(kubeClient client.Client, a2aBaseURL string, authenticator auth.AuthProvider) (*MCPHandler, error) { handler := &MCPHandler{ kubeClient: kubeClient, @@ -41,62 +64,60 @@ func NewMCPHandler(kubeClient client.Client, a2aBaseURL string, authenticator au authenticator: authenticator, } - // Create MCP server with tools and session cleanup hooks - hooks := &mcpserver.Hooks{} - hooks.AddOnUnregisterSession(func(ctx context.Context, session mcpserver.ClientSession) { - sessionID := session.SessionID() - handler.contextBySessionAndAgent.Range(func(key, _ any) bool { - keyStr, ok := key.(string) - if !ok { - return true - } - if strings.HasPrefix(keyStr, sessionID+"|") { - handler.contextBySessionAndAgent.Delete(key) - } - return true - }) - }) - - s := mcpserver.NewMCPServer( - "kagent-agents", - version.Version, - mcpserver.WithToolCapabilities(false), - mcpserver.WithHooks(hooks), - ) + // Create MCP server + impl := &mcpsdk.Implementation{ + Name: "kagent-agents", + Version: version.Version, + } + server := mcpsdk.NewServer(impl, nil) + handler.server = server // Add list_agents tool - s.AddTool(mcp.NewTool("list_agents", - mcp.WithDescription("List invokable kagent agents (accepted + deploymentReady)"), - ), handler.handleListAgents) + mcpsdk.AddTool[ListAgentsInput, ListAgentsOutput]( + server, + &mcpsdk.Tool{ + Name: "list_agents", + Description: "List invokable kagent agents (accepted + deploymentReady)", + }, + handler.handleListAgents, + ) // Add invoke_agent tool - s.AddTool(mcp.NewTool("invoke_agent", - mcp.WithDescription("Invoke a kagent agent via A2A"), - mcp.WithString("agent", mcp.Description("Agent name (or namespace/name)"), mcp.Required()), - mcp.WithString("task", mcp.Description("Task to run"), mcp.Required()), - ), handler.handleInvokeAgent) + mcpsdk.AddTool[InvokeAgentInput, InvokeAgentOutput]( + server, + &mcpsdk.Tool{ + Name: "invoke_agent", + Description: "Invoke a kagent agent via A2A", + }, + handler.handleInvokeAgent, + ) - // Create HTTP server - handler.httpServer = mcpserver.NewStreamableHTTPServer(s) + // Create HTTP handler + handler.httpHandler = mcpsdk.NewStreamableHTTPHandler( + func(*http.Request) *mcpsdk.Server { + return server + }, + nil, + ) return handler, nil } // handleListAgents handles the list_agents MCP tool -func (h *MCPHandler) handleListAgents(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { +func (h *MCPHandler) handleListAgents(ctx context.Context, req *mcpsdk.CallToolRequest, input ListAgentsInput) (*mcpsdk.CallToolResult, ListAgentsOutput, error) { log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "list_agents") agentList := &v1alpha2.AgentList{} if err := h.kubeClient.List(ctx, agentList); err != nil { - return mcp.NewToolResultErrorFromErr("list agents", err), nil - } - - type agentSummary struct { - Ref string `json:"ref"` - Description string `json:"description,omitempty"` + return &mcpsdk.CallToolResult{ + Content: []mcpsdk.Content{ + &mcpsdk.TextContent{Text: fmt.Sprintf("Failed to list agents: %v", err)}, + }, + IsError: true, + }, ListAgentsOutput{}, nil } - agents := make([]agentSummary, 0) + agents := make([]AgentSummary, 0) for _, agent := range agentList.Items { // Check if agent is accepted and deployment ready deploymentReady := false @@ -116,79 +137,101 @@ func (h *MCPHandler) handleListAgents(ctx context.Context, request mcp.CallToolR ref := agent.Namespace + "/" + agent.Name description := agent.Spec.Description - agents = append(agents, agentSummary{ + agents = append(agents, AgentSummary{ Ref: ref, Description: description, }) } log.Info("Listed agents", "count", len(agents)) - if len(agents) == 0 { - return mcp.NewToolResultStructured(map[string]any{"agents": agents}, "No invokable agents found."), nil - } + + output := ListAgentsOutput{Agents: agents} var fallbackText strings.Builder - for i, agent := range agents { - if i > 0 { - fallbackText.WriteByte('\n') - } - fallbackText.WriteString(agent.Ref) - if agent.Description != "" { - fallbackText.WriteString(" - ") - fallbackText.WriteString(agent.Description) + if len(agents) == 0 { + fallbackText.WriteString("No invokable agents found.") + } else { + for i, agent := range agents { + if i > 0 { + fallbackText.WriteByte('\n') + } + fallbackText.WriteString(agent.Ref) + if agent.Description != "" { + fallbackText.WriteString(" - ") + fallbackText.WriteString(agent.Description) + } } } - return mcp.NewToolResultStructured(map[string]any{"agents": agents}, fallbackText.String()), nil + return &mcpsdk.CallToolResult{ + Content: []mcpsdk.Content{ + &mcpsdk.TextContent{Text: fallbackText.String()}, + }, + }, output, nil } // handleInvokeAgent handles the invoke_agent MCP tool -func (h *MCPHandler) handleInvokeAgent(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { +func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallToolRequest, input InvokeAgentInput) (*mcpsdk.CallToolResult, InvokeAgentOutput, error) { log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "invoke_agent") - agentRef, err := request.RequireString("agent") - if err != nil { - return mcp.NewToolResultError(err.Error()), nil - } - - task, err := request.RequireString("task") - if err != nil { - return mcp.NewToolResultError(err.Error()), nil - } - // Parse agent reference (namespace/name or just name) - agentNS, agentName, ok := strings.Cut(agentRef, "/") + agentNS, agentName, ok := strings.Cut(input.Agent, "/") if !ok { - return mcp.NewToolResultError("agent must be in format 'namespace/name'"), nil - } - agentRef = agentNS + "/" + agentName - - // Get session ID from context if available - sessionID := "" - if session := mcpserver.ClientSessionFromContext(ctx); session != nil { - sessionID = session.SessionID() - } else if headerSessionID := request.Header.Get(mcpserver.HeaderKeySessionID); headerSessionID != "" { - sessionID = headerSessionID - } - if sessionID == "" { - sessionID = uuid.New().String() + return &mcpsdk.CallToolResult{ + Content: []mcpsdk.Content{ + &mcpsdk.TextContent{Text: "agent must be in format 'namespace/name'"}, + }, + IsError: true, + }, InvokeAgentOutput{}, nil } + agentRef := agentNS + "/" + agentName + agentNns := types.NamespacedName{Namespace: agentNS, Name: agentName} - // Get or create context ID for this session and agent - contextKey := sessionID + "|" + agentRef + // Get context ID from client request (stateless mode) + // If not provided, contextIDPtr will be nil and a new conversation will start var contextIDPtr *string - if prior, ok := h.contextBySessionAndAgent.Load(contextKey); ok { - if priorStr, ok := prior.(string); ok && priorStr != "" { - contextIDPtr = &priorStr - } + if input.ContextID != "" { + contextIDPtr = &input.ContextID + log.V(1).Info("Using context_id from client request", "context_id", input.ContextID) } - // Create A2A client + // Get or create cached A2A client for this agent a2aURL := fmt.Sprintf("%s/%s/", h.a2aBaseURL, agentRef) - a2aClient, err := a2aclient.NewA2AClient(a2aURL, a2aclient.WithTimeout(30*time.Second)) - if err != nil { - log.Error(err, "Failed to create A2A client", "agent", agentRef) - return mcp.NewToolResultErrorFromErr("a2a client", err), nil + var a2aClient *a2aclient.A2AClient + + if cached, ok := h.a2aClients.Load(agentRef); ok { + if client, ok := cached.(*a2aclient.A2AClient); ok { + a2aClient = client + } + } + + // Create new client if not cached + if a2aClient == nil { + // Build A2A client options with authentication propagation + a2aOpts := []a2aclient.Option{ + a2aclient.WithTimeout(30 * time.Second), + a2aclient.WithHTTPReqHandler( + authimpl.A2ARequestHandler( + h.authenticator, + agentNns, + ), + ), + } + + newClient, err := a2aclient.NewA2AClient(a2aURL, a2aOpts...) + if err != nil { + log.Error(err, "Failed to create A2A client", "agent", agentRef) + return &mcpsdk.CallToolResult{ + Content: []mcpsdk.Content{ + &mcpsdk.TextContent{Text: fmt.Sprintf("Failed to create A2A client: %v", err)}, + }, + IsError: true, + }, InvokeAgentOutput{}, nil + } + + // Cache the client + h.a2aClients.Store(agentRef, newClient) + a2aClient = newClient } // Send message via A2A @@ -197,12 +240,17 @@ func (h *MCPHandler) handleInvokeAgent(ctx context.Context, request mcp.CallTool Kind: protocol.KindMessage, Role: protocol.MessageRoleUser, ContextID: contextIDPtr, - Parts: []protocol.Part{protocol.NewTextPart(task)}, + Parts: []protocol.Part{protocol.NewTextPart(input.Task)}, }, }) if err != nil { log.Error(err, "Failed to send A2A message", "agent", agentRef) - return mcp.NewToolResultErrorFromErr("a2a send", err), nil + return &mcpsdk.CallToolResult{ + Content: []mcpsdk.Content{ + &mcpsdk.TextContent{Text: fmt.Sprintf("Failed to send A2A message: %v", err)}, + }, + IsError: true, + }, InvokeAgentOutput{}, nil } // Extract response text and context ID @@ -213,6 +261,7 @@ func (h *MCPHandler) handleInvokeAgent(ctx context.Context, request mcp.CallTool if a2aResult.ContextID != nil { newContextID = *a2aResult.ContextID } + // Kagent A2A only returns Task type for now case *protocol.Task: newContextID = a2aResult.ContextID if a2aResult.Status.Message != nil { @@ -226,30 +275,43 @@ func (h *MCPHandler) handleInvokeAgent(ctx context.Context, request mcp.CallTool if responseText == "" { raw, err := result.MarshalJSON() if err != nil { - return mcp.NewToolResultErrorFromErr("marshal result", err), nil + return &mcpsdk.CallToolResult{ + Content: []mcpsdk.Content{ + &mcpsdk.TextContent{Text: fmt.Sprintf("Failed to marshal result: %v", err)}, + }, + IsError: true, + }, InvokeAgentOutput{}, nil } responseText = string(raw) } - // Store new context ID if available + log.Info("Invoked agent", "agent", agentRef, "hasContextID", newContextID != "") + + // Return context_id in response so client can store it for stateless operation + output := InvokeAgentOutput{ + Agent: agentRef, + Text: responseText, + } if newContextID != "" { - h.contextBySessionAndAgent.Store(contextKey, newContextID) + output.ContextID = newContextID } - log.Info("Invoked agent", "agent", agentRef, "hasContextID", newContextID != "") - return mcp.NewToolResultStructured(map[string]any{ - "agent": agentRef, - "text": responseText, - }, responseText), nil + return &mcpsdk.CallToolResult{ + Content: []mcpsdk.Content{ + &mcpsdk.TextContent{Text: responseText}, + }, + }, output, nil } // ServeHTTP implements http.Handler interface func (h *MCPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // The MCP HTTP server handles all the routing internally - h.httpServer.ServeHTTP(w, r) + // The MCP HTTP handler handles all the routing internally + h.httpHandler.ServeHTTP(w, r) } // Shutdown gracefully shuts down the MCP handler func (h *MCPHandler) Shutdown(ctx context.Context) error { - return h.httpServer.Shutdown(ctx) + // The new SDK doesn't have an explicit Shutdown method on StreamableHTTPHandler + // The server will be shut down when the context is cancelled + return nil } diff --git a/go/test/e2e/invoke_mcp_test.go b/go/test/e2e/invoke_mcp_test.go index 977d954d9..ffa49e33b 100644 --- a/go/test/e2e/invoke_mcp_test.go +++ b/go/test/e2e/invoke_mcp_test.go @@ -8,9 +8,7 @@ import ( "testing" "time" - mcp_client "github.com/mark3labs/mcp-go/client" - "github.com/mark3labs/mcp-go/client/transport" - "github.com/mark3labs/mcp-go/mcp" + "github.com/modelcontextprotocol/go-sdk/mcp" "github.com/stretchr/testify/require" ) @@ -21,49 +19,43 @@ func mcpEndpointURL() string { // if running locally on kind, do "kubectl port-forward -n kagent deployments/kagent-controller 8083" kagentURL = "http://localhost:8083" } - return kagentURL + "/api/mcp" + return kagentURL + "/mcp" } // setupMCPClient creates and initializes an MCP client for testing -func setupMCPClient(t *testing.T) *mcp_client.Client { +func setupMCPClient(t *testing.T) *mcp.ClientSession { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() url := mcpEndpointURL() - tsp, err := transport.NewStreamableHTTP(url) - require.NoError(t, err, "Failed to create transport") - client := mcp_client.NewClient(tsp) + transport := &mcp.StreamableClientTransport{ + Endpoint: url, + } - err = client.Start(ctx) - require.NoError(t, err, "Failed to start MCP client") + impl := &mcp.Implementation{ + Name: "e2e-test", + Version: "0.0.0", + } + client := mcp.NewClient(impl, nil) - t.Cleanup(func() { - client.Close() - }) + session, err := client.Connect(ctx, transport, nil) + require.NoError(t, err, "Failed to connect MCP client") - _, err = client.Initialize(ctx, mcp.InitializeRequest{ - Params: mcp.InitializeParams{ - ProtocolVersion: mcp.LATEST_PROTOCOL_VERSION, - Capabilities: mcp.ClientCapabilities{}, - ClientInfo: mcp.Implementation{ - Name: "e2e-test", - Version: "0.0.0", - }, - }, + t.Cleanup(func() { + session.Close() }) - require.NoError(t, err, "Failed to initialize MCP client") - return client + return session } // TestE2EMCPEndpointListAgents tests the list_agents tool via the controller's MCP endpoint // These tests use the kebab-agent deployed via push-test-agent in CI. func TestE2EMCPEndpointListAgents(t *testing.T) { ctx := context.Background() - client := setupMCPClient(t) + session := setupMCPClient(t) // List tools - toolsResult, err := client.ListTools(ctx, mcp.ListToolsRequest{}) + toolsResult, err := session.ListTools(ctx, &mcp.ListToolsParams{}) require.NoError(t, err, "Should list tools") // Verify expected tools exist @@ -75,10 +67,8 @@ func TestE2EMCPEndpointListAgents(t *testing.T) { require.Contains(t, toolNames, "invoke_agent", "Should have invoke_agent tool") // Call list_agents tool - listAgentsResult, err := client.CallTool(ctx, mcp.CallToolRequest{ - Params: mcp.CallToolParams{ - Name: "list_agents", - }, + listAgentsResult, err := session.CallTool(ctx, &mcp.CallToolParams{ + Name: "list_agents", }) require.NoError(t, err, "Should call list_agents tool") require.NotEmpty(t, listAgentsResult.Content, "Should have content in response") @@ -125,17 +115,15 @@ func TestE2EMCPEndpointListAgents(t *testing.T) { // TestE2EMCPEndpointInvokeAgent tests the invoke_agent tool via the controller's MCP endpoint func TestE2EMCPEndpointInvokeAgent(t *testing.T) { ctx := context.Background() - client := setupMCPClient(t) + session := setupMCPClient(t) // Invoke kebab-agent agentRef := "kagent/kebab-agent" - invokeResult, err := client.CallTool(ctx, mcp.CallToolRequest{ - Params: mcp.CallToolParams{ - Name: "invoke_agent", - Arguments: map[string]interface{}{ - "agent": agentRef, - "task": "What can you do?", - }, + invokeResult, err := session.CallTool(ctx, &mcp.CallToolParams{ + Name: "invoke_agent", + Arguments: map[string]interface{}{ + "agent": agentRef, + "task": "What can you do?", }, }) require.NoError(t, err, "Should call invoke_agent tool") @@ -175,27 +163,24 @@ func TestE2EMCPEndpointInvokeAgent(t *testing.T) { // TestE2EMCPEndpointErrorHandling tests error handling in the MCP endpoint func TestE2EMCPEndpointErrorHandling(t *testing.T) { ctx := context.Background() - client := setupMCPClient(t) + session := setupMCPClient(t) // Try to invoke a non-existent agent - result, err := client.CallTool(ctx, mcp.CallToolRequest{ - Params: mcp.CallToolParams{ - Name: "invoke_agent", - Arguments: map[string]interface{}{ - "agent": "nonexistent/agent", - "task": "test", - }, + result, err := session.CallTool(ctx, &mcp.CallToolParams{ + Name: "invoke_agent", + Arguments: map[string]interface{}{ + "agent": "nonexistent/agent", + "task": "test", }, }) + require.NoError(t, err, "CallTool should not return protocol error") require.True(t, result.IsError, "Should return error") // This content is the error text for the LLM to know what went wrong require.NotEmpty(t, result.Content, "Should have error content") // Try to call a non-existent tool - _, err = client.CallTool(ctx, mcp.CallToolRequest{ - Params: mcp.CallToolParams{ - Name: "nonexistent_tool", - }, + _, err = session.CallTool(ctx, &mcp.CallToolParams{ + Name: "nonexistent_tool", }) // Should return an error require.Error(t, err, "Should return error for non-existent tool") From 9909ba495808c7abd99e285a01ef7bd734f2ebf8 Mon Sep 17 00:00:00 2001 From: Jet Chiang Date: Fri, 16 Jan 2026 17:42:17 -0500 Subject: [PATCH 3/5] fix go lint Signed-off-by: Jet Chiang --- go/test/e2e/invoke_mcp_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/test/e2e/invoke_mcp_test.go b/go/test/e2e/invoke_mcp_test.go index ffa49e33b..3bc5e2ddf 100644 --- a/go/test/e2e/invoke_mcp_test.go +++ b/go/test/e2e/invoke_mcp_test.go @@ -121,7 +121,7 @@ func TestE2EMCPEndpointInvokeAgent(t *testing.T) { agentRef := "kagent/kebab-agent" invokeResult, err := session.CallTool(ctx, &mcp.CallToolParams{ Name: "invoke_agent", - Arguments: map[string]interface{}{ + Arguments: map[string]any{ "agent": agentRef, "task": "What can you do?", }, @@ -168,7 +168,7 @@ func TestE2EMCPEndpointErrorHandling(t *testing.T) { // Try to invoke a non-existent agent result, err := session.CallTool(ctx, &mcp.CallToolParams{ Name: "invoke_agent", - Arguments: map[string]interface{}{ + Arguments: map[string]any{ "agent": "nonexistent/agent", "task": "test", }, From e1ab8696e775c7943f4a9ee8a0d265d844c3e6aa Mon Sep 17 00:00:00 2001 From: Jet Chiang Date: Fri, 16 Jan 2026 18:41:18 -0500 Subject: [PATCH 4/5] Update tool description Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Jet Chiang --- go/internal/mcp/mcp_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/internal/mcp/mcp_handler.go b/go/internal/mcp/mcp_handler.go index e14bd6744..eb98d4280 100644 --- a/go/internal/mcp/mcp_handler.go +++ b/go/internal/mcp/mcp_handler.go @@ -44,7 +44,7 @@ type AgentSummary struct { } type InvokeAgentInput struct { - Agent string `json:"agent" jsonschema:"Agent name (or namespace/name)"` + Agent string `json:"agent" jsonschema:"Agent reference in format namespace/name"` Task string `json:"task" jsonschema:"Task to run"` ContextID string `json:"context_id,omitempty" jsonschema:"Optional A2A context ID to continue a conversation"` } From b11d3846143da39371e63205b0f4c739fecbcc60 Mon Sep 17 00:00:00 2001 From: Jet Chiang Date: Fri, 16 Jan 2026 18:42:12 -0500 Subject: [PATCH 5/5] Update go/internal/controller/reconciler/reconciler.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Jet Chiang --- go/internal/controller/reconciler/reconciler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/internal/controller/reconciler/reconciler.go b/go/internal/controller/reconciler/reconciler.go index 04d8aacf7..df26103bb 100644 --- a/go/internal/controller/reconciler/reconciler.go +++ b/go/internal/controller/reconciler/reconciler.go @@ -690,7 +690,7 @@ func (a *kagentReconciler) createMcpTransport(ctx context.Context, s *v1alpha2.R // so we need to create a custom HTTP client that adds headers to all requests. func newHTTPClient(headers map[string]string) *http.Client { if len(headers) == 0 { - return nil + return http.DefaultClient } return &http.Client{ Transport: &headerTransport{