diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index dce41f51627..50ea3443868 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -449,6 +449,10 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi } } +// FetchGraphConcurrency is total number of concurrenct fetches that +// 'fetchNodes' will start at a time +var FetchGraphConcurrency = 8 + func fetchNodes(ctx context.Context, ds DAGService, in <-chan []*cid.Cid, out chan<- *NodeOption) { var wg sync.WaitGroup defer func() { @@ -458,8 +462,13 @@ func fetchNodes(ctx context.Context, ds DAGService, in <-chan []*cid.Cid, out ch close(out) }() + rateLimit := make(chan struct{}, FetchGraphConcurrency) + get := func(ks []*cid.Cid) { defer wg.Done() + defer func() { + <-rateLimit + }() nodes := ds.GetMany(ctx, ks) for opt := range nodes { select { @@ -471,6 +480,11 @@ func fetchNodes(ctx context.Context, ds DAGService, in <-chan []*cid.Cid, out ch } for ks := range in { + select { + case rateLimit <- struct{}{}: + case <-ctx.Done(): + return + } wg.Add(1) go get(ks) }