Skip to content

Commit

Permalink
Merge branch 'release-5.0' into cherry-pick-2138-to-release-5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Jul 19, 2021
2 parents 126f6be + 578a147 commit fdd0204
Show file tree
Hide file tree
Showing 13 changed files with 56 additions and 578 deletions.
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
linters:
enable:
- unconvert
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ tools/bin/errdoc-gen: tools/check/go.mod
cd tools/check; test -e ../bin/errdoc-gen || \
$(GO) build -o ../bin/errdoc-gen github.com/pingcap/errors/errdoc-gen

tools/bin/golangci-lint: tools/check/go.mod
tools/bin/golangci-lint:
cd tools/check; test -e ../bin/golangci-lint || \
$(GO) build -o ../bin/golangci-lint github.com/golangci/golangci-lint/cmd/golangci-lint
curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh| sh -s -- -b ../bin v1.30.0

failpoint-enable: check_failpoint_ctl
$(FAILPOINT_ENABLE)
Expand Down
2 changes: 1 addition & 1 deletion cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (info *ChangeFeedInfo) Unmarshal(data []byte) error {
return errors.Annotatef(
cerror.WrapError(cerror.ErrMarshalFailed, err), "Marshal data: %v", data)
}
info.Opts[mark.OptCyclicConfig] = string(cyclicCfg)
info.Opts[mark.OptCyclicConfig] = cyclicCfg
}
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,19 +314,19 @@ func ColumnValueString(c interface{}) string {
case int32:
data = strconv.FormatInt(int64(v), 10)
case int64:
data = strconv.FormatInt(int64(v), 10)
data = strconv.FormatInt(v, 10)
case uint8:
data = strconv.FormatUint(uint64(v), 10)
case uint16:
data = strconv.FormatUint(uint64(v), 10)
case uint32:
data = strconv.FormatUint(uint64(v), 10)
case uint64:
data = strconv.FormatUint(uint64(v), 10)
data = strconv.FormatUint(v, 10)
case float32:
data = strconv.FormatFloat(float64(v), 'f', -1, 32)
case float64:
data = strconv.FormatFloat(float64(v), 'f', -1, 64)
data = strconv.FormatFloat(v, 'f', -1, 64)
case string:
data = v
case []byte:
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ const (
// CDCServiceSafePointID is the ID of CDC service in pd.UpdateServiceGCSafePoint.
CDCServiceSafePointID = "ticdc"
// GCSafepointUpdateInterval is the minimual interval that CDC can update gc safepoint
GCSafepointUpdateInterval = time.Duration(2 * time.Second)
GCSafepointUpdateInterval = 2 * time.Second
// MinGCSafePointCacheUpdateInterval is the interval that update minGCSafePointCache
MinGCSafePointCacheUpdateInterval = time.Second * 2
)
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
if err != nil {
return errors.Trace(err)
}
opts[mark.OptCyclicConfig] = string(cyclicCfg)
opts[mark.OptCyclicConfig] = cyclicCfg
}
opts[sink.OptChangefeedID] = p.changefeed.ID
opts[sink.OptCaptureAddr] = ctx.GlobalVars().CaptureInfo.AdvertiseAddr
Expand Down
2 changes: 1 addition & 1 deletion cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (s *Server) etcdHealthChecker(ctx context.Context) error {
case <-ticker.C:
for _, pdEndpoint := range s.pdEndpoints {
start := time.Now()
ctx, cancel := context.WithTimeout(ctx, time.Duration(time.Second*10))
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
req, err := http.NewRequestWithContext(
ctx, http.MethodGet, fmt.Sprintf("%s/health", pdEndpoint), nil)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func NewManager(ctx context.Context, backendSink Sink, errCh chan error, checkpo

// CreateTableSink creates a table sink
func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts) Sink {
m.tableSinksMu.Lock()
defer m.tableSinksMu.Unlock()
if _, exist := m.tableSinks[tableID]; exist {
log.Panic("the table sink already exists", zap.Uint64("tableID", uint64(tableID)))
}
Expand All @@ -67,8 +69,6 @@ func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts)
buffer: make([]*model.RowChangedEvent, 0, 128),
emittedTs: checkpointTs,
}
m.tableSinksMu.Lock()
defer m.tableSinksMu.Unlock()
m.tableSinks[tableID] = sink
return sink
}
Expand Down
8 changes: 7 additions & 1 deletion cdc/sink/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,14 @@ func (s *managerSuite) TestManagerRandom(c *check.C) {
var wg sync.WaitGroup
tableSinks := make([]Sink, goroutineNum)
for i := 0; i < goroutineNum; i++ {
tableSinks[i] = manager.CreateTableSink(model.TableID(i), 0)
i := i
wg.Add(1)
go func() {
defer wg.Done()
tableSinks[i] = manager.CreateTableSink(model.TableID(i), 0)
}()
}
wg.Wait()
for i := 0; i < goroutineNum; i++ {
i := i
tableSink := tableSinks[i]
Expand Down
4 changes: 2 additions & 2 deletions cmd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,9 @@ func verifyTables(ctx context.Context, credential *security.Credential, cfg *con
}

for tID, tableName := range snap.CloneTables() {
tableInfo, exist := snap.TableByID(int64(tID))
tableInfo, exist := snap.TableByID(tID)
if !exist {
return nil, nil, errors.NotFoundf("table %d", int64(tID))
return nil, nil, errors.NotFoundf("table %d", tID)
}
if filter.ShouldIgnoreTable(tableName.Schema, tableName.Table) {
continue
Expand Down
17 changes: 9 additions & 8 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ import (
filterV2 "github.com/pingcap/tidb-tools/pkg/table-filter"
)

// Filter is a event filter implementation
// Filter is a event filter implementation.
type Filter struct {
filter filterV2.Filter
ignoreTxnStartTs []uint64
ddlAllowlist []model.ActionType
isCyclicEnabled bool
}

// VerifyRules ...
// VerifyRules checks the filter rules in the configuration
// and returns an invalid rule error if the verification fails, otherwise it will return the parsed filter.
func VerifyRules(cfg *config.ReplicaConfig) (filterV2.Filter, error) {
var f filterV2.Filter
var err error
Expand All @@ -50,7 +51,7 @@ func VerifyRules(cfg *config.ReplicaConfig) (filterV2.Filter, error) {
return f, nil
}

// NewFilter creates a filter
// NewFilter creates a filter.
func NewFilter(cfg *config.ReplicaConfig) (*Filter, error) {
f, err := VerifyRules(cfg)
if err != nil {
Expand Down Expand Up @@ -78,9 +79,9 @@ func (f *Filter) shouldIgnoreStartTs(ts uint64) bool {
}

// ShouldIgnoreTable returns true if the specified table should be ignored by this change feed.
// Set `tbl` to an empty string to test against the whole database.
// NOTICE: Set `tbl` to an empty string to test against the whole database.
func (f *Filter) ShouldIgnoreTable(db, tbl string) bool {
if IsSysSchema(db) {
if isSysSchema(db) {
return true
}
if f.isCyclicEnabled && mark.IsMarkTable(db, tbl) {
Expand Down Expand Up @@ -110,7 +111,7 @@ func (f *Filter) ShouldIgnoreDDLEvent(ts uint64, ddlType model.ActionType, schem
return f.shouldIgnoreStartTs(ts) || shouldIgnoreTableOrSchema
}

// ShouldDiscardDDL returns true if this DDL should be discarded
// ShouldDiscardDDL returns true if this DDL should be discarded.
func (f *Filter) ShouldDiscardDDL(ddlType model.ActionType) bool {
if !f.shouldDiscardByBuiltInDDLAllowlist(ddlType) {
return false
Expand Down Expand Up @@ -180,7 +181,7 @@ func (f *Filter) shouldDiscardByBuiltInDDLAllowlist(ddlType model.ActionType) bo
return true
}

// IsSysSchema returns true if the given schema is a system schema
func IsSysSchema(db string) bool {
// isSysSchema returns true if the given schema is a system schema
func isSysSchema(db string) bool {
return filterV1.IsSystemSchema(db)
}
1 change: 0 additions & 1 deletion tools/check/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/pingcap/tidb-cdc/_tools
go 1.13

require (
github.com/golangci/golangci-lint v1.33.0
github.com/mgechev/revive v1.0.2
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3
mvdan.cc/gofumpt v0.0.0-20201123090407-3077abae40c0
Expand Down
Loading

0 comments on commit fdd0204

Please sign in to comment.