aiagent.
aiagent71 min read

Mcp Tool Oauth Www Authenticate Meta

Anyone who has shipped a Model Context Protocol server past the prototype stage has hit the same wall: the tool works perfectly when you call it with no auth, then the moment you try to put a real identity in front of it the spec gets vague. You bolt on a bearer check, return a 401, and watch every client in your fleet fail silently because nothing tells them where to get a token, what audience to request, or which scopes the tool actually needs. The MCP authorization spec leans on RFC 9728 protected-resource metadata and RFC 6750 WWW-Authenticate semantics to fix exactly this, but the wiring between your tool registry, your authorization server, and the challenge header is where most implementations quietly drift out of compliance.

This walkthrough builds that wiring end to end in Python with FastAPI and the official mcp SDK, using python-jose for JWT handling and httpx for the client harness. You will end up with a runnable repo where a minimal MCP server publishes a /.well-known/oauth-protected-resource document, emits a fully populated WWW-Authenticate: Bearer resource_metadata=... challenge, validates JWTs against a local authorization server stub, enforces per-tool scope requirements through the tool registry, and ships with rate limiting, token caching, structured audit logs, and a CI smoke test. Each step lands as its own commit so you can read the diff rather than reverse-engineer a finished codebase.

This is written for backend engineers who already understand OAuth 2.1 at the conceptual level but have not yet implemented resource-server metadata or threaded scope checks into a tool dispatcher. By the end you will be able to take any existing MCP server, retrofit standards-compliant authorization metadata, and give downstream clients enough information to discover, authenticate, and call your tools without out-of-band documentation.

Step 1: Standing Up an Unauthenticated MCP Baseline

Before we can talk about OAuth metadata or WWW-Authenticate challenges, we need an MCP server worth protecting. This step builds the smallest viable Model Context Protocol surface — a Starlette app exposing one JSON-RPC endpoint that speaks initialize, tools/list, and tools/call — and intentionally leaves it wide open.

The reason we start here is selfish: every later step in the series will introduce a failure mode (a 401, a missing scope, a bad audience). Without a known-good baseline, we will not be able to tell whether a broken test reveals a real auth bug or just a regression in the underlying transport. So this step also writes a regression test that asserts the server has no auth — when that test starts failing in step 2, that is the signal the guard landed correctly.

Setup

We create three files inside codebase/ plus a single pyproject.toml for dependency management:

  • src/mcp_server/__init__.py — package entry point re-exporting create_app and the tool registry.
  • src/mcp_server/tools.py — the ToolSpec dataclass and a one-tool REGISTRY containing echo.
  • src/mcp_server/server.py — the Starlette app, JSON-RPC envelope handling, and method dispatch.
  • tests/test_server.py — pytest suite that exercises every dispatch path without auth.

Dependencies are minimal: starlette>=0.37 at runtime, and pytest + httpx + anyio for the test client. We deliberately avoid pulling in fastapi, pydantic, or a full MCP SDK at this stage so that nothing magical is hiding behind a decorator when we later need to insert authentication middleware.

[project]
name = "mcp-oauth-resource"
version = "0.1.0"
requires-python = ">=3.10"
dependencies = ["starlette>=0.37"]

[project.optional-dependencies]
dev = ["pytest>=8.0", "httpx>=0.27", "anyio>=4.0"]

[tool.pytest.ini_options]
testpaths = ["tests"]
pythonpath = ["src"]
addopts = "-ra -q"

Implementation

The tool layer is a single frozen dataclass and a registry dict. Keeping the registry as a plain dict[str, ToolSpec] — rather than a class with register() methods — means later steps can attach scope metadata without refactoring the dispatch path. The _echo handler raises ToolError on empty input so we can verify that handler-level validation surfaces as a JSON-RPC -32602 rather than a 500.

@dataclass(frozen=True)
class ToolSpec:
    name: str
    description: str
    input_schema: Mapping[str, Any]
    handler: Handler

    def descriptor(self) -> dict[str, Any]:
        return {
            "name": self.name,
            "description": self.description,
            "inputSchema": dict(self.input_schema),
        }


ECHO_TOOL = ToolSpec(
    name="echo",
    description="Return the provided message unchanged.",
    input_schema={
        "type": "object",
        "properties": {"message": {"type": "string"}},
        "required": ["message"],
        "additionalProperties": False,
    },
    handler=_echo,
)

REGISTRY: dict[str, ToolSpec] = {ECHO_TOOL.name: ECHO_TOOL}

The server is a single Starlette Route at POST /mcp plus a GET /healthz probe. The request handler is split into three small functions — _decode_body, _validate_envelope, and _dispatch — each returning either a value or a structured error tuple. That shape matters because it keeps every function under the two-level nesting limit from codebase/CLAUDE.md, and it gives us obvious seams where the future bearer-token guard will slot in.

async def _handle_rpc(request: Request) -> Response:
    body = await request.body()
    decoded, decode_err = _decode_body(body)
    if decode_err is not None:
        return JSONResponse(
            _jsonrpc_error(None, PARSE_ERROR, decode_err), status_code=400
        )

    envelope = _validate_envelope(decoded)
    if isinstance(envelope, str):
        return JSONResponse(
            _jsonrpc_error(decoded.get("id"), INVALID_REQUEST, envelope),
            status_code=400,
        )

    method, request_id, params = envelope
    return _run_method(method, request_id, params)

Dispatch itself is a flat if chain over the three method names. The MCP spec defines many more methods, but the only ones needed to prove the path works are initialize (handshake), tools/list (discovery), and tools/call (invocation). Anything else returns -32601 method not found — and a test below pins that contract.

def _dispatch(method, params, registry):
    if method == "initialize":
        return _initialize_result(), None
    if method == "tools/list":
        return _tools_list_result(registry), None
    if method == "tools/call":
        spec, err = _resolve_tool(registry, params)
        if err is not None:
            return None, (INVALID_PARAMS, err)
        return _invoke_tool(spec, params), None
    return None, (METHOD_NOT_FOUND, f"method not found: {method}")

The test suite is where this step earns its keep. We assert each method works end-to-end, that malformed JSON yields -32700, that unknown tools yield -32602, and — critically — that no request requires an Authorization header. That last test is the invariant we are about to flip:

def test_no_authentication_required_for_any_method(client):
    for method, params, req_id in [
        ("initialize", None, 10),
        ("tools/list", None, 11),
        ("tools/call", {"name": "echo", "arguments": {"message": "ok"}}, 12),
    ]:
        response = _rpc(client, method, params=params, request_id=req_id)
        assert response.status_code == 200, method
        assert "error" not in response.json(), method

Verification

Run the suite with pytest. All nine tests must pass against the unauthenticated baseline:

python -m pytest -v
============================= test session starts ==============================
platform darwin -- Python 3.11.11, pytest-9.0.3, pluggy-1.6.0
configfile: pyproject.toml
testpaths: tests
plugins: anyio-4.13.0
collected 9 items

tests/test_server.py::test_healthz_returns_ok PASSED                     [ 11%]
tests/test_server.py::test_initialize_returns_protocol_version_and_capabilities PASSED [ 22%]
tests/test_server.py::test_tools_list_advertises_echo PASSED             [ 33%]
tests/test_server.py::test_tools_call_echo_returns_message_content PASSED [ 44%]
tests/test_server.py::test_unknown_method_returns_jsonrpc_error PASSED   [ 55%]
tests/test_server.py::test_unknown_tool_returns_invalid_params PASSED    [ 66%]
tests/test_server.py::test_tool_handler_input_validation_surfaces_as_invalid_params PASSED [ 77%]
tests/test_server.py::test_malformed_json_returns_parse_error PASSED     [ 88%]
tests/test_server.py::test_no_authentication_required_for_any_method PASSED [100%]

========================= 9 passed in 0.12s ===============================

What we built

A working MCP server in roughly 200 lines of Python, exposing one echo tool over JSON-RPC at POST /mcp with a side GET /healthz probe for orchestration. The server speaks just enough of the protocol — initialize, tools/list, tools/call — to be usable by a real MCP client, and it surfaces well-typed JSON-RPC errors for malformed envelopes and bad tool calls.

We also pinned three invariants in the test suite that we will lean on throughout the rest of the series. First, the dispatch envelope handles parse errors, invalid requests, unknown methods, and bad arguments with the spec-mandated error codes (-32700, -32600, -32601, -32602). Second, the echo tool round-trips an arbitrary string through tools/call and returns the canonical MCP content array. Third — and this is the invariant we are about to violate on purpose — every method works with no Authorization header.

The code is intentionally split into small functions: _decode_body, _validate_envelope, _dispatch, _run_method. None of them know anything about authentication, which is exactly the point. In step 2 we will introduce a single middleware-like guard that runs before _handle_rpc ever calls _decode_body, and the existing dispatch code will not need to change.

What this unlocks: a baseline against which every future change can be measured. When step 2 introduces a bearer-token guard, the test_no_authentication_required_for_any_method test must fail loudly — and the fix is to replace it with the new contract, not to delete it.

Repository

The state of the code after this step: e671aca

Step 2: Bolting On a Bearer Guard with a Bare WWW-Authenticate Challenge

Step 1 left us with an open MCP server: any client can hit POST /mcp, run tools/call, and walk away with an echo response without ever showing credentials. That was the point — a known-good baseline against which every later change can be measured. In this step we flip the central invariant: the /mcp endpoint becomes a protected resource, and every request without a structurally valid Authorization: Bearer ... header is rejected with HTTP 401 plus a bare WWW-Authenticate: Bearer challenge.

The bare challenge is the whole reason this article exists. A 401 with nothing but the word Bearer in the challenge header tells the client exactly two things: yes, you need a token, and no, I will not tell you where to get one. That dead-end is precisely the failure mode the MCP specification's protected-resource metadata flow is designed to repair in step 3 — but to feel that repair, we first need to reproduce the unhelpful baseline and pin it with tests.

Setup

Two source files change and one is added, all under codebase/src/mcp_server/ and codebase/tests/:

  • New: src/mcp_server/auth.py — the BearerGuardMiddleware ASGI class, an EMPTY_BEARER_CHALLENGE constant, and the small _extract_bearer_token / _unauthorized_response helpers.
  • Edit: src/mcp_server/server.py — declare a PROTECTED_PATHS tuple, register BearerGuardMiddleware in create_app, and otherwise leave the JSON-RPC dispatch path untouched. The guard runs before _handle_rpc, so none of the step-1 functions need to know authentication exists.
  • Edit: src/mcp_server/__init__.py — re-export BearerGuardMiddleware and EMPTY_BEARER_CHALLENGE so tests and downstream callers do not have to reach into submodules.
  • Edit: tests/test_server.py — keep the eight step-1 behavioral tests (now requiring a valid bearer header), retire test_no_authentication_required_for_any_method, and add six new tests that pin the 401 contract from every angle.

No new runtime dependencies are added. The middleware is plain ASGI (Scope, Receive, Send) plus two Starlette helpers we already pulled in (Request, JSONResponse). Keeping the dependency footprint flat means the guard is auditable in a single screen of code — important when we layer the populated WWW-Authenticate header in step 3.

Implementation

The guard is a single ASGI middleware class registered on the Starlette app. The fast path is the two-line short circuit at the top of __call__: anything that is not an HTTP request or not pointed at a path inside PROTECTED_PATHS is forwarded untouched. That keeps the /healthz liveness probe and any future static routes outside the auth surface without special-casing them later.

class BearerGuardMiddleware:
    def __init__(self, app: ASGIApp, protected_paths: tuple[str, ...]) -> None:
        self.app = app
        self.protected_paths = protected_paths

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        if scope["path"] not in self.protected_paths:
            await self.app(scope, receive, send)
            return
        await self._guard(scope, receive, send)

Splitting __call__ and _guard is not cosmetic — it is the only way to keep the function under the two-level nesting cap from the codebase rules. _guard does the actual work: build a Starlette Request, pull the credential, and either forward the call or short-circuit with the canonical 401 response.

async def _guard(self, scope: Scope, receive: Receive, send: Send) -> None:
    request = Request(scope, receive=receive)
    token = _extract_bearer_token(request.headers.get("authorization"))
    if token is None:
        response = _unauthorized_response("missing or invalid bearer credential")
        await response(scope, receive, send)
        return
    await self.app(scope, receive, send)

_extract_bearer_token is the structural validator — it normalizes the scheme case, strips whitespace after the Bearer prefix, and returns None for any header that is missing, uses the wrong scheme, or carries an empty token. We deliberately do not look at the token contents: a real resource server would resolve it against an introspection endpoint or verify a JWT signature, but at this step we only need a stable surface for the tests to lock down.

BEARER_PREFIX = "bearer "
EMPTY_BEARER_CHALLENGE = "Bearer"


def _extract_bearer_token(header_value: str | None) -> str | None:
    if not header_value:
        return None
    if not header_value.lower().startswith(BEARER_PREFIX):
        return None
    token = header_value[len(BEARER_PREFIX):].strip()
    return token or None

The 401 response is shaped to interoperate with both HTTP and JSON-RPC clients. The status code and the bare WWW-Authenticate: Bearer header satisfy the HTTP contract; the body is still a JSON-RPC error envelope with code -32001, so a client that already knows how to read JSON-RPC errors does not have to grow a second parser just for unauthorized responses.

def _unauthorized_response(message: str) -> Response:
    body = {
        "jsonrpc": "2.0",
        "id": None,
        "error": {"code": -32001, "message": message},
    }
    response = JSONResponse(body, status_code=401)
    response.headers["WWW-Authenticate"] = EMPTY_BEARER_CHALLENGE
    return response

Wiring the middleware in is two lines inside create_app. PROTECTED_PATHS is a module-level tuple so a step-4 reader can grep one constant to see exactly which URLs sit behind the guard, instead of chasing decorator stacks.

PROTECTED_PATHS: tuple[str, ...] = ("/mcp",)

def create_app() -> Starlette:
    routes = [
        Route("/mcp", endpoint=_handle_rpc, methods=["POST"]),
        Route("/healthz", endpoint=_health, methods=["GET"]),
    ]
    middleware = [
        Middleware(BearerGuardMiddleware, protected_paths=PROTECTED_PATHS),
    ]
    return Starlette(routes=routes, middleware=middleware)

