Commit d75a587
chore(llmobs): dac strip io from vertex (#13693)
Remove potentially sensitive I/O data from APM spans. This way, prompt and completion data will only appear on the LLM Obs spans, which are/will be subject to data access controls. Mostly, this just removes I/O tag sets. A few things (mostly metrics) have LLMObs tags that depend on span tags, so there is a bit more refactoring there. Let me know if I removed anything that should really stay, or if I missed something that should be restricted.

## Checklist

- [x] PR author has checked that all the criteria below are met
  - The PR description includes an overview of the change
  - The PR description articulates the motivation for the change
  - The change includes tests OR the PR description describes a testing strategy
  - The PR description notes risks associated with the change, if any
  - Newly-added code is easy to change
  - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
  - The change includes or references documentation updates if necessary
  - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist

- [x] Reviewer has checked that all the criteria below are met
  - Title is accurate
  - All changes are related to the pull request's stated goal
  - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes
  - Testing strategy adequately addresses listed risks
  - Newly-added code is easy to change
  - Release note makes sense to a user of the library
  - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment
  - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
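To make "stripping I/O" concrete, here is a minimal sketch of the effect on span tags. `StubSpan` is a hypothetical stand-in (not ddtrace's real span class) exposing only the `set_tag_str`/`set_metric` methods used in the diffs below; the tag keys come from the snapshot changes in this commit.

```python
# Minimal sketch, assuming a stub span; not ddtrace's actual Span class.
class StubSpan:
    def __init__(self):
        self.tags = {}
        self.metrics = {}

    def set_tag_str(self, key, value):
        self.tags[key] = value

    def set_metric(self, key, value):
        self.metrics[key] = value


span = StubSpan()

# Non-I/O tags like these still land on the APM span after this change:
span.set_tag_str("vertexai.request.model", "gemini-1.5-flash")
span.set_tag_str("vertexai.request.provider", "google")

# I/O tags and usage metrics like these are no longer set on the APM span;
# the same data now appears only on the LLM Observability span:
# span.set_tag_str("vertexai.request.contents.0.text", "Why do bears hibernate?")
# span.set_metric("vertexai.response.usage.prompt_tokens", 14)
```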
1 parent 5175caa commit d75a587

13 files changed: +106 −302 lines changed
ddtrace/contrib/internal/vertexai/_utils.py

Lines changed: 0 additions & 165 deletions
@@ -1,13 +1,5 @@
 import sys
 
-from vertexai.generative_models import GenerativeModel
-from vertexai.generative_models import Part
-
-from ddtrace.internal.utils import get_argument_value
-from ddtrace.llmobs._integrations.utils import get_generation_config_google
-from ddtrace.llmobs._integrations.utils import get_system_instructions_from_google_model
-from ddtrace.llmobs._integrations.utils import tag_request_content_part_google
-from ddtrace.llmobs._integrations.utils import tag_response_part_google
 from ddtrace.llmobs._utils import _get_attr
 
 
@@ -44,7 +36,6 @@ def __iter__(self):
             self._dd_span.set_exc_info(*sys.exc_info())
             raise
         finally:
-            tag_stream_response(self._dd_span, self._chunks, self._dd_integration)
             if self._dd_integration.is_pc_sampled_llmobs(self._dd_span):
                 self._kwargs["instance"] = self._model_instance
                 self._kwargs["history"] = self._history
@@ -74,7 +65,6 @@ async def __aiter__(self):
             self._dd_span.set_exc_info(*sys.exc_info())
             raise
         finally:
-            tag_stream_response(self._dd_span, self._chunks, self._dd_integration)
             if self._dd_integration.is_pc_sampled_llmobs(self._dd_span):
                 self._kwargs["instance"] = self._model_instance
                 self._kwargs["history"] = self._history
@@ -95,158 +85,3 @@ def extract_info_from_parts(parts):
         if function_call is not None:
             function_calls.append(function_call)
     return concatenated_text, function_calls
-
-
-def _tag_response_parts(span, integration, parts):
-    text, function_calls = extract_info_from_parts(parts)
-    span.set_tag_str(
-        "vertexai.response.candidates.%d.content.parts.%d.text" % (0, 0),
-        integration.trunc(str(text)),
-    )
-    for idx, function_call in enumerate(function_calls):
-        span.set_tag_str(
-            "vertexai.response.candidates.%d.content.parts.%d.function_calls.%d.function_call.name" % (0, 0, idx),
-            _get_attr(function_call, "name", ""),
-        )
-        span.set_tag_str(
-            "vertexai.response.candidates.%d.content.parts.%d.function_calls.%d.function_call.args" % (0, 0, idx),
-            integration.trunc(str(_get_attr(function_call, "args", ""))),
-        )
-
-
-def tag_stream_response(span, chunks, integration):
-    all_parts = []
-    role = ""
-    for chunk in chunks:
-        candidates = _get_attr(chunk, "candidates", [])
-        for candidate_idx, candidate in enumerate(candidates):
-            finish_reason = _get_attr(candidate, "finish_reason", None)
-            if finish_reason:
-                span.set_tag_str(
-                    "vertexai.response.candidates.%d.finish_reason" % (candidate_idx),
-                    _get_attr(finish_reason, "name", ""),
-                )
-            candidate_content = _get_attr(candidate, "content", {})
-            role = role or _get_attr(candidate_content, "role", "")
-            if not integration.is_pc_sampled_span(span):
-                continue
-            parts = _get_attr(candidate_content, "parts", [])
-            all_parts.extend(parts)
-        token_counts = _get_attr(chunk, "usage_metadata", None)
-        if not token_counts:
-            continue
-        span.set_metric("vertexai.response.usage.prompt_tokens", _get_attr(token_counts, "prompt_token_count", 0))
-        span.set_metric(
-            "vertexai.response.usage.completion_tokens", _get_attr(token_counts, "candidates_token_count", 0)
-        )
-        span.set_metric("vertexai.response.usage.total_tokens", _get_attr(token_counts, "total_token_count", 0))
-    # streamed responses have only a single candidate, so there is only one role to be tagged
-    span.set_tag_str("vertexai.response.candidates.0.content.role", str(role))
-    _tag_response_parts(span, integration, all_parts)
-
-
-def _tag_request_content(span, integration, content, content_idx):
-    """Tag the generation span with request contents."""
-    if isinstance(content, str):
-        span.set_tag_str("vertexai.request.contents.%d.text" % content_idx, integration.trunc(content))
-        return
-    if isinstance(content, dict):
-        role = content.get("role", "")
-        if role:
-            span.set_tag_str("vertexai.request.contents.%d.role" % content_idx, role)
-        parts = content.get("parts", [])
-        for part_idx, part in enumerate(parts):
-            tag_request_content_part_google("vertexai", span, integration, part, part_idx, content_idx)
-        return
-    if isinstance(content, Part):
-        tag_request_content_part_google("vertexai", span, integration, content, 0, content_idx)
-        return
-    role = _get_attr(content, "role", "")
-    if role:
-        span.set_tag_str("vertexai.request.contents.%d.role" % content_idx, str(role))
-    parts = _get_attr(content, "parts", [])
-    if not parts:
-        span.set_tag_str(
-            "vertexai.request.contents.%d.text" % content_idx,
-            integration.trunc("[Non-text content object: {}]".format(repr(content))),
-        )
-        return
-    for part_idx, part in enumerate(parts):
-        tag_request_content_part_google("vertexai", span, integration, part, part_idx, content_idx)
-
-
-def tag_request(span, integration, instance, args, kwargs, is_chat):
-    """Tag the generation span with request details.
-    Includes capturing generation configuration, system prompt, and messages.
-    """
-    # instance is either a chat session or a model itself
-    model_instance = instance if isinstance(instance, GenerativeModel) else instance._model
-    contents = get_argument_value(args, kwargs, 0, "content" if is_chat else "contents")
-    history = _get_attr(instance, "_history", [])
-    if history:
-        if isinstance(contents, list):
-            contents = history + contents
-        if isinstance(contents, Part) or isinstance(contents, str) or isinstance(contents, dict):
-            contents = history + [contents]
-    generation_config = get_generation_config_google(model_instance, kwargs)
-    generation_config_dict = None
-    if generation_config is not None:
-        generation_config_dict = (
-            generation_config if isinstance(generation_config, dict) else generation_config.to_dict()
-        )
-    system_instructions = get_system_instructions_from_google_model(model_instance)
-    stream = kwargs.get("stream", None)
-
-    if generation_config_dict is not None:
-        for k, v in generation_config_dict.items():
-            span.set_tag_str("vertexai.request.generation_config.%s" % k, str(v))
-
-    if stream:
-        span.set_tag("vertexai.request.stream", True)
-
-    if not integration.is_pc_sampled_span(span):
-        return
-
-    for idx, text in enumerate(system_instructions):
-        span.set_tag_str(
-            "vertexai.request.system_instruction.%d.text" % idx,
-            integration.trunc(str(text)),
-        )
-
-    if isinstance(contents, str):
-        span.set_tag_str("vertexai.request.contents.0.text", integration.trunc(str(contents)))
-        return
-    elif isinstance(contents, Part):
-        tag_request_content_part_google("vertexai", span, integration, contents, 0, 0)
-        return
-    elif not isinstance(contents, list):
-        return
-    for content_idx, content in enumerate(contents):
-        _tag_request_content(span, integration, content, content_idx)
-
-
-def tag_response(span, generations, integration):
-    """Tag the generation span with response details.
-    Includes capturing generation text, roles, finish reasons, and token counts.
-    """
-    generations_dict = generations.to_dict()
-    candidates = generations_dict.get("candidates", [])
-    for candidate_idx, candidate in enumerate(candidates):
-        finish_reason = _get_attr(candidate, "finish_reason", None)
-        if finish_reason:
-            span.set_tag_str("vertexai.response.candidates.%d.finish_reason" % candidate_idx, finish_reason)
-        candidate_content = _get_attr(candidate, "content", None)
-        role = _get_attr(candidate_content, "role", "")
-        span.set_tag_str("vertexai.response.candidates.%d.content.role" % candidate_idx, str(role))
-        if not integration.is_pc_sampled_span(span):
-            continue
-        parts = _get_attr(candidate_content, "parts", [])
-        for part_idx, part in enumerate(parts):
-            tag_response_part_google("vertexai", span, integration, part, part_idx, candidate_idx)
-
-    token_counts = generations_dict.get("usage_metadata", None)
-    if not token_counts:
-        return
-    span.set_metric("vertexai.response.usage.prompt_tokens", _get_attr(token_counts, "prompt_token_count", 0))
-    span.set_metric("vertexai.response.usage.completion_tokens", _get_attr(token_counts, "candidates_token_count", 0))
-    span.set_metric("vertexai.response.usage.total_tokens", _get_attr(token_counts, "total_token_count", 0))

ddtrace/contrib/internal/vertexai/patch.py

Lines changed: 3 additions & 6 deletions
@@ -4,14 +4,15 @@
 
 import vertexai
 
+# Force the generative_models module to load
+from vertexai.generative_models import GenerativeModel  # noqa:F401
+
 from ddtrace import config
 from ddtrace.contrib.internal.trace_utils import unwrap
 from ddtrace.contrib.internal.trace_utils import with_traced_module
 from ddtrace.contrib.internal.trace_utils import wrap
 from ddtrace.contrib.internal.vertexai._utils import TracedAsyncVertexAIStreamResponse
 from ddtrace.contrib.internal.vertexai._utils import TracedVertexAIStreamResponse
-from ddtrace.contrib.internal.vertexai._utils import tag_request
-from ddtrace.contrib.internal.vertexai._utils import tag_response
 from ddtrace.llmobs._integrations import VertexAIIntegration
 from ddtrace.llmobs._integrations.utils import extract_model_name_google
 from ddtrace.trace import Pin
@@ -69,13 +70,11 @@ def _traced_generate(vertexai, pin, func, instance, args, kwargs, model_instance
     # history must be copied since it is modified during the LLM interaction
     history = getattr(instance, "history", [])[:]
     try:
-        tag_request(span, integration, instance, args, kwargs, is_chat)
         generations = func(*args, **kwargs)
         if stream:
             return TracedVertexAIStreamResponse(
                 generations, model_instance, integration, span, args, kwargs, is_chat, history
             )
-        tag_response(span, generations, integration)
     except Exception:
         span.set_exc_info(*sys.exc_info())
         raise
@@ -104,13 +103,11 @@ async def _traced_agenerate(vertexai, pin, func, instance, args, kwargs, model_i
     # history must be copied since it is modified during the LLM interaction
     history = getattr(instance, "history", [])[:]
     try:
-        tag_request(span, integration, instance, args, kwargs, is_chat)
         generations = await func(*args, **kwargs)
        if stream:
             return TracedAsyncVertexAIStreamResponse(
                 generations, model_instance, integration, span, args, kwargs, is_chat, history
             )
-        tag_response(span, generations, integration)
     except Exception:
         span.set_exc_info(*sys.exc_info())
         raise

ddtrace/llmobs/_integrations/utils.py

Lines changed: 14 additions & 0 deletions
@@ -186,6 +186,20 @@ def get_llmobs_metrics_tags(integration_name, span):
     return usage
 
 
+def parse_llmobs_metric_args(metrics):
+    usage = {}
+    input_tokens = _get_attr(metrics, "prompt_tokens", None)
+    output_tokens = _get_attr(metrics, "completion_tokens", None)
+    total_tokens = _get_attr(metrics, "total_tokens", None)
+    if input_tokens is not None:
+        usage[INPUT_TOKENS_METRIC_KEY] = input_tokens
+    if output_tokens is not None:
+        usage[OUTPUT_TOKENS_METRIC_KEY] = output_tokens
+    if total_tokens is not None:
+        usage[TOTAL_TOKENS_METRIC_KEY] = total_tokens
+    return usage
+
+
 def get_system_instructions_from_google_model(model_instance):
     """
     Extract system instructions from model and convert to []str for tagging.
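A quick usage sketch of the new helper. Assumptions: `_get_attr` treats dicts and objects alike, and the `*_TOKENS_METRIC_KEY` constants resolve to the `input_tokens`/`output_tokens`/`total_tokens` keys; the ad-hoc dict below is a stand-in, not a real metrics object.

```python
# Hypothetical usage; the dict stands in for any object exposing
# prompt_tokens / completion_tokens / total_tokens.
from ddtrace.llmobs._integrations.utils import parse_llmobs_metric_args

usage = parse_llmobs_metric_args(
    {"prompt_tokens": 14, "completion_tokens": 16, "total_tokens": 30}
)
# Keys whose source value is None are omitted, so a partial metrics object
# yields a partial dict rather than zero-filled entries.
print(usage)  # e.g. {"input_tokens": 14, "output_tokens": 16, "total_tokens": 30}
```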

ddtrace/llmobs/_integrations/vertexai.py

Lines changed: 37 additions & 2 deletions
@@ -7,15 +7,17 @@
 from ddtrace.internal.utils import ArgumentError
 from ddtrace.internal.utils import get_argument_value
 from ddtrace.llmobs._constants import INPUT_MESSAGES
+from ddtrace.llmobs._constants import INPUT_TOKENS_METRIC_KEY
 from ddtrace.llmobs._constants import METADATA
 from ddtrace.llmobs._constants import METRICS
 from ddtrace.llmobs._constants import MODEL_NAME
 from ddtrace.llmobs._constants import MODEL_PROVIDER
 from ddtrace.llmobs._constants import OUTPUT_MESSAGES
+from ddtrace.llmobs._constants import OUTPUT_TOKENS_METRIC_KEY
 from ddtrace.llmobs._constants import SPAN_KIND
+from ddtrace.llmobs._constants import TOTAL_TOKENS_METRIC_KEY
 from ddtrace.llmobs._integrations.base import BaseLLMIntegration
 from ddtrace.llmobs._integrations.utils import extract_message_from_part_google
-from ddtrace.llmobs._integrations.utils import get_llmobs_metrics_tags
 from ddtrace.llmobs._integrations.utils import get_system_instructions_from_google_model
 from ddtrace.llmobs._integrations.utils import llmobs_get_metadata_google
 from ddtrace.llmobs._utils import _get_attr
@@ -43,6 +45,7 @@ def _llmobs_set_tags(
     ) -> None:
         instance = kwargs.get("instance", None)
         history = kwargs.get("history", [])
+        metrics = kwargs.get("metrics", {})
         metadata = llmobs_get_metadata_google(kwargs, instance)
 
         system_instruction = get_system_instructions_from_google_model(instance)
@@ -56,6 +59,7 @@
         output_messages = [{"content": ""}]
         if response is not None:
             output_messages = self._extract_output_message(response)
+            metrics = self._extract_metrics_from_response(response)
 
         span._set_ctx_items(
             {
@@ -65,10 +69,41 @@
                 METADATA: metadata,
                 INPUT_MESSAGES: input_messages,
                 OUTPUT_MESSAGES: output_messages,
-                METRICS: get_llmobs_metrics_tags("vertexai", span),
+                METRICS: metrics,
             }
         )
 
+    def _extract_metrics_from_response(self, response):
+        """Extract metrics from the response."""
+        if isinstance(response, list):
+            for chunk in response:
+                token_counts = _get_attr(chunk, "usage_metadata", None)
+                if not token_counts:
+                    continue
+                input_tokens = _get_attr(token_counts, "prompt_token_count", 0)
+                output_tokens = _get_attr(token_counts, "candidates_token_count", 0)
+                total_tokens = _get_attr(token_counts, "total_token_count", 0)
+        else:
+            generations_dict = response.to_dict()
+
+            token_counts = generations_dict.get("usage_metadata", None)
+            if not token_counts:
+                return
+
+            input_tokens = _get_attr(token_counts, "prompt_token_count", 0)
+            output_tokens = _get_attr(token_counts, "candidates_token_count", 0)
+            total_tokens = _get_attr(token_counts, "total_token_count", 0)
+
+        metrics = {}
+        if input_tokens is not None:
+            metrics[INPUT_TOKENS_METRIC_KEY] = input_tokens
+        if output_tokens is not None:
+            metrics[OUTPUT_TOKENS_METRIC_KEY] = output_tokens
+        if total_tokens is not None:
+            metrics[TOTAL_TOKENS_METRIC_KEY] = total_tokens
+
+        return metrics
+
     def _extract_input_message(self, contents, history, system_instruction=None):
         from vertexai.generative_models._generative_models import Part
 
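The two branches of `_extract_metrics_from_response` mirror the two response shapes the integration sees: a list of streamed chunks versus a single response object with a `to_dict()`. A small sketch with hypothetical stand-ins for the Vertex AI types (note the streamed branch keeps the counts from the last chunk that carries `usage_metadata`):

```python
from types import SimpleNamespace

# Hypothetical stand-ins for Vertex AI response objects, for illustration only.
usage = SimpleNamespace(prompt_token_count=14, candidates_token_count=16, total_token_count=30)

# Streamed case: a list of chunks; chunks without usage_metadata are skipped,
# and the last chunk that carries it supplies the token counts.
streamed = [SimpleNamespace(usage_metadata=None), SimpleNamespace(usage_metadata=usage)]

# Non-streamed case: a single object whose to_dict() exposes usage_metadata
# (assuming _get_attr also reads plain dict keys).
non_streamed = SimpleNamespace(
    to_dict=lambda: {
        "usage_metadata": {
            "prompt_token_count": 14,
            "candidates_token_count": 16,
            "total_token_count": 30,
        }
    }
)

# Either shape should yield metrics like:
# {"input_tokens": 14, "output_tokens": 16, "total_tokens": 30}
```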

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+other:
+  - |
+    vertexai: Removes the IO data from the APM spans for VertexAI LLM requests and responses, which is duplicated in the LLM Observability span.

tests/snapshots/tests.contrib.vertexai.test_vertexai.test_vertexai_completion.json

Lines changed: 6 additions & 16 deletions
@@ -10,29 +10,19 @@
     "error": 0,
     "meta": {
       "_dd.p.dm": "-0",
-      "_dd.p.tid": "67378d4d00000000",
+      "_dd.p.tid": "685ae96d00000000",
       "language": "python",
-      "runtime-id": "4bb0a91fcf15428fa3998e8daa98b1dc",
-      "vertexai.request.contents.0.text": "Why do bears hibernate?",
-      "vertexai.request.generation_config.max_output_tokens": "30",
-      "vertexai.request.generation_config.stop_sequences": "['x']",
-      "vertexai.request.generation_config.temperature": "1.0",
+      "runtime-id": "51dd90093585456fa5287bb1606924df",
       "vertexai.request.model": "gemini-1.5-flash",
-      "vertexai.request.provider": "google",
-      "vertexai.response.candidates.0.content.parts.0.text": "Bears hibernate to conserve energy and survive during winter months when food is scarce.\\n",
-      "vertexai.response.candidates.0.content.role": "model",
-      "vertexai.response.candidates.0.finish_reason": "STOP"
+      "vertexai.request.provider": "google"
     },
     "metrics": {
      "_dd.measured": 1,
       "_dd.top_level": 1,
       "_dd.tracer_kr": 1.0,
       "_sampling_priority_v1": 1,
-      "process_id": 87069,
-      "vertexai.response.usage.completion_tokens": 16,
-      "vertexai.response.usage.prompt_tokens": 14,
-      "vertexai.response.usage.total_tokens": 30
+      "process_id": 93922
     },
-    "duration": 338000,
-    "start": 1731693901811611000
+    "duration": 172000,
+    "start": 1750788461546503000
   }]]
