From 7910be945a96c86d05f673892c0a3ddc924b3cce Mon Sep 17 00:00:00 2001 From: Hermes Agent Date: Tue, 16 Jun 2026 21:06:54 +0100 Subject: [PATCH] Revert "Merge pull request #122 from WithAutonomi/fix/snp-put-initial-witness-close-group" This reverts commit dcaca42132bd37f27d09e78e2752efded539a665, reversing changes made to 76a0c122b616006a9262a490ef0bed46ad21af05. --- ant-core/src/data/client/quote.rs | 416 ++++++++++-------------------- 1 file changed, 134 insertions(+), 282 deletions(-) diff --git a/ant-core/src/data/client/quote.rs b/ant-core/src/data/client/quote.rs index 11796bd6..18466e08 100644 --- a/ant-core/src/data/client/quote.rs +++ b/ant-core/src/data/client/quote.rs @@ -201,12 +201,6 @@ struct WitnessedQuotePeer { voters: HashSet, } -#[derive(Debug, Clone)] -struct WitnessedQuoteSelection { - quote_peers: Vec, - initial_put_peers: Vec<(PeerId, Vec)>, -} - enum QuoteSelectionPolicy { ClosestByDistance, WitnessedMedianVoters { voters_by_peer: VotersByPeer }, @@ -312,9 +306,13 @@ fn witnessed_consensus_candidates( .collect::>(); candidates.sort_by(|left, right| { - peer_xor_distance(&left.node.peer_id, address) - .cmp(&peer_xor_distance(&right.node.peer_id, address)) - .then_with(|| right.votes.cmp(&left.votes)) + right + .votes + .cmp(&left.votes) + .then_with(|| { + peer_xor_distance(&left.node.peer_id, address) + .cmp(&peer_xor_distance(&right.node.peer_id, address)) + }) .then_with(|| { left.node .peer_id @@ -360,12 +358,12 @@ fn witnessed_close_group_diagnostics( ) } -fn witnessed_quote_selection_or_error( +fn witnessed_quote_peers_or_error( address: &[u8; 32], witnessed: &WitnessedCloseGroup, required: usize, quorum: usize, -) -> Result { +) -> Result> { let candidates = witnessed_consensus_candidates(witnessed, address, quorum); if candidates.len() < required { return Err(Error::InsufficientPeers(format!( @@ -376,35 +374,14 @@ fn witnessed_quote_selection_or_error( ))); } - let initial_put_peers = witnessed - .initial_closest - .iter() - .take(CLOSE_GROUP_SIZE) - .map(|node| (node.peer_id, node.addresses_by_priority())) - .collect::>(); - - if initial_put_peers.len() < CLOSE_GROUP_SIZE { - return Err(Error::InsufficientPeers(format!( - "Witnessed close group returned only {}/{} initial PUT peers before payment. {}", - initial_put_peers.len(), - CLOSE_GROUP_SIZE, - witnessed_close_group_diagnostics(address, witnessed, quorum) - ))); - } - - let quote_peers = candidates + Ok(candidates .into_iter() .map(|candidate| WitnessedQuotePeer { peer_id: candidate.node.peer_id, addrs: candidate.node.addresses_by_priority(), voters: candidate.voters, }) - .collect(); - - Ok(WitnessedQuoteSelection { - quote_peers, - initial_put_peers, - }) + .collect()) } pub(crate) fn median_paid_quote_issuer( @@ -544,7 +521,6 @@ fn select_witnessed_median_voter_quotes( fn put_peers_with_median_voters_first( quotes: &[StoreQuote], - put_peers: &[(PeerId, Vec)], voters_by_peer: &VotersByPeer, ) -> Option)>> { let (median_peer_id, _) = median_paid_quote_issuer(quotes)?; @@ -552,7 +528,7 @@ fn put_peers_with_median_voters_first( let mut supporting_peers = Vec::new(); let mut fallback_peers = Vec::new(); - for (peer_id, addrs) in put_peers { + for (peer_id, addrs, _, _) in quotes { let peer = (*peer_id, addrs.clone()); if voters.contains(peer_id) { supporting_peers.push(peer); @@ -609,18 +585,15 @@ impl Client { data_size: u64, data_type: u32, ) -> Result { - let witnessed_selection = self.select_witnessed_quote_selection(address).await?; - let voters_by_peer: VotersByPeer = witnessed_selection - .quote_peers + let witnessed_peers = self.select_witnessed_quote_peers(address).await?; + let voters_by_peer: VotersByPeer = witnessed_peers .iter() .map(|peer| (peer.peer_id, peer.voters.clone())) .collect(); - let remote_peers = witnessed_selection - .quote_peers + let remote_peers = witnessed_peers .into_iter() .map(|peer| (peer.peer_id, peer.addrs)) .collect(); - let initial_put_peers = witnessed_selection.initial_put_peers; let quotes = self .collect_store_quotes_from_remote_peers( address, @@ -633,16 +606,15 @@ impl Client { ) .await?; let put_peers = - put_peers_with_median_voters_first("es, &initial_put_peers, &voters_by_peer) - .ok_or_else(|| { - Error::InsufficientPeers(format!( - "Collected {} witnessed quotes, but fewer than {} initial witness PUT peers \ - voted for the paid median issuer for {}", - quotes.len(), - witnessed_median_voter_quorum(), - hex::encode(address) - )) - })?; + put_peers_with_median_voters_first("es, &voters_by_peer).ok_or_else(|| { + Error::InsufficientPeers(format!( + "Collected {} witnessed quotes, but fewer than {} \ + selected PUT peers voted for the paid median issuer for {}", + quotes.len(), + witnessed_median_voter_quorum(), + hex::encode(address) + )) + })?; Ok(StoreQuotePlan { quotes, put_peers }) } @@ -675,10 +647,10 @@ impl Client { .await } - async fn select_witnessed_quote_selection( + async fn select_witnessed_quote_peers( &self, address: &[u8; 32], - ) -> Result { + ) -> Result> { let required = single_node_quote_query_count(); let quorum = witnessed_close_group_quorum(); let witnessed = self @@ -707,7 +679,7 @@ impl Client { "Witnessed close group selected for SNP quote collection" ); - witnessed_quote_selection_or_error(address, &witnessed, required, quorum) + witnessed_quote_peers_or_error(address, &witnessed, required, quorum) } #[allow(clippy::too_many_lines)] @@ -739,9 +711,71 @@ impl Client { let per_peer_timeout = Duration::from_secs(self.config().quote_timeout_secs); let overall_timeout = Duration::from_secs(QUOTE_COLLECTION_TIMEOUT_SECS); - // Collect quote responses. SNP/witnessed collection deliberately tries - // the closest witnessed peers first and only falls back to further - // witnessed peers when a closer peer fails to produce a usable quote. + // Request quotes from all peers concurrently + let mut quote_futures = FuturesUnordered::new(); + + for (peer_id, peer_addrs) in &remote_peers { + let request_id = self.next_request_id(); + let request = ChunkQuoteRequest { + address: *address, + data_size, + data_type, + }; + let message = ChunkMessage { + request_id, + body: ChunkMessageBody::QuoteRequest(request), + }; + + let message_bytes = match message.encode() { + Ok(bytes) => bytes, + Err(e) => { + warn!("Failed to encode quote request for {peer_id}: {e}"); + continue; + } + }; + + let peer_id_clone = *peer_id; + let addrs_clone = peer_addrs.clone(); + let node_clone = node.clone(); + + let quote_future = async move { + let result = send_and_await_chunk_response( + &node_clone, + &peer_id_clone, + message_bytes, + request_id, + per_peer_timeout, + &addrs_clone, + |body| match body { + ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success { + quote, + already_stored, + }) => Some(classify_quote_response( + &peer_id_clone, + "e, + already_stored, + )), + ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Error(e)) => Some(Err( + Error::Protocol(format!("Quote error from {peer_id_clone}: {e}")), + )), + _ => None, + }, + |e| { + Error::Network(format!( + "Failed to send quote request to {peer_id_clone}: {e}" + )) + }, + || Error::Timeout(format!("Timeout waiting for quote from {peer_id_clone}")), + ) + .await; + + (peer_id_clone, addrs_clone, result) + }; + + quote_futures.push(quote_future); + } + + // Collect all responses with an overall timeout to prevent indefinite stalls. let mut quotes = Vec::with_capacity(peer_query_count); let mut already_stored_peers: Vec<(PeerId, [u8; 32])> = Vec::new(); let mut failures: Vec = Vec::new(); @@ -752,210 +786,46 @@ impl Client { // network-broken) and the user benefits from seeing them called out. let mut bad_quote_count = 0usize; - let staged_witnessed_collection = matches!( - "e_selection_policy, - QuoteSelectionPolicy::WitnessedMedianVoters { .. } - ); - - if staged_witnessed_collection { - let collect_result: std::result::Result<(), Error> = - tokio::time::timeout(overall_timeout, async { - for (peer_id, peer_addrs) in &remote_peers { - if quotes.len() >= CLOSE_GROUP_SIZE { - break; + let collect_result: std::result::Result, _> = + tokio::time::timeout(overall_timeout, async { + while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await { + match quote_result { + Ok((quote, price)) => { + quotes.push((peer_id, addrs, quote, price)); } - - let request_id = self.next_request_id(); - let request = ChunkQuoteRequest { - address: *address, - data_size, - data_type, - }; - let message = ChunkMessage { - request_id, - body: ChunkMessageBody::QuoteRequest(request), - }; - - let message_bytes = match message.encode() { - Ok(bytes) => bytes, - Err(e) => { - warn!("Failed to encode quote request for {peer_id}: {e}"); - failures.push(format!("{peer_id}: encode failed: {e}")); - continue; - } - }; - - let quote_result = send_and_await_chunk_response( - node, - peer_id, - message_bytes, - request_id, - per_peer_timeout, - peer_addrs, - |body| match body { - ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success { - quote, - already_stored, - }) => { - Some(classify_quote_response(peer_id, "e, already_stored)) - } - ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Error(e)) => { - Some(Err(Error::Protocol(format!( - "Quote error from {peer_id}: {e}" - )))) - } - _ => None, - }, - |e| { - Error::Network(format!( - "Failed to send quote request to {peer_id}: {e}" - )) - }, - || Error::Timeout(format!("Timeout waiting for quote from {peer_id}")), - ) - .await; - - match quote_result { - Ok((quote, price)) => { - quotes.push((*peer_id, peer_addrs.clone(), quote, price)); - } - Err(Error::AlreadyStored) => { - info!("Peer {peer_id} reports chunk already stored"); - let dist = peer_xor_distance(peer_id, address); - already_stored_peers.push((*peer_id, dist)); - } - Err(e) => { - if matches!(&e, Error::BadQuoteBinding { .. }) { - bad_quote_count += 1; - } - warn!("Failed to get quote from {peer_id}: {e}"); - failures.push(format!("{peer_id}: {e}")); - } + Err(Error::AlreadyStored) => { + info!("Peer {peer_id} reports chunk already stored"); + let dist = peer_xor_distance(&peer_id, address); + already_stored_peers.push((peer_id, dist)); } - } - Ok(()) - }) - .await - .unwrap_or_else(|_elapsed| { - warn!( - "Quote collection timed out after {overall_timeout:?} for address {}", - hex::encode(address) - ); - Ok(()) - }); - - collect_result?; - } else { - // Merkle preflight keeps the previous behaviour: query the full - // over-query set concurrently because those quote responses are - // only used as an already-stored probe. - let mut quote_futures = FuturesUnordered::new(); - - for (peer_id, peer_addrs) in &remote_peers { - let request_id = self.next_request_id(); - let request = ChunkQuoteRequest { - address: *address, - data_size, - data_type, - }; - let message = ChunkMessage { - request_id, - body: ChunkMessageBody::QuoteRequest(request), - }; - - let message_bytes = match message.encode() { - Ok(bytes) => bytes, - Err(e) => { - warn!("Failed to encode quote request for {peer_id}: {e}"); - continue; - } - }; - - let peer_id_clone = *peer_id; - let addrs_clone = peer_addrs.clone(); - let node_clone = node.clone(); - - let quote_future = async move { - let result = send_and_await_chunk_response( - &node_clone, - &peer_id_clone, - message_bytes, - request_id, - per_peer_timeout, - &addrs_clone, - |body| match body { - ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success { - quote, - already_stored, - }) => Some(classify_quote_response( - &peer_id_clone, - "e, - already_stored, - )), - ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Error(e)) => { - Some(Err(Error::Protocol(format!( - "Quote error from {peer_id_clone}: {e}" - )))) - } - _ => None, - }, - |e| { - Error::Network(format!( - "Failed to send quote request to {peer_id_clone}: {e}" - )) - }, - || { - Error::Timeout(format!( - "Timeout waiting for quote from {peer_id_clone}" - )) - }, - ) - .await; - - (peer_id_clone, addrs_clone, result) - }; - - quote_futures.push(quote_future); - } - - let collect_result: std::result::Result, _> = - tokio::time::timeout(overall_timeout, async { - while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await { - match quote_result { - Ok((quote, price)) => { - quotes.push((peer_id, addrs, quote, price)); - } - Err(Error::AlreadyStored) => { - info!("Peer {peer_id} reports chunk already stored"); - let dist = peer_xor_distance(&peer_id, address); - already_stored_peers.push((peer_id, dist)); - } - Err(e) => { - if matches!(&e, Error::BadQuoteBinding { .. }) { - bad_quote_count += 1; - } - warn!("Failed to get quote from {peer_id}: {e}"); - failures.push(format!("{peer_id}: {e}")); + Err(e) => { + // Count bad-binding peers separately (typed + // variant — no string sniffing). Treat as a + // normal failure for InsufficientPeers reporting. + if matches!(&e, Error::BadQuoteBinding { .. }) { + bad_quote_count += 1; } + warn!("Failed to get quote from {peer_id}: {e}"); + failures.push(format!("{peer_id}: {e}")); } } - Ok(()) - }) - .await; - - match collect_result { - Err(_elapsed) => { - warn!( - "Quote collection timed out after {overall_timeout:?} for address {}", - hex::encode(address) - ); - // Fall through to check if we have enough quotes despite timeout. - // The timeout fires when slow peers haven't responded yet, but we - // may already have enough successful quotes from fast peers. } - Ok(Err(e)) => return Err(e), - Ok(Ok(())) => {} + Ok(()) + }) + .await; + + match collect_result { + Err(_elapsed) => { + warn!( + "Quote collection timed out after {overall_timeout:?} for address {}", + hex::encode(address) + ); + // Fall through to check if we have enough quotes despite timeout. + // The timeout fires when slow peers haven't responded yet, but we + // may already have enough successful quotes from fast peers. } + Ok(Err(e)) => return Err(e), + Ok(Ok(())) => {} } // Defensive double-check: the per-peer handler already filters @@ -1173,14 +1043,6 @@ mod tests { .collect() } - fn put_peers_from_seeds(seeds: &[u8]) -> Vec<(PeerId, Vec)> { - seeds - .iter() - .copied() - .map(|seed| (synthetic_peer(seed), Vec::new())) - .collect() - } - /// Independent re-implementation of the storer-side binding spec /// (`ant-node/src/payment/verifier.rs::validate_peer_bindings` + /// `peer_id_from_public_key_bytes`): @@ -1276,7 +1138,7 @@ mod tests { } #[test] - fn witnessed_candidates_sort_by_xor_distance_then_votes() { + fn witnessed_candidates_sort_by_votes_then_xor_distance() { let address = [0u8; 32]; let witnessed = WitnessedCloseGroup { target: address, @@ -1301,8 +1163,8 @@ mod tests { .iter() .map(|candidate| candidate.node.peer_id.as_bytes()[0]) .collect::>(), - vec![1, 9], - "XOR closeness must be the primary sort before quote collection" + vec![9, 1], + "higher vote count must sort ahead of a closer XOR peer" ); } @@ -1319,7 +1181,7 @@ mod tests { responder_views, }; - let err = witnessed_quote_selection_or_error( + let err = witnessed_quote_peers_or_error( &address, &witnessed, CLOSE_GROUP_SIZE, @@ -1357,7 +1219,7 @@ mod tests { ], }; - let selection = witnessed_quote_selection_or_error( + let peers = witnessed_quote_peers_or_error( &address, &witnessed, CLOSE_GROUP_SIZE, @@ -1365,22 +1227,14 @@ mod tests { ) .expect("fallback candidates should be retained for quote collection"); + assert_eq!(peers.len(), CLOSE_GROUP_SIZE + EXTRA_QUORUM_CANDIDATES); assert_eq!( - selection.quote_peers.len(), - CLOSE_GROUP_SIZE + EXTRA_QUORUM_CANDIDATES - ); - assert_eq!( - selection - .quote_peers + peers .iter() .map(|peer| peer.peer_id.as_bytes()[0]) .collect::>(), vec![1, 2, 3, 4, 5, 6, 7, 8] ); - assert_eq!( - put_peer_seeds(&selection.initial_put_peers), - vec![1, 2, 3, 4, 5, 6, 7] - ); } #[test] @@ -1479,10 +1333,8 @@ mod tests { synthetic_voters(&[3, 4, 5, 6, MEDIAN_ISSUER_SEED]), ); - let put_candidates = put_peers_from_seeds(&[1, 2, 3, 4, 5, 6, 7]); - let put_peers = - put_peers_with_median_voters_first("es, &put_candidates, &voters_by_peer) - .expect("median voters should produce an ordered PUT set"); + let put_peers = put_peers_with_median_voters_first("es, &voters_by_peer) + .expect("median voters should produce an ordered PUT set"); assert_eq!(quote_peer_seeds("es), vec![1, 2, 3, 4, 5, 6, 7]); let (median_peer_id, _) =