Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

config, loader: auto-remove imported dump file #770

Merged
merged 28 commits into from
Jul 10, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
5f78915
config, loader: auto-remove imported dump file
lance6716 Jun 29, 2020
bdec08b
loader: switch to checkpoint.go to get fileEndOff
lance6716 Jul 1, 2020
0fb067b
misc: pass make check
lance6716 Jul 1, 2020
eb6c63f
*: add integration test
lance6716 Jul 1, 2020
8392150
misc: pass make check
lance6716 Jul 1, 2020
51e9b85
test: add new integration case
lance6716 Jul 1, 2020
a06d30e
loader: fix CI
lance6716 Jul 1, 2020
3937959
Merge branch 'master' into fix698
lance6716 Jul 2, 2020
16e0184
loader: remove structure file after all file loaded
lance6716 Jul 2, 2020
6a3254e
try default true
lance6716 Jul 2, 2020
f89c709
loader: fix CI
lance6716 Jul 2, 2020
daaee48
test: merge and remove redundant tests
lance6716 Jul 2, 2020
2e9ba3e
misc: fix CI
lance6716 Jul 2, 2020
b3d21c6
Merge branch 'master' into fix698
lance6716 Jul 6, 2020
61ed2f3
Merge branch 'master' into fix698
lance6716 Jul 6, 2020
cf36503
address comments
lance6716 Jul 6, 2020
3d0ee2b
improve test case
lance6716 Jul 6, 2020
4e93f1f
improve test case
lance6716 Jul 6, 2020
2fb7012
Merge branch 'master' into fix698
lance6716 Jul 7, 2020
9c34cc8
Merge branch 'fix698' of github.com:lance6716/dm into fix698
lance6716 Jul 7, 2020
2be4923
Merge branch 'master' into fix698
lance6716 Jul 7, 2020
e454e49
Merge branch 'fix698' of github.com:lance6716/dm into fix698
lance6716 Jul 7, 2020
be9ba77
Merge branch 'master' into fix698
lance6716 Jul 8, 2020
4a3a922
fix CI
lance6716 Jul 8, 2020
25a34b7
Merge branch 'master' into fix698
lance6716 Jul 8, 2020
af60b8d
Merge branch 'master' into fix698
lance6716 Jul 9, 2020
52a19f4
address comments
lance6716 Jul 9, 2020
0c4a60f
address comments, test auto-clean work after worker restart
lance6716 Jul 9, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ type SubTaskConfig struct {

ConfigFile string `toml:"-" json:"config-file"`

RemoveFinishedDump bool `toml:"remove-finished-dump" json:"remove-finished-dump"`
lance6716 marked this conversation as resolved.
Show resolved Hide resolved

// still needed by Syncer / Loader bin
printVersion bool
}
Expand Down
5 changes: 5 additions & 0 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ type TaskConfig struct {
Mydumpers map[string]*MydumperConfig `yaml:"mydumpers"`
Loaders map[string]*LoaderConfig `yaml:"loaders"`
Syncers map[string]*SyncerConfig `yaml:"syncers"`

RemoveFinishedDump bool `yaml:"remove-finished-dump"`
}

// NewTaskConfig creates a TaskConfig
Expand All @@ -310,6 +312,7 @@ func NewTaskConfig() *TaskConfig {
Mydumpers: make(map[string]*MydumperConfig),
Loaders: make(map[string]*LoaderConfig),
Syncers: make(map[string]*SyncerConfig),
RemoveFinishedDump: false,
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
}
cfg.FlagSet = flag.NewFlagSet("task", flag.ContinueOnError)
return cfg
Expand Down Expand Up @@ -556,6 +559,8 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf
cfg.LoaderConfig = *inst.Loader
cfg.SyncerConfig = *inst.Syncer

cfg.RemoveFinishedDump = c.RemoveFinishedDump

err := cfg.Adjust(true)
if err != nil {
return nil, terror.Annotatef(err, "source %s", inst.SourceID)
Expand Down
17 changes: 14 additions & 3 deletions loader/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,16 +288,17 @@ func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos
}

