Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions AllTests-mainnet.md
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,15 @@ AllTests-mainnet
```diff
+ pre-1.1.0 OK
```
## Payload attestation pool [Preset: mainnet]
```diff
+ Can add and retrieve payload attestations [Preset: mainnet] OK
+ Can get payload attestations for block production [Preset: mainnet] OK
+ Different payload presence values [Preset: mainnet] OK
+ Duplicate validator in PTC - multiple signatures [Preset: mainnet] OK
+ Multiple validators in PTC can attest [Preset: mainnet] OK
+ Payload attestations get pruned [Preset: mainnet] OK
```
## PeerPool testing suite
```diff
+ Access peers by key test OK
Expand Down
5 changes: 4 additions & 1 deletion beacon_chain/beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import
./el/el_manager,
./consensus_object_pools/[
blockchain_dag, blob_quarantine, block_quarantine, consensus_manager,
attestation_pool, sync_committee_msg_pool, validator_change_pool,
attestation_pool, execution_payload_pool, payload_attestation_pool,
sync_committee_msg_pool, validator_change_pool,
blockchain_list],
./spec/datatypes/[base, altair],
./spec/eth2_apis/dynamic_fee_recipients,
Expand Down Expand Up @@ -87,6 +88,8 @@ type
syncCommitteeMsgPool*: ref SyncCommitteeMsgPool
lightClientPool*: ref LightClientPool
validatorChangePool*: ref ValidatorChangePool
executionPayloadBidPool*: ref ExecutionPayloadBidPool
payloadAttestationPool*: ref PayloadAttestationPool
elManager*: ELManager
restServer*: RestServerRef
keymanagerHost*: ref KeymanagerHost
Expand Down
168 changes: 168 additions & 0 deletions beacon_chain/consensus_object_pools/payload_attestation_pool.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
# beacon_chain
# Copyright (c) 2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [], gcsafe.}

import
# Status libraries
metrics,
chronicles,
# Internal
../spec/[eth2_merkleization, forks, validator],
"."/[spec_cache, blockchain_dag],
../beacon_clock

from ../spec/beaconstate import get_ptc

logScope: topics = "payattpool"

declareGauge payload_attestation_pool_block_packing_time,
"Time it took to create list of payload attestations for block"

type
PayloadAttestationEntry* = object
data*: PayloadAttestationData
messages*: Table[ValidatorIndex, PayloadAttestationMessage]
aggregated*: Opt[PayloadAttestation]

PayloadAttestationPool* = object
dag*: ChainDAGRef
attestations*: Table[Slot, Table[Eth2Digest, PayloadAttestationEntry]]

func init*(T: type PayloadAttestationPool, dag: ChainDAGRef): T =
T(dag: dag)

func pruneOldEntries(pool: var PayloadAttestationPool, wallTime: BeaconTime) =
let current_slot = wallTime.slotOrZero(pool.dag.timeParams)

# keep only recent slots - since payload attestations
# are only valid for 1 slot
var slotsToRemove: seq[Slot]
for slot in pool.attestations.keys:
if slot + 2 < current_slot:
slotsToRemove.add(slot)

for slot in slotsToRemove:
pool.attestations.del(slot)

proc addPayloadAttestation*(
pool: var PayloadAttestationPool, message: PayloadAttestationMessage,
wallTime: BeaconTime): bool =
let
slot = message.data.slot
beacon_block_root = message.data.beacon_block_root
validator_index = message.validator_index

pool.pruneOldEntries(wallTime)

# create an entry for this block and slot
let
entry = addr pool.attestations.mgetOrPut(slot).mgetOrPut(
beacon_block_root, PayloadAttestationEntry(data: message.data))

# Check for duplicate
let vidx = ValidatorIndex(validator_index)
if vidx in entry[].messages:
return false

entry[].messages[vidx] = message

entry[].aggregated = Opt.none(PayloadAttestation)

true

proc aggregateMessages(
pool: PayloadAttestationPool, slot: Slot,
entry: var PayloadAttestationEntry, cache: var StateCache
): Opt[PayloadAttestation] =

if entry.messages.len == 0:
return Opt.none(PayloadAttestation)

withState(pool.dag.headState):
when consensusFork >= ConsensusFork.Gloas:
var
aggregation_bits: BitArray[int(PTC_SIZE)]
signatures: seq[CookedSig]
ptc_index = 0

for ptc_validator_index in get_ptc(forkyState.data, slot, cache):
entry.messages.withValue(ptc_validator_index, message):
let cookedSig = message[].signature.load().valueOr:
continue
aggregation_bits[ptc_index] = true
signatures.add(cookedSig)
ptc_index += 1

if signatures.len == 0:
return Opt.none(PayloadAttestation)

var aggregated_signature = AggregateSignature.init(signatures[0])
for i in 1..<signatures.len:
aggregated_signature.aggregate(signatures[i])

Opt.some(PayloadAttestation(
aggregation_bits: aggregation_bits,
data: entry.data,
signature: aggregated_signature.finish().toValidatorSig()
))
else:
Opt.none(PayloadAttestation)

proc getAggregatedPayloadAttestation*(
pool: var PayloadAttestationPool, slot: Slot,
beacon_block_root: Eth2Digest, cache: var StateCache
): Opt[PayloadAttestation] =
## Get aggregated payload attestation for a specific block and slot

pool.attestations.withValue(slot, slotEntries):
slotEntries[].withValue(beacon_block_root, entry):
if entry[].aggregated.isNone():
entry[].aggregated = pool.aggregateMessages(slot, entry[], cache)
return entry[].aggregated

Opt.none(PayloadAttestation)

proc getPayloadAttestationsForBlock*(
pool: var PayloadAttestationPool, target_slot: Slot,
cache: var StateCache): seq[PayloadAttestation] =
## Get payload attestations to include in a block for a target slot
let startPackingTick = Moment.now()

if target_slot == 0:
return @[]

let attestation_slot = target_slot - 1

if attestation_slot notin pool.attestations:
return @[]

var
payload_attestations: seq[PayloadAttestation]
totalCandidates = 0

pool.attestations.withValue(attestation_slot, slotEntries):
for beacon_block_root, entry in slotEntries[]:
totalCandidates += 1
let aggregated =
pool.getAggregatedPayloadAttestation(
attestation_slot, beacon_block_root, cache)
if aggregated.isSome():
payload_attestations.add(aggregated.get())
if payload_attestations.len >= MAX_PAYLOAD_ATTESTATIONS.int:
break

let packingDur = Moment.now() - startPackingTick

debug "Packed payload attestations for block",
target_slot = target_slot, attestation_slot = attestation_slot,
packingDur = packingDur, totalCandidates = totalCandidates,
payload_attestations = payload_attestations.len()

payload_attestation_pool_block_packing_time.set(packingDur.toFloatSeconds())

payload_attestations
25 changes: 25 additions & 0 deletions beacon_chain/gossip_processing/batch_validation.nim
Original file line number Diff line number Diff line change
Expand Up @@ -578,3 +578,28 @@ proc scheduleBlsToExecutionChangeCheck*(
pubkey, sig)

ok((fut, sig))

proc schedulePayloadAttestationCheck*(
batchCrypto: ref BatchCrypto, fork: Fork,
genesis_validators_root: Eth2Digest,
msg: PayloadAttestationMessage,
pubkey: CookedPubKey,
signature: ValidatorSig
): Result[tuple[fut: FutureBatchResult, sig: CookedSig], cstring] =
## Schedule crypto verification of a payload attestation
##
## The buffer is processed:
## - when eager processing is enabled and the batch is full
## - otherwise after 10ms (BatchAttAccumTime)
##
## This returns an error if crypto sanity checks failed
## and a future with the deferred payload attestation check otherwise.
##
let
sig = signature.load().valueOr:
return err("payload attestation: cannot load signature")
fut = batchCrypto.verifySoon("batch_validation.schedulePayloadAttestationCheck"):
payload_attestation_signature_set(
fork, genesis_validators_root, msg, pubkey, sig)

ok((fut, sig))
27 changes: 26 additions & 1 deletion beacon_chain/gossip_processing/eth2_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import
../consensus_object_pools/[
attestation_pool, blob_quarantine, block_clearance, block_quarantine,
blockchain_dag, envelope_quarantine, execution_payload_pool,
light_client_pool, sync_committee_msg_pool, validator_change_pool],
payload_attestation_pool, light_client_pool,
sync_committee_msg_pool, validator_change_pool],
../validators/validator_pool,
../beacon_clock,
"."/[gossip_validation, block_processor, batch_validation],
Expand Down Expand Up @@ -146,6 +147,7 @@ type
syncCommitteeMsgPool: ref SyncCommitteeMsgPool
lightClientPool: ref LightClientPool
executionPayloadBidPool*: ref ExecutionPayloadBidPool
payloadAttestationPool*: ref PayloadAttestationPool

doppelgangerDetection*: DoppelgangerProtection

Expand Down Expand Up @@ -196,6 +198,7 @@ proc new*(T: type Eth2Processor,
syncCommitteeMsgPool: ref SyncCommitteeMsgPool,
lightClientPool: ref LightClientPool,
executionPayloadBidPool: ref ExecutionPayloadBidPool,
payloadAttestationPool: ref PayloadAttestationPool,
quarantine: ref Quarantine,
blobQuarantine: ref BlobQuarantine,
dataColumnQuarantine: ref ColumnQuarantine,
Expand All @@ -216,6 +219,7 @@ proc new*(T: type Eth2Processor,
syncCommitteeMsgPool: syncCommitteeMsgPool,
lightClientPool: lightClientPool,
executionPayloadBidPool: executionPayloadBidPool,
payloadAttestationPool: payloadAttestationPool,
quarantine: quarantine,
blobQuarantine: blobQuarantine,
dataColumnQuarantine: dataColumnQuarantine,
Expand Down Expand Up @@ -916,3 +920,24 @@ proc processExecutionPayloadBid*(
debug "Dropping execution payload bid", reason = $v.error
beacon_execution_payload_bids_dropped.inc(1, [$v.error[0]])
err(v.error())

proc processPayloadAttestationMessage*(
self: ref Eth2Processor, src: MsgSource,
payload_attestation_message: PayloadAttestationMessage,
checkSignature, checkValidator: bool
): Future[ValidationRes] {.async: (raises: [CancelledError]).} =
let
wallTime = self.getCurrentBeaconTime()
v = await validatePayloadAttestationMessage(
self.dag, self.payloadAttestationPool, self.batchCrypto,
payload_attestation_message, wallTime, checkSignature)

if v.isErr():
debug "Dropping payload attestation", reason = $v.error
return err(v.error())

discard self.payloadAttestationPool[].addPayloadAttestation(
payload_attestation_message, wallTime)

trace "Payload attestation validated"
return ok()
Loading
Loading