Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitlab/services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
LOG_LEVEL: ERROR
SNAPSHOT_DIR: ${CI_PROJECT_DIR}/tests/snapshots
SNAPSHOT_CI: 1
VCR_CASSETTES_DIRECTORY: ${CI_PROJECT_DIR}/tests/llmobs/llmobs_cassettes
PORT: 9126
DD_POOL_TRACE_CHECK_FAILURES: true
DD_DISABLE_ERROR_RESPONSES: true
Expand Down
4 changes: 3 additions & 1 deletion ddtrace/llmobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
LLMObs.enable()
"""

from ._experiment import Dataset
from ._experiment import DatasetRecord
from ._llmobs import LLMObs
from ._llmobs import LLMObsSpan


__all__ = ["LLMObs", "LLMObsSpan"]
__all__ = ["LLMObs", "LLMObsSpan", "Dataset", "DatasetRecord"]
2 changes: 2 additions & 0 deletions ddtrace/llmobs/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@
EVP_SUBDOMAIN_HEADER_NAME = "X-Datadog-EVP-Subdomain"
SPAN_SUBDOMAIN_NAME = "llmobs-intake"
EVAL_SUBDOMAIN_NAME = "api"
EXP_SUBDOMAIN_NAME = "api"
AGENTLESS_SPAN_BASE_URL = "https://{}".format(SPAN_SUBDOMAIN_NAME)
AGENTLESS_EVAL_BASE_URL = "https://{}".format(EVAL_SUBDOMAIN_NAME)
AGENTLESS_EXP_BASE_URL = "https://{}".format(EXP_SUBDOMAIN_NAME)

EVP_PAYLOAD_SIZE_LIMIT = 5 << 20 # 5MB (actual limit is 5.1MB)
EVP_EVENT_SIZE_LIMIT = (1 << 20) - 1024 # 999KB (actual limit is 1MB)
Expand Down
30 changes: 30 additions & 0 deletions ddtrace/llmobs/_experiment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import TypedDict
from typing import Union

from typing_extensions import NotRequired


JSONType = Union[str, int, float, bool, None, List["JSONType"], Dict[str, "JSONType"]]
NonNoneJSONType = Union[str, int, float, bool, List[JSONType], Dict[str, JSONType]]


class DatasetRecord(TypedDict):
    """A single record in an LLM Observability experiments dataset."""

    # The record's input payload; must be a non-null JSON value.
    input: NonNoneJSONType
    # The expected output for this input; any JSON value, including null.
    expected_output: JSONType
    # Arbitrary per-record metadata.
    metadata: Dict[str, Any]
    # Backend-assigned record ID; absent for records not yet pushed.
    record_id: NotRequired[Optional[str]]


class Dataset:
    """In-memory handle for an LLM Observability experiments dataset.

    Holds the dataset's name, its backend-assigned ID, and the list of
    records pulled from (or destined for) the datasets API.
    """

    # Human-readable dataset name.
    name: str
    # Backend-assigned dataset ID.
    _id: str
    # Records belonging to this dataset.
    _data: List[DatasetRecord]

    def __init__(self, name: str, dataset_id: str, data: List[DatasetRecord]) -> None:
        self._data = data
        self._id = dataset_id
        self.name = name
24 changes: 24 additions & 0 deletions ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
from ddtrace.llmobs._constants import TAGS
from ddtrace.llmobs._context import LLMObsContextProvider
from ddtrace.llmobs._evaluators.runner import EvaluatorRunner
from ddtrace.llmobs._experiment import Dataset
from ddtrace.llmobs._utils import AnnotationContext
from ddtrace.llmobs._utils import LinkTracker
from ddtrace.llmobs._utils import ToolCallTracker
Expand All @@ -85,6 +86,7 @@
from ddtrace.llmobs._utils import validate_prompt
from ddtrace.llmobs._writer import LLMObsEvalMetricWriter
from ddtrace.llmobs._writer import LLMObsEvaluationMetricEvent
from ddtrace.llmobs._writer import LLMObsExperimentsClient
from ddtrace.llmobs._writer import LLMObsSpanEvent
from ddtrace.llmobs._writer import LLMObsSpanWriter
from ddtrace.llmobs._writer import should_use_agentless
Expand Down Expand Up @@ -155,6 +157,7 @@ def get_tag(self, key: str) -> Optional[str]:
class LLMObs(Service):
_instance = None # type: LLMObs
enabled = False
_app_key: str = os.environ.get("DD_APP_KEY", "")

def __init__(
self,
Expand All @@ -180,6 +183,12 @@ def __init__(
interval=float(os.getenv("_DD_LLMOBS_EVALUATOR_INTERVAL", 1.0)),
llmobs_service=self,
)
self._dne_client = LLMObsExperimentsClient(
interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)),
timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)),
_app_key=self._app_key,
is_agentless=True, # agent proxy doesn't seem to work for experiments
)

forksafe.register(self._child_after_fork)

Expand Down Expand Up @@ -447,6 +456,7 @@ def enable(
instrumented_proxy_urls: Optional[Set[str]] = None,
site: Optional[str] = None,
api_key: Optional[str] = None,
app_key: Optional[str] = None,
env: Optional[str] = None,
service: Optional[str] = None,
span_processor: Optional[Callable[[LLMObsSpan], LLMObsSpan]] = None,
Expand All @@ -462,6 +472,7 @@ def enable(
:param set[str] instrumented_proxy_urls: A set of instrumented proxy URLs to help detect when to emit LLM spans.
:param str site: Your datadog site.
:param str api_key: Your datadog api key.
:param str app_key: Your datadog application key.
:param str env: Your environment name.
:param str service: Your service name.
:param Callable[[LLMObsSpan], LLMObsSpan] span_processor: A function that takes an LLMObsSpan and returns an
Expand All @@ -477,6 +488,7 @@ def enable(
# grab required values for LLMObs
config._dd_site = site or config._dd_site
config._dd_api_key = api_key or config._dd_api_key
cls._app_key = app_key or cls._app_key
config.env = env or config.env
config.service = service or config.service
config._llmobs_ml_app = ml_app or config._llmobs_ml_app
Expand Down Expand Up @@ -549,6 +561,18 @@ def enable(
config._llmobs_ml_app,
)

@classmethod
def pull_dataset(cls, name: str) -> Dataset:
    """Fetch an existing dataset (and its records) by name via the experiments client."""
    return cls._instance._dne_client.dataset_pull(name)

@classmethod
def create_dataset(cls, name: str, description: str) -> Dataset:
    """Create a new, empty dataset with the given name and description."""
    return cls._instance._dne_client.dataset_create(name, description)

@classmethod
def _delete_dataset(cls, dataset_id: str) -> None:
    """Soft-delete the dataset with the given backend ID (internal/test helper)."""
    return cls._instance._dne_client.dataset_delete(dataset_id)

@classmethod
def register_processor(cls, processor: Optional[Callable[[LLMObsSpan], LLMObsSpan]] = None) -> None:
"""Register a processor to be called on each LLMObs span.
Expand Down
100 changes: 100 additions & 0 deletions ddtrace/llmobs/_writer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import atexit
import json
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
from urllib.parse import quote