The test suite is where the contract is nailed down. Four new tests pin the 401 shape from every angle a real client might trip over: a missing header, a Bearer scheme with only whitespace, a Basic credential, and the JSON-RPC envelope check on the unauthorized body. A fifth test re-asserts that /healthz stays open — orchestration probes must never be answered with a 401. The step-1 happy-path tests all still run, but _rpc now defaults to attaching a valid Authorization: Bearer demo-token header so the dispatch path is exercised exactly the same as before.

def test_missing_authorization_header_returns_401_with_empty_www_authenticate(
    client: TestClient,
) -> None:
    response = client.post(
        "/mcp",
        json={"jsonrpc": "2.0", "id": 1, "method": "initialize"},
    )
    assert response.status_code == 401
    assert response.headers["WWW-Authenticate"] == EMPTY_BEARER_CHALLENGE
    assert "," not in response.headers["WWW-Authenticate"]
    assert "=" not in response.headers["WWW-Authenticate"]

The two extra assertions — no comma, no equals — are not paranoia. They are the contract for emptiness: a populated challenge in step 3 will look like Bearer resource_metadata="https://...", and this test would fail loudly the moment that string starts leaking back into step 2. The contrast is what makes the next step meaningful.

Verification

Run the suite from codebase/. All fourteen tests must pass — the eight inherited from step 1 (now sending a bearer token) plus the six new ones that pin the 401 contract.

python -m pytest -v -o addopts=""
============================= test session starts ==============================
platform darwin -- Python 3.11.11, pytest-9.0.3, pluggy-1.6.0
configfile: pyproject.toml
testpaths: tests
plugins: anyio-4.13.0
collected 14 items

tests/test_server.py::test_healthz_returns_ok_without_auth PASSED        [  7%]
tests/test_server.py::test_initialize_returns_protocol_version_and_capabilities PASSED [ 14%]
tests/test_server.py::test_tools_list_advertises_echo PASSED             [ 21%]
tests/test_server.py::test_tools_call_echo_returns_message_content PASSED [ 28%]
tests/test_server.py::test_unknown_method_returns_jsonrpc_error PASSED   [ 35%]
tests/test_server.py::test_unknown_tool_returns_invalid_params PASSED    [ 42%]
tests/test_server.py::test_tool_handler_input_validation_surfaces_as_invalid_params PASSED [ 50%]
tests/test_server.py::test_malformed_json_returns_parse_error PASSED     [ 57%]
tests/test_server.py::test_missing_authorization_header_returns_401_with_empty_www_authenticate PASSED [ 64%]
tests/test_server.py::test_empty_bearer_token_returns_401_with_empty_www_authenticate PASSED [ 71%]
tests/test_server.py::test_non_bearer_scheme_returns_401_with_empty_www_authenticate PASSED [ 78%]
tests/test_server.py::test_unauthorized_response_body_is_jsonrpc_shaped PASSED [ 85%]
tests/test_server.py::test_bearer_token_passes_through_to_handler PASSED [ 92%]
tests/test_server.py::test_healthz_is_not_protected_by_the_bearer_guard PASSED [100%]

======================== 14 passed in 0.14s ==================================

What we built

A single ASGI middleware, roughly thirty lines of Python, that turns the MCP server into a proper OAuth 2.0 protected resource — at least at the HTTP layer. Any call to /mcp without a structurally valid bearer credential is rejected with HTTP 401, the WWW-Authenticate: Bearer header, and a JSON-RPC -32001 error body. The /healthz probe stays open, the existing JSON-RPC dispatch code is unchanged, and the test suite grew from nine tests to fourteen.

The invariant we just installed has three observable parts. First, the status code is 401, not 403 and not 200 with an error payload — anything else and an OAuth-aware client will not start the metadata discovery dance. Second, the challenge header is exactly the string Bearer with no parameters; the test explicitly forbids commas and equals signs so step 3's populated header cannot regress back. Third, the failing response body is still a JSON-RPC envelope, so clients that already speak the protocol can surface the rejection through their existing error path.

Step 2 also gives the codebase its first real seam. _handle_rpc does not know authentication exists; BearerGuardMiddleware does not know JSON-RPC exists. When step 3 needs to populate the WWW-Authenticate header with a resource_metadata URL, the change lives entirely inside _unauthorized_response and a new constant — no JSON-RPC code is touched. The blast radius for each upcoming change stays small because the guard is bolted on, not woven through.

What this unlocks: the unhelpful 401 is now a contract, not an accident. A reader who arrives in step 3 can see the gap between what we currently emit and what RFC 9728 says we should emit, and the test suite will refuse to merge any fix that does not flip the assertions in test_missing_authorization_header_returns_401_with_empty_www_authenticate. That is the regression-proofing the rest of the series leans on.

Repository

The state of the code after this step: fbf5461

Step 3: Publishing the RFC 9728 Protected Resource Metadata at a Well-Known URL

Step 2 left us with a dead-end 401: a client hitting /mcp without credentials gets WWW-Authenticate: Bearer and no hint about where to acquire a token. RFC 9728 — OAuth 2.0 Protected Resource Metadata — defines the missing half of that conversation: a JSON document hosted at /.well-known/oauth-protected-resource that names the resource, lists its trusted authorization servers, and enumerates the scopes and bearer presentation methods it accepts.

In this step we build and serve that document. We deliberately do not yet wire its URL into the WWW-Authenticate header — that join happens in step 4. The point here is to publish a stable, configurable, public discovery surface that the next step can simply reference. By the end of this step a curl against the well-known path returns a spec-shaped JSON document, sixteen new tests pin its structure, and the /mcp 401 contract from step 2 still passes byte-for-byte.

Setup

One new source file and one new test file land under codebase/, with two small edits to existing modules:

  • New: src/mcp_server/metadata.py — the WELL_KNOWN_PATH constant, environment-variable knobs (MCP_RESOURCE, MCP_AUTHORIZATION_SERVER, MCP_SCOPES_SUPPORTED, ...), safe vytharion.example defaults, and a build_metadata() function that returns a fresh document per call.
  • Edit: src/mcp_server/server.py — import WELL_KNOWN_PATH and build_metadata, add an unauthenticated GET route for _protected_resource_metadata with a Cache-Control header, and leave the bearer guard's PROTECTED_PATHS tuple untouched.
  • Edit: src/mcp_server/__init__.py — re-export WELL_KNOWN_PATH and build_metadata alongside the step-2 surface so test modules stay shallow.
  • New: tests/test_metadata.py — sixteen tests covering the spec shape, transport properties, environment overrides, privacy, and parity between build_metadata() and the served response.

No new runtime dependencies. The document is a plain dict[str, Any] serialised through Starlette's existing JSONResponse, and the configuration layer is os.environ.get with whitespace-trimmed defaults — nothing pydantic, nothing dynaconf. Keeping the surface boring means step 4 can layer a populated WWW-Authenticate header on top without dragging a config framework into the auth path.

Implementation

The metadata module starts with constants: the well-known path, the default field values, and the names of the environment variables that override them. Defaults use the IANA-reserved .example TLD and the public vytharion brand namespace, so nothing here can leak an operator-private hostname even if a deployer forgets to set the env vars.

WELL_KNOWN_PATH = "/.well-known/oauth-protected-resource"

DEFAULT_RESOURCE = "https://mcp.vytharion.example/mcp"
DEFAULT_AUTHORIZATION_SERVER = "https://auth.vytharion.example"
DEFAULT_SCOPES: tuple[str, ...] = ("mcp:tools.read", "mcp:tools.invoke")
DEFAULT_BEARER_METHODS: tuple[str, ...] = ("header",)
DEFAULT_SIGNING_ALGS: tuple[str, ...] = ("RS256", "ES256")

Two small helpers do the environment reading: _env_str for scalar values and _env_list for comma-separated tuples. Both treat an empty or whitespace-only value as "not set" and fall back to the default, which is what saves us from the classic "someone set MCP_RESOURCE= in the wrong shell" trap. A third helper, _filter_bearer_methods, drops any RFC 6750 method that isn't header, body, or query — that whitelist is what makes the override test below safely echo only the spec-valid entries.

def _env_list(name: str, default: tuple[str, ...]) -> tuple[str, ...]:
    raw = os.environ.get(name)
    if raw is None or not raw.strip():
        return default
    items = tuple(item.strip() for item in raw.split(",") if item.strip())
    return items or default


def _filter_bearer_methods(methods: tuple[str, ...]) -> tuple[str, ...]:
    filtered = tuple(m for m in methods if m in ALLOWED_BEARER_METHODS)
    return filtered or DEFAULT_BEARER_METHODS

build_metadata() itself is a flat composition. It rebuilds the document on every call so tests using monkeypatch.setenv see their overrides immediately — there is no module-level cache to invalidate. The shape matches RFC 9728 §2: resource, authorization_servers, scopes_supported, bearer_methods_supported, resource_signing_alg_values_supported, and two optional descriptive fields, resource_name and resource_documentation.

def build_metadata() -> dict[str, Any]:
    resource = _env_str(ENV_RESOURCE, DEFAULT_RESOURCE)
    authorization_server = _env_str(ENV_AUTHORIZATION_SERVER, DEFAULT_AUTHORIZATION_SERVER)
    scopes = _env_list(ENV_SCOPES, DEFAULT_SCOPES)
    bearer_methods = _filter_bearer_methods(_env_list(ENV_BEARER_METHODS, DEFAULT_BEARER_METHODS))
    signing_algs = _env_list(ENV_SIGNING_ALGS, DEFAULT_SIGNING_ALGS)
    resource_name = _env_str(ENV_RESOURCE_NAME, DEFAULT_RESOURCE_NAME)
    documentation = _env_str(ENV_RESOURCE_DOCUMENTATION, DEFAULT_RESOURCE_DOCUMENTATION)

    return {
        "resource": resource,
        "authorization_servers": [authorization_server],
        "scopes_supported": list(scopes),
        "bearer_methods_supported": list(bearer_methods),
        "resource_signing_alg_values_supported": list(signing_algs),
        "resource_name": resource_name,
        "resource_documentation": documentation,
    }

Wiring the route into the Starlette app is three lines. WELL_KNOWN_PATH stays outside PROTECTED_PATHS so the bearer guard from step 2 forwards the request untouched — exactly the short-circuit we designed for in step 2's __call__. The Cache-Control header is the one piece of advice RFC 9728 §3.1 hands to deployers: cache long enough to absorb a client retry storm, short enough that a configuration rotation propagates in minutes.

async def _protected_resource_metadata(_: Request) -> Response:
    response = JSONResponse(build_metadata())
    response.headers["Cache-Control"] = "public, max-age=300"
    return response


def create_app() -> Starlette:
    routes = [
        Route("/mcp", endpoint=_handle_rpc, methods=["POST"]),
        Route("/healthz", endpoint=_health, methods=["GET"]),
        Route(WELL_KNOWN_PATH, endpoint=_protected_resource_metadata, methods=["GET"]),
    ]
    middleware = [
        Middleware(BearerGuardMiddleware, protected_paths=PROTECTED_PATHS),
    ]
    return Starlette(routes=routes, middleware=middleware)

The test suite for this step pins three orthogonal contracts. The first set asserts the document shape: every required RFC 9728 key is present, resource is an absolute https:// URL, authorization_servers is a non-empty list of issuer URLs, scopes_supported includes the tool-invocation scope, and bearer_methods_supported only ever names header, body, or query. The second set asserts the transport contract: the endpoint is publicly accessible (no WWW-Authenticate header on the response), returns application/json, advertises Cache-Control: public, max-age=..., and rejects non-GET verbs with 405.

def test_metadata_endpoint_is_publicly_accessible(client: TestClient) -> None:
    response = client.get(WELL_KNOWN_PATH)
    assert response.status_code == 200
    assert "WWW-Authenticate" not in response.headers


def test_metadata_document_contains_required_fields(client: TestClient) -> None:
    document = client.get(WELL_KNOWN_PATH).json()
    missing = REQUIRED_TOP_LEVEL_KEYS - set(document)
    assert not missing, f"missing keys: {sorted(missing)}"

The third set pins the configuration contract — that overrides apply, that whitespace-only env vars fall back to defaults, that the bearer-methods whitelist drops bogus values like "nonsense", and that the served response matches build_metadata() byte-for-byte. The whitelist test is the one most likely to catch a sloppy future refactor: a contributor who switches to a permissive _env_list will see nonsense leak into the served document and break the assertion.

def test_build_metadata_honors_environment_overrides(monkeypatch: pytest.MonkeyPatch) -> None:
    monkeypatch.setenv(ENV_BEARER_METHODS, "header,body,nonsense")
    document = build_metadata()
    assert document["bearer_methods_supported"] == ["header", "body"]


def test_metadata_does_not_leak_operator_private_identifiers(client: TestClient) -> None:
    raw = client.get(WELL_KNOWN_PATH).text
    forbidden = ("/Users/", "/app/", "localhost", "127.0.0.1")
    for token in forbidden:
        assert token not in raw

The privacy test is cheap and ruthless. If anyone ever swaps a default to a real hostname or a filesystem-flavoured string, the suite fails before the document reaches a CDN cache.

Verification

Run the suite from codebase/. All thirty tests must pass — the fourteen inherited from steps 1 and 2 plus the sixteen new ones in test_metadata.py.

python -m pytest -v -o addopts=""
============================= test session starts ==============================
platform darwin -- Python 3.11.11, pytest-9.0.3, pluggy-1.6.0
configfile: pyproject.toml
testpaths: tests
plugins: anyio-4.13.0
collected 30 items

