diff --git a/v2/options.go b/v2/options.go index 1c9e6a18..ce8054bd 100644 --- a/v2/options.go +++ b/v2/options.go @@ -8,6 +8,7 @@ import ( "github.com/multiformats/go-multicodec" "github.com/ipld/go-car/v2/internal/carv1" + resumetraversal "github.com/ipld/go-car/v2/traversal" ) // DefaultMaxIndexCidSize specifies the maximum size in byptes accepted as a section CID by CARv2 index. @@ -62,6 +63,7 @@ type Options struct { DataPayloadSize uint64 SkipOffset uint64 TraversalPrototypeChooser traversal.LinkTargetNodePrototypeChooser + TraversalResumerPathState resumetraversal.TraversalResumerPathState MaxAllowedHeaderSize uint64 MaxAllowedSectionSize uint64 @@ -182,3 +184,17 @@ func MaxAllowedSectionSize(max uint64) Option { o.MaxAllowedSectionSize = max } } + +// WithTraversalResumerPathState provides a custom TraversalResumerPathState +// that can be reused between selective CAR creations where traversals may need +// to be resumed at arbitrary points within the DAG. +// +// A TraversalResumerPathState shared across multiple traversals using the same +// selector and DAG will yield the same state. This allows us to resume at +// arbitrary points within in the DAG and load the minimal additional blocks +// required to resume the traversal at that point. +func WithTraversalResumerPathState(pathState resumetraversal.TraversalResumerPathState) Option { + return func(o *Options) { + o.TraversalResumerPathState = pathState + } +} diff --git a/v2/selective.go b/v2/selective.go index 9f128525..d699d33f 100644 --- a/v2/selective.go +++ b/v2/selective.go @@ -321,7 +321,11 @@ func (tc *traversalCar) setup(ctx context.Context, ls *ipld.LinkSystem, opts Opt } ls.TrustedStorage = true - resumer, err := resumetraversal.WithTraversingLinksystem(&progress) + pathState := opts.TraversalResumerPathState + if pathState == nil { + pathState = resumetraversal.NewTraversalResumerPathState() + } + resumer, err := resumetraversal.WithTraversingLinksystem(&progress, pathState) if err != nil { return err } diff --git a/v2/traversal/resumption.go b/v2/traversal/resumption.go index d88eead8..d832d560 100644 --- a/v2/traversal/resumption.go +++ b/v2/traversal/resumption.go @@ -17,12 +17,41 @@ import ( "github.com/ipld/go-ipld-prime/traversal" ) +// TraverseResumer allows resuming a progress from a previously encountered path in the selector. +type TraverseResumer interface { + RewindToPath(from datamodel.Path) error + RewindToOffset(offset uint64) error + Position() uint64 +} + +// TraversalResumerPathState tracks a traversal state for the purpose of +// building a CAR. For each block in the CAR it tracks the path to that block, +// the Link of the block and where in the CAR the block is located. +// +// A TraversalResumerPathState shared across multiple traversals using the same +// selector and DAG will yield the same state. This allows us to resume at +// arbitrary points within in the DAG and load the minimal additional blocks +// required to resume the traversal at that point. +type TraversalResumerPathState interface { + AddPath(path []datamodel.PathSegment, link datamodel.Link, atOffset uint64) + GetLinks(root datamodel.Path) []datamodel.Link + GetOffsetAfter(root datamodel.Path) (uint64, error) +} + type pathNode struct { link datamodel.Link offset uint64 children map[datamodel.PathSegment]*pathNode } +// NewTraversalResumerPathState creates a new TraversalResumerPathState. +// +// Note that the TraversalResumerPathState returned by this factory is not +// thread-safe. +func NewTraversalResumerPathState() TraversalResumerPathState { + return newPath(nil, 0) +} + func newPath(link datamodel.Link, at uint64) *pathNode { return &pathNode{ link: link, @@ -31,15 +60,15 @@ func newPath(link datamodel.Link, at uint64) *pathNode { } } -func (pn pathNode) addPath(p []datamodel.PathSegment, link datamodel.Link, at uint64) { +func (pn pathNode) AddPath(p []datamodel.PathSegment, link datamodel.Link, atOffset uint64) { if len(p) == 0 { return } if _, ok := pn.children[p[0]]; !ok { - child := newPath(link, at) + child := newPath(link, atOffset) pn.children[p[0]] = child } - pn.children[p[0]].addPath(p[1:], link, at) + pn.children[p[0]].AddPath(p[1:], link, atOffset) } func (pn pathNode) allLinks() []datamodel.Link { @@ -57,7 +86,7 @@ func (pn pathNode) allLinks() []datamodel.Link { } // getPaths returns reconstructed paths in the tree rooted at 'root' -func (pn pathNode) getLinks(root datamodel.Path) []datamodel.Link { +func (pn pathNode) GetLinks(root datamodel.Path) []datamodel.Link { segs := root.Segments() switch len(segs) { case 0: @@ -80,12 +109,12 @@ func (pn pathNode) getLinks(root datamodel.Path) []datamodel.Link { // base case 2: not registered sub-path. return []datamodel.Link{} } - return pn.children[next].getLinks(datamodel.NewPathNocopy(segs[1:])) + return pn.children[next].GetLinks(datamodel.NewPathNocopy(segs[1:])) } var errInvalid = fmt.Errorf("invalid path") -func (pn pathNode) offsetAfter(root datamodel.Path) (uint64, error) { +func (pn pathNode) GetOffsetAfter(root datamodel.Path) (uint64, error) { // we look for offset of next sibling. // if no next sibling recurse up the path segments until we find a next sibling. segs := root.Segments() @@ -100,7 +129,7 @@ func (pn pathNode) offsetAfter(root datamodel.Path) (uint64, error) { closest := chld.offset // try recursive path if len(segs) > 1 { - co, err := chld.offsetAfter(datamodel.NewPathNocopy(segs[1:])) + co, err := chld.GetOffsetAfter(datamodel.NewPathNocopy(segs[1:])) if err == nil { return co, err } @@ -121,35 +150,28 @@ func (pn pathNode) offsetAfter(root datamodel.Path) (uint64, error) { return 0, errInvalid } -// TraverseResumer allows resuming a progress from a previously encountered path in the selector. -type TraverseResumer interface { - RewindToPath(from datamodel.Path) error - RewindToOffset(offset uint64) error - Position() uint64 -} - type traversalState struct { wrappedLinksystem *linking.LinkSystem lsCounter *loader.Counter - blockNumber int - pathOrder map[int]datamodel.Path - pathTree *pathNode + pathTree TraversalResumerPathState rewindPathTarget *datamodel.Path rewindOffsetTarget uint64 pendingBlockStart uint64 // on rewinds, we store where the counter was in order to know the length of the last read block. progress *traversal.Progress } +var _ TraverseResumer = (*traversalState)(nil) + func (ts *traversalState) RewindToPath(from datamodel.Path) error { if ts.progress == nil { return nil } // reset progress and traverse until target. ts.progress.SeenLinks = make(map[datamodel.Link]struct{}) - ts.blockNumber = 0 ts.pendingBlockStart = ts.lsCounter.Size() ts.lsCounter.TotalRead = 0 ts.rewindPathTarget = &from + ts.rewindOffsetTarget = 0 return nil } @@ -163,10 +185,10 @@ func (ts *traversalState) RewindToOffset(offset uint64) error { } // reset progress and traverse until target. ts.progress.SeenLinks = make(map[datamodel.Link]struct{}) - ts.blockNumber = 0 ts.pendingBlockStart = ts.lsCounter.Size() ts.lsCounter.TotalRead = 0 ts.rewindOffsetTarget = offset + ts.rewindPathTarget = nil return nil } @@ -177,9 +199,7 @@ func (ts *traversalState) Position() uint64 { func (ts *traversalState) traverse(lc linking.LinkContext, l ipld.Link) (io.Reader, error) { // when not in replay mode, we track metadata if ts.rewindPathTarget == nil && ts.rewindOffsetTarget == 0 { - ts.pathOrder[ts.blockNumber] = lc.LinkPath - ts.pathTree.addPath(lc.LinkPath.Segments(), l, ts.lsCounter.Size()) - ts.blockNumber++ + ts.pathTree.AddPath(lc.LinkPath.Segments(), l, ts.lsCounter.Size()) return ts.wrappedLinksystem.StorageReadOpener(lc, l) } @@ -205,12 +225,12 @@ func (ts *traversalState) traverse(lc linking.LinkContext, l ipld.Link) (io.Read break } if targetSegments[i].String() != s.String() { - links := ts.pathTree.getLinks(datamodel.NewPathNocopy(seg[0 : i+1])) + links := ts.pathTree.GetLinks(datamodel.NewPathNocopy(seg[0 : i+1])) for _, l := range links { ts.progress.SeenLinks[l] = struct{}{} } var err error - ts.lsCounter.TotalRead, err = ts.pathTree.offsetAfter(datamodel.NewPathNocopy(seg[0 : i+1])) + ts.lsCounter.TotalRead, err = ts.pathTree.GetOffsetAfter(datamodel.NewPathNocopy(seg[0 : i+1])) if err == errInvalid { ts.lsCounter.TotalRead = ts.pendingBlockStart } else if err != nil { @@ -222,12 +242,12 @@ func (ts *traversalState) traverse(lc linking.LinkContext, l ipld.Link) (io.Read } } if ts.rewindOffsetTarget != 0 { - links := ts.pathTree.getLinks(lc.LinkPath) + links := ts.pathTree.GetLinks(lc.LinkPath) for _, l := range links { ts.progress.SeenLinks[l] = struct{}{} } var err error - ts.lsCounter.TotalRead, err = ts.pathTree.offsetAfter(lc.LinkPath) + ts.lsCounter.TotalRead, err = ts.pathTree.GetOffsetAfter(lc.LinkPath) if err == errInvalid { ts.lsCounter.TotalRead = ts.pendingBlockStart } else if err != nil { @@ -243,13 +263,12 @@ func (ts *traversalState) traverse(lc linking.LinkContext, l ipld.Link) (io.Read // WithTraversingLinksystem extends a progress for traversal such that it can // subsequently resume and perform subsets of the walk efficiently from // an arbitrary position within the selector traversal. -func WithTraversingLinksystem(p *traversal.Progress) (TraverseResumer, error) { +func WithTraversingLinksystem(p *traversal.Progress, pathState TraversalResumerPathState) (TraverseResumer, error) { wls, ctr := loader.CountingLinkSystem(p.Cfg.LinkSystem) ts := &traversalState{ wrappedLinksystem: &wls, lsCounter: ctr.(*loader.Counter), - pathOrder: make(map[int]datamodel.Path), - pathTree: newPath(nil, 0), + pathTree: pathState, progress: p, } p.Cfg.LinkSystem.StorageReadOpener = ts.traverse diff --git a/v2/traversal/resumption_test.go b/v2/traversal/resumption_test.go index 58350b62..62722129 100644 --- a/v2/traversal/resumption_test.go +++ b/v2/traversal/resumption_test.go @@ -89,7 +89,7 @@ func TestWalkResumeByPath(t *testing.T) { LinkTargetNodePrototypeChooser: basicnode.Chooser, }, } - resumer, err := cartraversal.WithTraversingLinksystem(&p) + resumer, err := cartraversal.WithTraversingLinksystem(&p, cartraversal.NewTraversalResumerPathState()) if err != nil { t.Fatal(err) } @@ -154,7 +154,7 @@ func TestWalkResumeByPathPartialWalk(t *testing.T) { LinkTargetNodePrototypeChooser: basicnode.Chooser, }, } - resumer, err := cartraversal.WithTraversingLinksystem(&p) + resumer, err := cartraversal.WithTraversingLinksystem(&p, cartraversal.NewTraversalResumerPathState()) if err != nil { t.Fatal(err) } @@ -195,7 +195,7 @@ func TestWalkResumeByOffset(t *testing.T) { LinkTargetNodePrototypeChooser: basicnode.Chooser, }, } - resumer, err := cartraversal.WithTraversingLinksystem(&p) + resumer, err := cartraversal.WithTraversingLinksystem(&p, cartraversal.NewTraversalResumerPathState()) if err != nil { t.Fatal(err) }