
Commit dbbfd8e

Merge branch 'main' into zachg/llmobs_annotate_context_fix
2 parents 73af63c + 3632c6c

File tree

11 files changed, +220 −114 lines changed


.github/workflows/system-tests.yml

Lines changed: 3 additions & 3 deletions
@@ -45,7 +45,7 @@ jobs:
           persist-credentials: false
           repository: 'DataDog/system-tests'
           # Automatically managed, use scripts/update-system-tests-version to update
-          ref: '94529f681dcaf74382ed47c3b0c85acdb775b6c9'
+          ref: '30e17ba7009b84998c0ada3b3a17f39c0037faba'

       - name: Download wheels to binaries directory
         uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
@@ -90,7 +90,7 @@ jobs:
           persist-credentials: false
           repository: 'DataDog/system-tests'
           # Automatically managed, use scripts/update-system-tests-version to update
-          ref: '94529f681dcaf74382ed47c3b0c85acdb775b6c9'
+          ref: '30e17ba7009b84998c0ada3b3a17f39c0037faba'

       - name: Build runner
         uses: ./.github/actions/install_runner
@@ -287,7 +287,7 @@ jobs:
           persist-credentials: false
           repository: 'DataDog/system-tests'
           # Automatically managed, use scripts/update-system-tests-version to update
-          ref: '94529f681dcaf74382ed47c3b0c85acdb775b6c9'
+          ref: '30e17ba7009b84998c0ada3b3a17f39c0037faba'
       - name: Download wheels to binaries directory
         uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
         with:

.gitlab-ci.yml

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ variables:
   DD_VPA_TEMPLATE: "vpa-template-cpu-p70-10percent-2x-oom-min-cap"
   # CI_DEBUG_SERVICES: "true"
   # Automatically managed, use scripts/update-system-tests-version to update
-  SYSTEM_TESTS_REF: "94529f681dcaf74382ed47c3b0c85acdb775b6c9"
+  SYSTEM_TESTS_REF: "30e17ba7009b84998c0ada3b3a17f39c0037faba"

 default:
   interruptible: true

ddtrace/internal/_encoding.pyx

Lines changed: 6 additions & 12 deletions
@@ -1257,8 +1257,7 @@ cdef class Packer(object):
                         default_used = True
                         continue
                     else:
-                        o = "Integer value out of range"
-                        continue
+                        raise OverflowError("Integer value out of range")
             elif PyFloat_CheckExact(o):
                 dval = o
                 ret = msgpack_pack_double(&self.pk, dval)
@@ -1274,14 +1273,12 @@ cdef class Packer(object):
                 if self.encoding == NULL:
                     ret = msgpack_pack_unicode(&self.pk, o, ITEM_LIMIT)
                     if ret == -2:
-                        o = f"Unicode string is too large {L}"
-                        continue
+                        raise ValueError("unicode string is too large")
                 else:
                     o = PyUnicode_AsEncodedString(o, self.encoding, self.unicode_errors)
                     L = len(o)
                     if L > ITEM_LIMIT:
-                        o = f"Unicode string is too large {L}"
-                        continue
+                        raise ValueError("unicode string is too large")
                     ret = msgpack_pack_raw(&self.pk, L)
                     if ret == 0:
                         rawval = o
@@ -1290,8 +1287,7 @@ cdef class Packer(object):
                 d = <dict>o
                 L = len(d)
                 if L > ITEM_LIMIT:
-                    o = f"Dictionnary is too large {L}"
-                    continue
+                    raise ValueError("dict is too large")
                 ret = msgpack_pack_map(&self.pk, L)
                 if ret == 0:
                     for k, v in d.items():
@@ -1304,8 +1300,7 @@ cdef class Packer(object):
             elif PyList_CheckExact(o):
                 L = Py_SIZE(o)
                 if L > ITEM_LIMIT:
-                    o = f"List is too large {L}"
-                    continue
+                    raise ValueError("list is too large")
                 ret = msgpack_pack_array(&self.pk, L)
                 if ret == 0:
                     for v in o:
@@ -1318,8 +1313,7 @@ cdef class Packer(object):
                 else:
                     ret = msgpack_pack_false(&self.pk)
             else:
-                o = f"Can not serialize [{type(o).__name__}] object"
-                continue
+                PyErr_Format(TypeError, b"can not serialize '%.200s' object", Py_TYPE(o).tp_name)
             return ret

     cpdef pack(self, object obj):
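
With this change the internal msgpack Packer fails loudly: a value that is out of range, oversized, or of an unsupported type now raises immediately. The old pattern (`o = "error message"; continue`) re-ran the pack loop on the message string itself, so the original value was silently replaced by the error text with no signal to the caller. A minimal Python sketch of the new error contract; `check_packable` and the `ITEM_LIMIT` value are illustrative assumptions, not the internal Cython API:

# Sketch only: mirrors the exception types the patched Packer now raises.
ITEM_LIMIT = 2**31 - 1  # assumed stand-in for the module's internal limit

def check_packable(o):
    if o is None or isinstance(o, (bool, float, bytes)):
        return  # always packable (bool is tested before int: bool subclasses int)
    if isinstance(o, int):
        if not -(2**63) <= o <= 2**64 - 1:
            raise OverflowError("Integer value out of range")
    elif isinstance(o, str):
        if len(o) > ITEM_LIMIT:
            raise ValueError("unicode string is too large")
    elif isinstance(o, dict):
        if len(o) > ITEM_LIMIT:
            raise ValueError("dict is too large")
    elif isinstance(o, list):
        if len(o) > ITEM_LIMIT:
            raise ValueError("list is too large")
    else:
        raise TypeError(f"can not serialize '{type(o).__name__}' object")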

ddtrace/internal/datadog/profiling/stack_v2/echion/echion/tasks.h

Lines changed: 31 additions & 9 deletions
@@ -182,27 +182,26 @@ class TaskInfo
     PyObject* origin = NULL;
     PyObject* loop = NULL;

-    GenInfo::Ptr coro = nullptr;
-
     StringTable::Key name;
+    bool is_on_cpu = false;
+    GenInfo::Ptr coro = nullptr;

     // Information to reconstruct the async stack as best as we can
     TaskInfo::Ptr waiter = nullptr;
-    bool is_on_cpu = false;

     [[nodiscard]] static Result<TaskInfo::Ptr> create(TaskObj*);
     TaskInfo(PyObject* origin, PyObject* loop, GenInfo::Ptr coro, StringTable::Key name, TaskInfo::Ptr waiter)
       : origin(origin)
       , loop(loop)
-      , coro(std::move(coro))
       , name(name)
+      , is_on_cpu(coro && coro->is_running)
+      , coro(std::move(coro))
       , waiter(std::move(waiter))
-      , is_on_cpu(this->coro && this->coro->is_running)
     {
     }

     [[nodiscard]] static Result<TaskInfo::Ptr> current(PyObject*);
-    inline size_t unwind(FrameStack&);
+    inline size_t unwind(FrameStack&, size_t& upper_python_stack_size);
 };

 inline std::unordered_map<PyObject*, PyObject*> task_link_map;
@@ -344,7 +343,7 @@ inline std::vector<std::unique_ptr<StackInfo>> current_tasks;
 // ----------------------------------------------------------------------------

 inline size_t
-TaskInfo::unwind(FrameStack& stack)
+TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size)
 {
     // TODO: Check for running task.
     std::stack<PyObject*> coro_frames;
@@ -355,14 +354,37 @@ TaskInfo::unwind(FrameStack& stack)
         coro_frames.push(py_coro->frame);
     }

-    int count = 0;
+    // Total number of frames added to the Stack
+    size_t count = 0;

     // Unwind the coro frames
     while (!coro_frames.empty()) {
        PyObject* frame = coro_frames.top();
        coro_frames.pop();

-        count += unwind_frame(frame, stack);
+        auto new_frames = unwind_frame(frame, stack);
+
+        // If we failed to unwind the Frame, stop unwinding the coroutine chain; otherwise we could
+        // end up with Stacks with missing Frames between two coroutines Frames.
+        if (new_frames == 0) {
+            break;
+        }
+
+        // If this is the first Frame being unwound (we have not added any Frames to the Stack yet),
+        // use the number of Frames added to the Stack to determine the size of the upper Python stack.
+        if (count == 0) {
+            // The first Frame is the coroutine Frame, so the Python stack size is the number of Frames - 1
+            upper_python_stack_size = new_frames - 1;

+            // Remove the Python Frames from the Stack (they will be added back later)
+            // We cannot push those Frames now because otherwise they would be added once per Task,
+            // we only want to add them once per Leaf Task, and on top of all non-leaf Tasks.
+            for (size_t i = 0; i < upper_python_stack_size; i++) {
+                stack.pop_back();
+            }
+        }
+
+        count += new_frames;
     }

     return count;
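
The rewritten unwind now stops at the first frame that fails to unwind (avoiding stacks with holes between coroutine frames) and, per the added comments, derives the size of the shared synchronous prefix from the first frame it unwinds. To make the bookkeeping concrete, here is a hypothetical asyncio program (all names illustrative) whose stacks have the shape this code manipulates:

# Hypothetical program, for illustration only.
import asyncio

async def inner():
    await asyncio.sleep(1)   # innermost coroutine frame

async def outer():
    await inner()            # coroutine chain: outer -> inner

def main():
    # "Upper" Python stack shared by every task on this thread:
    # main -> asyncio.run -> event-loop internals (e.g. _run_once)
    asyncio.run(outer())

main()

For the on-CPU task, unwinding the outermost running coroutine frame also walks the live thread stack linked above it, so `new_frames` counts the coroutine frame plus `main`, `asyncio.run`, and the loop internals. `upper_python_stack_size = new_frames - 1` is therefore exactly that shared prefix, which is popped here and appended once per leaf task in `ThreadInfo::unwind_tasks` (next file).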

ddtrace/internal/datadog/profiling/stack_v2/echion/echion/threads.h

Lines changed: 47 additions & 20 deletions
@@ -264,31 +264,45 @@ ThreadInfo::unwind_tasks()
         }
     }

+    // Make sure the on CPU task is first
+    for (size_t i = 0; i < leaf_tasks.size(); i++) {
+        if (leaf_tasks[i].get().is_on_cpu) {
+            if (i > 0) {
+                std::swap(leaf_tasks[i], leaf_tasks[0]);
+            }
+            break;
+        }
+    }
+
+    // The size of the "pure Python" stack (before asyncio Frames), computed later by TaskInfo::unwind
+    size_t upper_python_stack_size = 0;
+    // Unused variable, will be used later by TaskInfo::unwind
+    size_t unused;
+
+    bool on_cpu_task_seen = false;
     for (auto& leaf_task : leaf_tasks) {
+        on_cpu_task_seen = on_cpu_task_seen || leaf_task.get().is_on_cpu;
+
         auto stack_info = std::make_unique<StackInfo>(leaf_task.get().name, leaf_task.get().is_on_cpu);
         auto& stack = stack_info->stack;
+
         for (auto current_task = leaf_task;;) {
             auto& task = current_task.get();

-            size_t stack_size = task.unwind(stack);
-
+            // The task_stack_size includes both the coroutines frames and the "upper" Python synchronous frames
+            size_t task_stack_size = task.unwind(stack, task.is_on_cpu ? upper_python_stack_size : unused);
             if (task.is_on_cpu) {
-                // Undo the stack unwinding
-                // TODO[perf]: not super-efficient :(
-                for (size_t i = 0; i < stack_size; i++)
-                    stack.pop_back();
-
-                // Instead we get part of the thread stack
-                FrameStack temp_stack;
-                size_t nframes = (python_stack.size() > stack_size) ? python_stack.size() - stack_size : 0;
-                for (size_t i = 0; i < nframes; i++) {
-                    auto python_frame = python_stack.front();
-                    temp_stack.push_front(python_frame);
-                    python_stack.pop_front();
-                }
-                while (!temp_stack.empty()) {
-                    stack.push_front(temp_stack.front());
-                    temp_stack.pop_front();
+                // Get the "bottom" part of the Python synchronous Stack, that is to say the
+                // synchronous functions and coroutines called by the Task's outermost coroutine
+                // The number of Frames to push is the total number of Frames in the Python stack, from which we
+                // subtract the number of Frames in the "upper Python stack" (asyncio machinery + sync entrypoint)
+                // This gives us [outermost coroutine, ... , innermost coroutine, outermost sync function, ... ,
+                // innermost sync function]
+                size_t frames_to_push =
+                    (python_stack.size() > task_stack_size) ? python_stack.size() - task_stack_size : 0;
+                for (size_t i = 0; i < frames_to_push; i++) {
+                    const auto& python_frame = python_stack[frames_to_push - i - 1];
+                    stack.push_front(python_frame);
                 }
             }

@@ -317,8 +331,21 @@ ThreadInfo::unwind_tasks()
         }

         // Finish off with the remaining thread stack
-        for (auto p = python_stack.begin(); p != python_stack.end(); p++)
-            stack.push_back(*p);
+        // If we have seen an on-CPU Task, then upper_python_stack_size will be set and will include the sync entry
+        // point and the asyncio machinery Frames. Otherwise, we are in `select` (idle) and we should push all the
+        // Frames.
+
+        // There could be a race condition where relevant partial Python Thread Stack ends up being different from the
+        // one we saw in TaskInfo::unwind. This is extremely unlikely, I believe, but failing to account for it would
+        // cause an underflow, so let's be conservative.
+        size_t start_index = 0;
+        if (on_cpu_task_seen && python_stack.size() >= upper_python_stack_size) {
+            start_index = python_stack.size() - upper_python_stack_size;
+        }
+        for (size_t i = start_index; i < python_stack.size(); i++) {
+            const auto& python_frame = python_stack[i];
+            stack.push_back(python_frame);
+        }

         current_tasks.push_back(std::move(stack_info));
     }
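
Swapping the on-CPU task to the front of `leaf_tasks` guarantees that `upper_python_stack_size` is computed on the first pass through the loop, so every leaf task (including idle ones processed afterwards) can append the shared tail without duplicating frames. Continuing the hypothetical program from the previous file, the reconstructed per-leaf-task stacks would come out roughly like this (frame names are assumptions; innermost frame first):

# Assumed illustration of the reconstructed stacks, innermost frame first.
on_cpu_task_stack = [
    "inner",        # coroutine frames from TaskInfo::unwind
    "outer",
    "_run_once",    # shared upper Python stack (asyncio machinery + sync
    "asyncio.run",  # entrypoint), appended once at the end of the loop body
    "main",
]
idle_task_stack = [
    "other_coro",   # the idle task's own coroutine chain
    "_run_once",    # same shared tail appended after its frames
    "asyncio.run",
    "main",
]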

ddtrace/testing/internal/telemetry.py

Lines changed: 4 additions & 2 deletions
@@ -177,7 +177,6 @@ def record_event_payload(
         payload_size: int,
         request_seconds: float,
         events_count: int,
-        serialization_seconds: float,
         error: t.Optional[ErrorType],
     ) -> None:
         tags = {"endpoint": endpoint}
@@ -186,11 +185,14 @@
         self.add_count_metric("endpoint_payload.requests", 1, tags)
         self.add_distribution_metric("endpoint_payload.requests_ms", request_seconds * 1000, tags)
         self.add_distribution_metric("endpoint_payload.events_count", events_count, tags)
-        self.add_distribution_metric("endpoint_payload.events_serialization_ms", serialization_seconds * 1000, tags)

         if error:
             self.record_event_payload_error(endpoint, error)

+    def record_event_payload_serialization_seconds(self, endpoint: str, serialization_seconds: float) -> None:
+        tags = {"endpoint": endpoint}
+        self.add_distribution_metric("endpoint_payload.events_serialization_ms", serialization_seconds * 1000, tags)
+
     def record_event_payload_error(self, endpoint: str, error: ErrorType) -> None:
         # `endpoint_payload.requests_errors` accepts a different set of error types, so we need to convert them here.
         if error == ErrorType.TIMEOUT:
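
The serialization-timing distribution is split out of `record_event_payload` into its own method, so it can be emitted when the payload is serialized rather than only when the request finishes. A sketch of how the two calls might now be sequenced; `telemetry`, `encoder`, `events`, and the endpoint string are assumed names, not calls confirmed by this diff:

# Hypothetical call sites for the split-out metric.
import time

start = time.monotonic()
payload = encoder.encode(events)  # serialization happens here
telemetry.record_event_payload_serialization_seconds(
    "api/v2/citestcycle", time.monotonic() - start
)

# ...later, once the HTTP request has completed:
telemetry.record_event_payload(
    endpoint="api/v2/citestcycle",
    payload_size=len(payload),
    request_seconds=0.042,
    events_count=len(events),
    error=None,
)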
