
feat: hamt enumlinks custom #111

Merged
merged 10 commits on Nov 12, 2021
6 changes: 6 additions & 0 deletions go.mod
@@ -6,8 +6,13 @@ require (
github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect
github.com/ipfs/go-bitfield v1.0.0
github.com/ipfs/go-bitswap v0.1.2 // indirect
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.0
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.0.5
github.com/ipfs/go-ipfs-blockstore v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.3
github.com/ipfs/go-ipfs-posinfo v0.0.1
github.com/ipfs/go-ipfs-util v0.0.1
@@ -21,6 +26,7 @@ require (
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.7.0
github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830 // indirect
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
)

go 1.16
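The only new requirement here is golang.org/x/sync, pulled in for its errgroup package, which the rewritten walker in hamt/hamt.go uses to run its worker pool and surface the first error. As a minimal, self-contained sketch of that pattern (illustrative names only, not code from this change): errgroup.WithContext returns a context that is cancelled as soon as any goroutine in the group fails, and Wait returns that first error.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	grp, ctx := errgroup.WithContext(context.Background())
	jobs := make(chan int)

	// A small worker pool: the group context is cancelled as soon as
	// any worker returns a non-nil error.
	for i := 0; i < 4; i++ {
		grp.Go(func() error {
			for j := range jobs {
				select {
				case <-ctx.Done():
					return ctx.Err()
				default:
				}
				fmt.Println("processed", j)
			}
			return nil
		})
	}

	for j := 0; j < 10; j++ {
		jobs <- j
	}
	close(jobs)

	// Wait returns the first error from any worker, if there was one.
	if err := grp.Wait(); err != nil {
		fmt.Println("walk failed:", err)
	}
}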
1 change: 1 addition & 0 deletions go.sum
@@ -335,6 +335,7 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
200 changes: 167 additions & 33 deletions hamt/hamt.go
@@ -24,6 +24,9 @@ import (
"context"
"fmt"
"os"
"sync"

"golang.org/x/sync/errgroup"

format "github.com/ipfs/go-unixfs"
"github.com/ipfs/go-unixfs/internal"
@@ -372,59 +375,190 @@ func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
go func() {
defer close(linkResults)
defer cancel()

err := parallelShardWalk(ctx, ds, ds.dserv, func(formattedLink *ipld.Link) error {
emitResult(ctx, linkResults, format.LinkResult{Link: formattedLink, Err: nil})
return nil
})
if err != nil {
emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
}
}()
return linkResults
}
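// Illustrative usage (not part of this change; assumes a *Shard obtained
// elsewhere, e.g. from NewHamtFromDag): drain the result channel and stop
// on the first error.
//
//	for res := range shard.EnumLinksAsync(ctx) {
//		if res.Err != nil {
//			return res.Err
//		}
//		fmt.Println(res.Link.Name, res.Link.Cid)
//	}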

type listCidsAndShards struct {
cids []cid.Cid
shards []*Shard
}

func (ds *Shard) walkChildren(processLinkValues func(formattedLink *ipld.Link) error) (*listCidsAndShards, error) {
res := &listCidsAndShards{}

for idx, lnk := range ds.childer.links {
if nextShard := ds.childer.children[idx]; nextShard == nil {
lnkLinkType, err := ds.childLinkType(lnk)
if err != nil {
return nil, err
}

switch lnkLinkType {
case shardValueLink:
sv, err := ds.makeShardValue(lnk)
if err != nil {
return nil, err
}
formattedLink := sv.val
formattedLink.Name = sv.key

if err := processLinkValues(formattedLink); err != nil {
return nil, err
}
case shardLink:
res.cids = append(res.cids, lnk.Cid)
default:
return nil, fmt.Errorf("unsupported shard link type")
}

} else {
if nextShard.val != nil {
formattedLink := &ipld.Link{
Name: nextShard.key,
Size: nextShard.val.Size,
Cid: nextShard.val.Cid,
}
if err := processLinkValues(formattedLink); err != nil {
return nil, err
}
} else {
res.shards = append(res.shards, nextShard)
}
}
}
return res, nil
}
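// Illustrative sequential equivalent (not part of this change): the parallel
// walker below replaces a recursion of roughly this shape, except that it also
// fetches res.cids from the DAGService and walks the decoded shards the same way:
//
//	func walkAll(ds *Shard, process func(*ipld.Link) error) error {
//		res, err := ds.walkChildren(process)
//		if err != nil {
//			return err
//		}
//		for _, child := range res.shards {
//			if err := walkAll(child, process); err != nil {
//				return err
//			}
//		}
//		return nil
//	}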

// parallelShardWalk is quite similar to the DAG walking algorithm from https://github.com/ipfs/go-merkledag/blob/594e515f162e764183243b72c2ba84f743424c8c/merkledag.go#L464
// However, there are a few notable differences:
//  1. Some children are actualized Shard structs while others still live in the blockstore;
//     this walk covers the in-memory Shards as well as the stored blocks.
//  2. Instead of passing each child into the worker pool by itself, we group children so that
//     we can leverage optimizations from GetMany. This grouping also makes the walk a little
//     more biased towards depth (as opposed to BFS) in the earlier part of the DAG, which is
//     particularly helpful for operations like estimating the directory size that should
//     complete quickly when possible.
//  3. None of the extra options from that package are needed.
func parallelShardWalk(ctx context.Context, root *Shard, dserv ipld.DAGService, processShardValues func(formattedLink *ipld.Link) error) error {
const concurrency = 32

	// Track visited CIDs so that each stored shard is fetched and walked at most once.
	var visitlk sync.Mutex
	visitSet := cid.NewSet()
	visit := visitSet.Visit

// Setup synchronization
grp, errGrpCtx := errgroup.WithContext(ctx)

// Input and output queues for workers.
feed := make(chan *listCidsAndShards)
out := make(chan *listCidsAndShards)
done := make(chan struct{})

	for i := 0; i < concurrency; i++ {
		grp.Go(func() error {
			for feedChildren := range feed {
				// Children that are already actualized Shards can be walked
				// in memory; their children go back to the dispatcher.
				for _, nextShard := range feedChildren.shards {
nextChildren, err := nextShard.walkChildren(processShardValues)
if err != nil {
return err
}

select {
case out <- nextChildren:
case <-errGrpCtx.Done():
return nil
}
}

				// Collect the CIDs of stored children that have not been visited yet.
				var linksToVisit []cid.Cid
for _, nextCid := range feedChildren.cids {
var shouldVisit bool

visitlk.Lock()
shouldVisit = visit(nextCid)
visitlk.Unlock()

if shouldVisit {
linksToVisit = append(linksToVisit, nextCid)
}
}

				// Batch-fetch the unvisited blocks and decode each into a Shard to walk.
				chNodes := dserv.GetMany(errGrpCtx, linksToVisit)
for optNode := range chNodes {
if optNode.Err != nil {
return optNode.Err
}

nextShard, err := NewHamtFromDag(dserv, optNode.Node)
if err != nil {
return err
}

nextChildren, err := nextShard.walkChildren(processShardValues)
if err != nil {
return err
}

select {
case out <- nextChildren:
case <-errGrpCtx.Done():
return nil
}
}

				// Signal the dispatcher that this batch has been fully processed.
				select {
case done <- struct{}{}:
case <-errGrpCtx.Done():
}
}
return nil
})
}

	// Dispatcher state: send is the feed channel while there is a batch to
	// hand out and nil otherwise, todoQueue buffers batches that workers have
	// produced but none has picked up yet, and inProgress counts batches
	// currently being processed.
	send := feed
	var todoQueue []*listCidsAndShards
	var inProgress int

	next := &listCidsAndShards{
		shards: []*Shard{root},
	}

dispatcherLoop:
	for {
		select {
		case send <- next:
			// A worker accepted the current batch; pull the next one from
			// the queue, or stop sending until new work arrives.
			inProgress++
			if len(todoQueue) > 0 {
				next = todoQueue[0]
				todoQueue = todoQueue[1:]
			} else {
				next = nil
				send = nil
			}
		case <-done:
			// A worker finished a batch; once nothing is queued or in
			// flight, the walk is complete.
			inProgress--
			if inProgress == 0 && next == nil {
				break dispatcherLoop
			}
		case nextNodes := <-out:
			// A worker produced a new batch of children to walk.
			if next == nil {
				next = nextNodes
				send = feed
			} else {
				todoQueue = append(todoQueue, nextNodes)
			}
		case <-errGrpCtx.Done():
			break dispatcherLoop
		}
	}
	close(feed)
	return grp.Wait()
}

func emitResult(ctx context.Context, linkResults chan<- format.LinkResult, r format.LinkResult) {