diff --git a/EXAMPLES.md b/EXAMPLES.md index a014820..f1aea7d 100644 --- a/EXAMPLES.md +++ b/EXAMPLES.md @@ -1,7 +1,24 @@ -# Auth0 FastAPI-API Examples +# Examples This document provides examples for using the `auth0-fastapi-api` package to secure your FastAPI applications with Auth0. + +- [Bearer Authentication](#bearer-authentication) +- [Scope Validation](#scope-validation) +- [DPoP Authentication](#dpop-authentication) + - [Accept both Bearer and DPoP tokens (default)](#accept-both-bearer-and-dpop-tokens-default) + - [Require only DPoP tokens](#require-only-dpop-tokens) + - [Require only Bearer tokens](#require-only-bearer-tokens) +- [Multiple Custom Domains (MCD)](#multiple-custom-domains-mcd) + - [Static domain list](#static-domain-list) + - [Dynamic resolver function](#dynamic-resolver-function) + - [Hybrid mode](#hybrid-mode) + - [With DPoP](#with-dpop) + - [With custom cache configuration](#with-custom-cache-configuration) +- [Reverse Proxy Support](#reverse-proxy-support) + +--- + ## Bearer Authentication ```python @@ -24,12 +41,31 @@ curl -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \ http://localhost:8000/api/protected ``` +--- + +## Scope Validation + +```python +@app.get("/api/admin") +async def admin_route(claims=Depends(auth0.require_auth(scopes=["admin:access"]))): + return {"message": "Admin access granted"} + +@app.delete("/api/resource") +async def delete_route( + claims=Depends(auth0.require_auth(scopes=["delete:data", "admin:access"])) +): + """Requires BOTH scopes.""" + return {"message": "Resource deleted"} +``` + +--- + ## DPoP Authentication > [!NOTE] > DPoP is in Early Access. Contact Auth0 support to enable it. -**Mixed Mode (default)** - Accept both Bearer and DPoP: +### Accept both Bearer and DPoP tokens (default) ```python auth0 = Auth0FastAPI( @@ -47,7 +83,7 @@ curl -H "Authorization: DPoP YOUR_ACCESS_TOKEN" \ http://localhost:8000/api/protected ``` -**DPoP Required Mode** - Reject Bearer tokens: +### Require only DPoP tokens ```python auth0 = Auth0FastAPI( @@ -57,7 +93,7 @@ auth0 = Auth0FastAPI( ) ``` -**Bearer-Only Mode** - Disable DPoP: +### Require only Bearer tokens ```python auth0 = Auth0FastAPI( @@ -66,32 +102,140 @@ auth0 = Auth0FastAPI( dpop_enabled=False ) ``` +### Reverse Proxy Support -## Scope Validation +Enable X-Forwarded-* header trust for DPoP behind proxies: ```python -@app.get("/api/admin") -async def admin_route(claims=Depends(auth0.require_auth(scopes=["admin:access"]))): - return {"message": "Admin access granted"} +app = FastAPI() +app.state.trust_proxy = True # Required for load balancers/CDN -@app.delete("/api/resource") -async def delete_route( - claims=Depends(auth0.require_auth(scopes=["delete:data", "admin:access"])) -): - """Requires BOTH scopes.""" - return {"message": "Resource deleted"} +auth0 = Auth0FastAPI( + domain="your-domain.auth0.com", + audience="your-api-identifier" +) ``` -## Reverse Proxy Support +--- -Enable X-Forwarded-* header trust for DPoP behind proxies: +## Multiple Custom Domains (MCD) + +### Static domain list ```python +from fastapi import FastAPI, Depends +from fastapi_plugin.fast_api_client import Auth0FastAPI + app = FastAPI() -app.state.trust_proxy = True # Required for load balancers/CDN +auth0 = Auth0FastAPI( + domains=["tenant1.us.auth0.com", "tenant2.eu.auth0.com"], + audience="your-api-identifier" +) + +@app.get("/api/protected") +async def protected_route(claims=Depends(auth0.require_auth())): + return {"user_id": claims["sub"]} +``` + +```bash +# Token from either domain is accepted +curl -H "Authorization: Bearer TOKEN_FROM_TENANT1" \ + http://localhost:8000/api/protected +``` + +### Dynamic resolver function + +```python +from fastapi_plugin import Auth0FastAPI, DomainsResolverContext + +def resolve_domains(context: DomainsResolverContext) -> list: + """Resolve allowed domains based on request context.""" + # context['unverified_iss'] - issuer from the token (before verification) + # context.get('request_url') - the API request URL + # context.get('request_headers') - the API request headers + return ["tenant1.us.auth0.com", "auth.example.com"] auth0 = Auth0FastAPI( - domain="your-domain.auth0.com", + domains=resolve_domains, audience="your-api-identifier" ) ``` + +> [!WARNING] +> `DomainsResolver` functions often rely on request headers such as `Host` or `X-Forwarded-Host`. These headers can be spoofed by clients unless your FastAPI instance is behind a trusted proxy and you have a clear trust boundary. Always validate/allowlist hosts and only honor forwarded headers from trusted infrastructure. + +### Hybrid mode + +Use `domain` and `domains` together for zero-downtime domain migration scenarios: + +```python +auth0 = Auth0FastAPI( + domain="primary.us.auth0.com", + domains=["primary.us.auth0.com", "new-domain.example.com"], + audience="your-api-identifier", + client_id="YOUR_CLIENT_ID", + client_secret="YOUR_CLIENT_SECRET" +) +``` + +### With DPoP + +```python +auth0 = Auth0FastAPI( + domains=["tenant1.us.auth0.com", "tenant2.eu.auth0.com"], + audience="your-api-identifier", + dpop_enabled=True, + dpop_required=False +) +``` + +### With custom cache configuration + +```python +from fastapi_plugin import Auth0FastAPI + +# Option 1: Use default InMemoryCache with custom config (recommended) +auth0 = Auth0FastAPI( + domains=["tenant1.us.auth0.com", "tenant2.eu.auth0.com"], + audience="your-api-identifier", + cache_ttl_seconds=1200, # Cache TTL + cache_max_entries=200 # Max cached entries +) + +# Option 2: Provide pre-configured cache adapter +from fastapi_plugin import InMemoryCache + +auth0 = Auth0FastAPI( + domains=["tenant1.us.auth0.com", "tenant2.eu.auth0.com"], + audience="your-api-identifier", + cache_adapter=InMemoryCache(max_entries=200), # Configure here + cache_ttl_seconds=1200 + # Note: cache_max_entries is ignored when cache_adapter is provided +) + +# Option 3: Custom cache implementation (Redis, etc.) +from fastapi_plugin import CacheAdapter + +class RedisCache(CacheAdapter): + def __init__(self, redis_client): + self.redis = redis_client + + def get(self, key: str): + return self.redis.get(key) + + def set(self, key: str, value, ttl_seconds=None): + self.redis.set(key, value, ex=ttl_seconds) + + def delete(self, key: str): + self.redis.delete(key) + + def clear(self): + self.redis.flushdb() + +auth0 = Auth0FastAPI( + domains=["tenant1.us.auth0.com", "tenant2.eu.auth0.com"], + audience="your-api-identifier", + cache_adapter=RedisCache(redis_client), + cache_ttl_seconds=1200 +) +``` diff --git a/README.md b/README.md index 5fb4226..351cc1b 100644 --- a/README.md +++ b/README.md @@ -285,6 +285,55 @@ asyncio.run(main()) More info https://auth0.com/docs/secure/tokens/token-vault +### 7. Multiple Custom Domains (MCD) + +The SDK supports accepting tokens from multiple Auth0 custom domains, enabling multi-tenant applications, zero-downtime domain migrations, and regional deployments. + +#### Static Domain List +```python +auth0 = Auth0FastAPI( + domains=["your-tenant.auth0.com", "custom-domain.example.com"], + audience="your-api-identifier" +) +``` + +#### Dynamic Domain Resolver + +**Dynamic Resolver:** +```python +from fastapi_plugin import DomainsResolverContext + +def resolve_domains(context: DomainsResolverContext) -> list: + """Resolve allowed domains based on request context.""" + # Access unverified issuer, request URL, and headers + return ["your-tenant.auth0.com", "custom-domain.example.com"] + +auth0 = Auth0FastAPI( + domains=resolve_domains, + audience="your-api-identifier" +) +``` + +**Hybrid Mode:** +```python +auth0 = Auth0FastAPI( + domain="primary.us.auth0.com", # Used for token exchange flows + domains=["primary.us.auth0.com", "new-domain.example.com"], # Both accepted for verification + audience="your-api-identifier", + client_id="YOUR_CLIENT_ID", + client_secret="YOUR_CLIENT_SECRET" +) +``` + +#### Key Features + +- **Double Issuer Validation:** Pre-signature and post-signature issuer checks prevent issuer confusion attacks +- **Per-Issuer Caching:** OIDC metadata and JWKS are cached separately for each domain with configurable TTL and LRU eviction +- **DPoP Compatible:** Full DPoP support works seamlessly across multiple domains +- **Custom Cache Backends:** Plug in Redis, Memcached, or other cache implementations via the `CacheAdapter` interface + +📖 **For detailed examples including dynamic resolvers, cache configuration, and DPoP integration, see the [Multiple Custom Domains section in EXAMPLES.md](EXAMPLES.md#multiple-custom-domains-mcd).** + ## Feedback ### Contributing diff --git a/fastapi_plugin/__init__.py b/fastapi_plugin/__init__.py index 593c2d1..f3d08b0 100644 --- a/fastapi_plugin/__init__.py +++ b/fastapi_plugin/__init__.py @@ -1,3 +1,19 @@ from .fast_api_client import Auth0FastAPI +from auth0_api_python import ( + CacheAdapter, + ConfigurationError, + DomainsResolver, + DomainsResolverContext, + DomainsResolverError, + InMemoryCache, +) -__all__ = ["Auth0FastAPI"] \ No newline at end of file +__all__ = [ + "Auth0FastAPI", + "CacheAdapter", + "ConfigurationError", + "DomainsResolver", + "DomainsResolverContext", + "DomainsResolverError", + "InMemoryCache", +] diff --git a/fastapi_plugin/fast_api_client.py b/fastapi_plugin/fast_api_client.py index fa23ffb..2d73d9f 100644 --- a/fastapi_plugin/fast_api_client.py +++ b/fastapi_plugin/fast_api_client.py @@ -1,10 +1,11 @@ -from typing import Optional, List, Union, Dict +from typing import Optional, List, Union, Dict, Callable from fastapi import Request, HTTPException from starlette.responses import Response from .utils import validate_scopes, http_exception, get_canonical_url from auth0_api_python.api_client import ApiClient, ApiClientOptions, BaseAuthError +from auth0_api_python.cache import CacheAdapter class Auth0FastAPI: @@ -14,41 +15,59 @@ class Auth0FastAPI: """ def __init__( - self, - domain: str, - audience: str, - client_id=None, - client_secret=None, - custom_fetch=None, - dpop_enabled=True, - dpop_required=False, - dpop_iat_leeway=30, - dpop_iat_offset=300): + self, + domain: Optional[str] = None, + audience: str = "", + domains: Optional[Union[List[str], Callable]] = None, + client_id=None, + client_secret=None, + custom_fetch=None, + dpop_enabled=True, + dpop_required=False, + dpop_iat_leeway=30, + dpop_iat_offset=300, + cache_adapter: Optional[CacheAdapter] = None, + cache_ttl_seconds: int = 600, + cache_max_entries: int = 100): """ - domain: Your Auth0 domain (like 'my-tenant.us.auth0.com') + domain: Your Auth0 domain (like 'my-tenant.us.auth0.com'). + Use for single-domain mode. Optional if 'domains' is provided. audience: API identifier from the Auth0 Dashboard + domains: List of allowed domain strings or a callable resolver function for + multi-custom domain (MCD) mode. Optional if 'domain' is provided. + Callable signature: (DomainsResolverContext) -> list[str] + client_id: Client ID for token exchange flows + client_secret: Client secret for token exchange flows custom_fetch: optional HTTP fetch override for the underlying SDK dpop_enabled: Enable DPoP support (default: True) dpop_required: Require DPoP authentication, reject Bearer tokens (default: False) dpop_iat_leeway: Clock skew tolerance for DPoP proof iat claim in seconds (default: 30) dpop_iat_offset: Maximum DPoP proof age in seconds (default: 300) + cache_adapter: Custom cache backend implementing CacheAdapter (default: InMemoryCache) + Note: When providing cache_adapter, configure it directly. + The cache_max_entries parameter only applies when cache_adapter is None. + cache_ttl_seconds: Cache time-to-live in seconds (default: 600) + cache_max_entries: Maximum cache entries before LRU eviction (default: 100) + Ignored when cache_adapter is provided. """ - if not domain: - raise ValueError("domain is required.") if not audience: raise ValueError("audience is required.") self.api_client = ApiClient( ApiClientOptions( - domain=domain, - audience=audience, - client_id=client_id, - client_secret=client_secret, + domain=domain, + audience=audience, + domains=domains, + client_id=client_id, + client_secret=client_secret, custom_fetch=custom_fetch, dpop_enabled=dpop_enabled, dpop_required=dpop_required, dpop_iat_leeway=dpop_iat_leeway, - dpop_iat_offset=dpop_iat_offset + dpop_iat_offset=dpop_iat_offset, + cache_adapter=cache_adapter, + cache_ttl_seconds=cache_ttl_seconds, + cache_max_entries=cache_max_entries ) ) diff --git a/tests/conftest.py b/tests/conftest.py index 553793a..7a589df 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,18 @@ +from typing import Dict, List + from pytest_httpx import HTTPXMock -from .test_utils import PUBLIC_DPOP_JWK +from .test_utils import PUBLIC_DPOP_JWK, PRIVATE_JWK + + +# RSA public key used across all test domains (shared key for simplicity) +RSA_PUBLIC_KEY = { + "kty": "RSA", + "kid": "TEST_KEY", + "n": PRIVATE_JWK["n"], + "e": PRIVATE_JWK["e"], + "alg": "RS256", + "use": "sig" +} def setup_mocks(httpx_mock: HTTPXMock): @@ -17,15 +30,43 @@ def setup_mocks(httpx_mock: HTTPXMock): url="https://auth0.local/.well-known/jwks.json", json={ "keys": [ - { - "kty": "RSA", - "kid": "TEST_KEY", - "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", - "e": "AQAB", - "alg": "RS256", - "use": "sig" - }, + RSA_PUBLIC_KEY, PUBLIC_DPOP_JWK ] } - ) \ No newline at end of file + ) + + +def setup_mcd_mocks(httpx_mock: HTTPXMock, domains: List[str]): + """Setup OIDC and JWKS mocks for multiple domains in MCD tests. + + Each domain gets its own .well-known/openid-configuration and + .well-known/jwks.json endpoints, all using the same RSA test key. + + Args: + httpx_mock: pytest-httpx HTTPXMock fixture + domains: List of domain strings (e.g., ["tenant1.auth0.com", "tenant2.auth0.com"]) + """ + for domain in domains: + # Strip protocol and trailing slash if present + clean_domain = domain.replace("https://", "").replace("http://", "").rstrip("/") + base_url = f"https://{clean_domain}" + + httpx_mock.add_response( + method="GET", + url=f"{base_url}/.well-known/openid-configuration", + json={ + "issuer": f"{base_url}/", + "jwks_uri": f"{base_url}/.well-known/jwks.json" + } + ) + httpx_mock.add_response( + method="GET", + url=f"{base_url}/.well-known/jwks.json", + json={ + "keys": [ + RSA_PUBLIC_KEY, + PUBLIC_DPOP_JWK + ] + } + ) \ No newline at end of file diff --git a/tests/test_bearer_token_validation.py b/tests/test_bearer_token_validation.py index eaec640..eed7112 100644 --- a/tests/test_bearer_token_validation.py +++ b/tests/test_bearer_token_validation.py @@ -9,7 +9,7 @@ from fastapi_plugin.fast_api_client import Auth0FastAPI from .test_utils import generate_token -from .conftest import setup_mocks +from .conftest import setup_mocks, setup_mcd_mocks # ============================================================================= @@ -73,6 +73,7 @@ async def test_route(claims=Depends(auth0.require_auth())): # ============================================================================= @pytest.mark.asyncio +@pytest.mark.httpx_mock(assert_all_responses_were_requested=False) async def test_missing_issuer_claim(httpx_mock: HTTPXMock): """Test that tokens without 'iss' claim are rejected with 401.""" setup_mocks(httpx_mock) @@ -103,6 +104,7 @@ async def test_route(claims=Depends(auth0.require_auth())): @pytest.mark.asyncio +@pytest.mark.httpx_mock(assert_all_responses_were_requested=False) async def test_invalid_issuer_claim(httpx_mock: HTTPXMock): """Test that tokens with mismatched issuer are rejected with 401.""" setup_mocks(httpx_mock) @@ -201,3 +203,254 @@ async def test_route(claims=Depends(auth0.require_auth(scopes="valid"))): assert response.status_code == 403 json_body = response.json() assert json_body["detail"]["error"] == "insufficient_scope" + + +# ============================================================================= +# MCD Token Verification Tests +# ============================================================================= + +@pytest.mark.asyncio +async def test_mcd_valid_token_from_allowed_domain(httpx_mock: HTTPXMock): + """Test that a valid token from a domain in the static list is accepted.""" + setup_mcd_mocks(httpx_mock, ["tenant1.auth0.com"]) + + access_token = await generate_token( + domain="tenant1.auth0.com", + user_id="user_123", + audience="", + issuer="https://tenant1.auth0.com/", + iat=True, + exp=True + ) + + app = FastAPI() + auth0 = Auth0FastAPI( + domains=["tenant1.auth0.com", "tenant2.auth0.com"], + audience="" + ) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {access_token}"} + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +@pytest.mark.asyncio +async def test_mcd_token_from_disallowed_domain_rejected(): + """Test that a token from a domain NOT in the allowed list is rejected with 401.""" + access_token = await generate_token( + domain="attacker.auth0.com", + user_id="user_123", + audience="", + issuer="https://attacker.auth0.com/", + iat=True, + exp=True + ) + + app = FastAPI() + auth0 = Auth0FastAPI( + domains=["tenant1.auth0.com", "tenant2.auth0.com"], + audience="" + ) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {access_token}"} + ) + + assert response.status_code == 401 + json_body = response.json() + assert json_body["detail"]["error"] == "invalid_token" + + +@pytest.mark.asyncio +async def test_mcd_with_callable_resolver_success(httpx_mock: HTTPXMock): + """Test MCD with a callable resolver that accepts the issuer.""" + setup_mcd_mocks(httpx_mock, ["dynamic.auth0.com"]) + + def domain_resolver(context): + return ["dynamic.auth0.com"] + + access_token = await generate_token( + domain="dynamic.auth0.com", + user_id="user_123", + audience="", + issuer="https://dynamic.auth0.com/", + iat=True, + exp=True + ) + + app = FastAPI() + auth0 = Auth0FastAPI( + domains=domain_resolver, + audience="" + ) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {access_token}"} + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +@pytest.mark.asyncio +async def test_mcd_resolver_rejects_issuer(): + """Test that a resolver returning domains not matching the issuer results in 401.""" + def restrictive_resolver(context): + return ["allowed.auth0.com"] + + access_token = await generate_token( + domain="unknown.auth0.com", + user_id="user_123", + audience="", + issuer="https://unknown.auth0.com/", + iat=True, + exp=True + ) + + app = FastAPI() + auth0 = Auth0FastAPI( + domains=restrictive_resolver, + audience="" + ) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {access_token}"} + ) + + assert response.status_code == 401 + json_body = response.json() + assert json_body["detail"]["error"] == "invalid_token" + + +@pytest.mark.asyncio +async def test_mcd_resolver_throws_exception(): + """Test that a resolver that throws an exception results in 500.""" + def broken_resolver(context): + raise ValueError("Database unavailable") + + access_token = await generate_token( + domain="some.auth0.com", + user_id="user_123", + audience="", + issuer="https://some.auth0.com/", + iat=True, + exp=True + ) + + app = FastAPI() + auth0 = Auth0FastAPI( + domains=broken_resolver, + audience="" + ) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {access_token}"} + ) + + assert response.status_code == 500 + json_body = response.json() + assert json_body["detail"]["error"] == "domains_resolver_error" + + +@pytest.mark.asyncio +async def test_mcd_token_with_valid_scopes(httpx_mock: HTTPXMock): + """Test MCD token verification passes with valid scopes.""" + setup_mcd_mocks(httpx_mock, ["tenant1.auth0.com"]) + + access_token = await generate_token( + domain="tenant1.auth0.com", + user_id="user_123", + audience="", + issuer="https://tenant1.auth0.com/", + iat=True, + exp=True, + claims={"scope": "read:data write:data"} + ) + + app = FastAPI() + auth0 = Auth0FastAPI( + domains=["tenant1.auth0.com"], + audience="" + ) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth(scopes=["read:data"]))): + return {"user": claims["sub"]} + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {access_token}"} + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +@pytest.mark.asyncio +async def test_mcd_token_with_insufficient_scopes(httpx_mock: HTTPXMock): + """Test MCD token verification fails with insufficient scopes.""" + setup_mcd_mocks(httpx_mock, ["tenant1.auth0.com"]) + + access_token = await generate_token( + domain="tenant1.auth0.com", + user_id="user_123", + audience="", + issuer="https://tenant1.auth0.com/", + iat=True, + exp=True, + claims={"scope": "read:data"} + ) + + app = FastAPI() + auth0 = Auth0FastAPI( + domains=["tenant1.auth0.com"], + audience="" + ) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth(scopes=["write:data"]))): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {access_token}"} + ) + + assert response.status_code == 403 + json_body = response.json() + assert json_body["detail"]["error"] == "insufficient_scope" diff --git a/tests/test_client_initialization.py b/tests/test_client_initialization.py index 4f43671..2748dc1 100644 --- a/tests/test_client_initialization.py +++ b/tests/test_client_initialization.py @@ -2,7 +2,9 @@ Tests for Auth0FastAPI client initialization and configuration. Tests constructor parameters, default values, and configuration options. """ +import pytest from fastapi_plugin.fast_api_client import Auth0FastAPI +from fastapi_plugin import ConfigurationError, InMemoryCache # ============================================================================= @@ -62,3 +64,83 @@ def test_dpop_custom_timing_configuration(): assert auth0.api_client.options.dpop_iat_leeway == 60 assert auth0.api_client.options.dpop_iat_offset == 600 + + +# ============================================================================= +# MCD Configuration Tests +# ============================================================================= + +def test_mcd_initialization_with_static_domains_list(): + """Test MCD initialization with a static list of domains.""" + auth0 = Auth0FastAPI( + domains=["tenant1.auth0.com", "tenant2.auth0.com"], + audience="test_audience" + ) + + assert auth0.api_client.options.domains is not None + assert auth0.api_client.options.domain is None + + +def test_mcd_initialization_with_callable_resolver(): + """Test MCD initialization with a domain resolver function.""" + def resolver(context): + return ["tenant1.auth0.com"] + + auth0 = Auth0FastAPI( + domains=resolver, + audience="test_audience" + ) + + assert callable(auth0.api_client.options.domains) + + +def test_mcd_initialization_hybrid_mode(): + """Test that both domain and domains can be provided (hybrid mode).""" + auth0 = Auth0FastAPI( + domain="primary.auth0.com", + domains=["primary.auth0.com", "secondary.auth0.com"], + audience="test_audience" + ) + + assert auth0.api_client.options.domain == "primary.auth0.com" + assert auth0.api_client.options.domains is not None + + +def test_mcd_missing_both_domain_and_domains_raises_error(): + """Test that ConfigurationError is raised when neither domain nor domains is provided.""" + with pytest.raises(ConfigurationError): + Auth0FastAPI(audience="test_audience") + + +def test_mcd_empty_domains_list_raises_error(): + """Test that ConfigurationError is raised for an empty domains list.""" + with pytest.raises(ConfigurationError): + Auth0FastAPI(domains=[], audience="test_audience") + + +def test_mcd_cache_configuration_passthrough(): + """Test that cache parameters are passed through to ApiClientOptions.""" + custom_cache = InMemoryCache() + + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="test_audience", + cache_adapter=custom_cache, + cache_ttl_seconds=1200, + cache_max_entries=200 + ) + + assert auth0.api_client.options.cache_adapter is custom_cache + assert auth0.api_client.options.cache_ttl_seconds == 1200 + assert auth0.api_client.options.cache_max_entries == 200 + + +def test_mcd_backward_compatibility_single_domain(): + """Test that existing single-domain usage still works unchanged.""" + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="test_audience" + ) + + assert auth0.api_client.options.domain == "auth0.local" + assert auth0.api_client.options.domains is None diff --git a/tests/test_dpop_authentication.py b/tests/test_dpop_authentication.py index 2f69d6e..cc55e76 100644 --- a/tests/test_dpop_authentication.py +++ b/tests/test_dpop_authentication.py @@ -12,7 +12,7 @@ generate_dpop_proof, generate_dpop_bound_token ) -from .conftest import setup_mocks +from .conftest import setup_mocks, setup_mcd_mocks @pytest.mark.asyncio @@ -273,3 +273,49 @@ async def test_route(claims=Depends(auth0.require_auth(scopes=["write:data"]))): assert response.status_code == 403 json_body = response.json() assert json_body["detail"]["error"] == "insufficient_scope" + + +# ============================================================================= +# MCD + DPoP Tests +# ============================================================================= + +@pytest.mark.asyncio +async def test_mcd_dpop_with_static_domains(httpx_mock: HTTPXMock): + """Test DPoP authentication works with MCD static domain list.""" + setup_mcd_mocks(httpx_mock, ["tenant1.auth0.com"]) + + access_token = await generate_dpop_bound_token( + domain="tenant1.auth0.com", + user_id="user_123", + audience="", + issuer="https://tenant1.auth0.com/" + ) + + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="http://testserver/test", + access_token=access_token + ) + + app = FastAPI() + auth0 = Auth0FastAPI( + domains=["tenant1.auth0.com", "tenant2.auth0.com"], + audience="", + dpop_enabled=True + ) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof + } + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123"