Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(blocksync): share.WriteEDS #1139

Merged
merged 18 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
github.com/ipfs/go-ipld-format v0.4.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipfs/go-merkledag v0.7.0
github.com/ipld/go-car v0.5.0
github.com/libp2p/go-libp2p v0.21.0
github.com/libp2p/go-libp2p-core v0.19.1
github.com/libp2p/go-libp2p-kad-dht v0.17.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,8 @@ github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fG
github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
github.com/ipfs/go-peertaskqueue v0.7.0 h1:VyO6G4sbzX80K58N60cCaHsSsypbUNs1GjO5seGNsQ0=
github.com/ipfs/go-peertaskqueue v0.7.0/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU=
github.com/ipld/go-car v0.5.0 h1:kcCEa3CvYMs0iE5BzD5sV7O2EwMiCIp3uF8tA6APQT8=
github.com/ipld/go-car v0.5.0/go.mod h1:ppiN5GWpjOZU9PgpAZ9HbZd9ZgSpwPMr48fGRJOWmvE=
github.com/ipld/go-codec-dagpb v1.3.1 h1:yVNlWRQexCa54ln3MSIiUN++ItH7pdhBFhh0hSgZu1w=
github.com/ipld/go-codec-dagpb v1.3.1/go.mod h1:ErNNglIi5KMur/MfFE/svtgQthzVvf+43MrzLbpcIZY=
github.com/ipld/go-ipld-prime v0.9.0/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8=
Expand Down
221 changes: 221 additions & 0 deletions share/eds/eds.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
package eds

import (
"context"
"errors"
"fmt"
"io"
"math"

"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
blockstore "github.com/ipfs/go-ipfs-blockstore"
format "github.com/ipfs/go-ipld-format"
"github.com/ipld/go-car"
"github.com/ipld/go-car/util"

"github.com/celestiaorg/celestia-app/pkg/appconsts"
"github.com/celestiaorg/celestia-app/pkg/wrapper"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/nmt"
"github.com/celestiaorg/rsmt2d"
)

// ErrEmptySquare is returned, wrapped, by WriteEDS when no shares can be extracted from the given EDS.
var ErrEmptySquare = errors.New("share: importing empty data")

// writingSession contains the components needed to write an EDS to a CARv1 file with our custom node order.
// One session is created by initializeWriter per WriteEDS call and used by the write* methods.
type writingSession struct {
// eds is the square re-imported by initializeWriter; the header roots and the shares are read from it.
eds *rsmt2d.ExtendedDataSquare
// store is an in-memory blockstore, used to cache the inner nodes (proofs) while we walk the nmt tree.
store blockstore.Blockstore
// w is the destination that the CARv1 header, the shares and the inner nodes are written to.
w io.Writer
}

// WriteEDS serializes the entire EDS into w as a CARv1 file.
// The shares come first, in quadrant order, and are followed by all inner nodes of the NMT tree.
// Layout: [ Carv1Header | Q1 | Q2 | Q3 | Q4 | inner nodes ]
// Header format: https://ipld.io/specs/transport/car/carv1/#header
//
// Any failure is returned wrapped with the step that produced it.
func WriteEDS(ctx context.Context, eds *rsmt2d.ExtendedDataSquare, w io.Writer) error {
	// Step 1: reimport the EDS, which traverses the NMT tree and caches the inner nodes (proofs).
	session, err := initializeWriter(ctx, eds, w)
	if err != nil {
		return fmt.Errorf("share: failure creating eds writer: %w", err)
	}

	// Step 2: write the Carv1Header; its roots are the eds Row + Col roots.
	if err := session.writeHeader(); err != nil {
		return fmt.Errorf("share: failure writing carv1 header: %w", err)
	}

	// Step 3: write the shares, visited in quadrant order.
	if err := session.writeQuadrants(); err != nil {
		return fmt.Errorf("share: failure writing shares: %w", err)
	}

	// Step 4: write the proofs cached in the in-memory blockstore.
	if err := session.writeProofs(ctx); err != nil {
		return fmt.Errorf("share: failure writing proofs: %w", err)
	}
	return nil
}

