Skip to content

Commit

Permalink
Query for provider head/tail
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Michael Avila <davidmichaelavila@gmail.com>
  • Loading branch information
michaelavila committed Mar 26, 2019
1 parent 8c96e3b commit 4916dd0
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 28 deletions.
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,8 @@ require (
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect
gotest.tools/gotestsum v0.3.3
)

replace (
github.com/ipfs/go-datastore => github.com/ipfs/go-datastore v0.0.2-0.20190323043649-bce485ce18d1
github.com/ipfs/go-ds-flatfs => github.com/ipfs/go-ds-flatfs v0.0.2-0.20190316035933-d5e3c1fa14d2
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,16 @@ github.com/ipfs/go-cidutil v0.0.1 h1:UpDQI2LrihqOGY2mHaMhjrhh1DJ14N/58BQb7lKXvlQ
github.com/ipfs/go-cidutil v0.0.1/go.mod h1:/0H649ymJksNEZvBAkM18HIctk7tkONH9tspTeLok48=
github.com/ipfs/go-datastore v0.0.1 h1:AW/KZCScnBWlSb5JbnEnLKFWXL224LBEh/9KXXOrUms=
github.com/ipfs/go-datastore v0.0.1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
github.com/ipfs/go-datastore v0.0.2-0.20190323043649-bce485ce18d1 h1:kiU8BjUZpF/CnxuIaG5skXOPIhtVtv1Ly1wa9Hldyck=
github.com/ipfs/go-datastore v0.0.2-0.20190323043649-bce485ce18d1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
github.com/ipfs/go-ds-badger v0.0.2 h1:7ToQt7QByBhOTuZF2USMv+PGlMcBC7FW7FdgQ4FCsoo=
github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8=
github.com/ipfs/go-ds-flatfs v0.0.1 h1:yqWwRYFOGNClUL7V2jvcx4KMMso1Jv+pgQzsv9/gWBs=
github.com/ipfs/go-ds-flatfs v0.0.1/go.mod h1:YsMGWjUieue+smePAWeH/YhHtlmEMnEGhiwIn6K6rEM=
github.com/ipfs/go-ds-flatfs v0.0.2-0.20190316035933-d5e3c1fa14d2 h1:C4Q4+j7oruIK2i1qtDfFcW4HRk+APQ3KWoNY69iUrS0=
github.com/ipfs/go-ds-flatfs v0.0.2-0.20190316035933-d5e3c1fa14d2/go.mod h1:YsMGWjUieue+smePAWeH/YhHtlmEMnEGhiwIn6K6rEM=
github.com/ipfs/go-ds-leveldb v0.0.1 h1:Z0lsTFciec9qYsyngAw1f/czhRU35qBLR2vhavPFgqA=
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-measure v0.0.1 h1:PrCueug+yZLkDCOthZTXKinuoCal/GvlAT7cNxzr03g=
Expand Down
66 changes: 39 additions & 27 deletions provider/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package provider

import (
"context"
"math"
"fmt"
"strconv"
"strings"

Expand Down Expand Up @@ -32,7 +32,7 @@ type Queue struct {
// NewQueue creates a queue for cids
func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) {
namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/"))
head, tail, err := getQueueHeadTail(ctx, name, namespaced)
head, tail, err := getQueueHeadTail(ctx, namespaced)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -142,40 +142,52 @@ func (q *Queue) work() {
}

func (q *Queue) queueKey(id uint64) datastore.Key {
return datastore.NewKey(strconv.FormatUint(id, 10))
s := fmt.Sprintf("%016X", id)
return datastore.NewKey(s)
}

// crawl over the queue entries to find the head and tail
func getQueueHeadTail(ctx context.Context, name string, datastore datastore.Datastore) (uint64, uint64, error) {
q := query.Query{}
results, err := datastore.Query(q)
func getQueueHeadTail(ctx context.Context, datastore datastore.Datastore) (uint64, uint64, error) {
head, err := getQueueHead(datastore)
if err != nil {
return 0, 0, err
}
tail, err := getQueueTail(datastore)
if err != nil {
return 0, 0, err
}
return head, tail, nil
}

var tail uint64
var head uint64 = math.MaxUint64
for entry := range results.Next() {
trimmed := strings.TrimPrefix(entry.Key, "/")
id, err := strconv.ParseUint(trimmed, 10, 64)
if err != nil {
return 0, 0, err
}
func getQueueHead(ds datastore.Datastore) (uint64, error) {
return getFirstIDByOrder(ds, query.OrderByKey{})
}

if id < head {
head = id
}
func getQueueTail(ds datastore.Datastore) (uint64, error) {
tail, err := getFirstIDByOrder(ds, query.OrderByKeyDescending{})
if err != nil {
return 0, err
}
if tail > 0 {
tail++
}
return tail, nil
}

if (id + 1) > tail {
tail = (id + 1)
}
func getFirstIDByOrder(ds datastore.Datastore, order query.Order) (uint64, error) {
q := query.Query{Orders: []query.Order{order}}
results, err := ds.Query(q)
if err != nil {
return 0, err
}
if err := results.Close(); err != nil {
return 0, 0, err
defer results.Close()
r, ok := results.NextSync()
if !ok {
return 0, nil
}
if head == math.MaxUint64 {
head = 0
trimmed := strings.TrimPrefix(r.Key, "/")
id, err := strconv.ParseUint(trimmed, 16, 64)
if err != nil {
return 0, err
}

return head, tail, nil
return id, nil
}
26 changes: 25 additions & 1 deletion provider/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func makeCids(n int) []cid.Cid {
cids := make([]cid.Cid, 0, n)
for i := 0; i < 10; i++ {
for i := 0; i < n; i++ {
c := blockGenerator.Next().Cid()
cids = append(cids, c)
}
Expand Down Expand Up @@ -129,3 +129,27 @@ func TestInitialization(t *testing.T) {

assertOrdered(cids[5:], queue, t)
}

func TestInitializationWithManyCids(t *testing.T) {
ctx := context.Background()
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}

cids := makeCids(25)
for _, c := range cids {
queue.Enqueue(c)
}

// make a new queue, same data
queue, err = NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}

assertOrdered(cids, queue, t)
}

0 comments on commit 4916dd0

Please sign in to comment.