From ac19912bb76bc0d911666c2056b96912fc468c8e Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Thu, 10 Aug 2023 22:22:57 +1000 Subject: [PATCH] feat: add ?meta=eof for trailling metadata dag-json --- carstream.go | 32 ++++++---- carstream_test.go | 96 +++++++++++++++++++++--------- go.mod | 2 +- go.sum | 22 ++++++- httpipfs.go | 100 ++++++++++++++++++++++++++++++-- metadata/metadata.go | 75 ++++++++++++++++++++++++ metadata/metadata.ipldsch | 31 ++++++++++ metadata/metadata_test.go | 119 ++++++++++++++++++++++++++++++++++++++ 8 files changed, 430 insertions(+), 47 deletions(-) create mode 100644 metadata/metadata.go create mode 100644 metadata/metadata.ipldsch create mode 100644 metadata/metadata_test.go diff --git a/carstream.go b/carstream.go index 3c98040..b17b289 100644 --- a/carstream.go +++ b/carstream.go @@ -29,23 +29,23 @@ var protoChooser = dagpb.AddSupportToChooser(basicnode.Chooser) // StreamCar streams a DAG in CARv1 format to the given writer, using the given // selector. -func StreamCar(ctx context.Context, requestLsys linking.LinkSystem, rootCid cid.Cid, selNode datamodel.Node, out io.Writer, duplicates bool) error { +func StreamCar(ctx context.Context, requestLsys linking.LinkSystem, rootCid cid.Cid, selNode datamodel.Node, out io.Writer, duplicates bool) (int64, int64, error) { sel, err := selector.CompileSelector(selNode) if err != nil { - return fmt.Errorf("failed to compile selector: %w", err) + return 0, 0, fmt.Errorf("failed to compile selector: %w", err) } carWriter, err := carstorage.NewWritable(out, []cid.Cid{rootCid}, car.WriteAsCarV1(true), car.AllowDuplicatePuts(duplicates)) if err != nil { - return fmt.Errorf("failed to create car writer: %w", err) + return 0, 0, fmt.Errorf("failed to create car writer: %w", err) } - erro := &errorRecordingReadOpener{ctx, requestLsys.StorageReadOpener, carWriter, nil} + erro := newErrorRecordingReadOpener(ctx, requestLsys.StorageReadOpener, carWriter) requestLsys.StorageReadOpener = erro.StorageReadOpener rootNode, err := loadNode(ctx, rootCid, requestLsys) if err != nil { - return fmt.Errorf("failed to load root node: %w", err) + return 0, 0, fmt.Errorf("failed to load root node: %w", err) } progress := traversal.Progress{Cfg: &traversal.Config{ @@ -54,20 +54,26 @@ func StreamCar(ctx context.Context, requestLsys linking.LinkSystem, rootCid cid. LinkTargetNodePrototypeChooser: protoChooser, }} if err := progress.WalkAdv(rootNode, sel, visitNoop); err != nil { - return fmt.Errorf("failed to complete traversal: %w", err) + return 0, 0, fmt.Errorf("failed to complete traversal: %w", err) } if erro.err != nil { - return fmt.Errorf("block load failed during traversal: %w", erro.err) + return 0, 0, fmt.Errorf("block load failed during traversal: %w", erro.err) } - return nil + return erro.byteCount, erro.blockCount, nil } type errorRecordingReadOpener struct { - ctx context.Context - orig linking.BlockReadOpener - car carstorage.WritableCar - err error + ctx context.Context + orig linking.BlockReadOpener + car carstorage.WritableCar + err error + byteCount int64 + blockCount int64 +} + +func newErrorRecordingReadOpener(ctx context.Context, orig linking.BlockReadOpener, car carstorage.WritableCar) *errorRecordingReadOpener { + return &errorRecordingReadOpener{ctx, orig, car, nil, 0, 0} } func (erro *errorRecordingReadOpener) StorageReadOpener(lc linking.LinkContext, lnk datamodel.Link) (io.Reader, error) { @@ -84,6 +90,8 @@ func (erro *errorRecordingReadOpener) StorageReadOpener(lc linking.LinkContext, if err != nil { return nil, err } + erro.byteCount += int64(len(byts)) + erro.blockCount++ return bytes.NewReader(byts), nil } diff --git a/carstream_test.go b/carstream_test.go index 4caa7ce..b6082a0 100644 --- a/carstream_test.go +++ b/carstream_test.go @@ -50,17 +50,21 @@ func TestStreamCar(t *testing.T) { } testCases := []struct { - name string - selector datamodel.Node - root cid.Cid - lsys linking.LinkSystem - validate func(t *testing.T, r io.Reader) + name string + selector datamodel.Node + root cid.Cid + lsys linking.LinkSystem + expectedBytes int64 + expectedBlocks int64 + validate func(t *testing.T, r io.Reader) }{ { - name: "chain: all blocks", - selector: selectorparse.CommonSelector_ExploreAllRecursively, - root: tbc.TipLink.(cidlink.Link).Cid, - lsys: chainLsys, + name: "chain: all blocks", + selector: selectorparse.CommonSelector_ExploreAllRecursively, + root: tbc.TipLink.(cidlink.Link).Cid, + lsys: chainLsys, + expectedBytes: sizeOf(allChainBlocks), + expectedBlocks: 100, validate: func(t *testing.T, r io.Reader) { root, blks := carToBlocks(t, r) require.Equal(t, tbc.TipLink.(cidlink.Link).Cid, root) @@ -68,10 +72,12 @@ func TestStreamCar(t *testing.T) { }, }, { - name: "chain: just root", - selector: selectorparse.CommonSelector_MatchPoint, - root: tbc.TipLink.(cidlink.Link).Cid, - lsys: chainLsys, + name: "chain: just root", + selector: selectorparse.CommonSelector_MatchPoint, + root: tbc.TipLink.(cidlink.Link).Cid, + lsys: chainLsys, + expectedBytes: sizeOf(allChainBlocks[:1]), + expectedBlocks: 1, validate: func(t *testing.T, r io.Reader) { root, blks := carToBlocks(t, r) require.Equal(t, tbc.TipLink.(cidlink.Link).Cid, root) @@ -79,10 +85,12 @@ func TestStreamCar(t *testing.T) { }, }, { - name: "unixfs file", - selector: selectorparse.CommonSelector_ExploreAllRecursively, - root: fileEnt.Root, - lsys: fileLsys, + name: "unixfs file", + selector: selectorparse.CommonSelector_ExploreAllRecursively, + root: fileEnt.Root, + lsys: fileLsys, + expectedBytes: sizeOfDirEnt(fileEnt, fileLsys), + expectedBlocks: int64(len(fileEnt.SelfCids)), validate: func(t *testing.T, r io.Reader) { root, blks := carToBlocks(t, r) require.Equal(t, fileEnt.Root, root) @@ -90,10 +98,12 @@ func TestStreamCar(t *testing.T) { }, }, { - name: "unixfs directory", - selector: selectorparse.CommonSelector_ExploreAllRecursively, - root: dirEnt.Root, - lsys: dirLsys, + name: "unixfs directory", + selector: selectorparse.CommonSelector_ExploreAllRecursively, + root: dirEnt.Root, + lsys: dirLsys, + expectedBytes: sizeOfDirEnt(dirEnt, dirLsys), + expectedBlocks: blocksInDirEnt(dirEnt), validate: func(t *testing.T, r io.Reader) { root, blks := carToBlocks(t, r) require.Equal(t, dirEnt.Root, root) @@ -101,10 +111,12 @@ func TestStreamCar(t *testing.T) { }, }, { - name: "unixfs sharded directory", - selector: selectorparse.CommonSelector_ExploreAllRecursively, - root: shardedDirEnt.Root, - lsys: shardedDirLsys, + name: "unixfs sharded directory", + selector: selectorparse.CommonSelector_ExploreAllRecursively, + root: shardedDirEnt.Root, + lsys: shardedDirLsys, + expectedBytes: sizeOfDirEnt(shardedDirEnt, shardedDirLsys), + expectedBlocks: blocksInDirEnt(shardedDirEnt), validate: func(t *testing.T, r io.Reader) { root, blks := carToBlocks(t, r) require.Equal(t, shardedDirEnt.Root, root) @@ -118,8 +130,10 @@ func TestStreamCar(t *testing.T) { t.Run(tc.name, func(t *testing.T) { req := require.New(t) var buf bytes.Buffer - err := frisbii.StreamCar(ctx, tc.lsys, tc.root, tc.selector, &buf, false) + byts, blks, err := frisbii.StreamCar(ctx, tc.lsys, tc.root, tc.selector, &buf, false) req.NoError(err) + req.Equal(tc.expectedBytes, byts) + req.Equal(tc.expectedBlocks, blks) tc.validate(t, &buf) }) } @@ -200,3 +214,33 @@ func GenerateNoDupes(gen func() unixfs.DirEntry) unixfs.DirEntry { } } } + +func sizeOf(blks []blocks.Block) int64 { + var size int64 + for _, blk := range blks { + size += int64(len(blk.RawData())) + } + return size +} +func sizeOfDirEnt(dirEnt unixfs.DirEntry, ls linking.LinkSystem) int64 { + var size int64 + for _, c := range dirEnt.SelfCids { + blk, err := ls.LoadRaw(linking.LinkContext{}, cidlink.Link{Cid: c}) + if err != nil { + panic(err) + } + size += int64(len(blk)) + } + for _, c := range dirEnt.Children { + size += sizeOfDirEnt(c, ls) + } + return size +} + +func blocksInDirEnt(dirEnt unixfs.DirEntry) int64 { + size := int64(len(dirEnt.SelfCids)) + for _, c := range dirEnt.Children { + size += blocksInDirEnt(c) + } + return size +} diff --git a/go.mod b/go.mod index ec044ea..3940d95 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/ipfs/go-unixfsnode v1.7.3 github.com/ipld/go-car/v2 v2.10.1 github.com/ipld/go-codec-dagpb v1.6.0 - github.com/ipld/go-ipld-prime v0.20.1-0.20230613110822-3142e1304e55 + github.com/ipld/go-ipld-prime v0.21.1-0.20230810111002-bdf990edcdeb github.com/ipni/go-libipni v0.3.4 github.com/ipni/index-provider v0.13.5 github.com/libp2p/go-libp2p v0.29.2 diff --git a/go.sum b/go.sum index 99559d3..ae8de0b 100644 --- a/go.sum +++ b/go.sum @@ -38,6 +38,7 @@ github.com/bep/debounce v1.2.1/go.mod h1:H8yggRPQKLUhUoqrJC1bO2xNya7vanpDl7xR3IS github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= @@ -67,6 +68,7 @@ github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c/go.mod h1:6Uh github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5ilcvdfma9wOH6Y= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= +github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= @@ -129,7 +131,7 @@ github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk= github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY= -github.com/frankban/quicktest v1.14.5 h1:dfYrrRyLtiqT9GyKXgdh+k4inNeTvmGbuSgZ3lx3GhA= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gammazero/channelqueue v0.2.1 h1:AcK6wnLrj8koTTn3RxjRCyfmS677TjhIZb1FSMi14qc= github.com/gammazero/channelqueue v0.2.1/go.mod h1:824o5HHE+yO1xokh36BIuSv8YWwXW0364ku91eRMFS4= @@ -141,6 +143,7 @@ github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclK github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -278,10 +281,12 @@ github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1Y github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ= github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-ds-help v1.1.0 h1:yLE2w9RAsl31LtfMt91tRZcrx+e61O5mDxFRR994w4Q= +github.com/ipfs/go-ipfs-ds-help v1.1.0/go.mod h1:YR5+6EaebOhfcqVCyqemItCLthrpVNot+rsOU/5IatU= github.com/ipfs/go-ipfs-exchange-interface v0.2.0 h1:8lMSJmKogZYNo2jjhUs0izT+dck05pqUw4mWNW9Pw6Y= github.com/ipfs/go-ipfs-exchange-offline v0.3.0 h1:c/Dg8GDPzixGd0MC8Jh6mjOwU57uYokgWRFidfvEkuA= github.com/ipfs/go-ipfs-files v0.3.0 h1:fallckyc5PYjuMEitPNrjRfpwl7YFt69heCOUhsbGxQ= github.com/ipfs/go-ipfs-posinfo v0.0.1 h1:Esoxj+1JgSjX0+ylc0hUmJCOv6V2vFoZiETLR6OtpRs= +github.com/ipfs/go-ipfs-posinfo v0.0.1/go.mod h1:SwyeVP+jCwiDu0C313l/8jg6ZxM0qqtlt2a0vILTc1A= github.com/ipfs/go-ipfs-pq v0.0.3 h1:YpoHVJB+jzK15mr/xsWC574tyDLkezVrDNeaalQBsTE= github.com/ipfs/go-ipfs-pq v0.0.3/go.mod h1:btNw5hsHBpRcSSgZtiNm/SLj5gYIZ18AKtv3kERkRb4= github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc= @@ -327,8 +332,8 @@ github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6 github.com/ipld/go-codec-dagpb v1.6.0/go.mod h1:ANzFhfP2uMJxRBr8CE+WQWs5UsNa0pYtmKZ+agnUw9s= github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0 h1:QAI/Ridj0+foHD6epbxmB4ugxz9B4vmNdYSmQLGa05E= github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0/go.mod h1:odxGcpiQZLzP5+yGu84Ljo8y3EzCvNAQKEodHNsHLXA= -github.com/ipld/go-ipld-prime v0.20.1-0.20230613110822-3142e1304e55 h1:D1JUX6l0+ugD3PE99l/NmN/97jz9YNP0uZZRLAGZQhs= -github.com/ipld/go-ipld-prime v0.20.1-0.20230613110822-3142e1304e55/go.mod h1:PRQpXNcJypaPiiSdarsrJABPkYrBvafwDl0B9HjujZ8= +github.com/ipld/go-ipld-prime v0.21.1-0.20230810111002-bdf990edcdeb h1:frdDF5813yKhOaEHAdEhgArLVk+uBQwygVtfg2jAGR4= +github.com/ipld/go-ipld-prime v0.21.1-0.20230810111002-bdf990edcdeb/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOanyMctpPjsvxQ= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd h1:gMlw/MhNr2Wtp5RwGdsW23cs+yCuj9k2ON7i9MiJlRo= github.com/ipni/go-libipni v0.3.4 h1:ZYgCE2TOZt/QJJcBZb+R63FaBLlA2suZGP2IH1fKv4A= github.com/ipni/go-libipni v0.3.4/go.mod h1:6EIUhN83pd1i6q7SCSCIuuUC3XgR7D/gjKkEnVyIQWE= @@ -351,11 +356,13 @@ github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0 github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -375,6 +382,7 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= @@ -437,10 +445,12 @@ github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-server-timing v1.0.1 h1:f00/aIe8T3MrnLhQHu3tSWvnwc5GV/p5eutuu3hF/tE= github.com/mitchellh/go-server-timing v1.0.1/go.mod h1:Mo6GKi9FSLwWFAMn3bqVPWe20y5ri5QGQuO9D9MCOxk= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/mr-tron/base58 v1.1.3/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= @@ -482,6 +492,7 @@ github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXS github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo= github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU= @@ -566,6 +577,7 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV github.com/shurcooL/users v0.0.0-20180125191416-49c67e49c537/go.mod h1:QJTqeLYEDaXHZDBsXlPCDqdhQuJkuw4NOtaxYe3xii4= github.com/shurcooL/webdavfs v0.0.0-20170829043945-18c3829fa133/go.mod h1:hKmq5kWdCj2z2KEozexVbfEZIWiTjhE0+UjmZgPqehw= github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/assertions v1.0.1/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM= github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= @@ -576,9 +588,11 @@ github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hg github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM= github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE= github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA= +github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -622,6 +636,7 @@ github.com/whyrusleeping/cbor-gen v0.0.0-20230418232409-daab9ece03a0 h1:XYEgH2nJ github.com/whyrusleeping/cbor-gen v0.0.0-20230418232409-daab9ece03a0/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP9WYv2nzyawpKMOCl+Z/jW7djv2/J50lj9E= github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f/go.mod h1:p9UJB6dDgdPgMJZs7UjUOdulKyRr9fqkS+6JKAInPy8= +github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= github.com/xlab/c-for-go v0.0.0-20200718154222-87b0065af829/go.mod h1:h/1PEBwj7Ym/8kOuMWvO2ujZ6Lt+TMbySEXNhjjR87I= github.com/xlab/pkgconfig v0.0.0-20170226114623-cea12a0fd245/go.mod h1:C+diUUz7pxhNY6KAoLgrTYARGWnt82zWTylZlxT92vk= @@ -925,6 +940,7 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/httpipfs.go b/httpipfs.go index a12e0cd..4d9ee98 100644 --- a/httpipfs.go +++ b/httpipfs.go @@ -13,8 +13,11 @@ import ( lassietypes "github.com/filecoin-project/lassie/pkg/types" "github.com/ipfs/go-cid" "github.com/ipfs/go-unixfsnode" + "github.com/ipld/frisbii/metadata" "github.com/ipld/go-ipld-prime/datamodel" "github.com/ipld/go-ipld-prime/linking" + mh "github.com/multiformats/go-multihash" + "lukechampine.com/blake3" ) var _ http.Handler = (*HttpIpfs)(nil) @@ -113,6 +116,8 @@ func (hi *HttpIpfs) ServeHTTP(res http.ResponseWriter, req *http.Request) { return } + includeMeta := req.URL.Query().Get("meta") == "eof" + if fileName == "" { fileName = fmt.Sprintf("%s%s", rootCid.String(), lassiehttp.FilenameExtCar) } @@ -120,7 +125,7 @@ func (hi *HttpIpfs) ServeHTTP(res http.ResponseWriter, req *http.Request) { selNode := unixfsnode.UnixFSPathSelectorBuilder(path.String(), dagScope.TerminalSelectorSpec(), false) bytesWrittenCh := make(chan struct{}) - writer := newIpfsResponseWriter(res, hi.maxResponseBytes, func() { + var writer io.Writer = newIpfsResponseWriter(res, hi.maxResponseBytes, func() { // called once we start writing blocks into the CAR (on the first Put()) res.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", fileName)) res.Header().Set("Accept-Ranges", lassiehttp.ResponseAcceptRangesHeader) @@ -132,18 +137,74 @@ func (hi *HttpIpfs) ServeHTTP(res http.ResponseWriter, req *http.Request) { close(bytesWrittenCh) }) - if err := StreamCar(ctx, hi.lsys, rootCid, selNode, writer, includeDupes); err != nil { - logError(http.StatusInternalServerError, err) + if includeMeta { + writer = newChecksumWriter(writer) + } + + dataBytes, blockCount, carErr := StreamCar(ctx, hi.lsys, rootCid, selNode, writer, includeDupes) + + if includeMeta { + // write NUL byte to indicate end of CARv1 data + if _, err := res.Write([]byte{0}); err != nil { + if carErr != nil { + carErr = err + } + } + // write the metadata + md := metadata.Metadata{ + Request: metadata.Request{ + Root: rootCid, + Scope: dagScope, + Duplicates: includeDupes, + }, + } + if carErr != nil { + msg := carErr.Error() + md.Error = &msg + } else { + checksum := writer.(*checksumWriter).Sum() + checksumMh, err := mh.Encode(checksum, mh.BLAKE3) + if err != nil { + msg := fmt.Sprintf("error creating checksum multihash: %s", err.Error()) + md.Error = &msg + carErr = err + } else { + md.Properties = &metadata.CarProperties{ + CarBytes: writer.(*checksumWriter).Count(), + DataBytes: dataBytes, + BlockCount: blockCount, + ChecksumMultihash: checksumMh, + } + if path.Len() != 0 { + p := "/" + path.String() + md.Request.Path = &p + } + } + } + enc, err := metadata.CarMetadata{Metadata: &md}.Serialize() + if err != nil { + carErr = err + } else { + if _, err := res.Write(enc); err != nil { + if carErr != nil { + carErr = err + } + } + } + } + + if carErr != nil { + logError(http.StatusInternalServerError, carErr) select { case <-bytesWrittenCh: - logger.Debugw("unclean close", "cid", rootCid, "err", err) + logger.Debugw("unclean close", "cid", rootCid, "err", carErr) if err := closeWithUnterminatedChunk(res); err != nil { logger.Infow("unable to send early termination", "err", err) } return default: } - logger.Debugw("error streaming CAR", "cid", rootCid, "err", err) + logger.Debugw("error streaming CAR", "cid", rootCid, "err", carErr) } } @@ -207,3 +268,32 @@ func closeWithUnterminatedChunk(res http.ResponseWriter) error { } return nil } + +type checksumWriter struct { + w io.Writer + h *blake3.Hasher + c int64 +} + +func newChecksumWriter(w io.Writer) *checksumWriter { + return &checksumWriter{ + w: w, + h: blake3.New(32, nil), + } +} + +func (hw *checksumWriter) Write(p []byte) (n int, err error) { + if _, err := hw.h.Write(p); err != nil { + return 0, err + } + hw.c += int64(len(p)) + return hw.w.Write(p) +} + +func (hw *checksumWriter) Sum() []byte { + return hw.h.Sum(nil) +} + +func (hw *checksumWriter) Count() int64 { + return hw.c +} diff --git a/metadata/metadata.go b/metadata/metadata.go new file mode 100644 index 0000000..c359721 --- /dev/null +++ b/metadata/metadata.go @@ -0,0 +1,75 @@ +package metadata + +import ( + "fmt" + + "github.com/filecoin-project/lassie/pkg/types" + "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime/codec/dagjson" + bindnoderegistry "github.com/ipld/go-ipld-prime/node/bindnode/registry" + mh "github.com/multiformats/go-multihash" + + _ "embed" +) + +//go:embed metadata.ipldsch +var schema []byte + +var BindnodeRegistry = bindnoderegistry.NewRegistry() + +type CarMetadata struct { + Metadata *Metadata +} + +func (cm CarMetadata) Serialize() ([]byte, error) { + return BindnodeRegistry.TypeToBytes(&cm, dagjson.Encode) +} + +func (cm *CarMetadata) Deserialize(byts []byte) error { + cmIface, err := BindnodeRegistry.TypeFromBytes(byts, &CarMetadata{}, dagjson.Decode) + if err != nil { + return fmt.Errorf("invalid CarMetadata: %w", err) + } + cmm := cmIface.(*CarMetadata) // safe to assume type + if cmm.Metadata.Properties == nil && cmm.Metadata.Error == nil { + return fmt.Errorf("invalid CarMetadata: must contain either properties or error fields") + } + if (cmm.Metadata.Properties == nil) == (cmm.Metadata.Error == nil) { + return fmt.Errorf("invalid CarMetadata: must contain either properties or error fields, not both") + } + if cmm.Metadata.Properties != nil { + if _, err := mh.Decode(cmm.Metadata.Properties.ChecksumMultihash); err != nil { + return fmt.Errorf("invalid CarMetadata: checksum multihash: %w", err) + } + } + // TODO: parse and check EntityBytes format + *cm = *cmm + return nil +} + +type Metadata struct { + Request Request + Properties *CarProperties + Error *string +} + +type Request struct { + Root cid.Cid + Path *string + Scope types.DagScope + Duplicates bool + EntityBytes *string +} + +type CarProperties struct { + CarBytes int64 + DataBytes int64 + BlockCount int64 + ChecksumMultihash []byte +} + +func init() { + if err := BindnodeRegistry.RegisterType((*CarMetadata)(nil), string(schema), "CarMetadata"); err != nil { + panic(err.Error()) + } +} diff --git a/metadata/metadata.ipldsch b/metadata/metadata.ipldsch new file mode 100644 index 0000000..ae27840 --- /dev/null +++ b/metadata/metadata.ipldsch @@ -0,0 +1,31 @@ +type CarMetadata union { + | Metadata "car-metadata/v1" +} representation keyed + +type Metadata struct { + request Request + # must contain either a properties or an error + properties optional CarProperties + error optional String +} + +type Request struct { + root &Any + path optional String + scope DagScope + duplicates Bool (rename "dups") + entityBytes optional String (rename "entity-bytes") # Must be a valid entity-bytes param: "from:to" +} + +type DagScope enum { + | all + | entity + | block +} + +type CarProperties struct { + carBytes Int (rename "car_bytes") + dataBytes Int (rename "data_bytes") + blockCount Int (rename "block_count") + checksumMultihash optional Bytes (rename "checksum") # Must be a valid multihash +} diff --git a/metadata/metadata_test.go b/metadata/metadata_test.go new file mode 100644 index 0000000..b7a2415 --- /dev/null +++ b/metadata/metadata_test.go @@ -0,0 +1,119 @@ +package metadata_test + +import ( + "testing" + + "github.com/filecoin-project/lassie/pkg/types" + "github.com/ipfs/go-cid" + "github.com/ipld/frisbii/metadata" + "github.com/stretchr/testify/require" +) + +var testCid = cid.MustParse("bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4") + +func TestCarMetadataRoundtrip(t *testing.T) { + path := "/birb.mp4" + orig := metadata.CarMetadata{ + Metadata: &metadata.Metadata{ + Request: metadata.Request{ + Root: testCid, + Path: &path, + Scope: types.DagScopeAll, + Duplicates: true, + }, + Properties: &metadata.CarProperties{ + CarBytes: 202020, + DataBytes: 101010, + BlockCount: 303, + ChecksumMultihash: testCid.Hash(), + }, + }, + } + byts, err := orig.Serialize() + require.NoError(t, err) + + t.Log("metadata dag-json:", string(byts)) + + var roundtrip metadata.CarMetadata + err = roundtrip.Deserialize(byts) + require.NoError(t, err) + require.Equal(t, orig, roundtrip) + require.NotNil(t, roundtrip.Metadata) + require.Equal(t, testCid, roundtrip.Metadata.Request.Root) + require.NotNil(t, roundtrip.Metadata.Request.Path) + require.Equal(t, "/birb.mp4", *roundtrip.Metadata.Request.Path) + require.Equal(t, types.DagScopeAll, roundtrip.Metadata.Request.Scope) + require.True(t, roundtrip.Metadata.Request.Duplicates) + require.NotNil(t, roundtrip.Metadata.Properties) + require.Nil(t, roundtrip.Metadata.Error) + require.Equal(t, int64(202020), roundtrip.Metadata.Properties.CarBytes) + require.Equal(t, int64(101010), roundtrip.Metadata.Properties.DataBytes) + require.Equal(t, int64(303), roundtrip.Metadata.Properties.BlockCount) + require.Equal(t, []byte(testCid.Hash()), roundtrip.Metadata.Properties.ChecksumMultihash) +} + +func TestCarMetadataErrorRoundtrip(t *testing.T) { + path := "/birb.mp4" + msg := "something bad happened" + orig := metadata.CarMetadata{ + Metadata: &metadata.Metadata{ + Request: metadata.Request{ + Root: testCid, + Path: &path, + Scope: types.DagScopeAll, + Duplicates: true, + }, + Error: &msg, + }, + } + byts, err := orig.Serialize() + require.NoError(t, err) + + t.Log("metadata dag-json:", string(byts)) + + var roundtrip metadata.CarMetadata + err = roundtrip.Deserialize(byts) + require.NoError(t, err) + require.Equal(t, orig, roundtrip) + require.NotNil(t, roundtrip.Metadata) + require.Equal(t, testCid, roundtrip.Metadata.Request.Root) + require.NotNil(t, roundtrip.Metadata.Request.Path) + require.Equal(t, "/birb.mp4", *roundtrip.Metadata.Request.Path) + require.Equal(t, types.DagScopeAll, roundtrip.Metadata.Request.Scope) + require.True(t, roundtrip.Metadata.Request.Duplicates) + require.Nil(t, roundtrip.Metadata.Properties) + require.NotNil(t, roundtrip.Metadata.Error) + require.Equal(t, "something bad happened", *roundtrip.Metadata.Error) +} + +func TestBadMetadata(t *testing.T) { + testCases := []struct { + name string + byts string + err string + }{ + {"empty", `{}`, `union structure constraints for CarMetadata caused rejection: a union must have exactly one entry`}, + {"bad key", `{"not metadata":true}`, `union structure constraints for CarMetadata caused rejection: no member named "not metadata"`}, + { + "bad multihash", + `{"car-metadata/v1":{"properties":{"block_count":303,"car_bytes":202020,"checksum":{"/":{"bytes":"bm90IGEgbXVsdGloYXNo"}},"data_bytes":101010},"request":{"dups":true,"path":"/birb.mp4","root":{"/":"bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4"},"scope":"all"}}}`, + `invalid CarMetadata: checksum multihash:`, + }, + { + "no properties or error", + `{"car-metadata/v1":{"request":{"dups":true,"path":"/birb.mp4","root":{"/":"bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4"},"scope":"all"}}}`, + `invalid CarMetadata: must contain either properties or error fields`, + }, + { + "both properties and error", + `{"car-metadata/v1":{"error":"something bad happened","properties":{"block_count":303,"car_bytes":202020,"checksum":{"/":{"bytes":"EiBd9neBCasGxUmysJN7nGza4ylHikmbsP2+nXs6BlIpvw"}},"data_bytes":101010},"request":{"dups":true,"path":"/birb.mp4","root":{"/":"bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4"},"scope":"all"}}}`, + `invalid CarMetadata: must contain either properties or error fields, not both`, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var roundtrip metadata.CarMetadata + require.ErrorContains(t, roundtrip.Deserialize([]byte(tc.byts)), tc.err) + }) + } +}