Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,13 @@ cargo test -p ethlambda-blockchain --features skip-signature-verification --test

## Common Gotchas

### Aggregator Flag Required for Finalization
- At least one node **must** be started with `--is-aggregator` to finalize blocks in production (without `skip-signature-verification`)
- Without this flag, attestations pass signature verification and are logged as "Attestation processed", but the signature is never stored for aggregation (`store.rs:368`), so blocks are always built with `attestation_count=0`
- The attestation pipeline: gossip → verify signature → store gossip signature (only if `is_aggregator`) → aggregate at interval 2 → promote to known → pack into blocks
- With `skip-signature-verification` (tests only), attestations bypass aggregation and go directly to `new_aggregated_payloads`, so the flag is not needed
- **Symptom**: `justified_slot=0` and `finalized_slot=0` indefinitely despite healthy block production and attestation gossip

### Signature Verification
- Tests require `skip-signature-verification` feature for performance
- Crypto tests marked `#[ignore]` (slow leanVM operations)
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ docker-build: ## 🐳 Build the Docker image
-t ghcr.io/lambdaclass/ethlambda:$(DOCKER_TAG) .
@echo

LEAN_SPEC_COMMIT_HASH:=4edcf7bc9271e6a70ded8aff17710d68beac4266
LEAN_SPEC_COMMIT_HASH:=b39472e73f8a7d603cc13d14426eed14c6eff6f1

leanSpec:
git clone https://github.com/leanEthereum/leanSpec.git --single-branch
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ make run-devnet
This generates fresh genesis files and starts all configured clients with metrics enabled.
Press `Ctrl+C` to stop all nodes.

> **Important:** When running nodes manually (outside `make run-devnet`), at least one node must be started with `--is-aggregator` for attestations to be aggregated and included in blocks. Without this flag, the network will produce blocks but never finalize.

For custom devnet configurations, go to `lean-quickstart/local-devnet/genesis/validator-config.yaml` and edit the file before running the command above. See `lean-quickstart`'s documentation for more details on how to configure the devnet.

## Philosophy
Expand Down
9 changes: 8 additions & 1 deletion bin/ethlambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ struct CliOptions {
/// When set, skips genesis initialization and syncs from checkpoint.
#[arg(long)]
checkpoint_sync_url: Option<String>,
/// Whether this node acts as a committee aggregator
#[arg(long, default_value = "false")]
is_aggregator: bool,
}

