From 20cdc141bd2b2c3fb0736ae5f4def35182b9cab9 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 27 Jan 2021 14:20:46 +0800 Subject: [PATCH] streamer: add heartbeat for local streamer --- pkg/streamer/reader_test.go | 6 ++++++ pkg/streamer/streamer.go | 16 ++++++++++++++++ pkg/streamer/streamer_test.go | 16 ++++++++++++++++ 3 files changed, 38 insertions(+) diff --git a/pkg/streamer/reader_test.go b/pkg/streamer/reader_test.go index f9d9e7d44f..863f471359 100644 --- a/pkg/streamer/reader_test.go +++ b/pkg/streamer/reader_test.go @@ -33,6 +33,7 @@ import ( "github.com/google/uuid" . "github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/siddontang/go-mysql/mysql" gmysql "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" @@ -62,6 +63,11 @@ func (t *testReaderSuite) SetUpSuite(c *C) { t.lastPos = 0 t.lastGTID, err = gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110002:0") c.Assert(err, IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval", "return(10000)"), IsNil) +} + +func (t *testReaderSuite) TearDownSuite(c *C) { + c.Assert(failpoint.Disable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval"), IsNil) } func (t *testReaderSuite) TestParseFileBase(c *C) { diff --git a/pkg/streamer/streamer.go b/pkg/streamer/streamer.go index b426eb17be..81418e31a5 100644 --- a/pkg/streamer/streamer.go +++ b/pkg/streamer/streamer.go @@ -15,7 +15,9 @@ package streamer import ( "context" + "time" + "github.com/pingcap/dm/pkg/binlog/event" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" @@ -51,7 +53,21 @@ func (s *LocalStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent, failpoint.Return(nil, terror.ErrSyncClosed.Generate()) }) + // MySQL will send heartbeat event 30s by default + heartbeatTicker := time.NewTicker(30 * time.Second) + heartbeatHeader := &replication.EventHeader{} + defer heartbeatTicker.Stop() + + failpoint.Inject("SetHeartbeatInterval", func(v failpoint.Value) { + i := v.(int) + log.L().Info("will change heartbeat interval", zap.Int("new", i)) + heartbeatTicker.Stop() + heartbeatTicker = time.NewTicker(time.Duration(i) * time.Second) + }) + select { + case <-heartbeatTicker.C: + return event.GenHeartbeatEvent(heartbeatHeader), nil case c := <-s.ch: return c, nil case s.err = <-s.ech: diff --git a/pkg/streamer/streamer_test.go b/pkg/streamer/streamer_test.go index e4c178cad5..b12ad17a7d 100644 --- a/pkg/streamer/streamer_test.go +++ b/pkg/streamer/streamer_test.go @@ -19,6 +19,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/siddontang/go-mysql/replication" "github.com/pingcap/dm/pkg/binlog/event" @@ -31,6 +32,9 @@ type testStreamerSuite struct { } func (t *testStreamerSuite) TestStreamer(c *C) { + c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval", "return(10000)"), IsNil) + defer c.Assert(failpoint.Disable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval"), IsNil) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -98,3 +102,15 @@ func (t *testStreamerSuite) TestStreamer(c *C) { c.Assert(terror.ErrNeedSyncAgain.Equal(err), IsTrue) c.Assert(ev2, IsNil) } + +func (t *testStreamerSuite) TestHeartbeat(c *C) { + c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval", "return(1)"), IsNil) + defer c.Assert(failpoint.Disable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval"), IsNil) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + s := newLocalStreamer() // with buffer + ev, err := s.GetEvent(ctx) + c.Assert(err, IsNil) + c.Assert(ev.Header.EventType, Equals, replication.HEARTBEAT_EVENT) +}