diff --git a/.gitlab/datasources/test-suites.yaml b/.gitlab/datasources/test-suites.yaml index be065c284..6ea591354 100644 --- a/.gitlab/datasources/test-suites.yaml +++ b/.gitlab/datasources/test-suites.yaml @@ -3,3 +3,4 @@ test_suites: - name: otlp - name: snapstart - name: lmi + - name: durable diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 3c2ef31cb..4519183c2 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -46,7 +46,7 @@ use bottlecap::{ }, logger, logs::{ - agent::LogsAgent, + agent::{DurableContextUpdate, LogsAgent}, aggregator_service::{ AggregatorHandle as LogsAggregatorHandle, AggregatorService as LogsAggregatorService, }, @@ -298,15 +298,20 @@ async fn extension_loop_active( // and shares the connection pool. let shared_client = bottlecap::http::get_client(config); - let (logs_agent_channel, logs_flusher, logs_agent_cancel_token, logs_aggregator_handle) = - start_logs_agent( - config, - Arc::clone(&api_key_factory), - &tags_provider, - event_bus_tx.clone(), - aws_config.is_managed_instance_mode(), - &shared_client, - ); + let ( + logs_agent_channel, + logs_flusher, + logs_agent_cancel_token, + logs_aggregator_handle, + durable_context_tx, + ) = start_logs_agent( + config, + Arc::clone(&api_key_factory), + &tags_provider, + event_bus_tx.clone(), + aws_config.is_managed_instance_mode(), + &shared_client, + ); let (metrics_flushers, metrics_aggregator_handle, dogstatsd_cancel_token) = start_dogstatsd( tags_provider.clone(), @@ -357,6 +362,7 @@ async fn extension_loop_active( &tags_provider, invocation_processor_handle.clone(), appsec_processor.clone(), + durable_context_tx, &shared_client, ); @@ -1039,6 +1045,7 @@ fn start_logs_agent( LogsFlusher, CancellationToken, LogsAggregatorHandle, + Sender, ) { let (aggregator_service, aggregator_handle) = LogsAggregatorService::default(); // Start service in background @@ -1046,7 +1053,7 @@ fn start_logs_agent( 
aggregator_service.run().await; }); - let (mut agent, tx) = LogsAgent::new( + let (mut agent, tx, durable_context_tx) = LogsAgent::new( Arc::clone(tags_provider), Arc::clone(config), event_bus, @@ -1068,7 +1075,13 @@ fn start_logs_agent( config.clone(), client.clone(), ); - (tx, flusher, cancel_token, aggregator_handle) + ( + tx, + flusher, + cancel_token, + aggregator_handle, + durable_context_tx, + ) } #[allow(clippy::type_complexity)] @@ -1078,6 +1091,7 @@ fn start_trace_agent( tags_provider: &Arc, invocation_processor_handle: InvocationProcessorHandle, appsec_processor: Option>>, + durable_context_tx: Sender, client: &Client, ) -> ( Sender, @@ -1163,6 +1177,7 @@ fn start_trace_agent( Arc::clone(tags_provider), stats_concentrator_handle.clone(), span_dedup_handle, + durable_context_tx, ); let trace_agent_channel = trace_agent.get_sender_copy(); let shutdown_token = trace_agent.shutdown_token(); diff --git a/bottlecap/src/logs/agent.rs b/bottlecap/src/logs/agent.rs index 71e5a721e..ac7d65599 100644 --- a/bottlecap/src/logs/agent.rs +++ b/bottlecap/src/logs/agent.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::mpsc::{self, Sender}; use tokio_util::sync::CancellationToken; -use tracing::debug; +use tracing::{debug, error}; use crate::event_bus::Event; use crate::extension::telemetry::events::TelemetryEvent; @@ -12,9 +12,13 @@ use crate::{LAMBDA_RUNTIME_SLUG, config}; const DRAIN_LOG_INTERVAL: Duration = Duration::from_millis(100); +/// `(request_id, execution_id, execution_name)` extracted from an `aws.lambda` span. 
+pub type DurableContextUpdate = (String, String, String); + #[allow(clippy::module_name_repetitions)] pub struct LogsAgent { rx: mpsc::Receiver, + durable_context_rx: mpsc::Receiver, processor: LogsProcessor, aggregator_handle: AggregatorHandle, cancel_token: CancellationToken, @@ -28,7 +32,7 @@ impl LogsAgent { event_bus: Sender, aggregator_handle: AggregatorHandle, is_managed_instance_mode: bool, - ) -> (Self, Sender) { + ) -> (Self, Sender, Sender) { let processor = LogsProcessor::new( Arc::clone(&datadog_config), tags_provider, @@ -38,16 +42,18 @@ impl LogsAgent { ); let (tx, rx) = mpsc::channel::(1000); + let (durable_context_tx, durable_context_rx) = mpsc::channel::(100); let cancel_token = CancellationToken::new(); let agent = Self { rx, + durable_context_rx, processor, aggregator_handle, cancel_token, }; - (agent, tx) + (agent, tx, durable_context_tx) } pub async fn spin(&mut self) { @@ -56,6 +62,13 @@ impl LogsAgent { Some(event) = self.rx.recv() => { self.processor.process(event, &self.aggregator_handle).await; } + Some((request_id, execution_id, execution_name)) = self.durable_context_rx.recv() => { + self.processor.insert_to_durable_map(&request_id, &execution_id, &execution_name); + let ready_logs = self.processor.take_ready_logs(); + if !ready_logs.is_empty() && let Err(e) = self.aggregator_handle.insert_batch(ready_logs) { + error!("LOGS_AGENT | Failed to insert batch: {}", e); + } + } () = self.cancel_token.cancelled() => { debug!("LOGS_AGENT | Received shutdown signal, draining remaining events"); diff --git a/bottlecap/src/logs/aggregator.rs b/bottlecap/src/logs/aggregator.rs index 6d4f8287a..e523e46b8 100644 --- a/bottlecap/src/logs/aggregator.rs +++ b/bottlecap/src/logs/aggregator.rs @@ -106,6 +106,7 @@ mod tests { lambda: Lambda { arn: "arn".to_string(), request_id: Some("request_id".to_string()), + ..Lambda::default() }, timestamp: 0, status: "status".to_string(), @@ -130,6 +131,7 @@ mod tests { lambda: Lambda { arn: "arn".to_string(), 
request_id: Some("request_id".to_string()), + ..Lambda::default() }, timestamp: 0, status: "status".to_string(), @@ -156,6 +158,7 @@ mod tests { lambda: Lambda { arn: "arn".to_string(), request_id: Some("request_id".to_string()), + ..Lambda::default() }, timestamp: 0, status: "status".to_string(), @@ -196,6 +199,7 @@ mod tests { lambda: Lambda { arn: "arn".to_string(), request_id: Some("request_id".to_string()), + ..Lambda::default() }, timestamp: 0, status: "status".to_string(), diff --git a/bottlecap/src/logs/aggregator_service.rs b/bottlecap/src/logs/aggregator_service.rs index d281bfb19..563bea32f 100644 --- a/bottlecap/src/logs/aggregator_service.rs +++ b/bottlecap/src/logs/aggregator_service.rs @@ -122,6 +122,7 @@ mod tests { lambda: Lambda { arn: "arn".to_string(), request_id: Some("request_id".to_string()), + ..Lambda::default() }, timestamp: 0, status: "status".to_string(), diff --git a/bottlecap/src/logs/lambda/mod.rs b/bottlecap/src/logs/lambda/mod.rs index e196c1ece..c3d9b24e9 100644 --- a/bottlecap/src/logs/lambda/mod.rs +++ b/bottlecap/src/logs/lambda/mod.rs @@ -30,10 +30,14 @@ pub struct Message { pub status: String, } -#[derive(Serialize, Debug, Clone, PartialEq)] +#[derive(Serialize, Debug, Clone, Default, PartialEq)] pub struct Lambda { pub arn: String, pub request_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub durable_execution_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub durable_execution_name: Option, } impl Message { @@ -50,6 +54,7 @@ impl Message { lambda: Lambda { arn: function_arn, request_id, + ..Lambda::default() }, timestamp, status: status.unwrap_or("info".to_string()), diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index 8325dde32..9aa67f71f 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -1,3 +1,4 @@ +use std::collections::{HashMap, VecDeque}; use std::error::Error; use std::fmt::Write; use 
std::sync::Arc; @@ -37,8 +38,28 @@ pub struct LambdaProcessor { logs_enabled: bool, // Managed Instance mode is_managed_instance_mode: bool, + // Whether this is a Durable Function runtime. + // None = not yet determined (hold all logs until known). + // Some(true) = durable function. Hold logs for a request_id until + // the durable execution id and execution name for this invocation are known. + // These two fields are extracted from the aws.lambda span sent by the tracer. + // Some(false) = not a durable function; mark logs as ready to be aggregated as normal. + is_durable_function: Option, + // Logs held pending resolution, keyed by request_id. + // While is_durable_function is None, every incoming log is stashed here so + // we can decide whether to filter/tag it once the flag is known. + // While is_durable_function is Some(true), logs whose request_id has no + // durable execution context yet are also stashed here; they are drained + // the moment that context arrives. + held_logs: HashMap>, + // Maps request_id -> (durable_execution_id, durable_execution_name) + durable_context_map: HashMap, + // Insertion order for FIFO eviction when map reaches capacity + durable_context_order: VecDeque, } + const DURABLE_CONTEXT_MAP_CAPACITY: usize = 100; + const OOM_ERRORS: [&str; 7] = [ "fatal error: runtime: out of memory", // Go "java.lang.OutOfMemoryError", // Java "JavaScript heap out of memory", // Node "Runtime exited with error: signal: killed", // Node "MemoryError", // Python "failed to allocate memory (NoMemoryError)", // Ruby "OutOfMemoryException", // .NET ]; fn is_oom_error(error_msg: &str) -> bool { .any(|&oom_str| error_msg.contains(oom_str)) } +/// Parses a Lambda durable execution ARN and returns `(execution_id, execution_name)`.
+/// +/// Expected format: +/// `arn:aws:lambda:{region}:{account}:function:{name}:{version}/durable-execution/{exec_name}/{exec_id}` +fn parse_durable_execution_arn(arn: &str) -> Option<(String, String)> { + const SEPARATOR: &str = "/durable-execution/"; + let durable_part = arn.split(SEPARATOR).nth(1)?; + let mut parts = durable_part.splitn(2, '/'); + let exec_name = parts.next().filter(|s| !s.is_empty())?.to_string(); + let exec_id = parts.next().filter(|s| !s.is_empty())?.to_string(); + Some((exec_id, exec_name)) +} + /// Maps AWS/common log level strings to Datadog log status values. /// Case-insensitive and accepts both short and long forms /// (e.g. "WARN"/"WARNING", "INFO"/"INFORMATION", "ERR"/"ERROR"). @@ -103,6 +137,10 @@ impl LambdaProcessor { ready_logs: Vec::new(), event_bus, is_managed_instance_mode, + is_durable_function: None, + held_logs: HashMap::new(), + durable_context_map: HashMap::with_capacity(DURABLE_CONTEXT_MAP_CAPACITY), + durable_context_order: VecDeque::with_capacity(DURABLE_CONTEXT_MAP_CAPACITY), } } @@ -111,7 +149,7 @@ impl LambdaProcessor { let copy = event.clone(); match event.record { TelemetryRecord::Function(v) => { - let (request_id, message) = match v { + let (request_id, message, durable_ctx) = match v { serde_json::Value::Object(obj) => { let request_id = if self.is_managed_instance_mode { obj.get("requestId") @@ -121,11 +159,14 @@ impl LambdaProcessor { } else { None }; + let durable_ctx = obj.get("executionArn") + .and_then(|v| v.as_str()) + .and_then(parse_durable_execution_arn); let msg = Some(serde_json::to_string(&obj).unwrap_or_default()); - (request_id, msg) + (request_id, msg, durable_ctx) }, - serde_json::Value::String(s) => (None, Some(s)), - _ => (None, None), + serde_json::Value::String(s) => (None, Some(s), None), + _ => (None, None, None), }; if let Some(message) = message { @@ -136,13 +177,18 @@ impl LambdaProcessor { } } - return Ok(Message::new( + let mut msg = Message::new( message, request_id, 
self.function_arn.clone(), event.time.timestamp_millis(), None, - )); + ); + if let Some((exec_id, exec_name)) = durable_ctx { + msg.lambda.durable_execution_id = Some(exec_id); + msg.lambda.durable_execution_name = Some(exec_name); + } + return Ok(msg); } Err("Unable to parse log".into()) @@ -185,6 +231,10 @@ impl LambdaProcessor { let rv = runtime_version.unwrap_or("?".to_string()); // TODO: check what does containers display let rv_arn = runtime_version_arn.unwrap_or("?".to_string()); // TODO: check what do containers display + let is_durable = rv.contains("DurableFunction"); + self.is_durable_function = Some(is_durable); + self.resolve_held_logs_on_durable_function_set(is_durable); + Ok(Message::new( format!("INIT_START Runtime Version: {rv} Runtime Version ARN: {rv_arn}"), None, @@ -467,15 +517,178 @@ impl LambdaProcessor { } } + /// Inserts the durable execution context for a `request_id`, received from the `aws.lambda` + /// span. Evicts the oldest entry when the map is at capacity. If `is_durable_function` is + /// already `Some(true)`, drains any held logs for that `request_id`. 
+ pub fn insert_to_durable_context_map( + &mut self, + request_id: &str, // key + execution_id: &str, // value + execution_name: &str, // value + ) { + if self.durable_context_map.contains_key(request_id) { + error!("LOGS | insert_to_durable_context_map: request_id={request_id} already in map"); + return; + } + if self.durable_context_order.len() >= DURABLE_CONTEXT_MAP_CAPACITY + && let Some(oldest) = self.durable_context_order.pop_front() + { + self.durable_context_map.remove(&oldest); + } + self.durable_context_order.push_back(request_id.to_string()); + self.durable_context_map.insert( + request_id.to_string(), + (execution_id.to_string(), execution_name.to_string()), + ); + self.drain_held_for_request_id(request_id); + } + + pub fn take_ready_logs(&mut self) -> Vec { + std::mem::take(&mut self.ready_logs) + } + + /// Moves all logs held for `request_id` into `ready_logs`, tagging each with the + /// durable execution context that is now known for that `request_id`. + fn drain_held_for_request_id(&mut self, request_id: &str) { + let Some(held) = self.held_logs.remove(request_id) else { + return; + }; + let durable_ctx = self + .durable_context_map + .get(request_id) + .map(|(id, name)| (id.clone(), name.clone())); + if let Some((exec_id, exec_name)) = durable_ctx { + for mut log in held { + log.message.lambda.durable_execution_id = Some(exec_id.clone()); + log.message.lambda.durable_execution_name = Some(exec_name.clone()); + if let Ok(s) = serde_json::to_string(&log) { + drop(log); + self.ready_logs.push(s); + } + } + } + } + + /// Called once when `is_durable_function` is set, draining every entry in `held_logs`: + /// - `false` → drain all held logs. + /// - `true` → drain logs whose `request_id` is already in `durable_context_map`, + /// the rest stay in `held_logs` until their context arrives via an `aws.lambda` span. 
+ fn resolve_held_logs_on_durable_function_set(&mut self, is_durable: bool) { + let held = std::mem::take(&mut self.held_logs); + if is_durable { + // Drain logs whose request_id is already in the map. + for (request_id, logs) in held { + let durable_ctx = self + .durable_context_map + .get(&request_id) + .map(|(id, name)| (id.clone(), name.clone())); + if let Some((exec_id, exec_name)) = durable_ctx { + // If the request_id is in the durable context map, set durable execution id + // and execution name, and add logs to ready_logs. + for mut log in logs { + log.message.lambda.durable_execution_id = Some(exec_id.clone()); + log.message.lambda.durable_execution_name = Some(exec_name.clone()); + if let Ok(s) = serde_json::to_string(&log) { + self.ready_logs.push(s); + } + } + } else { + // No context yet — keep logs in held_logs until the aws.lambda span arrives. + self.held_logs.insert(request_id, logs); + } + } + } else { + // Drain all held logs. + for (_, logs) in held { + for log in logs { + if let Ok(s) = serde_json::to_string(&log) { + self.ready_logs.push(s); + } + } + } + } + } + /// Processes a log, applies filtering rules, serializes it, and queues it for aggregation fn process_and_queue_log(&mut self, mut log: IntakeLog) { let should_send_log = self.logs_enabled && LambdaProcessor::apply_rules(&self.rules, &mut log.message.message); - if should_send_log && let Ok(serialized_log) = serde_json::to_string(&log) { - // explicitly drop log so we don't accidentally re-use it and push - // duplicate logs to the aggregator - drop(log); - self.ready_logs.push(serialized_log); + if should_send_log { + self.queue_log_after_rules(log); + } + } + + /// Queues a log that has already had processing rules applied. + /// + /// Logs from the durable execution SDK include an `executionArn` field from which + /// `durable_execution_id` and `durable_execution_name` are extracted in `get_message()`. + /// Such logs are pushed directly to `ready_logs` without holding. 
+ /// + /// For all other logs, routing depends on `is_durable_function`: + /// - `None` → stash in `held_logs[request_id]`; logs without a `request_id` are + /// marked as ready to be aggregated since they cannot carry durable context. + /// - `Some(false)` → serialize and push straight to `ready_logs`. + /// - `Some(true)` → mark this log as ready to be aggregated if its `request_id` is already in `durable_context_map` + /// (context was populated by an `aws.lambda` span); otherwise stash in `held_logs`. + fn queue_log_after_rules(&mut self, mut log: IntakeLog) { + // Durable execution SDK logs already carry execution context extracted from executionArn. + if log.message.lambda.durable_execution_id.is_some() { + if let Ok(serialized_log) = serde_json::to_string(&log) { + drop(log); + self.ready_logs.push(serialized_log); + } + return; + } + + match self.is_durable_function { + None => { + if let Some(rid) = log.message.lambda.request_id.clone() { + self.held_logs.entry(rid).or_default().push(log); + } else { + error!("LOGS | queue_log_after_rules: log without request_id: {log:?}"); + drop(log); + } + } + Some(false) => { + if let Ok(serialized_log) = serde_json::to_string(&log) { + // explicitly drop log so we don't accidentally re-use it and push + // duplicate logs to the aggregator + drop(log); + self.ready_logs.push(serialized_log); + } + } + Some(true) => { + // Durable context is populated by span processing (insert_to_durable_context_map). + // Mark this log as ready to be aggregated if its request_id already has context; otherwise hold. 
+ let durable_ctx = log + .message + .lambda + .request_id + .as_ref() + .and_then(|rid| self.durable_context_map.get(rid)) + .map(|(id, name)| (id.clone(), name.clone())); + + match durable_ctx { + Some((exec_id, exec_name)) => { + log.message.lambda.durable_execution_id = Some(exec_id); + log.message.lambda.durable_execution_name = Some(exec_name); + if let Ok(serialized_log) = serde_json::to_string(&log) { + // explicitly drop log so we don't accidentally re-use it and push + // duplicate logs to the aggregator + drop(log); + self.ready_logs.push(serialized_log); + } + } + None => { + if let Some(rid) = log.message.lambda.request_id.clone() { + self.held_logs.entry(rid).or_default().push(log); + } else { + error!("LOGS | queue_log_after_rules: log without request_id: {log:?}"); + drop(log); + } + } + } + } } } @@ -569,6 +782,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: None, + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -586,6 +800,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: None, + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -608,6 +823,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: None, + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -628,6 +844,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -653,6 +870,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -678,6 +896,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "error".to_string(), @@ 
-708,6 +927,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -733,6 +953,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_058_627_000, status: "info".to_string(), @@ -758,6 +979,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_058_627_000, status: "error".to_string(), @@ -783,6 +1005,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_058_627_000, status: "error".to_string(), @@ -808,6 +1031,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_058_627_000, status: "error".to_string(), @@ -888,6 +1112,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -1039,6 +1264,7 @@ mod tests { tx.clone(), false, ); + processor.is_durable_function = Some(false); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -1059,6 +1285,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -1199,6 +1426,7 @@ mod tests { tx.clone(), false, ); + processor.is_durable_function = Some(false); let start_event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -1233,6 +1461,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + 
..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -1248,6 +1477,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -1322,6 +1552,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -1383,6 +1614,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -1433,6 +1665,7 @@ mod tests { tx.clone(), false, ); + processor.is_durable_function = Some(false); // First, send an extension log (orphan) that doesn't have a request_id let extension_event = TelemetryEvent { @@ -2123,4 +2356,149 @@ mod tests { let intake_log = processor.get_intake_log(lambda_message).unwrap(); assert_eq!(intake_log.message.status, "warn"); } + + // --- parse_durable_execution_arn --- + + #[test] + fn test_parse_durable_execution_arn_valid() { + let arn = "arn:aws:lambda:us-east-1:123456789012:function:my-function:1/durable-execution/my-exec-name/exec-id-abc"; + let result = parse_durable_execution_arn(arn); + assert_eq!( + result, + Some(("exec-id-abc".to_string(), "my-exec-name".to_string())) + ); + } + + #[test] + fn test_parse_durable_execution_arn_latest() { + let arn = "arn:aws:lambda:us-west-2:999999999999:function:fn:$LATEST/durable-execution/exec-name/exec-uuid-123"; + let result = parse_durable_execution_arn(arn); + assert_eq!( + result, + Some(("exec-uuid-123".to_string(), "exec-name".to_string())) + ); + } + + #[test] + fn test_parse_durable_execution_arn_missing_separator() { + let arn = "arn:aws:lambda:us-east-1:123456789012:function:my-function:1"; + assert!(parse_durable_execution_arn(arn).is_none()); + } + + #[test] + fn 
test_parse_durable_execution_arn_missing_exec_id() { + let arn = "arn:aws:lambda:us-east-1:123456789012:function:my-function:1/durable-execution/only-name"; + assert!(parse_durable_execution_arn(arn).is_none()); + } + + // --- executionArn extraction in get_message / queue_log_after_rules --- + + fn make_processor_for_durable_arn_tests() -> LambdaProcessor { + let tags = HashMap::new(); + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + tags: tags.clone(), + serverless_logs_enabled: true, + ..config::Config::default() + }); + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + let (tx, _) = tokio::sync::mpsc::channel(2); + LambdaProcessor::new(tags_provider, config, tx, false) + } + + #[tokio::test] + async fn test_durable_sdk_log_sets_execution_context_from_execution_arn() { + let mut processor = make_processor_for_durable_arn_tests(); + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::Function(Value::Object(serde_json::from_str( + r#"{"message":"hello","executionArn":"arn:aws:lambda:us-east-1:123:function:fn:1/durable-execution/my-name/my-id"}"# + ).unwrap())), + }; + let msg = processor.get_message(event).await.unwrap(); + assert_eq!(msg.lambda.durable_execution_id, Some("my-id".to_string())); + assert_eq!( + msg.lambda.durable_execution_name, + Some("my-name".to_string()) + ); + } + + #[tokio::test] + async fn test_durable_sdk_log_pushed_to_ready_logs_without_holding() { + let mut processor = make_processor_for_durable_arn_tests(); + // is_durable_function is still None (PlatformInitStart not yet received) + assert!(processor.is_durable_function.is_none()); + // Durable SDK logs arrive during an invocation, so request_id is set + processor.invocation_context.request_id = "req-abc".to_string(); + + let event = 
TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::Function(Value::Object(serde_json::from_str( + r#"{"message":"hello","executionArn":"arn:aws:lambda:us-east-1:123:function:fn:1/durable-execution/my-name/my-id"}"# + ).unwrap())), + }; + let (aggregator_service, aggregator_handle) = AggregatorService::default(); + tokio::spawn(async move { aggregator_service.run().await }); + processor.process(event, &aggregator_handle).await; + + // Log should have been sent to aggregator (not held) + assert!(processor.held_logs.is_empty()); + let batches = aggregator_handle.get_batches().await.unwrap(); + assert_eq!(batches.len(), 1); + let logs: Vec = serde_json::from_slice(&batches[0]).unwrap(); + assert_eq!( + logs[0]["message"]["lambda"]["durable_execution_id"], + "my-id" + ); + assert_eq!( + logs[0]["message"]["lambda"]["durable_execution_name"], + "my-name" + ); + } + + #[tokio::test] + async fn test_durable_sdk_log_pushed_to_ready_logs_even_in_durable_function_mode() { + let mut processor = make_processor_for_durable_arn_tests(); + processor.is_durable_function = Some(true); + processor.invocation_context.request_id = "req-abc".to_string(); + + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::Function(Value::Object(serde_json::from_str( + r#"{"message":"hello","executionArn":"arn:aws:lambda:us-east-1:123:function:fn:1/durable-execution/my-name/my-id"}"# + ).unwrap())), + }; + let (aggregator_service, aggregator_handle) = AggregatorService::default(); + tokio::spawn(async move { aggregator_service.run().await }); + processor.process(event, &aggregator_handle).await; + + assert!(processor.held_logs.is_empty()); + let batches = aggregator_handle.get_batches().await.unwrap(); + assert_eq!(batches.len(), 1); + } + + #[tokio::test] + async fn test_function_log_without_execution_arn_is_still_held_in_durable_mode() { + let mut processor = 
make_processor_for_durable_arn_tests(); + processor.is_durable_function = Some(true); + // Simulate a known request_id with no durable context yet + processor.invocation_context.request_id = "req-123".to_string(); + + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::Function(Value::String("plain log without ARN".to_string())), + }; + let (aggregator_service, aggregator_handle) = AggregatorService::default(); + tokio::spawn(async move { aggregator_service.run().await }); + processor.process(event, &aggregator_handle).await; + + // Should be held, not sent to aggregator + assert!(processor.held_logs.contains_key("req-123")); + let batches = aggregator_handle.get_batches().await.unwrap(); + assert!(batches.is_empty()); + } } diff --git a/bottlecap/src/logs/processor.rs b/bottlecap/src/logs/processor.rs index cff6df5f0..35bfd0012 100644 --- a/bottlecap/src/logs/processor.rs +++ b/bottlecap/src/logs/processor.rs @@ -41,6 +41,25 @@ impl LogsProcessor { } } } + + pub fn insert_to_durable_map( + &mut self, + request_id: &str, + execution_id: &str, + execution_name: &str, + ) { + match self { + LogsProcessor::Lambda(p) => { + p.insert_to_durable_context_map(request_id, execution_id, execution_name); + } + } + } + + pub fn take_ready_logs(&mut self) -> Vec { + match self { + LogsProcessor::Lambda(p) => p.take_ready_logs(), + } + } } #[allow(clippy::module_name_repetitions)] diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 6ad5c00cc..818778634 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -21,6 +21,7 @@ use tokio_util::sync::CancellationToken; use tower_http::limit::RequestBodyLimitLayer; use tracing::{debug, error, warn}; +use crate::logs::agent::DurableContextUpdate; use crate::traces::trace_processor::SendingTraceProcessor; use crate::{ appsec::processor::Processor as AppSecProcessor, @@ -91,6 +92,7 @@ pub struct 
TraceState { pub invocation_processor_handle: InvocationProcessorHandle, pub tags_provider: Arc, pub span_deduper: DedupHandle, + pub durable_context_tx: Sender, } #[derive(Clone)] @@ -118,6 +120,7 @@ pub struct TraceAgent { tx: Sender, stats_concentrator: StatsConcentratorHandle, span_deduper: DedupHandle, + durable_context_tx: Sender, } #[derive(Clone, Copy)] @@ -141,6 +144,7 @@ impl TraceAgent { tags_provider: Arc, stats_concentrator: StatsConcentratorHandle, span_deduper: DedupHandle, + durable_context_tx: Sender, ) -> TraceAgent { // Set up a channel to send processed traces to our trace aggregator. tx is passed through each // endpoint_handler to the trace processor, which uses it to send de-serialized @@ -170,6 +174,7 @@ impl TraceAgent { shutdown_token: CancellationToken::new(), stats_concentrator, span_deduper, + durable_context_tx, } } @@ -225,6 +230,7 @@ impl TraceAgent { invocation_processor_handle: self.invocation_processor_handle.clone(), tags_provider: Arc::clone(&self.tags_provider), span_deduper: self.span_deduper.clone(), + durable_context_tx: self.durable_context_tx.clone(), }; let stats_state = StatsState { @@ -304,6 +310,7 @@ impl TraceAgent { state.invocation_processor_handle, state.tags_provider, state.span_deduper, + state.durable_context_tx, ApiVersion::V04, ) .await @@ -317,6 +324,7 @@ impl TraceAgent { state.invocation_processor_handle, state.tags_provider, state.span_deduper, + state.durable_context_tx, ApiVersion::V05, ) .await @@ -473,6 +481,7 @@ impl TraceAgent { invocation_processor_handle: InvocationProcessorHandle, tags_provider: Arc, deduper: DedupHandle, + durable_context_tx: Sender, version: ApiVersion, ) -> Response { let start = Instant::now(); @@ -593,6 +602,24 @@ impl TraceAgent { } handle_reparenting(&mut reparenting_info, &mut span); + if span.name == "aws.lambda" { + // If this aws.lambda span carries durable function context, forward it to + // the logs agent so it can release any held logs and set durable execution 
context on them. + if let (Some(request_id), Some(execution_id), Some(execution_name)) = ( + span.meta.get("request_id"), + span.meta.get("durable_function_execution_id"), + span.meta.get("durable_function_execution_name"), + ) { + let _ = durable_context_tx + .send(( + request_id.clone(), + execution_id.clone(), + execution_name.clone(), + )) + .await; + } + } + // Keep the span chunk.push(span); } diff --git a/bottlecap/tests/logs_integration_test.rs b/bottlecap/tests/logs_integration_test.rs index 4b6367c9a..23f7178f9 100644 --- a/bottlecap/tests/logs_integration_test.rs +++ b/bottlecap/tests/logs_integration_test.rs @@ -75,7 +75,7 @@ async fn test_logs() { LogsFlusher::new(api_key_factory, logs_aggr_handle, arc_conf.clone(), client); let telemetry_events: Vec = serde_json::from_str( - r#"[{"time":"2022-10-21T14:05:03.165Z","type":"platform.start","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","version":"$LATEST","tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}]"#) + r#"[{"time":"2022-10-21T14:05:03.000Z","type":"platform.initStart","record":{"initializationType":"on-demand","phase":"init"}},{"time":"2022-10-21T14:05:03.165Z","type":"platform.start","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","version":"$LATEST","tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}]"#) .map_err(|e| e.to_string()).expect("Failed parsing telemetry events"); let sender = logs_agent_tx.clone(); @@ -85,10 +85,9 @@ async fn test_logs() { .send(an_event.clone()) .await .expect("Failed sending telemetry events"); + logs_agent.sync_consume().await; } - logs_agent.sync_consume().await; - let _ = logs_flusher.flush(None).await; hello_mock.assert(); diff --git a/integration-tests/bin/app.ts b/integration-tests/bin/app.ts index a7cb568ee..900cd4027 100644 --- 
a/integration-tests/bin/app.ts +++ b/integration-tests/bin/app.ts @@ -2,6 +2,7 @@ import 'source-map-support/register'; import * as cdk from 'aws-cdk-lib'; import {Base} from '../lib/stacks/base'; +import {Durable} from '../lib/stacks/durable'; import {Otlp} from '../lib/stacks/otlp'; import {Snapstart} from '../lib/stacks/snapstart'; import {LambdaManagedInstancesStack} from '../lib/stacks/lmi'; @@ -25,6 +26,9 @@ const stacks = [ new Base(app, `integ-${identifier}-base`, { env, }), + new Durable(app, `integ-${identifier}-durable`, { + env, + }), new Otlp(app, `integ-${identifier}-otlp`, { env, }), diff --git a/integration-tests/lambda/durable-python/lambda_durable_function.py b/integration-tests/lambda/durable-python/lambda_durable_function.py new file mode 100644 index 000000000..0e7f5ad10 --- /dev/null +++ b/integration-tests/lambda/durable-python/lambda_durable_function.py @@ -0,0 +1,9 @@ +import logging + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +def handler(event, context): + logger.info('Hello from durable function!') + # Return None rather than an HTTP-style dict; the Lambda durable execution + # runtime rejects {'statusCode': 200} with "Invalid Status in invocation output." 
diff --git a/integration-tests/lambda/durable-python/lambda_function.py b/integration-tests/lambda/durable-python/lambda_function.py new file mode 100644 index 000000000..ce47095cd --- /dev/null +++ b/integration-tests/lambda/durable-python/lambda_function.py @@ -0,0 +1,10 @@ +import logging + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +def handler(event, context): + logger.info('Hello from non-durable function!') + return { + 'statusCode': 200 + } diff --git a/integration-tests/lambda/durable-python/requirements.txt b/integration-tests/lambda/durable-python/requirements.txt new file mode 100644 index 000000000..e8e026e23 --- /dev/null +++ b/integration-tests/lambda/durable-python/requirements.txt @@ -0,0 +1 @@ +# No additional requirements needed - datadog-lambda layer provides dependencies diff --git a/integration-tests/lib/stacks/durable.ts b/integration-tests/lib/stacks/durable.ts new file mode 100644 index 000000000..e1a37307b --- /dev/null +++ b/integration-tests/lib/stacks/durable.ts @@ -0,0 +1,72 @@ +import * as cdk from 'aws-cdk-lib'; +import * as lambda from 'aws-cdk-lib/aws-lambda'; +import { Construct } from 'constructs'; +import { + createLogGroup, + defaultDatadogEnvVariables, + defaultDatadogSecretPolicy, + getExtensionLayer, + getDefaultPythonLayer, + defaultPythonRuntime, +} from '../util'; + +export class Durable extends cdk.Stack { + constructor(scope: Construct, id: string, props: cdk.StackProps) { + super(scope, id, props); + + const extensionLayer = getExtensionLayer(this); + const pythonLayer = getDefaultPythonLayer(this); + + // Non-durable Python Lambda - used to verify that logs do NOT have durable execution context. 
+ const pythonFunctionName = `${id}-python-lambda`; + const pythonFunction = new lambda.Function(this, pythonFunctionName, { + runtime: defaultPythonRuntime, + architecture: lambda.Architecture.ARM_64, + handler: 'datadog_lambda.handler.handler', + code: lambda.Code.fromAsset('./lambda/durable-python'), + functionName: pythonFunctionName, + timeout: cdk.Duration.seconds(30), + memorySize: 256, + environment: { + ...defaultDatadogEnvVariables, + DD_SERVICE: pythonFunctionName, + DD_TRACE_ENABLED: 'true', + DD_LAMBDA_HANDLER: 'lambda_function.handler', + DD_TRACE_AGENT_URL: 'http://127.0.0.1:8126', + DD_LOG_LEVEL: 'DEBUG', + }, + logGroup: createLogGroup(this, pythonFunctionName), + }); + pythonFunction.addToRolePolicy(defaultDatadogSecretPolicy); + pythonFunction.addLayers(extensionLayer); + pythonFunction.addLayers(pythonLayer); + + // Durable Python Lambda - used to verify that logs DO have durable execution context. + const durablePythonFunctionName = `${id}-python-durable-lambda`; + const durablePythonFunction = new lambda.Function(this, durablePythonFunctionName, { + runtime: defaultPythonRuntime, + architecture: lambda.Architecture.ARM_64, + handler: 'datadog_lambda.handler.handler', + code: lambda.Code.fromAsset('./lambda/durable-python'), + functionName: durablePythonFunctionName, + timeout: cdk.Duration.seconds(30), + memorySize: 256, + environment: { + ...defaultDatadogEnvVariables, + DD_SERVICE: durablePythonFunctionName, + DD_TRACE_ENABLED: 'true', + DD_LAMBDA_HANDLER: 'lambda_durable_function.handler', + DD_TRACE_AGENT_URL: 'http://127.0.0.1:8126', + DD_LOG_LEVEL: 'DEBUG', + }, + logGroup: createLogGroup(this, durablePythonFunctionName), + durableConfig: { + executionTimeout: cdk.Duration.minutes(15), + retentionPeriod: cdk.Duration.days(14), + }, + }); + durablePythonFunction.addToRolePolicy(defaultDatadogSecretPolicy); + durablePythonFunction.addLayers(extensionLayer); + durablePythonFunction.addLayers(pythonLayer); + } +} diff --git 
a/integration-tests/lib/util.ts b/integration-tests/lib/util.ts index d35351395..8410db0f3 100644 --- a/integration-tests/lib/util.ts +++ b/integration-tests/lib/util.ts @@ -15,7 +15,7 @@ export const defaultJavaRuntime = lambda.Runtime.JAVA_21; export const defaultDotnetRuntime = lambda.Runtime.DOTNET_8; export const defaultNodeLayerArn = 'arn:aws:lambda:us-east-1:464622532012:layer:Datadog-Node24-x:132'; -export const defaultPythonLayerArn = 'arn:aws:lambda:us-east-1:464622532012:layer:Datadog-Python313-ARM:117'; +export const defaultPythonLayerArn = 'arn:aws:lambda:us-east-1:464622532012:layer:Datadog-Python313-ARM:123'; export const defaultJavaLayerArn = 'arn:aws:lambda:us-east-1:464622532012:layer:dd-trace-java:25'; export const defaultDotnetLayerArn = 'arn:aws:lambda:us-east-1:464622532012:layer:dd-trace-dotnet-ARM:23'; diff --git a/integration-tests/tests/durable.test.ts b/integration-tests/tests/durable.test.ts new file mode 100644 index 000000000..f6dbf2ea1 --- /dev/null +++ b/integration-tests/tests/durable.test.ts @@ -0,0 +1,226 @@ +import { invokeLambdaAndGetDatadogData, LambdaInvocationDatadogData } from './utils/util'; +import { publishVersion } from './utils/lambda'; +import { getIdentifier } from '../config'; + +describe('Durable Function Log Tests', () => { + describe('Non-Durable Function', () => { + let result: LambdaInvocationDatadogData; + + beforeAll(async () => { + const identifier = getIdentifier(); + const functionName = `integ-${identifier}-durable-python-lambda`; + + console.log('Invoking non-durable Python Lambda...'); + result = await invokeLambdaAndGetDatadogData(functionName, {}, true); + console.log('Non-durable Lambda invocation and data fetching completed'); + }, 300000); // 5 minute timeout + + it('should invoke Lambda successfully', () => { + expect(result.statusCode).toBe(200); + }); + + it('should have "Hello from non-durable function!" 
log message', () => { + const log = result.logs?.find((log: any) => + log.message.includes('Hello from non-durable function!') + ); + expect(log).toBeDefined(); + }); + + it('logs should have lambda.request_id attribute', () => { + const log = result.logs?.find((log: any) => + log.message.includes('Hello from non-durable function!') + ); + expect(log).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.request_id).toBeDefined(); + }); + + it('logs should NOT have lambda.durable_execution_id attribute', () => { + const log = result.logs?.find((log: any) => + log.message.includes('Hello from non-durable function!') + ); + expect(log).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_id).toBeUndefined(); + }); + + it('logs should NOT have lambda.durable_execution_name attribute', () => { + const log = result.logs?.find((log: any) => + log.message.includes('Hello from non-durable function!') + ); + expect(log).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_name).toBeUndefined(); + }); + + it('platform START log should NOT have lambda.durable_execution_id or lambda.durable_execution_name attributes', () => { + const log = result.logs?.find((log: any) => + log.message.includes('START RequestId:') + ); + expect(log).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_id).toBeUndefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_name).toBeUndefined(); + }); + + it('platform END log should NOT have lambda.durable_execution_id or lambda.durable_execution_name attributes', () => { + const log = result.logs?.find((log: any) => + log.message.includes('END RequestId:') + ); + expect(log).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_id).toBeUndefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_name).toBeUndefined(); + }); + + it('extension logs should NOT have lambda.durable_execution_id or 
lambda.durable_execution_name attributes', () => { + // Extension logs (from debug!/info!/warn! in the Rust extension code) are forwarded + // in non-durable mode. They arrive without a request_id initially (orphan logs), then + // inherit the invocation request_id when PlatformStart fires. + // DD_LOG_LEVEL=DEBUG ensures these logs appear in Datadog. + const logs = result.logs?.filter((log: any) => + log.message.includes('DD_EXTENSION') + ); + expect(logs).toBeDefined(); + expect(logs?.length).toBeGreaterThan(0); + for (const log of logs ?? []) { + expect(log?.attributes?.attributes?.lambda?.durable_execution_id).toBeUndefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_name).toBeUndefined(); + } + }); + }); + + // These tests require a Lambda runtime environment that reports 'DurableFunction' in + // PlatformInitStart.runtime_version so that the extension sets is_durable_function = true + // and activates the log-holding / durable-context-decorating pipeline. + describe('Durable Function', () => { + let coldStartResult: LambdaInvocationDatadogData; + let warmStartResult: LambdaInvocationDatadogData; + let concurrentResults: LambdaInvocationDatadogData[]; + + beforeAll(async () => { + const identifier = getIdentifier(); + const baseFunctionName = `integ-${identifier}-durable-python-durable-lambda`; + + // Durable functions require a qualified ARN (version or alias); publish a new version + // so the first invocation is a cold start and so we have a valid qualifier. 
+ console.log('Publishing version for durable Python Lambda...'); + const version = await publishVersion(baseFunctionName); + const functionName = `${baseFunctionName}:${version}`; + + console.log('Invoking durable Python Lambda (cold start - first invocation of new version)...'); + coldStartResult = await invokeLambdaAndGetDatadogData(functionName, {}, false); + + console.log('Invoking durable Python Lambda (warm start)...'); + warmStartResult = await invokeLambdaAndGetDatadogData(functionName, {}, false); + + console.log('Invoking durable Python Lambda 3x concurrently...'); + concurrentResults = await Promise.all([ + invokeLambdaAndGetDatadogData(functionName, {}, false), + invokeLambdaAndGetDatadogData(functionName, {}, false), + invokeLambdaAndGetDatadogData(functionName, {}, false), + ]); + + console.log('All durable Lambda invocations and data fetching completed'); + }, 600000); // 10 minute timeout + + it('logs should have lambda.durable_execution_id attribute', () => { + const log = coldStartResult.logs?.find((log: any) => + log.message.includes('Hello from durable function!') + ); + expect(log).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_id).toBeDefined(); + }); + + it('logs should have lambda.durable_execution_name attribute', () => { + const log = coldStartResult.logs?.find((log: any) => + log.message.includes('Hello from durable function!') + ); + expect(log).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_name).toBeDefined(); + }); + + it('logs arriving before the aws.lambda span should be held and released with durable execution context', () => { + // On cold start, function logs likely arrive before the tracer flushes the aws.lambda span. + // The extension stashes these logs in held_logs[request_id] (because is_durable_function + // is Some(true) and the request_id is not yet in durable_context_map). 
Once the span + // arrives and context is inserted, drain_held_for_request_id decorates and releases them. + const log = coldStartResult.logs?.find((log: any) => + log.message.includes('Hello from durable function!') + ); + expect(log).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_id).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_name).toBeDefined(); + }); + + it('logs arriving after the aws.lambda span should be decorated immediately with durable execution context', () => { + // In a durable orchestration a function may be invoked multiple times within the same + // execution. On the second (warm-start) invocation the extension already has the + // durable_execution_id / durable_execution_name in its durable_context_map from the + // first invocation's span. Any log whose request_id is already in the map is therefore + // decorated inline in queue_log_after_rules without ever being held. + const log = warmStartResult.logs?.find((log: any) => + log.message.includes('Hello from durable function!') + ); + expect(log).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_id).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_name).toBeDefined(); + }); + + it('each invocation should have its own durable execution context when multiple concurrent invocations occur', () => { + // Concurrent invocations run in separate Lambda execution environments, each with its + // own extension state and durable_context_map. Every invocation independently receives + // the is_durable_function flag and the aws.lambda span, so each set of logs is decorated + // with its own (potentially distinct) durable execution context. 
+ for (const result of concurrentResults) { + const log = result.logs?.find((log: any) => + log.message.includes('Hello from durable function!') + ); + expect(log).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_id).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_name).toBeDefined(); + } + + // Each concurrent invocation should report a valid request_id so we can distinguish + // their log sets from one another. + const requestIds = concurrentResults.map(r => r.requestId); + const uniqueRequestIds = new Set(requestIds); + expect(uniqueRequestIds.size).toBe(concurrentResults.length); + }); + + it('platform START log should have lambda.durable_execution_id and lambda.durable_execution_name attributes', () => { + // PlatformStart generates "START RequestId: ... Version: ..." with a request_id. + // In durable mode the log is held in held_logs until the aws.lambda span arrives, + // then decorated with durable execution context and released. + const log = coldStartResult.logs?.find((log: any) => + log.message.includes('START RequestId:') + ); + expect(log).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_id).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_name).toBeDefined(); + }); + + it('platform END log should have lambda.durable_execution_id and lambda.durable_execution_name attributes', () => { + // PlatformRuntimeDone generates "END RequestId: ..." with a request_id and is handled + // identically to START — held until durable context arrives, then decorated. 
+ const log = coldStartResult.logs?.find((log: any) => + log.message.includes('END RequestId:') + ); + expect(log).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_id).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_name).toBeDefined(); + }); + + it('extension logs should have lambda.durable_execution_id and lambda.durable_execution_name attributes', () => { + // Extension logs (debug!/info!/warn! from the Rust extension code) arrive without a + // request_id and are stashed as orphan logs. When PlatformStart fires they inherit the + // invocation request_id, then follow the same held-log path as other durable logs: + // held until the aws.lambda span provides durable execution context, then decorated. + // DD_LOG_LEVEL=DEBUG ensures these logs are emitted and appear in Datadog. + const logs = coldStartResult.logs?.filter((log: any) => + log.message.includes('DD_EXTENSION') + ); + expect(logs).toBeDefined(); + expect(logs?.length).toBeGreaterThan(0); + for (const log of logs ?? []) { + expect(log?.attributes?.attributes?.lambda?.durable_execution_id).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_name).toBeDefined(); + } + }); + }); +});