Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
*: address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc committed Jun 21, 2019
1 parent f20bf92 commit 071bc72
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 91 deletions.
10 changes: 0 additions & 10 deletions relay/reader/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,6 @@ import (
"github.com/pingcap/errors"
)

// isIgnorableError checks whether the error is ignorable.
func isIgnorableError(err error) bool {
err = errors.Cause(err)
switch err {
case context.Canceled:
return true
}
return false
}

// isRetryableError checks whether the error is retryable.
func isRetryableError(err error) bool {
err = errors.Cause(err)
Expand Down
13 changes: 0 additions & 13 deletions relay/reader/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,6 @@ var (
type testErrorSuite struct {
}

func (t *testErrorSuite) TestIgnorable(c *check.C) {
err := errors.New("custom error")
c.Assert(isIgnorableError(err), check.IsFalse)

cases := []error{
context.Canceled,
errors.Annotate(context.Canceled, "annotated"),
}
for _, cs := range cases {
c.Assert(isIgnorableError(cs), check.IsTrue)
}
}

func (t *testErrorSuite) TestRetryable(c *check.C) {
err := errors.New("custom error")
c.Assert(isRetryableError(err), check.IsFalse)
Expand Down
24 changes: 14 additions & 10 deletions relay/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package reader
import (
"context"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/siddontang/go-mysql/mysql"
Expand All @@ -27,11 +28,14 @@ import (
"github.com/pingcap/dm/pkg/log"
)

const (
// event timeout when trying to read events from upstream master server.
eventTimeout = 1 * time.Hour
)

// Result represents a read operation result.
type Result struct {
Event *replication.BinlogEvent
ErrIgnorable bool // the error is ignorable
ErrRetryable bool // the error is retryable
Event *replication.BinlogEvent
}

// Reader reads binlog events from a upstream master server.
Expand All @@ -48,7 +52,7 @@ type Reader interface {
Close() error

// GetEvent gets the binlog event one by one, it will block if no event can be read.
// You can pass a context (like Cancel or Timeout) to break the block.
// You can pass a context (like Cancel) to break the block.
GetEvent(ctx context.Context) (Result, error)
}

Expand Down Expand Up @@ -121,7 +125,6 @@ func (r *reader) Close() error {
}

// GetEvent implements Reader.GetEvent.
// If some ignorable error occurred, the returned event and error both are nil.
// NOTE: can only close the reader after this returned.
func (r *reader) GetEvent(ctx context.Context) (Result, error) {
r.mu.RLock()
Expand All @@ -133,14 +136,15 @@ func (r *reader) GetEvent(ctx context.Context) (Result, error) {
}

for {
ev, err := r.in.GetEvent(ctx)
// NOTE: add retryable error support if needed later
ctx2, cancel2 := context.WithTimeout(ctx, eventTimeout)
ev, err := r.in.GetEvent(ctx2)
cancel2()

if err == nil {
result.Event = ev
} else if isIgnorableError(err) {
result.ErrIgnorable = true
} else if isRetryableError(err) {
result.ErrRetryable = true
log.Infof("[relay] get retryable error %v when reading binlog event", err)
continue
}
return result, errors.Trace(err)
}
Expand Down
21 changes: 8 additions & 13 deletions relay/reader/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,13 @@ func (t *testReaderSuite) TestGetEventWithError(c *check.C) {

errOther := errors.New("other error")
in := []error{
context.Canceled, // ignorable
context.DeadlineExceeded, // retryable
context.Canceled,
context.DeadlineExceeded, // retried without return
errOther,
}
expected := []Result{
{
ErrIgnorable: true,
},
{
ErrRetryable: true,
},
{},
expected := []error{
context.Canceled,
errOther,
}

err := r.Start()
Expand All @@ -149,11 +144,11 @@ func (t *testReaderSuite) TestGetEventWithError(c *check.C) {
}
}()

results := make([]Result, 0, len(expected))
results := make([]error, 0, len(expected))
for {
result, err2 := r.GetEvent(ctx)
_, err2 := r.GetEvent(ctx)
c.Assert(err2, check.NotNil)
results = append(results, result)
results = append(results, errors.Cause(err2))
if err2 == errOther {
break // all received
}
Expand Down
17 changes: 3 additions & 14 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,16 @@ import (
)

var (
// ErrNoIncompleteEventFound represents no incomplete event found in relay log file
ErrNoIncompleteEventFound = errors.New("no incomplete event found in relay log file")
// used to fill RelayLogInfo
fakeTaskName = "relay"
)

const (
eventTimeout = 1 * time.Hour
slaveReadTimeout = 1 * time.Minute // slave read binlog data timeout, ref: https://dev.mysql.com/doc/refman/8.0/en/replication-options-slave.html#sysvar_slave_net_timeout
masterHeartbeatPeriod = 30 * time.Second // master server send heartbeat period: ref: `MASTER_HEARTBEAT_PERIOD` in https://dev.mysql.com/doc/refman/8.0/en/change-master-to.html
flushMetaInterval = 30 * time.Second
getMasterStatusInterval = 30 * time.Second
trimUUIDsInterval = 1 * time.Hour
binlogHeaderSize = 4
showStatusConnectionTimeout = "1m"

// dumpFlagSendAnnotateRowsEvent (BINLOG_SEND_ANNOTATE_ROWS_EVENT) request the MariaDB master to send Annotate_rows_log_event back.
Expand Down Expand Up @@ -340,21 +336,14 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo

for {
// 1. reader events from upstream server
ctx2, cancel2 := context.WithTimeout(ctx, eventTimeout)
readTimer := time.Now()
rResult, err := reader2.GetEvent(ctx2)
cancel2()
rResult, err := reader2.GetEvent(ctx)
binlogReadDurationHistogram.Observe(time.Since(readTimer).Seconds())

if err != nil {
if rResult.ErrIgnorable {
log.Infof("[relay] get ignorable error %v when reading binlog event", err)
switch errors.Cause(err) {
case context.Canceled:
return nil
} else if rResult.ErrRetryable {
log.Infof("[relay] get retryable error %v when reading binlog event", err)
continue
}
switch err {
case replication.ErrChecksumMismatch:
relayLogDataCorruptionCounter.Inc()
case replication.ErrSyncClosed, replication.ErrNeedSyncAgain:
Expand Down
39 changes: 8 additions & 31 deletions relay/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (r *mockReader) Close() error {
func (r *mockReader) GetEvent(ctx context.Context) (reader.Result, error) {
select {
case <-ctx.Done():
return reader.Result{ErrIgnorable: true}, ctx.Err()
return reader.Result{}, ctx.Err()
default:
}
return r.result, r.err
Expand Down Expand Up @@ -296,23 +296,14 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()

// reader return with ignorable error
reader2.result.ErrIgnorable = true
reader2.err = errors.New("reader error for testing")
// return with `nil`
err := r.handleEvents(ctx, reader2, transformer2, writer2)
c.Assert(err, IsNil)

// reader return with non-ignorable error
reader2.result.ErrIgnorable = false
// return with the annotated reader error
// reader return with an error
for _, reader2.err = range []error{
errors.New("reader error for testing"),
replication.ErrChecksumMismatch,
replication.ErrSyncClosed,
replication.ErrNeedSyncAgain,
} {
err = r.handleEvents(ctx, reader2, transformer2, writer2)
err := r.handleEvents(ctx, reader2, transformer2, writer2)
c.Assert(errors.Cause(err), Equals, reader2.err)
}

Expand All @@ -323,13 +314,13 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
// writer return error
writer2.err = errors.New("writer error for testing")
// return with the annotated writer error
err = r.handleEvents(ctx, reader2, transformer2, writer2)
err := r.handleEvents(ctx, reader2, transformer2, writer2)
c.Assert(errors.Cause(err), Equals, writer2.err)

// writer without error
writer2.err = nil
err = r.handleEvents(ctx, reader2, transformer2, writer2) // returned when ctx timeout
c.Assert(err, IsNil)
c.Assert(errors.Cause(err), Equals, ctx.Err())
// check written event
c.Assert(writer2.latestEvent, Equals, reader2.result.Event)
// check meta
Expand All @@ -344,7 +335,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
// write a QueryEvent with GTID sets
reader2.result.Event = queryEv
err = r.handleEvents(ctx2, reader2, transformer2, writer2)
c.Assert(err, IsNil)
c.Assert(errors.Cause(err), Equals, ctx.Err())
// check written event
c.Assert(writer2.latestEvent, Equals, reader2.result.Event)
// check meta
Expand All @@ -354,29 +345,15 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
c.Assert(pos.Pos, Equals, queryEv.Header.LogPos)
c.Assert(gs.Origin(), DeepEquals, queryEv2.GSet) // got GTID sets

// reader return retryable error
reader2.result.ErrRetryable = true
reader2.err = errors.New("reader error for testing")
ctx3, cancel3 := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel3()
err = r.handleEvents(ctx3, reader2, transformer2, writer2)
c.Assert(err, IsNil)
select {
case <-ctx3.Done():
default:
c.Fatalf("retryable error for reader not retried")
}

// transformer return ignorable for the event
reader2.result.ErrIgnorable = false
reader2.err = nil
reader2.result.Event = &replication.BinlogEvent{
Header: &replication.EventHeader{EventType: replication.HEARTBEAT_EVENT},
Event: &replication.GenericEvent{}}
ctx4, cancel4 := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel4()
err = r.handleEvents(ctx4, reader2, transformer2, writer2)
c.Assert(err, IsNil)
c.Assert(errors.Cause(err), Equals, ctx.Err())
select {
case <-ctx4.Done():
default:
Expand All @@ -389,7 +366,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
ctx5, cancel5 := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel5()
err = r.handleEvents(ctx5, reader2, transformer2, writer2)
c.Assert(err, IsNil)
c.Assert(errors.Cause(err), Equals, ctx.Err())
select {
case <-ctx5.Done():
default:
Expand Down

0 comments on commit 071bc72

Please sign in to comment.