Skip to content

Commit

Permalink
kv(ticdc): fix data loss when upstream txn conflicts during scan (pin…
Browse files Browse the repository at this point in the history
  • Loading branch information
overvenus committed Jun 19, 2022
1 parent 71bec0d commit 4f6c273
Show file tree
Hide file tree
Showing 8 changed files with 313 additions and 15 deletions.
3 changes: 3 additions & 0 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
if err != nil {
return nil, err
}
if len(raw.OldValue) == 0 && len(raw.Value) == 0 {
log.Warn("empty value and old value", zap.Any("row", raw))
}
baseInfo := baseKVEntry{
StartTs: raw.StartTs,
CRTs: raw.CRTs,
Expand Down
2 changes: 1 addition & 1 deletion cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1178,7 +1178,7 @@ func (s *eventFeedSession) receiveFromStream(
metricSendEventBatchResolvedSize := batchResolvedEventSize.WithLabelValues(captureAddr, changefeedID)

// always create a new region worker, because `receiveFromStream` is ensured
// to call exactly once from outter code logic
// to call exactly once from outer code logic
worker := newRegionWorker(s, addr)

defer worker.evictAllRegions()
Expand Down
33 changes: 30 additions & 3 deletions cdc/kv/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type matcher struct {
// TODO : clear the single prewrite
unmatchedValue map[matchKey]*cdcpb.Event_Row
cachedCommit []*cdcpb.Event_Row
cachedRollback []*cdcpb.Event_Row
}

type matchKey struct {
Expand Down Expand Up @@ -58,8 +59,15 @@ func (m *matcher) putPrewriteRow(row *cdcpb.Event_Row) {

// matchRow matches the commit event with the cached prewrite event
// the Value and OldValue will be assigned if a matched prewrite event exists.
func (m *matcher) matchRow(row *cdcpb.Event_Row) bool {
func (m *matcher) matchRow(row *cdcpb.Event_Row, initialized bool) bool {
if value, exist := m.unmatchedValue[newMatchKey(row)]; exist {
// TiKV may send a fake prewrite event with empty value caused by txn heartbeat.
//
// We need to skip match if the region is not initialized,
// as prewrite events may be sent out of order.
if !initialized && len(value.GetValue()) == 0 {
return false
}
row.Value = value.GetValue()
row.OldValue = value.GetOldValue()
delete(m.unmatchedValue, newMatchKey(row))
Expand All @@ -72,13 +80,16 @@ func (m *matcher) cacheCommitRow(row *cdcpb.Event_Row) {
m.cachedCommit = append(m.cachedCommit, row)
}

func (m *matcher) matchCachedRow() []*cdcpb.Event_Row {
func (m *matcher) matchCachedRow(initialized bool) []*cdcpb.Event_Row {
if !initialized {
log.Panic("must be initialized before match cahced rows")
}
cachedCommit := m.cachedCommit
m.cachedCommit = nil
top := 0
for i := 0; i < len(cachedCommit); i++ {
cacheEntry := cachedCommit[i]
ok := m.matchRow(cacheEntry)
ok := m.matchRow(cacheEntry, true)
if !ok {
// when cdc receives a commit log without a corresponding
// prewrite log before initialized, a committed log with
Expand All @@ -97,3 +108,19 @@ func (m *matcher) matchCachedRow() []*cdcpb.Event_Row {
func (m *matcher) rollbackRow(row *cdcpb.Event_Row) {
delete(m.unmatchedValue, newMatchKey(row))
}

func (m *matcher) cacheRollbackRow(row *cdcpb.Event_Row) {
m.cachedRollback = append(m.cachedRollback, row)
}

func (m *matcher) matchCachedRollbackRow(initialized bool) {
if !initialized {
log.Panic("must be initialized before match cahced rollback rows")
}
rollback := m.cachedRollback
m.cachedRollback = nil
for i := 0; i < len(rollback); i++ {
cacheEntry := rollback[i]
m.rollbackRow(cacheEntry)
}
}
135 changes: 129 additions & 6 deletions cdc/kv/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
package kv

import (
"testing"

"github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
)

type matcherSuite struct{}
Expand Down Expand Up @@ -47,7 +50,7 @@ func (s *matcherSuite) TestMatchRow(c *check.C) {
StartTs: 1,
Key: []byte("k1"),
}
ok := matcher.matchRow(commitRow1)
ok := matcher.matchRow(commitRow1, true)
c.Assert(ok, check.IsFalse)
c.Assert(commitRow1, check.DeepEquals, &cdcpb.Event_Row{
StartTs: 1,
Expand All @@ -60,7 +63,7 @@ func (s *matcherSuite) TestMatchRow(c *check.C) {
CommitTs: 3,
Key: []byte("k1"),
}
ok = matcher.matchRow(commitRow2)
ok = matcher.matchRow(commitRow2, true)
c.Assert(ok, check.IsTrue)
c.Assert(commitRow2, check.DeepEquals, &cdcpb.Event_Row{
StartTs: 2,
Expand Down Expand Up @@ -92,7 +95,7 @@ func (s *matcherSuite) TestMatchFakePrewrite(c *check.C) {
CommitTs: 2,
Key: []byte("k1"),
}
ok := matcher.matchRow(commitRow1)
ok := matcher.matchRow(commitRow1, true)
c.Assert(commitRow1, check.DeepEquals, &cdcpb.Event_Row{
StartTs: 1,
CommitTs: 2,
Expand All @@ -106,7 +109,7 @@ func (s *matcherSuite) TestMatchFakePrewrite(c *check.C) {
func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) {
defer testleak.AfterTest(c)()
matcher := newMatcher()
c.Assert(len(matcher.matchCachedRow()), check.Equals, 0)
c.Assert(len(matcher.matchCachedRow(true)), check.Equals, 0)
matcher.cacheCommitRow(&cdcpb.Event_Row{
StartTs: 1,
CommitTs: 2,
Expand All @@ -122,7 +125,7 @@ func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) {
CommitTs: 5,
Key: []byte("k3"),
})
c.Assert(len(matcher.matchCachedRow()), check.Equals, 0)
c.Assert(len(matcher.matchCachedRow(true)), check.Equals, 0)

matcher.cacheCommitRow(&cdcpb.Event_Row{
StartTs: 1,
Expand Down Expand Up @@ -159,7 +162,7 @@ func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) {
OldValue: []byte("ov3"),
})

c.Assert(matcher.matchCachedRow(), check.DeepEquals, []*cdcpb.Event_Row{{
c.Assert(matcher.matchCachedRow(true), check.DeepEquals, []*cdcpb.Event_Row{{
StartTs: 1,
CommitTs: 2,
Key: []byte("k1"),
Expand All @@ -173,3 +176,123 @@ func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) {
OldValue: []byte("ov2"),
}})
}

func TestMatchRowUninitialized(t *testing.T) {
t.Parallel()
matcher := newMatcher()

// fake prewrite before init.
matcher.putPrewriteRow(&cdcpb.Event_Row{
StartTs: 1,
Key: []byte("k1"),
OldValue: []byte("v4"),
})
commitRow1 := &cdcpb.Event_Row{
StartTs: 1,
CommitTs: 2,
Key: []byte("k1"),
}
ok := matcher.matchRow(commitRow1, false)
require.Equal(t, &cdcpb.Event_Row{
StartTs: 1,
CommitTs: 2,
Key: []byte("k1"),
}, commitRow1)
require.False(t, ok)
matcher.cacheCommitRow(commitRow1)

// actual prewrite before init.
matcher.putPrewriteRow(&cdcpb.Event_Row{
StartTs: 1,
Key: []byte("k1"),
Value: []byte("v3"),
OldValue: []byte("v4"),
})

// normal prewrite and commit before init.
matcher.putPrewriteRow(&cdcpb.Event_Row{
StartTs: 2,
Key: []byte("k2"),
Value: []byte("v3"),
OldValue: []byte("v4"),
})
commitRow2 := &cdcpb.Event_Row{
StartTs: 2,
CommitTs: 3,
Key: []byte("k2"),
}
ok = matcher.matchRow(commitRow2, false)
require.Equal(t, &cdcpb.Event_Row{
StartTs: 2,
CommitTs: 3,
Key: []byte("k2"),
Value: []byte("v3"),
OldValue: []byte("v4"),
}, commitRow2)
require.True(t, ok)

// match cached row after init.
rows := matcher.matchCachedRow(true)
require.Len(t, rows, 1)
require.Equal(t, &cdcpb.Event_Row{
StartTs: 1,
CommitTs: 2,
Key: []byte("k1"),
Value: []byte("v3"),
OldValue: []byte("v4"),
}, rows[0])
}

func TestMatchMatchCachedRollbackRow(t *testing.T) {
t.Parallel()
matcher := newMatcher()
matcher.matchCachedRollbackRow(true)
matcher.cacheRollbackRow(&cdcpb.Event_Row{
StartTs: 1,
Key: []byte("k1"),
})
matcher.cacheRollbackRow(&cdcpb.Event_Row{
StartTs: 3,
Key: []byte("k2"),
})
matcher.cacheRollbackRow(&cdcpb.Event_Row{
StartTs: 4,
Key: []byte("k3"),
})
matcher.matchCachedRollbackRow(true)

matcher.cacheRollbackRow(&cdcpb.Event_Row{
StartTs: 1,
Key: []byte("k1"),
})
matcher.cacheRollbackRow(&cdcpb.Event_Row{
StartTs: 3,
Key: []byte("k2"),
})
matcher.cacheRollbackRow(&cdcpb.Event_Row{
StartTs: 4,
Key: []byte("k3"),
})

matcher.putPrewriteRow(&cdcpb.Event_Row{
StartTs: 1,
Key: []byte("k1"),
Value: []byte("v1"),
OldValue: []byte("ov1"),
})
matcher.putPrewriteRow(&cdcpb.Event_Row{
StartTs: 3,
Key: []byte("k2"),
Value: []byte("v2"),
OldValue: []byte("ov2"),
})
matcher.putPrewriteRow(&cdcpb.Event_Row{
StartTs: 4,
Key: []byte("k3"),
Value: []byte("v3"),
OldValue: []byte("ov3"),
})

matcher.matchCachedRollbackRow(true)
require.Empty(t, matcher.unmatchedValue)
}
9 changes: 7 additions & 2 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ func (w *regionWorker) handleEventEntry(

state.initialized = true
w.session.regionRouter.Release(state.sri.rpcCtx.Addr)
cachedEvents := state.matcher.matchCachedRow()
cachedEvents := state.matcher.matchCachedRow(state.initialized)
for _, cachedEvent := range cachedEvents {
revent, err := assembleRowEvent(regionID, cachedEvent, w.enableOldValue)
if err != nil {
Expand All @@ -665,6 +665,7 @@ func (w *regionWorker) handleEventEntry(
return errors.Trace(ctx.Err())
}
}
state.matcher.matchCachedRollbackRow(state.initialized)
case cdcpb.Event_COMMITTED:
w.metrics.metricPullEventCommittedCounter.Inc()
revent, err := assembleRowEvent(regionID, entry, w.enableOldValue)
Expand Down Expand Up @@ -699,7 +700,7 @@ func (w *regionWorker) handleEventEntry(
zap.Uint64("regionID", regionID))
return errUnreachable
}
ok := state.matcher.matchRow(entry)
ok := state.matcher.matchRow(entry, state.initialized)
if !ok {
if !state.initialized {
state.matcher.cacheCommitRow(entry)
Expand All @@ -724,6 +725,10 @@ func (w *regionWorker) handleEventEntry(
}
case cdcpb.Event_ROLLBACK:
w.metrics.metricPullEventRollbackCounter.Inc()
if !state.initialized {
state.matcher.cacheRollbackRow(entry)
continue
}
state.matcher.rollbackRow(entry)
}
}
Expand Down
Loading

0 comments on commit 4f6c273

Please sign in to comment.