Skip to content

Commit

Permalink
Cleanup, fix broken restart, and more tests.
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Erik Ingenito <erik@carbonfive.com>
  • Loading branch information
Erik Ingenito committed Mar 15, 2019
1 parent 7d07347 commit a36a278
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 56 deletions.
21 changes: 3 additions & 18 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,28 +62,13 @@ func (p *provider) handleAnnouncements() {
case <-p.ctx.Done():
return
case entry := <-p.queue.Dequeue():
if err := doProvide(p.ctx, p.contentRouting, entry.cid); err != nil {
log.Info("announce - start - ", entry.cid)
if err := p.contentRouting.Provide(p.ctx, entry.cid, true); err != nil {
log.Warningf("Unable to provide entry: %s, %s", entry.cid, err)
}

if err := entry.Complete(); err != nil {
log.Warningf("Unable to complete queue entry when providing: %s, %s", entry.cid, err)
}
log.Info("announce - end - ", entry.cid)
}
}
}()
}
}

// TODO: better document this provide logic
func doProvide(ctx context.Context, contentRouting routing.ContentRouting, key cid.Cid) error {
// announce
log.Info("announce - start - ", key)
if err := contentRouting.Provide(ctx, key, true); err != nil {
log.Warningf("Failed to provide cid: %s", err)
// TODO: Maybe put these failures onto a failures queue?
return err
}
log.Info("announce - end - ", key)
return nil
}
8 changes: 4 additions & 4 deletions provider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
sync "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-ipfs-blocksutil"
pstore "github.com/libp2p/go-libp2p-peerstore"
"math/rand"
Expand All @@ -25,11 +26,10 @@ func mockContentRouting() *mockRouting {

func TestAnnouncement(t *testing.T) {
ctx := context.Background()
defer func() {
ctx.Done()
}()
defer ctx.Done()

queue, err := NewQueue(ctx, "test", datastore.NewMapDatastore())
ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
Expand Down
58 changes: 24 additions & 34 deletions provider/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,6 @@ type Entry struct {
queue *Queue
}

// Complete the entry by removing it from the queue
func (e *Entry) Complete() error {
e.queue.lock.Lock()
defer e.queue.lock.Unlock()
return e.queue.remove(e.key)
}

// Queue provides a durable, FIFO interface to the datastore for storing cids
//
// Durability just means that cids in the process of being provided when a
Expand All @@ -44,8 +37,8 @@ type Queue struct {
tail uint64
head uint64

lock sync.Mutex
datastore ds.Datastore
enqueueLock sync.Mutex
datastore ds.Datastore // Must be threadsafe

dequeue chan *Entry
added chan struct{}
Expand All @@ -59,22 +52,22 @@ func NewQueue(ctx context.Context, name string, datastore ds.Datastore) (*Queue,
return nil, err
}
q := &Queue{
name: name,
ctx: ctx,
head: head,
tail: tail,
lock: sync.Mutex{},
datastore: namespaced,
dequeue: make(chan *Entry),
added: make(chan struct{}),
name: name,
ctx: ctx,
head: head,
tail: tail,
enqueueLock: sync.Mutex{},
datastore: namespaced,
dequeue: make(chan *Entry),
added: make(chan struct{}),
}
return q, nil
}

// Enqueue puts a cid in the queue
func (q *Queue) Enqueue(cid cid.Cid) error {
q.lock.Lock()
defer q.lock.Unlock()
q.enqueueLock.Lock()
defer q.enqueueLock.Unlock()

nextKey := q.queueKey(q.tail)

Expand Down Expand Up @@ -126,21 +119,17 @@ func (q *Queue) Run() {
case <-q.ctx.Done():
return
case q.dequeue <- entry:
q.head++
err = q.datastore.Delete(entry.key)
}

}
}()
}

// Find the next item in the queue, crawl forward if an entry is not
// found in the next spot.
func (q *Queue) next() (*Entry, error) {
q.lock.Lock()
defer func() {
q.lock.Unlock()
}()

var nextKey ds.Key
var key ds.Key
var value []byte
var err error
for {
Expand All @@ -152,8 +141,11 @@ func (q *Queue) next() (*Entry, error) {
return nil, nil
default:
}
nextKey = q.queueKey(q.head)
value, err = q.datastore.Get(nextKey)
key = q.queueKey(q.head)

value, err = q.datastore.Get(key)

value, err = q.datastore.Get(key)
if err == ds.ErrNotFound {
q.head++
continue
Expand All @@ -171,11 +163,13 @@ func (q *Queue) next() (*Entry, error) {

entry := &Entry{
cid: id,
key: nextKey,
key: key,
queue: q,
}

q.head++
if err != nil {
return nil, err
}

return entry, nil
}
Expand Down Expand Up @@ -223,7 +217,3 @@ func getQueueHeadTail(ctx context.Context, name string, datastore ds.Datastore)

return head, tail, nil
}

func (q *Queue) remove(key ds.Key) error {
return q.datastore.Delete(key)
}
88 changes: 88 additions & 0 deletions provider/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package provider

import (
"context"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"testing"
"time"
)

func makeCids(n int) []cid.Cid {
cids := make([]cid.Cid, 0, 10)
for i := 0; i < 10; i++ {
c := blockGenerator.Next().Cid()
cids = append(cids, c)
}
return cids
}

func assertOrdered(cids []cid.Cid, q *Queue, t *testing.T) {
for _, cid := range cids {
select {
case dequeued := <- q.dequeue:
if cid != dequeued.cid {
t.Fatalf("Error in ordering of CIDs retrieved from queue. Expected: %s, got: %s", cid, dequeued.cid)
}

case <-time.After(time.Second * 1):
t.Fatal("Timeout waiting for cids to be provided.")
}
}
}

func TestBasicOperation(t *testing.T) {
ctx := context.Background()
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue.Run()

cids := makeCids(10)

for _, c := range cids {
err = queue.Enqueue(c)
if err != nil {
t.Fatal("Failed to enqueue CID")
}
}

assertOrdered(cids, queue, t)
}

func TestInitialization(t *testing.T) {
ctx := context.Background()
defer ctx.Done()

ds := datastore.NewMapDatastore()
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue.Run()

cids := makeCids(10)

for _, c := range cids {
err = queue.Enqueue(c)
if err != nil {
t.Fatal("Failed to enqueue CID")
}
}

assertOrdered(cids[:5], queue, t)

// make a new queue
queue, err = NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue.Run()

assertOrdered(cids[5:], queue, t)
}

0 comments on commit a36a278

Please sign in to comment.