Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

Commit

Permalink
cleanup and switch to errgrp
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Oct 28, 2021
1 parent 29ffa00 commit e4d2cd8
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 58 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.7.0
github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830 // indirect
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
)

go 1.16
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
98 changes: 40 additions & 58 deletions hamt/hamt.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"os"
"sync"

"golang.org/x/sync/errgroup"

format "github.com/ipfs/go-unixfs"
"github.com/ipfs/go-unixfs/internal"

Expand Down Expand Up @@ -438,91 +440,71 @@ func (ds *Shard) walkLinks(processLinkValues func(formattedLink *ipld.Link) erro

func parallelWalkDepth(ctx context.Context, root *Shard, dserv ipld.DAGService, processShardValues func(formattedLink *ipld.Link) error) error {
const concurrency = 32
visit := cid.NewSet().Visit

var visitlk sync.Mutex
visitSet := cid.NewSet()
visit := visitSet.Visit

type shardCidUnion struct {
cid cid.Cid
shard *Shard
}

// Setup synchronization
grp, errGrpCtx := errgroup.WithContext(ctx)

// Input and output queues for workers.
feed := make(chan *shardCidUnion)
out := make(chan *listCidShardUnion)
done := make(chan struct{})

var visitlk sync.Mutex
var wg sync.WaitGroup

errChan := make(chan error)
fetchersCtx, cancel := context.WithCancel(ctx)
defer wg.Wait()
defer cancel()
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for cdepth := range feed {
grp.Go(func() error {
for shardOrCID := range feed {
var shouldVisit bool

if cdepth.shard != nil {
if shardOrCID.shard != nil {
shouldVisit = true
} else {
visitlk.Lock()
shouldVisit = visit(cdepth.cid)
shouldVisit = visit(shardOrCID.cid)
visitlk.Unlock()
}

if shouldVisit {
var nextShard *Shard
if cdepth.shard != nil {
nextShard = cdepth.shard
if shardOrCID.shard != nil {
nextShard = shardOrCID.shard
} else {
nd, err := dserv.Get(ctx, cdepth.cid)
nd, err := dserv.Get(ctx, shardOrCID.cid)
if err != nil {
if err != nil {
select {
case errChan <- err:
case <-fetchersCtx.Done():
}
return
}
return err
}
nextShard, err = NewHamtFromDag(dserv, nd)
if err != nil {
if err != nil {
if err != nil {
select {
case errChan <- err:
case <-fetchersCtx.Done():
}
return
}
}
return err
}
}

nextLinks, err := nextShard.walkLinks(processShardValues)
if err != nil {
select {
case errChan <- err:
case <-fetchersCtx.Done():
}
return
return err
}

select {
case out <- nextLinks:
case <-fetchersCtx.Done():
return
case <-errGrpCtx.Done():
return nil
}
}
select {
case done <- struct{}{}:
case <-fetchersCtx.Done():
case <-errGrpCtx.Done():
}
}
}()
return nil
})
}
defer close(feed)

send := feed
var todoQueue []*shardCidUnion
Expand All @@ -532,6 +514,7 @@ func parallelWalkDepth(ctx context.Context, root *Shard, dserv ipld.DAGService,
shard: root,
}

dispatcherLoop:
for {
select {
case send <- next:
Expand All @@ -546,40 +529,39 @@ func parallelWalkDepth(ctx context.Context, root *Shard, dserv ipld.DAGService,
case <-done:
inProgress--
if inProgress == 0 && next == nil {
return nil
break dispatcherLoop
}
case linksDepth := <-out:
for _, c := range linksDepth.links {
cd := &shardCidUnion{
case nextNodes := <-out:
for _, c := range nextNodes.links {
shardOrCid := &shardCidUnion{
cid: c,
}

if next == nil {
next = cd
next = shardOrCid
send = feed
} else {
todoQueue = append(todoQueue, cd)
todoQueue = append(todoQueue, shardOrCid)
}
}
for _, shard := range linksDepth.shards {
cd := &shardCidUnion{
for _, shard := range nextNodes.shards {
shardOrCid := &shardCidUnion{
shard: shard,
}

if next == nil {
next = cd
next = shardOrCid
send = feed
} else {
todoQueue = append(todoQueue, cd)
todoQueue = append(todoQueue, shardOrCid)
}
}
case err := <-errChan:
return err

case <-ctx.Done():
return ctx.Err()
case <-errGrpCtx.Done():
break dispatcherLoop
}
}
close(feed)
return grp.Wait()
}

func emitResult(ctx context.Context, linkResults chan<- format.LinkResult, r format.LinkResult) {
Expand Down

0 comments on commit e4d2cd8

Please sign in to comment.