Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion bottlecap/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use core::time::Duration;
use datadog_fips::reqwest_adapter::create_reqwest_client_builder;
use std::sync::Arc;
use std::{collections::HashMap, error::Error, fs::File, io::BufReader};
use tracing::{debug, error};
use tracing::{debug, error, warn};

#[must_use]
pub fn get_client(config: &Arc<config::Config>) -> reqwest::Client {
Expand Down Expand Up @@ -117,6 +117,22 @@ pub async fn extract_request_body(
Ok((parts, bytes))
}

/// Like [`extract_request_body`], but never fails: if buffering the body
/// errors (e.g. an oversized payload exceeding `DefaultBodyLimit`), the body
/// is replaced with empty bytes so that processing can continue with headers
/// only.
pub async fn extract_request_body_or_empty(request: Request) -> (http::request::Parts, Bytes) {
let (parts, body) = request.into_parts();
let bytes = match Bytes::from_request(Request::from_parts(parts.clone(), body), &()).await {
Ok(b) => b,
Err(e) => {
warn!("Failed to buffer request body: {e}. Processing with empty payload.");
Bytes::new()
}
};
(parts, bytes)
}

#[must_use]
pub fn headers_to_map(headers: &HeaderMap) -> HashMap<String, String> {
headers
Expand Down
76 changes: 57 additions & 19 deletions bottlecap/src/lifecycle/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, error, warn};

use crate::{
http::{extract_request_body, headers_to_map},
http::{extract_request_body_or_empty, headers_to_map},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if you're using extract_request_body_or_empty, what's going on with extract_request_body? Is that ever used again? Do we need to update this new behavior in previous usages or?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are a few other usages for extract_request_body

  1. let (_, body) = match extract_request_body(request).await {
  2. let (parts, body) = match extract_request_body(request).await {
  3. let (parts, body) = match extract_request_body(req).await {
  4. let (parts, body) = match extract_request_body(request).await {
  5. let (parts, body) = match extract_request_body(request).await {

I actually don't have enough background knowledge to say if those cases can follow the same pattern...Do you think we should make all of them just log the error and accept empty body?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, fair enough, let's keep it as is for this one and let the other ones handle it as they expect it

lifecycle::invocation::processor_service::InvocationProcessorHandle,
traces::{
context::SpanContext,
Expand Down Expand Up @@ -125,17 +125,7 @@ impl Listener {
State((invocation_processor_handle, propagator, tasks)): State<ListenerState>,
request: Request,
) -> Response {
let (parts, body) = match extract_request_body(request).await {
Ok(r) => r,
Err(e) => {
error!("Failed to extract request body: {e}");
return (
StatusCode::BAD_REQUEST,
"Could not read start invocation request body",
)
.into_response();
}
};
let (parts, body) = extract_request_body_or_empty(request).await;

let headers = headers_to_map(&parts.headers);
let payload_value = serde_json::from_slice::<Value>(&body).unwrap_or_else(|_| json!({}));
Expand Down Expand Up @@ -171,13 +161,9 @@ impl Listener {
// IMPORTANT: Extract the body synchronously before returning the response.
// If this is moved into the spawned task, PlatformRuntimeDone may be
// processed before the body is read, causing orphaned traces. (SLES-2666)
let (parts, body) = match extract_request_body(request).await {
Ok(r) => r,
Err(e) => {
error!("Failed to extract request body: {e}");
return (StatusCode::OK, json!({}).to_string()).into_response();
}
};
// On oversized payloads (>6MB) we gracefully degrade to an empty body
// so that processing still runs. (SLES-2722)
let (parts, body) = extract_request_body_or_empty(request).await;

let mut join_set = tasks.lock().await;
join_set.spawn(async move {
Expand Down Expand Up @@ -398,4 +384,56 @@ mod tests {
"Should extract request_id from LWA proxy header"
);
}

/// Verifies that an oversized payload (>6MB) behind `DefaultBodyLimit`
/// does NOT prevent end-invocation processing. The handler should
/// gracefully degrade to an empty body instead of failing outright.
#[tokio::test]
async fn test_end_invocation_oversized_payload_still_processes() {
// Mirrors the fixed handle_end_invocation logic: synchronously attempt
// body extraction before spawning the task, fall back to empty bytes.
async fn handler(request: axum::extract::Request) -> StatusCode {
use axum::extract::FromRequest;

let (parts, body) = request.into_parts();
let body =
match Bytes::from_request(axum::extract::Request::from_parts(parts, body), &())
.await
{
Ok(b) => b,
Err(_) => Bytes::new(),
};

if body.is_empty() {
// Body was too large and was replaced with empty bytes.
// Processing continues with degraded payload.
StatusCode::OK
} else {
StatusCode::OK
}
}

let router = Router::new()
.route(END_INVOCATION_PATH, post(handler))
.layer(DefaultBodyLimit::max(LAMBDA_INVOCATION_MAX_PAYLOAD));

// 6 MB + 1 byte: exceeds the DefaultBodyLimit
let payload = vec![b'x'; LAMBDA_INVOCATION_MAX_PAYLOAD + 1];
let req = Request::builder()
.method("POST")
.uri(END_INVOCATION_PATH)
.header("Content-Type", "application/json")
.body(Body::from(payload))
.expect("failed to build request");

let response = router.oneshot(req).await.expect("request failed");

// The handler gracefully degrades to an empty payload instead of failing,
// so processing (universal_instrumentation_end) still runs.
assert_eq!(
response.status(),
StatusCode::OK,
"Oversized payload should be handled gracefully with empty body fallback"
);
}
}
Loading