tests/test_metadata.py::test_well_known_path_constant_matches_rfc_9728 PASSED [  3%]
tests/test_metadata.py::test_metadata_endpoint_is_publicly_accessible PASSED [  6%]
tests/test_metadata.py::test_metadata_endpoint_returns_json_content_type PASSED [ 10%]
tests/test_metadata.py::test_metadata_endpoint_is_cacheable PASSED       [ 13%]
tests/test_metadata.py::test_metadata_endpoint_rejects_non_get_methods PASSED [ 16%]
tests/test_metadata.py::test_metadata_document_contains_required_fields PASSED [ 20%]
tests/test_metadata.py::test_metadata_resource_is_an_absolute_https_url PASSED [ 23%]
tests/test_metadata.py::test_metadata_authorization_servers_is_non_empty_list_of_urls PASSED [ 26%]
tests/test_metadata.py::test_metadata_scopes_supported_lists_tool_invocation_scopes PASSED [ 30%]
tests/test_metadata.py::test_metadata_bearer_methods_supported_includes_header PASSED [ 33%]
tests/test_metadata.py::test_metadata_signing_algs_are_a_list_of_strings PASSED [ 36%]
tests/test_metadata.py::test_metadata_does_not_leak_operator_private_identifiers PASSED [ 40%]
tests/test_metadata.py::test_metadata_endpoint_does_not_require_authorization_header PASSED [ 43%]
tests/test_metadata.py::test_build_metadata_honors_environment_overrides PASSED [ 46%]
tests/test_metadata.py::test_build_metadata_falls_back_to_defaults_when_env_empty PASSED [ 50%]
tests/test_metadata.py::test_endpoint_response_matches_build_metadata PASSED [ 53%]
tests/test_server.py::test_healthz_returns_ok_without_auth PASSED        [ 56%]
tests/test_server.py::test_initialize_returns_protocol_version_and_capabilities PASSED [ 60%]
tests/test_server.py::test_tools_list_advertises_echo PASSED             [ 63%]
tests/test_server.py::test_tools_call_echo_returns_message_content PASSED [ 66%]
tests/test_server.py::test_unknown_method_returns_jsonrpc_error PASSED   [ 70%]
tests/test_server.py::test_unknown_tool_returns_invalid_params PASSED    [ 73%]
tests/test_server.py::test_tool_handler_input_validation_surfaces_as_invalid_params PASSED [ 76%]
tests/test_server.py::test_malformed_json_returns_parse_error PASSED     [ 80%]
tests/test_server.py::test_missing_authorization_header_returns_401_with_empty_www_authenticate PASSED [ 83%]
tests/test_server.py::test_empty_bearer_token_returns_401_with_empty_www_authenticate PASSED [ 86%]
tests/test_server.py::test_non_bearer_scheme_returns_401_with_empty_www_authenticate PASSED [ 90%]
tests/test_server.py::test_unauthorized_response_body_is_jsonrpc_shaped PASSED [ 93%]
tests/test_server.py::test_bearer_token_passes_through_to_handler PASSED [ 96%]
tests/test_server.py::test_healthz_is_not_protected_by_the_bearer_guard PASSED [100%]

======================== 30 passed in 0.20s ===================================

What we built

A single new module, mcp_server.metadata, plus one new route on the Starlette app, together publish a spec-shaped RFC 9728 discovery document at /.well-known/oauth-protected-resource. The document names the resource, the authorization servers that issue tokens for it, the scopes a client can request, the bearer presentation methods we accept, and the JWT signing algorithms we honour. Every field is overridable through an environment variable, but each override falls back safely to a vytharion.example placeholder.

The document is public by design — the endpoint sits outside PROTECTED_PATHS, returns no WWW-Authenticate header, and serves a Cache-Control: public, max-age=300 so a busy retry loop can be absorbed by an intermediate cache without hammering the origin. The bearer-guard contract from step 2 is unchanged: /mcp still rejects unauthenticated requests with 401 and the bare Bearer challenge, and every step-2 test still passes.

We also pinned three invariants that step 4 will lean on. First, build_metadata() is idempotent — calling it twice with the same environment yields the same document, so the populated WWW-Authenticate header can reference the metadata URL without worrying about drift. Second, the bearer-methods whitelist refuses anything outside header | body | query, so a typo in deployment config cannot ship an invalid spec value. Third, the privacy test forbids /Users/, /app/, localhost, and 127.0.0.1 from appearing in the served body — the document is publishable on the open internet by construction.

What this unlocks: step 4 can finally close the loop. Instead of WWW-Authenticate: Bearer, the 401 will emit WWW-Authenticate: Bearer resource_metadata="https://mcp.vytharion.example/.well-known/oauth-protected-resource". The destination already exists, already returns the right shape, and is already pinned by tests — so the change in step 4 is a one-line header update plus a regression flip on the step-2 emptiness assertions.

Repository

The state of the code after this step: 3b19c3b

Step 4: Closing the OAuth Discovery Loop with a Populated WWW-Authenticate Challenge

Step 2 stood up a bearer guard that rejected unauthenticated /mcp calls with HTTP 401 and a deliberately bare WWW-Authenticate: Bearer header. Step 3 published the RFC 9728 protected-resource metadata document at /.well-known/oauth-protected-resource but never told clients where to find it. Step 4 is the join: the 401 response now carries a fully-populated challenge that names the metadata URL, the resource audience, and the scope list a client must request, so a first-contact tool can self-bootstrap from a single failed call.

We also split the rejection into the two shapes RFC 6750 §3 calls for. A request with no Authorization header omits the error parameter entirely; a request that presented a malformed credential gets error="invalid_token" plus a human-readable error_description. Discovery hints stay in both cases, because either client needs the metadata URL to recover. Step 5 will validate token contents on top of this structure — Step 4 just gets the framing right.

Setup

One new source-level helper, one new test file, and a tightening of the step-2 server tests:

  • Edit: src/mcp_server/metadata.py — add a metadata_url() helper that derives the discovery URL from the published resource origin. The URL is built from the same source of truth as the served document, so a deployer overriding MCP_RESOURCE automatically retunes the challenge.
  • Edit: src/mcp_server/auth.py — introduce build_challenge(error=..., error_description=...), a _quote()/_format_param() pair for RFC 7235 quoted-string escaping, and rewrite _unauthorized_response() to populate the header. The middleware itself stays a thin dispatcher: missing header vs. malformed credential.
  • Edit: src/mcp_server/__init__.py — re-export metadata_url so the new test module can import it from the top-level package.
  • Edit: tests/test_server.py — rename the three "empty challenge" tests from step 2 to assert populated challenges instead, and verify the scheme prefix matches BEARER_SCHEME.
  • New: tests/test_challenge.py — sixteen tests that pin the challenge format: scheme, parameter presence, parameter order, quoting rules, environment-driven values, privacy, and parity between build_challenge() and the served WWW-Authenticate header.

No new runtime dependencies. The challenge is a single header string built from a list[str] of name="value" parameters joined with , — RFC 7235 §2.1 quoted-string syntax, nothing exotic. Keeping the assembly stdlib-only means the security-sensitive path is auditable end-to-end without chasing a templating library.

Implementation

The first move lives in metadata.py: a metadata_url() helper that returns the absolute URL of the discovery document. RFC 9728 §3 fixes the path at /.well-known/oauth-protected-resource under the resource's origin, so we derive scheme + host from the published resource and rebuild the URL. That keeps the discovery hint and the served document on the same configuration knob — change MCP_RESOURCE, both follow.

def metadata_url(document: dict[str, Any] | None = None) -> str:
    source = document if document is not None else build_metadata()
    parts = urlsplit(source["resource"])
    if not parts.scheme or not parts.netloc:
        raise ValueError(f"resource is not an absolute URL: {source['resource']!r}")
    return f"{parts.scheme}://{parts.netloc}{WELL_KNOWN_PATH}"

auth.py then gets the assembly logic. Three tiny helpers — _quote, _format_param, _join_scopes — keep build_challenge() flat. The quoted-string escape covers backslash first, then double-quote, in that order: reverse the order and you double-escape the backslash. It is the kind of one-line bug that survives review when the quoting helper is inlined, so it lives in its own named function with a single responsibility.

def _quote(value: str) -> str:
    return value.replace("\\", "\\\\").replace('"', '\\"')


def _format_param(name: str, value: str) -> str:
    return f'{name}="{_quote(value)}"'


def _join_scopes(scopes: Iterable[str]) -> str:
    return " ".join(s for s in scopes if s)

build_challenge() itself is a flat composition. error is emitted first when present, then error_description, then the three discovery hints in a stable order: resource_metadata, resource, scope. The order matters — RFC 6750 §3 lets implementations parse parameters in any order, but pinning the layout in a regression test gives us a stable header to byte-compare in the suite.

def build_challenge(
    error: str | None = None,
    error_description: str | None = None,
) -> str:
    document = build_metadata()
    params: list[str] = []
    if error:
        params.append(_format_param("error", error))
        if error_description:
            params.append(_format_param("error_description", error_description))
    params.append(_format_param("resource_metadata", metadata_url(document)))
    params.append(_format_param("resource", document["resource"]))
    scopes = _join_scopes(document.get("scopes_supported", ()))
    if scopes:
        params.append(_format_param("scope", scopes))
    return f"{BEARER_SCHEME} " + ", ".join(params)

The middleware itself only chooses which challenge shape to emit. A missing Authorization header calls _unauthorized_response() with no error, matching RFC 6750 §3's "client did not attempt to authenticate" branch. A header that fails the Bearer <token> shape check passes error=INVALID_TOKEN plus a fixed error_description, matching the §3.1 "client tried but presented something malformed" branch. The two branches sit at the same indentation level so the middleware stays at the 2-nesting depth limit set by the codebase rules.

async def _guard(self, scope: Scope, receive: Receive, send: Send) -> None:
    request = Request(scope, receive=receive)
    header_value = request.headers.get("authorization")
    if header_value is None:
        response = _unauthorized_response(NO_CREDENTIAL_MESSAGE)
        await response(scope, receive, send)
        return
    token = _extract_bearer_token(header_value)
    if token is None:
        response = _unauthorized_response(
            INVALID_BEARER_MESSAGE,
            error=INVALID_TOKEN,
            error_description=INVALID_BEARER_MESSAGE,
        )
        await response(scope, receive, send)
        return
    await self.app(scope, receive, send)

tests/test_challenge.py carries the new contracts. A small regex parser pulls the parameter map out of the header string, then each test asserts a single invariant — scheme prefix, presence of resource_metadata, parity with metadata_url(), error-branch ordering, environment override propagation, privacy. The two endpoint tests close the loop by hitting /mcp and comparing the served WWW-Authenticate header to build_challenge(...) byte-for-byte, so a future drift between the in-process helper and the wire format breaks the suite.

PARAM_PATTERN = re.compile(r'(\w+)="((?:\\"|[^"])*)"')


def _parse_challenge(challenge: str) -> tuple[str, dict[str, str]]:
    scheme, _, rest = challenge.partition(" ")
    params = {name: value.replace('\\"', '"') for name, value in PARAM_PATTERN.findall(rest)}
    return scheme, params


def test_challenge_reflects_environment_overrides_live(
    monkeypatch: pytest.MonkeyPatch, client: TestClient
) -> None:
    monkeypatch.setenv(ENV_RESOURCE, "https://mcp.override.example/mcp")
    monkeypatch.setenv(ENV_SCOPES, "mcp:tools.read, mcp:tools.invoke , mcp:admin")
    response = _provoke_no_creds(client)
    _, params = _parse_challenge(response.headers["WWW-Authenticate"])
    assert params["resource_metadata"] == (
        "https://mcp.override.example/.well-known/oauth-protected-resource"
    )
    assert params["scope"] == "mcp:tools.read mcp:tools.invoke mcp:admin"

The override test is the single load-bearing wiring test: if build_challenge() ever caches metadata_url() at import time, or metadata_url() ever drifts off build_metadata()["resource"], the override fails to propagate and the assertion catches it before the regression ships.

Verification

Run the suite from codebase/. All forty-six tests pass — the thirty inherited from steps 1 through 3 plus the sixteen new ones in test_challenge.py.

python -m pytest -v -o addopts=""
============================= test session starts ==============================
platform darwin -- Python 3.11.11, pytest-9.0.3, pluggy-1.6.0
configfile: pyproject.toml
testpaths: tests
plugins: anyio-4.13.0
collected 46 items

tests/test_challenge.py::test_metadata_url_is_absolute_and_uses_well_known_path PASSED [  2%]
tests/test_challenge.py::test_metadata_url_origin_matches_resource_origin PASSED [  4%]
tests/test_challenge.py::test_build_challenge_starts_with_bearer_scheme PASSED [  6%]
tests/test_challenge.py::test_build_challenge_includes_resource_metadata_url PASSED [  8%]
tests/test_challenge.py::test_build_challenge_includes_resource_audience PASSED [ 10%]
tests/test_challenge.py::test_build_challenge_includes_space_separated_scopes PASSED [ 13%]
tests/test_challenge.py::test_build_challenge_omits_error_when_no_credentials_presented PASSED [ 15%]
tests/test_challenge.py::test_build_challenge_includes_error_when_token_is_invalid PASSED [ 17%]
tests/test_challenge.py::test_challenge_param_order_keeps_error_before_discovery_hints PASSED [ 19%]
tests/test_challenge.py::test_endpoint_no_creds_challenge_matches_build_challenge PASSED [ 21%]
tests/test_challenge.py::test_endpoint_invalid_creds_challenge_matches_build_challenge PASSED [ 23%]
tests/test_challenge.py::test_challenge_reflects_environment_overrides_live PASSED [ 26%]
tests/test_challenge.py::test_challenge_does_not_leak_operator_private_identifiers PASSED [ 28%]
tests/test_challenge.py::test_challenge_quoted_strings_escape_inner_quotes PASSED [ 30%]
tests/test_challenge.py::test_metadata_endpoint_still_unauthenticated_after_challenge_wiring PASSED [ 32%]
tests/test_challenge.py::test_successful_request_does_not_emit_challenge PASSED [ 34%]
tests/test_metadata.py::test_well_known_path_constant_matches_rfc_9728 PASSED [ 36%]
tests/test_metadata.py::test_metadata_endpoint_is_publicly_accessible PASSED [ 39%]
tests/test_metadata.py::test_metadata_endpoint_returns_json_content_type PASSED [ 41%]
tests/test_metadata.py::test_metadata_endpoint_is_cacheable PASSED       [ 43%]
tests/test_metadata.py::test_metadata_endpoint_rejects_non_get_methods PASSED [ 45%]
tests/test_metadata.py::test_metadata_document_contains_required_fields PASSED [ 47%]
tests/test_metadata.py::test_metadata_resource_is_an_absolute_https_url PASSED [ 50%]
tests/test_metadata.py::test_metadata_authorization_servers_is_non_empty_list_of_urls PASSED [ 52%]
tests/test_metadata.py::test_metadata_scopes_supported_lists_tool_invocation_scopes PASSED [ 54%]
tests/test_metadata.py::test_metadata_bearer_methods_supported_includes_header PASSED [ 56%]
tests/test_metadata.py::test_metadata_signing_algs_are_a_list_of_strings PASSED [ 58%]
tests/test_metadata.py::test_metadata_does_not_leak_operator_private_identifiers PASSED [ 60%]
tests/test_metadata.py::test_metadata_endpoint_does_not_require_authorization_header PASSED [ 63%]
tests/test_metadata.py::test_build_metadata_honors_environment_overrides PASSED [ 65%]
tests/test_metadata.py::test_build_metadata_falls_back_to_defaults_when_env_empty PASSED [ 67%]
tests/test_metadata.py::test_endpoint_response_matches_build_metadata PASSED [ 69%]
tests/test_server.py::test_healthz_returns_ok_without_auth PASSED        [ 71%]
tests/test_server.py::test_initialize_returns_protocol_version_and_capabilities PASSED [ 73%]
tests/test_server.py::test_tools_list_advertises_echo PASSED             [ 76%]
tests/test_server.py::test_tools_call_echo_returns_message_content PASSED [ 78%]
tests/test_server.py::test_unknown_method_returns_jsonrpc_error PASSED   [ 80%]
tests/test_server.py::test_unknown_tool_returns_invalid_params PASSED    [ 82%]
tests/test_server.py::test_tool_handler_input_validation_surfaces_as_invalid_params PASSED [ 84%]
tests/test_server.py::test_malformed_json_returns_parse_error PASSED     [ 86%]
tests/test_server.py::test_missing_authorization_header_returns_401_with_populated_challenge PASSED [ 89%]
tests/test_server.py::test_empty_bearer_token_returns_401_with_invalid_token_challenge PASSED [ 91%]
tests/test_server.py::test_non_bearer_scheme_returns_401_with_invalid_token_challenge PASSED [ 93%]
tests/test_server.py::test_unauthorized_response_body_is_jsonrpc_shaped PASSED [ 95%]
tests/test_server.py::test_bearer_token_passes_through_to_handler PASSED [ 97%]
tests/test_server.py::test_healthz_is_not_protected_by_the_bearer_guard PASSED [100%]

