feat: Write transaction fragments directly to storage to reduce consumer memory footprint#3783
Conversation
Codecov Report

✅ All modified and coverable lines are covered by tests.

```
@@ Coverage Diff @@
##             main    #3783      +/-   ##
==========================================
+ Coverage   87.06%   87.20%   +0.13%
==========================================
  Files          25       25
  Lines        2343     2391      +48
  Branches      592      595       +3
==========================================
+ Hits         2040     2085      +45
- Misses        301      304       +3
  Partials        2        2
```
Force-pushed from d3dc0f4 to 1c882c2
Force-pushed from 1c882c2 to 71b61f5
Force-pushed from dba7fc3 to c7d9d17
Force-pushed from c7d9d17 to ccaed2d
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ccaed2d3ca
@claude please review this PR.
Every microsecond counts!
robacourt
left a comment
LGTM. Great work! The interaction with the Materializer will need addressing; maybe consumers with materializers should be `write_unit=txn`.
Claude Code Review

Summary

Tenth review iteration of PR #3783 (2026-02-24). Since Review #9, a significant batch of commits landed that address all four remaining "What Remains" items plus two chatgpt-codex P1 concerns from Feb 10. The PR is now in excellent shape. One low-probability race condition from Review #9 remains open.

What's Working Well

- Storage capability check
- Flush boundary offset fix
- Split notification functions
- Expanded test coverage: the new tests in
- Changeset: now accurately documents the three new callbacks and the automatic fallback behavior for InMemoryStorage.

Issues Found

Important (Should Fix)

1. Pre-existing standalone consumer + new materializer subscription race

Carried over from Review #9 — still unaddressed.

```elixir
def handle_call({:subscribe_materializer, pid}, _from, state) do
  Process.monitor(pid, tag: :materializer_down)
  {:reply, {:ok, state.latest_offset}, %{state | materializer_subscribed?: true}, ...}
end
```

Impact: Incorrect move-in/out tracking in the outer shape for that one transaction.

Probability: Low — requires (a) inner shape created independently before outer shape, (b) outer shape created while a multi-fragment transaction is active on the inner shape.

Suggested mitigation:

```elixir
def handle_call({:subscribe_materializer, pid}, _from, state) do
  Process.monitor(pid, tag: :materializer_down)
  {:ok, safe_offset} = ShapeCache.Storage.fetch_latest_offset(state.storage)
  {:reply, {:ok, safe_offset}, %{state | materializer_subscribed?: true}, state.hibernate_after}
end
```

This ensures the materializer always starts from a committed boundary.

Suggestions (Nice to Have)

2. Changeset claim doesn't match behavior for backends missing

The changeset says: "Custom storage backends that do not implement these callbacks will automatically fall back to the default"

```elixir
defp validate_storage_capabilities(%__MODULE__{write_unit: @write_unit_txn_fragment} = state, storage) do
  {mod, _opts} = storage

  supports? =
    function_exported?(mod, :supports_txn_fragment_streaming?, 0) and
      mod.supports_txn_fragment_streaming?()

  if supports? do
    state
  else
    # ... existing downgrade logic
  end
end
```

3. Pre-existing:

Issue Conformance

Linked issue #3415: "Avoid holding whole transaction in consumer memory"

✅ Core objective delivered: memory improvement via direct fragment-to-storage streaming

Previous Review Status (Review #9 → Review #10)

What was addressed:

What remains:

Overall Assessment

The PR is merge-ready from a correctness standpoint. The critical architecture decisions (write_unit set at init time, materializer commit deferral, flush boundary alignment) are all correct. The one remaining Important issue is a low-probability edge case that requires a specific timing window. If the team is comfortable with the probability, this can merge as-is and the fix can be a follow-up. If not, the mitigation is a one-line change in

Review iteration: 10 | 2026-02-24
Force-pushed from 5f18340 to 4714a7a
Update (Feb 16)

Rebased on current main and pushed new commits addressing review feedback:

Still TODO: address @robacourt's feedback about shapes that are dependencies (innermost shapes with no dependencies themselves but that DO have a materializer) — these should probably use

Based on the current implementation, the Materializer sees changes from all txn fragments, but when it then notifies the consumer with a
Latest changes (3 new commits since last push)

Defer Materializer subscriber notifications until commit fragment

Previously, when

Changes:
Regarding an earlier agentic comment about the potential impact of direct txn fragment writing to storage on

TL;DR: Fragment Streaming Storage Safety

All 7 crash/consistency scenarios are safe. The three-offset architecture (

No code changes are needed — the existing invariants hold correctly with the fragment write path.
LGTM. To test the effect of this on subqueries, you could cherry-pick 0cc3b49 and run the tests with the number of mutations in the test transactions over the max txn size and see if there are any issues:

```shell
ELECTRIC_EXPERIMENTAL_MAX_TXN_SIZE=5 \
MUTATIONS_PER_TXN=15 \
mix test --only oracle test/integration/oracle_property_test.exs
```
@robacourt That's dope! It has found one failure already after running for ~10sec. I'll look into fixing it and see if it can discover anything else.
…rate functions

This is needed to be able to selectively notify the appropriate subscriber and to process individual txn fragments.
…tions

PendingTxn no longer needs to track last_log_offset since the commit fragment's offset (txn_fragment.last_log_offset) is now used directly in maybe_complete_pending_txn. This fixes flush alignment lagging behind when the final commit fragment is filtered out.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
InMemoryStorage and TestStorage return false, PureFileStorage returns true. This allows runtime capability checking before enabling fragment streaming mode. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…pport

When initializing a consumer, validate that the storage backend supports txn fragment streaming before using :txn_fragment mode. If unsupported (e.g. InMemoryStorage), fall back to full-transaction buffering with a warning log.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Previously, Materializer sent :materializer_changes to subscribers on every new_changes call. When write_unit=:txn_fragment, this meant the outer Consumer could receive multiple intermediate move-in/move-out notifications within a single transaction, causing redundant work.

Add a commit: option to Materializer.new_changes/3 (default: true). Changes are accumulated in pending_events and only flushed to subscribers when commit: true. The fragment-streaming path passes commit: not is_nil(fragment.commit) to defer until the final fragment.

maybe_complete_pending_txn sends an empty new_changes with commit: true to handle the edge case where the commit fragment has no matching changes but earlier fragments accumulated events.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
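The deferral described in this commit message could be sketched roughly like this (a minimal sketch: only `pending_events` and the `commit:` option come from the message above; the module shape, `subscribers` field, and message format are illustrative assumptions):

```elixir
defmodule MaterializerSketch do
  # Illustrative stand-in for Materializer's commit-deferred notification.
  defstruct subscribers: [], pending_events: []

  # Accumulate events across txn fragments; notify only on commit: true.
  def new_changes(%__MODULE__{} = state, events, opts \\ []) do
    pending = state.pending_events ++ events

    if Keyword.get(opts, :commit, true) do
      # Commit fragment (or whole-txn path): flush one notification.
      Enum.each(state.subscribers, &send(&1, {:materializer_changes, pending}))
      %{state | pending_events: []}
    else
      # Mid-transaction fragment: defer until the commit fragment arrives.
      %{state | pending_events: pending}
    end
  end
end
```

The fragment-streaming caller would then pass `commit: not is_nil(fragment.commit)`, so subscribers observe exactly one notification per transaction regardless of how many fragments it was split into.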
…ding writes for this txn
Verifies that splitting inserts across two new_changes calls (one uncommitted, one committed) produces the same event order as sending them in a single call. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…_to_storage
The :includes_truncate and {[], 0} branches returned {state, []} instead
of bare state, causing a FunctionClauseError crash when piped into
maybe_complete_pending_txn which pattern-matches on %State{}.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…fications

maybe_complete_pending_txn was notifying clients with txn_fragment.last_log_offset — the offset of the last change across ALL tables in the commit fragment. But fetch_latest_offset returns the offset of the last change written to THIS shape's storage. When a split transaction has changes to tables not matching the shape, the client receives a higher offset than what the shape's storage knows about, causing "offset out of bounds" on the next poll.

Fix: use state.latest_offset (the last offset actually written to this shape's log) instead of the fragment's cross-table offset.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
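The fix in this commit amounts to choosing which offset reaches the client on commit. A minimal sketch (function and field names are illustrative, not the actual Consumer code):

```elixir
defmodule OffsetChoiceSketch do
  # On the commit fragment, notify with the shape-local latest_offset:
  # the last change actually written to THIS shape's log. Using the
  # fragment's cross-table last_log_offset instead could hand the client
  # an offset this shape's storage never wrote, breaking the next poll.
  def complete_pending_txn(state, fragment, notify) do
    if fragment.commit do
      notify.(state.latest_offset)
      # buggy variant: notify.(fragment.last_log_offset)
    end

    state
  end
end
```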
Force-pushed from 751cf24 to cdf0cb7
I have pushed a fix for the issue described in #3783 (comment) and have set

Still, it's impossible to be sure this branch doesn't add new failures on top of those already observed in
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: cdf0cb70ac
…hape-with-subqueries hierarchy

The write_txn_fragment_to_storage path in Consumer was designed with simplifications that assumed the consumer has no inner shape dependencies: it doesn't account for move-ins/move-outs, doesn't call State.track_change, and evaluates WHERE clauses without extra_refs from inner materializers. These simplifications are correct for true leaf consumers, but a consumer can have a materializer subscribed to it by an outer shape even if it was initially assigned write_unit=txn_fragment (based on having no dependencies of its own). When such a consumer processes fragments directly, the outer shape's do_handle_txn call to Materializer.get_all_as_refs sees intermediate materializer state that hasn't been properly committed, leading to incorrect WHERE clause evaluation in the outer shape.

Instead of reacting to materializer subscription events at runtime, we now initialize the consumer with write_unit=txn upfront when its shape is known to be part of a shape-with-subqueries hierarchy. This ensures that any consumer whose state may be observed by an outer shape via extra_refs buffers all fragments and processes the complete transaction atomically from the start, guaranteeing the materializer's value_counts are updated in a single consistent operation before the outer shape queries them.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
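The init-time decision this commit describes could look roughly like the following (hypothetical helper; the real code presumably derives hierarchy membership differently):

```elixir
defmodule WriteUnitSketch do
  # Decide the write unit once, at consumer init, so it never changes
  # in reaction to a runtime materializer subscription.
  def choose(shape, part_of_subquery_hierarchy?) do
    cond do
      # Has inner shapes: needs the whole txn for move-in/out reasoning.
      shape.shape_dependencies != [] -> :txn
      # May be observed by an outer shape via extra_refs.
      part_of_subquery_hierarchy? -> :txn
      # True leaf consumer: stream fragments straight to storage.
      true -> :txn_fragment
    end
  end
end
```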
No need to pass the whole state, only stack_id is used from it. Makes the data dependencies between functions clearer.
Force-pushed from cdf0cb7 to 846aa9f
…angeset Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…config Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tack

This is a temporary measure to get the code changes made in this branch so far out of the door before polishing interop between subqueries and individual txn fragment writing to storage.
In order to get this PR merged, I have disabled individual txn fragment writing to storage when the

I will undo that and will continue fixing edge cases for shapes with subqueries in this follow-up PR.
robacourt
left a comment
LGTM. I just left a couple of comments
```diff
-|> build_shape_dependencies(known)
+|> build_shape_dependencies(false, known)
```

```elixir
# Any inner shape of a root shape with subqueries must pass the is_subquery_shape? option
```
I'm not sure why being a root is a special case. Any dependency should have the is_subquery_shape?: true
Oh, ignore me. You're just excluding the root from being is_subquery_shape?. Fair enough.
The logic in my head seems more straightforward: you tell a shape whether it's a subquery shape or not; when building the root that's false, and when it builds its dependencies it's true. No need to even mention the root. Also, is_subquery_shape? could always be set rather than using additional conditional logic. But this is a very minor nit!
```diff
 Enum.zip(shape.shape_dependencies_handles, shape.shape_dependencies)
 |> Enum.reject(fn {handle, _shape} -> MapSet.member?(known, handle) end)
-|> build_shape_dependencies(known)
+|> build_shape_dependencies(false, known)
```
Can a shape become a subquery shape after it's created? So if `SELECT id FROM products` was created as a shape first and then `id IN (SELECT id FROM products)` later, would the 2nd shape use the first as the subquery shape? If so, setting is_subquery_shape? on creation breaks. If this is the case, personally I wouldn't be against ensuring that shapes and subquery shapes are distinct to prevent this from happening; it's only an optimisation to reuse shapes.
I can address both your comments here.
You are right, a shape may start independent but then have a materializer subscribe to it at a later point in time. I attempted to identify inner shapes using the is_subquery_shape? option passed at consumer startup but then determined that it wouldn't be sufficient. If we want to enable txn fragment processing for stacks that have allow_subqueries, we need to make it work properly even for shapes with subqueries, at least in the case where shape.shape_dependencies == []. At that point, the is_subquery_shape? flag becomes superfluous and will be removed from the codebase.
Ok, so your plan for this PR is to not enable txn fragment processing for stacks that have allow_subqueries?
Exactly.
Once this is merged, direct txn fragment writing to storage will only be enabled for stacks that don't have subqueries.





Closes #3415
Summary
For shapes without subquery dependencies (the vast majority), the consumer now streams each transaction fragment directly to storage as it arrives, instead of buffering the entire transaction in memory. This significantly reduces per-shape-consumer memory usage for large transactions.
Shapes with subquery dependencies continue to buffer the full transaction (`write_unit=txn`) because they need the complete change set for move-in/move-out reasoning.

How it works

The consumer operates in one of two modes, determined at shape creation time:

- `write_unit=txn_fragment` (default): Each fragment is filtered, converted to log items, and written to storage immediately. A lightweight `PendingTxn` struct tracks only metadata (xid, offset, byte count, change count) across fragments. When the final fragment with a commit arrives, storage is notified to advance its transaction boundary.
- `write_unit=txn` (shapes with subquery dependencies): Fragments are accumulated into a complete transaction before writing, as before.

Key additions

- `Consumer.PendingTxn` — tracks in-progress transaction metadata during fragment streaming. It is the lightweight counterpart to `TransactionBuilder`: where `TransactionBuilder` accumulates all changes in memory, `PendingTxn` only tracks offsets and counters while actual data is written to storage and discarded immediately.
- New storage callbacks — `append_fragment_to_log!/2` writes log items without advancing the persisted transaction offset (can be called multiple times per transaction), and `signal_txn_commit!/2` marks the transaction as complete so crash-recovery sees the correct committed boundary. The existing full-transaction write path is refactored to share the same underlying logic.
- `Support.StorageTracer` — test helper using Erlang trace sessions to observe which storage functions are called from a process, enabling non-intrusive assertions on the write path taken without wrapper modules or function patching.

Other notable changes

- `FlushTracker` in `ShapeLogCollector` is now only invoked for fragments that include a commit, since mid-transaction fragments don't represent a flush boundary.
- `CrashingFileStorage` (dead code).

Expected memory footprint reduction

Previously, the consumer held an entire transaction's worth of change structs in memory until commit. Now, each fragment's changes are written to storage and released immediately. The consumer only retains the `PendingTxn` metadata struct (~100 bytes) per in-progress transaction, reducing peak memory by roughly the size of the buffered change data. The improvement scales linearly with transaction size.

Testing

Consumer and storage tests have been expanded to cover both `write_unit` modes, including tracing-based assertions on which storage functions are called.