Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
streamer: add heartbeat for local streamer
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed Jan 27, 2021
1 parent 3cb465b commit 20cdc14
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/streamer/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/google/uuid"
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/siddontang/go-mysql/mysql"
gmysql "github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
Expand Down Expand Up @@ -62,6 +63,11 @@ func (t *testReaderSuite) SetUpSuite(c *C) {
t.lastPos = 0
t.lastGTID, err = gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110002:0")
c.Assert(err, IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval", "return(10000)"), IsNil)
}

func (t *testReaderSuite) TearDownSuite(c *C) {
c.Assert(failpoint.Disable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval"), IsNil)
}

func (t *testReaderSuite) TestParseFileBase(c *C) {
Expand Down
16 changes: 16 additions & 0 deletions pkg/streamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ package streamer

import (
"context"
"time"

"github.com/pingcap/dm/pkg/binlog/event"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"

Expand Down Expand Up @@ -51,7 +53,21 @@ func (s *LocalStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent,
failpoint.Return(nil, terror.ErrSyncClosed.Generate())
})

// MySQL will send heartbeat event 30s by default
heartbeatTicker := time.NewTicker(30 * time.Second)
heartbeatHeader := &replication.EventHeader{}
defer heartbeatTicker.Stop()

failpoint.Inject("SetHeartbeatInterval", func(v failpoint.Value) {
i := v.(int)
log.L().Info("will change heartbeat interval", zap.Int("new", i))
heartbeatTicker.Stop()
heartbeatTicker = time.NewTicker(time.Duration(i) * time.Second)
})

select {
case <-heartbeatTicker.C:
return event.GenHeartbeatEvent(heartbeatHeader), nil
case c := <-s.ch:
return c, nil
case s.err = <-s.ech:
Expand Down
16 changes: 16 additions & 0 deletions pkg/streamer/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/siddontang/go-mysql/replication"

"github.com/pingcap/dm/pkg/binlog/event"
Expand All @@ -31,6 +32,9 @@ type testStreamerSuite struct {
}

func (t *testStreamerSuite) TestStreamer(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval", "return(10000)"), IsNil)
defer c.Assert(failpoint.Disable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval"), IsNil)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

Expand Down Expand Up @@ -98,3 +102,15 @@ func (t *testStreamerSuite) TestStreamer(c *C) {
c.Assert(terror.ErrNeedSyncAgain.Equal(err), IsTrue)
c.Assert(ev2, IsNil)
}

func (t *testStreamerSuite) TestHeartbeat(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval", "return(1)"), IsNil)
defer c.Assert(failpoint.Disable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval"), IsNil)

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
s := newLocalStreamer() // with buffer
ev, err := s.GetEvent(ctx)
c.Assert(err, IsNil)
c.Assert(ev.Header.EventType, Equals, replication.HEARTBEAT_EVENT)
}

0 comments on commit 20cdc14

Please sign in to comment.