Skip to content
This repository has been archived by the owner on Nov 24, 2022. It is now read-only.

Commit

Permalink
remove internal commiting to make the logic more straightforward
Browse files Browse the repository at this point in the history
Signed-off-by: Merlin Ran <merlinran@gmail.com>
  • Loading branch information
merlinran committed Aug 13, 2021
1 parent dd67d3f commit 1bae0d2
Showing 1 changed file with 18 additions and 19 deletions.
37 changes: 18 additions & 19 deletions service/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,9 @@ func (s *Store) SetProposalCid(id auction.BidID, pcid cid.Cid) error {
if err := s.enqueueDataURI(txn, b); err != nil {
return fmt.Errorf("enqueueing data uri: %v", err)
}
if err := txn.Commit(); err != nil {
return fmt.Errorf("committing txn: %v", err)
}

log.Infof("set proposal cid for bid %s; enqueued sources %s for fetch", b.ID, b.Sources)
return nil
Expand Down Expand Up @@ -517,30 +520,20 @@ func (s *Store) dealDataFilePathFor(bidID auction.BidID, payloadCid string) stri
}

// enqueueDataURI queues a data uri fetch.
// commitTxn will be committed internally!
func (s *Store) enqueueDataURI(commitTxn ds.Txn, b *Bid) error {
func (s *Store) enqueueDataURI(txn ds.Txn, b *Bid) error {
// Set the bid to "fetching_data"
if err := s.saveAndTransitionStatus(commitTxn, b, BidStatusFetchingData); err != nil {
if err := s.saveAndTransitionStatus(txn, b, BidStatusFetchingData); err != nil {
return fmt.Errorf("updating status (fetching_data): %v", err)
}
if commitTxn != nil {
if err := commitTxn.Commit(); err != nil {
return fmt.Errorf("committing txn: %v", err)
select {
case s.jobCh <- b:
default:
log.Debugf("workers are busy; queueing %s", b.ID)
// Workers are busy, set back to "queued_data"
if err := s.saveAndTransitionStatus(txn, b, BidStatusQueuedData); err != nil {
log.Errorf("updating status (queued_data): %v", err)
}
}

// Unblock the caller by letting the rest happen in the background
go func() {
select {
case s.jobCh <- b:
default:
log.Debugf("workers are busy; queueing %s", b.ID)
// Workers are busy, set back to "queued_data"
if err := s.saveAndTransitionStatus(nil, b, BidStatusQueuedData); err != nil {
log.Errorf("updating status (queued_data): %v", err)
}
}
}()
return nil
}

Expand Down Expand Up @@ -713,6 +706,9 @@ func (s *Store) getNext() {
if err := s.enqueueDataURI(txn, b); err != nil {
log.Errorf("enqueueing: %v", err)
}
if err := txn.Commit(); err != nil {
log.Errorf("committing txn: %v", err)
}
}

func (s *Store) getQueued(txn ds.Txn) (*Bid, error) {
Expand Down Expand Up @@ -766,6 +762,9 @@ func (s *Store) getOrphaned() error {
return fmt.Errorf("enqueueing: %v", err)
}
}
if err := txn.Commit(); err != nil {
return fmt.Errorf("committing txn: %v", err)
}
return nil
}

Expand Down

0 comments on commit 1bae0d2

Please sign in to comment.