diff --git a/cmd/lassie/fetch.go b/cmd/lassie/fetch.go index e14f3d29..b094f43b 100644 --- a/cmd/lassie/fetch.go +++ b/cmd/lassie/fetch.go @@ -13,6 +13,8 @@ import ( "github.com/filecoin-project/lassie/pkg/storage" "github.com/filecoin-project/lassie/pkg/types" "github.com/ipfs/go-cid" + "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/storage/deferred" "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" trustlessutils "github.com/ipld/go-trustless-utils" @@ -253,14 +255,14 @@ func defaultFetchRun( lassie.RegisterSubscriber(pp.subscriber) } - var carWriter *storage.DeferredCarWriter + var carWriter *deferred.DeferredCarWriter if outfile == stdoutFileString { // we need the onlyWriter because stdout is presented as an os.File, and // therefore pretend to support seeks, so feature-checking in go-car // will make bad assumptions about capabilities unless we hide it - carWriter = storage.NewDeferredCarWriterForStream(rootCid, &onlyWriter{dataWriter}) + carWriter = deferred.NewDeferredCarWriterForStream(&onlyWriter{dataWriter}, []cid.Cid{rootCid}) } else { - carWriter = storage.NewDeferredCarWriterForPath(rootCid, outfile) + carWriter = deferred.NewDeferredCarWriterForPath(outfile, []cid.Cid{rootCid}, car.WriteAsCarV1(true)) } tempStore := storage.NewDeferredStorageCar(tempDir, rootCid) diff --git a/go.mod b/go.mod index ea71e415..7c7118e2 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/ipfs/go-ipld-format v0.6.0 github.com/ipfs/go-log/v2 v2.5.1 github.com/ipfs/go-unixfsnode v1.8.0 - github.com/ipld/go-car/v2 v2.11.0 + github.com/ipld/go-car/v2 v2.13.0 github.com/ipld/go-codec-dagpb v1.6.0 github.com/ipld/go-ipld-prime v0.21.0 github.com/ipld/go-trustless-utils v0.0.0 @@ -90,7 +90,7 @@ require ( github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect github.com/ipfs/go-ipfs-pq v0.0.3 // indirect github.com/ipfs/go-ipfs-util v0.0.2 // indirect - github.com/ipfs/go-ipld-cbor v0.0.6 // indirect + github.com/ipfs/go-ipld-cbor v0.1.0 // indirect github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-metrics-interface v0.0.1 // indirect github.com/ipfs/go-peertaskqueue v0.8.1 // indirect @@ -149,7 +149,7 @@ require ( github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/warpfork/go-testmark v0.12.1 // indirect github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect - github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa // indirect + github.com/whyrusleeping/cbor-gen v0.0.0-20230818171029-f91ae536ca25 // indirect github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect go.opentelemetry.io/otel/sdk v1.14.0 // indirect diff --git a/go.sum b/go.sum index 93ab667c..1e55347c 100644 --- a/go.sum +++ b/go.sum @@ -288,8 +288,9 @@ github.com/ipfs/go-ipld-cbor v0.0.3/go.mod h1:wTBtrQZA3SoFKMVkp6cn6HMRteIB1VsmHA github.com/ipfs/go-ipld-cbor v0.0.4/go.mod h1:BkCduEx3XBCO6t2Sfo5BaHzuok7hbhdMm9Oh8B2Ftq4= github.com/ipfs/go-ipld-cbor v0.0.5/go.mod h1:BkCduEx3XBCO6t2Sfo5BaHzuok7hbhdMm9Oh8B2Ftq4= github.com/ipfs/go-ipld-cbor v0.0.6-0.20211211231443-5d9b9e1f6fa8/go.mod h1:ssdxxaLJPXH7OjF5V4NSjBbcfh+evoR4ukuru0oPXMA= -github.com/ipfs/go-ipld-cbor v0.0.6 h1:pYuWHyvSpIsOOLw4Jy7NbBkCyzLDcl64Bf/LZW7eBQ0= github.com/ipfs/go-ipld-cbor v0.0.6/go.mod h1:ssdxxaLJPXH7OjF5V4NSjBbcfh+evoR4ukuru0oPXMA= +github.com/ipfs/go-ipld-cbor v0.1.0 h1:dx0nS0kILVivGhfWuB6dUpMa/LAwElHPw1yOGYopoYs= +github.com/ipfs/go-ipld-cbor v0.1.0/go.mod h1:U2aYlmVrJr2wsUBU67K4KgepApSZddGRDWBYR0H4sCk= github.com/ipfs/go-ipld-format v0.0.1/go.mod h1:kyJtbkDALmFHv3QR6et67i35QzO3S0dCDnkOJhcZkms= github.com/ipfs/go-ipld-format v0.0.2/go.mod h1:4B6+FM2u9OJ9zCV+kSbgFAZlOrv1Hqbf0INGQgiKf9k= github.com/ipfs/go-ipld-format v0.6.0 h1:VEJlA2kQ3LqFSIm5Vu6eIlSxD/Ze90xtc4Meten1F5U= @@ -317,8 +318,8 @@ github.com/ipfs/go-unixfs v0.4.5 h1:wj8JhxvV1G6CD7swACwSKYa+NgtdWC1RUit+gFnymDU= github.com/ipfs/go-unixfsnode v1.8.0 h1:yCkakzuE365glu+YkgzZt6p38CSVEBPgngL9ZkfnyQU= github.com/ipfs/go-unixfsnode v1.8.0/go.mod h1:HxRu9HYHOjK6HUqFBAi++7DVoWAHn0o4v/nZ/VA+0g8= github.com/ipfs/go-verifcid v0.0.2 h1:XPnUv0XmdH+ZIhLGKg6U2vaPaRDXb9urMyNVCE7uvTs= -github.com/ipld/go-car/v2 v2.11.0 h1:lkAPwbbTFqbdfawgm+bfmFc8PjGC7D12VcaLXPCLNfM= -github.com/ipld/go-car/v2 v2.11.0/go.mod h1:aDszqev0zjtU8l96g4lwXHaU9bzArj56Y7eEN0q/xqA= +github.com/ipld/go-car/v2 v2.13.0 h1:ogsB0rm6cclI2/HxP2XM+z+KehNj2ovpz0YfiF1hJwE= +github.com/ipld/go-car/v2 v2.13.0/go.mod h1:QkdjjFNGit2GIkpQ953KBwowuoukoM75nP/JI1iDJdo= github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6rOcc= github.com/ipld/go-codec-dagpb v1.6.0/go.mod h1:ANzFhfP2uMJxRBr8CE+WQWs5UsNa0pYtmKZ+agnUw9s= github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH9C2E= @@ -608,8 +609,8 @@ github.com/whyrusleeping/cbor-gen v0.0.0-20200812213548-958ddffe352c/go.mod h1:f github.com/whyrusleeping/cbor-gen v0.0.0-20200826160007-0b9f6c5fb163/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= github.com/whyrusleeping/cbor-gen v0.0.0-20210118024343-169e9d70c0c2/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= github.com/whyrusleeping/cbor-gen v0.0.0-20210303213153-67a261a1d291/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= -github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa h1:EyA027ZAkuaCLoxVX4r1TZMPy1d31fM6hbfQ4OU4I5o= -github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= +github.com/whyrusleeping/cbor-gen v0.0.0-20230818171029-f91ae536ca25 h1:yVYDLoN2gmB3OdBXFW8e1UwgVbmCvNlnAKhvHPaNARI= +github.com/whyrusleeping/cbor-gen v0.0.0-20230818171029-f91ae536ca25/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP9WYv2nzyawpKMOCl+Z/jW7djv2/J50lj9E= github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f/go.mod h1:p9UJB6dDgdPgMJZs7UjUOdulKyRr9fqkS+6JKAInPy8= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= diff --git a/pkg/server/http/ipfs.go b/pkg/server/http/ipfs.go index dd1fb5e4..0ff548db 100644 --- a/pkg/server/http/ipfs.go +++ b/pkg/server/http/ipfs.go @@ -13,6 +13,7 @@ import ( "github.com/filecoin-project/lassie/pkg/types" "github.com/ipfs/go-cid" "github.com/ipfs/go-unixfsnode" + "github.com/ipld/go-car/v2/storage/deferred" cidlink "github.com/ipld/go-ipld-prime/linking/cid" trustlessutils "github.com/ipld/go-trustless-utils" trustlesshttp "github.com/ipld/go-trustless-utils/http" @@ -50,9 +51,9 @@ func IpfsHandler(fetcher types.Fetcher, cfg HttpServerConfig) func(http.Response tempStore := storage.NewDeferredStorageCar(cfg.TempDir, request.Root) var carWriter storage.DeferredWriter if request.Duplicates { - carWriter = storage.NewDuplicateAdderCarForStream(req.Context(), request.Root, request.Path, request.Scope, request.Bytes, tempStore, res) + carWriter = storage.NewDuplicateAdderCarForStream(req.Context(), res, request.Root, request.Path, request.Scope, request.Bytes, tempStore) } else { - carWriter = storage.NewDeferredCarWriterForStream(request.Root, res) + carWriter = deferred.NewDeferredCarWriterForStream(res, []cid.Cid{request.Root}) } carStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), tempStore) defer func() { diff --git a/pkg/storage/cachingtempstore_test.go b/pkg/storage/cachingtempstore_test.go index f3bb7fa3..993c4221 100644 --- a/pkg/storage/cachingtempstore_test.go +++ b/pkg/storage/cachingtempstore_test.go @@ -4,14 +4,21 @@ import ( "bytes" "context" "io" + "math/rand" + "sync" "testing" "time" "github.com/ipfs/go-cid" carv2 "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/storage/deferred" + mh "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" ) +var rng = rand.New(rand.NewSource(3333)) +var rngLk sync.Mutex + func TestDeferredCarWriterWritesCARv1(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -48,7 +55,7 @@ func TestDeferredCarWriterWritesCARv1(t *testing.T) { testCid2, testData2 := randBlock() var buf bytes.Buffer - cw := NewDeferredCarWriterForStream(testCid1, &buf) + cw := deferred.NewDeferredCarWriterForStream(&buf, []cid.Cid{testCid1}) ss := NewCachingTempStore(cw.BlockWriteOpener(), NewDeferredStorageCar("", testCid1)) t.Cleanup(func() { ss.Close() }) @@ -155,3 +162,20 @@ func TestDeferredCarWriterWritesCARv1(t *testing.T) { }) } } + +func randBlock() (cid.Cid, []byte) { + data := make([]byte, 1024) + rngLk.Lock() + rng.Read(data) + rngLk.Unlock() + h, err := mh.Sum(data, mh.SHA2_512, -1) + if err != nil { + panic(err) + } + return cid.NewCidV1(cid.Raw, h), data +} + +func randCid() cid.Cid { + c, _ := randBlock() + return c +} diff --git a/pkg/storage/deferredcarwriter.go b/pkg/storage/deferredcarwriter.go deleted file mode 100644 index 276b3ef1..00000000 --- a/pkg/storage/deferredcarwriter.go +++ /dev/null @@ -1,160 +0,0 @@ -package storage - -import ( - "context" - "io" - "os" - "sync" - - "github.com/ipfs/go-cid" - carv2 "github.com/ipld/go-car/v2" - carstorage "github.com/ipld/go-car/v2/storage" - "github.com/ipld/go-ipld-prime" - "github.com/ipld/go-ipld-prime/linking" - ipldstorage "github.com/ipld/go-ipld-prime/storage" -) - -type putCb struct { - cb func(int) - once bool -} - -var _ ipldstorage.WritableStorage = (*DeferredCarWriter)(nil) -var _ io.Closer = (*DeferredCarWriter)(nil) - -type DeferredWriter interface { - ipldstorage.WritableStorage - io.Closer - BlockWriteOpener() linking.BlockWriteOpener - OnPut(cb func(int), once bool) -} - -// DeferredCarWriter creates a write-only CARv1 either to an existing stream or -// to a file designated by a supplied path. CARv1 content (including header) -// only begins when the first Put() operation is performed. If the output is a -// file, it will be created when the first Put() operation is performed. -// DeferredCarWriter is threadsafe, and can be used concurrently. -// Closing the writer will close, but not delete, the underlying file. This -// writer is intended for constructing the final output CARv1 for the user. -type DeferredCarWriter struct { - root cid.Cid - outPath string - outStream io.Writer - - lk sync.Mutex - f *os.File - w carstorage.WritableCar - putCb []putCb - opts []carv2.Option -} - -// NewDeferredCarWriterForPath creates a DeferredCarWriter that will write to a -// file designated by the supplied path. The file will only be created on the -// first Put() operation. -func NewDeferredCarWriterForPath(root cid.Cid, outPath string, opts ...carv2.Option) *DeferredCarWriter { - return &DeferredCarWriter{root: root, outPath: outPath, opts: opts} -} - -// NewDeferredCarWriterForStream creates a DeferredCarWriter that will write to -// the supplied stream. The stream will only be written to on the first Put() -// operation. -func NewDeferredCarWriterForStream(root cid.Cid, outStream io.Writer, opts ...carv2.Option) *DeferredCarWriter { - return &DeferredCarWriter{root: root, outStream: outStream, opts: opts} -} - -// OnPut will call a callback when each Put() operation is started. The argument -// to the callback is the number of bytes being written. If once is true, the -// callback will be removed after the first call. -func (dcw *DeferredCarWriter) OnPut(cb func(int), once bool) { - if dcw.putCb == nil { - dcw.putCb = make([]putCb, 0) - } - dcw.putCb = append(dcw.putCb, putCb{cb: cb, once: once}) -} - -// Has returns false if the key was not already written to the CARv1 output. -func (dcw *DeferredCarWriter) Has(ctx context.Context, key string) (bool, error) { - dcw.lk.Lock() - defer dcw.lk.Unlock() - - if dcw.w == nil { // shortcut, haven't written anything, don't even initialise - return false, nil - } - - writer, err := dcw.writer() - if err != nil { - return false, err - } - - return writer.Has(ctx, key) -} - -// Put writes the given content to the CARv1 output stream, creating it if it -// doesn't exist yet. -func (dcw *DeferredCarWriter) Put(ctx context.Context, key string, content []byte) error { - dcw.lk.Lock() - defer dcw.lk.Unlock() - - if dcw.putCb != nil { - // call all callbacks, remove those that were only needed once - for i := 0; i < len(dcw.putCb); i++ { - cb := dcw.putCb[i] - cb.cb(len(content)) - if cb.once { - dcw.putCb = append(dcw.putCb[:i], dcw.putCb[i+1:]...) - i-- - } - } - } - - // first Put() call, initialise writer, which will write a CARv1 header - writer, err := dcw.writer() - if err != nil { - return err - } - - return writer.Put(ctx, key, content) -} - -// writer() -func (dcw *DeferredCarWriter) writer() (carstorage.WritableCar, error) { - if dcw.w == nil { - outStream := dcw.outStream - if outStream == nil { - openedFile, err := os.OpenFile(dcw.outPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) - if err != nil { - return nil, err - } - dcw.f = openedFile - outStream = openedFile - } - w, err := carstorage.NewWritable(outStream, []cid.Cid{dcw.root}, append([]carv2.Option{carv2.WriteAsCarV1(true)}, dcw.opts...)...) - if err != nil { - return nil, err - } - dcw.w = w - } - return dcw.w, nil -} - -// Close closes the underlying file, if one was created. -func (dcw *DeferredCarWriter) Close() error { - dcw.lk.Lock() - defer dcw.lk.Unlock() - - if dcw.f != nil { - defer func() { dcw.f = nil }() - return dcw.f.Close() - } - return nil -} - -// BlockWriteOpener returns a BlockWriteOpener that operates on this storage. -func (dcw *DeferredCarWriter) BlockWriteOpener() linking.BlockWriteOpener { - return func(lctx linking.LinkContext) (io.Writer, linking.BlockWriteCommitter, error) { - wr, wrcommit, err := ipldstorage.PutStream(lctx.Ctx, dcw) - return wr, func(lnk ipld.Link) error { - return wrcommit(lnk.Binary()) - }, err - } -} diff --git a/pkg/storage/deferredcarwriter_test.go b/pkg/storage/deferredcarwriter_test.go deleted file mode 100644 index 486177c6..00000000 --- a/pkg/storage/deferredcarwriter_test.go +++ /dev/null @@ -1,224 +0,0 @@ -package storage - -import ( - "bytes" - "context" - "io" - "math/rand" - "os" - "sync" - "testing" - - "github.com/ipfs/go-cid" - carv2 "github.com/ipld/go-car/v2" - mh "github.com/multiformats/go-multihash" - "github.com/stretchr/testify/require" -) - -var rng = rand.New(rand.NewSource(3333)) -var rngLk sync.Mutex - -func TestDeferredCarWriterForPath(t *testing.T) { - ctx := context.Background() - testCid1, testData1 := randBlock() - testCid2, testData2 := randBlock() - - tmpFile := t.TempDir() + "/test.car" - - cw := NewDeferredCarWriterForPath(testCid1, tmpFile) - - _, err := os.Stat(tmpFile) - require.True(t, os.IsNotExist(err)) - - require.NoError(t, cw.Put(ctx, testCid1.KeyString(), testData1)) - require.NoError(t, cw.Put(ctx, testCid2.KeyString(), testData2)) - - stat, err := os.Stat(tmpFile) - require.NoError(t, err) - require.True(t, stat.Size() > int64(len(testData1)+len(testData2))) - - require.NoError(t, cw.Close()) - - // shouldn't be deleted - _, err = os.Stat(tmpFile) - require.NoError(t, err) - - r, err := os.Open(tmpFile) - require.NoError(t, err) - t.Cleanup(func() { r.Close() }) - carv2, err := carv2.NewBlockReader(r) - require.NoError(t, err) - - // compare CAR contents to what we wrote - require.Equal(t, carv2.Roots, []cid.Cid{testCid1}) - require.Equal(t, carv2.Version, uint64(1)) - - blk, err := carv2.Next() - require.NoError(t, err) - require.Equal(t, blk.Cid(), testCid1) - require.Equal(t, blk.RawData(), testData1) - - blk, err = carv2.Next() - require.NoError(t, err) - require.Equal(t, blk.Cid(), testCid2) - require.Equal(t, blk.RawData(), testData2) - - _, err = carv2.Next() - require.ErrorIs(t, err, io.EOF) -} - -func TestDeferredCarWriterForStream(t *testing.T) { - for _, tc := range []string{"path", "stream"} { - tc := tc - t.Run(tc, func(t *testing.T) { - t.Parallel() - ctx := context.Background() - testCid1, testData1 := randBlock() - testCid2, testData2 := randBlock() - testCid3, _ := randBlock() - - var cw *DeferredCarWriter - var buf bytes.Buffer - tmpFile := t.TempDir() + "/test.car" - - if tc == "path" { - cw = NewDeferredCarWriterForPath(testCid1, tmpFile) - _, err := os.Stat(tmpFile) - require.True(t, os.IsNotExist(err)) - } else { - cw = NewDeferredCarWriterForStream(testCid1, &buf) - require.Equal(t, buf.Len(), 0) - } - - has, err := cw.Has(ctx, testCid3.KeyString()) - require.NoError(t, err) - require.False(t, has) - - require.NoError(t, cw.Put(ctx, testCid1.KeyString(), testData1)) - has, err = cw.Has(ctx, testCid1.KeyString()) - require.NoError(t, err) - require.True(t, has) - require.NoError(t, cw.Put(ctx, testCid2.KeyString(), testData2)) - has, err = cw.Has(ctx, testCid1.KeyString()) - require.NoError(t, err) - require.True(t, has) - has, err = cw.Has(ctx, testCid2.KeyString()) - require.NoError(t, err) - require.True(t, has) - has, err = cw.Has(ctx, testCid3.KeyString()) - require.NoError(t, err) - require.False(t, has) - - if tc == "path" { - stat, err := os.Stat(tmpFile) - require.NoError(t, err) - require.True(t, stat.Size() > int64(len(testData1)+len(testData2))) - } else { - require.True(t, buf.Len() > len(testData1)+len(testData2)) - } - - require.NoError(t, cw.Close()) - - var rdr *carv2.BlockReader - if tc == "path" { - r, err := os.Open(tmpFile) - require.NoError(t, err) - rdr, err = carv2.NewBlockReader(r) - require.NoError(t, err) - t.Cleanup(func() { r.Close() }) - } else { - rdr, err = carv2.NewBlockReader(&buf) - require.NoError(t, err) - } - - // compare CAR contents to what we wrote - require.Equal(t, rdr.Roots, []cid.Cid{testCid1}) - require.Equal(t, rdr.Version, uint64(1)) - - blk, err := rdr.Next() - require.NoError(t, err) - require.Equal(t, blk.Cid(), testCid1) - require.Equal(t, blk.RawData(), testData1) - - blk, err = rdr.Next() - require.NoError(t, err) - require.Equal(t, blk.Cid(), testCid2) - require.Equal(t, blk.RawData(), testData2) - - _, err = rdr.Next() - require.ErrorIs(t, err, io.EOF) - }) - } -} - -func TestDeferredCarWriterPutCb(t *testing.T) { - ctx := context.Background() - testCid1, testData1 := randBlock() - testCid2, testData2 := randBlock() - - var buf bytes.Buffer - cw := NewDeferredCarWriterForStream(testCid1, &buf) - - var pc1 int - cw.OnPut(func(ii int) { - switch pc1 { - case 0: - require.Equal(t, buf.Len(), 0) // called before first write - require.Equal(t, len(testData1), ii) - case 1: - require.Equal(t, len(testData2), ii) - default: - require.Fail(t, "unexpected put callback") - } - pc1++ - }, false) - var pc2 int - cw.OnPut(func(ii int) { - switch pc2 { - case 0: - require.Equal(t, buf.Len(), 0) // called before first write - require.Equal(t, len(testData1), ii) - case 1: - require.Equal(t, len(testData2), ii) - default: - require.Fail(t, "unexpected put callback") - } - pc2++ - }, false) - var pc3 int - cw.OnPut(func(ii int) { - switch pc3 { - case 0: - require.Equal(t, buf.Len(), 0) // called before first write - require.Equal(t, len(testData1), ii) - default: - require.Fail(t, "unexpected put callback") - } - pc3++ - }, true) - - require.NoError(t, cw.Put(ctx, testCid1.KeyString(), testData1)) - require.NoError(t, cw.Put(ctx, testCid2.KeyString(), testData2)) - require.NoError(t, cw.Close()) - - require.Equal(t, 2, pc1) - require.Equal(t, 2, pc2) - require.Equal(t, 1, pc3) -} - -func randBlock() (cid.Cid, []byte) { - data := make([]byte, 1024) - rngLk.Lock() - rng.Read(data) - rngLk.Unlock() - h, err := mh.Sum(data, mh.SHA2_512, -1) - if err != nil { - panic(err) - } - return cid.NewCidV1(cid.Raw, h), data -} - -func randCid() cid.Cid { - c, _ := randBlock() - return c -} diff --git a/pkg/storage/duplicateaddercar.go b/pkg/storage/duplicateaddercar.go index 90bd46c3..07b6f3ae 100644 --- a/pkg/storage/duplicateaddercar.go +++ b/pkg/storage/duplicateaddercar.go @@ -11,15 +11,26 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" carv2 "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/storage/deferred" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/linking" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + ipldstorage "github.com/ipld/go-ipld-prime/storage" trustlessutils "github.com/ipld/go-trustless-utils" "github.com/ipld/go-trustless-utils/traversal" ) +type DeferredWriter interface { + ipldstorage.WritableStorage + io.Closer + BlockWriteOpener() linking.BlockWriteOpener + OnPut(cb func(int), once bool) +} + +var _ DeferredWriter = (*DuplicateAdderCar)(nil) + type DuplicateAdderCar struct { - *DeferredCarWriter + *deferred.DeferredCarWriter ctx context.Context root cid.Cid path string @@ -33,12 +44,12 @@ type DuplicateAdderCar struct { func NewDuplicateAdderCarForStream( ctx context.Context, + outStream io.Writer, root cid.Cid, path string, scope trustlessutils.DagScope, bytes *trustlessutils.ByteRange, store *DeferredStorageCar, - outStream io.Writer, ) *DuplicateAdderCar { blockStream := &blockStream{ctx: ctx, seen: make(map[cid.Cid]struct{})} @@ -46,7 +57,7 @@ func NewDuplicateAdderCarForStream( blockStream.cond = sync.NewCond(&blockStream.mu) // create the car writer for the final stream - outgoing := NewDeferredCarWriterForStream(root, outStream, carv2.AllowDuplicatePuts(true)) + outgoing := deferred.NewDeferredCarWriterForStream(outStream, []cid.Cid{root}, carv2.AllowDuplicatePuts(true)) return &DuplicateAdderCar{ DeferredCarWriter: outgoing, ctx: ctx, diff --git a/pkg/storage/duplicateaddercar_test.go b/pkg/storage/duplicateaddercar_test.go index f7f8ea06..33c05197 100644 --- a/pkg/storage/duplicateaddercar_test.go +++ b/pkg/storage/duplicateaddercar_test.go @@ -33,7 +33,7 @@ func TestDuplicateAdderCar(t *testing.T) { store := storage.NewDeferredStorageCar("", unixfsFileWithDups.Root) ctx := context.Background() - carWriter := storage.NewDuplicateAdderCarForStream(ctx, unixfsFileWithDups.Root, "", trustlessutils.DagScopeAll, nil, store, buf) + carWriter := storage.NewDuplicateAdderCarForStream(ctx, buf, unixfsFileWithDups.Root, "", trustlessutils.DagScopeAll, nil, store) cachingTempStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), store) // write the root block, containing sharding metadata