From 39f18cc5b900e7013347c9f11e005e89b3510d5e Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 26 Jan 2022 12:08:44 +0100 Subject: [PATCH 01/11] Add a 'skip' parameter to writev1 so that the beginning of a car can be skipped --- v2/internal/loader/writing_loader.go | 40 +++++++++++++++++++--------- v2/selective.go | 33 ++++++++++++++++------- 2 files changed, 52 insertions(+), 21 deletions(-) diff --git a/v2/internal/loader/writing_loader.go b/v2/internal/loader/writing_loader.go index b4d0e6ef..9387d210 100644 --- a/v2/internal/loader/writing_loader.go +++ b/v2/internal/loader/writing_loader.go @@ -13,10 +13,11 @@ import ( ) type writerOutput struct { - w io.Writer - size uint64 - code multicodec.Code - rcrds map[cid.Cid]index.Record + w io.Writer + size uint64 + toSkip uint64 + code multicodec.Code + rcrds map[cid.Cid]index.Record } func (w *writerOutput) Size() uint64 { @@ -54,17 +55,31 @@ type writingReader struct { } func (w *writingReader) Read(p []byte) (int, error) { + buf := bytes.NewBuffer(nil) if w.wo != nil { // write the cid size := varint.ToUvarint(uint64(w.len) + uint64(len(w.cid))) - if _, err := w.wo.w.Write(size); err != nil { + if _, err := buf.Write(size); err != nil { return 0, err } - if _, err := w.wo.w.Write([]byte(w.cid)); err != nil { + if _, err := buf.Write([]byte(w.cid)); err != nil { return 0, err } cpy := bytes.NewBuffer(w.r.(*bytes.Buffer).Bytes()) - if _, err := cpy.WriteTo(w.wo.w); err != nil { + if _, err := cpy.WriteTo(buf); err != nil { + return 0, err + } + out := buf.Bytes() + if w.wo.toSkip > 0 { + if w.wo.toSkip >= uint64(len(out)) { + w.wo.toSkip -= uint64(len(out)) + out = []byte{} + } else { + out = out[w.wo.toSkip:] + w.wo.toSkip = 0 + } + } + if _, err := bytes.NewBuffer(out).WriteTo(w.wo.w); err != nil { return 0, err } _, c, err := cid.CidFromBytes([]byte(w.cid)) @@ -89,12 +104,13 @@ func (w *writingReader) Read(p []byte) (int, error) { // The `initialOffset` is used to calculate the offsets recorded for the 
index, and will be // included in the `.Size()` of the IndexTracker. // An indexCodec of `index.CarIndexNoIndex` can be used to not track these offsets. -func TeeingLinkSystem(ls ipld.LinkSystem, w io.Writer, initialOffset uint64, indexCodec multicodec.Code) (ipld.LinkSystem, IndexTracker) { +func TeeingLinkSystem(ls ipld.LinkSystem, w io.Writer, initialOffset uint64, skip uint64, indexCodec multicodec.Code) (ipld.LinkSystem, IndexTracker) { wo := writerOutput{ - w: w, - size: initialOffset, - code: indexCodec, - rcrds: make(map[cid.Cid]index.Record), + w: w, + size: initialOffset, + toSkip: skip, + code: indexCodec, + rcrds: make(map[cid.Cid]index.Record), } tls := ls diff --git a/v2/selective.go b/v2/selective.go index 39bb5f91..4bb528c3 100644 --- a/v2/selective.go +++ b/v2/selective.go @@ -1,6 +1,7 @@ package car import ( + "bytes" "context" "fmt" "io" @@ -112,7 +113,7 @@ func TraverseV1(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector opts: ApplyOptions(opts...), } - len, _, err := tc.WriteV1(writer) + len, _, err := tc.WriteV1(0, writer) return len, err } @@ -137,7 +138,7 @@ func (tc *traversalCar) WriteTo(w io.Writer) (int64, error) { if err != nil { return n, err } - v1s, idx, err := tc.WriteV1(w) + v1s, idx, err := tc.WriteV1(0, w) n += int64(v1s) if err != nil { @@ -202,21 +203,35 @@ func (tc *traversalCar) WriteV2Header(w io.Writer) (int64, error) { return hn, nil } -func (tc *traversalCar) WriteV1(w io.Writer) (uint64, index.Index, error) { +// WriteV1 writes a v1 car to the writer, w, except for the first `skip` bytes. +// Returns bytes written, an index of what was written, or error if one occured. 
+func (tc *traversalCar) WriteV1(skip uint64, w io.Writer) (uint64, index.Index, error) { + written := uint64(0) + // write the v1 header c1h := carv1.CarHeader{Roots: []cid.Cid{tc.root}, Version: 1} - if err := carv1.WriteHeader(&c1h, w); err != nil { - return 0, nil, err - } v1Size, err := carv1.HeaderSize(&c1h) if err != nil { - return v1Size, nil, err + return 0, nil, err + } + if skip < v1Size { + buf := bytes.NewBuffer(nil) + if err := carv1.WriteHeader(&c1h, buf); err != nil { + return 0, nil, err + } + if _, err := w.Write(buf.Bytes()[skip:]); err != nil { + return 0, nil, err + } + written = v1Size - skip + skip = 0 + } else { + skip -= v1Size } // write the block. - wls, writer := loader.TeeingLinkSystem(*tc.ls, w, v1Size, tc.opts.IndexCodec) + wls, writer := loader.TeeingLinkSystem(*tc.ls, w, v1Size, skip, tc.opts.IndexCodec) err = traverse(tc.ctx, &wls, tc.root, tc.selector, tc.opts) - v1Size = writer.Size() + v1Size = writer.Size() - v1Size + written if err != nil { return v1Size, nil, err } From e7ced920c49d53feba9ef03ad7fe3a983870310f Mon Sep 17 00:00:00 2001 From: Will Scott Date: Thu, 27 Jan 2022 11:42:37 +0100 Subject: [PATCH 02/11] expand interface, expose reader --- v2/internal/io/offset_write_seeker.go | 9 +- v2/internal/io/skip_writer_read_seeker.go | 108 ++++++++++++++++++++++ v2/options.go | 1 + v2/selective.go | 62 +++++++++++-- 4 files changed, 168 insertions(+), 12 deletions(-) create mode 100644 v2/internal/io/skip_writer_read_seeker.go diff --git a/v2/internal/io/offset_write_seeker.go b/v2/internal/io/offset_write_seeker.go index 7e0f6ba5..afee7f40 100644 --- a/v2/internal/io/offset_write_seeker.go +++ b/v2/internal/io/offset_write_seeker.go @@ -1,6 +1,11 @@ package io -import "io" +import ( + "errors" + "io" +) + +var ErrUnsupported = errors.New("unsupported seek operation") var ( _ io.Writer = (*OffsetWriteSeeker)(nil) @@ -30,7 +35,7 @@ func (ow *OffsetWriteSeeker) Seek(offset int64, whence int) (int64, error) { case io.SeekCurrent: 
ow.offset += offset case io.SeekEnd: - panic("unsupported whence: SeekEnd") + return 0, ErrUnsupported } return ow.Position(), nil } diff --git a/v2/internal/io/skip_writer_read_seeker.go b/v2/internal/io/skip_writer_read_seeker.go new file mode 100644 index 00000000..42437b78 --- /dev/null +++ b/v2/internal/io/skip_writer_read_seeker.go @@ -0,0 +1,108 @@ +package io + +import ( + "context" + "errors" + "fmt" + "io" +) + +// SkipWriterReaderSeeker wraps a factory producing a writer with a ReadSeeker implementation. +// Note that Read and Seek are not thread-safe, they must not be called +// concurrently. +type SkipWriterReaderSeeker struct { + parentCtx context.Context + offset uint64 + size uint64 + + cons ReWriter + reader *io.PipeReader + writeCancel context.CancelFunc + writeComplete chan struct{} +} + +// ReWriter is a function writing to an io.Writer from an offset. +type ReWriter func(ctx context.Context, skip uint64, w io.Writer) (uint64, error) + +var _ io.ReadSeeker = (*SkipWriterReaderSeeker)(nil) + +// NewSkipWriterReaderSeeker creates an io.ReadSeeker around a ReWriter. 
+func NewSkipWriterReaderSeeker(ctx context.Context, size uint64, cons ReWriter) *SkipWriterReaderSeeker { + return &SkipWriterReaderSeeker{ + parentCtx: ctx, + size: size, + cons: cons, + writeComplete: make(chan struct{}, 1), + } +} + +// Note: not threadsafe +func (c *SkipWriterReaderSeeker) Read(p []byte) (n int, err error) { + // Check if there's already a write in progress + if c.reader == nil { + // No write in progress, start a new write from the current offset + // in a go routine + writeCtx, writeCancel := context.WithCancel(c.parentCtx) + c.writeCancel = writeCancel + pr, pw := io.Pipe() + c.reader = pr + + go func() { + amnt, err := c.cons(writeCtx, c.offset, pw) + c.offset += amnt + if err != nil && !errors.Is(err, context.Canceled) { + pw.CloseWithError(err) + } else { + pw.Close() + } + c.writeComplete <- struct{}{} + }() + } + + return c.reader.Read(p) +} + +// Note: not threadsafe +func (c *SkipWriterReaderSeeker) Seek(offset int64, whence int) (int64, error) { + // Update the offset + switch whence { + case io.SeekStart: + if offset < 0 { + return 0, fmt.Errorf("invalid offset %d from start: must be positive", offset) + } + c.offset = uint64(offset) + case io.SeekCurrent: + if int64(c.offset)+offset < 0 { + return 0, fmt.Errorf("invalid offset %d from current %d: resulting offset is negative", offset, c.offset) + } + c.offset = uint64((int64(c.offset) + offset)) + case io.SeekEnd: + if c.size == 0 { + return 0, ErrUnsupported + + } + if int64(c.size)+offset < 0 { + return 0, fmt.Errorf("invalid offset %d from end: larger than total size %d", offset, c.size) + } + c.offset = uint64(int64(c.size) + offset) + } + + // Cancel any ongoing write and wait for it to complete + // TODO: if we're fast-forwarding with 'SeekCurrent', we may be able to read from the current reader instead. 
+ if c.reader != nil { + c.writeCancel() + + // Seek and Read should not be called concurrently so this is safe + c.reader.Close() + + select { + case <-c.parentCtx.Done(): + return 0, c.parentCtx.Err() + case <-c.writeComplete: + } + + c.reader = nil + } + + return int64(c.offset), nil +} diff --git a/v2/options.go b/v2/options.go index d2e526c4..b7674c78 100644 --- a/v2/options.go +++ b/v2/options.go @@ -60,6 +60,7 @@ type Options struct { MaxTraversalLinks uint64 WriteAsCarV1 bool TraversalPrototypeChooser traversal.LinkTargetNodePrototypeChooser + V1Size uint64 MaxAllowedHeaderSize uint64 MaxAllowedSectionSize uint64 diff --git a/v2/selective.go b/v2/selective.go index 4bb528c3..28280429 100644 --- a/v2/selective.go +++ b/v2/selective.go @@ -12,6 +12,7 @@ import ( "github.com/ipfs/go-cid" "github.com/ipld/go-car/v2/index" "github.com/ipld/go-car/v2/internal/carv1" + ioint "github.com/ipld/go-car/v2/internal/io" "github.com/ipld/go-car/v2/internal/loader" ipld "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/datamodel" @@ -41,9 +42,28 @@ func MaxTraversalLinks(MaxTraversalLinks uint64) Option { } } +// WithV1Size sets the expected v1 size of the car being written if it is known in advance. +func WithV1Size(size uint64) Option { + return func(sco *Options) { + sco.V1Size = size + } +} + // NewSelectiveWriter walks through the proposed dag traversal to learn its total size in order to be able to // stream out a car to a writer in the expected traversal order in one go. func NewSelectiveWriter(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (Writer, error) { + conf := ApplyOptions(opts...) 
+ if conf.V1Size != 0 { + return &traversalCar{ + size: conf.V1Size, + ctx: ctx, + root: root, + selector: selector, + ls: ls, + opts: ApplyOptions(opts...), + }, nil + } + cls, cntr := loader.CountingLinkSystem(*ls) c1h := carv1.CarHeader{Roots: []cid.Cid{root}, Version: 1} @@ -51,7 +71,7 @@ func NewSelectiveWriter(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, if err != nil { return nil, err } - if err := traverse(ctx, &cls, root, selector, ApplyOptions(opts...)); err != nil { + if err := traverse(ctx, &cls, root, selector, conf); err != nil { return nil, err } tc := traversalCar{ @@ -68,13 +88,14 @@ func NewSelectiveWriter(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, // TraverseToFile writes a car file matching a given root and selector to the // path at `destination` using one read of each block. func TraverseToFile(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, destination string, opts ...Option) error { + conf := ApplyOptions(opts...) tc := traversalCar{ - size: 0, + size: conf.V1Size, ctx: ctx, root: root, selector: selector, ls: ls, - opts: ApplyOptions(opts...), + opts: conf, } fp, err := os.Create(destination) @@ -104,19 +125,40 @@ func TraverseToFile(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, sele // TraverseV1 walks through the proposed dag traversal and writes a carv1 to the provided io.Writer func TraverseV1(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, writer io.Writer, opts ...Option) (uint64, error) { opts = append(opts, WithoutIndex()) + conf := ApplyOptions(opts...) tc := traversalCar{ - size: 0, + size: conf.V1Size, ctx: ctx, root: root, selector: selector, ls: ls, - opts: ApplyOptions(opts...), + opts: conf, } - len, _, err := tc.WriteV1(0, writer) + len, _, err := tc.WriteV1(tc.ctx, 0, writer) return len, err } +// CreateV1Reader creates an io.ReadSeeker that can be used to copy out the carv1 contents of a car. 
+func CreateV1Reader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (io.ReadSeeker, error) { + opts = append(opts, WithoutIndex()) + conf := ApplyOptions(opts...) + tc := traversalCar{ + size: conf.V1Size, + ctx: ctx, + root: root, + selector: selector, + ls: ls, + opts: conf, + } + rwf := func(ctx context.Context, offset uint64, writer io.Writer) (uint64, error) { + s, _, err := tc.WriteV1(ctx, offset, writer) + return s, err + } + rw := ioint.NewSkipWriterReaderSeeker(ctx, conf.V1Size, rwf) + return rw, nil +} + // Writer is an interface allowing writing a car prepared by PrepareTraversal type Writer interface { io.WriterTo @@ -138,7 +180,7 @@ func (tc *traversalCar) WriteTo(w io.Writer) (int64, error) { if err != nil { return n, err } - v1s, idx, err := tc.WriteV1(0, w) + v1s, idx, err := tc.WriteV1(tc.ctx, 0, w) n += int64(v1s) if err != nil { @@ -205,7 +247,7 @@ func (tc *traversalCar) WriteV2Header(w io.Writer) (int64, error) { // WriteV1 writes a v1 car to the writer, w, except for the first `skip` bytes. // Returns bytes written, an index of what was written, or error if one occured. -func (tc *traversalCar) WriteV1(skip uint64, w io.Writer) (uint64, index.Index, error) { +func (tc *traversalCar) WriteV1(ctx context.Context, skip uint64, w io.Writer) (uint64, index.Index, error) { written := uint64(0) // write the v1 header @@ -230,7 +272,7 @@ func (tc *traversalCar) WriteV1(skip uint64, w io.Writer) (uint64, index.Index, // write the block. 
wls, writer := loader.TeeingLinkSystem(*tc.ls, w, v1Size, skip, tc.opts.IndexCodec) - err = traverse(tc.ctx, &wls, tc.root, tc.selector, tc.opts) + err = traverse(ctx, &wls, tc.root, tc.selector, tc.opts) v1Size = writer.Size() - v1Size + written if err != nil { return v1Size, nil, err @@ -281,7 +323,7 @@ func traverse(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, s ipld.Nod if err != nil { return err } - rootNode, err := ls.Load(ipld.LinkContext{}, lnk, rp) + rootNode, err := ls.Load(ipld.LinkContext{Ctx: ctx}, lnk, basicnode.Prototype.Any) if err != nil { return fmt.Errorf("root blk load failed: %s", err) } From 1a1ff47c4420ac2a7eab413c90cfd8105bf21f47 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Mon, 18 Apr 2022 10:32:51 +0200 Subject: [PATCH 03/11] track block offsets in traversal resumption to allow for rewinding to block offsets --- v2/internal/loader/counting_loader.go | 18 +- v2/options.go | 2 +- v2/selective.go | 79 ++++---- v2/traversal/resumption.go | 257 ++++++++++++++++++++++++++ v2/traversal/resumption_test.go | 241 ++++++++++++++++++++++++ 5 files changed, 556 insertions(+), 41 deletions(-) create mode 100644 v2/traversal/resumption.go create mode 100644 v2/traversal/resumption_test.go diff --git a/v2/internal/loader/counting_loader.go b/v2/internal/loader/counting_loader.go index e428993e..9adaee0d 100644 --- a/v2/internal/loader/counting_loader.go +++ b/v2/internal/loader/counting_loader.go @@ -9,13 +9,13 @@ import ( "github.com/multiformats/go-varint" ) -// counter tracks how much data has been read. -type counter struct { - totalRead uint64 +// Counter tracks how much data has been read. 
+type Counter struct { + TotalRead uint64 } -func (c *counter) Size() uint64 { - return c.totalRead +func (c *Counter) Size() uint64 { + return c.TotalRead } // ReadCounter provides an externally consumable interface to the @@ -26,12 +26,12 @@ type ReadCounter interface { type countingReader struct { r io.Reader - c *counter + c *Counter } func (c *countingReader) Read(p []byte) (int, error) { n, err := c.r.Read(p) - c.c.totalRead += uint64(n) + c.c.TotalRead += uint64(n) return n, err } @@ -41,7 +41,7 @@ func (c *countingReader) Read(p []byte) (int, error) { // appear in a CAR file is added to the counter (included the size of the // CID and the varint length for the block data). func CountingLinkSystem(ls ipld.LinkSystem) (ipld.LinkSystem, ReadCounter) { - c := counter{} + c := Counter{} clc := ls clc.StorageReadOpener = func(lc linking.LinkContext, l ipld.Link) (io.Reader, error) { r, err := ls.StorageReadOpener(lc, l) @@ -54,7 +54,7 @@ func CountingLinkSystem(ls ipld.LinkSystem) (ipld.LinkSystem, ReadCounter) { return nil, err } size := varint.ToUvarint(uint64(n) + uint64(len(l.Binary()))) - c.totalRead += uint64(len(size)) + uint64(len(l.Binary())) + c.TotalRead += uint64(len(size)) + uint64(len(l.Binary())) return &countingReader{buf, &c}, nil } return clc, &c diff --git a/v2/options.go b/v2/options.go index b7674c78..aa21969c 100644 --- a/v2/options.go +++ b/v2/options.go @@ -60,7 +60,7 @@ type Options struct { MaxTraversalLinks uint64 WriteAsCarV1 bool TraversalPrototypeChooser traversal.LinkTargetNodePrototypeChooser - V1Size uint64 + DataPayloadSize uint64 MaxAllowedHeaderSize uint64 MaxAllowedSectionSize uint64 diff --git a/v2/selective.go b/v2/selective.go index 28280429..a08537d7 100644 --- a/v2/selective.go +++ b/v2/selective.go @@ -14,6 +14,7 @@ import ( "github.com/ipld/go-car/v2/internal/carv1" ioint "github.com/ipld/go-car/v2/internal/io" "github.com/ipld/go-car/v2/internal/loader" + resumetraversal "github.com/ipld/go-car/v2/traversal" ipld 
"github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/datamodel" "github.com/ipld/go-ipld-prime/linking" @@ -42,10 +43,10 @@ func MaxTraversalLinks(MaxTraversalLinks uint64) Option { } } -// WithV1Size sets the expected v1 size of the car being written if it is known in advance. -func WithV1Size(size uint64) Option { +// WithDataPayloadSize sets the expected v1 size of the car being written if it is known in advance. +func WithDataPayloadSize(size uint64) Option { return func(sco *Options) { - sco.V1Size = size + sco.DataPayloadSize = size } } @@ -53,9 +54,9 @@ func WithV1Size(size uint64) Option { // stream out a car to a writer in the expected traversal order in one go. func NewSelectiveWriter(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (Writer, error) { conf := ApplyOptions(opts...) - if conf.V1Size != 0 { + if conf.DataPayloadSize != 0 { return &traversalCar{ - size: conf.V1Size, + size: conf.DataPayloadSize, ctx: ctx, root: root, selector: selector, @@ -63,25 +64,27 @@ func NewSelectiveWriter(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, opts: ApplyOptions(opts...), }, nil } - - cls, cntr := loader.CountingLinkSystem(*ls) + tc := traversalCar{ + //size: headSize + cntr.Size(), + ctx: ctx, + root: root, + selector: selector, + ls: ls, + opts: ApplyOptions(opts...), + } + if err := tc.setup(ctx, ls, ApplyOptions(opts...)); err != nil { + return nil, err + } c1h := carv1.CarHeader{Roots: []cid.Cid{root}, Version: 1} headSize, err := carv1.HeaderSize(&c1h) if err != nil { return nil, err } - if err := traverse(ctx, &cls, root, selector, conf); err != nil { + if err := tc.traverse(root, selector); err != nil { return nil, err } - tc := traversalCar{ - size: headSize + cntr.Size(), - ctx: ctx, - root: root, - selector: selector, - ls: ls, - opts: ApplyOptions(opts...), - } + tc.size = headSize + tc.resumer.Position() return &tc, nil } @@ -90,7 +93,7 @@ func NewSelectiveWriter(ctx context.Context, ls 
*ipld.LinkSystem, root cid.Cid, func TraverseToFile(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, destination string, opts ...Option) error { conf := ApplyOptions(opts...) tc := traversalCar{ - size: conf.V1Size, + size: conf.DataPayloadSize, ctx: ctx, root: root, selector: selector, @@ -127,7 +130,7 @@ func TraverseV1(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector opts = append(opts, WithoutIndex()) conf := ApplyOptions(opts...) tc := traversalCar{ - size: conf.V1Size, + size: conf.DataPayloadSize, ctx: ctx, root: root, selector: selector, @@ -144,7 +147,7 @@ func CreateV1Reader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, sele opts = append(opts, WithoutIndex()) conf := ApplyOptions(opts...) tc := traversalCar{ - size: conf.V1Size, + size: conf.DataPayloadSize, ctx: ctx, root: root, selector: selector, @@ -155,7 +158,7 @@ func CreateV1Reader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, sele s, _, err := tc.WriteV1(ctx, offset, writer) return s, err } - rw := ioint.NewSkipWriterReaderSeeker(ctx, conf.V1Size, rwf) + rw := ioint.NewSkipWriterReaderSeeker(ctx, conf.DataPayloadSize, rwf) return rw, nil } @@ -173,6 +176,8 @@ type traversalCar struct { selector ipld.Node ls *ipld.LinkSystem opts Options + progress *traversal.Progress + resumer resumetraversal.TraverseResumer } func (tc *traversalCar) WriteTo(w io.Writer) (int64, error) { @@ -272,7 +277,10 @@ func (tc *traversalCar) WriteV1(ctx context.Context, skip uint64, w io.Writer) ( // write the block. 
wls, writer := loader.TeeingLinkSystem(*tc.ls, w, v1Size, skip, tc.opts.IndexCodec) - err = traverse(ctx, &wls, tc.root, tc.selector, tc.opts) + if err = tc.setup(ctx, &wls, tc.opts); err != nil { + return v1Size, nil, err + } + err = tc.traverse(tc.root, tc.selector) v1Size = writer.Size() - v1Size + written if err != nil { return v1Size, nil, err @@ -289,12 +297,7 @@ func (tc *traversalCar) WriteV1(ctx context.Context, skip uint64, w io.Writer) ( return v1Size, idx, err } -func traverse(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, s ipld.Node, opts Options) error { - sel, err := selector.CompileSelector(s) - if err != nil { - return err - } - +func (tc *traversalCar) setup(ctx context.Context, ls *ipld.LinkSystem, opts Options) error { chooser := func(_ ipld.Link, _ linking.LinkContext) (ipld.NodePrototype, error) { return basicnode.Prototype.Any, nil } @@ -317,17 +320,31 @@ func traverse(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, s ipld.Nod } } - lnk := cidlink.Link{Cid: root} ls.TrustedStorage = true - rp, err := chooser(lnk, ipld.LinkContext{}) + resumer, err := resumetraversal.WithTraversingLinksystem(&progress) + if err != nil { + return err + } + tc.progress = &progress + tc.resumer = resumer + return nil +} + +func (tc *traversalCar) traverse(root cid.Cid, s ipld.Node) error { + sel, err := selector.CompileSelector(s) + if err != nil { + return err + } + lnk := cidlink.Link{Cid: root} + rp, err := tc.progress.Cfg.LinkTargetNodePrototypeChooser(lnk, ipld.LinkContext{}) if err != nil { return err } - rootNode, err := ls.Load(ipld.LinkContext{Ctx: ctx}, lnk, basicnode.Prototype.Any) + rootNode, err := tc.progress.Cfg.LinkSystem.Load(ipld.LinkContext{}, lnk, rp) if err != nil { return fmt.Errorf("root blk load failed: %s", err) } - err = progress.WalkMatching(rootNode, sel, func(_ traversal.Progress, node ipld.Node) error { + err = tc.progress.WalkMatching(rootNode, sel, func(_ traversal.Progress, node ipld.Node) error { if lbn, ok := 
node.(datamodel.LargeBytesNode); ok { s, err := lbn.AsLargeBytes() if err != nil { diff --git a/v2/traversal/resumption.go b/v2/traversal/resumption.go new file mode 100644 index 00000000..d88eead8 --- /dev/null +++ b/v2/traversal/resumption.go @@ -0,0 +1,257 @@ +package traversal + +// Resumption is an extension to an ipld traversal Progress struct that tracks the tree of the dag as it is discovered. +// For each link, it tracks the offset that node would appear at from the beginning of the traversal, if the traversal +// were to be serialized in a car format (e.g. [size || cid || block]*, no car header offset is included) +// It can then resume the traversal based on either a path within the traversal, or a car offset. + +import ( + "fmt" + "io" + "math" + + "github.com/ipld/go-car/v2/internal/loader" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/datamodel" + "github.com/ipld/go-ipld-prime/linking" + "github.com/ipld/go-ipld-prime/traversal" +) + +type pathNode struct { + link datamodel.Link + offset uint64 + children map[datamodel.PathSegment]*pathNode +} + +func newPath(link datamodel.Link, at uint64) *pathNode { + return &pathNode{ + link: link, + offset: at, + children: make(map[datamodel.PathSegment]*pathNode), + } +} + +func (pn pathNode) addPath(p []datamodel.PathSegment, link datamodel.Link, at uint64) { + if len(p) == 0 { + return + } + if _, ok := pn.children[p[0]]; !ok { + child := newPath(link, at) + pn.children[p[0]] = child + } + pn.children[p[0]].addPath(p[1:], link, at) +} + +func (pn pathNode) allLinks() []datamodel.Link { + if len(pn.children) == 0 { + return []datamodel.Link{pn.link} + } + links := make([]datamodel.Link, 0) + if pn.link != nil { + links = append(links, pn.link) + } + for _, v := range pn.children { + links = append(links, v.allLinks()...) 
+ } + return links +} + +// getPaths returns reconstructed paths in the tree rooted at 'root' +func (pn pathNode) getLinks(root datamodel.Path) []datamodel.Link { + segs := root.Segments() + switch len(segs) { + case 0: + if pn.link != nil { + return []datamodel.Link{pn.link} + } + return []datamodel.Link{} + case 1: + // base case 1: get all paths below this child. + next := segs[0] + if child, ok := pn.children[next]; ok { + return child.allLinks() + } + return []datamodel.Link{} + default: + } + + next := segs[0] + if _, ok := pn.children[next]; !ok { + // base case 2: not registered sub-path. + return []datamodel.Link{} + } + return pn.children[next].getLinks(datamodel.NewPathNocopy(segs[1:])) +} + +var errInvalid = fmt.Errorf("invalid path") + +func (pn pathNode) offsetAfter(root datamodel.Path) (uint64, error) { + // we look for offset of next sibling. + // if no next sibling recurse up the path segments until we find a next sibling. + segs := root.Segments() + if len(segs) == 0 { + return 0, errInvalid + } + // see if this path is a child. + chld, ok := pn.children[segs[0]] + if !ok { + return 0, errInvalid + } + closest := chld.offset + // try recursive path + if len(segs) > 1 { + co, err := chld.offsetAfter(datamodel.NewPathNocopy(segs[1:])) + if err == nil { + return co, err + } + } + // find our next sibling + var next uint64 = math.MaxUint64 + var nc *pathNode + for _, v := range pn.children { + if v.offset > closest && v.offset < next { + next = v.offset + nc = v + } + } + if nc != nil { + return nc.offset, nil + } + + return 0, errInvalid +} + +// TraverseResumer allows resuming a progress from a previously encountered path in the selector. 
+type TraverseResumer interface { + RewindToPath(from datamodel.Path) error + RewindToOffset(offset uint64) error + Position() uint64 +} + +type traversalState struct { + wrappedLinksystem *linking.LinkSystem + lsCounter *loader.Counter + blockNumber int + pathOrder map[int]datamodel.Path + pathTree *pathNode + rewindPathTarget *datamodel.Path + rewindOffsetTarget uint64 + pendingBlockStart uint64 // on rewinds, we store where the counter was in order to know the length of the last read block. + progress *traversal.Progress +} + +func (ts *traversalState) RewindToPath(from datamodel.Path) error { + if ts.progress == nil { + return nil + } + // reset progress and traverse until target. + ts.progress.SeenLinks = make(map[datamodel.Link]struct{}) + ts.blockNumber = 0 + ts.pendingBlockStart = ts.lsCounter.Size() + ts.lsCounter.TotalRead = 0 + ts.rewindPathTarget = &from + return nil +} + +func (ts *traversalState) RewindToOffset(offset uint64) error { + if ts.progress == nil { + return nil + } + // no-op + if ts.lsCounter.Size() == offset { + return nil + } + // reset progress and traverse until target. 
+ ts.progress.SeenLinks = make(map[datamodel.Link]struct{}) + ts.blockNumber = 0 + ts.pendingBlockStart = ts.lsCounter.Size() + ts.lsCounter.TotalRead = 0 + ts.rewindOffsetTarget = offset + return nil +} + +func (ts *traversalState) Position() uint64 { + return ts.lsCounter.Size() +} + +func (ts *traversalState) traverse(lc linking.LinkContext, l ipld.Link) (io.Reader, error) { + // when not in replay mode, we track metadata + if ts.rewindPathTarget == nil && ts.rewindOffsetTarget == 0 { + ts.pathOrder[ts.blockNumber] = lc.LinkPath + ts.pathTree.addPath(lc.LinkPath.Segments(), l, ts.lsCounter.Size()) + ts.blockNumber++ + return ts.wrappedLinksystem.StorageReadOpener(lc, l) + } + + // if we reach the target, we exit replay mode (by removing target) + if ts.rewindPathTarget != nil && lc.LinkPath.String() == ts.rewindPathTarget.String() { + ts.rewindPathTarget = nil + return ts.wrappedLinksystem.StorageReadOpener(lc, l) + } + + // if we're at the rewind offset target, we exit replay mode + if ts.rewindOffsetTarget != 0 && ts.lsCounter.Size() >= ts.rewindOffsetTarget { + ts.rewindOffsetTarget = 0 + return ts.wrappedLinksystem.StorageReadOpener(lc, l) + } + + // when replaying path, we skip links not of our direct ancestor, + // and add all links on the path under them as 'seen' + if ts.rewindPathTarget != nil { + targetSegments := ts.rewindPathTarget.Segments() + seg := lc.LinkPath.Segments() + for i, s := range seg { + if i >= len(targetSegments) { + break + } + if targetSegments[i].String() != s.String() { + links := ts.pathTree.getLinks(datamodel.NewPathNocopy(seg[0 : i+1])) + for _, l := range links { + ts.progress.SeenLinks[l] = struct{}{} + } + var err error + ts.lsCounter.TotalRead, err = ts.pathTree.offsetAfter(datamodel.NewPathNocopy(seg[0 : i+1])) + if err == errInvalid { + ts.lsCounter.TotalRead = ts.pendingBlockStart + } else if err != nil { + // total read is now invalid, sadly + return nil, err + } + return nil, traversal.SkipMe{} + } + } + } + if 
ts.rewindOffsetTarget != 0 { + links := ts.pathTree.getLinks(lc.LinkPath) + for _, l := range links { + ts.progress.SeenLinks[l] = struct{}{} + } + var err error + ts.lsCounter.TotalRead, err = ts.pathTree.offsetAfter(lc.LinkPath) + if err == errInvalid { + ts.lsCounter.TotalRead = ts.pendingBlockStart + } else if err != nil { + return nil, err + } + return nil, traversal.SkipMe{} + } + + // descend. + return ts.wrappedLinksystem.StorageReadOpener(lc, l) +} + +// WithTraversingLinksystem extends a progress for traversal such that it can +// subsequently resume and perform subsets of the walk efficiently from +// an arbitrary position within the selector traversal. +func WithTraversingLinksystem(p *traversal.Progress) (TraverseResumer, error) { + wls, ctr := loader.CountingLinkSystem(p.Cfg.LinkSystem) + ts := &traversalState{ + wrappedLinksystem: &wls, + lsCounter: ctr.(*loader.Counter), + pathOrder: make(map[int]datamodel.Path), + pathTree: newPath(nil, 0), + progress: p, + } + p.Cfg.LinkSystem.StorageReadOpener = ts.traverse + return ts, nil +} diff --git a/v2/traversal/resumption_test.go b/v2/traversal/resumption_test.go new file mode 100644 index 00000000..d5976a4f --- /dev/null +++ b/v2/traversal/resumption_test.go @@ -0,0 +1,241 @@ +package traversal_test + +import ( + "errors" + "testing" + + cartraversal "github.com/ipld/go-car/v2/traversal" + + "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime/datamodel" + "github.com/ipld/go-ipld-prime/fluent" + "github.com/ipld/go-ipld-prime/linking" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/node/basicnode" + "github.com/ipld/go-ipld-prime/storage/memstore" + "github.com/ipld/go-ipld-prime/traversal" + "github.com/ipld/go-ipld-prime/traversal/selector" + selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" +) + +var store = memstore.Store{} +var ( + // baguqeeyexkjwnfy + leafAlpha, leafAlphaLnk = encode(basicnode.NewString("alpha")) + // 
baguqeeyeqvc7t3a + leafBeta, leafBetaLnk = encode(basicnode.NewString("beta")) + // baguqeeyezhlahvq + middleMapNode, middleMapNodeLnk = encode(fluent.MustBuildMap(basicnode.Prototype.Map, 3, func(na fluent.MapAssembler) { + na.AssembleEntry("foo").AssignBool(true) + na.AssembleEntry("bar").AssignBool(false) + na.AssembleEntry("nested").CreateMap(2, func(na fluent.MapAssembler) { + na.AssembleEntry("alink").AssignLink(leafAlphaLnk) + na.AssembleEntry("nonlink").AssignString("zoo") + }) + })) + // baguqeeyehfkkfwa + middleListNode, middleListNodeLnk = encode(fluent.MustBuildList(basicnode.Prototype.List, 4, func(na fluent.ListAssembler) { + na.AssembleValue().AssignLink(leafAlphaLnk) + na.AssembleValue().AssignLink(leafAlphaLnk) + na.AssembleValue().AssignLink(leafBetaLnk) + na.AssembleValue().AssignLink(leafAlphaLnk) + })) + // note that using `rootNode` directly will have a different field ordering than + // the encoded form if you were to load `rootNodeLnk` due to dag-json field + // reordering on encode, beware the difference for traversal order between + // created, in-memory nodes and those that have passed through a codec with + // field ordering rules + // baguqeeyeie4ajfy + rootNode, rootNodeLnk = encode(fluent.MustBuildMap(basicnode.Prototype.Map, 4, func(na fluent.MapAssembler) { + na.AssembleEntry("plain").AssignString("olde string") + na.AssembleEntry("linkedString").AssignLink(leafAlphaLnk) + na.AssembleEntry("linkedMap").AssignLink(middleMapNodeLnk) + na.AssembleEntry("linkedList").AssignLink(middleListNodeLnk) + })) +) + +// encode hardcodes some encoding choices for ease of use in fixture generation; +// just gimme a link and stuff the bytes in a map. +// (also return the node again for convenient assignment.) 
+func encode(n datamodel.Node) (datamodel.Node, datamodel.Link) { + lp := cidlink.LinkPrototype{Prefix: cid.Prefix{ + Version: 1, + Codec: 0x0129, + MhType: 0x13, + MhLength: 4, + }} + lsys := cidlink.DefaultLinkSystem() + lsys.SetWriteStorage(&store) + + lnk, err := lsys.Store(linking.LinkContext{}, lp, n) + if err != nil { + panic(err) + } + return n, lnk +} + +func TestWalkResumeByPath(t *testing.T) { + seen := 0 + count := func(p traversal.Progress, n datamodel.Node, _ traversal.VisitReason) error { + seen++ + return nil + } + + lsys := cidlink.DefaultLinkSystem() + lsys.SetReadStorage(&store) + p := traversal.Progress{ + Cfg: &traversal.Config{ + LinkSystem: lsys, + LinkTargetNodePrototypeChooser: basicnode.Chooser, + }, + } + resumer, err := cartraversal.WithTraversingLinksystem(&p) + if err != nil { + t.Fatal(err) + } + sd := selectorparse.CommonSelector_ExploreAllRecursively + s, _ := selector.CompileSelector(sd) + if err := p.WalkAdv(rootNode, s, count); err != nil { + t.Fatal(err) + } + if seen != 14 { + t.Fatalf("expected total traversal to visit 14 nodes, got %d", seen) + } + + // resume from beginning. + resumer.RewindToPath(datamodel.NewPath(nil)) + seen = 0 + if err := p.WalkAdv(rootNode, s, count); err != nil { + t.Fatal(err) + } + if seen != 14 { + t.Fatalf("expected resumed traversal to visit 14 nodes, got %d", seen) + } + + // resume from middle. + resumer.RewindToPath(datamodel.NewPath([]datamodel.PathSegment{datamodel.PathSegmentOfString("linkedMap")})) + seen = 0 + if err := p.WalkAdv(rootNode, s, count); err != nil { + t.Fatal(err) + } + // one less: will not visit 'linkedString' before linked map. + if seen != 13 { + t.Fatalf("expected resumed traversal to visit 13 nodes, got %d", seen) + } + + // resume from middle. 
+ resumer.RewindToPath(datamodel.NewPath([]datamodel.PathSegment{datamodel.PathSegmentOfString("linkedList")})) + seen = 0 + if err := p.WalkAdv(rootNode, s, count); err != nil { + t.Fatal(err) + } + // will not visit 'linkedString' or 'linkedMap' before linked list. + if seen != 7 { + t.Fatalf("expected resumed traversal to visit 7 nodes, got %d", seen) + } +} + +func TestWalkResumeByPathPartialWalk(t *testing.T) { + seen := 0 + limit := 0 + countUntil := func(p traversal.Progress, n datamodel.Node, _ traversal.VisitReason) error { + seen++ + if seen >= limit { + return traversal.SkipMe{} + } + return nil + } + + lsys := cidlink.DefaultLinkSystem() + lsys.SetReadStorage(&store) + p := traversal.Progress{ + Cfg: &traversal.Config{ + LinkSystem: lsys, + LinkTargetNodePrototypeChooser: basicnode.Chooser, + }, + } + resumer, err := cartraversal.WithTraversingLinksystem(&p) + if err != nil { + t.Fatal(err) + } + sd := selectorparse.CommonSelector_ExploreAllRecursively + s, _ := selector.CompileSelector(sd) + limit = 9 + if err := p.WalkAdv(rootNode, s, countUntil); !errors.Is(err, traversal.SkipMe{}) { + t.Fatal(err) + } + if seen != limit { + t.Fatalf("expected partial traversal, got %d", seen) + } + + // resume. 
+ resumer.RewindToPath(datamodel.NewPath([]datamodel.PathSegment{datamodel.PathSegmentOfString("linkedMap")})) + seen = 0 + limit = 14 + if err := p.WalkAdv(rootNode, s, countUntil); err != nil { + t.Fatal(err) + } + if seen != 13 { + t.Fatalf("expected resumed traversal to visit 13 nodes, got %d", seen) + } +} + +func TestWalkResumeByOffset(t *testing.T) { + seen := 0 + count := func(p traversal.Progress, n datamodel.Node, _ traversal.VisitReason) error { + seen++ + return nil + } + + lsys := cidlink.DefaultLinkSystem() + lsys.SetReadStorage(&store) + p := traversal.Progress{ + Cfg: &traversal.Config{ + LinkSystem: lsys, + LinkTargetNodePrototypeChooser: basicnode.Chooser, + }, + } + resumer, err := cartraversal.WithTraversingLinksystem(&p) + if err != nil { + t.Fatal(err) + } + sd := selectorparse.CommonSelector_ExploreAllRecursively + s, _ := selector.CompileSelector(sd) + if err := p.WalkAdv(rootNode, s, count); err != nil { + t.Fatal(err) + } + if seen != 14 { + t.Fatalf("expected total traversal to visit 14 nodes, got %d", seen) + } + + // resume from beginning. + resumer.RewindToOffset(0) + seen = 0 + if err := p.WalkAdv(rootNode, s, count); err != nil { + t.Fatal(err) + } + if seen != 14 { + t.Fatalf("expected resumed traversal to visit 14 nodes, got %d", seen) + } + + // resume from middle. + resumer.RewindToOffset(10) + seen = 0 + if err := p.WalkAdv(rootNode, s, count); err != nil { + t.Fatal(err) + } + if seen != 13 { + t.Fatalf("expected resumed traversal to visit 13 nodes, got %d", seen) + } + + // resume from middle. + resumer.RewindToOffset(50) + seen = 0 + if err := p.WalkAdv(rootNode, s, count); err != nil { + t.Fatal(err) + } + // will not visit 'linkedString' or 'linkedMap' before linked list. 
+ if seen != 7 { + t.Fatalf("expected resumed traversal to visit 7 nodes, got %d", seen) + } +} From 06197c53406553c495f4252904ac01945ad7614f Mon Sep 17 00:00:00 2001 From: Will Scott Date: Mon, 18 Apr 2022 10:37:02 +0200 Subject: [PATCH 04/11] static check --- v2/traversal/resumption_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/v2/traversal/resumption_test.go b/v2/traversal/resumption_test.go index d5976a4f..58350b62 100644 --- a/v2/traversal/resumption_test.go +++ b/v2/traversal/resumption_test.go @@ -21,11 +21,11 @@ import ( var store = memstore.Store{} var ( // baguqeeyexkjwnfy - leafAlpha, leafAlphaLnk = encode(basicnode.NewString("alpha")) + _, leafAlphaLnk = encode(basicnode.NewString("alpha")) // baguqeeyeqvc7t3a - leafBeta, leafBetaLnk = encode(basicnode.NewString("beta")) + _, leafBetaLnk = encode(basicnode.NewString("beta")) // baguqeeyezhlahvq - middleMapNode, middleMapNodeLnk = encode(fluent.MustBuildMap(basicnode.Prototype.Map, 3, func(na fluent.MapAssembler) { + _, middleMapNodeLnk = encode(fluent.MustBuildMap(basicnode.Prototype.Map, 3, func(na fluent.MapAssembler) { na.AssembleEntry("foo").AssignBool(true) na.AssembleEntry("bar").AssignBool(false) na.AssembleEntry("nested").CreateMap(2, func(na fluent.MapAssembler) { @@ -34,7 +34,7 @@ var ( }) })) // baguqeeyehfkkfwa - middleListNode, middleListNodeLnk = encode(fluent.MustBuildList(basicnode.Prototype.List, 4, func(na fluent.ListAssembler) { + _, middleListNodeLnk = encode(fluent.MustBuildList(basicnode.Prototype.List, 4, func(na fluent.ListAssembler) { na.AssembleValue().AssignLink(leafAlphaLnk) na.AssembleValue().AssignLink(leafAlphaLnk) na.AssembleValue().AssignLink(leafBetaLnk) @@ -46,7 +46,7 @@ var ( // created, in-memory nodes and those that have passed through a codec with // field ordering rules // baguqeeyeie4ajfy - rootNode, rootNodeLnk = encode(fluent.MustBuildMap(basicnode.Prototype.Map, 4, func(na fluent.MapAssembler) { + rootNode, _ = 
encode(fluent.MustBuildMap(basicnode.Prototype.Map, 4, func(na fluent.MapAssembler) { na.AssembleEntry("plain").AssignString("olde string") na.AssembleEntry("linkedString").AssignLink(leafAlphaLnk) na.AssembleEntry("linkedMap").AssignLink(middleMapNodeLnk) From 23b28c4169ad6a509f3a50b9710245ce21b62ab7 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Tue, 7 Jun 2022 18:14:18 +0530 Subject: [PATCH 05/11] skip offset should be an opt --- v2/options.go | 9 +++++++++ v2/selective.go | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/v2/options.go b/v2/options.go index aa21969c..d5c3af2b 100644 --- a/v2/options.go +++ b/v2/options.go @@ -61,6 +61,7 @@ type Options struct { WriteAsCarV1 bool TraversalPrototypeChooser traversal.LinkTargetNodePrototypeChooser DataPayloadSize uint64 + SkipOffset uint64 MaxAllowedHeaderSize uint64 MaxAllowedSectionSize uint64 @@ -98,6 +99,14 @@ func ZeroLengthSectionAsEOF(enable bool) Option { } } +// WithSkipOffset sets the start offset we should seek to in the traversal +// when writing out a CAR. +func WithSkipOffset(skip uint64) Option { + return func(o *Options) { + o.SkipOffset = skip + } +} + // UseDataPadding sets the padding to be added between CARv2 header and its data payload on Finalize. 
func UseDataPadding(p uint64) Option { return func(o *Options) { diff --git a/v2/selective.go b/v2/selective.go index a08537d7..620f9dcb 100644 --- a/v2/selective.go +++ b/v2/selective.go @@ -138,7 +138,7 @@ func TraverseV1(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector opts: conf, } - len, _, err := tc.WriteV1(tc.ctx, 0, writer) + len, _, err := tc.WriteV1(tc.ctx, conf.SkipOffset, writer) return len, err } From e64285513b36cfa728aa65aa0eb6f83bd9523127 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Tue, 7 Jun 2022 19:46:18 +0530 Subject: [PATCH 06/11] Update options.go --- v2/options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v2/options.go b/v2/options.go index d5c3af2b..4ccd289d 100644 --- a/v2/options.go +++ b/v2/options.go @@ -100,7 +100,7 @@ func ZeroLengthSectionAsEOF(enable bool) Option { } // WithSkipOffset sets the start offset we should seek to in the traversal -// when writing out a CAR. +// when writing out a CAR. This option only applies to the selective and traversal writer. func WithSkipOffset(skip uint64) Option { return func(o *Options) { o.SkipOffset = skip From 5ee6f81f042050b2eee58991f75cd5b3cfbbd861 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Thu, 4 Aug 2022 15:38:49 +0200 Subject: [PATCH 07/11] more efficient + clearer teeing writer --- v2/internal/loader/writing_loader.go | 73 +++++++++++++++++----------- 1 file changed, 45 insertions(+), 28 deletions(-) diff --git a/v2/internal/loader/writing_loader.go b/v2/internal/loader/writing_loader.go index 9387d210..11803780 100644 --- a/v2/internal/loader/writing_loader.go +++ b/v2/internal/loader/writing_loader.go @@ -12,7 +12,8 @@ import ( "github.com/multiformats/go-varint" ) -type writerOutput struct { +// indexingWriter wraps an io.Writer with metadata of the index of the car written to it. 
+type indexingWriter struct { w io.Writer size uint64 toSkip uint64 @@ -20,15 +21,16 @@ type writerOutput struct { rcrds map[cid.Cid]index.Record } -func (w *writerOutput) Size() uint64 { +func (w *indexingWriter) Size() uint64 { return w.size } -func (w *writerOutput) Index() (index.Index, error) { +func (w *indexingWriter) Index() (index.Index, error) { idx, err := index.New(w.code) if err != nil { return nil, err } + // todo: maybe keep both a map and a list proactively for efficiency here. rcrds := make([]index.Record, 0, len(w.rcrds)) for _, r := range w.rcrds { rcrds = append(rcrds, r) @@ -47,39 +49,50 @@ type IndexTracker interface { Index() (index.Index, error) } +var _ IndexTracker = (*indexingWriter)(nil) + type writingReader struct { r io.Reader - len int64 + buf []byte cid string - wo *writerOutput + wo *indexingWriter } func (w *writingReader) Read(p []byte) (int, error) { - buf := bytes.NewBuffer(nil) if w.wo != nil { - // write the cid - size := varint.ToUvarint(uint64(w.len) + uint64(len(w.cid))) - if _, err := buf.Write(size); err != nil { + // build the buffer of size:cid:block if we don't have it yet. 
+ buf := bytes.NewBuffer(nil) + // allocate space for size + _, err := buf.Write(make([]byte, varint.MaxLenUvarint63)) + if err != nil { return 0, err } + // write the cid if _, err := buf.Write([]byte(w.cid)); err != nil { return 0, err } - cpy := bytes.NewBuffer(w.r.(*bytes.Buffer).Bytes()) - if _, err := cpy.WriteTo(buf); err != nil { + // write the block + n, err := io.Copy(buf, w.r) + if err != nil { return 0, err } - out := buf.Bytes() + sizeBytes := varint.ToUvarint(uint64(n) + uint64(len(w.cid))) + writeBuf := buf.Bytes()[varint.MaxLenUvarint63-len(sizeBytes):] + w.buf = buf.Bytes()[varint.MaxLenUvarint63+len(w.cid):] + _ = copy(writeBuf[:], sizeBytes) + + size := len(writeBuf) if w.wo.toSkip > 0 { - if w.wo.toSkip >= uint64(len(out)) { - w.wo.toSkip -= uint64(len(out)) - out = []byte{} + if w.wo.toSkip >= uint64(len(writeBuf)) { + w.wo.toSkip -= uint64(len(writeBuf)) + writeBuf = []byte{} } else { - out = out[w.wo.toSkip:] + writeBuf = writeBuf[w.wo.toSkip:] w.wo.toSkip = 0 } } - if _, err := bytes.NewBuffer(out).WriteTo(w.wo.w); err != nil { + + if _, err := bytes.NewBuffer(writeBuf).WriteTo(w.wo.w); err != nil { return 0, err } _, c, err := cid.CidFromBytes([]byte(w.cid)) @@ -90,11 +103,19 @@ func (w *writingReader) Read(p []byte) (int, error) { Cid: c, Offset: w.wo.size, } - w.wo.size += uint64(w.len) + uint64(len(size)+len(w.cid)) - + w.wo.size += uint64(size) w.wo = nil } + if w.buf != nil { + n, err := bytes.NewBuffer(w.buf).Read(p) + if err != nil { + return n, err + } + w.buf = w.buf[n:] + return n, err + } + return w.r.Read(p) } @@ -105,7 +126,7 @@ func (w *writingReader) Read(p []byte) (int, error) { // included in the `.Size()` of the IndexTracker. // An indexCodec of `index.CarIndexNoIndex` can be used to not track these offsets. 
func TeeingLinkSystem(ls ipld.LinkSystem, w io.Writer, initialOffset uint64, skip uint64, indexCodec multicodec.Code) (ipld.LinkSystem, IndexTracker) { - wo := writerOutput{ + iw := indexingWriter{ w: w, size: initialOffset, toSkip: skip, @@ -121,7 +142,7 @@ func TeeingLinkSystem(ls ipld.LinkSystem, w io.Writer, initialOffset uint64, ski } // if we've already read this cid in this session, don't re-write it. - if _, ok := wo.rcrds[c]; ok { + if _, ok := iw.rcrds[c]; ok { return ls.StorageReadOpener(lc, l) } @@ -129,12 +150,8 @@ func TeeingLinkSystem(ls ipld.LinkSystem, w io.Writer, initialOffset uint64, ski if err != nil { return nil, err } - buf := bytes.NewBuffer(nil) - n, err := buf.ReadFrom(r) - if err != nil { - return nil, err - } - return &writingReader{buf, n, l.Binary(), &wo}, nil + + return &writingReader{r, nil, l.Binary(), &iw}, nil } - return tls, &wo + return tls, &iw } From 78b25a203a889d5000793c0de1c672ebd2e1af72 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Thu, 4 Aug 2022 16:25:54 +0200 Subject: [PATCH 08/11] test demonstrating arbitrary offsets --- v2/selective_test.go | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/v2/selective_test.go b/v2/selective_test.go index 924351d4..a387e890 100644 --- a/v2/selective_test.go +++ b/v2/selective_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "io" + "math/rand" "os" "path" "testing" @@ -135,3 +136,42 @@ func TestPartialTraversal(t *testing.T) { } require.Equal(t, 2, len(fnd)) } + +func TestOffsetWrites(t *testing.T) { + store := cidlink.Memory{Bag: make(map[string][]byte)} + ls := cidlink.DefaultLinkSystem() + ls.StorageReadOpener = store.OpenRead + ls.StorageWriteOpener = store.OpenWrite + unixfsnode.AddUnixFSReificationToLinkSystem(&ls) + + // write a unixfs file. 
+ initBuf := bytes.Buffer{} + data := make([]byte, 1000000) + _, _ = rand.Read(data) + _, _ = initBuf.Write(data) + rootCid, _, err := builder.BuildUnixFSFile(&initBuf, "", &ls) + require.NoError(t, err) + _, rts, err := cid.CidFromBytes([]byte(rootCid.Binary())) + require.NoError(t, err) + + // get the full car buffer. + fullBuf := bytes.Buffer{} + _, err = car.TraverseV1(context.Background(), &ls, rts, selectorparse.CommonSelector_ExploreAllRecursively, &fullBuf) + require.NoError(t, err) + + for i := uint64(1); i < 1000; i += 1 { + buf := bytes.Buffer{} + + _, err := car.TraverseV1(context.Background(), &ls, rts, selectorparse.CommonSelector_ExploreAllRecursively, &buf, car.WithSkipOffset(i)) + require.NoError(t, err) + require.Equal(t, fullBuf.Bytes()[i:], buf.Bytes()) + } + + for i := uint64(1000); i < 1000000; i += 1000 { + buf := bytes.Buffer{} + + _, err := car.TraverseV1(context.Background(), &ls, rts, selectorparse.CommonSelector_ExploreAllRecursively, &buf, car.WithSkipOffset(i)) + require.NoError(t, err) + require.Equal(t, fullBuf.Bytes()[i:], buf.Bytes()) + } +} From c3789f2a88121ce87920975a78c4aaeb066796c1 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Thu, 4 Aug 2022 16:29:56 +0200 Subject: [PATCH 09/11] rename per code review --- v2/selective.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/v2/selective.go b/v2/selective.go index 620f9dcb..9f128525 100644 --- a/v2/selective.go +++ b/v2/selective.go @@ -142,8 +142,8 @@ func TraverseV1(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector return len, err } -// CreateV1Reader creates an io.ReadSeeker that can be used to copy out the carv1 contents of a car. -func CreateV1Reader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (io.ReadSeeker, error) { +// NewCarV1StreamReader creates an io.ReadSeeker that can be used to copy out the carv1 contents of a car. 
+func NewCarV1StreamReader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (io.ReadSeeker, error) { opts = append(opts, WithoutIndex()) conf := ApplyOptions(opts...) tc := traversalCar{ From e9c7d8a4dc3333280e67a515804609b68da6f2f7 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Wed, 10 Aug 2022 20:26:57 +1000 Subject: [PATCH 10/11] feat: make SkipWriterReaderSeeker an io.Closer --- v2/internal/io/skip_writer_read_seeker.go | 31 ++++++++++++++--------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/v2/internal/io/skip_writer_read_seeker.go b/v2/internal/io/skip_writer_read_seeker.go index 42437b78..3e7382e2 100644 --- a/v2/internal/io/skip_writer_read_seeker.go +++ b/v2/internal/io/skip_writer_read_seeker.go @@ -25,6 +25,7 @@ type SkipWriterReaderSeeker struct { type ReWriter func(ctx context.Context, skip uint64, w io.Writer) (uint64, error) var _ io.ReadSeeker = (*SkipWriterReaderSeeker)(nil) +var _ io.Closer = (*SkipWriterReaderSeeker)(nil) // NewSkipWriterReaderSeeker creates an io.ReadSeeker around a ReWriter. func NewSkipWriterReaderSeeker(ctx context.Context, size uint64, cons ReWriter) *SkipWriterReaderSeeker { @@ -37,7 +38,7 @@ func NewSkipWriterReaderSeeker(ctx context.Context, size uint64, cons ReWriter) } // Note: not threadsafe -func (c *SkipWriterReaderSeeker) Read(p []byte) (n int, err error) { +func (c *SkipWriterReaderSeeker) Read(p []byte) (int, error) { // Check if there's already a write in progress if c.reader == nil { // No write in progress, start a new write from the current offset @@ -90,19 +91,25 @@ func (c *SkipWriterReaderSeeker) Seek(offset int64, whence int) (int64, error) { // Cancel any ongoing write and wait for it to complete // TODO: if we're fast-forwarding with 'SeekCurrent', we may be able to read from the current reader instead. 
if c.reader != nil { - c.writeCancel() - - // Seek and Read should not be called concurrently so this is safe - c.reader.Close() - - select { - case <-c.parentCtx.Done(): - return 0, c.parentCtx.Err() - case <-c.writeComplete: - } - + err := c.Close() c.reader = nil + if err != nil { + return 0, err + } } return int64(c.offset), nil } + +func (c *SkipWriterReaderSeeker) Close() error { + c.writeCancel() + // Seek and Read should not be called concurrently so this is safe + c.reader.Close() + + select { + case <-c.parentCtx.Done(): + return c.parentCtx.Err() + case <-c.writeComplete: + } + return nil +} From 1f6d4980c81afdf4f70606f7a6b92f6587466898 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Fri, 12 Aug 2022 15:20:08 +1000 Subject: [PATCH 11/11] fix: minor doc fixes, updates and improvements from review --- v2/internal/io/skip_writer_read_seeker.go | 2 +- v2/internal/loader/writing_loader.go | 39 ++++++++++++++++------- v2/selective.go | 20 ++++++++++-- 3 files changed, 45 insertions(+), 16 deletions(-) diff --git a/v2/internal/io/skip_writer_read_seeker.go b/v2/internal/io/skip_writer_read_seeker.go index 3e7382e2..eb8ed2c2 100644 --- a/v2/internal/io/skip_writer_read_seeker.go +++ b/v2/internal/io/skip_writer_read_seeker.go @@ -42,7 +42,7 @@ func (c *SkipWriterReaderSeeker) Read(p []byte) (int, error) { // Check if there's already a write in progress if c.reader == nil { // No write in progress, start a new write from the current offset - // in a go routine + // in a go routine and feed it back to the caller via a pipe writeCtx, writeCancel := context.WithCancel(c.parentCtx) c.writeCancel = writeCancel pr, pw := io.Pipe() diff --git a/v2/internal/loader/writing_loader.go b/v2/internal/loader/writing_loader.go index 11803780..c3a93083 100644 --- a/v2/internal/loader/writing_loader.go +++ b/v2/internal/loader/writing_loader.go @@ -51,15 +51,23 @@ type IndexTracker interface { var _ IndexTracker = (*indexingWriter)(nil) +// writingReader is used on a per-block 
basis for the TeeingLinkSystem's StorageReadOpener, we use it +// to intercept block reads and construct CAR section output for that block, passing that section data to +// indexingWriter, while also passing the plain binary block data back to the LinkSystem caller (which +// we expect to be a traversal operation). +// Additionally, if we are performing a "skip" of initial bytes for this CAR, we track the byte count as we +// construct the CAR section data and decide when and how much to write out to the indexingWriter. +// Skip operations don't impact the downstream LinkSystem user (traversal), but they do impact what's +// written out via the indexingWriter. type writingReader struct { r io.Reader buf []byte cid string - wo *indexingWriter + iw *indexingWriter } func (w *writingReader) Read(p []byte) (int, error) { - if w.wo != nil { + if w.iw != nil { // build the buffer of size:cid:block if we don't have it yet. buf := bytes.NewBuffer(nil) // allocate space for size @@ -76,38 +84,45 @@ func (w *writingReader) Read(p []byte) (int, error) { if err != nil { return 0, err } + // write the varint size prefix and trim the unneeded prefix padding we allocated sizeBytes := varint.ToUvarint(uint64(n) + uint64(len(w.cid))) writeBuf := buf.Bytes()[varint.MaxLenUvarint63-len(sizeBytes):] w.buf = buf.Bytes()[varint.MaxLenUvarint63+len(w.cid):] _ = copy(writeBuf[:], sizeBytes) size := len(writeBuf) - if w.wo.toSkip > 0 { - if w.wo.toSkip >= uint64(len(writeBuf)) { - w.wo.toSkip -= uint64(len(writeBuf)) + // indexingWriter manages state for a skip operation, but we have to mutate it here - + // if there are still bytes to skip, then we either need to skip over this whole block, or pass + // part of it on, and then update the toSkip state + if w.iw.toSkip > 0 { + if w.iw.toSkip >= uint64(len(writeBuf)) { + w.iw.toSkip -= uint64(len(writeBuf)) + // will cause the WriteTo() below to be a noop, we need to skip this entire block writeBuf = []byte{} } else { - writeBuf = 
writeBuf[w.wo.toSkip:] - w.wo.toSkip = 0 + writeBuf = writeBuf[w.iw.toSkip:] + w.iw.toSkip = 0 } } - if _, err := bytes.NewBuffer(writeBuf).WriteTo(w.wo.w); err != nil { + if _, err := bytes.NewBuffer(writeBuf).WriteTo(w.iw.w); err != nil { return 0, err } _, c, err := cid.CidFromBytes([]byte(w.cid)) if err != nil { return 0, err } - w.wo.rcrds[c] = index.Record{ + w.iw.rcrds[c] = index.Record{ Cid: c, - Offset: w.wo.size, + Offset: w.iw.size, } - w.wo.size += uint64(size) - w.wo = nil + w.iw.size += uint64(size) + w.iw = nil } if w.buf != nil { + // we've already read the block from the parent reader for writing the CAR block section (above), + // so we need to pass those bytes on in whatever chunk size the caller wants n, err := bytes.NewBuffer(w.buf).Read(p) if err != nil { return n, err diff --git a/v2/selective.go b/v2/selective.go index 9f128525..f1d0932f 100644 --- a/v2/selective.go +++ b/v2/selective.go @@ -44,6 +44,11 @@ func MaxTraversalLinks(MaxTraversalLinks uint64) Option { } // WithDataPayloadSize sets the expected v1 size of the car being written if it is known in advance. +// This is required if NewSelectiveV1Reader() is used and a Seek() operation needs to seek back from +// SeekEnd (i.e. if we don't know where the end is, we can't figure out how far that is from the start). +// It can also be used to validate the expected size of a CAR's data payload if it's known in advance. In +// that case, a selective CAR creation operation will return an ErrSizeMismatch if the actual size doesn't +// match the expected set with this option. func WithDataPayloadSize(size uint64) Option { return func(sco *Options) { sco.DataPayloadSize = size @@ -142,8 +147,12 @@ func TraverseV1(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector return len, err } -// NewCarV1StreamReader creates an io.ReadSeeker that can be used to copy out the carv1 contents of a car. 
-func NewCarV1StreamReader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (io.ReadSeeker, error) { +// NewSelectiveV1Reader creates an io.ReadSeeker that can be used to stream a +// CARv1 given a LinkSystem, root CID and a selector. If Seek() is used, the +// output will only be given from that point in the resulting CAR. Where the +// size of the CAR is known ahead of time and provided via the +// WithDataPayloadSize option, seeking from the end of the CAR is permissible. +func NewSelectiveV1Reader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (io.ReadSeeker, error) { opts = append(opts, WithoutIndex()) conf := ApplyOptions(opts...) tc := traversalCar{ @@ -155,6 +164,11 @@ func NewCarV1StreamReader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid opts: conf, } rwf := func(ctx context.Context, offset uint64, writer io.Writer) (uint64, error) { + // it's only at this point we have the `offset` to start writing at since the user of the + // ReadSeeker will (may) have called Seek() and we've worked out where in the CAR + // that is as an offset. Now we can start writing the CARv1 data, which will be passed + // on to the ReadSeeker. + // Note that we're inside a goroutine here s, _, err := tc.WriteV1(ctx, offset, writer) return s, err } @@ -275,7 +289,7 @@ func (tc *traversalCar) WriteV1(ctx context.Context, skip uint64, w io.Writer) ( skip -= v1Size } - // write the block. + // write the blocks wls, writer := loader.TeeingLinkSystem(*tc.ls, w, v1Size, skip, tc.opts.IndexCodec) if err = tc.setup(ctx, &wls, tc.opts); err != nil { return v1Size, nil, err