#[tokio::main]
Expand Down Expand Up @@ -114,7 +117,10 @@ async fn main() -> eyre::Result<()> {
.inspect_err(|err| error!(%err, "Failed to initialize state"))?;

let (p2p_tx, p2p_rx) = tokio::sync::mpsc::unbounded_channel();
let blockchain = BlockChain::spawn(store.clone(), p2p_tx, validator_keys);
// Use first validator ID for subnet subscription
let first_validator_id = validator_keys.keys().min().copied();
let blockchain =
BlockChain::spawn(store.clone(), p2p_tx, validator_keys, options.is_aggregator);

let p2p_handle = tokio::spawn(start_p2p(
node_p2p_key,
Expand All @@ -123,6 +129,7 @@ async fn main() -> eyre::Result<()> {
blockchain,
p2p_rx,
store.clone(),
first_validator_id,
));

ethlambda_rpc::start_rpc_server(metrics_socket, store)
Expand Down
76 changes: 60 additions & 16 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ethlambda_state_transition::is_proposer;
use ethlambda_storage::Store;
use ethlambda_types::{
ShortRoot,
attestation::{Attestation, AttestationData, SignedAttestation},
attestation::{Attestation, AttestationData, SignedAggregatedAttestation, SignedAttestation},
block::{BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation},
primitives::{H256, ssz::TreeHash},
signature::ValidatorSecretKey,
Expand All @@ -30,6 +30,8 @@ pub enum P2PMessage {
PublishAttestation(SignedAttestation),
/// Publish a block to the gossip network.
PublishBlock(SignedBlockWithAttestation),
/// Publish an aggregated attestation to the gossip network.
PublishAggregatedAttestation(SignedAggregatedAttestation),
/// Fetch a block by its root hash.
FetchBlock(H256),
}
Expand All @@ -38,14 +40,23 @@ pub struct BlockChain {
handle: GenServerHandle<BlockChainServer>,
}

/// Seconds in a slot. Each slot has 4 intervals of 1 second each.
/// Seconds in a slot.
pub const SECONDS_PER_SLOT: u64 = 4;
/// Milliseconds in a slot.
pub const MILLISECONDS_PER_SLOT: u64 = 4_000;
/// Milliseconds per interval (800ms ticks).
pub const MILLISECONDS_PER_INTERVAL: u64 = 800;
/// Number of intervals per slot (5 intervals of 800ms = 4 seconds).
pub const INTERVALS_PER_SLOT: u64 = 5;
/// Number of attestation committees per slot.
pub const ATTESTATION_COMMITTEE_COUNT: u64 = 1;

impl BlockChain {
pub fn spawn(
store: Store,
p2p_tx: mpsc::UnboundedSender<P2PMessage>,
validator_keys: HashMap<u64, ValidatorSecretKey>,
is_aggregator: bool,
) -> BlockChain {
let genesis_time = store.config().genesis_time;
let key_manager = key_manager::KeyManager::new(validator_keys);
Expand All @@ -54,6 +65,7 @@ impl BlockChain {
p2p_tx,
key_manager,
pending_blocks: HashMap::new(),
is_aggregator,
pending_block_parents: HashMap::new(),
}
.start();
Expand Down Expand Up @@ -85,6 +97,20 @@ impl BlockChain {
.await
.inspect_err(|err| error!(%err, "Failed to notify BlockChain of new attestation"));
}

/// Sends an aggregated attestation to the BlockChain for processing.
pub async fn notify_new_aggregated_attestation(
&mut self,
attestation: SignedAggregatedAttestation,
) {
let _ = self
.handle
.cast(CastMessage::NewAggregatedAttestation(attestation))
.await
.inspect_err(
|err| error!(%err, "Failed to notify BlockChain of new aggregated attestation"),
);
}
}

/// GenServer that sequences all blockchain updates.
Expand All @@ -104,16 +130,19 @@ struct BlockChainServer {
// chain at lookup time, since a cached ancestor may itself have become pending with
// a deeper missing parent after the entry was created.
pending_block_parents: HashMap<H256, H256>,

/// Whether this node acts as a committee aggregator.
is_aggregator: bool,
}

impl BlockChainServer {
fn on_tick(&mut self, timestamp: u64) {
let genesis_time = self.store.config().genesis_time;
fn on_tick(&mut self, timestamp_ms: u64) {
let genesis_time_ms = self.store.config().genesis_time * 1000;

// Calculate current slot and interval
let time_since_genesis = timestamp.saturating_sub(genesis_time);
let slot = time_since_genesis / SECONDS_PER_SLOT;
let interval = time_since_genesis % SECONDS_PER_SLOT;
// Calculate current slot and interval from milliseconds
let time_since_genesis_ms = timestamp_ms.saturating_sub(genesis_time_ms);
let slot = time_since_genesis_ms / MILLISECONDS_PER_SLOT;
let interval = (time_since_genesis_ms % MILLISECONDS_PER_SLOT) / MILLISECONDS_PER_INTERVAL;

// Update current slot metric
metrics::update_current_slot(slot);
Expand All @@ -126,7 +155,12 @@ impl BlockChainServer {
.flatten();

// Tick the store first - this accepts attestations at interval 0 if we have a proposal
store::on_tick(&mut self.store, timestamp, proposer_validator_id.is_some());
store::on_tick(
&mut self.store,
timestamp_ms,
proposer_validator_id.is_some(),
self.is_aggregator,
);

// Now build and publish the block (after attestations have been accepted)
if let Some(validator_id) = proposer_validator_id {
Expand All @@ -138,7 +172,7 @@ impl BlockChainServer {
self.produce_attestations(slot);
}

// Update safe target slot metric (updated by store.on_tick at interval 2)
// Update safe target slot metric (updated by store.on_tick at interval 3)
metrics::update_safe_target_slot(self.store.safe_target_slot());
}

Expand Down Expand Up @@ -437,15 +471,21 @@ impl BlockChainServer {
}

fn on_gossip_attestation(&mut self, attestation: SignedAttestation) {
let _ = store::on_gossip_attestation(&mut self.store, attestation)
let _ = store::on_gossip_attestation(&mut self.store, attestation, self.is_aggregator)
.inspect_err(|err| warn!(%err, "Failed to process gossiped attestation"));
}

fn on_gossip_aggregated_attestation(&mut self, attestation: SignedAggregatedAttestation) {
let _ = store::on_gossip_aggregated_attestation(&mut self.store, attestation)
.inspect_err(|err| warn!(%err, "Failed to process gossiped aggregated attestation"));
}
}

#[derive(Clone, Debug)]
enum CastMessage {
NewBlock(SignedBlockWithAttestation),
NewAttestation(SignedAttestation),
NewAggregatedAttestation(SignedAggregatedAttestation),
Tick,
}

Expand Down Expand Up @@ -476,12 +516,13 @@ impl GenServer for BlockChainServer {
let timestamp = SystemTime::UNIX_EPOCH
.elapsed()
.expect("already past the unix epoch");
self.on_tick(timestamp.as_secs());
// Schedule the next tick at the start of the next second
let millis_to_next_sec =
((timestamp.as_secs() as u128 + 1) * 1000 - timestamp.as_millis()) as u64;
self.on_tick(timestamp.as_millis() as u64);
// Schedule the next tick at the next 800ms interval boundary
let ms_since_epoch = timestamp.as_millis() as u64;
let ms_to_next_interval =
MILLISECONDS_PER_INTERVAL - (ms_since_epoch % MILLISECONDS_PER_INTERVAL);
send_after(
Duration::from_millis(millis_to_next_sec),
Duration::from_millis(ms_to_next_interval),
handle.clone(),
message,
);
Expand All @@ -490,6 +531,9 @@ impl GenServer for BlockChainServer {
self.on_block(signed_block);
}
CastMessage::NewAttestation(attestation) => self.on_gossip_attestation(attestation),
CastMessage::NewAggregatedAttestation(attestation) => {
self.on_gossip_aggregated_attestation(attestation);
}
}
CastResponse::NoReply
}
Expand Down
Loading