feat(sqs): tombstone reaper enumerates partitioned keyspace (Phase 3.D PR 6a) #735
Conversation
…D PR 6a)

Addresses the Codex P2 review on PR #732 round 0 ("Reap partitioned dedup records to prevent key growth"), deferred at the time as a "PR 6 concern". With PR 5b-3 lifting the dormancy gate, partitioned FIFO queues can now land in production — but until this PR the tombstone reaper only swept the legacy keyspace, so partitioned data / vis / byage / dedup / group rows leaked permanently after DeleteQueue or PurgeQueue.

Scope (the focused first piece of §11 PR 6 from docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md):

- Tombstone schema: tombstone value now encodes PartitionCount so the reaper can drive partition iteration off the tombstone alone (the meta record is gone by the time the reaper observes the tombstone). Encoding is 8 bytes big-endian uint64. Backward compat: PartitionCount<=1 keeps the byte-identical legacy []byte{1} sentinel; any non-canonical value (empty, 1-byte, 4-byte, out-of-range 8-byte) decodes to PartitionCount=1 so a partial rollback or future encoding revision degrades safely to legacy single-partition reap behaviour. (A codec sketch follows this message.)
- DeleteQueue (sqs_catalog.go) and PurgeQueue (sqs_purge.go) write the encoded value, reading PartitionCount from the pre-delete / pre-purge meta. PartitionCount is immutable across SetQueueAttributes / PurgeQueue (§3.2) so the post-purge meta and the pre-purge tombstone always agree on the count.
- Tombstone reaper: reapTombstonedGeneration takes the decoded PartitionCount, sweeps the legacy keyspace (always — covers pre-HT-FIFO queues plus a defensive "did we accidentally write legacy?" pass) and, on PartitionCount > 1, ALSO sweeps every partition's byage / dedup / group prefix family via the new reapPartitionedGeneration helper. allDone gating delays tombstone deletion until every partition is fully drained, so a wide-fanout queue may need multiple reaper ticks but cannot leave the tombstone-deleted-but-data-lingering inconsistency.
- Per-record dispatch: reapOneRecord and buildReapOps now take *sqsQueueMeta + partition uint32 instead of hardcoding nil + 0. Legacy callers pass nil + 0 (byte-identical to the pre-PR-5b layout); the partitioned twin reapOneRecordPartitioned synthesises a meta carrying PartitionCount > 1 so the dispatch helpers route to sqsPartitionedMsg{Data,Vis,Group}Key.
- New parsed-key helpers in sqs_keys.go:
  - sqsPartitionedMsgByAgePrefixForPartition(queue, partition, gen)
  - sqsPartitionedMsgDedupKeyPrefix(queue, partition, gen)
  - sqsPartitionedMsgGroupKeyPrefix(queue, partition, gen)
  Each returns the (queue, partition, gen)-bound scan prefix the reaper feeds deleteAllPrefix / reapDeadByAgePartition.

Out of scope (follow-up PR 6b):

- Live-queue retention reap (reapQueue) still iterates only the legacy byage prefix; partitioned-byage live retention is the next slice. The tombstone-cohort path this PR addresses is the immediate Codex P2 concern (DeleteQueue + reaper leaving partitioned dedup records behind).
- PurgeQueue / DeleteQueue do not iterate partition keyspace synchronously; the tombstone-driven async sweep is sufficient for the AWS-shaped semantics today, and the design doc does not require synchronous per-partition deletion.

Caller audit:

- encodeQueueTombstoneValue: two production callers (DeleteQueue, PurgeQueue), both pass the queue's PartitionCount.
- decodeQueueTombstoneValue: one production caller (reaper); legacy / out-of-range fallback to 1 documented and tested.
- reapOneRecord (signature changed): three production callers — reapDeadByAgePage (legacy), reapDeadByAgePartitionPage (via reapOneRecordPartitioned, partitioned), reapQueue's over-retention sweep (legacy). All updated; partitioned path uses the synthetic meta to flip the dispatch helper branch.
- reapPartitionedGeneration / reapDeadByAgePartition / reapDeadByAgePartitionPage: each has exactly one caller in the new tombstone-reap path.

Self-review (CLAUDE.md):

1. Data loss — Closes the Codex P2 leak; no path now leaves partitioned records orphaned after DeleteQueue/PurgeQueue. Legacy queues unchanged: tombstone value, reaper sweep prefixes, and dispatch helpers all keep the byte-identical pre-PR-6a behaviour for PartitionCount<=1.
2. Concurrency / distributed failures — Reaper still runs only on the leader (existing reaperCtx). Tombstone read is at the reaper's snapshot readTS; every individual reap is its own OCC dispatch with ErrWriteConflict treated as success (message touched by another path). Per-partition iteration is sequential; no new shared state.
3. Performance — Per-tick cost grows from O(legacy prefix scans) to O((partition_count + 1) * legacy prefix scans). Capped by sqsReaperPerQueueBudget per partition. PartitionCount=1 queues see no extra cost (the partitioned branch short-circuits on partitionCount<=1).
4. Data consistency — Tombstone deletion gated on full-drain across all partition + legacy prefix families. Partial drains leave the tombstone in place so the next reaper tick can resume. PartitionCount immutability invariant means the tombstone-encoded count and the reaper's iteration bound can never disagree with the live queue.
5. Test coverage — 4 unit tests on the tombstone-value codec (legacy roundtrip, partitioned encode, fallback for non-canonical values, out-of-range clamp). 1 wire-level integration test that creates a partitioned queue, sends to 6 distinct groups, calls DeleteQueue, hand-runs the reaper, and asserts every partitioned data / vis / byage / dedup / group prefix is empty + the tombstone itself is deleted.
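A minimal sketch of the codec described in the tombstone-schema bullet, in its round-0 shape (the decode is later refactored in review). The constant value `htfifoMaxPartitions = 32` is taken from the review thread below; the literal 8 and exact function bodies are assumptions, not the repository's actual code:

```go
package adapter

import "encoding/binary"

const htfifoMaxPartitions = 32 // value quoted in the review thread

// encodeQueueTombstoneValue: PartitionCount<=1 keeps the byte-identical
// legacy []byte{1} sentinel; otherwise 8 bytes big-endian uint64.
func encodeQueueTombstoneValue(partitionCount uint32) []byte {
	if partitionCount <= 1 {
		return []byte{1}
	}
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(partitionCount))
	return buf
}

// decodeQueueTombstoneValue degrades every non-canonical value (empty,
// 1-byte, 4-byte, out-of-range 8-byte) to 1, i.e. legacy reap behaviour.
func decodeQueueTombstoneValue(value []byte) uint32 {
	if len(value) != 8 {
		return 1
	}
	pc := binary.BigEndian.Uint64(value)
	if pc == 0 || pc > uint64(htfifoMaxPartitions) {
		return 1
	}
	return uint32(pc) // bounded by htfifoMaxPartitions just above
}
```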
You have reached your Codex usage limits for code reviews. You can see your limits in the Codex usage dashboard.
@claude review |
Claude finished @bootjp's task in 5m 23s — Code Review — PR #735
📝 Walkthrough

This PR implements partition-count-encoded tombstones and partition-aware reaping for HT-FIFO partitioned queues. Queue deletions and purges now encode the queue's partition count into the tombstone value; the reaper decodes this count and performs targeted partition-scoped cleanup of byage, dedup, and group-lock key families before removing the tombstone.

Changes

Partition-Count Tombstones and Partition-Aware Reaping
Sequence Diagram

```mermaid
sequenceDiagram
participant Client as Client
participant Adapter as SQS Adapter
participant Store as KV Store
participant Reaper as Tombstone Reaper
Client->>Adapter: DeleteQueue(queue)
Note over Adapter: Load queue meta (with PartitionCount)
Adapter->>Adapter: Encode tombstone value<br/>encodeQueueTombstoneValue(PartitionCount)
Adapter->>Store: Put tombstone key with encoded value
Note over Reaper: Background reap tick
Reaper->>Store: Scan tombstone keys
Reaper->>Store: Fetch tombstone value
Reaper->>Reaper: Decode PartitionCount<br/>from tombstone value
alt PartitionCount > 1
loop For each partition [0, PartitionCount)
Reaper->>Store: Delete byage keys (partitioned prefix)
Reaper->>Store: Delete dedup keys (partitioned prefix)
Reaper->>Store: Delete group-lock keys (partitioned prefix)
end
end
Note over Reaper: Legacy sweep (if needed)
Reaper->>Store: Delete legacy byage/dedup/group keys
Reaper->>Store: Delete tombstone key
Reaper-->>Client: Queue fully reaped
```

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs
🚥 Pre-merge checks: ✅ 5 passed
Code Review
This pull request enables the SQS tombstone reaper to drain partitioned queues by encoding the partition count into tombstone values and introducing partition-aware cleanup logic. Feedback identifies a potential resource exhaustion issue where the work budget is not correctly shared across partitions and suggests refactoring tombstone deletion to use more descriptive helpers.
```go
// Partitioned sweep: one (byage, dedup, group) triple per
// partition. Each triple shares the per-queue budget with the
// legacy sweep, so a wide-fanout queue may need multiple reaper
// ticks to fully drain — same contract as the live-queue reap.
```
The comment states that each triple 'shares the per-queue budget with the legacy sweep', but the implementation of reapPartitionedGeneration and its helpers each initialize their own processed counter. This results in a total work per tick of O(partitionCount * budget). To prevent potential resource exhaustion and ensure bounds are strictly enforced, please update the implementation to share the budget or correct the documentation to reflect the actual behavior.
References
- To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests. Reject new requests when the bound is reached.
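To make the distinction in this comment concrete, here is a hedged sketch of the budget-sharing shape it asks for. All names (`sweepPrefix`, `perQueueBudget`) are hypothetical stand-ins, not the PR's actual helpers, and the sweep body is a placeholder:

```go
package reaper

const perQueueBudget = 1024 // hypothetical per-queue cap

// sweepPrefix drains up to *budget records under one prefix family,
// decrementing the SHARED counter; returns true once the prefix is empty.
func sweepPrefix(prefix string, budget *int) bool {
	// ... scan + delete records under prefix, decrementing *budget per
	// record; return true only when the prefix is fully drained ...
	if *budget == 0 {
		return false
	}
	*budget--
	return true
}

// One budget threads through the legacy sweep and every partition, so
// total work per tick is O(budget), not O(partitionCount * budget).
func reapTombstonedGeneration(partitionCount uint32) (allDone bool) {
	budget := perQueueBudget
	allDone = sweepPrefix("legacy", &budget)
	for p := uint32(0); p < partitionCount && budget > 0; p++ {
		for _, fam := range []string{"byage", "dedup", "group"} {
			allDone = sweepPrefix(fam, &budget) && allDone
		}
	}
	return allDone // false leaves the tombstone for the next tick
}
```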
```go
	allDone = allDone && partDone
}
if allDone {
	_ = s.dispatchDedupDelete(ctx, tombstoneKey, readTS)
```
Using dispatchDedupDelete to delete the tombstone key is slightly confusing as the tombstone is not a deduplication record. To improve maintainability and follow the practice of using appropriate centralized helpers, consider using a more generic helper or calling coordinator.Dispatch directly with a kv.Del operation.
References
- Centralize common complex logic, such as retry/backoff mechanisms, into shared helper functions to avoid code duplication and improve maintainability.
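Sketched, the suggestion reads roughly as below. `coordinator.Dispatch` and the `kv.Del` operation are named in the comment itself; the helper name and request shape here are hypothetical:

```go
// Hypothetical shape of the suggested change in sqs_reaper.go: a
// purpose-named delete through the generic dispatch path instead of
// reusing the dedup-specific helper for a non-dedup key.
if allDone {
	_ = s.dispatchReapDelete(ctx, tombstoneKey, readTS) // or coordinator.Dispatch with a kv.Del op
}
```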
Actionable comments posted: 1
🧹 Nitpick comments (1)
adapter/sqs_keys.go (1)
169-179: ⚡ Quick win — Drop the new `//nolint:gosec` by decoding into `uint32` directly.

This branch is already bounded to `htfifoMaxPartitions`, so the suppression is avoidable. Splitting the 8-byte payload into high/low `uint32`s keeps the same wire format and removes the new lint escape hatch.

♻️ Proposed refactor

```diff
 func decodeQueueTombstoneValue(value []byte) uint32 {
 	if len(value) != sqsGenerationSuffixLen {
 		return 1
 	}
-	pc := binary.BigEndian.Uint64(value)
-	if pc == 0 || pc > uint64(htfifoMaxPartitions) {
+	hi := binary.BigEndian.Uint32(value[:4])
+	pc := binary.BigEndian.Uint32(value[4:])
+	if hi != 0 || pc == 0 || pc > htfifoMaxPartitions {
 		return 1
 	}
-	// Bounded by htfifoMaxPartitions (=32) immediately above, so
-	// the uint32 narrow cannot overflow.
-	return uint32(pc) //nolint:gosec // bounded by htfifoMaxPartitions
+	return pc
 }
```

As per coding guidelines, "Use `gofmt` formatting and pass linters configured in `.golangci.yaml` (gocritic, gocyclo, gosec, wrapcheck, mnd, etc.). Avoid `//nolint` annotations — refactor instead."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@adapter/sqs_keys.go` around lines 169 - 179, The function decodeQueueTombstoneValue currently reads an 8-byte uint64 and uses //nolint:gosec; instead, decode directly into a uint32 to avoid the lint suppression: in decodeQueueTombstoneValue keep the length check against sqsGenerationSuffixLen, then extract the lower 4 bytes (using binary.BigEndian.Uint32 on value[4:]) (or the appropriate 4-byte slice preserving the wire format) into a uint32, validate that it is non-zero and <= htfifoMaxPartitions, and return that uint32; remove the //nolint:gosec comment and the uint64 path entirely so no narrowing lint is needed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@adapter/sqs_partitioned_dispatch_test.go`:
- Around line 621-649: Modify countPartitionedRows to accept a partitions int
parameter (replace the hard-coded 8) and include scans for the data and vis
keyspaces in addition to byage/dedup/group. Keep the existing readTS logic
(node.sqsServer.nextTxnReadTS) and prefixScanEnd usage, add the family-level
all-partitions prefixes for data/vis (e.g.
sqsPartitionedMsgDataPrefixForQueueAllPartitions(queueName) and
sqsPartitionedMsgVisPrefixForQueueAllPartitions(queueName)) to prefixes, and
iterate partition from 0 to partitions-1 to append
sqsPartitionedMsgDedupKeyPrefix(queueName, partition, gen) and
sqsPartitionedMsgGroupKeyPrefix(queueName, partition, gen). Update the two call
sites in this test to pass the actual partitions value. Ensure require.NoError
remains around each ScanAt call and total sums len(rows) as before.
---
Nitpick comments:
In `@adapter/sqs_keys.go`:
- Around line 169-179: The function decodeQueueTombstoneValue currently reads an
8-byte uint64 and uses //nolint:gosec; instead, decode directly into a uint32 to
avoid the lint suppression: in decodeQueueTombstoneValue keep the length check
against sqsGenerationSuffixLen, then extract the lower 4 bytes (using
binary.BigEndian.Uint32 on value[4:]) (or the appropriate 4-byte slice
preserving the wire format) into a uint32, validate that it is non-zero and <=
htfifoMaxPartitions, and return that uint32; remove the //nolint:gosec comment
and the uint64 path entirely so no narrowing lint is needed.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 76940a69-4c8d-4c68-b80d-0101a17c1008
📒 Files selected for processing (6)
- adapter/sqs_catalog.go
- adapter/sqs_keys.go
- adapter/sqs_partitioned_dispatch_test.go
- adapter/sqs_purge.go
- adapter/sqs_reaper.go
- adapter/sqs_tombstone_value_test.go
```go
// countPartitionedRows sums the rows under every partitioned
// data / vis / byage / dedup / group prefix for a (queue, gen)
// cohort. Used by the tombstone-reaper integration test to assert
// the cohort is fully drained without enumerating individual keys.
func countPartitionedRows(t *testing.T, node Node, queueName string, gen uint64) int {
	t.Helper()
	ctx := context.Background()
	readTS := node.sqsServer.nextTxnReadTS(ctx)
	prefixes := [][]byte{
		// Match every partition by using the family-level "all
		// partitions" prefix where one exists; for dedup / group
		// the family-level prefix isn't pre-built, so iterate
		// partitions [0, 4) explicitly.
		sqsPartitionedMsgByAgePrefixForQueueAllPartitions(queueName),
	}
	for partition := uint32(0); partition < 8; partition++ {
		prefixes = append(prefixes,
			sqsPartitionedMsgDedupKeyPrefix(queueName, partition, gen),
			sqsPartitionedMsgGroupKeyPrefix(queueName, partition, gen),
		)
	}
	total := 0
	for _, prefix := range prefixes {
		rows, err := node.sqsServer.store.ScanAt(ctx, prefix, prefixScanEnd(prefix), 1024, readTS)
		require.NoError(t, err)
		total += len(rows)
	}
	return total
}
```
Expand this helper to scan the actual data/vis keyspace.
countPartitionedRows currently ignores partitioned data and vis rows, so this test can still pass if the reaper drops byage/dedup/group but leaks message records. The hard-coded 8 also makes the helper undercount future callers with a different partition count.
💡 Tighten the helper to cover the full cohort

```diff
-func countPartitionedRows(t *testing.T, node Node, queueName string, gen uint64) int {
+func countPartitionedRows(t *testing.T, node Node, queueName string, gen uint64, partitionCount uint32) int {
 	t.Helper()
 	ctx := context.Background()
 	readTS := node.sqsServer.nextTxnReadTS(ctx)
-	prefixes := [][]byte{
-		// Match every partition by using the family-level "all
-		// partitions" prefix where one exists; for dedup / group
-		// the family-level prefix isn't pre-built, so iterate
-		// partitions [0, 4) explicitly.
-		sqsPartitionedMsgByAgePrefixForQueueAllPartitions(queueName),
-	}
-	for partition := uint32(0); partition < 8; partition++ {
+	prefixes := make([][]byte, 0, int(partitionCount)*5)
+	for partition := uint32(0); partition < partitionCount; partition++ {
 		prefixes = append(prefixes,
+			sqsPartitionedMsgDataKey(queueName, partition, gen, ""),
+			sqsPartitionedMsgVisPrefixForQueue(queueName, partition, gen),
+			sqsPartitionedMsgByAgePrefixForPartition(queueName, partition, gen),
 			sqsPartitionedMsgDedupKeyPrefix(queueName, partition, gen),
 			sqsPartitionedMsgGroupKeyPrefix(queueName, partition, gen),
 		)
 	}
 	total := 0
```

Please also pass `partitions` at the two call sites in this test.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@adapter/sqs_partitioned_dispatch_test.go` around lines 621 - 649, Modify
countPartitionedRows to accept a partitions int parameter (replace the
hard-coded 8) and include scans for the data and vis keyspaces in addition to
byage/dedup/group. Keep the existing readTS logic (node.sqsServer.nextTxnReadTS)
and prefixScanEnd usage, add the family-level all-partitions prefixes for
data/vis (e.g. sqsPartitionedMsgDataPrefixForQueueAllPartitions(queueName) and
sqsPartitionedMsgVisPrefixForQueueAllPartitions(queueName)) to prefixes, and
iterate partition from 0 to partitions-1 to append
sqsPartitionedMsgDedupKeyPrefix(queueName, partition, gen) and
sqsPartitionedMsgGroupKeyPrefix(queueName, partition, gen). Update the two call
sites in this test to pass the actual partitions value. Ensure require.NoError
remains around each ScanAt call and total sums len(rows) as before.
Addresses Claude's 4 review items on the round-0 commit. None
behaviour-changing; all narrow code-quality / clarity fixes.
1. Remove the //nolint:gosec on the uint64→uint32 narrow in
decodeQueueTombstoneValue (Claude Issue 1). CLAUDE.md is
explicit: "Avoid //nolint — refactor instead." Refactored to
read the 8-byte tombstone value as two big-endian uint32s
instead of a single uint64, then check the high half is zero
(must be — encoder only writes PartitionCount<=htfifoMaxPartitions
into the low 32 bits) and the low half is in
[1, htfifoMaxPartitions]. No narrowing conversion → no gosec
flag → no nolint annotation.
2. reapOneRecordPartitioned's synthetic meta uses
PartitionCount=2 instead of htfifoMaxPartitions (=32) as the
dispatch sentinel (Claude Issue 2). The dispatch helpers only
branch on PartitionCount>1; using the maximum implies the
synthetic meta carries the queue's real count, which it does
not. A future caller that read meta.PartitionCount for any
other purpose would silently get the wrong answer.
3. Stale comment fix in countPartitionedRows test helper
(Claude Issue 3): "[0, 4)" → "[0, 8)" to match the actual
loop bound and the test queue's PartitionCount=8.
4. New sqsTombstoneValueLen=8 constant for the tombstone value
size (Claude Issue 4). Was reusing sqsGenerationSuffixLen
which is a key-layout constant — incidentally also 8 today
but conceptually unrelated. Dedicated constant makes the
intent self-documenting and decouples the two: a future
tombstone-value encoding revision can change the value size
without touching key parsers.
Below threshold, intentionally not addressed:
- Gemini medium "shared budget" comment vs. per-partition
budget implementation (sqs_reaper.go:182). The PR description
explicitly states "O((partition_count + 1) × legacy prefix
scans)" — the per-partition budget IS the design. Comment was
already correct in describing the intent. This is a wording
nit at most; not P0/P1/high/critical/major.
- Gemini medium "use generic Dispatch instead of dispatchDedupDelete
for tombstone deletion" (sqs_reaper.go:193). Style suggestion;
the current call works correctly. Below threshold.
Verified: build clean, golangci-lint 0 issues, targeted test
sweep (-race) all green.
@claude review Round 1 — addressed Claude's 4 review items from round 0. None behaviour-changing.
Below threshold (intentionally not addressed):
Build clean,
Claude finished @bootjp's task in 4m 19s — Code Review — PR #735 Round 1
All four Round 0 issues are correctly addressed. No new issues introduced. Fix 1 —
## Summary
Phase 3.D PR 7a — adds the
`elastickv_sqs_partition_messages_total{queue, partition, action}`
Prometheus counter so dashboards and alerts can spot uneven
`MessageGroupId` distributions across partitioned-FIFO queues.
Metrics-only: the Jepsen HT-FIFO workload (PR 7b) ships separately so
the two review loops do not cross.
- `monitoring/sqs.go` (new): `SQSMetrics` with cardinality cap
(`sqsMaxTrackedQueues = 512`, overflow collapses to `_other`) mirroring
`DynamoDBMetrics`. Drops empty queue names and unknown action labels so
a future call-site bug cannot pollute the series space dashboards have
to learn about. Nil-receiver-safe so adapter call sites do not need to
nil-guard (see the sketch after this list).
- `adapter/sqs.go`: `SQSPartitionObserver` interface +
`WithSQSPartitionObserver` option. Re-declared in `adapter` so it
doesn't import `monitoring` at the package boundary (matches the
DynamoDB/Redis observer pattern). Action constants
(`send`/`receive`/`delete`) re-declared on the adapter side and
validated at runtime by the monitoring side — drift between the two
surfaces as a dropped observation, not a wedge.
- `adapter/sqs_fifo.go`, `adapter/sqs_messages.go`: emit the counter on
the **partitioned** commit branch only (`PartitionCount > 1`) for send /
receive / delete. Legacy single-partition queues stay off the metric
since partition is always 0 and the cardinality cost would buy nothing.
- `monitoring/registry.go`, `main_sqs.go`, `main.go`: wire the
registry's `SQSPartitionObserver()` into `startSQSServer` so the SQS
server picks up the production observer on cluster boot. Test fixtures
and CLI tools that build `SQSServer` without a registry pass `nil` and
the metric stays at zero.
## Tests
`monitoring/sqs_test.go` (new, 6 cases):
- `TestSQSMetrics_ObservePartitionMessage_IncrementsByLabelTriple` — pin
the `(queue, partition, action)` counter contract.
- `TestSQSMetrics_ObservePartitionMessage_DropsInvalidAction` — pin the
typo guard against future drift between adapter and monitoring
constants.
- `TestSQSMetrics_ObservePartitionMessage_DropsEmptyQueue` — pin that an
empty queue name does not collapse with valid observations onto a shared
series.
- `TestSQSMetrics_NilReceiverIsSafe` — pin the nil-receiver
short-circuit the adapter relies on.
- `TestSQSMetrics_QueueLabelOverflow` — pin the cap-and-collapse so a
misbehaving caller cannot exhaust the Prometheus series budget.
- `TestSQSMetrics_RegistryWiring` — pin that the public `Registry`
exposes the metric under the documented name.
## Self-review (5 lenses)
1. **Data loss** — N/A; metrics-only, no storage / Raft / FSM touch.
2. **Concurrency** — counter increments are atomic via Prometheus; the
`trackedQueues` map is only consulted from the dispatch-success path
under the SQS server's existing concurrency model. No new locks.
3. **Performance** — one map lookup + one `CounterVec` lookup per
partitioned send/receive/delete on the success branch. Legacy queues
skip the call entirely. Cardinality bounded at 512 queue × 32 partition
(`htfifoMaxPartitions`) × 3 action ≈ 49k series worst case; in practice
a 32-partition queue yields 96 series, so the budget is plenty for the
SLO panels.
4. **Data consistency** — the metric is observed AFTER OCC dispatch
succeeds, so the counter reflects committed state. Receive/delete
branches that return on retryable errors deliberately do not increment
(the retry path will observe on the eventual success).
5. **Test coverage** — 6 unit tests in `monitoring/`, plus the
adapter-side nil-observer path is exercised by all existing
partitioned-FIFO tests in `adapter/sqs_partitioned_dispatch_test.go`
(they pass `nil` observer through the test fixture).
## Test plan
- [x] `go test -race -count=1 ./monitoring/...`
- [x] `go test -race -count=1 -run 'TestSQS' ./adapter/...`
- [x] `go test -race -count=1 ./...` (full suite)
- [x] `golangci-lint --config=.golangci.yaml run ./...` (full repo)
- [ ] Jepsen HT-FIFO workload — deferred to PR 7b
## Refs
- `docs/design/2026_05_01_partial_split_queue_fifo.md` §11 PR 7
- Builds on PR 5b-3 (#734) capability gate, PR 6a (#735) tombstone
reaper, PR 6b (#736 in flight) live-queue reaper.
….D PR 6b) (#736)

## Summary
Phase 3.D PR 6b: live-queue reaper enumerates partitioned keyspace. Follow-up to PR #735 (6a) — the tombstone-driven sweep already walks partitioned data on DeleteQueue / PurgeQueue, but the live-queue retention reaper still only saw the legacy keyspace, so:

- retention-expired messages on partitioned queues leaked their data / vis / byage / group rows forever (`reapQueue` walked `sqsMsgByAgePrefixAllGenerations` only),
- expired dedup records on partitioned FIFO queues leaked forever (`reapExpiredDedup` scanned `SqsMsgDedupPrefix` only — empty for partitioned queues since `sqsMsgDedupKeyDispatch` routes their writes under `SqsPartitionedMsgDedupPrefix`).

Closes the live-queue half of the Codex P2 from PR #732 round 0; PR 6a covered the tombstoned-cohort half.

## What changes
- **`reapQueue`**: legacy byage walk extracted as `reapQueueLegacy` (byte-identical to pre-PR-6b for non-partitioned queues). Adds `reapQueuePartition` step that runs once per partition for `PartitionCount > 1` queues. Per-partition budget per the §6 design ("partitions × budget per cycle"); 30s tick interval comfortably absorbs. (Control flow sketched after this message.)
- **`reapPartitionedPage`**: partitioned twin of `reapPage`. Same live-vs-orphan classification, but parses each entry with `parseSqsPartitionedMsgByAgeKey` and routes the dispatch through `reapOneRecordPartitioned`.
- **`classifyPartitionedByAgeEntry`**: helper extracted from `reapPartitionedPage` so the loop body stays under the cyclop ceiling. Returns `(parsedKey, reapable bool)`.
- **`reapExpiredDedup`** (signature changed): now takes `*sqsQueueMeta` and routes by `PartitionCount`. Legacy meta → `reapExpiredDedupLegacy` (byte-identical). Partitioned meta → `reapExpiredDedupPartitioned` (NEW), iterates each partition's dedup prefix under its own per-partition budget.

## Caller audit
- `reapQueue` — one production caller (`reapAllQueues`); signature unchanged. Non-partitioned queues byte-identical; partitioned get the extra per-partition pass.
- `reapExpiredDedup` — signature changed to take `*sqsQueueMeta`; one production caller (`reapAllQueues`), updated. No tests called it directly.
- New helpers (`reapQueueLegacy` / `reapQueuePartition` / `reapPartitionedPage` / `reapExpiredDedupLegacy` / `reapExpiredDedupPartitioned` / `classifyPartitionedByAgeEntry`) each have exactly one production caller in the new live-queue reap path.
- `reapOneRecordPartitioned` (existing PR 6a helper): previously called from `reapDeadByAgePartitionPage` (tombstone path); now also from `reapPartitionedPage` (live-queue path). Same dispatch semantics.

## Tests
- New `TestSQSServer_PartitionedFIFO_LiveQueueDedupReaperPartitions`: 4-partition queue, send across 6 distinct groups, backdate every partitioned dedup record's `ExpiresAtMillis`, run `reapAllQueues`, assert every partitioned dedup row across `[0, 4)` is gone. Pre-PR-6b reaper would leave every row in place.

## Self-review (CLAUDE.md)
1. **Data loss** — Closes the live-queue dedup leak + partitioned retention-expired-message leak. Legacy queues unchanged.
2. **Concurrency / distributed failures** — Reaper still runs only on the leader. Per-partition pass is sequential; per-partition budget bounds the pass. OCC semantics on each record reap unchanged.
3. **Performance** — Per-tick partitioned-queue cost grows from O(1 walk) to O(partition_count walks) on byage AND dedup. Each partition bounded by `sqsReaperPerQueueBudget`. 30s tick interval comfortably absorbs 32-partition × per-queue budget per design.
4. **Data consistency** — Live-vs-orphan classification on partitioned byage mirrors the legacy branch exactly (`reapPage` / `reapPartitionedPage` share the rules through `classifyPartitionedByAgeEntry`). `PartitionCount` immutability means the meta-driven iteration bound matches the on-disk keys.
5. **Test coverage** — One new wire-level integration test for the partitioned dedup walk; the partitioned byage walk reuses parsing / dispatch helpers already covered by PR 6a's tombstone-reap integration test.

## Test plan
- [x] `make lint` — 0 issues
- [x] Targeted reaper / retention / dedup / HTFIFO / PartitionedFIFO suites (-race, clean)
- [x] Wider regression on Send/Receive/Delete + CreateQueue/DeleteQueue/PurgeQueue (-race, clean)
- [ ] CI: full Jepsen + race

## Summary by CodeRabbit
- **Bug Fixes**
  - Fixed deduplication record cleanup in partitioned FIFO queues to properly remove expired records across all partitions.
  - Enhanced cleanup mechanism to correctly handle partition-aware behavior and prevent memory leaks in partitioned queue scenarios.
- **Tests**
  - Added comprehensive test for deduplication record cleanup across partitioned FIFO queue partitions.
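The `reapQueue` control flow described under "What changes", sketched. Helper names (`reapQueueLegacy`, `reapQueuePartition`) come from the PR text, but the receiver, signatures, and error handling here are assumptions:

```go
// reapQueue keeps the legacy walk byte-identical and adds one bounded
// pass per partition for PartitionCount > 1 queues ("partitions × budget
// per cycle" per the §6 design).
func (s *sqsServer) reapQueue(ctx context.Context, meta *sqsQueueMeta, readTS uint64) error {
	if err := s.reapQueueLegacy(ctx, meta, readTS); err != nil {
		return err
	}
	if meta.PartitionCount <= 1 {
		return nil // non-partitioned queues: pre-PR-6b behaviour exactly
	}
	for partition := uint32(0); partition < meta.PartitionCount; partition++ {
		// Each partition gets its own sqsReaperPerQueueBudget-bounded pass.
		if err := s.reapQueuePartition(ctx, meta, partition, readTS); err != nil {
			return err
		}
	}
	return nil
}
```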
Summary

Phase 3.D PR 6a: tombstone reaper enumerates partitioned keyspace. Closes the Codex P2 from PR #732 round 0 ("Reap partitioned dedup records to prevent key growth"), deferred at the time as a "PR 6 concern". With PR #734 (5b-3) lifting the dormancy gate, partitioned FIFO queues can now land in production — but until this PR the tombstone reaper only swept the legacy keyspace, so partitioned data / vis / byage / dedup / group rows leaked permanently after DeleteQueue / PurgeQueue.

What changes

- Tombstone schema: tombstone value now encodes `PartitionCount` so the reaper can drive partition iteration off the tombstone alone (the meta record is gone by the time the reaper observes the tombstone). 8 bytes big-endian uint64. Backward compat: `PartitionCount<=1` keeps the byte-identical legacy `[]byte{1}` sentinel; non-canonical values (empty, 1-byte, 4-byte, out-of-range 8-byte) decode to `PartitionCount=1` so a partial rollback or future encoding revision degrades safely to the legacy single-partition reap.
- DeleteQueue + PurgeQueue write the encoded value, reading `PartitionCount` from the pre-delete / pre-purge meta (immutable across SetQueueAttributes / PurgeQueue per §3.2).
- Tombstone reaper: `reapTombstonedGeneration` takes the decoded count, sweeps the legacy keyspace (always — covers pre-HT-FIFO queues plus a defensive pass) and on `PartitionCount > 1` ALSO sweeps every partition's byage / dedup / group prefix family via the new `reapPartitionedGeneration` helper. `allDone` gating delays tombstone deletion until every partition is fully drained.
- Per-record dispatch: `reapOneRecord` and `buildReapOps` now take `*sqsQueueMeta` + `partition uint32` instead of hardcoding `nil + 0`. Legacy callers pass `nil + 0` (byte-identical to pre-PR-5b layout); the partitioned twin `reapOneRecordPartitioned` synthesises a meta carrying `PartitionCount > 1` so the dispatch helpers route to `sqsPartitionedMsg{Data,Vis,Group}Key`.
- New parsed-key helpers in `sqs_keys.go`: `sqsPartitionedMsgByAgePrefixForPartition(queue, partition, gen)`, `sqsPartitionedMsgDedupKeyPrefix(queue, partition, gen)`, `sqsPartitionedMsgGroupKeyPrefix(queue, partition, gen)`.

Out of scope (follow-up PR 6b)

- Live-queue retention reap (`reapQueue`) still iterates only the legacy byage prefix; partitioned-byage live retention is the next slice.
- `PurgeQueue` / `DeleteQueue` do not iterate partition keyspace synchronously; the tombstone-driven async sweep is sufficient for AWS-shaped semantics.

Caller audit

- `encodeQueueTombstoneValue` — 2 production callers (DeleteQueue, PurgeQueue), both pass the queue's `PartitionCount`.
- `decodeQueueTombstoneValue` — 1 production caller (reaper); legacy / out-of-range fallback to 1 documented and tested.
- `reapOneRecord` (signature changed) — 3 production callers: `reapDeadByAgePage` (legacy), `reapDeadByAgePartitionPage` (via `reapOneRecordPartitioned`, partitioned), `reapQueue`'s over-retention sweep (legacy). All updated; partitioned path uses the synthetic meta to flip the dispatch helper branch.
- New helpers (`reapPartitionedGeneration`, `reapDeadByAgePartition`, `reapDeadByAgePartitionPage`) each have exactly one caller in the new tombstone-reap path.

Self-review (CLAUDE.md)

1. Data loss — closes the partitioned-record leak; legacy queues keep byte-identical behaviour for `PartitionCount<=1`.
2. Concurrency — reaper runs only on the leader (existing `reaperCtx`). Tombstone read is at the reaper's snapshot `readTS`; every individual reap is its own OCC dispatch with `ErrWriteConflict` treated as success. Per-partition iteration is sequential; no new shared state.
3. Performance — capped by `sqsReaperPerQueueBudget` per partition. `PartitionCount=1` queues see no extra cost (partitioned branch short-circuits).
4. Data consistency — `PartitionCount` immutability invariant means the tombstone-encoded count and the reaper's iteration bound can never disagree with the live queue.
5. Test coverage — tombstone-value codec unit tests plus a wire-level integration test that creates a partitioned queue, calls DeleteQueue, hand-runs `reapTombstonedQueues`, and asserts every partitioned data / vis / byage / dedup / group prefix is empty + the tombstone itself is deleted.

Test plan

- `make lint` — golangci-lint 0 issues

Summary by CodeRabbit
Release Notes
Bug Fixes
Tests