Skip to content

Commit

Permalink
Change provider to not lose jobs in the event that the process is killed
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Michael Avila <davidmichaelavila@gmail.com>
  • Loading branch information
michaelavila committed Jan 22, 2019
1 parent e1141da commit 864024d
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 18 deletions.
25 changes: 17 additions & 8 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,31 +113,40 @@ func (p *Provider) handleAnnouncements() {
// TODO: We should probably not actually Dequeue() right here, or at
// least have a plan to replace the entry in the event that something
// goes wrong or the process is killed
cid, err := p.queue.Dequeue()
entry, err := p.queue.Dequeue()
p.lock.Unlock()
if err != nil {
log.Warning("Unable to dequeue:", err)
continue
}

isTracking, err := p.tracker.IsTracking(cid)
isTracking, err := p.tracker.IsTracking(entry.cid)
if err != nil {
log.Warningf("Unable to check provider tracking on outgoing: %s, %s", cid, err)
log.Warningf("Unable to check provider tracking on outgoing: %s, %s", entry.cid, err)
continue
}
if isTracking {
continue
}

if err := p.announce(cid); err != nil {
log.Warningf("Unable to announce providing: %s, %s", cid, err)
if err := p.announce(entry.cid); err != nil {
log.Warningf("Unable to announce providing: %s, %s", entry.cid, err)
// maybe the cid + err should go on the queue?
p.failures.Enqueue(cid)
p.failures.Enqueue(entry.cid)
if err := entry.Complete(); err != nil {
log.Warningf("Unable to complete queue entry for failure: %s, %s", entry.cid, err)
continue
}
continue
}

if err := p.tracker.Track(cid); err != nil {
log.Warningf("Unable to track: %s, %s", cid, err)
if err := entry.Complete(); err != nil {
log.Warningf("Unable to complete queue entry for success: %s, %s", entry.cid, err)
continue
}

if err := p.tracker.Track(entry.cid); err != nil {
log.Warningf("Unable to track: %s, %s", entry.cid, err)
continue
}
}
Expand Down
35 changes: 25 additions & 10 deletions provider/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@ import (
"sync"
)

// Entry

type Entry struct {
cid cid.Cid
key ds.Key
datastore ds.Datastore
}

func (e *Entry) Complete() error {
return e.datastore.Delete(e.key)
}

// Queue

type Queue struct {
Expand Down Expand Up @@ -54,13 +66,13 @@ func (q *Queue) Enqueue(cid cid.Cid) error {
return nil
}

func (q *Queue) Dequeue() (cid.Cid, error) {
func (q *Queue) Dequeue() (*Entry, error) {
q.lock.Lock()
defer q.lock.Unlock()

if q.IsEmpty() {
// TODO figure out how IPFS folks are doing custom errors and make this comply
return cid.Undef, errors.New("queue is empty")
return nil, errors.New("queue is empty")
}

var nextKey ds.Key
Expand All @@ -73,23 +85,26 @@ func (q *Queue) Dequeue() (cid.Cid, error) {
q.head++
continue
} else if err != nil {
return cid.Undef, err
return nil, err
} else {
break
}
}

key, err := cid.Parse(value)
id, err := cid.Parse(value)
if err != nil {
return cid.Undef, err
return nil, err
}

if err := q.datastore.Delete(nextKey); err != nil {
return cid.Undef, err
entry := &Entry {
cid: id,
key: nextKey,
datastore: q.datastore,
}

q.head++
return key, nil

return entry, nil
}

func (q *Queue) IsEmpty() bool {
Expand Down Expand Up @@ -128,8 +143,8 @@ func getQueueHeadTail(name string, datastore ds.Datastore) (uint64, uint64, erro
head = id
}

if id > tail {
tail = id
if (id+1) > tail {
tail = (id+1)
}
}
if head == math.MaxUint64 {
Expand Down

0 comments on commit 864024d

Please sign in to comment.