diff --git a/ant-core/src/data/client/quote.rs b/ant-core/src/data/client/quote.rs index 18466e0..9eca3f4 100644 --- a/ant-core/src/data/client/quote.rs +++ b/ant-core/src/data/client/quote.rs @@ -201,6 +201,12 @@ struct WitnessedQuotePeer { voters: HashSet, } +#[derive(Debug, Clone)] +struct WitnessedQuoteSelection { + quote_peers: Vec, + initial_put_peers: Vec<(PeerId, Vec)>, +} + enum QuoteSelectionPolicy { ClosestByDistance, WitnessedMedianVoters { voters_by_peer: VotersByPeer }, @@ -306,13 +312,9 @@ fn witnessed_consensus_candidates( .collect::>(); candidates.sort_by(|left, right| { - right - .votes - .cmp(&left.votes) - .then_with(|| { - peer_xor_distance(&left.node.peer_id, address) - .cmp(&peer_xor_distance(&right.node.peer_id, address)) - }) + peer_xor_distance(&left.node.peer_id, address) + .cmp(&peer_xor_distance(&right.node.peer_id, address)) + .then_with(|| right.votes.cmp(&left.votes)) .then_with(|| { left.node .peer_id @@ -358,12 +360,12 @@ fn witnessed_close_group_diagnostics( ) } -fn witnessed_quote_peers_or_error( +fn witnessed_quote_selection_or_error( address: &[u8; 32], witnessed: &WitnessedCloseGroup, required: usize, quorum: usize, -) -> Result> { +) -> Result { let candidates = witnessed_consensus_candidates(witnessed, address, quorum); if candidates.len() < required { return Err(Error::InsufficientPeers(format!( @@ -374,14 +376,35 @@ fn witnessed_quote_peers_or_error( ))); } - Ok(candidates + let initial_put_peers = witnessed + .initial_closest + .iter() + .take(CLOSE_GROUP_SIZE) + .map(|node| (node.peer_id, node.addresses_by_priority())) + .collect::>(); + + if initial_put_peers.len() < CLOSE_GROUP_SIZE { + return Err(Error::InsufficientPeers(format!( + "Witnessed close group returned only {}/{} initial PUT peers before payment. {}", + initial_put_peers.len(), + CLOSE_GROUP_SIZE, + witnessed_close_group_diagnostics(address, witnessed, quorum) + ))); + } + + let quote_peers = candidates .into_iter() .map(|candidate| WitnessedQuotePeer { peer_id: candidate.node.peer_id, addrs: candidate.node.addresses_by_priority(), voters: candidate.voters, }) - .collect()) + .collect(); + + Ok(WitnessedQuoteSelection { + quote_peers, + initial_put_peers, + }) } pub(crate) fn median_paid_quote_issuer( @@ -439,11 +462,7 @@ fn median_issuer_voter_support( ) -> Option<(PeerId, usize)> { let (median_peer_id, _) = median_paid_quote_issuer_for_indices(quotes, indices)?; let voters = voters_by_peer.get(&median_peer_id)?; - let support = indices - .iter() - .filter(|quote_index| voters.contains("es[**quote_index].0)) - .count(); - Some((median_peer_id, support)) + Some((median_peer_id, voters.len())) } fn visit_quote_subsets( @@ -521,6 +540,7 @@ fn select_witnessed_median_voter_quotes( fn put_peers_with_median_voters_first( quotes: &[StoreQuote], + put_peers: &[(PeerId, Vec)], voters_by_peer: &VotersByPeer, ) -> Option)>> { let (median_peer_id, _) = median_paid_quote_issuer(quotes)?; @@ -528,7 +548,7 @@ fn put_peers_with_median_voters_first( let mut supporting_peers = Vec::new(); let mut fallback_peers = Vec::new(); - for (peer_id, addrs, _, _) in quotes { + for (peer_id, addrs) in put_peers { let peer = (*peer_id, addrs.clone()); if voters.contains(peer_id) { supporting_peers.push(peer); @@ -585,15 +605,18 @@ impl Client { data_size: u64, data_type: u32, ) -> Result { - let witnessed_peers = self.select_witnessed_quote_peers(address).await?; - let voters_by_peer: VotersByPeer = witnessed_peers + let witnessed_selection = self.select_witnessed_quote_selection(address).await?; + let voters_by_peer: VotersByPeer = witnessed_selection + .quote_peers .iter() .map(|peer| (peer.peer_id, peer.voters.clone())) .collect(); - let remote_peers = witnessed_peers + let remote_peers = witnessed_selection + .quote_peers .into_iter() .map(|peer| (peer.peer_id, peer.addrs)) .collect(); + let initial_put_peers = witnessed_selection.initial_put_peers; let quotes = self .collect_store_quotes_from_remote_peers( address, @@ -606,15 +629,16 @@ impl Client { ) .await?; let put_peers = - put_peers_with_median_voters_first("es, &voters_by_peer).ok_or_else(|| { - Error::InsufficientPeers(format!( - "Collected {} witnessed quotes, but fewer than {} \ - selected PUT peers voted for the paid median issuer for {}", - quotes.len(), - witnessed_median_voter_quorum(), - hex::encode(address) - )) - })?; + put_peers_with_median_voters_first("es, &initial_put_peers, &voters_by_peer) + .ok_or_else(|| { + Error::InsufficientPeers(format!( + "Collected {} witnessed quotes, but fewer than {} initial witness PUT peers \ + voted for the paid median issuer for {}", + quotes.len(), + witnessed_median_voter_quorum(), + hex::encode(address) + )) + })?; Ok(StoreQuotePlan { quotes, put_peers }) } @@ -647,10 +671,10 @@ impl Client { .await } - async fn select_witnessed_quote_peers( + async fn select_witnessed_quote_selection( &self, address: &[u8; 32], - ) -> Result> { + ) -> Result { let required = single_node_quote_query_count(); let quorum = witnessed_close_group_quorum(); let witnessed = self @@ -679,7 +703,7 @@ impl Client { "Witnessed close group selected for SNP quote collection" ); - witnessed_quote_peers_or_error(address, &witnessed, required, quorum) + witnessed_quote_selection_or_error(address, &witnessed, required, quorum) } #[allow(clippy::too_many_lines)] @@ -711,71 +735,9 @@ impl Client { let per_peer_timeout = Duration::from_secs(self.config().quote_timeout_secs); let overall_timeout = Duration::from_secs(QUOTE_COLLECTION_TIMEOUT_SECS); - // Request quotes from all peers concurrently - let mut quote_futures = FuturesUnordered::new(); - - for (peer_id, peer_addrs) in &remote_peers { - let request_id = self.next_request_id(); - let request = ChunkQuoteRequest { - address: *address, - data_size, - data_type, - }; - let message = ChunkMessage { - request_id, - body: ChunkMessageBody::QuoteRequest(request), - }; - - let message_bytes = match message.encode() { - Ok(bytes) => bytes, - Err(e) => { - warn!("Failed to encode quote request for {peer_id}: {e}"); - continue; - } - }; - - let peer_id_clone = *peer_id; - let addrs_clone = peer_addrs.clone(); - let node_clone = node.clone(); - - let quote_future = async move { - let result = send_and_await_chunk_response( - &node_clone, - &peer_id_clone, - message_bytes, - request_id, - per_peer_timeout, - &addrs_clone, - |body| match body { - ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success { - quote, - already_stored, - }) => Some(classify_quote_response( - &peer_id_clone, - "e, - already_stored, - )), - ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Error(e)) => Some(Err( - Error::Protocol(format!("Quote error from {peer_id_clone}: {e}")), - )), - _ => None, - }, - |e| { - Error::Network(format!( - "Failed to send quote request to {peer_id_clone}: {e}" - )) - }, - || Error::Timeout(format!("Timeout waiting for quote from {peer_id_clone}")), - ) - .await; - - (peer_id_clone, addrs_clone, result) - }; - - quote_futures.push(quote_future); - } - - // Collect all responses with an overall timeout to prevent indefinite stalls. + // Collect quote responses. SNP/witnessed collection deliberately tries + // the closest witnessed peers first and only falls back to further + // witnessed peers when a closer peer fails to produce a usable quote. let mut quotes = Vec::with_capacity(peer_query_count); let mut already_stored_peers: Vec<(PeerId, [u8; 32])> = Vec::new(); let mut failures: Vec = Vec::new(); @@ -786,46 +748,210 @@ impl Client { // network-broken) and the user benefits from seeing them called out. let mut bad_quote_count = 0usize; - let collect_result: std::result::Result, _> = - tokio::time::timeout(overall_timeout, async { - while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await { - match quote_result { - Ok((quote, price)) => { - quotes.push((peer_id, addrs, quote, price)); + let staged_witnessed_collection = matches!( + "e_selection_policy, + QuoteSelectionPolicy::WitnessedMedianVoters { .. } + ); + + if staged_witnessed_collection { + let collect_result: std::result::Result<(), Error> = + tokio::time::timeout(overall_timeout, async { + for (peer_id, peer_addrs) in &remote_peers { + if quotes.len() >= CLOSE_GROUP_SIZE { + break; } - Err(Error::AlreadyStored) => { - info!("Peer {peer_id} reports chunk already stored"); - let dist = peer_xor_distance(&peer_id, address); - already_stored_peers.push((peer_id, dist)); + + let request_id = self.next_request_id(); + let request = ChunkQuoteRequest { + address: *address, + data_size, + data_type, + }; + let message = ChunkMessage { + request_id, + body: ChunkMessageBody::QuoteRequest(request), + }; + + let message_bytes = match message.encode() { + Ok(bytes) => bytes, + Err(e) => { + warn!("Failed to encode quote request for {peer_id}: {e}"); + failures.push(format!("{peer_id}: encode failed: {e}")); + continue; + } + }; + + let quote_result = send_and_await_chunk_response( + node, + peer_id, + message_bytes, + request_id, + per_peer_timeout, + peer_addrs, + |body| match body { + ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success { + quote, + already_stored, + }) => { + Some(classify_quote_response(peer_id, "e, already_stored)) + } + ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Error(e)) => { + Some(Err(Error::Protocol(format!( + "Quote error from {peer_id}: {e}" + )))) + } + _ => None, + }, + |e| { + Error::Network(format!( + "Failed to send quote request to {peer_id}: {e}" + )) + }, + || Error::Timeout(format!("Timeout waiting for quote from {peer_id}")), + ) + .await; + + match quote_result { + Ok((quote, price)) => { + quotes.push((*peer_id, peer_addrs.clone(), quote, price)); + } + Err(Error::AlreadyStored) => { + info!("Peer {peer_id} reports chunk already stored"); + let dist = peer_xor_distance(peer_id, address); + already_stored_peers.push((*peer_id, dist)); + } + Err(e) => { + if matches!(&e, Error::BadQuoteBinding { .. }) { + bad_quote_count += 1; + } + warn!("Failed to get quote from {peer_id}: {e}"); + failures.push(format!("{peer_id}: {e}")); + } } - Err(e) => { - // Count bad-binding peers separately (typed - // variant — no string sniffing). Treat as a - // normal failure for InsufficientPeers reporting. - if matches!(&e, Error::BadQuoteBinding { .. }) { - bad_quote_count += 1; + } + Ok(()) + }) + .await + .unwrap_or_else(|_elapsed| { + warn!( + "Quote collection timed out after {overall_timeout:?} for address {}", + hex::encode(address) + ); + Ok(()) + }); + + collect_result?; + } else { + // Merkle preflight keeps the previous behaviour: query the full + // over-query set concurrently because those quote responses are + // only used as an already-stored probe. + let mut quote_futures = FuturesUnordered::new(); + + for (peer_id, peer_addrs) in &remote_peers { + let request_id = self.next_request_id(); + let request = ChunkQuoteRequest { + address: *address, + data_size, + data_type, + }; + let message = ChunkMessage { + request_id, + body: ChunkMessageBody::QuoteRequest(request), + }; + + let message_bytes = match message.encode() { + Ok(bytes) => bytes, + Err(e) => { + warn!("Failed to encode quote request for {peer_id}: {e}"); + continue; + } + }; + + let peer_id_clone = *peer_id; + let addrs_clone = peer_addrs.clone(); + let node_clone = node.clone(); + + let quote_future = async move { + let result = send_and_await_chunk_response( + &node_clone, + &peer_id_clone, + message_bytes, + request_id, + per_peer_timeout, + &addrs_clone, + |body| match body { + ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success { + quote, + already_stored, + }) => Some(classify_quote_response( + &peer_id_clone, + "e, + already_stored, + )), + ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Error(e)) => { + Some(Err(Error::Protocol(format!( + "Quote error from {peer_id_clone}: {e}" + )))) + } + _ => None, + }, + |e| { + Error::Network(format!( + "Failed to send quote request to {peer_id_clone}: {e}" + )) + }, + || { + Error::Timeout(format!( + "Timeout waiting for quote from {peer_id_clone}" + )) + }, + ) + .await; + + (peer_id_clone, addrs_clone, result) + }; + + quote_futures.push(quote_future); + } + + let collect_result: std::result::Result, _> = + tokio::time::timeout(overall_timeout, async { + while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await { + match quote_result { + Ok((quote, price)) => { + quotes.push((peer_id, addrs, quote, price)); + } + Err(Error::AlreadyStored) => { + info!("Peer {peer_id} reports chunk already stored"); + let dist = peer_xor_distance(&peer_id, address); + already_stored_peers.push((peer_id, dist)); + } + Err(e) => { + if matches!(&e, Error::BadQuoteBinding { .. }) { + bad_quote_count += 1; + } + warn!("Failed to get quote from {peer_id}: {e}"); + failures.push(format!("{peer_id}: {e}")); } - warn!("Failed to get quote from {peer_id}: {e}"); - failures.push(format!("{peer_id}: {e}")); } } - } - Ok(()) - }) - .await; + Ok(()) + }) + .await; - match collect_result { - Err(_elapsed) => { - warn!( - "Quote collection timed out after {overall_timeout:?} for address {}", - hex::encode(address) - ); - // Fall through to check if we have enough quotes despite timeout. - // The timeout fires when slow peers haven't responded yet, but we - // may already have enough successful quotes from fast peers. + match collect_result { + Err(_elapsed) => { + warn!( + "Quote collection timed out after {overall_timeout:?} for address {}", + hex::encode(address) + ); + // Fall through to check if we have enough quotes despite timeout. + // The timeout fires when slow peers haven't responded yet, but we + // may already have enough successful quotes from fast peers. + } + Ok(Err(e)) => return Err(e), + Ok(Ok(())) => {} } - Ok(Err(e)) => return Err(e), - Ok(Ok(())) => {} } // Defensive double-check: the per-peer handler already filters @@ -1043,6 +1169,14 @@ mod tests { .collect() } + fn put_peers_from_seeds(seeds: &[u8]) -> Vec<(PeerId, Vec)> { + seeds + .iter() + .copied() + .map(|seed| (synthetic_peer(seed), Vec::new())) + .collect() + } + /// Independent re-implementation of the storer-side binding spec /// (`ant-node/src/payment/verifier.rs::validate_peer_bindings` + /// `peer_id_from_public_key_bytes`): @@ -1138,7 +1272,7 @@ mod tests { } #[test] - fn witnessed_candidates_sort_by_votes_then_xor_distance() { + fn witnessed_candidates_sort_by_xor_distance_then_votes() { let address = [0u8; 32]; let witnessed = WitnessedCloseGroup { target: address, @@ -1163,8 +1297,8 @@ mod tests { .iter() .map(|candidate| candidate.node.peer_id.as_bytes()[0]) .collect::>(), - vec![9, 1], - "higher vote count must sort ahead of a closer XOR peer" + vec![1, 9], + "XOR closeness must be the primary sort before quote collection" ); } @@ -1181,7 +1315,7 @@ mod tests { responder_views, }; - let err = witnessed_quote_peers_or_error( + let err = witnessed_quote_selection_or_error( &address, &witnessed, CLOSE_GROUP_SIZE, @@ -1219,7 +1353,7 @@ mod tests { ], }; - let peers = witnessed_quote_peers_or_error( + let selection = witnessed_quote_selection_or_error( &address, &witnessed, CLOSE_GROUP_SIZE, @@ -1227,14 +1361,22 @@ mod tests { ) .expect("fallback candidates should be retained for quote collection"); - assert_eq!(peers.len(), CLOSE_GROUP_SIZE + EXTRA_QUORUM_CANDIDATES); assert_eq!( - peers + selection.quote_peers.len(), + CLOSE_GROUP_SIZE + EXTRA_QUORUM_CANDIDATES + ); + assert_eq!( + selection + .quote_peers .iter() .map(|peer| peer.peer_id.as_bytes()[0]) .collect::>(), vec![1, 2, 3, 4, 5, 6, 7, 8] ); + assert_eq!( + put_peer_seeds(&selection.initial_put_peers), + vec![1, 2, 3, 4, 5, 6, 7] + ); } #[test] @@ -1270,22 +1412,58 @@ mod tests { let selected = select_witnessed_median_voter_quotes(quotes, &address, &voters_by_peer) .expect("a supported close-group quote set should be selected"); - assert_eq!(quote_peer_seeds(&selected), vec![1, 2, 3, 6, 7, 8, 20]); + assert_eq!(quote_peer_seeds(&selected), vec![1, 2, 3, 6, 7, 8, 9]); let (median_peer_id, _) = median_paid_quote_issuer(&selected).expect("selected quotes have a median"); assert_eq!(median_peer_id, synthetic_peer(MEDIAN_ISSUER_SEED)); + assert!(voters_by_peer[&median_peer_id].len() >= witnessed_median_voter_quorum()); + } + + #[test] + fn witnessed_quote_selection_uses_direct_median_witness_recognition() { + const MEDIAN_ISSUER_SEED: u8 = 7; + + let address = [0u8; 32]; + let quotes = vec![ + synthetic_quote(1, 10), + synthetic_quote(2, 20), + synthetic_quote(3, 30), + synthetic_quote(4, 50), + synthetic_quote(MEDIAN_ISSUER_SEED, 40), + synthetic_quote(8, 60), + synthetic_quote(9, 70), + ]; + let mut voters_by_peer = HashMap::new(); + voters_by_peer.insert( + synthetic_peer(MEDIAN_ISSUER_SEED), + synthetic_voters(&[20, 21, 22, 23, 24]), + ); + + let selected = select_witnessed_median_voter_quotes(quotes, &address, &voters_by_peer) + .expect("direct witness recognition should support the paid median issuer"); + + let (median_peer_id, _) = + median_paid_quote_issuer(&selected).expect("selected quotes have a median"); let selected_peers = selected .iter() .map(|(peer_id, _, _, _)| *peer_id) .collect::>(); - let support = voters_by_peer[&median_peer_id] - .intersection(&selected_peers) - .count(); - assert_eq!(support, witnessed_median_voter_quorum()); + assert_eq!(median_peer_id, synthetic_peer(MEDIAN_ISSUER_SEED)); + assert_eq!( + voters_by_peer[&median_peer_id] + .intersection(&selected_peers) + .count(), + 0, + "recognising witnesses need not also be selected quote issuers" + ); + assert_eq!( + voters_by_peer[&median_peer_id].len(), + witnessed_median_voter_quorum() + ); } #[test] - fn witnessed_quote_selection_rejects_median_without_selected_voter_quorum() { + fn witnessed_quote_selection_rejects_median_without_witness_quorum() { const MEDIAN_ISSUER_SEED: u8 = 7; let address = [0u8; 32]; @@ -1302,7 +1480,7 @@ mod tests { let mut voters_by_peer = HashMap::new(); voters_by_peer.insert( synthetic_peer(MEDIAN_ISSUER_SEED), - synthetic_voters(&[1, 2, 3, 20, 21]), + synthetic_voters(&[1, 2, 3, 20]), ); let selected = select_witnessed_median_voter_quotes(quotes, &address, &voters_by_peer); @@ -1310,7 +1488,7 @@ mod tests { assert!( selected.is_none(), "the selector must not return a paid quote set when fewer than the \ - witnessed median voter quorum produced usable quotes" + witnessed median voter quorum recognised the paid median issuer" ); } @@ -1333,8 +1511,10 @@ mod tests { synthetic_voters(&[3, 4, 5, 6, MEDIAN_ISSUER_SEED]), ); - let put_peers = put_peers_with_median_voters_first("es, &voters_by_peer) - .expect("median voters should produce an ordered PUT set"); + let put_candidates = put_peers_from_seeds(&[1, 2, 3, 4, 5, 6, 7]); + let put_peers = + put_peers_with_median_voters_first("es, &put_candidates, &voters_by_peer) + .expect("median voters should produce an ordered PUT set"); assert_eq!(quote_peer_seeds("es), vec![1, 2, 3, 4, 5, 6, 7]); let (median_peer_id, _) =