forked from modelcontextprotocol/python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrevoke.py
More file actions
94 lines (79 loc) · 3.11 KB
/
revoke.py
File metadata and controls
94 lines (79 loc) · 3.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from dataclasses import dataclass
from functools import partial
from typing import Any, Literal
from pydantic import BaseModel, ValidationError
from starlette.requests import Request
from starlette.responses import Response
from mcp.server.auth.errors import (
stringify_pydantic_error,
)
from mcp.server.auth.json_response import PydanticJSONResponse
from mcp.server.auth.middleware.client_auth import AuthenticationError, ClientAuthenticator
from mcp.server.auth.provider import AccessToken, OAuthAuthorizationServerProvider, RefreshToken
class RevocationRequest(BaseModel):
    """
    Form parameters accepted by the token revocation endpoint.

    See https://datatracker.ietf.org/doc/html/rfc7009#section-2.1
    """

    # The token string the client wants revoked.
    token: str
    # Optional hint about which kind of token `token` is; only these two
    # literal values (or omission) pass validation.
    token_type_hint: Literal["access_token", "refresh_token"] | None = None
    # Client credentials supplied in the request body.
    client_id: str
    # Defaults to None so a request that omits the field still validates.
    # Under Pydantic v2 a `str | None` field without a default is *required*,
    # which would reject such requests before client authentication is even
    # attempted. Whether a missing secret is acceptable is left to the
    # client authenticator, which receives this value unchanged.
    client_secret: str | None = None
class RevocationErrorResponse(BaseModel):
    """JSON error body returned when a revocation request is rejected."""

    # Machine-readable error code; validation restricts it to these literals.
    error: Literal["invalid_request", "unauthorized_client"]
    # Optional human-readable detail about the failure; None when omitted.
    error_description: str | None = None
@dataclass
class RevocationHandler:
    """Request handler for the OAuth 2.0 Token Revocation endpoint.

    Attributes:
        provider: Authorization server provider used to look up and revoke
            tokens.
        client_authenticator: Authenticates the calling client from the
            ``client_id`` / ``client_secret`` sent in the request body.
    """

    provider: OAuthAuthorizationServerProvider[Any, Any, Any]
    client_authenticator: ClientAuthenticator

    async def handle(self, request: Request) -> Response:
        """
        Process one token revocation request.

        Returns:
            * 400 with an ``invalid_request`` body when the form fails
              validation;
            * 401 with an ``unauthorized_client`` body when client
              authentication fails;
            * an empty, non-cacheable 200 otherwise, whether or not a token
              was actually revoked.
        """
        # Step 1: parse and validate the form-encoded body.
        try:
            payload = RevocationRequest.model_validate(dict(await request.form()))
        except ValidationError as validation_error:
            error_body = RevocationErrorResponse(
                error="invalid_request",
                error_description=stringify_pydantic_error(validation_error),
            )
            return PydanticJSONResponse(status_code=400, content=error_body)

        # Step 2: authenticate the client making the request.
        # NOTE(review): RFC 6749 section 5.2 names `invalid_client` for failed
        # client authentication; confirm `unauthorized_client` is intentional.
        try:
            client = await self.client_authenticator.authenticate(
                payload.client_id, payload.client_secret
            )
        except AuthenticationError as auth_error:
            error_body = RevocationErrorResponse(
                error="unauthorized_client",
                error_description=auth_error.message,
            )
            return PydanticJSONResponse(status_code=401, content=error_body)

        # Step 3: decide lookup order. Access tokens are tried first unless
        # the client hinted that it is sending a refresh token.
        access_lookup = self.provider.load_access_token
        refresh_lookup = partial(self.provider.load_refresh_token, client)
        if payload.token_type_hint == "refresh_token":
            lookups = (refresh_lookup, access_lookup)
        else:
            lookups = (access_lookup, refresh_lookup)

        # Step 4: stop at the first lookup that yields a token.
        found: None | AccessToken | RefreshToken = None
        for lookup in lookups:
            found = await lookup(payload.token)
            if found is not None:
                break

        # Step 5: revoke only a token that belongs to the authenticated
        # client. An unknown token (or one issued to a different client) is
        # not an error: per the RFC the endpoint still answers HTTP 200.
        # The provider is not meant to do validation at this point that
        # would result in an error.
        if found and found.client_id == client.client_id:
            await self.provider.revoke_token(found)

        # Successful empty response that must not be cached.
        no_cache_headers = {
            "Cache-Control": "no-store",
            "Pragma": "no-cache",
        }
        return Response(status_code=200, headers=no_cache_headers)