
feat(sqs): wire partitioned-FIFO data plane through dispatch helpers (Phase 3.D PR 5b-2) #732

Merged
bootjp merged 7 commits into main from feat/sqs-htfifo-dispatch-wiring on May 4, 2026
Conversation

@bootjp
Owner

@bootjp bootjp commented May 2, 2026

Summary

Stage 2 of Phase 3.D PR 5b: wire SendMessage / ReceiveMessage / DeleteMessage / ChangeMessageVisibility through the per-key dispatch helpers landed in PR 5b-1 (#731). The §11 PR 2 dormancy gate still rejects PartitionCount > 1 at CreateQueue, so production queues stay on the legacy keyspace and observable behaviour is byte-identical. PR 5b-3 lifts the gate atomically with the capability check.

What's wired

Send paths

  • sendMessage, sendBatchStandardOnce: partition is a compile-time 0 (Standard queues reject PartitionCount > 1 via the cross-attribute validator). Dispatch helpers route to the legacy keyspace on the steady-state path.
  • sendFifoMessage: hashes MessageGroupId once via partitionFor and threads the result through every key construction (data, vis, byage, dedup, group-lock).
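The partition hash described above can be sketched as follows. This is a minimal illustration, not the real adapter code: the function name matches the PR, but the signature and layout are assumptions; the review below confirms only that the real helper uses FNV-1a with a power-of-two mask.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor sketches the helper: hash MessageGroupId once with
// FNV-1a and mask into a power-of-two partition count. Signature and
// the PartitionCount<=1 collapse are illustrative assumptions.
func partitionFor(partitionCount uint32, messageGroupID string) uint32 {
	if partitionCount <= 1 {
		return 0 // legacy / perQueue queues collapse to partition 0
	}
	h := fnv.New32a()
	h.Write([]byte(messageGroupID))
	// partitionCount is a power of two, so masking is a uniform modulo.
	return h.Sum32() & (partitionCount - 1)
}

func main() {
	// The same group always lands on the same partition.
	fmt.Println(partitionFor(4, "group-A") == partitionFor(4, "group-A")) // true
	fmt.Println(partitionFor(1, "group-A"))                               // legacy: 0
}
```

Because the hash is computed once per send and threaded through every key construction, all five key families for one message are guaranteed to agree on the partition.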

Receive fanout

  • scanAndDeliverOnce wraps the scan loop with a per-partition fanout over effectivePartitionCount(meta) iterations. The shared wall-clock + per-call Max budget caps apply across the whole call, not per-partition.
  • sqsMsgCandidate carries a new partition field stamped at scan time, so loadCandidateRecord / expireMessage / commitReceiveRotation / classifyFifoGroupLock all route to the partition the message was originally stored under.

Delete + ChangeMessageVisibility

  • handle.Partition from the v2 receipt handle drives every key construction.
  • loadMessageForDelete / loadAndVerifyMessage invoke the new validateReceiptHandleVersion once meta is in scope.

Receipt-handle dispatch (the contract shift)

  • encodeReceiptHandleDispatch(meta, partition, ...) is the single point that picks v1 vs v2.
  • decodeClientReceiptHandle is now version-agnostic; the queue-aware version check moved to validateReceiptHandleVersion. v1-on-partitioned and v2-on-non-partitioned both surface as ReceiptHandleIsInvalid, preserving the dormancy promise (no v2 wire format observable from the public API) under the new contract. Since the §11 PR 2 dormancy gate keeps every production queue non-partitioned in this PR, every v2 handle still surfaces as ReceiptHandleIsInvalid downstream — exactly the PR 5a observable behaviour.
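The version matrix above can be sketched as a small truth table. The type shapes and the exact error value here are invented for illustration; only the rule itself (v1-on-partitioned and v2-on-non-partitioned both rejected) comes from the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical shapes for the queue meta and decoded handle; the real
// types in adapter/ differ.
type queueMeta struct{ PartitionCount uint32 }
type receiptHandle struct {
	Version   int
	Partition uint32
}

var errReceiptHandleInvalid = errors.New("ReceiptHandleIsInvalid")

// validateReceiptHandleVersion sketches the queue-aware check: the
// handle version must match the queue's partition mode.
func validateReceiptHandleVersion(meta *queueMeta, h *receiptHandle) error {
	if h == nil {
		return errReceiptHandleInvalid // defensive nil branch
	}
	partitioned := meta != nil && meta.PartitionCount > 1
	switch {
	case partitioned && h.Version != 2:
		return errReceiptHandleInvalid // v1 handle on a partitioned queue
	case !partitioned && h.Version != 1:
		return errReceiptHandleInvalid // v2 handle on a legacy queue
	}
	return nil
}

func main() {
	legacy := &queueMeta{PartitionCount: 1}
	parted := &queueMeta{PartitionCount: 4}
	fmt.Println(validateReceiptHandleVersion(legacy, &receiptHandle{Version: 2}))
	fmt.Println(validateReceiptHandleVersion(parted, &receiptHandle{Version: 2}))
}
```

Note the reviews below flag a gap this sketch shares: nothing bounds h.Partition against meta.PartitionCount.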

Reaper

The reaper iterates legacy byAge keys only — partition-iterating enumeration ships in the later partition-reaper PR. buildReapOps / reapOneRecord pass nil meta + partition 0 through dispatch helpers so the keys are byte-identical to the pre-PR-5b layout.
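The byte-identical contract the reaper relies on can be sketched like this. Key layouts and names here are invented for illustration; the real constructors live in adapter/sqs_keys.go. What the PR pins is the dispatch rule: nil meta (or PartitionCount <= 1) must route to exactly the legacy constructor.

```go
package main

import "fmt"

type queueMeta struct{ PartitionCount uint32 }

// Invented key layouts; the real adapter encodings differ.
func legacyDataKey(queue, msgID string) string {
	return fmt.Sprintf("sqs/%s/msg/%s", queue, msgID)
}
func partitionedDataKey(queue string, partition uint32, msgID string) string {
	return fmt.Sprintf("sqs/%s/p%d/msg/%s", queue, partition, msgID)
}

// sqsMsgDataKeyDispatch sketches the dispatch contract the reaper
// depends on: nil meta or PartitionCount <= 1 is byte-identical to the
// legacy constructor, so pre-PR-5b keys keep resolving.
func sqsMsgDataKeyDispatch(meta *queueMeta, queue string, partition uint32, msgID string) string {
	if meta == nil || meta.PartitionCount <= 1 {
		return legacyDataKey(queue, msgID)
	}
	return partitionedDataKey(queue, partition, msgID)
}

func main() {
	fmt.Println(sqsMsgDataKeyDispatch(nil, "q", 0, "m1") == legacyDataKey("q", "m1"))
	fmt.Println(sqsMsgDataKeyDispatch(&queueMeta{PartitionCount: 4}, "q", 2, "m1"))
}
```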

What does NOT change yet

  • CreateQueue still rejects PartitionCount > 1 with InvalidAttributeValue (the §11 PR 2 dormancy gate, lifted atomically in PR 5b-3).
  • Production queues never enter the partitioned dispatch branches in this PR — observable behaviour is byte-identical.
  • The reaper does not yet enumerate partitioned byAge keys (a later PR).

Test plan

10 new tests across the contract surface.

Unit tests (adapter/sqs_keys_dispatch_v2_test.go):

  • TestEncodeReceiptHandleDispatch_PicksVersionByPartitionCount — pins the version dispatch decision across nil / 0 / 1 / 4 metas.
  • TestEncodeReceiptHandleDispatch_LegacyByteIdenticalToV1 — protects the byte-identical guarantee on legacy queues.
  • TestEncodeReceiptHandleDispatch_PerQueueUsesV2 — pins that perQueue + PartitionCount=4 still produces v2 handles (keyspace is partitioned even when partitionFor collapses every group to partition 0).
  • TestValidateReceiptHandleVersion_QueueAwareRules — 10 sub-cases covering the (meta.PartitionCount × handle.Version) matrix, including perQueue + PartitionCount=4.
  • TestValidateReceiptHandleVersion_NilHandle — defensive nil branch.
  • TestValidateReceiptHandleVersion_RejectsV2OnNonPartitioned — named regression for the dormancy guarantee under the new contract.
  • TestSQSMsgVisScanBoundsDispatch_LegacyMatchesLegacy — byte-identical to legacy sqsMsgVisScanBounds on legacy meta.
  • TestSQSMsgVisScanBoundsDispatch_PartitionedUsesPartitionedPrefix — different partitions yield disjoint scan ranges.
  • TestSQSMsgVisScanBoundsDispatch_PerQueueOnPartitionedKeyspace — pins the PR 5b-1 (#731) round 2 forward-note invariant: perQueue + PartitionCount=4 keeps the partitioned vis prefix at partition 0; collapsing to legacy would silently strand send writes.

Integration tests (adapter/sqs_partitioned_dispatch_test.go) install a partitioned meta directly on a queue created via the public API, short-circuiting the dormancy gate without disabling it for production CreateQueue:

  • TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip — end-to-end smoke test: send 6 groups, receive surfaces all via fanout, every handle is v2, delete via v2 handle, queue is empty afterwards, legacy keyspace stays empty.
  • TestSQSServer_PartitionedFIFO_RejectsV1Handle — forged v1 handle on a partitioned queue surfaces as ReceiptHandleIsInvalid via DeleteMessage and ChangeMessageVisibility.
  • TestSQSServer_PartitionedFIFO_PerQueueCollapsesToPartitionZero — perQueue + PartitionCount=4 receive surfaces every message in one fanout pass; every v2 handle records Partition=0.

Updated TestDecodeClientReceiptHandle_RejectsV2 to TestDecodeClientReceiptHandle_AcceptsV2 to reflect the contract shift (rejection moved from the API boundary to the meta-aware validateReceiptHandleVersion).

  • go test -race ./adapter/... (targeted SQS scope) clean.
  • golangci-lint run ./adapter/... clean.

Self-review (per CLAUDE.md)

  1. Data loss — Dispatch helpers byte-identical on legacy queues (PartitionCount<=1 routes to legacy constructors). DLQ FIFO computes dlqPartition via partitionFor(dlqMeta, srcRec.MessageGroupId) so cross-queue redrive lands in the right partition. Receive fanout scans every partition. No issue.
  2. Concurrency — Receive fanout iterates partitions sequentially under one shared wall-clock + max budget, so the original per-call SLA is preserved. meta is loaded once and passed by pointer through helpers; no concurrent meta refresh in a single call. validateReceiptHandleVersion runs after the gen check so a SetQueueAttributes race cannot flip the answer (PartitionCount is immutable). go test -race clean.
  3. Performance — Hot path adds 1 nil-check + 1 PartitionCount compare per dispatch. Receive fanout iterates effectivePartitionCount(meta) times: 1 on legacy / perQueue, N on perMessageGroupId with N partitions splitting roughly 1/N of the messages each.
  4. Data consistency — Receipt handle's recorded partition matches the storage partition (commitReceiveRotation uses cand.partition for both newVisKey and the handle). Cross-version handles rejected as ReceiptHandleIsInvalid — no leak between keyspaces. perQueue + PartitionCount=4 invariant pinned by both unit and integration tests.
  5. Test coverage — 10 new tests across the contract surface. Existing legacy-path SQS tests pass unchanged.

Caller audit (semantic-change discipline)

decodeClientReceiptHandle semantics changed from "reject all v2" to "decode any version, defer version validation to meta-aware caller". All 3 production callers audited via grep:

  • parseQueueAndReceipt (sqs_messages.go:1576) → deleteMessageWithRetry → loadMessageForDelete → validateReceiptHandleVersion after meta load. ✅
  • DeleteMessageBatch entry (sqs_messages_batch.go:475) → deleteMessageWithRetry → same as above. ✅
  • ChangeMessageVisibilityBatch entry (sqs_messages_batch.go:575) → changeVisibilityWithRetry → loadAndVerifyMessage → validateReceiptHandleVersion. ✅

The dormancy guarantee (v2 → ReceiptHandleIsInvalid on non-partitioned) is preserved across every caller path.

Follow-ups

  • PR 5b-3 (next): atomic gate-lift + CreateQueue capability check via PollSQSHTFIFOCapability (Phase 3.D PR 4-B-3a, #721). Removes the validatePartitionDormancyGate call and verifies cluster-wide HT-FIFO support before accepting PartitionCount > 1.
  • PR 6: partition-iterating reaper (drops the "legacy byAge only" caveat in buildReapOps).

Summary by CodeRabbit

  • New Features

    • Partition-aware message routing for FIFO and standard queues.
    • Receipt-handle v2 introduced for partitioned queues; v1 retained for non-partitioned queues.
    • Per-queue receive fanout rotation to round-robin partitions.
  • Bug Fixes

    • Stronger queue-aware receipt-handle validation and bounds checking.
    • Partition-aware fixes across send/receive/delete/change-visibility/redrive/reaper flows.
    • Per-queue fanout state reset on queue create/delete.
  • Tests

    • Extensive partitioned dispatch, receipt-handle, visibility-scan, dedup, rotation, and end-to-end tests.

@coderabbitai

coderabbitai Bot commented May 2, 2026

No actionable comments were generated in the recent review. 🎉


Reviewing files that changed from the base of the PR and between 3961d4a and 5047223.

📒 Files selected for processing (2)
  • adapter/sqs_keys.go
  • adapter/sqs_keys_dispatch_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • adapter/sqs_keys_dispatch_test.go

Walkthrough

Wires partition-aware key and receipt-handle dispatch across SQS flows: send (FIFO and standard), receive (partition fanout with rotated start), delete, change-visibility, redrive, reaper, and batching. Adds v2 receipt-handle encoding/validation for partitioned queues and stamps message candidates with partition routing.

Changes

Partition-Aware SQS Dispatch

  • Dispatch Helpers / Validation (adapter/sqs_keys_dispatch.go, adapter/sqs_keys_dispatch_v2_test.go) — Adds sqsMsgVisScanBoundsDispatch, encodeReceiptHandleDispatch, and validateReceiptHandleVersion; sqsMsgDedupKeyDispatch now accepts groupID; tests for encoding, validation, and vis-scan bounds added.
  • Key Shape (adapter/sqs_keys.go, adapter/sqs_keys_test.go) — sqsPartitionedMsgDedupKey now takes groupID and incorporates it into the dedup key.
  • FIFO Send / Dedup / Group Lock (adapter/sqs_fifo.go) — sendFifoMessage, loadFifoDedupRecord, loadFifoGroupLock, and classifyFifoGroupLock compute/accept partition (via partitionFor(meta, MessageGroupId)) and use sqsMsg*KeyDispatch keys.
  • Receive / Scanning / Candidates (adapter/sqs_messages.go) — Receive now fans out across partitions with rotating start (nextReceiveFanoutStart / dropReceiveFanoutCounter); sqsMsgCandidate carries partition; scans, candidate loading, delivery, rotate/commit, expiry, and receipt-handle encoding use dispatch helpers; decodeClientReceiptHandle only decodes and validation moved to validateReceiptHandleVersion.
  • Standard Batch & Delete/Visibility Paths (adapter/sqs_messages_batch.go, adapter/sqs_messages.go) — Standard send-batch and delete/change-visibility use dispatch key builders (non-partitioned queues use partition=0); delete/visibility load paths validate handle version and build keys from decoded handle.Partition/handle.QueueGeneration.
  • Redrive to DLQ (adapter/sqs_redrive.go) — redriveCandidateToDLQ and buildRedriveOps accept srcMeta (not srcGen) and compute DLQ/source keys with sqsMsg*KeyDispatch, deriving dlqPartition := partitionFor(dlqMeta, MessageGroupId).
  • Reaper (adapter/sqs_reaper.go) — Reaper deletion uses dispatch key constructors (hardcoded partition=0 for legacy iteration) and loads partition-aware group-lock keys.
  • Server State / Catalog (adapter/sqs.go, adapter/sqs_catalog.go) — Adds receiveFanoutCounters sync.Map to SQSServer; queue create/delete now call dropReceiveFanoutCounter to clear per-queue fanout state.
  • Tests / Integration (adapter/sqs_partitioned_dispatch_test.go, adapter/sqs_keys_dispatch_test.go, adapter/sqs_receipt_handle_v2_test.go) — Adds end-to-end partitioned FIFO round-trip tests, receipt-handle version/validation tests, dedup-key routing tests, receive fanout rotation/cleanup tests, and updates to receipt-handle decode expectations to accept v2.

Sequence Diagram

sequenceDiagram
    participant Client
    participant SQS_Server
    participant Partitioner
    participant Visibility_Store
    participant Data_Store

    rect rgba(100, 150, 200, 0.5)
    Note over Client,SQS_Server: Partitioned FIFO Send
    Client->>SQS_Server: SendMessage(MessageGroupId="group-A")
    SQS_Server->>Partitioner: partitionFor(meta, "group-A")
    Partitioner-->>SQS_Server: partition = 2
    SQS_Server->>Data_Store: Write(sqsMsgDataKeyDispatch(..., partition=2, ...))
    SQS_Server->>Visibility_Store: Write(sqsMsgVisKeyDispatch(..., partition=2, ...))
    SQS_Server-->>Client: SendMessageResponse(MessageId)
    end

    rect rgba(200, 100, 150, 0.5)
    Note over Client,Visibility_Store: Partitioned Fanout Receive
    Client->>SQS_Server: ReceiveMessage(MaxMessages=10)
    SQS_Server->>SQS_Server: readTS → startPartitionOffset → partition[0..N)
    loop For Each Partition
        SQS_Server->>Visibility_Store: Scan(sqsMsgVisScanBoundsDispatch(..., partition, ...))
        Visibility_Store-->>SQS_Server: Candidates{partition, visKey, messageID}
    end
    SQS_Server->>Data_Store: Load(sqsMsgDataKeyDispatch(..., cand.partition, ...))
    SQS_Server->>SQS_Server: encodeReceiptHandleDispatch(..., cand.partition) → v2 handle
    SQS_Server-->>Client: ReceiveMessageResponse(Messages=[...], ReceiptHandles=[v2])
    end

    rect rgba(150, 200, 100, 0.5)
    Note over Client,Data_Store: Partition-Aware Delete
    Client->>SQS_Server: DeleteMessage(ReceiptHandle=v2)
    SQS_Server->>SQS_Server: validateReceiptHandleVersion(meta, v2_handle)
    SQS_Server->>Data_Store: Delete(sqsMsgDataKeyDispatch(..., v2_handle.partition, ...))
    SQS_Server->>Visibility_Store: Delete(sqsMsgVisKeyDispatch(..., v2_handle.partition, ...))
    SQS_Server-->>Client: DeleteMessageResponse()
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 I hopped through keyspace, soft and quick,
Group-ids mapped to shards with a clever trick.
V2 handles hum which partition to seek,
Fanouts spin round, each receive unique.
Hop—send—delete—now the queue is slick!

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
  • Description check — ✅ Passed (skipped: CodeRabbit's high-level summary is enabled).
  • Title check — ✅ Passed. The title accurately describes the main change: wiring partitioned-FIFO data plane operations through dispatch helpers, matching the core objective of Phase 3.D PR 5b-2.
  • Docstring coverage — ✅ Passed. Coverage is 94.59%, above the 80.00% threshold.
  • Linked issues check — ✅ Passed (skipped: no linked issues were found for this pull request).
  • Out-of-scope changes check — ✅ Passed (skipped: no linked issues were found for this pull request).


@bootjp
Owner Author

bootjp commented May 2, 2026

@claude review

@claude

claude Bot commented May 2, 2026

Claude finished @bootjp's task in 8m 28s.


Code Review: feat(sqs): wire partitioned-FIFO data plane through dispatch helpers (Phase 3.D PR 5b-2)

  • Gather PR context and understand the request
  • Read changed files (9 adapter files, ~900 additions / ~160 deletions)
  • Analyze dispatch helpers and wiring logic
  • Review test coverage
  • Check for data loss, concurrency, and consistency issues
  • Post complete review

Overall assessment

This is a well-structured, carefully staged PR. The core abstraction — a single dispatch layer (sqs_keys_dispatch.go) that routes to legacy or partitioned key constructors based on meta.PartitionCount — is the right shape, and the dormancy guarantee (no observable change on production queues) holds cleanly. The self-review in the PR description is thorough and accurate on all five CLAUDE.md lenses. There is one medium-severity finding and a few minor notes.


Medium: validateReceiptHandleVersion does not bound-check handle.Partition against meta.PartitionCount

File: adapter/sqs_keys_dispatch.go:165–196

validateReceiptHandleVersion verifies that the handle's version matches the queue's partition mode, but it does not verify that the partition index in a v2 handle is within [0, meta.PartitionCount).

A client that receives a legitimate v2 handle (partition = N) could forge a new v2 handle for the same queue with an out-of-range partition (e.g., partition = 63 on a 4-partition queue). After passing the version check, loadMessageForDelete constructs a key for partition 63, gets ErrKeyNotFound, and returns sqsDeleteNoOp — silently succeeding. This is AWS-idempotent but slightly incorrect semantics for a forged handle. More importantly, it leaks structural information: the client can confirm partition-key-space boundaries exist at that queue by observing the silent success vs. an explicit invalid-handle error.

// Suggested addition in validateReceiptHandleVersion, after the version checks:
if meta != nil && meta.PartitionCount > 1 && handle.Partition >= meta.PartitionCount {
    return errors.New("receipt handle partition out of range")
}

Since PartitionCount is immutable after queue creation (enforced by validatePartitionImmutability), this check is race-safe. The dormancy gate in PR 5b-2 means this can't be triggered today, but it should be in place before the gate is lifted in PR 5b-3.



Minor observations

1. now is computed per-partition in scanAndDeliverPartition (sqs_messages.go:901)

Before this PR, now was computed once in scanAndDeliverOnce. After, it is computed fresh at the start of each scanAndDeliverPartition call, meaning a message that crosses its visibility threshold between partitions 0 and 1 during a fanout receive might be delivered in partition 1's scan but would not have been found in a single-partition equivalent. This is strictly a behavior improvement (AWS does not guarantee anything stronger than eventual delivery anyway), but it is a subtle semantic delta worth noting in the PR description for future readers.

2. Integration test's legacy-keyspace probe end key uses 4 bytes of 0xFF (sqs_partitioned_dispatch_test.go:184)

end = append(end, 0xFF, 0xFF, 0xFF, 0xFF)

The actual message-ID suffix in the key is a base64url-encoded 16-byte UUID (~22 chars). The 4-byte 0xFF suffix works correctly because all base64url characters (A-Z, a-z, 0-9, -, _) have byte values < 0xFF, so any base64url string of any length compares less than [0xFF 0xFF 0xFF 0xFF]. However, using prefixScanEnd(legacyDataPrefix) (already available in the codebase at adapter/prefix_scan.go) would make the intent explicit and match the pattern used in the rest of the reaper code. Not a bug, just a consistency nit.

3. sqsMsgVisScanBoundsDispatch duplicates the scan-bounds logic from sqsMsgVisScanBounds (sqs_keys_dispatch.go:128–141)

The two functions share identical logic (construct prefix, zero-pad start, increment upper bound for end). A minor refactor opportunity: sqsMsgVisScanBoundsDispatch could delegate to sqsMsgVisScanBounds on the legacy branch rather than re-implementing the bounds math inline. The test TestSQSMsgVisScanBoundsDispatch_LegacyMatchesLegacy catches any divergence, so this is low risk either way.


Correctness verification (key paths)

  Path — Partition source — Consistent with send?
  • sendFifoMessage key construction — partitionFor(meta, in.MessageGroupId) — ✅
  • commitReceiveRotation vis-key + handle — cand.partition (stamped at scan) — ✅
  • buildDeleteOps vis/byage/group-lock keys — handle.Partition (v2 handle) — ✅
  • changeVisibilityWithRetry old/new vis key — handle.Partition (v2 handle) — ✅
  • buildRedriveOps DLQ keys — partitionFor(dlqMeta, dlqRec.MessageGroupId) — ✅
  • Reaper (buildReapOps) — nil meta + partition 0 → legacy only — ✅ (intentional, PR 6 will extend)
  • validateReceiptHandleVersion — version-only check — ⚠️ partition bounds not checked

Snapshot consistency: readTS is computed once in scanAndDeliverOnce and threaded identically through all partition fanout calls — consistent multi-partition snapshot. ✅

Group-lock partition coherence: partitionFor is deterministic (FNV-1a, power-of-two mask); a given MessageGroupId maps to exactly one partition for the lifetime of a queue generation. The group lock is always written and read at that same partition. ✅

PartitionCount immutability: validatePartitionImmutability is enforced on SetQueueAttributes, making the meta-dispatch pattern race-safe without re-loading meta inside OCC transactions. ✅

Dormancy preservation: Every code path that could emit a v2 handle is gated on meta.PartitionCount > 1, which CreateQueue continues to reject via validatePartitionDormancyGate until PR 5b-3. ✅


Test coverage

10 new tests are well-targeted. Particular highlights:

  • TestSQSMsgVisScanBoundsDispatch_PerQueueOnPartitionedKeyspace directly pins the forward-note invariant from PR 731 round 2 (perQueue + partitioned keyspace must scan partitioned prefix, not legacy).
  • TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip sanity-checks the legacy keyspace is empty after all operations, which is the cleanest possible regression gate for future dispatch regressions.
  • TestDecodeClientReceiptHandle_AcceptsV2 correctly documents the contract shift.

Missing: a test that a v2 handle with Partition >= meta.PartitionCount is rejected (tracking the medium finding above).


Verdict: Ready to merge after the partition bounds check is added to validateReceiptHandleVersion and covered by a test. The rest of the implementation is sound.

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request implements the data plane wiring for partitioned FIFO queues. It introduces dispatch helpers to route key construction, receipt handle encoding, and version validation based on the queue's partition count. Updates were made to message sending, receiving, deletion, and redriving logic to support these partitions. A review comment pointed out an improvement opportunity in the integration tests to use a more robust helper for calculating prefix scan bounds instead of manually appending bytes.

Comment on lines +181 to +184
// Cap the prefix scan at the generation byte so we do not
// drag in unrelated queues.
end := append([]byte(nil), legacyDataPrefix...)
end = append(end, 0xFF, 0xFF, 0xFF, 0xFF)

Severity: medium

The logic for creating the end key for the prefix scan is incorrect. Appending 0xFF bytes does not correctly create an exclusive upper bound for all cases, especially when the prefix itself contains 0xFF bytes, which could cause the scan to read into adjacent key ranges (e.g., a different queue generation). This could lead to test flakiness or incorrect test behavior.

A more robust way to create an end key for a prefix is to find the next lexicographical key after the prefix. The codebase already has a helper for this, prefixScanEnd. You should use it here for correctness and consistency.

Suggested change
-// Cap the prefix scan at the generation byte so we do not
-// drag in unrelated queues.
-end := append([]byte(nil), legacyDataPrefix...)
-end = append(end, 0xFF, 0xFF, 0xFF, 0xFF)
+// The end key for a prefix scan is the smallest key that does not
+// have the prefix. This correctly bounds the scan to only keys
+// within the given queue and generation.
+end := prefixScanEnd(legacyDataPrefix)


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 443ceb5f9c

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread adapter/sqs_messages.go Outdated
Comment on lines +873 to +875
for partition := uint32(0); partition < partitions; partition++ {
if len(delivered) >= opts.Max {
break

P1 — Avoid fixed partition-0 priority in receive fanout

When PartitionCount > 1, this loop always starts at partition 0 and exits as soon as opts.Max is filled, so repeated ReceiveMessage calls can permanently starve higher-index partitions whenever partition 0 has a sustained backlog. This only appears on partitioned queues, but once enabled it can make some message groups effectively undeliverable under load because later partitions are never scanned.


return errors.New("receipt handle version mismatch for non-partitioned queue")
}
return nil
}

P2 — Reject out-of-range v2 receipt-handle partitions

The queue-aware version check now accepts any v2 handle on partitioned queues but never validates handle.Partition < meta.PartitionCount. A forged v2 handle with an out-of-range partition is then used to build partitioned keys in delete/change-visibility paths; the partition resolver fails closed for unknown partitions, which bubbles up as a generic internal error instead of ReceiptHandleIsInvalid. This is reachable once partitioned queues are enabled.


…und 1)

Codex P1: with the fanout always starting at partition 0, a sustained
backlog on partition 0 would fill opts.Max before the loop reached
partition 1 — messages in higher-index partitions were never observed
under load. Rotate the starting partition by readTS so consecutive
receives spread across every partition.

Helper startPartitionOffset folds the upper and lower 32 bits of the
HLC-derived readTS and mask-ANDs by partitions-1 (PartitionCount is
power-of-two by validator invariant). The byte-slice fold preserves
entropy from both HLC halves while keeping the conversion in uint32
space — no //nolint needed for gosec G115.

FIFO ordering is unaffected: partitionFor is deterministic, so a
MessageGroupId always lands in one partition and cross-partition
iteration order does not reorder messages within any group.

Caller audit: scanAndDeliverOnce / scanAndDeliverPartition signatures
and error semantics are unchanged; only the internal iteration order
rotates. New helper has only the one in-package caller.
@bootjp
Owner Author

bootjp commented May 2, 2026

@claude review

Round 1 addressing Codex P1: rotate the receive-fanout starting partition by readTS so a sustained backlog on a single partition cannot starve higher-index partitions.

Changes:

  • adapter/sqs_messages.go — new startPartitionOffset(partitions, readTS) helper. scanAndDeliverOnce now iterates (startOffset + i) % partitions. Implementation folds the upper and lower 32 bits of the HLC-derived readTS and mask-ANDs by partitions - 1 (power-of-two by validator invariant), keeping the computation in uint32 space (gosec G115 clean, no //nolint).
  • adapter/sqs_partitioned_dispatch_test.go — TestStartPartitionOffset_RotatesByReadTS pins: legacy/perQueue → 0, every partition appears as a starting point across consecutive readTS, and the output is always in [0, partitions).

FIFO ordering: unaffected. partitionFor is deterministic, so a MessageGroupId always lands in one partition; cross-partition iteration order never reorders messages within any group.

Caller audit: scanAndDeliverOnce and scanAndDeliverPartition signatures and error semantics are unchanged — only the internal iteration order rotates. The new helper has one in-package caller.

P2/medium findings (out-of-range v2 partition validation, prefix-scan helper in test) are intentionally deferred — below the loop's P0/P1/high/critical/major threshold.

@claude

claude Bot commented May 2, 2026

Claude finished @bootjp's task in 2m 22s


Code Review: Round 1 — Partition-starvation fix (startPartitionOffset)

  • Gather PR context and understand the request
  • Read adapter/sqs_messages.go — startPartitionOffset helper + fanout loop change
  • Read adapter/sqs_partitioned_dispatch_test.go — TestStartPartitionOffset_RotatesByReadTS
  • Review rotation logic correctness (gosec G115, power-of-two invariant)
  • Review FIFO ordering preservation
  • Review test coverage and edge cases
  • Post complete review

Overall assessment

The fix is correct and well-scoped. The P1 finding (partition 0 starvation under sustained backlog) is addressed cleanly without touching any signatures or error semantics. Implementation and test match the stated intent. A few minor observations follow.


Correctness: startPartitionOffset

func startPartitionOffset(partitions uint32, readTS uint64) uint32 {
    if partitions <= 1 {
        return 0
    }
    var buf [8]byte
    binary.BigEndian.PutUint64(buf[:], readTS)
    folded := binary.BigEndian.Uint32(buf[0:4]) ^ binary.BigEndian.Uint32(buf[4:8])
    return folded & (partitions - 1)
}

Power-of-two invariant — PartitionCount is enforced as a power of two by htfifoValidatePartitionCount; effectivePartitionCount also returns 1 (= 2⁰) for the perQueue collapse, so mask-AND is equivalent to modulo throughout. ✅

gosec G115 avoidance — the byte-slice fold sidesteps the explicit uint64 → uint32 narrowing that gosec would flag. The approach correctly mirrors partitionFor. ✅

Distribution for real HLC values — the HLC layout is (physical_ms << 16) | logical_counter. Splitting at the 32-bit boundary gives:

  • upper32 = physical_ms >> 16 (changes slowly, ~once per ms)
  • lower32 = (physical_ms & 0xFFFF) << 16 | logical_counter (low 2 bits driven by logical_counter)

For consecutive receives in the same millisecond, logical_counter increments by 1 per call, so the low bits of folded increment by 1 → sequential cycling through all N partitions. ✅ For cross-millisecond boundaries, both halves change, maintaining spread.

Overflow safety in the loop — startOffset + i < 2 * partitions; with htfifoMaxPartitions = 32, the maximum sum is 62, well within uint32. % partitions is safe since effectivePartitionCount guarantees partitions >= 1. ✅
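The same-millisecond cycling claim can be checked directly against the quoted helper. The HLC packing in main is an assumption based on the (physical_ms << 16) | logical_counter layout the review describes:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// startPartitionOffset as quoted in the review above.
func startPartitionOffset(partitions uint32, readTS uint64) uint32 {
	if partitions <= 1 {
		return 0
	}
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], readTS)
	folded := binary.BigEndian.Uint32(buf[0:4]) ^ binary.BigEndian.Uint32(buf[4:8])
	return folded & (partitions - 1)
}

func main() {
	// Same-millisecond HLC values: fixed physical part, counter 0..7.
	// The fold increments by 1 per call, so every partition appears
	// as a starting offset across 8 consecutive receives.
	physical := uint64(1_700_000_000_000)
	seen := map[uint32]bool{}
	for counter := uint64(0); counter < 8; counter++ {
		seen[startPartitionOffset(4, physical<<16|counter)] = true
	}
	fmt.Println(len(seen) == 4) // true
}
```

Note this uses a realistic physical timestamp, so the upper 32 bits of the fold are non-zero — the very case the test-quality observation below says the shipped test does not exercise.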


Minor observations

1. Test exercises only low ts values where upper 32 bits are zero (sqs_partitioned_dispatch_test.go:335–353)

The coverage loop runs ts = 1..64 and ts = 0..1023. With these values, upper32 = 0, so folded = lower32 = ts, and the offset cycles trivially: ts & 3 = 0,1,2,3,0,.... This is correct but it doesn't exercise the XOR fold with non-zero upper bits. Real HLC timestamps have physical_ms >> 16 ≈ 0x190xxx (non-zero). Adding a sub-case with an HLC-range seed (e.g., ts = uint64(1_735_000_000_000) << 16) would give more confidence that the fold doesn't accidentally collapse to a constant offset for real values. The fix itself is still sound — this is a test quality observation only.

2. Comment verbosity (sqs_messages.go:907–925, sqs_messages.go:872–882)

CLAUDE.md's convention is: "one short line max" for comments. The startPartitionOffset doc block is ~10 lines and the inline loop comment is ~11 lines. The gosec-G115 reasoning is genuinely non-obvious and worth keeping, but both blocks could be compressed significantly. Not blocking, but the inline loop comment in particular restates what's already in the function's doc.

3. Loop uses % where mask-AND would be consistent (sqs_messages.go:892)

partition := (startOffset + i) % partitions

startPartitionOffset uses & (partitions - 1) and explicitly documents the power-of-two invariant. Using & (partitions - 1) in the loop too would be consistent and self-documenting, avoiding the % operator whose correctness requires knowing that partitions is never 0 (which it isn't, but the reader has to check). Either form is correct.


FIFO ordering

Preserved. partitionFor is deterministic (FNV-1a over MessageGroupId), so a group always hashes to exactly one partition regardless of which partition the fanout starts from. Re-ordering scanAndDeliverPartition calls cannot reorder messages within any group because cross-partition messages are by definition different groups. ✅
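As a minimal sketch of the determinism argument, assuming partitionFor reduces to FNV-1a over the group ID masked by the power-of-two partition count (the real helper takes the queue meta and its exact fold may differ):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionForSketch illustrates why the group→partition mapping is
// deterministic: the same bytes always hash to the same bucket, so scan
// order across partitions can never interleave a single group.
func partitionForSketch(groupID string, partitions uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(groupID))
	return h.Sum32() & (partitions - 1)
}

func main() {
	// The same MessageGroupId lands on the same partition on every call.
	fmt.Println(partitionForSketch("orders-eu", 4) == partitionForSketch("orders-eu", 4)) // true
}
```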


Summary

The P1 fix is correct, gosec-clean, and backward-compatible on every queue mode. The three observations above are all minor or nit-level. The implementation is ready to merge.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 52c449f5b3


Comment thread adapter/sqs_messages.go Outdated
Comment on lines +884 to +885
startOffset := startPartitionOffset(partitions, readTS)
for i := uint32(0); i < partitions; i++ {

P1 — Rotate receive start independently from commit-ts stride

Using readTS as the only rotation source can still starve partitions under sustained load, because readTS comes from the last committed timestamp and often advances by a fixed stride (roughly the number of successful per-message rotations in each receive call). With power-of-two partitions, masking this structured value can lock start offsets to a subset (e.g. 4 partitions + MaxNumberOfMessages=10 can alternate between two starts), so if those early partitions stay backlogged, later partitions are never scanned.
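The stride-aliasing claim is easy to reproduce numerically. The sketch below assumes small readTS values whose upper 32 bits are zero, so the XOR fold reduces to the timestamp itself:

```go
package main

import "fmt"

// distinctStarts counts how many distinct starting partitions a fixed
// readTS stride ever produces under the masked fold. For small readTS the
// upper 32 bits are zero, so the fold is just the timestamp.
func distinctStarts(stride, partitions uint64, calls int) int {
	seen := map[uint64]bool{}
	ts := uint64(1000)
	for i := 0; i < calls; i++ {
		seen[ts&(partitions-1)] = true
		ts += stride
	}
	return len(seen)
}

func main() {
	// Stride 10 against 4 partitions: 10 mod 4 = 2, so only two of the
	// four partitions ever appear as the fanout start — the aliasing
	// this finding describes.
	fmt.Println(distinctStarts(10, 4, 100)) // 2
}
```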



@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (1)
adapter/sqs_partitioned_dispatch_test.go (1)

288-317: ⚡ Quick win

Pin the single-pass behavior instead of retrying receives.

This loop lets the test pass even if effectivePartitionCount never collapses to 1, because repeated receives can still drain partition 0 eventually. If the contract here is “one receive call must surface every message,” make that the assertion directly.

Suggested tightening
 	collected := make(map[string]bool, len(groups))
-	for range 4 {
-		if len(collected) == len(groups) {
-			break
-		}
-		status, out := callSQS(t, node, sqsReceiveMessageTarget, map[string]any{
-			"QueueUrl":            queueURL,
-			"MaxNumberOfMessages": 10,
-			"VisibilityTimeout":   60,
-		})
-		require.Equal(t, http.StatusOK, status, "receive: %v", out)
-		msgs, _ := out["Messages"].([]any)
-		for _, m := range msgs {
-			mm, _ := m.(map[string]any)
-			body, _ := mm["Body"].(string)
-			collected[body] = true
-			handle, _ := mm["ReceiptHandle"].(string)
-			parsed, err := decodeReceiptHandle(handle)
-			require.NoError(t, err)
-			require.Equal(t, sqsReceiptHandleVersion2, parsed.Version,
-				"perQueue + PartitionCount=4 must still produce v2 handles")
-			require.Equal(t, uint32(0), parsed.Partition,
-				"perQueue mode pins every group to partition 0, so every handle must record Partition=0")
-		}
-	}
+	status, out := callSQS(t, node, sqsReceiveMessageTarget, map[string]any{
+		"QueueUrl":            queueURL,
+		"MaxNumberOfMessages": 10,
+		"VisibilityTimeout":   60,
+	})
+	require.Equal(t, http.StatusOK, status, "receive: %v", out)
+	msgs, _ := out["Messages"].([]any)
+	for _, m := range msgs {
+		mm, _ := m.(map[string]any)
+		body, _ := mm["Body"].(string)
+		collected[body] = true
+		handle, _ := mm["ReceiptHandle"].(string)
+		parsed, err := decodeReceiptHandle(handle)
+		require.NoError(t, err)
+		require.Equal(t, sqsReceiptHandleVersion2, parsed.Version,
+			"perQueue + PartitionCount=4 must still produce v2 handles")
+		require.Equal(t, uint32(0), parsed.Partition,
+			"perQueue mode pins every group to partition 0, so every handle must record Partition=0")
+	}
 	require.Len(t, collected, len(groups),
 		"perQueue receive must surface every message in one fanout pass over partition 0")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/sqs_partitioned_dispatch_test.go` around lines 288 - 317, The test
currently loops multiple receive calls which masks failures by eventually
draining partition 0; instead make a single receive call using
callSQS(sqsReceiveMessageTarget, ...) and assert that the response contains
exactly len(groups) Messages in that one call, then iterate those messages to
validate decodeReceiptHandle parsed.Version equals sqsReceiptHandleVersion2 and
parsed.Partition == 0; remove the retry loop and fail the test immediately if
the single receive doesn't return all messages so the contract “one receive call
must surface every message” is enforced.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@adapter/sqs_keys_dispatch.go`:
- Around line 182-195: In validateReceiptHandleVersion, add a bounds check for
v2 handles so that when meta != nil && meta.PartitionCount > 1 and
handle.Version == sqsReceiptHandleVersion2 you also verify handle.Partition <
meta.PartitionCount; if the partition is out of range return the
ReceiptHandleIsInvalid error (or an appropriate error indicating invalid receipt
handle) instead of allowing it to fall through to downstream routing; update the
function logic around validateReceiptHandleVersion to perform this check before
returning nil.

In `@adapter/sqs_partitioned_dispatch_test.go`:
- Around line 91-106: The test currently only asserts parsed.Partition is within
a range which can hide misrouting; instead, after populating sent (the map from
group -> MessageId) reverse that map to look up group by MessageId and assert
that for each received handle the decoded parsed.Partition equals
partitionFor(meta, group). Concretely: after sending messages with callSQS
(using MessageGroupId) and collecting sent[g]=msgID, when you decode handles
check the MessageId from the handle, find the original group via the reversed
sent map, then replace the loose parsed.Partition < 4 assertion with
require.Equal(t, partitionFor(meta, group), parsed.Partition) so each message is
asserted to land on its expected partition.
- Around line 160-191: The current post-delete check re-calls
callSQS(sqsReceiveMessageTarget) immediately and can miss undeleted but
invisible messages (earlier receives used VisibilityTimeout: 60), so a
regression that leaves records invisible would still pass; instead of relying on
immediate receive, either wait > VisibilityTimeout before re-receiving or
(preferred) directly assert the partitioned keyspace is empty by scanning the
partitioned data and visibility prefixes with node.sqsServer.store.ScanAt using
the queue generation from node.sqsServer.loadQueueMetaAt (you already use
sqsMsgDataKey for the legacy prefix—compute the analogous partitioned
data/visibility prefixes for this queue generation and assert ScanAt returns
zero entries), keeping the existing legacy-prefix check for completeness.

---

Nitpick comments:
In `@adapter/sqs_partitioned_dispatch_test.go`:
- Around line 288-317: The test currently loops multiple receive calls which
masks failures by eventually draining partition 0; instead make a single receive
call using callSQS(sqsReceiveMessageTarget, ...) and assert that the response
contains exactly len(groups) Messages in that one call, then iterate those
messages to validate decodeReceiptHandle parsed.Version equals
sqsReceiptHandleVersion2 and parsed.Partition == 0; remove the retry loop and
fail the test immediately if the single receive doesn't return all messages so
the contract “one receive call must surface every message” is enforced.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 467f55af-49a9-40c4-a7bc-c9b223133d6e

📥 Commits

Reviewing files that changed from the base of the PR and between c7ed1a1 and 52c449f.

📒 Files selected for processing (9)
  • adapter/sqs_fifo.go
  • adapter/sqs_keys_dispatch.go
  • adapter/sqs_keys_dispatch_v2_test.go
  • adapter/sqs_messages.go
  • adapter/sqs_messages_batch.go
  • adapter/sqs_partitioned_dispatch_test.go
  • adapter/sqs_reaper.go
  • adapter/sqs_receipt_handle_v2_test.go
  • adapter/sqs_redrive.go

…sertions (PR #732, round 2)

CodeRabbit P1/Major: validateReceiptHandleVersion accepted any v2 handle on a
partitioned queue without checking handle.Partition < meta.PartitionCount. With
decodeClientReceiptHandle now accepting v2 on the wire, an out-of-range
partition would fall through to sqsMsg*KeyDispatch and depend on downstream
routing failure semantics rather than returning ReceiptHandleIsInvalid at the
documented choke point. Add the bounds check and surface the same
"receipt handle is invalid" public error.

Caller audit: both production callers (loadMessageForDelete,
loadAndVerifyMessage in sqs_messages.go) already wrap any non-nil return
from validateReceiptHandleVersion as sqsErrReceiptHandleInvalid HTTP 400,
so the new error case slots into the existing semantic bucket — no
caller-side change required.

Two test tightenings on PR #732 round 2:

- TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip previously
  asserted only that parsed.Partition < 4. That would still pass if a
  dispatch regression sent every group to partition 0. Reverse the
  group->messageID map to messageID->group and require parsed.Partition
  == partitionFor(meta, group) per message — this is the assertion that
  actually catches misrouting.

- The post-delete emptiness check ran a fresh receive immediately while
  every prior receive used VisibilityTimeout=60. A regression that turned
  DeleteMessage into "leave the record invisible but not removed" would
  still pass under the active visibility window. Probe the partitioned
  data keyspace directly per partition (which DeleteMessage targets), and
  also issue the public-API receive after sleeping past a 1s visibility
  window so any in-flight invisible record would re-expose.

Add a regression test (TestValidateReceiptHandleVersion_RejectsOutOfRangePartition)
covering several out-of-range partitions and the count-1 boundary.
@bootjp
Owner Author

bootjp commented May 3, 2026

@claude review

Round 2 — addressed three CodeRabbit 🟠 Major findings:

  1. validateReceiptHandleVersion now bounds-checks handle.Partition < meta.PartitionCount (the v2 partition is client-controlled once decoding accepts v2). Audit: both production callers (loadMessageForDelete, loadAndVerifyMessage) already surface any non-nil return as ReceiptHandleIsInvalid HTTP 400, so the new failure mode joins the existing public-error bucket — no caller-side semantic change.
  2. Round-trip test now asserts parsed.Partition == partitionFor(meta, group) per message (was: only < 4), so a dispatch regression that collapsed every group to partition 0 would actually fail.
  3. Post-delete emptiness check now probes the partitioned data keyspace directly per partition, and the public-API receive uses a 1s visibility timeout after a 1.1s sleep — a regression that left messages invisible but not deleted can no longer pass under the active 60s window.

Plus a new regression TestValidateReceiptHandleVersion_RejectsOutOfRangePartition covering several out-of-range partitions and the count-1 boundary.

@claude
Copy link
Copy Markdown

claude Bot commented May 3, 2026

Claude finished @bootjp's task in 3m 10s


Code Review: Round 2 — Three Major Findings Addressed

  • Gather PR context and understand Round 2 changes
  • Read adapter/sqs_keys_dispatch.go — bounds check in validateReceiptHandleVersion
  • Read adapter/sqs_keys_dispatch_v2_test.go — new regression test
  • Read adapter/sqs_partitioned_dispatch_test.go — updated round-trip assertions + post-delete probe
  • Verify correctness of partition bounds check
  • Verify partitionFor(meta, group) assertion in round-trip test
  • Verify post-delete emptiness check robustness
  • Check for any remaining issues
  • Post complete review

Overall assessment

All three 🟠 Major findings are correctly addressed. The implementations are sound, the new regression test is well-targeted, and no new issues were introduced. Two nits remain (carried from prior rounds but explicitly deferred). Ready to merge.


Finding 1: Bounds check in validateReceiptHandleVersion (sqs_keys_dispatch.go:194-196) ✅

The check is placed correctly — after the version check on the partitioned branch, before returning nil:

if handle.Partition >= meta.PartitionCount {
    return errors.New("receipt handle partition out of range for queue")
}

Placement is right: the bounds check only runs once the version is confirmed v2, so a v1 handle on a partitioned queue gets the version-mismatch error (not the partition error). The error string contains "out of range" as the test's strings.Contains expects. Both production callers (loadMessageForDelete → validateReceiptHandleVersion, loadAndVerifyMessage → validateReceiptHandleVersion) surface any non-nil error as ReceiptHandleIsInvalid, so the new failure mode joins the existing public error bucket without any caller-side change. ✅

TestValidateReceiptHandleVersion_RejectsOutOfRangePartition covers partitions 4, 5, 17, and 2³⁰ on a PartitionCount=4 queue (all must error) and the count-1 boundary partition=3 (must succeed). The boundary case is the right regression anchor. ✅
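Pulling the quoted fragments together, the check order can be sketched with hypothetical stand-in types. receiptHandle, queueMeta, and the partitioned-branch version-mismatch string are assumptions; only the ordering (version first, then bounds) and the non-partitioned error string come from the review:

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical shapes standing in for the adapter's real types.
type receiptHandle struct {
	Version   int
	Partition uint32
}

type queueMeta struct{ PartitionCount uint32 }

const sqsReceiptHandleVersion2 = 2

// validateSketch mirrors the validation order the review confirms:
// version first, then the round-2 bounds check, so a v1 handle on a
// partitioned queue reports a version mismatch, not a partition error.
func validateSketch(meta *queueMeta, h receiptHandle) error {
	if meta != nil && meta.PartitionCount > 1 {
		if h.Version != sqsReceiptHandleVersion2 {
			return errors.New("receipt handle version mismatch for partitioned queue")
		}
		if h.Partition >= meta.PartitionCount {
			return errors.New("receipt handle partition out of range for queue")
		}
		return nil
	}
	if h.Version == sqsReceiptHandleVersion2 {
		return errors.New("receipt handle version mismatch for non-partitioned queue")
	}
	return nil
}

func main() {
	meta := &queueMeta{PartitionCount: 4}
	fmt.Println(validateSketch(meta, receiptHandle{Version: 2, Partition: 3}))  // <nil>: count-1 boundary passes
	fmt.Println(validateSketch(meta, receiptHandle{Version: 2, Partition: 17})) // out-of-range error
}
```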


Finding 2: partitionFor(meta, group) dispatch assertion (sqs_partitioned_dispatch_test.go:154-159) ✅

The byMessageID reverse map and partitionFor assertion correctly close the gap the CodeRabbit review identified. The old parsed.Partition < 4 check would have passed even if sendFifoMessage always routed everything to partition 0 — this version fails that regression.

The meta loaded at line 96-101 (after installPartitionedMetaForTest completes) is the same meta the send path loads, since PartitionCount is immutable after creation. The assertion partitionFor(meta, group) == parsed.Partition is therefore a valid cross-check between send and receive dispatch. ✅


Finding 3: Post-delete emptiness check (sqs_partitioned_dispatch_test.go:180-210) ✅

The dual-probe strategy is solid:

  • Direct keyspace scan per partition (lines 184-196): catches the "leave the record invisible but not deleted" regression that the previous immediate-receive check couldn't catch under a 60s visibility window. Uses sqsMsgDataKeyDispatch(meta, queueName, p, meta.Generation, "") — correctly routes to the partitioned key prefix for each partition, not the legacy prefix.
  • Public-API receive with VisibilityTimeout: 1 after time.Sleep(1100 * time.Millisecond) (lines 198-210): catches any record that survived delete as invisible. The sleep margin is 100ms past the 1s timeout, which is adequate in a single-process test.

The scan iterates meta.PartitionCount (not effectivePartitionCount) — correct here since this is htfifoThroughputPerMessageGroupID where effectivePartitionCount == PartitionCount. ✅


Nits (not blocking, both carried from prior rounds)

1. New partitioned scan also uses 0xFF, 0xFF, 0xFF, 0xFF suffix (sqs_partitioned_dispatch_test.go:186-187)

The existing legacy scan (line 220-221) and the new per-partition scan both use:

end = append(end, 0xFF, 0xFF, 0xFF, 0xFF)

prefixScanEnd is available in the package (adapter/prefix_scan.go:7) and is already used throughout redis.go and dynamodb.go. The current approach is technically safe for this key format (message IDs are base64url, all bytes ≤ 0x7E; partition indices up to 32 never produce 0xFF bytes), but prefixScanEnd(dataPrefix) would make intent explicit and match the codebase pattern. Both occurrences would benefit from the same cleanup — a one-liner each.

2. Partition scan loop uses require.False(HasPrefix(...)) where require.Empty reads cleaner

The assertion at lines 190-195:

for _, kvp := range page {
    require.False(t,
        strings.HasPrefix(string(kvp.Key), string(dataPrefix)), ...)
}

Any record returned by ScanAt(dataPrefix, end, ...) necessarily has dataPrefix as a prefix, so HasPrefix is always true for any returned record — making require.False equivalent to t.Fatalf(...). The logic is correct (the test fails on the first surviving record with the key printed), but require.Empty(t, page, "partition %d still has records: %v", p, page) would be clearer. Minor style nit only.


Correctness table (Round 2 additions)

| Contract | Before Round 2 | After Round 2 |
| --- | --- | --- |
| Out-of-range handle.Partition on partitioned queue | Falls through to dispatch → downstream routing error | ReceiptHandleIsInvalid at validateReceiptHandleVersion |
| Partition routing assertion in round-trip test | parsed.Partition < 4 (silent zero-collapse regression) | parsed.Partition == partitionFor(meta, group) |
| Post-delete "invisible but not deleted" regression | Undetectable (60s VT still active on re-receive) | Caught by direct data-keyspace scan per partition ✅ |

Verdict: Ready to merge.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 11fbd45b18


Comment thread adapter/sqs_messages.go Outdated
Comment on lines +932 to +933
folded := binary.BigEndian.Uint32(buf[0:4]) ^ binary.BigEndian.Uint32(buf[4:8])
return folded & (partitions - 1)

P1 — Derive fanout start from per-call counter, not readTS bits

This rotation still depends only on masked readTS bits, so it can lock onto a subset of partitions when readTS advances by a structured stride (which is common here because HLC stores a 16-bit logical counter in the low bits and ReceiveMessage often commits a fixed number of per-message transactions). For example, with 4 partitions and a stride of 10 commits per call, folded & (partitions-1) alternates between two starts, so hot early partitions can keep filling MaxNumberOfMessages and starve later partitions. Fresh evidence: the new fold/XOR still ends in & (partitions - 1), which preserves this low-bit stride aliasing.


Comment thread adapter/sqs_fifo.go Outdated
-func (s *SQSServer) loadFifoDedupRecord(ctx context.Context, queueName string, gen uint64, dedupID string, readTS uint64) (*sqsFifoDedupRecord, []byte, error) {
-	key := sqsMsgDedupKey(queueName, gen, dedupID)
+func (s *SQSServer) loadFifoDedupRecord(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, dedupID string, readTS uint64) (*sqsFifoDedupRecord, []byte, error) {
+	key := sqsMsgDedupKeyDispatch(meta, queueName, partition, gen, dedupID)

P1 — Include MessageGroupId in partitioned dedup routing

DeduplicationScope=messageGroup requires dedup to be isolated per message group, but this key construction uses only (queue, partition, dedupID). When two different groups hash to the same partition and reuse a dedup ID within the window, the second send is incorrectly treated as a duplicate and dropped/acknowledged with the first message ID. That is a correctness/data-loss issue for partitioned FIFO once the dormancy gate is lifted (and it is already reachable in the partitioned-meta integration path).


… fanout (PR #732, round 3)

Two Codex P1s on commit 11fbd45:

1. Dedup scoping (sqs_fifo.go:95) — On a partitioned queue
DeduplicationScope is messageGroup (the validator rejects queue-scope on
PartitionCount > 1), but the dedup key was being constructed with only
(queue, partition, dedupID). Two distinct MessageGroupIds whose FNV-1a
hashes happen to collide onto the same partition would then share a
dedup namespace, so a fresh send in group B reusing group A's dedup-id
would be silently acked with group A's MessageId — a data-loss outcome.
Per the design doc §4.1 line 200, the dedup key keys on
(queue, partition, MessageGroupId, dedupID); partition is redundant
given the deterministic group→partition map but is kept for keyspace
organisation (every key belonging to a group lives under the same
partition prefix). Thread groupID through sqsPartitionedMsgDedupKey,
sqsMsgDedupKeyDispatch, and loadFifoDedupRecord.

Caller audit: loadFifoDedupRecord has exactly one production caller
(sendFifoMessage at sqs_fifo.go:186); the dedupKey it returns is reused
by the same OCC transaction's ReadKeys + Put list, so the read-guard
and the write share one key. sqsMsgDedupKeyDispatch is only reached via
loadFifoDedupRecord. Legacy (non-partitioned) keys are unchanged on
disk — the legacy branch ignores groupID, preserving the on-disk shape
for queues created before partitioning landed.

2. Fanout aliasing (sqs_messages.go:933) — The round-1 fix derived the
fanout starting partition from a XOR-fold of readTS bits and masked by
(partitions - 1). Codex P1 round 2 flagged a real aliasing concern: HLC
packs a 16-bit logical counter in the low bits of readTS and
ReceiveMessage commits a fixed number of per-message transactions per
call, so consecutive readTS deltas exhibit a structured stride. With
partitions=4 and stride=10 the masked fold alternates between two
starts (0,2,0,2,…) — partitions 1 and 3 never appear as the start, and
opts.Max can fill from those two before the loop reaches the others.

Replace the bit-fold with a per-server atomic counter
(receiveFanoutCounter on SQSServer). Consecutive partitioned receives
walk every partition in strict round-robin regardless of HLC behaviour.
The counter is per-server (not Raft-replicated) because fanout
distribution is a local-fairness property: every node already sees
every message via its own receive scans, and "this node's distribution
is balanced" is the only invariant that matters.
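
The stride aliasing reproduces in a simplified model of the removed selector (a plain mask of the timestamp stands in for the XOR-fold; the failure mode under a fixed stride is the same):

```go
package main

import "fmt"

// maskedStarts simulates the removed readTS-based selector: each receive
// masks the current timestamp by (partitions-1), then the timestamp advances
// by a fixed stride (the structured delta from HLC logical-counter packing).
func maskedStarts(ts, stride uint64, partitions uint32, calls int) map[uint32]int {
	seen := make(map[uint32]int)
	for i := 0; i < calls; i++ {
		seen[uint32(ts)&(partitions-1)]++
		ts += stride
	}
	return seen
}

func main() {
	// partitions=4, stride=10: the start alternates 0,2,0,2,… and
	// partitions 1 and 3 never lead the fanout.
	fmt.Println(maskedStarts(1000, 10, 4, 16)) // map[0:8 2:8]
}
```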

Add regression tests:

- TestSQSDedupKeyDispatch_PartitionedScopesByMessageGroupId pins that
  distinct MessageGroupIds on the same (queue, partition, dedupID)
  produce distinct dedup keys, and that legacy keys are unaffected.

- TestNextReceiveFanoutStart_RoundRobin pins that 16 consecutive calls
  over 4 partitions produce exactly 4 hits per partition (strict
  round-robin), and that concurrent receivers each get a valid offset
  under the race detector.
@bootjp
Owner Author

bootjp commented May 3, 2026

@claude review

Round 3 — addressed two new Codex 🟠 P1 findings on the round-2 commit:

1. Dedup scoping (sqs_fifo.go:95): sqsPartitionedMsgDedupKey was (queue, partition, dedupID) — but DeduplicationScope=messageGroup (the only scope reachable on partitioned queues) requires the dedup window to be per (queue, group, dedupID). Two distinct MessageGroupIds that FNV-collide onto the same partition would share a dedup namespace, so a fresh send in group B reusing group A's dedup-id would be silently acked with group A's MessageId — data loss. Threaded groupID through sqsPartitionedMsgDedupKey, sqsMsgDedupKeyDispatch, and loadFifoDedupRecord.

Caller audit: loadFifoDedupRecord has exactly one production caller (sendFifoMessage at sqs_fifo.go:186); the same dedupKey returned from loadFifoDedupRecord populates both the OCC ReadKeys (line 238) and the Put write (line 243), so read-guard and write share one key. Legacy (non-partitioned) shape is unchanged on disk — the legacy branch ignores groupID, preserving compatibility for queues created before partitioning landed.

2. Fanout aliasing (sqs_messages.go:933): Round-1's XOR-fold of readTS bits aliases under structured strides — HLC packs a 16-bit logical counter in low bits and ReceiveMessage commits a fixed number of per-message transactions per call, so consecutive readTS deltas have a fixed stride. With partitions=4 and stride=10 the masked fold alternates 0,2,0,2,… and partitions 1,3 never appear as the start. Replaced with a per-server atomic.Uint32 counter — strict round-robin regardless of HLC behaviour. Counter is per-server (not Raft-replicated) because fanout distribution is a local-fairness property.

Two new regression tests:

  • TestSQSDedupKeyDispatch_PartitionedScopesByMessageGroupId — distinct MessageGroupIds on the same (queue, partition, dedupID) must produce distinct dedup keys; legacy keyspace unchanged.
  • TestNextReceiveFanoutStart_RoundRobin — 16 calls over 4 partitions produce exactly 4 hits per partition (strict round-robin); concurrent receivers tested under -race.

go build, go vet, golangci-lint, and the targeted SQS test suite all clean.

@claude

claude Bot commented May 3, 2026

Claude finished @bootjp's task in 6m 25s —— View job


Code Review: Round 3 — Dedup scoping + Fanout aliasing fixes

  • Gather context and read changed files
  • Review dedup scoping fix (sqs_fifo.go, sqs_keys.go, sqs_keys_dispatch.go)
  • Review fanout aliasing fix (sqs_messages.go, sqs.go)
  • Review new regression tests
  • Verify partition resolver unaffected by key format change
  • Post complete review

Overall assessment

Both P1 findings from the round-2 Codex review are correctly addressed. The implementations are clean, the regression tests are well-targeted, and no new issues were introduced. Ready to merge.


Finding 1: Dedup scoping (sqs_keys.go:326, sqs_fifo.go:98, sqs_keys_dispatch.go:64) ✅

The fix correctly scopes the partitioned dedup key on (queue, partition, gen, groupID, dedupID) by threading groupID through every layer:

  • sqsPartitionedMsgDedupKey: adds encodeSQSSegment(groupID) before encodeSQSSegment(dedupID)
  • sqsMsgDedupKeyDispatch: passes groupID through to the partitioned branch, ignores it on the legacy branch
  • loadFifoDedupRecord: new groupID parameter, one production caller (sendFifoMessage:186 passing in.MessageGroupId)
  • OCC invariant preserved: the same dedupKey returned by loadFifoDedupRecord populates both ReadKeys (the read-guard) and the Put write — they stay in sync

Partition resolver unaffected: parsePartitionedSQSKey only reads the key through the partition field (prefix → queue segment → | terminator → 4 bytes for partition) and stops. The groupID and dedupID appended after gen are never parsed by the resolver. ✓

Legacy keyspace unchanged: The meta == nil || PartitionCount <= 1 branch routes to sqsMsgDedupKey, which has always been (queue, gen, dedupID); groupID is not threaded there. On-disk compatibility preserved for pre-partitioning queues. ✓

Caller audit: sqsPartitionedMsgDedupKey has exactly two callers — sqsMsgDedupKeyDispatch (production dispatch layer) and the partition resolver test (updated). loadFifoDedupRecord has exactly one production caller (sendFifoMessage). ✓


Finding 2: Fanout aliasing (sqs.go:179, sqs_messages.go:931) ✅

The round-1 startPartitionOffset (XOR-fold of readTS bits) is fully removed and replaced with nextReceiveFanoutStart (per-server atomic.Uint32 counter).

func (s *SQSServer) nextReceiveFanoutStart(partitions uint32) uint32 {
    if partitions <= 1 {
        return 0
    }
    return s.receiveFanoutCounter.Add(1) & (partitions - 1)
}

Correctness:

  • atomic.Uint32.Add is thread-safe — no torn reads under concurrent ReceiveMessage calls ✓
  • Power-of-two invariant (htfifoValidatePartitionCount) makes & (partitions-1) equivalent to modulo ✓
  • Counter wraps at math.MaxUint32 cleanly — the mask-AND cycle continues uninterrupted ✓
  • partitions <= 1 short-circuits without calling Add, so legacy receives don't perturb the counter ✓
  • receiveFanoutCounter is zero-initialized by default in NewSQSServer (via &SQSServer{...} with named fields; atomic.Uint32 zero value is 0) ✓
  • startPartitionOffset fully removed — no dangling references ✓
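
A quick check of the mask arithmetic (`maskStart` is an illustrative stand-in for the expression inside `nextReceiveFanoutStart`):

```go
package main

import "fmt"

// maskStart mirrors the counter mask used by nextReceiveFanoutStart.
func maskStart(counter, partitions uint32) uint32 {
	return counter & (partitions - 1)
}

func main() {
	const partitions = 4 // power of two, enforced by htfifoValidatePartitionCount
	// Mask equals modulo whenever partitions is a power of two…
	for _, c := range []uint32{1, 2, 3, 4, 5, 4294967295} {
		fmt.Println(maskStart(c, partitions) == c%partitions) // true
	}
	// …and the wrap at MaxUint32 continues the cycle: 3 → 0 → 1.
	fmt.Println(maskStart(4294967295, partitions), maskStart(0, partitions), maskStart(1, partitions)) // 3 0 1
}
```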

Starting offset: The first partitioned call yields 1 & (partitions-1) (counter starts at 0, first Add(1) returns 1). This means partition 0 is reached on the 4th call for a 4-partition queue, not the 1st. The test correctly accounts for this (16 calls starting from 1 → counts: p1=4, p2=4, p3=4, p0=4 ✓).


Regression tests ✅

TestSQSDedupKeyDispatch_PartitionedScopesByMessageGroupId (sqs_keys_dispatch_test.go:171):

  • Same (queue, partition, dedupID) with different groupID → distinct keys (the actual regression scenario)
  • Same (group, dedupID) → identical key (idempotency contract)
  • Legacy branch ignores groupID (backward compat)

TestNextReceiveFanoutStart_RoundRobin (sqs_partitioned_dispatch_test.go:355):

  • partitions <= 1 → returns 0, counter unchanged
  • 16 calls over 4 partitions → exactly 4 hits per partition (strict round-robin)
  • 1024 calls → all offsets in [0, partitions)
  • 8 goroutines × 256 calls → all concurrent results in [0, partitions) (atomicity check under -race)

These directly pin the failure modes Codex identified and are stronger than a "distribution covers all partitions" check.


Minor observations (non-blocking)

1. Theoretical base64 concatenation ambiguity (low, pre-existing)

encodeSQSSegment(groupID) + encodeSQSSegment(dedupID) without a delimiter is theoretically ambiguous: pairs with lengths |g|=2, |d|=1 and |g|=1, |d|=2 could produce the same concatenated string if specific bit patterns align at the boundary. In practice this is not exploitable with valid AWS strings (MessageGroupId and MessageDeduplicationId are constrained to alphanumeric + _-, all ASCII 0x2D–0x7A), because constructing a collision requires the padding bits at the base64 boundary to be simultaneously zero for both interpretations — a constraint the valid ASCII range essentially never satisfies. This is also a pre-existing pattern in the codebase (e.g., the group key constructor uses encodeSQSSegment(groupID) as the final segment, and the legacy dedup key uses encodeSQSSegment(dedupID) as the final segment without issue). Not a new risk and not a blocker.

2. Comment length (sqs_messages.go:907–935)

The nextReceiveFanoutStart doc block is ~24 lines. CLAUDE.md convention is "one short line max" for comments. The historical context (round-1 fix, Codex P1) is valuable for future maintainers but most of it belongs in the PR description rather than the source file. Could be trimmed to ~5 lines: the invariant, the power-of-two property, and the counter-not-replicated justification.


Correctness table (Round 3)

| Contract | Before Round 3 | After Round 3 |
| --- | --- | --- |
| Partitioned dedup key scope | (queue, partition, gen, dedupID) — two groups colliding on the same partition share a dedup namespace | (queue, partition, gen, groupID, dedupID) — groups have disjoint namespaces ✅ |
| Receive fanout rotation | XOR-fold of readTS bits — aliases under HLC stride patterns (Codex P1) | Per-server atomic counter — true round-robin regardless of HLC ✅ |
| Partition resolver compatibility | N/A | Unaffected — resolver only parses through the partition field ✅ |
| Legacy keyspace on-disk compatibility | N/A | Unaffected — legacy branch ignores groupID |

Verdict: Ready to merge.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: ea0cf4ba25

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread adapter/sqs_messages.go Outdated
if partitions <= 1 {
return 0
}
return s.receiveFanoutCounter.Add(1) & (partitions - 1)


P1 Badge Scope fanout rotation counter per queue

Using a single server-wide receiveFanoutCounter lets each queue observe only a strided subsequence of start offsets whenever other queues call ReceiveMessage in between. If that stride shares a factor with PartitionCount (for example, a 4-partition queue that is called every other partitioned receive), the queue’s starts cycle through only a subset of partitions, and with MaxNumberOfMessages filling on earlier scanned partitions, the remaining partitions can be starved indefinitely. This means the global counter can reintroduce the same starvation class the rotation logic is trying to prevent; the rotation state should be tracked per queue (or otherwise keyed by queue identity).


Codex P1 round 4 on commit ea0cf4b: the round-3 fix used a single
server-wide receiveFanoutCounter, but that aliases across queues.
When other queues call ReceiveMessage in between, the queue-of-interest
observes only a strided subsequence of counter values; if that stride
shares a factor with PartitionCount the queue cycles through only a
subset of partitions, and with MaxNumberOfMessages filling on the
earlier-scanned partitions the rest can be starved indefinitely.
Concrete example: 4-partition queue receiving every other tick → its
counter values are 1,3,5,7,… and 1,3,5,7 mod 4 = 1,3,1,3 → partitions
0 and 2 never appear as the start.

Replace the server-wide atomic.Uint32 with a sync.Map keyed by queue
name; each queue owns its own *atomic.Uint32. nextReceiveFanoutStart
takes (queueName, partitions) and looks up / installs the per-queue
counter. Each queue's rotation now depends solely on its own receive
cadence — strict round-robin regardless of cross-queue interleaving.
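
The isolation property can be sketched with a self-contained model of the per-queue counters (`nextStart` and the queue names are illustrative; the real method is `nextReceiveFanoutStart` on SQSServer):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// fanoutCounters models the per-queue rotation state: one *atomic.Uint32
// per queue name, installed lazily on first partitioned receive.
var fanoutCounters sync.Map

func nextStart(queue string, partitions uint32) uint32 {
	if partitions <= 1 {
		return 0
	}
	v, ok := fanoutCounters.Load(queue)
	if !ok {
		v, _ = fanoutCounters.LoadOrStore(queue, &atomic.Uint32{})
	}
	return v.(*atomic.Uint32).Add(1) & (partitions - 1)
}

func main() {
	// queueB receives every other tick, interleaved with queueC. A
	// server-wide counter would give queueB starts 1,3,1,3,…; per-queue
	// counters keep each queue in strict round-robin.
	seen := make(map[uint32]int)
	for i := 0; i < 16; i++ {
		seen[nextStart("queueB", 4)]++
		nextStart("queueC", 4) // the interleaving receive
	}
	fmt.Println(seen) // map[0:4 1:4 2:4 3:4]
}
```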

Caller audit: nextReceiveFanoutStart has exactly one production caller
(scanAndDeliverOnce at sqs_messages.go:885), which now passes queueName
from its existing scope. Return type and range are unchanged
([0, partitions)); no caller-side semantic shift.

sync.Map is the right shape: lookups are read-mostly (the same queue
keeps getting the same counter) and the keyset grows only with the
number of distinct queues this server has handled receives for
in-process — bounded by the operator-controlled CreateQueue rate.

Update TestNextReceiveFanoutStart_RoundRobin → renamed to
TestNextReceiveFanoutStart_PerQueueRoundRobin. New assertions:

- queueB and queueC interleaved must each independently see strict
  round-robin (the round-4 isolation contract); the legacy server-wide
  counter would have surfaced 1,3,1,3 on queueB → fails.
- Concurrent receivers spread across multiple queue names exercise
  the sync.Map LoadOrStore path under -race so any missed
  synchronisation on the insert is caught.
@bootjp
Owner Author

bootjp commented May 3, 2026

@claude review

Round 4 — addressed Codex 🟠 P1 round 4: per-queue fanout counter.

The round-3 server-wide counter aliased across queues — when queues interleave with strides that share a factor with PartitionCount, a queue cycles through only a subset of partitions. Concrete: 4-partition queue receiving every other tick observes 1,3,5,7,… mod 4 = 1,3,1,3 → partitions 0 and 2 never appear.

Replaced atomic.Uint32 field with sync.Map keyed by queue name; each queue gets its own *atomic.Uint32. Each queue's rotation now depends only on its own receive cadence — strict round-robin regardless of cross-queue interleaving.

Caller audit: nextReceiveFanoutStart has one production caller (scanAndDeliverOnce at sqs_messages.go:885), which now passes queueName from existing scope. Return type and range unchanged ([0, partitions)).

Test: TestNextReceiveFanoutStart_PerQueueRoundRobin (renamed) now asserts queueB and queueC interleaved each see strict round-robin — the round-3 server-wide counter would fail this. Concurrent receivers across many queue names exercise LoadOrStore under -race.

go build, go vet, golangci-lint, and the broader SQS test suite all clean (10.5s).

@claude

claude Bot commented May 3, 2026

Claude finished @bootjp's task in 2m 5s —— View job


Code Review: Round 4 — Per-queue fanout counter

  • Gather context and read changed files
  • Review per-queue counter implementation (sqs.go, sqs_messages.go)
  • Verify sync.Map + atomic.Uint32 correctness and race safety
  • Review renamed/updated test
  • Check for any remaining issues
  • Post complete review

Overall assessment

The fix is correct and the right abstraction for this problem. The sync.Map + *atomic.Uint32 pattern is the standard Go idiom for per-key counters with read-mostly access patterns. The test directly pins the exact failure mode the Codex P1 finding identified. No new issues introduced. Ready to merge.


Correctness: nextReceiveFanoutStart (sqs_messages.go:920–930)

func (s *SQSServer) nextReceiveFanoutStart(queueName string, partitions uint32) uint32 {
    if partitions <= 1 {
        return 0
    }
    v, ok := s.receiveFanoutCounters.Load(queueName)
    if !ok {
        v, _ = s.receiveFanoutCounters.LoadOrStore(queueName, &atomic.Uint32{})
    }
    counter, _ := v.(*atomic.Uint32)
    return counter.Add(1) & (partitions - 1)
}

Load → LoadOrStore idiom — correct. Load first avoids the allocation cost of &atomic.Uint32{} on every call in the common path (queue already in map). On a race between two goroutines making the queue's first receive, LoadOrStore is atomic — both goroutines get the winner's pointer from the map; the loser's allocation is immediately GC'd. The returned v is always a valid *atomic.Uint32. ✅

Type assertion — v.(*atomic.Uint32) is safe because the only value ever stored under any key is &atomic.Uint32{}. The blank discard on the ok bool is idiomatic when the type is guaranteed by construction. ✅

Non-partitioned short-circuit — partitions <= 1 returns 0 without touching the map, so unconfigured / legacy queues pay one branch check and zero map operations per call. ✅

Round-robin contract — each queue has its own counter, incremented independently. After partitions calls on the same queue, every starting offset in [0, partitions) appears exactly once. With power-of-two partitions (enforced by htfifoValidatePartitionCount), the mask-AND is equivalent to modulo. Counter wraps cleanly at math.MaxUint32 — the mask-AND cycle continues uninterrupted. ✅

The Round 4 starvation scenario is closed. The concrete example from the Codex finding: queueB receiving every other tick with a server-wide counter would observe 1,3,5,7,… mod 4 = 1,3,1,3 → partitions 0 and 2 starved. With per-queue counters, queueB's counter increments independently (1,2,3,4,5,… mod 4 = 1,2,3,0,…) → all four partitions appear as starting points in strict round-robin. ✅


Test quality: TestNextReceiveFanoutStart_PerQueueRoundRobin

The key addition is the cross-queue isolation assertion (sqs_partitioned_dispatch_test.go:399–419):

bSeen := make(map[uint32]int, partitions)
cSeen := make(map[uint32]int, partitions)
for range 16 {
    bSeen[s.nextReceiveFanoutStart("queueB", partitions)]++
    cSeen[s.nextReceiveFanoutStart("queueC", partitions)]++
}
for p := uint32(0); p < partitions; p++ {
    require.Equal(t, 4, bSeen[p], ...)
    require.Equal(t, 4, cSeen[p], ...)
}

This test would fail deterministically on the round-3 server-wide counter: B's values are every-other-tick (1,3,5,7,…) → 1,3,5,7 mod 4 = 1,3,1,3 → bSeen[0] and bSeen[2] would be 0, not 4. ✅

The concurrent sub-test now uses fmt.Sprintf("concurrent-%d", idx%3) so 8 goroutines share 3 queue names — specifically exercises the LoadOrStore path under -race when multiple goroutines race on the same queue's initial counter install. ✅


Minor observations (non-blocking)

1. Counter persists after DeleteQueue

When a queue is deleted and then recreated with the same name, the new queue reuses the previous counter from the map — its first receive starts at an arbitrary offset, not necessarily partition 0. This is correct behavior (round-robin from any starting point is still round-robin), but it's worth noting in a follow-up doc or comment if DeleteQueue behavior is ever tested for this path. Not a bug.

2. One allocation per first receive per queue per process lifetime

&atomic.Uint32{} is allocated once per distinct queue that ever receives a partitioned message. In a contention race on the initial insert, the losing allocation is GC'd immediately. The commit message correctly characterizes this as bounded by the CreateQueue rate — negligible in practice. ✅

3. Comment block length

Both the receiveFanoutCounters field doc (sqs.go:171–186) and the nextReceiveFanoutStart function comment (sqs_messages.go:908–919) exceed CLAUDE.md's "one short line max" convention by a wide margin. The historical context (round 3 → round 4 aliasing) is genuinely non-obvious and helpful, but most of it belongs in the commit message and PR description rather than the source file. Minor style nit — the information is correct, just verbose.


Correctness table (Round 4)

| Contract | Before Round 4 | After Round 4 |
| --- | --- | --- |
| Fanout rotation scoping | Single server-wide atomic.Uint32 — aliases when queues interleave with strides sharing a factor with PartitionCount | Per-queue *atomic.Uint32 in sync.Map — rotation depends only on each queue's own receive cadence ✅ |
| Cross-queue isolation | Broken: 4-partition queueB receiving every other tick → 1,3,1,3 → partitions 0,2 starved | Fixed: queueB's counter increments only on queueB receives ✅ |
| Non-partitioned queues | Unchanged: returns 0, no map touch | Unchanged ✅ |
| Race safety | atomic.Uint32.Add — safe | sync.Map.Load/LoadOrStore + atomic.Uint32.Add — safe ✅ |

Verdict: Ready to merge.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 667dea9c37


Comment thread adapter/sqs_messages.go
}
v, ok := s.receiveFanoutCounters.Load(queueName)
if !ok {
v, _ = s.receiveFanoutCounters.LoadOrStore(queueName, &atomic.Uint32{})


P2 Badge Evict stale fanout counters when queues are removed

nextReceiveFanoutStart creates and retains a receiveFanoutCounters entry for every queue name ever received from, but there is no corresponding cleanup on DeleteQueue/recreate paths, so repeated create/delete of unique queue names causes unbounded in-memory growth over process lifetime. This is a new long-lived state path introduced by the per-queue counter and can become a memory/perf issue in multi-tenant or high-churn environments; the delete path currently invalidates throttle state only, not this map.


… round 5)

Codex P2 (user-elevated): nextReceiveFanoutStart populates a sync.Map
entry per queue name on the first partitioned receive, but DeleteQueue
never removes it. Repeated create/delete of unique queue names then
leaks one entry per name for the lifetime of the process — a real
problem for multi-tenant or high-churn deployments.

Mirror throttle.invalidateQueue's two call sites:

  - createQueue genuine-create branch — drops a counter that survived
    a delete-then-create race (a partitioned receive holding pre-delete
    meta could repopulate the entry between the delete-side cleanup
    and this create commit). Tied to the same "fresh state on a true
    create" guarantee the throttle invalidate provides.
  - deleteQueue HTTP handler — drops the counter alongside the
    throttle bucket so DeleteQueue+CreateQueue resets every piece of
    in-memory queue state, not just the rate-limit bucket.

setQueueAttributes is intentionally NOT mirrored: it only mutates
throttle config, never PartitionCount, so the counter remains valid
for the same queue across attribute changes — same conditional shape
the throttle invalidate already uses there.

Caller audit: the new dropReceiveFanoutCounter helper has exactly two
production callers, both adjacent to the existing throttle.invalidateQueue
calls. AdminDeleteQueue does not call throttle.invalidateQueue today
either; staying consistent rather than fixing that pre-existing
inconsistency in this round.

Test: TestDropReceiveFanoutCounter_ClearsEntry pins
  - no-op on never-touched queues (must not allocate or panic),
  - removal after a populated counter,
  - per-queue isolation (dropping queueB leaves queueC's counter
    intact),
  - fresh allocation after drop (the recreated queue starts from a
    zero counter, not a recycled pointer — pinned by asserting the
    first offset == 1 on a 4-partition queue).
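
The eviction contract can be sketched against the same per-queue-counter shape (`nextStart` and `dropCounter` are illustrative stand-ins for `nextReceiveFanoutStart` and `dropReceiveFanoutCounter`):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

var counters sync.Map // queue name → *atomic.Uint32 (illustrative stand-in)

func nextStart(queue string, partitions uint32) uint32 {
	if partitions <= 1 {
		return 0
	}
	v, ok := counters.Load(queue)
	if !ok {
		v, _ = counters.LoadOrStore(queue, &atomic.Uint32{})
	}
	return v.(*atomic.Uint32).Add(1) & (partitions - 1)
}

// dropCounter mirrors dropReceiveFanoutCounter: sync.Map.Delete is a
// documented no-op when the key is absent.
func dropCounter(queue string) { counters.Delete(queue) }

func main() {
	for i := 0; i < 6; i++ {
		nextStart("q", 4) // advance q's counter past zero
	}
	dropCounter("q")
	// A recreated queue starts from a fresh zero counter, so its first
	// offset is Add(1) & 3 = 1, proving the old pointer was not recycled.
	fmt.Println(nextStart("q", 4)) // 1
}
```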
@bootjp
Owner Author

bootjp commented May 3, 2026

@claude review

Round 5 — addressed Codex P2 (user-elevated): per-queue fanout counter eviction.

The round-4 per-queue receiveFanoutCounters sync.Map populates one entry per queue name on the first partitioned receive but never removes it. Repeated DeleteQueue+CreateQueue of unique names then leaks one entry per name for the process lifetime — a real concern in multi-tenant / high-churn deployments.

Fix: new dropReceiveFanoutCounter(queueName) helper, mirrored to the same two call sites as the existing throttle.invalidateQueue:

  • createQueue genuine-create branch — covers a delete-then-create race where a partitioned receive holding pre-delete meta could repopulate the entry between the delete-side cleanup and this create commit.
  • deleteQueue HTTP handler — drops the counter alongside the throttle bucket so DeleteQueue+CreateQueue resets every piece of in-memory queue state.

setQueueAttributes is intentionally NOT mirrored — it only mutates throttle config, never PartitionCount, so the counter stays valid across attribute changes (same conditional shape the throttle invalidate already uses there).

Caller audit: new helper has exactly two production callers, both adjacent to existing throttle.invalidateQueue calls. AdminDeleteQueue does not call throttle.invalidateQueue today either — staying consistent (not fixing that pre-existing inconsistency in this round).

Test: TestDropReceiveFanoutCounter_ClearsEntry pins

  • no-op on never-touched queues (no allocation, no panic)
  • removal after a populated counter
  • per-queue isolation (dropping queueB leaves queueC intact)
  • fresh allocation after drop (first offset == 1 on 4-partition queue, proving the old pointer was not recycled)

Build, golangci-lint, and the targeted SQS test sweep (TestDropReceiveFanoutCounter | TestNextReceiveFanoutStart | TestSQSServer_PartitionedFIFO | TestValidateReceiptHandleVersion, plus a wider CreateQueue|DeleteQueue|PurgeQueue|SetQueueAttributes|SendMessage|ReceiveMessage|DeleteMessage regression sweep) all clean under -race.

@claude

claude Bot commented May 3, 2026

Claude finished @bootjp's task in 1m 53s —— View job


Code Review: Round 5 — Per-queue fanout counter eviction

  • Gather PR context and understand Round 5 changes
  • Read adapter/sqs_messages.go — dropReceiveFanoutCounter helper
  • Read adapter/sqs_catalog.go — createQueue and deleteQueue call sites
  • Read adapter/sqs_admin.go — AdminDeleteQueue gap
  • Review TestDropReceiveFanoutCounter_ClearsEntry
  • Verify caller audit completeness
  • Check race safety and post-drop re-population path
  • Post complete review

Overall assessment

The fix is correct, minimal, and faithful to the pattern it mirrors. dropReceiveFanoutCounter is a one-liner wrapping sync.Map.Delete, placed at the same two call sites as throttle.invalidateQueue. The leak scenario described in the Codex P2 finding is closed. No new issues introduced. Ready to merge.


Implementation

dropReceiveFanoutCounter (sqs_messages.go:940–942):

func (s *SQSServer) dropReceiveFanoutCounter(queueName string) {
    s.receiveFanoutCounters.Delete(queueName)
}

sync.Map.Delete is a documented no-op when the key is absent, so calling this on a queue that never issued a partitioned receive is safe. The function correctly does not check for existence first — that would introduce a TOCTOU window. ✅

createQueue call site (sqs_catalog.go:1020): placed after deleteQueueWithRetry commits and immediately after throttle.invalidateQueue. The race the comment describes is real: a partitioned ReceiveMessage holding stale pre-delete meta could call nextReceiveFanoutStart between the delete-side dropReceiveFanoutCounter and this create commit, repopulating the map. Dropping here on the genuine-create path removes that survivor. ✅

deleteQueue HTTP handler (sqs_catalog.go:1052): placed after deleteQueueWithRetry succeeds, immediately after throttle.invalidateQueue. Pattern-matches the throttle cleanup. ✅

Race analysis: dropReceiveFanoutCounter (Delete) racing with nextReceiveFanoutStart (LoadOrStore) during the delete window is safe — sync.Map operations are individually atomic. The worst outcome is a concurrent receive re-inserting an entry that the drop immediately removes, which is still correct: the next receive after delete gets a fresh zero counter via LoadOrStore. ✅


Caller audit

| Call site | Cleans up throttle? | Cleans up fanout counter? |
| --- | --- | --- |
| deleteQueue HTTP handler | ✅ | ✅ (Round 5) |
| tryCreateQueueOnce genuine-create | ✅ | ✅ (Round 5) |
| setQueueAttributes (if throttle changed) | ✅ | intentionally not mirrored |
| AdminDeleteQueue | ❌ (pre-existing) | ❌ (pre-existing, acknowledged) |

The AdminDeleteQueue gap is pre-existing (it also doesn't clean up the throttle), acknowledged in the commit message, and intentionally deferred. The PR's stated scope is to mirror throttle.invalidateQueue's two call sites — this is satisfied. The setQueueAttributes exclusion is justified: PartitionCount is immutable post-create, so the counter for an existing queue remains valid across attribute changes. ✅


Test: TestDropReceiveFanoutCounter_ClearsEntry

Five assertions, each pinning a distinct failure mode:

| Assertion | What regression it catches |
| --- | --- |
| No entry for "never-touched" after drop | Delete allocating a zero entry — would break the memory-leak fix |
| Entry present after nextReceiveFanoutStart | A no-op LoadOrStore regression |
| Entry absent after drop | The core leak: Delete silently failing |
| Per-queue isolation (B removed, C intact) | Delete clearing the whole map or the wrong key |
| first == 1 after drop + re-populate | Old pointer recycled: Delete didn't actually remove the entry |

The first == 1 assertion is particularly tight: it would fail if Delete removed the key but a concurrent goroutine's LoadOrStore returned the old pointer (impossible with sync.Map semantics, but the test proves it). ✅


Minor observations (non-blocking)

1. PR reference in source comment (sqs_catalog.go:1051)

// — a leak in multi-tenant / high-churn deployments. Codex P2,
// PR #732 round 5.

Per CLAUDE.md: "Don't reference the current task, fix, or callers... since those belong in the PR description and rot as the codebase evolves." The "Codex P2, PR #732 round 5" attribution belongs in the commit message (where it already is, verbatim). The comment at the createQueue call site does not have this issue. Minor style nit; the deleteQueue comment reads naturally without it.

2. dropReceiveFanoutCounter doc block length

The function comment is 9 lines. CLAUDE.md convention is "one short line max" — the non-obvious WHY fits in two lines: the leak scenario + the no-op guarantee. The rest is reachable from the commit message. Not new to Round 5 (Rounds 3–4 had the same issue on nextReceiveFanoutStart) and not blocking.


Correctness table (Round 5)

| Contract | Before Round 5 | After Round 5 |
| --- | --- | --- |
| receiveFanoutCounters entry lifetime | Retained for process lifetime per queue name ever served | Evicted on deleteQueue and genuine createQueue |
| Multi-tenant / high-churn leak | One entry per unique queue name leaked for process lifetime | Bounded to currently-alive queues ✅ |
| Delete-then-create race | Stale counter from pre-delete incarnation survives into the new queue | Cleaned up at the genuine-create path ✅ |
| AdminDeleteQueue eviction | Not performed (pre-existing; throttle.invalidateQueue also absent) | Not performed (intentionally consistent) |

Verdict: Ready to merge.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (1)
adapter/sqs_messages.go (1)

865-885: ⚡ Quick win

Update this comment to describe the counter-based rotation.

The code now uses nextReceiveFanoutStart(queueName, partitions) and receiveFanoutCounters, but this block still explains the removed readTS/HLC selector and "no per-server state". That mismatch will mislead the next refactor, especially around the create/delete counter cleanup.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/sqs_messages.go` around lines 865 - 885, Update the comment block
explaining partition rotation to reflect the new counter-based rotation: replace
references to readTS/HLC and "no per-server state" with a description that
rotation is driven by per-queue counters accessed via
nextReceiveFanoutStart(queueName, partitions) and stored in
receiveFanoutCounters, how effectivePartitionCount(meta) and startOffset are
derived, and that this introduces per-queue counter state which must be cleaned
up on queue create/delete; also clarify that FIFO/MessageGroupId semantics
remain unchanged and note any implications for create/delete counter cleanup and
concurrency.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@adapter/sqs_keys.go`:
- Around line 326-334: The suffix built by sqsPartitionedMsgDedupKey is
ambiguous because encodeSQSSegment(groupID) and encodeSQSSegment(dedupID) are
appended back-to-back; fix by inserting the same segment terminator used
elsewhere (e.g., sqsPartitionedQueueTerminator) between the two encoded segments
so the boundary is explicit. Modify sqsPartitionedMsgDedupKey to append
encodeSQSSegment(groupID), then append the terminator (the same symbol used
after queueName), then append encodeSQSSegment(dedupID) so (groupID,dedupID)
pairs are unambiguous.

In `@adapter/sqs_partitioned_dispatch_test.go`:
- Around line 200-212: The post-delete sleep + receive block falsely claims it
will catch "leave-invisible" regressions because the drain loop sets
VisibilityTimeout: 60, so sleeping 1100ms won't re-expose messages left
invisible by a failed DeleteMessage; fix by either lowering the drain loop's
VisibilityTimeout to 1 (so the 1100ms sleep will expire it) or update the
comment to state this receive does NOT cover drain-phase "leave-invisible"
regressions and that the keyspace scan is the real guard—change references
around the drain loop and the receive call (callSQS with sqsReceiveMessageTarget
and the VisibilityTimeout param) accordingly.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 4d4c114c-4ddf-4764-b8b9-de5b92b9e0f3

📥 Commits

Reviewing files that changed from the base of the PR and between 11fbd45 and 3961d4a.

📒 Files selected for processing (10)
  • adapter/sqs.go
  • adapter/sqs_catalog.go
  • adapter/sqs_fifo.go
  • adapter/sqs_keys.go
  • adapter/sqs_keys_dispatch.go
  • adapter/sqs_keys_dispatch_test.go
  • adapter/sqs_keys_test.go
  • adapter/sqs_messages.go
  • adapter/sqs_partition_resolver_test.go
  • adapter/sqs_partitioned_dispatch_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • adapter/sqs_fifo.go

Comment thread adapter/sqs_partitioned_dispatch_test.go
Comment on lines +200 to +212
```go
	// And the public API must agree: a fresh receive with a short
	// visibility timeout (after sleeping past it, so any in-flight
	// invisible record would re-expose) returns no messages.
	time.Sleep(1100 * time.Millisecond)
	status, out = callSQS(t, node, sqsReceiveMessageTarget, map[string]any{
		"QueueUrl":            queueURL,
		"MaxNumberOfMessages": 10,
		"VisibilityTimeout":   1,
	})
	require.Equal(t, http.StatusOK, status, "post-delete receive: %v", out)
	if msgs, _ := out["Messages"].([]any); len(msgs) > 0 {
		t.Fatalf("expected empty queue after delete; got %d messages", len(msgs))
	}
```

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Post-delete API check cannot catch "leave-invisible" regressions from the drain phase.

The comment claims sleeping 1100 ms ensures "any in-flight invisible record would re-expose," but the drain loop (lines 137–141) uses VisibilityTimeout: 60. A message left invisible by a failing DeleteMessage would still have ~58.9 s of visibility remaining after 1100 ms and would not be returned — so the require on line 211 would silently pass even if deletion regressed to a no-op.

The keyspace scan (lines 186–198) is the only correct guard for that class of bug. The comment incorrectly implies this block provides equivalent coverage; a future maintainer who removes the keyspace scan while trusting this comment would lose the protection.

Two clean fixes:

🛠️ Option A — lower the drain VTO so the sleep does expire it
-		"VisibilityTimeout":   60,
+		"VisibilityTimeout":   1,

The 1 100 ms sleep then expires the 1 s window, and any invisible-but-not-deleted record would reappear in the final receive.

📝 Option B — correct the comment to reflect what this block actually tests
-	// And the public API must agree: a fresh receive with a short
-	// visibility timeout (after sleeping past it, so any in-flight
-	// invisible record would re-expose) returns no messages.
+	// Confirm the public API also observes an empty queue. Note: the
+	// drain receives used VisibilityTimeout=60, so records left invisible
+	// by a regressed DeleteMessage would NOT re-expose here. The
+	// keyspace scan above is the definitive guard for that regression;
+	// this receive is a belt-and-suspenders check for completely unrelated
+	// messages only.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/sqs_partitioned_dispatch_test.go` around lines 200 - 212, The
post-delete sleep + receive block falsely claims it will catch "leave-invisible"
regressions because the drain loop sets VisibilityTimeout: 60, so sleeping
1100ms won't re-expose messages left invisible by a failed DeleteMessage; fix by
either lowering the drain loop's VisibilityTimeout to 1 (so the 1100ms sleep
will expire it) or update the comment to state this receive does NOT cover
drain-phase "leave-invisible" regressions and that the keyspace scan is the real
guard—change references around the drain loop and the receive call (callSQS with
sqsReceiveMessageTarget and the VisibilityTimeout param) accordingly.

… round 6)

CodeRabbit major: sqsPartitionedMsgDedupKey appends encodeSQSSegment(groupID)
and encodeSQSSegment(dedupID) back-to-back. encodeSQSSegment uses
base64.RawURLEncoding (no padding, alphabet [A-Za-z0-9_-]), so the boundary
between the two segments is variable-length and ambiguous: distinct
(groupID, dedupID) pairs can collapse onto the same byte sequence — most
trivially when one of the two is empty: ("", "abcd") and ("abcd", "") both
produce "...QUJDRA". Even for non-empty IDs the boundary depends on input
length mod 3 in subtle ways. This is the on-disk dedup-key format PR 5b-3
will rely on once the dormancy gate lifts; fixing it now avoids shipping
an ambiguous format.

Fix: insert sqsPartitionedQueueTerminator pipe-byte between the two segments.
That byte is outside RawURLEncoding's alphabet so neither segment can
contribute one of its own — the boundary becomes unambiguous regardless of
input length. Mirrors how sqsPartitionedMsgDataKey, MsgVisKey, MsgGroupKey,
and MsgByAgeKey already use the same terminator at the queue/partition
boundary.

Caller audit: sqsPartitionedMsgDedupKey is reached only via
sqsMsgDedupKeyDispatch, which has exactly two production callers —
loadFifoDedupRecord (read-side) and sendFifoMessage (write-side, via the
key returned from the same loadFifoDedupRecord call). Read and write share
one helper invocation, so the new format takes effect symmetrically with
no read/write skew window. parsePartitionedSQSKey only inspects the key
through the partition field and stops; the new terminator after groupID
does not affect parsing.

Note on dormancy: partitioned queues are still gated at CreateQueue
(§11 PR 2 dormancy gate, lifted atomically with the capability check in
PR 5b-3), so no production data carries the old format. This is purely
a forward-format correction.

Test: TestSqsPartitionedMsgDedupKey_GroupDedupSeparator covers
  - the empty-segment collision class (the most blatant ambiguity),
  - the new terminator's exact placement (key ends with the
    b64(group)+pipe+b64(dedup) suffix, proving the byte sits at the boundary),
  - non-empty pair non-collision (("ab", "cd") vs ("abcd", "")),
  - read-write symmetry through sqsMsgDedupKeyDispatch (the dispatch
    helper produces the same bytes as the underlying constructor, so
    a regression that diverged the two would fail here).
@bootjp
Owner Author

bootjp commented May 3, 2026

@claude review

Round 6 — addressed CodeRabbit 🟠 Major: dedup key segment-boundary ambiguity.

sqsPartitionedMsgDedupKey was appending encodeSQSSegment(groupID) and encodeSQSSegment(dedupID) back-to-back. encodeSQSSegment uses base64.RawURLEncoding (no padding, alphabet [A-Za-z0-9_-]), so the boundary is variable-length and ambiguous: distinct (group, dedup) pairs can collapse onto the same byte sequence — most blatantly when one is empty (("", "abcd") and ("abcd", "") both produce "...QUJDRA"). This is the on-disk format PR 5b-3 will rely on, so it had to be fixed before that PR.

Fix: insert sqsPartitionedQueueTerminator | between the two segments. | is outside RawURLEncoding's alphabet so neither segment can contribute one of its own — boundary unambiguous regardless of input length. Mirrors how sqsPartitionedMsgDataKey, MsgVisKey, MsgGroupKey, and MsgByAgeKey already use the same terminator at the queue/partition boundary.
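The collision and the fix are easy to reproduce standalone. A minimal sketch: `enc`, `ambiguous`, and `unambiguous` are invented stand-ins; only the RawURLEncoding choice and the `|` terminator come from the PR.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// enc mimics encodeSQSSegment: RawURLEncoding, no padding, alphabet
// [A-Za-z0-9_-] — '|' can never appear in the output.
func enc(s string) string { return base64.RawURLEncoding.EncodeToString([]byte(s)) }

// ambiguous is the pre-fix layout: two variable-length segments back-to-back.
func ambiguous(group, dedup string) string { return enc(group) + enc(dedup) }

// unambiguous is the post-fix layout: a '|' terminator marks the boundary.
func unambiguous(group, dedup string) string { return enc(group) + "|" + enc(dedup) }

func main() {
	// Distinct (group, dedup) pairs collapse onto the same bytes pre-fix...
	fmt.Println(ambiguous("", "abcd") == ambiguous("abcd", "")) // true
	// ...and stay distinct once the terminator sits at the boundary.
	fmt.Println(unambiguous("", "abcd") == unambiguous("abcd", "")) // false
}
```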

Caller audit: sqsPartitionedMsgDedupKey is reached only through sqsMsgDedupKeyDispatch, which has exactly two production callers — loadFifoDedupRecord (read-side) and sendFifoMessage (write-side, using the key returned from the same loadFifoDedupRecord call). Read and write share one helper invocation, so the new format takes effect symmetrically — no read/write skew window. parsePartitionedSQSKey only inspects the key through the partition field and stops; the new terminator after groupID does not affect parsing.

Dormancy: partitioned queues are still gated at CreateQueue (§11 PR 2, lifted atomically with the capability check in PR 5b-3), so no production data carries the old format. Pure forward-format correction.

Test (TestSqsPartitionedMsgDedupKey_GroupDedupSeparator):

  • Empty-segment collision class (the most blatant ambiguity)
  • New terminator exact placement (key ends with b64(group)|b64(dedup))
  • Non-empty pair non-collision (("ab", "cd") vs ("abcd", ""))
  • Read-write symmetry through the dispatch helper

Build, golangci-lint, and the targeted dedup/dispatch/partitioned-FIFO test sweep all clean under -race. CodeRabbit's other finding on the test (🟡 Minor on the post-delete check comment) is below the loop threshold and not addressed in this round.

@claude

claude Bot commented May 3, 2026

claude Bot commented May 3, 2026

Claude finished @bootjp's task in 2m 30s


Code Review: Round 6 — Dedup key segment-boundary fix

  • Gather PR context and understand Round 6 changes
  • Read adapter/sqs_keys.go — sqsPartitionedMsgDedupKey fix
  • Read adapter/sqs_keys_dispatch_test.go — TestSqsPartitionedMsgDedupKey_GroupDedupSeparator
  • Read adapter/sqs_keys_dispatch.go — sqsMsgDedupKeyDispatch caller
  • Verify segment terminator placement and ambiguity closure
  • Verify parsePartitionedSQSKey is unaffected
  • Review test coverage for the new boundary cases
  • Post complete review

Overall assessment

The Round 6 fix is correct, minimal, and the right shape. The sqsPartitionedQueueTerminator insertion is the natural fix for the ambiguity — it mirrors how every other partitioned key constructor already uses | to delimit variable-length segments. The tests are well-targeted and directly pin the failure mode. One nit remains from prior rounds (not introduced by Round 6). The deferred 🟡 Minor is acknowledged and below threshold. Ready to merge.


Correctness: sqsPartitionedMsgDedupKey (adapter/sqs_keys.go:343)

```go
buf = append(buf, encodeSQSSegment(groupID)...)
buf = append(buf, sqsPartitionedQueueTerminator) // ← Round 6 addition
buf = append(buf, encodeSQSSegment(dedupID)...)
```

Ambiguity closure — encodeSQSSegment uses base64.RawURLEncoding (no padding, no =). The RawURLEncoding alphabet is A-Za-z0-9-_; | (0x7C) is not in it. Neither segment can contribute a | byte of its own, so the terminator uniquely marks the segment boundary regardless of input length. The ("", "abcd") / ("abcd", "") collision class, and the length-mod-3 boundary cases for non-empty inputs, are all closed. ✅

Mirrors existing conventions — sqsPartitionedMsgDataKey, MsgVisKey, MsgGroupKey, MsgByAgeKey all use the same sqsPartitionedQueueTerminator after the queue name. This is now the uniform pattern. ✅

Read-write symmetry — loadFifoDedupRecord (sqs_fifo.go:99) calls sqsMsgDedupKeyDispatch and returns (rec, key, err). sendFifoMessage (sqs_fifo.go:186) gets back that same dedupKey and uses it for both ReadKeys (OCC read-guard) and the Put write (sqs_fifo.go:242,247). Both sides use the same single helper invocation — no skew window. ✅

parsePartitionedSQSKey unaffected — The resolver parses prefix → queue segment → | (terminator) → 4 bytes for partition. It stops there. The gen, groupID, |, and dedupID appended after the partition are never parsed by the resolver. The Round 6 terminator is beyond its stopping point. ✅

Dormancy safety — CreateQueue still rejects PartitionCount > 1 via validatePartitionDormancyGate. No production data was ever written in the old (ambiguous) format. The key format change is purely forward-only. ✅


Test quality: TestSqsPartitionedMsgDedupKey_GroupDedupSeparator

Four targeted assertions, each pinning a distinct failure mode:

| Assertion | What regression it catches |
| --- | --- |
| `("", "abcd")` vs `("abcd", "")` | Empty-segment collision — the most trivial ambiguity; fails without the terminator |
| `bytes.HasSuffix(key, b64(groupA)+"\|"+b64(dedup-token))` | New terminator exact placement — the key ends with the `b64(group)` + `\|` + `b64(dedup)` suffix, proving the byte sits at the boundary |
| `("ab", "cd")` vs `("abcd", "")` | Length-aligned non-empty collision — covers the length-mod-3 boundary cases |
| `sqsMsgDedupKeyDispatch(meta, ...) == sqsPartitionedMsgDedupKey(...)` | Read-write symmetry via dispatch helper — prevents the dispatch helper diverging from the underlying constructor |

The existing TestSQSDedupKeyDispatch_PartitionedScopesByMessageGroupId (from Round 3) remains in place and still passes, providing orthogonal coverage: it asserts that two distinct groupID values on the same (queue, partition, dedupID) produce different keys. ✅


Minor observations (not blocking)

1. Comment block length and PR reference (sqs_keys.go:334–342)

The 9-line inline comment includes "CodeRabbit major, PR #732 round 6". Per CLAUDE.md: "one short line max" and "Don't reference the current task, fix, or callers... since those belong in the PR description and rot as the codebase evolves." Same pre-existing pattern as rounds 3–5; not introduced by Round 6 and not blocking.

2. Deferred 🟡 Minor: misleading post-delete comment (sqs_partitioned_dispatch_test.go:200–202)

Explicitly deferred by the trigger comment ("below the loop threshold"). The claim that sleeping 1100ms will re-expose drain-phase invisible messages is incorrect (drain uses VTO=60), but the direct keyspace scan at lines 183-198 is the definitive correctness guard — the comment overstates the API receive's coverage. Not blocking; acknowledged.


Correctness table (Round 6)

| Contract | Before Round 6 | After Round 6 |
| --- | --- | --- |
| `(groupID, dedupID)` → unique dedup key | Ambiguous: variable-length segments back-to-back, `("", "X")` collides with `("X", "")` | Unambiguous: `\|` terminator marks the segment boundary ✅ |
| `parsePartitionedSQSKey` | Stops at partition field | Unaffected — new terminator is past its stopping point ✅ |
| Legacy keyspace | Unchanged | Unchanged ✅ |
| On-disk format for existing queues | N/A (partitioned queues gated at CreateQueue) | N/A — dormancy gate holds ✅ |

Verdict: Ready to merge.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 5047223acc


Comment thread adapter/sqs_fifo.go
```go
// dedup key so that two groups colliding onto the same partition keep
// disjoint dedup namespaces (the AWS messageGroup-scope contract).
func (s *SQSServer) loadFifoDedupRecord(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, groupID, dedupID string, readTS uint64) (*sqsFifoDedupRecord, []byte, error) {
	key := sqsMsgDedupKeyDispatch(meta, queueName, partition, gen, groupID, dedupID)
```

P2 Badge Reap partitioned dedup records to prevent key growth

loadFifoDedupRecord now resolves dedup keys via sqsMsgDedupKeyDispatch, which routes PartitionCount > 1 queues to the partitioned dedup prefix, but the dedup reaper still scans only the legacy SqsMsgDedupPrefix + encodeSQSSegment(queueName) range in adapter/sqs_reaper.go. Once partitioned FIFO is enabled, expired dedup entries for those queues will never be swept, so high-cardinality MessageDeduplicationId traffic causes unbounded dedup-key accumulation and steadily worse storage/scan overhead.


@bootjp bootjp merged commit 637e543 into main May 4, 2026
11 checks passed
@bootjp bootjp deleted the feat/sqs-htfifo-dispatch-wiring branch May 4, 2026 06:35
bootjp added a commit that referenced this pull request May 4, 2026
…3.D PR 5b-3) (#734)

## Summary

Phase 3.D PR 5b-3: lift the §11 PR 2 dormancy gate (PartitionCount > 1
hard-rejected at CreateQueue) and replace it with the §8.5 cluster-wide
htfifo capability poll, atomically in one commit. The data plane caught
up in #731 + #732, so partitioned FIFO queues are now creatable
end-to-end.

**What changes**

- **Remove** `validatePartitionDormancyGate` and
`htfifoTemporaryGateMessage` from `adapter/sqs_partitioning.go`. Both
were marked "Removed in PR 5 in the same commit that wires the data
plane so the gate-and-lift land atomically" — this is that commit.
- **Add** `(*SQSServer).validateHTFIFOCapability` in a new
`adapter/sqs_capability_gate.go`, called from `createQueueCore`.
Two-stage fail-closed check on `PartitionCount > 1`:
1. **Local**: this binary must advertise htfifo
(`htfifoCapabilityAdvertised`). Refuses with `InvalidAttributeValue`
otherwise.
2. **Peers**: every entry in `s.leaderSQS` must report htfifo via
`/sqs_health` within the poller's per-peer timeout. Any timeout, HTTP
error, malformed body, or missing capability blocks the create.

Vacuous on `PartitionCount <= 1` and on empty `leaderSQS` (single-node
cluster — the local check is the whole cluster).
- **Helpers**: `collectSQSPeers` returns `leaderSQS` values in
deterministic sorted order with empty/duplicate addresses filtered.
`buildHTFIFOCapabilityRejection` composes the rejection message with
each failing peer's address + reason so an operator triaging a
partial-rolling-upgrade cluster does not need to re-run the poll
out-of-band.

**CreateQueue control flow on `PartitionCount > 1`**

```
schema validators (validatePartitionConfig, etc.)
→ validateHTFIFOCapability
    → htfifoCapabilityAdvertised check (local)
    → PollSQSHTFIFOCapability(ctx, collectSQSPeers(), …)
    → reject with InvalidAttributeValue on any failure
→ createQueueWithRetry
```
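The two-stage fail-closed shape above can be sketched as a self-contained program. Everything here — `collectPeers`, `validate`, the `poll` callback — is an illustrative stand-in, not the real `adapter/sqs_capability_gate.go` API.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// collectPeers mimics collectSQSPeers: deterministic sorted order with
// empty and duplicate addresses filtered out.
func collectPeers(leaderSQS map[uint64]string) []string {
	seen := map[string]bool{}
	var peers []string
	for _, addr := range leaderSQS {
		if addr == "" || seen[addr] {
			continue
		}
		seen[addr] = true
		peers = append(peers, addr)
	}
	sort.Strings(peers)
	return peers
}

// validate returns nil iff the create may proceed; poll stands in for the
// real per-peer /sqs_health capability probe.
func validate(partitionCount int, localAdvertised bool, peers []string, poll func(string) (bool, string)) error {
	if partitionCount <= 1 {
		return nil // vacuous on legacy / single-partition creates
	}
	// Stage 1 (local): this binary must advertise htfifo.
	if !localAdvertised {
		return fmt.Errorf("InvalidAttributeValue: local node does not advertise htfifo")
	}
	// Stage 2 (peers): every peer must report htfifo; any failure blocks the
	// create. An empty peer list is a single-node cluster, so stage 1 was
	// the whole cluster.
	var failing []string
	for _, p := range peers {
		if ok, reason := poll(p); !ok {
			failing = append(failing, p+" ("+reason+")")
		}
	}
	if len(failing) > 0 {
		return fmt.Errorf("InvalidAttributeValue: peers missing htfifo: %s", strings.Join(failing, ", "))
	}
	return nil
}

func main() {
	peers := collectPeers(map[uint64]string{2: "b:9000", 1: "a:9000", 3: "a:9000", 4: ""})
	fmt.Println(peers) // [a:9000 b:9000]
	allOK := func(string) (bool, string) { return true, "" }
	oneStale := func(p string) (bool, string) { return p != "b:9000", "no htfifo capability" }
	fmt.Println(validate(8, true, peers, allOK))     // <nil>
	fmt.Println(validate(8, true, peers, oneStale))  // rejection naming b:9000
	fmt.Println(validate(1, false, peers, oneStale)) // <nil>: gate is vacuous at PartitionCount <= 1
}
```

Naming each failing peer in the rejection mirrors `buildHTFIFOCapabilityRejection`'s goal: an operator triaging a partial rolling upgrade sees which node is behind without re-running the poll.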

**Caller audit:** `validateHTFIFOCapability` has exactly one production
caller (`createQueueCore` in `sqs_catalog.go`); both the JSON handler
and the future query-protocol handler reach it through that one path.
`SetQueueAttributes` is unaffected because `PartitionCount` is immutable
post-create (`validatePartitionImmutability`).

**Test changes**

- Delete `TestValidatePartitionDormancyGate_RejectsAboveOne` (function
gone).
- Convert `TestSQSServer_HTFIFO_DormancyGate_RejectsPartitionedCreate` →
`TestSQSServer_HTFIFO_CapabilityGate_AcceptsOnSingleNode`: the same wire
payloads now SUCCEED because the local node advertises htfifo and there
are no peers to poll.
- Rename `TestSQSServer_HTFIFO_DormancyGate_AllowsPartitionCountOne` →
`TestSQSServer_HTFIFO_CapabilityGate_AllowsPartitionCountOne` for
consistency.
- Refresh comments on related tests + helpers.

**New unit tests** (`sqs_capability_gate_test.go`):
- `TestValidateHTFIFOCapability_ShortCircuitsOnLegacyMeta`
- `TestValidateHTFIFOCapability_AcceptsWhenAllPeersAdvertise`
- `TestValidateHTFIFOCapability_AcceptsOnEmptyPeerList`
- `TestValidateHTFIFOCapability_RejectsWhenOnePeerLacksCapability`
- `TestValidateHTFIFOCapability_RejectsWhenPeerUnreachable`
- `TestCollectSQSPeers_Deterministic`
- `TestBuildHTFIFOCapabilityRejection_ShapesOperatorMessage`

## Self-review (CLAUDE.md)

1. **Data loss** — None. The gate strictly tightens CreateQueue
acceptance vs. the previous dormancy reject; no path now accepts a write
that would have been rejected before. The dormancy gate's invariant
("partitioned-shape meta cannot land on a binary that does not handle
the partitioned keyspace") is preserved by the local
`htfifoCapabilityAdvertised` check and strengthened by the cluster-wide
poll.
2. **Concurrency / distributed failures** — Poll runs concurrently
across peers via the existing `PollSQSHTFIFOCapability` helper (covered
by its own race tests). `collectSQSPeers` + sort are pure /
deterministic. `leaderSQS` is only mutated at SQSServer construction
(`WithSQSLeaderMap`), not at request time, so no read/write races.
Leader transitions during the poll are handled by the existing
`proxyToLeader` path that gates `createQueue` before
`validateHTFIFOCapability` runs.
3. **Performance** — Poll cost is O(peers) and only paid on
`PartitionCount > 1` creates (rare control-plane operation). Legacy /
single-partition CreateQueue calls pay one short-circuit branch.
`collectSQSPeers`' sort is O(N log N) on a small N (cluster size). No
hot-path impact.
4. **Data consistency** — Schema validators (`PartitionCount` shape,
dedup-scope rule, perMessageGroupId rule) still run BEFORE the
capability gate inside `parseAttributesIntoMeta`, so an invalid shape
rejects with the schema's reason rather than the gate's.
SetQueueAttributes immutability remains the guard for post-create
partition-shape changes.
5. **Test coverage** — Gate function: 5 unit tests covering
short-circuit, happy path, vacuous empty, rolling-upgrade, and
unreachable-peer classes. Helpers: 2 unit tests pinning deterministic
order and message shape. Wire-level: existing HT-FIFO integration tests
carry forward, with the dormancy-reject test converted to the new
accepts-on-single-node happy path.

## Test plan
- [x] `make lint` — golangci-lint 0 issues
- [x] Targeted: `TestValidateHTFIFOCapability* | TestCollectSQSPeers |
TestBuildHTFIFOCapabilityRejection | TestSQSServer_HTFIFO* |
TestValidatePartition*` (-race, clean)
- [x] Wider regression sweep on CreateQueue / DeleteQueue / PurgeQueue /
SetQueueAttributes / Send/Receive/Delete / PartitionedFIFO / dispatch
(-race, clean)
- [ ] CI: full Jepsen + race on the PR


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Cluster-wide capability validation for creating partitioned FIFO
queues—creation now requires peers to advertise HT‑FIFO.
* Client-facing rejection messages are sanitized; detailed per-peer
diagnostics are recorded in server logs only.

* **Bug Fixes**
* Partitioned FIFO queues can be created on single-node clusters (no
longer rejected).

* **Tests**
* Added comprehensive tests for the capability gate, peer collection,
deterministic behavior, and server-side log formatting.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
bootjp added a commit that referenced this pull request May 4, 2026
…D PR 6a) (#735)

## Summary

Phase 3.D PR 6a: tombstone reaper enumerates partitioned keyspace.
Closes the Codex P2 from PR #732 round 0 ("Reap partitioned dedup
records to prevent key growth"), deferred at the time as a "PR 6
concern". With PR #734 (5b-3) lifting the dormancy gate, partitioned
FIFO queues can now land in production — but until this PR the tombstone
reaper only swept the legacy keyspace, so partitioned data / vis / byage
/ dedup / group rows leaked permanently after DeleteQueue / PurgeQueue.

## What changes

- **Tombstone schema**: tombstone value now encodes `PartitionCount` so
the reaper can drive partition iteration off the tombstone alone (the
meta record is gone by the time the reaper observes the tombstone). 8
bytes big-endian uint64. Backward compat: `PartitionCount<=1` keeps the
byte-identical legacy `[]byte{1}` sentinel; non-canonical values (empty,
1-byte, 4-byte, out-of-range 8-byte) decode to `PartitionCount=1` so a
partial rollback or future encoding revision degrades safely to the
legacy single-partition reap.

- **DeleteQueue + PurgeQueue** write the encoded value, reading
`PartitionCount` from the pre-delete / pre-purge meta (immutable across
SetQueueAttributes / PurgeQueue per §3.2).

- **Tombstone reaper**: `reapTombstonedGeneration` takes the decoded
count, sweeps the legacy keyspace (always — covers pre-HT-FIFO queues
plus a defensive pass) and on `PartitionCount > 1` ALSO sweeps every
partition's byage / dedup / group prefix family via the new
`reapPartitionedGeneration` helper. `allDone` gating delays tombstone
deletion until every partition is fully drained.

- **Per-record dispatch**: `reapOneRecord` and `buildReapOps` now take
`*sqsQueueMeta` + `partition uint32` instead of hardcoding `nil + 0`.
Legacy callers pass `nil + 0` (byte-identical to pre-PR-5b layout); the
partitioned twin `reapOneRecordPartitioned` synthesises a meta carrying
`PartitionCount > 1` so the dispatch helpers route to
`sqsPartitionedMsg{Data,Vis,Group}Key`.

- **New parsed-key helpers** in `sqs_keys.go`:
  - `sqsPartitionedMsgByAgePrefixForPartition(queue, partition, gen)`
  - `sqsPartitionedMsgDedupKeyPrefix(queue, partition, gen)`
  - `sqsPartitionedMsgGroupKeyPrefix(queue, partition, gen)`
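The tombstone-value codec described above can be sketched as follows. `encodeTombstone` / `decodeTombstone` are stand-ins for the real `encodeQueueTombstoneValue` / `decodeQueueTombstoneValue`, and `maxPartitionCount` is an assumed bound used only to illustrate the out-of-range clamp.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const maxPartitionCount = 1024 // assumed bound, not taken from the PR

func encodeTombstone(partitionCount uint64) []byte {
	if partitionCount <= 1 {
		return []byte{1} // byte-identical legacy sentinel
	}
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, partitionCount)
	return buf
}

func decodeTombstone(v []byte) uint64 {
	// Non-canonical values (empty, 1-byte, 4-byte, out-of-range 8-byte)
	// degrade safely to the legacy single-partition reap.
	if len(v) != 8 {
		return 1
	}
	n := binary.BigEndian.Uint64(v)
	if n < 2 || n > maxPartitionCount {
		return 1
	}
	return n
}

func main() {
	fmt.Println(encodeTombstone(1))                  // [1] — legacy sentinel preserved
	fmt.Println(decodeTombstone(encodeTombstone(8))) // 8
	fmt.Println(decodeTombstone(nil))                // 1 — empty degrades to legacy
	fmt.Println(decodeTombstone([]byte{0, 0, 0, 1})) // 1 — 4-byte value is non-canonical
}
```

The asymmetry is deliberate: the encoder is strict, while the decoder is forgiving so a partial rollback or future encoding revision falls back to the legacy single-partition sweep instead of failing the reaper.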

## Out of scope (follow-up PR 6b)

- Live-queue retention reap (`reapQueue`) still iterates only the legacy
byage prefix; partitioned-byage live retention is the next slice.
- `PurgeQueue` / `DeleteQueue` do not iterate partition keyspace
synchronously; the tombstone-driven async sweep is sufficient for
AWS-shaped semantics.

## Caller audit

- `encodeQueueTombstoneValue` — 2 production callers (DeleteQueue,
PurgeQueue), both pass the queue's `PartitionCount`.
- `decodeQueueTombstoneValue` — 1 production caller (reaper); legacy /
out-of-range fallback to 1 documented and tested.
- `reapOneRecord` (signature changed) — 3 production callers:
`reapDeadByAgePage` (legacy), `reapDeadByAgePartitionPage` (via
`reapOneRecordPartitioned`, partitioned), `reapQueue`'s over-retention
sweep (legacy). All updated; partitioned path uses the synthetic meta to
flip the dispatch helper branch.
- New helpers (`reapPartitionedGeneration`, `reapDeadByAgePartition`,
`reapDeadByAgePartitionPage`) each have exactly one caller in the new
tombstone-reap path.

## Self-review (CLAUDE.md)

1. **Data loss** — Closes the Codex P2 leak; no path now leaves
partitioned records orphaned after DeleteQueue/PurgeQueue. Legacy queues
unchanged: tombstone value, reaper sweep prefixes, and dispatch helpers
all keep the byte-identical pre-PR-6a behaviour for `PartitionCount<=1`.
2. **Concurrency / distributed failures** — Reaper still runs only on
the leader (existing `reaperCtx`). Tombstone read is at the reaper's
snapshot `readTS`; every individual reap is its own OCC dispatch with
`ErrWriteConflict` treated as success. Per-partition iteration is
sequential; no new shared state.
3. **Performance** — Per-tick cost grows from O(legacy prefix scans) to
O((partition_count + 1) * legacy prefix scans). Capped by
`sqsReaperPerQueueBudget` per partition. `PartitionCount=1` queues see
no extra cost (partitioned branch short-circuits).
4. **Data consistency** — Tombstone deletion gated on full-drain across
all partition + legacy prefix families. Partial drains leave the
tombstone in place so the next reaper tick can resume. `PartitionCount`
immutability invariant means the tombstone-encoded count and the
reaper's iteration bound can never disagree with the live queue.
5. **Test coverage** — 4 unit tests on the tombstone-value codec (legacy
roundtrip, partitioned encode, fallback for non-canonical values,
out-of-range clamp). 1 wire-level integration test that creates an
8-partition queue, sends to 6 distinct groups, calls DeleteQueue,
hand-runs `reapTombstonedQueues`, and asserts every partitioned data /
vis / byage / dedup / group prefix is empty + the tombstone itself is
deleted.

## Test plan
- [x] `make lint` — golangci-lint 0 issues
- [x] Targeted: tombstone-value codec +
DeleteQueue/PurgeQueue/Reaper/HTFIFO/PartitionedFIFO suites (-race,
clean)
- [x] Wider regression sweep on CreateQueue / DeleteQueue / PurgeQueue /
Send/Receive/Delete / dispatch / capability gate (-race, clean)
- [ ] CI: full Jepsen + race on the PR


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

## Release Notes

* **Bug Fixes**
* Improved queue deletion cleanup to properly enumerate and fully remove
all partitions from deleted queues, ensuring complete data removal.
* Enhanced cleanup logic to correctly handle both single and
multi-partition queue configurations.

* **Tests**
* Added comprehensive tests for tombstone reaper functionality,
including partition-aware cleanup verification and edge case handling.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
bootjp added a commit that referenced this pull request May 4, 2026
….D PR 6b) (#736)

## Summary

Phase 3.D PR 6b: live-queue reaper enumerates partitioned keyspace.
Follow-up to PR #735 (6a) — the tombstone-driven sweep already walks
partitioned data on DeleteQueue / PurgeQueue, but the live-queue
retention reaper still only saw the legacy keyspace, so:

- retention-expired messages on partitioned queues leaked their data /
vis / byage / group rows forever (`reapQueue` walked
`sqsMsgByAgePrefixAllGenerations` only),
- expired dedup records on partitioned FIFO queues leaked forever
(`reapExpiredDedup` scanned `SqsMsgDedupPrefix` only — empty for
partitioned queues since `sqsMsgDedupKeyDispatch` routes their writes
under `SqsPartitionedMsgDedupPrefix`).

Closes the live-queue half of the Codex P2 from PR #732 round 0; PR 6a
covered the tombstoned-cohort half.

## What changes

- **`reapQueue`**: legacy byage walk extracted as `reapQueueLegacy`
(byte-identical to pre-PR-6b for non-partitioned queues). Adds
`reapQueuePartition` step that runs once per partition for
`PartitionCount > 1` queues. Per-partition budget per the §6 design
("partitions × budget per cycle"); 30s tick interval comfortably
absorbs.

- **`reapPartitionedPage`**: partitioned twin of `reapPage`. Same
live-vs-orphan classification, but parses each entry with
`parseSqsPartitionedMsgByAgeKey` and routes the dispatch through
`reapOneRecordPartitioned`.

- **`classifyPartitionedByAgeEntry`**: helper extracted from
`reapPartitionedPage` so the loop body stays under the cyclop ceiling.
Returns `(parsedKey, reapable bool)`.

- **`reapExpiredDedup`** (signature changed): now takes `*sqsQueueMeta`
and routes by `PartitionCount`. Legacy meta → `reapExpiredDedupLegacy`
(byte-identical). Partitioned meta → `reapExpiredDedupPartitioned`
(new), which iterates each partition's dedup prefix under its own
per-partition budget.
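
The dispatch shape shared by `reapQueue` and `reapExpiredDedup` can be sketched as follows. The stub callbacks and the pared-down `sqsQueueMeta` are assumptions for illustration; only the routing rule — legacy single walk for `PartitionCount <= 1`, one bounded pass per partition otherwise — comes from the description above:

```go
package main

import "fmt"

// sqsQueueMeta is a pared-down stand-in for the real queue metadata;
// only PartitionCount matters for reaper dispatch in this sketch.
type sqsQueueMeta struct {
	QueueName      string
	PartitionCount int
}

// effectivePartitionCount treats legacy metadata (count <= 1) as a
// single partition, matching the iteration bound described in the PR.
func effectivePartitionCount(meta *sqsQueueMeta) int {
	if meta.PartitionCount > 1 {
		return meta.PartitionCount
	}
	return 1
}

// reapQueue sketches the dispatch: legacy queues keep the byte-identical
// single walk; partitioned queues get one pass per partition, each pass
// running under its own budget. The walk bodies are stubs here.
func reapQueue(meta *sqsQueueMeta, reapLegacy func(), reapPartition func(p int)) {
	if meta.PartitionCount <= 1 {
		reapLegacy() // pre-PR-6b path, unchanged
		return
	}
	for p := 0; p < effectivePartitionCount(meta); p++ {
		reapPartition(p) // sequential, budget-bounded per partition
	}
}

func main() {
	passes := 0
	reapQueue(&sqsQueueMeta{QueueName: "q.fifo", PartitionCount: 4},
		func() { passes++ },
		func(p int) { passes++ })
	fmt.Println("partitioned passes:", passes) // 4
}
```

The same shape covers `reapExpiredDedup`: swap the stub bodies for the legacy and per-partition dedup walks.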

## Caller audit

- `reapQueue` — one production caller (`reapAllQueues`); signature
unchanged. Non-partitioned queues byte-identical; partitioned get the
extra per-partition pass.
- `reapExpiredDedup` — signature changed to take `*sqsQueueMeta`; one
production caller (`reapAllQueues`), updated. No tests called it
directly.
- New helpers (`reapQueueLegacy` / `reapQueuePartition` /
`reapPartitionedPage` / `reapExpiredDedupLegacy` /
`reapExpiredDedupPartitioned` / `classifyPartitionedByAgeEntry`) each
have exactly one production caller in the new live-queue reap path.
- `reapOneRecordPartitioned` (existing PR 6a helper): previously called
from `reapDeadByAgePartitionPage` (tombstone path); now also from
`reapPartitionedPage` (live-queue path). Same dispatch semantics.

## Tests

- New `TestSQSServer_PartitionedFIFO_LiveQueueDedupReaperPartitions`:
4-partition queue, send across 6 distinct groups, backdate every
partitioned dedup record's `ExpiresAtMillis`, run `reapAllQueues`,
assert every partitioned dedup row across `[0, 4)` is gone. Pre-PR-6b
reaper would leave every row in place.
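
The test's backdate-then-reap shape can be modeled in memory. Everything here is an assumption for illustration — the real `reapExpiredDedupPartitioned` walks a storage prefix per partition under a budget, not a Go map — but the visible contract is the same: expired records under every partition's prefix are removed, unexpired ones survive:

```go
package main

import (
	"fmt"
	"strings"
)

// store is an in-memory stand-in for the partitioned dedup keyspace:
// key -> ExpiresAtMillis. The "pdedup/<queue>/<partition>/" layout is
// an assumption for this sketch.
type store map[string]int64

// reapExpiredDedupPartitioned models the per-partition sweep: one pass
// per partition, deleting records whose expiry is at or before now.
func reapExpiredDedupPartitioned(s store, queue string, partitions int, nowMs int64) {
	for p := 0; p < partitions; p++ {
		prefix := fmt.Sprintf("pdedup/%s/%d/", queue, p)
		for k, exp := range s { // delete-during-range is safe in Go
			if strings.HasPrefix(k, prefix) && exp <= nowMs {
				delete(s, k)
			}
		}
	}
}

func main() {
	// Mirror the test shape: 4 partitions, 6 groups, every record backdated.
	s := store{}
	for p := 0; p < 4; p++ {
		for g := 0; g < 6; g++ {
			s[fmt.Sprintf("pdedup/q.fifo/%d/g%d", p, g)] = 1000 // expired at now=2000
		}
	}
	reapExpiredDedupPartitioned(s, "q.fifo", 4, 2000)
	fmt.Println("remaining:", len(s)) // 0
}
```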

## Self-review (CLAUDE.md)

1. **Data loss** — Closes the live-queue dedup leak + partitioned
retention-expired-message leak. Legacy queues unchanged.
2. **Concurrency / distributed failures** — Reaper still runs only on
the leader. Per-partition pass is sequential; per-partition budget
bounds the pass. OCC semantics on each record reap unchanged.
3. **Performance** — Per-tick partitioned-queue cost grows from O(1)
walks to O(partition_count) walks on both byage and dedup. Each
partition is bounded by `sqsReaperPerQueueBudget`; the 30s tick
interval comfortably absorbs the worst case of 32 partitions × the
per-queue budget, per the design.
4. **Data consistency** — Live-vs-orphan classification on partitioned
byage mirrors the legacy branch exactly (`reapPage` /
`reapPartitionedPage` share the rules through
`classifyPartitionedByAgeEntry`). `PartitionCount` immutability means
the meta-driven iteration bound matches the on-disk keys.
5. **Test coverage** — One new wire-level integration test for the
partitioned dedup walk; the partitioned byage walk reuses parsing /
dispatch helpers already covered by PR 6a's tombstone-reap integration
test.

## Test plan
- [x] `make lint` — 0 issues
- [x] Targeted reaper / retention / dedup / HTFIFO / PartitionedFIFO
suites (-race, clean)
- [x] Wider regression on Send/Receive/Delete +
CreateQueue/DeleteQueue/PurgeQueue (-race, clean)
- [ ] CI: full Jepsen + race


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

* **Bug Fixes**
* Fixed deduplication record cleanup in partitioned FIFO queues to
properly remove expired records across all partitions.
* Enhanced cleanup mechanism to correctly handle partition-aware
behavior and prevent memory leaks in partitioned queue scenarios.

* **Tests**
* Added comprehensive test for deduplication record cleanup across
partitioned FIFO queue partitions.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->