Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
05ea425
feat(logs): track durable execution context and gate log flushing on it
lym953 Mar 2, 2026
c34304c
refactor(logs): replace pending_durable_logs vec with held_logs map k…
lym953 Mar 2, 2026
6d7c68a
refactor(logs): remove PlatformStart fallback for unset is_durable_fu…
lym953 Mar 2, 2026
299e972
refactor(logs): extract durable ID map capacity into a named constant
lym953 Mar 2, 2026
98d76af
fix(logs): resolve clippy errors in durable function log processor
lym953 Mar 3, 2026
0ca047e
feat(logs): detect durable function context from aws.lambda span tags
lym953 Mar 3, 2026
a7d0f00
fix(logs): prefix durable execution tags with lambda.
lym953 Mar 3, 2026
6cae5ce
fix(logs): add durable execution context as lambda attributes instead…
lym953 Mar 3, 2026
6a32778
fmt
lym953 Mar 3, 2026
9cfd5ce
chore: remove debug logging added during durable function development
lym953 Mar 3, 2026
59a12bc
Modify comments and rename variables
lym953 Mar 4, 2026
41442f2
Rename DURABLE_CONTEXT_MAP_CAPACITY
lym953 Mar 4, 2026
ae1d91c
Update comments
lym953 Mar 4, 2026
beb39d2
Make resolve_held_logs_on_durable_function_set() take is_durable
lym953 Mar 4, 2026
5211191
Fix comment: flush -> mark as ready to be aggregated
lym953 Mar 4, 2026
09fd968
Rename: update_durable_map() -> self.processor.insert_to_durable_map()
lym953 Mar 4, 2026
3edba2a
Rename a function
lym953 Mar 4, 2026
1204f93
Modify comments. Add error handling.
lym953 Mar 4, 2026
2582c99
fmt
lym953 Mar 4, 2026
6889344
Check error log
lym953 Mar 4, 2026
a72f713
Collapse if condition
lym953 Mar 4, 2026
1e118a7
Add integration test
lym953 Mar 5, 2026
c3626f3
Bump python layer version from 117 to 123
lym953 Mar 5, 2026
3038c3d
Add durable to test-suites.yaml
lym953 Mar 6, 2026
b686408
Add platform and extension log assertions to durable integration tests
lym953 Mar 6, 2026
52094ca
fix(durable-integ-test): correct attribute path and durable invocation
lym953 Mar 6, 2026
ce4d1f8
fix(merge): resolve conflict in main.rs between durable-id-map and main
lym953 Mar 6, 2026
c889d30
fix(durable-integ-test): return None from durable handler to satisfy …
lym953 Mar 6, 2026
fe1ecbe
Change durable context map capacity from 5 to 100
lym953 Mar 6, 2026
15f7d76
feat(logs): fast-path durable SDK logs via executionArn field
lym953 Mar 6, 2026
4f15c7e
fmt
lym953 Mar 6, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitlab/datasources/test-suites.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ test_suites:
- name: otlp
- name: snapstart
- name: lmi
- name: durable
39 changes: 27 additions & 12 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use bottlecap::{
},
logger,
logs::{
agent::LogsAgent,
agent::{DurableContextUpdate, LogsAgent},
aggregator_service::{
AggregatorHandle as LogsAggregatorHandle, AggregatorService as LogsAggregatorService,
},
Expand Down Expand Up @@ -298,15 +298,20 @@ async fn extension_loop_active(
// and shares the connection pool.
let shared_client = bottlecap::http::get_client(config);

let (logs_agent_channel, logs_flusher, logs_agent_cancel_token, logs_aggregator_handle) =
start_logs_agent(
config,
Arc::clone(&api_key_factory),
&tags_provider,
event_bus_tx.clone(),
aws_config.is_managed_instance_mode(),
&shared_client,
);
let (
logs_agent_channel,
logs_flusher,
logs_agent_cancel_token,
logs_aggregator_handle,
durable_context_tx,
) = start_logs_agent(
config,
Arc::clone(&api_key_factory),
&tags_provider,
event_bus_tx.clone(),
aws_config.is_managed_instance_mode(),
&shared_client,
);

let (metrics_flushers, metrics_aggregator_handle, dogstatsd_cancel_token) = start_dogstatsd(
tags_provider.clone(),
Expand Down Expand Up @@ -357,6 +362,7 @@ async fn extension_loop_active(
&tags_provider,
invocation_processor_handle.clone(),
appsec_processor.clone(),
durable_context_tx,
&shared_client,
);

Expand Down Expand Up @@ -1039,14 +1045,15 @@ fn start_logs_agent(
LogsFlusher,
CancellationToken,
LogsAggregatorHandle,
Sender<DurableContextUpdate>,
) {
let (aggregator_service, aggregator_handle) = LogsAggregatorService::default();
// Start service in background
tokio::spawn(async move {
aggregator_service.run().await;
});

let (mut agent, tx) = LogsAgent::new(
let (mut agent, tx, durable_context_tx) = LogsAgent::new(
Arc::clone(tags_provider),
Arc::clone(config),
event_bus,
Expand All @@ -1068,7 +1075,13 @@ fn start_logs_agent(
config.clone(),
client.clone(),
);
(tx, flusher, cancel_token, aggregator_handle)
(
tx,
flusher,
cancel_token,
aggregator_handle,
durable_context_tx,
)
}

#[allow(clippy::type_complexity)]
Expand All @@ -1078,6 +1091,7 @@ fn start_trace_agent(
tags_provider: &Arc<TagProvider>,
invocation_processor_handle: InvocationProcessorHandle,
appsec_processor: Option<Arc<TokioMutex<AppSecProcessor>>>,
durable_context_tx: Sender<DurableContextUpdate>,
client: &Client,
) -> (
Sender<SendDataBuilderInfo>,
Expand Down Expand Up @@ -1163,6 +1177,7 @@ fn start_trace_agent(
Arc::clone(tags_provider),
stats_concentrator_handle.clone(),
span_dedup_handle,
durable_context_tx,
);
let trace_agent_channel = trace_agent.get_sender_copy();
let shutdown_token = trace_agent.shutdown_token();
Expand Down
19 changes: 16 additions & 3 deletions bottlecap/src/logs/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::{self, Sender};
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::{debug, error};

use crate::event_bus::Event;
use crate::extension::telemetry::events::TelemetryEvent;
Expand All @@ -12,9 +12,13 @@ use crate::{LAMBDA_RUNTIME_SLUG, config};

const DRAIN_LOG_INTERVAL: Duration = Duration::from_millis(100);

/// `(request_id, execution_id, execution_name)` extracted from an `aws.lambda` span.
///
/// This is the message type of the durable-context `mpsc` channel created in
/// `LogsAgent::new`. `LogsAgent::spin` destructures each update in exactly this
/// field order and forwards the three parts to
/// `LogsProcessor::insert_to_durable_map`, so producers must keep the order.
pub type DurableContextUpdate = (String, String, String);

#[allow(clippy::module_name_repetitions)]
pub struct LogsAgent {
rx: mpsc::Receiver<TelemetryEvent>,
durable_context_rx: mpsc::Receiver<DurableContextUpdate>,
processor: LogsProcessor,
aggregator_handle: AggregatorHandle,
cancel_token: CancellationToken,
Expand All @@ -28,7 +32,7 @@ impl LogsAgent {
event_bus: Sender<Event>,
aggregator_handle: AggregatorHandle,
is_managed_instance_mode: bool,
) -> (Self, Sender<TelemetryEvent>) {
) -> (Self, Sender<TelemetryEvent>, Sender<DurableContextUpdate>) {
let processor = LogsProcessor::new(
Arc::clone(&datadog_config),
tags_provider,
Expand All @@ -38,16 +42,18 @@ impl LogsAgent {
);

let (tx, rx) = mpsc::channel::<TelemetryEvent>(1000);
let (durable_context_tx, durable_context_rx) = mpsc::channel::<DurableContextUpdate>(100);
let cancel_token = CancellationToken::new();

let agent = Self {
rx,
durable_context_rx,
processor,
aggregator_handle,
cancel_token,
};

(agent, tx)
(agent, tx, durable_context_tx)
}

pub async fn spin(&mut self) {
Expand All @@ -56,6 +62,13 @@ impl LogsAgent {
Some(event) = self.rx.recv() => {
self.processor.process(event, &self.aggregator_handle).await;
}
Some((request_id, execution_id, execution_name)) = self.durable_context_rx.recv() => {
self.processor.insert_to_durable_map(&request_id, &execution_id, &execution_name);
let ready_logs = self.processor.take_ready_logs();
if !ready_logs.is_empty() && let Err(e) = self.aggregator_handle.insert_batch(ready_logs) {
error!("LOGS_AGENT | Failed to insert batch: {}", e);
}
}
() = self.cancel_token.cancelled() => {
debug!("LOGS_AGENT | Received shutdown signal, draining remaining events");

Expand Down
4 changes: 4 additions & 0 deletions bottlecap/src/logs/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ mod tests {
lambda: Lambda {
arn: "arn".to_string(),
request_id: Some("request_id".to_string()),
..Lambda::default()
},
timestamp: 0,
status: "status".to_string(),
Expand All @@ -130,6 +131,7 @@ mod tests {
lambda: Lambda {
arn: "arn".to_string(),
request_id: Some("request_id".to_string()),
..Lambda::default()
},
timestamp: 0,
status: "status".to_string(),
Expand All @@ -156,6 +158,7 @@ mod tests {
lambda: Lambda {
arn: "arn".to_string(),
request_id: Some("request_id".to_string()),
..Lambda::default()
},
timestamp: 0,
status: "status".to_string(),
Expand Down Expand Up @@ -196,6 +199,7 @@ mod tests {
lambda: Lambda {
arn: "arn".to_string(),
request_id: Some("request_id".to_string()),
..Lambda::default()
},
timestamp: 0,
status: "status".to_string(),
Expand Down
1 change: 1 addition & 0 deletions bottlecap/src/logs/aggregator_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ mod tests {
lambda: Lambda {
arn: "arn".to_string(),
request_id: Some("request_id".to_string()),
..Lambda::default()
},
timestamp: 0,
status: "status".to_string(),
Expand Down
7 changes: 6 additions & 1 deletion bottlecap/src/logs/lambda/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@ pub struct Message {
pub status: String,
}

#[derive(Serialize, Debug, Clone, PartialEq)]
#[derive(Serialize, Debug, Clone, Default, PartialEq)]
/// Lambda-specific attributes carried in the `lambda` field of a log `Message`.
///
/// `Default` is derived so call sites can set only the fields they know and
/// fill the rest with `..Lambda::default()`.
pub struct Lambda {
/// Function ARN; the `Message` constructor fills this from its `function_arn` argument.
pub arn: String,
/// Request ID associated with the log, if one is known.
pub request_id: Option<String>,
/// Durable execution ID; left out of the serialized output when `None`.
// NOTE(review): presumably the `execution_id` element of `DurableContextUpdate` — confirm against the processor.
#[serde(skip_serializing_if = "Option::is_none")]
pub durable_execution_id: Option<String>,
/// Durable execution name; left out of the serialized output when `None`.
// NOTE(review): presumably the `execution_name` element of `DurableContextUpdate` — confirm against the processor.
#[serde(skip_serializing_if = "Option::is_none")]
pub durable_execution_name: Option<String>,
}

impl Message {
Expand All @@ -50,6 +54,7 @@ impl Message {
lambda: Lambda {
arn: function_arn,
request_id,
..Lambda::default()
},
timestamp,
status: status.unwrap_or("info".to_string()),
Expand Down
Loading
Loading