This repository has been archived by the owner on Nov 24, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 188
*: add Reader
, Transformer
for relay log
#108
Merged
Merged
Changes from 28 commits
Commits
Show all changes
30 commits
Select commit
Hold shift + click to select a range
81d0b46
reader: simple wrapper for TCPReader to reader binlog events from master
csuzhangxc 0e7747e
reader: add a MockReader
csuzhangxc 9a6553a
reader: test reader with MockReader
csuzhangxc ef76773
*: test GTID backoff to position
csuzhangxc 9fa15ad
reader: get event handle retryable and ignorable error
csuzhangxc 33e22e0
reader: add test cases for error
csuzhangxc 1f5bc1e
event: add GenRotateEvent
csuzhangxc 76ba02c
*: add a transformer
csuzhangxc 0500128
reader: add a FileReader
csuzhangxc 2f9db5b
reader: add channel buffer size control for FileReader; add read/send…
csuzhangxc 86a3967
reader: rename struct
csuzhangxc af31c57
reader: tiny modify FileReader; add test case for FileReader
csuzhangxc de5dbf6
Merge remote-tracking branch 'remotes/origin/master' into relay-writer
csuzhangxc 6bde450
reader: tiny fix; add more test cases for FileReader
csuzhangxc 93dd964
reader: DeadlineExceeded should be retry in the outer caller
csuzhangxc 1be78cb
reader: address comments
csuzhangxc 83fc175
reader: address comments
csuzhangxc 43a7808
reader: address comments
csuzhangxc 59a6a7b
Update relay/transformer/transformer.go
IANTHEREAL d0b6697
reader: change Mutex and Atomic to RWMutex
csuzhangxc ef1f976
Merge remote-tracking branch 'origin/relay-writer' into relay-writer
csuzhangxc 208aa36
*: refine code
csuzhangxc 1184ffa
Update pkg/binlog/reader/file.go
amyangfei ab3418c
Update relay/reader/reader.go
amyangfei 5edf31b
*: address comments
csuzhangxc b8c006a
reader: refine error msg
csuzhangxc a5e98b7
*: refine code
csuzhangxc 521ad40
reader: add FileReaderStatus string representation
csuzhangxc 0e318d1
reader: address comment
csuzhangxc 3f146d0
reader: address comment
csuzhangxc File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,181 @@ | ||
// Copyright 2019 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
// binlog events generator for MySQL used to generate some binlog events for tests. | ||
// Readability takes precedence over performance. | ||
|
||
package reader | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"sync" | ||
"time" | ||
|
||
"github.com/pingcap/errors" | ||
gmysql "github.com/siddontang/go-mysql/mysql" | ||
"github.com/siddontang/go-mysql/replication" | ||
"github.com/siddontang/go/sync2" | ||
|
||
"github.com/pingcap/dm/pkg/gtid" | ||
"github.com/pingcap/dm/pkg/log" | ||
) | ||
|
||
// FileReader is a binlog event reader which reads binlog events from a file. | ||
type FileReader struct { | ||
mu sync.RWMutex | ||
wg sync.WaitGroup | ||
|
||
stage readerStage | ||
readOffset sync2.AtomicUint32 | ||
sendOffset sync2.AtomicUint32 | ||
|
||
parser *replication.BinlogParser | ||
ch chan *replication.BinlogEvent | ||
ech chan error | ||
|
||
ctx context.Context | ||
cancel context.CancelFunc | ||
} | ||
|
||
// FileReaderConfig is the configuration used by a FileReader. | ||
type FileReaderConfig struct { | ||
EnableRawMode bool | ||
Timezone *time.Location | ||
ChBufferSize int // event channel's buffer size | ||
EchBufferSize int // error channel's buffer size | ||
} | ||
|
||
// FileReaderStatus represents the status of a FileReader. | ||
type FileReaderStatus struct { | ||
Stage string `json:"stage"` | ||
ReadOffset uint32 `json:"read-offset"` // read event's offset in the file | ||
SendOffset uint32 `json:"send-offset"` // sent event's offset in the file | ||
} | ||
|
||
// String implements Stringer.String. | ||
func (s *FileReaderStatus) String() string { | ||
data, err := json.Marshal(s) | ||
if err != nil { | ||
log.Errorf("[FileReaderStatus] marshal status to json error %v", err) | ||
} | ||
return string(data) | ||
} | ||
|
||
// NewFileReader creates a FileReader instance. | ||
func NewFileReader(cfg *FileReaderConfig) Reader { | ||
parser := replication.NewBinlogParser() | ||
parser.SetVerifyChecksum(true) | ||
parser.SetUseDecimal(true) | ||
parser.SetRawMode(cfg.EnableRawMode) | ||
if cfg.Timezone != nil { | ||
parser.SetTimestampStringLocation(cfg.Timezone) | ||
} | ||
return &FileReader{ | ||
parser: parser, | ||
ch: make(chan *replication.BinlogEvent, cfg.ChBufferSize), | ||
ech: make(chan error, cfg.EchBufferSize), | ||
} | ||
} | ||
|
||
// StartSyncByPos implements Reader.StartSyncByPos. | ||
func (r *FileReader) StartSyncByPos(pos gmysql.Position) error { | ||
r.mu.Lock() | ||
defer r.mu.Unlock() | ||
|
||
if r.stage != stageNew { | ||
return errors.Errorf("stage %s, expect %s, already started", r.stage, stageNew) | ||
} | ||
|
||
r.ctx, r.cancel = context.WithCancel(context.Background()) | ||
r.wg.Add(1) | ||
go func() { | ||
defer r.wg.Done() | ||
err := r.parser.ParseFile(pos.Name, int64(pos.Pos), r.onEvent) | ||
if err != nil { | ||
log.Errorf("[file reader] parse binlog file with error %s", errors.ErrorStack(err)) | ||
select { | ||
case r.ech <- err: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. at least output error information There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addressed. log the error before the |
||
case <-r.ctx.Done(): | ||
} | ||
} | ||
}() | ||
|
||
r.stage = stagePrepared | ||
return nil | ||
} | ||
|
||
// StartSyncByGTID implements Reader.StartSyncByGTID. | ||
func (r *FileReader) StartSyncByGTID(gSet gtid.Set) error { | ||
// NOTE: may be supported later. | ||
return errors.NotSupportedf("read from file by GTID") | ||
} | ||
|
||
// Close implements Reader.Close. | ||
func (r *FileReader) Close() error { | ||
r.mu.Lock() | ||
defer r.mu.Unlock() | ||
|
||
if r.stage == stageClosed { | ||
return errors.New("already closed") | ||
} | ||
|
||
r.parser.Stop() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's better to put it after L135, even I know it's ok after I review its implementation There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addressed |
||
r.cancel() | ||
r.wg.Wait() | ||
r.stage = stageClosed | ||
return nil | ||
} | ||
|
||
// GetEvent implements Reader.GetEvent. | ||
func (r *FileReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { | ||
r.mu.RLock() | ||
defer r.mu.RUnlock() | ||
|
||
if r.stage != stagePrepared { | ||
return nil, errors.Errorf("stage %s, expect %s, please start sync first", r.stage, stagePrepared) | ||
} | ||
|
||
select { | ||
case ev := <-r.ch: | ||
r.sendOffset.Set(ev.Header.LogPos) | ||
return ev, nil | ||
case err := <-r.ech: | ||
return nil, err | ||
case <-ctx.Done(): | ||
return nil, ctx.Err() | ||
} | ||
} | ||
|
||
// Status implements Reader.Status. | ||
func (r *FileReader) Status() interface{} { | ||
r.mu.RLock() | ||
stage := r.stage | ||
r.mu.RUnlock() | ||
|
||
return &FileReaderStatus{ | ||
Stage: stage.String(), | ||
ReadOffset: r.readOffset.Get(), | ||
SendOffset: r.sendOffset.Get(), | ||
} | ||
} | ||
|
||
func (r *FileReader) onEvent(ev *replication.BinlogEvent) error { | ||
select { | ||
case r.ch <- ev: | ||
r.readOffset.Set(ev.Header.LogPos) | ||
return nil | ||
case <-r.ctx.Done(): | ||
return r.ctx.Err() | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's difference between
latestPos
andposition
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
latestPos
is the previous end position of the latest events or the start position of this event.position
is the rotate target position for this event or the start position of the next event.