Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions AllTests-mainnet.md
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,15 @@ AllTests-mainnet
```diff
+ pre-1.1.0 OK
```
## Payload attestation pool [Preset: mainnet]
```diff
+ Can add and retrieve payload attestations [Preset: mainnet] OK
+ Can get payload attestations for block production [Preset: mainnet] OK
+ Different payload presence values [Preset: mainnet] OK
+ Duplicate validator in PTC - multiple signatures [Preset: mainnet] OK
+ Multiple validators in PTC can attest [Preset: mainnet] OK
+ Payload attestations get pruned [Preset: mainnet] OK
```
## PeerPool testing suite
```diff
+ Access peers by key test OK
Expand Down
5 changes: 4 additions & 1 deletion beacon_chain/beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import
./el/el_manager,
./consensus_object_pools/[
blockchain_dag, blob_quarantine, block_quarantine, consensus_manager,
attestation_pool, sync_committee_msg_pool, validator_change_pool,
attestation_pool, execution_payload_pool, payload_attestation_pool,
sync_committee_msg_pool, validator_change_pool,
blockchain_list],
./spec/datatypes/[base, altair],
./spec/eth2_apis/dynamic_fee_recipients,
Expand Down Expand Up @@ -87,6 +88,8 @@ type
syncCommitteeMsgPool*: ref SyncCommitteeMsgPool
lightClientPool*: ref LightClientPool
validatorChangePool*: ref ValidatorChangePool
executionPayloadBidPool*: ref ExecutionPayloadBidPool
payloadAttestationPool*: ref PayloadAttestationPool
elManager*: ELManager
restServer*: RestServerRef
keymanagerHost*: ref KeymanagerHost
Expand Down
168 changes: 168 additions & 0 deletions beacon_chain/consensus_object_pools/payload_attestation_pool.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
# beacon_chain
# Copyright (c) 2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [], gcsafe.}

import
# Status libraries
metrics,
chronicles,
# Internal
../spec/[eth2_merkleization, forks, validator],
"."/[spec_cache, blockchain_dag],
../beacon_clock

from ../spec/beaconstate import get_ptc

logScope: topics = "payattpool"

declareGauge payload_attestation_pool_block_packing_time,
"Time it took to create list of payload attestations for block"

type
PayloadAttestationEntry* = object
data*: PayloadAttestationData
messages*: Table[ValidatorIndex, PayloadAttestationMessage]
aggregated*: Opt[PayloadAttestation]

PayloadAttestationPool* = object
dag*: ChainDAGRef
attestations*: Table[Slot, Table[Eth2Digest, PayloadAttestationEntry]]

func init*(T: type PayloadAttestationPool, dag: ChainDAGRef): T =
T(dag: dag)

func pruneOldEntries(pool: var PayloadAttestationPool, wallTime: BeaconTime) =
let current_slot = wallTime.slotOrZero(pool.dag.timeParams)

# keep only recent slots - since payload attestations
# are only valid for 1 slot
var slotsToRemove: seq[Slot]
for slot in pool.attestations.keys:
if slot + 2 < current_slot:
slotsToRemove.add(slot)

for slot in slotsToRemove:
pool.attestations.del(slot)

func addPayloadAttestation*(
pool: var PayloadAttestationPool, message: PayloadAttestationMessage,
wallTime: BeaconTime): bool =
template beacon_block_root: untyped = message.data.beacon_block_root
let
slot = message.data.slot
validator_index = message.validator_index

pool.pruneOldEntries(wallTime)

# create an entry for this block and slot
let
entry = addr pool.attestations.mgetOrPut(slot).mgetOrPut(
beacon_block_root, PayloadAttestationEntry(data: message.data))

# Check for duplicate
let vidx = ValidatorIndex(validator_index)
if vidx in entry[].messages:
return false

entry[].messages[vidx] = message

entry[].aggregated = Opt.none(PayloadAttestation)

true

func aggregateMessages(
pool: PayloadAttestationPool, slot: Slot,
entry: var PayloadAttestationEntry, cache: var StateCache
): Opt[PayloadAttestation] =

if entry.messages.len == 0:
return Opt.none(PayloadAttestation)

withState(pool.dag.headState):
when consensusFork >= ConsensusFork.Gloas:
var
aggregation_bits: BitArray[int(PTC_SIZE)]
signatures: seq[CookedSig]
ptc_index = 0

for ptc_validator_index in get_ptc(forkyState.data, slot, cache):
entry.messages.withValue(ptc_validator_index, message):
let cookedSig = message[].signature.load().valueOr:
continue
aggregation_bits[ptc_index] = true
signatures.add(cookedSig)
ptc_index += 1

if signatures.len == 0:
return Opt.none(PayloadAttestation)

var aggregated_signature = AggregateSignature.init(signatures[0])
for i in 1..<signatures.len:
aggregated_signature.aggregate(signatures[i])

Opt.some(PayloadAttestation(
aggregation_bits: aggregation_bits,
data: entry.data,
signature: aggregated_signature.finish().toValidatorSig()
))
else:
Opt.none(PayloadAttestation)

func getAggregatedPayloadAttestation*(
pool: var PayloadAttestationPool, slot: Slot,
beacon_block_root: Eth2Digest, cache: var StateCache
): Opt[PayloadAttestation] =
## Get aggregated payload attestation for a specific block and slot

pool.attestations.withValue(slot, slotEntries):
slotEntries[].withValue(beacon_block_root, entry):
if entry[].aggregated.isNone():
entry[].aggregated = pool.aggregateMessages(slot, entry[], cache)
return entry[].aggregated

Opt.none(PayloadAttestation)

proc getPayloadAttestationsForBlock*(
pool: var PayloadAttestationPool, target_slot: Slot,
cache: var StateCache): seq[PayloadAttestation] =
## Get payload attestations to include in a block for a target slot
let startPackingTick = Moment.now()

if target_slot == 0:
return @[]

let attestation_slot = target_slot - 1

if attestation_slot notin pool.attestations:
return @[]

var
payload_attestations: seq[PayloadAttestation]
totalCandidates = 0

pool.attestations.withValue(attestation_slot, slotEntries):
for beacon_block_root, entry in slotEntries[]:
totalCandidates += 1
let aggregated =
pool.getAggregatedPayloadAttestation(
attestation_slot, beacon_block_root, cache)
if aggregated.isSome():
payload_attestations.add(aggregated.get())
if payload_attestations.len >= MAX_PAYLOAD_ATTESTATIONS.int:
break

let packingDur = Moment.now() - startPackingTick

debug "Packed payload attestations for block",
target_slot = target_slot, attestation_slot = attestation_slot,
packingDur = packingDur, totalCandidates = totalCandidates,
payload_attestations = payload_attestations.len()

payload_attestation_pool_block_packing_time.set(packingDur.toFloatSeconds())

payload_attestations
25 changes: 25 additions & 0 deletions beacon_chain/gossip_processing/batch_validation.nim
Original file line number Diff line number Diff line change
Expand Up @@ -578,3 +578,28 @@ proc scheduleBlsToExecutionChangeCheck*(
pubkey, sig)

ok((fut, sig))

proc schedulePayloadAttestationCheck*(
batchCrypto: ref BatchCrypto, fork: Fork,
genesis_validators_root: Eth2Digest,
msg: PayloadAttestationMessage,
pubkey: CookedPubKey,
signature: ValidatorSig
): Result[tuple[fut: FutureBatchResult, sig: CookedSig], cstring] =
## Schedule crypto verification of a payload attestation
##
## The buffer is processed:
## - when eager processing is enabled and the batch is full
## - otherwise after 10ms (BatchAttAccumTime)
##
## This returns an error if crypto sanity checks failed
## and a future with the deferred payload attestation check otherwise.
##
let
sig = signature.load().valueOr:
return err("payload attestation: cannot load signature")
fut = batchCrypto.verifySoon("batch_validation.schedulePayloadAttestationCheck"):
payload_attestation_signature_set(
fork, genesis_validators_root, msg, pubkey, sig)

ok((fut, sig))
27 changes: 26 additions & 1 deletion beacon_chain/gossip_processing/eth2_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import
../consensus_object_pools/[
attestation_pool, blob_quarantine, block_clearance, block_quarantine,
blockchain_dag, envelope_quarantine, execution_payload_pool,
light_client_pool, sync_committee_msg_pool, validator_change_pool],
payload_attestation_pool, light_client_pool,
sync_committee_msg_pool, validator_change_pool],
../validators/validator_pool,
../beacon_clock,
"."/[gossip_validation, block_processor, batch_validation],
Expand Down Expand Up @@ -146,6 +147,7 @@ type
syncCommitteeMsgPool: ref SyncCommitteeMsgPool
lightClientPool: ref LightClientPool
executionPayloadBidPool*: ref ExecutionPayloadBidPool
payloadAttestationPool*: ref PayloadAttestationPool

doppelgangerDetection*: DoppelgangerProtection

Expand Down Expand Up @@ -196,6 +198,7 @@ proc new*(T: type Eth2Processor,
syncCommitteeMsgPool: ref SyncCommitteeMsgPool,
lightClientPool: ref LightClientPool,
executionPayloadBidPool: ref ExecutionPayloadBidPool,
payloadAttestationPool: ref PayloadAttestationPool,
quarantine: ref Quarantine,
blobQuarantine: ref BlobQuarantine,
dataColumnQuarantine: ref ColumnQuarantine,
Expand All @@ -216,6 +219,7 @@ proc new*(T: type Eth2Processor,
syncCommitteeMsgPool: syncCommitteeMsgPool,
lightClientPool: lightClientPool,
executionPayloadBidPool: executionPayloadBidPool,
payloadAttestationPool: payloadAttestationPool,
quarantine: quarantine,
blobQuarantine: blobQuarantine,
dataColumnQuarantine: dataColumnQuarantine,
Expand Down Expand Up @@ -916,3 +920,24 @@ proc processExecutionPayloadBid*(
debug "Dropping execution payload bid", reason = $v.error
beacon_execution_payload_bids_dropped.inc(1, [$v.error[0]])
err(v.error())

proc processPayloadAttestationMessage*(
self: ref Eth2Processor, src: MsgSource,
payload_attestation_message: PayloadAttestationMessage,
checkSignature, checkValidator: bool
): Future[ValidationRes] {.async: (raises: [CancelledError]).} =
let
wallTime = self.getCurrentBeaconTime()
v = await validatePayloadAttestationMessage(
self.dag, self.payloadAttestationPool, self.batchCrypto,
payload_attestation_message, wallTime, checkSignature)

if v.isErr():
debug "Dropping payload attestation", reason = $v.error
return err(v.error())

discard self.payloadAttestationPool[].addPayloadAttestation(
payload_attestation_message, wallTime)

trace "Payload attestation validated"
return ok()
Loading
Loading