Skip to content

Commit

Permalink
feat: add ?meta=eof for trailing metadata dag-json
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Aug 10, 2023
1 parent 7ca8f48 commit ac19912
Show file tree
Hide file tree
Showing 8 changed files with 430 additions and 47 deletions.
32 changes: 20 additions & 12 deletions carstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,23 @@ var protoChooser = dagpb.AddSupportToChooser(basicnode.Chooser)

// StreamCar streams a DAG in CARv1 format to the given writer, using the given
// selector.
func StreamCar(ctx context.Context, requestLsys linking.LinkSystem, rootCid cid.Cid, selNode datamodel.Node, out io.Writer, duplicates bool) error {
func StreamCar(ctx context.Context, requestLsys linking.LinkSystem, rootCid cid.Cid, selNode datamodel.Node, out io.Writer, duplicates bool) (int64, int64, error) {
sel, err := selector.CompileSelector(selNode)
if err != nil {
return fmt.Errorf("failed to compile selector: %w", err)
return 0, 0, fmt.Errorf("failed to compile selector: %w", err)
}

carWriter, err := carstorage.NewWritable(out, []cid.Cid{rootCid}, car.WriteAsCarV1(true), car.AllowDuplicatePuts(duplicates))
if err != nil {
return fmt.Errorf("failed to create car writer: %w", err)
return 0, 0, fmt.Errorf("failed to create car writer: %w", err)
}

erro := &errorRecordingReadOpener{ctx, requestLsys.StorageReadOpener, carWriter, nil}
erro := newErrorRecordingReadOpener(ctx, requestLsys.StorageReadOpener, carWriter)
requestLsys.StorageReadOpener = erro.StorageReadOpener

rootNode, err := loadNode(ctx, rootCid, requestLsys)
if err != nil {
return fmt.Errorf("failed to load root node: %w", err)
return 0, 0, fmt.Errorf("failed to load root node: %w", err)
}

progress := traversal.Progress{Cfg: &traversal.Config{
Expand All @@ -54,20 +54,26 @@ func StreamCar(ctx context.Context, requestLsys linking.LinkSystem, rootCid cid.
LinkTargetNodePrototypeChooser: protoChooser,
}}
if err := progress.WalkAdv(rootNode, sel, visitNoop); err != nil {
return fmt.Errorf("failed to complete traversal: %w", err)
return 0, 0, fmt.Errorf("failed to complete traversal: %w", err)
}
if erro.err != nil {
return fmt.Errorf("block load failed during traversal: %w", erro.err)
return 0, 0, fmt.Errorf("block load failed during traversal: %w", erro.err)
}

return nil
return erro.byteCount, erro.blockCount, nil
}

type errorRecordingReadOpener struct {
ctx context.Context
orig linking.BlockReadOpener
car carstorage.WritableCar
err error
ctx context.Context
orig linking.BlockReadOpener
car carstorage.WritableCar
err error
byteCount int64
blockCount int64
}

// newErrorRecordingReadOpener wraps orig in an errorRecordingReadOpener that
// carries the CAR writer used during traversal and accumulates the byte and
// block counts of the blocks read through it.
func newErrorRecordingReadOpener(ctx context.Context, orig linking.BlockReadOpener, car carstorage.WritableCar) *errorRecordingReadOpener {
	// Keyed literal: err, byteCount and blockCount start at their zero
	// values, and the constructor stays correct if struct fields are
	// added or reordered later.
	return &errorRecordingReadOpener{
		ctx:  ctx,
		orig: orig,
		car:  car,
	}
}

func (erro *errorRecordingReadOpener) StorageReadOpener(lc linking.LinkContext, lnk datamodel.Link) (io.Reader, error) {
Expand All @@ -84,6 +90,8 @@ func (erro *errorRecordingReadOpener) StorageReadOpener(lc linking.LinkContext,
if err != nil {
return nil, err
}
erro.byteCount += int64(len(byts))
erro.blockCount++
return bytes.NewReader(byts), nil
}

Expand Down
96 changes: 70 additions & 26 deletions carstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,61 +50,73 @@ func TestStreamCar(t *testing.T) {
}

testCases := []struct {
name string
selector datamodel.Node
root cid.Cid
lsys linking.LinkSystem
validate func(t *testing.T, r io.Reader)
name string
selector datamodel.Node
root cid.Cid
lsys linking.LinkSystem
expectedBytes int64
expectedBlocks int64
validate func(t *testing.T, r io.Reader)
}{
{
name: "chain: all blocks",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: tbc.TipLink.(cidlink.Link).Cid,
lsys: chainLsys,
name: "chain: all blocks",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: tbc.TipLink.(cidlink.Link).Cid,
lsys: chainLsys,
expectedBytes: sizeOf(allChainBlocks),
expectedBlocks: 100,
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, tbc.TipLink.(cidlink.Link).Cid, root)
require.Equal(t, allChainBlocks, blks)
},
},
{
name: "chain: just root",
selector: selectorparse.CommonSelector_MatchPoint,
root: tbc.TipLink.(cidlink.Link).Cid,
lsys: chainLsys,
name: "chain: just root",
selector: selectorparse.CommonSelector_MatchPoint,
root: tbc.TipLink.(cidlink.Link).Cid,
lsys: chainLsys,
expectedBytes: sizeOf(allChainBlocks[:1]),
expectedBlocks: 1,
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, tbc.TipLink.(cidlink.Link).Cid, root)
require.Equal(t, []blocks.Block{allChainBlocks[0]}, blks)
},
},
{
name: "unixfs file",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: fileEnt.Root,
lsys: fileLsys,
name: "unixfs file",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: fileEnt.Root,
lsys: fileLsys,
expectedBytes: sizeOfDirEnt(fileEnt, fileLsys),
expectedBlocks: int64(len(fileEnt.SelfCids)),
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, fileEnt.Root, root)
require.ElementsMatch(t, fileEnt.SelfCids, blkCids(blks))
},
},
{
name: "unixfs directory",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: dirEnt.Root,
lsys: dirLsys,
name: "unixfs directory",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: dirEnt.Root,
lsys: dirLsys,
expectedBytes: sizeOfDirEnt(dirEnt, dirLsys),
expectedBlocks: blocksInDirEnt(dirEnt),
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, dirEnt.Root, root)
require.ElementsMatch(t, entCids(dirEnt), blkCids(blks))
},
},
{
name: "unixfs sharded directory",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: shardedDirEnt.Root,
lsys: shardedDirLsys,
name: "unixfs sharded directory",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: shardedDirEnt.Root,
lsys: shardedDirLsys,
expectedBytes: sizeOfDirEnt(shardedDirEnt, shardedDirLsys),
expectedBlocks: blocksInDirEnt(shardedDirEnt),
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, shardedDirEnt.Root, root)
Expand All @@ -118,8 +130,10 @@ func TestStreamCar(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
req := require.New(t)
var buf bytes.Buffer
err := frisbii.StreamCar(ctx, tc.lsys, tc.root, tc.selector, &buf, false)
byts, blks, err := frisbii.StreamCar(ctx, tc.lsys, tc.root, tc.selector, &buf, false)
req.NoError(err)
req.Equal(tc.expectedBytes, byts)
req.Equal(tc.expectedBlocks, blks)
tc.validate(t, &buf)
})
}
Expand Down Expand Up @@ -200,3 +214,33 @@ func GenerateNoDupes(gen func() unixfs.DirEntry) unixfs.DirEntry {
}
}
}

