Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ export-sample-test-data:

.PHONY: docs
docs:
cargo docs --document-private-items --exclude rollup-node-chain-orchestrator
cargo +$(NIGHTLY_TOOLCHAIN) docs --document-private-items --exclude rollup-node-chain-orchestrator

.PHONY: pr
pr: lint test docs
Expand Down
2 changes: 2 additions & 0 deletions crates/chain-orchestrator/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use scroll_network::NewBlockWithPeer;
pub enum ChainOrchestratorEvent {
/// A received block failed the consensus checks.
BlockFailedConsensusChecks(B256, PeerId),
/// A finalized block was received from a peer.
L2FinalizedBlockReceived(B256, PeerId),
/// A new block has been received from the network but we have insufficient data to process it
/// due to being in optimistic mode.
InsufficientDataForReceivedBlock(B256),
Expand Down
11 changes: 11 additions & 0 deletions crates/chain-orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,17 @@ impl<
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
tracing::debug!(target: "scroll::chain_orchestrator", block_hash = ?block_with_peer.block.header.hash_slow(), block_number = ?block_with_peer.block.number, peer_id = ?block_with_peer.peer_id, "Received new block from peer");

// Check we are not handling a finalized block.
if block_with_peer.block.header.number <= self.engine.fcs().finalized_block_info().number {
self.network
.handle()
.block_import_outcome(BlockImportOutcome::finalized_block(block_with_peer.peer_id));
return Ok(Some(ChainOrchestratorEvent::L2FinalizedBlockReceived(
block_with_peer.block.header.hash_slow(),
block_with_peer.peer_id,
)));
}

if let Err(err) =
self.consensus.validate_new_block(&block_with_peer.block, &block_with_peer.signature)
{
Expand Down
7 changes: 7 additions & 0 deletions crates/network/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ pub struct BlockImportOutcome {
}

impl BlockImportOutcome {
/// Creates a new `BlockImportOutcome` instance for a finalized block with the given peer ID.
pub fn finalized_block(peer: PeerId) -> Self {
Self { peer, result: Err(BlockImportError::L2FinalizedBlockReceived(peer)) }
}

/// Creates a new `BlockImportOutcome` instance for an invalid block with the given peer ID.
pub fn invalid_block(peer: PeerId) -> Self {
Self { peer, result: Err(BlockImportError::Validation(BlockValidationError::InvalidBlock)) }
Expand Down Expand Up @@ -56,6 +61,8 @@ pub enum BlockImportError {
Consensus(ConsensusError),
/// An error occurred during block validation.
Validation(BlockValidationError),
/// A finalized block was received from a peer.
L2FinalizedBlockReceived(PeerId),
}

/// A consensus related error that can occur during block import.
Expand Down
85 changes: 71 additions & 14 deletions crates/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use reth_tokio_util::{EventSender, EventStream};
use rollup_node_primitives::{sig_encode_hash, BlockInfo};
use scroll_alloy_hardforks::ScrollHardforks;
use scroll_wire::{
NewBlock, ScrollWireConfig, ScrollWireEvent, ScrollWireManager, ScrollWireProtocolHandler,
LRU_CACHE_SIZE,
NewBlock, PeerBlockState, ScrollWireConfig, ScrollWireEvent, ScrollWireManager,
ScrollWireProtocolHandler, LRU_CACHE_SIZE,
};
use std::{
pin::Pin,
Expand Down Expand Up @@ -184,12 +184,26 @@ impl<
// Compute the block hash.
let hash = block.block.hash_slow();

// Filter the peers that have not seen this block hash.
// Filter the peers that have not seen this block hash via either protocol.
// We iterate over all connected scroll-wire peers.
let peers: Vec<FixedBytes<64>> = self
.scroll_wire
.state()
.iter()
.filter_map(|(peer_id, blocks)| (!blocks.contains(&hash)).then_some(*peer_id))
.connected_peers()
.filter_map(|peer_id| {
// Check if peer has seen this block via any protocol
let has_seen = self
.scroll_wire
.peer_block_state()
.get(peer_id)
.is_some_and(|state| state.has_seen(&hash));

// Only announce if peer hasn't seen this block
if !has_seen {
Some(*peer_id)
} else {
None
}
})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we should change the logic below.

If we have a connection with a peer via scroll wire - only send via scroll wire. If we don't have a connection with a peer via scroll wire then send only via eth wire.

What do you think?

.collect();

// TODO: remove this once we deprecate l2geth.
Expand Down Expand Up @@ -240,15 +254,35 @@ impl<
ScrollWireEvent::NewBlock { peer_id, block, signature } => {
let block_hash = block.hash_slow();
trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block_hash, signature = ?signature, "Received new block");

// Check if this peer has already received this block via scroll-wire, if so
// penalize it.
Comment on lines +258 to +259
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Check if this peer has already received this block via scroll-wire, if so
// penalize it.
// Check if we have already received this block via scroll-wire from this peer, if so
// penalize it.

let state = self
.scroll_wire
.peer_block_state_mut()
.entry(peer_id)
.or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE));
if state.has_seen_via_scroll_wire(&block_hash) {
tracing::warn!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block_hash, "Peer sent duplicate block via scroll-wire, penalizing");
self.inner_network_handle.reputation_change(
peer_id,
reth_network_api::ReputationChangeKind::BadBlock,
);
return None;
} else {
// Update the state: peer has seen this block via scroll-wire
state.insert_scroll_wire(block_hash);
}

if self.blocks_seen.contains(&(block_hash, signature)) {
None
} else {
// Update the state of the peer cache i.e. peer has seen this block.
// Update the state: peer has seen this block via scroll-wire
self.scroll_wire
.state_mut()
.peer_block_state_mut()
.entry(peer_id)
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
.insert(block_hash);
.or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE))
.insert_scroll_wire(block_hash);
Comment on lines -246 to +285
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we already do this above on line 274 above?

// Update the state of the block cache i.e. we have seen this block.
self.blocks_seen.insert((block.hash_slow(), signature));

Expand Down Expand Up @@ -310,6 +344,11 @@ impl<
self.inner_network_handle
.reputation_change(peer, reth_network_api::ReputationChangeKind::BadBlock);
}
Err(BlockImportError::L2FinalizedBlockReceived(peer)) => {
trace!(target: "scroll::network::manager", peer_id = ?peer, "Block import failed - finalized block received - penalizing peer");
self.inner_network_handle
.reputation_change(peer, reth_network_api::ReputationChangeKind::BadBlock);
}
}
}

