
[MongoDB Storage] Storage v3 for incremental reprocessing#585

Merged
rkistner merged 98 commits into main from incremental-processing-storage on Apr 13, 2026
Conversation

@rkistner rkistner commented Mar 24, 2026

This rewrites the MongoDB storage for version 3, in preparation for incremental reprocessing.

Postgres storage will follow in a future PR - this one is already big enough.

Logical storage changes

On a high level:

  1. We partition bucket_data and parameter_indexes (previously bucket_parameters) by source definition, instead of by replication stream (previously group_id).
  2. We partition source_records (previously current_data) by source table.
  3. The specific storage format is slightly adjusted to account for the changes required for incremental reprocessing (a detailed write-up will follow).
  4. We partition source_tables and bucket_state by replication stream.

All of this is implemented in storage version 3 only - storage formats for versions 1 and 2 are unchanged. However, the implementation is restructured to account for the different logic in the different versions now.

The goal here is primarily to track and split records by bucket definition id or parameter index id, so that specific definitions can be added or removed without affecting others.

Physical storage changes

Wherever we partition by source table, bucket definition or parameter index, we now use separate collections instead of one large collection.

The collection split has some advantages:

  1. Removing data becomes very cheap - just drop the collection.
  2. Reads and writes become faster by moving the common data into the collection name, instead of duplicating it in each document.

The split is primarily for performance reasons; it is not functionally required for incremental reprocessing. It just makes sense to make these changes at the same time, while we're already making significant storage changes. It does have some caveats: some code paths now query across multiple collections, which could be slower, and we still need to optimize those cases.
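As a sketch of what this split could look like (the naming scheme and helper names here are illustrative, not necessarily the ones used in this PR):

```typescript
// Hypothetical naming scheme: the partition key that was previously duplicated
// in every document now lives in the collection name instead.
function bucketDataCollectionName(definitionId: number): string {
  return `bucket_data_${definitionId}`;
}

function sourceRecordsCollectionName(tableId: string): string {
  return `source_records_${tableId}`;
}

// Removing all data for one definition is then a single cheap drop:
//   await db.collection(bucketDataCollectionName(7)).drop();
```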

This PR departs from the storage structure used in the incremental reprocessing POC in #468:

  1. The POC made changes on the storage structure directly, without regard for storage versions or backwards-compatibility.
  2. The POC did not use the collection splits.

Other changes

parameter_index optimization

In storage V1, bucket_parameters has this index:

```js
{ 'key.g': 1, lookup: 1, _id: 1 }
```

And lookups were performed using this aggregation pipeline:

```js
{
  $match: {
    'key.g': group_id,
    lookup: { $in: [...] },
    _id: { $lte: checkpoint }
  }
},
{
  $sort: {
    _id: -1
  }
},
{
  $group: {
    _id: { key: '$key', lookup: '$lookup' },
    bucket_parameters: {
      $first: '$bucket_parameters'
    }
  }
}
```

The issue is that this is quite inefficient when there are a large number of keys per lookup. If the parameter compact job does not run often enough, this number can build up significantly - I've seen 50k+ documents for a single lookup/key in some cases. These queries are rerun very often in some cases (once per checkpoint), so the overhead adds up quickly.

To a large extent this was resolved by having a separate job for parameter compacting (see #569), but we can still improve the query. So in storage V3, we have this index:

```js
{ lookup: 1, key: 1, _id: -1 } // no key.g anymore, since that's part of the collection name now
```

And for lookups, we do each lookup individually, using this pipeline:

```js
{
  $match: {
    lookup: lookupFilter,
    _id: { $lte: checkpoint }
  }
},
{
  $sort: {
    key: 1,
    _id: -1
  }
},
{
  $group: {
    _id: {
      key: '$key'
    },
    bucket_parameters: {
      $first: '$bucket_parameters'
    }
  }
},
{
  $project: {
    _id: 0,
    bucket_parameters: 1
  }
}
```

We then combine multiple of those into a single db round-trip using $unionWith. The same approach applies whether the different lookups are in the same collection (same parameter index) or in different collections (different parameter indexes).
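A sketch of how those per-lookup pipelines could be combined (helper and parameter names are hypothetical; the stages mirror the pipeline shown above):

```typescript
type Pipeline = Record<string, unknown>[];

// Builds the per-lookup pipeline: newest entry per key for one lookup filter.
function lookupPipeline(lookupFilter: unknown, checkpoint: unknown): Pipeline {
  return [
    { $match: { lookup: lookupFilter, _id: { $lte: checkpoint } } },
    { $sort: { key: 1, _id: -1 } },
    { $group: { _id: { key: '$key' }, bucket_parameters: { $first: '$bucket_parameters' } } },
    { $project: { _id: 0, bucket_parameters: 1 } }
  ];
}

// Combines multiple lookups into one round-trip: the first pipeline runs on its
// own collection, and each additional lookup is attached as a $unionWith
// sub-pipeline on its collection (same or different parameter index).
function combinedPipeline(
  lookups: { coll: string; filter: unknown }[],
  checkpoint: unknown
): Pipeline {
  const [first, ...rest] = lookups;
  const pipeline = lookupPipeline(first.filter, checkpoint);
  for (const lookup of rest) {
    pipeline.push({
      $unionWith: { coll: lookup.coll, pipeline: lookupPipeline(lookup.filter, checkpoint) }
    });
  }
  return pipeline;
}
```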

clustered bucket_data collections

Each individual bucket_data collection is now a MongoDB clustered collection.

In normal collections, each document is assigned an internal id, with an ordered _id index pointing to the internal id.

With clustered collections, that is removed - documents are directly stored in-order, without a separate index. In a way, it's similar to having the entire collection in a covered index.

This has two main advantages:

  1. Each read and each write touches a single location only, instead of the separate collection storage + _id index.
  2. Range read operations typically cover data close to each other in storage, instead of needing random access reads for each document after finding the _id index entry.

Since we're practically only doing range operations in this collection and have no secondary indexes, this appears to work quite well for us, resulting in 2-10x throughput improvements in bulk read performance in some cases. We do still need to benchmark a little better to confirm.
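For reference, a clustered collection has to be created up-front with the clusteredIndex option (supported by the Node.js driver against MongoDB 5.3+). This is only a sketch of the option shape; the actual collection names and options used here may differ:

```typescript
// With this option, documents are stored directly in _id order,
// with no separate _id index.
const clusteredOptions = {
  clusteredIndex: {
    key: { _id: 1 },
    unique: true
  }
};

// Usage (not run here):
//   await db.createCollection('bucket_data_7', clusteredOptions);
```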

Two issues to look out for:

  1. https://jira.mongodb.org/browse/SERVER-121822 - reverse collection scans have a bounds issue. We have a workaround for this when compacting.
  2. https://jira.mongodb.org/browse/SERVER-75063 and similar issues - the query planner appears to not be as optimized for clustered collections yet. We need to check every query we do against the collection, and make sure that it runs efficiently.

Clearing data

For storage V3, we mostly just drop collections when clearing data, which is fast.

For storage V1, the clearing process is tweaked slightly:

  1. Instead of retrying the entire clear loop, we retry individual operations when hitting the batch timeout.
  2. Increase the clear batch timeout from 5s to 40s. I have not confirmed this, but it appears that MongoDB scans many documents ahead before removing them, so with a short batch timeout a lot of that scanning work is discarded. Increasing the timeout reduces the ratio of scanned to deleted documents.
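A minimal sketch of the retry change in point 1, under the assumption that each batch is a bounded delete operation (deleteBatch stands in for a real deleteMany call with maxTimeMS; all names are illustrative):

```typescript
// Retries individual batches on timeout instead of restarting the whole clear
// loop, so progress made by earlier batches is never discarded.
async function clearWithRetries(
  deleteBatch: () => Promise<number>, // returns number of documents deleted
  maxRetries = 3
): Promise<number> {
  let total = 0;
  while (true) {
    let deleted: number | undefined;
    for (let attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        deleted = await deleteBatch();
        break;
      } catch (e) {
        // Batch timeout: retry only this batch, keeping earlier progress.
        if (attempt === maxRetries) throw e;
      }
    }
    total += deleted!;
    if (deleted === 0) return total; // nothing left to delete
  }
}
```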

Storage metrics

Previously, during the first replication, when there was no active replication stream / sync rules yet, storage metrics would always report 0. On subsequent re-replication, the metrics would include the total storage from both the active and in-progress replication streams.

This changes the implementation to include current storage during the first replication.

In the future, we can split this metric into separate ones for active storage vs reprocessing.

Id mappings

For bucket definitions and parameter indexes, we now need a unique id for each.

It is not important that these are globally unique at this point - these can be unique per replication stream.

Right now, we compute this once and store these on the sync_rules document. This may change in the future.
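One possible shape for such per-stream id assignment (field and function names here are hypothetical, not the PR's actual schema):

```typescript
// Small ids that only need to be unique within one replication stream,
// computed once and stored alongside the sync rules document.
interface IdMapping {
  bucketDefinitions: Record<string, number>;
  parameterIndexes: Record<string, number>;
}

function assignIds(definitionNames: string[], parameterIndexNames: string[]): IdMapping {
  const bucketDefinitions: Record<string, number> = {};
  definitionNames.forEach((name, i) => (bucketDefinitions[name] = i + 1));
  const parameterIndexes: Record<string, number> = {};
  parameterIndexNames.forEach((name, i) => (parameterIndexes[name] = i + 1));
  return { bucketDefinitions, parameterIndexes };
}
```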

Implementation

Even though the changes per collection are relatively minor conceptually, the implementations depart significantly.

I initially tried using if/else logic where the specific queries are performed, but this got messy quite quickly. The current iteration instead splits the storage implementation into separate v1 and v3 implementation classes, with common logic in an abstract base class.

There are still a couple of places where this uses TS casts that effectively bypass type checks, which I'm not completely happy with. It is unfortunately difficult to resolve all of those without significantly more code.
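The v1/v3 split could be sketched like this (class and method names are illustrative, not the actual API):

```typescript
// Common logic lives in the abstract base; version-specific behavior is
// implemented per storage version instead of via if/else at each call site.
abstract class BaseBucketStorage {
  clearData(): string {
    return this.clearStrategy();
  }
  protected abstract clearStrategy(): string;
}

class V1BucketStorage extends BaseBucketStorage {
  protected clearStrategy(): string {
    return 'batched deletes with per-operation retries';
  }
}

class V3BucketStorage extends BaseBucketStorage {
  protected clearStrategy(): string {
    return 'drop partitioned collections';
  }
}
```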

Disclaimer: I used Codex for large parts of the refactoring. This is limited to the implementation/refactoring - the data modeling and implementation structure was designed manually.

Future changes

We may make more changes in the v3 storage format before starting to use it:

  1. Clean up field names, for example replica_id_columns2.
  2. Shorten field names in collections where the overhead may be significant, such as bucket_data_*.
  3. Properly test performance of clustered collections for bucket_data, reverting to standard collections if there are issues.

@rkistner rkistner changed the title from [WIP] Incremental processing storage v3 to [MongoDB Storage] Storage v3 for incremental reprocessing on Apr 1, 2026
@rkistner rkistner marked this pull request as ready for review April 1, 2026 13:16
@rkistner rkistner requested a review from simolus3 April 1, 2026 13:16

@simolus3 simolus3 left a comment

These changes make sense to me and I'm happy with the restructuring into v1/v3 implementation classes 👍

I don't have any concerns apart from a few minor comments and questions.

Comment thread docs/storage-v3.md
Comment thread modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts Outdated
Comment thread modules/module-mongodb-storage/test/src/storage_compacting.test.ts Outdated
Comment thread modules/module-mongodb-storage/src/storage/implementation/v1/models.ts Outdated
Comment thread modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts Outdated
Comment thread modules/module-mongodb-storage/src/storage/implementation/v3/PersistedBatchV3.ts Outdated
simolus3
simolus3 previously approved these changes Apr 2, 2026
@simolus3 simolus3 self-requested a review April 2, 2026 10:38
@rkistner rkistner force-pushed the incremental-processing-storage branch from 822be7c to 8d8164f on April 2, 2026 15:22
@rkistner rkistner merged commit 203233d into main Apr 13, 2026
44 checks passed
@rkistner rkistner deleted the incremental-processing-storage branch April 13, 2026 11:21