# TypedDict was added to typing in python 3.8
Expand All @@ -24,6 +26,7 @@
from ddtrace.internal.utils.retry import fibonacci_backoff_with_jitter
from ddtrace.llmobs import _telemetry as telemetry
from ddtrace.llmobs._constants import AGENTLESS_EVAL_BASE_URL
from ddtrace.llmobs._constants import AGENTLESS_EXP_BASE_URL
from ddtrace.llmobs._constants import AGENTLESS_SPAN_BASE_URL
from ddtrace.llmobs._constants import DROPPED_IO_COLLECTION_ERROR
from ddtrace.llmobs._constants import DROPPED_VALUE_TEXT
Expand All @@ -33,8 +36,12 @@
from ddtrace.llmobs._constants import EVP_PAYLOAD_SIZE_LIMIT
from ddtrace.llmobs._constants import EVP_PROXY_AGENT_BASE_PATH
from ddtrace.llmobs._constants import EVP_SUBDOMAIN_HEADER_NAME
from ddtrace.llmobs._constants import EXP_SUBDOMAIN_NAME
from ddtrace.llmobs._constants import SPAN_ENDPOINT
from ddtrace.llmobs._constants import SPAN_SUBDOMAIN_NAME
from ddtrace.llmobs._experiment import Dataset
from ddtrace.llmobs._experiment import DatasetRecord
from ddtrace.llmobs._experiment import JSONType
from ddtrace.llmobs._utils import safe_json
from ddtrace.settings._agent import config as agent_config