// fields[0] -> db name, fields[1] -> table name
schema, table := fields[0], fields[1]
sql2 := fmt.Sprintf("INSERT INTO `%s`.`%s` (`id`, `filename`, `cp_schema`, `cp_table`, `offset`, `end_pos`) VALUES(?,?,?,?,?,?)", cp.schema, cp.table)
cp.logCtx.L().Info("initial checkpoint record",
zap.String("sql", sql2),
zap.String("id", cp.id),
zap.String("filename", filename),
zap.String("schema", fields[0]),
zap.String("table", fields[1]),
zap.String("schema", schema),
zap.String("table", table),
zap.Int64("offset", 0),
zap.Int64("end position", endPos))
args := []interface{}{cp.id, filename, fields[0], fields[1], 0, endPos}
args := []interface{}{cp.id, filename, schema, table, 0, endPos}
cp.connMutex.Lock()
err := cp.conn.executeSQL(tctx, []string{sql2}, args)
cp.connMutex.Unlock()
Expand All @@ -308,6 +309,16 @@ func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos
}
return terror.WithScope(terror.Annotate(err, "initialize checkpoint"), terror.ScopeDownstream)
}
// checkpoint not exists and no error, cache endPos in memory
if _, ok := cp.restoringFiles[schema]; !ok {
cp.restoringFiles[schema] = make(map[string]FilePosSet)
}
tables := cp.restoringFiles[schema]
if _, ok := tables[table]; !ok {
tables[table] = make(map[string][]int64)
}
restoringFiles := tables[table]
restoringFiles[filename] = []int64{0, endPos}
return nil
}

Expand Down
22 changes: 20 additions & 2 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ type Tables2DataFiles map[string]DataFiles
type dataJob struct {
sql string
schema string
table string
file string
absPath string
offset int64
lastOffset int64
}
Expand Down Expand Up @@ -180,6 +182,18 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh
return
}
w.loader.finishedDataSize.Add(job.offset - job.lastOffset)

