diff --git a/bottlecap/src/http.rs b/bottlecap/src/http.rs index 92f458077..065778a2d 100644 --- a/bottlecap/src/http.rs +++ b/bottlecap/src/http.rs @@ -9,7 +9,7 @@ use core::time::Duration; use datadog_fips::reqwest_adapter::create_reqwest_client_builder; use std::sync::Arc; use std::{collections::HashMap, error::Error, fs::File, io::BufReader}; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; #[must_use] pub fn get_client(config: &Arc) -> reqwest::Client { @@ -117,6 +117,22 @@ pub async fn extract_request_body( Ok((parts, bytes)) } +/// Like [`extract_request_body`], but never fails: if buffering the body +/// errors (e.g. an oversized payload exceeding `DefaultBodyLimit`), the body +/// is replaced with empty bytes so that processing can continue with headers +/// only. +pub async fn extract_request_body_or_empty(request: Request) -> (http::request::Parts, Bytes) { + let (parts, body) = request.into_parts(); + let bytes = match Bytes::from_request(Request::from_parts(parts.clone(), body), &()).await { + Ok(b) => b, + Err(e) => { + warn!("Failed to buffer request body: {e}. Processing with empty payload."); + Bytes::new() + } + }; + (parts, bytes) +} + #[must_use] pub fn headers_to_map(headers: &HeaderMap) -> HashMap { headers diff --git a/bottlecap/src/lifecycle/listener.rs b/bottlecap/src/lifecycle/listener.rs index a77625bba..61e7166e7 100644 --- a/bottlecap/src/lifecycle/listener.rs +++ b/bottlecap/src/lifecycle/listener.rs @@ -19,7 +19,7 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, warn}; use crate::{ - http::{extract_request_body, headers_to_map}, + http::{extract_request_body_or_empty, headers_to_map}, lifecycle::invocation::processor_service::InvocationProcessorHandle, traces::{ context::SpanContext, @@ -125,17 +125,7 @@ impl Listener { State((invocation_processor_handle, propagator, tasks)): State, request: Request, ) -> Response { - let (parts, body) = match extract_request_body(request).await { - Ok(r) => r, - Err(e) => { - error!("Failed to extract request body: {e}"); - return ( - StatusCode::BAD_REQUEST, - "Could not read start invocation request body", - ) - .into_response(); - } - }; + let (parts, body) = extract_request_body_or_empty(request).await; let headers = headers_to_map(&parts.headers); let payload_value = serde_json::from_slice::(&body).unwrap_or_else(|_| json!({})); @@ -171,13 +161,9 @@ impl Listener { // IMPORTANT: Extract the body synchronously before returning the response. // If this is moved into the spawned task, PlatformRuntimeDone may be // processed before the body is read, causing orphaned traces. (SLES-2666) - let (parts, body) = match extract_request_body(request).await { - Ok(r) => r, - Err(e) => { - error!("Failed to extract request body: {e}"); - return (StatusCode::OK, json!({}).to_string()).into_response(); - } - }; + // On oversized payloads (>6MB) we gracefully degrade to an empty body + // so that processing still runs. (SLES-2722) + let (parts, body) = extract_request_body_or_empty(request).await; let mut join_set = tasks.lock().await; join_set.spawn(async move { @@ -398,4 +384,56 @@ mod tests { "Should extract request_id from LWA proxy header" ); } + + /// Verifies that an oversized payload (>6MB) behind `DefaultBodyLimit` + /// does NOT prevent end-invocation processing. The handler should + /// gracefully degrade to an empty body instead of failing outright. + #[tokio::test] + async fn test_end_invocation_oversized_payload_still_processes() { + // Mirrors the fixed handle_end_invocation logic: synchronously attempt + // body extraction before spawning the task, fall back to empty bytes. + async fn handler(request: axum::extract::Request) -> StatusCode { + use axum::extract::FromRequest; + + let (parts, body) = request.into_parts(); + let body = + match Bytes::from_request(axum::extract::Request::from_parts(parts, body), &()) + .await + { + Ok(b) => b, + Err(_) => Bytes::new(), + }; + + if body.is_empty() { + // Body was too large and was replaced with empty bytes. + // Processing continues with degraded payload. + StatusCode::OK + } else { + StatusCode::OK + } + } + + let router = Router::new() + .route(END_INVOCATION_PATH, post(handler)) + .layer(DefaultBodyLimit::max(LAMBDA_INVOCATION_MAX_PAYLOAD)); + + // 6 MB + 1 byte: exceeds the DefaultBodyLimit + let payload = vec![b'x'; LAMBDA_INVOCATION_MAX_PAYLOAD + 1]; + let req = Request::builder() + .method("POST") + .uri(END_INVOCATION_PATH) + .header("Content-Type", "application/json") + .body(Body::from(payload)) + .expect("failed to build request"); + + let response = router.oneshot(req).await.expect("request failed"); + + // The handler gracefully degrades to an empty payload instead of failing, + // so processing (universal_instrumentation_end) still runs. + assert_eq!( + response.status(), + StatusCode::OK, + "Oversized payload should be handled gracefully with empty body fallback" + ); + } }