Expand Down Expand Up @@ -110,6 +117,7 @@ def __init__(
is_agentless: bool,
_site: str = "",
_api_key: str = "",
_app_key: str = "",
_override_url: str = "",
) -> None:
super(BaseLLMObsWriter, self).__init__(interval=interval)
Expand All @@ -119,6 +127,7 @@ def __init__(
self._timeout: float = timeout
self._api_key: str = _api_key or config._dd_api_key
self._site: str = _site or config._dd_site
self._app_key: str = _app_key
self._override_url: str = _override_url

self._agentless: bool = is_agentless
Expand Down Expand Up @@ -264,6 +273,97 @@ def _data(self, events: List[LLMObsEvaluationMetricEvent]) -> Dict[str, Any]:
return {"data": {"type": "evaluation_metric", "attributes": {"metrics": events}}}


class LLMObsExperimentsClient(BaseLLMObsWriter):
    """Synchronous HTTP client for the LLM Observability datasets/experiments API.

    Reuses BaseLLMObsWriter's intake/endpoint configuration, but issues
    blocking request/response calls rather than buffered event writes.
    """

    EVP_SUBDOMAIN_HEADER_VALUE = EXP_SUBDOMAIN_NAME
    AGENTLESS_BASE_URL = AGENTLESS_EXP_BASE_URL
    ENDPOINT = ""

    def request(self, method: str, path: str, body: JSONType = None) -> Response:
        """Send a single authenticated JSON request to the experiments API.

        :param method: HTTP method, e.g. "GET" or "POST".
        :param path: Path appended to the configured intake/endpoint.
        :param body: Optional JSON-serializable request body.
        """
        headers = {
            "Content-Type": "application/json",
            "DD-API-KEY": self._api_key,
            # App key is required by the datasets API in addition to the API key.
            "DD-APPLICATION-KEY": self._app_key,
        }
        if not self._agentless:
            # Agent proxy routes on this subdomain header.
            headers[EVP_SUBDOMAIN_HEADER_NAME] = self.EVP_SUBDOMAIN_HEADER_VALUE

        encoded_body = json.dumps(body).encode("utf-8") if body else b""
        conn = get_connection(self._intake)
        try:
            url = self._intake + self._endpoint + path
            logger.debug("requesting %s", url)
            conn.request(method, url, encoded_body, headers)
            resp = conn.getresponse()
            return Response.from_http_response(resp)
        finally:
            conn.close()

    def dataset_delete(self, dataset_id: str) -> None:
        """Soft-delete the dataset with the given ID.

        :raises ValueError: if the API responds with a non-200 status.
        """
        path = "/api/unstable/llm-obs/v1/datasets/delete"
        resp = self.request(
            "POST",
            path,
            body={
                "data": {
                    "type": "datasets",
                    "attributes": {
                        "type": "soft",
                        "dataset_ids": [dataset_id],
                    },
                },
            },
        )
        if resp.status != 200:
            # Fix: previously interpolated the builtin ``id`` instead of ``dataset_id``,
            # and omitted the status code reported by the sibling methods.
            raise ValueError(f"Failed to delete dataset {dataset_id}: {resp.status} {resp.get_json()}")
        return None

    def dataset_create(self, name: str, description: str) -> Dataset:
        """Create a new, empty dataset and return a handle to it.

        :raises ValueError: if the API responds with a non-200 status.
        """
        path = "/api/unstable/llm-obs/v1/datasets"
        body: JSONType = {
            "data": {
                "type": "datasets",
                "attributes": {"name": name, "description": description},
            }
        }
        resp = self.request("POST", path, body)
        if resp.status != 200:
            raise ValueError(f"Failed to create dataset {name}: {resp.status} {resp.get_json()}")
        response_data = resp.get_json()
        dataset_id = response_data["data"]["id"]
        return Dataset(name, dataset_id, [])

    def dataset_pull(self, name: str) -> Dataset:
        """Fetch a dataset's metadata and all of its records by name.

        :raises ValueError: if the dataset does not exist or either API call fails.
        """
        path = f"/api/unstable/llm-obs/v1/datasets?filter[name]={quote(name)}"
        resp = self.request("GET", path)
        if resp.status != 200:
            raise ValueError(f"Failed to pull dataset {name}: {resp.status} {resp.get_json()}")

        response_data = resp.get_json()
        data = response_data["data"]
        if not data:
            raise ValueError(f"Dataset '{name}' not found")

        dataset_id = data[0]["id"]
        url = f"/api/unstable/llm-obs/v1/datasets/{dataset_id}/records"
        resp = self.request("GET", url)
        if resp.status == 404:
            raise ValueError(f"Dataset '{name}' not found")
        if resp.status != 200:
            # Robustness: previously any non-200/non-404 response fell through to
            # JSON parsing of an error body, producing an opaque failure.
            raise ValueError(f"Failed to pull records for dataset {name}: {resp.status} {resp.get_json()}")
        records_data = resp.get_json()

        class_records: List[DatasetRecord] = []
        for record in records_data.get("data", []):
            attrs = record.get("attributes", {})
            class_records.append(
                {
                    "record_id": record["id"],
                    "input": attrs["input"],
                    "expected_output": attrs["expected_output"],
                    "metadata": attrs.get("metadata", {}),
                }
            )
        return Dataset(name, dataset_id, class_records)


class LLMObsSpanWriter(BaseLLMObsWriter):
"""Writes span events to the LLMObs Span Endpoint."""

Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,11 @@ services:
- "127.0.0.1:9126:8126"
volumes:
- ./tests/snapshots:/snapshots
- ./tests/llmobs/llmobs_cassettes:/cassettes
environment:
- LOG_LEVEL=WARNING
- SNAPSHOT_DIR=/snapshots
- VCR_CASSETTES_DIRECTORY=/cassettes
- SNAPSHOT_CI=0
- DD_POOL_TRACE_CHECK_FAILURES=true
- DD_DISABLE_ERROR_RESPONSES=true
Expand Down
9 changes: 8 additions & 1 deletion tests/llmobs/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,14 @@
cassette_library_dir=os.path.join(os.path.dirname(__file__), "llmobs_cassettes/"),
record_mode="once",
match_on=["path"],
filter_headers=["authorization", "OpenAI-Organization", "api-key", "x-api-key", ("DD-API-KEY", "XXXXXX")],
filter_headers=[
"authorization",
"OpenAI-Organization",
"api-key",
"x-api-key",
("DD-API-KEY", "XXXXXX"),
("DD-APPLICATION-KEY", "XXXXXX"),
],
# Ignore requests to the agent
ignore_localhost=True,
)
Expand Down
9 changes: 7 additions & 2 deletions tests/llmobs/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ def ddtrace_global_config():