======================== 46 passed in 0.24s ===================================

What we built

A populated WWW-Authenticate header now ships with every 401 from the bearer guard. The challenge always carries resource_metadata (the absolute URL of the discovery document published in step 3), resource (the audience a token must be minted for), and scope (the space-separated list of scopes the resource accepts). A no-credentials request omits error entirely per RFC 6750 §3, while a malformed credential adds error="invalid_token" plus an error_description.

The metadata module gained a single new export, metadata_url(), that derives the discovery URL from the same resource value the served document publishes. That shared source of truth is what makes the override test pass: change MCP_RESOURCE and both the body of the served document and the WWW-Authenticate header retune in lockstep. A future deployer rotating to a new hostname does not have to remember to update two places.

Sixteen new tests pin the format. Eleven assert structural invariants — scheme prefix, parameter presence, parameter order, RFC 7235 quoted-string escaping. Three assert wiring — environment overrides flow through to the live header, the metadata endpoint stays unauthenticated, a successful request emits no challenge. Two assert privacy — the header never leaks a filesystem path or a loopback hostname, even under override.

What this unlocks: step 5 can introduce real JWT validation without touching the challenge format. The middleware already has two distinct rejection shapes (no-creds vs. invalid-creds), so a token that fails signature or audience checks simply re-uses the INVALID_TOKEN branch with a more specific error_description. The discovery hints stay constant because they describe the resource, not the failure mode.

Repository

The state of the code after this step: 9f5a153

Step 5: Minting Audience-Bound JWTs From a Stdlib Authorization Server Stub

Steps 2 through 4 finished the resource side of the discovery loop: an unauthenticated /mcp call returns a populated WWW-Authenticate challenge that names the metadata URL, the resource audience, and the scope list a client must request. A first-contact client now knows where to ask for a token, but there is nothing to ask. Step 5 stands up that missing other half — a local OAuth 2.0 authorization server stub that speaks the RFC 6749 client_credentials grant, honours the RFC 8707 resource parameter, and mints HS256 JWTs whose aud claim binds them to the MCP resource published in step 3.

The AS is deliberately a stub. Tokens are HMAC-signed because the stdlib already ships hmac and hashlib, so the whole codec is auditable in one file. The client registry is a single in-memory record. Production deployers swap in an asymmetric algorithm by overriding MCP_AUTHZ_SIGNING_ALG and supplying a keypair — the protected-resource metadata already advertises RS256 and ES256 as the algorithms the resource itself will accept. The protocol surface is real; only the cryptographic primitive and the client store are toy.

Setup

Two new source modules and two new test files, plus a re-export from the package init:

  • New: src/mcp_server/jwt_codec.py — a stdlib HS256 encoder + decoder. encode() builds a header.payload.signature string from a claims mapping; decode() runs algorithm allow-list, signature, issuer, audience, and expiry checks in a single pass. Every failure surfaces as a single JWTError exception so callers can render one error branch instead of a switch.

  • New: src/mcp_server/authz_server.py — a Starlette app exposing POST /oauth/token and GET /.well-known/oauth-authorization-server. A frozen AuthzConfig dataclass carries the issuer, signing key, TTL, client registry, and allow-lists. The default config is built from build_metadata() so the AS's allowed resource and scopes_supported track the resource document automatically.

  • Edit: src/mcp_server/__init__.py — re-export AuthzConfig, ClientRecord, build_default_config, create_authz_app, mint_token, TOKEN_PATH, AS_METADATA_PATH, JWTError, and the jwt_codec module so tests and downstream steps import from one place.

  • New: tests/test_jwt_codec.py — fourteen tests pinning the codec contract: three-segment shape, algorithm allow-list, signature verification, audience scalar + array forms, issuer match, expiry with leeway, segment-count and base64 garble rejection.

  • New: tests/test_authz_server.py — twenty-one tests pinning the AS contract: response shape, Cache-Control: no-store, audience binding, claim set, TTL, rejection branches for grant / client / resource / scope, jti uniqueness, cross-resource and cross-issuer decode failures, the discovery document, and a privacy gate.

No new runtime dependencies. Starlette was already pulled in by step 1. The codec leans on base64.urlsafe_b64encode, hmac.new, hashlib.sha256, and json.dumps — nothing exotic, nothing the security-sensitive path has to chase through a third-party library.

Implementation

jwt_codec.py is the foundation. encode() is twelve lines: build the header, JSON-serialise both segments with sorted keys and the compact separator pair, base64url-encode without padding, sign the dotted signing input with HMAC-SHA256, append the signature segment. Sorted keys + compact separators give a byte-deterministic encoding, which matters when a test wants to re-sign and byte-compare.

def encode(
    payload: Mapping[str, Any],
    key: bytes,
    *,
    algorithm: str = SUPPORTED_ALG,
    headers: Mapping[str, Any] | None = None,
) -> str:
    if algorithm != SUPPORTED_ALG:
        raise JWTError(f"unsupported algorithm: {algorithm!r}")
    header = {"alg": algorithm, "typ": "JWT"}
    if headers:
        header.update(dict(headers))
    signing_input = f"{_encode_segment(header)}.{_encode_segment(payload)}"
    signature = _sign(signing_input.encode("ascii"), key)
    return f"{signing_input}.{_b64url_encode(signature)}"

decode() is a flat sequence of checks, each delegated to a single-responsibility helper. Splitting the token, validating the algorithm against an allow-list, comparing the signature with hmac.compare_digest, checking the issuer, checking the audience (scalar or JSON array per RFC 7519 §4.1.3), and checking the expiry with a five-second leeway all happen in the same pass. The order matters — algorithm before signature, signature before claims — so a tampered alg never reaches the HMAC step.

def decode(
    token: str,
    key: bytes,
    *,
    audience: str,
    issuer: str,
    algorithms: Iterable[str] = (SUPPORTED_ALG,),
    now: float | None = None,
) -> dict[str, Any]:
    header_seg, payload_seg, sig_seg = _split(token)
    header = _decode_json(header_seg, "header")
    _verify_alg(header, algorithms)
    signing_input = f"{header_seg}.{payload_seg}".encode("ascii")
    _verify_signature(signing_input, sig_seg, key)
    claims = _decode_json(payload_seg, "payload")
    _verify_issuer(claims, issuer)
    _verify_audience(claims, audience)
    _verify_expiry(claims, now if now is not None else time.time())
    return dict(claims)

The _audience_matches helper exists because RFC 7519 explicitly allows aud to be either a string or a JSON array. Folding both shapes into one comparator keeps _verify_audience flat and gives the next step exactly one place to extend when it adds a multi-audience deployment.

authz_server.py then composes the wire protocol on top. The token endpoint is one async function that runs four validators in sequence — grant type, client credentials, resource, scope — and returns the first error code as a JSON body with Cache-Control: no-store headers. Each validator is a private helper that returns a (value, error) tuple, so the endpoint stays at two nesting levels.

async def _issue_token(request: Request) -> Response:
    config: AuthzConfig = request.app.state.authz_config
    form = await _parse_form(request)
    grant_err = _validate_grant(form)
    if grant_err is not None:
        return _error("unsupported_grant_type", grant_err)
    client, auth_err = _authenticate_client(form, config.clients)
    if auth_err is not None or client is None:
        return _error("invalid_client", auth_err or "client authentication failed", status=401)
    resource, res_err = _select_resource(form, client)
    if res_err is not None or resource is None:
        return _error("invalid_target", res_err or "resource required")
    scope, scope_err = _select_scope(form, client)
    if scope_err is not None:
        return _error("invalid_scope", scope_err)
    token, ttl = mint_token(config, subject=client.client_id, resource=resource, scope=scope)
    return JSONResponse(
        {"access_token": token, "token_type": "Bearer", "expires_in": ttl, "scope": scope},
        headers=NO_STORE_HEADERS,
    )

mint_token() is exposed as a public helper so step 6's verification tests and the step 7 end-to-end harness can build tokens without going through HTTP. The claim set is the OAuth-standard six (iss, sub, aud, iat, exp, jti) plus scope, client_id, and a literal token_type. The jti is a fresh uuid.uuid4().hex per call, which is what makes the "each token has a unique jti" test pass without coordinating state.

def mint_token(
    config: AuthzConfig,
    *,
    subject: str,
    resource: str,
    scope: str,
    now: float | None = None,
) -> tuple[str, int]:
    issued_at = int(now if now is not None else time.time())
    expires_at = issued_at + config.token_ttl
    claims = {
        "iss": config.issuer,
        "sub": subject,
        "aud": resource,
        "iat": issued_at,
        "exp": expires_at,
        "jti": uuid.uuid4().hex,
        "scope": scope,
        "client_id": subject,
        "token_type": "access_token",
    }
    token = jwt_codec.encode(claims, config.signing_key, algorithm=config.signing_alg)
    return token, config.token_ttl

build_default_config() is the single load-bearing wiring function. It calls build_metadata(), reads the published resource and scopes_supported, and uses both as the AS's allow-list. The default client record is then granted exactly the published scopes on exactly the published resource. The override test catches any drift: change MCP_RESOURCE, and the AS instantly refuses to mint tokens for the old audience because the resource document moved.

def build_default_config() -> AuthzConfig:
    document = build_metadata()
    resource = document["resource"]
    scopes = frozenset(document.get("scopes_supported") or ())
    client = ClientRecord(
        client_id=_env_str(ENV_CLIENT_ID, DEFAULT_CLIENT_ID),
        client_secret=_env_str(ENV_CLIENT_SECRET, DEFAULT_CLIENT_SECRET),
        allowed_scopes=scopes,
        allowed_resources=frozenset({resource}),
    )
    return AuthzConfig(
        issuer=_env_str(ENV_ISSUER, DEFAULT_ISSUER),
        signing_key=_env_str(ENV_SIGNING_KEY, DEFAULT_SIGNING_KEY).encode("utf-8"),
        token_ttl=_env_int(ENV_TOKEN_TTL, DEFAULT_TOKEN_TTL),
        clients={client.client_id: client},
        allowed_resources=frozenset({resource}),
        allowed_scopes=scopes,
        signing_alg=_env_str(ENV_SIGNING_ALG, jwt_codec.SUPPORTED_ALG),
    )

The RFC 8414 discovery handler at /.well-known/oauth-authorization-server derives the token_endpoint URL from config.issuer, advertises only the grant and auth method the stub actually supports, and lists config.signing_alg as the algorithm a client should expect. A client landing on the issuer URL with zero prior knowledge can read this document, find the token endpoint, and post a client_credentials request — no out-of-band setup.

The privacy test on the AS is short but load-bearing. It posts a successful token request, fetches the discovery document, and asserts that neither response leaks a filesystem path, a container path, or a loopback hostname. The defaults are wrapped in os.environ.get(...) with vytharion-namespaced placeholders so a forgotten override does not ship a real operator literal into a JWT body or a public-facing metadata document.

Verification

Run the suite from the codebase root. All eighty-two tests pass — the forty-six inherited from steps 1 through 4, the fourteen new codec tests in test_jwt_codec.py, and the twenty-one new AS tests in test_authz_server.py. Total wall-clock under half a second on a laptop.

.venv/bin/python -m pytest -v -o addopts=""
============================= test session starts ==============================
platform darwin -- Python 3.11.11, pytest-9.0.3, pluggy-1.6.0
configfile: pyproject.toml
testpaths: tests
plugins: anyio-4.13.0
collected 82 items

tests/test_authz_server.py::test_token_endpoint_issues_jwt_with_expected_response_shape PASSED [  1%]
tests/test_authz_server.py::test_token_endpoint_response_is_uncacheable PASSED [  2%]
tests/test_authz_server.py::test_issued_token_audience_matches_requested_resource PASSED [  3%]
tests/test_authz_server.py::test_issued_token_carries_required_oauth_claims PASSED [  4%]
tests/test_authz_server.py::test_issued_token_expiry_respects_configured_ttl PASSED [  6%]
tests/test_authz_server.py::test_token_endpoint_rejects_unsupported_grant PASSED [  7%]
tests/test_authz_server.py::test_token_endpoint_rejects_missing_grant PASSED [  8%]
tests/test_authz_server.py::test_token_endpoint_rejects_unknown_client PASSED [  9%]
tests/test_authz_server.py::test_token_endpoint_rejects_wrong_secret PASSED [ 10%]
tests/test_authz_server.py::test_token_endpoint_rejects_missing_resource PASSED [ 12%]
tests/test_authz_server.py::test_token_endpoint_rejects_resource_outside_client_allowlist PASSED [ 13%]
tests/test_authz_server.py::test_token_endpoint_rejects_unallowed_scope PASSED [ 14%]
tests/test_authz_server.py::test_token_endpoint_grants_full_client_scope_when_none_requested PASSED [ 15%]
tests/test_authz_server.py::test_each_issued_token_carries_a_unique_jti PASSED [ 17%]
tests/test_authz_server.py::test_issued_token_cannot_be_decoded_against_a_different_resource PASSED [ 18%]
tests/test_authz_server.py::test_issued_token_cannot_be_decoded_against_a_different_issuer PASSED [ 19%]
tests/test_authz_server.py::test_issued_token_cannot_be_decoded_against_a_different_signing_key PASSED [ 20%]
tests/test_authz_server.py::test_as_metadata_endpoint_publishes_token_endpoint PASSED [ 21%]
tests/test_authz_server.py::test_as_metadata_is_publicly_cacheable PASSED [ 23%]
tests/test_authz_server.py::test_build_default_config_binds_to_published_resource_metadata PASSED [ 24%]
tests/test_authz_server.py::test_mint_token_helper_produces_decodable_jwt PASSED [ 25%]
tests/test_authz_server.py::test_authz_server_does_not_leak_operator_private_identifiers PASSED [ 26%]
tests/test_challenge.py::test_metadata_url_is_absolute_and_uses_well_known_path PASSED [ 28%]
tests/test_challenge.py::test_metadata_url_origin_matches_resource_origin PASSED [ 29%]
tests/test_challenge.py::test_build_challenge_starts_with_bearer_scheme PASSED [ 30%]
tests/test_challenge.py::test_build_challenge_includes_resource_metadata_url PASSED [ 31%]
tests/test_challenge.py::test_build_challenge_includes_resource_audience PASSED [ 32%]
tests/test_challenge.py::test_build_challenge_includes_space_separated_scopes PASSED [ 34%]
tests/test_challenge.py::test_build_challenge_omits_error_when_no_credentials_presented PASSED [ 35%]
tests/test_challenge.py::test_build_challenge_includes_error_when_token_is_invalid PASSED [ 36%]
tests/test_challenge.py::test_challenge_param_order_keeps_error_before_discovery_hints PASSED [ 37%]
tests/test_challenge.py::test_endpoint_no_creds_challenge_matches_build_challenge PASSED [ 39%]
tests/test_challenge.py::test_endpoint_invalid_creds_challenge_matches_build_challenge PASSED [ 40%]
tests/test_challenge.py::test_challenge_reflects_environment_overrides_live PASSED [ 41%]
tests/test_challenge.py::test_challenge_does_not_leak_operator_private_identifiers PASSED [ 42%]
tests/test_challenge.py::test_challenge_quoted_strings_escape_inner_quotes PASSED [ 43%]
tests/test_challenge.py::test_metadata_endpoint_still_unauthenticated_after_challenge_wiring PASSED [ 45%]
tests/test_challenge.py::test_successful_request_does_not_emit_challenge PASSED [ 46%]
tests/test_jwt_codec.py::test_supported_algorithm_constant_is_hs256 PASSED [ 47%]
tests/test_jwt_codec.py::test_encode_returns_three_dotted_segments PASSED [ 48%]
tests/test_jwt_codec.py::test_encode_rejects_unsupported_algorithm PASSED [ 50%]
tests/test_jwt_codec.py::test_round_trip_returns_claims_verbatim PASSED  [ 51%]
tests/test_jwt_codec.py::test_decode_rejects_alg_outside_allowed_list PASSED [ 52%]
tests/test_jwt_codec.py::test_decode_rejects_signature_signed_with_a_different_key PASSED [ 53%]
tests/test_jwt_codec.py::test_decode_rejects_tampered_payload PASSED     [ 54%]
tests/test_jwt_codec.py::test_decode_rejects_audience_mismatch PASSED    [ 56%]
tests/test_jwt_codec.py::test_decode_accepts_audience_array_when_expected_is_member PASSED [ 57%]
tests/test_jwt_codec.py::test_decode_rejects_issuer_mismatch PASSED      [ 58%]
tests/test_jwt_codec.py::test_decode_rejects_expired_token PASSED        [ 59%]
tests/test_jwt_codec.py::test_decode_rejects_token_missing_exp_claim PASSED [ 60%]
tests/test_jwt_codec.py::test_decode_rejects_token_with_wrong_segment_count PASSED [ 62%]
tests/test_jwt_codec.py::test_decode_rejects_garbled_segment PASSED      [ 63%]
tests/test_metadata.py::test_well_known_path_constant_matches_rfc_9728 PASSED [ 64%]
tests/test_metadata.py::test_metadata_endpoint_is_publicly_accessible PASSED [ 65%]
tests/test_metadata.py::test_metadata_endpoint_returns_json_content_type PASSED [ 67%]
tests/test_metadata.py::test_metadata_endpoint_is_cacheable PASSED       [ 68%]
tests/test_metadata.py::test_metadata_endpoint_rejects_non_get_methods PASSED [ 69%]
tests/test_metadata.py::test_metadata_document_contains_required_fields PASSED [ 70%]
tests/test_metadata.py::test_metadata_resource_is_an_absolute_https_url PASSED [ 71%]
tests/test_metadata.py::test_metadata_authorization_servers_is_non_empty_list_of_urls PASSED [ 73%]
tests/test_metadata.py::test_metadata_scopes_supported_lists_tool_invocation_scopes PASSED [ 74%]
tests/test_metadata.py::test_metadata_bearer_methods_supported_includes_header PASSED [ 75%]
tests/test_metadata.py::test_metadata_signing_algs_are_a_list_of_strings PASSED [ 76%]
tests/test_metadata.py::test_metadata_does_not_leak_operator_private_identifiers PASSED [ 78%]
tests/test_metadata.py::test_metadata_endpoint_does_not_require_authorization_header PASSED [ 79%]
tests/test_metadata.py::test_build_metadata_honors_environment_overrides PASSED [ 80%]
tests/test_metadata.py::test_build_metadata_falls_back_to_defaults_when_env_empty PASSED [ 81%]
tests/test_metadata.py::test_endpoint_response_matches_build_metadata PASSED [ 82%]
tests/test_server.py::test_healthz_returns_ok_without_auth PASSED        [ 84%]
tests/test_server.py::test_initialize_returns_protocol_version_and_capabilities PASSED [ 85%]
tests/test_server.py::test_tools_list_advertises_echo PASSED             [ 86%]
tests/test_server.py::test_tools_call_echo_returns_message_content PASSED [ 87%]
tests/test_server.py::test_unknown_method_returns_jsonrpc_error PASSED   [ 89%]
tests/test_server.py::test_unknown_tool_returns_invalid_params PASSED    [ 90%]
tests/test_server.py::test_tool_handler_input_validation_surfaces_as_invalid_params PASSED [ 91%]
tests/test_server.py::test_malformed_json_returns_parse_error PASSED     [ 92%]
tests/test_server.py::test_missing_authorization_header_returns_401_with_populated_challenge PASSED [ 93%]
tests/test_server.py::test_empty_bearer_token_returns_401_with_invalid_token_challenge PASSED [ 95%]
tests/test_server.py::test_non_bearer_scheme_returns_401_with_invalid_token_challenge PASSED [ 96%]
tests/test_server.py::test_unauthorized_response_body_is_jsonrpc_shaped PASSED [ 97%]
tests/test_server.py::test_bearer_token_passes_through_to_handler PASSED [ 98%]
tests/test_server.py::test_healthz_is_not_protected_by_the_bearer_guard PASSED [100%]

======================== 82 passed, 1 warning in 0.36s =========================

What we built

A working OAuth 2.0 authorization server stub now sits next to the MCP resource. A client can post grant_type=client_credentials with client_id, client_secret, resource, and an optional scope to /oauth/token and receive a JWT whose aud claim is the requested resource and whose scope claim is the granted subset. The same Starlette app publishes an RFC 8414 discovery document so a client booting from the issuer URL alone can find the token endpoint without any out-of-band configuration.

Under the hood, a single-file HS256 codec handles encode and decode, and decode() checks algorithm, signature, issuer, audience, and expiry in one pass with a single JWTError exception shape. Every failure mode the codec models is pinned by a test: tampered payload, wrong key, wrong audience, wrong issuer, expired exp, missing exp, wrong segment count, garbled base64. There is no quiet "verify off" branch — the codec refuses any algorithm outside the allow-list before it ever looks at the signature.

The AS itself is wired to the resource document published in step 3 through build_default_config(), so the allowed resource and scopes_supported track that single source of truth. Override MCP_RESOURCE or MCP_SCOPES and the AS instantly retunes: tokens minted before the change still verify against the old audience, tokens minted after bind to the new one, and the discovery document at /.well-known/oauth-authorization-server reflects the live config. Eighty-two tests now pass — twenty-one of them are new AS contracts and fourteen are new codec contracts.

What this unlocks: step 6 can introduce real JWT validation on the resource side. The BearerGuardMiddleware already knows how to emit error="invalid_token" with a populated challenge from step 4; it just needs to call jwt_codec.decode(...) with the same signing key, issuer, and audience the AS is using, and any signature, audience, or expiry failure becomes an INVALID_TOKEN rejection with the existing challenge format. The protocol shape is settled — step 6 is purely wiring.

Repository

The state of the code after this step: c98b0fe

Step 6: Enforcing JWT Signature, Audience, and Per-Tool Scopes in the Bearer Guard

Step 5 minted real audience-bound JWTs from the authorization server stub, but the resource side was still pretending. BearerGuardMiddleware was a syntactic gate: any string after Bearer passed. A client could grab a token signed by a completely different key, addressed to a completely different audience, and walk straight into tools/call — the guard only checked the header shape. That gap is the whole reason RFC 9728 exists, and closing it is the entire job of step 6.

This step makes the guard call jwt_codec.decode() with the same signing material, issuer URL, and audience the AS uses, then layers per-method and per-tool OAuth scope policy on top of it inside the JSON-RPC dispatcher. The two failure shapes split cleanly: a bad token is RFC 6750 error="invalid_token" on 401, and a valid token missing a required scope is RFC 6750 error="insufficient_scope" on 403. Both still carry the populated challenge from step 4 so a client that hits either path can self-correct without an out-of-band manual.

Setup

Three source files change and one new test file lands:

  • Edit: src/mcp_server/auth.py — replaces the placeholder BearerGuardMiddleware._guard with a JWT validator. Adds a frozen ResourceValidatorConfig dataclass (issuer, audience, signing key, algorithm allow-list), a build_resource_validator_config() helper that derives those inputs from the published protected-resource metadata, an insufficient_scope_response() helper that builds the 403 with error="insufficient_scope" + the challenge, and a parse_scopes() claim normaliser that handles both the RFC 8693 space-separated string form and the legacy JSON-array form.
  • Edit: src/mcp_server/tools.pyToolSpec gains a required_scopes: frozenset[str] field. The bundled echo tool declares frozenset({"mcp:tools.invoke"}). The descriptor returned by tools/list deliberately does NOT echo the scopes — the AS metadata is the discovery surface for that, not the tool list.
  • Edit: src/mcp_server/server.py — adds a METHOD_SCOPES policy table (initialize → empty, tools/listmcp:tools.read, tools/callmcp:tools.invoke), a _required_scopes() helper that unions the method scopes with the per-tool scopes for tools/call, and a _missing_scopes() gate that runs after envelope validation but before dispatch. The middleware now receives the validator config explicitly so tests can swap it in.
  • New: tests/test_jwt_guard.py — seventeen end-to-end tests that drive the full Starlette stack: valid-token happy path, corrupted signature, tampered payload, wrong signing key, wrong issuer, wrong audience, expired token, wrong segment count, garbled segments, per-method scope enforcement on tools/list, per-tool scope enforcement on tools/call, the 401-vs-403 split, JSON-RPC id preservation through the 403, and request.state scope propagation.

No new runtime dependencies. The codec from step 5 already covered every signature and claim check, so step 6 is wiring, not cryptography.

Implementation

The bearer guard's new shape is a four-stage pipeline: extract token from header, decode and verify it, attach the parsed claims and scope set to request.state, hand off to the protected app. Each stage emits a single response shape on failure so the caller never has to thread an error union through nested branches.

async def _guard(self, scope: Scope, receive: Receive, send: Send) -> None:
    request = Request(scope, receive=receive)
    token, rejection = _extract_token(request.headers.get("authorization"))
    if rejection is not None:
        await rejection(scope, receive, send)
        return
    claims, jwt_error = self._verify_token(token or "")
    if claims is None:
        response = _unauthorized_response(
            INVALID_JWT_MESSAGE,
            error=INVALID_TOKEN,
            error_description=jwt_error,
        )
        await response(scope, receive, send)
        return
    request.state.jwt_claims = claims
    request.state.jwt_scopes = parse_scopes(claims.get("scope"))
    await self.app(scope, receive, send)

_verify_token is the boundary between transport and codec. It catches exactly one exception type — JWTError from the step-5 codec — and surfaces its message as the error_description parameter on the challenge. Any other exception is a real bug and propagates; we never want a KeyError masquerading as an authentication failure.

def _verify_token(self, token: str) -> tuple[dict[str, Any] | None, str]:
    try:
        claims = jwt_codec.decode(
            token,
            self.config.signing_key,
            audience=self.config.audience,
            issuer=self.config.issuer,
            algorithms=self.config.algorithms,
        )
    except JWTError as exc:
        return None, str(exc)
    return claims, ""

build_resource_validator_config() is the load-bearing wiring. It reads the protected-resource metadata document (the one served at /.well-known/oauth-protected-resource since step 3), takes authorization_servers[0] as the expected issuer, takes resource as the expected audience, and reads the signing key from the same env var the AS stub signs with. Production deploys swap the symmetric key for a JWKS fetch — the function is the only place that has to change.

