Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ddtrace/llmobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
LLMObs.enable()
"""

from ._experiment import Dataset
from ._experiment import DatasetRecord
from ._llmobs import LLMObs
from ._llmobs import LLMObsSpan


__all__ = ["LLMObs", "LLMObsSpan"]
__all__ = ["LLMObs", "LLMObsSpan", "Dataset", "DatasetRecord"]
2 changes: 2 additions & 0 deletions ddtrace/llmobs/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@
EVP_SUBDOMAIN_HEADER_NAME = "X-Datadog-EVP-Subdomain"
SPAN_SUBDOMAIN_NAME = "llmobs-intake"
EVAL_SUBDOMAIN_NAME = "api"
EXP_SUBDOMAIN_NAME = "api"
AGENTLESS_SPAN_BASE_URL = "https://{}".format(SPAN_SUBDOMAIN_NAME)
AGENTLESS_EVAL_BASE_URL = "https://{}".format(EVAL_SUBDOMAIN_NAME)
AGENTLESS_EXP_BASE_URL = "https://{}".format(EXP_SUBDOMAIN_NAME)

EVP_PAYLOAD_SIZE_LIMIT = 5 << 20 # 5MB (actual limit is 5.1MB)
EVP_EVENT_SIZE_LIMIT = (1 << 20) - 1024 # 999KB (actual limit is 1MB)
Expand Down
30 changes: 30 additions & 0 deletions ddtrace/llmobs/_experiment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import TypedDict
from typing import Union

from typing_extensions import NotRequired


# Recursive alias covering any JSON-serializable value: scalars, null, arrays, and objects.
JSONType = Union[str, int, float, bool, None, List["JSONType"], Dict[str, "JSONType"]]
# Like JSONType but disallows None at the top level (nested values may still be null).
NonNoneJSONType = Union[str, int, float, bool, List[JSONType], Dict[str, JSONType]]


class DatasetRecord(TypedDict):
    """A single row of a dataset: an input, its expected output, and free-form metadata."""

    input: NonNoneJSONType
    expected_output: JSONType
    metadata: Dict[str, Any]
    # Optional server-assigned identifier; populated when records are pulled from the
    # API (see LLMObsExperimentsClient.dataset_pull), absent for locally created rows.
    record_id: NotRequired[Optional[str]]


class Dataset:
    """In-memory handle for a remote LLM Observability dataset.

    Holds the dataset's name, its server-side identifier, and the list of
    records it contains. Instances are normally produced by the experiments
    client rather than constructed directly.
    """

    name: str
    _id: str
    _data: List[DatasetRecord]

    def __init__(self, name: str, dataset_id: str, data: List[DatasetRecord]) -> None:
        # Private fields first: the server-side ID and the record payload.
        self._id = dataset_id
        self._data = data
        self.name = name
23 changes: 23 additions & 0 deletions ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
from ddtrace.llmobs._constants import TAGS
from ddtrace.llmobs._context import LLMObsContextProvider
from ddtrace.llmobs._evaluators.runner import EvaluatorRunner
from ddtrace.llmobs._experiment import Dataset
from ddtrace.llmobs._utils import AnnotationContext
from ddtrace.llmobs._utils import LinkTracker
from ddtrace.llmobs._utils import ToolCallTracker
Expand All @@ -85,6 +86,7 @@
from ddtrace.llmobs._utils import validate_prompt
from ddtrace.llmobs._writer import LLMObsEvalMetricWriter
from ddtrace.llmobs._writer import LLMObsEvaluationMetricEvent
from ddtrace.llmobs._writer import LLMObsExperimentsClient
from ddtrace.llmobs._writer import LLMObsSpanEvent
from ddtrace.llmobs._writer import LLMObsSpanWriter
from ddtrace.llmobs._writer import should_use_agentless
Expand Down Expand Up @@ -155,6 +157,7 @@ def get_tag(self, key: str) -> Optional[str]:
class LLMObs(Service):
_instance = None # type: LLMObs
enabled = False
_app_key: str = os.environ.get("DD_APP_KEY", "")

def __init__(
self,
Expand All @@ -180,6 +183,11 @@ def __init__(
interval=float(os.getenv("_DD_LLMOBS_EVALUATOR_INTERVAL", 1.0)),
llmobs_service=self,
)
self._dne_client = LLMObsExperimentsClient(
interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)),
timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)),
is_agentless=agentless_enabled,
)

forksafe.register(self._child_after_fork)

Expand Down Expand Up @@ -447,6 +455,7 @@ def enable(
instrumented_proxy_urls: Optional[Set[str]] = None,
site: Optional[str] = None,
api_key: Optional[str] = None,
app_key: Optional[str] = None,
env: Optional[str] = None,
service: Optional[str] = None,
span_processor: Optional[Callable[[LLMObsSpan], LLMObsSpan]] = None,
Expand All @@ -462,6 +471,7 @@ def enable(
:param set[str] instrumented_proxy_urls: A set of instrumented proxy URLs to help detect when to emit LLM spans.
:param str site: Your datadog site.
:param str api_key: Your datadog api key.
:param str app_key: Your datadog application key.
:param str env: Your environment name.
:param str service: Your service name.
:param Callable[[LLMObsSpan], LLMObsSpan] span_processor: A function that takes an LLMObsSpan and returns an
Expand All @@ -477,6 +487,7 @@ def enable(
# grab required values for LLMObs
config._dd_site = site or config._dd_site
config._dd_api_key = api_key or config._dd_api_key
cls._app_key = app_key or cls._app_key
config.env = env or config.env
config.service = service or config.service
config._llmobs_ml_app = ml_app or config._llmobs_ml_app
Expand Down Expand Up @@ -549,6 +560,18 @@ def enable(
config._llmobs_ml_app,
)

@classmethod
def pull_dataset(cls, name: str) -> Dataset:
    """Fetch the existing dataset named *name* from LLM Observability."""
    client = cls._instance._dne_client
    return client.dataset_pull(name)

@classmethod
def create_dataset(cls, name: str, description: str) -> Dataset:
    """Create a new, empty dataset with the given name and description."""
    client = cls._instance._dne_client
    return client.dataset_create(name, description)

@classmethod
def _delete_dataset(cls, dataset_id: str) -> None:
    """Internal helper: soft-delete the dataset with the given ID."""
    cls._instance._dne_client.dataset_delete(dataset_id)
    return None

@classmethod
def register_processor(cls, processor: Optional[Callable[[LLMObsSpan], LLMObsSpan]] = None) -> None:
"""Register a processor to be called on each LLMObs span.
Expand Down
100 changes: 100 additions & 0 deletions ddtrace/llmobs/_writer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import atexit
import json
import os
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
from urllib.parse import quote


# TypedDict was added to typing in python 3.8
Expand All @@ -24,6 +27,7 @@
from ddtrace.internal.utils.retry import fibonacci_backoff_with_jitter
from ddtrace.llmobs import _telemetry as telemetry
from ddtrace.llmobs._constants import AGENTLESS_EVAL_BASE_URL
from ddtrace.llmobs._constants import AGENTLESS_EXP_BASE_URL
from ddtrace.llmobs._constants import AGENTLESS_SPAN_BASE_URL
from ddtrace.llmobs._constants import DROPPED_IO_COLLECTION_ERROR
from ddtrace.llmobs._constants import DROPPED_VALUE_TEXT
Expand All @@ -33,8 +37,12 @@
from ddtrace.llmobs._constants import EVP_PAYLOAD_SIZE_LIMIT
from ddtrace.llmobs._constants import EVP_PROXY_AGENT_BASE_PATH
from ddtrace.llmobs._constants import EVP_SUBDOMAIN_HEADER_NAME
from ddtrace.llmobs._constants import EXP_SUBDOMAIN_NAME
from ddtrace.llmobs._constants import SPAN_ENDPOINT
from ddtrace.llmobs._constants import SPAN_SUBDOMAIN_NAME
from ddtrace.llmobs._experiment import Dataset
from ddtrace.llmobs._experiment import DatasetRecord
from ddtrace.llmobs._experiment import JSONType
from ddtrace.llmobs._utils import safe_json
from ddtrace.settings._agent import config as agent_config

