Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drainer: support load table infos to save memory and add gc safepoint update (#1233) #1268

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,4 @@ tools/bin/revive: tools/check/go.mod

tools/bin/golangci-lint: tools/check/go.mod
cd tools/check; \
GOBIN=$(CURDIR)/tools/bin $(GO) install github.com/golangci/golangci-lint/cmd/golangci-lint
GOBIN=$(CURDIR)/tools/bin $(GO) install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.53.3
1 change: 0 additions & 1 deletion drainer/binlog_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ func newBinlogItem(b *pb.Binlog, nodeID string) *binlogItem {
return itemp
}

//
func (b *binlogItem) SetJob(job *model.Job) {
b.job = job
}
2 changes: 2 additions & 0 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type SyncerConfig struct {
EnableCausalityFlag *bool `toml:"-" json:"enable-detect-flag"`
DisableCausalityFile *bool `toml:"disable-detect" json:"disable-detect"`
EnableCausalityFile *bool `toml:"enable-detect" json:"enable-detect"`
LoadSchemaSnapshot bool `toml:"load-schema-snapshot" json:"load-schema-snapshot"`

// v2 filter rules
CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"`
Expand Down Expand Up @@ -252,6 +253,7 @@ func NewConfig() *Config {
fs.BoolVar(&cfg.SyncerCfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant")
fs.BoolVar(cfg.SyncerCfg.DisableCausalityFlag, "disable-detect", false, "DEPRECATED, use enable-detect")
fs.BoolVar(cfg.SyncerCfg.EnableCausalityFlag, "enable-detect", true, "enable detect causality")
fs.BoolVar(&cfg.SyncerCfg.LoadSchemaSnapshot, "load-schema-snapshot", false, "init drainer schema info through pd meta interface, need to make sure checkpoint ts is not garbage collected in upstream")
fs.IntVar(&maxBinlogItemCount, "cache-binlog-count", defaultBinlogItemCount, "blurry count of binlogs in cache, limit cache size")
fs.IntVar(&cfg.SyncedCheckTime, "synced-check-time", defaultSyncedCheckTime, "if we can't detect new binlog after many minute, we think the all binlog is all synced")
fs.StringVar(new(string), "log-rotate", "", "DEPRECATED")
Expand Down
4 changes: 2 additions & 2 deletions drainer/loopbacksync/loopbacksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ var CreateMarkTableDDL string = fmt.Sprintf("CREATE TABLE If Not Exists %s (%s b
// CreateMarkDBDDL is DDL to create the database of mark table.
var CreateMarkDBDDL = "create database IF NOT EXISTS retl;"

//LoopBackSync loopback sync info
// LoopBackSync loopback sync info
type LoopBackSync struct {
ChannelID int64
LoopbackControl bool
SyncDDL bool
}

//NewLoopBackSyncInfo return LoopBackSyncInfo objec
// NewLoopBackSyncInfo return LoopBackSyncInfo objec
func NewLoopBackSyncInfo(ChannelID int64, LoopbackControl, SyncDDL bool) *LoopBackSync {
l := &LoopBackSync{
ChannelID: ChannelID,
Expand Down
49 changes: 49 additions & 0 deletions drainer/safepoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package drainer

import (
"context"
"fmt"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/drainer/checkpoint"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

const (
drainerServiceSafePointPrefix = "drainer"
defaultDrainerGCSafePointTTL = 5 * 60
)

func updateServiceSafePoint(ctx context.Context, pdClient pd.Client, cpt checkpoint.CheckPoint, ttl int64) {
updateInterval := time.Duration(ttl/2) * time.Second
tick := time.NewTicker(updateInterval)
defer tick.Stop()
dumplingServiceSafePointID := fmt.Sprintf("%s_%d", drainerServiceSafePointPrefix, time.Now().UnixNano())
log.Info("generate drainer gc safePoint id", zap.String("id", dumplingServiceSafePointID))

for {
snapshotTS := uint64(cpt.TS())
log.Debug("update PD safePoint limit with ttl",
zap.Uint64("safePoint", snapshotTS),
zap.Int64("ttl", ttl))
for retryCnt := 0; retryCnt <= 10; retryCnt++ {
_, err := pdClient.UpdateServiceGCSafePoint(ctx, dumplingServiceSafePointID, ttl, snapshotTS)
if err == nil {
break
}
log.Debug("update PD safePoint failed", zap.Error(err), zap.Int("retryTime", retryCnt))
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
}
}
select {
case <-ctx.Done():
return
case <-tick.C:
}
}
}
5 changes: 2 additions & 3 deletions drainer/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,9 +341,8 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,

log.Debug("Handle job", zap.Stringer("job", job))

sql = job.Query
if sql == "" {
return "", "", "", errors.Errorf("[ddl job sql miss]%+v", job)
if job.Query == "" {
log.Warn("job query is empty", zap.Stringer("job", job))
}

switch job.Type {
Expand Down
6 changes: 0 additions & 6 deletions drainer/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,6 @@ func (t *schemaSuite) TestHandleDDL(c *C) {
c.Assert(err, IsNil)
c.Assert(sql, Equals, "")

// check job.Query is empty
job = &model.Job{ID: 1, State: model.JobStateDone}
_, _, sql, err = schema.handleDDL(job)
c.Assert(sql, Equals, "")
c.Assert(err, NotNil, Commentf("should return not found job.Query"))

// db info
dbInfo := &model.DBInfo{
ID: 2,
Expand Down
21 changes: 20 additions & 1 deletion drainer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/store"
"github.com/pingcap/tidb/store/driver"
"github.com/pingcap/tipb/go-binlog"
Expand Down Expand Up @@ -200,7 +201,12 @@ func createSyncer(etcdURLs string, cp checkpoint.CheckPoint, cfg *SyncerConfig)
}
defer tiStore.Close()

jobs, err := loadHistoryDDLJobs(tiStore)
var jobs []*model.Job
if cfg.LoadSchemaSnapshot {
jobs, err = loadTableInfos(tiStore, cp.TS())
} else {
jobs, err = loadHistoryDDLJobs(tiStore)
}
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -275,6 +281,19 @@ func (s *Server) Start() error {
}
})

if s.cfg.SyncerCfg != nil && s.cfg.SyncerCfg.LoadSchemaSnapshot {
s.tg.GoNoPanic("gc_safepoint", func() {
defer func() { go s.Close() }()
pdCli, err := getPdClient(s.cfg.EtcdURLs, s.cfg.Security)
if err != nil {
log.Error("fail to create pdCli", zap.Error(err))
errCh <- err
}
updateServiceSafePoint(s.ctx, pdCli, s.cp, defaultDrainerGCSafePointTTL)
pdCli.Close()
})
}

s.tg.GoNoPanic("collect", func() {
defer func() { go s.Close() }()
s.collector.Start(s.ctx)
Expand Down
2 changes: 1 addition & 1 deletion drainer/sync/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down
2 changes: 1 addition & 1 deletion drainer/sync/oracle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down
2 changes: 1 addition & 1 deletion drainer/sync/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down
2 changes: 1 addition & 1 deletion drainer/translator/oracle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down
55 changes: 55 additions & 0 deletions drainer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,45 @@ func loadHistoryDDLJobs(tiStore kv.Storage) ([]*model.Job, error) {
return jobs, nil
}

// loadTableInfos loads all table infos after startTs
func loadTableInfos(tiStore kv.Storage, startTs int64) ([]*model.Job, error) {
meta := getSnapshotMetaFromTs(tiStore, startTs)
dbinfos, err := meta.ListDatabases()
if err != nil {
return nil, errors.Trace(err)
}
jobs := make([]*model.Job, 0, len(dbinfos))
version := int64(1)
for _, dbinfo := range dbinfos {
log.L().Info("load db info", zap.Stringer("db", dbinfo.Name), zap.Int64("version", version))
jobs = append(jobs, mockCreateSchemaJob(dbinfo, version))
version++
}
for _, dbinfo := range dbinfos {
tableInfos, err := meta.ListTables(dbinfo.ID)
if err != nil {
return nil, errors.Trace(err)
}
if len(tableInfos) == 0 {
continue
}
for _, tableInfo := range tableInfos {
log.L().Debug("load table info", zap.Stringer("db", dbinfo.Name), zap.Stringer("table", tableInfo.Name), zap.Int64("version", version))
}
jobs = append(jobs, &model.Job{
Type: model.ActionCreateTables,
State: model.JobStateDone,
SchemaID: dbinfo.ID,
BinlogInfo: &model.HistoryInfo{
SchemaVersion: version,
MultipleTableInfos: tableInfos,
},
})
version++
}
return jobs, nil
}

func getSnapshotMeta(tiStore kv.Storage) (*meta.Meta, error) {
version, err := tiStore.CurrentVersion(oracle.GlobalTxnScope)
if err != nil {
Expand All @@ -203,6 +242,11 @@ func getSnapshotMeta(tiStore kv.Storage) (*meta.Meta, error) {
return meta.NewSnapshotMeta(snapshot), nil
}

func getSnapshotMetaFromTs(tiStore kv.Storage, ts int64) *meta.Meta {
snapshot := tiStore.GetSnapshot(kv.NewVersion(uint64(ts)))
return meta.NewSnapshotMeta(snapshot)
}

func genDrainerID(listenAddr string) (string, error) {
urllis, err := url.Parse(listenAddr)
if err != nil {
Expand Down Expand Up @@ -334,3 +378,14 @@ func combineFilterRules(filterRules []*bf.BinlogEventRule) []*bf.BinlogEventRule
}
return rules
}

func mockCreateSchemaJob(dbInfo *model.DBInfo, schemaVersion int64) *model.Job {
return &model.Job{
Type: model.ActionCreateSchema,
State: model.JobStateDone,
BinlogInfo: &model.HistoryInfo{
SchemaVersion: schemaVersion,
DBInfo: dbInfo,
},
}
}
4 changes: 2 additions & 2 deletions pkg/loader/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func Merge(v bool) Option {
}
}

//DestinationDBType set destDBType option.
// DestinationDBType set destDBType option.
func DestinationDBType(t string) Option {
destDBType := DBTypeUnknown
if t == "oracle" {
Expand All @@ -209,7 +209,7 @@ func DestinationDBType(t string) Option {
}
}

//SetloopBackSyncInfo set loop back sync info of loader
// SetloopBackSyncInfo set loop back sync info of loader
func SetloopBackSyncInfo(loopBackSyncInfo *loopbacksync.LoopBackSync) Option {
return func(o *options) {
o.loopBackSyncInfo = loopBackSyncInfo
Expand Down
2 changes: 1 addition & 1 deletion pkg/loader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func CreateDB(user string, password string, host string, port int, tls *tls.Conf
return CreateDBWithSQLMode(user, password, host, port, tls, nil, nil, time.Minute)
}

//CreateOracleDB create Oracle DB connection and return it
// CreateOracleDB create Oracle DB connection and return it
func CreateOracleDB(user string, password string, host string, port int, serviceName, connectString string) (db *gosql.DB, err error) {
loc, err := time.LoadLocation("Local")
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func GetTidbPosition(db *sql.DB) (int64, error) {
return ts, nil
}

//GetOraclePosition return oracle scn
// GetOraclePosition return oracle scn
func GetOraclePosition(db *sql.DB) (int64, error) {
rows, err := db.Query("select dbms_flashback.get_system_change_number as current_scn from dual")
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pump/storage/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func newItemGenerator(txnNum int32, maxLatency int64, fakeTxnPerNum int32) <-cha
return items
}

/// sorter
// / sorter
type sortItem struct {
start int64
commit int64
Expand Down
6 changes: 3 additions & 3 deletions pump/storage/vlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,9 +445,9 @@ func (vlog *valueLog) scanRequests(start valuePointer, fn func(*request) error)

// scan visits binlogs in order starting from the specified position.
// There are two limitations to the usage of scan:
// 1. Binlogs added in new logFiles after scan starts are not visible, so don't assume
// that every single binlog added would be visited
// 2. If GC is running concurrently, logFiles may be closed and deleted, thus breaking the scanning.
// 1. Binlogs added in new logFiles after scan starts are not visible, so don't assume
// that every single binlog added would be visited
// 2. If GC is running concurrently, logFiles may be closed and deleted, thus breaking the scanning.
func (vlog *valueLog) scan(start valuePointer, fn func(vp valuePointer, record *Record) error) error {
vlog.gcLock.Lock()
defer vlog.gcLock.Unlock()
Expand Down