// initializeWriter rebuilds the EDS from its own shares with an inner-node visitor attached, so that
// the proofs get cached in an in-memory blockstore. It returns ErrEmptySquare when the EDS has no shares.
func initializeWriter(ctx context.Context, eds *rsmt2d.ExtendedDataSquare, w io.Writer) (*writingSession, error) {
	// an in-memory blockstore and an offline exchange are all we need here
	memStore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
	bserv := blockservice.New(memStore, nil)

	// pull the shares out of the eds so that they can be reimported and traversed
	shares := share.ExtractEDS(eds)
	total := len(shares)
	if total == 0 {
		return nil, ErrEmptySquare
	}

	odsWidth := int(math.Sqrt(float64(total)) / 2)
	// (total*2) - (odsWidth*4) is the amount of inner nodes visited
	batchSize := innerNodeBatchSize(total, odsWidth)
	adder := ipld.NewNmtNodeAdder(ctx, bserv, format.MaxSizeBatchOption(batchSize))
	// the adder skips leaves, which keeps them out of the store that writeProofs iterates through
	tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(odsWidth), nmt.NodeVisitor(adder.VisitInnerNodes))

	imported, err := rsmt2d.ImportExtendedDataSquare(shares, share.DefaultRSMT2DCodec(), tree.Constructor)
	if err != nil {
		return nil, fmt.Errorf("failure to recompute the extended data square: %w", err)
	}
	// compute roots; only the computation matters here, the returned value is not used
	imported.RowRoots()
	// commit the batch to DAG
	if err := adder.Commit(); err != nil {
		return nil, fmt.Errorf("failure to commit the inner nodes to the dag: %w", err)
	}

	session := &writingSession{
		eds:   imported,
		store: memStore,
		w:     w,
	}
	return session, nil
}

// writeHeader writes a CarV1 header whose list of DAG roots is made of the EDS's Row and Column roots.
func (w *writingSession) writeHeader() error {
	roots, err := rootsToCids(w.eds)
	if err != nil {
		return fmt.Errorf("failure getting root cids: %w", err)
	}

	header := &car.CarHeader{
		Version: 1,
		Roots:   roots,
	}
	return car.WriteHeader(header, w.w)
}

// writeQuadrants writes every share of the EDS to the CARv1 file, following quadrant order.
// Each share is written together with the CID derived from its leaf hash.
func (w *writingSession) writeQuadrants() error {
	for _, leaf := range quadrantOrder(w.eds) {
		leafCid, err := ipld.CidFromNamespacedSha256(nmt.Sha256Namespace8FlaggedLeaf(leaf))
		if err != nil {
			return fmt.Errorf("failure to get cid from share: %w", err)
		}
		if err := util.LdWrite(w.w, leafCid.Bytes(), leaf); err != nil {
			return fmt.Errorf("failure to write share: %w", err)
		}
	}
	return nil
}

// writeProofs walks over the keys of the in-memory blockstore and writes all inner nodes to the CARv1 file.
func (w *writingSession) writeProofs(ctx context.Context) error {
	// only proofs were put into the store, so iterating over it never yields a leaf
	keys, err := w.store.AllKeysChan(ctx)
	if err != nil {
		return fmt.Errorf("failure to get all keys from the blockstore: %w", err)
	}
	for key := range keys {
		blk, err := w.store.Get(ctx, key)
		if err != nil {
			return fmt.Errorf("failure to get proof from the blockstore: %w", err)
		}
		raw := blk.RawData()
		// the CID written to the CAR is derived from the node data rather than taken from the store key
		innerCid, err := ipld.CidFromNamespacedSha256(nmt.Sha256Namespace8FlaggedInner(raw))
		if err != nil {
			return fmt.Errorf("failure to get cid: %w", err)
		}
		if err := util.LdWrite(w.w, innerCid.Bytes(), raw); err != nil {
			return fmt.Errorf("failure to write proof to the car: %w", err)
		}
	}
	return nil
}

// quadrantOrder returns the shares of the EDS laid out quadrant after quadrant, each quadrant
// row-by-row, with the respective namespace prepended to every share.
// e.g. [ Q1 R1 | Q1 R2 | Q1 R3 | Q1 R4 | Q2 R1 | Q2 R2 .... ]
func quadrantOrder(eds *rsmt2d.ExtendedDataSquare) [][]byte {
	width := eds.Width()
	ordered := make([][]byte, width*width)

	half := int(width / 2)
	perQuadrant := half * half
	for row := 0; row < half; row++ {
		for col := 0; col < half; col++ {
			// position of the cell inside its own quadrant
			offset := row*half + col
			// one cell per quadrant, all sharing the same inner-quadrant coordinates
			for q, cell := range getQuadrantCells(eds, uint(row), uint(col)) {
				ordered[q*perQuadrant+offset] = prependNamespace(q, cell)
			}
		}
	}
	return ordered
}

