104 changes: 104 additions & 0 deletions README.md
@@ -502,6 +502,110 @@ s = UnstructuredClient(debug_logger=logging.getLogger("unstructured_client"))

<!-- Placeholder for Future Speakeasy SDK Sections -->

## Authentication with Client Secrets

> Available from SDK version **0.44.0** (first release carrying the
> `unstructured_client.auth` module).

If you are running against an Unstructured deployment that issues **client
secrets** (`uns_sk_...`) — e.g. Dedicated Instances or self-hosted
clusters with `DEPLOYMENT_MODE=dedicated` on account-service — the SDK
can transparently exchange that secret for a short-lived JWT, cache it,
refresh it before expiry, and send it on every request as
`Authorization: Bearer <jwt>`.

### Synchronous usage

```python
from unstructured_client import UnstructuredClient
from unstructured_client.auth import ClientCredentials

client = UnstructuredClient(
    api_key_auth=ClientCredentials(
        client_secret="uns_sk_...",
        server_url="https://accounts.unstructuredapp.io",  # account-service base URL
    ),
    server_url="https://platform.unstructuredapp.io",  # platform-api / core-product
)

# Every operation automatically carries Authorization: Bearer <jwt>.
client.general.partition(...)
```

### Asynchronous usage

```python
import asyncio
from unstructured_client import UnstructuredClient
from unstructured_client.auth import AsyncClientCredentials

async def main() -> None:
    auth = AsyncClientCredentials(
        client_secret="uns_sk_...",
        server_url="https://accounts.unstructuredapp.io",
    )
    async with UnstructuredClient(api_key_auth=auth) as client:
        await client.general.partition_async(...)

asyncio.run(main())
```

### Legacy API-key bridge

For deployments still using legacy api-tracking keys, the same machinery
is available through `LegacyKeyExchange` / `AsyncLegacyKeyExchange`. It
hits the same `/auth/token-exchange` endpoint with
`grant_type=api_key` and is intentionally transitional — migrate to
`ClientCredentials` once client secrets are provisioned.

```python
from unstructured_client import UnstructuredClient
from unstructured_client.auth import LegacyKeyExchange

client = UnstructuredClient(
    api_key_auth=LegacyKeyExchange(
        api_key="your-legacy-uns_ak-key",
        server_url="https://accounts.unstructuredapp.io",
    ),
)
```

### Behavior and tuning

- **Caching:** JWTs are held in-memory and reused until
`refresh_buffer_seconds` (default **60s**) before absolute expiry.
- **Concurrency:** sync callers share a `threading.Lock`, async callers
share an `asyncio.Lock`. Ten concurrent requests on a cold cache drive
exactly one exchange.
- **Retries:** 5xx and network errors retry with exponential backoff
(default `max_retries=3`). `400` / `401` fail fast with
`TokenExchangeError` / `InvalidCredentialError`.
- **Outage fallback:** if account-service is unreachable *and* a cached
token is still within its absolute TTL, the cached token is returned
and a warning is logged on `unstructured-client.auth`.
- **Disabled exchange:** when the server responds with
`token_exchange_enabled=False`, the call raises
`TokenExchangeDisabledError` — that deployment expects the plain
`api_key_auth="..."` string form instead.
- **Custom HTTP client:** pass `http_client=httpx.Client(...)` (or
`httpx.AsyncClient(...)` for the async variant) to share a connection
pool, route through a corporate proxy, pin a custom CA bundle for mTLS,
or otherwise control how the SDK reaches account-service. When the
argument is omitted, the auth callable lazily creates and owns a private
`httpx.Client`; that client is closed automatically when the auth
instance is garbage-collected, or you can call `close()` /
`aclose()` explicitly.
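
The caching, concurrency, and outage-fallback rules above can be sketched roughly as follows. This is an illustrative model only, not the SDK's implementation: `CachedTokenProvider`, its `exchange` callback, and the injectable `clock` are hypothetical names introduced for this example, and a plain `ConnectionError` stands in for whatever network errors the real code handles.

```python
import threading
import time
from typing import Callable, Optional, Tuple


class CachedTokenProvider:
    """Hypothetical token cache illustrating the refresh-buffer behavior."""

    def __init__(
        self,
        exchange: Callable[[], Tuple[str, float]],  # returns (jwt, expires_in seconds)
        refresh_buffer_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._exchange = exchange
        self._buffer = refresh_buffer_seconds
        self._clock = clock
        self._lock = threading.Lock()
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def __call__(self) -> str:
        # Holding the lock across the exchange means concurrent callers on a
        # cold cache wait for a single exchange instead of each starting one.
        with self._lock:
            now = self._clock()
            if self._token is not None and now < self._expires_at - self._buffer:
                return self._token  # still fresh: reuse the cached JWT
            try:
                token, expires_in = self._exchange()
            except ConnectionError:
                # Outage fallback: reuse the cached token only while it is
                # still inside its absolute TTL.
                if self._token is not None and now < self._expires_at:
                    return self._token
                raise
            self._token = token
            self._expires_at = now + expires_in
            return token
```

With a 900-second token and the default 60-second buffer, this sketch would reuse the token for the first 840 seconds and attempt a new exchange on the first call after that.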

### Backward compatibility

Passing a plain string still works exactly as before:

```python
client = UnstructuredClient(api_key_auth="your-key")
```

In that case the SDK sends `unstructured-api-key: your-key` without any
token exchange, identical to pre-`auth` SDK versions.
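
As a rough mental model of the two modes, header selection could look like the sketch below. `build_auth_headers` is a hypothetical helper written for this example, and it assumes the auth object is a zero-argument callable returning the JWT; the SDK's internal wiring may differ.

```python
from typing import Callable, Dict, Union

# Hypothetical alias: either a plain API key or a callable that returns a JWT.
Credential = Union[str, Callable[[], str]]


def build_auth_headers(credential: Credential) -> Dict[str, str]:
    """Pick the auth header based on the kind of credential supplied."""
    if isinstance(credential, str):
        # Plain string: sent as-is, no token exchange.
        return {"unstructured-api-key": credential}
    # Callable (assumed protocol): invoke it to obtain a bearer token.
    return {"Authorization": f"Bearer {credential()}"}
```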

### Maturity

This SDK is in beta, and there may be breaking changes between versions without a major version update. Therefore, we recommend pinning usage
115 changes: 115 additions & 0 deletions _test_unstructured_client/integration/test_client_credentials_e2e.py
@@ -0,0 +1,115 @@
"""End-to-end integration test for :class:`ClientCredentials` and
:class:`LegacyKeyExchange`.

This test is **opt-in**: it only runs when every required env var is set.
Point it at any deployment that has ``DEPLOYMENT_MODE=dedicated`` (or any
other configuration that accepts ``/auth/token-exchange``) and a valid
client secret provisioned via account-service.

Required env vars
-----------------

- ``UNS_ACCOUNTS_URL`` base URL of account-service (e.g.
``https://accounts.<your-deployment>.example``)
- ``UNS_CLIENT_SECRET`` ``uns_sk_...`` client secret
- ``UNS_PLATFORM_API_URL`` platform-api base URL to hit after exchange

Optional:

- ``UNS_LEGACY_API_KEY`` if set, the LegacyKeyExchange path is also
exercised against the same platform-api.

What it verifies
----------------

1. The SDK can bootstrap a :class:`ClientCredentials` and successfully
exchange the secret for a JWT against real account-service.
2. A real downstream call (``jobs.list_jobs``) goes through with
``Authorization: Bearer`` and returns 2xx.
3. Re-using the same client does not trigger a second exchange (cache
hit) because the first JWT is still within its TTL.
"""

from __future__ import annotations

import os

import pytest

from unstructured_client import UnstructuredClient
from unstructured_client.auth import ClientCredentials, LegacyKeyExchange
from unstructured_client.models import operations

ACCOUNTS_URL = os.getenv("UNS_ACCOUNTS_URL")
CLIENT_SECRET = os.getenv("UNS_CLIENT_SECRET")
PLATFORM_API_URL = os.getenv("UNS_PLATFORM_API_URL")
LEGACY_API_KEY = os.getenv("UNS_LEGACY_API_KEY")


_REASON = (
    "Opt-in E2E: set UNS_ACCOUNTS_URL, UNS_CLIENT_SECRET, and "
    "UNS_PLATFORM_API_URL to run against a real deployment that supports "
    "/auth/token-exchange."
)


pytestmark = pytest.mark.skipif(
    not (ACCOUNTS_URL and CLIENT_SECRET and PLATFORM_API_URL),
    reason=_REASON,
)


def _list_jobs(session: UnstructuredClient) -> None:
    """Lightweight read request that only needs an authenticated identity."""
    session.jobs.list_jobs(request=operations.ListJobsRequest())


def test_client_credentials_exchange_and_list_jobs():
    cc = ClientCredentials(
        client_secret=CLIENT_SECRET,  # type: ignore[arg-type]
        server_url=ACCOUNTS_URL,  # type: ignore[arg-type]
    )
    try:
        session = UnstructuredClient(
            api_key_auth=cc,
            server_url=PLATFORM_API_URL,
            timeout_ms=60_000,
        )

        _list_jobs(session)

        # Cached exchange: internal cache now holds a JWT; a second call
        # should not trigger a new exchange unless we crossed the refresh
        # buffer, which is unlikely across two sequential requests.
        before_cache = cc._cache  # type: ignore[attr-defined]
        assert before_cache is not None, "expected cache to be populated after first call"

        _list_jobs(session)

        after_cache = cc._cache  # type: ignore[attr-defined]
        assert after_cache is before_cache, (
            "ClientCredentials re-exchanged within TTL; cache should be reused"
        )
    finally:
        cc.close()


@pytest.mark.skipif(
    LEGACY_API_KEY is None,
    reason="Set UNS_LEGACY_API_KEY to also exercise the LegacyKeyExchange path.",
)
def test_legacy_key_exchange_and_list_jobs():
    lke = LegacyKeyExchange(
        api_key=LEGACY_API_KEY,  # type: ignore[arg-type]
        server_url=ACCOUNTS_URL,  # type: ignore[arg-type]
    )
    try:
        session = UnstructuredClient(
            api_key_auth=lke,
            server_url=PLATFORM_API_URL,
            timeout_ms=60_000,
        )
        _list_jobs(session)
        assert lke._cache is not None  # type: ignore[attr-defined]
    finally:
        lke.close()
1 change: 1 addition & 0 deletions _test_unstructured_client/unit/auth/__init__.py
@@ -0,0 +1 @@

95 changes: 95 additions & 0 deletions _test_unstructured_client/unit/auth/_mock_transport.py
@@ -0,0 +1,95 @@
"""Shared helpers for exercising token-exchange auth callables.

The mock transports here let tests script a sequence of responses / exceptions
for the ``POST /auth/token-exchange`` endpoint without standing up a real
account-service.
"""

from __future__ import annotations

import json
from typing import Any, Callable, Iterable, List, Optional, Union

import httpx


ResponseStep = Union[httpx.Response, Exception, Callable[[httpx.Request], httpx.Response]]


class ScriptedTransport(httpx.MockTransport):
    """A MockTransport that walks through a scripted sequence of responses.

    Each element can be an :class:`httpx.Response`, an ``Exception`` instance
    (raised instead of returned), or a callable that accepts the request and
    returns a response. Tests can inspect :attr:`requests` to assert how many
    exchanges took place and what bodies were sent.
    """

    def __init__(self, steps: Iterable[ResponseStep]) -> None:
        self._steps: List[ResponseStep] = list(steps)
        self.requests: List[httpx.Request] = []
        super().__init__(self._handler)

    def _handler(self, request: httpx.Request) -> httpx.Response:
        self.requests.append(request)
        if not self._steps:
            raise AssertionError(
                "ScriptedTransport exhausted; unexpected extra request to "
                f"{request.url}",
            )
        step = self._steps.pop(0)
        if isinstance(step, Exception):
            raise step
        if callable(step):
            return step(request)
        return step


class AsyncScriptedTransport(httpx.MockTransport):
    """Async counterpart to :class:`ScriptedTransport`."""

    def __init__(self, steps: Iterable[ResponseStep]) -> None:
        self._steps: List[ResponseStep] = list(steps)
        self.requests: List[httpx.Request] = []

        async def _handler(request: httpx.Request) -> httpx.Response:
            self.requests.append(request)
            if not self._steps:
                raise AssertionError(
                    "AsyncScriptedTransport exhausted; unexpected extra "
                    f"request to {request.url}",
                )
            step = self._steps.pop(0)
            if isinstance(step, Exception):
                raise step
            if callable(step):
                return step(request)
            return step

        super().__init__(_handler)


def exchange_response(
    access_token: Optional[str] = "jwt-1",
    *,
    expires_in: int = 900,
    token_exchange_enabled: bool = True,
    token_type: str = "bearer",
    status_code: int = 200,
    extra: Optional[dict] = None,
) -> httpx.Response:
    """Build a canned ``/auth/token-exchange`` response body."""
    body: dict[str, Any] = {
        "access_token": access_token,
        "token_type": token_type,
        "expires_in": expires_in,
        "token_exchange_enabled": token_exchange_enabled,
    }
    if extra:
        body.update(extra)
    return httpx.Response(status_code, json=body)


def body_of(request: httpx.Request) -> dict:
    """Decode the JSON body from an outgoing exchange request."""
    return json.loads(request.content.decode("utf-8"))