def default_global_config():
    """Return the baseline LLMObs global config overrides used by the test suite.

    The API key is taken from the DD_API_KEY environment variable when set
    (e.g. for cassette re-recording) and falls back to a placeholder otherwise.
    """
    api_key = os.environ.get("DD_API_KEY", "<not-a-real-api_key>")
    return {
        "_dd_api_key": api_key,
        "_llmobs_ml_app": "unnamed-ml-app",
        "service": "tests.llmobs",
    }


@pytest.fixture
Expand Down Expand Up @@ -172,7 +176,7 @@ def tracer():
@pytest.fixture
def llmobs_env():
return {
"DD_API_KEY": "<default-not-a-real-key>",
"DD_API_KEY": os.environ.get("DD_API_KEY", "<default-not-a-real-key>"),
"DD_LLMOBS_ML_APP": "unnamed-ml-app",
}

Expand Down Expand Up @@ -269,6 +273,7 @@ def llmobs(
llmobs_service.enable(_tracer=tracer, **llmobs_enable_opts)
llmobs_service._instance._llmobs_span_writer = llmobs_span_writer
llmobs_service._instance._llmobs_span_writer.start()
llmobs_service._instance._dne_client._intake = "http://localhost:9126/vcr/datadog"
yield llmobs_service
tracer.shutdown()
llmobs_service.disable()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
interactions:
- request:
body: '{"data": {"type": "datasets", "attributes": {"type": "soft", "dataset_ids":
["e3608c37-7422-4a91-8897-9dc6097e86b4"]}}}'
headers:
Accept:
- '*/*'
? !!python/object/new:multidict._multidict.istr
- Accept-Encoding
: - identity
Connection:
- keep-alive
Content-Length:
- '119'
? !!python/object/new:multidict._multidict.istr
- Content-Type
: - application/json
User-Agent:
- python-requests/2.32.3
method: POST
uri: https://api.datadoghq.com/api/unstable/llm-obs/v1/datasets/delete
response:
body:
string: '{"data":[{"id":"e3608c37-7422-4a91-8897-9dc6097e86b4","type":"datasets","attributes":{"author":{"id":"0dd9d379-c1a3-11ed-b4e0-566658a732f8"},"created_at":"2025-07-07T19:14:51.167054Z","current_version":0,"deleted_at":"2025-07-07T19:14:53.628842Z","description":"A
test dataset","name":"test-dataset","updated_at":"2025-07-07T19:14:51.167054Z"}}]}'
headers:
content-length:
- '346'
content-security-policy:
- frame-ancestors 'self'; report-uri https://logs.browser-intake-datadoghq.com/api/v2/logs?dd-api-key=pube4f163c23bbf91c16b8f57f56af9fc58&dd-evp-origin=content-security-policy&ddsource=csp-report&ddtags=site%3Adatadoghq.com
content-type:
- application/vnd.api+json
date:
- Mon, 07 Jul 2025 19:14:53 GMT
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Accept-Encoding
x-content-type-options:
- nosniff
x-frame-options:
- SAMEORIGIN
status:
code: 200
message: OK
version: 1
Loading
Loading