Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions scripts/regenerate_templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,9 @@ def _add_comments(text: str, comments: dict[str, str]) -> str:
for key, comment in sorted(comments.items(), key=lambda x: -len(x[0])):
text = re.sub(
rf"^(\s*{re.escape(key)}.*)$",
lambda m, c=comment: m.group(0)
if "#" in m.group(0)
else f"{m.group(0)} {c}",
lambda m, c=comment: (
m.group(0) if "#" in m.group(0) else f"{m.group(0)} {c}"
),
text,
count=0,
flags=re.MULTILINE,
Expand Down Expand Up @@ -326,6 +326,15 @@ def _build_full(model_cls: type[BenchmarkConfig], overrides: dict) -> dict:
if overrides:
data = _deep_merge(data, overrides)

# Mirror LoadPattern's model_serializer: use_legacy_loadgen_qps_metrics
# applies only to poisson, so drop it from other patterns' templates.
# TODO(vir): remove this prune when use_legacy_loadgen_qps_metrics is removed.
settings = data.get("settings")
if isinstance(settings, dict):
load_pattern = settings.get("load_pattern")
if isinstance(load_pattern, dict) and load_pattern.get("type") != "poisson":
load_pattern.pop("use_legacy_loadgen_qps_metrics", None)

# Resolve streaming AUTO → off/on (mirrors schema validator)
test_type = data.get("type")
mp = data.get("model_params", {})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class MetricCounterKey(str, Enum):
# tracked row still exists when the aggregator sees the ERROR.
TRACKED_SAMPLES_FAILED = "tracked_samples_failed"
TRACKED_DURATION_NS = "tracked_duration_ns"
# Legacy MLPerf LoadGen Server "completed" window (final_query_all_samples_done_time).
LEGACY_LOADGEN_WINDOW_DURATION_NS = "legacy_loadgen_window_duration_ns"
# Total wall-clock duration since session start. Updated on every event as
# max(current, event_timestamp - session_start). Stored as a counter
# rather than computed from (now - start) at read time because
Expand Down Expand Up @@ -320,6 +322,10 @@ async def process(self, records: list[EventRecord]) -> None:
MetricCounterKey.TRACKED_DURATION_NS.value,
table.total_tracked_duration_ns,
)
registry.set_counter(
MetricCounterKey.LEGACY_LOADGEN_WINDOW_DURATION_NS.value,
table.total_loadgen_window_ns,
)
logger.debug("Session event: %s", ev)
continue

Expand Down Expand Up @@ -392,6 +398,10 @@ async def process(self, records: list[EventRecord]) -> None:
MetricCounterKey.TRACKED_DURATION_NS.value,
table.total_tracked_duration_ns,
)
registry.set_counter(
MetricCounterKey.LEGACY_LOADGEN_WINDOW_DURATION_NS.value,
table.total_loadgen_window_ns,
)
try:
await self._publisher.publish_final(registry, n_pending_tasks=n_pending)
finally:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,13 @@ def __init__(self, registry: MetricsRegistry) -> None:
self.session_started_ns: int | None = None
self.tracked_blocks: list[TrackedBlock] = []

# LoadGen window anchors: start at the FIRST issued tracked request
# (LoadGen t=0), end at the completion of the last-issued request that
# completed. See total_loadgen_window_ns.
self._loadgen_max_issued_ns: int = -1
self._loadgen_window_end_ns: int | None = None
self._loadgen_window_start_ns: int | None = None

# --- Trigger registration ---

def add_trigger(self, field_name: str, trigger: EmitTrigger) -> None:
Expand Down Expand Up @@ -501,6 +508,16 @@ def total_completed_tracked_samples(self) -> int:
"""Total samples completed across all tracking blocks."""
return sum(b.completed_samples for b in self.tracked_blocks)

@property
def total_loadgen_window_ns(self) -> int:
"""Window from the first issued tracked request (LoadGen t=0) to the
completion of the last-issued request that completed — the legacy MLPerf
LoadGen ``final_query_all_samples_done_time`` analog. Returns 0 (=>
the legacy metric falls back to native) when no tracked request completed."""
if self._loadgen_window_end_ns is None or self._loadgen_window_start_ns is None:
return 0
return max(0, self._loadgen_window_end_ns - self._loadgen_window_start_ns)

# --- Field updates ---

