Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 80 additions & 80 deletions tensorrt_llm/serve/scripts/backend_request_func.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,27 @@
AIOHTTP_TIMEOUT = aiohttp.ClientTimeout(total=6 * 60 * 60)


async def _iter_sse_data(response_content):
"""Yield SSE data payloads from a byte stream.

Buffers incoming bytes on newline boundaries, decodes UTF-8, strips
non-data lines and the ``data:`` prefix, and skips ``[DONE]`` sentinels.
Callers receive only JSON-ready strings.
"""
buf = b""
async for chunk in response_content:
buf += chunk
while b"\n" in buf:
line_bytes, buf = buf.split(b"\n", 1)
line = line_bytes.decode("utf-8").strip()
if not line or not line.startswith("data:"):
continue
payload = line.removeprefix("data:").lstrip()
if payload == "[DONE]":
continue
yield payload
Comment thread
longlee0622 marked this conversation as resolved.


@dataclass
class RequestFuncInput:
prompt: str
Expand Down Expand Up @@ -88,14 +109,7 @@ async def async_request_trt_llm(
if response.status == 200:
output.success = True
if streaming:
async for chunk_bytes in response.content:
chunk_bytes = chunk_bytes.strip()
if not chunk_bytes:
continue

chunk = chunk_bytes.decode("utf-8").removeprefix(
"data:")

async for chunk in _iter_sse_data(response.content):
data = json.loads(chunk)
output.generated_text += data["text_output"]
timestamp = time.perf_counter()
Expand Down Expand Up @@ -197,45 +211,38 @@ async def async_request_openai_completions(
if response.status == 200:
if streaming:
first_chunk_received = False
async for chunk_bytes in response.content:
chunk_bytes = chunk_bytes.strip()
if not chunk_bytes:
continue

chunk = chunk_bytes.decode("utf-8").removeprefix(
"data: ")
if chunk != "[DONE]":
data = json.loads(chunk)

# NOTE: Some completion API might have a last
# usage summary response without a token so we
# want to check a token was generated
if choices := data.get("choices"):
# Note that text could be empty here
# e.g. for special tokens
text = choices[0].get("text")
timestamp = time.perf_counter()
# First token
if not first_chunk_received:
first_chunk_received = True
ttft = time.perf_counter() - st
output.ttft = ttft

# Decoding phase
else:
output.itl.append(timestamp -
most_recent_timestamp)

most_recent_timestamp = timestamp
generated_text += text or ""

# Extract avg_decoded_tokens_per_iter from streaming response
if "avg_decoded_tokens_per_iter" in choices[0]:
output.avg_decoded_tokens_per_iter = choices[
0]["avg_decoded_tokens_per_iter"]
elif usage := data.get("usage"):
output.output_tokens = usage.get(
"completion_tokens")
async for chunk in _iter_sse_data(response.content):
data = json.loads(chunk)

# NOTE: Some completion API might have a last
# usage summary response without a token so we
# want to check a token was generated
if choices := data.get("choices"):
# Note that text could be empty here
# e.g. for special tokens
text = choices[0].get("text")
timestamp = time.perf_counter()
# First token
if not first_chunk_received:
first_chunk_received = True
ttft = time.perf_counter() - st
output.ttft = ttft

# Decoding phase
else:
output.itl.append(timestamp -
most_recent_timestamp)

most_recent_timestamp = timestamp
generated_text += text or ""

# Extract avg_decoded_tokens_per_iter from streaming response
if "avg_decoded_tokens_per_iter" in choices[0]:
output.avg_decoded_tokens_per_iter = choices[0][
"avg_decoded_tokens_per_iter"]
elif usage := data.get("usage"):
output.output_tokens = usage.get(
"completion_tokens")
if first_chunk_received:
output.success = True
else:
Expand Down Expand Up @@ -342,40 +349,33 @@ async def async_request_openai_chat_completions(
if response.status == 200:
output.success = True
if streaming:
async for chunk_bytes in response.content:
chunk_bytes = chunk_bytes.strip()
if not chunk_bytes:
continue

chunk = chunk_bytes.decode("utf-8").removeprefix(
"data: ")
if chunk != "[DONE]":
timestamp = time.perf_counter()
data = json.loads(chunk)

if choices := data.get("choices"):
content = choices[0]["delta"].get("content")
# First token
if ttft == 0.0:
ttft = timestamp - st
output.ttft = ttft

# Decoding phase
else:
output.itl.append(timestamp -
most_recent_timestamp)

generated_text += content or ""

# Extract avg_decoded_tokens_per_iter from streaming chat response
if "avg_decoded_tokens_per_iter" in choices[0]:
output.avg_decoded_tokens_per_iter = choices[
0]["avg_decoded_tokens_per_iter"]
elif usage := data.get("usage"):
output.output_tokens = usage.get(
"completion_tokens")
async for chunk in _iter_sse_data(response.content):
timestamp = time.perf_counter()
data = json.loads(chunk)

most_recent_timestamp = timestamp
if choices := data.get("choices"):
content = choices[0]["delta"].get("content")
# First token
if ttft == 0.0:
ttft = timestamp - st
output.ttft = ttft

# Decoding phase
else:
output.itl.append(timestamp -
most_recent_timestamp)

generated_text += content or ""

# Extract avg_decoded_tokens_per_iter from streaming chat response
if "avg_decoded_tokens_per_iter" in choices[0]:
output.avg_decoded_tokens_per_iter = choices[0][
"avg_decoded_tokens_per_iter"]
elif usage := data.get("usage"):
output.output_tokens = usage.get(
"completion_tokens")

most_recent_timestamp = timestamp

output.generated_text = generated_text
output.latency = most_recent_timestamp - st
Expand Down
40 changes: 31 additions & 9 deletions tests/integration/defs/perf/test_perf_sanity.py
Original file line number Diff line number Diff line change
Expand Up @@ -1178,7 +1178,9 @@ def get_gpu_type() -> str:
except (subprocess.CalledProcessError, FileNotFoundError, IndexError):
raise RuntimeError("Failed to get GPU type")

self.upload_to_db = "upload" in test_case_name.split("-")[0]
self.upload_to_db = "upload" in test_case_name.split("-")[0] and bool(
os.environ.get("OPEN_SEARCH_DB_BASE_URL", "")
)
self.gpu_type = get_gpu_type()

# Parse test case name to get config_base_name, select_pattern, runtime, benchmark_mode
Expand Down Expand Up @@ -1526,15 +1528,32 @@ def _check_benchmark_errors(self, output: str) -> None:
if not output:
return

# Tolerance: allow up to 1% failed requests for high-concurrency benchmarks
# where transient network issues can cause rare individual request failures.
max_failure_rate = 0.01

# Check for non-zero failed requests (default benchmark)
failed_requests_match = re.search(r"Failed requests:\s+(\d+)", output)
if failed_requests_match:
failed_count = int(failed_requests_match.group(1))
if failed_count > 0:
error_msg = f"Benchmark output contains {failed_count} failed requests."
raise RuntimeError(error_msg)
total_match = re.search(r"Successful requests:\s+(\d+)", output)
total_requests = (
int(total_match.group(1)) + failed_count if total_match else failed_count
)
failure_rate = failed_count / total_requests if total_requests > 0 else 1.0
if failure_rate > max_failure_rate:
error_msg = (
Comment thread
longlee0622 marked this conversation as resolved.
f"Benchmark output contains {failed_count} failed requests "
f"({failure_rate:.2%} failure rate exceeds {max_failure_rate:.0%} threshold)."
)
raise RuntimeError(error_msg)
return

# Check for explicit failure markers (default benchmark)
# Check for explicit failure markers (default benchmark) only when
# the numeric "Failed requests:" line was not found (the markers are
# always printed together with the numeric count, so this avoids
# double-counting when the failure rate is within tolerance).
if "!FAILED REQUESTS!" in output or "!CHECK LOG FOR ERRORS!" in output:
error_msg = "Benchmark output contains failure markers."
raise RuntimeError(error_msg)
Expand All @@ -1550,11 +1569,14 @@ def _check_benchmark_errors(self, output: str) -> None:
num_prompts = int(num_prompts_match.group(1))
failed_count = num_prompts - successful_count
if failed_count > 0:
error_msg = (
f"SA benchmark: {failed_count} of {num_prompts} requests failed "
f"({successful_count} successful)."
)
raise RuntimeError(error_msg)
failure_rate = failed_count / num_prompts if num_prompts > 0 else 1.0
if failure_rate > max_failure_rate:
error_msg = (
f"SA benchmark: {failed_count} of {num_prompts} requests failed "
f"({successful_count} successful, "
f"{failure_rate:.2%} exceeds {max_failure_rate:.0%} threshold)."
)
raise RuntimeError(error_msg)

def run_ex(self, commands) -> Dict[int, List[str]]:
"""Run commands and collect outputs."""
Expand Down
Loading