Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions src/replication/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,18 @@ pub const MAX_CONCURRENT_REPLICATION_SENDS: usize = 3;

/// Maximum number of concurrent in-flight audit-responder tasks.
///
/// Subtree (round 1) and byte (round 2) challenge handlers are spawned off the
/// serial replication message loop so their disk reads don't stall replication.
/// This caps how many run at once across the engine, restoring backpressure: a
/// peer flooding audit challenges cannot fan out unbounded `get_raw` reads or
/// multi-MiB byte serves. When the cap is hit, the challenge is dropped — the
/// auditor graces a non-response as a timeout, so honest auditors are
/// unaffected and only a flooder is throttled. Sized to cover a handful of
/// concurrent honest auditors (the per-peer gossip-audit cooldown is 30 min, so
/// genuine concurrent audits are few) while bounding the byte round's worst-case
/// resident bytes (`N × MAX_BYTE_CHALLENGE_KEYS × MAX_CHUNK_SIZE`).
pub const MAX_CONCURRENT_AUDIT_RESPONSES: usize = 8;
/// The responsible-chunk (audit #2), subtree (round 1), and byte (round 2)
/// challenge handlers are all spawned off the serial replication message loop so
/// their disk reads don't stall replication. This caps how many run at once
/// across the engine, restoring backpressure: a peer flooding audit challenges
/// cannot fan out unbounded `get_raw` reads or multi-MiB byte serves. When the
/// cap is hit, the challenge is dropped — the auditor graces a non-response as a
/// timeout, so honest auditors are unaffected and only a flooder is throttled.
/// Sized to cover a handful of concurrent honest auditors (the per-peer
/// gossip-audit cooldown is 30 min, so genuine concurrent audits are few) while
/// bounding the byte round's worst-case resident bytes
/// (`N × MAX_BYTE_CHALLENGE_KEYS × MAX_CHUNK_SIZE`, where `N` is this cap).
pub const MAX_CONCURRENT_AUDIT_RESPONSES: usize = 16;

/// Maximum concurrent in-flight audit-responder tasks from any SINGLE peer.
///
Expand Down
53 changes: 42 additions & 11 deletions src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1691,22 +1691,53 @@ async fn handle_replication_message(
)
.await
}
ReplicationMessageBody::AuditChallenge(ref challenge) => {
ReplicationMessageBody::AuditChallenge(challenge) => {
// Responsible-chunk audit (audit #2) responder: answer with per-key
// possession digests. This same handler also answers the
// prune-confirmation audit, which sends the same `AuditChallenge`
// wire message.
//
// Answering digests the stored bytes of every challenged key, so —
// like the subtree/byte audits below — run it on a detached task off
// this serial message loop. Handling it inline lets one challenge
// block all other replication traffic until its digests complete
// (head-of-line blocking). The same flood-fair admission applies: a
// global ceiling AND a per-peer cap, dropping the challenge if either
// is hit (an honest auditor graces a non-response as a timeout, while
// a flooder is held to its per-peer share and cannot starve others).
let Some(guard) =
admit_audit_responder(audit_responder_semaphore, audit_responder_inflight, source)
.await
else {
warn!(
"Audit challenge reply not sent: kind=responsible response=dropped \
source={source} (audit-responder capacity reached)"
);
return Ok(());
};
let bootstrapping = *is_bootstrapping.read().await;
handle_audit_challenge_msg(
source,
challenge,
storage,
p2p_node,
bootstrapping,
msg.request_id,
rr_message_id,
)
.await
let storage = Arc::clone(storage);
let p2p_node = Arc::clone(p2p_node);
let source = *source;
let request_id = msg.request_id;
let rr_message_id = rr_message_id.map(ToOwned::to_owned);
tokio::spawn(async move {
let _guard = guard; // global permit + per-peer slot, held until done
if let Err(e) = handle_audit_challenge_msg(
&source,
&challenge,
&storage,
&p2p_node,
bootstrapping,
request_id,
rr_message_id.as_deref(),
)
.await
{
debug!("Audit challenge from {source} error: {e}");
}
});
Ok(())
}
ReplicationMessageBody::SubtreeAuditChallenge(challenge) => {
// Gossip-triggered storage-bound subtree audit (ADR-0002). The
Expand Down
Loading