Skip to content

Commit

Permalink
Output fake txn when no entry event comes before a resolved event (pi…
Browse files Browse the repository at this point in the history
  • Loading branch information
suzaku authored Nov 4, 2019
1 parent bdf6913 commit 00aabc6
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 1 deletion.
9 changes: 8 additions & 1 deletion cdc/txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ func CollectRawTxns(
delete(entryGroups, ts)
}
}
// TODO: Handle the case when readyTsList is empty
sort.Slice(readyTxns, func(i, j int) bool {
return readyTxns[i].TS < readyTxns[j].TS
})
Expand All @@ -129,6 +128,14 @@ func CollectRawTxns(
return err
}
}
if len(readyTxns) == 0 {
log.Info("Forwarding fake txn", zap.Uint64("ts", resolvedTs))
fakeTxn := RawTxn{
TS: resolvedTs,
Entries: nil,
}
outputFn(ctx, fakeTxn)
}
}
}
}
Expand Down
34 changes: 34 additions & 0 deletions cdc/txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,40 @@ func (cs *CollectRawTxnsSuite) TestShouldConsiderSpanResolvedTs(c *check.C) {
c.Assert(string(txn.Entries[2].Key), check.Equals, "key1-3")
}

func (cs *CollectRawTxnsSuite) TestShouldOutputBinlogEvenWhenThereIsNoRealEvent(c *check.C) {
entries := []kv.KvOrResolved{
{Resolved: &kv.ResolvedSpan{Timestamp: 1024}},
{Resolved: &kv.ResolvedSpan{Timestamp: 2000}},
}

cursor := 0
input := func(ctx context.Context) (kv.KvOrResolved, error) {
if cursor >= len(entries) {
return kv.KvOrResolved{}, errors.New("End")
}
e := entries[cursor]
cursor++
return e, nil
}

var rawTxns []RawTxn
output := func(ctx context.Context, txn RawTxn) error {
rawTxns = append(rawTxns, txn)
return nil
}

ctx := context.Background()
tracker := mockTracker{forwarded: []bool{true, true}}
err := CollectRawTxns(ctx, input, output, &tracker)
c.Assert(err, check.ErrorMatches, "End")

c.Assert(rawTxns, check.HasLen, len(entries))
for i, t := range rawTxns {
c.Assert(t.Entries, check.HasLen, 0)
c.Assert(t.TS, check.Equals, entries[i].Resolved.Timestamp)
}
}

type mountTxnsSuite struct{}

var _ = check.Suite(&mountTxnsSuite{})
Expand Down

0 comments on commit 00aabc6

Please sign in to comment.