Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EtcdWorker: fix EtcdWorker snapshot isolation (#2510) #2557

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 91 additions & 11 deletions pkg/orchestrator/etcd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package orchestrator

import (
"context"
"fmt"
"strconv"
"time"

"github.com/pingcap/errors"
Expand All @@ -35,7 +37,7 @@ type EtcdWorker struct {
reactor Reactor
state ReactorState
// rawState is the local cache of the latest Etcd state.
rawState map[util.EtcdKey][]byte
rawState map[util.EtcdKey]rawStateEntry
// pendingUpdates stores Etcd updates that the Reactor has not been notified of.
pendingUpdates []*etcdUpdate
// revision is the Etcd revision of the latest event received from Etcd
Expand All @@ -45,6 +47,16 @@ type EtcdWorker struct {
barrierRev int64
// prefix is the scope of Etcd watch
prefix util.EtcdPrefix
// deleteCounter maintains a local copy of a value stored in Etcd that is
// used to keep track of how many deletes have been committed by an
// EtcdWorker watching this key prefix.
// This mechanism is a necessary workaround for correctly detecting
// write-write conflicts when at least one transaction commits a delete:
// deletes in Etcd reset the mod-revision of keys, which makes it
// difficult to use the mod-revision as a unique version identifier for
// implementing the `compare-and-swap` semantics that are essential to
// snapshot isolation for Reactor ticks.
deleteCounter int64
}

type etcdUpdate struct {
Expand All @@ -53,19 +65,29 @@ type etcdUpdate struct {
revision int64
}

// rawStateEntry stores the latest version of a key as seen by the EtcdWorker.
// modRevision is stored to implement `compare-and-swap` semantics more reliably.
type rawStateEntry struct {
// value is the raw value of the key as last received from Etcd.
value []byte
// modRevision is the Etcd mod-revision that came with that value; it is
// what a later transaction compares against to detect concurrent writes.
modRevision int64
}

// NewEtcdWorker returns a new EtcdWorker.
//
// client, reactor and initState are stored as given. prefix is normalized
// with util.NormalizePrefix before being stored. The local raw-state cache
// starts out empty and no barrier revision is set.
// The returned error is always nil in this implementation.
func NewEtcdWorker(client *etcd.Client, prefix string, reactor Reactor, initState ReactorState) (*EtcdWorker, error) {
return &EtcdWorker{
client: client,
reactor: reactor,
state: initState,
rawState: make(map[util.EtcdKey][]byte),
rawState: make(map[util.EtcdKey]rawStateEntry),
prefix: util.NormalizePrefix(prefix),
barrierRev: -1, // -1 indicates no barrier
}, nil
}

const etcdRequestProgressDuration = 2 * time.Second
const (
// NOTE(review): presumably the interval at which the worker asks Etcd for
// watch progress — its use is not visible here; confirm against Run.
etcdRequestProgressDuration = 2 * time.Second
// deletionCounterKey is the key suffix that, appended to the worker's
// prefix, names the Etcd key holding the delete counter.
deletionCounterKey = "/meta/ticdc-delete-etcd-key-count"
)

// Run starts the EtcdWorker event loop.
// A tick is generated either on a timer whose interval is timerInterval, or on an Etcd event.
Expand Down Expand Up @@ -177,6 +199,19 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
}

func (worker *EtcdWorker) handleEvent(_ context.Context, event *clientv3.Event) {
if worker.isDeleteCounterKey(event.Kv.Key) {
switch event.Type {
case mvccpb.PUT:
worker.handleDeleteCounter(event.Kv.Value)
case mvccpb.DELETE:
log.Warn("deletion counter key deleted", zap.Reflect("event", event))
worker.handleDeleteCounter(nil)
}
// We return here because the delete-counter is not used for business logic,
// and it should not be exposed further to the Reactor.
return
}

worker.pendingUpdates = append(worker.pendingUpdates, &etcdUpdate{
key: util.NewEtcdKeyFromBytes(event.Kv.Key),
value: event.Kv.Value,
Expand All @@ -189,7 +224,10 @@ func (worker *EtcdWorker) handleEvent(_ context.Context, event *clientv3.Event)
if value == nil {
value = []byte{}
}
worker.rawState[util.NewEtcdKeyFromBytes(event.Kv.Key)] = value
worker.rawState[util.NewEtcdKeyFromBytes(event.Kv.Key)] = rawStateEntry{
value: value,
modRevision: event.Kv.ModRevision,
}
case mvccpb.DELETE:
delete(worker.rawState, util.NewEtcdKeyFromBytes(event.Kv.Key))
}
Expand All @@ -201,10 +239,17 @@ func (worker *EtcdWorker) syncRawState(ctx context.Context) error {
return errors.Trace(err)
}

worker.rawState = make(map[util.EtcdKey][]byte)
worker.rawState = make(map[util.EtcdKey]rawStateEntry)
for _, kv := range resp.Kvs {
if worker.isDeleteCounterKey(kv.Key) {
worker.handleDeleteCounter(kv.Value)
continue
}
key := util.NewEtcdKeyFromBytes(kv.Key)
worker.rawState[key] = kv.Value
worker.rawState[key] = rawStateEntry{
value: kv.Value,
modRevision: kv.ModRevision,
}
err := worker.state.Update(key, kv.Value, true)
if err != nil {
return errors.Trace(err)
Expand All @@ -218,9 +263,9 @@ func (worker *EtcdWorker) syncRawState(ctx context.Context) error {
// cloneRawState returns a copy of the cached raw state as a plain
// key -> value map. Every value is copied into a freshly allocated slice, so
// the returned values do not alias the byte slices held in worker.rawState,
// and every key is rebuilt from its string form. Only the values are copied;
// the mod-revision cached alongside each value is not part of the result.
func (worker *EtcdWorker) cloneRawState() map[util.EtcdKey][]byte {
ret := make(map[util.EtcdKey][]byte)
for k, v := range worker.rawState {
cloneV := make([]byte, len(v))
copy(cloneV, v)
ret[util.NewEtcdKey(k.String())] = cloneV
vCloned := make([]byte, len(v.value))
copy(vCloned, v.value)
ret[util.NewEtcdKey(k.String())] = vCloned
}
return ret
}
Expand Down Expand Up @@ -251,11 +296,12 @@ func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch)
}
cmps := make([]clientv3.Cmp, 0, len(changedSet))
ops := make([]clientv3.Op, 0, len(changedSet))
hasDelete := false
for key := range changedSet {
// make sure someone else has not updated the key after the last snapshot
var cmp clientv3.Cmp
if _, ok := worker.rawState[key]; ok {
cmp = clientv3.Compare(clientv3.ModRevision(key.String()), "<", worker.revision+1)
if entry, ok := worker.rawState[key]; ok {
cmp = clientv3.Compare(clientv3.ModRevision(key.String()), "=", entry.modRevision)
} else {
// If ok is false, the key of this patch does not exist in the committed state;
// this comparison is equivalent to asserting that `patch.Key` does not exist.
Expand All @@ -269,10 +315,22 @@ func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch)
op = clientv3.OpPut(key.String(), string(value))
} else {
op = clientv3.OpDelete(key.String())
hasDelete = true
}
ops = append(ops, op)
}

if hasDelete {
ops = append(ops, clientv3.OpPut(worker.prefix.String()+deletionCounterKey, fmt.Sprint(worker.deleteCounter+1)))
}
if worker.deleteCounter > 0 {
cmps = append(cmps, clientv3.Compare(clientv3.Value(worker.prefix.String()+deletionCounterKey), "=", fmt.Sprint(worker.deleteCounter)))
} else if worker.deleteCounter == 0 {
cmps = append(cmps, clientv3.Compare(clientv3.CreateRevision(worker.prefix.String()+deletionCounterKey), "=", 0))
} else {
panic("unreachable")
}

resp, err := worker.client.Txn(ctx).If(cmps...).Then(ops...).Commit()
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -319,3 +377,25 @@ func (worker *EtcdWorker) cleanUp() {
worker.revision = 0
worker.pendingUpdates = worker.pendingUpdates[:0]
}

// isDeleteCounterKey reports whether key is the Etcd key under which the
// delete counter for this worker's prefix is stored, i.e. the worker's prefix
// immediately followed by deletionCounterKey.
func (worker *EtcdWorker) isDeleteCounterKey(key []byte) bool {
	counterKey := worker.prefix.String() + deletionCounterKey
	return counterKey == string(key)
}

// handleDeleteCounter refreshes the worker's local copy of the delete counter
// from the raw value stored under the delete counter key.
//
// A nil or empty value means the key is absent (for example because it has
// been deleted) and resets the local counter to 0. Any other value must be
// the base-10 representation of a positive int64; otherwise the function
// panics through log.Panic. The local counter is only updated after the
// value has been validated, so an invalid value never overwrites it.
func (worker *EtcdWorker) handleDeleteCounter(value []byte) {
	if len(value) == 0 {
		// The delete counter key has been deleted, resetting the internal counter
		worker.deleteCounter = 0
		return
	}

	counter, err := strconv.ParseInt(string(value), 10, 64)
	if err != nil {
		// This should never happen unless Etcd server has been tampered with.
		// The raw value is logged so the corrupted state can be diagnosed.
		log.Panic("strconv failed. Unexpected Etcd state.",
			zap.ByteString("value", value),
			zap.Error(err))
	}
	if counter <= 0 {
		log.Panic("unexpected delete counter", zap.Int64("value", counter))
	}
	worker.deleteCounter = counter
}
99 changes: 99 additions & 0 deletions pkg/orchestrator/etcd_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"regexp"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -655,3 +656,101 @@ func (s *etcdWorkerSuite) TestEmptyOrNil(c *check.C) {
err = cli.Unwrap().Close()
c.Assert(err, check.IsNil)
}

// modifyOneReactor is a test Reactor that appends a single patch for one key
// on its first Tick and reports ErrReactorFinished on the following Tick.
type modifyOneReactor struct {
// state is the ReactorState received by the most recent Tick.
state *commonReactorState
// key is the Etcd key the patch is appended for.
key []byte
// value is what the patch returns as the new value when the key's old
// value is non-empty.
value []byte
// finished is set by the first Tick; once set, Tick returns ErrReactorFinished.
finished bool

// waitOnCh, when non-nil, makes the first Tick wait for two receives from
// this channel (or for the context to be done) before appending the patch.
waitOnCh chan struct{}
}

// Tick records the incoming state and, on its first invocation only, appends
// one patch for r.key. Every later invocation returns ErrReactorFinished
// together with the recorded state.
//
// When r.waitOnCh is non-nil, the first invocation receives from it twice
// before appending the patch; if ctx is done while waiting, Tick returns a
// nil state and the traced context error.
//
// The appended patch replaces a non-empty old value with r.value and leaves
// an empty or missing old value unchanged.
func (r *modifyOneReactor) Tick(ctx context.Context, state ReactorState) (nextState ReactorState, err error) {
	r.state = state.(*commonReactorState)
	if r.finished {
		return r.state, cerrors.ErrReactorFinished.GenWithStackByArgs()
	}
	r.finished = true

	if r.waitOnCh != nil {
		// Two separate signals are awaited, one after the other.
		for signals := 0; signals < 2; signals++ {
			select {
			case <-ctx.Done():
				return nil, errors.Trace(ctx.Err())
			case <-r.waitOnCh:
			}
		}
	}

	overwriteIfPresent := func(old []byte) ([]byte, bool, error) {
		if len(old) == 0 {
			return nil, false, nil
		}
		return r.value, true, nil
	}
	r.state.AppendPatch(util.NewEtcdKeyFromBytes(r.key), overwriteIfPresent)
	return r.state, nil
}

// TestModifyAfterDelete tests snapshot isolation when a modifying transaction is delayed in the middle while a deleting
// transaction commits. The modifying transaction should be aborted and retried, and isolation should not be violated.
//
// The sequence is: worker1 (modify) enters its Tick and is held there via waitOnCh; worker2 (delete) then runs to
// completion; worker1 is released afterwards. The test finally asserts that the key is absent and that worker1's
// local delete counter is 1.
func (s *etcdWorkerSuite) TestModifyAfterDelete(c *check.C) {
defer testleak.AfterTest(c)()

ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()

newClient, closer := setUpTest(c)
defer closer()

cli1 := newClient()
cli2 := newClient()

// Seed the key that both reactors operate on.
_, err := cli1.Put(ctx, "/test/key1", "original value")
c.Assert(err, check.IsNil)

// worker1 tries to overwrite the key, but its Tick waits for two signals on waitOnCh first.
modifyReactor := &modifyOneReactor{
key: []byte("/test/key1"),
value: []byte("modified value"),
waitOnCh: make(chan struct{}),
}
worker1, err := NewEtcdWorker(cli1, "/test", modifyReactor, &commonReactorState{
state: make(map[string]string),
})
c.Assert(err, check.IsNil)

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := worker1.Run(ctx, nil, time.Millisecond*100)
c.Assert(err, check.IsNil)
}()

// First signal: the channel is unbuffered, so this send only completes once worker1's Tick is receiving.
modifyReactor.waitOnCh <- struct{}{}

// worker2 deletes the key while worker1 is still held inside its Tick.
deleteReactor := &modifyOneReactor{
key: []byte("/test/key1"),
value: nil, // deletion
}
worker2, err := NewEtcdWorker(cli2, "/test", deleteReactor, &commonReactorState{
state: make(map[string]string),
})
c.Assert(err, check.IsNil)

err = worker2.Run(ctx, nil, time.Millisecond*100)
c.Assert(err, check.IsNil)

// Second signal: release worker1 so that it appends its patch, then wait for its Run to return.
modifyReactor.waitOnCh <- struct{}{}
wg.Wait()

// The key must still be deleted, and worker1 must have observed exactly one delete.
resp, err := cli1.Get(ctx, "/test/key1")
c.Assert(err, check.IsNil)
c.Assert(resp.Kvs, check.HasLen, 0)
c.Assert(worker1.deleteCounter, check.Equals, int64(1))

_ = cli1.Unwrap().Close()
_ = cli2.Unwrap().Close()
}