Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/mcp/src/keycardai/mcp/server/handlers/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def wrapper(request: Request) -> Response:

request_metadata.resource = _create_resource_url(base_url, path)
request_metadata.jwks_uri = _create_jwks_uri(base_url)
# Resource URL serves as client_id for private_key_jwt auth (each resource is its own OAuth client)
request_metadata.client_id = str(request_metadata.resource)
request_metadata.client_name = "MCP Server"
request_metadata.token_endpoint_auth_method = TokenEndpointAuthMethod.PRIVATE_KEY_JWT
Expand Down
10 changes: 7 additions & 3 deletions packages/mcp/src/keycardai/mcp/server/routers/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,18 @@ def well_known_metadata_routes(
issuer: The OAuth issuer URL used for authorization server metadata.
enable_multi_zone: Whether to enable multi-zone support for metadata endpoints.
jwks: Optional JSON Web Key Set to expose. If provided, adds a JWKS route.
resource: Optional resource path prefix (currently unused in route creation).
resource: Optional resource path suffix for dynamic path matching (e.g., "{resource_path:path}").

Returns:
A list of Starlette Route objects for the well-known endpoints.
"""
# Build route paths with optional resource suffix for dynamic matching
protected_resource_path = f"/oauth-protected-resource{resource}" if resource else "/oauth-protected-resource"
auth_server_path = f"/oauth-authorization-server{resource}" if resource else "/oauth-authorization-server"

routes = [
well_known_protected_resource_route(issuer, enable_multi_zone),
well_known_authorization_server_route(issuer, enable_multi_zone),
well_known_protected_resource_route(issuer, enable_multi_zone, resource=protected_resource_path),
well_known_authorization_server_route(issuer, enable_multi_zone, resource=auth_server_path),
]

if jwks:
Expand Down
95 changes: 95 additions & 0 deletions packages/mcp/tests/integration/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,101 @@ def test_protected_resource_has_correct_issuer(self, issuer, client):
assert f"{issuer}/" in data["authorization_servers"]


class TestDynamicResourcePaths:
"""Integration tests for dynamic resource path matching.

These tests verify that the metadata endpoints correctly handle
requests with resource path suffixes (e.g., /oauth-protected-resource/mcp).
"""

@pytest.fixture
def issuer(self):
return "https://auth.localdev.keycard.sh"

@pytest.fixture
def app(self, issuer):
"""Create app using auth_metadata_mount which should support dynamic paths."""
mount = auth_metadata_mount(issuer=issuer)
return Starlette(routes=[mount])

@pytest.fixture
def client(self, app):
return TestClient(app)

def test_protected_resource_with_path_suffix_returns_200(self, client):
"""Test that protected resource endpoint matches requests with path suffix."""
response = client.get("/.well-known/oauth-protected-resource/mcp")
assert response.status_code == 200, (
"Expected 200 for dynamic path /mcp, got 404. "
"Resource path parameter is not being propagated to routes."
)

def test_protected_resource_with_nested_path_suffix_returns_200(self, client):
"""Test that protected resource endpoint matches nested path suffixes."""
response = client.get("/.well-known/oauth-protected-resource/api/v1/service")
assert response.status_code == 200, (
"Expected 200 for nested path, got 404. "
"Resource path parameter should match nested paths."
)

def test_protected_resource_path_suffix_included_in_resource_field(self, client):
"""Test that the path suffix is included in the resource field of response."""
response = client.get("/.well-known/oauth-protected-resource/mcp")
data = response.json()

assert "resource" in data
assert "/mcp" in data["resource"], (
f"Expected '/mcp' in resource field, got '{data['resource']}'. "
"The path suffix should be reflected in the resource URL."
)

def test_protected_resource_nested_path_included_in_resource_field(self, client):
"""Test that nested path suffix is included in the resource field."""
response = client.get("/.well-known/oauth-protected-resource/api/v1/service")
data = response.json()

assert "resource" in data
assert "/api/v1/service" in data["resource"], (
f"Expected '/api/v1/service' in resource field, got '{data['resource']}'. "
"The full path suffix should be reflected in the resource URL."
)

def test_protected_resource_client_id_matches_resource(self, client):
"""Test that client_id matches the resource URL including path suffix."""
response = client.get("/.well-known/oauth-protected-resource/my-service")
data = response.json()

assert data["client_id"] == data["resource"], (
"client_id should equal resource URL including the path suffix."
)

def test_base_path_still_works(self, client):
"""Test that the base path without suffix still works."""
response = client.get("/.well-known/oauth-protected-resource")
assert response.status_code == 200

def test_authorization_server_with_path_suffix_returns_200(self, client):
"""Test that authorization server endpoint matches requests with path suffix."""
# Note: This will try to fetch from upstream, so we need to mock
with patch("httpx.Client") as mock_client_class:
mock_response = Mock()
mock_response.json.return_value = {
"issuer": "https://auth.localdev.keycard.sh",
"authorization_endpoint": "https://auth.localdev.keycard.sh/oauth/authorize",
"token_endpoint": "https://auth.localdev.keycard.sh/oauth/token",
}
mock_response.raise_for_status.return_value = None

mock_client = Mock()
mock_client.get.return_value = mock_response
mock_client_class.return_value.__enter__.return_value = mock_client

response = client.get("/.well-known/oauth-authorization-server/zone123")
assert response.status_code == 200, (
"Expected 200 for authorization server with path suffix."
)


class TestMultiZone:
"""Integration tests for multi-zone functionality."""

Expand Down
68 changes: 68 additions & 0 deletions packages/mcp/tests/keycardai/mcp/server/routers/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,74 @@ def test_with_populated_jwks(self):
assert isinstance(result, Route)


class TestResourceParameterPropagation:
"""Tests to verify resource parameter is correctly propagated through the routing chain."""

def test_well_known_metadata_routes_passes_resource_to_protected_resource_route(self):
"""Test that well_known_metadata_routes passes resource parameter to route paths."""
issuer = "https://auth.example.com"
resource = "{resource_path:path}"

routes = well_known_metadata_routes(issuer, resource=resource)

# Find the protected resource route
protected_route = next(r for r in routes if r.name == "oauth-protected-resource")
assert resource in protected_route.path, (
f"Expected '{resource}' in route path, got '{protected_route.path}'. "
"Resource parameter is not being propagated to route creation."
)

def test_well_known_metadata_routes_passes_resource_to_auth_server_route(self):
"""Test that well_known_metadata_routes passes resource parameter to auth server route."""
issuer = "https://auth.example.com"
resource = "{resource_path:path}"

routes = well_known_metadata_routes(issuer, resource=resource)

# Find the authorization server route
auth_route = next(r for r in routes if r.name == "oauth-authorization-server")
assert resource in auth_route.path, (
f"Expected '{resource}' in route path, got '{auth_route.path}'. "
"Resource parameter is not being propagated to route creation."
)

def test_auth_metadata_mount_creates_dynamic_path_routes(self):
"""Test that auth_metadata_mount creates routes with dynamic path matching."""
issuer = "https://auth.example.com"

mount = auth_metadata_mount(issuer)

# Find the protected resource route within the mount
protected_route = next(
r for r in mount.routes
if hasattr(r, "name") and r.name == "oauth-protected-resource"
)

# The route should include the path parameter for dynamic matching
assert "{resource_path:path}" in protected_route.path, (
f"Expected dynamic path parameter in route, got '{protected_route.path}'. "
"auth_metadata_mount should create routes that match dynamic resource paths."
)

def test_well_known_metadata_mount_propagates_resource_parameter(self):
"""Test that well_known_metadata_mount propagates resource to child routes."""
issuer = "https://auth.example.com"
resource = "{custom_path:path}"

mount = well_known_metadata_mount(issuer, path="/.well-known", resource=resource)

# Verify the resource parameter appears in route paths
protected_route = next(
r for r in mount.routes
if hasattr(r, "name") and r.name == "oauth-protected-resource"
)

assert resource in protected_route.path, (
f"Expected '{resource}' in route path, got '{protected_route.path}'. "
"well_known_metadata_mount should propagate resource parameter to routes."
)


class TestEdgeCases:
"""Test edge cases and parameter combinations."""

Expand Down