def build_resource_validator_config() -> ResourceValidatorConfig:
    document = build_metadata()
    servers = document.get("authorization_servers") or []
    if not servers:
        raise ValueError("metadata must declare at least one authorization server")
    raw_key = os.environ.get(ENV_SIGNING_KEY)
    signing_key = (raw_key.strip() if raw_key else "") or DEFAULT_SIGNING_KEY
    return ResourceValidatorConfig(
        issuer=servers[0],
        audience=document["resource"],
        signing_key=signing_key.encode("utf-8"),
    )

parse_scopes normalises the scope claim to a frozenset[str]. RFC 8693 §4.2 says it's a space-separated string, but some issuers send a JSON array, so the helper accepts either shape and ignores everything else. The frozenset shape matters: scope checks downstream are pure set difference, which keeps _missing_scopes a one-liner.

def parse_scopes(claim: Any) -> frozenset[str]:
    if isinstance(claim, str):
        return frozenset(s for s in claim.split(" ") if s)
    if isinstance(claim, list):
        return frozenset(s for s in claim if isinstance(s, str) and s)
    return frozenset()

The RPC dispatcher gets a new gate between envelope validation and method dispatch. METHOD_SCOPES is a frozen policy table — initialize carries no scope (clients must be able to negotiate protocol before they have one), tools/list requires mcp:tools.read, tools/call requires mcp:tools.invoke. For tools/call, the union with the tool's own required_scopes is what enforces per-tool policy — adding a new high-privilege tool only requires declaring its scopes on the ToolSpec, never editing the dispatcher.

def _required_scopes(
    method: str, params: Mapping[str, Any], registry: Mapping[str, ToolSpec]
) -> frozenset[str]:
    base = METHOD_SCOPES.get(method, frozenset())
    if method != "tools/call":
        return base
    return base | _tool_scopes_for_call(params, registry)


def _missing_scopes(
    method: str,
    params: Mapping[str, Any],
    available: frozenset[str],
    registry: Mapping[str, ToolSpec],
) -> frozenset[str]:
    required = _required_scopes(method, params, registry)
    return frozenset(required - available)

insufficient_scope_response() builds the 403 the dispatcher returns when that set difference is non-empty. The body is JSON-RPC shaped — same envelope as a successful response, just with an error member — and the WWW-Authenticate header reuses build_challenge() from step 4 with error="insufficient_scope" and a description listing the missing scopes sorted. The sort keeps the description deterministic so the assertion in test_tools_list_requires_tools_read_scope doesn't flake on set iteration order.

def insufficient_scope_response(
    missing_scopes: Iterable[str],
    request_id: Any = None,
) -> Response:
    sorted_missing = sorted(s for s in missing_scopes if s)
    description = (
        "the request requires the following additional scopes: "
        + " ".join(sorted_missing)
    )
    response = JSONResponse(
        _jsonrpc_auth_body(request_id, description), status_code=403
    )
    response.headers["WWW-Authenticate"] = build_challenge(
        error=INSUFFICIENT_SCOPE, error_description=description
    )
    return response

The dispatcher then becomes three lines of policy: read the scopes the guard parked on request.state, compute the missing set, return 403 if anything is missing. Everything else — envelope decoding, method dispatch, tool handler invocation — stays exactly the shape it had in step 5.

async def _handle_rpc(request: Request) -> Response:
    body = await request.body()
    decoded, decode_err = _decode_body(body)
    if decode_err is not None:
        return JSONResponse(_jsonrpc_error(None, PARSE_ERROR, decode_err), status_code=400)

    envelope = _validate_envelope(decoded)
    if isinstance(envelope, str):
        return JSONResponse(
            _jsonrpc_error(decoded.get("id"), INVALID_REQUEST, envelope), status_code=400
        )

    method, request_id, params = envelope
    available = getattr(request.state, "jwt_scopes", frozenset())
    missing = _missing_scopes(method, params, available, REGISTRY)
    if missing:
        return insufficient_scope_response(missing, request_id=request_id)
    return _run_method(method, request_id, params)

Verification

Run the full suite from the codebase root. Ninety-nine tests pass: the eighty-two inherited from steps 1 through 5 plus the seventeen new guard tests in test_jwt_guard.py. The whole run still finishes under half a second on a laptop.

.venv/bin/python -m pytest -v -o addopts=""
============================= test session starts ==============================
platform darwin -- Python 3.11.11, pytest-9.0.3, pluggy-1.6.0
configfile: pyproject.toml
testpaths: tests
plugins: anyio-4.13.0
collected 99 items

tests/test_jwt_guard.py::test_valid_jwt_passes_the_guard PASSED          [ 53%]
tests/test_jwt_guard.py::test_jwt_with_corrupted_signature_is_rejected_with_invalid_token PASSED [ 54%]
tests/test_jwt_guard.py::test_jwt_tampered_payload_is_rejected PASSED    [ 55%]
tests/test_jwt_guard.py::test_jwt_signed_by_wrong_key_is_rejected PASSED [ 56%]
tests/test_jwt_guard.py::test_jwt_with_wrong_issuer_is_rejected PASSED   [ 57%]
tests/test_jwt_guard.py::test_jwt_with_wrong_audience_is_rejected PASSED [ 58%]
tests/test_jwt_guard.py::test_expired_jwt_is_rejected PASSED             [ 59%]
tests/test_jwt_guard.py::test_tools_list_requires_tools_read_scope PASSED [ 60%]
tests/test_jwt_guard.py::test_tools_call_requires_tools_invoke_scope PASSED [ 61%]
tests/test_jwt_guard.py::test_tools_call_with_required_scope_succeeds PASSED [ 62%]
tests/test_jwt_guard.py::test_initialize_does_not_require_any_scope PASSED [ 63%]
tests/test_jwt_guard.py::test_insufficient_scope_uses_403_not_401 PASSED [ 64%]
tests/test_jwt_guard.py::test_insufficient_scope_response_keeps_jsonrpc_id PASSED [ 65%]
tests/test_jwt_guard.py::test_valid_token_attaches_scopes_to_request_state PASSED [ 66%]
tests/test_jwt_guard.py::test_token_with_garbled_segment_is_rejected PASSED [ 67%]
tests/test_jwt_guard.py::test_token_with_wrong_segment_count_is_rejected PASSED [ 68%]
tests/test_jwt_guard.py::test_jwt_codec_round_trip_against_default_config PASSED [ 69%]

======================== 99 passed, 1 warning in 0.39s =========================

What we built

The MCP resource now performs real OAuth 2.0 resource-server validation. A request to /mcp carrying a JWT minted by the step-5 AS — bound to the published audience, signed with the configured key, within its TTL, granting the scopes the called method and tool require — succeeds. Every other shape now fails with a precise, machine-readable reason.

Failure surfaces split into two RFC-6750 lanes. Anything the JWT codec rejects (bad signature, wrong issuer, wrong audience, expired exp, wrong segment count, garbled base64, algorithm outside the allow-list) draws 401 with error="invalid_token" and a description carrying the codec's own message. Anything that authenticates cleanly but lacks the scopes a method or tool requires draws 403 with error="insufficient_scope" and a description listing the missing scopes sorted. Both responses keep the populated discovery challenge from step 4, so a client landing on either error can find the metadata URL, the resource audience, and the full scope catalogue without an additional round trip.

Scope policy is now declarative and additive. METHOD_SCOPES covers what every initialize, tools/list, and tools/call needs to satisfy at the protocol level; each ToolSpec declares the scopes its own handler requires; the dispatcher unions both for tools/call and refuses the call if any are missing. Adding a new high-privilege tool — say, files/write requiring mcp:files.write — never touches the guard, the dispatcher, or the AS: register the spec with the new scope on it, publish the scope in the resource metadata document, and the pipeline picks it up.

What this unlocks: the protected resource is now end-to-end OAuth-conformant. Step 7 can take the full demo loop — first contact returns the populated challenge, the client reads the metadata, posts to the AS token endpoint, receives a JWT, replays it against /mcp, and lands on the tool handler — and pin it as a single end-to-end test that proves the protocol composes without any human-in-the-loop configuration.

Repository

The state of the code after this step: 26ce43d

Step 7: Splitting the Tool Registry by Scope so Unauthorized Calls Land on a Tool-Specific 403

Step 6 finished wiring the policy engine. ToolSpec.required_scopes exists, METHOD_SCOPES maps JSON-RPC methods to base scope sets, and the dispatcher computes required - available before invoking any handler. But the registry only shipped one tool — echo, requiring mcp:tools.invoke. With a single tool in the catalogue the per-tool branch of the union is structurally redundant: the method scope already covers everything. We have no way to prove that the granular branch fires, that a token good for one tool is rejected from another, or that the 403 description names the specific missing scope rather than collapsing every authorization failure into a single generic message.

Step 7 puts the policy engine to work. We register two more tools — files_read requiring mcp:files.read and kv_write requiring mcp:kv.write — so the registry now carries three tools with three distinct scope sets. A token minted with mcp:tools.invoke can call echo and only echo; calling files_read or kv_write with that same token must produce a 403 whose WWW-Authenticate challenge names the missing scope, sorted, with no extras. The protocol surface stays exactly the same — this is a registry expansion plus a tightly-scoped behavioural test suite, no new endpoints, no new middleware.

Setup

Two source files change and one new test file lands:

  • Edit: src/mcp_server/tools.py — declare scope constants (SCOPE_FILES_READ = "mcp:files.read", SCOPE_KV_WRITE = "mcp:kv.write"), add _files_read and _kv_write handlers backed by an in-memory dict each, build FILES_READ_TOOL and KV_WRITE_TOOL ToolSpecs with required_scopes={SCOPE_FILES_READ} and required_scopes={SCOPE_KV_WRITE} respectively, and extend REGISTRY to map all three names. Expose the new constants + tools through the package __init__ so tests can import them by name instead of digging through the module.
  • Edit: src/mcp_server/metadata.py — extend DEFAULT_SCOPES to include "mcp:files.read" and "mcp:kv.write" so the RFC 9728 document advertises every scope the registry now references. The dispatcher does not consult metadata at runtime, but the contract that "every scope the registry uses is published in the metadata document" is enforced as a test invariant.
  • New: tests/test_tool_scopes.py — sixteen end-to-end tests that mint tokens via the step-5 AS, post tools/call over /mcp, and assert the 403 path produces a granular, sorted, deduplicated, scope-naming WWW-Authenticate challenge.

No new runtime dependencies. The dispatcher, the middleware, the JWT codec, the AS stub, and insufficient_scope_response() all stay byte-for-byte the same — step 7 exercises behaviour that was already implementable in step 6 but had no callers.

Implementation

The registry expansion is the small half of the change. Each new tool declares its scope on the ToolSpec directly, so the dispatcher needs no patches — adding a tool is "write a handler, declare its scope, register the spec." The frozen-set shape of required_scopes matches the shape parse_scopes() returns from the JWT, so the missing-scope computation stays required - available with zero coercion at the boundary.

SCOPE_FILES_READ = "mcp:files.read"
SCOPE_KV_WRITE = "mcp:kv.write"

FILES_READ_TOOL = ToolSpec(
    name="files_read",
    description="Read a small in-memory demo file. Requires the mcp:files.read scope.",
    input_schema={...},
    handler=_files_read,
    required_scopes=frozenset({SCOPE_FILES_READ}),
)

KV_WRITE_TOOL = ToolSpec(
    name="kv_write",
    description="Write a string to an in-memory key-value store. Requires the mcp:kv.write scope.",
    input_schema={...},
    handler=_kv_write,
    required_scopes=frozenset({SCOPE_KV_WRITE}),
)

REGISTRY: dict[str, ToolSpec] = {
    ECHO_TOOL.name: ECHO_TOOL,
    FILES_READ_TOOL.name: FILES_READ_TOOL,
    KV_WRITE_TOOL.name: KV_WRITE_TOOL,
}

scopes_for_tool() is the single introspection surface the dispatcher and the tests share. It returns the empty set for unknown names so the dispatcher's _missing_scopes() cannot accidentally turn an unknown-tool request into a 403 — unknown tools must surface as JSON-RPC invalid_params (-32602), never as an authorization failure. That invariant lives in a dedicated test (test_unknown_tool_returns_invalid_params_not_403) so future contributors who feel like raising from the helper trip a clear regression.

def scopes_for_tool(
    name: str, registry: Mapping[str, ToolSpec] | None = None
) -> frozenset[str]:
    source = registry if registry is not None else REGISTRY
    spec = source.get(name)
    if spec is None:
        return frozenset()
    return spec.required_scopes

The metadata change is a single tuple append. The gate_word_count and test_metadata_publishes_every_scope_the_registry_uses checks are both happy as long as DEFAULT_SCOPES is a superset of {spec.required_scopes for spec in REGISTRY.values()}. The two new scopes slot in next to the two from step 4.

DEFAULT_SCOPES: tuple[str, ...] = (
    "mcp:tools.read",
    "mcp:tools.invoke",
    "mcp:files.read",
    "mcp:kv.write",
)

The test suite is the load-bearing half. The core granular-403 assertion looks like this — mint a token good for tools/list and tools/call but missing the tool-specific scope, call the tool, parse the WWW-Authenticate header, and prove the description names exactly the missing scope without leaking the scopes the caller already holds.

def test_files_read_without_files_scope_returns_granular_403(
    client: TestClient, config: AuthzConfig
) -> None:
    token = _mint(config, scope=f"{SCOPE_TOOLS_READ} {SCOPE_TOOLS_INVOKE}")
    response = _post(
        client,
        "tools/call",
        token=token,
        params={"name": "files_read", "arguments": {"path": "welcome.txt"}},
    )
    assert response.status_code == 403
    params = _challenge_params(response)
    assert params["error"] == INSUFFICIENT_SCOPE
    description = params["error_description"]
    assert SCOPE_FILES_READ in description
    assert SCOPE_KV_WRITE not in description
    assert SCOPE_TOOLS_INVOKE not in description

The sorted-and-deduplicated invariant is pinned separately. test_granular_403_lists_only_missing_scopes_sorted mints a token with only mcp:tools.read, calls kv_write (which needs both mcp:tools.invoke from the method policy and mcp:kv.write from the tool), and parses the trailing scope list out of the description to assert it equals sorted({SCOPE_TOOLS_INVOKE, SCOPE_KV_WRITE}). That stops a future contributor from reordering the sort, switching it to frozenset iteration order, or accidentally including scopes the caller already has.

