Skip to content

Commit

Permalink
EtcdWorker: fix EtcdWorker snapshot isolation (#2510)
Browse files Browse the repository at this point in the history
  • Loading branch information
liuzix authored Aug 17, 2021
1 parent f538822 commit 984c4a4
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 11 deletions.
102 changes: 91 additions & 11 deletions pkg/orchestrator/etcd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package orchestrator

import (
"context"
"fmt"
"strconv"
"time"

"github.com/pingcap/errors"
Expand All @@ -35,7 +37,7 @@ type EtcdWorker struct {
reactor Reactor
state ReactorState
// rawState is the local cache of the latest Etcd state.
rawState map[util.EtcdKey][]byte
rawState map[util.EtcdKey]rawStateEntry
// pendingUpdates stores Etcd updates that the Reactor has not been notified of.
pendingUpdates []*etcdUpdate
// revision is the Etcd revision of the latest event received from Etcd
Expand All @@ -45,6 +47,16 @@ type EtcdWorker struct {
barrierRev int64
// prefix is the scope of Etcd watch
prefix util.EtcdPrefix
// deleteCounter maintains a local copy of a value stored in Etcd used to
// keep track of how many deletes have been committed by an EtcdWorker
// watching this key prefix.
// This mechanism is necessary as a workaround to correctly detect
// write-write conflicts when at least one transaction commits a delete,
// because deletes in Etcd reset the mod-revision of keys, making it
// difficult to use it as a unique version identifier to implement
// `compare-and-swap` semantics, which are essential for implementing
// snapshot isolation for Reactor ticks.
deleteCounter int64
}

type etcdUpdate struct {
Expand All @@ -53,19 +65,29 @@ type etcdUpdate struct {
revision int64
}

// rawStateEntry stores the latest version of a key as seen by the EtcdWorker.
// modRevision is stored to implement `compare-and-swap` semantics more reliably.
type rawStateEntry struct {
// value holds the raw bytes most recently observed for the key.
value []byte
// modRevision is the Etcd mod-revision that was reported together with value.
modRevision int64
}

// NewEtcdWorker returns a new EtcdWorker.
// The given client, reactor and initState are stored on the worker as-is, while
// prefix is passed through util.NormalizePrefix before being stored.
// The worker starts with an empty rawState cache and no barrier revision.
// The returned error is always nil in this implementation.
func NewEtcdWorker(client *etcd.Client, prefix string, reactor Reactor, initState ReactorState) (*EtcdWorker, error) {
return &EtcdWorker{
client: client,
reactor: reactor,
state: initState,
rawState: make(map[util.EtcdKey][]byte),
rawState: make(map[util.EtcdKey]rawStateEntry),
prefix: util.NormalizePrefix(prefix),
barrierRev: -1, // -1 indicates no barrier
}, nil
}

const etcdRequestProgressDuration = 2 * time.Second
const (
etcdRequestProgressDuration = 2 * time.Second
deletionCounterKey = "/meta/ticdc-delete-etcd-key-count"
)

// Run starts the EtcdWorker event loop.
// A tick is generated either on a timer whose interval is timerInterval, or on an Etcd event.
Expand Down Expand Up @@ -177,6 +199,19 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
}

func (worker *EtcdWorker) handleEvent(_ context.Context, event *clientv3.Event) {
if worker.isDeleteCounterKey(event.Kv.Key) {
switch event.Type {
case mvccpb.PUT:
worker.handleDeleteCounter(event.Kv.Value)
case mvccpb.DELETE:
log.Warn("deletion counter key deleted", zap.Reflect("event", event))
worker.handleDeleteCounter(nil)
}
// We return here because the delete-counter is not used for business logic,
// and it should not be exposed further to the Reactor.
return
}

worker.pendingUpdates = append(worker.pendingUpdates, &etcdUpdate{
key: util.NewEtcdKeyFromBytes(event.Kv.Key),
value: event.Kv.Value,
Expand All @@ -189,7 +224,10 @@ func (worker *EtcdWorker) handleEvent(_ context.Context, event *clientv3.Event)
if value == nil {
value = []byte{}
}
worker.rawState[util.NewEtcdKeyFromBytes(event.Kv.Key)] = value
worker.rawState[util.NewEtcdKeyFromBytes(event.Kv.Key)] = rawStateEntry{
value: value,
modRevision: event.Kv.ModRevision,
}
case mvccpb.DELETE:
delete(worker.rawState, util.NewEtcdKeyFromBytes(event.Kv.Key))
}
Expand All @@ -201,10 +239,17 @@ func (worker *EtcdWorker) syncRawState(ctx context.Context) error {
return errors.Trace(err)
}

worker.rawState = make(map[util.EtcdKey][]byte)
worker.rawState = make(map[util.EtcdKey]rawStateEntry)
for _, kv := range resp.Kvs {
if worker.isDeleteCounterKey(kv.Key) {
worker.handleDeleteCounter(kv.Value)
continue
}
key := util.NewEtcdKeyFromBytes(kv.Key)
worker.rawState[key] = kv.Value
worker.rawState[key] = rawStateEntry{
value: kv.Value,
modRevision: kv.ModRevision,
}
err := worker.state.Update(key, kv.Value, true)
if err != nil {
return errors.Trace(err)
Expand All @@ -218,9 +263,9 @@ func (worker *EtcdWorker) syncRawState(ctx context.Context) error {
// cloneRawState returns a new map built from worker.rawState.
// Every value is copied into a freshly allocated byte slice and every key is
// rebuilt from its string form, so the result shares no byte slices with the cache.
func (worker *EtcdWorker) cloneRawState() map[util.EtcdKey][]byte {
ret := make(map[util.EtcdKey][]byte)
for k, v := range worker.rawState {
cloneV := make([]byte, len(v))
copy(cloneV, v)
ret[util.NewEtcdKey(k.String())] = cloneV
vCloned := make([]byte, len(v.value))
copy(vCloned, v.value)
ret[util.NewEtcdKey(k.String())] = vCloned
}
return ret
}
Expand Down Expand Up @@ -251,11 +296,12 @@ func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch)
}
cmps := make([]clientv3.Cmp, 0, len(changedSet))
ops := make([]clientv3.Op, 0, len(changedSet))
hasDelete := false
for key := range changedSet {
// make sure someone else has not updated the key after the last snapshot
var cmp clientv3.Cmp
if _, ok := worker.rawState[key]; ok {
cmp = clientv3.Compare(clientv3.ModRevision(key.String()), "<", worker.revision+1)
if entry, ok := worker.rawState[key]; ok {
cmp = clientv3.Compare(clientv3.ModRevision(key.String()), "=", entry.modRevision)
} else {
// if ok is false, the key of this patch does not exist in the committed state;
// this compare is equivalent to asserting that `patch.Key` does not exist
Expand All @@ -269,10 +315,22 @@ func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch)
op = clientv3.OpPut(key.String(), string(value))
} else {
op = clientv3.OpDelete(key.String())
hasDelete = true
}
ops = append(ops, op)
}

if hasDelete {
ops = append(ops, clientv3.OpPut(worker.prefix.String()+deletionCounterKey, fmt.Sprint(worker.deleteCounter+1)))
}
if worker.deleteCounter > 0 {
cmps = append(cmps, clientv3.Compare(clientv3.Value(worker.prefix.String()+deletionCounterKey), "=", fmt.Sprint(worker.deleteCounter)))
} else if worker.deleteCounter == 0 {
cmps = append(cmps, clientv3.Compare(clientv3.CreateRevision(worker.prefix.String()+deletionCounterKey), "=", 0))
} else {
panic("unreachable")
}

resp, err := worker.client.Txn(ctx).If(cmps...).Then(ops...).Commit()
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -319,3 +377,25 @@ func (worker *EtcdWorker) cleanUp() {
worker.revision = 0
worker.pendingUpdates = worker.pendingUpdates[:0]
}

// isDeleteCounterKey reports whether key is exactly the deletion counter key
// located under the worker's prefix.
func (worker *EtcdWorker) isDeleteCounterKey(key []byte) bool {
	counterKey := worker.prefix.String() + deletionCounterKey
	return counterKey == string(key)
}

// handleDeleteCounter refreshes the locally cached delete counter from the raw
// value read for the deletion counter key.
//
// A nil or empty value resets the counter to zero. Any other value is parsed
// as a base-10 int64; a parse failure or a non-positive result is reported
// through log.Panic.
func (worker *EtcdWorker) handleDeleteCounter(value []byte) {
	if len(value) != 0 {
		parsed, parseErr := strconv.ParseInt(string(value), 10, 64)
		// The field is updated before validation, exactly as the parse result came back.
		worker.deleteCounter = parsed
		if parseErr != nil {
			// This should never happen unless Etcd server has been tampered with.
			log.Panic("strconv failed. Unexpected Etcd state.", zap.Error(parseErr))
		}
		if worker.deleteCounter <= 0 {
			log.Panic("unexpected delete counter", zap.Int64("value", worker.deleteCounter))
		}
		return
	}

	// The delete counter key has been deleted, resetting the internal counter
	worker.deleteCounter = 0
}
99 changes: 99 additions & 0 deletions pkg/orchestrator/etcd_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"regexp"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -655,3 +656,101 @@ func (s *etcdWorkerSuite) TestEmptyOrNil(c *check.C) {
err = cli.Unwrap().Close()
c.Assert(err, check.IsNil)
}

// modifyOneReactor is a test Reactor that appends a single patch for one key
// on its first Tick and reports ErrReactorFinished on any later Tick.
type modifyOneReactor struct {
// state is the reactor state received by the most recent Tick.
state *commonReactorState
// key is the Etcd key the patch is appended for.
key []byte
// value is what the patch returns as the new value when the old value is non-empty.
value []byte
// finished is set by the first Tick; later Ticks return ErrReactorFinished.
finished bool

// waitOnCh, when non-nil, makes the first Tick receive from it twice
// (or stop on context cancellation) before appending the patch.
waitOnCh chan struct{}
}

// Tick records the incoming state and, on the first call only, appends one
// patch for r.key. Every subsequent call returns ErrReactorFinished.
//
// When r.waitOnCh is set, the first call receives from it twice before the
// patch is appended, returning the traced context error if ctx is done first.
// The patch replaces a non-empty old value with r.value and leaves an empty
// old value unchanged.
func (r *modifyOneReactor) Tick(ctx context.Context, state ReactorState) (nextState ReactorState, err error) {
	r.state = state.(*commonReactorState)
	if r.finished {
		return r.state, cerrors.ErrReactorFinished.GenWithStackByArgs()
	}
	r.finished = true

	if r.waitOnCh != nil {
		// Two signals are required before the patch may be appended.
		for received := 0; received < 2; received++ {
			select {
			case <-ctx.Done():
				return nil, errors.Trace(ctx.Err())
			case <-r.waitOnCh:
			}
		}
	}

	targetKey := util.NewEtcdKeyFromBytes(r.key)
	r.state.AppendPatch(targetKey, func(old []byte) (newValue []byte, changed bool, err error) {
		if len(old) == 0 {
			return nil, false, nil
		}
		return r.value, true, nil
	})
	return r.state, nil
}

// TestModifyAfterDelete tests snapshot isolation when a modifying transaction is
// held up in the middle of its tick while a deleting transaction commits.
// The modifying transaction should be aborted and retried, and isolation should
// not be violated: the key must stay deleted and the delete counter must be 1.
func (s *etcdWorkerSuite) TestModifyAfterDelete(c *check.C) {
	defer testleak.AfterTest(c)()

	const (
		testPrefix = "/test"
		testKey    = "/test/key1"
	)

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
	defer cancel()

	newClient, closer := setUpTest(c)
	defer closer()

	modifyCli := newClient()
	deleteCli := newClient()

	// Seed the key so that both reactors see a non-empty old value.
	_, err := modifyCli.Put(ctx, testKey, "original value")
	c.Assert(err, check.IsNil)

	// The modifying reactor pauses inside Tick until it has been signalled twice.
	modifier := &modifyOneReactor{
		key:      []byte(testKey),
		value:    []byte("modified value"),
		waitOnCh: make(chan struct{}),
	}
	modifyWorker, err := NewEtcdWorker(modifyCli, testPrefix, modifier, &commonReactorState{
		state: make(map[string]string),
	})
	c.Assert(err, check.IsNil)

	var workerDone sync.WaitGroup
	workerDone.Add(1)
	go func() {
		defer workerDone.Done()
		c.Assert(modifyWorker.Run(ctx, nil, time.Millisecond*100), check.IsNil)
	}()

	// First signal: the channel is unbuffered, so this send completes only once
	// the modifying reactor is inside Tick and receiving.
	modifier.waitOnCh <- struct{}{}

	// Run the deleting worker to completion while the modifier is still paused.
	deleter := &modifyOneReactor{
		key:   []byte(testKey),
		value: nil, // deletion
	}
	deleteWorker, err := NewEtcdWorker(deleteCli, testPrefix, deleter, &commonReactorState{
		state: make(map[string]string),
	})
	c.Assert(err, check.IsNil)

	err = deleteWorker.Run(ctx, nil, time.Millisecond*100)
	c.Assert(err, check.IsNil)

	// Second signal: let the modifying reactor append its patch and finish.
	modifier.waitOnCh <- struct{}{}
	workerDone.Wait()

	getResp, err := modifyCli.Get(ctx, testKey)
	c.Assert(err, check.IsNil)
	c.Assert(getResp.Kvs, check.HasLen, 0)
	c.Assert(modifyWorker.deleteCounter, check.Equals, int64(1))

	_ = modifyCli.Unwrap().Close()
	_ = deleteCli.Unwrap().Close()
}

0 comments on commit 984c4a4

Please sign in to comment.