Skip to content
This repository has been archived by the owner on Jun 20, 2023. It is now read-only.

Commit

Permalink
Either timeout or not
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelavila committed Jun 12, 2019
1 parent c3bccce commit 5999bf8
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 3 deletions.
11 changes: 9 additions & 2 deletions simple/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,15 @@ func (p *Provider) handleAnnouncements() {
case <-p.ctx.Done():
return
case c := <-p.queue.Dequeue():
ctx, cancel := context.WithTimeout(p.ctx, p.timeout)
defer cancel()
var ctx context.Context
var cancel context.CancelFunc
if p.timeout > 0 {
ctx, cancel = context.WithTimeout(p.ctx, p.timeout)
defer cancel()
} else {
ctx = p.ctx
}

logP.Info("announce - start - ", c)
if err := p.contentRouting.Provide(ctx, c, true); err != nil {
logP.Warningf("Unable to provide entry: %s, %s", c, err)
Expand Down
50 changes: 49 additions & 1 deletion simple/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ type mockRouting struct {
}

func (r *mockRouting) Provide(ctx context.Context, cid cid.Cid, recursive bool) error {
r.provided <- cid
select {
case r.provided <- cid:
case <-ctx.Done():
panic("context cancelled, but shouldn't have")
}
return nil
}

Expand Down Expand Up @@ -81,3 +85,47 @@ func TestAnnouncement(t *testing.T) {
}
}
}

func TestAnnouncementTimeout(t *testing.T) {
ctx := context.Background()
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := q.NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}

r := mockContentRouting()

prov := NewProvider(ctx, queue, r, WithTimeout(1 * time.Second))
prov.Run()

cids := cid.NewSet()

for i := 0; i < 100; i++ {
c := blockGenerator.Next().Cid()
cids.Add(c)
}

go func() {
for _, c := range cids.Keys() {
err = prov.Provide(c)
// A little goroutine stirring to exercise some different states
r := rand.Intn(10)
time.Sleep(time.Microsecond * time.Duration(r))
}
}()

for cids.Len() > 0 {
select {
case cp := <-r.provided:
if !cids.Has(cp) {
t.Fatal("Wrong CID provided")
}
cids.Remove(cp)
case <-time.After(time.Second * 5):
t.Fatal("Timeout waiting for cids to be provided.")
}
}
}

0 comments on commit 5999bf8

Please sign in to comment.