From 1595253b7b6f73b2b1f121677f3f32e7b2a6cb2d Mon Sep 17 00:00:00 2001 From: Erik Ingenito Date: Fri, 15 Mar 2019 14:19:19 -0700 Subject: [PATCH] Remove locking entirely License: MIT Signed-off-by: Erik Ingenito --- provider/provider.go | 22 ++--- provider/provider_test.go | 4 +- provider/queue.go | 168 +++++++++++++------------------------- provider/queue_test.go | 19 ++--- 4 files changed, 71 insertions(+), 142 deletions(-) diff --git a/provider/provider.go b/provider/provider.go index 7a30f6d27ba..7e149f7776e 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -11,13 +11,9 @@ import ( routing "github.com/libp2p/go-libp2p-routing" ) -var ( - log = logging.Logger("provider") -) +var log = logging.Logger("provider") -const ( - provideOutgoingWorkerLimit = 8 -) +const provideOutgoingWorkerLimit = 8 // Provider announces blocks to the network type Provider interface { @@ -44,13 +40,13 @@ func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.Conte // Start workers to handle provide requests. func (p *provider) Run() { - p.queue.Run() p.handleAnnouncements() } // Provide the given cid using specified strategy. func (p *provider) Provide(root cid.Cid) error { - return p.queue.Enqueue(root) + p.queue.Enqueue(root) + return nil } // Handle all outgoing cids by providing (announcing) them @@ -61,12 +57,12 @@ func (p *provider) handleAnnouncements() { select { case <-p.ctx.Done(): return - case entry := <-p.queue.Dequeue(): - log.Info("announce - start - ", entry.cid) - if err := p.contentRouting.Provide(p.ctx, entry.cid, true); err != nil { - log.Warningf("Unable to provide entry: %s, %s", entry.cid, err) + case c := <-p.queue.Dequeue(): + log.Info("announce - start - ", c) + if err := p.contentRouting.Provide(p.ctx, c, true); err != nil { + log.Warningf("Unable to provide entry: %s, %s", c, err) } - log.Info("announce - end - ", entry.cid) + log.Info("announce - end - ", c) } } }() diff --git a/provider/provider_test.go b/provider/provider_test.go index 464d73d9a34..95282e38a95 100644 --- a/provider/provider_test.go +++ b/provider/provider_test.go @@ -42,7 +42,7 @@ func TestAnnouncement(t *testing.T) { cids := cid.NewSet() - for i := 0; i < 100; i++ { + for i := 0; i < 1000; i++ { c := blockGenerator.Next().Cid() cids.Add(c) } @@ -63,7 +63,7 @@ func TestAnnouncement(t *testing.T) { t.Fatal("Wrong CID provided") } cids.Remove(cp) - case <-time.After(time.Second * 1): + case <-time.After(time.Second * 5): t.Fatal("Timeout waiting for cids to be provided.") } } diff --git a/provider/queue.go b/provider/queue.go index f1c9945cdf7..3f7115f68e5 100644 --- a/provider/queue.go +++ b/provider/queue.go @@ -2,27 +2,15 @@ package provider import ( "context" - "errors" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + "github.com/ipfs/go-datastore/query" "math" "strconv" "strings" - "sync" - - cid "github.com/ipfs/go-cid" - datastore "github.com/ipfs/go-datastore" - namespace "github.com/ipfs/go-datastore/namespace" - query "github.com/ipfs/go-datastore/query" ) -// Entry allows for the durability in the queue. When a cid is dequeued it is -// not removed from the datastore until you call Complete() on the entry you -// receive. -type Entry struct { - cid cid.Cid - key datastore.Key - queue *Queue -} - // Queue provides a durable, FIFO interface to the datastore for storing cids // // Durability just means that cids in the process of being provided when a @@ -32,17 +20,15 @@ type Queue struct { // used to differentiate queues in datastore // e.g. provider vs reprovider name string - ctx context.Context tail uint64 head uint64 - enqueueLock sync.Mutex ds datastore.Datastore // Must be threadsafe - dequeue chan *Entry - added chan struct{} + dequeue chan cid.Cid + enqueue chan cid.Cid } // NewQueue creates a queue for cids @@ -57,124 +43,85 @@ func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, ctx: ctx, head: head, tail: tail, - enqueueLock: sync.Mutex{}, ds: namespaced, - dequeue: make(chan *Entry), - added: make(chan struct{}), + dequeue: make(chan cid.Cid), + enqueue: make(chan cid.Cid), } + q.work() return q, nil } // Enqueue puts a cid in the queue -func (q *Queue) Enqueue(cid cid.Cid) error { - q.enqueueLock.Lock() - defer q.enqueueLock.Unlock() - - nextKey := q.queueKey(q.tail) - - if err := q.ds.Put(nextKey, cid.Bytes()); err != nil { - return err - } - - q.tail++ - +func (q *Queue) Enqueue(cid cid.Cid) { select { - case q.added <- struct{}{}: + case q.enqueue <- cid: case <-q.ctx.Done(): - default: } - - return nil } // Dequeue returns a channel that if listened to will remove entries from the queue -func (q *Queue) Dequeue() <-chan *Entry { +func (q *Queue) Dequeue() <-chan cid.Cid { return q.dequeue } -// IsEmpty returns whether or not the queue has any items -func (q *Queue) IsEmpty() bool { - return (q.tail - q.head) == 0 -} - -// Run dequeues items when the dequeue channel is available to -// be written to. -func (q *Queue) Run() { +// Run dequeues and enqueues when available. +func (q *Queue) work() { go func() { for { - if q.IsEmpty() { - select { - case <-q.ctx.Done(): - return - case <-q.added: + var c cid.Cid = cid.Undef + var key datastore.Key + var dequeue chan cid.Cid + + // If we're not empty dequeue a cid and ship it + if q.head < q.tail { + key = q.queueKey(q.head) + value, err := q.ds.Get(key) + + if err == datastore.ErrNotFound { + log.Warningf("Missing entry in queue: %s", err) + q.head++ + continue + } else if err != nil { + log.Warningf("Error fetching from queue: %s", err) + continue + } + + c, err = cid.Parse(value) + if err != nil { + log.Warningf("Error marshalling Cid from queue: ", err) + q.head++ + err = q.ds.Delete(key) + continue } } - entry, err := q.next() - if err != nil { - log.Warningf("Error Dequeue()-ing: %s, %s", entry, err) - continue + if c != cid.Undef { + dequeue = q.dequeue } select { + case toQueue := <-q.enqueue: + nextKey := q.queueKey(q.tail) + + if err := q.ds.Put(nextKey, toQueue.Bytes()); err != nil { + log.Errorf("Failed to enqueue cid: %s", err) + } + + q.tail++ + case dequeue <- c: + q.head++ + err := q.ds.Delete(key) + + if err != nil { + log.Errorf("Failed to delete queued cid: %s", err) + } case <-q.ctx.Done(): return - case q.dequeue <- entry: - q.head++ - err = q.ds.Delete(entry.key) } } }() } -// Find the next item in the queue, crawl forward if an entry is not -// found in the next spot. -func (q *Queue) next() (*Entry, error) { - var key datastore.Key - var value []byte - var err error - for { - if q.head >= q.tail { - return nil, errors.New("next: no more entries in queue returning") - } - select { - case <-q.ctx.Done(): - return nil, nil - default: - } - key = q.queueKey(q.head) - - value, err = q.ds.Get(key) - - value, err = q.ds.Get(key) - if err == datastore.ErrNotFound { - q.head++ - continue - } else if err != nil { - return nil, err - } else { - break - } - } - - id, err := cid.Parse(value) - if err != nil { - return nil, err - } - - entry := &Entry{ - cid: id, - key: key, - queue: q, - } - - if err != nil { - return nil, err - } - - return entry, nil -} - func (q *Queue) queueKey(id uint64) datastore.Key { return datastore.NewKey(strconv.FormatUint(id, 10)) } @@ -190,11 +137,6 @@ func getQueueHeadTail(ctx context.Context, name string, datastore datastore.Data var tail uint64 var head uint64 = math.MaxUint64 for entry := range results.Next() { - select { - case <-ctx.Done(): - return 0, 0, nil - default: - } trimmed := strings.TrimPrefix(entry.Key, "/") id, err := strconv.ParseUint(trimmed, 10, 64) if err != nil { diff --git a/provider/queue_test.go b/provider/queue_test.go index 724eca4ee16..2ac2de28820 100644 --- a/provider/queue_test.go +++ b/provider/queue_test.go @@ -11,7 +11,7 @@ import ( ) func makeCids(n int) []cid.Cid { - cids := make([]cid.Cid, 0, 10) + cids := make([]cid.Cid, 0, n) for i := 0; i < 10; i++ { c := blockGenerator.Next().Cid() cids = append(cids, c) @@ -23,8 +23,8 @@ func assertOrdered(cids []cid.Cid, q *Queue, t *testing.T) { for _, c := range cids { select { case dequeued := <- q.dequeue: - if c != dequeued.cid { - t.Fatalf("Error in ordering of CIDs retrieved from queue. Expected: %s, got: %s", c, dequeued.cid) + if c != dequeued { + t.Fatalf("Error in ordering of CIDs retrieved from queue. Expected: %s, got: %s", c, dequeued) } case <-time.After(time.Second * 1): @@ -42,15 +42,11 @@ func TestBasicOperation(t *testing.T) { if err != nil { t.Fatal(err) } - queue.Run() cids := makeCids(10) for _, c := range cids { - err = queue.Enqueue(c) - if err != nil { - t.Fatal("Failed to enqueue CID") - } + queue.Enqueue(c) } assertOrdered(cids, queue, t) @@ -65,15 +61,11 @@ func TestInitialization(t *testing.T) { if err != nil { t.Fatal(err) } - queue.Run() cids := makeCids(10) for _, c := range cids { - err = queue.Enqueue(c) - if err != nil { - t.Fatal("Failed to enqueue CID") - } + queue.Enqueue(c) } assertOrdered(cids[:5], queue, t) @@ -83,7 +75,6 @@ func TestInitialization(t *testing.T) { if err != nil { t.Fatal(err) } - queue.Run() assertOrdered(cids[5:], queue, t) }