diff --git a/pkg/binlog/event/event.go b/pkg/binlog/event/event.go index 87b840fe0b..d30ae23482 100644 --- a/pkg/binlog/event/event.go +++ b/pkg/binlog/event/event.go @@ -159,6 +159,33 @@ func GenFormatDescriptionEvent(header *replication.EventHeader, latestPos uint32 return ev, errors.Trace(err) } +// GenRotateEvent generates a RotateEvent. +// ref: https://dev.mysql.com/doc/internals/en/rotate-event.html +func GenRotateEvent(header *replication.EventHeader, latestPos uint32, nextLogName []byte, position uint64) (*replication.BinlogEvent, error) { + if len(nextLogName) == 0 { + return nil, errors.NotValidf("empty next binlog name") + } + + // Post-header + postHeader := new(bytes.Buffer) + err := binary.Write(postHeader, binary.LittleEndian, position) + if err != nil { + return nil, errors.Annotatef(err, "write position %d", position) + } + + // Payload + payload := new(bytes.Buffer) + err = binary.Write(payload, binary.LittleEndian, nextLogName) + if err != nil { + return nil, errors.Annotatef(err, "write next binlog name % X", nextLogName) + } + + buf := new(bytes.Buffer) + event := &replication.RotateEvent{} + ev, err := assembleEvent(buf, event, false, *header, replication.ROTATE_EVENT, latestPos, postHeader.Bytes(), payload.Bytes()) + return ev, errors.Trace(err) +} + // GenPreviousGTIDsEvent generates a PreviousGTIDsEvent. // go-mysql has no PreviousGTIDsEvent struct defined, so return the event's raw data instead. // MySQL has no internal doc for PREVIOUS_GTIDS_EVENT. diff --git a/pkg/binlog/event/event_test.go b/pkg/binlog/event/event_test.go index 5e4d8e3072..d890b7f8b8 100644 --- a/pkg/binlog/event/event_test.go +++ b/pkg/binlog/event/event_test.go @@ -115,6 +115,40 @@ func (t *testEventSuite) TestGenFormatDescriptionEvent(c *C) { c.Assert(err, IsNil) } +func (t *testEventSuite) TestGenRotateEvent(c *C) { + var ( + header = &replication.EventHeader{ + Timestamp: uint32(time.Now().Unix()), + ServerID: 11, + Flags: 0x01, + } + latestPos uint32 = 4 + nextLogName []byte // nil + position uint64 = 123 + ) + + // empty nextLogName, invalid + rotateEv, err := GenRotateEvent(header, latestPos, nextLogName, position) + c.Assert(err, NotNil) + c.Assert(rotateEv, IsNil) + + // valid nextLogName + nextLogName = []byte("mysql-bin.000010") + rotateEv, err = GenRotateEvent(header, latestPos, nextLogName, position) + c.Assert(err, IsNil) + c.Assert(rotateEv, NotNil) + + // verify the header + verifyHeader(c, rotateEv.Header, header, replication.ROTATE_EVENT, latestPos, uint32(len(rotateEv.RawData))) + + // verify the body + rotateEvBody, ok := rotateEv.Event.(*replication.RotateEvent) + c.Assert(ok, IsTrue) + c.Assert(rotateEvBody, NotNil) + c.Assert(rotateEvBody.NextLogName, DeepEquals, nextLogName) + c.Assert(rotateEvBody.Position, Equals, position) +} + func (t *testEventSuite) TestGenPreviousGTIDsEvent(c *C) { var ( header = &replication.EventHeader{ diff --git a/pkg/binlog/reader/file.go b/pkg/binlog/reader/file.go new file mode 100644 index 0000000000..30b3c1fb91 --- /dev/null +++ b/pkg/binlog/reader/file.go @@ -0,0 +1,181 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// binlog events generator for MySQL used to generate some binlog events for tests. +// Readability takes precedence over performance. + +package reader + +import ( + "context" + "encoding/json" + "sync" + "time" + + "github.com/pingcap/errors" + gmysql "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + "github.com/siddontang/go/sync2" + + "github.com/pingcap/dm/pkg/gtid" + "github.com/pingcap/dm/pkg/log" +) + +// FileReader is a binlog event reader which reads binlog events from a file. +type FileReader struct { + mu sync.RWMutex + wg sync.WaitGroup + + stage readerStage + readOffset sync2.AtomicUint32 + sendOffset sync2.AtomicUint32 + + parser *replication.BinlogParser + ch chan *replication.BinlogEvent + ech chan error + + ctx context.Context + cancel context.CancelFunc +} + +// FileReaderConfig is the configuration used by a FileReader. +type FileReaderConfig struct { + EnableRawMode bool + Timezone *time.Location + ChBufferSize int // event channel's buffer size + EchBufferSize int // error channel's buffer size +} + +// FileReaderStatus represents the status of a FileReader. +type FileReaderStatus struct { + Stage string `json:"stage"` + ReadOffset uint32 `json:"read-offset"` // read event's offset in the file + SendOffset uint32 `json:"send-offset"` // sent event's offset in the file +} + +// String implements Stringer.String. +func (s *FileReaderStatus) String() string { + data, err := json.Marshal(s) + if err != nil { + log.Errorf("[FileReaderStatus] marshal status to json error %v", err) + } + return string(data) +} + +// NewFileReader creates a FileReader instance. +func NewFileReader(cfg *FileReaderConfig) Reader { + parser := replication.NewBinlogParser() + parser.SetVerifyChecksum(true) + parser.SetUseDecimal(true) + parser.SetRawMode(cfg.EnableRawMode) + if cfg.Timezone != nil { + parser.SetTimestampStringLocation(cfg.Timezone) + } + return &FileReader{ + parser: parser, + ch: make(chan *replication.BinlogEvent, cfg.ChBufferSize), + ech: make(chan error, cfg.EchBufferSize), + } +} + +// StartSyncByPos implements Reader.StartSyncByPos. +func (r *FileReader) StartSyncByPos(pos gmysql.Position) error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.stage != stageNew { + return errors.Errorf("stage %s, expect %s, already started", r.stage, stageNew) + } + + r.ctx, r.cancel = context.WithCancel(context.Background()) + r.wg.Add(1) + go func() { + defer r.wg.Done() + err := r.parser.ParseFile(pos.Name, int64(pos.Pos), r.onEvent) + if err != nil { + log.Errorf("[file reader] parse binlog file with error %s", errors.ErrorStack(err)) + select { + case r.ech <- err: + case <-r.ctx.Done(): + } + } + }() + + r.stage = stagePrepared + return nil +} + +// StartSyncByGTID implements Reader.StartSyncByGTID. +func (r *FileReader) StartSyncByGTID(gSet gtid.Set) error { + // NOTE: may be supported later. + return errors.NotSupportedf("read from file by GTID") +} + +// Close implements Reader.Close. +func (r *FileReader) Close() error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.stage == stageClosed { + return errors.New("already closed") + } + + r.cancel() + r.wg.Wait() + r.parser.Stop() + r.stage = stageClosed + return nil +} + +// GetEvent implements Reader.GetEvent. +func (r *FileReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { + r.mu.RLock() + defer r.mu.RUnlock() + + if r.stage != stagePrepared { + return nil, errors.Errorf("stage %s, expect %s, please start sync first", r.stage, stagePrepared) + } + + select { + case ev := <-r.ch: + r.sendOffset.Set(ev.Header.LogPos) + return ev, nil + case err := <-r.ech: + return nil, err + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +// Status implements Reader.Status. +func (r *FileReader) Status() interface{} { + r.mu.RLock() + stage := r.stage + r.mu.RUnlock() + + return &FileReaderStatus{ + Stage: stage.String(), + ReadOffset: r.readOffset.Get(), + SendOffset: r.sendOffset.Get(), + } +} + +func (r *FileReader) onEvent(ev *replication.BinlogEvent) error { + select { + case r.ch <- ev: + r.readOffset.Set(ev.Header.LogPos) + return nil + case <-r.ctx.Done(): + return r.ctx.Err() + } +} diff --git a/pkg/binlog/reader/file_test.go b/pkg/binlog/reader/file_test.go new file mode 100644 index 0000000000..5e21bfea30 --- /dev/null +++ b/pkg/binlog/reader/file_test.go @@ -0,0 +1,290 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + "time" + + . "github.com/pingcap/check" + "github.com/pingcap/errors" + gmysql "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + + "github.com/pingcap/dm/pkg/binlog/event" + "github.com/pingcap/dm/pkg/gtid" +) + +var ( + _ = Suite(&testFileReaderSuite{}) +) + +type testFileReaderSuite struct { +} + +func (t *testFileReaderSuite) TestInterfaceMethods(c *C) { + var ( + cfg = &FileReaderConfig{} + gSet gtid.Set // nil GTID set + timeoutCtx, timeoutCancel = context.WithTimeout(context.Background(), 10*time.Second) + ) + defer timeoutCancel() + + r := NewFileReader(cfg) + c.Assert(r, NotNil) + + // check status, stageNew + status := r.Status() + frStatus, ok := status.(*FileReaderStatus) + c.Assert(ok, IsTrue) + c.Assert(frStatus.Stage, Equals, stageNew.String()) + c.Assert(frStatus.ReadOffset, Equals, uint32(0)) + c.Assert(frStatus.SendOffset, Equals, uint32(0)) + frStatusStr := frStatus.String() + c.Assert(frStatusStr, Matches, fmt.Sprintf(`.*"stage":"%s".*`, stageNew)) + + // not prepared + e, err := r.GetEvent(timeoutCtx) + c.Assert(err, ErrorMatches, fmt.Sprintf(".*%s.*", stageNew)) + c.Assert(e, IsNil) + + // by GTID, not supported yet + err = r.StartSyncByGTID(gSet) + c.Assert(err, ErrorMatches, ".*not supported.*") + + // by pos + err = r.StartSyncByPos(gmysql.Position{}) + c.Assert(err, IsNil) + + // check status, stagePrepared + status = r.Status() + frStatus, ok = status.(*FileReaderStatus) + c.Assert(ok, IsTrue) + c.Assert(frStatus.Stage, Equals, stagePrepared.String()) + c.Assert(frStatus.ReadOffset, Equals, uint32(0)) + c.Assert(frStatus.SendOffset, Equals, uint32(0)) + + // re-prepare is invalid + err = r.StartSyncByPos(gmysql.Position{}) + c.Assert(err, NotNil) + + // binlog file not exists + e, err = r.GetEvent(timeoutCtx) + c.Assert(os.IsNotExist(errors.Cause(err)), IsTrue) + c.Assert(e, IsNil) + + // close the reader + c.Assert(r.Close(), IsNil) + + // check status, stageClosed + status = r.Status() + frStatus, ok = status.(*FileReaderStatus) + c.Assert(ok, IsTrue) + c.Assert(frStatus.Stage, Equals, stageClosed.String()) + c.Assert(frStatus.ReadOffset, Equals, uint32(0)) + c.Assert(frStatus.SendOffset, Equals, uint32(0)) + + // re-close is invalid + c.Assert(r.Close(), NotNil) +} + +func (t *testFileReaderSuite) TestGetEvent(c *C) { + timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer timeoutCancel() + + // create a empty file + dir := c.MkDir() + filename := filepath.Join(dir, "mysql-bin-test.000001") + f, err := os.Create(filename) + c.Assert(err, IsNil) + defer f.Close() + + // start from the beginning + startPos := gmysql.Position{Name: filename} + + // no data can be read, EOF + r := NewFileReader(&FileReaderConfig{}) + c.Assert(r, NotNil) + c.Assert(r.StartSyncByPos(startPos), IsNil) + e, err := r.GetEvent(timeoutCtx) + c.Assert(errors.Cause(err), Equals, io.EOF) + c.Assert(e, IsNil) + c.Assert(r.Close(), IsNil) // close the reader + + // writer a binlog file header + _, err = f.Write(replication.BinLogFileHeader) + c.Assert(err, IsNil) + + // no valid events can be read, but can cancel it by the context argument + r = NewFileReader(&FileReaderConfig{}) + c.Assert(r, NotNil) + c.Assert(r.StartSyncByPos(startPos), IsNil) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + e, err = r.GetEvent(ctx) + c.Assert(errors.Cause(err), Equals, context.DeadlineExceeded) + c.Assert(e, IsNil) + c.Assert(r.Close(), IsNil) // close the reader + + // writer a FormatDescriptionEvent + header := &replication.EventHeader{ + Timestamp: uint32(time.Now().Unix()), + ServerID: uint32(101), + } + latestPos := uint32(len(replication.BinLogFileHeader)) + formatDescEv, err := event.GenFormatDescriptionEvent(header, latestPos) + c.Assert(err, IsNil) + c.Assert(formatDescEv, NotNil) + _, err = f.Write(formatDescEv.RawData) + c.Assert(err, IsNil) + latestPos = formatDescEv.Header.LogPos + + // got a FormatDescriptionEvent + r = NewFileReader(&FileReaderConfig{}) + c.Assert(r, NotNil) + c.Assert(r.StartSyncByPos(startPos), IsNil) + e, err = r.GetEvent(timeoutCtx) + c.Assert(err, IsNil) + c.Assert(e, DeepEquals, formatDescEv) + c.Assert(r.Close(), IsNil) // close the reader + + // check status, stageClosed + fStat, err := f.Stat() + c.Assert(err, IsNil) + fSize := uint32(fStat.Size()) + status := r.Status() + frStatus, ok := status.(*FileReaderStatus) + c.Assert(ok, IsTrue) + c.Assert(frStatus.Stage, Equals, stageClosed.String()) + c.Assert(frStatus.ReadOffset, Equals, fSize) + c.Assert(frStatus.SendOffset, Equals, fSize) + + // write two QueryEvent + var queryEv *replication.BinlogEvent + for i := 0; i < 2; i++ { + queryEv, err = event.GenQueryEvent( + header, latestPos, 0, 0, 0, nil, + []byte(fmt.Sprintf("schema-%d", i)), []byte(fmt.Sprintf("query-%d", i))) + c.Assert(err, IsNil) + c.Assert(queryEv, NotNil) + _, err = f.Write(queryEv.RawData) + c.Assert(err, IsNil) + latestPos = queryEv.Header.LogPos + } + + // read from the middle + startPos.Pos = latestPos - queryEv.Header.EventSize + r = NewFileReader(&FileReaderConfig{}) + c.Assert(r, NotNil) + c.Assert(r.StartSyncByPos(startPos), IsNil) + e, err = r.GetEvent(timeoutCtx) + c.Assert(err, IsNil) + c.Assert(e.RawData, DeepEquals, formatDescEv.RawData) // always got a FormatDescriptionEvent first + e, err = r.GetEvent(timeoutCtx) + c.Assert(err, IsNil) + c.Assert(e.RawData, DeepEquals, queryEv.RawData) // the last QueryEvent + c.Assert(r.Close(), IsNil) // close the reader + + // read from an invalid pos + startPos.Pos-- + r = NewFileReader(&FileReaderConfig{}) + c.Assert(r, NotNil) + c.Assert(r.StartSyncByPos(startPos), IsNil) + e, err = r.GetEvent(timeoutCtx) + c.Assert(err, IsNil) + c.Assert(e.RawData, DeepEquals, formatDescEv.RawData) // always got a FormatDescriptionEvent first + e, err = r.GetEvent(timeoutCtx) + c.Assert(err, ErrorMatches, ".*EOF.*") +} + +func (t *testFileReaderSuite) TestWithChannelBuffer(c *C) { + var ( + cfg = &FileReaderConfig{ChBufferSize: 10} + timeoutCtx, timeoutCancel = context.WithTimeout(context.Background(), 10*time.Second) + ) + defer timeoutCancel() + + // create a empty file + dir := c.MkDir() + filename := filepath.Join(dir, "mysql-bin-test.000001") + f, err := os.Create(filename) + c.Assert(err, IsNil) + defer f.Close() + + // writer a binlog file header + _, err = f.Write(replication.BinLogFileHeader) + c.Assert(err, IsNil) + + // writer a FormatDescriptionEvent + header := &replication.EventHeader{ + Timestamp: uint32(time.Now().Unix()), + ServerID: uint32(101), + } + latestPos := uint32(len(replication.BinLogFileHeader)) + formatDescEv, err := event.GenFormatDescriptionEvent(header, latestPos) + c.Assert(err, IsNil) + c.Assert(formatDescEv, NotNil) + _, err = f.Write(formatDescEv.RawData) + c.Assert(err, IsNil) + latestPos = formatDescEv.Header.LogPos + + // write channelBufferSize QueryEvent + var queryEv *replication.BinlogEvent + for i := 0; i < cfg.ChBufferSize; i++ { + queryEv, err = event.GenQueryEvent( + header, latestPos, 0, 0, 0, nil, + []byte(fmt.Sprintf("schema-%d", i)), []byte(fmt.Sprintf("query-%d", i))) + c.Assert(err, IsNil) + c.Assert(queryEv, NotNil) + _, err = f.Write(queryEv.RawData) + c.Assert(err, IsNil) + latestPos = queryEv.Header.LogPos + } + + r := NewFileReader(cfg) + c.Assert(r, NotNil) + c.Assert(r.StartSyncByPos(gmysql.Position{Name: filename}), IsNil) + time.Sleep(time.Second) // wait events to be read + + // check status, stagePrepared + readOffset := latestPos - queryEv.Header.EventSize // an FormatDescriptionEvent in the channel buffer + status := r.Status() + frStatus, ok := status.(*FileReaderStatus) + c.Assert(ok, IsTrue) + c.Assert(frStatus.Stage, Equals, stagePrepared.String()) + c.Assert(frStatus.ReadOffset, Equals, readOffset) + c.Assert(frStatus.SendOffset, Equals, uint32(0)) // no event sent yet + + // get one event + e, err := r.GetEvent(timeoutCtx) + c.Assert(err, IsNil) + c.Assert(e, NotNil) + c.Assert(e.RawData, DeepEquals, formatDescEv.RawData) + time.Sleep(time.Second) // wait events to be read + + // check status, again + readOffset = latestPos // reach the end + status = r.Status() + frStatus, ok = status.(*FileReaderStatus) + c.Assert(ok, IsTrue) + c.Assert(frStatus.Stage, Equals, stagePrepared.String()) + c.Assert(frStatus.ReadOffset, Equals, readOffset) + c.Assert(frStatus.SendOffset, Equals, formatDescEv.Header.LogPos) // already get formatDescEv + + c.Assert(r.Close(), IsNil) +} diff --git a/pkg/binlog/reader/mock.go b/pkg/binlog/reader/mock.go new file mode 100644 index 0000000000..8776fb7bfd --- /dev/null +++ b/pkg/binlog/reader/mock.go @@ -0,0 +1,94 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + + gmysql "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + + "github.com/pingcap/dm/pkg/gtid" +) + +// MockReader is a binlog event reader which read binlog events from an input channel. +type MockReader struct { + ch chan *replication.BinlogEvent + ech chan error + + // returned error for methods + ErrStartByPos error + ErrStartByGTID error + ErrClose error +} + +// NewMockReader creates a MockReader instance. +func NewMockReader() Reader { + return &MockReader{ + ch: make(chan *replication.BinlogEvent), + ech: make(chan error), + } +} + +// StartSyncByPos implements Reader.StartSyncByPos. +func (r *MockReader) StartSyncByPos(pos gmysql.Position) error { + return r.ErrStartByPos +} + +// StartSyncByGTID implements Reader.StartSyncByGTID. +func (r *MockReader) StartSyncByGTID(gSet gtid.Set) error { + return r.ErrStartByGTID +} + +// Close implements Reader.Close. +func (r *MockReader) Close() error { + return r.ErrClose +} + +// GetEvent implements Reader.GetEvent. +func (r *MockReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { + select { + case ev := <-r.ch: + return ev, nil + case err := <-r.ech: + return nil, err + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +// Status implements Reader.Status. +func (r *MockReader) Status() interface{} { + return nil +} + +// PushEvent pushes an event into the reader. +func (r *MockReader) PushEvent(ctx context.Context, ev *replication.BinlogEvent) error { + select { + case r.ch <- ev: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// PushError pushes an error into the reader. +func (r *MockReader) PushError(ctx context.Context, err error) error { + select { + case r.ech <- err: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/pkg/binlog/reader/mock_test.go b/pkg/binlog/reader/mock_test.go new file mode 100644 index 0000000000..8a4e8387ac --- /dev/null +++ b/pkg/binlog/reader/mock_test.go @@ -0,0 +1,123 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + "errors" + "time" + + . "github.com/pingcap/check" + "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" +) + +var ( + _ = Suite(&testMockReaderSuite{}) +) + +type testMockReaderSuite struct { +} + +type testMockCase struct { + ev *replication.BinlogEvent + err error +} + +func (t *testMockReaderSuite) TestRead(c *C) { + r := NewMockReader() + + // some interface methods do nothing + c.Assert(r.StartSyncByPos(mysql.Position{}), IsNil) + c.Assert(r.StartSyncByGTID(nil), IsNil) + c.Assert(r.Status(), IsNil) + c.Assert(r.Close(), IsNil) + + // replace with special error + mockR := r.(*MockReader) + errStartByPos := errors.New("special error for start by pos") + errStartByGTID := errors.New("special error for start by GTID") + errClose := errors.New("special error for close") + mockR.ErrStartByPos = errStartByPos + mockR.ErrStartByGTID = errStartByGTID + mockR.ErrClose = errClose + c.Assert(r.StartSyncByPos(mysql.Position{}), Equals, errStartByPos) + c.Assert(r.StartSyncByGTID(nil), Equals, errStartByGTID) + c.Assert(r.Close(), Equals, errClose) + + cases := []testMockCase{ + { + ev: &replication.BinlogEvent{ + RawData: []byte{1}, + }, + err: nil, + }, + { + ev: &replication.BinlogEvent{ + RawData: []byte{2}, + }, + err: nil, + }, + { + ev: nil, + err: errors.New("1"), + }, + { + ev: &replication.BinlogEvent{ + RawData: []byte{3}, + }, + err: nil, + }, + { + ev: nil, + err: errors.New("2"), + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + go func() { + for _, cs := range cases { + if cs.err != nil { + c.Assert(mockR.PushError(ctx, cs.err), IsNil) + } else { + c.Assert(mockR.PushEvent(ctx, cs.ev), IsNil) + } + } + }() + + obtained := make([]testMockCase, 0, len(cases)) + for { + ev, err := r.GetEvent(ctx) + if err != nil { + obtained = append(obtained, testMockCase{ev: nil, err: err}) + } else { + obtained = append(obtained, testMockCase{ev: ev, err: nil}) + } + c.Assert(ctx.Err(), IsNil) + if len(obtained) == len(cases) { + break + } + } + + c.Assert(obtained, DeepEquals, cases) + + cancel() // cancel manually + c.Assert(mockR.PushError(ctx, cases[0].err), Equals, ctx.Err()) + c.Assert(mockR.PushEvent(ctx, cases[0].ev), Equals, ctx.Err()) + ev, err := r.GetEvent(ctx) + c.Assert(ev, IsNil) + c.Assert(err, Equals, ctx.Err()) +} diff --git a/pkg/binlog/reader/reader.go b/pkg/binlog/reader/reader.go index ddf91b8680..eba9de6b82 100644 --- a/pkg/binlog/reader/reader.go +++ b/pkg/binlog/reader/reader.go @@ -54,6 +54,7 @@ type Reader interface { StartSyncByGTID(gSet gtid.Set) error // Close closes the reader and release the resource. + // Close will be blocked if `GetEvent` has not returned. Close() error // GetEvent gets the binlog event one by one, it will block if no event can be read. diff --git a/pkg/binlog/reader/tcp.go b/pkg/binlog/reader/tcp.go index 0f554663a2..da20113a08 100644 --- a/pkg/binlog/reader/tcp.go +++ b/pkg/binlog/reader/tcp.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/errors" gmysql "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" - "github.com/siddontang/go/sync2" "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" @@ -32,12 +31,13 @@ import ( // TCPReader is a binlog event reader which read binlog events from a TCP stream. type TCPReader struct { - mu sync.Mutex - - stage sync2.AtomicInt32 syncerCfg replication.BinlogSyncerConfig - syncer *replication.BinlogSyncer - streamer *replication.BinlogStreamer + + mu sync.RWMutex + stage readerStage + + syncer *replication.BinlogSyncer + streamer *replication.BinlogStreamer } // TCPReaderStatus represents the status of a TCPReader. @@ -68,8 +68,8 @@ func (r *TCPReader) StartSyncByPos(pos gmysql.Position) error { r.mu.Lock() defer r.mu.Unlock() - if r.stage.Get() != int32(stageNew) { - return errors.Errorf("stage %s, expect %s", readerStage(r.stage), stageNew) + if r.stage != stageNew { + return errors.Errorf("stage %s, expect %s, already started", r.stage, stageNew) } streamer, err := r.syncer.StartSync(pos) @@ -78,7 +78,7 @@ func (r *TCPReader) StartSyncByPos(pos gmysql.Position) error { } r.streamer = streamer - r.stage.Set(int32(stagePrepared)) + r.stage = stagePrepared return nil } @@ -87,8 +87,8 @@ func (r *TCPReader) StartSyncByGTID(gSet gtid.Set) error { r.mu.Lock() defer r.mu.Unlock() - if r.stage.Get() != int32(stageNew) { - return errors.Errorf("stage %s, expect %s", readerStage(r.stage), stageNew) + if r.stage != stageNew { + return errors.Errorf("stage %s, expect %s, already started", r.stage, stageNew) } if gSet == nil { @@ -101,7 +101,7 @@ func (r *TCPReader) StartSyncByGTID(gSet gtid.Set) error { } r.streamer = streamer - r.stage.Set(int32(stagePrepared)) + r.stage = stagePrepared return nil } @@ -110,7 +110,7 @@ func (r *TCPReader) Close() error { r.mu.Lock() defer r.mu.Unlock() - if r.stage.Get() == int32(stageClosed) { + if r.stage == stageClosed { return errors.New("already closed") } @@ -129,14 +129,17 @@ func (r *TCPReader) Close() error { } } - r.stage.Set(int32(stageClosed)) + r.stage = stageClosed return nil } // GetEvent implements Reader.GetEvent. func (r *TCPReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { - if r.stage.Get() != int32(stagePrepared) { - return nil, errors.Errorf("stage %s, expect %s", readerStage(r.stage), stagePrepared) + r.mu.RLock() + defer r.mu.RUnlock() + + if r.stage != stagePrepared { + return nil, errors.Errorf("stage %s, expect %s, please start sync first", r.stage, stagePrepared) } return r.streamer.GetEvent(ctx) @@ -144,14 +147,16 @@ func (r *TCPReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, err // Status implements Reader.Status. func (r *TCPReader) Status() interface{} { - stage := r.stage.Get() + r.mu.RLock() + stage := r.stage + r.mu.RUnlock() var connID uint32 - if stage != int32(stageNew) { + if stage != stageNew { connID = r.syncer.LastConnectionID() } return &TCPReaderStatus{ - Stage: readerStage(stage).String(), + Stage: stage.String(), ConnID: connID, } } diff --git a/pkg/binlog/reader/tcp_test.go b/pkg/binlog/reader/tcp_test.go index 8fd3467989..2a41a30eb2 100644 --- a/pkg/binlog/reader/tcp_test.go +++ b/pkg/binlog/reader/tcp_test.go @@ -204,7 +204,7 @@ func (t *testTCPReaderSuite) TestSyncGTID(c *C) { UseDecimal: true, VerifyChecksum: true, } - gSet gtid.Set // nit GTID set + gSet gtid.Set // nil GTID set ) // the first reader diff --git a/relay/reader/error.go b/relay/reader/error.go new file mode 100644 index 0000000000..39c7bc2dcb --- /dev/null +++ b/relay/reader/error.go @@ -0,0 +1,30 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + + "github.com/pingcap/errors" +) + +// isIgnorableError checks whether the error is ignorable. +func isIgnorableError(err error) bool { + err = errors.Cause(err) + switch err { + case context.Canceled: + return true + } + return false +} diff --git a/relay/reader/error_test.go b/relay/reader/error_test.go new file mode 100644 index 0000000000..6fd61bf194 --- /dev/null +++ b/relay/reader/error_test.go @@ -0,0 +1,41 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + + . "github.com/pingcap/check" + "github.com/pingcap/errors" +) + +var ( + _ = Suite(&testErrorSuite{}) +) + +type testErrorSuite struct { +} + +func (t *testErrorSuite) TestIgnorable(c *C) { + err := errors.New("custom error") + c.Assert(isIgnorableError(err), IsFalse) + + cases := []error{ + context.Canceled, + errors.Annotate(context.Canceled, "annotated"), + } + for _, cs := range cases { + c.Assert(isIgnorableError(cs), IsTrue) + } +} diff --git a/relay/reader/reader.go b/relay/reader/reader.go new file mode 100644 index 0000000000..68036f026d --- /dev/null +++ b/relay/reader/reader.go @@ -0,0 +1,149 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + "sync" + + "github.com/pingcap/errors" + "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + + br "github.com/pingcap/dm/pkg/binlog/reader" + "github.com/pingcap/dm/pkg/gtid" + "github.com/pingcap/dm/pkg/log" +) + +// Reader reads binlog events from a upstream master server. +// The read binlog events should be send to a transformer. +// The reader should support: +// 1. handle expected errors +// 2. do retry if possible +// NOTE: some errors still need to be handled in the outer caller. +type Reader interface { + // Start starts the reading process. + Start() error + + // Close closes the reader and release the resource. + Close() error + + // GetEvent gets the binlog event one by one, it will block if no event can be read. + // You can pass a context (like Cancel or Timeout) to break the block. + GetEvent(ctx context.Context) (*replication.BinlogEvent, error) +} + +// Config is the configuration used by the Reader. +type Config struct { + SyncConfig replication.BinlogSyncerConfig + Pos mysql.Position + GTIDs gtid.Set + EnableGTID bool + MasterID string // the identifier for the master, used when logging. +} + +// reader implements Reader interface. +type reader struct { + cfg *Config + + mu sync.RWMutex + stage readerStage + + in br.Reader // the underlying reader used to read binlog events. + out chan *replication.BinlogEvent +} + +// NewReader creates a Reader instance. +func NewReader(cfg *Config) Reader { + return &reader{ + cfg: cfg, + in: br.NewTCPReader(cfg.SyncConfig), + out: make(chan *replication.BinlogEvent), + } +} + +// Start implements Reader.Start. +func (r *reader) Start() error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.stage != stageNew { + return errors.Errorf("stage %s, expect %s, already started", r.stage, stageNew) + } + r.stage = stagePrepared + + defer func() { + status := r.in.Status() + log.Infof("[relay] set up binlog reader for master %s with status %s", r.cfg.MasterID, status) + }() + + var err error + if r.cfg.EnableGTID { + err = r.setUpReaderByGTID() + } else { + err = r.setUpReaderByPos() + } + + return errors.Trace(err) +} + +// Close implements Reader.Close. +func (r *reader) Close() error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.stage == stageClosed { + return errors.New("already closed") + } + r.stage = stageClosed + + err := r.in.Close() + return errors.Trace(err) +} + +// GetEvent implements Reader.GetEvent. +// If some ignorable error occurred, the returned event and error both are nil. +// NOTE: can only close the reader after this returned. +func (r *reader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { + r.mu.RLock() + defer r.mu.RUnlock() + + if r.stage != stagePrepared { + return nil, errors.Errorf("stage %s, expect %s, please start the reader first", r.stage, stagePrepared) + } + + for { + ev, err := r.in.GetEvent(ctx) + // NOTE: add retryable error support if needed later + if err == nil { + return ev, nil + } else if isIgnorableError(err) { + log.Warnf("[relay] get event with ignorable error %s", err) + return nil, nil // return without error and also without binlog event + } + return nil, errors.Trace(err) + } +} + +func (r *reader) setUpReaderByGTID() error { + gs := r.cfg.GTIDs + log.Infof("[relay] start sync for master %s from GTID set %s", r.cfg.MasterID, gs) + return r.in.StartSyncByGTID(gs) +} + +func (r *reader) setUpReaderByPos() error { + pos := r.cfg.Pos + log.Infof("[relay] start sync for master %s from position %s", r.cfg.MasterID, pos) + return r.in.StartSyncByPos(pos) +} diff --git a/relay/reader/reader_test.go b/relay/reader/reader_test.go new file mode 100644 index 0000000000..5135faace9 --- /dev/null +++ b/relay/reader/reader_test.go @@ -0,0 +1,159 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + "testing" + "time" + + . "github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/siddontang/go-mysql/replication" + + br "github.com/pingcap/dm/pkg/binlog/reader" +) + +var ( + _ = Suite(&testReaderSuite{}) +) + +func TestSuite(t *testing.T) { + TestingT(t) +} + +type testReaderSuite struct { +} + +func (t *testReaderSuite) TestInterface(c *C) { + cases := []*replication.BinlogEvent{ + {RawData: []byte{1}}, + {RawData: []byte{2}}, + {RawData: []byte{3}}, + } + + cfg := &Config{ + SyncConfig: replication.BinlogSyncerConfig{ + ServerID: 101, + }, + MasterID: "test-master", + } + + // test with position + r := NewReader(cfg) + t.testInterfaceWithReader(c, r, cases) + + // test with GTID + cfg.EnableGTID = true + r = NewReader(cfg) + t.testInterfaceWithReader(c, r, cases) +} + +func (t *testReaderSuite) testInterfaceWithReader(c *C, r Reader, cases []*replication.BinlogEvent) { + // replace underlying reader with a mock reader for testing + concreteR := r.(*reader) + c.Assert(concreteR, NotNil) + mockR := br.NewMockReader() + concreteR.in = mockR + + // start reader + err := r.Start() + c.Assert(err, IsNil) + err = r.Start() // call multi times + c.Assert(err, NotNil) + + // getEvent by pushing event to mock reader + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + concreteMR := mockR.(*br.MockReader) + go func() { + for _, cs := range cases { + c.Assert(concreteMR.PushEvent(ctx, cs), IsNil) + } + }() + obtained := make([]*replication.BinlogEvent, 0, len(cases)) + for { + ev, err2 := r.GetEvent(ctx) + c.Assert(err2, IsNil) + obtained = append(obtained, ev) + if len(obtained) == len(cases) { + break + } + } + c.Assert(obtained, DeepEquals, cases) + + // close reader + err = r.Close() + c.Assert(err, IsNil) + err = r.Close() + c.Assert(err, NotNil) // call multi times + + // getEvent from a closed reader + ev, err := r.GetEvent(ctx) + c.Assert(err, NotNil) + c.Assert(ev, IsNil) +} + +func (t *testReaderSuite) TestGetEventWithError(c *C) { + cfg := &Config{ + SyncConfig: replication.BinlogSyncerConfig{ + ServerID: 101, + }, + MasterID: "test-master", + } + + r := NewReader(cfg) + // replace underlying reader with a mock reader for testing + concreteR := r.(*reader) + c.Assert(concreteR, NotNil) + mockR := br.NewMockReader() + concreteR.in = mockR + + errOther := errors.New("other error") + in := []error{ + context.DeadlineExceeded, // should be handled in the outer + context.Canceled, // ignorable + errOther, + } + expected := []error{ + context.DeadlineExceeded, + nil, // from ignorable + errOther, + } + + err := r.Start() + c.Assert(err, IsNil) + + // getEvent by pushing event to mock reader + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + concreteMR := mockR.(*br.MockReader) + go func() { + for _, cs := range in { + c.Assert(concreteMR.PushError(ctx, cs), IsNil) + } + }() + + obtained := make([]error, 0, len(expected)) + for { + ev, err2 := r.GetEvent(ctx) + err2 = errors.Cause(err2) + c.Assert(ev, IsNil) + obtained = append(obtained, err2) + if err2 == errOther { + break // all received + } + } + c.Assert(obtained, DeepEquals, expected) +} diff --git a/relay/reader/stage.go b/relay/reader/stage.go new file mode 100644 index 0000000000..d5be1ccabf --- /dev/null +++ b/relay/reader/stage.go @@ -0,0 +1,36 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +type readerStage int32 + +const ( + stageNew readerStage = iota + stagePrepared + stageClosed +) + +// String implements Stringer.String. +func (s readerStage) String() string { + switch s { + case stageNew: + return "new" + case stagePrepared: + return "prepared" + case stageClosed: + return "closed" + default: + return "unknown" + } +} diff --git a/relay/transformer/transformer.go b/relay/transformer/transformer.go new file mode 100644 index 0000000000..10b700725f --- /dev/null +++ b/relay/transformer/transformer.go @@ -0,0 +1,100 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package transformer + +import ( + "github.com/pingcap/parser" + "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" +) + +var ( + artificialFlag = uint16(0x0020) // LOG_EVENT_ARTIFICIAL_F +) + +// Result represents a transform result. +type Result struct { + Ignore bool // whether the event should be ignored + LogPos uint32 // binlog event's End_log_pos or Position in RotateEvent + NextLogName string // next binlog filename, only valid for RotateEvent + GTIDSet mysql.GTIDSet // GTIDSet got from QueryEvent and XIDEvent when RawModeEnabled not true + CanSaveGTID bool // whether can save GTID into meta, true for DDL query and XIDEvent +} + +// Transformer receives binlog events from a reader and transforms them. +// The transformed binlog events should be send to one or more writers. +// The transformer should support: +// 1. extract binlog position, GTID info from the event. +// 2. decide the event whether needed by a downstream writer. +// - the downstream writer may also drop some events according to its strategy. +// NOTE: more features maybe moved from outer into Transformer later. +type Transformer interface { + // Transform transforms a binlog event. + Transform(e *replication.BinlogEvent) *Result +} + +// transformer implements Transformer interface. +type transformer struct { + parser2 *parser.Parser // used to parse query statement +} + +// NewTransformer creates a Transformer instance. +func NewTransformer(parser2 *parser.Parser) Transformer { + return &transformer{ + parser2: parser2, + } +} + +// Transform implements Transformer.Transform. +func (t *transformer) Transform(e *replication.BinlogEvent) *Result { + result := &Result{ + LogPos: e.Header.LogPos, + } + + switch ev := e.Event.(type) { + case *replication.RotateEvent: + result.LogPos = uint32(ev.Position) // next event's position + result.NextLogName = string(ev.NextLogName) // for RotateEvent, update binlog name + if e.Header.Timestamp == 0 || e.Header.LogPos == 0 { + result.Ignore = true // ignore fake rotate event + } + case *replication.QueryEvent: + // when RawModeEnabled not true, QueryEvent will be parsed. + // even for `BEGIN`, we still update pos/GTID, but only save GTID for DDL. + result.GTIDSet = ev.GSet + isDDL := checkIsDDL(string(ev.Query), t.parser2) + if isDDL { + result.CanSaveGTID = true + } + case *replication.XIDEvent: + // when RawModeEnabled not true, XIDEvent will be parsed. + result.GTIDSet = ev.GSet + result.CanSaveGTID = true // need save GTID for XID + case *replication.GenericEvent: + // handle some un-parsed events + switch e.Header.EventType { + case replication.HEARTBEAT_EVENT: + // ignore artificial heartbeat event + // ref: https://dev.mysql.com/doc/internals/en/heartbeat-event.html + result.Ignore = true + } + default: + if e.Header.Flags&artificialFlag != 0 { + // ignore events with LOG_EVENT_ARTIFICIAL_F flag(0x0020) set + // ref: https://dev.mysql.com/doc/internals/en/binlog-event-flag.html + result.Ignore = true + } + } + return result +} diff --git a/relay/transformer/transformer_test.go b/relay/transformer/transformer_test.go new file mode 100644 index 0000000000..70e165898a --- /dev/null +++ b/relay/transformer/transformer_test.go @@ -0,0 +1,188 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package transformer + +import ( + "testing" + "time" + + "github.com/pingcap/check" + "github.com/pingcap/parser" + "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + + "github.com/pingcap/dm/pkg/binlog/event" + "github.com/pingcap/dm/pkg/gtid" +) + +var ( + _ = check.Suite(&testTransformerSuite{}) +) + +func TestSuite(t *testing.T) { + check.TestingT(t) +} + +type testTransformerSuite struct { +} + +type Case struct { + event *replication.BinlogEvent + result *Result +} + +func (t *testTransformerSuite) TestTransform(c *check.C) { + var ( + tran = NewTransformer(parser.New()) + header = &replication.EventHeader{ + Timestamp: uint32(time.Now().Unix()), + ServerID: 11, + Flags: 0x01, + } + latestPos uint32 = 456789 + gtidStr = "9f61c5f9-1eef-11e9-b6cf-0242ac140003:5" + gtidSet, _ = gtid.ParserGTID(mysql.MySQLFlavor, gtidStr) + schema = []byte("test_schema") + cases = make([]Case, 0, 10) + ) + + // RotateEvent + nextLogName := "mysql-bin.000123" + position := uint64(4) + ev, err := event.GenRotateEvent(header, latestPos, []byte(nextLogName), position) + c.Assert(err, check.IsNil) + cases = append(cases, Case{ + event: ev, + result: &Result{ + LogPos: uint32(position), + NextLogName: nextLogName, + }, + }) + + // fake RotateEvent with zero timestamp + header.Timestamp = 0 + ev, err = event.GenRotateEvent(header, latestPos, []byte(nextLogName), position) + c.Assert(err, check.IsNil) + cases = append(cases, Case{ + event: ev, + result: &Result{ + Ignore: true, // ignore fake RotateEvent + LogPos: uint32(position), + NextLogName: nextLogName, + }, + }) + header.Timestamp = uint32(time.Now().Unix()) // set to non-zero + + // fake RotateEvent with zero logPos + fakeRotateHeader := replication.EventHeader{} + fakeRotateHeader = *header + ev, err = event.GenRotateEvent(&fakeRotateHeader, latestPos, []byte(nextLogName), position) + c.Assert(err, check.IsNil) + ev.Header.LogPos = 0 // set to zero + cases = append(cases, Case{ + event: ev, + result: &Result{ + Ignore: true, // ignore fake RotateEvent + LogPos: uint32(position), + NextLogName: nextLogName, + }, + }) + + // QueryEvent for DDL + query := []byte("CREATE TABLE test_tbl (c1 INT)") + ev, err = event.GenQueryEvent(header, latestPos, 0, 0, 0, nil, schema, query) + c.Assert(err, check.IsNil) + ev.Event.(*replication.QueryEvent).GSet = gtidSet.Origin() // set GTIDs manually + cases = append(cases, Case{ + event: ev, + result: &Result{ + LogPos: ev.Header.LogPos, + GTIDSet: gtidSet.Origin(), + CanSaveGTID: true, + }, + }) + + // QueryEvent for non-DDL + query = []byte("BEGIN") + ev, err = event.GenQueryEvent(header, latestPos, 0, 0, 0, nil, schema, query) + c.Assert(err, check.IsNil) + cases = append(cases, Case{ + event: ev, + result: &Result{ + LogPos: ev.Header.LogPos, + }, + }) + + // XIDEvent + xid := uint64(135) + ev, err = event.GenXIDEvent(header, latestPos, xid) + c.Assert(err, check.IsNil) + ev.Event.(*replication.XIDEvent).GSet = gtidSet.Origin() // set GTIDs manually + cases = append(cases, Case{ + event: ev, + result: &Result{ + LogPos: ev.Header.LogPos, + GTIDSet: gtidSet.Origin(), + CanSaveGTID: true, + }, + }) + + // GenericEvent, non-HEARTBEAT_EVENT + ev = &replication.BinlogEvent{Header: header, Event: &replication.GenericEvent{}} + cases = append(cases, Case{ + event: ev, + result: &Result{ + LogPos: ev.Header.LogPos, + }, + }) + + // GenericEvent, HEARTBEAT_EVENT + genericHeader := replication.EventHeader{} + genericHeader = *header + ev = &replication.BinlogEvent{Header: &genericHeader, Event: &replication.GenericEvent{}} + ev.Header.EventType = replication.HEARTBEAT_EVENT + cases = append(cases, Case{ + event: ev, + result: &Result{ + Ignore: true, + LogPos: ev.Header.LogPos, + }, + }) + + // other event type without LOG_EVENT_ARTIFICIAL_F + ev, err = event.GenCommonGTIDEvent(mysql.MySQLFlavor, header.ServerID, latestPos, gtidSet) + c.Assert(err, check.IsNil) + cases = append(cases, Case{ + event: ev, + result: &Result{ + LogPos: ev.Header.LogPos, + }, + }) + + // other event type with LOG_EVENT_ARTIFICIAL_F + ev, err = event.GenCommonGTIDEvent(mysql.MySQLFlavor, header.ServerID, latestPos, gtidSet) + c.Assert(err, check.IsNil) + ev.Header.Flags |= artificialFlag + cases = append(cases, Case{ + event: ev, + result: &Result{ + Ignore: true, + LogPos: ev.Header.LogPos, + }, + }) + + for _, cs := range cases { + c.Assert(tran.Transform(cs.event), check.DeepEquals, cs.result) + } +} diff --git a/relay/transformer/util.go b/relay/transformer/util.go new file mode 100644 index 0000000000..e900baaf79 --- /dev/null +++ b/relay/transformer/util.go @@ -0,0 +1,42 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package transformer + +import ( + "github.com/pingcap/parser" + "github.com/pingcap/parser/ast" + + parserpkg "github.com/pingcap/dm/pkg/parser" + "github.com/pingcap/dm/pkg/utils" +) + +// checkIsDDL checks input SQL whether is a valid DDL statement +func checkIsDDL(sql string, p *parser.Parser) bool { + sql = utils.TrimCtrlChars(sql) + + // if parse error, treat it as not a DDL + stmts, err := parserpkg.Parse(p, sql, "", "") + if err != nil || len(stmts) == 0 { + return false + } + + stmt := stmts[0] + switch stmt.(type) { + case ast.DDLNode: + return true + default: + // other thing this like `BEGIN` + return false + } +} diff --git a/relay/transformer/util_test.go b/relay/transformer/util_test.go new file mode 100644 index 0000000000..34dfe14665 --- /dev/null +++ b/relay/transformer/util_test.go @@ -0,0 +1,57 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package transformer + +import ( + "github.com/pingcap/check" + "github.com/pingcap/parser" +) + +var ( + _ = check.Suite(&testUtilSuite{}) +) + +type testUtilSuite struct { +} + +func (t *testUtilSuite) TestCheckIsDDL(c *check.C) { + var ( + cases = []struct { + sql string + isDDL bool + }{ + { + sql: "CREATE DATABASE test_is_ddl", + isDDL: true, + }, + { + sql: "BEGIN", + isDDL: false, + }, + { + sql: "INSERT INTO test_is_ddl.test_is_ddl_table VALUES (1)", + isDDL: false, + }, + { + sql: "INVAID SQL STATEMENT", + isDDL: false, + }, + } + parser2 = parser.New() + ) + + for _, cs := range cases { + c.Assert(checkIsDDL(cs.sql, parser2), check.Equals, cs.isDDL) + } +}