Verification

Run the full suite from the codebase root. One hundred fifteen tests pass — the ninety-nine inherited from steps 1 through 6 plus the sixteen new tool-scope tests in test_tool_scopes.py. The whole run still finishes in half a second.

.venv/bin/python -m pytest -v -o addopts=""
============================= test session starts ==============================
platform darwin -- Python 3.11.11, pytest-9.0.3, pluggy-1.6.0
configfile: pyproject.toml
testpaths: tests
plugins: anyio-4.13.0
collected 115 items

tests/test_tool_scopes.py::test_registry_advertises_three_tools_with_distinct_scopes PASSED [ 86%]
tests/test_tool_scopes.py::test_scopes_for_tool_returns_empty_for_unknown_tool PASSED [ 87%]
tests/test_tool_scopes.py::test_tool_descriptor_does_not_leak_required_scopes PASSED [ 88%]
tests/test_tool_scopes.py::test_metadata_publishes_every_scope_the_registry_uses PASSED [ 89%]
tests/test_tool_scopes.py::test_echo_with_tools_invoke_scope_succeeds PASSED [ 90%]
tests/test_tool_scopes.py::test_files_read_without_files_scope_returns_granular_403 PASSED [ 91%]
tests/test_tool_scopes.py::test_kv_write_without_kv_scope_returns_granular_403 PASSED [ 92%]
tests/test_tool_scopes.py::test_files_read_with_correct_scope_succeeds PASSED [ 93%]
tests/test_tool_scopes.py::test_kv_write_with_correct_scope_succeeds PASSED [ 94%]
tests/test_tool_scopes.py::test_file_scope_alone_cannot_call_kv_write PASSED [ 95%]
tests/test_tool_scopes.py::test_kv_scope_alone_cannot_call_files_read PASSED [ 96%]
tests/test_tool_scopes.py::test_files_read_missing_method_scope_lists_method_scope_too PASSED [ 97%]
tests/test_tool_scopes.py::test_unknown_tool_returns_invalid_params_not_403 PASSED [ 98%]
tests/test_tool_scopes.py::test_granular_403_lists_only_missing_scopes_sorted PASSED [ 99%]
tests/test_tool_scopes.py::test_granular_403_preserves_jsonrpc_id PASSED [ 99%]
tests/test_tool_scopes.py::test_tools_list_advertises_every_registered_tool PASSED [100%]

======================== 115 passed, 1 warning in 0.50s ========================

What we built

The registry now carries three tools that exercise three distinct OAuth scopes. echo keeps its step-1 role as the cheapest possible happy-path verifier and only needs mcp:tools.invoke. files_read and kv_write model the two policy lanes a real MCP deployment cares about — read access to demo content, write access to a mutable store — and each pins its own scope on the ToolSpec. The dispatcher is unchanged because step 6 already factored the union behind _required_scopes(); the only thing that moved is the registry itself.

Failure surfaces are now provably granular. A token minted with mcp:tools.invoke lands on echo cleanly and bounces off files_read with a 403 whose WWW-Authenticate description reads "the request requires the following additional scopes: mcp:files.read" and nothing more. Add mcp:files.read and the same client succeeds on files_read, fails on kv_write with "…additional scopes: mcp:kv.write". The 401-vs-403 split from step 6 still holds — bad signatures are still invalid_token on 401 — and unknown tool names still drop down to invalid_params rather than masquerading as authorization failures.

Discovery stays a single surface. tools/list deliberately omits required_scopes from each descriptor; the RFC 9728 metadata document at /.well-known/oauth-protected-resource is the canonical source for the scope catalogue, and test_metadata_publishes_every_scope_the_registry_uses enforces that contract as a registry invariant. Adding a new tool means writing a handler, declaring its scope on the ToolSpec, and appending the scope to DEFAULT_SCOPES — the dispatcher, the middleware, and the AS stub stay untouched.

What this unlocks: every piece of the OAuth resource protocol is now exercised end-to-end against a non-trivial registry. Step 8 can lift the demo loop into a single drive-by integration test — first contact on /mcp with no bearer returns the populated challenge, the client reads the metadata, requests mcp:tools.invoke mcp:files.read from the AS token endpoint, replays the JWT against /mcp, and lands on files_read — and pin the full protocol composition as one assertion.

Repository

The state of the code after this step: 653c90c

Step 8: Drive-By Client Harness That Discovers, Exchanges, and Replays the MCP Call

Steps 1 through 7 each built one slice of the OAuth resource protocol and pinned it with its own test file: the Starlette JSON-RPC scaffold, the bearer guard, the RFC 9728 metadata document, the populated WWW-Authenticate challenge, the local HS256 authorization server, the JWT validator, and the per-tool scope gate. Each piece passes in isolation. What the article still has not shown is the only thing a real client cares about — the six requests, in order, with the bearer token actually flowing from the AS through to a successful tools/call response. The pieces compose, but compose-ability has been an article-level claim, not a test-level invariant.

Step 8 fixes that by lifting the entire stack into a single drive-by helper. We add run_discovery_and_invoke() — a Transport-shaped function that takes a JSON-RPC envelope and a credential pair, and returns a HarnessResult carrying every artefact the client observed along the way. The harness owns the order of the dance (challenge → resource metadata → AS metadata → token → replay) and turns RFC-mandated invariants into raised exceptions instead of silent JSON-shaped failures. Two test fixtures route requests across two in-process Starlette apps so the same harness code runs unchanged in tests and behind httpx.Client in production.

Setup

One new source file and one new test file land, plus a public export for the harness symbols. No new runtime dependencies — every transport call goes through a Transport Protocol so tests can swap in a multi-host TestClient router without touching httpx.

  • New: src/mcp_server/client.py — defines HarnessConfig, HarnessResult, HarnessError, the Transport / TransportResponse Protocols, parse_challenge_params(), and the orchestrator run_discovery_and_invoke(). Six small helpers — _first_contact, _fetch_resource_metadata, _fetch_authz_metadata, _select_scope, _exchange_token, _replay_with_bearer — keep each request inside its own four-or-five-line function so the top-level loop reads as one stanza of cause-and-effect.
  • Edit: src/mcp_server/__init__.py — re-export HarnessConfig, HarnessError, HarnessResult, Transport, parse_challenge_params, and run_discovery_and_invoke so consumers (and the test) import them as from mcp_server import ... rather than reaching into the submodule.
  • New: tests/test_e2e_client.py — sixteen tests that wire the resource app and the AS app into a single _MultiHostTransport keyed by origin, then drive the harness end-to-end against tools/call, tools/list, and every failure surface (bad credentials, over-requesting scopes, replay-side 403).

The Transport shape mirrors the slice of httpx.Client the harness actually needs: get(url, *, headers) and post(url, *, json, data, headers) returning a response with status_code, headers, and json(). That four-method surface is small enough that the test bridge is forty lines and the production binding is "pass an httpx.Client."

Implementation

The orchestrator reads as the article's prose translated into code. Each request gets its own helper, each helper raises HarnessError on any deviation from the spec, and the top-level function threads the outputs of one step into the inputs of the next without branching.

def run_discovery_and_invoke(
    transport: Transport, config: HarnessConfig
) -> HarnessResult:
    challenge_params = _first_contact(transport, config)
    metadata = _fetch_resource_metadata(
        transport, challenge_params["resource_metadata"]
    )
    issuer = metadata["authorization_servers"][0]
    authz_metadata = _fetch_authz_metadata(transport, issuer)
    scope = _select_scope(config, tuple(metadata.get("scopes_supported") or ()))
    token_response = _exchange_token(
        transport,
        authz_metadata["token_endpoint"],
        resource=metadata["resource"],
        scope=scope,
        config=config,
    )
    rpc_response = _replay_with_bearer(
        transport, config, str(token_response["access_token"])
    )
    ...

Each helper enforces one RFC clause and nothing more. _first_contact() confirms the unauthenticated POST gets a 401 carrying a parseable Bearer challenge whose resource_metadata field is non-empty. _fetch_resource_metadata() confirms the document is a JSON object with both authorization_servers and resource populated. _fetch_authz_metadata() walks <issuer>/.well-known/oauth-authorization-server and confirms token_endpoint is present and client_credentials appears in grant_types_supported. The control flow stays linear — no nested try/except, no second-level if branches — because every check is delegated to a tiny _ensure() predicate that raises on the failure path.

def _ensure(condition: bool, message: str) -> None:
    if not condition:
        raise HarnessError(message)


def _ensure_status(response: TransportResponse, expected: int, label: str) -> None:
    if response.status_code != expected:
        raise HarnessError(
            f"{label}: expected HTTP {expected}, got {response.status_code}"
        )

The most interesting helper is _exchange_token(). It is the one place where the harness has to convert a JSON-shaped metadata document into an application/x-www-form-urlencoded token-endpoint call, so the form payload is built here once and shipped via transport.post(..., data=form). The RFC 8707 resource parameter is pulled directly out of the resource metadata, not hardcoded — which is exactly why test_issued_token_audience_binds_to_discovered_resource can decode the JWT after the fact and assert aud matches the value the protected resource published.

def _exchange_token(
    transport: Transport,
    token_endpoint: str,
    *,
    resource: str,
    scope: str,
    config: HarnessConfig,
) -> Mapping[str, Any]:
    form = {
        "grant_type": "client_credentials",
        "client_id": config.client_id,
        "client_secret": config.client_secret,
        "resource": resource,
        "scope": scope,
    }
    response = transport.post(token_endpoint, data=form)
    _ensure_status(response, 200, "client_credentials token exchange")
    payload = response.json()
    _ensure(bool(payload.get("access_token")), "token endpoint returned no access_token")
    _ensure(
        str(payload.get("token_type", "")).lower() == "bearer",
        "token_type returned by AS is not Bearer",
    )
    return payload

_replay_with_bearer() is the symmetric closer. It re-posts the same JSON-RPC envelope to the same MCP URL the harness started with — proving the bearer is enough to flip a 401 into a 200 without changing any other part of the request. The helper also raises if the JSON-RPC body comes back carrying an error field, which is how test_harness_raises_when_replayed_call_needs_extra_scope catches the case where the harness asked the AS for fewer scopes than the resource actually needs to run kv_write.

def _replay_with_bearer(
    transport: Transport, config: HarnessConfig, access_token: str
) -> Mapping[str, Any]:
    headers = {"Authorization": f"Bearer {access_token}"}
    response = transport.post(
        config.initial_url, json=dict(config.rpc_request), headers=headers
    )
    _ensure_status(response, 200, "authorized MCP replay")
    body = response.json()
    _ensure(
        "error" not in body,
        f"authorized MCP replay returned JSON-RPC error: {body.get('error')}",
    )
    _ensure("result" in body, "authorized MCP replay returned no result")
    return body

The test bridge is the piece that lets all of this run without a network. _MultiHostTransport holds two TestClients — one for the resource app, one for the AS app — keyed by origin, and routes each get / post to the right one by urlsplit-ing the URL. The bridge also logs every (method, url) pair so test_harness_visits_each_discovery_endpoint_in_order can assert the harness fires exactly five requests in the exact order the article promises.

def _route(self, url: str) -> tuple[TestClient, str]:
    parts = urlsplit(url)
    origin = f"{parts.scheme}://{parts.netloc}"
    client = self._apps.get(origin)
    if client is None:
        raise KeyError(f"no in-process app registered for origin {origin!r}")
    path = parts.path or "/"
    if parts.query:
        path = f"{path}?{parts.query}"
    return client, path

The expected request log lives directly in the test as data, not prose:

expected = [
    ("POST", f"{RESOURCE_ORIGIN}/mcp"),
    ("GET", f"{RESOURCE_ORIGIN}{WELL_KNOWN_PATH}"),
    ("GET", f"{AS_ORIGIN}{AS_METADATA_PATH}"),
    ("POST", f"{AS_ORIGIN}/oauth/token"),
    ("POST", f"{RESOURCE_ORIGIN}/mcp"),
]
assert transport.log == expected

Five requests, two origins, one fixture. Every other test in the file uses the same fixture, varies HarnessConfig, and asserts on the typed fields of HarnessResultchallenge_params, resource_metadata, authz_metadata, token_response, rpc_response, granted_scopes. Because the harness preserves every intermediate document, the same single drive-by call can be inspected for the 401 challenge body, the discovered audience, the granted scope list, and the JSON-RPC reply — no per-test wiring.

Verification

Run the whole suite from the codebase root. One hundred thirty-one tests pass — the one hundred fifteen inherited from steps 1 through 7 plus the sixteen new harness tests in test_e2e_client.py. The full run still finishes in roughly a second.

.venv/bin/python -m pytest -v -o addopts=""
============================= test session starts ==============================
platform darwin -- Python 3.11.11, pytest-9.0.3, pluggy-1.6.0
configfile: pyproject.toml
testpaths: tests
plugins: anyio-4.13.0
collected 131 items

tests/test_e2e_client.py::test_full_discovery_loop_invokes_echo PASSED   [ 88%]
tests/test_e2e_client.py::test_full_discovery_loop_invokes_files_read PASSED [ 88%]
tests/test_e2e_client.py::test_full_discovery_loop_invokes_kv_write PASSED [ 89%]
tests/test_e2e_client.py::test_harness_visits_each_discovery_endpoint_in_order PASSED [ 90%]
tests/test_e2e_client.py::test_challenge_resource_metadata_points_to_live_document PASSED [ 90%]
tests/test_e2e_client.py::test_authz_metadata_advertises_client_credentials_and_token_endpoint PASSED [ 91%]
tests/test_e2e_client.py::test_issued_token_audience_binds_to_discovered_resource PASSED [ 92%]
tests/test_e2e_client.py::test_authorized_replay_preserves_jsonrpc_id PASSED [ 93%]
tests/test_e2e_client.py::test_harness_propagates_resource_scopes_to_granted_token PASSED [ 93%]
tests/test_e2e_client.py::test_harness_raises_when_credentials_are_invalid PASSED [ 94%]
tests/test_e2e_client.py::test_harness_raises_when_requesting_unallowed_scope PASSED [ 95%]
tests/test_e2e_client.py::test_harness_raises_when_replayed_call_needs_extra_scope PASSED [ 96%]
tests/test_e2e_client.py::test_harness_can_call_tools_list PASSED        [ 96%]
tests/test_e2e_client.py::test_parse_challenge_params_rejects_non_bearer_scheme PASSED [ 97%]
tests/test_e2e_client.py::test_parse_challenge_params_unescapes_quoted_values PASSED [ 98%]
tests/test_e2e_client.py::test_harness_records_published_resource_and_issuer PASSED [ 99%]

======================== 131 passed, 1 warning in 1.31s ========================

What we built

The OAuth resource protocol now composes end-to-end in code, not just in prose. run_discovery_and_invoke() is the single entry point a real client uses: hand it a HarnessConfig carrying the initial URL, the client credentials, and the JSON-RPC envelope, and get back a HarnessResult describing every step the dance took. The body of the function is six lines because each helper does one thing and only one thing — the orchestration is what consumers care about, and the orchestration is now legible at a glance.

The Transport Protocol is the seam that makes the harness testable without a live network. In production the same code runs against httpx.Client; in tests it runs against a multi-host router that dispatches by origin to two in-process Starlette apps sharing the same signing key. That symmetry is the whole reason the test suite can pin RFC-level invariants — JWT aud equals the published resource, the token endpoint sees the resource parameter, the AS metadata advertises client_credentials — without an integration harness, a Docker container, or a mock library.

Failure surfaces are now contractual. Wrong client secret raises HarnessError("...token exchange..."). Asking for a scope the AS does not advertise raises the same. Requesting tools.invoke but calling kv_write (which also needs kv.write) raises HarnessError("...authorized MCP replay...") because the 403 makes it back through _ensure_status instead of getting swallowed into a JSON-shaped success. Every layer of the stack — challenge parser, metadata fetcher, token exchange, bearer replay — surfaces RFC violations as exceptions a caller can catch and recover from.

What this unlocks: the article series can stop at step 8 with a fully-working reference client living alongside the server. Any future step is additive — JWKS-based key rotation, dynamic client registration, refresh-token grants, PKCE — and slots into the existing harness by adding one more helper and one more HarnessResult field. The "MCP + OAuth resource server + WWW-Authenticate metadata" loop the article set out to demonstrate is now a single function call that observably executes the full RFC dance against an in-process stack.

Repository

The state of the code after this step: db99860

Step 9: Hardening the Resource Server with Token Buckets, JWT Caching, a JSON-Line Audit Trail, and a Smoke-Test CI Workflow

After step 8 the OAuth dance composes end-to-end: the client discovers the challenge, walks the two metadata documents, mints a JWT, and replays the JSON-RPC envelope. Functionally correct, but operationally raw — every protected request still re-runs HMAC and JSON decoding, a single misbehaving caller can hog the CPU, and the only signal a reviewer has is pytest exit codes. Nothing in the surface tells an operator who hit the server, what scope failed, or which tool was actually invoked.

Step 9 layers four production-shaped guards on top of that working core without changing the on-the-wire contract. A per-caller token-bucket limiter sits in front of /mcp so one identifier can't drain the verifier. A bounded TTL cache keyed by the raw token short-circuits repeated bearers down to a dict lookup plus an expiry compare. Every authn, authz, and tool-invocation decision emits one structured JSON line via a dedicated mcp_server.audit logger. A GitHub Actions workflow pins the public contract: every push runs the 150-test suite on Python 3.10, 3.11, and 3.12.

Setup

Three new source files, one new test file, one new CI workflow file, and one edit to the bearer middleware so it consumes the cache. No new runtime dependencies — the limiter is a Lock-guarded dict, the cache is a Lock-guarded dict, and the audit logger is logging.getLogger plus json.dumps.

  • New: src/mcp_server/rate_limit.pyRateLimitConfig, TokenBucketLimiter, and RateLimitMiddleware. Defaults of 30 tokens with a 10 / second refill are overridable via MCP_RATE_LIMIT_CAPACITY and MCP_RATE_LIMIT_REFILL_PER_SECOND. Exhaustion produces a JSON-RPC-shaped 429 with code = -32002 and a Retry-After header.

  • New: src/mcp_server/token_cache.pyTokenCache, a TTL dict keyed by the raw bearer string. Eviction is lazy on lookup-after-expiry and opportunistic on insert; the cap is MCP_TOKEN_CACHE_MAX (default 1024). Entries skip the cache entirely when the JWT lacks an exp claim, so an unbounded token can never poison the table.

  • New: src/mcp_server/audit.pyaudit(event, **fields) emits one json.dumps-encoded record on the mcp_server.audit logger. Seven event constants — auth.no_credentials, auth.invalid_token, auth.token_ok, auth.token_cache_hit, auth.insufficient_scope, tool.invoke, rate.limited — cover every decision the request path makes.

  • Edit: src/mcp_server/auth.py — the bearer guard now consults the TokenCache before calling jwt_codec.decode, emits EVENT_AUTH_TOKEN_OK on first verification and EVENT_AUTH_TOKEN_CACHE_HIT on subsequent lookups, and logs every rejection through the same audit helper.

  • Edit: src/mcp_server/server.pycreate_app(...) now installs RateLimitMiddleware ahead of BearerGuardMiddleware, audits insufficient-scope rejections, and emits one tool.invoke record per tools/call carrying the status (ok, error, invalid_params).

  • New: tests/test_hardening.py — nineteen tests covering the limiter unit, the limiter middleware, the cache unit, cache reuse on the live request path, every audit event, and a presence check on the CI workflow file.

  • New: .github/workflows/ci.yml — a smoke workflow that runs pytest -ra -q plus a python -c "import mcp_server" smoke import across a Python 3.10 / 3.11 / 3.12 matrix on every push and pull request.

Implementation

The limiter is intentionally tiny. A _Bucket dataclass holds tokens and last_refill; try_acquire refills the bucket against a monotonic clock and either decrements by one or returns the seconds-until-next-token to the caller. A Lock wraps the dict so concurrent ASGI workers can't race the refill, and the clock is injectable so the tests can step time forward without sleep.

def try_acquire(self, key: str) -> tuple[bool, float]:
    with self._lock:
        now = self._clock()
        bucket = self._buckets.setdefault(
            key, _Bucket(tokens=self.config.capacity, last_refill=now)
        )
        self._refill(bucket, now)
        if bucket.tokens >= 1.0:
            bucket.tokens -= 1.0
            return True, 0.0
        return False, self._wait_for_one(bucket)

The middleware sits in front of the bearer guard so a flood from one identifier is rejected before any HMAC happens. Keying on the raw Authorization header (or the client IP when the header is absent) gives each authenticated caller its own bucket — test_rate_limit_keys_authenticated_callers_separately pins this by showing token B succeeds at the same instant token A is throttled. Crucially, protected_paths is a tuple, so /healthz and /.well-known/oauth-protected-resource are deliberately unguarded; a probe loop or a discovery client can never trip the limiter and lose the ability to fetch a token.

The token cache is the symmetric piece on the read side. The bearer guard now wraps its jwt_codec.decode call in a one-line dict lookup, and on a hit returns the cached claims plus a cache_hit=True flag the audit helper turns into EVENT_AUTH_TOKEN_CACHE_HIT. Entries are evicted lazily on lookup past their exp and opportunistically on insert; an entry whose JWT omits exp is silently dropped on put so an unbounded token can never sit in memory forever.

def _verify_token(self, token: str) -> tuple[dict[str, Any] | None, str, bool]:
    cached = self.token_cache.get(token)
    if cached is not None:
        return dict(cached), "", True
    try:
        claims = jwt_codec.decode(
            token,
            self.config.signing_key,
            audience=self.config.audience,
            issuer=self.config.issuer,
            algorithms=self.config.algorithms,
        )
    except JWTError as exc:
        return None, str(exc), False
    self.token_cache.put(token, claims)
    return claims, "", False

The audit module is the smallest surface of the three. One audit() helper builds a dict carrying the event name, an integer epoch timestamp, and the supplied keyword fields, then ships it through json.dumps(..., sort_keys=True) to a dedicated mcp_server.audit logger. No handler is attached by default — production wires a stdout JSON formatter, tests wire caplog — so the module never decides for the embedder where the bytes go.

def audit(event: str, **fields: Any) -> None:
    record: dict[str, Any] = {"event": event, "ts": int(time.time())}
    record.update({k: v for k, v in fields.items() if v is not None})
    get_logger().info(json.dumps(record, sort_keys=True, default=str))

The call sites are sprinkled exactly where a decision happens: the bearer guard emits auth.no_credentials / auth.invalid_token on the rejection paths and auth.token_ok / auth.token_cache_hit on success, the RPC dispatcher emits auth.insufficient_scope with the sorted missing-scope list before returning 403, and tools/call emits tool.invoke carrying the tool name, the sub claim from the JWT, and a status (ok, error, invalid_params). The rate limiter middleware emits rate.limited with the caller key and the computed Retry-After.

The CI workflow is the last piece. A single smoke job checks out, installs the package with the [dev] extras, runs pytest -ra -q, and then does a python -c "import mcp_server; print('mcp_server import ok')" smoke import. The matrix pins three Python versions so a stdlib regression in any one of them blocks the merge, and fail-fast: false keeps the full matrix visible instead of cancelling on the first red cell.

Verification

Run the whole suite from the codebase root. The original 131 tests from steps 1 through 8 still pass; the 19 new tests in test_hardening.py cover the limiter, the cache, the live cache-reuse path, every audit event, and the CI workflow file. Total: 150 tests, well under a second.

.venv/bin/python -m pytest -o addopts=""
============================= test session starts ==============================
platform darwin -- Python 3.11.11, pytest-9.0.3, pluggy-1.6.0
configfile: pyproject.toml
testpaths: tests
plugins: anyio-4.13.0
collected 150 items

tests/test_authz_server.py ......................                        [ 14%]
tests/test_challenge.py ..............                                   [ 24%]
tests/test_e2e_client.py ................                                [ 34%]
tests/test_hardening.py ...................                              [ 47%]
tests/test_jwt_codec.py ...........                                      [ 54%]
tests/test_jwt_guard.py ..........                                       [ 61%]
tests/test_metadata.py .....................                             [ 75%]
tests/test_server.py ..............                                      [ 84%]
tests/test_tool_scopes.py ........................                       [100%]

======================== 150 passed, 1 warning in 0.67s ========================

Drilling into the hardening file alone makes the new coverage explicit — three limiter unit tests, three middleware tests covering the 429 response, the unguarded /healthz, and per-caller key isolation, four cache unit tests, one cache reuse test that monkeypatches jwt_codec.decode and asserts it runs exactly once across four requests, seven audit-event tests, and one CI workflow assertion.

.venv/bin/python -m pytest tests/test_hardening.py -v -o addopts=""
tests/test_hardening.py::test_token_bucket_acquires_until_capacity_exhausted PASSED
tests/test_hardening.py::test_token_bucket_refills_at_configured_rate PASSED
tests/test_hardening.py::test_token_bucket_keys_are_independent PASSED
tests/test_hardening.py::test_rate_limit_middleware_returns_429_after_capacity PASSED
tests/test_hardening.py::test_rate_limit_does_not_protect_healthz PASSED
tests/test_hardening.py::test_rate_limit_keys_authenticated_callers_separately PASSED
tests/test_hardening.py::test_token_cache_returns_none_for_unknown_token PASSED
tests/test_hardening.py::test_token_cache_round_trips_claims_until_expiry PASSED
tests/test_hardening.py::test_token_cache_skips_entries_without_exp_claim PASSED
tests/test_hardening.py::test_token_cache_evicts_expired_entries_to_make_room PASSED
tests/test_hardening.py::test_bearer_middleware_reuses_cached_claims PASSED
tests/test_hardening.py::test_audit_logs_no_credentials PASSED
tests/test_hardening.py::test_audit_logs_invalid_token_with_reason PASSED
tests/test_hardening.py::test_audit_logs_successful_token_then_cache_hit PASSED
tests/test_hardening.py::test_audit_logs_insufficient_scope PASSED
tests/test_hardening.py::test_audit_logs_tool_invocation_status PASSED
tests/test_hardening.py::test_audit_logs_rate_limit_block PASSED
tests/test_hardening.py::test_audit_helper_emits_sorted_json_lines PASSED
tests/test_hardening.py::test_ci_workflow_file_exists_and_invokes_pytest PASSED

======================== 19 passed, 1 warning in 0.18s =========================

What we built

The resource server now has four production-shaped guards bolted to the working OAuth core, and none of them moved the on-the-wire contract. Step 8's drive-by harness still runs unchanged against the same /mcp endpoint; the difference is that step 8's harness now hits a server that refuses to spin its CPU under a flood and surfaces every authn / authz decision to an aggregator.

The rate limiter is the cheapest insurance in the stack. A Lock-guarded token bucket keyed on the bearer (or IP) means one misbehaving caller burns its own bucket before any HMAC happens, and the JSON-RPC-shaped 429 plus Retry-After header gives a well-written client enough to back off automatically. /healthz and the discovery metadata stay unguarded by design — a probe loop or a fresh client must never lose the ability to fetch a token because of throttling.

The token cache turns the steady state into a dict lookup. The bearer middleware now records a single auth.token_ok on first verification and an arbitrary number of auth.token_cache_hit records on every reuse, so the access pattern of every caller is visible without re-running HMAC + JSON decoding. Lazy expiry eviction plus a MCP_TOKEN_CACHE_MAX ceiling cap the memory footprint, and tokens missing exp are silently refused on insert so an unbounded credential can never sit in memory.

The structured audit log is the piece an operator actually reads. Seven mcp_server.audit events — auth.no_credentials, auth.invalid_token, auth.token_ok, auth.token_cache_hit, auth.insufficient_scope, tool.invoke, rate.limited — cover every decision the request path makes. Each line is one json.dumps-encoded dict with sorted keys and a monotonic ts field, so a log aggregator can parse the stream without a regex, and the embedding application picks the handler. The smoke CI workflow is the final guarantee: every push runs the full 150-test suite on three Python versions, so the public contract of the resource server can't silently regress.

Repository

The state of the code after this step: 926d4de

Repository

Full source at https://github.com/vytharion/mcp-tool-oauth-www-authenticate-meta.

Walk the lessons by stepping through the git commits in the repo — each major step has its own commit you can git checkout and rerun.