diff --git a/cmd/lassie/fetch.go b/cmd/lassie/fetch.go index 0538a742..5bf7cc64 100644 --- a/cmd/lassie/fetch.go +++ b/cmd/lassie/fetch.go @@ -215,7 +215,8 @@ func Fetch(cctx *cli.Context) error { } else { carWriter = storage.NewDeferredCarWriterForPath(rootCid, outfile) } - carStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), tempDir) + tempStore := storage.NewDeferredStorageCar(tempDir) + carStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), tempStore) defer carStore.Close() var blockCount int diff --git a/pkg/internal/itest/http_fetch_test.go b/pkg/internal/itest/http_fetch_test.go index 437727b1..3786f246 100644 --- a/pkg/internal/itest/http_fetch_test.go +++ b/pkg/internal/itest/http_fetch_test.go @@ -19,8 +19,10 @@ import ( datatransfer "github.com/filecoin-project/go-data-transfer/v2" "github.com/filecoin-project/lassie/pkg/internal/itest/mocknet" "github.com/filecoin-project/lassie/pkg/internal/itest/testpeer" + "github.com/filecoin-project/lassie/pkg/internal/testutil" "github.com/filecoin-project/lassie/pkg/lassie" httpserver "github.com/filecoin-project/lassie/pkg/server/http" + "github.com/filecoin-project/lassie/pkg/verifiedcar" "github.com/google/uuid" "github.com/ipfs/go-cid" unixfs "github.com/ipfs/go-unixfsnode/testutil" @@ -30,6 +32,8 @@ import ( "github.com/ipld/go-ipld-prime/datamodel" "github.com/ipld/go-ipld-prime/linking" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/storage/memstore" + selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multicodec" "github.com/stretchr/testify/require" @@ -49,7 +53,10 @@ func TestHttpFetch(t *testing.T) { blockQuery := func(q url.Values, _ []testpeer.TestPeer) { q.Set("dag-scope", "block") } - + noDups := func(header http.Header) { + header.Set("Accept", "application/vnd.ipld.car;order=dfs;version=1;dups=n;") + } + type headerSetter func(http.Header) 
type queryModifier func(url.Values, []testpeer.TestPeer) type bodyValidator func(*testing.T, unixfs.DirEntry, []byte) @@ -66,6 +73,7 @@ func TestHttpFetch(t *testing.T) { modifyHttpConfig func(httpserver.HttpServerConfig) httpserver.HttpServerConfig generate func(*testing.T, io.Reader, []testpeer.TestPeer) []unixfs.DirEntry paths []string + setHeader headerSetter modifyQueries []queryModifier validateBodies []bodyValidator }{ @@ -196,6 +204,27 @@ func TestHttpFetch(t *testing.T) { validateCarBody(t, body, srcData.Root, wantCids, true) }}, }, + { + name: "http max block limit", + httpRemotes: 1, + expectUncleanEnd: true, + modifyHttpConfig: func(cfg httpserver.HttpServerConfig) httpserver.HttpServerConfig { + cfg.MaxBlocksPerRequest = 3 + return cfg + }, + generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry { + return []unixfs.DirEntry{unixfs.GenerateFile(t, remotes[0].LinkSystem, rndReader, 4<<20)} + }, + validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) { + // 3 blocks max, start at the root and then two blocks into the sharded data + wantCids := []cid.Cid{ + srcData.Root, + srcData.SelfCids[0], + srcData.SelfCids[1], + } + validateCarBody(t, body, srcData.Root, wantCids, true) + }}, + }, { // dag-scope entity fetch should get the same DAG as full for a plain file name: "graphsync large sharded file, dag-scope entity", @@ -646,6 +675,43 @@ func TestHttpFetch(t *testing.T) { v.Set("providers", strings.Join(maStrings, ",")) }}, }, + { + name: "http large sharded file with dups", + httpRemotes: 1, + generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry { + return []unixfs.DirEntry{unixfs.GenerateFile(t, remotes[0].LinkSystem, testutil.ZeroReader{}, 4<<20)} + }, + validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) { + store := &testutil.CorrectedMemStore{Store: &memstore.Store{ + Bag: 
make(map[string][]byte), + }} + lsys := cidlink.DefaultLinkSystem() + lsys.SetReadStorage(store) + lsys.SetWriteStorage(store) + _, _, err := verifiedcar.Config{ + Root: srcData.Root, + Selector: selectorparse.CommonSelector_ExploreAllRecursively, + ExpectDuplicatesIn: true, + }.VerifyCar(context.Background(), bytes.NewReader(body), lsys) + require.NoError(t, err) + }}, + }, + { + name: "http large sharded file with dups, no dups response requested", + httpRemotes: 1, + setHeader: noDups, + generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry { + return []unixfs.DirEntry{unixfs.GenerateFile(t, remotes[0].LinkSystem, testutil.ZeroReader{}, 4<<20)} + }, + validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) { + wantCids := []cid.Cid{ + srcData.Root, // "/"" + srcData.SelfCids[1], + srcData.SelfCids[len(srcData.SelfCids)-1], + } + validateCarBody(t, body, srcData.Root, wantCids, true) + }}, + }, } for _, testCase := range testCases { @@ -712,7 +778,11 @@ func TestHttpFetch(t *testing.T) { addr := fmt.Sprintf("http://%s/ipfs/%s%s", httpServer.Addr(), srcData[i].Root.String(), path) getReq, err := http.NewRequest("GET", addr, nil) req.NoError(err) - getReq.Header.Add("Accept", "application/vnd.ipld.car") + if testCase.setHeader == nil { + getReq.Header.Add("Accept", "application/vnd.ipld.car") + } else { + testCase.setHeader(getReq.Header) + } if testCase.modifyQueries != nil && testCase.modifyQueries[i] != nil { q := getReq.URL.Query() testCase.modifyQueries[i](q, mrn.Remotes) @@ -778,6 +848,7 @@ func TestHttpFetch(t *testing.T) { req.NoError(err) if DEBUG_DATA { + t.Logf("Creating CAR %s in temp dir", fmt.Sprintf("%s_received%d.car", testCase.name, i)) dstf, err := os.CreateTemp("", fmt.Sprintf("%s_received%d.car", testCase.name, i)) req.NoError(err) t.Logf("Writing received data to CAR @ %s", dstf.Name()) diff --git a/pkg/internal/itest/testpeer/generator.go 
b/pkg/internal/itest/testpeer/generator.go index b478519e..ef878296 100644 --- a/pkg/internal/itest/testpeer/generator.go +++ b/pkg/internal/itest/testpeer/generator.go @@ -355,6 +355,24 @@ func MockIpfsHandler(ctx context.Context, lsys linking.LinkSystem) func(http.Res unixfsPath = "/" + strings.Join(urlPath[2:], "/") } + acceptTypes := strings.Split(req.Header.Get("Accept"), ",") + includeDupes := false + for _, acceptType := range acceptTypes { + typeParts := strings.Split(acceptType, ";") + if typeParts[0] == "application/vnd.ipld.car" { + for _, nextPart := range typeParts[1:] { + pair := strings.Split(nextPart, "=") + if len(pair) == 2 { + attr := strings.TrimSpace(pair[0]) + value := strings.TrimSpace(pair[1]) + if attr == "dups" && value == "y" { + includeDupes = true + } + } + } + } + } + // We're always providing the dag-scope parameter, so add a failure case if we stop // providing it in the future if !req.URL.Query().Has("dag-scope") { @@ -384,7 +402,7 @@ func MockIpfsHandler(ctx context.Context, lsys linking.LinkSystem) func(http.Res } // Write to response writer - carWriter, err := storage.NewWritable(res, []cid.Cid{rootCid}, car.WriteAsCarV1(true), car.AllowDuplicatePuts(false)) + carWriter, err := storage.NewWritable(res, []cid.Cid{rootCid}, car.WriteAsCarV1(true), car.AllowDuplicatePuts(includeDupes)) if err != nil { http.Error(res, fmt.Sprintf("Failed to create car writer: %v", err), http.StatusInternalServerError) return @@ -433,7 +451,7 @@ func MockIpfsHandler(ctx context.Context, lsys linking.LinkSystem) func(http.Res err = progress.WalkAdv(rootNode, sel, visitNoop) if err != nil { - http.Error(res, fmt.Sprintf("Failed to traverse from root node: %v", err), http.StatusInternalServerError) + // if we loaded the first block, we can't write headers any more return } } diff --git a/pkg/internal/testutil/correctedmemstore.go b/pkg/internal/testutil/correctedmemstore.go new file mode 100644 index 00000000..4d41a67f --- /dev/null +++ 
b/pkg/internal/testutil/correctedmemstore.go @@ -0,0 +1,30 @@ +package testutil + +import ( + "context" + "io" + + format "github.com/ipfs/go-ipld-format" + "github.com/ipld/go-ipld-prime/storage/memstore" +) + +// TODO: remove when this is fixed in IPLD prime +type CorrectedMemStore struct { + *memstore.Store +} + +func (cms *CorrectedMemStore) Get(ctx context.Context, key string) ([]byte, error) { + data, err := cms.Store.Get(ctx, key) + if err != nil && err.Error() == "404" { + err = format.ErrNotFound{} + } + return data, err +} + +func (cms *CorrectedMemStore) GetStream(ctx context.Context, key string) (io.ReadCloser, error) { + rc, err := cms.Store.GetStream(ctx, key) + if err != nil && err.Error() == "404" { + err = format.ErrNotFound{} + } + return rc, err +} diff --git a/pkg/internal/testutil/gen.go b/pkg/internal/testutil/gen.go index a2992fae..dcb4e9be 100644 --- a/pkg/internal/testutil/gen.go +++ b/pkg/internal/testutil/gen.go @@ -139,3 +139,12 @@ func GenerateRetrievalIDs(t *testing.T, n int) []types.RetrievalID { } return retrievalIDs } + +type ZeroReader struct{} + +func (ZeroReader) Read(b []byte) (n int, err error) { + for i := range b { + b[i] = 0 + } + return len(b), nil +} diff --git a/pkg/internal/testutil/toblocks.go b/pkg/internal/testutil/toblocks.go new file mode 100644 index 00000000..729d5f37 --- /dev/null +++ b/pkg/internal/testutil/toblocks.go @@ -0,0 +1,61 @@ +package testutil + +import ( + "bytes" + "io" + "testing" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-libipfs/blocks" + "github.com/ipfs/go-unixfsnode" + dagpb "github.com/ipld/go-codec-dagpb" + "github.com/ipld/go-ipld-prime/datamodel" + "github.com/ipld/go-ipld-prime/linking" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/node/basicnode" + "github.com/ipld/go-ipld-prime/traversal" + "github.com/ipld/go-ipld-prime/traversal/selector" + "github.com/stretchr/testify/require" +) + +// ToBlocks makes a block array from ordered blocks in 
a traversal +func ToBlocks(t *testing.T, lsys linking.LinkSystem, root cid.Cid, selNode datamodel.Node) []blocks.Block { + sel, err := selector.CompileSelector(selNode) + require.NoError(t, err) + traversedBlocks := make([]blocks.Block, 0) + unixfsnode.AddUnixFSReificationToLinkSystem(&lsys) + osro := lsys.StorageReadOpener + lsys.StorageReadOpener = func(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) { + r, err := osro(lc, l) + if err != nil { + return nil, err + } + byts, err := io.ReadAll(r) + if err != nil { + return nil, err + } + blk, err := blocks.NewBlockWithCid(byts, l.(cidlink.Link).Cid) + if err != nil { + return nil, err + } + traversedBlocks = append(traversedBlocks, blk) + return bytes.NewReader(byts), nil + } + var proto datamodel.NodePrototype = basicnode.Prototype.Any + if root.Prefix().Codec == cid.DagProtobuf { + proto = dagpb.Type.PBNode + } + rootNode, err := lsys.Load(linking.LinkContext{}, cidlink.Link{Cid: root}, proto) + require.NoError(t, err) + prog := traversal.Progress{ + Cfg: &traversal.Config{ + LinkSystem: lsys, + LinkTargetNodePrototypeChooser: dagpb.AddSupportToChooser(basicnode.Chooser), + }, + } + vf := func(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error { return nil } + err = prog.WalkAdv(rootNode, sel, vf) + require.NoError(t, err) + + return traversedBlocks +} diff --git a/pkg/retriever/bitswapretriever_test.go b/pkg/retriever/bitswapretriever_test.go index d95cf949..1d7b5c42 100644 --- a/pkg/retriever/bitswapretriever_test.go +++ b/pkg/retriever/bitswapretriever_test.go @@ -17,7 +17,6 @@ import ( "github.com/ipfs/go-cid" gstestutil "github.com/ipfs/go-graphsync/testutil" exchange "github.com/ipfs/go-ipfs-exchange-interface" - format "github.com/ipfs/go-ipld-format" "github.com/ipfs/go-libipfs/blocks" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/linking" @@ -32,7 +31,7 @@ import ( func TestBitswapRetriever(t *testing.T) { ctx := context.Background() - store := 
&correctedMemStore{&memstore.Store{ + store := &testutil.CorrectedMemStore{Store: &memstore.Store{ Bag: make(map[string][]byte), }} lsys := cidlink.DefaultLinkSystem() @@ -514,7 +513,7 @@ func makeLsys(blocks []blocks.Block) *linking.LinkSystem { bag[cidlink.Link{Cid: block.Cid()}.Binary()] = block.RawData() } lsys := cidlink.DefaultLinkSystem() - store := &correctedMemStore{&memstore.Store{Bag: bag}} + store := &testutil.CorrectedMemStore{Store: &memstore.Store{Bag: bag}} lsys.SetReadStorage(store) lsys.SetWriteStorage(store) return &lsys @@ -528,27 +527,6 @@ func sizeOf(blocks []blocks.Block) uint64 { return total } -// TODO: remove when this is fixed in IPLD prime -type correctedMemStore struct { - *memstore.Store -} - -func (cms *correctedMemStore) Get(ctx context.Context, key string) ([]byte, error) { - data, err := cms.Store.Get(ctx, key) - if err != nil && err.Error() == "404" { - err = format.ErrNotFound{} - } - return data, err -} - -func (cms *correctedMemStore) GetStream(ctx context.Context, key string) (io.ReadCloser, error) { - rc, err := cms.Store.GetStream(ctx, key) - if err != nil && err.Error() == "404" { - err = format.ErrNotFound{} - } - return rc, err -} - type mockInProgressCids struct { incremented []cid.Cid decremented []cid.Cid diff --git a/pkg/retriever/httpretriever.go b/pkg/retriever/httpretriever.go index c3fe7235..ed716bed 100644 --- a/pkg/retriever/httpretriever.go +++ b/pkg/retriever/httpretriever.go @@ -15,7 +15,6 @@ import ( "github.com/filecoin-project/lassie/pkg/types" "github.com/filecoin-project/lassie/pkg/verifiedcar" "github.com/ipfs/go-cid" - "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipni/go-libipni/metadata" "github.com/multiformats/go-multicodec" ) @@ -114,18 +113,14 @@ func (ph *ProtocolHttp) Retrieve( ttfb = retrieval.Clock.Since(phaseStartTime) shared.sendEvent(events.FirstByte(retrieval.Clock.Now(), retrieval.request.RetrievalID, phaseStartTime, candidate)) }) - - sel, err := 
selector.CompileSelector(retrieval.request.GetSelector()) - if err != nil { - return nil, err - } - cfg := verifiedcar.Config{ - Root: retrieval.request.Cid, - Selector: sel, + Root: retrieval.request.Cid, + Selector: retrieval.request.GetSelector(), + ExpectDuplicatesIn: true, + MaxBlocks: retrieval.request.MaxBlocks, } - blockCount, byteCount, err := cfg.Verify(ctx, rdr, retrieval.request.LinkSystem) + blockCount, byteCount, err := cfg.VerifyCar(ctx, rdr, retrieval.request.LinkSystem) if err != nil { return nil, err } diff --git a/pkg/retriever/httpretriever_test.go b/pkg/retriever/httpretriever_test.go index 1089195d..7b766dde 100644 --- a/pkg/retriever/httpretriever_test.go +++ b/pkg/retriever/httpretriever_test.go @@ -30,7 +30,7 @@ import ( func TestHTTPRetriever(t *testing.T) { ctx := context.Background() - store := &correctedMemStore{&memstore.Store{ + store := &testutil.CorrectedMemStore{Store: &memstore.Store{ Bag: make(map[string][]byte), }} lsys := cidlink.DefaultLinkSystem() diff --git a/pkg/server/http/ipfs.go b/pkg/server/http/ipfs.go index d49ba37b..2fab1014 100644 --- a/pkg/server/http/ipfs.go +++ b/pkg/server/http/ipfs.go @@ -50,13 +50,56 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response hasAccept := req.Header.Get("Accept") != "" acceptTypes := strings.Split(req.Header.Get("Accept"), ",") validAccept := false + includeDupes := true for _, acceptType := range acceptTypes { typeParts := strings.Split(acceptType, ";") if typeParts[0] == "*/*" || typeParts[0] == "application/*" || typeParts[0] == "application/vnd.ipld.car" { validAccept = true - break + if typeParts[0] == "application/vnd.ipld.car" { + // parse https://github.com/ipfs/specs/pull/412 car attributes + for _, nextPart := range typeParts[1:] { + pair := strings.Split(nextPart, "=") + if len(pair) == 2 { + attr := strings.TrimSpace(pair[0]) + value := strings.TrimSpace(pair[1]) + switch attr { + case "dups": + switch value { + case "y": + case "n": + 
includeDupes = false + default: + // don't accept unexpected values + validAccept = false + } + case "version": + switch value { + case "1": + default: + // don't accept any version but 1 + validAccept = false + } + case "order": + switch value { + case "dfs": + case "unk": + default: + // we only do dfs, which also satisfies unk, future extensions are not yet supported + validAccept = false + } + default: + // ignore others + } + } + } + // only break if further validation didn't fail + if validAccept { + break + } } } + if hasAccept && !validAccept { statusLogger.logStatus(http.StatusBadRequest, "No acceptable content type") res.WriteHeader(http.StatusBadRequest) @@ -188,8 +231,14 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response // the response writer. Once closed, no other content should be written. bytesWritten := make(chan struct{}, 1) - carWriter := storage.NewDeferredCarWriterForStream(rootCid, res) - carStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), cfg.TempDir) + tempStore := storage.NewDeferredStorageCar(cfg.TempDir) + var carWriter storage.DeferredWriter + if includeDupes { + carWriter = storage.NewDuplicateAdderCarForStream(req.Context(), rootCid, path.String(), dagScope, tempStore, res) + } else { + carWriter = storage.NewDeferredCarWriterForStream(rootCid, res) + } + carStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), tempStore) defer func() { if err := carStore.Close(); err != nil { logger.Errorf("error closing temp store: %s", err) } }() @@ -272,6 +321,12 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response metric := header.NewMetric(string(re.Phase())) metric.Duration = re.Time().Sub(re.PhaseStartTime()) }) + + // force all blocks to flush + if cerr := carWriter.Close(); cerr != nil { + logger.Infof("error closing car writer: %s", cerr) + } + if err != nil { select { case <-bytesWritten: diff --git a/pkg/storage/cachingtempstore.go 
b/pkg/storage/cachingtempstore.go index 2f7990d8..30d08654 100644 --- a/pkg/storage/cachingtempstore.go +++ b/pkg/storage/cachingtempstore.go @@ -40,9 +40,9 @@ type CachingTempStore struct { preloadKeys map[string]struct{} } -func NewCachingTempStore(outWriter linking.BlockWriteOpener, tempDir string) *CachingTempStore { +func NewCachingTempStore(outWriter linking.BlockWriteOpener, store *DeferredStorageCar) *CachingTempStore { return &CachingTempStore{ - store: NewDeferredStorageCar(tempDir), + store: store, outWriter: outWriter, preloadKeys: make(map[string]struct{}), } diff --git a/pkg/storage/cachingtempstore_test.go b/pkg/storage/cachingtempstore_test.go index 66466ceb..a69efbf9 100644 --- a/pkg/storage/cachingtempstore_test.go +++ b/pkg/storage/cachingtempstore_test.go @@ -49,7 +49,7 @@ func TestDeferredCarWriterWritesCARv1(t *testing.T) { var buf bytes.Buffer cw := NewDeferredCarWriterForStream(testCid1, &buf) - ss := NewCachingTempStore(cw.BlockWriteOpener(), "") + ss := NewCachingTempStore(cw.BlockWriteOpener(), NewDeferredStorageCar("")) t.Cleanup(func() { ss.Close() }) if tt.readBeforeWrite { diff --git a/pkg/storage/deferredcarwriter.go b/pkg/storage/deferredcarwriter.go index abe3e49a..276b3ef1 100644 --- a/pkg/storage/deferredcarwriter.go +++ b/pkg/storage/deferredcarwriter.go @@ -22,6 +22,13 @@ type putCb struct { var _ ipldstorage.WritableStorage = (*DeferredCarWriter)(nil) var _ io.Closer = (*DeferredCarWriter)(nil) +type DeferredWriter interface { + ipldstorage.WritableStorage + io.Closer + BlockWriteOpener() linking.BlockWriteOpener + OnPut(cb func(int), once bool) +} + // DeferredCarWriter creates a write-only CARv1 either to an existing stream or // to a file designated by a supplied path. CARv1 content (including header) // only begins when the first Put() operation is performed. 
If the output is a @@ -38,20 +45,21 @@ type DeferredCarWriter struct { f *os.File w carstorage.WritableCar putCb []putCb + opts []carv2.Option } // NewDeferredCarWriterForPath creates a DeferredCarWriter that will write to a // file designated by the supplied path. The file will only be created on the // first Put() operation. -func NewDeferredCarWriterForPath(root cid.Cid, outPath string) *DeferredCarWriter { - return &DeferredCarWriter{root: root, outPath: outPath} +func NewDeferredCarWriterForPath(root cid.Cid, outPath string, opts ...carv2.Option) *DeferredCarWriter { + return &DeferredCarWriter{root: root, outPath: outPath, opts: opts} } // NewDeferredCarWriterForStream creates a DeferredCarWriter that will write to // the supplied stream. The stream will only be written to on the first Put() // operation. -func NewDeferredCarWriterForStream(root cid.Cid, outStream io.Writer) *DeferredCarWriter { - return &DeferredCarWriter{root: root, outStream: outStream} +func NewDeferredCarWriterForStream(root cid.Cid, outStream io.Writer, opts ...carv2.Option) *DeferredCarWriter { + return &DeferredCarWriter{root: root, outStream: outStream, opts: opts} } // OnPut will call a callback when each Put() operation is started. The argument @@ -120,7 +128,7 @@ func (dcw *DeferredCarWriter) writer() (carstorage.WritableCar, error) { dcw.f = openedFile outStream = openedFile } - w, err := carstorage.NewWritable(outStream, []cid.Cid{dcw.root}, carv2.WriteAsCarV1(true)) + w, err := carstorage.NewWritable(outStream, []cid.Cid{dcw.root}, append([]carv2.Option{carv2.WriteAsCarV1(true)}, dcw.opts...)...) 
if err != nil { return nil, err } diff --git a/pkg/storage/duplicateaddercar.go b/pkg/storage/duplicateaddercar.go new file mode 100644 index 00000000..8c58a337 --- /dev/null +++ b/pkg/storage/duplicateaddercar.go @@ -0,0 +1,161 @@ +package storage + +import ( + "bytes" + "container/list" + "context" + "fmt" + "io" + "sync" + + "github.com/filecoin-project/lassie/pkg/types" + "github.com/filecoin-project/lassie/pkg/verifiedcar" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-libipfs/blocks" + carv2 "github.com/ipld/go-car/v2" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/linking" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" +) + +type DuplicateAdderCar struct { + *DeferredCarWriter + ctx context.Context + root cid.Cid + path string + scope types.DagScope + store *DeferredStorageCar + blockStream *blockStream + streamCompletion chan error + streamCompletionLk sync.Mutex +} + +func NewDuplicateAdderCarForStream(ctx context.Context, root cid.Cid, path string, scope types.DagScope, store *DeferredStorageCar, outStream io.Writer) *DuplicateAdderCar { + blockStream := &blockStream{ctx: ctx} + blockStream.blockBuffer = list.New() + blockStream.cond = sync.NewCond(&blockStream.mu) + + // create the car writer for the final stream + outgoing := NewDeferredCarWriterForStream(root, outStream, carv2.AllowDuplicatePuts(true)) + return &DuplicateAdderCar{ + DeferredCarWriter: outgoing, + ctx: ctx, + root: root, + path: path, + scope: scope, + store: store, + blockStream: blockStream, + } +} + +func (da *DuplicateAdderCar) addDupes() { + var err error + defer func() { + da.streamCompletion <- err + }() + sel := types.PathScopeSelector(da.path, da.scope) + + // we're going to do a verified car where we add dupes back in + cfg := verifiedcar.Config{ + Root: da.root, + Selector: sel, + WriteDuplicatesOut: true, + } + + lsys := cidlink.DefaultLinkSystem() + // use the final car writer to write blocks + lsys.SetWriteStorage(da) + // use the deferred 
storage car to read in any dups we need + // to serve + lsys.SetReadStorage(da.store) + lsys.TrustedStorage = true + + // run the verification + _, _, err = cfg.VerifyBlockStream(da.ctx, da.blockStream, lsys) +} + +func (da *DuplicateAdderCar) BlockWriteOpener() linking.BlockWriteOpener { + return func(lctx linking.LinkContext) (io.Writer, linking.BlockWriteCommitter, error) { + // first, check if we have a stream completion channel, and abort if this is called twice + da.streamCompletionLk.Lock() + if da.streamCompletion == nil { + da.streamCompletion = make(chan error, 1) + go da.addDupes() + } + da.streamCompletionLk.Unlock() + var buf bytes.Buffer + var written bool + return &buf, func(lnk ipld.Link) error { + if written { + return fmt.Errorf("WriteCommitter already used") + } + written = true + blk, err := blocks.NewBlockWithCid(buf.Bytes(), lnk.(cidlink.Link).Cid) + if err != nil { + return err + } + return da.blockStream.WriteBlock(blk) + }, nil + } +} + +// Close closes the dup stream, verifying completion, if one was created. 
+func (da *DuplicateAdderCar) Close() error { + // close the block stream + da.blockStream.Close() + + // wait for the dupe stream to complete + da.streamCompletionLk.Lock() + streamCompletion := da.streamCompletion + da.streamCompletionLk.Unlock() + if streamCompletion == nil { + return nil + } + return <-streamCompletion +} + +type blockStream struct { + done bool + ctx context.Context + mu sync.Mutex + cond *sync.Cond + blockBuffer *list.List +} + +func (bs *blockStream) Close() { + bs.mu.Lock() + bs.done = true + bs.mu.Unlock() + bs.cond.Signal() +} + +func (bs *blockStream) WriteBlock(blk blocks.Block) error { + bs.mu.Lock() + defer bs.mu.Unlock() + if bs.done { + return errClosed + } + bs.blockBuffer.PushBack(blk) + bs.cond.Signal() + return nil +} + +func (bs *blockStream) Next() (blocks.Block, error) { + bs.mu.Lock() + defer bs.mu.Unlock() + + for { + select { + case <-bs.ctx.Done(): + return nil, bs.ctx.Err() + default: + } + if e := bs.blockBuffer.Front(); e != nil { + return bs.blockBuffer.Remove(e).(blocks.Block), nil + } + if bs.done { + return nil, io.EOF + } + bs.cond.Wait() + } +} diff --git a/pkg/storage/duplicateaddercar_test.go b/pkg/storage/duplicateaddercar_test.go new file mode 100644 index 00000000..ed213729 --- /dev/null +++ b/pkg/storage/duplicateaddercar_test.go @@ -0,0 +1,64 @@ +package storage_test + +import ( + "bytes" + "context" + "io" + "testing" + + "github.com/filecoin-project/lassie/pkg/internal/testutil" + "github.com/filecoin-project/lassie/pkg/storage" + "github.com/filecoin-project/lassie/pkg/types" + "github.com/ipfs/go-libipfs/blocks" + unixfs "github.com/ipfs/go-unixfsnode/testutil" + carv2 "github.com/ipld/go-car/v2" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/storage/memstore" + selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" + "github.com/stretchr/testify/require" +) + +func TestDuplicateAdderCar(t *testing.T) { + + setupStore := 
&testutil.CorrectedMemStore{Store: &memstore.Store{ + Bag: make(map[string][]byte), + }} + lsys := cidlink.DefaultLinkSystem() + lsys.TrustedStorage = true + lsys.SetReadStorage(setupStore) + lsys.SetWriteStorage(setupStore) + + unixfsFileWithDups := unixfs.GenerateFile(t, &lsys, testutil.ZeroReader{}, 4<<20) + unixfsFileWithDupsBlocks := testutil.ToBlocks(t, lsys, unixfsFileWithDups.Root, selectorparse.CommonSelector_ExploreAllRecursively) + buf := new(bytes.Buffer) + + store := storage.NewDeferredStorageCar("") + ctx := context.Background() + carWriter := storage.NewDuplicateAdderCarForStream(ctx, unixfsFileWithDups.Root, "", types.DagScopeAll, store, buf) + cachingTempStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), store) + + // write the root block, containing sharding metadata + cachingTempStore.Put(ctx, unixfsFileWithDupsBlocks[0].Cid().KeyString(), unixfsFileWithDupsBlocks[0].RawData()) + // write the duped block that the root points to for all but the last block + cachingTempStore.Put(ctx, unixfsFileWithDupsBlocks[1].Cid().KeyString(), unixfsFileWithDupsBlocks[1].RawData()) + // write the last block, which will be unique because of a different length + cachingTempStore.Put(ctx, unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1].Cid().KeyString(), unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1].RawData()) + err := carWriter.Close() + require.NoError(t, err) + err = cachingTempStore.Close() + require.NoError(t, err) + + // now, verify the traversal output a whole car with dups. 
+ reader, err := carv2.NewBlockReader(buf) + require.NoError(t, err) + receivedBlocks := make([]blocks.Block, 0, len(unixfsFileWithDupsBlocks)) + for { + blk, err := reader.Next() + if err == io.EOF { + break + } + require.NoError(t, err) + receivedBlocks = append(receivedBlocks, blk) + } + require.Equal(t, unixfsFileWithDupsBlocks, receivedBlocks) +} diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index 1b16da10..7b8a558e 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -40,7 +40,7 @@ func TestTempCarStorage(t *testing.T) { return nil }, nil } - cw = NewCachingTempStore(bwo, tempDir) + cw = NewCachingTempStore(bwo, NewDeferredStorageCar(tempDir)) } else { cw = NewDeferredStorageCar(tempDir) } @@ -154,7 +154,7 @@ func TestPreloadStore(t *testing.T) { return nil }, nil } - mainStore := NewCachingTempStore(bwo, t.TempDir()) + mainStore := NewCachingTempStore(bwo, NewDeferredStorageCar(t.TempDir())) t.Cleanup(func() { require.NoError(t, mainStore.Close()) }) diff --git a/pkg/types/request.go b/pkg/types/request.go index 02918f01..5f235a1d 100644 --- a/pkg/types/request.go +++ b/pkg/types/request.go @@ -88,14 +88,18 @@ func NewRequestForPath(store ipldstorage.WritableStorage, cid cid.Cid, path stri }, nil } +func PathScopeSelector(path string, scope DagScope) ipld.Node { + // Turn the path / scope into a selector + return unixfsnode.UnixFSPathSelectorBuilder(path, scope.TerminalSelectorSpec(), false) +} + // GetSelector will safely return a selector for this request. If none has been // set, it will generate one for the path & scope. 
func (r RetrievalRequest) GetSelector() ipld.Node { if r.Selector != nil { // custom selector return r.Selector } - // Turn the path / scope into a selector - return unixfsnode.UnixFSPathSelectorBuilder(r.Path, r.Scope.TerminalSelectorSpec(), false) + return PathScopeSelector(r.Path, r.Scope) } // GetUrlPath returns a URL path and query string valid with the Trusted HTTP diff --git a/pkg/types/types.go b/pkg/types/types.go index 6433aed5..a6472068 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -310,10 +310,5 @@ func (ds DagScope) TerminalSelectorSpec() builder.SelectorSpec { } func (ds DagScope) AcceptHeader() string { - switch ds { - case DagScopeBlock: - return "application/vnd.ipld.block" - default: - return "application/vnd.ipld.car" - } + return "application/vnd.ipld.car;version=1;order=dfs;dups=y" } diff --git a/pkg/verifiedcar/verifiedcar.go b/pkg/verifiedcar/verifiedcar.go index 347daef5..067edde4 100644 --- a/pkg/verifiedcar/verifiedcar.go +++ b/pkg/verifiedcar/verifiedcar.go @@ -6,9 +6,11 @@ import ( "errors" "fmt" "io" + "math" "github.com/ipfs/go-cid" format "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-libipfs/blocks" "github.com/ipfs/go-unixfsnode" "github.com/ipld/go-car/v2" dagpb "github.com/ipld/go-codec-dagpb" @@ -30,12 +32,19 @@ var ( ErrMissingBlock = errors.New("missing block in CAR") ) +type BlockReader interface { + Next() (blocks.Block, error) +} + var protoChooser = dagpb.AddSupportToChooser(basicnode.Chooser) type Config struct { - Root cid.Cid // The single root we expect to appear in the CAR and that we use to run our traversal against - AllowCARv2 bool // If true, allow CARv2 files to be received, otherwise strictly only allow CARv1 - Selector selector.Selector // The selector to execute, starting at the provided Root, to verify the contents of the CAR + Root cid.Cid // The single root we expect to appear in the CAR and that we use to run our traversal against + AllowCARv2 bool // If true, allow CARv2 files to be 
received, otherwise strictly only allow CARv1 + Selector datamodel.Node // The selector to execute, starting at the provided Root, to verify the contents of the CAR + ExpectDuplicatesIn bool // Handles whether the incoming stream has duplicates + WriteDuplicatesOut bool // Handles whether duplicates should be written a second time as blocks + MaxBlocks uint64 // set a budget for the traversal } func visitNoop(p traversal.Progress, n datamodel.Node, r traversal.VisitReason) error { return nil } @@ -51,7 +60,7 @@ func visitNoop(p traversal.Progress, n datamodel.Node, r traversal.VisitReason) // * https://specs.ipfs.tech/http-gateways/trustless-gateway/ // // * https://specs.ipfs.tech/http-gateways/path-gateway/ -func (cfg Config) Verify(ctx context.Context, rdr io.Reader, lsys linking.LinkSystem) (uint64, uint64, error) { +func (cfg Config) VerifyCar(ctx context.Context, rdr io.Reader, lsys linking.LinkSystem) (uint64, uint64, error) { cbr, err := car.NewBlockReader(rdr, car.WithTrustedCAR(false)) if err != nil { @@ -72,14 +81,24 @@ func (cfg Config) Verify(ctx context.Context, rdr io.Reader, lsys linking.LinkSy if len(cbr.Roots) != 1 || cbr.Roots[0] != cfg.Root { return 0, 0, ErrBadRoots } + return cfg.VerifyBlockStream(ctx, cbr, lsys) +} + +func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys linking.LinkSystem) (uint64, uint64, error) { + + sel, err := selector.CompileSelector(cfg.Selector) + if err != nil { + return 0, 0, err + } + cr := &carReader{ cbr: cbr, } - + bt := &writeTracker{} lsys.TrustedStorage = true // we can rely on the CAR decoder to check CID integrity unixfsnode.AddUnixFSReificationToLinkSystem(&lsys) - lsys.StorageReadOpener = nextBlockReadOpener(ctx, cr, lsys) + lsys.StorageReadOpener = cfg.nextBlockReadOpener(ctx, cr, bt, lsys) // run traversal in this goroutine progress := traversal.Progress{ @@ -89,6 +108,12 @@ func (cfg Config) Verify(ctx context.Context, rdr io.Reader, lsys linking.LinkSy 
LinkTargetNodePrototypeChooser: protoChooser, }, } + if cfg.MaxBlocks > 0 { + progress.Budget = &traversal.Budget{ + LinkBudget: int64(cfg.MaxBlocks) - 1, // first block is already loaded + NodeBudget: math.MaxInt64, + } + } lc := linking.LinkContext{Ctx: ctx} lnk := cidlink.Link{Cid: cfg.Root} proto, err := protoChooser(lnk, lc) @@ -99,7 +124,7 @@ func (cfg Config) Verify(ctx context.Context, rdr io.Reader, lsys linking.LinkSy if err != nil { return 0, 0, err } - if err := progress.WalkAdv(rootNode, cfg.Selector, visitNoop); err != nil { + if err := progress.WalkAdv(rootNode, sel, visitNoop); err != nil { return 0, 0, traversalError(err) } @@ -110,24 +135,45 @@ func (cfg Config) Verify(ctx context.Context, rdr io.Reader, lsys linking.LinkSy } // wait for parser to finish and provide errors or stats - return cr.blocks, cr.bytes, nil + return bt.blocks, bt.bytes, nil } -func nextBlockReadOpener(ctx context.Context, cr *carReader, lsys linking.LinkSystem) linking.BlockReadOpener { +func (cfg *Config) nextBlockReadOpener(ctx context.Context, cr *carReader, bt *writeTracker, lsys linking.LinkSystem) linking.BlockReadOpener { seen := make(map[cid.Cid]struct{}) return func(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) { cid := l.(cidlink.Link).Cid + var data []byte + var err error if _, ok := seen[cid]; ok { - // duplicate block, rely on the supplied LinkSystem to have stored this - return lsys.StorageReadOpener(lc, l) - } - - seen[cid] = struct{}{} - data, err := cr.readNextBlock(ctx, cid) - if err != nil { - return nil, err + if cfg.ExpectDuplicatesIn { + // duplicate block, but in this case we are expecting the stream to have it + data, err = cr.readNextBlock(ctx, cid) + if err != nil { + return nil, err + } + if !cfg.WriteDuplicatesOut { + return bytes.NewReader(data), nil + } + } else { + // duplicate block, rely on the supplied LinkSystem to have stored this + rdr, err := lsys.StorageReadOpener(lc, l) + if !cfg.WriteDuplicatesOut { + return rdr, err 
+ } + data, err = io.ReadAll(rdr) + if err != nil { + return nil, err + } + } + } else { + seen[cid] = struct{}{} + data, err = cr.readNextBlock(ctx, cid) + if err != nil { + return nil, err + } } + bt.recordBlock(data) w, wc, err := lsys.StorageWriteOpener(lc) if err != nil { return nil, err @@ -147,9 +193,7 @@ func nextBlockReadOpener(ctx context.Context, cr *carReader, lsys linking.LinkSy } type carReader struct { - cbr *car.BlockReader - blocks uint64 - bytes uint64 + cbr BlockReader } func (cr *carReader) readNextBlock(ctx context.Context, expected cid.Cid) ([]byte, error) { @@ -164,12 +208,19 @@ func (cr *carReader) readNextBlock(ctx context.Context, expected cid.Cid) ([]byt if blk.Cid() != expected { return nil, fmt.Errorf("%w: %s != %s", ErrUnexpectedBlock, blk.Cid(), expected) } - - cr.bytes += uint64(len(blk.RawData())) - cr.blocks++ return blk.RawData(), nil } +type writeTracker struct { + blocks uint64 + bytes uint64 +} + +func (bt *writeTracker) recordBlock(data []byte) { + bt.blocks++ + bt.bytes += uint64(len(data)) +} + func traversalError(original error) error { err := original for { diff --git a/pkg/verifiedcar/verifiedcar_test.go b/pkg/verifiedcar/verifiedcar_test.go index 950b5a97..b1027683 100644 --- a/pkg/verifiedcar/verifiedcar_test.go +++ b/pkg/verifiedcar/verifiedcar_test.go @@ -9,22 +9,21 @@ import ( "testing" "time" + "github.com/filecoin-project/lassie/pkg/internal/testutil" "github.com/filecoin-project/lassie/pkg/verifiedcar" "github.com/ipfs/go-cid" gstestutil "github.com/ipfs/go-graphsync/testutil" - format "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-libipfs/blocks" "github.com/ipfs/go-unixfsnode" unixfs "github.com/ipfs/go-unixfsnode/testutil" "github.com/ipld/go-car/v2" "github.com/ipld/go-car/v2/storage" - dagpb "github.com/ipld/go-codec-dagpb" "github.com/ipld/go-ipld-prime/datamodel" "github.com/ipld/go-ipld-prime/linking" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/node/basicnode" 
"github.com/ipld/go-ipld-prime/storage/memstore" "github.com/ipld/go-ipld-prime/traversal" - "github.com/ipld/go-ipld-prime/traversal/selector" selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" "github.com/stretchr/testify/require" ) @@ -38,7 +37,7 @@ func TestVerifiedCar(t *testing.T) { t.Logf("random seed: %d", rndSeed) var rndReader io.Reader = rand.New(rand.NewSource(rndSeed)) - store := &correctedMemStore{&memstore.Store{ + store := &testutil.CorrectedMemStore{Store: &memstore.Store{ Bag: make(map[string][]byte), }} lsys := cidlink.DefaultLinkSystem() @@ -48,74 +47,77 @@ func TestVerifiedCar(t *testing.T) { tbc1 := gstestutil.SetupBlockChain(ctx, t, lsys, 1000, 100) root1 := tbc1.TipLink.(cidlink.Link).Cid - allBlocks := make([]block, 0, 100) - for _, b := range tbc1.AllBlocks() { - allBlocks = append(allBlocks, block{b.Cid(), b.RawData()}) - } + allBlocks := tbc1.AllBlocks() extraneousLnk, err := lsys.Store(linking.LinkContext{}, cidlink.LinkPrototype{Prefix: cid.Prefix{Version: 1, Codec: 0x71, MhType: 0x12, MhLength: 32}}, basicnode.NewString("borp")) req.NoError(err) extraneousByts, err := lsys.LoadRaw(linking.LinkContext{}, extraneousLnk) req.NoError(err) + extraneousBlk, err := blocks.NewBlockWithCid(extraneousByts, extraneousLnk.(cidlink.Link).Cid) + req.NoError(err) - allSelector := mustCompile(selectorparse.CommonSelector_ExploreAllRecursively) + allSelector := selectorparse.CommonSelector_ExploreAllRecursively wrapPath := "/some/path/to/content" unixfsFile := unixfs.GenerateFile(t, &lsys, rndReader, 4<<20) - unixfsFileBlocks := toBlocks(t, lsys, unixfsFile.Root, allSelector) + unixfsFileBlocks := testutil.ToBlocks(t, lsys, unixfsFile.Root, allSelector) + unixfsFileWithDups := unixfs.GenerateFile(t, &lsys, testutil.ZeroReader{}, 4<<20) + unixfsFileWithDupsBlocks := testutil.ToBlocks(t, lsys, unixfsFileWithDups.Root, allSelector) var unixfsDir unixfs.DirEntry - var unixfsDirBlocks []block + var unixfsDirBlocks []blocks.Block for { 
unixfsDir = unixfs.GenerateDirectory(t, &lsys, rndReader, 8<<20, false) - unixfsDirBlocks = toBlocks(t, lsys, unixfsDir.Root, allSelector) + unixfsDirBlocks = testutil.ToBlocks(t, lsys, unixfsDir.Root, allSelector) if len(unixfsDir.Children) > 2 { // we want at least 3 children to test the path subset selector break } } unixfsShardedDir := unixfs.GenerateDirectory(t, &lsys, rndReader, 8<<20, true) - unixfsShardedDirBlocks := toBlocks(t, lsys, unixfsShardedDir.Root, allSelector) + unixfsShardedDirBlocks := testutil.ToBlocks(t, lsys, unixfsShardedDir.Root, allSelector) - unixfsPreloadSelector := mustCompile(unixfsnode.MatchUnixFSPreloadSelector.Node()) + unixfsPreloadSelector := unixfsnode.MatchUnixFSPreloadSelector.Node() - unixfsPreloadDirBlocks := toBlocks(t, lsys, unixfsDir.Root, unixfsPreloadSelector) - unixfsPreloadShardedDirBlocks := toBlocks(t, lsys, unixfsShardedDir.Root, unixfsPreloadSelector) + unixfsPreloadDirBlocks := testutil.ToBlocks(t, lsys, unixfsDir.Root, unixfsPreloadSelector) + unixfsPreloadShardedDirBlocks := testutil.ToBlocks(t, lsys, unixfsShardedDir.Root, unixfsPreloadSelector) - unixfsDirSubsetSelector := mustCompile(unixfsnode.UnixFSPathSelectorBuilder(unixfsDir.Children[1].Path, unixfsnode.MatchUnixFSPreloadSelector, false)) + unixfsDirSubsetSelector := unixfsnode.UnixFSPathSelectorBuilder(unixfsDir.Children[1].Path, unixfsnode.MatchUnixFSPreloadSelector, false) - unixfsWrappedPathSelector := mustCompile(unixfsnode.UnixFSPathSelectorBuilder(wrapPath, unixfsnode.ExploreAllRecursivelySelector, false)) - unixfsWrappedPreloadPathSelector := mustCompile(unixfsnode.UnixFSPathSelectorBuilder(wrapPath, unixfsnode.MatchUnixFSPreloadSelector, false)) + unixfsWrappedPathSelector := unixfsnode.UnixFSPathSelectorBuilder(wrapPath, unixfsnode.ExploreAllRecursivelySelector, false) + unixfsWrappedPreloadPathSelector := unixfsnode.UnixFSPathSelectorBuilder(wrapPath, unixfsnode.MatchUnixFSPreloadSelector, false) unixfsWrappedFile := unixfs.WrapContent(t, 
rndReader, &lsys, unixfsFile, wrapPath, false) - unixfsWrappedFileBlocks := toBlocks(t, lsys, unixfsWrappedFile.Root, allSelector) + unixfsWrappedFileBlocks := testutil.ToBlocks(t, lsys, unixfsWrappedFile.Root, allSelector) // "trimmed" is similar to "exclusive" except that "trimmed" is a subset // of a larger DAG, whereas "exclusive" is a complete DAG. - unixfsTrimmedWrappedFileBlocks := toBlocks(t, lsys, unixfsWrappedFile.Root, unixfsWrappedPathSelector) + unixfsTrimmedWrappedFileBlocks := testutil.ToBlocks(t, lsys, unixfsWrappedFile.Root, unixfsWrappedPathSelector) unixfsExclusiveWrappedFile := unixfs.WrapContent(t, rndReader, &lsys, unixfsFile, wrapPath, true) - unixfsExclusiveWrappedFileBlocks := toBlocks(t, lsys, unixfsExclusiveWrappedFile.Root, allSelector) + unixfsExclusiveWrappedFileBlocks := testutil.ToBlocks(t, lsys, unixfsExclusiveWrappedFile.Root, allSelector) unixfsWrappedShardedDir := unixfs.WrapContent(t, rndReader, &lsys, unixfsShardedDir, wrapPath, false) - unixfsWrappedShardedDirBlocks := toBlocks(t, lsys, unixfsWrappedShardedDir.Root, allSelector) + unixfsWrappedShardedDirBlocks := testutil.ToBlocks(t, lsys, unixfsWrappedShardedDir.Root, allSelector) // "trimmed" is similar to "exclusive" except that "trimmed" is a subset // of a larger DAG, whereas "exclusive" is a complete DAG. 
- unixfsTrimmedWrappedShardedDirBlocks := toBlocks(t, lsys, unixfsWrappedShardedDir.Root, unixfsWrappedPathSelector) - unixfsTrimmedWrappedShardedDirOnlyBlocks := toBlocks(t, lsys, unixfsWrappedShardedDir.Root, unixfsWrappedPreloadPathSelector) + unixfsTrimmedWrappedShardedDirBlocks := testutil.ToBlocks(t, lsys, unixfsWrappedShardedDir.Root, unixfsWrappedPathSelector) + unixfsTrimmedWrappedShardedDirOnlyBlocks := testutil.ToBlocks(t, lsys, unixfsWrappedShardedDir.Root, unixfsWrappedPreloadPathSelector) unixfsExclusiveWrappedShardedDir := unixfs.WrapContent(t, rndReader, &lsys, unixfsShardedDir, wrapPath, true) - unixfsExclusiveWrappedShardedDirBlocks := toBlocks(t, lsys, unixfsExclusiveWrappedShardedDir.Root, allSelector) - unixfsExclusiveWrappedShardedDirOnlyBlocks := toBlocks(t, lsys, unixfsExclusiveWrappedShardedDir.Root, unixfsWrappedPreloadPathSelector) + unixfsExclusiveWrappedShardedDirBlocks := testutil.ToBlocks(t, lsys, unixfsExclusiveWrappedShardedDir.Root, allSelector) + unixfsExclusiveWrappedShardedDirOnlyBlocks := testutil.ToBlocks(t, lsys, unixfsExclusiveWrappedShardedDir.Root, unixfsWrappedPreloadPathSelector) + mismatchedCidBlk, _ := blocks.NewBlockWithCid(extraneousByts, allBlocks[99].Cid()) testCases := []struct { - name string - blocks []block - roots []cid.Cid - carv2 bool - err string - cfg verifiedcar.Config + name string + blocks []expectedBlock + roots []cid.Cid + carv2 bool + err string + cfg verifiedcar.Config + incomingHasDups bool }{ { name: "complete carv1", - blocks: allBlocks, + blocks: consumedBlocks(allBlocks), roots: []cid.Cid{root1}, cfg: verifiedcar.Config{ Root: root1, @@ -124,7 +126,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "carv2 without AllowCARv2 errors", - blocks: allBlocks, + blocks: consumedBlocks(allBlocks), roots: []cid.Cid{root1}, carv2: true, err: "bad CAR version", @@ -135,7 +137,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "complete carv2 with AllowCARv2", - blocks: allBlocks, + blocks: 
consumedBlocks(allBlocks), roots: []cid.Cid{root1}, carv2: true, cfg: verifiedcar.Config{ @@ -146,7 +148,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "carv1 with multiple roots errors", - blocks: allBlocks, + blocks: consumedBlocks(allBlocks), roots: []cid.Cid{root1, root1}, err: "root CID mismatch", cfg: verifiedcar.Config{ @@ -156,7 +158,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "carv1 with wrong root errors", - blocks: allBlocks, + blocks: consumedBlocks(allBlocks), roots: []cid.Cid{tbc1.AllBlocks()[1].Cid()}, err: "root CID mismatch", cfg: verifiedcar.Config{ @@ -166,7 +168,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "carv1 with extraneous trailing block errors", - blocks: append(append([]block{}, allBlocks...), block{extraneousLnk.(cidlink.Link).Cid, extraneousByts}), + blocks: append(consumedBlocks(append([]blocks.Block{}, allBlocks...)), expectedBlock{extraneousBlk, true}), roots: []cid.Cid{root1}, err: "extraneous block in CAR", cfg: verifiedcar.Config{ @@ -176,9 +178,9 @@ func TestVerifiedCar(t *testing.T) { }, { name: "carv1 with extraneous leading block errors", - blocks: append([]block{{extraneousLnk.(cidlink.Link).Cid, extraneousByts}}, allBlocks...), + blocks: append(consumedBlocks([]blocks.Block{extraneousBlk}), consumedBlocks(allBlocks)...), roots: []cid.Cid{root1}, - err: "unexpected block in CAR: " + extraneousLnk.(cidlink.Link).Cid.String() + " != " + allBlocks[0].cid.String(), + err: "unexpected block in CAR: " + extraneousLnk.(cidlink.Link).Cid.String() + " != " + allBlocks[0].Cid().String(), cfg: verifiedcar.Config{ Root: root1, Selector: allSelector, @@ -186,9 +188,9 @@ func TestVerifiedCar(t *testing.T) { }, { name: "carv1 with out-of-order blocks errors", - blocks: append(append([]block{}, allBlocks[50:]...), allBlocks[0:50]...), + blocks: consumedBlocks(append(append([]blocks.Block{}, allBlocks[50:]...), allBlocks[0:50]...)), roots: []cid.Cid{root1}, - err: "unexpected block in CAR: " + 
allBlocks[50].cid.String() + " != " + allBlocks[0].cid.String(), + err: "unexpected block in CAR: " + allBlocks[50].Cid().String() + " != " + allBlocks[0].Cid().String(), cfg: verifiedcar.Config{ Root: root1, Selector: allSelector, @@ -196,7 +198,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "carv1 with mismatching CID errors", - blocks: append(append([]block{}, allBlocks[0:99]...), block{allBlocks[99].cid, extraneousByts}), + blocks: consumedBlocks(append(append([]blocks.Block{}, allBlocks[0:99]...), mismatchedCidBlk)), roots: []cid.Cid{root1}, err: "mismatch in content integrity", cfg: verifiedcar.Config{ @@ -204,9 +206,24 @@ func TestVerifiedCar(t *testing.T) { Selector: allSelector, }, }, + { + name: "carv1 over budget errors", + blocks: consumedBlocks(allBlocks), + roots: []cid.Cid{root1}, + err: (&traversal.ErrBudgetExceeded{ + BudgetKind: "link", + Path: datamodel.ParsePath("Parents/0/Parents/0/Parents/0"), + Link: tbc1.LinkTipIndex(3), + }).Error(), + cfg: verifiedcar.Config{ + Root: root1, + Selector: allSelector, + MaxBlocks: 3, + }, + }, { name: "unixfs: large sharded file", - blocks: unixfsFileBlocks, + blocks: consumedBlocks(unixfsFileBlocks), roots: []cid.Cid{unixfsFile.Root}, cfg: verifiedcar.Config{ Root: unixfsFile.Root, @@ -215,7 +232,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large directory", - blocks: unixfsDirBlocks, + blocks: consumedBlocks(unixfsDirBlocks), roots: []cid.Cid{unixfsDir.Root}, cfg: verifiedcar.Config{ Root: unixfsDir.Root, @@ -224,7 +241,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded directory", - blocks: unixfsShardedDirBlocks, + blocks: consumedBlocks(unixfsShardedDirBlocks), roots: []cid.Cid{unixfsShardedDir.Root}, cfg: verifiedcar.Config{ Root: unixfsShardedDir.Root, @@ -233,7 +250,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded file with file scope", - blocks: unixfsFileBlocks, + blocks: consumedBlocks(unixfsFileBlocks), roots: 
[]cid.Cid{unixfsFile.Root}, cfg: verifiedcar.Config{ Root: unixfsFile.Root, @@ -248,7 +265,7 @@ func TestVerifiedCar(t *testing.T) { // traversal, why is unixfs-preload making a difference for just matching a // directory.UnixFSBasicDir. name: "unixfs: all of large directory with file scope, errors", - blocks: unixfsDirBlocks, + blocks: consumedBlocks(unixfsDirBlocks), roots: []cid.Cid{unixfsDir.Root}, err: "extraneous block in CAR", cfg: verifiedcar.Config{ @@ -258,7 +275,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: all of large sharded directory with file scope, errors", - blocks: unixfsShardedDirBlocks, + blocks: consumedBlocks(unixfsShardedDirBlocks), roots: []cid.Cid{unixfsShardedDir.Root}, err: "extraneous block in CAR", cfg: verifiedcar.Config{ @@ -268,7 +285,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: all of large directory with file scope", - blocks: unixfsPreloadDirBlocks, + blocks: consumedBlocks(unixfsPreloadDirBlocks), roots: []cid.Cid{unixfsDir.Root}, cfg: verifiedcar.Config{ Root: unixfsDir.Root, @@ -277,7 +294,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: all of large sharded directory with file scope", - blocks: unixfsPreloadShardedDirBlocks, + blocks: consumedBlocks(unixfsPreloadShardedDirBlocks), roots: []cid.Cid{unixfsShardedDir.Root}, cfg: verifiedcar.Config{ Root: unixfsShardedDir.Root, @@ -286,7 +303,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: pathed subset inside large directory with file scope, errors", - blocks: unixfsDirBlocks, + blocks: consumedBlocks(unixfsDirBlocks), roots: []cid.Cid{unixfsDir.Root}, err: "unexpected block in CAR", cfg: verifiedcar.Config{ @@ -296,7 +313,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded file wrapped in directories", - blocks: unixfsWrappedFileBlocks, + blocks: consumedBlocks(unixfsWrappedFileBlocks), roots: []cid.Cid{unixfsWrappedFile.Root}, cfg: verifiedcar.Config{ Root: unixfsWrappedFile.Root, @@ -306,7 
+323,7 @@ func TestVerifiedCar(t *testing.T) { { // our wrapped file has additional in the nested directories name: "unixfs: large sharded file wrapped in directories, pathed, errors", - blocks: unixfsWrappedFileBlocks, + blocks: consumedBlocks(unixfsWrappedFileBlocks), roots: []cid.Cid{unixfsWrappedFile.Root}, err: "unexpected block in CAR", cfg: verifiedcar.Config{ @@ -316,7 +333,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded file wrapped in directories, trimmed, pathed", - blocks: unixfsTrimmedWrappedFileBlocks, + blocks: consumedBlocks(unixfsTrimmedWrappedFileBlocks), roots: []cid.Cid{unixfsWrappedFile.Root}, cfg: verifiedcar.Config{ Root: unixfsWrappedFile.Root, @@ -325,7 +342,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded file wrapped in directories, trimmed, all, errors", - blocks: unixfsTrimmedWrappedFileBlocks, + blocks: consumedBlocks(unixfsTrimmedWrappedFileBlocks), roots: []cid.Cid{unixfsWrappedFile.Root}, err: "unexpected block in CAR", cfg: verifiedcar.Config{ @@ -335,7 +352,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded file wrapped in directories, exclusive, pathed", - blocks: unixfsExclusiveWrappedFileBlocks, + blocks: consumedBlocks(unixfsExclusiveWrappedFileBlocks), roots: []cid.Cid{unixfsExclusiveWrappedFile.Root}, cfg: verifiedcar.Config{ Root: unixfsExclusiveWrappedFile.Root, @@ -344,7 +361,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded dir wrapped in directories", - blocks: unixfsWrappedShardedDirBlocks, + blocks: consumedBlocks(unixfsWrappedShardedDirBlocks), roots: []cid.Cid{unixfsWrappedShardedDir.Root}, cfg: verifiedcar.Config{ Root: unixfsWrappedShardedDir.Root, @@ -354,7 +371,7 @@ func TestVerifiedCar(t *testing.T) { { // our wrapped dir has additional in the nested directories name: "unixfs: large sharded dir wrapped in directories, pathed, errors", - blocks: unixfsWrappedShardedDirBlocks, + blocks: 
consumedBlocks(unixfsWrappedShardedDirBlocks), roots: []cid.Cid{unixfsWrappedShardedDir.Root}, err: "unexpected block in CAR", cfg: verifiedcar.Config{ @@ -364,7 +381,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded dir wrapped in directories, trimmed, pathed", - blocks: unixfsTrimmedWrappedShardedDirBlocks, + blocks: consumedBlocks(unixfsTrimmedWrappedShardedDirBlocks), roots: []cid.Cid{unixfsWrappedShardedDir.Root}, cfg: verifiedcar.Config{ Root: unixfsWrappedShardedDir.Root, @@ -373,7 +390,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded dir wrapped in directories, trimmed, preload, pathed", - blocks: unixfsTrimmedWrappedShardedDirOnlyBlocks, + blocks: consumedBlocks(unixfsTrimmedWrappedShardedDirOnlyBlocks), roots: []cid.Cid{unixfsWrappedShardedDir.Root}, cfg: verifiedcar.Config{ Root: unixfsWrappedShardedDir.Root, @@ -382,7 +399,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded dir wrapped in directories, trimmed, all, errors", - blocks: unixfsTrimmedWrappedShardedDirBlocks, + blocks: consumedBlocks(unixfsTrimmedWrappedShardedDirBlocks), roots: []cid.Cid{unixfsWrappedShardedDir.Root}, err: "unexpected block in CAR", cfg: verifiedcar.Config{ @@ -392,7 +409,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded dir wrapped in directories, exclusive, pathed", - blocks: unixfsExclusiveWrappedShardedDirBlocks, + blocks: consumedBlocks(unixfsExclusiveWrappedShardedDirBlocks), roots: []cid.Cid{unixfsExclusiveWrappedShardedDir.Root}, cfg: verifiedcar.Config{ Root: unixfsExclusiveWrappedShardedDir.Root, @@ -401,13 +418,66 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded dir wrapped in directories, exclusive, preload, pathed", - blocks: unixfsExclusiveWrappedShardedDirOnlyBlocks, + blocks: consumedBlocks(unixfsExclusiveWrappedShardedDirOnlyBlocks), roots: []cid.Cid{unixfsExclusiveWrappedShardedDir.Root}, cfg: verifiedcar.Config{ Root: 
unixfsExclusiveWrappedShardedDir.Root, Selector: unixfsWrappedPreloadPathSelector, }, }, + { + name: "unixfs: file with dups", + blocks: append(append(consumedBlocks(unixfsFileWithDupsBlocks[:2]), skippedBlocks(unixfsFileWithDupsBlocks[2:len(unixfsFileWithDupsBlocks)-1])...), consumedBlocks(unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1:])...), + roots: []cid.Cid{unixfsFileWithDups.Root}, + cfg: verifiedcar.Config{ + Root: unixfsFileWithDups.Root, + Selector: allSelector, + }, + }, + { + name: "unixfs: file with dups, incoming has dups, not allowed", + blocks: append(append(consumedBlocks(unixfsFileWithDupsBlocks[:2]), skippedBlocks(unixfsFileWithDupsBlocks[2:len(unixfsFileWithDupsBlocks)-1])...), consumedBlocks(unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1:])...), + err: "unexpected block in CAR: " + unixfsFileWithDupsBlocks[2].Cid().String() + " != " + unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1].Cid().String(), + roots: []cid.Cid{unixfsFileWithDups.Root}, + cfg: verifiedcar.Config{ + Root: unixfsFileWithDups.Root, + Selector: allSelector, + }, + incomingHasDups: true, + }, + { + name: "unixfs: file with dups, incoming has dups, allowed", + blocks: append(append(consumedBlocks(unixfsFileWithDupsBlocks[:2]), skippedBlocks(unixfsFileWithDupsBlocks[2:len(unixfsFileWithDupsBlocks)-1])...), consumedBlocks(unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1:])...), + roots: []cid.Cid{unixfsFileWithDups.Root}, + cfg: verifiedcar.Config{ + Root: unixfsFileWithDups.Root, + Selector: allSelector, + ExpectDuplicatesIn: true, + }, + incomingHasDups: true, + }, + { + name: "unixfs: file with dups, duplicate writes on", + blocks: consumedBlocks(unixfsFileWithDupsBlocks), + roots: []cid.Cid{unixfsFileWithDups.Root}, + cfg: verifiedcar.Config{ + Root: unixfsFileWithDups.Root, + Selector: allSelector, + WriteDuplicatesOut: true, + }, + }, + { + name: "unixfs: file with dups, duplicate writes on, incoming dups", + blocks: 
consumedBlocks(unixfsFileWithDupsBlocks), + roots: []cid.Cid{unixfsFileWithDups.Root}, + cfg: verifiedcar.Config{ + Root: unixfsFileWithDups.Root, + Selector: allSelector, + WriteDuplicatesOut: true, + ExpectDuplicatesIn: true, + }, + incomingHasDups: true, + }, } for _, testCase := range testCases { @@ -420,7 +490,7 @@ func TestVerifiedCar(t *testing.T) { req := require.New(t) - store := &correctedMemStore{&memstore.Store{ + store := &testutil.CorrectedMemStore{Store: &memstore.Store{ Bag: make(map[string][]byte), }} lsys := cidlink.DefaultLinkSystem() @@ -428,11 +498,15 @@ func TestVerifiedCar(t *testing.T) { lsys.SetWriteStorage(store) bwo := lsys.StorageWriteOpener var writeCounter int + var skipped int lsys.StorageWriteOpener = func(lc linking.LinkContext) (io.Writer, linking.BlockWriteCommitter, error) { var buf bytes.Buffer return &buf, func(l datamodel.Link) error { - req.Equal(testCase.blocks[writeCounter].cid, l.(cidlink.Link).Cid, "block %d", writeCounter) - req.Equal(testCase.blocks[writeCounter].data, buf.Bytes(), "block %d", writeCounter) + for testCase.blocks[writeCounter+skipped].skipped { + skipped++ + } + req.Equal(testCase.blocks[writeCounter+skipped].Cid(), l.(cidlink.Link).Cid, "block %d", writeCounter) + req.Equal(testCase.blocks[writeCounter+skipped].RawData(), buf.Bytes(), "block %d", writeCounter) writeCounter++ w, wc, err := bwo(lc) if err != nil { @@ -443,8 +517,11 @@ func TestVerifiedCar(t *testing.T) { }, nil } - carStream := makeCarStream(t, ctx, testCase.roots, testCase.blocks, testCase.carv2, testCase.err != "") - blockCount, byteCount, err := testCase.cfg.Verify(ctx, carStream, lsys) + carStream := makeCarStream(t, ctx, testCase.roots, testCase.blocks, testCase.carv2, testCase.err != "", testCase.incomingHasDups) + blockCount, byteCount, err := testCase.cfg.VerifyCar(ctx, carStream, lsys) + + // read the rest of data + io.ReadAll(carStream) if testCase.err != "" { req.ErrorContains(err, testCase.err) @@ -452,9 +529,9 @@ func 
TestVerifiedCar(t *testing.T) { req.Equal(uint64(0), byteCount) } else { req.NoError(err) - req.Equal(uint64(len(testCase.blocks)), blockCount) + req.Equal(count(testCase.blocks), blockCount) req.Equal(sizeOf(testCase.blocks), byteCount) - req.Equal(len(testCase.blocks), writeCounter) + req.Equal(int(count(testCase.blocks)), writeCounter) } }) } @@ -464,9 +541,10 @@ func makeCarStream( t *testing.T, ctx context.Context, roots []cid.Cid, - blocks []block, + blocks []expectedBlock, carv2 bool, expectErrors bool, + allowDuplicatePuts bool, ) io.Reader { r, w := io.Pipe() @@ -490,13 +568,13 @@ func makeCarStream( carW = v2f } - carWriter, err := storage.NewWritable(carW, roots, car.WriteAsCarV1(!carv2), car.AllowDuplicatePuts(false)) + carWriter, err := storage.NewWritable(carW, roots, car.WriteAsCarV1(!carv2), car.AllowDuplicatePuts(allowDuplicatePuts)) req.NoError(err) if err != nil { return } for _, block := range blocks { - err := carWriter.Put(ctx, block.cid.KeyString(), block.data) + err := carWriter.Put(ctx, block.Cid().KeyString(), block.RawData()) if !expectErrors { req.NoError(err) } @@ -526,79 +604,43 @@ func makeCarStream( return r } -type block struct { - cid cid.Cid - data []byte +type expectedBlock struct { + blocks.Block + skipped bool } -func sizeOf(blocks []block) uint64 { - total := uint64(0) +func consumedBlocks(blocks []blocks.Block) []expectedBlock { + expectedBlocks := make([]expectedBlock, 0, len(blocks)) for _, block := range blocks { - total += uint64(len(block.data)) + expectedBlocks = append(expectedBlocks, expectedBlock{block, false}) } - return total + return expectedBlocks } -func toBlocks(t *testing.T, lsys linking.LinkSystem, root cid.Cid, sel selector.Selector) []block { - blocks := make([]block, 0) - unixfsnode.AddUnixFSReificationToLinkSystem(&lsys) - osro := lsys.StorageReadOpener - lsys.StorageReadOpener = func(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) { - r, err := osro(lc, l) - if err != nil { - return nil, err 
- } - byts, err := io.ReadAll(r) - if err != nil { - return nil, err - } - blocks = append(blocks, block{l.(cidlink.Link).Cid, byts}) - return bytes.NewReader(byts), nil - } - var proto datamodel.NodePrototype = basicnode.Prototype.Any - if root.Prefix().Codec == cid.DagProtobuf { - proto = dagpb.Type.PBNode - } - rootNode, err := lsys.Load(linking.LinkContext{}, cidlink.Link{Cid: root}, proto) - require.NoError(t, err) - prog := traversal.Progress{ - Cfg: &traversal.Config{ - LinkSystem: lsys, - LinkTargetNodePrototypeChooser: dagpb.AddSupportToChooser(basicnode.Chooser), - }, - } - vf := func(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error { return nil } - err = prog.WalkAdv(rootNode, sel, vf) - require.NoError(t, err) - - return blocks -} - -// TODO: remove when this is fixed in IPLD prime -type correctedMemStore struct { - *memstore.Store -} - -func (cms *correctedMemStore) Get(ctx context.Context, key string) ([]byte, error) { - data, err := cms.Store.Get(ctx, key) - if err != nil && err.Error() == "404" { - err = format.ErrNotFound{} +func skippedBlocks(blocks []blocks.Block) []expectedBlock { + expectedBlocks := make([]expectedBlock, 0, len(blocks)) + for _, block := range blocks { + expectedBlocks = append(expectedBlocks, expectedBlock{block, true}) } - return data, err + return expectedBlocks } -func (cms *correctedMemStore) GetStream(ctx context.Context, key string) (io.ReadCloser, error) { - rc, err := cms.Store.GetStream(ctx, key) - if err != nil && err.Error() == "404" { - err = format.ErrNotFound{} +func count(blocks []expectedBlock) uint64 { + total := uint64(0) + for _, block := range blocks { + if !block.skipped { + total++ + } } - return rc, err + return total } -func mustCompile(selNode datamodel.Node) selector.Selector { - sel, err := selector.CompileSelector(selNode) - if err != nil { - panic(err) +func sizeOf(blocks []expectedBlock) uint64 { + total := uint64(0) + for _, block := range blocks { + if !block.skipped { + 
total += uint64(len(block.RawData())) + } } - return sel + return total }