-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Propose traversal-based car creation #269
Merged
Merged
Changes from 1 commit
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
7d4502e
Propose traversal-based car creation
willscott 44245ff
file mode - traverses once and fixes up carv2 header at the end
willscott 642b02b
add traversal budget option and respect set options on creation
willscott df2a186
* Handle the various possible car writer options.
willscott 6d41ba2
[fixup] options test
willscott ac5d450
Add round-trip test
willscott f53e09c
cleaner loaders
willscott 4076b7a
renames / comments per review
willscott 21907ee
fix test
willscott 0b392bc
additional testing as requested
willscott File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,221 @@ | ||
package car | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"io" | ||
|
||
"github.com/ipfs/go-cid" | ||
"github.com/ipld/go-car/v2/internal/carv1" | ||
ipld "github.com/ipld/go-ipld-prime" | ||
"github.com/ipld/go-ipld-prime/linking" | ||
cidlink "github.com/ipld/go-ipld-prime/linking/cid" | ||
"github.com/ipld/go-ipld-prime/node/basicnode" | ||
"github.com/ipld/go-ipld-prime/traversal" | ||
"github.com/ipld/go-ipld-prime/traversal/selector" | ||
"github.com/multiformats/go-varint" | ||
) | ||
|
||
// PrepareTraversal walks through the proposed dag traversal to learn it's total size in order to be able to | ||
// stream out a car to a writer in the expected traversal order in one go. | ||
func PrepareTraversal(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (Traversal, error) { | ||
cls, cntr := countingLinkSystem(*ls) | ||
|
||
c1h := carv1.CarHeader{Roots: []cid.Cid{root}, Version: 1} | ||
headSize, err := carv1.HeaderSize(&c1h) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if err := traverse(ctx, &cls, root, selector, opts...); err != nil { | ||
return nil, err | ||
} | ||
tc := traversalCar{ | ||
size: headSize + cntr.totalRead, | ||
ctx: ctx, | ||
root: root, | ||
selector: selector, | ||
ls: ls, | ||
opts: opts, | ||
} | ||
return &tc, nil | ||
} | ||
|
||
// Traversal is a allows writing a car with the data specified by a selector. | ||
type Traversal interface { | ||
io.WriterTo | ||
} | ||
|
||
type traversalCar struct { | ||
willscott marked this conversation as resolved.
Show resolved
Hide resolved
|
||
size uint64 | ||
ctx context.Context | ||
root cid.Cid | ||
selector ipld.Node | ||
ls *ipld.LinkSystem | ||
opts []Option | ||
willscott marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
func (tc *traversalCar) WriteTo(w io.Writer) (int64, error) { | ||
n, err := w.Write(Pragma) | ||
if err != nil { | ||
return int64(n), err | ||
} | ||
h, err := tc.WriteHeader(w) | ||
if err != nil { | ||
return int64(n) + h, err | ||
} | ||
h += int64(n) | ||
|
||
// write the v1 header | ||
c1h := carv1.CarHeader{Roots: []cid.Cid{tc.root}, Version: 1} | ||
if err := carv1.WriteHeader(&c1h, w); err != nil { | ||
return h, err | ||
} | ||
hn, err := carv1.HeaderSize(&c1h) | ||
h += int64(hn) | ||
if err != nil { | ||
return h, err | ||
} | ||
|
||
// write the block. | ||
wls, writer := teeingLinkSystem(*tc.ls, w) | ||
err = traverse(tc.ctx, &wls, tc.root, tc.selector, tc.opts...) | ||
h += int64(writer.size) | ||
if err != nil { | ||
return h, err | ||
} | ||
|
||
return h, nil | ||
} | ||
|
||
func (tc *traversalCar) WriteHeader(w io.Writer) (int64, error) { | ||
h := NewHeader(tc.size) | ||
// TODO: support calculation / inclusion of the index. | ||
h.IndexOffset = 0 | ||
return h.WriteTo(w) | ||
} | ||
|
||
type counter struct { | ||
totalRead uint64 | ||
} | ||
|
||
type countingReader struct { | ||
r io.Reader | ||
c *counter | ||
read uint64 | ||
cid string | ||
} | ||
|
||
func (c *countingReader) Read(p []byte) (int, error) { | ||
n, err := c.r.Read(p) | ||
if err == io.EOF { | ||
// add in the overall length of the block. | ||
n += len(c.cid) | ||
uv := varint.ToUvarint(uint64(n)) | ||
n += len(uv) | ||
} | ||
c.c.totalRead += uint64(n) | ||
c.read += uint64(n) | ||
return n, err | ||
} | ||
|
||
func countingLinkSystem(ls ipld.LinkSystem) (ipld.LinkSystem, *counter) { | ||
c := counter{} | ||
return linking.LinkSystem{ | ||
EncoderChooser: ls.EncoderChooser, | ||
DecoderChooser: ls.DecoderChooser, | ||
HasherChooser: ls.HasherChooser, | ||
StorageWriteOpener: ls.StorageWriteOpener, | ||
StorageReadOpener: func(lc linking.LinkContext, l ipld.Link) (io.Reader, error) { | ||
r, err := ls.StorageReadOpener(lc, l) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return &countingReader{r, &c, 0, l.Binary()}, nil | ||
}, | ||
TrustedStorage: ls.TrustedStorage, | ||
NodeReifier: ls.NodeReifier, | ||
}, &c | ||
} | ||
|
||
func traverse(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, s ipld.Node, opts ...Option) error { | ||
sel, err := selector.CompileSelector(s) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
progress := traversal.Progress{ | ||
Cfg: &traversal.Config{ | ||
Ctx: ctx, | ||
LinkSystem: *ls, | ||
LinkTargetNodePrototypeChooser: func(_ ipld.Link, _ linking.LinkContext) (ipld.NodePrototype, error) { | ||
return basicnode.Prototype.Any, nil | ||
}, | ||
LinkVisitOnlyOnce: true, // TODO: from opts, | ||
}, | ||
} | ||
|
||
lnk := cidlink.Link{Cid: root} | ||
rootNode, err := ls.Load(ipld.LinkContext{}, lnk, basicnode.Prototype.Any) | ||
if err != nil { | ||
return err | ||
} | ||
return progress.WalkMatching(rootNode, sel, func(_ traversal.Progress, _ ipld.Node) error { | ||
return nil | ||
}) | ||
} | ||
|
||
type writerOutput struct { | ||
w io.Writer | ||
size uint64 | ||
} | ||
|
||
type writingReader struct { | ||
r io.Reader | ||
len int64 | ||
cid string | ||
wo *writerOutput | ||
} | ||
|
||
func (w *writingReader) Read(p []byte) (int, error) { | ||
if w.wo != nil { | ||
// write the cid | ||
size := varint.ToUvarint(uint64(w.len)) | ||
if _, err := w.wo.w.Write(size); err != nil { | ||
return 0, err | ||
} | ||
if _, err := w.wo.w.Write([]byte(w.cid)); err != nil { | ||
return 0, err | ||
} | ||
cpy := bytes.NewBuffer(w.r.(*bytes.Buffer).Bytes()) | ||
if _, err := cpy.WriteTo(w.wo.w); err != nil { | ||
return 0, err | ||
} | ||
w.wo = nil | ||
} | ||
|
||
return w.r.Read(p) | ||
} | ||
|
||
func teeingLinkSystem(ls ipld.LinkSystem, w io.Writer) (ipld.LinkSystem, *writerOutput) { | ||
wo := writerOutput{w, 0} | ||
return linking.LinkSystem{ | ||
EncoderChooser: ls.EncoderChooser, | ||
DecoderChooser: ls.DecoderChooser, | ||
HasherChooser: ls.HasherChooser, | ||
StorageWriteOpener: ls.StorageWriteOpener, | ||
StorageReadOpener: func(lc linking.LinkContext, l ipld.Link) (io.Reader, error) { | ||
r, err := ls.StorageReadOpener(lc, l) | ||
if err != nil { | ||
return nil, err | ||
} | ||
buf := bytes.NewBuffer(nil) | ||
n, err := buf.ReadFrom(r) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return &writingReader{buf, n, l.Binary(), &wo}, nil | ||
}, | ||
TrustedStorage: ls.TrustedStorage, | ||
NodeReifier: ls.NodeReifier, | ||
}, &wo | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lotus evolved away from needing this, so I don't think there's a consumer for prepare+dump anymore, it's just write now, thankfully. There used to be a need to get the size up-front to set up the commp calculation with padding, but that's not necessary anymore. So you could decomplicate this by removing this functionality if you like.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want to write out to a stream - e.g. network, this 2x scan would still be more efficient than having to write it out to a file, touch up the header, and then as a second step send the whole thing, i think
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, so this would be for a CARv2 format and you need the offset and all that? Because for a selector-based CARv1 you have the root already, or else you can't run the traversal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, this is to get the data size field correct in the carv2 header when streaming.
I can make a carv1-only stream version that can do it in one pass as well. I suppose that's probably useful as well.
This comment was marked as resolved.
Sorry, something went wrong.