def set_field(
Expand All @@ -526,6 +543,8 @@ def set_field(
return
row = self._create_row(sample_uuid)
row.tracked_block_idx = len(self.tracked_blocks) - 1
if self._loadgen_window_start_ns is None:
self._loadgen_window_start_ns = value
else:
row = self._in_flight.get(sample_uuid)
if row is None:
Expand Down Expand Up @@ -608,3 +627,8 @@ def _update_tracked_block(self, row: SampleRow, complete_ns: int) -> None:
if complete_ns > block.last_complete_ns:
block.last_complete_ns = complete_ns
block.completed_samples += 1
# End the legacy LoadGen window at the completion of the last-issued
# (largest issued_ns) request that completed.
if row.issued_ns is not None and row.issued_ns > self._loadgen_max_issued_ns:
self._loadgen_max_issued_ns = row.issued_ns
self._loadgen_window_end_ns = complete_ns
13 changes: 13 additions & 0 deletions src/inference_endpoint/commands/benchmark/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -804,20 +804,33 @@ def _on_phase_start(phase: PhaseConfig) -> None:
try:
runtime = ctx.config.settings.runtime
warmup = ctx.config.settings.warmup
load_pattern = ctx.config.settings.load_pattern
report = Report.from_snapshot(
snap_dict,
seeds={
"scheduler_random_seed": runtime.scheduler_random_seed,
"dataloader_random_seed": runtime.dataloader_random_seed,
"warmup_random_seed": warmup.warmup_random_seed,
},
use_legacy_loadgen_qps_metrics=(
load_pattern.type == LoadPatternType.POISSON
and load_pattern.use_legacy_loadgen_qps_metrics
),
)
if not report.complete:
logger.warning(
"Report is incomplete (state=%s, n_pending_tasks=%d)",
report.state,
snap_dict.get("n_pending_tasks", 0),
)
if report.legacy_loadgen_window_duration_ns is not None:
logger.warning(
"Reporting QPS/TPS with the legacy MLPerf LoadGen Server "
"'completed' definition (deprecated; to be removed once a "
"formal tail-cutting mechanism lands). Pass "
"--no-use-legacy-loadgen-qps-metrics for endpoints-native "
"metrics."
)
except Exception as e: # noqa: BLE001 — best-effort report build.
logger.warning(f"Failed to build report from snapshot: {e}")

Expand Down
27 changes: 27 additions & 0 deletions src/inference_endpoint/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
ConfigDict,
Discriminator,
Field,
SerializerFunctionWrapHandler,
Tag,
TypeAdapter,
field_validator,
model_serializer,
model_validator,
)

Expand Down Expand Up @@ -466,6 +468,31 @@ class LoadPattern(BaseModel):
cyclopts.Parameter(alias="--concurrency", help="Concurrent requests"),
] = Field(None, gt=0)

# TODO(vir): remove once the formal tail-cutting mechanism lands.
use_legacy_loadgen_qps_metrics: Annotated[
bool,
cyclopts.Parameter(
negative="--no-use-legacy-loadgen-qps-metrics",
help=(
"Only applies to the poisson load pattern. Report QPS/TPS using "
"the legacy MLPerf LoadGen Server 'completed' definition — (completed-1)/T "
"and tokens/T, T = first issued request to completion of the "
"last-issued request (see mlcommons/inference loadgen/results.cc). "
"--no-... uses endpoints-native completed/duration. Ignored for "
"non-poisson patterns."
),
),
] = True

@model_serializer(mode="wrap")
def _serialize(self, handler: SerializerFunctionWrapHandler) -> dict[str, Any]:
# use_legacy_loadgen_qps_metrics only applies to poisson; drop it from
# the serialized form (and thus YAML templates) for other patterns.
data = handler(self)
if self.type != LoadPatternType.POISSON:
data.pop("use_legacy_loadgen_qps_metrics", None)
return data

