Skip to content

Commit

Permalink
Enable E2E For PeerDAS (#13945)
Browse files Browse the repository at this point in the history
* Enable E2E And Add Fixes

* Register Same Topic For Data Columns

* Initialize Capacity Of Slice

* Fix Initialization of Data Column Receiver

* Remove Mix In From Merkle Proof

* E2E: Subscribe to all subnets.

* Remove Index Check

* Remaining Bug Fixes to Get It Working

* Change Evaluator to Allow Test to Finish

* Fix Build

* Add Data Column Verification

* Fix LoopVar Bug

* Do Not Allocate Memory

* Update beacon-chain/blockchain/process_block.go

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>

* Update beacon-chain/core/peerdas/helpers.go

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>

* Update beacon-chain/core/peerdas/helpers.go

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>

* Gofmt

* Fix It Again

* Fix Test Setup

* Fix Build

* Fix Trusted Setup panic

* Fix Trusted Setup panic

* Use New Test

---------

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
  • Loading branch information
nisdas and nalepae committed Nov 27, 2024
1 parent b0ea450 commit 32ce642
Show file tree
Hide file tree
Showing 21 changed files with 296 additions and 45 deletions.
2 changes: 2 additions & 0 deletions beacon-chain/blockchain/kzg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ go_library(
deps = [
"//consensus-types/blocks:go_default_library",
"@com_github_crate_crypto_go_kzg_4844//:go_default_library",
"@com_github_ethereum_c_kzg_4844//bindings/go:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_pkg_errors//:go_default_library",
],
)
Expand Down
29 changes: 26 additions & 3 deletions beacon-chain/blockchain/kzg/trusted_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,47 @@ import (
"encoding/json"

GoKZG "github.com/crate-crypto/go-kzg-4844"
CKZG "github.com/ethereum/c-kzg-4844/bindings/go"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
)

var (
//go:embed trusted_setup.json
embeddedTrustedSetup []byte // 1.2Mb
kzgContext *GoKZG.Context
kzgLoaded bool
)

func Start() error {
parsedSetup := GoKZG.JSONTrustedSetup{}
err := json.Unmarshal(embeddedTrustedSetup, &parsedSetup)
parsedSetup := &GoKZG.JSONTrustedSetup{}
err := json.Unmarshal(embeddedTrustedSetup, parsedSetup)
if err != nil {
return errors.Wrap(err, "could not parse trusted setup JSON")
}
kzgContext, err = GoKZG.NewContext4096(&parsedSetup)
kzgContext, err = GoKZG.NewContext4096(parsedSetup)
if err != nil {
return errors.Wrap(err, "could not initialize go-kzg context")
}
g1Lagrange := &parsedSetup.SetupG1Lagrange

// Length of a G1 point, converted from hex to binary.
g1s := make([]byte, len(g1Lagrange)*(len(g1Lagrange[0])-2)/2)
for i, g1 := range g1Lagrange {
copy(g1s[i*(len(g1)-2)/2:], hexutil.MustDecode(g1))
}
// Length of a G2 point, converted from hex to binary.
g2s := make([]byte, len(parsedSetup.SetupG2)*(len(parsedSetup.SetupG2[0])-2)/2)
for i, g2 := range parsedSetup.SetupG2 {
copy(g2s[i*(len(g2)-2)/2:], hexutil.MustDecode(g2))
}
if !kzgLoaded {
// Free the current trusted setup before running this method. CKZG
// panics if the same setup is run multiple times.
if err = CKZG.LoadTrustedSetup(g1s, g2s); err != nil {
panic(err)
}
}
kzgLoaded = true
return nil
}
7 changes: 6 additions & 1 deletion beacon-chain/blockchain/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,12 @@ func (s *Service) isDataAvailableDataColumns(ctx context.Context, root [32]byte,
s.blobNotifiers.delete(root)
return nil
case <-ctx.Done():
return errors.Wrapf(ctx.Err(), "context deadline waiting for blob sidecars slot: %d, BlockRoot: %#x", block.Slot(), root)
missingIndexes := make([]uint64, 0, len(missing))
for val := range missing {
copiedVal := val
missingIndexes = append(missingIndexes, copiedVal)
}
return errors.Wrapf(ctx.Err(), "context deadline waiting for blob sidecars slot: %d, BlockRoot: %#x, missing %v", block.Slot(), root, missingIndexes)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/receive_data_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ func (s *Service) ReceiveDataColumn(ctx context.Context, ds blocks.VerifiedRODat

// TODO use a custom event or new method of for data columns. For speed
// we are simply reusing blob paths here.
s.sendNewBlobEvent(ds.BlockRoot(), uint64(ds.SignedBlockHeader.Header.Slot))
s.sendNewBlobEvent(ds.BlockRoot(), ds.ColumnIndex)
return nil
}
16 changes: 9 additions & 7 deletions beacon-chain/blockchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,17 @@ var ErrMissingClockSetter = errors.New("blockchain Service initialized without a
type blobNotifierMap struct {
sync.RWMutex
notifiers map[[32]byte]chan uint64
seenIndex map[[32]byte][fieldparams.MaxBlobsPerBlock]bool
seenIndex map[[32]byte][fieldparams.NumberOfColumns]bool
}

// notifyIndex notifies a blob by its index for a given root.
// It uses internal maps to keep track of seen indices and notifier channels.
func (bn *blobNotifierMap) notifyIndex(root [32]byte, idx uint64) {
if idx >= fieldparams.MaxBlobsPerBlock {
return
}
// TODO: Separate Data Columns from blobs
/*
if idx >= fieldparams.MaxBlobsPerBlock {
return
}*/

bn.Lock()
seen := bn.seenIndex[root]
Expand All @@ -129,7 +131,7 @@ func (bn *blobNotifierMap) notifyIndex(root [32]byte, idx uint64) {
// Retrieve or create the notifier channel for the given root.
c, ok := bn.notifiers[root]
if !ok {
c = make(chan uint64, fieldparams.MaxBlobsPerBlock)
c = make(chan uint64, fieldparams.NumberOfColumns)
bn.notifiers[root] = c
}

Expand All @@ -143,7 +145,7 @@ func (bn *blobNotifierMap) forRoot(root [32]byte) chan uint64 {
defer bn.Unlock()
c, ok := bn.notifiers[root]
if !ok {
c = make(chan uint64, fieldparams.MaxBlobsPerBlock)
c = make(chan uint64, fieldparams.NumberOfColumns)
bn.notifiers[root] = c
}
return c
Expand All @@ -169,7 +171,7 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) {
ctx, cancel := context.WithCancel(ctx)
bn := &blobNotifierMap{
notifiers: make(map[[32]byte]chan uint64),
seenIndex: make(map[[32]byte][fieldparams.MaxBlobsPerBlock]bool),
seenIndex: make(map[[32]byte][fieldparams.NumberOfColumns]bool),
}
srv := &Service{
ctx: ctx,
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ func (s *MockClockSetter) SetClock(g *startup.Clock) error {
func TestNotifyIndex(t *testing.T) {
// Initialize a blobNotifierMap
bn := &blobNotifierMap{
seenIndex: make(map[[32]byte][fieldparams.MaxBlobsPerBlock]bool),
seenIndex: make(map[[32]byte][fieldparams.NumberOfColumns]bool),
notifiers: make(map[[32]byte]chan uint64),
}

Expand Down
18 changes: 17 additions & 1 deletion beacon-chain/core/peerdas/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@prysm//tools/go:def.bzl", "go_library")
load("@prysm//tools/go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
Expand All @@ -19,3 +19,19 @@ go_library(
"@com_github_pkg_errors//:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["helpers_test.go"],
deps = [
":go_default_library",
"//beacon-chain/blockchain/kzg:go_default_library",
"//consensus-types/blocks:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@com_github_consensys_gnark_crypto//ecc/bls12-381/fr:go_default_library",
"@com_github_crate_crypto_go_kzg_4844//:go_default_library",
"@com_github_ethereum_c_kzg_4844//bindings/go:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)
51 changes: 48 additions & 3 deletions beacon-chain/core/peerdas/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ var (
// Custom errors
errCustodySubnetCountTooLarge = errors.New("custody subnet count larger than data column sidecar subnet count")
errCellNotFound = errors.New("cell not found (should never happen)")
errIndexTooLarge = errors.New("column index is larger than the specified number of columns")
errMismatchLength = errors.New("mismatch in the length of the commitments and proofs")

// maxUint256 is the maximum value of a uint256.
maxUint256 = &uint256.Int{math.MaxUint64, math.MaxUint64, math.MaxUint64, math.MaxUint64}
Expand Down Expand Up @@ -176,6 +178,9 @@ func RecoverMatrix(cellFromCoordinate map[cellCoordinate]cKzg4844.Cell, blobCoun
// https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/das-core.md#recover_matrix
func DataColumnSidecars(signedBlock interfaces.SignedBeaconBlock, blobs []cKzg4844.Blob) ([]*ethpb.DataColumnSidecar, error) {
blobsCount := len(blobs)
if blobsCount == 0 {
return nil, nil
}

// Get the signed block header.
signedBlockHeader, err := signedBlock.Header()
Expand Down Expand Up @@ -215,7 +220,7 @@ func DataColumnSidecars(signedBlock interfaces.SignedBeaconBlock, blobs []cKzg48
}

// Get the column sidecars.
sidecars := make([]*ethpb.DataColumnSidecar, cKzg4844.CellsPerExtBlob)
sidecars := make([]*ethpb.DataColumnSidecar, 0, cKzg4844.CellsPerExtBlob)
for columnIndex := uint64(0); columnIndex < cKzg4844.CellsPerExtBlob; columnIndex++ {
column := make([]cKzg4844.Cell, 0, blobsCount)
kzgProofOfColumn := make([]cKzg4844.KZGProof, 0, blobsCount)
Expand All @@ -234,15 +239,17 @@ func DataColumnSidecars(signedBlock interfaces.SignedBeaconBlock, blobs []cKzg48

cellBytes := make([]byte, 0, bytesPerCell)
for _, fieldElement := range cell {
cellBytes = append(cellBytes, fieldElement[:]...)
copiedElem := fieldElement
cellBytes = append(cellBytes, copiedElem[:]...)
}

columnBytes = append(columnBytes, cellBytes)
}

kzgProofOfColumnBytes := make([][]byte, 0, blobsCount)
for _, kzgProof := range kzgProofOfColumn {
kzgProofOfColumnBytes = append(kzgProofOfColumnBytes, kzgProof[:])
copiedProof := kzgProof
kzgProofOfColumnBytes = append(kzgProofOfColumnBytes, copiedProof[:])
}

sidecar := &ethpb.DataColumnSidecar{
Expand All @@ -259,3 +266,41 @@ func DataColumnSidecars(signedBlock interfaces.SignedBeaconBlock, blobs []cKzg48

return sidecars, nil
}

// VerifyDataColumnSidecarKZGProofs verifies the provided KZG Proofs for the particular
// data column.
func VerifyDataColumnSidecarKZGProofs(sc *ethpb.DataColumnSidecar) (bool, error) {
if sc.ColumnIndex >= params.BeaconConfig().NumberOfColumns {
return false, errIndexTooLarge
}
if len(sc.DataColumn) != len(sc.KzgCommitments) || len(sc.KzgCommitments) != len(sc.KzgProof) {
return false, errMismatchLength
}
blobsCount := len(sc.DataColumn)

rowIdx := make([]uint64, 0, blobsCount)
colIdx := make([]uint64, 0, blobsCount)
for i := 0; i < len(sc.DataColumn); i++ {
copiedI := uint64(i)
rowIdx = append(rowIdx, copiedI)
colI := sc.ColumnIndex
colIdx = append(colIdx, colI)
}
ckzgComms := make([]cKzg4844.Bytes48, 0, len(sc.KzgCommitments))
for _, com := range sc.KzgCommitments {
ckzgComms = append(ckzgComms, cKzg4844.Bytes48(com))
}
var cells []cKzg4844.Cell
for _, ce := range sc.DataColumn {
var newCell []cKzg4844.Bytes32
for i := 0; i < len(ce); i += 32 {
newCell = append(newCell, cKzg4844.Bytes32(ce[i:i+32]))
}
cells = append(cells, cKzg4844.Cell(newCell))
}
var proofs []cKzg4844.Bytes48
for _, p := range sc.KzgProof {
proofs = append(proofs, cKzg4844.Bytes48(p))
}
return cKzg4844.VerifyCellKZGProofBatch(ckzgComms, rowIdx, colIdx, cells, proofs)
}
91 changes: 91 additions & 0 deletions beacon-chain/core/peerdas/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package peerdas_test

import (
"bytes"
"crypto/sha256"
"encoding/binary"
"fmt"
"testing"

"github.com/consensys/gnark-crypto/ecc/bls12-381/fr"
GoKZG "github.com/crate-crypto/go-kzg-4844"
ckzg4844 "github.com/ethereum/c-kzg-4844/bindings/go"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/kzg"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util"
"github.com/sirupsen/logrus"
)

func deterministicRandomness(seed int64) [32]byte {
// Converts an int64 to a byte slice
buf := new(bytes.Buffer)
err := binary.Write(buf, binary.BigEndian, seed)
if err != nil {
logrus.WithError(err).Error("Failed to write int64 to bytes buffer")
return [32]byte{}
}
bytes := buf.Bytes()

return sha256.Sum256(bytes)
}

// Returns a serialized random field element in big-endian
func GetRandFieldElement(seed int64) [32]byte {
bytes := deterministicRandomness(seed)
var r fr.Element
r.SetBytes(bytes[:])

return GoKZG.SerializeScalar(r)
}

// Returns a random blob using the passed seed as entropy
func GetRandBlob(seed int64) ckzg4844.Blob {
var blob ckzg4844.Blob
bytesPerBlob := GoKZG.ScalarsPerBlob * GoKZG.SerializedScalarSize
for i := 0; i < bytesPerBlob; i += GoKZG.SerializedScalarSize {
fieldElementBytes := GetRandFieldElement(seed + int64(i))
copy(blob[i:i+GoKZG.SerializedScalarSize], fieldElementBytes[:])
}
return blob
}

func GenerateCommitmentAndProof(blob ckzg4844.Blob) (ckzg4844.KZGCommitment, ckzg4844.KZGProof, error) {
commitment, err := ckzg4844.BlobToKZGCommitment(&blob)
if err != nil {
return ckzg4844.KZGCommitment{}, ckzg4844.KZGProof{}, err
}
proof, err := ckzg4844.ComputeBlobKZGProof(&blob, ckzg4844.Bytes48(commitment))
if err != nil {
return ckzg4844.KZGCommitment{}, ckzg4844.KZGProof{}, err
}
return commitment, proof, err
}

func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
dbBlock := util.NewBeaconBlockDeneb()
require.NoError(t, kzg.Start())

comms := [][]byte{}
blobs := []ckzg4844.Blob{}
for i := int64(0); i < 6; i++ {
blob := GetRandBlob(i)
commitment, _, err := GenerateCommitmentAndProof(blob)
require.NoError(t, err)
comms = append(comms, commitment[:])
blobs = append(blobs, blob)
}

dbBlock.Block.Body.BlobKzgCommitments = comms
sBlock, err := blocks.NewSignedBeaconBlock(dbBlock)
require.NoError(t, err)
sCars, err := peerdas.DataColumnSidecars(sBlock, blobs)
require.NoError(t, err)

for i, sidecar := range sCars {
verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(sidecar)
require.NoError(t, err)
require.Equal(t, true, verified, fmt.Sprintf("sidecar %d failed", i))
}
}
11 changes: 7 additions & 4 deletions beacon-chain/db/filesystem/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

// blobIndexMask is a bitmask representing the set of blob indices that are currently set.
type blobIndexMask [fieldparams.MaxBlobsPerBlock]bool
type blobIndexMask [fieldparams.NumberOfColumns]bool

// BlobStorageSummary represents cached information about the BlobSidecars on disk for each root the cache knows about.
type BlobStorageSummary struct {
Expand Down Expand Up @@ -68,9 +68,12 @@ func (s *blobStorageCache) Summary(root [32]byte) BlobStorageSummary {
}

func (s *blobStorageCache) ensure(key [32]byte, slot primitives.Slot, idx uint64) error {
if idx >= fieldparams.MaxBlobsPerBlock {
return errIndexOutOfBounds
}
// TODO: Separate blob index checks from data column index checks
/*
if idx >= fieldparams.MaxBlobsPerBlock {
return errIndexOutOfBounds
}
*/
s.mu.Lock()
defer s.mu.Unlock()
v := s.cache[key]
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/db/filesystem/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
)

func TestSlotByRoot_Summary(t *testing.T) {
t.Skip("Use new test for data columns")
var noneSet, allSet, firstSet, lastSet, oneSet blobIndexMask
firstSet[0] = true
lastSet[len(lastSet)-1] = true
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,7 @@ func (b *BeaconNode) registerRPCService(router *http.ServeMux) error {
FinalizationFetcher: chainService,
BlockReceiver: chainService,
BlobReceiver: chainService,
DataColumnReceiver: chainService,
AttestationReceiver: chainService,
GenesisTimeFetcher: chainService,
GenesisFetcher: chainService,
Expand Down
Loading

0 comments on commit 32ce642

Please sign in to comment.