From 0c14b4162ea3c4f07aea90dd78bc92cf067a0faa Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 9 Dec 2016 14:58:12 -0800 Subject: [PATCH] merkledag: add a concurrency limit to merkledag fetch graph License: MIT Signed-off-by: Jeromy --- merkledag/merkledag.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index dce41f51627..50ea3443868 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -449,6 +449,10 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi } } +// FetchGraphConcurrency is total number of concurrenct fetches that +// 'fetchNodes' will start at a time +var FetchGraphConcurrency = 8 + func fetchNodes(ctx context.Context, ds DAGService, in <-chan []*cid.Cid, out chan<- *NodeOption) { var wg sync.WaitGroup defer func() { @@ -458,8 +462,13 @@ func fetchNodes(ctx context.Context, ds DAGService, in <-chan []*cid.Cid, out ch close(out) }() + rateLimit := make(chan struct{}, FetchGraphConcurrency) + get := func(ks []*cid.Cid) { defer wg.Done() + defer func() { + <-rateLimit + }() nodes := ds.GetMany(ctx, ks) for opt := range nodes { select { @@ -471,6 +480,11 @@ func fetchNodes(ctx context.Context, ds DAGService, in <-chan []*cid.Cid, out ch } for ks := range in { + select { + case rateLimit <- struct{}{}: + case <-ctx.Done(): + return + } wg.Add(1) go get(ks) }