Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

relay: use new Reader, Transformer and Writer #171

Merged
merged 31 commits into from
Jun 27, 2019
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
fec35ed
*: use `reader` to reader binlog from upstream; do not check gap anymore
csuzhangxc Jun 12, 2019
3e6a305
*: let `reader` handle more errors
csuzhangxc Jun 12, 2019
f22c105
*: use `transformer` and `writer`; TODO, handle duplicated PreviousGT…
csuzhangxc Jun 12, 2019
12d4cac
relay: fix meta update
csuzhangxc Jun 13, 2019
242ea36
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-3
csuzhangxc Jun 17, 2019
fece692
relay: remove some useless code
csuzhangxc Jun 17, 2019
481b624
relay: refine code
csuzhangxc Jun 17, 2019
c5460f9
relay: test for handling events
csuzhangxc Jun 17, 2019
dd604c0
relay: move some test only code into _test.go file
csuzhangxc Jun 18, 2019
c314dd4
relay: extract and test for `isNewServer`
csuzhangxc Jun 18, 2019
92d9983
relay: test `reSetupMeta`
csuzhangxc Jun 18, 2019
67c2f7c
relay: test processing
csuzhangxc Jun 18, 2019
f255895
relay: try fix CI
csuzhangxc Jun 18, 2019
93ec22e
reader: try fix CI
csuzhangxc Jun 19, 2019
9f22aaa
reader: try fix CI
csuzhangxc Jun 19, 2019
9e108d7
relay: try fix CI
csuzhangxc Jun 19, 2019
8e83cf1
*: writer return value rather than pointer
csuzhangxc Jun 19, 2019
a18e395
*: recover relay log file before starting to read binlog events from …
csuzhangxc Jun 19, 2019
2f0de38
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-3
csuzhangxc Jun 21, 2019
6c3fab5
*: address comments, add a `WaitSomething` function
csuzhangxc Jun 21, 2019
201f960
relay: address comments
csuzhangxc Jun 21, 2019
2e8ed95
relay: check GTID sets `Equal` before `Contain`
csuzhangxc Jun 21, 2019
f20bf92
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-3
csuzhangxc Jun 21, 2019
fed7086
Merge branch 'master' into relay-writer-3
IANTHEREAL Jun 21, 2019
071bc72
*: address comments
csuzhangxc Jun 21, 2019
c93c496
Merge remote-tracking branch 'origin/relay-writer-3' into relay-writer-3
csuzhangxc Jun 21, 2019
0527d16
reader: address comment, change `eventTimeout`
csuzhangxc Jun 22, 2019
184d1b8
Update relay/relay.go
csuzhangxc Jun 26, 2019
f9b1663
*: address comments
csuzhangxc Jun 26, 2019
88f6455
Merge branch 'master' into relay-writer-3
csuzhangxc Jun 26, 2019
3bc0cc1
Merge branch 'master' into relay-writer-3
csuzhangxc Jun 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 100 additions & 5 deletions dm/worker/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,115 @@ import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/errors"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
pkgstreamer "github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/relay"
"github.com/pingcap/dm/relay/purger"
"github.com/pingcap/errors"
)

type testRelay struct{}

var _ = Suite(&testRelay{})

/*********** dummy relay log process unit, used only for testing *************/

// DummyRelay is a dummy relay
type DummyRelay struct {
initErr error

processResult pb.ProcessResult
errorInfo *pb.RelayError
reloadErr error
}

// NewDummyRelay creates an instance of dummy Relay.
func NewDummyRelay(cfg *relay.Config) relay.Process {
return &DummyRelay{}
}

// Init implements Process interface
func (d *DummyRelay) Init() error {
return d.initErr
}

// InjectInitError injects init error
func (d *DummyRelay) InjectInitError(err error) {
d.initErr = err
}

// Process implements Process interface
func (d *DummyRelay) Process(ctx context.Context, pr chan pb.ProcessResult) {
select {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we can use a simple channel send/receive instead of select with a single case

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refined in f9b1663.

case <-ctx.Done():
pr <- d.processResult
}
}

// InjectProcessResult injects process result
func (d *DummyRelay) InjectProcessResult(result pb.ProcessResult) {
d.processResult = result
}

// SwitchMaster implements Process interface
func (d *DummyRelay) SwitchMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) error {
return nil
}

// Migrate implements Process interface
func (d *DummyRelay) Migrate(ctx context.Context, binlogName string, binlogPos uint32) error {
return nil
}

// ActiveRelayLog implements Process interface
func (d *DummyRelay) ActiveRelayLog() *pkgstreamer.RelayLogInfo {
return nil
}

