From 49314cf842b70a69b15830901541f82562549762 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 27 Oct 2021 14:42:35 -0400 Subject: [PATCH 01/10] feat: use custom dag traversal for HAMT link enumeration --- hamt/hamt.go | 211 +++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 203 insertions(+), 8 deletions(-) diff --git a/hamt/hamt.go b/hamt/hamt.go index 74a8d2759..21cea4be6 100644 --- a/hamt/hamt.go +++ b/hamt/hamt.go @@ -24,6 +24,7 @@ import ( "context" "fmt" "os" + "sync" format "github.com/ipfs/go-unixfs" "github.com/ipfs/go-unixfs/internal" @@ -372,14 +373,11 @@ func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult { go func() { defer close(linkResults) defer cancel() - getLinks := makeAsyncTrieGetLinks(ds.dserv, linkResults) - cset := cid.NewSet() - rootNode, err := ds.Node() - if err != nil { - emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err}) - return - } - err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit, dag.Concurrent()) + + err := parallelWalkDepth(ctx, ds, ds.dserv, func(formattedLink *ipld.Link) error { + emitResult(ctx, linkResults, format.LinkResult{Link: formattedLink, Err: nil}) + return nil + }) if err != nil { emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err}) } @@ -387,6 +385,203 @@ func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult { return linkResults } +type listCidShardUnion struct { + links []cid.Cid + shards []*Shard +} + +func (ds *Shard) walkLinks(processLinkValues func(formattedLink *ipld.Link) error) (*listCidShardUnion, error) { + res := &listCidShardUnion{} + + for idx, lnk := range ds.childer.links { + if nextShard := ds.childer.children[idx]; nextShard == nil { + lnkLinkType, err := ds.childLinkType(lnk) + if err != nil { + return nil, err + } + + switch lnkLinkType { + case shardValueLink: + sv, err := ds.makeShardValue(lnk) + if err != nil { + return nil, err + } + formattedLink := sv.val + formattedLink.Name = sv.key + + if err := processLinkValues(formattedLink); err != nil { + return nil, err + } + case shardLink: + res.links = append(res.links, lnk.Cid) + default: + return nil, fmt.Errorf("unsupported shard link type") + } + + } else { + if nextShard.val != nil { + formattedLink := &ipld.Link{ + Name: nextShard.key, + Size: nextShard.val.Size, + Cid: nextShard.val.Cid, + } + if err := processLinkValues(formattedLink); err != nil { + return nil, err + } + } else { + res.shards = append(res.shards, nextShard) + } + } + } + return res, nil +} + +func parallelWalkDepth(ctx context.Context, root *Shard, dserv ipld.DAGService, processShardValues func(formattedLink *ipld.Link) error) error { + const concurrency = 32 + visit := cid.NewSet().Visit + + type shardCidUnion struct { + cid cid.Cid + shard *Shard + } + + feed := make(chan *shardCidUnion) + out := make(chan *listCidShardUnion) + done := make(chan struct{}) + + var visitlk sync.Mutex + var wg sync.WaitGroup + + errChan := make(chan error) + fetchersCtx, cancel := context.WithCancel(ctx) + defer wg.Wait() + defer cancel() + for i := 0; i < concurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for cdepth := range feed { + var shouldVisit bool + + if cdepth.shard != nil { + shouldVisit = true + } else { + visitlk.Lock() + shouldVisit = visit(cdepth.cid) + visitlk.Unlock() + } + + if shouldVisit { + var nextShard *Shard + if cdepth.shard != nil { + nextShard = cdepth.shard + } else { + nd, err := dserv.Get(ctx, cdepth.cid) + if err != nil { + if err != nil { + select { + 
case errChan <- err: + case <-fetchersCtx.Done(): + } + return + } + } + nextShard, err = NewHamtFromDag(dserv, nd) + if err != nil { + if err != nil { + if err != nil { + select { + case errChan <- err: + case <-fetchersCtx.Done(): + } + return + } + } + } + } + + nextLinks, err := nextShard.walkLinks(processShardValues) + if err != nil { + select { + case errChan <- err: + case <-fetchersCtx.Done(): + } + return + } + + select { + case out <- nextLinks: + case <-fetchersCtx.Done(): + return + } + } + select { + case done <- struct{}{}: + case <-fetchersCtx.Done(): + } + } + }() + } + defer close(feed) + + send := feed + var todoQueue []*shardCidUnion + var inProgress int + + next := &shardCidUnion{ + shard: root, + } + + for { + select { + case send <- next: + inProgress++ + if len(todoQueue) > 0 { + next = todoQueue[0] + todoQueue = todoQueue[1:] + } else { + next = nil + send = nil + } + case <-done: + inProgress-- + if inProgress == 0 && next == nil { + return nil + } + case linksDepth := <-out: + for _, c := range linksDepth.links { + cd := &shardCidUnion{ + cid: c, + } + + if next == nil { + next = cd + send = feed + } else { + todoQueue = append(todoQueue, cd) + } + } + for _, shard := range linksDepth.shards { + cd := &shardCidUnion{ + shard: shard, + } + + if next == nil { + next = cd + send = feed + } else { + todoQueue = append(todoQueue, cd) + } + } + case err := <-errChan: + return err + + case <-ctx.Done(): + return ctx.Err() + } + } +} + // makeAsyncTrieGetLinks builds a getLinks function that can be used with EnumerateChildrenAsync // to iterate a HAMT shard. It takes an IPLD Dag Service to fetch nodes, and a call back that will get called // on all links to leaf nodes in a HAMT tree, so they can be collected for an EnumLinks operation From 29ffa004db209e2d23de418fabefe74d8ff6fd99 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 27 Oct 2021 14:52:17 -0400 Subject: [PATCH 02/10] remove unused code --- hamt/hamt.go | 40 ---------------------------------------- 1 file changed, 40 deletions(-) diff --git a/hamt/hamt.go b/hamt/hamt.go index 21cea4be6..3c5192ce9 100644 --- a/hamt/hamt.go +++ b/hamt/hamt.go @@ -582,46 +582,6 @@ func parallelWalkDepth(ctx context.Context, root *Shard, dserv ipld.DAGService, } } -// makeAsyncTrieGetLinks builds a getLinks function that can be used with EnumerateChildrenAsync -// to iterate a HAMT shard. 
It takes an IPLD Dag Service to fetch nodes, and a call back that will get called -// on all links to leaf nodes in a HAMT tree, so they can be collected for an EnumLinks operation -func makeAsyncTrieGetLinks(dagService ipld.DAGService, linkResults chan<- format.LinkResult) dag.GetLinks { - - return func(ctx context.Context, currentCid cid.Cid) ([]*ipld.Link, error) { - node, err := dagService.Get(ctx, currentCid) - if err != nil { - return nil, err - } - directoryShard, err := NewHamtFromDag(dagService, node) - if err != nil { - return nil, err - } - - childShards := make([]*ipld.Link, 0, directoryShard.childer.length()) - links := directoryShard.childer.links - for idx := range directoryShard.childer.children { - lnk := links[idx] - lnkLinkType, err := directoryShard.childLinkType(lnk) - - if err != nil { - return nil, err - } - if lnkLinkType == shardLink { - childShards = append(childShards, lnk) - } else { - sv, err := directoryShard.makeShardValue(lnk) - if err != nil { - return nil, err - } - formattedLink := sv.val - formattedLink.Name = sv.key - emitResult(ctx, linkResults, format.LinkResult{Link: formattedLink, Err: nil}) - } - } - return childShards, nil - } -} - func emitResult(ctx context.Context, linkResults chan<- format.LinkResult, r format.LinkResult) { // make sure that context cancel is processed first // the reason is due to the concurrency of EnumerateChildrenAsync From c930522c279f2f7eb6be5fb6b312e2a53c6c7d43 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 27 Oct 2021 20:19:24 -0400 Subject: [PATCH 03/10] cleanup and switch to errgrp --- go.mod | 1 + go.sum | 1 + hamt/hamt.go | 98 +++++++++++++++++++++------------------------------- 3 files changed, 42 insertions(+), 58 deletions(-) diff --git a/go.mod b/go.mod index aac677992..106041334 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/spaolacci/murmur3 v1.1.0 github.com/stretchr/testify v1.7.0 github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830 // indirect + golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 ) go 1.16 diff --git a/go.sum b/go.sum index 5d067042f..ca32d1143 100644 --- a/go.sum +++ b/go.sum @@ -335,6 +335,7 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/hamt/hamt.go b/hamt/hamt.go index 3c5192ce9..5d7221002 100644 --- a/hamt/hamt.go +++ b/hamt/hamt.go @@ -26,6 +26,8 @@ import ( "os" "sync" + "golang.org/x/sync/errgroup" + format "github.com/ipfs/go-unixfs" "github.com/ipfs/go-unixfs/internal" @@ -438,91 +440,71 @@ func (ds *Shard) walkLinks(processLinkValues func(formattedLink *ipld.Link) erro func parallelWalkDepth(ctx context.Context, root *Shard, dserv ipld.DAGService, processShardValues func(formattedLink *ipld.Link) error) error { const concurrency = 32 - visit := 
cid.NewSet().Visit + + var visitlk sync.Mutex + visitSet := cid.NewSet() + visit := visitSet.Visit type shardCidUnion struct { cid cid.Cid shard *Shard } + // Setup synchronization + grp, errGrpCtx := errgroup.WithContext(ctx) + + // Input and output queues for workers. feed := make(chan *shardCidUnion) out := make(chan *listCidShardUnion) done := make(chan struct{}) - var visitlk sync.Mutex - var wg sync.WaitGroup - - errChan := make(chan error) - fetchersCtx, cancel := context.WithCancel(ctx) - defer wg.Wait() - defer cancel() for i := 0; i < concurrency; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for cdepth := range feed { + grp.Go(func() error { + for shardOrCID := range feed { var shouldVisit bool - if cdepth.shard != nil { + if shardOrCID.shard != nil { shouldVisit = true } else { visitlk.Lock() - shouldVisit = visit(cdepth.cid) + shouldVisit = visit(shardOrCID.cid) visitlk.Unlock() } if shouldVisit { var nextShard *Shard - if cdepth.shard != nil { - nextShard = cdepth.shard + if shardOrCID.shard != nil { + nextShard = shardOrCID.shard } else { - nd, err := dserv.Get(ctx, cdepth.cid) + nd, err := dserv.Get(ctx, shardOrCID.cid) if err != nil { - if err != nil { - select { - case errChan <- err: - case <-fetchersCtx.Done(): - } - return - } + return err } nextShard, err = NewHamtFromDag(dserv, nd) if err != nil { - if err != nil { - if err != nil { - select { - case errChan <- err: - case <-fetchersCtx.Done(): - } - return - } - } + return err } } nextLinks, err := nextShard.walkLinks(processShardValues) if err != nil { - select { - case errChan <- err: - case <-fetchersCtx.Done(): - } - return + return err } select { case out <- nextLinks: - case <-fetchersCtx.Done(): - return + case <-errGrpCtx.Done(): + return nil } } select { case done <- struct{}{}: - case <-fetchersCtx.Done(): + case <-errGrpCtx.Done(): } } - }() + return nil + }) } - defer close(feed) send := feed var todoQueue []*shardCidUnion @@ -532,6 +514,7 @@ func parallelWalkDepth(ctx context.Context, root *Shard, dserv ipld.DAGService, shard: root, } +dispatcherLoop: for { select { case send <- next: @@ -546,40 +529,39 @@ func parallelWalkDepth(ctx context.Context, root *Shard, dserv ipld.DAGService, case <-done: inProgress-- if inProgress == 0 && next == nil { - return nil + break dispatcherLoop } - case linksDepth := <-out: - for _, c := range linksDepth.links { - cd := &shardCidUnion{ + case nextNodes := <-out: + for _, c := range nextNodes.links { + shardOrCid := &shardCidUnion{ cid: c, } if next == nil { - next = cd + next = shardOrCid send = feed } else { - todoQueue = append(todoQueue, cd) + todoQueue = append(todoQueue, shardOrCid) } } - for _, shard := range linksDepth.shards { - cd := &shardCidUnion{ + for _, shard := range nextNodes.shards { + shardOrCid := &shardCidUnion{ shard: shard, } if next == nil { - next = cd + next = shardOrCid send = feed } else { - todoQueue = append(todoQueue, cd) + todoQueue = append(todoQueue, shardOrCid) } } - case err := <-errChan: - return err - - case <-ctx.Done(): - return ctx.Err() + case <-errGrpCtx.Done(): + break dispatcherLoop } } + close(feed) + return grp.Wait() } func emitResult(ctx context.Context, linkResults chan<- format.LinkResult, r format.LinkResult) { From 8051de7b1e92f07546d3a23578d4f7fdd674addf Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 28 Oct 2021 01:34:24 -0400 Subject: [PATCH 04/10] switch to GetMany in EnumLinksAsync --- hamt/hamt.go | 90 +++++++++++++++++++++++----------------------------- 1 file changed, 40 insertions(+), 50 
deletions(-) diff --git a/hamt/hamt.go b/hamt/hamt.go index 5d7221002..7dac3b18e 100644 --- a/hamt/hamt.go +++ b/hamt/hamt.go @@ -445,45 +445,52 @@ func parallelWalkDepth(ctx context.Context, root *Shard, dserv ipld.DAGService, visitSet := cid.NewSet() visit := visitSet.Visit - type shardCidUnion struct { - cid cid.Cid - shard *Shard - } - // Setup synchronization grp, errGrpCtx := errgroup.WithContext(ctx) // Input and output queues for workers. - feed := make(chan *shardCidUnion) + feed := make(chan *listCidShardUnion) out := make(chan *listCidShardUnion) done := make(chan struct{}) for i := 0; i < concurrency; i++ { grp.Go(func() error { for shardOrCID := range feed { - var shouldVisit bool + for _, nextShard := range shardOrCID.shards { + nextLinks, err := nextShard.walkLinks(processShardValues) + if err != nil { + return err + } + + select { + case out <- nextLinks: + case <-errGrpCtx.Done(): + return nil + } + } + + var linksToVisit []cid.Cid + for _, nextLink := range shardOrCID.links { + var shouldVisit bool - if shardOrCID.shard != nil { - shouldVisit = true - } else { visitlk.Lock() - shouldVisit = visit(shardOrCID.cid) + shouldVisit = visit(nextLink) visitlk.Unlock() + + if shouldVisit { + linksToVisit = append(linksToVisit, nextLink) + } } - if shouldVisit { - var nextShard *Shard - if shardOrCID.shard != nil { - nextShard = shardOrCID.shard - } else { - nd, err := dserv.Get(ctx, shardOrCID.cid) - if err != nil { - return err - } - nextShard, err = NewHamtFromDag(dserv, nd) - if err != nil { - return err - } + chNodes := dserv.GetMany(errGrpCtx, linksToVisit) + for optNode := range chNodes { + if optNode.Err != nil { + return optNode.Err + } + + nextShard, err := NewHamtFromDag(dserv, optNode.Node) + if err != nil { + return err } nextLinks, err := nextShard.walkLinks(processShardValues) @@ -497,6 +504,7 @@ func parallelWalkDepth(ctx context.Context, root *Shard, dserv ipld.DAGService, return nil } } + select { case done <- struct{}{}: case <-errGrpCtx.Done(): @@ -507,11 +515,11 @@ func parallelWalkDepth(ctx context.Context, root *Shard, dserv ipld.DAGService, } send := feed - var todoQueue []*shardCidUnion + var todoQueue []*listCidShardUnion var inProgress int - next := &shardCidUnion{ - shard: root, + next := &listCidShardUnion{ + shards: []*Shard{root}, } dispatcherLoop: @@ -532,29 +540,11 @@ dispatcherLoop: break dispatcherLoop } case nextNodes := <-out: - for _, c := range nextNodes.links { - shardOrCid := &shardCidUnion{ - cid: c, - } - - if next == nil { - next = shardOrCid - send = feed - } else { - todoQueue = append(todoQueue, shardOrCid) - } - } - for _, shard := range nextNodes.shards { - shardOrCid := &shardCidUnion{ - shard: shard, - } - - if next == nil { - next = shardOrCid - send = feed - } else { - todoQueue = append(todoQueue, shardOrCid) - } + if next == nil { + next = nextNodes + send = feed + } else { + todoQueue = append(todoQueue, nextNodes) } case <-errGrpCtx.Done(): break dispatcherLoop From af57e4b81cf94d39d5fdc741bee70098e6874fd4 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 28 Oct 2021 01:35:06 -0400 Subject: [PATCH 05/10] switch sharding threshold test to work on the blockstore rather than dagservice layer --- go.mod | 5 ++++ io/directory_test.go | 66 ++++++++++++++++++++++++++++++++------------ 2 files changed, 53 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index 106041334..d981b36f7 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,13 @@ require ( github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect 
github.com/ipfs/go-bitfield v1.0.0 github.com/ipfs/go-bitswap v0.1.2 // indirect + github.com/ipfs/go-block-format v0.0.2 + github.com/ipfs/go-blockservice v0.1.0 github.com/ipfs/go-cid v0.0.7 + github.com/ipfs/go-datastore v0.0.5 + github.com/ipfs/go-ipfs-blockstore v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.1 + github.com/ipfs/go-ipfs-exchange-offline v0.0.1 github.com/ipfs/go-ipfs-files v0.0.3 github.com/ipfs/go-ipfs-posinfo v0.0.1 github.com/ipfs/go-ipfs-util v0.0.1 diff --git a/io/directory_test.go b/io/directory_test.go index 909f9b4fd..253e621e2 100644 --- a/io/directory_test.go +++ b/io/directory_test.go @@ -3,6 +3,12 @@ package io import ( "context" "fmt" + blocks "github.com/ipfs/go-block-format" + bsrv "github.com/ipfs/go-blockservice" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + blockstore "github.com/ipfs/go-ipfs-blockstore" + offline "github.com/ipfs/go-ipfs-exchange-offline" "math" "sort" "strconv" @@ -358,8 +364,11 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) { // with a regular structure to be able to predict how many Shard nodes we // will need to fetch in order to reach the HAMTShardingSize threshold in // sizeBelowThreshold (assuming a sequential DAG walk function). - ds := mdtest.Mock() - completeHAMTRoot, err := CreateCompleteHAMT(ds, treeHeight, shardWidth) + + bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + countGetsDS := newCountGetsDS(bstore) + dsrv := mdag.NewDAGService(bsrv.New(countGetsDS, offline.Exchange(countGetsDS))) + completeHAMTRoot, err := CreateCompleteHAMT(dsrv, treeHeight, shardWidth) assert.NoError(t, err) // With this structure and a BFS traversal (from `parallelWalkDepth`) then @@ -374,8 +383,7 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) { // the HAMTShardingSize threshold. nodesToFetch += thresholdToWidthRatio - countGetsDS := newCountGetsDS(ds) - hamtDir, err := newHAMTDirectoryFromNode(countGetsDS, completeHAMTRoot) + hamtDir, err := newHAMTDirectoryFromNode(dsrv, completeHAMTRoot) assert.NoError(t, err) countGetsDS.resetCounter() @@ -537,21 +545,23 @@ func newEmptyHAMTDirectory(dserv ipld.DAGService, shardWidth int) (*HAMTDirector // countGetsDS is a DAG service that keeps track of the number of // unique CIDs fetched. 
type countGetsDS struct { - ipld.DAGService + blockstore.Blockstore cidsFetched map[cid.Cid]struct{} mapLock sync.Mutex + started bool getRequestDelay time.Duration } -var _ ipld.DAGService = (*countGetsDS)(nil) +var _ blockstore.Blockstore = (*countGetsDS)(nil) -func newCountGetsDS(ds ipld.DAGService) *countGetsDS { +func newCountGetsDS(bs blockstore.Blockstore) *countGetsDS { return &countGetsDS{ - ds, + bs, make(map[cid.Cid]struct{}), sync.Mutex{}, + false, 0, } } @@ -560,6 +570,7 @@ func (d *countGetsDS) resetCounter() { d.mapLock.Lock() defer d.mapLock.Unlock() d.cidsFetched = make(map[cid.Cid]struct{}) + d.started = true } func (d *countGetsDS) uniqueCidsFetched() int { @@ -572,12 +583,7 @@ func (d *countGetsDS) setRequestDelay(timeout time.Duration) { d.getRequestDelay = timeout } -func (d *countGetsDS) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) { - node, err := d.DAGService.Get(ctx, c) - if err != nil { - return nil, err - } - +func (d *countGetsDS) maybeSleep(c cid.Cid) { d.mapLock.Lock() _, cidRequestedBefore := d.cidsFetched[c] d.cidsFetched[c] = struct{}{} @@ -588,11 +594,35 @@ func (d *countGetsDS) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) { // Subsequent requests get no timeout simulating an in-disk cache. time.Sleep(d.getRequestDelay) } +} + +func (d *countGetsDS) Has(c cid.Cid) (bool, error) { + if d.started { + panic("implement me") + } + return d.Blockstore.Has(c) +} + +func (d *countGetsDS) Get(c cid.Cid) (blocks.Block, error) { + blk, err := d.Blockstore.Get(c) + if err != nil { + return nil, err + } - return node, nil + d.maybeSleep(c) + return blk, nil } -// Process sequentially (blocking) calling Get which tracks requests. -func (d *countGetsDS) GetMany(ctx context.Context, cids []cid.Cid) <-chan *ipld.NodeOption { - panic("GetMany not supported") +func (d *countGetsDS) GetSize(c cid.Cid) (int, error) { + if d.started { + panic("implement me") + } + return d.Blockstore.GetSize(c) +} + +func (d *countGetsDS) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + if d.started { + panic("implement me") + } + return d.Blockstore.AllKeysChan(ctx) } From bec968935e929b7cef5c638cf65adfdea9ec9169 Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Thu, 28 Oct 2021 14:35:33 -0300 Subject: [PATCH 06/10] gofmt --- io/directory_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/directory_test.go b/io/directory_test.go index 253e621e2..d25e83db9 100644 --- a/io/directory_test.go +++ b/io/directory_test.go @@ -549,7 +549,7 @@ type countGetsDS struct { cidsFetched map[cid.Cid]struct{} mapLock sync.Mutex - started bool + started bool getRequestDelay time.Duration } From d0faeb3a6675eb69db654096bb5abbb5efbdddff Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 10 Nov 2021 17:23:46 -0500 Subject: [PATCH 07/10] refactor some names and add more comments --- hamt/hamt.go | 47 +++++++++++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/hamt/hamt.go b/hamt/hamt.go index 7dac3b18e..c3e7ce1a7 100644 --- a/hamt/hamt.go +++ b/hamt/hamt.go @@ -376,7 +376,7 @@ func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult { defer close(linkResults) defer cancel() - err := parallelWalkDepth(ctx, ds, ds.dserv, func(formattedLink *ipld.Link) error { + err := parallelShardWalk(ctx, ds, ds.dserv, func(formattedLink *ipld.Link) error { emitResult(ctx, linkResults, format.LinkResult{Link: formattedLink, Err: nil}) return nil }) @@ -387,13 +387,13 @@ func (ds *Shard) 
EnumLinksAsync(ctx context.Context) <-chan format.LinkResult { return linkResults } -type listCidShardUnion struct { - links []cid.Cid +type listCidsAndShards struct { + cids []cid.Cid shards []*Shard } -func (ds *Shard) walkLinks(processLinkValues func(formattedLink *ipld.Link) error) (*listCidShardUnion, error) { - res := &listCidShardUnion{} +func (ds *Shard) walkChildren(processLinkValues func(formattedLink *ipld.Link) error) (*listCidsAndShards, error) { + res := &listCidsAndShards{} for idx, lnk := range ds.childer.links { if nextShard := ds.childer.children[idx]; nextShard == nil { @@ -415,7 +415,7 @@ func (ds *Shard) walkLinks(processLinkValues func(formattedLink *ipld.Link) erro return nil, err } case shardLink: - res.links = append(res.links, lnk.Cid) + res.cids = append(res.cids, lnk.Cid) default: return nil, fmt.Errorf("unsupported shard link type") } @@ -438,7 +438,14 @@ func (ds *Shard) walkLinks(processLinkValues func(formattedLink *ipld.Link) erro return res, nil } -func parallelWalkDepth(ctx context.Context, root *Shard, dserv ipld.DAGService, processShardValues func(formattedLink *ipld.Link) error) error { +// parallelShardWalk is quite similar to the DAG walking algorithm from https://github.com/ipfs/go-merkledag/blob/594e515f162e764183243b72c2ba84f743424c8c/merkledag.go#L464 +// However, there are a few notable differences: +// 1. Some children are actualized Shard structs and some are in the blockstore, this will leverage walking over the in memory Shards as well as the stored blocks +// 2. Instead of just passing each child into the worker pool by itself we group them so that we can leverage optimizations from GetMany. +// This optimization also makes the walk a little more biased towards depth (as opposed to BFS) in the earlier part of the DAG. +// This is particularly helpful for operations like estimating the directory size which should complete quickly when possible. +// 3. None of the extra options from that package are needed +func parallelShardWalk(ctx context.Context, root *Shard, dserv ipld.DAGService, processShardValues func(formattedLink *ipld.Link) error) error { const concurrency = 32 var visitlk sync.Mutex @@ -449,36 +456,36 @@ func parallelWalkDepth(ctx context.Context, root *Shard, dserv ipld.DAGService, grp, errGrpCtx := errgroup.WithContext(ctx) // Input and output queues for workers. 
- feed := make(chan *listCidShardUnion) - out := make(chan *listCidShardUnion) + feed := make(chan *listCidsAndShards) + out := make(chan *listCidsAndShards) done := make(chan struct{}) for i := 0; i < concurrency; i++ { grp.Go(func() error { - for shardOrCID := range feed { - for _, nextShard := range shardOrCID.shards { - nextLinks, err := nextShard.walkLinks(processShardValues) + for feedChildren := range feed { + for _, nextShard := range feedChildren.shards { + nextChildren, err := nextShard.walkChildren(processShardValues) if err != nil { return err } select { - case out <- nextLinks: + case out <- nextChildren: case <-errGrpCtx.Done(): return nil } } var linksToVisit []cid.Cid - for _, nextLink := range shardOrCID.links { + for _, nextCid := range feedChildren.cids { var shouldVisit bool visitlk.Lock() - shouldVisit = visit(nextLink) + shouldVisit = visit(nextCid) visitlk.Unlock() if shouldVisit { - linksToVisit = append(linksToVisit, nextLink) + linksToVisit = append(linksToVisit, nextCid) } } @@ -493,13 +500,13 @@ func parallelWalkDepth(ctx context.Context, root *Shard, dserv ipld.DAGService, return err } - nextLinks, err := nextShard.walkLinks(processShardValues) + nextChildren, err := nextShard.walkChildren(processShardValues) if err != nil { return err } select { - case out <- nextLinks: + case out <- nextChildren: case <-errGrpCtx.Done(): return nil } @@ -515,10 +522,10 @@ func parallelWalkDepth(ctx context.Context, root *Shard, dserv ipld.DAGService, } send := feed - var todoQueue []*listCidShardUnion + var todoQueue []*listCidsAndShards var inProgress int - next := &listCidShardUnion{ + next := &listCidsAndShards{ shards: []*Shard{root}, } From a99e1871f34a04d847feaec3d2a0c2b408db4011 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 11 Nov 2021 15:40:29 -0500 Subject: [PATCH 08/10] test: adjust TestHAMTEnumerationWhenComputingSize to allow for optimal fetching --- io/directory_test.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/io/directory_test.go b/io/directory_test.go index d25e83db9..6661982f8 100644 --- a/io/directory_test.go +++ b/io/directory_test.go @@ -371,6 +371,16 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) { completeHAMTRoot, err := CreateCompleteHAMT(dsrv, treeHeight, shardWidth) assert.NoError(t, err) + // Calculate the optimal number of nodes to traverse + optimalNodesToFetch := 0 + nodesToProcess := HAMTShardingSize + for nodesToProcess > 1 { + // divide by the shard width to get the parents and continue up the tree + parentNodes := int(math.Ceil(float64(nodesToProcess) / float64(shardWidth))) + optimalNodesToFetch += parentNodes + nodesToProcess = parentNodes + } + // With this structure and a BFS traversal (from `parallelWalkDepth`) then // we would roughly fetch the following nodes: nodesToFetch := 0 @@ -396,12 +406,12 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) { assert.NoError(t, err) assert.False(t, below) t.Logf("fetched %d nodes (predicted range: %d-%d)", - countGetsDS.uniqueCidsFetched(), nodesToFetch, nodesToFetch+defaultConcurrentFetch) + countGetsDS.uniqueCidsFetched(), optimalNodesToFetch, nodesToFetch+defaultConcurrentFetch) // Check that the actual number of nodes fetched is within the margin of the // estimated `nodesToFetch` plus an extra of `defaultConcurrentFetch` since // we are fetching in parallel. 
assert.True(t, countGetsDS.uniqueCidsFetched() <= nodesToFetch+defaultConcurrentFetch) - assert.True(t, countGetsDS.uniqueCidsFetched() >= nodesToFetch) + assert.True(t, countGetsDS.uniqueCidsFetched() >= optimalNodesToFetch) } // Compare entries in the leftDir against the rightDir and possibly From 2927cdcdd67289a2c3d5496c360088a6aa1059af Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 11 Nov 2021 19:07:28 -0500 Subject: [PATCH 09/10] fix TestHAMTEnumerationWhenComputingSize optimal size computation fix comments in completehamt_test.go --- io/completehamt_test.go | 3 ++- io/directory_test.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/io/completehamt_test.go b/io/completehamt_test.go index 1bb3d8720..7995ac1d7 100644 --- a/io/completehamt_test.go +++ b/io/completehamt_test.go @@ -21,7 +21,8 @@ import ( // * all leaf Shard nodes have the same depth (and have only 'value' links). // * all internal Shard nodes point only to other Shards (and hence have zero 'value' links). // * the total number of 'value' links (directory entries) is: -// io.DefaultShardWidth ^ (treeHeight + 1). +// childsPerNode ^ (treeHeight). +// treeHeight: The number of layers of non-value HAMT nodes (e.g. height = 1 is a single shard pointing to some values) // FIXME: HAMTHashFunction needs to be set to idHash by the caller. We depend on // this simplification for the current logic to work. (HAMTHashFunction is a // global setting of the package, it is hard-coded in the serialized Shard node diff --git a/io/directory_test.go b/io/directory_test.go index 6661982f8..48f50b335 100644 --- a/io/directory_test.go +++ b/io/directory_test.go @@ -374,7 +374,7 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) { // Calculate the optimal number of nodes to traverse optimalNodesToFetch := 0 nodesToProcess := HAMTShardingSize - for nodesToProcess > 1 { + for i := 0; i < treeHeight-1; i++ { // divide by the shard width to get the parents and continue up the tree parentNodes := int(math.Ceil(float64(nodesToProcess) / float64(shardWidth))) optimalNodesToFetch += parentNodes From d9a5431f1da7192bbb17795923ddb970c82473c7 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 11 Nov 2021 19:10:17 -0500 Subject: [PATCH 10/10] chore: order deps --- io/directory_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/io/directory_test.go b/io/directory_test.go index 48f50b335..fd58438ed 100644 --- a/io/directory_test.go +++ b/io/directory_test.go @@ -3,12 +3,6 @@ package io import ( "context" "fmt" - blocks "github.com/ipfs/go-block-format" - bsrv "github.com/ipfs/go-blockservice" - ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" - blockstore "github.com/ipfs/go-ipfs-blockstore" - offline "github.com/ipfs/go-ipfs-exchange-offline" "math" "sort" "strconv" @@ -17,7 +11,13 @@ import ( "testing" "time" + blocks "github.com/ipfs/go-block-format" + bsrv "github.com/ipfs/go-blockservice" cid "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + blockstore "github.com/ipfs/go-ipfs-blockstore" + offline "github.com/ipfs/go-ipfs-exchange-offline" ipld "github.com/ipfs/go-ipld-format" mdag "github.com/ipfs/go-merkledag" mdtest "github.com/ipfs/go-merkledag/test"
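
Note on the traversal introduced in this series: patch 03 replaces the hand-rolled goroutine/error-channel plumbing from patch 01 with golang.org/x/sync/errgroup, and the comment added in patch 07 says parallelShardWalk follows the same shape as the DAG walk in go-merkledag. The sketch below is a minimal, self-contained illustration of that dispatcher/worker coordination pattern only; the package name, node type, and process callback are hypothetical stand-ins, not the go-unixfs API.

// Package hamtwalk is an illustrative sketch of the dispatcher/worker pattern
// used by parallelShardWalk in this series. The node type and process callback
// are hypothetical stand-ins, not the go-unixfs API; only the coordination
// structure (feed/out/done channels, errgroup, locked visit set) mirrors it.
package hamtwalk

import (
	"context"
	"sync"

	"golang.org/x/sync/errgroup"
)

// node is a toy stand-in for a HAMT shard: a name plus child nodes.
type node struct {
	name     string
	children []*node
}

// parallelWalk visits every node reachable from root, calling process once per
// unique node, using a fixed number of concurrent workers.
func parallelWalk(ctx context.Context, root *node, process func(*node) error) error {
	const concurrency = 4

	var visitlk sync.Mutex
	visited := map[*node]struct{}{}

	grp, errGrpCtx := errgroup.WithContext(ctx)
	feed := make(chan *node)    // dispatcher -> workers
	out := make(chan []*node)   // workers -> dispatcher: discovered children
	done := make(chan struct{}) // workers -> dispatcher: one unit of work finished

	for i := 0; i < concurrency; i++ {
		grp.Go(func() error {
			for n := range feed {
				// Deduplicate under a shared lock, like the cid.Set in the patch.
				visitlk.Lock()
				_, seen := visited[n]
				visited[n] = struct{}{}
				visitlk.Unlock()

				if !seen {
					if err := process(n); err != nil {
						return err // cancels errGrpCtx, unblocking the other goroutines
					}
					select {
					case out <- n.children:
					case <-errGrpCtx.Done():
						return nil
					}
				}
				select {
				case done <- struct{}{}:
				case <-errGrpCtx.Done():
				}
			}
			return nil
		})
	}

	// Dispatcher: keep at most one pending send, queue the rest, and finish when
	// every dispatched unit has reported done and nothing is left to send.
	send := feed
	var todo []*node
	next := root
	inProgress := 0

dispatcherLoop:
	for {
		select {
		case send <- next:
			inProgress++
			if len(todo) > 0 {
				next, todo = todo[0], todo[1:]
			} else {
				next, send = nil, nil // pause sends until workers report back
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				break dispatcherLoop
			}
		case children := <-out:
			for _, c := range children {
				if next == nil {
					next, send = c, feed
				} else {
					todo = append(todo, c)
				}
			}
		case <-errGrpCtx.Done():
			break dispatcherLoop
		}
	}
	close(feed)
	return grp.Wait()
}

As in the real code, the single pending send plus the todo queue keeps the dispatcher from blocking on the unbuffered feed channel, and closing feed after the loop lets the errgroup workers drain their range loops and return, so grp.Wait() surfaces the first error (if any).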