From 0eda172e3af7811b91a6eab9581ddf404503fbd6 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Fri, 12 Aug 2022 15:20:08 +1000 Subject: [PATCH] fix: minor doc fixes, updates and improvements from review --- v2/internal/io/skip_writer_read_seeker.go | 2 +- v2/internal/loader/writing_loader.go | 17 ++++++++++++++++- v2/selective.go | 14 ++++++++++++-- 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/v2/internal/io/skip_writer_read_seeker.go b/v2/internal/io/skip_writer_read_seeker.go index 3e7382e2..eb8ed2c2 100644 --- a/v2/internal/io/skip_writer_read_seeker.go +++ b/v2/internal/io/skip_writer_read_seeker.go @@ -42,7 +42,7 @@ func (c *SkipWriterReaderSeeker) Read(p []byte) (int, error) { // Check if there's already a write in progress if c.reader == nil { // No write in progress, start a new write from the current offset - // in a go routine + // in a goroutine and feed it back to the caller via a pipe writeCtx, writeCancel := context.WithCancel(c.parentCtx) c.writeCancel = writeCancel pr, pw := io.Pipe() diff --git a/v2/internal/loader/writing_loader.go b/v2/internal/loader/writing_loader.go index 11803780..b07d34b3 100644 --- a/v2/internal/loader/writing_loader.go +++ b/v2/internal/loader/writing_loader.go @@ -51,11 +51,19 @@ type IndexTracker interface { var _ IndexTracker = (*indexingWriter)(nil) +// writingReader is used on a per-block basis for the TeeingLinkSystem's StorageReadOpener; we use it +// to intercept block reads and construct CAR section output for that block, passing that section data to +// indexingWriter, while also passing the plain binary block data back to the LinkSystem caller (which +// we expect to be a traversal operation). +// Additionally, if we are performing a "skip" of initial bytes for this CAR, we track the byte count as we +// construct the CAR section data and decide when and how much to write out to the indexingWriter.
+// Skip operations don't impact the downstream LinkSystem user (traversal), but they do impact what's +// written out via the indexingWriter. type writingReader struct { r io.Reader buf []byte cid string - wo *indexingWriter + iw *indexingWriter } func (w *writingReader) Read(p []byte) (int, error) { @@ -76,15 +84,20 @@ func (w *writingReader) Read(p []byte) (int, error) { if err != nil { return 0, err } + // write the varint size prefix and trim the unneeded prefix padding we allocated sizeBytes := varint.ToUvarint(uint64(n) + uint64(len(w.cid))) writeBuf := buf.Bytes()[varint.MaxLenUvarint63-len(sizeBytes):] w.buf = buf.Bytes()[varint.MaxLenUvarint63+len(w.cid):] _ = copy(writeBuf[:], sizeBytes) size := len(writeBuf) + // indexingWriter manages state for a skip operation, but we have to mutate it here - + // if there are still bytes to skip, then we either need to skip over this whole block, or pass + // part of it on, and then update the toSkip state if w.wo.toSkip > 0 { if w.wo.toSkip >= uint64(len(writeBuf)) { w.wo.toSkip -= uint64(len(writeBuf)) + // will cause the WriteTo() below to be a noop, we need to skip this entire block writeBuf = []byte{} } else { writeBuf = writeBuf[w.wo.toSkip:] @@ -108,6 +121,8 @@ func (w *writingReader) Read(p []byte) (int, error) { } if w.buf != nil { + // we've already read the block from the parent reader for writing the CAR block section (above), + // so we need to pass those bytes on in whatever chunk size the caller wants n, err := bytes.NewBuffer(w.buf).Read(p) if err != nil { return n, err diff --git a/v2/selective.go b/v2/selective.go index 9f128525..38b95aa7 100644 --- a/v2/selective.go +++ b/v2/selective.go @@ -44,6 +44,11 @@ func MaxTraversalLinks(MaxTraversalLinks uint64) Option { } // WithDataPayloadSize sets the expected v1 size of the car being written if it is known in advance. +// This is required if NewCarV1StreamReader() is used and a Seek() operation needs to seek back from +// SeekEnd (i.e.
if we don't know where the end is, we can't figure out how far that is from the start). +// It can also be used to validate the expected size of a CAR's data payload if it's known in advance. In +// that case, a selective CAR creation operation will return an ErrSizeMismatch if the actual size doesn't +// match the expected size set with this option. func WithDataPayloadSize(size uint64) Option { return func(sco *Options) { sco.DataPayloadSize = size @@ -143,7 +148,7 @@ func TraverseV1(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector } // NewCarV1StreamReader creates an io.ReadSeeker that can be used to copy out the carv1 contents of a car. -func NewCarV1StreamReader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (io.ReadSeeker, error) { +func NewSelectiveV1Reader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (io.ReadSeeker, error) { opts = append(opts, WithoutIndex()) conf := ApplyOptions(opts...) tc := traversalCar{ @@ -155,6 +160,11 @@ func NewCarV1StreamReader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid opts: conf, } rwf := func(ctx context.Context, offset uint64, writer io.Writer) (uint64, error) { + // it's only at this point that we have the `offset` to start writing at since the user of the + // ReadSeeker may have called Seek() and we've worked out where in the CAR + // that is as an offset. Now we can start writing the CARv1 data, which will be passed + // on to the ReadSeeker. + // Note that we're inside a goroutine here s, _, err := tc.WriteV1(ctx, offset, writer) return s, err } @@ -275,7 +285,7 @@ func (tc *traversalCar) WriteV1(ctx context.Context, skip uint64, w io.Writer) ( skip -= v1Size } - // write the blocks wls, writer := loader.TeeingLinkSystem(*tc.ls, w, v1Size, skip, tc.opts.IndexCodec) if err = tc.setup(ctx, &wls, tc.opts); err != nil { return v1Size, nil, err