Skip to content

Commit

Permalink
commit bbolt transaction if there is any pending deleting operations
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Wang <benjamin.ahrtr@gmail.com>
  • Loading branch information
ahrtr committed Dec 21, 2023
1 parent 35f4a6c commit bd5915f
Showing 1 changed file with 22 additions and 2 deletions.
24 changes: 22 additions & 2 deletions server/storage/backend/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ type batchTx struct {
tx *bolt.Tx
backend *backend

pending int
pending int
pendingDeleteOperations int
}

// Lock is supposed to be called only by the unit test.
Expand Down Expand Up @@ -105,7 +106,23 @@ func (t *batchTx) LockOutsideApply() {
}

func (t *batchTx) Unlock() {
if t.pending >= t.backend.batchLimit {
// We need to commit the transaction if there is any pending deleting
// operation, otherwise etcd might run into a situation that it haven't
// finished committing the data into backend storage (note: etcd
// periodically commits the bbolt transactions instead of on each
// request) when it applies next request. Accordingly, etcd may still
// read the stale data from bbolt when processing next request. So it
// breaks the linearizability.
//
// Note we don't need to commit the transaction for put requests if
// it doesn't exceed the batch limit, because there is a buffer on top
// of the bbolt. Each time when etcd reads data from backend storage,
// it will read data from both bbolt and the buffer. But there is no
// such a buffer for delete requests.
//
// Please also refer to
// https://github.com/etcd-io/etcd/pull/17119#issuecomment-1857547158
if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 {
t.commit(false)
}
t.Mutex.Unlock()
Expand Down Expand Up @@ -133,6 +150,7 @@ func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) {
)
}
t.pending++
t.pendingDeleteOperations++
}

// UnsafePut must be called holding the lock on the tx.
Expand Down Expand Up @@ -223,6 +241,7 @@ func (t *batchTx) UnsafeDelete(bucketType Bucket, key []byte) {
)
}
t.pending++
t.pendingDeleteOperations++
}

// UnsafeForEach must be called holding the lock on the tx.
Expand Down Expand Up @@ -277,6 +296,7 @@ func (t *batchTx) commit(stop bool) {
atomic.AddInt64(&t.backend.commits, 1)

t.pending = 0
t.pendingDeleteOperations = 0
if err != nil {
t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
}
Expand Down

0 comments on commit bd5915f

Please sign in to comment.