Skip to content

Commit 3632c6c

Browse files
chore(ci_visibility): split writer payloads (#15576)
## Description Split payloads to the event intakes when they would exceed 4.5MB. ## Testing Unit tests. ## Risks None. ## Additional Notes None.
1 parent affc3c7 commit 3632c6c

File tree

4 files changed

+126
-46
lines changed

4 files changed

+126
-46
lines changed

ddtrace/testing/internal/telemetry.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,6 @@ def record_event_payload(
177177
payload_size: int,
178178
request_seconds: float,
179179
events_count: int,
180-
serialization_seconds: float,
181180
error: t.Optional[ErrorType],
182181
) -> None:
183182
tags = {"endpoint": endpoint}
@@ -186,11 +185,14 @@ def record_event_payload(
186185
self.add_count_metric("endpoint_payload.requests", 1, tags)
187186
self.add_distribution_metric("endpoint_payload.requests_ms", request_seconds * 1000, tags)
188187
self.add_distribution_metric("endpoint_payload.events_count", events_count, tags)
189-
self.add_distribution_metric("endpoint_payload.events_serialization_ms", serialization_seconds * 1000, tags)
190188

191189
if error:
192190
self.record_event_payload_error(endpoint, error)
193191

192+
def record_event_payload_serialization_seconds(self, endpoint: str, serialization_seconds: float) -> None:
193+
tags = {"endpoint": endpoint}
194+
self.add_distribution_metric("endpoint_payload.events_serialization_ms", serialization_seconds * 1000, tags)
195+
194196
def record_event_payload_error(self, endpoint: str, error: ErrorType) -> None:
195197
# `endpoint_payload.requests_errors` accepts a different set of error types, so we need to convert them here.
196198
if error == ErrorType.TIMEOUT:

ddtrace/testing/internal/writer.py

Lines changed: 72 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,10 @@ def __init__(self) -> None:
3535
self.should_finish = threading.Event()
3636
self.flush_interval_seconds = 60
3737
self.events: t.List[Event] = []
38+
# 4.5MB max uncompressed payload size, following <https://github.com/DataDog/datadog-ci-rb/pull/272>.
39+
self.max_payload_size = int(4.5 * 1024 * 1024)
3840

3941
def put_event(self, event: Event) -> None:
40-
# TODO: compute/estimate payload size as events are inserted, and force a push once we reach a certain size.
4142
with self.lock:
4243
self.events.append(event)
4344

@@ -80,6 +81,22 @@ def flush(self) -> None:
8081
def _send_events(self, events: t.List[Event]) -> None:
8182
pass
8283

84+
@abstractmethod
85+
def _encode_events(self, events: t.List[Event]) -> bytes:
86+
pass
87+
88+
def _split_pack_events(self, events: t.List[Event]) -> t.List[bytes]:
89+
pack = self._encode_events(events)
90+
91+
if len(pack) > self.max_payload_size and len(events) > 1:
92+
del pack
93+
midpoint = len(events) // 2
94+
packs = self._split_pack_events(events[0:midpoint])
95+
packs += self._split_pack_events(events[midpoint:])
96+
return packs
97+
else:
98+
return [pack]
99+
83100

84101
class TestOptWriter(BaseWriter):
85102
__test__ = False
@@ -122,27 +139,36 @@ def put_item(self, item: TestItem[t.Any, t.Any]) -> None:
122139
event = self.serializers[type(item)](item)
123140
self.put_event(event)
124141

125-
def _send_events(self, events: t.List[Event]) -> None:
142+
def _encode_events(self, events: t.List[Event]) -> bytes:
126143
payload = {
127144
"version": 1,
128145
"metadata": self.metadata,
129146
"events": events,
130147
}
148+
return msgpack_packb(payload)
149+
150+
def _send_events(self, events: t.List[Event]) -> None:
131151
with StopWatch() as serialization_time:
132-
pack = msgpack_packb(payload)
152+
packs = self._split_pack_events(events)
133153

134-
result = self.connector.request(
135-
"POST", "/api/v2/citestcycle", data=pack, headers={"Content-Type": "application/msgpack"}, send_gzip=True
136-
)
154+
TelemetryAPI.get().record_event_payload_serialization_seconds("test_cycle", serialization_time.elapsed())
137155

138-
TelemetryAPI.get().record_event_payload(
139-
endpoint="test_cycle",
140-
payload_size=len(pack),
141-
request_seconds=result.elapsed_seconds,
142-
events_count=len(events),
143-
serialization_seconds=serialization_time.elapsed(),
144-
error=result.error_type,
145-
)
156+
for pack in packs:
157+
result = self.connector.request(
158+
"POST",
159+
"/api/v2/citestcycle",
160+
data=pack,
161+
headers={"Content-Type": "application/msgpack"},
162+
send_gzip=True,
163+
)
164+
165+
TelemetryAPI.get().record_event_payload(
166+
endpoint="test_cycle",
167+
payload_size=len(pack),
168+
request_seconds=result.elapsed_seconds,
169+
events_count=len(events),
170+
error=result.error_type,
171+
)
146172

147173

148174
class TestCoverageWriter(BaseWriter):
@@ -169,35 +195,40 @@ def put_coverage(self, test_run: TestRun, coverage_bitmaps: t.Iterable[t.Tuple[s
169195
)
170196
self.put_event(event)
171197

198+
def _encode_events(self, events: t.List[Event]) -> bytes:
199+
return msgpack_packb({"version": 2, "coverages": events})
200+
172201
def _send_events(self, events: t.List[Event]) -> None:
173202
with StopWatch() as serialization_time:
174-
pack = msgpack_packb({"version": 2, "coverages": events})
175-
176-
files = [
177-
FileAttachment(
178-
name="coverage1",
179-
filename="coverage1.msgpack",
180-
content_type="application/msgpack",
181-
data=pack,
182-
),
183-
FileAttachment(
184-
name="event",
185-
filename="event.json",
186-
content_type="application/json",
187-
data=b'{"dummy":true}',
188-
),
189-
]
190-
191-
result = self.connector.post_files("/api/v2/citestcov", files=files, send_gzip=True)
192-
193-
TelemetryAPI.get().record_event_payload(
194-
endpoint="code_coverage",
195-
payload_size=len(pack),
196-
request_seconds=result.elapsed_seconds,
197-
events_count=len(events),
198-
serialization_seconds=serialization_time.elapsed(),
199-
error=result.error_type,
200-
)
203+
packs = self._split_pack_events(events)
204+
205+
TelemetryAPI.get().record_event_payload_serialization_seconds("code_coverage", serialization_time.elapsed())
206+
207+
for pack in packs:
208+
files = [
209+
FileAttachment(
210+
name="coverage1",
211+
filename="coverage1.msgpack",
212+
content_type="application/msgpack",
213+
data=pack,
214+
),
215+
FileAttachment(
216+
name="event",
217+
filename="event.json",
218+
content_type="application/json",
219+
data=b'{"dummy":true}',
220+
),
221+
]
222+
223+
result = self.connector.post_files("/api/v2/citestcov", files=files, send_gzip=True)
224+
225+
TelemetryAPI.get().record_event_payload(
226+
endpoint="code_coverage",
227+
payload_size=len(pack),
228+
request_seconds=result.elapsed_seconds,
229+
events_count=len(events),
230+
error=result.error_type,
231+
)
201232

202233

203234
def serialize_test_run(test_run: TestRun) -> Event:

tests/testing/internal/test_telemetry.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,6 @@ def test_record_event_payload_ok(self, telemetry_api: TelemetryAPI) -> None:
274274
payload_size=613,
275275
request_seconds=3.14,
276276
events_count=42,
277-
serialization_seconds=0.5,
278277
error=None,
279278
)
280279

@@ -285,7 +284,6 @@ def test_record_event_payload_ok(self, telemetry_api: TelemetryAPI) -> None:
285284
call(CIVISIBILITY, "endpoint_payload.bytes", 613, (("endpoint", "test_cycle"),)),
286285
call(CIVISIBILITY, "endpoint_payload.requests_ms", 3140, (("endpoint", "test_cycle"),)),
287286
call(CIVISIBILITY, "endpoint_payload.events_count", 42, (("endpoint", "test_cycle"),)),
288-
call(CIVISIBILITY, "endpoint_payload.events_serialization_ms", 500, (("endpoint", "test_cycle"),)),
289287
]
290288

291289
@pytest.mark.parametrize(
@@ -307,7 +305,6 @@ def test_record_event_payload_error(
307305
payload_size=613,
308306
request_seconds=3.14,
309307
events_count=42,
310-
serialization_seconds=0.5,
311308
error=http_error_type,
312309
)
313310

@@ -327,6 +324,12 @@ def test_record_event_payload_error(
327324
call(CIVISIBILITY, "endpoint_payload.bytes", 613, (("endpoint", "test_cycle"),)),
328325
call(CIVISIBILITY, "endpoint_payload.requests_ms", 3140, (("endpoint", "test_cycle"),)),
329326
call(CIVISIBILITY, "endpoint_payload.events_count", 42, (("endpoint", "test_cycle"),)),
327+
]
328+
329+
def test_record_event_payload_serialization_seconds(self, telemetry_api: TelemetryAPI) -> None:
330+
telemetry_api.record_event_payload_serialization_seconds("test_cycle", 0.5)
331+
332+
assert telemetry_api.writer.add_distribution_metric.call_args_list == [
330333
call(CIVISIBILITY, "endpoint_payload.events_serialization_ms", 500, (("endpoint", "test_cycle"),)),
331334
]
332335

tests/testing/internal/test_writer.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Tests for ddtrace.testing.internal.writer module."""
22

33
from unittest.mock import Mock
4+
from unittest.mock import call
45
from unittest.mock import patch
56

67
from ddtrace.testing.internal.http import BackendConnectorAgentlessSetup
@@ -10,6 +11,7 @@
1011
from ddtrace.testing.internal.test_data import TestSession
1112
from ddtrace.testing.internal.test_data import TestStatus
1213
from ddtrace.testing.internal.test_data import TestSuite
14+
from ddtrace.testing.internal.tracer_api import msgpack_packb
1315
from ddtrace.testing.internal.writer import Event
1416
from ddtrace.testing.internal.writer import TestCoverageWriter
1517
from ddtrace.testing.internal.writer import TestOptWriter
@@ -101,6 +103,48 @@ def test_send_events(self, mock_packb: Mock, mock_backend_connector: Mock) -> No
101103
send_gzip=True,
102104
)
103105

106+
@patch("ddtrace.testing.internal.http.BackendConnector")
107+
def test_split_events(self, mock_backend_connector: Mock) -> None:
108+
"""Test sending events to backend."""
109+
mock_connector = Mock()
110+
mock_backend_connector.return_value = mock_connector
111+
mock_connector.request.return_value = BackendResult(
112+
response=Mock(status=200), response_length=42, elapsed_seconds=1.2
113+
)
114+
115+
writer = TestOptWriter(BackendConnectorAgentlessSetup(site="test", api_key="key"))
116+
writer.metadata = {"*": {"foo": "bar"}}
117+
writer.max_payload_size = 70
118+
119+
events = [Event(type="test1"), Event(type="test2"), Event(type="test3")]
120+
121+
writer._send_events(events)
122+
123+
expected_pack_1 = msgpack_packb(
124+
{"version": 1, "metadata": {"*": {"foo": "bar"}}, "events": [{"type": "test1"}]}
125+
)
126+
expected_pack_2 = msgpack_packb(
127+
{"version": 1, "metadata": {"*": {"foo": "bar"}}, "events": [{"type": "test2"}, {"type": "test3"}]}
128+
)
129+
130+
# Check HTTP request
131+
assert mock_connector.request.call_args_list == [
132+
call(
133+
"POST",
134+
"/api/v2/citestcycle",
135+
data=expected_pack_1,
136+
headers={"Content-Type": "application/msgpack"},
137+
send_gzip=True,
138+
),
139+
call(
140+
"POST",
141+
"/api/v2/citestcycle",
142+
data=expected_pack_2,
143+
headers={"Content-Type": "application/msgpack"},
144+
send_gzip=True,
145+
),
146+
]
147+
104148

105149
class TestTestCoverageWriter:
106150
"""Tests for TestCoverageWriter class."""

0 commit comments

Comments
 (0)