perf(daemon): per-root DashMap sharding, normalizers, and bounded connection pool #1060
svarlamov wants to merge 7 commits into
    let root_key = payload_root_sid.clone().unwrap_or_default();
    let normalizer_arc = self
        .normalizers
        .entry(root_key)
        .or_insert_with(|| {
            Arc::new(AsyncMutex::new(
                crate::daemon::trace_normalizer::TraceNormalizer::new(self.backend.clone()),
            ))
        })
        .clone();
🔴 Per-root normalizer leak: late/duplicate terminal events create orphaned normalizers that are never cleaned up
When a root's terminal event is processed, clear_trace_root_tracking at src/daemon.rs:4423 removes the normalizer from the normalizers DashMap. If a subsequent event for that same root arrives (e.g., a connection-close fallback atexit generated by enqueue_stale_connection_close_fallbacks), apply_trace_payload_to_state at line 7008 calls self.normalizers.entry(root_key).or_insert_with(...), creating a brand-new normalizer with empty state. This fresh normalizer has no completed_roots entry for the root, so it doesn't early-return. For an atexit event, the normalizer creates a deferred_exits entry and returns Ok(None). Back in apply_trace_payload_to_state, the replace_pending_root_entry call finds no pending slot (already cleaned up), so clear_trace_root_tracking is never called again — the orphaned normalizer persists in the DashMap forever.
In the old single-normalizer design, completed_roots (retained up to 16,384 entries per COMPLETED_ROOT_RETENTION_LIMIT at src/daemon/trace_normalizer.rs:86) would catch these late events and discard them immediately. The per-root split loses this cross-lifecycle memory, causing an unbounded slow leak in long-running daemons.
Prompt for agents
The problem is that after clear_trace_root_tracking removes a normalizer for a completed root from the normalizers DashMap, any subsequent payload for that root (e.g. a connection-close fallback atexit) will lazily create a brand-new normalizer that has no knowledge the root already completed. This new normalizer is never cleaned up.
In the old single-normalizer design, the completed_roots set (capped at 16,384 entries) served as a guard against processing late events for already-finished roots.
Possible approaches:
1. Add a separate DashSet<String> (e.g. completed_root_sids) on ActorDaemonCoordinator that tracks recently completed roots (with a bounded size). In apply_trace_payload_to_state, check this set BEFORE creating/looking up a per-root normalizer. If the root is in the completed set, skip normalizer creation and return TracePayloadApplyOutcome::None.
2. Alternatively, in the terminal-event branch of apply_trace_payload_to_state (around line 7020-7036), after clear_trace_root_tracking is called, also check whether the normalizer for this root is still in the DashMap and remove it if no pending root slot exists.
3. A simpler approach: in the code at line 7005-7014, before or_insert_with, check whether this root has already been processed by consulting the ingress state (e.g. if no ingress maps contain this root AND no pending root slot exists, skip creating the normalizer).
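Approach 1 could be sketched roughly as follows. This is a minimal, std-only illustration, not the daemon's code: `CompletedRoots`, `mark_completed`, and `should_create_normalizer` are hypothetical names, and a real coordinator would need a concurrent container (or a lock around this one) because several connection handlers touch it at once.

```rust
use std::collections::{HashSet, VecDeque};

/// Hypothetical bounded record of recently completed root SIDs.
/// Once `limit` entries are held, the oldest entry is evicted first (FIFO).
pub struct CompletedRoots {
    limit: usize,
    seen: HashSet<String>,
    order: VecDeque<String>,
}

impl CompletedRoots {
    pub fn new(limit: usize) -> Self {
        Self { limit, seen: HashSet::new(), order: VecDeque::new() }
    }

    /// Record a root whose terminal event has been processed.
    pub fn mark_completed(&mut self, root_sid: &str) {
        if self.limit == 0 || !self.seen.insert(root_sid.to_owned()) {
            return; // nothing may be retained, or the root is already recorded
        }
        self.order.push_back(root_sid.to_owned());
        if self.order.len() > self.limit {
            if let Some(oldest) = self.order.pop_front() {
                self.seen.remove(&oldest);
            }
        }
    }

    pub fn is_completed(&self, root_sid: &str) -> bool {
        self.seen.contains(root_sid)
    }
}

/// Assumed guard: consult the completed set before lazily creating a
/// per-root normalizer, so late events for finished roots are dropped.
pub fn should_create_normalizer(completed: &CompletedRoots, root_sid: &str) -> bool {
    !completed.is_completed(root_sid)
}
```

The trade-off is the same one the old `completed_roots` set had: a root evicted from the bounded set can again trigger normalizer creation, so the limit needs to comfortably exceed the number of roots that can finish before their late events arrive.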
    drop(count);
    self.ingr_root_open_connections.remove(root_sid);
🔴 TOCTOU race in record_trace_connection_close between drop(count) and remove()
The conversion from a single Mutex<TraceIngressState> to per-field DashMaps introduces a race condition in record_trace_connection_close. When count == 1, the code drops the RefMut (releasing the DashMap shard lock at line 4304) and then calls remove() (re-acquiring the lock at line 4305). In the window between these two operations, a concurrent call to trace_root_connection_opened (src/daemon.rs:4288-4293) on another connection-handler thread can find the existing entry (still at value 1), increment it to 2, and then remove() deletes the entry entirely — losing the new connection's count.
The old code was safe because both the decrement-or-remove and the open were serialized under one Mutex<TraceIngressState> lock. The consequence is that the newly opened connection becomes untracked: when it later closes, record_trace_connection_close won't find it in ingr_root_open_connections and will incorrectly report the root as stale, potentially triggering a premature close-fallback atexit event.
Prompt for agents
The bug is in record_trace_connection_close (src/daemon.rs around lines 4296-4310). When count <= 1, the code does drop(count) then self.ingr_root_open_connections.remove(root_sid), but between these two operations another thread can call trace_root_connection_opened which increments the counter, and then remove() deletes the entry losing the new connection count.
To fix this atomically, replace the drop+remove pattern with DashMap's remove_if method (available in dashmap 6.x), which checks and removes in a single shard-locked operation:
let was_removed = self.ingr_root_open_connections.remove_if(root_sid, |_, v| *v <= 1).is_some();
if !was_removed { continue; }
Alternatively, you could keep the get_mut path for count > 1 (decrement) and use remove_if for the count == 1 case. The key requirement is that the check-then-remove must be atomic with respect to concurrent entry/or_insert calls from trace_root_connection_opened.
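To make the interleaving concrete, here is a small std-only model of the race and of the single-lock fix. It is a sketch under stated assumptions: one DashMap shard is modeled as a `Mutex<HashMap<..>>`, the type `OpenConnections` and all of its method names are invented for illustration, and treating an untracked root as stale is an assumption rather than the daemon's confirmed behavior.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// One shard modeled as a lock around a plain map (illustrative stand-in).
pub struct OpenConnections {
    shard: Mutex<HashMap<String, usize>>,
}

impl OpenConnections {
    pub fn new() -> Self {
        Self { shard: Mutex::new(HashMap::new()) }
    }

    /// A connection for `root` opened: increment or create the counter.
    pub fn opened(&self, root: &str) {
        let mut map = self.shard.lock().unwrap();
        *map.entry(root.to_owned()).or_insert(0) += 1;
    }

    /// Racy step 1: observe "last connection", then release the lock.
    pub fn racy_check_last(&self, root: &str) -> bool {
        let map = self.shard.lock().unwrap();
        map.get(root).map_or(false, |n| *n <= 1)
    }

    /// Racy step 2: remove unconditionally under a second lock acquisition.
    pub fn racy_remove(&self, root: &str) {
        self.shard.lock().unwrap().remove(root);
    }

    /// Atomic variant: decrement, and remove only if the count reached zero,
    /// all under one lock acquisition. Returns true if the root became stale.
    pub fn closed(&self, root: &str) -> bool {
        let mut map = self.shard.lock().unwrap();
        let remaining = match map.get_mut(root) {
            Some(n) => {
                if *n > 0 {
                    *n -= 1;
                }
                *n
            }
            None => return true, // assumption: an untracked root counts as stale
        };
        if remaining == 0 {
            map.remove(root);
            true
        } else {
            false
        }
    }

    pub fn count(&self, root: &str) -> usize {
        self.shard.lock().unwrap().get(root).copied().unwrap_or(0)
    }
}
```

Calling `racy_check_last`, then `opened`, then `racy_remove` reproduces the lost count deterministically, whereas `closed` leaves no window in which an opener can slip between the check and the removal.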
    let removed = self
        .ingr_root_open_connections
        .remove_if_mut(root_sid, |_, v| {
            if *v > 0 {
                *v -= 1;
            }
            *v == 0
        });
    if removed.is_some() || !self.ingr_root_open_connections.contains_key(root_sid) {
        stale_roots.push(root_sid.clone());
    }
🟡 TOCTOU race in record_trace_connection_close can produce duplicate stale roots
The remove_if_mut + contains_key two-step in record_trace_connection_close has a TOCTOU race when two connections for the same root close concurrently. If root has count=2 and two threads call simultaneously: Thread A's remove_if_mut decrements to 1 (returns None); Thread B's remove_if_mut decrements to 0, removes entry (returns Some). Thread A then calls contains_key which returns false (entry removed by B), so Thread A also marks the root as stale. Both threads return the same root in stale_roots, leading to duplicate fallback atexit events being enqueued downstream via enqueue_stale_connection_close_fallbacks. While the completed_root_sids guard and ingr_root_close_fallback_enqueued DashSet may mitigate the worst effects, the old code under a single Mutex had no such race — only one thread could decrement and determine staleness at a time.
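The downstream mitigation mentioned above relies on a set insert that reports whether the value was new. A rough std-only sketch of that idea follows; `FallbackDedup`, `claim`, and `simulate_concurrent_claims` are hypothetical names, and a `Mutex<HashSet<..>>` stands in for a concurrent set.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a concurrent "fallback already enqueued" set.
pub struct FallbackDedup {
    enqueued: Mutex<HashSet<String>>,
}

impl FallbackDedup {
    pub fn new() -> Self {
        Self { enqueued: Mutex::new(HashSet::new()) }
    }

    /// Returns true only for the first caller to claim `root`.
    /// `HashSet::insert` reports whether the value was newly inserted, so
    /// the check and the set happen under a single lock acquisition.
    pub fn claim(&self, root: &str) -> bool {
        let mut set = self.enqueued.lock().unwrap();
        set.insert(root.to_owned())
    }
}

/// Simulates `threads` closers that all report `root` as stale and returns
/// how many of them would actually enqueue a fallback event.
pub fn simulate_concurrent_claims(root: &str, threads: usize) -> usize {
    let dedup = Arc::new(FallbackDedup::new());
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let dedup = Arc::clone(&dedup);
            let root = root.to_owned();
            thread::spawn(move || dedup.claim(&root))
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|claimed| *claimed)
        .count()
}
```

With this shape, duplicate entries in `stale_roots` cost a redundant lookup but cannot produce a second fallback event, provided every enqueue path goes through the claim.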
…ers, bounded connection pool
Three architectural improvements to the trace2 ingestion pipeline:
1. Replace Mutex<TraceIngressState> with 15 independent DashMap/DashSet fields
- Each field (root_worktrees, root_families, root_argv, root_mutating, etc.)
is now a dashmap::DashMap<String, V> or dashmap::DashSet<String>
- Per-root events no longer serialize on a single global mutex; concurrent
connections to different roots run without blocking each other
- Readonly fast path is now fully lock-free: DashMap shard lookups replace
global Mutex acquisition for the 40+/sec readonly events from IDEs like Zed
- Fold `mark_trace_root_activity` into the existing lock acquisition in the
mutating path, eliminating one full Mutex cycle per mutating event
- Add `dashmap = "6"` dependency
2. Per-root normalizer instances (dashmap::DashMap<String, Arc<AsyncMutex<...>>>)
- Replace single global AsyncMutex<TraceNormalizer> with a per-root map
- Normalizer state is now scoped to each root SID; entries are cleaned up
when clear_trace_root_tracking removes the root
- Enables future concurrent ingest workers per root without further refactor
3. Bounded trace connection thread pool
- Add a hard ceiling (MAX_CONCURRENT_TRACE_CONNECTIONS=512) on simultaneous
connection handler threads
- Excess connections are dropped with a log message rather than spawning
unbounded OS threads (prevents memory exhaustion and scheduler overload)
- Active count tracked with a shared AtomicUsize; thread spawn/exit
are O(1) atomic operations
All 1420 tests pass. Benchmarks: readonly_flood/zed_mixed_1000_events unchanged
(p=0.99; concurrency benefit not captured by single-threaded microbenchmark).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1. TOCTOU race in record_trace_connection_close: the old drop(count)+remove()
   pattern released the DashMap shard lock between check and delete, allowing
   trace_root_connection_opened on a concurrent thread to increment the counter
   and then have it silently deleted.
   Fix: use remove_if_mut to atomically decrement and remove within a single
   shard-lock acquisition.
2. Orphaned per-root normalizer leak: after clear_trace_root_tracking removes
   normalizers[root], a late connection-close fallback atexit would lazily
   create a fresh empty normalizer that never gets cleaned up, causing an
   unbounded slow memory leak in long-running daemons.
   Fix: maintain a bounded completed_root_sids DashSet (FIFO-evicted at 16,384
   entries, mirroring the normalizer's own completed_roots limit) and early-exit
   in apply_trace_payload_to_state for already-completed roots.
3. Panic safety of active-connection counter: the plain fetch_sub after the
   handler call would be skipped if handle_trace_connection_actor panics,
   leaking the counter and eventually blocking new connections.
   Fix: use a small RAII ActiveGuard whose Drop impl performs the decrement,
   guaranteeing cleanup even during panic unwinding.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
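The panic-safety fix in item 3 can be pictured with a short std-only sketch. The name `ActiveGuard` comes from the commit message; its shape here, the `try_acquire` constructor, and the `run_guarded` helper are assumptions made for illustration and are not taken from the PR's code.

```rust
use std::panic::{self, AssertUnwindSafe};
use std::sync::atomic::{AtomicUsize, Ordering};

/// Decrements the shared counter when dropped, including during unwinding.
pub struct ActiveGuard<'a> {
    active: &'a AtomicUsize,
}

impl<'a> ActiveGuard<'a> {
    /// Tries to reserve a slot; returns None once `limit` slots are in use.
    pub fn try_acquire(active: &'a AtomicUsize, limit: usize) -> Option<Self> {
        active
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
                if n < limit { Some(n + 1) } else { None }
            })
            .ok()
            .map(|_| ActiveGuard { active })
    }
}

impl Drop for ActiveGuard<'_> {
    fn drop(&mut self) {
        self.active.fetch_sub(1, Ordering::AcqRel);
    }
}

/// Runs `handler` while holding a slot. Returns false if the connection was
/// rejected or the handler panicked; the slot is released in both cases.
pub fn run_guarded<F: FnOnce()>(active: &AtomicUsize, limit: usize, handler: F) -> bool {
    panic::catch_unwind(AssertUnwindSafe(|| {
        let _guard = match ActiveGuard::try_acquire(active, limit) {
            Some(guard) => guard,
            None => return false, // over the ceiling: drop the connection
        };
        handler(); // a panic here unwinds through `_guard`, running Drop
        true
    }))
    .unwrap_or(false)
}
```

Compared with a trailing `fetch_sub`, the decrement no longer depends on control flow reaching the end of the handler, which is the property the commit message describes.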
1. remove_if_mut: guard against count=0 edge case — use explicit
`if *v > 0 { *v -= 1 }` instead of saturating_sub to avoid
silently removing entries that are already at zero.
2. ingr_root_mutating TOCTOU: replace get-then-insert with atomic
entry API (or_insert + conditional upgrade) so concurrent
connection-handler threads cannot overwrite true with false.
3. ingr_root_target_repo_only TOCTOU: same atomic entry fix, with
the reflog cleanup only firing on an actual false→true upgrade.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
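The entry-API fix in items 2 and 3 amounts to a one-way flag upgrade. Below is a minimal sketch using `std::collections::HashMap`, whose `entry` API has the same get-or-insert shape; with a concurrent map the entry would additionally hold the shard lock for the whole read-modify-write. The function name `record_flag` is hypothetical, and not counting a first insert of `true` as an upgrade is an assumption.

```rust
use std::collections::HashMap;

/// Records `observed` for `root` without ever downgrading true to false.
/// Returns true only when this call upgraded an existing false to true;
/// a first insert (of either value) is not reported as an upgrade.
pub fn record_flag(flags: &mut HashMap<String, bool>, root: &str, observed: bool) -> bool {
    // Get the existing slot, or create it with the observed value.
    let slot = flags.entry(root.to_owned()).or_insert(observed);
    if observed && !*slot {
        *slot = true; // conditional upgrade: false -> true
        return true;
    }
    false // unchanged: already true, or still false
}
```

A caller could run upgrade-only side effects, such as the reflog cleanup mentioned in item 3, only when the function returns `true`.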
Remove the MAX_CONCURRENT_TRACE_CONNECTIONS=512 ceiling and ActiveGuard RAII
pattern. The simple unbounded thread::spawn from main is sufficient: the
connection pool added complexity without meaningful benefit for current
workloads, and the trace listener may move away from OS threads soon.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Early-exit when root_key is empty before normalizer creation, preventing an
  orphaned normalizer keyed on "" that would never be cleaned up.
- Document the benign duplicate-stale-roots race in
  record_trace_connection_close (concurrent close can push the same root
  twice; downstream deduplicates via ingr_root_close_fallback_enqueued DashSet).
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Just fyi maybe some useful ideas, but this is old so I'm just gonna close it out for now. CC @heapwolf
Summary
Two architectural improvements that make the trace2 ingest pipeline scale with concurrent workloads.
1. Shard `TraceIngressState` with DashMap
Replace the single `Mutex<TraceIngressState>` (15 `HashMap`/`HashSet` fields all keyed by root SID) with 15 independent `dashmap::DashMap`/`DashSet` fields on `ActorDaemonCoordinator`:
- Readonly fast path: `DashSet::contains` replaces `Mutex::lock + HashSet::contains`
- Fold `mark_trace_root_activity` (previously an extra Mutex cycle) into the existing lock acquisition for mutating events; this saves one lock cycle per `git commit`
- `DashSet::insert` returning `bool` provides atomic check-and-set for the close-fallback deduplication logic
2. Per-root normalizer instances
Replace `AsyncMutex<TraceNormalizer>` (one global instance) with `DashMap<String, Arc<AsyncMutex<TraceNormalizer>>>`:
- Entries are cleaned up by `clear_trace_root_tracking`
- Bounded `completed_root_sids` `DashSet` (16,384 FIFO) to prevent orphaned normalizer creation from late events
Correctness fixes
- `record_trace_connection_close`: Use `remove_if_mut` to atomically decrement-and-remove within a single shard lock, with explicit `v > 0` guard against double-close edge case
- `ingr_root_mutating` / `ingr_root_target_repo_only`: Use DashMap entry API for atomic read-modify-write so concurrent threads cannot overwrite `true` with `false`
- `completed_root_sids` DashSet prevents late events from creating orphaned normalizers after root tracking is cleared
- `record_trace_connection_close` can produce duplicate stale roots under concurrent close; downstream `ingr_root_close_fallback_enqueued` DashSet deduplicates
Test plan
- `cargo test --lib`: all tests pass
- `cargo clippy --all-targets`: clean
- `cargo fmt`: applied
🤖 Generated with Claude Code