diff --git a/v2/internal/io/skip_writer_read_seeker.go b/v2/internal/io/skip_writer_read_seeker.go index 3e7382e2..eb8ed2c2 100644 --- a/v2/internal/io/skip_writer_read_seeker.go +++ b/v2/internal/io/skip_writer_read_seeker.go @@ -42,7 +42,7 @@ func (c *SkipWriterReaderSeeker) Read(p []byte) (int, error) { // Check if there's already a write in progress if c.reader == nil { // No write in progress, start a new write from the current offset - // in a go routine + // in a go routine and feed it back to the caller via a pipe writeCtx, writeCancel := context.WithCancel(c.parentCtx) c.writeCancel = writeCancel pr, pw := io.Pipe() diff --git a/v2/internal/loader/writing_loader.go b/v2/internal/loader/writing_loader.go index 11803780..c3a93083 100644 --- a/v2/internal/loader/writing_loader.go +++ b/v2/internal/loader/writing_loader.go @@ -51,15 +51,23 @@ type IndexTracker interface { var _ IndexTracker = (*indexingWriter)(nil) +// writingReader is used on a per-block basis for the TeeingLinkSystem's StorageReadOpener, we use it +// to intercept block reads and construct CAR section output for that block, passing that section data to +// indexingWriter, while also passing the plain binary block data back to the LinkSystem caller (which +// we expect to be a traversal operation). +// Additionally, if we are performing a "skip" of initial bytes for this CAR, we track the byte count as we +// construct the CAR section data and decide when and how much to write out to the indexingWriter. +// Skip operations don't impact the downstream LinkSystem user (traversal), but they do impact what's +// written out via the indexingWriter. type writingReader struct { r io.Reader buf []byte cid string - wo *indexingWriter + iw *indexingWriter } func (w *writingReader) Read(p []byte) (int, error) { - if w.wo != nil { + if w.iw != nil { // build the buffer of size:cid:block if we don't have it yet. buf := bytes.NewBuffer(nil) // allocate space for size @@ -76,38 +84,45 @@ func (w *writingReader) Read(p []byte) (int, error) { if err != nil { return 0, err } + // write the varint size prefix and trim the unneeded prefix padding we allocated sizeBytes := varint.ToUvarint(uint64(n) + uint64(len(w.cid))) writeBuf := buf.Bytes()[varint.MaxLenUvarint63-len(sizeBytes):] w.buf = buf.Bytes()[varint.MaxLenUvarint63+len(w.cid):] _ = copy(writeBuf[:], sizeBytes) size := len(writeBuf) - if w.wo.toSkip > 0 { - if w.wo.toSkip >= uint64(len(writeBuf)) { - w.wo.toSkip -= uint64(len(writeBuf)) + // indexingWriter manages state for a skip operation, but we have to mutate it here - + // if there are still bytes to skip, then we either need to skip over this whole block, or pass + // part of it on, and then update the toSkip state + if w.iw.toSkip > 0 { + if w.iw.toSkip >= uint64(len(writeBuf)) { + w.iw.toSkip -= uint64(len(writeBuf)) + // will cause the WriteTo() below to be a noop, we need to skip this entire block writeBuf = []byte{} } else { - writeBuf = writeBuf[w.wo.toSkip:] - w.wo.toSkip = 0 + writeBuf = writeBuf[w.iw.toSkip:] + w.iw.toSkip = 0 } } - if _, err := bytes.NewBuffer(writeBuf).WriteTo(w.wo.w); err != nil { + if _, err := bytes.NewBuffer(writeBuf).WriteTo(w.iw.w); err != nil { return 0, err } _, c, err := cid.CidFromBytes([]byte(w.cid)) if err != nil { return 0, err } - w.wo.rcrds[c] = index.Record{ + w.iw.rcrds[c] = index.Record{ Cid: c, - Offset: w.wo.size, + Offset: w.iw.size, } - w.wo.size += uint64(size) - w.wo = nil + w.iw.size += uint64(size) + w.iw = nil } if w.buf != nil { + // we've already read the block from the parent reader for writing the CAR block section (above), + // so we need to pass those bytes on in whatever chunk size the caller wants n, err := bytes.NewBuffer(w.buf).Read(p) if err != nil { return n, err diff --git a/v2/selective.go b/v2/selective.go index 9f128525..f1d0932f 100644 --- a/v2/selective.go +++ b/v2/selective.go @@ -44,6 +44,11 @@ func MaxTraversalLinks(MaxTraversalLinks uint64) Option { } // WithDataPayloadSize sets the expected v1 size of the car being written if it is known in advance. +// This is required if NewCarV1StreamReader() is used and a Seek() operation needs seek back from +// SeekEnd (i.e. if we don't know where the end is, we can't figure out how far that is from the start). +// It can also be used to validate the expected size of a CAR's data payload if it's known in advance. In +// that case, a selective CAR creation operation will return an ErrSizeMismatch if the actual size doesn't +// match the expected set with this option. func WithDataPayloadSize(size uint64) Option { return func(sco *Options) { sco.DataPayloadSize = size @@ -142,8 +147,12 @@ func TraverseV1(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector return len, err } -// NewCarV1StreamReader creates an io.ReadSeeker that can be used to copy out the carv1 contents of a car. -func NewCarV1StreamReader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (io.ReadSeeker, error) { +// NewSelectiveV1Reader creates an io.ReadSeeker that can be used to stream a +// CARv1 given a LinkSystem, root CID and a selector. If Seek() is used, the +// output will only be given from that point in the resulting CAR. Where the +// size of the CAR is known ahead of time and provided via the +// WithDataPayloadSize option, seeking from the end of the CAR is permissible. +func NewSelectiveV1Reader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (io.ReadSeeker, error) { opts = append(opts, WithoutIndex()) conf := ApplyOptions(opts...) tc := traversalCar{ @@ -155,6 +164,11 @@ func NewCarV1StreamReader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid opts: conf, } rwf := func(ctx context.Context, offset uint64, writer io.Writer) (uint64, error) { + // it's only at this point we have the `offset` to start writing at since the user of the + // ReadSeeker will (may) have called Seek() and we've worked out where in the CAR + // that is as an offset. Now we can start writing the CARv1 data, which will be passed + // on to the ReadSeeker. + // Note that we're inside a goroutine here s, _, err := tc.WriteV1(ctx, offset, writer) return s, err } @@ -275,7 +289,7 @@ func (tc *traversalCar) WriteV1(ctx context.Context, skip uint64, w io.Writer) ( skip -= v1Size } - // write the block. + // write the blocks wls, writer := loader.TeeingLinkSystem(*tc.ls, w, v1Size, skip, tc.opts.IndexCodec) if err = tc.setup(ctx, &wls, tc.opts); err != nil { return v1Size, nil, err