Expand Down Expand Up @@ -339,17 +378,35 @@ impl<
.and_then(|i| Signature::from_raw(&extra_data[i..]).ok())
{
let block_hash = block.hash_slow();

// Check if this peer has already sent this block to us via eth-wire, if so penalize it.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Check if this peer has already sent this block to us via eth-wire, if so penalize it.
// Check if we have already received this block from this peer via eth-wire, if so, penalize the peer.

let state = self
.scroll_wire
.peer_block_state_mut()
.entry(peer_id)
.or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE));

if state.has_seen_via_eth_wire(&block_hash) {
tracing::warn!(target: "scroll::bridge::import", peer_id = ?peer_id, block = ?block_hash, "Peer sent duplicate block via eth-wire, penalizing");
self.inner_network_handle
.reputation_change(peer_id, reth_network_api::ReputationChangeKind::BadBlock);
return None;
} else {
// Update the state: peer has seen this block via eth-wire
state.insert_eth_wire(block_hash);
}

if self.blocks_seen.contains(&(block_hash, signature)) {
return None;
}
trace!(target: "scroll::bridge::import", peer_id = %peer_id, block_hash = %block_hash, signature = %signature.to_string(), extra_data = %extra_data.to_string(), "Received new block from eth-wire protocol");

// Update the state of the peer cache i.e. peer has seen this block.
// Update the state: peer has seen this block via eth-wire
self.scroll_wire
.state_mut()
.peer_block_state_mut()
.entry(peer_id)
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
.insert(block_hash);
.or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE))
.insert_eth_wire(block_hash);
Comment on lines 405 to +409
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we already do this above on line 396?


// Update the state of the block cache i.e. we have seen this block.
self.blocks_seen.insert((block_hash, signature));
Expand Down
2 changes: 1 addition & 1 deletion crates/scroll-wire/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub use config::ScrollWireConfig;

mod connection;
mod manager;
pub use manager::{ScrollWireManager, LRU_CACHE_SIZE};
pub use manager::{PeerBlockState, ScrollWireManager, LRU_CACHE_SIZE};

mod protocol;
pub use protocol::{NewBlock, ScrollWireEvent, ScrollWireProtocolHandler};
98 changes: 81 additions & 17 deletions crates/scroll-wire/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,53 +16,118 @@ use tracing::trace;
/// The size of the LRU cache used to track blocks that have been seen by peers.
pub const LRU_CACHE_SIZE: u32 = 100;

/// Tracks block announced and received state for a peer.
#[derive(Debug)]
pub struct PeerBlockState {
/// blocks announced to the peer
announced: LruCache<B256>,
/// blocks received via scroll-wire protocol, this is used to penalize peers that send
/// duplicate blocks via scroll-wire.
scroll_wire_received: LruCache<B256>,
/// blocks received via eth-wire protocol, this is used to penalize peers that send duplicate
/// blocks via eth-wire.
eth_wire_received: LruCache<B256>,
}

