Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

streamer: add heartbeat for local streamer #1404

Merged
merged 2 commits into from
Jan 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/streamer/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/google/uuid"
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/siddontang/go-mysql/mysql"
gmysql "github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
Expand Down Expand Up @@ -62,6 +63,11 @@ func (t *testReaderSuite) SetUpSuite(c *C) {
t.lastPos = 0
t.lastGTID, err = gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110002:0")
c.Assert(err, IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval", "return(10000)"), IsNil)
}

func (t *testReaderSuite) TearDownSuite(c *C) {
c.Assert(failpoint.Disable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval"), IsNil)
}

func (t *testReaderSuite) TestParseFileBase(c *C) {
Expand Down
16 changes: 16 additions & 0 deletions pkg/streamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ package streamer

import (
"context"
"time"

"github.com/pingcap/dm/pkg/binlog/event"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"

Expand All @@ -24,6 +26,10 @@ import (
"go.uber.org/zap"
)

var (
heartbeatInterval = 30 * time.Second
)

// TODO: maybe one day we can make a pull request to go-mysql to support LocalStreamer.

// Streamer provides the ability to get binlog event from remote server or local file.
Expand Down Expand Up @@ -51,7 +57,17 @@ func (s *LocalStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent,
failpoint.Return(nil, terror.ErrSyncClosed.Generate())
})

failpoint.Inject("SetHeartbeatInterval", func(v failpoint.Value) {
i := v.(int)
log.L().Info("will change heartbeat interval", zap.Int("new", i))
heartbeatInterval = time.Duration(i) * time.Second
})

select {
case <-time.After(heartbeatInterval):
// MySQL will send heartbeat event 30s by default
heartbeatHeader := &replication.EventHeader{}
return event.GenHeartbeatEvent(heartbeatHeader), nil
case c := <-s.ch:
return c, nil
case s.err = <-s.ech:
Expand Down
20 changes: 20 additions & 0 deletions pkg/streamer/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/siddontang/go-mysql/replication"

"github.com/pingcap/dm/pkg/binlog/event"
Expand All @@ -31,6 +32,11 @@ type testStreamerSuite struct {
}

func (t *testStreamerSuite) TestStreamer(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval", "return(10000)"), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval"), IsNil)
}()

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

Expand Down Expand Up @@ -98,3 +104,17 @@ func (t *testStreamerSuite) TestStreamer(c *C) {
c.Assert(terror.ErrNeedSyncAgain.Equal(err), IsTrue)
c.Assert(ev2, IsNil)
}

func (t *testStreamerSuite) TestHeartbeat(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval", "return(1)"), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval"), IsNil)
}()

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
s := newLocalStreamer()
ev, err := s.GetEvent(ctx)
c.Assert(err, IsNil)
c.Assert(ev.Header.EventType, Equals, replication.HEARTBEAT_EVENT)
}