Skip to content

Commit

Permalink
fix: produce error if full path was not traversed
Browse files Browse the repository at this point in the history
Fixes: #14
  • Loading branch information
rvagg committed Aug 11, 2023
1 parent 7ca8f48 commit 154655f
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 35 deletions.
36 changes: 32 additions & 4 deletions carstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"

// codecs we care about
"github.com/filecoin-project/lassie/pkg/types"
dagpb "github.com/ipld/go-codec-dagpb"
_ "github.com/ipld/go-ipld-prime/codec/cbor"
_ "github.com/ipld/go-ipld-prime/codec/dagcbor"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/ipld/go-ipld-prime/traversal/selector"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-unixfsnode"
"github.com/ipld/go-car/v2"
carstorage "github.com/ipld/go-car/v2/storage"
"github.com/ipld/go-ipld-prime/datamodel"
Expand All @@ -29,7 +31,17 @@ var protoChooser = dagpb.AddSupportToChooser(basicnode.Chooser)

// StreamCar streams a DAG in CARv1 format to the given writer, using the given
// selector.
func StreamCar(ctx context.Context, requestLsys linking.LinkSystem, rootCid cid.Cid, selNode datamodel.Node, out io.Writer, duplicates bool) error {
func StreamCar(
ctx context.Context,
requestLsys linking.LinkSystem,
rootCid cid.Cid,
path datamodel.Path,
dagScope types.DagScope,
out io.Writer,
duplicates bool,
) error {

selNode := unixfsnode.UnixFSPathSelectorBuilder(path.String(), dagScope.TerminalSelectorSpec(), false)
sel, err := selector.CompileSelector(selNode)
if err != nil {
return fmt.Errorf("failed to compile selector: %w", err)
Expand All @@ -53,12 +65,30 @@ func StreamCar(ctx context.Context, requestLsys linking.LinkSystem, rootCid cid.
LinkSystem: requestLsys,
LinkTargetNodePrototypeChooser: protoChooser,
}}
if err := progress.WalkAdv(rootNode, sel, visitNoop); err != nil {
var lastPath datamodel.Path
visitor := func(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error {
lastPath = p.Path
return nil
}
if err := progress.WalkAdv(rootNode, sel, visitor); err != nil {
return fmt.Errorf("failed to complete traversal: %w", err)
}
if erro.err != nil {
return fmt.Errorf("block load failed during traversal: %w", erro.err)
}
for path.Len() > 0 {
if lastPath.Len() == 0 {
return fmt.Errorf("failed to traverse full path, missed: [%s]", path.String())
}
var seg, lastSeg datamodel.PathSegment
seg, path = path.Shift()
lastSeg, lastPath = lastPath.Shift()
if seg != lastSeg {
return fmt.Errorf("unexpected path segment visit, got [%s], expected [%s]", lastSeg.String(), seg.String())
}
}
// having lastPath.Len()>0 is fine, it may be due to an "all" or "entity"
// doing an explore-all on the remainder of the DAG after the path.

return nil
}
Expand Down Expand Up @@ -100,5 +130,3 @@ func loadNode(ctx context.Context, rootCid cid.Cid, lsys linking.LinkSystem) (da
}
return rootNode, nil
}

func visitNoop(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error { return nil }
47 changes: 24 additions & 23 deletions carstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"testing"

"github.com/filecoin-project/lassie/pkg/types"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
gstestutil "github.com/ipfs/go-graphsync/testutil"
Expand All @@ -17,7 +18,6 @@ import (
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage/memstore"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -51,60 +51,61 @@ func TestStreamCar(t *testing.T) {

testCases := []struct {
name string
selector datamodel.Node
path datamodel.Path
scope types.DagScope
root cid.Cid
lsys linking.LinkSystem
validate func(t *testing.T, r io.Reader)
}{
{
name: "chain: all blocks",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: tbc.TipLink.(cidlink.Link).Cid,
lsys: chainLsys,
name: "chain: all blocks",
scope: types.DagScopeAll,
root: tbc.TipLink.(cidlink.Link).Cid,
lsys: chainLsys,
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, tbc.TipLink.(cidlink.Link).Cid, root)
require.Equal(t, allChainBlocks, blks)
},
},
{
name: "chain: just root",
selector: selectorparse.CommonSelector_MatchPoint,
root: tbc.TipLink.(cidlink.Link).Cid,
lsys: chainLsys,
name: "chain: just root",
scope: types.DagScopeBlock,
root: tbc.TipLink.(cidlink.Link).Cid,
lsys: chainLsys,
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, tbc.TipLink.(cidlink.Link).Cid, root)
require.Equal(t, []blocks.Block{allChainBlocks[0]}, blks)
},
},
{
name: "unixfs file",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: fileEnt.Root,
lsys: fileLsys,
name: "unixfs file",
scope: types.DagScopeAll,
root: fileEnt.Root,
lsys: fileLsys,
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, fileEnt.Root, root)
require.ElementsMatch(t, fileEnt.SelfCids, blkCids(blks))
},
},
{
name: "unixfs directory",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: dirEnt.Root,
lsys: dirLsys,
name: "unixfs directory",
scope: types.DagScopeAll,
root: dirEnt.Root,
lsys: dirLsys,
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, dirEnt.Root, root)
require.ElementsMatch(t, entCids(dirEnt), blkCids(blks))
},
},
{
name: "unixfs sharded directory",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: shardedDirEnt.Root,
lsys: shardedDirLsys,
name: "unixfs sharded directory",
scope: types.DagScopeAll,
root: shardedDirEnt.Root,
lsys: shardedDirLsys,
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, shardedDirEnt.Root, root)
Expand All @@ -118,7 +119,7 @@ func TestStreamCar(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
req := require.New(t)
var buf bytes.Buffer
err := frisbii.StreamCar(ctx, tc.lsys, tc.root, tc.selector, &buf, false)
err := frisbii.StreamCar(ctx, tc.lsys, tc.root, tc.path, tc.scope, &buf, false)
req.NoError(err)
tc.validate(t, &buf)
})
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/ipfs/go-unixfsnode v1.7.3
github.com/ipld/go-car/v2 v2.10.1
github.com/ipld/go-codec-dagpb v1.6.0
github.com/ipld/go-ipld-prime v0.20.1-0.20230613110822-3142e1304e55
github.com/ipld/go-ipld-prime v0.21.0
github.com/ipni/go-libipni v0.3.4
github.com/ipni/index-provider v0.13.5
github.com/libp2p/go-libp2p v0.29.2
Expand Down
6 changes: 3 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ=
github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag=
github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk=
github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY=
github.com/frankban/quicktest v1.14.5 h1:dfYrrRyLtiqT9GyKXgdh+k4inNeTvmGbuSgZ3lx3GhA=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gammazero/channelqueue v0.2.1 h1:AcK6wnLrj8koTTn3RxjRCyfmS677TjhIZb1FSMi14qc=
github.com/gammazero/channelqueue v0.2.1/go.mod h1:824o5HHE+yO1xokh36BIuSv8YWwXW0364ku91eRMFS4=
Expand Down Expand Up @@ -327,8 +327,8 @@ github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6
github.com/ipld/go-codec-dagpb v1.6.0/go.mod h1:ANzFhfP2uMJxRBr8CE+WQWs5UsNa0pYtmKZ+agnUw9s=
github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0 h1:QAI/Ridj0+foHD6epbxmB4ugxz9B4vmNdYSmQLGa05E=
github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0/go.mod h1:odxGcpiQZLzP5+yGu84Ljo8y3EzCvNAQKEodHNsHLXA=
github.com/ipld/go-ipld-prime v0.20.1-0.20230613110822-3142e1304e55 h1:D1JUX6l0+ugD3PE99l/NmN/97jz9YNP0uZZRLAGZQhs=
github.com/ipld/go-ipld-prime v0.20.1-0.20230613110822-3142e1304e55/go.mod h1:PRQpXNcJypaPiiSdarsrJABPkYrBvafwDl0B9HjujZ8=
github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH9C2E=
github.com/ipld/go-ipld-prime v0.21.0/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOanyMctpPjsvxQ=
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd h1:gMlw/MhNr2Wtp5RwGdsW23cs+yCuj9k2ON7i9MiJlRo=
github.com/ipni/go-libipni v0.3.4 h1:ZYgCE2TOZt/QJJcBZb+R63FaBLlA2suZGP2IH1fKv4A=
github.com/ipni/go-libipni v0.3.4/go.mod h1:6EIUhN83pd1i6q7SCSCIuuUC3XgR7D/gjKkEnVyIQWE=
Expand Down
5 changes: 1 addition & 4 deletions httpipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
lassiehttp "github.com/filecoin-project/lassie/pkg/server/http"
lassietypes "github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-unixfsnode"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/linking"
)
Expand Down Expand Up @@ -117,8 +116,6 @@ func (hi *HttpIpfs) ServeHTTP(res http.ResponseWriter, req *http.Request) {
fileName = fmt.Sprintf("%s%s", rootCid.String(), lassiehttp.FilenameExtCar)
}

selNode := unixfsnode.UnixFSPathSelectorBuilder(path.String(), dagScope.TerminalSelectorSpec(), false)

bytesWrittenCh := make(chan struct{})
writer := newIpfsResponseWriter(res, hi.maxResponseBytes, func() {
// called once we start writing blocks into the CAR (on the first Put())
Expand All @@ -132,7 +129,7 @@ func (hi *HttpIpfs) ServeHTTP(res http.ResponseWriter, req *http.Request) {
close(bytesWrittenCh)
})

if err := StreamCar(ctx, hi.lsys, rootCid, selNode, writer, includeDupes); err != nil {
if err := StreamCar(ctx, hi.lsys, rootCid, path, dagScope, writer, includeDupes); err != nil {
logError(http.StatusInternalServerError, err)
select {
case <-bytesWrittenCh:
Expand Down

0 comments on commit 154655f

Please sign in to comment.