feat(logs): [SVLS-8582] Hold logs and add durable context to durable function logs#1053
feat(logs): [SVLS-8582] Hold logs and add durable context to durable function logs#1053
Conversation
When the Lambda runtime is identified as a Durable Function (runtime_version contains "DurableFunction"), the logs processor now: - Maintains an Option<bool> is_durable_function flag that starts as None (unknown) until PlatformInitStart sets it to Some(true/false), with a PlatformStart fallback to Some(false) to avoid holding logs forever. - Holds logs in pending_durable_logs while the flag is None, draining them once the flag is resolved (avoids a race where logs arrive before the PlatformInitStart event). - Maintains a durable_id_map (capacity 5, FIFO eviction) from request_id to (durable_execution_id, durable_execution_name), populated by parsing JSON log messages that carry those fields. - Skips flushing any log whose request_id is not yet in the map; once it is, appends durable_execution_id:<id>,durable_execution_name:<name> to the log's ddtags before queuing. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…eyed by request_id - pending_durable_logs: Vec<IntakeLog> → held_logs: HashMap<String, Vec<IntakeLog>> Logs are now grouped by request_id, enabling O(1) lookup and targeted draining. - try_update_durable_map now returns bool (true = new entry added). The caller is responsible for draining held_logs when a new entry arrives, so the method stays focused on map maintenance only. - New drain_held_for_request_id: when a request_id's durable context first becomes known, immediately moves all stashed logs for that request_id into ready_logs with the appropriate durable tags, without touching the rest of held_logs. - New resolve_held_logs_on_durable_function_set: called at the exact moment is_durable_function transitions from None to Some(...) — inside PlatformInitStart and the PlatformStart fallback — rather than on every process() call. For Some(false) all held logs are flushed; for Some(true) the held logs are scanned for durable context and eligible ones are moved to ready_logs, ineligible ones remain. - queue_log_after_rules updated: in the None state logs with a request_id go into held_logs[request_id]; in the Some(true) state logs with no durable context yet are also stashed in held_logs[request_id] instead of being dropped. - process() no longer needs to drain pending logs on each call; that is now driven entirely by the state-transition helpers above. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…nction PlatformInitStart is essentially guaranteed to arrive before any invocation on a cold start, so the fallback that set is_durable_function = Some(false) on PlatformStart was unnecessary. Update the three unit tests that skipped PlatformInitStart by setting is_durable_function = Some(false) directly (private field is accessible from the in-file test module), keeping each test focused on what it actually exercises rather than the cold-start init lifecycle. Update the integration test to prepend a platform.initStart event and call sync_consume once per event (previously it sent all events first and then drained once, which now works correctly only with the init event). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
0a7b272 to
299e972
Compare
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
When an aws.lambda span (resource = dd-tracer-serverless-span) carries durable_function_execution_id and durable_function_execution_name meta tags, forward the context to the logs agent via a dedicated mpsc channel so held logs can be tagged and released. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… of tags Add durable_execution_id and durable_execution_name as fields on the Lambda struct so they appear as lambda.durable_execution_id and lambda.durable_execution_name attributes in the log payload, consistent with how lambda.request_id is represented. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
We should include an integration test for this. |
How does the extension determine if it is a lambda durable function? |
Will add. |
In the payload of |
| durable_context_order: VecDeque<String>, | ||
| } | ||
|
|
||
| const DURABLE_CONTEXT_MAP_CAPACITY: usize = 5; |
There was a problem hiding this comment.
This feels unnecessarily low, especially for LMI.
There was a problem hiding this comment.
Will change to 100. How does that sound?
| // These tests require a Lambda runtime environment that reports 'DurableFunction' in | ||
| // PlatformInitStart.runtime_version so that the extension sets is_durable_function = true | ||
| // and activates the log-holding / durable-context-decorating pipeline. | ||
| describe('Durable Function', () => { |
There was a problem hiding this comment.
These tests are only checking that a customer log has a durable execution id and name.
We should also verify that Platform logs include a durable execution id and name.
There was a problem hiding this comment.
Good point. Will also test platform logs and extension logs.
| /// - `Some(false)` → serialize and push straight to `ready_logs`. | ||
| /// - `Some(true)` → mark this log as ready to be aggregated if its `request_id` is already in `durable_context_map` | ||
| /// (context was populated by an `aws.lambda` span); otherwise stash in `held_logs`. | ||
| fn queue_log_after_rules(&mut self, mut log: IntakeLog) { |
There was a problem hiding this comment.
I'm hesitant to modify how we are handling logs coming from the customer's function. If they are using the durable function sdk, and I think that we should have this as a requirement, then their logs already include the durable function execution id and name. So we can immediately add them to the ready logs. Likewise, even without knowing if a function is a durable function, if we get a customer log with no durable execution id and name, then we know it isn't a durable function and we can still immediately push this to ready_logs. By pushing these logs to held_logs, we introduce the risk of potentially not flushing them.
I would lean towards only adding logic to handle Platform logs for Durable Functions. That way, if there is an issue with flushing these held_logs, it has a much smaller impact.
There was a problem hiding this comment.
Make sense. Actually there's a third category: extension logs. So I'll make changes to platform logs and extension logs, and not affect function logs.
There was a problem hiding this comment.
if we get a customer log with no durable execution id and name, then we know it isn't a durable function
Actually this is false. In a durable function handler, the user can still write logs directly without using durable SDK. I don't think it's a good idea to send such logs to ready_logs without tagging durable context.
But for logs from durable SDK, we can push them to ready_logs as you suggested.
| } | ||
| handle_reparenting(&mut reparenting_info, &mut span); | ||
|
|
||
| if span.name == "aws.lambda" { |
There was a problem hiding this comment.
Just a thought - We could update the tracer libraries to send a log with request id and lambda durable execution id and name when it emits the aws.lambda span. This would allow us to have all the logic for request id -> durable execution id and name mapping within the logs code and not require passing this info from the traces/trace_agent to lambda/agent. I think it could be a little bit cleaner.
There was a problem hiding this comment.
I'm not convinced that using logs could be cleaner. Adding one line of log can be intrusive to the customer, similar to our discussion of using logs to capture durable execution status changes: https://docs.google.com/document/d/1K4svKY69cUUXZCrT7mDu5Y2gOJs29oLRUQduOOi3Xm8/edit?disco=AAABzusDFWI
There was a problem hiding this comment.
However, if some day latency becomes a problem, for example, for a 15-minute invocation, logs can only be flushed at the end of an invocation when the aws.lambda span is emitted, then that's a good reason to use a log for this purpose.
- Enable DD_LOG_LEVEL=DEBUG on both durable stack Lambdas so extension logs appear in Datadog queries - Assert platform START/END logs lack durable context for non-durable Lambda and carry durable context for durable Lambda - Assert all DD_EXTENSION logs lack durable context for non-durable Lambda and carry durable context for durable Lambda Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Two bugs found from the first CI run: 1. Wrong attribute path: Datadog Logs API v2 nests user-defined attributes under a second `attributes` key inside the top-level `attributes`. All lambda attribute checks updated from `log.attributes.lambda.*` to `log.attributes.attributes.lambda.*`. 2. Durable functions cannot be invoked with an unqualified ARN (InvalidParameterValueException). Publish a new Lambda version before each test run and invoke with the version qualifier, mirroring the snapstart test pattern. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Combine both sides: - Keep durable_context_tx (from this branch) in start_logs_agent return tuple and start_trace_agent parameter - Add shared_client / client parameter (from main) to start_logs_agent call, LogsFlusher::new, and start_trace_agent Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…durable runtime
The Lambda durable execution runtime rejects HTTP-style response dicts
({'statusCode': 200}) with "Invalid Status in invocation output." Return
None instead, which the runtime accepts.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Logs emitted through the durable execution SDK already carry an
`executionArn` field. Parse `durable_execution_name` and
`durable_execution_id` directly from the ARN in `get_message()` and
skip the hold/release cycle in `queue_log_after_rules()`, pushing such
logs straight to `ready_logs`.
ARN format:
arn:aws:lambda:{region}:{account}:function:{name}:{version}
/durable-execution/{exec_name}/{exec_id}
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Summary
If the function is a durable function, then add two attributes to every log:
lambda.durable_execution_idlambda.durable_execution_nameBackground
durable_function_execution_idanddurable_function_execution_nameto theaws.lambdaspanDetails
HashMap<String, Vec<IntakeLog>>durable_context_map: request_id -> (execution_id, execution_name). When trace agent receives anaws.lambdaspan that has these three attributes, the entry is inserted to the map. The map has a fixed capacity, with FIFO eviction policy.PlatformInitStartevent, from the payload it learns whether the function is a durable function.request_idis already in thedurable_context_map, i.e.:lambda.durable_execution_idandlambda.durable_execution_nameon these logs, anddurable_context_map, if there are already held logs for thisrequest_id, then drain those logs.Test plan
Steps
Build a layer, install it on a function, and invoke it.
Result
returns all the logs for two invocations of this durable execution. It returns 98 logs, equal to 49 logs for the first invocation + 49 logs for the second invocation. (query link)