diff --git a/go.mod b/go.mod index a4a9f9c55f2f..8c88c6ec2ca2 100644 --- a/go.mod +++ b/go.mod @@ -114,3 +114,8 @@ require ( gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect gotest.tools/gotestsum v0.3.3 ) + +replace ( + github.com/ipfs/go-datastore => github.com/ipfs/go-datastore v0.0.2-0.20190323043649-bce485ce18d1 + github.com/ipfs/go-ds-flatfs => github.com/ipfs/go-ds-flatfs v0.0.2-0.20190316035933-d5e3c1fa14d2 +) diff --git a/go.sum b/go.sum index 0fb3221bc3cf..8286213b99f9 100644 --- a/go.sum +++ b/go.sum @@ -121,12 +121,16 @@ github.com/ipfs/go-cidutil v0.0.1 h1:UpDQI2LrihqOGY2mHaMhjrhh1DJ14N/58BQb7lKXvlQ github.com/ipfs/go-cidutil v0.0.1/go.mod h1:/0H649ymJksNEZvBAkM18HIctk7tkONH9tspTeLok48= github.com/ipfs/go-datastore v0.0.1 h1:AW/KZCScnBWlSb5JbnEnLKFWXL224LBEh/9KXXOrUms= github.com/ipfs/go-datastore v0.0.1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE= +github.com/ipfs/go-datastore v0.0.2-0.20190323043649-bce485ce18d1 h1:kiU8BjUZpF/CnxuIaG5skXOPIhtVtv1Ly1wa9Hldyck= +github.com/ipfs/go-datastore v0.0.2-0.20190323043649-bce485ce18d1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= github.com/ipfs/go-ds-badger v0.0.2 h1:7ToQt7QByBhOTuZF2USMv+PGlMcBC7FW7FdgQ4FCsoo= github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8= github.com/ipfs/go-ds-flatfs v0.0.1 h1:yqWwRYFOGNClUL7V2jvcx4KMMso1Jv+pgQzsv9/gWBs= github.com/ipfs/go-ds-flatfs v0.0.1/go.mod h1:YsMGWjUieue+smePAWeH/YhHtlmEMnEGhiwIn6K6rEM= +github.com/ipfs/go-ds-flatfs v0.0.2-0.20190316035933-d5e3c1fa14d2 h1:C4Q4+j7oruIK2i1qtDfFcW4HRk+APQ3KWoNY69iUrS0= +github.com/ipfs/go-ds-flatfs v0.0.2-0.20190316035933-d5e3c1fa14d2/go.mod h1:YsMGWjUieue+smePAWeH/YhHtlmEMnEGhiwIn6K6rEM= github.com/ipfs/go-ds-leveldb v0.0.1 h1:Z0lsTFciec9qYsyngAw1f/czhRU35qBLR2vhavPFgqA= github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= github.com/ipfs/go-ds-measure v0.0.1 h1:PrCueug+yZLkDCOthZTXKinuoCal/GvlAT7cNxzr03g= diff --git a/provider/queue.go b/provider/queue.go index a3268e109334..918bdcbd7690 100644 --- a/provider/queue.go +++ b/provider/queue.go @@ -2,7 +2,7 @@ package provider import ( "context" - "math" + "fmt" "strconv" "strings" @@ -32,7 +32,7 @@ type Queue struct { // NewQueue creates a queue for cids func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) { namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/")) - head, tail, err := getQueueHeadTail(ctx, name, namespaced) + head, tail, err := getQueueHeadTail(ctx, namespaced) if err != nil { return nil, err } @@ -142,40 +142,52 @@ func (q *Queue) work() { } func (q *Queue) queueKey(id uint64) datastore.Key { - return datastore.NewKey(strconv.FormatUint(id, 10)) + s := fmt.Sprintf("%016X", id) + return datastore.NewKey(s) } -// crawl over the queue entries to find the head and tail -func getQueueHeadTail(ctx context.Context, name string, datastore datastore.Datastore) (uint64, uint64, error) { - q := query.Query{} - results, err := datastore.Query(q) +func getQueueHeadTail(ctx context.Context, datastore datastore.Datastore) (uint64, uint64, error) { + head, err := getQueueHead(datastore) if err != nil { return 0, 0, err } + tail, err := getQueueTail(datastore) + if err != nil { + return 0, 0, err + } + return head, tail, nil +} - var tail uint64 - var head uint64 = math.MaxUint64 - for entry := range results.Next() { - trimmed := strings.TrimPrefix(entry.Key, "/") - id, err := strconv.ParseUint(trimmed, 10, 64) - if err != nil { - return 0, 0, err - } +func getQueueHead(ds datastore.Datastore) (uint64, error) { + return getFirstIDByOrder(ds, query.OrderByKey{}) +} - if id < head { - head = id - } +func getQueueTail(ds datastore.Datastore) (uint64, error) { + tail, err := getFirstIDByOrder(ds, query.OrderByKeyDescending{}) + if err != nil { + return 0, err + } + if tail > 0 { + tail++ + } + return tail, nil +} - if (id + 1) > tail { - tail = (id + 1) - } +func getFirstIDByOrder(ds datastore.Datastore, order query.Order) (uint64, error) { + q := query.Query{Orders: []query.Order{order}} + results, err := ds.Query(q) + if err != nil { + return 0, err } - if err := results.Close(); err != nil { - return 0, 0, err + defer results.Close() + r, ok := results.NextSync() + if !ok { + return 0, nil } - if head == math.MaxUint64 { - head = 0 + trimmed := strings.TrimPrefix(r.Key, "/") + id, err := strconv.ParseUint(trimmed, 16, 64) + if err != nil { + return 0, err } - - return head, tail, nil + return id, nil } diff --git a/provider/queue_test.go b/provider/queue_test.go index e1b74878ea51..2857da0a93e4 100644 --- a/provider/queue_test.go +++ b/provider/queue_test.go @@ -12,7 +12,7 @@ import ( func makeCids(n int) []cid.Cid { cids := make([]cid.Cid, 0, n) - for i := 0; i < 10; i++ { + for i := 0; i < n; i++ { c := blockGenerator.Next().Cid() cids = append(cids, c) } @@ -129,3 +129,27 @@ func TestInitialization(t *testing.T) { assertOrdered(cids[5:], queue, t) } + +func TestInitializationWithManyCids(t *testing.T) { + ctx := context.Background() + defer ctx.Done() + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + queue, err := NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + + cids := makeCids(25) + for _, c := range cids { + queue.Enqueue(c) + } + + // make a new queue, same data + queue, err = NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + + assertOrdered(cids, queue, t) +}