diff --git a/cdc/txn/txn.go b/cdc/txn/txn.go index 4d80f650cb7..08f7fedf083 100644 --- a/cdc/txn/txn.go +++ b/cdc/txn/txn.go @@ -119,7 +119,6 @@ func CollectRawTxns( delete(entryGroups, ts) } } - // TODO: Handle the case when readyTsList is empty sort.Slice(readyTxns, func(i, j int) bool { return readyTxns[i].TS < readyTxns[j].TS }) @@ -129,6 +128,14 @@ func CollectRawTxns( return err } } + if len(readyTxns) == 0 { + log.Info("Forwarding fake txn", zap.Uint64("ts", resolvedTs)) + fakeTxn := RawTxn{ + TS: resolvedTs, + Entries: nil, + } + outputFn(ctx, fakeTxn) + } } } } diff --git a/cdc/txn/txn_test.go b/cdc/txn/txn_test.go index 5625b36cdb7..1b741e2f963 100644 --- a/cdc/txn/txn_test.go +++ b/cdc/txn/txn_test.go @@ -177,6 +177,40 @@ func (cs *CollectRawTxnsSuite) TestShouldConsiderSpanResolvedTs(c *check.C) { c.Assert(string(txn.Entries[2].Key), check.Equals, "key1-3") } +func (cs *CollectRawTxnsSuite) TestShouldOutputBinlogEvenWhenThereIsNoRealEvent(c *check.C) { + entries := []kv.KvOrResolved{ + {Resolved: &kv.ResolvedSpan{Timestamp: 1024}}, + {Resolved: &kv.ResolvedSpan{Timestamp: 2000}}, + } + + cursor := 0 + input := func(ctx context.Context) (kv.KvOrResolved, error) { + if cursor >= len(entries) { + return kv.KvOrResolved{}, errors.New("End") + } + e := entries[cursor] + cursor++ + return e, nil + } + + var rawTxns []RawTxn + output := func(ctx context.Context, txn RawTxn) error { + rawTxns = append(rawTxns, txn) + return nil + } + + ctx := context.Background() + tracker := mockTracker{forwarded: []bool{true, true}} + err := CollectRawTxns(ctx, input, output, &tracker) + c.Assert(err, check.ErrorMatches, "End") + + c.Assert(rawTxns, check.HasLen, len(entries)) + for i, t := range rawTxns { + c.Assert(t.Entries, check.HasLen, 0) + c.Assert(t.TS, check.Equals, entries[i].Resolved.Timestamp) + } +} + type mountTxnsSuite struct{} var _ = check.Suite(&mountTxnsSuite{})