diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index 60be4ce6d74..a436a72a3a6 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -73,7 +73,8 @@ type batchTx struct { tx *bolt.Tx backend *backend - pending int + pending int + pendingDeleteOperations int } // Lock is supposed to be called only by the unit test. @@ -105,7 +106,23 @@ func (t *batchTx) LockOutsideApply() { } func (t *batchTx) Unlock() { - if t.pending >= t.backend.batchLimit { + // We need to commit the transaction if there is any pending delete + // operation, otherwise etcd might run into a situation where it hasn't + // finished committing the data into backend storage (note: etcd + // periodically commits the bbolt transactions instead of on each + // request) when it applies the next request. Accordingly, etcd may still + // read the stale data from bbolt when processing the next request. So it + // breaks linearizability. + // + // Note we don't need to commit the transaction for put requests if + // it doesn't exceed the batch limit, because there is a buffer on top + // of bbolt. Each time etcd reads data from backend storage, + // it will read data from both bbolt and the buffer. But there is no + // such buffer for delete requests. + // + // Please also refer to + // https://github.com/etcd-io/etcd/pull/17119#issuecomment-1857547158 + if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 { t.commit(false) } t.Mutex.Unlock() @@ -133,6 +150,7 @@ func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) { ) } t.pending++ + t.pendingDeleteOperations++ } // UnsafePut must be called holding the lock on the tx. @@ -223,6 +241,7 @@ func (t *batchTx) UnsafeDelete(bucketType Bucket, key []byte) { ) } t.pending++ + t.pendingDeleteOperations++ } // UnsafeForEach must be called holding the lock on the tx. 
@@ -277,6 +296,7 @@ func (t *batchTx) commit(stop bool) { atomic.AddInt64(&t.backend.commits, 1) t.pending = 0 + t.pendingDeleteOperations = 0 if err != nil { t.backend.lg.Fatal("failed to commit tx", zap.Error(err)) }