Expand Down Expand Up @@ -109,6 +117,7 @@ def __init__(
is_agentless: bool,
_site: str = "",
_api_key: str = "",
_app_key: str = "",
_override_url: str = "",
) -> None:
super(BaseLLMObsWriter, self).__init__(interval=interval)
Expand All @@ -118,6 +127,7 @@ def __init__(
self._timeout: float = timeout
self._api_key: str = _api_key or config._dd_api_key
self._site: str = _site or config._dd_site
self._app_key: str = _app_key or os.environ.get("DD_APP_KEY")
self._override_url: str = _override_url

self._agentless: bool = is_agentless
Expand Down Expand Up @@ -263,6 +273,96 @@ def _data(self, events: List[LLMObsEvaluationMetricEvent]) -> Dict[str, Any]:
return {"data": {"type": "evaluation_metric", "attributes": {"metrics": events}}}


class LLMObsExperimentsClient(BaseLLMObsWriter):
    """Synchronous HTTP client for the LLM Observability datasets/experiments API.

    Unlike the buffered event writers, each public method here performs a
    blocking request/response round-trip against the experiments endpoints and
    returns parsed results immediately.
    """

    EVP_SUBDOMAIN_HEADER_VALUE = EXP_SUBDOMAIN_NAME
    AGENTLESS_BASE_URL = AGENTLESS_EXP_BASE_URL
    ENDPOINT = ""

    def request(self, method: str, path: str, body: JSONType = None) -> Response:
        """Issue a single HTTP request to the experiments API.

        :param method: HTTP verb, e.g. "GET" or "POST".
        :param path: Path appended to the configured intake URL and endpoint.
        :param body: JSON-serializable payload, or None to send no body.
        :return: The parsed HTTP response.
        """
        headers = {
            "Content-Type": "application/json",
            "DD-API-KEY": self._api_key,
            "DD-APPLICATION-KEY": self._app_key,
        }
        if not self._agentless:
            # Route through the agent's EVP proxy by naming the target subdomain.
            headers[EVP_SUBDOMAIN_HEADER_NAME] = self.EVP_SUBDOMAIN_HEADER_VALUE

        # Use `is not None` (not truthiness) so valid falsy JSON payloads such as
        # {} or [] are still serialized rather than silently dropped.
        encoded_body = json.dumps(body).encode("utf-8") if body is not None else b""
        conn = get_connection(self._intake)
        try:
            url = self._intake + self._endpoint + path
            logger.debug("requesting %s", url)
            conn.request(method, url, encoded_body, headers)
            resp = conn.getresponse()
            return Response.from_http_response(resp)
        finally:
            # Always release the connection, even if the request raised.
            conn.close()

    def dataset_delete(self, dataset_id: str) -> None:
        """Soft-delete the dataset with the given server-side ID.

        :raises AssertionError: if the API does not return HTTP 200.
        """
        path = "/api/unstable/llm-obs/v1/datasets/delete"
        resp = self.request(
            "POST",
            path,
            body={
                "data": {
                    "type": "datasets",
                    "attributes": {
                        "type": "soft",
                        "dataset_ids": [dataset_id],
                    },
                },
            },
        )
        # Bug fix: previously interpolated the builtin `id` (rendering as
        # "<built-in function id>") instead of the actual dataset_id.
        assert resp.status == 200, f"Failed to delete dataset {dataset_id}: {resp.get_json()}"
        return None

    def dataset_create(self, name: str, description: str) -> Dataset:
        """Create a new, empty dataset and return a handle to it.

        :raises AssertionError: if the API does not return HTTP 200.
        """
        path = "/api/unstable/llm-obs/v1/datasets"
        body: JSONType = {
            "data": {
                "type": "datasets",
                "attributes": {"name": name, "description": description},
            }
        }
        resp = self.request("POST", path, body)
        assert resp.status == 200, f"Failed to create dataset {name}: {resp.status} {resp.get_json()}"
        response_data = resp.get_json()
        dataset_id = response_data["data"]["id"]
        # A freshly created dataset has no records yet.
        return Dataset(name, dataset_id, [])

    def dataset_pull(self, name: str) -> Dataset:
        """Fetch the dataset named *name* along with all of its records.

        :raises ValueError: if no dataset with that name exists.
        :raises AssertionError: if the API returns an unexpected status.
        """
        path = f"/api/unstable/llm-obs/v1/datasets?filter[name]={quote(name)}"
        resp = self.request("GET", path)
        assert resp.status == 200, f"Failed to pull dataset {name}: {resp.status} {resp.get_json()}"

        response_data = resp.get_json()
        data = response_data["data"]
        if not data:
            raise ValueError(f"Dataset '{name}' not found")

        dataset_id = data[0]["id"]
        url = f"/api/unstable/llm-obs/v1/datasets/{dataset_id}/records"
        resp = self.request("GET", url)
        if resp.status == 404:
            raise ValueError(f"Dataset '{name}' not found")
        # Robustness: fail loudly on any other non-success status instead of
        # attempting to parse an error body as records.
        assert resp.status == 200, f"Failed to pull dataset records for {name}: {resp.status} {resp.get_json()}"
        records_data = resp.get_json()

        # Normalize the JSON:API record payload into DatasetRecord dicts.
        class_records: List[DatasetRecord] = []
        for record in records_data.get("data", []):
            attrs = record.get("attributes", {})
            input_data = attrs.get("input")
            expected_output = attrs.get("expected_output")
            class_records.append(
                {
                    "record_id": record.get("id"),
                    "input": input_data,
                    "expected_output": expected_output,
                    "metadata": attrs.get("metadata", {}),
                }
            )
        return Dataset(name, dataset_id, class_records)


class LLMObsSpanWriter(BaseLLMObsWriter):
"""Writes span events to the LLMObs Span Endpoint."""

Expand Down
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,15 @@ services:
volumes:
- ddagent:/tmp/ddagent:rw
testagent:
image: ghcr.io/datadog/dd-apm-test-agent/ddapm-test-agent:v1.21.0
image: ghcr.io/datadog/dd-apm-test-agent/ddapm-test-agent:v1.27.3
ports:
- "127.0.0.1:9126:8126"
volumes:
- ./tests/snapshots:/snapshots
environment:
- LOG_LEVEL=WARNING
- SNAPSHOT_DIR=/snapshots
- VCR_CASSETTES_DIRECTORY=tests/cassettes
- SNAPSHOT_CI=0
- DD_POOL_TRACE_CHECK_FAILURES=true
- DD_DISABLE_ERROR_RESPONSES=true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
interactions:
- request:
body: null
headers:
Accept:
- '*/*'
? !!python/object/new:multidict._multidict.istr
- Accept-Encoding
: - identity
Connection:
- keep-alive
? !!python/object/new:multidict._multidict.istr
- Content-Length
: - '0'
? !!python/object/new:multidict._multidict.istr
- Content-Type
: - application/json
User-Agent:
- python-requests/2.32.3
method: GET
uri: https://api.datadoghq.com/api/unstable/llm-obs/v1/datasets/034c285b-bc5b-4681-bc57-06a04289a7a0/records
response:
body:
string: '{"data":[],"meta":{"after":""}}'
headers:
content-length:
- '31'
content-security-policy:
- frame-ancestors 'self'; report-uri https://logs.browser-intake-datadoghq.com/api/v2/logs?dd-api-key=pube4f163c23bbf91c16b8f57f56af9fc58&dd-evp-origin=content-security-policy&ddsource=csp-report&ddtags=site%3Adatadoghq.com
content-type:
- application/vnd.api+json
date:
- Mon, 07 Jul 2025 17:43:06 GMT
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Accept-Encoding
x-content-type-options:
- nosniff
x-frame-options:
- SAMEORIGIN
status:
code: 200
message: OK
version: 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
interactions:
- request:
body: '{"data": {"type": "datasets", "attributes": {"type": "soft", "dataset_ids":
["8a37c116-c381-422d-ba72-cee2055ef304"]}}}'
headers:
Accept:
- '*/*'
? !!python/object/new:multidict._multidict.istr
- Accept-Encoding
: - identity
Connection:
- keep-alive
Content-Length:
- '119'
? !!python/object/new:multidict._multidict.istr
- Content-Type
: - application/json
User-Agent:
- python-requests/2.32.3
method: POST
uri: https://api.datadoghq.com/api/unstable/llm-obs/v1/datasets/delete
response:
body:
string: '{"data":[{"id":"8a37c116-c381-422d-ba72-cee2055ef304","type":"datasets","attributes":{"author":{"id":"0dd9d379-c1a3-11ed-b4e0-566658a732f8"},"created_at":"2025-07-07T17:43:01.802098Z","current_version":0,"deleted_at":"2025-07-07T17:43:02.178053Z","description":"A
second test dataset","name":"test-dataset-2","updated_at":"2025-07-07T17:43:01.802098Z"}}]}'
headers:
content-length:
- '355'
content-security-policy:
- frame-ancestors 'self'; report-uri https://logs.browser-intake-datadoghq.com/api/v2/logs?dd-api-key=pube4f163c23bbf91c16b8f57f56af9fc58&dd-evp-origin=content-security-policy&ddsource=csp-report&ddtags=site%3Adatadoghq.com
content-type:
- application/vnd.api+json
date:
- Mon, 07 Jul 2025 17:43:02 GMT
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Accept-Encoding
x-content-type-options:
- nosniff
x-frame-options:
- SAMEORIGIN
status:
code: 200
message: OK
version: 1
Loading
Loading