diff --git a/ant-core/src/data/client/quote.rs b/ant-core/src/data/client/quote.rs index 9eca3f43..f4bc38ea 100644 --- a/ant-core/src/data/client/quote.rs +++ b/ant-core/src/data/client/quote.rs @@ -7,13 +7,14 @@ use crate::data::client::peer_xor_distance; use crate::data::client::Client; use crate::data::error::{Error, Result}; use ant_protocol::evm::{Amount, PaymentQuote}; -use ant_protocol::transport::{DHTNode, MultiAddr, PeerId, WitnessedCloseGroup}; +use ant_protocol::transport::{DHTNode, MultiAddr, P2PNode, PeerId, WitnessedCloseGroup}; use ant_protocol::{ compute_address, send_and_await_chunk_response, ChunkMessage, ChunkMessageBody, ChunkQuoteRequest, ChunkQuoteResponse, CLOSE_GROUP_MAJORITY, CLOSE_GROUP_SIZE, }; use futures::stream::{FuturesUnordered, StreamExt}; use std::collections::{HashMap, HashSet}; +use std::sync::Arc; use std::time::Duration; use tracing::{debug, info, warn}; @@ -158,6 +159,105 @@ fn drop_quotes_with_bad_bindings( before - quotes.len() } +#[allow(clippy::too_many_arguments)] +async fn request_store_quote_from_peer( + node: Arc, + peer_id: PeerId, + peer_addrs: Vec, + request_id: u64, + address: [u8; 32], + data_size: u64, + data_type: u32, + per_peer_timeout: Duration, +) -> StoreQuoteRequestResult { + let request = ChunkQuoteRequest { + address, + data_size, + data_type, + }; + let message = ChunkMessage { + request_id, + body: ChunkMessageBody::QuoteRequest(request), + }; + + let message_bytes = match message.encode() { + Ok(bytes) => bytes, + Err(e) => { + return ( + peer_id, + peer_addrs, + Err(Error::Protocol(format!( + "Failed to encode quote request for {peer_id}: {e}" + ))), + ); + } + }; + + let result = send_and_await_chunk_response( + &node, + &peer_id, + message_bytes, + request_id, + per_peer_timeout, + &peer_addrs, + |body| match body { + ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success { + quote, + already_stored, + }) => Some(classify_quote_response(&peer_id, "e, already_stored)), + ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Error(e)) => Some(Err( + Error::Protocol(format!("Quote error from {peer_id}: {e}")), + )), + _ => None, + }, + |e| Error::Network(format!("Failed to send quote request to {peer_id}: {e}")), + || Error::Timeout(format!("Timeout waiting for quote from {peer_id}")), + ) + .await; + + (peer_id, peer_addrs, result) +} + +#[allow(clippy::too_many_arguments)] +fn record_store_quote_result( + peer_id: PeerId, + addrs: Vec, + quote_result: Result<(PaymentQuote, Amount)>, + address: &[u8; 32], + quotes: &mut Vec, + already_stored_peers: &mut Vec<(PeerId, [u8; 32])>, + failures: &mut Vec, + bad_quote_count: &mut usize, +) { + match quote_result { + Ok((quote, price)) => { + quotes.push((peer_id, addrs, quote, price)); + } + Err(Error::AlreadyStored) => { + info!("Peer {peer_id} reports chunk already stored"); + let dist = peer_xor_distance(&peer_id, address); + already_stored_peers.push((peer_id, dist)); + } + Err(e) => { + if matches!(&e, Error::BadQuoteBinding { .. }) { + *bad_quote_count += 1; + } + warn!("Failed to get quote from {peer_id}: {e}"); + failures.push(format!("{peer_id}: {e}")); + } + } +} + +fn witnessed_quote_launch_budget( + successful_quotes: usize, + in_flight: usize, + remaining_peers: usize, +) -> usize { + CLOSE_GROUP_SIZE + .saturating_sub(successful_quotes.saturating_add(in_flight)) + .min(remaining_peers) +} + fn single_node_quote_query_count() -> usize { CLOSE_GROUP_SIZE } @@ -170,8 +270,21 @@ fn witnessed_close_group_quorum() -> usize { (CLOSE_GROUP_SIZE * WITNESSED_QUORUM_NUMERATOR).div_ceil(WITNESSED_QUORUM_DENOMINATOR) } -fn witnessed_median_voter_quorum() -> usize { +fn witnessed_close_group_quorum_for_missing_views(missing_views: usize) -> usize { witnessed_close_group_quorum() + .saturating_sub(missing_views) + .max(1) +} + +fn missing_witnessed_responder_views(witnessed: &WitnessedCloseGroup) -> usize { + witnessed + .initial_closest + .len() + .saturating_sub(witnessed.responder_views.len()) +} + +fn witnessed_close_group_quorum_for_transcript(witnessed: &WitnessedCloseGroup) -> usize { + witnessed_close_group_quorum_for_missing_views(missing_witnessed_responder_views(witnessed)) } fn peer_list(peers: &[PeerId]) -> Vec { @@ -179,6 +292,7 @@ fn peer_list(peers: &[PeerId]) -> Vec { } pub(crate) type StoreQuote = (PeerId, Vec, PaymentQuote, Amount); +type StoreQuoteRequestResult = (PeerId, Vec, Result<(PaymentQuote, Amount)>); type VotersByPeer = HashMap>; type WitnessedVoteData = (HashMap, VotersByPeer, Vec<(PeerId, usize)>); @@ -205,11 +319,15 @@ struct WitnessedQuotePeer { struct WitnessedQuoteSelection { quote_peers: Vec, initial_put_peers: Vec<(PeerId, Vec)>, + quorum: usize, } enum QuoteSelectionPolicy { ClosestByDistance, - WitnessedMedianVoters { voters_by_peer: VotersByPeer }, + WitnessedMedianVoters { + voters_by_peer: VotersByPeer, + quorum: usize, + }, } fn witnessed_initial_peers(witnessed: &WitnessedCloseGroup) -> Vec { @@ -404,6 +522,7 @@ fn witnessed_quote_selection_or_error( Ok(WitnessedQuoteSelection { quote_peers, initial_put_peers, + quorum, }) } @@ -498,6 +617,7 @@ fn select_witnessed_median_voter_quotes( mut quotes: Vec, address: &[u8; 32], voters_by_peer: &VotersByPeer, + required_support: usize, ) -> Option> { if quotes.len() < CLOSE_GROUP_SIZE { return None; @@ -507,7 +627,6 @@ fn select_witnessed_median_voter_quotes( let mut best_indices: Option<(usize, Vec)> = None; let mut current_indices = Vec::with_capacity(CLOSE_GROUP_SIZE); - let required_support = witnessed_median_voter_quorum(); visit_quote_subsets( quotes.len(), CLOSE_GROUP_SIZE, @@ -542,6 +661,7 @@ fn put_peers_with_median_voters_first( quotes: &[StoreQuote], put_peers: &[(PeerId, Vec)], voters_by_peer: &VotersByPeer, + required_support: usize, ) -> Option)>> { let (median_peer_id, _) = median_paid_quote_issuer(quotes)?; let voters = voters_by_peer.get(&median_peer_id)?; @@ -557,7 +677,7 @@ fn put_peers_with_median_voters_first( } } - if supporting_peers.len() < witnessed_median_voter_quorum() { + if supporting_peers.len() < required_support { return None; } @@ -617,6 +737,7 @@ impl Client { .map(|peer| (peer.peer_id, peer.addrs)) .collect(); let initial_put_peers = witnessed_selection.initial_put_peers; + let quorum = witnessed_selection.quorum; let quotes = self .collect_store_quotes_from_remote_peers( address, @@ -625,20 +746,25 @@ impl Client { remote_peers, QuoteSelectionPolicy::WitnessedMedianVoters { voters_by_peer: voters_by_peer.clone(), + quorum, }, ) .await?; - let put_peers = - put_peers_with_median_voters_first("es, &initial_put_peers, &voters_by_peer) - .ok_or_else(|| { - Error::InsufficientPeers(format!( + let put_peers = put_peers_with_median_voters_first( + "es, + &initial_put_peers, + &voters_by_peer, + quorum, + ) + .ok_or_else(|| { + Error::InsufficientPeers(format!( "Collected {} witnessed quotes, but fewer than {} initial witness PUT peers \ voted for the paid median issuer for {}", quotes.len(), - witnessed_median_voter_quorum(), + quorum, hex::encode(address) )) - })?; + })?; Ok(StoreQuotePlan { quotes, put_peers }) } @@ -676,7 +802,6 @@ impl Client { address: &[u8; 32], ) -> Result { let required = single_node_quote_query_count(); - let quorum = witnessed_close_group_quorum(); let witnessed = self .network() .find_witnessed_close_group_with_view_count( @@ -691,6 +816,21 @@ impl Client { hex::encode(address) )) })?; + let base_quorum = witnessed_close_group_quorum(); + let missing_views = missing_witnessed_responder_views(&witnessed); + let quorum = witnessed_close_group_quorum_for_transcript(&witnessed); + + if missing_views > 0 { + warn!( + target = %hex::encode(address), + initial = witnessed.initial_closest.len(), + responder_views = witnessed.responder_views.len(), + missing_views = missing_views, + base_quorum = base_quorum, + adjusted_quorum = quorum, + "Witnessed close group transcript is missing responder views; lowering SNP witness quorum" + ); + } debug!( target = %hex::encode(address), @@ -754,93 +894,64 @@ impl Client { ); if staged_witnessed_collection { - let collect_result: std::result::Result<(), Error> = + let mut quote_futures = FuturesUnordered::new(); + let mut next_peer_index = 0usize; + let collect_result: std::result::Result, _> = tokio::time::timeout(overall_timeout, async { - for (peer_id, peer_addrs) in &remote_peers { - if quotes.len() >= CLOSE_GROUP_SIZE { - break; + loop { + let launch_count = witnessed_quote_launch_budget( + quotes.len(), + quote_futures.len(), + remote_peers.len().saturating_sub(next_peer_index), + ); + for _ in 0..launch_count { + let (peer_id, peer_addrs) = &remote_peers[next_peer_index]; + next_peer_index += 1; + quote_futures.push(request_store_quote_from_peer( + node.clone(), + *peer_id, + peer_addrs.clone(), + self.next_request_id(), + *address, + data_size, + data_type, + per_peer_timeout, + )); } - let request_id = self.next_request_id(); - let request = ChunkQuoteRequest { - address: *address, - data_size, - data_type, - }; - let message = ChunkMessage { - request_id, - body: ChunkMessageBody::QuoteRequest(request), - }; + if quotes.len() >= CLOSE_GROUP_SIZE || quote_futures.is_empty() { + break; + } - let message_bytes = match message.encode() { - Ok(bytes) => bytes, - Err(e) => { - warn!("Failed to encode quote request for {peer_id}: {e}"); - failures.push(format!("{peer_id}: encode failed: {e}")); - continue; - } + let Some((peer_id, addrs, quote_result)) = quote_futures.next().await + else { + break; }; - - let quote_result = send_and_await_chunk_response( - node, + record_store_quote_result( peer_id, - message_bytes, - request_id, - per_peer_timeout, - peer_addrs, - |body| match body { - ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success { - quote, - already_stored, - }) => { - Some(classify_quote_response(peer_id, "e, already_stored)) - } - ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Error(e)) => { - Some(Err(Error::Protocol(format!( - "Quote error from {peer_id}: {e}" - )))) - } - _ => None, - }, - |e| { - Error::Network(format!( - "Failed to send quote request to {peer_id}: {e}" - )) - }, - || Error::Timeout(format!("Timeout waiting for quote from {peer_id}")), - ) - .await; - - match quote_result { - Ok((quote, price)) => { - quotes.push((*peer_id, peer_addrs.clone(), quote, price)); - } - Err(Error::AlreadyStored) => { - info!("Peer {peer_id} reports chunk already stored"); - let dist = peer_xor_distance(peer_id, address); - already_stored_peers.push((*peer_id, dist)); - } - Err(e) => { - if matches!(&e, Error::BadQuoteBinding { .. }) { - bad_quote_count += 1; - } - warn!("Failed to get quote from {peer_id}: {e}"); - failures.push(format!("{peer_id}: {e}")); - } - } + addrs, + quote_result, + address, + &mut quotes, + &mut already_stored_peers, + &mut failures, + &mut bad_quote_count, + ); } Ok(()) }) - .await - .unwrap_or_else(|_elapsed| { + .await; + + match collect_result { + Err(_elapsed) => { warn!( "Quote collection timed out after {overall_timeout:?} for address {}", hex::encode(address) ); - Ok(()) - }); - - collect_result?; + } + Ok(Err(e)) => return Err(e), + Ok(Ok(())) => {} + } } else { // Merkle preflight keeps the previous behaviour: query the full // over-query set concurrently because those quote responses are @@ -848,92 +959,31 @@ impl Client { let mut quote_futures = FuturesUnordered::new(); for (peer_id, peer_addrs) in &remote_peers { - let request_id = self.next_request_id(); - let request = ChunkQuoteRequest { - address: *address, + quote_futures.push(request_store_quote_from_peer( + node.clone(), + *peer_id, + peer_addrs.clone(), + self.next_request_id(), + *address, data_size, data_type, - }; - let message = ChunkMessage { - request_id, - body: ChunkMessageBody::QuoteRequest(request), - }; - - let message_bytes = match message.encode() { - Ok(bytes) => bytes, - Err(e) => { - warn!("Failed to encode quote request for {peer_id}: {e}"); - continue; - } - }; - - let peer_id_clone = *peer_id; - let addrs_clone = peer_addrs.clone(); - let node_clone = node.clone(); - - let quote_future = async move { - let result = send_and_await_chunk_response( - &node_clone, - &peer_id_clone, - message_bytes, - request_id, - per_peer_timeout, - &addrs_clone, - |body| match body { - ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success { - quote, - already_stored, - }) => Some(classify_quote_response( - &peer_id_clone, - "e, - already_stored, - )), - ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Error(e)) => { - Some(Err(Error::Protocol(format!( - "Quote error from {peer_id_clone}: {e}" - )))) - } - _ => None, - }, - |e| { - Error::Network(format!( - "Failed to send quote request to {peer_id_clone}: {e}" - )) - }, - || { - Error::Timeout(format!( - "Timeout waiting for quote from {peer_id_clone}" - )) - }, - ) - .await; - - (peer_id_clone, addrs_clone, result) - }; - - quote_futures.push(quote_future); + per_peer_timeout, + )); } let collect_result: std::result::Result, _> = tokio::time::timeout(overall_timeout, async { while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await { - match quote_result { - Ok((quote, price)) => { - quotes.push((peer_id, addrs, quote, price)); - } - Err(Error::AlreadyStored) => { - info!("Peer {peer_id} reports chunk already stored"); - let dist = peer_xor_distance(&peer_id, address); - already_stored_peers.push((peer_id, dist)); - } - Err(e) => { - if matches!(&e, Error::BadQuoteBinding { .. }) { - bad_quote_count += 1; - } - warn!("Failed to get quote from {peer_id}: {e}"); - failures.push(format!("{peer_id}: {e}")); - } - } + record_store_quote_result( + peer_id, + addrs, + quote_result, + address, + &mut quotes, + &mut already_stored_peers, + &mut failures, + &mut bad_quote_count, + ); } Ok(()) }) @@ -1004,21 +1054,22 @@ impl Client { if quotes.len() >= CLOSE_GROUP_SIZE { let selected_quotes = match quote_selection_policy { QuoteSelectionPolicy::ClosestByDistance => select_closest_quotes(quotes, address), - QuoteSelectionPolicy::WitnessedMedianVoters { voters_by_peer } => { - select_witnessed_median_voter_quotes(quotes, address, &voters_by_peer) - .ok_or_else(|| { - Error::InsufficientPeers(format!( - "Got {quote_count} quotes, need {CLOSE_GROUP_SIZE} whose paid \ + QuoteSelectionPolicy::WitnessedMedianVoters { + voters_by_peer, + quorum, + } => select_witnessed_median_voter_quotes(quotes, address, &voters_by_peer, quorum) + .ok_or_else(|| { + Error::InsufficientPeers(format!( + "Got {quote_count} quotes, need {CLOSE_GROUP_SIZE} whose paid \ median issuer is recognised by at least {} \ selected witness peers ({total_responses} responses: \ {already_stored_count} already_stored, {failure_count} failed \ including {bad_quote_count} with mismatched peer bindings). \ Failures: [{}]", - witnessed_median_voter_quorum(), - failures.join("; ") - )) - })? - } + quorum, + failures.join("; ") + )) + })?, }; info!( @@ -1264,6 +1315,9 @@ mod tests { assert_eq!(SINGLE_NODE_WITNESSED_VIEW_COUNT, 20); assert!(SINGLE_NODE_WITNESSED_VIEW_COUNT > single_node_quote_query_count()); assert_eq!(witnessed_close_group_quorum(), 5); + assert_eq!(witnessed_close_group_quorum_for_missing_views(0), 5); + assert_eq!(witnessed_close_group_quorum_for_missing_views(1), 4); + assert_eq!(witnessed_close_group_quorum_for_missing_views(2), 3); assert_eq!( fault_tolerant_quote_query_count(), CLOSE_GROUP_SIZE * FAULT_TOLERANT_QUOTE_QUERY_MULTIPLIER @@ -1271,6 +1325,35 @@ mod tests { assert!(fault_tolerant_quote_query_count() > single_node_quote_query_count()); } + #[test] + fn witnessed_quote_launch_budget_keeps_exact_quote_window() { + assert_eq!( + witnessed_quote_launch_budget(0, 0, CLOSE_GROUP_SIZE * 2), + CLOSE_GROUP_SIZE, + "initial SNP quote fetch should launch the closest seven peers" + ); + assert_eq!( + witnessed_quote_launch_budget(1, CLOSE_GROUP_SIZE - 1, CLOSE_GROUP_SIZE), + 0, + "a successful quote should not launch an extra fallback" + ); + assert_eq!( + witnessed_quote_launch_budget(0, CLOSE_GROUP_SIZE - 1, CLOSE_GROUP_SIZE), + 1, + "a failed in-flight quote should launch the next closest fallback" + ); + assert_eq!( + witnessed_quote_launch_budget(CLOSE_GROUP_SIZE - 1, 0, 3), + 1, + "only one more peer is needed for the seventh quote" + ); + assert_eq!( + witnessed_quote_launch_budget(0, 0, CLOSE_GROUP_SIZE - 1), + CLOSE_GROUP_SIZE - 1, + "launch budget is capped by remaining candidates" + ); + } + #[test] fn witnessed_candidates_sort_by_xor_distance_then_votes() { let address = [0u8; 32]; @@ -1379,6 +1462,44 @@ mod tests { ); } + #[test] + fn witnessed_quote_peers_lower_quorum_for_missing_responder_views() { + let address = [0u8; 32]; + let witnessed = WitnessedCloseGroup { + target: address, + k: CLOSE_GROUP_SIZE, + initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]), + responder_views: vec![ + witnessed_test_view(1, &[1, 2, 3, 4, 5, 6, 7]), + witnessed_test_view(2, &[1, 2, 3, 4, 5, 6, 8]), + witnessed_test_view(3, &[1, 2, 3, 4, 5, 7, 8]), + witnessed_test_view(4, &[1, 2, 3, 4, 6, 7, 8]), + witnessed_test_view(5, &[1, 2, 3, 5, 6, 7, 8]), + witnessed_test_view(6, &[1, 2, 4, 5, 6, 7, 8]), + ], + }; + let quorum = witnessed_close_group_quorum_for_transcript(&witnessed); + + assert_eq!(missing_witnessed_responder_views(&witnessed), 1); + assert_eq!(quorum, 4); + + let selection = + witnessed_quote_selection_or_error(&address, &witnessed, CLOSE_GROUP_SIZE, quorum) + .expect( + "one missing responder view should lower quorum and still select candidates", + ); + + assert_eq!( + selection + .quote_peers + .iter() + .map(|peer| peer.peer_id.as_bytes()[0]) + .collect::>(), + vec![1, 2, 3, 4, 5, 6, 7, 8] + ); + assert_eq!(selection.quorum, quorum); + } + #[test] fn witnessed_quote_selection_keeps_closest_set_with_median_voter_quorum() { const MEDIAN_ISSUER_SEED: u8 = 7; @@ -1409,14 +1530,16 @@ mod tests { ]), ); - let selected = select_witnessed_median_voter_quotes(quotes, &address, &voters_by_peer) - .expect("a supported close-group quote set should be selected"); + let quorum = witnessed_close_group_quorum(); + let selected = + select_witnessed_median_voter_quotes(quotes, &address, &voters_by_peer, quorum) + .expect("a supported close-group quote set should be selected"); assert_eq!(quote_peer_seeds(&selected), vec![1, 2, 3, 6, 7, 8, 9]); let (median_peer_id, _) = median_paid_quote_issuer(&selected).expect("selected quotes have a median"); assert_eq!(median_peer_id, synthetic_peer(MEDIAN_ISSUER_SEED)); - assert!(voters_by_peer[&median_peer_id].len() >= witnessed_median_voter_quorum()); + assert!(voters_by_peer[&median_peer_id].len() >= quorum); } #[test] @@ -1439,8 +1562,10 @@ mod tests { synthetic_voters(&[20, 21, 22, 23, 24]), ); - let selected = select_witnessed_median_voter_quotes(quotes, &address, &voters_by_peer) - .expect("direct witness recognition should support the paid median issuer"); + let quorum = witnessed_close_group_quorum(); + let selected = + select_witnessed_median_voter_quotes(quotes, &address, &voters_by_peer, quorum) + .expect("direct witness recognition should support the paid median issuer"); let (median_peer_id, _) = median_paid_quote_issuer(&selected).expect("selected quotes have a median"); @@ -1456,10 +1581,7 @@ mod tests { 0, "recognising witnesses need not also be selected quote issuers" ); - assert_eq!( - voters_by_peer[&median_peer_id].len(), - witnessed_median_voter_quorum() - ); + assert_eq!(voters_by_peer[&median_peer_id].len(), quorum); } #[test] @@ -1483,7 +1605,12 @@ mod tests { synthetic_voters(&[1, 2, 3, 20]), ); - let selected = select_witnessed_median_voter_quotes(quotes, &address, &voters_by_peer); + let selected = select_witnessed_median_voter_quotes( + quotes, + &address, + &voters_by_peer, + witnessed_close_group_quorum(), + ); assert!( selected.is_none(), @@ -1512,9 +1639,13 @@ mod tests { ); let put_candidates = put_peers_from_seeds(&[1, 2, 3, 4, 5, 6, 7]); - let put_peers = - put_peers_with_median_voters_first("es, &put_candidates, &voters_by_peer) - .expect("median voters should produce an ordered PUT set"); + let put_peers = put_peers_with_median_voters_first( + "es, + &put_candidates, + &voters_by_peer, + witnessed_close_group_quorum(), + ) + .expect("median voters should produce an ordered PUT set"); assert_eq!(quote_peer_seeds("es), vec![1, 2, 3, 4, 5, 6, 7]); let (median_peer_id, _) =