@model_validator(mode="after")
def _validate_completeness(self) -> Self:
if self.type == LoadPatternType.POISSON and (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ settings:
type: poisson # Load pattern type | options: max_throughput, poisson, concurrency, agentic_inference, burst, step
target_qps: 10.0 # Target QPS
target_concurrency: null # Concurrent requests
use_legacy_loadgen_qps_metrics: true # Only applies to the poisson load pattern. Report QPS/TPS using the legacy MLPerf LoadGen Server 'completed' definition — (completed-1)/T and tokens/T, T = first issued request to completion of the last-issued request (see mlcommons/inference loadgen/results.cc). --no-... uses endpoints-native completed/duration. Ignored for non-poisson patterns.
client:
num_workers: -1 # Worker processes (-1=auto)
log_level: INFO # Worker log level
Expand Down
62 changes: 55 additions & 7 deletions src/inference_endpoint/metrics/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,14 @@ class Report(msgspec.Struct, frozen=True): # type: ignore[call-arg]
tpot: dict[str, Any]
latency: dict[str, Any]
output_sequence_lengths: dict[str, Any]
# Legacy MLPerf LoadGen Server "completed" window (poisson only): first
# issued request -> completion of the last-issued request
# (final_query_all_samples_done_time analog; see mlcommons/inference
# loadgen/results.cc). Not None iff QPS/TPS were computed over this window;
# None means the endpoints-native full-run window was used. Recorded so
# result_summary.json is self-describing about which view it holds.
# TODO(vir): deprecate once endpoints has a formal tail-cutting mechanism.
legacy_loadgen_window_duration_ns: int | None = None

# Derived throughput, computed once in from_snapshot so the serialized
# report (result_summary.json) is self-complete. qps is None without a
Expand All @@ -146,7 +154,11 @@ class Report(msgspec.Struct, frozen=True): # type: ignore[call-arg]

@classmethod
def from_snapshot(
cls, snap: dict[str, Any], *, seeds: dict[str, int] | None = None
cls,
snap: dict[str, Any],
*,
seeds: dict[str, int] | None = None,
use_legacy_loadgen_qps_metrics: bool = True,
) -> Report:
"""Build a Report from a snapshot dict.

Expand All @@ -169,6 +181,13 @@ def from_snapshot(
honest "incomplete" report on missing fields instead of crashing:
missing ``state`` defaults to ``"interrupted"`` (worst-case),
missing counters / series to zero / empty.

The snapshot always carries BOTH ``tracked_duration_ns`` and
``legacy_loadgen_window_duration_ns``, so it stays config-agnostic and
fully reinterpretable either way. Which window the reported QPS/TPS use
is decided by the run config (``use_legacy_loadgen_qps_metrics``,
recorded in ``config.yaml`` and in this Report's serialized JSON), not
by the snapshot.
"""
counters: dict[str, int | float] = {}
series: dict[str, dict[str, Any]] = {}
Expand Down Expand Up @@ -197,13 +216,41 @@ def _series_dict(key: str) -> dict[str, Any]:
n_completed = _counter("tracked_samples_completed")
osl = _series_dict("osl")

# Derived throughput. qps needs a duration; tps additionally needs OSL.
if duration_ns is None:
qps = tps = None
# Legacy MLPerf LoadGen Server "completed" window (poisson only): first
# issued request -> completion of the last-issued request
# (final_query_all_samples_done_time analog; see mlcommons/inference
# loadgen/results.cc).
# TODO(vir): deprecate once endpoints has a formal tail-cutting mechanism.
raw_loadgen_window_ns = _counter("legacy_loadgen_window_duration_ns")

# Derived throughput, computed once so result_summary.json is
# self-complete. Legacy LoadGen QPS = (completed-1)/window; otherwise
# native completed/duration. The walrus assigns the window
# unconditionally (first operand of the `and`), so it stays in scope
# for the tps computation and the stored field below — no second
# counter read.
if (
legacy_loadgen_window_duration_ns := (
raw_loadgen_window_ns
if (use_legacy_loadgen_qps_metrics and raw_loadgen_window_ns > 0)
else None
)
) is not None and n_completed >= 2:
qps = (n_completed - 1) / (legacy_loadgen_window_duration_ns / 1e9)
elif duration_ns is not None:
qps = n_completed / (duration_ns / 1e9)
else:
duration_s = duration_ns / 1e9
qps = n_completed / duration_s
tps = (osl.get("total", 0) / duration_s) if osl else None
qps = None
tps_window_ns = (
legacy_loadgen_window_duration_ns
if legacy_loadgen_window_duration_ns is not None
else duration_ns
)
tps = (
(osl.get("total", 0) / (tps_window_ns / 1e9))
if (tps_window_ns and osl)
else None
)

# Default missing state to "interrupted" — a malformed / partial
# snapshot dict is treated as worst-case (run did not reach a
Expand All @@ -226,6 +273,7 @@ def _series_dict(key: str) -> dict[str, Any]:
tpot=_series_dict("tpot_ns"),
latency=_series_dict("sample_latency_ns"),
output_sequence_lengths=osl,
legacy_loadgen_window_duration_ns=legacy_loadgen_window_duration_ns,
qps=qps,
tps=tps,
seeds=seeds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,50 @@ async def test_ended_in_second_batch(self, tmp_path):
agg.close()


# ---------------------------------------------------------------------------
# LoadGen window aggregation (end-to-end through the event router)
# ---------------------------------------------------------------------------


@pytest.mark.unit
class TestLoadgenWindowAggregation:
@pytest.mark.asyncio
async def test_loadgen_window_duration_emitted(self, tmp_path):
"""The aggregator emits ``legacy_loadgen_window_duration_ns`` = first
issue to the completion of the last-issued request.

Sequence: STARTED, START_PERFORMANCE_TRACKING, ISSUED(s1, t=100),
COMPLETE(s1, t=500), ISSUED(s2, t=200, last-issued), COMPLETE(s2,
t=600), STOP_PERFORMANCE_TRACKING. Window = 600 - 100 = 500.
"""
loop = asyncio.get_event_loop()
with ManagedZMQContext.scoped(socket_dir=str(tmp_path)) as ctx:
agg, registry, _ = make_aggregator(ctx, loop, "agg_loadgen_window")
try:
await agg.process(
[
session_event(SessionEventType.STARTED, ts=0),
session_event(
SessionEventType.START_PERFORMANCE_TRACKING, ts=10
),
sample_event(SampleEventType.ISSUED, "s1", ts=100),
sample_event(SampleEventType.COMPLETE, "s1", ts=500),
sample_event(SampleEventType.ISSUED, "s2", ts=200),
sample_event(SampleEventType.COMPLETE, "s2", ts=600),
session_event(
SessionEventType.STOP_PERFORMANCE_TRACKING, ts=700
),
]
)
counters = snapshot_counters(registry)
assert (
counters[MetricCounterKey.LEGACY_LOADGEN_WINDOW_DURATION_NS.value]
== 600 - 100
)
finally:
agg.close()


# ---------------------------------------------------------------------------
# Counter accounting (issued / completed)
# ---------------------------------------------------------------------------
Expand Down
Loading
Loading