feat(sqs): live-queue reaper enumerates partitioned keyspace (Phase 3.D PR 6b)#736
Follow-up to PR 6a (#735). The tombstone-driven reaper now sweeps partitioned data / vis / byage / dedup / group records on DeleteQueue / PurgeQueue, but the live-queue retention reaper still walked only the legacy keyspace, so:

- retention-expired messages on partitioned queues leaked their data / vis / byage / group rows forever (`reapQueue`'s byage walk used `sqsMsgByAgePrefixAllGenerations` only),
- expired dedup records on partitioned FIFO queues leaked forever (`reapExpiredDedup`'s prefix scan used `SqsMsgDedupPrefix` only — empty for partitioned queues, since `sqsMsgDedupKeyDispatch` routes their writes under `SqsPartitionedMsgDedupPrefix`).

Closes the live-queue half of the Codex P2 from PR #732 round 0 ("Reap partitioned dedup records to prevent key growth"); PR 6a covered the tombstoned-cohort half.

## Scope (the second slice of §11 PR 6 from the split-queue-FIFO design doc)

- `reapQueue`: legacy byage walk extracted as `reapQueueLegacy`, behaviour byte-identical to pre-PR-6b for non-partitioned queues. Adds a new `reapQueuePartition` step that runs once per partition for `PartitionCount > 1` queues. Each partition gets its own per-partition budget per the §6 design ("partitions × budget per cycle"); a 32-partition queue thus allows up to 32 × `sqsReaperPerQueueBudget` records per tick, comfortably within the 30s reaper interval.
- `reapPartitionedPage`: partitioned twin of `reapPage`. Same live-vs-orphan classification (live when `parsed.SendTimestampMs > cutoff` under the live generation, unconditional reap under older generations, defensive skip when `parsed.Generation > currentGen`) but parses each entry with `parseSqsPartitionedMsgByAgeKey` and routes the dispatch through `reapOneRecordPartitioned`.
- `classifyPartitionedByAgeEntry`: helper extracted from `reapPartitionedPage` so the loop body stays under the cyclop ceiling. Returns `(parsedKey, reapable bool)`.
- `reapExpiredDedup`: now takes `*sqsQueueMeta` and routes by `PartitionCount`. Legacy meta (`PartitionCount <= 1`) → `reapExpiredDedupLegacy` (byte-identical to the pre-PR-6b walk). Partitioned meta (`PartitionCount > 1`) → `reapExpiredDedupPartitioned` (new), which iterates each partition's partitioned dedup prefix under its own per-partition budget and uses the existing `reapDedupPage` to apply the value-based `ExpiresAtMillis` filter.

## Caller audit

- `reapQueue`: one production caller (`reapAllQueues` line 85). No signature change. Behaviour for non-partitioned queues byte-identical; partitioned queues get the additional per-partition pass.
- `reapExpiredDedup`: signature changed to take `*sqsQueueMeta`. One production caller (`reapAllQueues` line 88), updated. No test files invoked this helper directly. Legacy meta routes to the byte-identical legacy walk; partitioned meta routes to the new walk.
- `reapQueueLegacy` / `reapQueuePartition` / `reapExpiredDedupLegacy` / `reapExpiredDedupPartitioned` / `classifyPartitionedByAgeEntry`: each has exactly one caller in the new live-queue reap path.
- `reapOneRecordPartitioned`: existing helper from PR 6a. Its previous caller was `reapDeadByAgePartitionPage` (tombstone path); it is now also called from `reapPartitionedPage` (live-queue path). Same dispatch semantics — synthetic meta carrying `PartitionCount > 1` to flip the dispatch helper branch.

## Tests

- New `TestSQSServer_PartitionedFIFO_LiveQueueDedupReaperPartitions` (wire-level): create a 4-partition FIFO queue, send across 6 distinct groups, backdate every partitioned dedup record's `ExpiresAtMillis`, run `reapAllQueues`, assert every partitioned dedup row across `[0, 4)` is gone. The pre-PR-6b reaper would have left every row in place — the test fails on the legacy code path.

## Self-review (CLAUDE.md)

1. **Data loss** — Closes the live-queue dedup leak and the partitioned retention-expired-message leak. Legacy queues unchanged: `reapQueue`, `reapExpiredDedup`, the byage walks, and the dispatch helpers all keep their byte-identical pre-PR-6b paths for `PartitionCount <= 1`.
2. **Concurrency / distributed failures** — The reaper still runs only on the leader. Each partition's pass is sequential, and the per-partition budget bounds the pass. Existing OCC dispatch semantics on each per-record reap are unchanged.
3. **Performance** — Per-tick partitioned-queue cost grows from O(1) walks to O(partition_count) walks on both byage and dedup. Each partition is bounded by `sqsReaperPerQueueBudget`; the 30s tick interval comfortably absorbs 32 partitions × per-queue budget per the design.
4. **Data consistency** — Live-vs-orphan classification on partitioned byage mirrors the legacy branch exactly (`reapPage` / `reapPartitionedPage` share the rules through `classifyPartitionedByAgeEntry`). `PartitionCount` immutability means the meta-driven iteration bound matches the on-disk keys for any cohort.
5. **Test coverage** — One new wire-level integration test for the partitioned dedup walk; the partitioned byage walk reuses parsing / dispatch helpers already tested by PR 6a's tombstone-reap integration test.
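The per-partition budget policy above ("partitions × budget per cycle") can be sketched as follows. This is a minimal illustration under assumed names: `perQueueBudget` and `reapPartition` are hypothetical stand-ins for `sqsReaperPerQueueBudget` and the real partition walk, which dispatches OCC reaps rather than counting strings.

```go
package main

import "fmt"

// perQueueBudget is a hypothetical stand-in for sqsReaperPerQueueBudget.
const perQueueBudget = 4

// reapPartition sweeps one partition's records, stopping once the
// per-partition budget is exhausted; unswept records wait for the next tick.
func reapPartition(records []string, budget int) (reaped int) {
	for range records {
		if reaped >= budget {
			break
		}
		// The real reaper would dispatch a per-record OCC reap here.
		reaped++
	}
	return reaped
}

func main() {
	// A PartitionCount > 1 queue gets one budgeted pass per partition,
	// so total work per tick is bounded by partitions × budget.
	partitions := [][]string{
		{"m1", "m2", "m3", "m4", "m5", "m6"}, // more records than the budget
		{"m7", "m8"},
	}
	total := 0
	for _, recs := range partitions {
		total += reapPartition(recs, perQueueBudget)
	}
	fmt.Println(total) // 4 (capped) + 2 = 6
}
```

Under this policy a 32-partition queue touches at most 32 × budget records per tick, which is the bound the 30s reaper interval is sized against.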
@claude review
📝 Walkthrough — The pull request extends the SQS reaper to handle partitioned queues by splitting both byage and dedup expiry reaping logic into legacy and partition-aware paths. Queue metadata is threaded through the reaper pipeline to enable partition iteration and per-partition budget management. A comprehensive integration test validates that dedup reaping correctly removes expired dedup records across all partitions.
Claude finished @bootjp's task in 3m 30s — Review
The PR correctly closes the live-queue half of the partitioned-FIFO reaper gap, and the architecture is coherent. Two items need addressing before merge; one stylistic observation below them. Issue 1 — Stale comment in `reapPage`.
Code Review
This pull request implements partitioned reaping for SQS queues, updating the reaper to handle both legacy and partitioned keyspaces for message expiration and deduplication record cleanup. A new test case verifies that expired deduplication records are correctly reaped from partitioned FIFO queues. The reviewer suggested that reapExpiredDedup should perform both legacy and partitioned scans for partitioned queues to maintain consistency with reapQueue and prevent potential record leaks.
```go
if meta != nil && meta.PartitionCount > 1 {
	return s.reapExpiredDedupPartitioned(ctx, queueName, meta.PartitionCount, readTS)
}
return s.reapExpiredDedupLegacy(ctx, queueName, readTS)
```
The `reapExpiredDedup` function uses if/else logic to choose between legacy and partitioned dedup reaping. In contrast, `reapQueue` (lines 433-451) performs both legacy and partitioned scans for partitioned queues to be defensive against leaked records. To maintain consistency and ensure no records are leaked if a queue's state were ever inconsistent, consider performing both scans here as well when `meta.PartitionCount > 1`.
```go
if meta != nil && meta.PartitionCount > 1 {
	if err := s.reapExpiredDedupPartitioned(ctx, queueName, meta.PartitionCount, readTS); err != nil {
		return err
	}
}
return s.reapExpiredDedupLegacy(ctx, queueName, readTS)
```
🧹 Nitpick comments (1)
adapter/sqs_partitioned_dispatch_test.go (1)
715-723: 💤 Low value — Consider adding `ReadKeys` for OCC consistency. The backdate write lacks `ReadKeys`, which differs from the pattern used elsewhere in this file (e.g., lines 58-65). While this test runs in isolation and conflicts are unlikely, including the key in `ReadKeys` would match the established OCC transaction pattern.

Suggested fix
```diff
 req := &kv.OperationGroup[kv.OP]{
 	IsTxn:   true,
 	StartTS: readTS,
+	ReadKeys: [][]byte{bytes.Clone(row.Key)},
 	Elems: []*kv.Elem[kv.OP]{
 		{Op: kv.Put, Key: bytes.Clone(row.Key), Value: body},
 	},
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@adapter/sqs_partitioned_dispatch_test.go` around lines 715 - 723, The txn request `req` created as a *kv.OperationGroup[kv.OP]* for the backdated write is missing the OCC `ReadKeys` field; update the request construction (the `req` passed to `srv.coordinator.Dispatch`) to include the transaction's read key(s) (the same key used in the Put element) in `ReadKeys` so the operation follows the established OCC pattern used elsewhere in this file.
ℹ️ Review info

⚙️ Run configuration — Configuration used: defaults · Review profile: CHILL · Plan: Pro · Run ID: c9072fae-a2cd-465c-8f46-dd99affc49eb

📒 Files selected for processing (2): `adapter/sqs_partitioned_dispatch_test.go`, `adapter/sqs_reaper.go`
Two findings from Claude on PR #736 plus a medium-severity Gemini suggestion in the same review pass:

1. (Claude, must-fix) Stale forward-reference comment in `reapPage` was written in PR 6a pointing at PR 6b. PR 6b is now this branch, so the "is a follow-up to PR 6a" wording was misleading. Trimmed to a one-line note that `reapPage` covers the legacy keyspace and `reapPartitionedPage` is the partitioned twin.
2. (Claude, should-fix) `classifyPartitionedByAgeEntry` had no direct coverage of its retention-cutoff branch (live gen, `sendTs <= cutoff` → reapable) or its future-gen guard (`parsed.gen > currentGen` → not reapable). The integration tests exercise the orphan-cohort and dedup-expiry paths only. Added `TestClassifyPartitionedByAgeEntry`: a table-driven unit test that pins every branch (within retention, past retention, exact-cutoff boundary, orphan generation, future generation, wrong partition, wrong queue prefix, legacy key) against deterministic key inputs built via `sqsPartitionedMsgByAgeKey`. No store / coordinator dependency, so the test is fast and immune to retention timing.
3. (Gemini, medium) `reapExpiredDedup` used an if/else to pick between legacy and partitioned dedup reaping for partitioned queues, while `reapQueue` runs both for symmetry and defensive coverage of leaked legacy entries. Mirrored that policy: the legacy scan now always runs (cheap on an empty prefix today), and the partitioned scan additionally runs for partitioned queues.

Caller audit: `reapExpiredDedup` has a single caller (`reapAllQueues`) and the error contract is unchanged, so no semantic ripple.

Refs: PR #736 review thread; Claude review at run 25327783108.
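The table-driven shape of that unit test can be sketched like this. The `reapable` helper below is a hypothetical restatement of `classifyPartitionedByAgeEntry`'s decision rule, so the table covers only the generation/cutoff branches; the real helper also parses the partitioned byage key and checks the queue prefix and partition.

```go
package main

import "fmt"

// reapable restates the live-vs-orphan rule: future generations are
// skipped defensively, older (orphan) generations are always reapable,
// and the live generation is reapable only once the send timestamp has
// fallen to or past the retention cutoff.
func reapable(gen, currentGen uint64, sendMs, cutoffMs int64) bool {
	if gen > currentGen {
		return false // future-generation guard
	}
	if gen < currentGen {
		return true // orphan cohort: unconditional reap
	}
	return sendMs <= cutoffMs // live gen: retention cutoff
}

func main() {
	cases := []struct {
		name             string
		gen, currentGen  uint64
		sendMs, cutoffMs int64
		want             bool
	}{
		{"within retention", 2, 2, 100, 50, false},
		{"past retention", 2, 2, 40, 50, true},
		{"exact-cutoff boundary", 2, 2, 50, 50, true},
		{"orphan generation", 1, 2, 100, 50, true},
		{"future generation", 3, 2, 40, 50, false},
	}
	for _, c := range cases {
		if got := reapable(c.gen, c.currentGen, c.sendMs, c.cutoffMs); got != c.want {
			panic(fmt.Sprintf("%s: got %v, want %v", c.name, got, c.want))
		}
	}
	fmt.Println("all branches pinned")
}
```

Because the rule is pure, the table needs no store or coordinator fixture, which is what makes the real test fast and immune to retention timing.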
@claude review
Claude finished @bootjp's task in 2m 58s — Review
This is the round-2 pass. All items from the previous Claude review and the Gemini concern are resolved. Resolved since round 1: the stale comment in `reapPage` was trimmed, `TestClassifyPartitionedByAgeEntry` now pins every classification branch, and `reapExpiredDedup` runs both the legacy and partitioned scans.
One remaining nit (no blocker)
Correctness notes (no action needed)
The architecture is sound and test coverage is now complete for all new decision paths. Ready to merge pending CI.
## Summary
Phase 3.D PR 7a — adds the
`elastickv_sqs_partition_messages_total{queue, partition, action}`
Prometheus counter so dashboards and alerts can spot uneven
`MessageGroupId` distributions across partitioned-FIFO queues.
Metrics-only: the Jepsen HT-FIFO workload (PR 7b) ships separately so
the two review loops do not cross.
- `monitoring/sqs.go` (new): `SQSMetrics` with cardinality cap
(`sqsMaxTrackedQueues = 512`, overflow collapses to `_other`) mirroring
`DynamoDBMetrics`. Drops empty queue names and unknown action labels so
a future call-site bug cannot pollute the series space dashboards have
to learn about. Nil-receiver-safe so adapter call sites do not need to
nil-guard.
- `adapter/sqs.go`: `SQSPartitionObserver` interface +
`WithSQSPartitionObserver` option. Re-declared in `adapter` so it
doesn't import `monitoring` at the package boundary (matches the
DynamoDB/Redis observer pattern). Action constants
(`send`/`receive`/`delete`) re-declared on the adapter side and
validated at runtime by the monitoring side — drift between the two
surfaces as a dropped observation, not a wedge.
- `adapter/sqs_fifo.go`, `adapter/sqs_messages.go`: emit the counter on
the **partitioned** commit branch only (`PartitionCount > 1`) for send /
receive / delete. Legacy single-partition queues stay off the metric
since partition is always 0 and the cardinality cost would buy nothing.
- `monitoring/registry.go`, `main_sqs.go`, `main.go`: wire the
registry's `SQSPartitionObserver()` into `startSQSServer` so the SQS
server picks up the production observer on cluster boot. Test fixtures
and CLI tools that build `SQSServer` without a registry pass `nil` and
the metric stays at zero.
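A minimal sketch of the cap-and-collapse behaviour described above, under assumed names: this toy `SQSMetrics` counts into a plain map rather than a Prometheus `CounterVec`, and `maxTrackedQueues` is set to 2 here purely for illustration (the PR uses `sqsMaxTrackedQueues = 512`).

```go
package main

import "fmt"

// maxTrackedQueues is a toy stand-in for sqsMaxTrackedQueues (512 in the PR).
const maxTrackedQueues = 2

var validActions = map[string]bool{"send": true, "receive": true, "delete": true}

type SQSMetrics struct {
	tracked map[string]bool   // queue labels admitted before the cap
	counts  map[[3]string]int // (queue, partition, action) -> count
}

func (m *SQSMetrics) ObservePartitionMessage(queue string, partition int, action string) {
	if m == nil || queue == "" || !validActions[action] {
		return // nil-receiver-safe; drop bad labels instead of recording them
	}
	if !m.tracked[queue] {
		if len(m.tracked) >= maxTrackedQueues {
			queue = "_other" // cap reached: collapse to the overflow label
		} else {
			m.tracked[queue] = true
		}
	}
	m.counts[[3]string{queue, fmt.Sprint(partition), action}]++
}

func main() {
	m := &SQSMetrics{tracked: map[string]bool{}, counts: map[[3]string]int{}}
	m.ObservePartitionMessage("q1", 0, "send")
	m.ObservePartitionMessage("q2", 1, "receive")
	m.ObservePartitionMessage("q3", 0, "send") // over the cap -> "_other"
	m.ObservePartitionMessage("q1", 0, "sned") // typo'd action dropped
	m.ObservePartitionMessage("", 0, "send")   // empty queue dropped
	var nilM *SQSMetrics
	nilM.ObservePartitionMessage("q1", 0, "send") // safe no-op
	fmt.Println(m.counts[[3]string{"q1", "0", "send"}],
		m.counts[[3]string{"_other", "0", "send"}]) // prints "1 1"
}
```

The key property is that a misbehaving caller can grow the series space only up to the cap plus the single `_other` overflow series, and invalid observations vanish instead of minting new labels.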
## Tests
`monitoring/sqs_test.go` (new, 6 cases):
- `TestSQSMetrics_ObservePartitionMessage_IncrementsByLabelTriple` — pin
the `(queue, partition, action)` counter contract.
- `TestSQSMetrics_ObservePartitionMessage_DropsInvalidAction` — pin the
typo guard against future drift between adapter and monitoring
constants.
- `TestSQSMetrics_ObservePartitionMessage_DropsEmptyQueue` — pin that an
empty queue name does not collapse with valid observations onto a shared
series.
- `TestSQSMetrics_NilReceiverIsSafe` — pin the nil-receiver
short-circuit the adapter relies on.
- `TestSQSMetrics_QueueLabelOverflow` — pin the cap-and-collapse so a
misbehaving caller cannot exhaust the Prometheus series budget.
- `TestSQSMetrics_RegistryWiring` — pin that the public `Registry`
exposes the metric under the documented name.
## Self-review (5 lenses)
1. **Data loss** — N/A; metrics-only, no storage / Raft / FSM touch.
2. **Concurrency** — counter increments are atomic via Prometheus; the
`trackedQueues` map is only consulted from the dispatch-success path
under the SQS server's existing concurrency model. No new locks.
3. **Performance** — one map lookup + one `CounterVec` lookup per
partitioned send/receive/delete on the success branch. Legacy queues
skip the call entirely. Cardinality bounded at 512 queue × 32 partition
(`htfifoMaxPartitions`) × 3 action ≈ 49k series worst case; in practice
a 32-partition queue yields 96 series, so the budget is plenty for the
SLO panels.
4. **Data consistency** — the metric is observed AFTER OCC dispatch
succeeds, so the counter reflects committed state. Receive/delete
branches that return on retryable errors deliberately do not increment
(the retry path will observe on the eventual success).
5. **Test coverage** — 6 unit tests in `monitoring/`, plus the
adapter-side nil-observer path is exercised by all existing
partitioned-FIFO tests in `adapter/sqs_partitioned_dispatch_test.go`
(they pass `nil` observer through the test fixture).
## Test plan
- [x] `go test -race -count=1 ./monitoring/...`
- [x] `go test -race -count=1 -run 'TestSQS' ./adapter/...`
- [x] `go test -race -count=1 ./...` (full suite)
- [x] `golangci-lint --config=.golangci.yaml run ./...` (full repo)
- [ ] Jepsen HT-FIFO workload — deferred to PR 7b
## Refs
- `docs/design/2026_05_01_partial_split_queue_fifo.md` §11 PR 7
- Builds on PR 5b-3 (#734) capability gate, PR 6a (#735) tombstone
reaper, PR 6b (#736 in flight) live-queue reaper.
make lint— 0 issuesSummary by CodeRabbit
Bug Fixes
Tests