Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: minor doc fixes, updates and improvements from review #328

Merged
merged 1 commit into from
Aug 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion v2/internal/io/skip_writer_read_seeker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (c *SkipWriterReaderSeeker) Read(p []byte) (int, error) {
// Check if there's already a write in progress
if c.reader == nil {
// No write in progress, start a new write from the current offset
// in a go routine
// in a go routine and feed it back to the caller via a pipe
writeCtx, writeCancel := context.WithCancel(c.parentCtx)
c.writeCancel = writeCancel
pr, pw := io.Pipe()
Expand Down
39 changes: 27 additions & 12 deletions v2/internal/loader/writing_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,23 @@ type IndexTracker interface {

var _ IndexTracker = (*indexingWriter)(nil)

// writingReader is used on a per-block basis for the TeeingLinkSystem's StorageReadOpener, we use it
// to intercept block reads and construct CAR section output for that block, passing that section data to
// indexingWriter, while also passing the plain binary block data back to the LinkSystem caller (which
// we expect to be a traversal operation).
// Additionally, if we are performing a "skip" of initial bytes for this CAR, we track the byte count as we
// construct the CAR section data and decide when and how much to write out to the indexingWriter.
// Skip operations don't impact the downstream LinkSystem user (traversal), but they do impact what's
// written out via the indexingWriter.
type writingReader struct {
r io.Reader
buf []byte
cid string
wo *indexingWriter
iw *indexingWriter
}

func (w *writingReader) Read(p []byte) (int, error) {
if w.wo != nil {
if w.iw != nil {
// build the buffer of size:cid:block if we don't have it yet.
buf := bytes.NewBuffer(nil)
// allocate space for size
Expand All @@ -76,38 +84,45 @@ func (w *writingReader) Read(p []byte) (int, error) {
if err != nil {
return 0, err
}
// write the varint size prefix and trim the unneeded prefix padding we allocated
sizeBytes := varint.ToUvarint(uint64(n) + uint64(len(w.cid)))
writeBuf := buf.Bytes()[varint.MaxLenUvarint63-len(sizeBytes):]
w.buf = buf.Bytes()[varint.MaxLenUvarint63+len(w.cid):]
_ = copy(writeBuf[:], sizeBytes)

size := len(writeBuf)
if w.wo.toSkip > 0 {
if w.wo.toSkip >= uint64(len(writeBuf)) {
w.wo.toSkip -= uint64(len(writeBuf))
// indexingWriter manages state for a skip operation, but we have to mutate it here -
// if there are still bytes to skip, then we either need to skip over this whole block, or pass
// part of it on, and then update the toSkip state
if w.iw.toSkip > 0 {
if w.iw.toSkip >= uint64(len(writeBuf)) {
w.iw.toSkip -= uint64(len(writeBuf))
// will cause the WriteTo() below to be a noop, we need to skip this entire block
writeBuf = []byte{}
} else {
writeBuf = writeBuf[w.wo.toSkip:]
w.wo.toSkip = 0
writeBuf = writeBuf[w.iw.toSkip:]
w.iw.toSkip = 0
}
}

if _, err := bytes.NewBuffer(writeBuf).WriteTo(w.wo.w); err != nil {
if _, err := bytes.NewBuffer(writeBuf).WriteTo(w.iw.w); err != nil {
return 0, err
}
_, c, err := cid.CidFromBytes([]byte(w.cid))
if err != nil {
return 0, err
}
w.wo.rcrds[c] = index.Record{
w.iw.rcrds[c] = index.Record{
Cid: c,
Offset: w.wo.size,
Offset: w.iw.size,
}
w.wo.size += uint64(size)
w.wo = nil
w.iw.size += uint64(size)
w.iw = nil
}

if w.buf != nil {
// we've already read the block from the parent reader for writing the CAR block section (above),
// so we need to pass those bytes on in whatever chunk size the caller wants
n, err := bytes.NewBuffer(w.buf).Read(p)
if err != nil {
return n, err
Expand Down
20 changes: 17 additions & 3 deletions v2/selective.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func MaxTraversalLinks(MaxTraversalLinks uint64) Option {
}

// WithDataPayloadSize sets the expected v1 size of the car being written if it is known in advance.
// This is required if NewCarV1StreamReader() is used and a Seek() operation needs seek back from
// SeekEnd (i.e. if we don't know where the end is, we can't figure out how far that is from the start).
// It can also be used to validate the expected size of a CAR's data payload if it's known in advance. In
// that case, a selective CAR creation operation will return an ErrSizeMismatch if the actual size doesn't
// match the expected set with this option.
func WithDataPayloadSize(size uint64) Option {
return func(sco *Options) {
sco.DataPayloadSize = size
Expand Down Expand Up @@ -142,8 +147,12 @@ func TraverseV1(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector
return len, err
}

// NewCarV1StreamReader creates an io.ReadSeeker that can be used to copy out the carv1 contents of a car.
func NewCarV1StreamReader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (io.ReadSeeker, error) {
// NewSelectiveV1Reader creates an io.ReadSeeker that can be used to stream a
// CARv1 given a LinkSystem, root CID and a selector. If Seek() is used, the
// output will only be given from that point in the resulting CAR. Where the
// size of the CAR is known ahead of time and provided via the
// WithDataPayloadSize option, seeking from the end of the CAR is permissible.
func NewSelectiveV1Reader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (io.ReadSeeker, error) {
opts = append(opts, WithoutIndex())
conf := ApplyOptions(opts...)
tc := traversalCar{
Expand All @@ -155,6 +164,11 @@ func NewCarV1StreamReader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid
opts: conf,
}
rwf := func(ctx context.Context, offset uint64, writer io.Writer) (uint64, error) {
// it's only at this point we have the `offset` to start writing at since the user of the
// ReadSeeker will (may) have called Seek() and we've worked out where in the CAR
// that is as an offset. Now we can start writing the CARv1 data, which will be passed
// on to the ReadSeeker.
// Note that we're inside a goroutine here
s, _, err := tc.WriteV1(ctx, offset, writer)
return s, err
}
Expand Down Expand Up @@ -275,7 +289,7 @@ func (tc *traversalCar) WriteV1(ctx context.Context, skip uint64, w io.Writer) (
skip -= v1Size
}

// write the block.
// write the blocks
wls, writer := loader.TeeingLinkSystem(*tc.ls, w, v1Size, skip, tc.opts.IndexCodec)
if err = tc.setup(ctx, &wls, tc.opts); err != nil {
return v1Size, nil, err
Expand Down