From c2e647a1581108d8f5da479f0d550268a5712c46 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Wed, 12 Jun 2019 11:09:13 -0700 Subject: [PATCH 1/5] Add 3 minute timeout to Provide call --- simple/provider.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/simple/provider.go b/simple/provider.go index b7da4b2..8fa9c10 100644 --- a/simple/provider.go +++ b/simple/provider.go @@ -5,6 +5,7 @@ package simple import ( "context" + "time" cid "github.com/ipfs/go-cid" q "github.com/ipfs/go-ipfs-provider/queue" @@ -14,7 +15,10 @@ import ( var logP = logging.Logger("provider.simple") -const provideOutgoingWorkerLimit = 8 +const ( + provideOutgoingWorkerLimit = 8 + provideTimeout = 3 * time.Minute +) // Provider announces blocks to the network type Provider struct { @@ -60,8 +64,10 @@ func (p *Provider) handleAnnouncements() { case <-p.ctx.Done(): return case c := <-p.queue.Dequeue(): + ctx, cancel := context.WithTimeout(p.ctx, provideTimeout) + defer cancel() logP.Info("announce - start - ", c) - if err := p.contentRouting.Provide(p.ctx, c, true); err != nil { + if err := p.contentRouting.Provide(ctx, c, true); err != nil { logP.Warningf("Unable to provide entry: %s, %s", c, err) } logP.Info("announce - end - ", c) From 8e30fff33017288e116ff62d25c1534b30e74634 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Wed, 12 Jun 2019 12:38:25 -0700 Subject: [PATCH 2/5] Make timeout configurable --- simple/provider.go | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/simple/provider.go b/simple/provider.go index 8fa9c10..3b4c882 100644 --- a/simple/provider.go +++ b/simple/provider.go @@ -7,17 +7,16 @@ import ( "context" "time" - cid "github.com/ipfs/go-cid" + "github.com/ipfs/go-cid" q "github.com/ipfs/go-ipfs-provider/queue" logging "github.com/ipfs/go-log" - routing "github.com/libp2p/go-libp2p-core/routing" + "github.com/libp2p/go-libp2p-core/routing" ) var logP = logging.Logger("provider.simple") const ( provideOutgoingWorkerLimit = 8 - provideTimeout = 3 * time.Minute ) // Provider announces blocks to the network @@ -27,15 +26,31 @@ type Provider struct { queue *q.Queue // used to announce providing to the network contentRouting routing.ContentRouting + // how long to wait for announce to complete before giving up + timeout time.Duration +} + +type Option func(*Provider) + +func WithTimeout(timeout time.Duration) Option { + return func(p *Provider) { + p.timeout = timeout + } } // NewProvider creates a provider that announces blocks to the network using a content router -func NewProvider(ctx context.Context, queue *q.Queue, contentRouting routing.ContentRouting) *Provider { - return &Provider{ +func NewProvider(ctx context.Context, queue *q.Queue, contentRouting routing.ContentRouting, options ...Option) *Provider { + p := &Provider{ ctx: ctx, queue: queue, contentRouting: contentRouting, } + + for _, option := range options { + option(p) + } + + return p } // Close stops the provider @@ -64,7 +79,7 @@ func (p *Provider) handleAnnouncements() { case <-p.ctx.Done(): return case c := <-p.queue.Dequeue(): - ctx, cancel := context.WithTimeout(p.ctx, provideTimeout) + ctx, cancel := context.WithTimeout(p.ctx, p.timeout) defer cancel() logP.Info("announce - start - ", c) if err := p.contentRouting.Provide(ctx, c, true); err != nil { From c3bccce5e62351f4fb09763fbe7b325275b97f44 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Wed, 12 Jun 2019 12:43:47 -0700 Subject: [PATCH 3/5] Make worker count configurable --- simple/provider.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/simple/provider.go b/simple/provider.go index 3b4c882..9b374a0 100644 --- a/simple/provider.go +++ b/simple/provider.go @@ -15,10 +15,6 @@ import ( var logP = logging.Logger("provider.simple") -const ( - provideOutgoingWorkerLimit = 8 -) - // Provider announces blocks to the network type Provider struct { ctx context.Context @@ -28,22 +24,35 @@ type Provider struct { contentRouting routing.ContentRouting // how long to wait for announce to complete before giving up timeout time.Duration + // how many workers concurrently work through thhe queue + workerLimit int } +// Option defines the functional option type that can be used to configure +// provider instances type Option func(*Provider) +// WithTimeout is an option to set a timeout on a provider func WithTimeout(timeout time.Duration) Option { return func(p *Provider) { p.timeout = timeout } } +// MaxWorkers is an option to set the max workers on a provider +func MaxWorkers(count int) Option { + return func(p *Provider) { + p.workerLimit = count + } +} + // NewProvider creates a provider that announces blocks to the network using a content router func NewProvider(ctx context.Context, queue *q.Queue, contentRouting routing.ContentRouting, options ...Option) *Provider { p := &Provider{ ctx: ctx, queue: queue, contentRouting: contentRouting, + workerLimit: 8, } for _, option := range options { @@ -72,7 +81,7 @@ func (p *Provider) Provide(root cid.Cid) error { // Handle all outgoing cids by providing (announcing) them func (p *Provider) handleAnnouncements() { - for workers := 0; workers < provideOutgoingWorkerLimit; workers++ { + for workers := 0; workers < p.workerLimit; workers++ { go func() { for p.ctx.Err() == nil { select { From 5f6a572aacdcfbdfe7f4f399eab628f0b585e099 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Wed, 12 Jun 2019 13:02:41 -0700 Subject: [PATCH 4/5] Either timeout or not --- simple/provider.go | 11 +++++++-- simple/provider_test.go | 50 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 58 insertions(+), 3 deletions(-) diff --git a/simple/provider.go b/simple/provider.go index 9b374a0..9302a27 100644 --- a/simple/provider.go +++ b/simple/provider.go @@ -88,8 +88,15 @@ func (p *Provider) handleAnnouncements() { case <-p.ctx.Done(): return case c := <-p.queue.Dequeue(): - ctx, cancel := context.WithTimeout(p.ctx, p.timeout) - defer cancel() + var ctx context.Context + var cancel context.CancelFunc + if p.timeout > 0 { + ctx, cancel = context.WithTimeout(p.ctx, p.timeout) + defer cancel() + } else { + ctx = p.ctx + } + logP.Info("announce - start - ", c) if err := p.contentRouting.Provide(ctx, c, true); err != nil { logP.Warningf("Unable to provide entry: %s, %s", c, err) diff --git a/simple/provider_test.go b/simple/provider_test.go index 6fbc528..deb0032 100644 --- a/simple/provider_test.go +++ b/simple/provider_test.go @@ -24,7 +24,11 @@ type mockRouting struct { } func (r *mockRouting) Provide(ctx context.Context, cid cid.Cid, recursive bool) error { - r.provided <- cid + select { + case r.provided <- cid: + case <-ctx.Done(): + panic("context cancelled, but shouldn't have") + } return nil } @@ -81,3 +85,47 @@ func TestAnnouncement(t *testing.T) { } } } + +func TestAnnouncementTimeout(t *testing.T) { + ctx := context.Background() + defer ctx.Done() + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + queue, err := q.NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + + r := mockContentRouting() + + prov := NewProvider(ctx, queue, r, WithTimeout(1*time.Second)) + prov.Run() + + cids := cid.NewSet() + + for i := 0; i < 100; i++ { + c := blockGenerator.Next().Cid() + cids.Add(c) + } + + go func() { + for _, c := range cids.Keys() { + err = prov.Provide(c) + // A little goroutine stirring to exercise some different states + r := rand.Intn(10) + time.Sleep(time.Microsecond * time.Duration(r)) + } + }() + + for cids.Len() > 0 { + select { + case cp := <-r.provided: + if !cids.Has(cp) { + t.Fatal("Wrong CID provided") + } + cids.Remove(cp) + case <-time.After(time.Second * 5): + t.Fatal("Timeout waiting for cids to be provided.") + } + } +} From 55dd24d1b9b3e3071d241ff47ab2ddf97b607cb2 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Sun, 30 Jun 2019 10:26:24 +0200 Subject: [PATCH 5/5] fix defer in a loop --- simple/provider.go | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/simple/provider.go b/simple/provider.go index 9302a27..3993d45 100644 --- a/simple/provider.go +++ b/simple/provider.go @@ -88,22 +88,26 @@ func (p *Provider) handleAnnouncements() { case <-p.ctx.Done(): return case c := <-p.queue.Dequeue(): - var ctx context.Context - var cancel context.CancelFunc - if p.timeout > 0 { - ctx, cancel = context.WithTimeout(p.ctx, p.timeout) - defer cancel() - } else { - ctx = p.ctx - } - - logP.Info("announce - start - ", c) - if err := p.contentRouting.Provide(ctx, c, true); err != nil { - logP.Warningf("Unable to provide entry: %s, %s", c, err) - } - logP.Info("announce - end - ", c) + p.doProvide(c) } } }() } } + +func (p *Provider) doProvide(c cid.Cid) { + ctx := p.ctx + if p.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, p.timeout) + defer cancel() + } else { + ctx = p.ctx + } + + logP.Info("announce - start - ", c) + if err := p.contentRouting.Provide(ctx, c, true); err != nil { + logP.Warningf("Unable to provide entry: %s, %s", c, err) + } + logP.Info("announce - end - ", c) +}