// Reload implements Process interface
func (d *DummyRelay) Reload(newCfg *relay.Config) error {
return d.reloadErr
}

// InjectReloadError injects reload error
func (d *DummyRelay) InjectReloadError(err error) {
d.reloadErr = err
}

// Update implements Process interface
func (d *DummyRelay) Update(cfg *config.SubTaskConfig) error {
return nil
}

// Resume implements Process interface
func (d *DummyRelay) Resume(ctx context.Context, pr chan pb.ProcessResult) {}

// Pause implements Process interface
func (d *DummyRelay) Pause() {}

// Error implements Process interface
func (d *DummyRelay) Error() interface{} {
return d.errorInfo
}

// Status implements Process interface
func (d *DummyRelay) Status() interface{} {
return &pb.RelayStatus{
Stage: pb.Stage_New,
}
}

// Close implements Process interface
func (d *DummyRelay) Close() {}

// IsClosed implements Process interface
func (d *DummyRelay) IsClosed() bool { return false }

func (t *testRelay) TestRelay(c *C) {
originNewRelay := relay.NewRelay
relay.NewRelay = relay.NewDummyRelay
relay.NewRelay = NewDummyRelay
originNewPurger := purger.NewPurger
purger.NewPurger = purger.NewDummyPurger
defer func() {
Expand Down Expand Up @@ -62,7 +157,7 @@ func (t *testRelay) testInit(c *C, holder *realRelayHolder) {
_, err := holder.Init(nil)
c.Assert(err, IsNil)

r, ok := holder.relay.(*relay.DummyRelay)
r, ok := holder.relay.(*DummyRelay)
c.Assert(ok, IsTrue)

initErr := errors.New("init error")
Expand Down Expand Up @@ -106,7 +201,7 @@ func (t *testRelay) testStart(c *C, holder *realRelayHolder) {
}

func (t *testRelay) testClose(c *C, holder *realRelayHolder) {
r, ok := holder.relay.(*relay.DummyRelay)
r, ok := holder.relay.(*DummyRelay)
c.Assert(ok, IsTrue)
processResult := &pb.ProcessResult{
IsCanceled: true,
Expand Down Expand Up @@ -191,7 +286,7 @@ func (t *testRelay) testUpdate(c *C, holder *realRelayHolder) {
c.Assert(waitRelayStage(holder, originStage, 10), IsTrue)
c.Assert(holder.closed.Get(), Equals, closedFalse)

r, ok := holder.relay.(*relay.DummyRelay)
r, ok := holder.relay.(*DummyRelay)
c.Assert(ok, IsTrue)

err := errors.New("reload error")
Expand Down
21 changes: 17 additions & 4 deletions pkg/binlog/reader/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,23 @@ func (t *testTCPReaderSuite) setUpData(c *C) {
query := fmt.Sprintf("DROP DATABASE `%s`", dbName)
_, err := t.db.Exec(query)

// delete previous binlog files/events.
query = "RESET MASTER"
_, err = t.db.Exec(query)
c.Assert(err, IsNil)
maxRetryCount := 5
for i := 0; i < maxRetryCount; i++ {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can have a function in pkg/unit, like

func WaitSomething(backoff int, fn func() bool) bool {
	for i := 0; i < backoff; i++ {
		if fn() {
			return true
		}

		time.Sleep(10 * time.Millisecond)
	}

	return false
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in 6c3fab5.

// delete previous binlog files/events. if other test cases writing events, they may be failed.
query = "RESET MASTER"
_, err = t.db.Exec(query)
c.Assert(err, IsNil)

// check whether other test cases have wrote any events.
time.Sleep(time.Second)
_, gs, err2 := utils.GetMasterStatus(t.db, flavor)
c.Assert(err2, IsNil)
if len(gs.String()) > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return gs.String() == "" is simpler?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in f9b1663.

time.Sleep(5 * time.Second) // some events exist now, try again later.
} else {
break
}
}

// execute some SQL statements to generate binlog events.
query = fmt.Sprintf("CREATE DATABASE `%s`", dbName)
Expand Down
6 changes: 0 additions & 6 deletions pkg/streamer/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,12 +263,6 @@ func (r *BinlogReader) parseFile(

onEventFunc := func(e *replication.BinlogEvent) error {
log.Debugf("[streamer] read event %+v", e.Header)
if e.Header.Flags&0x0040 != 0 {
// now LOG_EVENT_RELAY_LOG_F is only used for events which used to fill the gap in relay log file when switching the master server
log.Debugf("skip event %+v created by relay writer", e.Header)
return nil
}

r.latestServerID = e.Header.ServerID // record server_id

switch e.Header.EventType {
Expand Down
11 changes: 3 additions & 8 deletions relay/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,16 @@ import (
"io/ioutil"
"os"
"strings"
"testing"

. "github.com/pingcap/check"
"github.com/siddontang/go-mysql/mysql"

"github.com/pingcap/dm/pkg/gtid"
)

var _ = Suite(&testRelaySuite{})
var _ = Suite(&testMetaSuite{})

func TestSuite(t *testing.T) {
TestingT(t)
}

type testRelaySuite struct {
type testMetaSuite struct {
}

type MetaTestCase struct {
Expand All @@ -41,7 +36,7 @@ type MetaTestCase struct {
gset gtid.Set
}

func (r *testRelaySuite) TestLocalMeta(c *C) {
func (r *testMetaSuite) TestLocalMeta(c *C) {
dir, err := ioutil.TempDir("", "test_local_meta")
c.Assert(err, IsNil)
defer os.RemoveAll(dir)
Expand Down
10 changes: 10 additions & 0 deletions relay/reader/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,13 @@ func isIgnorableError(err error) bool {
}
return false
}

// isRetryableError checks whether the error is retryable.
func isRetryableError(err error) bool {
Copy link
Collaborator

@IANTHEREAL IANTHEREAL Jun 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we rename it to a better name? add some worker like retry from, it would be more clear

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let Reader itself to retry in 071bc72. keep its name still be isRetryableError.

err = errors.Cause(err)
switch err {
case context.DeadlineExceeded:
return true
}
return false
}
23 changes: 18 additions & 5 deletions relay/reader/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,39 @@ package reader
import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/check"
"github.com/pingcap/errors"
)

var (
_ = Suite(&testErrorSuite{})
_ = check.Suite(&testErrorSuite{})
)

type testErrorSuite struct {
}

func (t *testErrorSuite) TestIgnorable(c *C) {
func (t *testErrorSuite) TestIgnorable(c *check.C) {
err := errors.New("custom error")
c.Assert(isIgnorableError(err), IsFalse)
c.Assert(isIgnorableError(err), check.IsFalse)

cases := []error{
context.Canceled,
errors.Annotate(context.Canceled, "annotated"),
}
for _, cs := range cases {
c.Assert(isIgnorableError(cs), IsTrue)
c.Assert(isIgnorableError(cs), check.IsTrue)
}
}

func (t *testErrorSuite) TestRetryable(c *check.C) {
err := errors.New("custom error")
c.Assert(isRetryableError(err), check.IsFalse)

cases := []error{
context.DeadlineExceeded,
errors.Annotate(context.DeadlineExceeded, "annotated"),
}
for _, cs := range cases {
c.Assert(isRetryableError(cs), check.IsTrue)
}
}
23 changes: 16 additions & 7 deletions relay/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ import (
"github.com/pingcap/dm/pkg/log"
)

// Result represents a read operation result.
type Result struct {
Event *replication.BinlogEvent
ErrIgnorable bool // the error is ignorable
ErrRetryable bool // the error is retryable
}

// Reader reads binlog events from a upstream master server.
// The read binlog events should be send to a transformer.
// The reader should support:
Expand All @@ -42,7 +49,7 @@ type Reader interface {

// GetEvent gets the binlog event one by one, it will block if no event can be read.
// You can pass a context (like Cancel or Timeout) to break the block.
GetEvent(ctx context.Context) (*replication.BinlogEvent, error)
GetEvent(ctx context.Context) (Result, error)
}

// Config is the configuration used by the Reader.
Expand Down Expand Up @@ -116,24 +123,26 @@ func (r *reader) Close() error {
// GetEvent implements Reader.GetEvent.
// If some ignorable error occurred, the returned event and error both are nil.
// NOTE: can only close the reader after this returned.
func (r *reader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) {
func (r *reader) GetEvent(ctx context.Context) (Result, error) {
r.mu.RLock()
defer r.mu.RUnlock()

var result Result
if r.stage != common.StagePrepared {
return nil, errors.Errorf("stage %s, expect %s, please start the reader first", r.stage, common.StagePrepared)
return result, errors.Errorf("stage %s, expect %s, please start the reader first", r.stage, common.StagePrepared)
}

for {
ev, err := r.in.GetEvent(ctx)
// NOTE: add retryable error support if needed later
if err == nil {
return ev, nil
result.Event = ev
} else if isIgnorableError(err) {
log.Warnf("[relay] get event with ignorable error %s", err)
return nil, nil // return without error and also without binlog event
result.ErrIgnorable = true
} else if isRetryableError(err) {
result.ErrRetryable = true
}
return nil, errors.Trace(err)
return result, errors.Trace(err)
}
}

Expand Down
Loading