Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
283 changes: 283 additions & 0 deletions README.md
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Need to explain how to use the access token once you get it.
  2. How does this work when the client ID is expired?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extended the snippet and section to cover following points:

  1. Need to explain how to use the access token once you get it.
  2. How does this work when the client ID is expired?

Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
- [Writing MCP Clients](#writing-mcp-clients)
- [Client Display Utilities](#client-display-utilities)
- [OAuth Authentication for Clients](#oauth-authentication-for-clients)
- [Enterprise Managed Authorization](#enterprise-managed-authorization)
- [Parsing Tool Results](#parsing-tool-results)
- [MCP Primitives](#mcp-primitives)
- [Server Capabilities](#server-capabilities)
Expand Down Expand Up @@ -2356,6 +2357,288 @@ _Full example: [examples/snippets/clients/oauth_client.py](https://github.com/mo

For a complete working example, see [`examples/clients/simple-auth-client/`](examples/clients/simple-auth-client/).

#### Enterprise Managed Authorization

The SDK includes support for Enterprise Managed Authorization (SEP-990), which enables MCP clients to connect to protected servers using enterprise Single Sign-On (SSO) systems. This implementation supports:

- **RFC 8693**: OAuth 2.0 Token Exchange (ID Token → ID-JAG)
- **RFC 7523**: JSON Web Token (JWT) Profile for OAuth 2.0 Authorization Grants (ID-JAG → Access Token)
- Integration with enterprise identity providers (Okta, Azure AD, etc.)

**Key Components:**

The `EnterpriseAuthOAuthClientProvider` class extends the standard OAuth provider to implement the enterprise authorization flow:

**Token Exchange Flow:**

1. **Obtain ID Token** from your enterprise IdP (e.g., Okta, Azure AD)
2. **Exchange ID Token for ID-JAG** using RFC 8693 Token Exchange
3. **Exchange ID-JAG for Access Token** using RFC 7523 JWT Bearer Grant
4. **Use Access Token** to call protected MCP server tools

**Using the Access Token with MCP Server:**

1. Once you have obtained the access token, you can use it to authenticate requests to the MCP server
2. The access token is automatically included in all subsequent requests to the MCP server, allowing you to access protected tools and resources based on your enterprise identity and permissions.

**Handling Token Expiration and Refresh:**

Access tokens have a limited lifetime and will expire. When tokens expire:

- **Check Token Expiration**: Use the `expires_in` field to determine when the token expires
- **Refresh Flow**: When expired, repeat the token exchange flow with a fresh ID token from your IdP
- **Automatic Refresh**: Implement automatic token refresh before expiration (recommended for production)
- **Error Handling**: Catch authentication errors and retry with refreshed tokens

**Important Notes:**

- **ID Token Expiration**: If the ID token from your IdP expires, you must re-authenticate with the IdP to obtain a new ID token before performing token exchange
- **Token Storage**: Store tokens securely and implement the `TokenStorage` interface to persist tokens between application restarts
- **Scope Changes**: If you need different scopes, you must obtain a new ID token from the IdP with the required scopes
- **Security**: Never log or expose access tokens or ID tokens in production environments

**Example Usage:**

<!-- snippet-source examples/snippets/clients/enterprise_managed_auth_client.py -->
```python
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this instead be put in the snippets folder and then use the scripts/update_readme_snippets.py script to update the README accordingly.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion! Moved the snippet to a new file under examples/snippets/clients/enterprise_managed_auth_client.py. And generated a snippet in README.md using the script

import asyncio
from datetime import datetime, timedelta, timezone
from typing import Any

import httpx
from pydantic import AnyUrl

from mcp import ClientSession
from mcp.client.auth import OAuthTokenError, TokenStorage
from mcp.client.auth.extensions import (
EnterpriseAuthOAuthClientProvider,
TokenExchangeParameters,
)
from mcp.client.sse import sse_client
from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
from mcp.types import CallToolResult


# Placeholder function for IdP authentication
async def get_id_token_from_idp() -> str:
"""
Placeholder function to get ID token from your IdP.
In production, implement actual IdP authentication flow.
"""
raise NotImplementedError("Implement your IdP authentication flow here")


# Define token storage implementation
class SimpleTokenStorage(TokenStorage):
def __init__(self) -> None:
self._tokens: OAuthToken | None = None
self._client_info: OAuthClientInformationFull | None = None

async def get_tokens(self) -> OAuthToken | None:
return self._tokens

async def set_tokens(self, tokens: OAuthToken) -> None:
self._tokens = tokens

async def get_client_info(self) -> OAuthClientInformationFull | None:
return self._client_info

async def set_client_info(self, client_info: OAuthClientInformationFull) -> None:
self._client_info = client_info


def is_token_expired(access_token: OAuthToken) -> bool:
"""Check if the access token has expired."""
if access_token.expires_in:
# Calculate expiration time
issued_at = datetime.now(timezone.utc)
expiration_time = issued_at + timedelta(seconds=access_token.expires_in)
return datetime.now(timezone.utc) >= expiration_time
return False


async def refresh_access_token(
enterprise_auth: EnterpriseAuthOAuthClientProvider,
client: httpx.AsyncClient,
id_token: str,
) -> OAuthToken:
"""Refresh the access token when it expires."""
try:
# Update token exchange parameters with fresh ID token
enterprise_auth.token_exchange_params.subject_token = id_token

# Re-exchange for new ID-JAG
id_jag = await enterprise_auth.exchange_token_for_id_jag(client)

# Get new access token
access_token = await enterprise_auth.exchange_id_jag_for_access_token(client, id_jag)
return access_token
except Exception as e:
print(f"Token refresh failed: {e}")
# Re-authenticate with IdP if ID token is also expired
id_token = await get_id_token_from_idp()
return await refresh_access_token(enterprise_auth, client, id_token)


async def call_tool_with_retry(
session: ClientSession,
tool_name: str,
arguments: dict[str, Any],
enterprise_auth: EnterpriseAuthOAuthClientProvider,
client: httpx.AsyncClient,
id_token: str,
) -> CallToolResult | None:
"""Call a tool with automatic retry on token expiration."""
max_retries = 1

for attempt in range(max_retries + 1):
try:
result = await session.call_tool(tool_name, arguments)
return result
except OAuthTokenError:
if attempt < max_retries:
print("Token expired, refreshing...")
# Refresh token and reconnect
_access_token = await refresh_access_token(enterprise_auth, client, id_token)
# Note: In production, you'd need to reconnect the session here
else:
raise
return None


async def main() -> None:
# Step 1: Get ID token from your IdP (example with Okta)
id_token = await get_id_token_from_idp() # Your IdP authentication

# Step 2: Configure token exchange parameters
token_exchange_params = TokenExchangeParameters.from_id_token(
id_token=id_token,
mcp_server_auth_issuer="https://your-idp.com", # IdP issuer URL
mcp_server_resource_id="https://mcp-server.example.com", # MCP server resource ID
scope="mcp:tools mcp:resources", # Optional scopes
)

# Step 3: Create enterprise auth provider
enterprise_auth = EnterpriseAuthOAuthClientProvider(
server_url="https://mcp-server.example.com",
client_metadata=OAuthClientMetadata(
client_name="Enterprise MCP Client",
redirect_uris=[AnyUrl("http://localhost:3000/callback")],
grant_types=["urn:ietf:params:oauth:grant-type:jwt-bearer"],
response_types=["token"],
),
storage=SimpleTokenStorage(),
idp_token_endpoint="https://your-idp.com/oauth2/v1/token",
token_exchange_params=token_exchange_params,
)

async with httpx.AsyncClient() as client:
# Step 4: Exchange ID token for ID-JAG
id_jag = await enterprise_auth.exchange_token_for_id_jag(client)
print(f"Obtained ID-JAG: {id_jag[:50]}...")

# Step 5: Exchange ID-JAG for access token
access_token = await enterprise_auth.exchange_id_jag_for_access_token(client, id_jag)
print(f"Access token obtained, expires in: {access_token.expires_in}s")

# Step 6: Check if token is expired (for demonstration)
if is_token_expired(access_token):
print("Token is expired, refreshing...")
access_token = await refresh_access_token(enterprise_auth, client, id_token)

# Step 7: Use the access token to connect to MCP server
headers = {"Authorization": f"Bearer {access_token.access_token}"}

async with sse_client(url="https://mcp-server.example.com", headers=headers) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()

# Call tools with automatic retry on token expiration
result = await call_tool_with_retry(
session, "enterprise_tool", {"param": "value"}, enterprise_auth, client, id_token
)
if result:
print(f"Tool result: {result.content}")

# List available resources
resources = await session.list_resources()
for resource in resources.resources:
print(f"Resource: {resource.uri}")


async def maintain_active_session(
enterprise_auth: EnterpriseAuthOAuthClientProvider,
mcp_server_url: str,
) -> None:
"""Maintain an active session with automatic token refresh."""
id_token_var = await get_id_token_from_idp()

async with httpx.AsyncClient() as client:
while True:
try:
# Update token exchange params with current ID token
enterprise_auth.token_exchange_params.subject_token = id_token_var

# Get access token
id_jag = await enterprise_auth.exchange_token_for_id_jag(client)
access_token = await enterprise_auth.exchange_id_jag_for_access_token(client, id_jag)

# Calculate refresh time (refresh before expiration)
refresh_in = access_token.expires_in - 60 if access_token.expires_in else 300

# Use the token for MCP operations
headers = {"Authorization": f"Bearer {access_token.access_token}"}
async with sse_client(mcp_server_url, headers=headers) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()

# Perform operations...
# Schedule refresh before token expires
await asyncio.sleep(refresh_in)

except Exception as e:
print(f"Session error: {e}")
# Re-authenticate with IdP
id_token_var = await get_id_token_from_idp()
await asyncio.sleep(5) # Wait before retry


if __name__ == "__main__":
asyncio.run(main())
```

_Full example: [examples/snippets/clients/enterprise_managed_auth_client.py](https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/snippets/clients/enterprise_managed_auth_client.py)_
<!-- /snippet-source -->

**Working with SAML Assertions:**

If your enterprise uses SAML instead of OIDC, you can exchange SAML assertions:

```python
token_exchange_params = TokenExchangeParameters.from_saml_assertion(
saml_assertion=saml_assertion_string,
mcp_server_auth_issuer="https://your-idp.com",
mcp_server_resource_id="https://mcp-server.example.com",
scope="mcp:tools",
)
```

**Decoding and Inspecting ID-JAG Tokens:**

You can decode ID-JAG tokens to inspect their claims:

```python
from mcp.client.auth.extensions import decode_id_jag

# Decode without signature verification (for inspection only)
claims = decode_id_jag(id_jag)
print(f"Subject: {claims.sub}")
print(f"Issuer: {claims.iss}")
print(f"Audience: {claims.aud}")
print(f"Client ID: {claims.client_id}")
print(f"Resource: {claims.resource}")
```

### Parsing Tool Results

When calling tools through MCP, the `CallToolResult` object contains the tool's response in a structured format. Understanding how to parse this result is essential for properly handling tool outputs.
Expand Down
Loading