From 41f7efc8a1573e8d6bc16eecb74f715f4319290e Mon Sep 17 00:00:00 2001 From: Wondertan Date: Mon, 22 Jul 2024 19:24:34 +0200 Subject: [PATCH 1/7] feat(shwap/p2p): Bitswap Getter --- share/shwap/p2p/bitswap/getter.go | 172 ++++++++++++++++++++++++++++++ 1 file changed, 172 insertions(+) create mode 100644 share/shwap/p2p/bitswap/getter.go diff --git a/share/shwap/p2p/bitswap/getter.go b/share/shwap/p2p/bitswap/getter.go new file mode 100644 index 0000000000..03fce91fd0 --- /dev/null +++ b/share/shwap/p2p/bitswap/getter.go @@ -0,0 +1,172 @@ +package bitswap + +import ( + "context" + "fmt" + + "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/exchange" + + "github.com/celestiaorg/celestia-app/pkg/wrapper" + "github.com/celestiaorg/rsmt2d" + + "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/share" +) + +// Getter implements share.Getter. +type Getter struct { + exchange exchange.SessionExchange + bstore blockstore.Blockstore + session exchange.Fetcher + cancel context.CancelFunc +} + +// NewGetter constructs a new Getter. +func NewGetter(exchange exchange.SessionExchange, bstore blockstore.Blockstore) *Getter { + return &Getter{exchange: exchange, bstore: bstore} +} + +// Start kicks off internal fetching session. +func (g *Getter) Start() { + ctx, cancel := context.WithCancel(context.Background()) + g.session = g.exchange.NewSession(ctx) + g.cancel = cancel +} + +// Stop shuts down Getter's internal fetching session. +func (g *Getter) Stop() { + g.cancel() +} + +// TODO(@Wondertan): Rework API to get coordinates as a single param to make it ergonomic. 
+func (g *Getter) GetShares( + ctx context.Context, + hdr *header.ExtendedHeader, + rowIdxs, colIdxs []int, +) ([]share.Share, error) { + if len(rowIdxs) != len(colIdxs) { + return nil, fmt.Errorf("row indecies and col indecies must be same length") + } + + if len(rowIdxs) == 0 { + return nil, fmt.Errorf("empty coordinates") + } + + blks := make([]Block, 0, len(rowIdxs)) + for i, rowIdx := range rowIdxs { + sid, err := NewEmptySampleBlock(hdr.Height(), rowIdx, colIdxs[i], len(hdr.DAH.RowRoots)) + if err != nil { + return nil, err + } + + blks[i] = sid + } + + err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithStore(g.bstore)) + if err != nil { + return nil, err + } + + shares := make([]share.Share, len(blks)) + for i, blk := range blks { + shares[i] = blk.(*SampleBlock).Container.Share + } + + return shares, nil +} + +func (g *Getter) GetShare( + ctx context.Context, + hdr *header.ExtendedHeader, + row, col int, +) (share.Share, error) { + shrs, err := g.GetShares(ctx, hdr, []int{row}, []int{col}) + if err != nil { + return nil, err + } + + if len(shrs) != 1 { + return nil, fmt.Errorf("expected 1 share row, got %d", len(shrs)) + } + + return shrs[0], nil +} + +func (g *Getter) GetEDS( + ctx context.Context, + hdr *header.ExtendedHeader, +) (*rsmt2d.ExtendedDataSquare, error) { + sqrLn := len(hdr.DAH.RowRoots) + blks := make([]Block, sqrLn/2) + for i := 0; i < sqrLn/2; i++ { + blk, err := NewEmptyRowBlock(hdr.Height(), i, sqrLn) + if err != nil { + return nil, err + } + + blks[i] = blk + } + + err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithFetcher(g.session)) + if err != nil { + return nil, err + } + + shrs := make([]share.Share, 0, sqrLn*sqrLn) + for _, row := range blks { + rowShrs, err := row.(*RowBlock).Container.Shares() + if err != nil { + return nil, fmt.Errorf("decoding Shares out of Row: %w", err) + } + shrs = append(shrs, rowShrs...) 
+ } + + square, err := rsmt2d.ComputeExtendedDataSquare( + shrs, + share.DefaultRSMT2DCodec(), + wrapper.NewConstructor(uint64(sqrLn/2)), + ) + if err != nil { + return nil, fmt.Errorf("computing EDS: %w", err) + } + + return square, nil +} + +func (g *Getter) GetSharesByNamespace( + ctx context.Context, + hdr *header.ExtendedHeader, + ns share.Namespace, +) (share.NamespacedShares, error) { + if err := ns.ValidateForData(); err != nil { + return nil, err + } + + rowIdxs := share.RowsWithNamespace(hdr.DAH, ns) + blks := make([]Block, len(rowIdxs)) + for i, rowNdIdx := range rowIdxs { + rndblk, err := NewEmptyRowNamespaceDataBlock(hdr.Height(), rowNdIdx, ns, len(hdr.DAH.RowRoots)) + if err != nil { + return nil, err + } + blks[i] = rndblk + } + + err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithFetcher(g.session)) + if err != nil { + return nil, err + } + + // TODO(@Wondertan): this must use shwap types eventually + nsShrs := make(share.NamespacedShares, len(blks)) + for i, blk := range blks { + rnd := blk.(*RowNamespaceDataBlock).Container + nsShrs[i] = share.NamespacedRow{ + Shares: rnd.Shares, + Proof: rnd.Proof, + } + } + + return nsShrs, nil +} From 6cfecfe5ada679851791c969099de79428dd4835 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Tue, 23 Jul 2024 20:42:13 +0200 Subject: [PATCH 2/7] review comments --- share/shwap/p2p/bitswap/getter.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/share/shwap/p2p/bitswap/getter.go b/share/shwap/p2p/bitswap/getter.go index 03fce91fd0..6d2e71493f 100644 --- a/share/shwap/p2p/bitswap/getter.go +++ b/share/shwap/p2p/bitswap/getter.go @@ -39,6 +39,7 @@ func (g *Getter) Stop() { g.cancel() } +// GetShares uses [SampleBlock] and [Fetch] to get and verify samples for given coordinates. // TODO(@Wondertan): Rework API to get coordinates as a single param to make it ergonomic. 
func (g *Getter) GetShares( ctx context.Context, @@ -53,7 +54,7 @@ func (g *Getter) GetShares( return nil, fmt.Errorf("empty coordinates") } - blks := make([]Block, 0, len(rowIdxs)) + blks := make([]Block, len(rowIdxs)) for i, rowIdx := range rowIdxs { sid, err := NewEmptySampleBlock(hdr.Height(), rowIdx, colIdxs[i], len(hdr.DAH.RowRoots)) if err != nil { @@ -93,6 +94,8 @@ func (g *Getter) GetShare( return shrs[0], nil } +// GetEDS uses [RowBlock] and [Fetch] to get half of the first EDS quadrant(ODS) and +// recomputes the whole EDS from it. func (g *Getter) GetEDS( ctx context.Context, hdr *header.ExtendedHeader, @@ -113,7 +116,7 @@ func (g *Getter) GetEDS( return nil, err } - shrs := make([]share.Share, 0, sqrLn*sqrLn) + shrs := make([]share.Share, 0, sqrLn/2*sqrLn/2) for _, row := range blks { rowShrs, err := row.(*RowBlock).Container.Shares() if err != nil { @@ -134,6 +137,8 @@ func (g *Getter) GetEDS( return square, nil } +// GetSharesByNamespace uses [RowNamespaceDataBlock] and [Fetch] to get all the data +// by the given namespace. func (g *Getter) GetSharesByNamespace( ctx context.Context, hdr *header.ExtendedHeader, From 75948440a08951686064bad06488a4f6f32f9da6 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Tue, 23 Jul 2024 22:36:20 +0200 Subject: [PATCH 3/7] add support for archival vs avail sessions --- share/shwap/p2p/bitswap/getter.go | 53 ++++++++++++++++++++++++------- 1 file changed, 42 insertions(+), 11 deletions(-) diff --git a/share/shwap/p2p/bitswap/getter.go b/share/shwap/p2p/bitswap/getter.go index 6d2e71493f..cabac954dc 100644 --- a/share/shwap/p2p/bitswap/getter.go +++ b/share/shwap/p2p/bitswap/getter.go @@ -11,26 +11,45 @@ import ( "github.com/celestiaorg/rsmt2d" "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/share" ) // Getter implements share.Getter. 
type Getter struct { - exchange exchange.SessionExchange - bstore blockstore.Blockstore - session exchange.Fetcher - cancel context.CancelFunc + exchange exchange.SessionExchange + bstore blockstore.Blockstore + availWndw pruner.AvailabilityWindow + + availableSession exchange.Fetcher + archivalSession exchange.Fetcher + + cancel context.CancelFunc } // NewGetter constructs a new Getter. -func NewGetter(exchange exchange.SessionExchange, bstore blockstore.Blockstore) *Getter { - return &Getter{exchange: exchange, bstore: bstore} +func NewGetter( + exchange exchange.SessionExchange, + bstore blockstore.Blockstore, + availWndw pruner.AvailabilityWindow, +) *Getter { + return &Getter{exchange: exchange, bstore: bstore, availWndw: availWndw} } -// Start kicks off internal fetching session. +// Start kicks off internal fetching sessions. +// +// We keep Bitswap sessions for the whole Getter lifespan: +// - Sessions retain useful heuristics about peers, like TTFB +// - Sessions prefer peers that previously served us related content. +// +// So reusing session is expected to improve fetching performance. +// +// There are two sessions for archival and available data, so archival node peers aren't mixed +// with regular full node peers. 
func (g *Getter) Start() { ctx, cancel := context.WithCancel(context.Background()) - g.session = g.exchange.NewSession(ctx) + g.availableSession = g.exchange.NewSession(ctx) + g.archivalSession = g.exchange.NewSession(ctx) g.cancel = cancel } @@ -64,7 +83,8 @@ func (g *Getter) GetShares( blks[i] = sid } - err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithStore(g.bstore)) + ses := g.session(hdr) + err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithStore(g.bstore), WithFetcher(ses)) if err != nil { return nil, err } @@ -111,7 +131,8 @@ func (g *Getter) GetEDS( blks[i] = blk } - err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithFetcher(g.session)) + ses := g.session(hdr) + err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithFetcher(ses)) if err != nil { return nil, err } @@ -158,7 +179,8 @@ func (g *Getter) GetSharesByNamespace( blks[i] = rndblk } - err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithFetcher(g.session)) + ses := g.session(hdr) + err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithFetcher(ses)) if err != nil { return nil, err } @@ -175,3 +197,12 @@ func (g *Getter) GetSharesByNamespace( return nsShrs, nil } + +// session decides which fetching session to use for the given header. 
+func (g *Getter) session(hdr *header.ExtendedHeader) exchange.Fetcher { + if pruner.IsWithinAvailabilityWindow(hdr.Time(), g.availWndw) { + return g.availableSession + } + + return g.archivalSession +} From 8f54ab63e7d807b53cf9325b3e52794800978081 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Wed, 24 Jul 2024 18:00:39 +0200 Subject: [PATCH 4/7] fix compute via import and improve docs --- share/shwap/p2p/bitswap/getter.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/share/shwap/p2p/bitswap/getter.go b/share/shwap/p2p/bitswap/getter.go index cabac954dc..4843ee04be 100644 --- a/share/shwap/p2p/bitswap/getter.go +++ b/share/shwap/p2p/bitswap/getter.go @@ -4,11 +4,10 @@ import ( "context" "fmt" - "github.com/ipfs/boxo/blockstore" - "github.com/ipfs/boxo/exchange" - "github.com/celestiaorg/celestia-app/pkg/wrapper" "github.com/celestiaorg/rsmt2d" + "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/exchange" "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/pruner" @@ -97,6 +96,7 @@ func (g *Getter) GetShares( return shares, nil } +// GetShare uses [GetShares] to fetch and verify a single share by the given coordinates. func (g *Getter) GetShare( ctx context.Context, hdr *header.ExtendedHeader, @@ -116,6 +116,8 @@ func (g *Getter) GetShare( // GetEDS uses [RowBlock] and [Fetch] to get half of the first EDS quadrant(ODS) and // recomputes the whole EDS from it. +// We fetch the ODS or Q1 to ensure better compatibility with archival nodes that only +// store ODS and do not recompute other quadrants. func (g *Getter) GetEDS( ctx context.Context, hdr *header.ExtendedHeader, @@ -138,15 +140,18 @@ func (g *Getter) GetEDS( } shrs := make([]share.Share, 0, sqrLn/2*sqrLn/2) - for _, row := range blks { + for i, row := range blks { rowShrs, err := row.(*RowBlock).Container.Shares() if err != nil { return nil, fmt.Errorf("decoding Shares out of Row: %w", err) } - shrs = append(shrs, rowShrs...) 
+ + for j, shr := range rowShrs { + shrs[i*j] = shr + } } - square, err := rsmt2d.ComputeExtendedDataSquare( + square, err := rsmt2d.ImportExtendedDataSquare( shrs, share.DefaultRSMT2DCodec(), wrapper.NewConstructor(uint64(sqrLn/2)), @@ -159,7 +164,8 @@ func (g *Getter) GetEDS( } // GetSharesByNamespace uses [RowNamespaceDataBlock] and [Fetch] to get all the data -// by the given namespace. +// by the given namespace. If data spans over multiple rows, the request is split into +// parallel RowNamespaceDataID requests per each row and then assembled back into NamespaceData. func (g *Getter) GetSharesByNamespace( ctx context.Context, hdr *header.ExtendedHeader, From ea09f391181122e63dcc8e2075d54b730225fb8e Mon Sep 17 00:00:00 2001 From: Wondertan Date: Thu, 25 Jul 2024 23:25:47 +0200 Subject: [PATCH 5/7] extract edsFromRows and write a test --- share/shwap/p2p/bitswap/getter.go | 58 ++++++++++++++++++-------- share/shwap/p2p/bitswap/getter_test.go | 27 ++++++++++++ 2 files changed, 67 insertions(+), 18 deletions(-) create mode 100644 share/shwap/p2p/bitswap/getter_test.go diff --git a/share/shwap/p2p/bitswap/getter.go b/share/shwap/p2p/bitswap/getter.go index 4843ee04be..55391931e4 100644 --- a/share/shwap/p2p/bitswap/getter.go +++ b/share/shwap/p2p/bitswap/getter.go @@ -4,14 +4,16 @@ import ( "context" "fmt" - "github.com/celestiaorg/celestia-app/pkg/wrapper" - "github.com/celestiaorg/rsmt2d" "github.com/ipfs/boxo/blockstore" "github.com/ipfs/boxo/exchange" + "github.com/celestiaorg/celestia-app/pkg/wrapper" + "github.com/celestiaorg/rsmt2d" + "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/shwap" ) // Getter implements share.Getter. 
@@ -139,25 +141,14 @@ func (g *Getter) GetEDS( return nil, err } - shrs := make([]share.Share, 0, sqrLn/2*sqrLn/2) - for i, row := range blks { - rowShrs, err := row.(*RowBlock).Container.Shares() - if err != nil { - return nil, fmt.Errorf("decoding Shares out of Row: %w", err) - } - - for j, shr := range rowShrs { - shrs[i*j] = shr - } + rows := make([]shwap.Row, len(blks)) + for i, blk := range blks { + rows[i] = blk.(*RowBlock).Container } - square, err := rsmt2d.ImportExtendedDataSquare( - shrs, - share.DefaultRSMT2DCodec(), - wrapper.NewConstructor(uint64(sqrLn/2)), - ) + square, err := edsFromRows(hdr.DAH, rows) if err != nil { - return nil, fmt.Errorf("computing EDS: %w", err) + return nil, err } return square, nil @@ -212,3 +203,34 @@ func (g *Getter) session(hdr *header.ExtendedHeader) exchange.Fetcher { return g.archivalSession } + +// edsFromRows imports given Rows and computes EDS out of them, assuming enough were Rows provided. +func edsFromRows(roots *share.Root, rows []shwap.Row) (*rsmt2d.ExtendedDataSquare, error) { + shrs := make([]share.Share, len(roots.RowRoots)*len(roots.RowRoots)) + for i, row := range rows { + rowShrs, err := row.Shares() + if err != nil { + return nil, fmt.Errorf("decoding Shares out of Row: %w", err) + } + + for j, shr := range rowShrs { + shrs[j+(i*len(roots.RowRoots))] = shr + } + } + + square, err := rsmt2d.ImportExtendedDataSquare( + shrs, + share.DefaultRSMT2DCodec(), + wrapper.NewConstructor(uint64(len(roots.RowRoots)/2)), + ) + if err != nil { + return nil, fmt.Errorf("importing EDS: %w", err) + } + + err = square.Repair(roots.RowRoots, roots.ColumnRoots) + if err != nil { + return nil, fmt.Errorf("repairing EDS: %w", err) + } + + return square, nil +} diff --git a/share/shwap/p2p/bitswap/getter_test.go b/share/shwap/p2p/bitswap/getter_test.go new file mode 100644 index 0000000000..0d409f1989 --- /dev/null +++ b/share/shwap/p2p/bitswap/getter_test.go @@ -0,0 +1,27 @@ +package bitswap + +import ( + "testing" + + 
"github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/eds/edstest" + "github.com/celestiaorg/celestia-node/share/shwap" +) + +func TestEDSFromRows(t *testing.T) { + edsIn := edstest.RandEDS(t, 8) + roots, err := share.NewRoot(edsIn) + require.NoError(t, err) + + rows := make([]shwap.Row, edsIn.Width()/2) + for i := range edsIn.Width() / 2 { + rowShrs := edsIn.Row(i)[:edsIn.Width()/2] + rows[i] = shwap.NewRow(rowShrs, shwap.Left) + } + + edsOut, err := edsFromRows(roots, rows) + require.NoError(t, err) + require.True(t, edsIn.Equals(edsOut)) +} From b58c3fc7a74727869d1c588b308bd9bb6a997ce3 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Thu, 25 Jul 2024 23:55:45 +0200 Subject: [PATCH 6/7] rebase fix --- share/shwap/p2p/bitswap/getter.go | 4 ++-- share/shwap/p2p/bitswap/getter_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/share/shwap/p2p/bitswap/getter.go b/share/shwap/p2p/bitswap/getter.go index 55391931e4..fc3b2e918b 100644 --- a/share/shwap/p2p/bitswap/getter.go +++ b/share/shwap/p2p/bitswap/getter.go @@ -67,7 +67,7 @@ func (g *Getter) GetShares( rowIdxs, colIdxs []int, ) ([]share.Share, error) { if len(rowIdxs) != len(colIdxs) { - return nil, fmt.Errorf("row indecies and col indecies must be same length") + return nil, fmt.Errorf("row indecies and col indices must be same length") } if len(rowIdxs) == 0 { @@ -205,7 +205,7 @@ func (g *Getter) session(hdr *header.ExtendedHeader) exchange.Fetcher { } // edsFromRows imports given Rows and computes EDS out of them, assuming enough were Rows provided. 
-func edsFromRows(roots *share.Root, rows []shwap.Row) (*rsmt2d.ExtendedDataSquare, error) { +func edsFromRows(roots *share.AxisRoots, rows []shwap.Row) (*rsmt2d.ExtendedDataSquare, error) { shrs := make([]share.Share, len(roots.RowRoots)*len(roots.RowRoots)) for i, row := range rows { rowShrs, err := row.Shares() diff --git a/share/shwap/p2p/bitswap/getter_test.go b/share/shwap/p2p/bitswap/getter_test.go index 0d409f1989..757c34d7a0 100644 --- a/share/shwap/p2p/bitswap/getter_test.go +++ b/share/shwap/p2p/bitswap/getter_test.go @@ -12,7 +12,7 @@ import ( func TestEDSFromRows(t *testing.T) { edsIn := edstest.RandEDS(t, 8) - roots, err := share.NewRoot(edsIn) + roots, err := share.NewAxisRoots(edsIn) require.NoError(t, err) rows := make([]shwap.Row, edsIn.Width()/2) From bddc697f32860ffdd3c8cb5b707f7fc4a19f89ff Mon Sep 17 00:00:00 2001 From: Wondertan Date: Fri, 26 Jul 2024 00:04:32 +0200 Subject: [PATCH 7/7] fix comment --- share/shwap/p2p/bitswap/getter.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/share/shwap/p2p/bitswap/getter.go b/share/shwap/p2p/bitswap/getter.go index fc3b2e918b..ad4e9f5db2 100644 --- a/share/shwap/p2p/bitswap/getter.go +++ b/share/shwap/p2p/bitswap/getter.go @@ -204,7 +204,8 @@ func (g *Getter) session(hdr *header.ExtendedHeader) exchange.Fetcher { return g.archivalSession } -// edsFromRows imports given Rows and computes EDS out of them, assuming enough were Rows provided. +// edsFromRows imports given Rows and computes EDS out of them, assuming enough Rows were provided. +// It is designed to reuse Row halves computed during verification on [Fetch] level. func edsFromRows(roots *share.AxisRoots, rows []shwap.Row) (*rsmt2d.ExtendedDataSquare, error) { shrs := make([]share.Share, len(roots.RowRoots)*len(roots.RowRoots)) for i, row := range rows {