From 071bc72611e21e93629c04f28fb439ccd9cb4f38 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Fri, 21 Jun 2019 18:18:47 +0800 Subject: [PATCH] *: address comments --- relay/reader/error.go | 10 ---------- relay/reader/error_test.go | 13 ------------- relay/reader/reader.go | 24 +++++++++++++---------- relay/reader/reader_test.go | 21 ++++++++------------ relay/relay.go | 17 +++------------- relay/relay_test.go | 39 ++++++++----------------------------- 6 files changed, 33 insertions(+), 91 deletions(-) diff --git a/relay/reader/error.go b/relay/reader/error.go index 40d38c715a..db00892d18 100644 --- a/relay/reader/error.go +++ b/relay/reader/error.go @@ -19,16 +19,6 @@ import ( "github.com/pingcap/errors" ) -// isIgnorableError checks whether the error is ignorable. -func isIgnorableError(err error) bool { - err = errors.Cause(err) - switch err { - case context.Canceled: - return true - } - return false -} - // isRetryableError checks whether the error is retryable. func isRetryableError(err error) bool { err = errors.Cause(err) diff --git a/relay/reader/error_test.go b/relay/reader/error_test.go index 095bc3a297..f7eef0e287 100644 --- a/relay/reader/error_test.go +++ b/relay/reader/error_test.go @@ -27,19 +27,6 @@ var ( type testErrorSuite struct { } -func (t *testErrorSuite) TestIgnorable(c *check.C) { - err := errors.New("custom error") - c.Assert(isIgnorableError(err), check.IsFalse) - - cases := []error{ - context.Canceled, - errors.Annotate(context.Canceled, "annotated"), - } - for _, cs := range cases { - c.Assert(isIgnorableError(cs), check.IsTrue) - } -} - func (t *testErrorSuite) TestRetryable(c *check.C) { err := errors.New("custom error") c.Assert(isRetryableError(err), check.IsFalse) diff --git a/relay/reader/reader.go b/relay/reader/reader.go index 34bf3cf2f7..a39fbb8151 100644 --- a/relay/reader/reader.go +++ b/relay/reader/reader.go @@ -16,6 +16,7 @@ package reader import ( "context" "sync" + "time" "github.com/pingcap/errors" "github.com/siddontang/go-mysql/mysql" @@ -27,11 +28,14 @@ import ( "github.com/pingcap/dm/pkg/log" ) +const ( + // event timeout when trying to read events from upstream master server. + eventTimeout = 1 * time.Hour +) + // Result represents a read operation result. type Result struct { - Event *replication.BinlogEvent - ErrIgnorable bool // the error is ignorable - ErrRetryable bool // the error is retryable + Event *replication.BinlogEvent } // Reader reads binlog events from a upstream master server. @@ -48,7 +52,7 @@ type Reader interface { Close() error // GetEvent gets the binlog event one by one, it will block if no event can be read. - // You can pass a context (like Cancel or Timeout) to break the block. + // You can pass a context (like Cancel) to break the block. GetEvent(ctx context.Context) (Result, error) } @@ -121,7 +125,6 @@ func (r *reader) Close() error { } // GetEvent implements Reader.GetEvent. -// If some ignorable error occurred, the returned event and error both are nil. // NOTE: can only close the reader after this returned. func (r *reader) GetEvent(ctx context.Context) (Result, error) { r.mu.RLock() @@ -133,14 +136,15 @@ func (r *reader) GetEvent(ctx context.Context) (Result, error) { } for { - ev, err := r.in.GetEvent(ctx) - // NOTE: add retryable error support if needed later + ctx2, cancel2 := context.WithTimeout(ctx, eventTimeout) + ev, err := r.in.GetEvent(ctx2) + cancel2() + if err == nil { result.Event = ev - } else if isIgnorableError(err) { - result.ErrIgnorable = true } else if isRetryableError(err) { - result.ErrRetryable = true + log.Infof("[relay] get retryable error %v when reading binlog event", err) + continue } return result, errors.Trace(err) } diff --git a/relay/reader/reader_test.go b/relay/reader/reader_test.go index 4ea5e246dc..9502786d0f 100644 --- a/relay/reader/reader_test.go +++ b/relay/reader/reader_test.go @@ -122,18 +122,13 @@ func (t *testReaderSuite) TestGetEventWithError(c *check.C) { errOther := errors.New("other error") in := []error{ - context.Canceled, // ignorable - context.DeadlineExceeded, // retryable + context.Canceled, + context.DeadlineExceeded, // retried without return errOther, } - expected := []Result{ - { - ErrIgnorable: true, - }, - { - ErrRetryable: true, - }, - {}, + expected := []error{ + context.Canceled, + errOther, } err := r.Start() @@ -149,11 +144,11 @@ func (t *testReaderSuite) TestGetEventWithError(c *check.C) { } }() - results := make([]Result, 0, len(expected)) + results := make([]error, 0, len(expected)) for { - result, err2 := r.GetEvent(ctx) + _, err2 := r.GetEvent(ctx) c.Assert(err2, check.NotNil) - results = append(results, result) + results = append(results, errors.Cause(err2)) if err2 == errOther { break // all received } diff --git a/relay/relay.go b/relay/relay.go index 87f027c9cd..1aa0056c20 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -42,20 +42,16 @@ import ( ) var ( - // ErrNoIncompleteEventFound represents no incomplete event found in relay log file - ErrNoIncompleteEventFound = errors.New("no incomplete event found in relay log file") // used to fill RelayLogInfo fakeTaskName = "relay" ) const ( - eventTimeout = 1 * time.Hour slaveReadTimeout = 1 * time.Minute // slave read binlog data timeout, ref: https://dev.mysql.com/doc/refman/8.0/en/replication-options-slave.html#sysvar_slave_net_timeout masterHeartbeatPeriod = 30 * time.Second // master server send heartbeat period: ref: `MASTER_HEARTBEAT_PERIOD` in https://dev.mysql.com/doc/refman/8.0/en/change-master-to.html flushMetaInterval = 30 * time.Second getMasterStatusInterval = 30 * time.Second trimUUIDsInterval = 1 * time.Hour - binlogHeaderSize = 4 showStatusConnectionTimeout = "1m" // dumpFlagSendAnnotateRowsEvent (BINLOG_SEND_ANNOTATE_ROWS_EVENT) request the MariaDB master to send Annotate_rows_log_event back. @@ -340,21 +336,14 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo for { // 1. reader events from upstream server - ctx2, cancel2 := context.WithTimeout(ctx, eventTimeout) readTimer := time.Now() - rResult, err := reader2.GetEvent(ctx2) - cancel2() + rResult, err := reader2.GetEvent(ctx) binlogReadDurationHistogram.Observe(time.Since(readTimer).Seconds()) if err != nil { - if rResult.ErrIgnorable { - log.Infof("[relay] get ignorable error %v when reading binlog event", err) + switch errors.Cause(err) { + case context.Canceled: return nil - } else if rResult.ErrRetryable { - log.Infof("[relay] get retryable error %v when reading binlog event", err) - continue - } - switch err { case replication.ErrChecksumMismatch: relayLogDataCorruptionCounter.Inc() case replication.ErrSyncClosed, replication.ErrNeedSyncAgain: diff --git a/relay/relay_test.go b/relay/relay_test.go index e4dcbeb5a0..45e48225a2 100644 --- a/relay/relay_test.go +++ b/relay/relay_test.go @@ -97,7 +97,7 @@ func (r *mockReader) Close() error { func (r *mockReader) GetEvent(ctx context.Context) (reader.Result, error) { select { case <-ctx.Done(): - return reader.Result{ErrIgnorable: true}, ctx.Err() + return reader.Result{}, ctx.Err() default: } return r.result, r.err @@ -296,23 +296,14 @@ func (t *testRelaySuite) TestHandleEvent(c *C) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancel() - // reader return with ignorable error - reader2.result.ErrIgnorable = true - reader2.err = errors.New("reader error for testing") - // return with `nil` - err := r.handleEvents(ctx, reader2, transformer2, writer2) - c.Assert(err, IsNil) - - // reader return with non-ignorable error - reader2.result.ErrIgnorable = false - // return with the annotated reader error + // reader return with an error for _, reader2.err = range []error{ errors.New("reader error for testing"), replication.ErrChecksumMismatch, replication.ErrSyncClosed, replication.ErrNeedSyncAgain, } { - err = r.handleEvents(ctx, reader2, transformer2, writer2) + err := r.handleEvents(ctx, reader2, transformer2, writer2) c.Assert(errors.Cause(err), Equals, reader2.err) } @@ -323,13 +314,13 @@ func (t *testRelaySuite) TestHandleEvent(c *C) { // writer return error writer2.err = errors.New("writer error for testing") // return with the annotated writer error - err = r.handleEvents(ctx, reader2, transformer2, writer2) + err := r.handleEvents(ctx, reader2, transformer2, writer2) c.Assert(errors.Cause(err), Equals, writer2.err) // writer without error writer2.err = nil err = r.handleEvents(ctx, reader2, transformer2, writer2) // returned when ctx timeout - c.Assert(err, IsNil) + c.Assert(errors.Cause(err), Equals, ctx.Err()) // check written event c.Assert(writer2.latestEvent, Equals, reader2.result.Event) // check meta @@ -344,7 +335,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) { // write a QueryEvent with GTID sets reader2.result.Event = queryEv err = r.handleEvents(ctx2, reader2, transformer2, writer2) - c.Assert(err, IsNil) + c.Assert(errors.Cause(err), Equals, ctx.Err()) // check written event c.Assert(writer2.latestEvent, Equals, reader2.result.Event) // check meta @@ -354,21 +345,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) { c.Assert(pos.Pos, Equals, queryEv.Header.LogPos) c.Assert(gs.Origin(), DeepEquals, queryEv2.GSet) // got GTID sets - // reader return retryable error - reader2.result.ErrRetryable = true - reader2.err = errors.New("reader error for testing") - ctx3, cancel3 := context.WithTimeout(context.Background(), 10*time.Millisecond) - defer cancel3() - err = r.handleEvents(ctx3, reader2, transformer2, writer2) - c.Assert(err, IsNil) - select { - case <-ctx3.Done(): - default: - c.Fatalf("retryable error for reader not retried") - } - // transformer return ignorable for the event - reader2.result.ErrIgnorable = false reader2.err = nil reader2.result.Event = &replication.BinlogEvent{ Header: &replication.EventHeader{EventType: replication.HEARTBEAT_EVENT}, @@ -376,7 +353,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) { ctx4, cancel4 := context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancel4() err = r.handleEvents(ctx4, reader2, transformer2, writer2) - c.Assert(err, IsNil) + c.Assert(errors.Cause(err), Equals, ctx.Err()) select { case <-ctx4.Done(): default: @@ -389,7 +366,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) { ctx5, cancel5 := context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancel5() err = r.handleEvents(ctx5, reader2, transformer2, writer2) - c.Assert(err, IsNil) + c.Assert(errors.Cause(err), Equals, ctx.Err()) select { case <-ctx5.Done(): default: