diff --git a/carstream.go b/carstream.go index 3c98040..e55f6e3 100644 --- a/carstream.go +++ b/carstream.go @@ -7,6 +7,7 @@ import ( "io" // codecs we care about + "github.com/filecoin-project/lassie/pkg/types" dagpb "github.com/ipld/go-codec-dagpb" _ "github.com/ipld/go-ipld-prime/codec/cbor" _ "github.com/ipld/go-ipld-prime/codec/dagcbor" @@ -16,6 +17,7 @@ import ( "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipfs/go-cid" + "github.com/ipfs/go-unixfsnode" "github.com/ipld/go-car/v2" carstorage "github.com/ipld/go-car/v2/storage" "github.com/ipld/go-ipld-prime/datamodel" @@ -29,7 +31,17 @@ var protoChooser = dagpb.AddSupportToChooser(basicnode.Chooser) // StreamCar streams a DAG in CARv1 format to the given writer, using the given // selector. -func StreamCar(ctx context.Context, requestLsys linking.LinkSystem, rootCid cid.Cid, selNode datamodel.Node, out io.Writer, duplicates bool) error { +func StreamCar( + ctx context.Context, + requestLsys linking.LinkSystem, + rootCid cid.Cid, + path datamodel.Path, + dagScope types.DagScope, + out io.Writer, + duplicates bool, +) error { + + selNode := unixfsnode.UnixFSPathSelectorBuilder(path.String(), dagScope.TerminalSelectorSpec(), false) sel, err := selector.CompileSelector(selNode) if err != nil { return fmt.Errorf("failed to compile selector: %w", err) @@ -53,12 +65,30 @@ func StreamCar(ctx context.Context, requestLsys linking.LinkSystem, rootCid cid. LinkSystem: requestLsys, LinkTargetNodePrototypeChooser: protoChooser, }} - if err := progress.WalkAdv(rootNode, sel, visitNoop); err != nil { + var lastPath datamodel.Path + visitor := func(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error { + lastPath = p.Path + return nil + } + if err := progress.WalkAdv(rootNode, sel, visitor); err != nil { return fmt.Errorf("failed to complete traversal: %w", err) } if erro.err != nil { return fmt.Errorf("block load failed during traversal: %w", erro.err) } + for path.Len() > 0 { + if lastPath.Len() == 0 { + return fmt.Errorf("failed to traverse full path, missed: [%s]", path.String()) + } + var seg, lastSeg datamodel.PathSegment + seg, path = path.Shift() + lastSeg, lastPath = lastPath.Shift() + if seg != lastSeg { + return fmt.Errorf("unexpected path segment visit, got [%s], expected [%s]", lastSeg.String(), seg.String()) + } + } + // having lastPath.Len()>0 is fine, it may be due to an "all" or "entity" + // doing an explore-all on the remainder of the DAG after the path. return nil } @@ -100,5 +130,3 @@ func loadNode(ctx context.Context, rootCid cid.Cid, lsys linking.LinkSystem) (da } return rootNode, nil } - -func visitNoop(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error { return nil } diff --git a/carstream_test.go b/carstream_test.go index 4caa7ce..0dd76a6 100644 --- a/carstream_test.go +++ b/carstream_test.go @@ -7,6 +7,7 @@ import ( "io" "testing" + "github.com/filecoin-project/lassie/pkg/types" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" gstestutil "github.com/ipfs/go-graphsync/testutil" @@ -17,7 +18,6 @@ import ( "github.com/ipld/go-ipld-prime/linking" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/storage/memstore" - selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" "github.com/stretchr/testify/require" ) @@ -51,16 +51,17 @@ func TestStreamCar(t *testing.T) { testCases := []struct { name string - selector datamodel.Node + path datamodel.Path + scope types.DagScope root cid.Cid lsys linking.LinkSystem validate func(t *testing.T, r io.Reader) }{ { - name: "chain: all blocks", - selector: selectorparse.CommonSelector_ExploreAllRecursively, - root: tbc.TipLink.(cidlink.Link).Cid, - lsys: chainLsys, + name: "chain: all blocks", + scope: types.DagScopeAll, + root: tbc.TipLink.(cidlink.Link).Cid, + lsys: chainLsys, validate: func(t *testing.T, r io.Reader) { root, blks := carToBlocks(t, r) require.Equal(t, tbc.TipLink.(cidlink.Link).Cid, root) @@ -68,10 +69,10 @@ func TestStreamCar(t *testing.T) { }, }, { - name: "chain: just root", - selector: selectorparse.CommonSelector_MatchPoint, - root: tbc.TipLink.(cidlink.Link).Cid, - lsys: chainLsys, + name: "chain: just root", + scope: types.DagScopeBlock, + root: tbc.TipLink.(cidlink.Link).Cid, + lsys: chainLsys, validate: func(t *testing.T, r io.Reader) { root, blks := carToBlocks(t, r) require.Equal(t, tbc.TipLink.(cidlink.Link).Cid, root) @@ -79,10 +80,10 @@ func TestStreamCar(t *testing.T) { }, }, { - name: "unixfs file", - selector: selectorparse.CommonSelector_ExploreAllRecursively, - root: fileEnt.Root, - lsys: fileLsys, + name: "unixfs file", + scope: types.DagScopeAll, + root: fileEnt.Root, + lsys: fileLsys, validate: func(t *testing.T, r io.Reader) { root, blks := carToBlocks(t, r) require.Equal(t, fileEnt.Root, root) @@ -90,10 +91,10 @@ func TestStreamCar(t *testing.T) { }, }, { - name: "unixfs directory", - selector: selectorparse.CommonSelector_ExploreAllRecursively, - root: dirEnt.Root, - lsys: dirLsys, + name: "unixfs directory", + scope: types.DagScopeAll, + root: dirEnt.Root, + lsys: dirLsys, validate: func(t *testing.T, r io.Reader) { root, blks := carToBlocks(t, r) require.Equal(t, dirEnt.Root, root) @@ -101,10 +102,10 @@ func TestStreamCar(t *testing.T) { }, }, { - name: "unixfs sharded directory", - selector: selectorparse.CommonSelector_ExploreAllRecursively, - root: shardedDirEnt.Root, - lsys: shardedDirLsys, + name: "unixfs sharded directory", + scope: types.DagScopeAll, + root: shardedDirEnt.Root, + lsys: shardedDirLsys, validate: func(t *testing.T, r io.Reader) { root, blks := carToBlocks(t, r) require.Equal(t, shardedDirEnt.Root, root) @@ -118,7 +119,7 @@ func TestStreamCar(t *testing.T) { t.Run(tc.name, func(t *testing.T) { req := require.New(t) var buf bytes.Buffer - err := frisbii.StreamCar(ctx, tc.lsys, tc.root, tc.selector, &buf, false) + err := frisbii.StreamCar(ctx, tc.lsys, tc.root, tc.path, tc.scope, &buf, false) req.NoError(err) tc.validate(t, &buf) }) diff --git a/go.mod b/go.mod index ec044ea..32af575 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/ipfs/go-unixfsnode v1.7.3 github.com/ipld/go-car/v2 v2.10.1 github.com/ipld/go-codec-dagpb v1.6.0 - github.com/ipld/go-ipld-prime v0.20.1-0.20230613110822-3142e1304e55 + github.com/ipld/go-ipld-prime v0.21.0 github.com/ipni/go-libipni v0.3.4 github.com/ipni/index-provider v0.13.5 github.com/libp2p/go-libp2p v0.29.2 diff --git a/go.sum b/go.sum index 99559d3..33152ed 100644 --- a/go.sum +++ b/go.sum @@ -129,7 +129,7 @@ github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk= github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY= -github.com/frankban/quicktest v1.14.5 h1:dfYrrRyLtiqT9GyKXgdh+k4inNeTvmGbuSgZ3lx3GhA= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gammazero/channelqueue v0.2.1 h1:AcK6wnLrj8koTTn3RxjRCyfmS677TjhIZb1FSMi14qc= github.com/gammazero/channelqueue v0.2.1/go.mod h1:824o5HHE+yO1xokh36BIuSv8YWwXW0364ku91eRMFS4= @@ -327,8 +327,8 @@ github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6 github.com/ipld/go-codec-dagpb v1.6.0/go.mod h1:ANzFhfP2uMJxRBr8CE+WQWs5UsNa0pYtmKZ+agnUw9s= github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0 h1:QAI/Ridj0+foHD6epbxmB4ugxz9B4vmNdYSmQLGa05E= github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0/go.mod h1:odxGcpiQZLzP5+yGu84Ljo8y3EzCvNAQKEodHNsHLXA= -github.com/ipld/go-ipld-prime v0.20.1-0.20230613110822-3142e1304e55 h1:D1JUX6l0+ugD3PE99l/NmN/97jz9YNP0uZZRLAGZQhs= -github.com/ipld/go-ipld-prime v0.20.1-0.20230613110822-3142e1304e55/go.mod h1:PRQpXNcJypaPiiSdarsrJABPkYrBvafwDl0B9HjujZ8= +github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH9C2E= +github.com/ipld/go-ipld-prime v0.21.0/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOanyMctpPjsvxQ= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd h1:gMlw/MhNr2Wtp5RwGdsW23cs+yCuj9k2ON7i9MiJlRo= github.com/ipni/go-libipni v0.3.4 h1:ZYgCE2TOZt/QJJcBZb+R63FaBLlA2suZGP2IH1fKv4A= github.com/ipni/go-libipni v0.3.4/go.mod h1:6EIUhN83pd1i6q7SCSCIuuUC3XgR7D/gjKkEnVyIQWE= diff --git a/httpipfs.go b/httpipfs.go index a12e0cd..989ba43 100644 --- a/httpipfs.go +++ b/httpipfs.go @@ -12,7 +12,6 @@ import ( lassiehttp "github.com/filecoin-project/lassie/pkg/server/http" lassietypes "github.com/filecoin-project/lassie/pkg/types" "github.com/ipfs/go-cid" - "github.com/ipfs/go-unixfsnode" "github.com/ipld/go-ipld-prime/datamodel" "github.com/ipld/go-ipld-prime/linking" ) @@ -117,8 +116,6 @@ func (hi *HttpIpfs) ServeHTTP(res http.ResponseWriter, req *http.Request) { fileName = fmt.Sprintf("%s%s", rootCid.String(), lassiehttp.FilenameExtCar) } - selNode := unixfsnode.UnixFSPathSelectorBuilder(path.String(), dagScope.TerminalSelectorSpec(), false) - bytesWrittenCh := make(chan struct{}) writer := newIpfsResponseWriter(res, hi.maxResponseBytes, func() { // called once we start writing blocks into the CAR (on the first Put()) @@ -132,7 +129,7 @@ func (hi *HttpIpfs) ServeHTTP(res http.ResponseWriter, req *http.Request) { close(bytesWrittenCh) }) - if err := StreamCar(ctx, hi.lsys, rootCid, selNode, writer, includeDupes); err != nil { + if err := StreamCar(ctx, hi.lsys, rootCid, path, dagScope, writer, includeDupes); err != nil { logError(http.StatusInternalServerError, err) select { case <-bytesWrittenCh: