Skip to content

Commit

Permalink
Merge pull request globalsign#11 from jameinel/txn-preload
Browse files Browse the repository at this point in the history
TXN preload
  • Loading branch information
domodwyer authored Jul 5, 2017
2 parents db5f18b + b5ff827 commit 2e1497f
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 7 deletions.
43 changes: 36 additions & 7 deletions txn/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ func (f *flusher) run() (err error) {

f.debugf("Processing %s", f.goal)
seen := make(map[bson.ObjectId]*transaction)
if err := f.recurse(f.goal, seen); err != nil {
preloaded := make(map[bson.ObjectId]*transaction)
if err := f.recurse(f.goal, seen, preloaded); err != nil {
return err
}
if f.goal.done() {
Expand Down Expand Up @@ -155,26 +156,54 @@ func (f *flusher) run() (err error) {
return nil
}

func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction) error {
const preloadBatchSize = 100

func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction, preloaded map[bson.ObjectId]*transaction) error {
seen[t.Id] = t
delete(preloaded, t.Id)
err := f.advance(t, nil, false)
if err != errPreReqs {
return err
}
for _, dkey := range t.docKeys() {
remaining := make([]bson.ObjectId, 0, len(f.queue[dkey]))
toPreload := make(map[bson.ObjectId]struct{}, len(f.queue[dkey]))
for _, dtt := range f.queue[dkey] {
id := dtt.id()
if seen[id] != nil {
if _, scheduled := toPreload[id]; seen[id] != nil || scheduled || preloaded[id] != nil {
continue
}
qt, err := f.load(id)
if err != nil {
return err
toPreload[id] = struct{}{}
remaining = append(remaining, id)
}
// done with this map
toPreload = nil
for len(remaining) > 0 {
batch := remaining
if len(batch) > preloadBatchSize {
batch = remaining[:preloadBatchSize]
}
err = f.recurse(qt, seen)
remaining = remaining[len(batch):]
err := f.loadMulti(batch, preloaded)
if err != nil {
return err
}
for _, id := range batch {
if seen[id] != nil {
continue
}
qt, ok := preloaded[id]
if !ok {
qt, err = f.load(id)
if err != nil {
return err
}
}
err = f.recurse(qt, seen, preloaded)
if err != nil {
return err
}
}
}
}
return nil
Expand Down
20 changes: 20 additions & 0 deletions txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,26 @@ func (r *Runner) load(id bson.ObjectId) (*transaction, error) {
return &t, nil
}

func (r *Runner) loadMulti(ids []bson.ObjectId, preloaded map[bson.ObjectId]*transaction) error {
txns := make([]transaction, 0, len(ids))

query := r.tc.Find(bson.M{"_id": bson.M{"$in": ids}})
// Not sure that this actually has much of an effect when using All()
query.Batch(len(ids))
err := query.All(&txns)
if err == mgo.ErrNotFound {
return fmt.Errorf("could not find a transaction in batch: %v", ids)
} else if err != nil {
return err
}
for i := range txns {
t := &txns[i]
preloaded[t.Id] = t
}
return nil
}


type typeNature int

const (
Expand Down

0 comments on commit 2e1497f

Please sign in to comment.