// getQuadrantCells returns four cells, one per EDS quadrant, all located at the passed
// inner-quadrant coordinates (row i, column j). The result is ordered Q1, Q2, Q3, Q4.
func getQuadrantCells(eds *rsmt2d.ExtendedDataSquare, i, j uint) [][]byte {
	half := eds.Width() / 2
	return [][]byte{
		eds.GetCell(i, j),
		eds.GetCell(i, j+half),
		eds.GetCell(i+half, j),
		eds.GetCell(i+half, j+half),
	}
}

// prependNamespace adds the namespace to the passed share if in the first quadrant,
// otherwise it adds the ParitySharesNamespace to the beginning.
func prependNamespace(quadrant int, share []byte) []byte {
switch quadrant {
case 0:
return append(share[:8], share...)
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
case 1, 2, 3:
return append(appconsts.ParitySharesNamespaceID, share...)
default:
panic("invalid quadrant")
}
}

// rootsToCids converts the EDS's Row and Column roots to CIDs.
// The returned slice lists the row roots first, followed by the column roots.
// An error is returned as soon as one root cannot be converted.
func rootsToCids(eds *rsmt2d.ExtendedDataSquare) ([]cid.Cid, error) {
	rowRoots := eds.RowRoots()
	// Cap the capacity at the length so that append copies into a new backing array
	// instead of writing the column roots into spare capacity of the slice returned by RowRoots.
	roots := append(rowRoots[:len(rowRoots):len(rowRoots)], eds.ColRoots()...)

	rootCids := make([]cid.Cid, len(roots))
	for i, r := range roots {
		rootCid, err := ipld.CidFromNamespacedSha256(r)
		if err != nil {
			return nil, fmt.Errorf("failure to get cid from root: %w", err)
		}
		rootCids[i] = rootCid
	}
	return rootCids, nil
}

// innerNodeBatchSize returns the total number of inner nodes in an EDS made of shareCount shares
// whose original data square is odsWidth wide. The value is used as the size of the batch
// that is flushed to the dagstore in a single write.
func innerNodeBatchSize(shareCount int, odsWidth int) int {
	return 2*shareCount - 4*odsWidth
}
155 changes: 155 additions & 0 deletions share/eds/eds_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package eds

import (
"context"
"os"
"testing"

ds "github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
carv1 "github.com/ipld/go-car"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-app/pkg/appconsts"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/ipld"
)

func TestQuadrantOrder(t *testing.T) {
// TODO: add more test cases
nID := []byte{0, 0, 0, 0, 0, 0, 0, 0}
parity := append(appconsts.ParitySharesNamespaceID, nID...) //nolint
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
doubleNID := append(nID, nID...) //nolint
result, _ := rsmt2d.ComputeExtendedDataSquare([][]byte{
append(nID, 1), append(nID, 2),
append(nID, 3), append(nID, 4),
}, rsmt2d.NewRSGF8Codec(), rsmt2d.NewDefaultTree)
// {{1}, {2}, {7}, {13}},
// {{3}, {4}, {13}, {31}},
// {{5}, {14}, {19}, {41}},
// {{9}, {26}, {47}, {69}},
require.Equal(t,
[][]byte{
append(doubleNID, 1), append(doubleNID, 2), append(doubleNID, 3), append(doubleNID, 4),
append(parity, 7), append(parity, 13), append(parity, 13), append(parity, 31),
append(parity, 5), append(parity, 14), append(parity, 9), append(parity, 26),
append(parity, 19), append(parity, 41), append(parity, 47), append(parity, 69),
}, quadrantOrder(result),
)
}

// TestWriteEDS only checks that a random EDS can be written without error;
// the assertions live in writeRandomEDS and the returned square is not needed.
func TestWriteEDS(t *testing.T) {
	_ = writeRandomEDS(t)
}

// TestWriteEDSHeaderRoots checks that the roots stored in the CAR header
// are the CIDs of the row and column roots of the written EDS.
func TestWriteEDSHeaderRoots(t *testing.T) {
	square := writeRandomEDS(t)
	carFile := openWrittenEDS(t)
	defer carFile.Close()

	carReader, err := carv1.NewCarReader(carFile)
	require.NoError(t, err, "error creating car reader")

	expected, err := rootsToCids(square)
	require.NoError(t, err, "error converting roots to cids")

	require.Equal(t, expected, carReader.Header.Roots)
}

// TestWriteEDSStartsWithLeaves checks that the first block following the CAR header is the
// first share of the EDS (cell 0,0), once the prepended namespace is stripped from the block.
func TestWriteEDSStartsWithLeaves(t *testing.T) {
	eds := writeRandomEDS(t)
	f := openWrittenEDS(t)
	defer f.Close()

	reader, err := carv1.NewCarReader(f)
	require.NoError(t, err, "error creating car reader")
	block, err := reader.Next()
	require.NoError(t, err, "error getting first block")

	// require.Equal takes (expected, actual): the EDS cell is the reference value,
	// the data read back from the CAR file is what is under test.
	require.Equal(t, eds.GetCell(0, 0), block.RawData()[ipld.NamespaceSize:])
}

// TestWriteEDSIncludesRoots loads the written CAR file into a blockstore and checks
// that every root listed in the CAR header is present in that blockstore.
func TestWriteEDSIncludesRoots(t *testing.T) {
	writeRandomEDS(t)
	carFile := openWrittenEDS(t)
	defer carFile.Close()

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	memStore := blockstore.NewBlockstore(ds.NewMapDatastore())
	header, err := carv1.LoadCar(ctx, memStore, carFile)
	require.NoError(t, err, "error loading car file")

	for _, rootCid := range header.Roots {
		found, err := memStore.Has(context.Background(), rootCid)
		require.NoError(t, err, "error checking if blockstore has root")
		require.True(t, found, "blockstore does not have root")
	}
}

// TestWriteEDSInQuadrantOrder checks that the first blocks following the CAR header
// are exactly the shares returned by quadrantOrder, in the same order.
func TestWriteEDSInQuadrantOrder(t *testing.T) {
	eds := writeRandomEDS(t)
	f := openWrittenEDS(t)
	defer f.Close()

	reader, err := carv1.NewCarReader(f)
	require.NoError(t, err, "error creating car reader")

	for _, expected := range quadrantOrder(eds) {
		block, err := reader.Next()
		require.NoError(t, err, "error getting block")
		// require.Equal takes (expected, actual): the reordered share is the reference value
		require.Equal(t, expected, block.RawData())
	}
}

// TestInnerNodeBatchSize verifies that the number of unique inner nodes is equal to ipld.BatchSize - shareCount.
func TestInnerNodeBatchSize(t *testing.T) {
tests := []struct {
name string
origWidth int
}{
{"2", 2},
{"4", 4},
{"8", 8},
{"16", 16},
{"32", 32},
// {"64", 64}, // test case too large for CI with race detector
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
extendedWidth := tt.origWidth * 2
shareCount := extendedWidth * extendedWidth
assert.Equalf(
t,
innerNodeBatchSize(shareCount, tt.origWidth),
ipld.BatchSize(extendedWidth)-shareCount,
"batchSize(%v)", extendedWidth,
)
})
}
}

func writeRandomEDS(t *testing.T) *rsmt2d.ExtendedDataSquare {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
tmpDir := t.TempDir()
err := os.Chdir(tmpDir)
require.NoError(t, err, "error changing to the temporary test directory")
f, err := os.OpenFile("test.car", os.O_WRONLY|os.O_CREATE, 0600)
require.NoError(t, err, "error opening file")

eds := share.RandEDS(t, 4)
err = WriteEDS(ctx, eds, f)
require.NoError(t, err, "error writing EDS to file")
t.Cleanup(cancel)
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
f.Close()
return eds
}

// openWrittenEDS opens, read-only, the "test.car" file located in the current working directory.
// The caller is responsible for closing the returned file.
func openWrittenEDS(t *testing.T) *os.File {
	t.Helper()
	const carPath = "test.car"
	carFile, err := os.OpenFile(carPath, os.O_RDONLY, 0600)
	require.NoError(t, err, "error opening file")
	return carFile
}
Loading