// sizeOf returns the total number of raw-data bytes across all of blks.
func sizeOf(blks []blocks.Block) int64 {
	var total int64
	for i := range blks {
		total += int64(len(blks[i].RawData()))
	}
	return total
}
// sizeOfDirEnt returns the total byte size of the blocks that make up dirEnt,
// including those of all of its children, with each block loaded via ls.
// It panics on a load failure, which is acceptable in a test helper.
func sizeOfDirEnt(dirEnt unixfs.DirEntry, ls linking.LinkSystem) int64 {
	var total int64
	for _, selfCid := range dirEnt.SelfCids {
		raw, err := ls.LoadRaw(linking.LinkContext{}, cidlink.Link{Cid: selfCid})
		if err != nil {
			panic(err)
		}
		total += int64(len(raw))
	}
	for _, child := range dirEnt.Children {
		total += sizeOfDirEnt(child, ls)
	}
	return total
}

// blocksInDirEnt returns the number of blocks that make up dirEnt, including
// the blocks of all of its descendants.
func blocksInDirEnt(dirEnt unixfs.DirEntry) int64 {
	// Iterative traversal with an explicit worklist; the total is
	// order-independent, so this matches the recursive sum exactly.
	var count int64
	work := []unixfs.DirEntry{dirEnt}
	for len(work) > 0 {
		ent := work[len(work)-1]
		work = work[:len(work)-1]
		count += int64(len(ent.SelfCids))
		work = append(work, ent.Children...)
	}
	return count
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/ipfs/go-unixfsnode v1.7.3
github.com/ipld/go-car/v2 v2.10.1
github.com/ipld/go-codec-dagpb v1.6.0
github.com/ipld/go-ipld-prime v0.20.1-0.20230613110822-3142e1304e55
github.com/ipld/go-ipld-prime v0.21.1-0.20230810111002-bdf990edcdeb
github.com/ipni/go-libipni v0.3.4
github.com/ipni/index-provider v0.13.5
github.com/libp2p/go-libp2p v0.29.2
Expand Down
Loading

0 comments on commit ac19912

Please sign in to comment.