Skip to content

Commit 62ee92a

Browse files
authored
bolster BlobSidecar syncing on incomplete responses (#5766)
Avoid marking blocks invalid when corresponding `blobSidecarsByRange` returns an incomplete / incorrect response while syncing. The block itself may still be valid in that scenario.
1 parent 0b5ddd8 commit 62ee92a

File tree

4 files changed

+93
-56
lines changed

4 files changed

+93
-56
lines changed

beacon_chain/gossip_processing/gossip_validation.nim

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -179,17 +179,11 @@ func check_attestation_subnet(
179179

180180
ok()
181181

182-
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.4/specs/deneb/p2p-interface.md#verify_blob_sidecar_inclusion_proof
183-
func verify_blob_sidecar_inclusion_proof(
182+
func check_blob_sidecar_inclusion_proof(
184183
blob_sidecar: deneb.BlobSidecar): Result[void, ValidationError] =
185-
let gindex = kzg_commitment_inclusion_proof_gindex(blob_sidecar.index)
186-
if not is_valid_merkle_branch(
187-
hash_tree_root(blob_sidecar.kzg_commitment),
188-
blob_sidecar.kzg_commitment_inclusion_proof,
189-
KZG_COMMITMENT_INCLUSION_PROOF_DEPTH,
190-
get_subtree_index(gindex),
191-
blob_sidecar.signed_block_header.message.body_root):
192-
return errReject("BlobSidecar: inclusion proof not valid")
184+
let res = blob_sidecar.verify_blob_sidecar_inclusion_proof()
185+
if res.isErr:
186+
return errReject(res.error)
193187

194188
ok()
195189

@@ -361,7 +355,7 @@ proc validateBlobSidecar*(
361355
# [REJECT] The sidecar's inclusion proof is valid as verified by
362356
# `verify_blob_sidecar_inclusion_proof(blob_sidecar)`.
363357
block:
364-
let v = verify_blob_sidecar_inclusion_proof(blob_sidecar)
358+
let v = check_blob_sidecar_inclusion_proof(blob_sidecar)
365359
if v.isErr:
366360
return dag.checkedReject(v.error)
367361

beacon_chain/spec/helpers.nim

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,19 @@ func has_flag*(flags: ParticipationFlags, flag_index: TimelyFlag): bool =
211211
let flag = ParticipationFlags(1'u8 shl ord(flag_index))
212212
(flags and flag) == flag
213213

214+
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.4/specs/deneb/p2p-interface.md#check_blob_sidecar_inclusion_proof
215+
func verify_blob_sidecar_inclusion_proof*(
216+
blob_sidecar: BlobSidecar): Result[void, string] =
217+
let gindex = kzg_commitment_inclusion_proof_gindex(blob_sidecar.index)
218+
if not is_valid_merkle_branch(
219+
hash_tree_root(blob_sidecar.kzg_commitment),
220+
blob_sidecar.kzg_commitment_inclusion_proof,
221+
KZG_COMMITMENT_INCLUSION_PROOF_DEPTH,
222+
get_subtree_index(gindex),
223+
blob_sidecar.signed_block_header.message.body_root):
224+
return err("BlobSidecar: inclusion proof not valid")
225+
ok()
226+
214227
func create_blob_sidecars*(
215228
forkyBlck: deneb.SignedBeaconBlock,
216229
kzg_proofs: KzgProofs,

beacon_chain/sync/sync_manager.nim

Lines changed: 46 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,9 @@ proc shouldGetBlobs[A, B](man: SyncManager[A, B], e: Epoch): bool =
193193
(wallEpoch < man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS or
194194
e >= wallEpoch - man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS)
195195

196-
proc getBlobSidecars*[A, B](man: SyncManager[A, B], peer: A,
197-
req: SyncRequest): Future[BlobSidecarsRes] {.async.} =
196+
proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A,
197+
req: SyncRequest
198+
): Future[BlobSidecarsRes] {.async.} =
198199
mixin getScore, `==`
199200

200201
logScope:
@@ -241,30 +242,46 @@ func groupBlobs*[T](req: SyncRequest[T],
241242
blocks: seq[ref ForkedSignedBeaconBlock],
242243
blobs: seq[ref BlobSidecar]):
243244
Result[seq[BlobSidecars], string] =
244-
var grouped = newSeq[BlobSidecars](len(blocks))
245-
var blobCursor = 0
246-
var i = 0
247-
for blck in blocks:
248-
let slot = blck[].slot
249-
if blobCursor == len(blobs):
250-
# reached end of blobs, have more blobless blocks
251-
break
252-
for blob in blobs[blobCursor..len(blobs)-1]:
253-
if blob.signed_block_header.message.slot < slot:
254-
return Result[seq[BlobSidecars], string].err "invalid blob sequence"
255-
if blob.signed_block_header.message.slot == slot:
256-
grouped[i].add(blob)
257-
blobCursor = blobCursor + 1
258-
i = i + 1
259-
260-
if blobCursor != len(blobs):
245+
var
246+
grouped = newSeq[BlobSidecars](len(blocks))
247+
blob_cursor = 0
248+
for block_idx, blck in blocks:
249+
withBlck(blck[]):
250+
when consensusFork >= ConsensusFork.Deneb:
251+
template kzgs: untyped = forkyBlck.message.body.blob_kzg_commitments
252+
if kzgs.len == 0:
253+
continue
254+
# Clients MUST include all blob sidecars of each block from which they include blob sidecars.
255+
# The following blob sidecars, where they exist, MUST be sent in consecutive (slot, index) order.
256+
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/deneb/p2p-interface.md#blobsidecarsbyrange-v1
257+
let header = forkyBlck.toSignedBeaconBlockHeader()
258+
for blob_idx, kzg_commitment in kzgs:
259+
if blob_cursor >= blobs.len:
260+
return err("BlobSidecar: response too short")
261+
let blob_sidecar = blobs[blob_cursor]
262+
if blob_sidecar.index != BlobIndex blob_idx:
263+
return err("BlobSidecar: unexpected index")
264+
if blob_sidecar.kzg_commitment != kzg_commitment:
265+
return err("BlobSidecar: unexpected kzg_commitment")
266+
if blob_sidecar.signed_block_header != header:
267+
return err("BlobSidecar: unexpected signed_block_header")
268+
grouped[block_idx].add(blob_sidecar)
269+
inc blob_cursor
270+
271+
if blob_cursor != len(blobs):
261272
# we reached end of blocks without consuming all blobs so either
262273
# the peer we got too few blocks in the paired request, or the
263274
# peer is sending us spurious blobs.
264275
Result[seq[BlobSidecars], string].err "invalid block or blob sequence"
265276
else:
266277
Result[seq[BlobSidecars], string].ok grouped
267278

279+
func checkBlobs(blobs: seq[BlobSidecars]): Result[void, string] =
280+
for blob_sidecars in blobs:
281+
for blob_sidecar in blob_sidecars:
282+
? blob_sidecar[].verify_blob_sidecar_inclusion_proof()
283+
ok()
284+
268285
proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
269286
logScope:
270287
peer_score = peer.getScore()
@@ -454,30 +471,24 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
454471
return
455472
let groupedBlobs = groupBlobs(req, blockData, blobData)
456473
if groupedBlobs.isErr():
474+
peer.updateScore(PeerScoreNoValues)
475+
man.queue.push(req)
476+
info "Received blobs sequence is inconsistent",
477+
blobs_map = getShortMap(req, blobData), request = req, msg=groupedBlobs.error()
478+
return
479+
if (let checkRes = groupedBlobs.get.checkBlobs(); checkRes.isErr):
457480
peer.updateScore(PeerScoreBadResponse)
458481
man.queue.push(req)
459482
warn "Received blobs sequence is invalid",
460-
blobs_map = getShortMap(req, blobData), request = req, msg=groupedBlobs.error()
483+
blobs_count = len(blobData),
484+
blobs_map = getShortMap(req, blobData),
485+
request = req,
486+
msg = checkRes.error
461487
return
462488
Opt.some(groupedBlobs.get())
463489
else:
464490
Opt.none(seq[BlobSidecars])
465491

466-
if blobData.isSome:
467-
let blobs = blobData.get()
468-
if len(blobs) != len(blockData):
469-
peer.updateScore(PeerScoreNoValues)
470-
man.queue.push(req)
471-
info "block and blobs have different lengths", blobs=len(blobs), blocks=len(blockData)
472-
return
473-
for i, blk in blockData:
474-
if len(blobs[i]) > 0 and blk[].slot !=
475-
blobs[i][0].signed_block_header.message.slot:
476-
peer.updateScore(PeerScoreNoValues)
477-
man.queue.push(req)
478-
debug "block and blobs data have inconsistent slots"
479-
return
480-
481492
if len(blockData) == 0 and man.direction == SyncQueueKind.Backward and
482493
req.contains(man.getSafeSlot()):
483494
# The sync protocol does not distinguish between:

tests/test_sync_manager.nim

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import unittest2
1212
import chronos
1313
import ../beacon_chain/gossip_processing/block_processor,
1414
../beacon_chain/sync/sync_manager,
15-
../beacon_chain/spec/datatypes/phase0,
1615
../beacon_chain/spec/forks
1716

1817
type
@@ -66,16 +65,36 @@ suite "SyncManager test suite":
6665
var res = newSeq[ref ForkedSignedBeaconBlock](count)
6766
var curslot = start
6867
for item in res.mitems():
69-
item = new ForkedSignedBeaconBlock
70-
item[].phase0Data.message.slot = curslot
68+
item = newClone ForkedSignedBeaconBlock(kind: ConsensusFork.Deneb)
69+
item[].denebData.message.slot = curslot
7170
curslot = curslot + 1'u64
7271
res
7372

74-
func createBlobs(slots: seq[Slot]): seq[ref BlobSidecar] =
73+
func createBlobs(
74+
blocks: var seq[ref ForkedSignedBeaconBlock], slots: seq[Slot]
75+
): seq[ref BlobSidecar] =
7576
var res = newSeq[ref BlobSidecar](len(slots))
76-
for (i, item) in res.mpairs():
77-
item = new BlobSidecar
78-
item[].signed_block_header.message.slot = slots[i]
77+
for blck in blocks:
78+
withBlck(blck[]):
79+
when consensusFork >= ConsensusFork.Deneb:
80+
template kzgs: untyped = forkyBlck.message.body.blob_kzg_commitments
81+
for i, slot in slots:
82+
if slot == forkyBlck.message.slot:
83+
doAssert kzgs.add default(KzgCommitment)
84+
if kzgs.len > 0:
85+
forkyBlck.root = hash_tree_root(forkyBlck.message)
86+
var
87+
kzg_proofs: KzgProofs
88+
blobs: Blobs
89+
for _ in kzgs:
90+
doAssert kzg_proofs.add default(KzgProof)
91+
doAssert blobs.add default(Blob)
92+
let sidecars = forkyBlck.create_blob_sidecars(kzg_proofs, blobs)
93+
var sidecarIdx = 0
94+
for i, slot in slots:
95+
if slot == forkyBlck.message.slot:
96+
res[i] = newClone sidecars[sidecarIdx]
97+
inc sidecarIdx
7998
res
8099

81100
proc getSlice(chain: openArray[ref ForkedSignedBeaconBlock], startSlot: Slot,
@@ -1064,8 +1083,8 @@ suite "SyncManager test suite":
10641083
checkResponse(r21, @[slots[3]]) == false
10651084

10661085
test "[SyncManager] groupBlobs() test":
1067-
var blobs = createBlobs(@[Slot(11), Slot(11), Slot(12), Slot(14)])
10681086
var blocks = createChain(Slot(10), Slot(15))
1087+
var blobs = createBlobs(blocks, @[Slot(11), Slot(11), Slot(12), Slot(14)])
10691088

10701089
let req = SyncRequest[SomeTPeer](slot: Slot(10))
10711090
let groupedRes = groupBlobs(req, blocks, blobs)
@@ -1095,8 +1114,8 @@ suite "SyncManager test suite":
10951114
len(grouped[5]) == 0
10961115

10971116
# Add block with a gap from previous block.
1098-
let block17 = new (ref ForkedSignedBeaconBlock)
1099-
block17[].phase0Data.message.slot = Slot(17)
1117+
let block17 = newClone ForkedSignedBeaconBlock(kind: ConsensusFork.Deneb)
1118+
block17[].denebData.message.slot = Slot(17)
11001119
blocks.add(block17)
11011120
let groupedRes2 = groupBlobs(req, blocks, blobs)
11021121

0 commit comments

Comments
 (0)