if w.cfg.RemoveFinishedDump {
fileInfos := w.checkPoint.GetRestoringFileInfo(job.schema, job.table)
if pos, ok := fileInfos[job.file]; ok {
if job.offset == pos[1] {
w.tctx.L().Info("try to remove loaded dump file", zap.String("data file", job.file))
os.Remove(job.absPath)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
w.tctx.L().Warn("file not recorded in checkpoint", zap.String("data file", job.file))
}
}
}
}
}
Expand Down Expand Up @@ -319,7 +333,9 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab
j := &dataJob{
sql: query,
schema: table.targetSchema,
table: table.targetTable,
file: baseFile,
absPath: file,
offset: cur,
lastOffset: lastOffset,
}
Expand Down Expand Up @@ -1110,7 +1126,8 @@ func (l *Loader) restoreData(ctx context.Context) error {
if err != nil {
return err
}
l.logCtx.L().Info("finish to create schema", zap.String("schema file", dbFile))
l.logCtx.L().Info("finish to create schema, try to delete file", zap.String("schema file", dbFile))
os.Remove(dbFile)

tnames := make([]string, 0, len(tables))
for t := range tables {
Expand All @@ -1137,7 +1154,8 @@ func (l *Loader) restoreData(ctx context.Context) error {
if err != nil {
return err
}
l.logCtx.L().Info("finish to create table", zap.String("table file", tableFile))
l.logCtx.L().Info("finish to create table, try to delete file", zap.String("table file", tableFile))
os.Remove(tableFile)

restoringFiles := l.checkPoint.GetRestoringFileInfo(db, table)
l.logCtx.L().Debug("restoring table data", zap.String("schema", db), zap.String("table", table), zap.Reflect("data files", restoringFiles))
Expand Down
58 changes: 58 additions & 0 deletions tests/auto_remove_dump/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# diff Configuration.
# Used by sync_diff_inspector after the full-mode task finishes to verify
# that the data loaded into TiDB matches the two upstream MySQL sources.

log-level = "info"

# number of rows compared per chunk
chunk-size = 1000

check-thread-count = 4

# percentage of rows to sample when comparing (100 = compare every row)
sample-percent = 100

use-rowid = false

# compare tables by checksum first instead of row-by-row
use-checksum = true

# repair statements for any detected differences are written here
fix-sql-file = "fix.sql"

# tables need to check.
[[check-tables]]
schema = "full_mode"
tables = ["~t.*"]

# t1 only exists on source-1; map it explicitly
[[table-config]]
schema = "full_mode"
table = "t1"

[[table-config.source-tables]]
instance-id = "source-1"
schema = "full_mode"
table = "t1"

# t2 only exists on source-2; map it explicitly
[[table-config]]
schema = "full_mode"
table = "t2"

[[table-config.source-tables]]
instance-id = "source-2"
schema = "full_mode"
table = "t2"

[[source-db]]
host = "127.0.0.1"
port = 3306
user = "root"
password = "123456"
instance-id = "source-1"

[[source-db]]
host = "127.0.0.1"
port = 3307
user = "root"
password = "123456"
instance-id = "source-2"

# downstream TiDB
[target-db]
host = "127.0.0.1"
port = 4000
user = "test"
password = "123456"
3 changes: 3 additions & 0 deletions tests/auto_remove_dump/conf/dm-master.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Master Configuration.
master-addr = ":8261"               # listen address of the DM-master
advertise-addr = "127.0.0.1:8261"   # address advertised for workers/clients to connect to
50 changes: 50 additions & 0 deletions tests/auto_remove_dump/conf/dm-task.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
---
# DM task for the auto_remove_dump integration test: a full-mode import
# from two MySQL sources into one TiDB instance.
name: test
task-mode: full
is-sharding: false
meta-schema: "dm_meta"
# enable-heartbeat: true
heartbeat-update-interval: 1
heartbeat-report-interval: 1
timezone: "Asia/Shanghai"
# feature under test: delete each dump file once it is fully imported
remove-finished-dump: true
lance6716 marked this conversation as resolved.
Show resolved Hide resolved

target-database:
host: "127.0.0.1"
port: 4000
user: "root"
password: ""

mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

- source-id: "mysql-replica-02"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

# only migrate the test schema
block-allow-list:
instance:
do-dbs: ["full_mode"]

mydumpers:
global:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: ""

loaders:
global:
pool-size: 16
# NOTE(review): run.sh checks $WORK_DIR/workerN/dumped_data.test, so this
# path appears to be resolved relative to the worker dir with the task
# name appended — verify against loader behavior
dir: "./dumped_data"

syncers:
global:
worker-count: 16
batch: 100
2 changes: 2 additions & 0 deletions tests/auto_remove_dump/conf/dm-worker1.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Worker Configuration.
name = "worker1"          # unique worker name
join = "127.0.0.1:8261"   # DM-master address to register with
2 changes: 2 additions & 0 deletions tests/auto_remove_dump/conf/dm-worker2.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Worker Configuration.
name = "worker2"          # unique worker name
join = "127.0.0.1:8261"   # DM-master address to register with
14 changes: 14 additions & 0 deletions tests/auto_remove_dump/conf/source1.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Worker Configuration.

server-id = 101 # we don't give dm_full account REPLICATION SLAVE privilege here, so we must specify server-id here
source-id = "mysql-replica-01"
flavor = ""
enable-gtid = false
relay-binlog-name = ""
relay-binlog-gtid = ""

# upstream MySQL connection for source-1
[from]
host = "127.0.0.1"
user = "dm_full"
# NOTE(review): presumably the encrypted form of the '123456' password
# created in db1.prepare.user.sql — verify with dmctl encrypt
password = "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs="
port = 3306
14 changes: 14 additions & 0 deletions tests/auto_remove_dump/conf/source2.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Worker Configuration.

server-id = 102 # we don't give dm_full account REPLICATION SLAVE privilege here, so we must specify server-id here
source-id = "mysql-replica-02"
flavor = ""
enable-gtid = false
relay-binlog-name = ""
relay-binlog-gtid = ""

# upstream MySQL connection for source-2
[from]
host = "127.0.0.1"
user = "dm_full"
# NOTE(review): presumably the encrypted form of the '123456' password
# created in db2.prepare.user.sql — verify with dmctl encrypt
password = "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs="
port = 3307
29 changes: 29 additions & 0 deletions tests/auto_remove_dump/data/db1.prepare.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- Upstream data for source-1: build schema `full_mode` with table t1,
-- exercising JSON generated columns, a multi-column index, and a unique
-- key so the dump/load path covers non-trivial DDL.
drop database if exists `full_mode`;
create database `full_mode`;
use `full_mode`;
create table t1 (id int, name varchar(20));
insert into t1 (id, name) values (1, 'arya'), (2, 'catelyn');
insert into t1 (id, name) values (3, 'Eddard Stark');
update t1 set name = 'Arya Stark' where id = 1;
update t1 set name = 'Catelyn Stark' where name = 'catelyn';

-- test multi column index with generated column
alter table t1 add column info json;
alter table t1 add column gen_id int as (info->"$.id");
alter table t1 add index multi_col(`id`, `gen_id`);
insert into t1 (id, name, info) values (4, 'gentest', '{"id": 123}');
insert into t1 (id, name, info) values (5, 'gentest', '{"id": 124}');
update t1 set info = '{"id": 120}' where id = 1;
update t1 set info = '{"id": 121}' where id = 2;
update t1 set info = '{"id": 122}' where id = 3;

-- test genColumnCache is reset after ddl
alter table t1 add column info2 varchar(40);
insert into t1 (id, name, info) values (6, 'gentest', '{"id": 125, "test cache": false}');
alter table t1 add unique key gen_idx(`gen_id`);
update t1 set name = 'gentestxx' where gen_id = 123;

insert into t1 (id, name, info) values (7, 'gentest', '{"id": 126}');
update t1 set name = 'gentestxxxxxx' where gen_id = 124;
-- delete with unique key
delete from t1 where gen_id > 124;
7 changes: 7 additions & 0 deletions tests/auto_remove_dump/data/db1.prepare.user.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Create the restricted 'dm_full' account used by the dump stage on
-- source-1; replication and several admin privileges are revoked on purpose.
drop user if exists 'dm_full';
flush privileges;
create user 'dm_full'@'%' identified by '123456';
grant all privileges on *.* to 'dm_full'@'%';
revoke replication slave, replication client on *.* from 'dm_full'@'%';
revoke create temporary tables, lock tables, create routine, alter routine, event, create tablespace, file, shutdown, execute, process, index on *.* from 'dm_full'@'%'; # privileges not supported by TiDB
flush privileges;
5 changes: 5 additions & 0 deletions tests/auto_remove_dump/data/db2.prepare.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Upstream data for source-2: a simple auto-increment table t2 in the
-- same `full_mode` schema.
drop database if exists `full_mode`;
create database `full_mode`;
use `full_mode`;
create table t2 (id int auto_increment, name varchar(20), primary key (`id`));
insert into t2 (name) values ('Arya'), ('Bran'), ('Sansa');
7 changes: 7 additions & 0 deletions tests/auto_remove_dump/data/db2.prepare.user.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Create the restricted 'dm_full' account used by the dump stage on
-- source-2; replication and several admin privileges are revoked on purpose.
drop user if exists 'dm_full';
flush privileges;
create user 'dm_full'@'%' identified by '123456';
grant all privileges on *.* to 'dm_full'@'%';
revoke replication slave, replication client on *.* from 'dm_full'@'%';
revoke create temporary tables, lock tables, create routine, alter routine, event, create tablespace, file, shutdown, execute, process, index on *.* from 'dm_full'@'%'; # privileges not supported by TiDB
flush privileges;
51 changes: 51 additions & 0 deletions tests/auto_remove_dump/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/bin/bash
# Integration test for the remove-finished-dump option: run a full-mode
# migration from two MySQL sources into TiDB, verify correctness with
# sync_diff_inspector, then check that the imported dump files were
# deleted automatically by the loader.

set -eu

cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
source $cur/../_utils/test_prepare
WORK_DIR=$TEST_DIR/$TEST_NAME

function run() {
# populate both upstream MySQL instances with test data
run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 2 rows affected'
run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_contains 'Query OK, 3 rows affected'

# create the restricted dm_full account used for dumping (7 statements each)
run_sql_file $cur/data/db1.prepare.user.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_count 'Query OK, 0 rows affected' 7
run_sql_file $cur/data/db2.prepare.user.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_count 'Query OK, 0 rows affected' 7

# bring up one DM-master and two DM-workers
run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
# operate mysql config to worker
cp $cur/conf/source1.toml $WORK_DIR/source1.toml
cp $cur/conf/source2.toml $WORK_DIR/source2.toml
# inject per-run relay-dir paths before the relay-binlog-name line
sed -i "/relay-binlog-name/i\relay-dir = \"$WORK_DIR/worker1/relay_log\"" $WORK_DIR/source1.toml
sed -i "/relay-binlog-name/i\relay-dir = \"$WORK_DIR/worker2/relay_log\"" $WORK_DIR/source2.toml
dmctl_operate_source create $WORK_DIR/source1.toml $SOURCE_ID1
dmctl_operate_source create $WORK_DIR/source2.toml $SOURCE_ID2

# start DM task only
dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta"

# use sync_diff_inspector to check full dump loader
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

# check auto_remove works
# if `ls` succeeds, dump files still exist and the test fails (exit 1);
# if `ls` fails, the files were removed as expected and we fall through
ls $WORK_DIR/worker1/dumped_data.test/*.sql && exit 1 || echo "worker1 auto removed dump files"
ls $WORK_DIR/worker2/dumped_data.test/*.sql && exit 1 || echo "worker2 auto removed dump files"
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
}

cleanup_data full_mode
# also cleanup dm processes in case of last run failed
cleanup_process $*
run $*
cleanup_process $*

echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"