impl PeerBlockState {
/// Creates a new `PeerBlockState` with the specified LRU cache capacity.
pub fn new(capacity: u32) -> Self {
Self {
announced: LruCache::new(capacity),
scroll_wire_received: LruCache::new(capacity),
eth_wire_received: LruCache::new(capacity),
}
}

/// Check if peer knows about this block (either received or announced).
pub fn has_seen(&self, hash: &B256) -> bool {
self.announced.contains(hash) ||
self.scroll_wire_received.contains(hash) ||
self.eth_wire_received.contains(hash)
}

/// Check if peer has received this block via scroll-wire specifically (for duplicate
/// detection).
pub fn has_seen_via_scroll_wire(&self, hash: &B256) -> bool {
self.scroll_wire_received.contains(hash)
}

/// Check if peer has received this block via eth-wire specifically (for duplicate detection).
pub fn has_seen_via_eth_wire(&self, hash: &B256) -> bool {
self.eth_wire_received.contains(hash)
}

/// Record that this peer has received a block via scroll-wire.
pub fn insert_scroll_wire(&mut self, hash: B256) {
self.scroll_wire_received.insert(hash); // Track for duplicate detection
}

/// Record that this peer has received a block via eth-wire.
pub fn insert_eth_wire(&mut self, hash: B256) {
self.eth_wire_received.insert(hash); // Track for duplicate detection
}

/// Record that we have announced a block to this peer.
pub fn insert_announced(&mut self, hash: B256) {
self.announced.insert(hash); // Only update unified announced, not protocol-specific
}
}

/// A manager for the `ScrollWire` protocol.
#[derive(Debug)]
pub struct ScrollWireManager {
/// A stream of [`ScrollWireEvent`]s produced by the scroll wire protocol.
events: UnboundedReceiverStream<ScrollWireEvent>,
/// A map of connections to peers.
connections: HashMap<PeerId, UnboundedSender<ScrollMessage>>,
/// A map of the state of the scroll wire protocol. Currently the state for each peer
/// is just a cache of the last 100 blocks seen by each peer.
state: HashMap<PeerId, LruCache<B256>>,
/// Unified state tracking block state and blocks received from each peer via both protocols.
peer_block_state: HashMap<PeerId, PeerBlockState>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I propose we rename this to peer_state and PeerBlockState to PeerState

}

impl ScrollWireManager {
/// Creates a new [`ScrollWireManager`] instance.
pub fn new(events: UnboundedReceiver<ScrollWireEvent>) -> Self {
trace!(target: "scroll::wire::manager", "Creating new ScrollWireManager instance");
Self { events: events.into(), connections: HashMap::new(), state: HashMap::new() }
Self {
events: events.into(),
connections: HashMap::new(),
peer_block_state: HashMap::new(),
}
}

/// Announces a new block to the specified peer.
pub fn announce_block(&mut self, peer_id: PeerId, block: &NewBlock, hash: B256) {
if let Entry::Occupied(to_connection) = self.connections.entry(peer_id) {
// We send the block to the peer. If we receive an error we remove the peer from the
// connections map and delete its state as the connection is no longer valid.
// connections map and peer_block_state as the connection is no longer valid.
if to_connection.get().send(ScrollMessage::new_block(block.clone())).is_err() {
trace!(target: "scroll::wire::manager", peer_id = %peer_id, "Failed to send block to peer - dropping peer.");
self.state.remove(&peer_id);
self.peer_block_state.remove(&peer_id);
to_connection.remove();
} else {
// Upon successful sending of the block we update the state of the peer.
trace!(target: "scroll::wire::manager", peer_id = %peer_id, "Announced block to peer");
self.state
// Record that we announced this block to the peer
self.peer_block_state
.entry(peer_id)
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
.insert(hash);
.or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE))
.insert_announced(hash);
}
}
}

/// Returns the state of the `ScrollWire` protocol.
pub const fn state(&self) -> &HashMap<PeerId, LruCache<B256>> {
&self.state
/// Returns an iterator over the connected peer IDs.
pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
self.connections.keys()
}

/// Returns a reference to the peer block state map.
pub const fn peer_block_state(&self) -> &HashMap<PeerId, PeerBlockState> {
&self.peer_block_state
}

/// Returns a mutable reference to the state of the `ScrollWire` protocol.
pub const fn state_mut(&mut self) -> &mut HashMap<PeerId, LruCache<B256>> {
&mut self.state
/// Returns a mutable reference to the peer block state map.
pub const fn peer_block_state_mut(&mut self) -> &mut HashMap<PeerId, PeerBlockState> {
&mut self.peer_block_state
}
}

Expand Down Expand Up @@ -94,7 +159,6 @@ impl Future for ScrollWireManager {
direction
);
this.connections.insert(peer_id, to_connection);
this.state.insert(peer_id, LruCache::new(100));
}
None => break,
}
Expand Down
Loading