diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index 8ea8e13248..44ec755007 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -123,6 +123,9 @@ ErrFailUpdateV1DBSchema,[code=11116:class=functional:scope=internal:level=medium ErrBinlogStatusVarsParse,[code=11117:class=functional:scope=internal:level=medium], "Message: fail to parse binglog status_vars: %v, offset: %d" ErrVerifyHandleErrorArgs,[code=11118:class=functional:scope=internal:level=low], "Workaround: Please make sure the args are correct." ErrRewriteSQL,[code=11119:class=functional:scope=internal:level=high], "Message: failed to rewrite SQL for target DB, stmt: %+v, targetTableNames: %+v" +ErrNoUUIDDirMatchGTID,[code=11120:class=functional:scope=internal:level=high], "Message: no relay subdir match gtid %s" +ErrNoRelayPosMatchGTID,[code=11121:class=functional:scope=internal:level=high], "Message: no relay pos match gtid %s" +ErrReaderReachEndOfFile,[code=11122:class=functional:scope=internal:level=low] ErrConfigCheckItemNotSupport,[code=20001:class=config:scope=internal:level=medium], "Message: checking item %s is not supported\n%s, Workaround: Please check `ignore-checking-items` config in task configuration file, which can be set including `all`/`dump_privilege`/`replication_privilege`/`version`/`binlog_enable`/`binlog_format`/`binlog_row_image`/`table_schema`/`schema_of_shard_tables`/`auto_increment_ID`." ErrConfigTomlTransform,[code=20002:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct TOML format." ErrConfigYamlTransform,[code=20003:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct YAML format." @@ -428,12 +431,13 @@ ErrWorkerCacheDDLInfoExists,[code=40063:class=dm-worker:scope=internal:level=hig ErrWorkerExecSkipDDLConflict,[code=40064:class=dm-worker:scope=internal:level=high], "Message: execDDL and skipDDL can not specify both at the same time" ErrWorkerExecDDLSyncerOnly,[code=40065:class=dm-worker:scope=internal:level=high], "Message: only syncer support ExecuteDDL, but current unit is %s" ErrWorkerExecDDLTimeout,[code=40066:class=dm-worker:scope=internal:level=high], "Message: ExecuteDDL timeout (exceeding %s), Workaround: Please try use `query-status` to query whether the DDL is still blocking." -ErrWorkerWaitRelayCatchupTimeout,[code=40067:class=dm-worker:scope=internal:level=high], "Message: waiting for relay binlog pos to catch up with loader end binlog pos is timeout (exceeding %s), loader end binlog pos: %s, relay binlog pos: %s" +ErrWorkerWaitRelayCatchupTimeout,[code=40067:class=dm-worker:scope=internal:level=high], "Message: waiting for relay to catch up with loader is timeout (exceeding %s), loader: %s, relay: %s" ErrWorkerRelayIsPurging,[code=40068:class=dm-worker:scope=internal:level=high], "Message: relay log purger is purging, cannot start sub task %s, Workaround: Please try again later." ErrWorkerHostPortNotValid,[code=40069:class=dm-worker:scope=internal:level=high], "Message: host:port '%s' not valid, Workaround: Please check configs in worker configuration file." ErrWorkerNoStart,[code=40070:class=dm-worker:scope=internal:level=high], "Message: no mysql source is being handled in the worker" ErrWorkerAlreadyStart,[code=40071:class=dm-worker:scope=internal:level=high], "Message: mysql source handler worker already started" ErrWorkerSourceNotMatch,[code=40072:class=dm-worker:scope=internal:level=high], "Message: source of request does not match with source in worker" +ErrWorkerWaitRelayCatchupGTID,[code=40078:class=dm-worker:scope=internal:level=high], "Message: cannot compare gtid between loader and relay, loader gtid: %s, relay gtid: %s" ErrWorkerFailToGetSubtaskConfigFromEtcd,[code=40073:class=dm-worker:scope=internal:level=medium], "Message: there is no relative subtask config for task %s in etcd" ErrWorkerFailToGetSourceConfigFromEtcd,[code=40074:class=dm-worker:scope=internal:level=medium], "Message: there is no relative source config for source %s in etcd" ErrWorkerDDLLockOpNotFound,[code=40075:class=dm-worker:scope=internal:level=high], "Message: missing shard DDL lock operation for shard DDL info (%s)" diff --git a/dm/pb/dmworker.pb.go b/dm/pb/dmworker.pb.go index 3809c12128..4dfabe291a 100644 --- a/dm/pb/dmworker.pb.go +++ b/dm/pb/dmworker.pb.go @@ -600,10 +600,11 @@ var xxx_messageInfo_DumpStatus proto.InternalMessageInfo // LoadStatus represents status for load unit type LoadStatus struct { - FinishedBytes int64 `protobuf:"varint,1,opt,name=finishedBytes,proto3" json:"finishedBytes,omitempty"` - TotalBytes int64 `protobuf:"varint,2,opt,name=totalBytes,proto3" json:"totalBytes,omitempty"` - Progress string `protobuf:"bytes,3,opt,name=progress,proto3" json:"progress,omitempty"` - MetaBinlog string `protobuf:"bytes,4,opt,name=metaBinlog,proto3" json:"metaBinlog,omitempty"` + FinishedBytes int64 `protobuf:"varint,1,opt,name=finishedBytes,proto3" json:"finishedBytes,omitempty"` + TotalBytes int64 `protobuf:"varint,2,opt,name=totalBytes,proto3" json:"totalBytes,omitempty"` + Progress string `protobuf:"bytes,3,opt,name=progress,proto3" json:"progress,omitempty"` + MetaBinlog string `protobuf:"bytes,4,opt,name=metaBinlog,proto3" json:"metaBinlog,omitempty"` + MetaBinlogGTID string `protobuf:"bytes,5,opt,name=metaBinlogGTID,proto3" json:"metaBinlogGTID,omitempty"` } func (m *LoadStatus) Reset() { *m = LoadStatus{} } @@ -667,6 +668,13 @@ func (m *LoadStatus) GetMetaBinlog() string { return "" } +func (m *LoadStatus) GetMetaBinlogGTID() string { + if m != nil { + return m.MetaBinlogGTID + } + return "" +} + // ShardingGroup represents a DDL sharding group, this is used by SyncStatus, and is differ from ShardingGroup in syncer pkg // target: target table name // DDL: in syncing DDL @@ -2419,127 +2427,127 @@ func init() { func init() { proto.RegisterFile("dmworker.proto", fileDescriptor_51a1b9e17fd67b10) } var fileDescriptor_51a1b9e17fd67b10 = []byte{ - // 1907 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x58, 0x41, 0x93, 0xdb, 0x58, - 0x11, 0xb6, 0x24, 0xdb, 0x63, 0xb7, 0xed, 0x89, 0xf2, 0x92, 0x5d, 0xcc, 0x10, 0xcc, 0x94, 0xb2, - 0xb5, 0x0c, 0x73, 0x98, 0x22, 0xc3, 0x52, 0x4b, 0x6d, 0x15, 0x10, 0x32, 0x93, 0x4d, 0x16, 0x1c, - 0x92, 0xc8, 0xc9, 0x72, 0xa4, 0x9e, 0xa5, 0x17, 0x8f, 0x6a, 0x64, 0x49, 0xd1, 0x93, 0x66, 0xca, - 0x07, 0xfe, 0x02, 0x70, 0xe1, 0x40, 0x15, 0x37, 0x8a, 0x2b, 0xc5, 0xaf, 0x00, 0x8e, 0x5b, 0x9c, - 0x38, 0x52, 0xc9, 0x89, 0xff, 0xc0, 0x81, 0xea, 0x7e, 0x4f, 0xd2, 0x73, 0xc6, 0x4e, 0xc8, 0x81, - 0x9b, 0xfa, 0xeb, 0x7e, 0xfd, 0xfa, 0xf5, 0xfb, 0xba, 0x5b, 0x12, 0xec, 0x86, 0xcb, 0xcb, 0x34, - 0x3f, 0x17, 0xf9, 0x51, 0x96, 0xa7, 0x45, 0xca, 0xec, 0x6c, 0xee, 0x1d, 0x00, 0x7b, 0x5a, 0x8a, - 0x7c, 0x35, 0x2b, 0x78, 0x51, 0x4a, 0x5f, 0xbc, 0x2c, 0x85, 0x2c, 0x18, 0x83, 0x76, 0xc2, 0x97, - 0x62, 0x6c, 0xed, 0x5b, 0x07, 0x7d, 0x9f, 0x9e, 0xbd, 0x0c, 0x6e, 0x9e, 0xa4, 0xcb, 0x65, 0x9a, - 0xfc, 0x82, 0x7c, 0xf8, 0x42, 0x66, 0x69, 0x22, 0x05, 0xfb, 0x10, 0xba, 0xb9, 0x90, 0x65, 0x5c, - 0x90, 0x75, 0xcf, 0xd7, 0x12, 0x73, 0xc1, 0x59, 0xca, 0xc5, 0xd8, 0x26, 0x17, 0xf8, 0x88, 0x96, - 0x32, 0x2d, 0xf3, 0x40, 0x8c, 0x1d, 0x02, 0xb5, 0x84, 0xb8, 0x8a, 0x6b, 0xdc, 0x56, 0xb8, 0x92, - 0xbc, 0x3f, 0x5b, 0x70, 0x63, 0x2d, 0xb8, 0xf7, 0xde, 0xf1, 0x13, 0x18, 0xaa, 0x3d, 0x94, 0x07, - 0xda, 0x77, 0x70, 0xec, 0x1e, 0x65, 0xf3, 0xa3, 0x99, 0x81, 0xfb, 0x6b, 0x56, 0xec, 0x53, 0x18, - 0xc9, 0x72, 0xfe, 0x8c, 0xcb, 0x73, 0xbd, 0xac, 0xbd, 0xef, 0x1c, 0x0c, 0x8e, 0xaf, 0xd3, 0x32, - 0x53, 0xe1, 0xaf, 0xdb, 0x79, 0x7f, 0xb2, 0x60, 0x70, 0x72, 0x26, 0x02, 0x2d, 0x63, 0xa0, 0x19, - 0x97, 0x52, 0x84, 0x55, 0xa0, 0x4a, 0x62, 0x37, 0xa1, 0x53, 0xa4, 0x05, 0x8f, 0x29, 0xd4, 0x8e, - 0xaf, 0x04, 0x36, 0x01, 0x90, 0x65, 0x10, 0x08, 0x29, 0x5f, 0x94, 0x31, 0x85, 0xda, 0xf1, 0x0d, - 0x04, 0xbd, 0xbd, 0xe0, 0x51, 0x2c, 0x42, 0x4a, 0x53, 0xc7, 0xd7, 0x12, 0x1b, 0xc3, 0xce, 0x25, - 0xcf, 0x93, 0x28, 0x59, 0x8c, 0x3b, 0xa4, 0xa8, 0x44, 0x5c, 0x11, 0x8a, 0x82, 0x47, 0xf1, 0xb8, - 0xbb, 0x6f, 0x1d, 0x0c, 0x7d, 0x2d, 0x79, 0x43, 0x80, 0xd3, 0x72, 0x99, 0xe9, 0xa8, 0x7f, 0x6d, - 0x01, 0x4c, 0x53, 0x1e, 0xea, 0xa0, 0x3f, 0x82, 0xd1, 0x8b, 0x28, 0x89, 0xe4, 0x99, 0x08, 0xef, - 0xad, 0x0a, 0x21, 0x29, 0x76, 0xc7, 0x5f, 0x07, 0x31, 0x58, 0x8a, 0x5a, 0x99, 0xd8, 0x64, 0x62, - 0x20, 0x6c, 0x0f, 0x7a, 0x59, 0x9e, 0x2e, 0x72, 0x21, 0xa5, 0xbe, 0xed, 0x5a, 0xc6, 0xb5, 0x4b, - 0x51, 0xf0, 0x7b, 0x51, 0x12, 0xa7, 0x0b, 0x7d, 0xe7, 0x06, 0xe2, 0xfd, 0xce, 0x82, 0xd1, 0xec, - 0x8c, 0xe7, 0x61, 0x94, 0x2c, 0x1e, 0xe4, 0x69, 0x99, 0xe1, 0x41, 0x0a, 0x9e, 0x2f, 0x44, 0xa1, - 0x19, 0xa9, 0x25, 0xe4, 0xe9, 0xe9, 0xe9, 0x14, 0xf7, 0x77, 0x90, 0xa7, 0xf8, 0xac, 0xe2, 0xcf, - 0x65, 0x31, 0x4d, 0x03, 0x5e, 0x44, 0x69, 0xa2, 0xb7, 0x5f, 0x07, 0x89, 0x8b, 0xab, 0x24, 0xa0, - 0x64, 0x3a, 0xc4, 0x45, 0x92, 0x30, 0xee, 0x32, 0xd1, 0x9a, 0x0e, 0x69, 0x6a, 0xd9, 0xfb, 0xa3, - 0x03, 0x30, 0x5b, 0x25, 0x81, 0x4e, 0xd4, 0x3e, 0x0c, 0xe8, 0xc0, 0xf7, 0x2f, 0x44, 0x52, 0x54, - 0x69, 0x32, 0x21, 0x74, 0x46, 0xe2, 0xb3, 0xac, 0x4a, 0x51, 0x2d, 0xb3, 0x5b, 0xd0, 0xcf, 0x45, - 0x20, 0x92, 0x02, 0x95, 0x0e, 0x29, 0x1b, 0x80, 0x79, 0x30, 0x5c, 0x72, 0x59, 0x88, 0x7c, 0x2d, - 0x49, 0x6b, 0x18, 0x3b, 0x04, 0xd7, 0x94, 0x1f, 0x14, 0x51, 0x48, 0x04, 0xe8, 0xfb, 0x57, 0x70, - 0xf4, 0x47, 0x87, 0xa8, 0xfc, 0x75, 0x95, 0x3f, 0x13, 0x43, 0x7f, 0xa6, 0x4c, 0xfe, 0x76, 0x94, - 0xbf, 0x37, 0x71, 0xf4, 0x37, 0x8f, 0xd3, 0xe0, 0x3c, 0x4a, 0x16, 0x74, 0x01, 0x3d, 0x4a, 0xd5, - 0x1a, 0xc6, 0x7e, 0x08, 0x6e, 0x99, 0xe4, 0x42, 0xa6, 0xf1, 0x85, 0x08, 0xe9, 0x1e, 0xe5, 0xb8, - 0x6f, 0x54, 0x92, 0x79, 0xc3, 0xfe, 0x15, 0x53, 0xe3, 0x86, 0x40, 0x15, 0x8f, 0xbe, 0xa1, 0x09, - 0xc0, 0x9c, 0x02, 0x79, 0xb6, 0xca, 0xc4, 0x78, 0xa0, 0xd8, 0xd3, 0x20, 0xde, 0x1f, 0x2c, 0x18, - 0x9a, 0xc5, 0x6d, 0xb4, 0x1d, 0x6b, 0x4b, 0xdb, 0xb1, 0xcd, 0xb6, 0xc3, 0xbe, 0x53, 0xb7, 0x17, - 0xd5, 0x2e, 0x28, 0xda, 0x27, 0x79, 0x8a, 0x75, 0xe8, 0x93, 0xa2, 0xee, 0x38, 0x77, 0x60, 0x90, - 0x8b, 0x98, 0xaf, 0xea, 0x3e, 0x81, 0xf6, 0xd7, 0xd0, 0xde, 0x6f, 0x60, 0xdf, 0xb4, 0xf1, 0xfe, - 0x66, 0xc3, 0xc0, 0x50, 0x5e, 0xb9, 0x69, 0xeb, 0x7f, 0xbc, 0x69, 0x7b, 0xcb, 0x4d, 0xef, 0x57, - 0x21, 0x95, 0xf3, 0xd3, 0x28, 0xd7, 0xe4, 0x37, 0xa1, 0xda, 0x62, 0x8d, 0x5a, 0x26, 0xc4, 0x0e, - 0xe0, 0x9a, 0x21, 0x1a, 0xc4, 0x7a, 0x13, 0x66, 0x47, 0xc0, 0x08, 0x3a, 0xe1, 0x45, 0x70, 0xf6, - 0x3c, 0x7b, 0x44, 0xd1, 0x10, 0xbb, 0x7a, 0xfe, 0x06, 0x0d, 0xfb, 0x16, 0x74, 0x64, 0xc1, 0x17, - 0x82, 0x88, 0xb5, 0x7b, 0xdc, 0x27, 0x22, 0x20, 0xe0, 0x2b, 0xdc, 0x48, 0x7e, 0xef, 0x1d, 0xc9, - 0xf7, 0xfe, 0x63, 0xc3, 0x68, 0xad, 0x1d, 0x6f, 0x1a, 0x5b, 0xcd, 0x8e, 0xf6, 0x96, 0x1d, 0xf7, - 0xa1, 0x5d, 0x26, 0x91, 0xba, 0xec, 0xdd, 0xe3, 0x21, 0xea, 0x9f, 0x27, 0x51, 0x81, 0x5c, 0xf2, - 0x49, 0x63, 0xc4, 0xd4, 0x7e, 0x17, 0x21, 0xbe, 0x0b, 0x37, 0x1a, 0x22, 0x9f, 0x9e, 0x4e, 0xa7, - 0x69, 0x70, 0xfe, 0xc5, 0xa9, 0xce, 0xde, 0x26, 0x15, 0x63, 0x6a, 0x68, 0x51, 0x41, 0x3e, 0x6c, - 0xa9, 0xb1, 0xf5, 0x6d, 0xe8, 0x04, 0x38, 0x46, 0x28, 0x4b, 0x9a, 0x50, 0xc6, 0x5c, 0x79, 0xd8, - 0xf2, 0x95, 0x9e, 0x7d, 0x04, 0xed, 0xb0, 0x5c, 0x66, 0x3a, 0x57, 0xbb, 0x68, 0xd7, 0x34, 0xf6, - 0x87, 0x2d, 0x9f, 0xb4, 0x68, 0x15, 0xa7, 0x3c, 0x1c, 0xf7, 0x1b, 0xab, 0xa6, 0xdf, 0xa3, 0x15, - 0x6a, 0xd1, 0x0a, 0x2b, 0x8c, 0xaa, 0x4d, 0x5b, 0x35, 0xcd, 0x0e, 0xad, 0x50, 0x7b, 0xaf, 0x07, - 0x5d, 0xa9, 0x88, 0xfc, 0x23, 0xb8, 0xbe, 0x96, 0xfd, 0x69, 0x24, 0x29, 0x55, 0x4a, 0x3d, 0xb6, - 0xb6, 0xcd, 0xcc, 0x6a, 0xfd, 0x04, 0x80, 0xce, 0x74, 0x3f, 0xcf, 0xd3, 0xbc, 0x9a, 0xdd, 0x56, - 0x3d, 0xbb, 0xbd, 0x6f, 0x42, 0x1f, 0xcf, 0xf2, 0x16, 0x35, 0x1e, 0x62, 0x9b, 0x3a, 0x83, 0x21, - 0x45, 0xff, 0x74, 0xba, 0xc5, 0x82, 0x1d, 0xc3, 0x4d, 0x35, 0x40, 0x15, 0x9d, 0x9f, 0xa4, 0x32, - 0xa2, 0x71, 0xa1, 0x0a, 0x6b, 0xa3, 0x0e, 0x1b, 0xba, 0x40, 0x77, 0xb3, 0xa7, 0xd3, 0x6a, 0xaa, - 0x55, 0xb2, 0xf7, 0x7d, 0xe8, 0xe3, 0x8e, 0x6a, 0xbb, 0x03, 0xe8, 0x92, 0xa2, 0xca, 0x83, 0x5b, - 0xa7, 0x53, 0x07, 0xe4, 0x6b, 0xbd, 0xf7, 0x1b, 0x0b, 0x06, 0xaa, 0x5d, 0xa9, 0x95, 0xef, 0xdb, - 0xad, 0xf6, 0xd7, 0x96, 0x57, 0xf5, 0x6e, 0x7a, 0x3c, 0x02, 0xa0, 0x86, 0xa3, 0x0c, 0xda, 0xcd, - 0xf5, 0x36, 0xa8, 0x6f, 0x58, 0xe0, 0xc5, 0x34, 0xd2, 0x86, 0xd4, 0xfe, 0xde, 0x86, 0xa1, 0xbe, - 0x52, 0x65, 0xf2, 0x7f, 0x2a, 0x3b, 0x5d, 0x19, 0x6d, 0xb3, 0x32, 0x3e, 0xae, 0x2a, 0xa3, 0xd3, - 0x1c, 0xa3, 0x61, 0x51, 0x53, 0x18, 0xb7, 0x75, 0x61, 0x74, 0xc9, 0x6c, 0x54, 0x15, 0x46, 0x65, - 0xa5, 0xea, 0xe2, 0xb6, 0xae, 0x8b, 0x9d, 0xc6, 0xa8, 0xa6, 0x54, 0x5d, 0x16, 0xb7, 0x75, 0x59, - 0xf4, 0x1a, 0xa3, 0xfa, 0x9a, 0xeb, 0xaa, 0xd8, 0x81, 0x0e, 0x5d, 0xa7, 0xf7, 0x19, 0xb8, 0x66, - 0x6a, 0xa8, 0x26, 0x3e, 0xd6, 0xca, 0x35, 0x2a, 0x18, 0x46, 0xbe, 0x5e, 0xfb, 0x12, 0x46, 0x6b, - 0x4d, 0x05, 0x27, 0x5d, 0x24, 0x4f, 0x78, 0x12, 0x88, 0xb8, 0x7e, 0x85, 0x34, 0x10, 0x83, 0x64, - 0x76, 0xe3, 0x59, 0xbb, 0x58, 0x23, 0x99, 0xf1, 0x22, 0xe8, 0xac, 0xbd, 0x08, 0xfe, 0xc3, 0x82, - 0xa1, 0xb9, 0x00, 0xdf, 0x25, 0xef, 0xe7, 0xf9, 0x49, 0x1a, 0xaa, 0xdb, 0xec, 0xf8, 0x95, 0x88, - 0xd4, 0xc7, 0xc7, 0x98, 0x4b, 0xa9, 0x19, 0x58, 0xcb, 0x5a, 0x37, 0x0b, 0xd2, 0xac, 0x7a, 0xb5, - 0xaf, 0x65, 0xad, 0x9b, 0x8a, 0x0b, 0x11, 0xeb, 0x51, 0x53, 0xcb, 0xb8, 0xdb, 0x23, 0x21, 0x25, - 0xd2, 0x44, 0x75, 0xc8, 0x4a, 0xc4, 0x55, 0x3e, 0xbf, 0x3c, 0xe1, 0xa5, 0x14, 0xfa, 0x5d, 0xa5, - 0x96, 0x31, 0x2d, 0xf8, 0x09, 0xc2, 0xf3, 0xb4, 0x4c, 0xaa, 0x37, 0x14, 0x03, 0xf1, 0x2e, 0xe1, - 0xfa, 0x93, 0x32, 0x5f, 0x08, 0x22, 0x71, 0xf5, 0x45, 0xb3, 0x07, 0xbd, 0x28, 0xe1, 0x41, 0x11, - 0x5d, 0x08, 0x9d, 0xc9, 0x5a, 0x46, 0xfe, 0x16, 0xd1, 0x52, 0xe8, 0x57, 0x34, 0x7a, 0x46, 0xfb, - 0x17, 0x51, 0x2c, 0x88, 0xd7, 0xfa, 0x48, 0x95, 0x4c, 0x25, 0xaa, 0xa6, 0xab, 0xfe, 0x5e, 0x51, - 0x92, 0xf7, 0x17, 0x0b, 0xf6, 0x1e, 0x67, 0x22, 0xe7, 0x85, 0x50, 0xdf, 0x48, 0xb3, 0xe0, 0x4c, - 0x2c, 0x79, 0x15, 0xc2, 0x2d, 0xb0, 0xd3, 0x8c, 0x36, 0xd7, 0x7c, 0x57, 0xea, 0xc7, 0x99, 0x6f, - 0xa7, 0x19, 0x05, 0xc1, 0xe5, 0xb9, 0xce, 0x2d, 0x3d, 0x6f, 0xfd, 0x60, 0xda, 0x83, 0x5e, 0xc8, - 0x0b, 0x3e, 0xe7, 0x52, 0x54, 0x39, 0xad, 0x64, 0xfa, 0xb6, 0xe0, 0xf3, 0xb8, 0xca, 0xa8, 0x12, - 0xc8, 0x13, 0xed, 0xa6, 0xb3, 0xa9, 0x25, 0xaf, 0x80, 0xd1, 0x97, 0x77, 0x34, 0x19, 0x1f, 0x89, - 0x82, 0xb3, 0x3d, 0x23, 0x48, 0xc0, 0x20, 0x51, 0xa3, 0x43, 0x7c, 0x67, 0x4d, 0x57, 0x8d, 0xc0, - 0x31, 0x1a, 0x41, 0x75, 0xae, 0x36, 0x11, 0x8f, 0x9e, 0xbd, 0x4f, 0xe0, 0xa6, 0xce, 0xd3, 0x97, - 0x77, 0x70, 0xd7, 0xad, 0x19, 0x52, 0x6a, 0xb5, 0xbd, 0xf7, 0x57, 0x0b, 0x3e, 0x78, 0x63, 0xd9, - 0x7b, 0x7f, 0x10, 0x7e, 0x0a, 0x6d, 0xfc, 0xd0, 0x18, 0x3b, 0x54, 0x30, 0xb7, 0x71, 0x8f, 0x8d, - 0x2e, 0x8f, 0x50, 0xb8, 0x9f, 0x14, 0xf9, 0xca, 0xa7, 0x05, 0x7b, 0x3f, 0x85, 0x7e, 0x0d, 0xa1, - 0xdf, 0x73, 0xb1, 0xaa, 0x7a, 0xe2, 0xb9, 0x58, 0xe1, 0xc4, 0xbe, 0xe0, 0x71, 0xa9, 0x52, 0xa3, - 0xc7, 0xde, 0x5a, 0x62, 0x7d, 0xa5, 0xff, 0xcc, 0xfe, 0x81, 0xe5, 0xfd, 0x0a, 0xc6, 0x0f, 0x79, - 0x12, 0xc6, 0x9a, 0x25, 0xaa, 0x54, 0x75, 0x0a, 0xbe, 0x61, 0xa4, 0x60, 0x80, 0x5e, 0x48, 0xfb, - 0x16, 0x8e, 0xdc, 0x82, 0xfe, 0xbc, 0x1a, 0x52, 0x3a, 0xf1, 0x0d, 0x80, 0x2b, 0xe4, 0xcb, 0x58, - 0xea, 0x8f, 0x1c, 0x7a, 0x3e, 0xfc, 0x25, 0x74, 0xd5, 0xa5, 0xb2, 0x11, 0xf4, 0xbf, 0x48, 0x2e, - 0x78, 0x1c, 0x85, 0x8f, 0x33, 0xb7, 0xc5, 0x7a, 0xd0, 0x9e, 0x15, 0x69, 0xe6, 0x5a, 0xac, 0x0f, - 0x9d, 0x27, 0x58, 0x6b, 0xae, 0xcd, 0x00, 0xba, 0xd8, 0x8e, 0x96, 0xc2, 0x75, 0x10, 0x9e, 0x15, - 0x3c, 0x2f, 0xdc, 0x36, 0xc2, 0xcf, 0xb3, 0x90, 0x17, 0xc2, 0xed, 0xb0, 0x5d, 0x80, 0x9f, 0x94, - 0x45, 0xaa, 0xcd, 0xba, 0x87, 0x2f, 0xc9, 0x6c, 0x21, 0x98, 0x0b, 0x43, 0xed, 0x9f, 0x64, 0xb7, - 0xc5, 0x76, 0xc0, 0xf9, 0xb9, 0xb8, 0x74, 0x2d, 0x36, 0x80, 0x1d, 0xbf, 0x4c, 0xf0, 0x2b, 0x55, - 0xed, 0x41, 0xdb, 0x85, 0xae, 0x83, 0x0a, 0x0c, 0x22, 0x13, 0xa1, 0xdb, 0x66, 0x43, 0xe8, 0x7d, - 0xae, 0x3f, 0x3b, 0xdd, 0x0e, 0xaa, 0xd0, 0x0c, 0xd7, 0x74, 0x51, 0x45, 0x1b, 0xa2, 0xb4, 0x73, - 0xf8, 0x18, 0x7a, 0xd5, 0xf4, 0x60, 0xd7, 0x60, 0xa0, 0x77, 0x45, 0xc8, 0x6d, 0x61, 0xd8, 0x34, - 0x23, 0x5c, 0x0b, 0x8f, 0x88, 0x73, 0xc0, 0xb5, 0xf1, 0x09, 0x9b, 0xbd, 0xeb, 0xd0, 0xb1, 0x57, - 0x49, 0xe0, 0xb6, 0xd1, 0x90, 0x9a, 0x86, 0x1b, 0x1e, 0x3e, 0x82, 0x1d, 0x7a, 0x7c, 0x8c, 0x59, - 0xdf, 0xd5, 0xfe, 0x34, 0xe2, 0xb6, 0x30, 0x73, 0x18, 0xa5, 0xb2, 0xb6, 0x30, 0x03, 0x74, 0x00, - 0x25, 0xdb, 0x18, 0x82, 0xca, 0x86, 0x02, 0x1c, 0x8c, 0xaf, 0xaa, 0x76, 0x76, 0x03, 0xae, 0x55, - 0x59, 0xd1, 0x90, 0x72, 0xf8, 0x40, 0x14, 0x0a, 0x70, 0x2d, 0xf2, 0x5f, 0x8b, 0x36, 0x26, 0xd2, - 0x17, 0xcb, 0xf4, 0x42, 0x68, 0xc4, 0x39, 0xbc, 0x0b, 0xbd, 0xaa, 0x38, 0x0c, 0x87, 0x15, 0x54, - 0x3b, 0x54, 0x80, 0x6b, 0x35, 0x1e, 0x34, 0x62, 0x1f, 0xde, 0xa5, 0x56, 0x8f, 0xdc, 0x32, 0x4e, - 0xa8, 0x11, 0x4d, 0x86, 0xf3, 0x28, 0xd3, 0x57, 0x25, 0xb2, 0x98, 0x07, 0x35, 0x1d, 0x2e, 0x44, - 0x5e, 0xb8, 0xce, 0xf1, 0xbf, 0x6d, 0xe8, 0x2a, 0x0a, 0xb3, 0xbb, 0x30, 0x30, 0xfe, 0xd4, 0xb0, - 0x0f, 0x91, 0xb9, 0x57, 0xff, 0x2b, 0xed, 0x7d, 0xed, 0x0a, 0xae, 0xca, 0xcd, 0x6b, 0xb1, 0x1f, - 0x03, 0x34, 0x5d, 0x9b, 0x7d, 0x40, 0xa3, 0xec, 0xcd, 0x2e, 0xbe, 0x37, 0xa6, 0x79, 0xbf, 0xe1, - 0x2f, 0x94, 0xd7, 0x62, 0x3f, 0x83, 0x91, 0x2e, 0x65, 0x95, 0x24, 0x36, 0x31, 0xaa, 0x7b, 0x43, - 0x3f, 0x7e, 0xab, 0xb3, 0xcf, 0x6b, 0x67, 0x2a, 0x5f, 0x6c, 0xbc, 0xa1, 0x55, 0x28, 0x37, 0x5f, - 0xdf, 0xda, 0x44, 0xbc, 0x16, 0x7b, 0x00, 0x03, 0x55, 0xea, 0x6a, 0xbc, 0xde, 0x42, 0xdb, 0x6d, - 0xb5, 0xff, 0xb6, 0x80, 0xee, 0x8d, 0xff, 0xfe, 0x6a, 0x62, 0x7d, 0xf5, 0x6a, 0x62, 0xfd, 0xeb, - 0xd5, 0xc4, 0xfa, 0xed, 0xeb, 0x49, 0xeb, 0xab, 0xd7, 0x93, 0xd6, 0x3f, 0x5f, 0x4f, 0x5a, 0xf3, - 0x2e, 0xfd, 0xcc, 0xfb, 0xde, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xc2, 0x78, 0xee, 0x8f, 0xde, - 0x13, 0x00, 0x00, + // 1919 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x58, 0x4f, 0x73, 0x1b, 0x49, + 0x15, 0xd7, 0xcc, 0x48, 0xb2, 0xf4, 0x24, 0x3b, 0x93, 0x4e, 0x76, 0x11, 0x26, 0x08, 0xd7, 0x64, + 0x2b, 0x18, 0x1f, 0x5c, 0xc4, 0x2c, 0xb5, 0xd4, 0x56, 0x01, 0x21, 0x76, 0xd6, 0x59, 0x70, 0x70, + 0x32, 0x4a, 0x96, 0x23, 0xd5, 0x9a, 0xe9, 0xc8, 0x53, 0x1e, 0xcd, 0x4c, 0xa6, 0x67, 0xec, 0xd2, + 0x81, 0xcf, 0x00, 0x17, 0x0e, 0x54, 0x71, 0xa3, 0xb8, 0x52, 0x9c, 0xf8, 0x08, 0xc0, 0x31, 0xc5, + 0x89, 0x23, 0x95, 0x9c, 0xf8, 0x0e, 0x1c, 0xa8, 0xf7, 0xba, 0x67, 0xa6, 0x65, 0x4b, 0x09, 0x39, + 0x70, 0x9b, 0xf7, 0x7b, 0xaf, 0x5f, 0xbf, 0xfe, 0xf5, 0xfb, 0xa3, 0x16, 0x6c, 0x85, 0xf3, 0xcb, + 0x34, 0x3f, 0x17, 0xf9, 0x7e, 0x96, 0xa7, 0x45, 0xca, 0xec, 0x6c, 0xea, 0xed, 0x02, 0x7b, 0x56, + 0x8a, 0x7c, 0x31, 0x29, 0x78, 0x51, 0x4a, 0x5f, 0xbc, 0x2a, 0x85, 0x2c, 0x18, 0x83, 0x76, 0xc2, + 0xe7, 0x62, 0x64, 0xed, 0x58, 0xbb, 0x7d, 0x9f, 0xbe, 0xbd, 0x0c, 0x6e, 0x1f, 0xa6, 0xf3, 0x79, + 0x9a, 0xfc, 0x82, 0x7c, 0xf8, 0x42, 0x66, 0x69, 0x22, 0x05, 0xfb, 0x18, 0xba, 0xb9, 0x90, 0x65, + 0x5c, 0x90, 0x75, 0xcf, 0xd7, 0x12, 0x73, 0xc1, 0x99, 0xcb, 0xd9, 0xc8, 0x26, 0x17, 0xf8, 0x89, + 0x96, 0x32, 0x2d, 0xf3, 0x40, 0x8c, 0x1c, 0x02, 0xb5, 0x84, 0xb8, 0x8a, 0x6b, 0xd4, 0x56, 0xb8, + 0x92, 0xbc, 0x3f, 0x59, 0x70, 0x6b, 0x29, 0xb8, 0x0f, 0xde, 0xf1, 0x53, 0x18, 0xaa, 0x3d, 0x94, + 0x07, 0xda, 0x77, 0x70, 0xe0, 0xee, 0x67, 0xd3, 0xfd, 0x89, 0x81, 0xfb, 0x4b, 0x56, 0xec, 0x33, + 0xd8, 0x94, 0xe5, 0xf4, 0x39, 0x97, 0xe7, 0x7a, 0x59, 0x7b, 0xc7, 0xd9, 0x1d, 0x1c, 0xdc, 0xa4, + 0x65, 0xa6, 0xc2, 0x5f, 0xb6, 0xf3, 0xfe, 0x68, 0xc1, 0xe0, 0xf0, 0x4c, 0x04, 0x5a, 0xc6, 0x40, + 0x33, 0x2e, 0xa5, 0x08, 0xab, 0x40, 0x95, 0xc4, 0x6e, 0x43, 0xa7, 0x48, 0x0b, 0x1e, 0x53, 0xa8, + 0x1d, 0x5f, 0x09, 0x6c, 0x0c, 0x20, 0xcb, 0x20, 0x10, 0x52, 0xbe, 0x2c, 0x63, 0x0a, 0xb5, 0xe3, + 0x1b, 0x08, 0x7a, 0x7b, 0xc9, 0xa3, 0x58, 0x84, 0x44, 0x53, 0xc7, 0xd7, 0x12, 0x1b, 0xc1, 0xc6, + 0x25, 0xcf, 0x93, 0x28, 0x99, 0x8d, 0x3a, 0xa4, 0xa8, 0x44, 0x5c, 0x11, 0x8a, 0x82, 0x47, 0xf1, + 0xa8, 0xbb, 0x63, 0xed, 0x0e, 0x7d, 0x2d, 0x79, 0x43, 0x80, 0xa3, 0x72, 0x9e, 0xe9, 0xa8, 0xff, + 0x62, 0x01, 0x9c, 0xa4, 0x3c, 0xd4, 0x41, 0x7f, 0x02, 0x9b, 0x2f, 0xa3, 0x24, 0x92, 0x67, 0x22, + 0x7c, 0xb8, 0x28, 0x84, 0xa4, 0xd8, 0x1d, 0x7f, 0x19, 0xc4, 0x60, 0x29, 0x6a, 0x65, 0x62, 0x93, + 0x89, 0x81, 0xb0, 0x6d, 0xe8, 0x65, 0x79, 0x3a, 0xcb, 0x85, 0x94, 0xfa, 0xb6, 0x6b, 0x19, 0xd7, + 0xce, 0x45, 0xc1, 0x1f, 0x46, 0x49, 0x9c, 0xce, 0xf4, 0x9d, 0x1b, 0x08, 0xbb, 0x07, 0x5b, 0x8d, + 0x74, 0xfc, 0xfc, 0xcb, 0x23, 0x3a, 0x57, 0xdf, 0xbf, 0x82, 0x7a, 0xbf, 0xb5, 0x60, 0x73, 0x72, + 0xc6, 0xf3, 0x30, 0x4a, 0x66, 0xc7, 0x79, 0x5a, 0x66, 0x78, 0xe0, 0x82, 0xe7, 0x33, 0x51, 0xe8, + 0xcc, 0xd5, 0x12, 0xe6, 0xf3, 0xd1, 0xd1, 0x09, 0xc6, 0xe9, 0x60, 0x3e, 0xe3, 0xb7, 0x3a, 0x67, + 0x2e, 0x8b, 0x93, 0x34, 0xe0, 0x45, 0x94, 0x26, 0x3a, 0xcc, 0x65, 0x90, 0x72, 0x76, 0x91, 0x04, + 0x44, 0xba, 0x43, 0x39, 0x4b, 0x12, 0x9e, 0xaf, 0x4c, 0xb4, 0xa6, 0x43, 0x9a, 0x5a, 0xf6, 0xfe, + 0xe0, 0x00, 0x4c, 0x16, 0x49, 0xa0, 0x09, 0xdd, 0x81, 0x01, 0x11, 0xf3, 0xe8, 0x42, 0x24, 0x45, + 0x45, 0xa7, 0x09, 0xa1, 0x33, 0x12, 0x9f, 0x67, 0x15, 0x95, 0xb5, 0xcc, 0xee, 0x40, 0x3f, 0x17, + 0x81, 0x48, 0x0a, 0x54, 0x3a, 0xa4, 0x6c, 0x00, 0xe6, 0xc1, 0x70, 0xce, 0x65, 0x21, 0xf2, 0x25, + 0x32, 0x97, 0x30, 0xb6, 0x07, 0xae, 0x29, 0x1f, 0x17, 0x51, 0xa8, 0x09, 0xbd, 0x86, 0xa3, 0x3f, + 0x3a, 0x44, 0xe5, 0xaf, 0xab, 0xfc, 0x99, 0x18, 0xfa, 0x33, 0x65, 0xf2, 0xb7, 0xa1, 0xfc, 0x5d, + 0xc5, 0xd1, 0xdf, 0x34, 0x4e, 0x83, 0xf3, 0x28, 0x99, 0xd1, 0x05, 0xf4, 0x88, 0xaa, 0x25, 0x8c, + 0xfd, 0x10, 0xdc, 0x32, 0xc9, 0x85, 0x4c, 0xe3, 0x0b, 0x11, 0xd2, 0x3d, 0xca, 0x51, 0xdf, 0xa8, + 0x38, 0xf3, 0x86, 0xfd, 0x6b, 0xa6, 0xc6, 0x0d, 0x81, 0x2a, 0x32, 0x7d, 0x43, 0x63, 0x80, 0x29, + 0x05, 0xf2, 0x7c, 0x91, 0x89, 0xd1, 0x40, 0x65, 0x59, 0x83, 0x78, 0xbf, 0xb7, 0x60, 0x68, 0x36, + 0x01, 0xa3, 0x3d, 0x59, 0x6b, 0xda, 0x93, 0x6d, 0xb6, 0x27, 0xf6, 0x9d, 0xba, 0x0d, 0xa9, 0xb6, + 0x42, 0xd1, 0x3e, 0xcd, 0x53, 0xac, 0x57, 0x9f, 0x14, 0x75, 0x67, 0xba, 0x0f, 0x83, 0x5c, 0xc4, + 0x7c, 0x51, 0xf7, 0x13, 0xb4, 0xbf, 0x81, 0xf6, 0x7e, 0x03, 0xfb, 0xa6, 0x8d, 0xf7, 0x37, 0x1b, + 0x06, 0x86, 0xf2, 0xda, 0x4d, 0x5b, 0xff, 0xe3, 0x4d, 0xdb, 0x6b, 0x6e, 0x7a, 0xa7, 0x0a, 0xa9, + 0x9c, 0x1e, 0x45, 0xb9, 0x4e, 0x7e, 0x13, 0xaa, 0x2d, 0x96, 0x52, 0xcb, 0x84, 0xd8, 0x2e, 0xdc, + 0x30, 0x44, 0x23, 0xb1, 0xae, 0xc2, 0x6c, 0x1f, 0x18, 0x41, 0x87, 0xbc, 0x08, 0xce, 0x5e, 0x64, + 0x4f, 0x28, 0x1a, 0xca, 0xae, 0x9e, 0xbf, 0x42, 0xc3, 0xbe, 0x05, 0x1d, 0x59, 0xf0, 0x99, 0xa0, + 0xc4, 0xda, 0x3a, 0xe8, 0x53, 0x22, 0x20, 0xe0, 0x2b, 0xdc, 0x20, 0xbf, 0xf7, 0x1e, 0xf2, 0xbd, + 0xff, 0xd8, 0xb0, 0xb9, 0xd4, 0xb6, 0x57, 0x8d, 0xb7, 0x66, 0x47, 0x7b, 0xcd, 0x8e, 0x3b, 0xd0, + 0x2e, 0x93, 0x48, 0x5d, 0xf6, 0xd6, 0xc1, 0x10, 0xf5, 0x2f, 0x92, 0xa8, 0xc0, 0x5c, 0xf2, 0x49, + 0x63, 0xc4, 0xd4, 0x7e, 0x5f, 0x42, 0x7c, 0x17, 0x6e, 0x35, 0x89, 0x7c, 0x74, 0x74, 0x72, 0x92, + 0x06, 0xe7, 0x75, 0x9f, 0x5b, 0xa5, 0x62, 0x4c, 0x0d, 0x37, 0x2a, 0xc8, 0xc7, 0x2d, 0x35, 0xde, + 0xbe, 0x0d, 0x9d, 0x00, 0xc7, 0x0d, 0xb1, 0xa4, 0x13, 0xca, 0x98, 0x3f, 0x8f, 0x5b, 0xbe, 0xd2, + 0xb3, 0x4f, 0xa0, 0x1d, 0x96, 0xf3, 0x4c, 0x73, 0xb5, 0x85, 0x76, 0xcd, 0x00, 0x78, 0xdc, 0xf2, + 0x49, 0x8b, 0x56, 0x71, 0xca, 0xc3, 0x51, 0xbf, 0xb1, 0x6a, 0xe6, 0x02, 0x5a, 0xa1, 0x16, 0xad, + 0xb0, 0xc2, 0xa8, 0xda, 0xb4, 0x55, 0xd3, 0xec, 0xd0, 0x0a, 0xb5, 0x0f, 0x7b, 0xd0, 0x95, 0x2a, + 0x91, 0x7f, 0x04, 0x37, 0x97, 0xd8, 0x3f, 0x89, 0x24, 0x51, 0xa5, 0xd4, 0x23, 0x6b, 0xdd, 0x6c, + 0xad, 0xd6, 0x8f, 0x01, 0xe8, 0x4c, 0x8f, 0xf2, 0x3c, 0xcd, 0xab, 0x19, 0x6f, 0xd5, 0x33, 0xde, + 0xfb, 0x26, 0xf4, 0xf1, 0x2c, 0xef, 0x50, 0xe3, 0x21, 0xd6, 0xa9, 0x33, 0x18, 0x52, 0xf4, 0xcf, + 0x4e, 0xd6, 0x58, 0xb0, 0x03, 0xb8, 0xad, 0x06, 0xad, 0x4a, 0xe7, 0xa7, 0xa9, 0x8c, 0x68, 0x5c, + 0xa8, 0xc2, 0x5a, 0xa9, 0xc3, 0x86, 0x2e, 0xd0, 0xdd, 0xe4, 0xd9, 0x49, 0x35, 0xfd, 0x2a, 0xd9, + 0xfb, 0x3e, 0xf4, 0x71, 0x47, 0xb5, 0xdd, 0x2e, 0x74, 0x49, 0x51, 0xf1, 0xe0, 0xd6, 0x74, 0xea, + 0x80, 0x7c, 0xad, 0xf7, 0x7e, 0x6d, 0xc1, 0x40, 0xb5, 0x2b, 0xb5, 0xf2, 0x43, 0xbb, 0xd5, 0xce, + 0xd2, 0xf2, 0xaa, 0xde, 0x4d, 0x8f, 0xfb, 0x00, 0xd4, 0x70, 0x94, 0x41, 0xbb, 0xb9, 0xde, 0x06, + 0xf5, 0x0d, 0x0b, 0xbc, 0x98, 0x46, 0x5a, 0x41, 0xed, 0xef, 0x6c, 0x18, 0xea, 0x2b, 0x55, 0x26, + 0xff, 0xa7, 0xb2, 0xd3, 0x95, 0xd1, 0x36, 0x2b, 0xe3, 0x5e, 0x55, 0x19, 0x9d, 0xe6, 0x18, 0x4d, + 0x16, 0x35, 0x85, 0x71, 0x57, 0x17, 0x46, 0x97, 0xcc, 0x36, 0xab, 0xc2, 0xa8, 0xac, 0x54, 0x5d, + 0xdc, 0xd5, 0x75, 0xb1, 0xd1, 0x18, 0xd5, 0x29, 0x55, 0x97, 0xc5, 0x5d, 0x5d, 0x16, 0xbd, 0xc6, + 0xa8, 0xbe, 0xe6, 0xba, 0x2a, 0x36, 0xa0, 0x43, 0xd7, 0xe9, 0x7d, 0x0e, 0xae, 0x49, 0x0d, 0xd5, + 0xc4, 0x3d, 0xad, 0x5c, 0x4a, 0x05, 0xc3, 0xc8, 0xd7, 0x6b, 0x5f, 0xc1, 0xe6, 0x52, 0x53, 0xc1, + 0x49, 0x17, 0xc9, 0x43, 0x9e, 0x04, 0x22, 0xae, 0x7f, 0x6a, 0x1a, 0x88, 0x91, 0x64, 0x76, 0xe3, + 0x59, 0xbb, 0x58, 0x4a, 0x32, 0xe3, 0x07, 0xa3, 0xb3, 0xf4, 0x83, 0xf1, 0x1f, 0x16, 0x0c, 0xcd, + 0x05, 0xf8, 0x9b, 0xf3, 0x51, 0x9e, 0x1f, 0xa6, 0xa1, 0xba, 0xcd, 0x8e, 0x5f, 0x89, 0x98, 0xfa, + 0xf8, 0x19, 0x73, 0x29, 0x75, 0x06, 0xd6, 0xb2, 0xd6, 0x4d, 0x82, 0x34, 0xab, 0x9e, 0x00, 0xb5, + 0xac, 0x75, 0x27, 0xe2, 0x42, 0xc4, 0x7a, 0xd4, 0xd4, 0x32, 0xee, 0xf6, 0x44, 0x48, 0x89, 0x69, + 0xa2, 0x3a, 0x64, 0x25, 0xe2, 0x2a, 0x9f, 0x5f, 0x1e, 0xf2, 0x52, 0x0a, 0xfd, 0x5b, 0xa5, 0x96, + 0x91, 0x16, 0x7c, 0xaa, 0xf0, 0x3c, 0x2d, 0x93, 0xea, 0x17, 0x8a, 0x81, 0x78, 0x97, 0x70, 0xf3, + 0x69, 0x99, 0xcf, 0x04, 0x25, 0x71, 0xf5, 0xf2, 0xd9, 0x86, 0x5e, 0x94, 0xf0, 0xa0, 0x88, 0x2e, + 0x84, 0x66, 0xb2, 0x96, 0x31, 0x7f, 0x8b, 0x68, 0x2e, 0xf4, 0x4f, 0x34, 0xfa, 0x46, 0xfb, 0x97, + 0x51, 0x2c, 0x28, 0xaf, 0xf5, 0x91, 0x2a, 0x99, 0x4a, 0x54, 0x4d, 0x57, 0xfd, 0xae, 0x51, 0x92, + 0xf7, 0x67, 0x0b, 0xb6, 0x4f, 0x33, 0x91, 0xf3, 0x42, 0xa8, 0xb7, 0xd4, 0x24, 0x38, 0x13, 0x73, + 0x5e, 0x85, 0x70, 0x07, 0xec, 0x34, 0xa3, 0xcd, 0x75, 0xbe, 0x2b, 0xf5, 0x69, 0xe6, 0xdb, 0x69, + 0x46, 0x41, 0x70, 0x79, 0xae, 0xb9, 0xa5, 0xef, 0xb5, 0x0f, 0xab, 0x6d, 0xe8, 0x85, 0xbc, 0xe0, + 0x53, 0x2e, 0x45, 0xc5, 0x69, 0x25, 0xd3, 0x1b, 0x84, 0x4f, 0xe3, 0x8a, 0x51, 0x25, 0x90, 0x27, + 0xda, 0x4d, 0xb3, 0xa9, 0x25, 0xaf, 0x80, 0xcd, 0xaf, 0xee, 0xeb, 0x64, 0x7c, 0x22, 0x0a, 0xce, + 0xb6, 0x8d, 0x20, 0x01, 0x83, 0x44, 0x8d, 0x0e, 0xf1, 0xbd, 0x35, 0x5d, 0x35, 0x02, 0xc7, 0x68, + 0x04, 0xd5, 0xb9, 0xda, 0x94, 0x78, 0xf4, 0xed, 0x7d, 0x0a, 0xb7, 0x35, 0x4f, 0x5f, 0xdd, 0xc7, + 0x5d, 0xd7, 0x32, 0xa4, 0xd4, 0x6a, 0x7b, 0xef, 0xaf, 0x16, 0x7c, 0x74, 0x65, 0xd9, 0x07, 0x3f, + 0x1c, 0x3f, 0x83, 0x36, 0x3e, 0x36, 0x46, 0x0e, 0x15, 0xcc, 0x5d, 0xdc, 0x63, 0xa5, 0xcb, 0x7d, + 0x14, 0x1e, 0x25, 0x45, 0xbe, 0xf0, 0x69, 0xc1, 0xf6, 0x4f, 0xa1, 0x5f, 0x43, 0xe8, 0xf7, 0x5c, + 0x2c, 0xaa, 0x9e, 0x78, 0x2e, 0x16, 0x38, 0xb1, 0x2f, 0x78, 0x5c, 0x2a, 0x6a, 0xf4, 0xd8, 0x5b, + 0x22, 0xd6, 0x57, 0xfa, 0xcf, 0xed, 0x1f, 0x58, 0xde, 0xaf, 0x60, 0xf4, 0x98, 0x27, 0x61, 0xac, + 0xb3, 0x44, 0x95, 0xaa, 0xa6, 0xe0, 0x1b, 0x06, 0x05, 0x03, 0xf4, 0x42, 0xda, 0x77, 0xe4, 0xc8, + 0x1d, 0xe8, 0x4f, 0xab, 0x21, 0xa5, 0x89, 0x6f, 0x00, 0x5c, 0x21, 0x5f, 0xc5, 0x52, 0x3f, 0x72, + 0xe8, 0x7b, 0xef, 0x97, 0xd0, 0x55, 0x97, 0xca, 0x36, 0xa1, 0xff, 0x65, 0x72, 0xc1, 0xe3, 0x28, + 0x3c, 0xcd, 0xdc, 0x16, 0xeb, 0x41, 0x7b, 0x52, 0xa4, 0x99, 0x6b, 0xb1, 0x3e, 0x74, 0x9e, 0x62, + 0xad, 0xb9, 0x36, 0x03, 0xe8, 0x62, 0x3b, 0x9a, 0x0b, 0xd7, 0x41, 0x78, 0x52, 0xf0, 0xbc, 0x70, + 0xdb, 0x08, 0xbf, 0xc8, 0x42, 0x5e, 0x08, 0xb7, 0xc3, 0xb6, 0x00, 0x7e, 0x52, 0x16, 0xa9, 0x36, + 0xeb, 0xee, 0xbd, 0x22, 0xb3, 0x99, 0x60, 0x2e, 0x0c, 0xb5, 0x7f, 0x92, 0xdd, 0x16, 0xdb, 0x00, + 0xe7, 0xe7, 0xe2, 0xd2, 0xb5, 0xd8, 0x00, 0x36, 0xfc, 0x32, 0xc1, 0xd7, 0xac, 0xda, 0x83, 0xb6, + 0x0b, 0x5d, 0x07, 0x15, 0x18, 0x44, 0x26, 0x42, 0xb7, 0xcd, 0x86, 0xd0, 0xfb, 0x42, 0x3f, 0x4f, + 0xdd, 0x0e, 0xaa, 0xd0, 0x0c, 0xd7, 0x74, 0x51, 0x45, 0x1b, 0xa2, 0xb4, 0xb1, 0x77, 0x0a, 0xbd, + 0x6a, 0x7a, 0xb0, 0x1b, 0x30, 0xd0, 0xbb, 0x22, 0xe4, 0xb6, 0x30, 0x6c, 0x9a, 0x11, 0xae, 0x85, + 0x47, 0xc4, 0x39, 0xe0, 0xda, 0xf8, 0x85, 0xcd, 0xde, 0x75, 0xe8, 0xd8, 0x8b, 0x24, 0x70, 0xdb, + 0x68, 0x48, 0x4d, 0xc3, 0x0d, 0xf7, 0x9e, 0xc0, 0x06, 0x7d, 0x9e, 0x22, 0xeb, 0x5b, 0xda, 0x9f, + 0x46, 0xdc, 0x16, 0x32, 0x87, 0x51, 0x2a, 0x6b, 0x0b, 0x19, 0xa0, 0x03, 0x28, 0xd9, 0xc6, 0x10, + 0x14, 0x1b, 0x0a, 0x70, 0x30, 0xbe, 0xaa, 0xda, 0xd9, 0x2d, 0xb8, 0x51, 0xb1, 0xa2, 0x21, 0xe5, + 0xf0, 0x58, 0x14, 0x0a, 0x70, 0x2d, 0xf2, 0x5f, 0x8b, 0x36, 0x12, 0xe9, 0x8b, 0x79, 0x7a, 0x21, + 0x34, 0xe2, 0xec, 0x3d, 0x80, 0x5e, 0x55, 0x1c, 0x86, 0xc3, 0x0a, 0xaa, 0x1d, 0x2a, 0xc0, 0xb5, + 0x1a, 0x0f, 0x1a, 0xb1, 0xf7, 0x1e, 0x50, 0xab, 0xc7, 0xdc, 0x32, 0x4e, 0xa8, 0x11, 0x9d, 0x0c, + 0xe7, 0x51, 0xa6, 0xaf, 0x4a, 0x64, 0x31, 0x0f, 0xea, 0x74, 0xb8, 0x10, 0x79, 0xe1, 0x3a, 0x07, + 0xff, 0xb6, 0xa1, 0xab, 0x52, 0x98, 0x3d, 0x80, 0x81, 0xf1, 0x8f, 0x0e, 0xfb, 0x18, 0x33, 0xf7, + 0xfa, 0xff, 0x4f, 0xdb, 0x5f, 0xbb, 0x86, 0xab, 0x72, 0xf3, 0x5a, 0xec, 0xc7, 0x00, 0x4d, 0xd7, + 0x66, 0x1f, 0xd1, 0x28, 0xbb, 0xda, 0xc5, 0xb7, 0x47, 0x34, 0xef, 0x57, 0xfc, 0x5b, 0xe5, 0xb5, + 0xd8, 0xcf, 0x60, 0x53, 0x97, 0xb2, 0x22, 0x89, 0x8d, 0x8d, 0xea, 0x5e, 0xd1, 0x8f, 0xdf, 0xe9, + 0xec, 0x8b, 0xda, 0x99, 0xe2, 0x8b, 0x8d, 0x56, 0xb4, 0x0a, 0xe5, 0xe6, 0xeb, 0x6b, 0x9b, 0x88, + 0xd7, 0x62, 0xc7, 0x30, 0x50, 0xa5, 0xae, 0xc6, 0xeb, 0x1d, 0xb4, 0x5d, 0x57, 0xfb, 0xef, 0x0a, + 0xe8, 0xe1, 0xe8, 0xef, 0x6f, 0xc6, 0xd6, 0xeb, 0x37, 0x63, 0xeb, 0x5f, 0x6f, 0xc6, 0xd6, 0x6f, + 0xde, 0x8e, 0x5b, 0xaf, 0xdf, 0x8e, 0x5b, 0xff, 0x7c, 0x3b, 0x6e, 0x4d, 0xbb, 0xf4, 0xa7, 0xdf, + 0xf7, 0xfe, 0x1b, 0x00, 0x00, 0xff, 0xff, 0x50, 0x16, 0x36, 0x3c, 0x06, 0x14, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -3027,6 +3035,13 @@ func (m *LoadStatus) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.MetaBinlogGTID) > 0 { + i -= len(m.MetaBinlogGTID) + copy(dAtA[i:], m.MetaBinlogGTID) + i = encodeVarintDmworker(dAtA, i, uint64(len(m.MetaBinlogGTID))) + i-- + dAtA[i] = 0x2a + } if len(m.MetaBinlog) > 0 { i -= len(m.MetaBinlog) copy(dAtA[i:], m.MetaBinlog) @@ -4576,6 +4591,10 @@ func (m *LoadStatus) Size() (n int) { if l > 0 { n += 1 + l + sovDmworker(uint64(l)) } + l = len(m.MetaBinlogGTID) + if l > 0 { + n += 1 + l + sovDmworker(uint64(l)) + } return n } @@ -6047,6 +6066,38 @@ func (m *LoadStatus) Unmarshal(dAtA []byte) error { } m.MetaBinlog = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MetaBinlogGTID", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDmworker + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthDmworker + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthDmworker + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.MetaBinlogGTID = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipDmworker(dAtA[iNdEx:]) diff --git a/dm/proto/dmworker.proto b/dm/proto/dmworker.proto index 91c31c3f4c..d84a9e9176 100644 --- a/dm/proto/dmworker.proto +++ b/dm/proto/dmworker.proto @@ -106,6 +106,7 @@ message LoadStatus { int64 totalBytes = 2; string progress = 3; string metaBinlog = 4; + string metaBinlogGTID = 5; } // ShardingGroup represents a DDL sharding group, this is used by SyncStatus, and is differ from ShardingGroup in syncer pkg diff --git a/dm/worker/relay.go b/dm/worker/relay.go index cad64392de..6cd3ed25e1 100644 --- a/dm/worker/relay.go +++ b/dm/worker/relay.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/relay" "github.com/pingcap/dm/relay/purger" - rr "github.com/pingcap/dm/relay/retry" ) // RelayHolder for relay unit @@ -77,36 +76,13 @@ type realRelayHolder struct { } // NewRealRelayHolder creates a new RelayHolder -func NewRealRelayHolder(cfg *config.SourceConfig) RelayHolder { - clone := cfg.DecryptPassword() - relayCfg := &relay.Config{ - EnableGTID: clone.EnableGTID, - AutoFixGTID: clone.AutoFixGTID, - Flavor: clone.Flavor, - RelayDir: clone.RelayDir, - ServerID: clone.ServerID, - Charset: clone.Charset, - From: relay.DBConfig{ - Host: clone.From.Host, - Port: clone.From.Port, - User: clone.From.User, - Password: clone.From.Password, - }, - BinLogName: clone.RelayBinLogName, - BinlogGTID: clone.RelayBinlogGTID, - ReaderRetry: rr.ReaderRetryConfig{ // we use config from TaskChecker now - BackoffRollback: cfg.Checker.BackoffRollback.Duration, - BackoffMax: cfg.Checker.BackoffMax.Duration, - BackoffMin: cfg.Checker.BackoffMin.Duration, - BackoffJitter: cfg.Checker.BackoffJitter, - BackoffFactor: cfg.Checker.BackoffFactor, - }, - } +func NewRealRelayHolder(sourceCfg *config.SourceConfig) RelayHolder { + cfg := relay.FromSourceCfg(sourceCfg) h := &realRelayHolder{ - cfg: cfg, + cfg: sourceCfg, stage: pb.Stage_New, - relay: relay.NewRelay(relayCfg), + relay: relay.NewRelay(cfg), l: log.With(zap.String("component", "relay holder")), } h.closed.Set(closedTrue) @@ -308,24 +284,8 @@ func (h *realRelayHolder) Result() *pb.ProcessResult { } // Update update relay config online -func (h *realRelayHolder) Update(ctx context.Context, cfg *config.SourceConfig) error { - relayCfg := &relay.Config{ - AutoFixGTID: cfg.AutoFixGTID, - Charset: cfg.Charset, - From: relay.DBConfig{ - Host: cfg.From.Host, - Port: cfg.From.Port, - User: cfg.From.User, - Password: cfg.From.Password, - }, - ReaderRetry: rr.ReaderRetryConfig{ // we use config from TaskChecker now - BackoffRollback: cfg.Checker.BackoffRollback.Duration, - BackoffMax: cfg.Checker.BackoffMax.Duration, - BackoffMin: cfg.Checker.BackoffMin.Duration, - BackoffJitter: cfg.Checker.BackoffJitter, - BackoffFactor: cfg.Checker.BackoffFactor, - }, - } +func (h *realRelayHolder) Update(ctx context.Context, sourceCfg *config.SourceConfig) error { + relayCfg := relay.FromSourceCfg(sourceCfg) stage := h.Stage() diff --git a/dm/worker/relay_test.go b/dm/worker/relay_test.go index ba9da3ee4e..c0774f17d6 100644 --- a/dm/worker/relay_test.go +++ b/dm/worker/relay_test.go @@ -19,10 +19,12 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/siddontang/go-mysql/mysql" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/unit" + "github.com/pingcap/dm/pkg/gtid" pkgstreamer "github.com/pingcap/dm/pkg/streamer" "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/dm/relay" @@ -114,6 +116,11 @@ func (d *DummyRelay) Close() {} // IsClosed implements Process interface func (d *DummyRelay) IsClosed() bool { return false } +// SaveMeta implements Process interface +func (d *DummyRelay) SaveMeta(pos mysql.Position, gset gtid.Set) error { + return nil +} + func (t *testRelay) TestRelay(c *C) { originNewRelay := relay.NewRelay relay.NewRelay = NewDummyRelay diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go index 0c6a991ddd..14ead538bd 100644 --- a/dm/worker/subtask.go +++ b/dm/worker/subtask.go @@ -19,6 +19,7 @@ import ( "time" "github.com/pingcap/failpoint" + "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go/sync2" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" @@ -28,6 +29,8 @@ import ( "github.com/pingcap/dm/dm/unit" "github.com/pingcap/dm/dumpling" "github.com/pingcap/dm/loader" + "github.com/pingcap/dm/pkg/binlog" + "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/shardddl/pessimism" "github.com/pingcap/dm/pkg/terror" @@ -582,6 +585,13 @@ func (st *SubTask) ShardDDLOperation() *pessimism.Operation { // Currently there is only one wait condition // from Load unit to Sync unit, wait for relay-log catched up with mydumper binlog position. func (st *SubTask) unitTransWaitCondition(subTaskCtx context.Context) error { + var ( + gset1 gtid.Set + gset2 gtid.Set + pos1 *mysql.Position + pos2 *mysql.Position + err error + ) pu := st.PrevUnit() cu := st.CurrUnit() if pu != nil && pu.Type() == pb.UnitType_Load && cu.Type() == pb.UnitType_Sync { @@ -599,26 +609,52 @@ func (st *SubTask) unitTransWaitCondition(subTaskCtx context.Context) error { loadStatus := pu.Status(ctxStatus).(*pb.LoadStatus) cancelStatus() - pos1, err := utils.DecodeBinlogPosition(loadStatus.MetaBinlog) - if err != nil { - return terror.WithClass(err, terror.ClassDMWorker) + if st.cfg.EnableGTID { + gset1, err = gtid.ParserGTID(st.cfg.Flavor, loadStatus.MetaBinlogGTID) + if err != nil { + return terror.WithClass(err, terror.ClassDMWorker) + } + } else { + pos1, err = utils.DecodeBinlogPosition(loadStatus.MetaBinlog) + if err != nil { + return terror.WithClass(err, terror.ClassDMWorker) + } } + for { ctxStatus, cancelStatus = context.WithTimeout(ctxWait, utils.DefaultDBTimeout) relayStatus := hub.w.relayHolder.Status(ctxStatus) cancelStatus() - pos2, err := utils.DecodeBinlogPosition(relayStatus.RelayBinlog) - if err != nil { - return terror.WithClass(err, terror.ClassDMWorker) - } - if pos1.Compare(*pos2) <= 0 { - break + if st.cfg.EnableGTID { + gset2, err = gtid.ParserGTID(st.cfg.Flavor, relayStatus.RelayBinlogGtid) + if err != nil { + return terror.WithClass(err, terror.ClassDMWorker) + } + rc, ok := binlog.CompareGTID(gset1, gset2) + if !ok { + return terror.ErrWorkerWaitRelayCatchupGTID.Generate(loadStatus.MetaBinlogGTID, relayStatus.RelayBinlogGtid) + } + if rc <= 0 { + break + } + } else { + pos2, err = utils.DecodeBinlogPosition(relayStatus.RelayBinlog) + if err != nil { + return terror.WithClass(err, terror.ClassDMWorker) + } + if pos1.Compare(*pos2) <= 0 { + break + } } - st.l.Debug("wait relay to catchup", zap.Stringer("load end position", pos1), zap.Stringer("relay position", pos2)) + + st.l.Debug("wait relay to catchup", zap.Bool("enableGTID", st.cfg.EnableGTID), zap.Stringer("load end position", pos1), zap.String("load end gtid", loadStatus.MetaBinlogGTID), zap.Stringer("relay position", pos2), zap.String("relay gtid", relayStatus.RelayBinlogGtid)) select { case <-ctxWait.Done(): + if st.cfg.EnableGTID { + return terror.ErrWorkerWaitRelayCatchupTimeout.Generate(waitRelayCatchupTimeout, loadStatus.MetaBinlogGTID, relayStatus.RelayBinlogGtid) + } return terror.ErrWorkerWaitRelayCatchupTimeout.Generate(waitRelayCatchupTimeout, pos1, pos2) case <-subTaskCtx.Done(): return nil diff --git a/errors.toml b/errors.toml index 5bb860e669..181a84bdf2 100644 --- a/errors.toml +++ b/errors.toml @@ -748,6 +748,24 @@ description = "" workaround = "" tags = ["internal", "high"] +[error.DM-functional-11120] +message = "no relay subdir match gtid %s" +description = "" +workaround = "" +tags = ["internal", "high"] + +[error.DM-functional-11121] +message = "no relay pos match gtid %s" +description = "" +workaround = "" +tags = ["internal", "high"] + +[error.DM-functional-11122] +message = "" +description = "" +workaround = "" +tags = ["internal", "low"] + [error.DM-config-20001] message = "checking item %s is not supported\n%s" description = "" @@ -2579,7 +2597,7 @@ workaround = "Please try use `query-status` to query whether the DDL is still bl tags = ["internal", "high"] [error.DM-dm-worker-40067] -message = "waiting for relay binlog pos to catch up with loader end binlog pos is timeout (exceeding %s), loader end binlog pos: %s, relay binlog pos: %s" +message = "waiting for relay to catch up with loader is timeout (exceeding %s), loader: %s, relay: %s" description = "" workaround = "" tags = ["internal", "high"] @@ -2644,6 +2662,12 @@ description = "" workaround = "Please check network connection of worker" tags = ["internal", "high"] +[error.DM-dm-worker-40078] +message = "cannot compare gtid between loader and relay, loader gtid: %s, relay gtid: %s" +description = "" +workaround = "" +tags = ["internal", "high"] + [error.DM-dm-tracer-42001] message = "parse dm-tracer config flag set" description = "" diff --git a/loader/loader.go b/loader/loader.go index 1993e3dbf4..8079e3ac37 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -408,6 +408,7 @@ type Loader struct { totalFileCount sync2.AtomicInt64 // schema + table + data finishedDataSize sync2.AtomicInt64 metaBinlog sync2.AtomicString + metaBinlogGTID sync2.AtomicString // record process error rather than log.Fatal runFatalChan chan *pb.ProcessError @@ -1315,6 +1316,7 @@ func (l *Loader) getMydumpMetadata() error { } l.metaBinlog.Set(loc.Position.String()) + l.metaBinlogGTID.Set(loc.GTIDSetStr()) return nil } diff --git a/loader/status.go b/loader/status.go index 1a882b53ea..8c497cf998 100644 --- a/loader/status.go +++ b/loader/status.go @@ -33,10 +33,11 @@ func (l *Loader) Status(ctx context.Context) interface{} { totalSize := l.totalDataSize.Get() progress := percent(finishedSize, totalSize, l.finish.Get()) s := &pb.LoadStatus{ - FinishedBytes: finishedSize, - TotalBytes: totalSize, - Progress: progress, - MetaBinlog: l.metaBinlog.Get(), + FinishedBytes: finishedSize, + TotalBytes: totalSize, + Progress: progress, + MetaBinlog: l.metaBinlog.Get(), + MetaBinlogGTID: l.metaBinlogGTID.Get(), } return s } diff --git a/pkg/binlog/event/event.go b/pkg/binlog/event/event.go index 3d449b26f2..eb3995358d 100644 --- a/pkg/binlog/event/event.go +++ b/pkg/binlog/event/event.go @@ -201,7 +201,7 @@ func GenRotateEvent(header *replication.EventHeader, latestPos uint32, nextLogNa // a. https://github.com/vitessio/vitess/blob/28e7e5503a6c3d3b18d4925d95f23ebcb6f25c8e/go/mysql/binlog_event_mysql56.go#L56 // b. https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html func GenPreviousGTIDsEvent(header *replication.EventHeader, latestPos uint32, gSet gtid.Set) (*replication.BinlogEvent, error) { - if gSet == nil || len(gSet.String()) == 0 { + if gSet == nil { return nil, terror.ErrBinlogEmptyGTID.Generate() } diff --git a/pkg/binlog/reader/file.go b/pkg/binlog/reader/file.go index a59a697cbe..c8c7948e2d 100644 --- a/pkg/binlog/reader/file.go +++ b/pkg/binlog/reader/file.go @@ -45,6 +45,7 @@ type FileReader struct { parser *replication.BinlogParser ch chan *replication.BinlogEvent ech chan error + endCh chan struct{} logger log.Logger @@ -89,6 +90,7 @@ func NewFileReader(cfg *FileReaderConfig) Reader { parser: parser, ch: make(chan *replication.BinlogEvent, cfg.ChBufferSize), ech: make(chan error, cfg.EchBufferSize), + endCh: make(chan struct{}), logger: log.With(zap.String("component", "binlog file reader")), } } @@ -115,6 +117,9 @@ func (r *FileReader) StartSyncByPos(pos gmysql.Position) error { case r.ech <- err: case <-r.ctx.Done(): } + } else { + r.logger.Info("parse end of binlog file", zap.Stringer("pos", pos)) + close(r.endCh) } }() @@ -159,6 +164,8 @@ func (r *FileReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, er return ev, nil case err := <-r.ech: return nil, err + case <-r.endCh: + return nil, terror.ErrReaderReachEndOfFile.Generate() case <-ctx.Done(): return nil, ctx.Err() } diff --git a/pkg/binlog/reader/file_test.go b/pkg/binlog/reader/file_test.go index af7be451bb..712a566369 100644 --- a/pkg/binlog/reader/file_test.go +++ b/pkg/binlog/reader/file_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/dm/pkg/binlog/common" "github.com/pingcap/dm/pkg/binlog/event" "github.com/pingcap/dm/pkg/gtid" + "github.com/pingcap/dm/pkg/terror" ) var ( @@ -138,7 +139,7 @@ func (t *testFileReaderSuite) TestGetEvent(c *C) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() e, err = r.GetEvent(ctx) - c.Assert(errors.Cause(err), Equals, context.DeadlineExceeded) + c.Assert(terror.ErrReaderReachEndOfFile.Equal(err), IsTrue) c.Assert(e, IsNil) c.Assert(r.Close(), IsNil) // close the reader diff --git a/pkg/streamer/hub.go b/pkg/streamer/hub.go index 994f270f40..acd58100dc 100644 --- a/pkg/streamer/hub.go +++ b/pkg/streamer/hub.go @@ -19,13 +19,17 @@ import ( "sync" "github.com/pingcap/dm/pkg/binlog" + "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" + "github.com/siddontang/go-mysql/mysql" ) var ( - readerHub *ReaderHub // singleton instance - once sync.Once + readerHub *ReaderHub // singleton instance + relayMetaHub *RelayMetaHub + relayMetaOnce sync.Once + once sync.Once ) // RelayLogInfo represents information for relay log @@ -137,3 +141,48 @@ func (h *ReaderHub) EarliestActiveRelayLog() *RelayLogInfo { _, rli := h.rlih.earliest() return rli } + +// RelayMetaHub holds information for relay metas +type RelayMetaHub struct { + mu sync.RWMutex + meta Meta +} + +// GetRelayMetaHub gets singleton instance of RelayMetaHub +func GetRelayMetaHub() *RelayMetaHub { + relayMetaOnce.Do(func() { + relayMetaHub = &RelayMetaHub{} + }) + return relayMetaHub +} + +// GetMeta gets all metas +func (r *RelayMetaHub) GetMeta() Meta { + r.mu.Lock() + defer r.mu.Unlock() + return r.meta +} + +// SetMeta sets meta +func (r *RelayMetaHub) SetMeta(uuid string, pos mysql.Position, gset gtid.Set) { + gs := "" + if gset != nil { + gs = gset.String() + } + meta := Meta{ + BinLogPos: pos.Pos, + BinLogName: pos.Name, + BinlogGTID: gs, + UUID: uuid, + } + r.mu.Lock() + defer r.mu.Unlock() + r.meta = meta +} + +// ClearMeta clears meta +func (r *RelayMetaHub) ClearMeta() { + r.mu.Lock() + defer r.mu.Unlock() + r.meta = Meta{} +} diff --git a/pkg/streamer/reader.go b/pkg/streamer/reader.go index d9b9dff092..19bd39f58a 100644 --- a/pkg/streamer/reader.go +++ b/pkg/streamer/reader.go @@ -15,6 +15,7 @@ package streamer import ( "context" + "fmt" "os" "path" "path/filepath" @@ -22,18 +23,29 @@ import ( "sync" "time" + "github.com/BurntSushi/toml" + "github.com/google/uuid" "github.com/pingcap/errors" "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" "go.uber.org/zap" "github.com/pingcap/dm/pkg/binlog" + "github.com/pingcap/dm/pkg/binlog/reader" tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" ) +// Meta represents binlog meta information in relay.meta +type Meta struct { + BinLogName string `toml:"binlog-name" json:"binlog-name"` + BinLogPos uint32 `toml:"binlog-pos" json:"binlog-pos"` + BinlogGTID string `toml:"binlog-gtid" json:"binlog-gtid"` + UUID string `toml:"-" json:"-"` +} + var ( // polling interval for watcher watcherInterval = 100 * time.Millisecond @@ -43,6 +55,7 @@ var ( type BinlogReaderConfig struct { RelayDir string Timezone *time.Location + Flavor string } // BinlogReader is a binlog reader. @@ -60,6 +73,8 @@ type BinlogReader struct { cancel context.CancelFunc tctx *tcontext.Context + + prevGset, currGset mysql.GTIDSet } // NewBinlogReader creates a new BinlogReader @@ -103,9 +118,164 @@ func (r *BinlogReader) checkRelayPos(pos mysql.Position) error { return nil } -// StartSync start syncon +// getUUIDByGTID gets uuid subdir which contain the gtid set +func (r *BinlogReader) getUUIDByGTID(gset mysql.GTIDSet) (string, error) { + // get flush logs from oldest to newest + for _, uuid := range r.uuids { + filename := path.Join(r.cfg.RelayDir, uuid, utils.MetaFilename) + var meta Meta + _, err := toml.DecodeFile(filename, &meta) + if err != nil { + return "", terror.ErrRelayLoadMetaData.Delegate(err) + } + + gs, err := mysql.ParseGTIDSet(r.cfg.Flavor, meta.BinlogGTID) + if err != nil { + return "", terror.ErrRelayLoadMetaData.Delegate(err) + } + if gs.Contain(gset) { + r.tctx.L().Info("get uuid subdir by gtid", zap.Stringer("GTID Set", gset), zap.String("uuid", uuid), zap.Stringer("latest GTID Set in subdir", gs)) + return uuid, nil + } + } + + // TODO: use a better mechanism to call relay.meta.Flush + // get the meta save in memory + relayMetaHub := GetRelayMetaHub() + relayMeta := relayMetaHub.GetMeta() + + if len(relayMeta.UUID) > 0 { + gs, err := mysql.ParseGTIDSet(r.cfg.Flavor, relayMeta.BinlogGTID) + if err != nil { + return "", terror.ErrRelayLoadMetaData.Delegate(err) + } + if gs.Contain(gset) { + r.tctx.L().Info("get uuid subdir by gtid", zap.Stringer("GTID Set", gset), zap.String("uuid", relayMeta.UUID)) + return relayMeta.UUID, nil + } + } + return "", terror.ErrNoUUIDDirMatchGTID.Generate(gset.String()) +} + +// GetFilePosByGTID tries to get Pos by GTID for file +func (r *BinlogReader) GetFilePosByGTID(ctx context.Context, filePath string, gset mysql.GTIDSet) (uint32, error) { + fileReader := reader.NewFileReader(&reader.FileReaderConfig{Timezone: r.cfg.Timezone}) + defer fileReader.Close() + err := fileReader.StartSyncByPos(mysql.Position{Name: filePath, Pos: 4}) + if err != nil { + return 0, err + } + + lastPos := uint32(0) + for { + select { + case <-ctx.Done(): + return 0, nil + default: + } + + ctx2, cancel := context.WithTimeout(ctx, time.Second) + e, err := fileReader.GetEvent(ctx2) + cancel() + if err != nil { + // reach end of file + if terror.ErrReaderReachEndOfFile.Equal(err) { + return lastPos, nil + } + return 0, err + } + + switch ev := e.Event.(type) { + case *replication.PreviousGTIDsEvent: + // nil previous gtid event, continue to parse file + if len(ev.GTIDSets) == 0 { + break + } + gs, err := mysql.ParseGTIDSet(r.cfg.Flavor, ev.GTIDSets) + if err != nil { + return 0, err + } + // if PreviousGITDsEvent contain but not equal the gset, go to previous file + if gs.Contain(gset) { + // continue to parse file if gset equals gs + if gset.Contain(gs) { + break + } + return 0, nil + } + case *replication.RotateEvent: + // should not happen + if e.Header.Timestamp != 0 && e.Header.LogPos != 0 { + return lastPos, nil + } + continue + case *replication.GTIDEvent: + u, _ := uuid.FromBytes(ev.SID) + gs, err := mysql.ParseGTIDSet(r.cfg.Flavor, fmt.Sprintf("%s:%d", u.String(), ev.GNO)) + if err != nil { + return 0, err + } + // meet first gtid event greater than gset + if !gset.Contain(gs) { + return lastPos, nil + } + case *replication.MariadbGTIDEvent: + GTID := ev.GTID + gs, err := mysql.ParseGTIDSet(r.cfg.Flavor, fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber)) + if err != nil { + return 0, err + } + // meet first gtid event greater than gset + if !gset.Contain(gs) { + return lastPos, nil + } + } + lastPos = e.Header.LogPos + } +} + +// getPosByGTID gets position by gtid +func (r *BinlogReader) getPosByGTID(gset mysql.GTIDSet) (*mysql.Position, error) { + uuid, err := r.getUUIDByGTID(gset) + if err != nil { + return nil, err + } + _, suffix, err := utils.ParseSuffixForUUID(uuid) + if err != nil { + return nil, err + } + + uuidDir := path.Join(r.cfg.RelayDir, uuid) + allFiles, err := CollectAllBinlogFiles(uuidDir) + if err != nil { + return nil, err + } + + // iterate files from the newest one + for i := len(allFiles) - 1; i >= 0; i-- { + file := allFiles[i] + filePath := path.Join(uuidDir, file) + pos, err := r.GetFilePosByGTID(r.tctx.Ctx, filePath, gset) + if err != nil { + return nil, err + } + if pos != 0 { + fileName, err := binlog.ParseFilename(file) + if err != nil { + return nil, err + } + return &mysql.Position{ + Name: binlog.ConstructFilenameWithUUIDSuffix(fileName, utils.SuffixIntToStr(suffix)), + Pos: uint32(pos), + }, nil + } + } + return nil, terror.ErrNoRelayPosMatchGTID.Generate(gset.String()) +} + +// StartSyncByPos start sync by pos // TODO: thread-safe? -func (r *BinlogReader) StartSync(pos mysql.Position) (Streamer, error) { +func (r *BinlogReader) StartSyncByPos(pos mysql.Position) (Streamer, error) { if pos.Name == "" { return nil, terror.ErrBinlogFileNotSpecified.Generate() } @@ -144,6 +314,48 @@ func (r *BinlogReader) StartSync(pos mysql.Position) (Streamer, error) { return s, nil } +// StartSyncByGTID start sync by gtid +func (r *BinlogReader) StartSyncByGTID(gset mysql.GTIDSet) (Streamer, error) { + r.tctx.L().Info("begin to sync binlog", zap.Stringer("GTID Set", gset)) + + if r.running { + return nil, terror.ErrReaderAlreadyRunning.Generate() + } + + err := r.updateUUIDs() + if err != nil { + return nil, err + } + + pos, err := r.getPosByGTID(gset) + if err != nil { + return nil, err + } + r.tctx.L().Info("get pos by gtid", zap.Stringer("GTID Set", gset), zap.Stringer("Position", pos)) + + r.prevGset = gset + r.currGset = nil + + r.latestServerID = 0 + r.running = true + s := newLocalStreamer() + + r.wg.Add(1) + go func() { + defer r.wg.Done() + r.tctx.L().Info("start reading", zap.Stringer("position", pos)) + err = r.parseRelay(r.tctx.Context(), s, *pos) + if errors.Cause(err) == r.tctx.Context().Err() { + r.tctx.L().Warn("parse relay finished", log.ShortError(err)) + } else if err != nil { + s.closeWithError(err) + r.tctx.L().Error("parse relay stopped", zap.Error(err)) + } + }() + + return s, nil +} + // parseRelay parses relay root directory, it support master-slave switch (switching to next sub directory) func (r *BinlogReader) parseRelay(ctx context.Context, s *LocalStreamer, pos mysql.Position) error { var ( @@ -292,15 +504,14 @@ func (r *BinlogReader) parseFile( r.tctx.L().Debug("read event", zap.Reflect("header", e.Header)) r.latestServerID = e.Header.ServerID // record server_id - switch e.Header.EventType { - case replication.FORMAT_DESCRIPTION_EVENT: + switch ev := e.Event.(type) { + case *replication.FormatDescriptionEvent: // ignore FORMAT_DESCRIPTION event, because go-mysql will send this fake event - case replication.ROTATE_EVENT: + case *replication.RotateEvent: // add master UUID suffix to pos.Name - env := e.Event.(*replication.RotateEvent) - parsed, _ := binlog.ParseFilename(string(env.NextLogName)) + parsed, _ := binlog.ParseFilename(string(ev.NextLogName)) nameWithSuffix := binlog.ConstructFilenameWithUUIDSuffix(parsed, uuidSuffix) - env.NextLogName = []byte(nameWithSuffix) + ev.NextLogName = []byte(nameWithSuffix) if e.Header.Timestamp != 0 && e.Header.LogPos != 0 { // not fake rotate event, update file pos @@ -314,10 +525,38 @@ func (r *BinlogReader) parseFile( // and we *try* to switch to the next when `needReParse` is false. // so this `currentPos` only used for log now. currentPos := mysql.Position{ - Name: string(env.NextLogName), - Pos: uint32(env.Position), + Name: string(ev.NextLogName), + Pos: uint32(ev.Position), } r.tctx.L().Info("rotate binlog", zap.Stringer("position", currentPos)) + case *replication.GTIDEvent: + if r.prevGset == nil { + latestPos = int64(e.Header.LogPos) + break + } + u, _ := uuid.FromBytes(ev.SID) + err2 := r.advanceCurrentGtidSet(fmt.Sprintf("%s:%d", u.String(), ev.GNO)) + if err2 != nil { + return errors.Trace(err2) + } + latestPos = int64(e.Header.LogPos) + case *replication.MariadbGTIDEvent: + if r.prevGset == nil { + latestPos = int64(e.Header.LogPos) + break + } + GTID := ev.GTID + err2 := r.advanceCurrentGtidSet(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber)) + if err2 != nil { + return errors.Trace(err2) + } + latestPos = int64(e.Header.LogPos) + case *replication.XIDEvent: + ev.GSet = r.getCurrentGtidSet() + latestPos = int64(e.Header.LogPos) + case *replication.QueryEvent: + ev.GSet = r.getCurrentGtidSet() + latestPos = int64(e.Header.LogPos) default: // update file pos latestPos = int64(e.Header.LogPos) @@ -419,3 +658,24 @@ func (r *BinlogReader) GetUUIDs() []string { uuids = append(uuids, r.uuids...) return uuids } + +func (r *BinlogReader) getCurrentGtidSet() mysql.GTIDSet { + if r.currGset == nil { + return nil + } + return r.currGset.Clone() +} + +func (r *BinlogReader) advanceCurrentGtidSet(gtid string) error { + if r.currGset == nil { + r.currGset = r.prevGset.Clone() + } + prev := r.currGset.Clone() + err := r.currGset.Update(gtid) + if err == nil { + if !r.currGset.Equal(prev) { + r.prevGset = prev + } + } + return err +} diff --git a/pkg/streamer/reader_test.go b/pkg/streamer/reader_test.go index 4a279fc34e..040916cc28 100644 --- a/pkg/streamer/reader_test.go +++ b/pkg/streamer/reader_test.go @@ -21,19 +21,27 @@ import ( "io/ioutil" "math/rand" "os" + "path" "path/filepath" "strconv" + "strings" "sync" + "testing" "time" + "github.com/BurntSushi/toml" + "github.com/google/uuid" . "github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/siddontang/go-mysql/mysql" gmysql "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" "github.com/pingcap/dm/pkg/binlog/event" + "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" ) var parseFileTimeout = 10 * time.Second @@ -41,17 +49,30 @@ var parseFileTimeout = 10 * time.Second var _ = Suite(&testReaderSuite{}) type testReaderSuite struct { + lastPos uint32 + lastGTID gtid.Set +} + +func TestReader(t *testing.T) { + TestingT(t) +} + +func (t *testReaderSuite) SetUpSuite(c *C) { + var err error + t.lastPos = 0 + t.lastGTID, err = gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110002:0") + c.Assert(err, IsNil) } func (t *testReaderSuite) TestParseFileBase(c *C) { var ( - filename = "test-mysql-bin.000001" - baseDir = c.MkDir() - offset int64 - firstParse = true - possibleLast = false - baseEvents = t.genBinlogEvents(c, 0) - s = newLocalStreamer() + filename = "test-mysql-bin.000001" + baseDir = c.MkDir() + offset int64 + firstParse = true + possibleLast = false + baseEvents, _, _ = t.genBinlogEvents(c, t.lastPos, t.lastGTID) + s = newLocalStreamer() ) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -60,7 +81,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { // no valid currentUUID provide, failed currentUUID := "invalid-current-uuid" relayDir := filepath.Join(baseDir, currentUUID) - cfg := &BinlogReaderConfig{RelayDir: baseDir} + cfg := &BinlogReaderConfig{RelayDir: baseDir, Flavor: mysql.MySQLFlavor} r := NewBinlogReader(log.L(), cfg) needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err := r.parseFile( ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) @@ -75,7 +96,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { currentUUID = "b60868af-5a6f-11e9-9ea3-0242ac160006.000001" relayDir = filepath.Join(baseDir, currentUUID) fullPath := filepath.Join(relayDir, filename) - cfg = &BinlogReaderConfig{RelayDir: baseDir} + cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: mysql.MySQLFlavor} r = NewBinlogReader(log.L(), cfg) // relay log file not exists, failed @@ -225,20 +246,20 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { func (t *testReaderSuite) TestParseFileRelaySubDirUpdated(c *C) { var ( - filename = "test-mysql-bin.000001" - nextFilename = "test-mysql-bin.000002" - baseDir = c.MkDir() - offset int64 - firstParse = true - possibleLast = true - baseEvents = t.genBinlogEvents(c, 0) - currentUUID = "b60868af-5a6f-11e9-9ea3-0242ac160006.000001" - relayDir = filepath.Join(baseDir, currentUUID) - fullPath = filepath.Join(relayDir, filename) - nextPath = filepath.Join(relayDir, nextFilename) - s = newLocalStreamer() - cfg = &BinlogReaderConfig{RelayDir: baseDir} - r = NewBinlogReader(log.L(), cfg) + filename = "test-mysql-bin.000001" + nextFilename = "test-mysql-bin.000002" + baseDir = c.MkDir() + offset int64 + firstParse = true + possibleLast = true + baseEvents, lastPos, lastGTID = t.genBinlogEvents(c, t.lastPos, t.lastGTID) + currentUUID = "b60868af-5a6f-11e9-9ea3-0242ac160006.000001" + relayDir = filepath.Join(baseDir, currentUUID) + fullPath = filepath.Join(relayDir, filename) + nextPath = filepath.Join(relayDir, nextFilename) + s = newLocalStreamer() + cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: mysql.MySQLFlavor} + r = NewBinlogReader(log.L(), cfg) ) // create the current relay log file and write some events @@ -270,7 +291,7 @@ func (t *testReaderSuite) TestParseFileRelaySubDirUpdated(c *C) { // current relay log file updated, need to re-parse it var wg sync.WaitGroup wg.Add(1) - extraEvents := t.genBinlogEvents(c, baseEvents[len(baseEvents)-1].Header.LogPos) + extraEvents, _, _ := t.genBinlogEvents(c, lastPos, lastGTID) go func() { defer wg.Done() time.Sleep(500 * time.Millisecond) // wait parseFile started @@ -314,22 +335,22 @@ func (t *testReaderSuite) TestParseFileRelaySubDirUpdated(c *C) { func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { var ( - filename = "test-mysql-bin.000001" - nextFilename = "test-mysql-bin.666888" - baseDir = c.MkDir() - offset int64 - firstParse = true - possibleLast = true - baseEvents = t.genBinlogEvents(c, 0) - currentUUID = "b60868af-5a6f-11e9-9ea3-0242ac160006.000001" - switchedUUID = "b60868af-5a6f-11e9-9ea3-0242ac160007.000002" - relayDir = filepath.Join(baseDir, currentUUID) - nextRelayDir = filepath.Join(baseDir, switchedUUID) - fullPath = filepath.Join(relayDir, filename) - nextFullPath = filepath.Join(nextRelayDir, nextFilename) - s = newLocalStreamer() - cfg = &BinlogReaderConfig{RelayDir: baseDir} - r = NewBinlogReader(log.L(), cfg) + filename = "test-mysql-bin.000001" + nextFilename = "test-mysql-bin.666888" + baseDir = c.MkDir() + offset int64 + firstParse = true + possibleLast = true + baseEvents, _, _ = t.genBinlogEvents(c, t.lastPos, t.lastGTID) + currentUUID = "b60868af-5a6f-11e9-9ea3-0242ac160006.000001" + switchedUUID = "b60868af-5a6f-11e9-9ea3-0242ac160007.000002" + relayDir = filepath.Join(baseDir, currentUUID) + nextRelayDir = filepath.Join(baseDir, switchedUUID) + fullPath = filepath.Join(relayDir, filename) + nextFullPath = filepath.Join(nextRelayDir, nextFilename) + s = newLocalStreamer() + cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: mysql.MySQLFlavor} + r = NewBinlogReader(log.L(), cfg) ) // create the current relay log file and write some events @@ -385,18 +406,18 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { var ( - filename = "test-mysql-bin.000001" - baseDir = c.MkDir() - offset int64 - firstParse = true - possibleLast = true - baseEvents = t.genBinlogEvents(c, 0) - currentUUID = "b60868af-5a6f-11e9-9ea3-0242ac160006.000001" - relayDir = filepath.Join(baseDir, currentUUID) - fullPath = filepath.Join(relayDir, filename) - s = newLocalStreamer() - cfg = &BinlogReaderConfig{RelayDir: baseDir} - r = NewBinlogReader(log.L(), cfg) + filename = "test-mysql-bin.000001" + baseDir = c.MkDir() + offset int64 + firstParse = true + possibleLast = true + baseEvents, _, _ = t.genBinlogEvents(c, t.lastPos, t.lastGTID) + currentUUID = "b60868af-5a6f-11e9-9ea3-0242ac160006.000001" + relayDir = filepath.Join(baseDir, currentUUID) + fullPath = filepath.Join(relayDir, filename) + s = newLocalStreamer() + cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: mysql.MySQLFlavor} + r = NewBinlogReader(log.L(), cfg) ) // create the current relay log file and write some events @@ -443,7 +464,7 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { func (t *testReaderSuite) TestUpdateUUIDs(c *C) { var ( baseDir = c.MkDir() - cfg = &BinlogReaderConfig{RelayDir: baseDir} + cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: mysql.MySQLFlavor} r = NewBinlogReader(log.L(), cfg) ) c.Assert(r.uuids, HasLen, 0) @@ -467,18 +488,18 @@ func (t *testReaderSuite) TestUpdateUUIDs(c *C) { c.Assert(r.uuids, DeepEquals, UUIDs) } -func (t *testReaderSuite) TestStartSync(c *C) { +func (t *testReaderSuite) TestStartSyncByPos(c *C) { var ( - filenamePrefix = "test-mysql-bin.00000" - baseDir = c.MkDir() - baseEvents = t.genBinlogEvents(c, 0) - eventsBuf bytes.Buffer - UUIDs = []string{ + filenamePrefix = "test-mysql-bin.00000" + baseDir = c.MkDir() + baseEvents, lastPos, lastGTID = t.genBinlogEvents(c, t.lastPos, t.lastGTID) + eventsBuf bytes.Buffer + UUIDs = []string{ "b60868af-5a6f-11e9-9ea3-0242ac160006.000001", "b60868af-5a6f-11e9-9ea3-0242ac160007.000002", "b60868af-5a6f-11e9-9ea3-0242ac160008.000003", } - cfg = &BinlogReaderConfig{RelayDir: baseDir} + cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: mysql.MySQLFlavor} r = NewBinlogReader(log.L(), cfg) startPos = gmysql.Position{Name: "test-mysql-bin|000001.000001"} // from the first relay log file in the first sub directory ) @@ -515,7 +536,7 @@ func (t *testReaderSuite) TestStartSync(c *C) { } // start the reader - s, err := r.StartSync(startPos) + s, err := r.StartSyncByPos(startPos) c.Assert(err, IsNil) // get events from the streamer @@ -542,7 +563,7 @@ func (t *testReaderSuite) TestStartSync(c *C) { // 2. write more events to the last file lastFilename := filepath.Join(baseDir, UUIDs[2], filenamePrefix+strconv.Itoa(3)) - extraEvents := t.genBinlogEvents(c, baseEvents[len(baseEvents)-1].Header.LogPos) + extraEvents, _, _ := t.genBinlogEvents(c, lastPos, lastGTID) lastF, err := os.OpenFile(lastFilename, os.O_WRONLY|os.O_APPEND, 0600) c.Assert(err, IsNil) defer lastF.Close() @@ -597,13 +618,224 @@ func (t *testReaderSuite) TestStartSync(c *C) { r.Close() } +func (t *testReaderSuite) TestStartSyncByGTID(c *C) { + var ( + baseDir = c.MkDir() + events []*replication.BinlogEvent + cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: mysql.MySQLFlavor} + r = NewBinlogReader(log.L(), cfg) + lastPos uint32 + lastGTID gtid.Set + previousGset, _ = gtid.ParserGTID(mysql.MySQLFlavor, "") + ) + type FileEventType struct { + filename string + eventTypes []replication.EventType + } + + testCase := []struct { + serverUUID string + uuid string + gtidStr string + fileEventTypes []FileEventType + }{ + { + "ba8f633f-1f15-11eb-b1c7-0242ac110002", + "ba8f633f-1f15-11eb-b1c7-0242ac110002.000001", + "ba8f633f-1f15-11eb-b1c7-0242ac110002:0", + []FileEventType{ + { + "mysql.000001", + []replication.EventType{ + replication.PREVIOUS_GTIDS_EVENT, + replication.QUERY_EVENT, + replication.XID_EVENT, + replication.QUERY_EVENT, + replication.XID_EVENT, + replication.ROTATE_EVENT, + }, + }, + { + "mysql.000002", + []replication.EventType{ + replication.PREVIOUS_GTIDS_EVENT, + replication.QUERY_EVENT, + replication.XID_EVENT, + replication.QUERY_EVENT, + replication.XID_EVENT, + replication.ROTATE_EVENT, + }, + }, + { + "mysql.000003", + []replication.EventType{ + replication.PREVIOUS_GTIDS_EVENT, + replication.QUERY_EVENT, + replication.XID_EVENT, + replication.QUERY_EVENT, + replication.XID_EVENT, + }, + }, + }, + }, + { + "bf6227a7-1f15-11eb-9afb-0242ac110004", + "bf6227a7-1f15-11eb-9afb-0242ac110004.000002", + "bf6227a7-1f15-11eb-9afb-0242ac110004:20", + []FileEventType{ + { + "mysql.000001", + []replication.EventType{ + replication.PREVIOUS_GTIDS_EVENT, + replication.QUERY_EVENT, + replication.XID_EVENT, + replication.QUERY_EVENT, + replication.XID_EVENT, + replication.ROTATE_EVENT, + }, + }, { + "mysql.000002", + []replication.EventType{ + replication.PREVIOUS_GTIDS_EVENT, + replication.QUERY_EVENT, + replication.XID_EVENT, + replication.QUERY_EVENT, + replication.XID_EVENT, + replication.ROTATE_EVENT, + }, + }, { + "mysql.000003", + []replication.EventType{ + replication.PREVIOUS_GTIDS_EVENT, + }, + }, + }, + }, { + "bcbf9d42-1f15-11eb-a41c-0242ac110003", + "bcbf9d42-1f15-11eb-a41c-0242ac110003.000003", + "bcbf9d42-1f15-11eb-a41c-0242ac110003:30", + []FileEventType{ + { + "mysql.000001", + []replication.EventType{ + replication.PREVIOUS_GTIDS_EVENT, + replication.QUERY_EVENT, + replication.XID_EVENT, + replication.QUERY_EVENT, + replication.XID_EVENT, + }, + }, + }, + }, + } + + for _, subDir := range testCase { + r.uuids = append(r.uuids, subDir.uuid) + } + + // write index file + uuidBytes := t.uuidListToBytes(c, r.uuids) + err := ioutil.WriteFile(r.indexPath, uuidBytes, 0600) + c.Assert(err, IsNil) + + var allEvents []*replication.BinlogEvent + + // prePosMap record previous uuid's last file's size + // when master switch, we get the previous uuid's last pos now + prePosMap := make(map[string]uint32) + for _, subDir := range testCase { + prePosMap[subDir.serverUUID] = lastPos + lastPos = 4 + lastGTID, err = gtid.ParserGTID(mysql.MySQLFlavor, subDir.gtidStr) + c.Assert(err, IsNil) + uuidDir := path.Join(baseDir, subDir.uuid) + err = os.MkdirAll(uuidDir, 0700) + c.Assert(err, IsNil) + + for _, fileEventType := range subDir.fileEventTypes { + events, lastPos, lastGTID, previousGset = t.genEvents(c, fileEventType.eventTypes, lastPos, lastGTID, previousGset) + allEvents = append(allEvents, events...) + + // write binlog file + f, err2 := os.OpenFile(path.Join(uuidDir, fileEventType.filename), os.O_CREATE|os.O_WRONLY, 0600) + c.Assert(err2, IsNil) + _, err = f.Write(replication.BinLogFileHeader) + c.Assert(err, IsNil) + for _, ev := range events { + _, err = f.Write(ev.RawData) + c.Assert(err, IsNil) + } + f.Close() + + // write meta file + meta := Meta{BinLogName: fileEventType.filename, BinLogPos: lastPos, BinlogGTID: previousGset.String()} + metaFile, err2 := os.Create(path.Join(uuidDir, utils.MetaFilename)) + c.Assert(err2, IsNil) + err = toml.NewEncoder(metaFile).Encode(meta) + c.Assert(err, IsNil) + metaFile.Close() + } + } + + startGTID, err := gtid.ParserGTID(mysql.MySQLFlavor, "") + c.Assert(err, IsNil) + s, err := r.StartSyncByGTID(startGTID.Origin().Clone()) + c.Assert(err, IsNil) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + var obtainBaseEvents []*replication.BinlogEvent + for { + ev, err2 := s.GetEvent(ctx) + c.Assert(err2, IsNil) + if ev.Header.Timestamp == 0 || ev.Header.LogPos == 0 { + continue // ignore fake event + } + obtainBaseEvents = append(obtainBaseEvents, ev) + // start after the first nil previous event + if len(obtainBaseEvents) == len(allEvents)-1 { + break + } + } + + preGset, err := gtid.ParserGTID(mysql.MySQLFlavor, "") + c.Assert(err, IsNil) + // allEvents: [FORMAT_DESCRIPTION_EVENT, PREVIOUS_GTIDS_EVENT, GTID_EVENT, QUERY_EVENT...] + // obtainBaseEvents: [FORMAT_DESCRIPTION_EVENT(generated), GTID_EVENT, QUERY_EVENT...] + for i, event := range obtainBaseEvents { + if i == 0 { + c.Assert(event.Header.EventType, Equals, replication.FORMAT_DESCRIPTION_EVENT) + continue + } + c.Assert(event.Header, DeepEquals, allEvents[i+1].Header) + switch ev := event.Event.(type) { + case *replication.GTIDEvent: + pos, err2 := r.getPosByGTID(preGset.Origin().Clone()) + u, _ := uuid.FromBytes(ev.SID) + c.Assert(err2, IsNil) + // new uuid dir + if len(preGset.String()) != 0 && !strings.Contains(preGset.String(), u.String()) { + c.Assert(pos.Pos, Equals, prePosMap[u.String()], Commentf("a %d", i)) + } else { + c.Assert(pos.Pos, Equals, event.Header.LogPos-event.Header.EventSize, Commentf("b %d", i)) + } + case *replication.QueryEvent: + err2 := preGset.Set(ev.GSet) + c.Assert(err2, IsNil) + case *replication.XIDEvent: + err2 := preGset.Set(ev.GSet) + c.Assert(err2, IsNil) + } + } +} + func (t *testReaderSuite) TestStartSyncError(c *C) { var ( baseDir = c.MkDir() UUIDs = []string{ "b60868af-5a6f-11e9-9ea3-0242ac160006.000001", } - cfg = &BinlogReaderConfig{RelayDir: baseDir} + cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: mysql.MySQLFlavor} startPos = gmysql.Position{Name: "test-mysql-bin|000001.000001"} // from the first relay log file in the first sub directory ) @@ -612,15 +844,19 @@ func (t *testReaderSuite) TestStartSyncError(c *C) { c.Assert(err, ErrorMatches, ".*empty UUIDs not valid.*") // no startup pos specified - s, err := r.StartSync(gmysql.Position{}) + s, err := r.StartSyncByPos(gmysql.Position{}) c.Assert(terror.ErrBinlogFileNotSpecified.Equal(err), IsTrue) c.Assert(s, IsNil) // empty UUIDs - s, err = r.StartSync(startPos) + s, err = r.StartSyncByPos(startPos) c.Assert(err, ErrorMatches, ".*empty UUIDs not valid.*") c.Assert(s, IsNil) + s, err = r.StartSyncByGTID(t.lastGTID.Origin().Clone()) + c.Assert(err, ErrorMatches, ".*no relay subdir match gtid.*") + c.Assert(s, IsNil) + // write UUIDs into index file r = NewBinlogReader(log.L(), cfg) // create a new reader uuidBytes := t.uuidListToBytes(c, UUIDs) @@ -628,13 +864,23 @@ func (t *testReaderSuite) TestStartSyncError(c *C) { c.Assert(err, IsNil) // the startup relay log file not found - s, err = r.StartSync(startPos) + s, err = r.StartSyncByPos(startPos) c.Assert(err, ErrorMatches, fmt.Sprintf(".*%s.*not found.*", startPos.Name)) c.Assert(s, IsNil) + s, err = r.StartSyncByGTID(t.lastGTID.Origin().Clone()) + c.Assert(err, ErrorMatches, ".*load meta data.*no such file or directory.*") + c.Assert(s, IsNil) + // can not re-start the reader r.running = true - s, err = r.StartSync(startPos) + s, err = r.StartSyncByPos(startPos) + c.Assert(terror.ErrReaderAlreadyRunning.Equal(err), IsTrue) + c.Assert(s, IsNil) + r.Close() + + r.running = true + s, err = r.StartSyncByGTID(t.lastGTID.Origin().Clone()) c.Assert(terror.ErrReaderAlreadyRunning.Equal(err), IsTrue) c.Assert(s, IsNil) r.Close() @@ -648,12 +894,12 @@ func (t *testReaderSuite) TestStartSyncError(c *C) { err = ioutil.WriteFile(relayLogFilePath, make([]byte, 100), 0600) c.Assert(err, IsNil) startPos.Pos = 10000 - s, err = r.StartSync(startPos) + s, err = r.StartSyncByPos(startPos) c.Assert(terror.ErrRelayLogGivenPosTooBig.Equal(err), IsTrue) c.Assert(s, IsNil) } -func (t *testReaderSuite) genBinlogEvents(c *C, latestPos uint32) []*replication.BinlogEvent { +func (t *testReaderSuite) genBinlogEvents(c *C, latestPos uint32, latestGTID gtid.Set) ([]*replication.BinlogEvent, uint32, gtid.Set) { var ( header = &replication.EventHeader{ Timestamp: uint32(time.Now().Unix()), @@ -672,15 +918,84 @@ func (t *testReaderSuite) genBinlogEvents(c *C, latestPos uint32) []*replication // for these tests, generates some DDL events is enough count := 5 + rand.Intn(5) for i := 0; i < count; i++ { - schema := []byte(fmt.Sprintf("db_%d", i)) - query := []byte(fmt.Sprintf("CREATE TABLE %d (c1 INT)", i)) - ev, err := event.GenQueryEvent(header, latestPos, 0, 0, 0, nil, schema, query) + evs, err := event.GenDDLEvents(mysql.MySQLFlavor, 1, latestPos, latestGTID, fmt.Sprintf("db_%d", i), fmt.Sprintf("CREATE TABLE %d (c1 INT)", i)) + c.Assert(err, IsNil) + events = append(events, evs.Events...) + latestPos = evs.LatestPos + latestGTID = evs.LatestGTID + } + + return events, latestPos, latestGTID +} + +func (t *testReaderSuite) genEvents(c *C, eventTypes []replication.EventType, latestPos uint32, latestGTID gtid.Set, previousGset gtid.Set) ([]*replication.BinlogEvent, uint32, gtid.Set, gtid.Set) { + var ( + header = &replication.EventHeader{ + Timestamp: uint32(time.Now().Unix()), + ServerID: 11, + } + events = make([]*replication.BinlogEvent, 0, 10) + pGset = previousGset.Clone() + originSet = pGset.Origin() + ) + + if latestPos <= 4 { // generate a FormatDescriptionEvent if needed + ev, err := event.GenFormatDescriptionEvent(header, 4) c.Assert(err, IsNil) latestPos = ev.Header.LogPos events = append(events, ev) } - return events + for i, eventType := range eventTypes { + switch eventType { + case replication.QUERY_EVENT: + evs, err := event.GenDDLEvents(mysql.MySQLFlavor, 1, latestPos, latestGTID, fmt.Sprintf("db_%d", i), fmt.Sprintf("CREATE TABLE %d (c1 int)", i)) + c.Assert(err, IsNil) + events = append(events, evs.Events...) + latestPos = evs.LatestPos + latestGTID = evs.LatestGTID + ev, ok := evs.Events[0].Event.(*replication.GTIDEvent) + c.Assert(ok, IsTrue) + u, _ := uuid.FromBytes(ev.SID) + gs := fmt.Sprintf("%s:%d", u.String(), ev.GNO) + err = originSet.Update(gs) + c.Assert(err, IsNil) + case replication.XID_EVENT: + insertDMLData := []*event.DMLData{ + { + TableID: uint64(i), + Schema: fmt.Sprintf("db_%d", i), + Table: strconv.Itoa(i), + ColumnType: []byte{gmysql.MYSQL_TYPE_INT24}, + Rows: [][]interface{}{{int32(1)}, {int32(2)}}, + }, + } + evs, err := event.GenDMLEvents(mysql.MySQLFlavor, 1, latestPos, latestGTID, replication.WRITE_ROWS_EVENTv2, 10, insertDMLData) + c.Assert(err, IsNil) + events = append(events, evs.Events...) + latestPos = evs.LatestPos + latestGTID = evs.LatestGTID + ev, ok := evs.Events[0].Event.(*replication.GTIDEvent) + c.Assert(ok, IsTrue) + u, _ := uuid.FromBytes(ev.SID) + gs := fmt.Sprintf("%s:%d", u.String(), ev.GNO) + err = originSet.Update(gs) + c.Assert(err, IsNil) + case replication.ROTATE_EVENT: + ev, err := event.GenRotateEvent(header, latestPos, []byte("next_log"), 4) + c.Assert(err, IsNil) + events = append(events, ev) + latestPos = 4 + case replication.PREVIOUS_GTIDS_EVENT: + ev, err := event.GenPreviousGTIDsEvent(header, latestPos, pGset) + c.Assert(err, IsNil) + events = append(events, ev) + latestPos = ev.Header.LogPos + } + } + err := pGset.Set(originSet) + c.Assert(err, IsNil) + return events, latestPos, latestGTID, pGset } func (t *testReaderSuite) purgeStreamer(c *C, s Streamer) { diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index c453d14ef1..0ca7fd00dd 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -174,6 +174,11 @@ const ( // pkg/parser codeRewriteSQL + + // pkg/streamer + codeNoUUIDDirMatchGTID + codeNoRelayPosMatchGTID + codeReaderReachEndOfFile ) // Config related error code list @@ -534,6 +539,7 @@ const ( codeWorkerDDLLockOpNotFound codeWorkerTLSConfigNotValid codeWorkerFailConnectMaster + codeWorkerWaitRelayCatchupGTID ) // DM-tracer error code @@ -761,6 +767,11 @@ var ( // pkg/parser ErrRewriteSQL = New(codeRewriteSQL, ClassFunctional, ScopeInternal, LevelHigh, "failed to rewrite SQL for target DB, stmt: %+v, targetTableNames: %+v", "") + // pkg/streamer + ErrNoUUIDDirMatchGTID = New(codeNoUUIDDirMatchGTID, ClassFunctional, ScopeInternal, LevelHigh, "no relay subdir match gtid %s", "") + ErrNoRelayPosMatchGTID = New(codeNoRelayPosMatchGTID, ClassFunctional, ScopeInternal, LevelHigh, "no relay pos match gtid %s", "") + ErrReaderReachEndOfFile = New(codeReaderReachEndOfFile, ClassFunctional, ScopeInternal, LevelLow, "", "") + // Config related error ErrConfigCheckItemNotSupport = New(codeConfigCheckItemNotSupport, ClassConfig, ScopeInternal, LevelMedium, "checking item %s is not supported\n%s", "Please check `ignore-checking-items` config in task configuration file, which can be set including `all`/`dump_privilege`/`replication_privilege`/`version`/`binlog_enable`/`binlog_format`/`binlog_row_image`/`table_schema`/`schema_of_shard_tables`/`auto_increment_ID`.") ErrConfigTomlTransform = New(codeConfigTomlTransform, ClassConfig, ScopeInternal, LevelMedium, "%s", "Please check the configuration file has correct TOML format.") @@ -1092,12 +1103,13 @@ var ( ErrWorkerExecSkipDDLConflict = New(codeWorkerExecSkipDDLConflict, ClassDMWorker, ScopeInternal, LevelHigh, "execDDL and skipDDL can not specify both at the same time", "") ErrWorkerExecDDLSyncerOnly = New(codeWorkerExecDDLSyncerOnly, ClassDMWorker, ScopeInternal, LevelHigh, "only syncer support ExecuteDDL, but current unit is %s", "") ErrWorkerExecDDLTimeout = New(codeWorkerExecDDLTimeout, ClassDMWorker, ScopeInternal, LevelHigh, "ExecuteDDL timeout (exceeding %s)", "Please try use `query-status` to query whether the DDL is still blocking.") - ErrWorkerWaitRelayCatchupTimeout = New(codeWorkerWaitRelayCatchupTimeout, ClassDMWorker, ScopeInternal, LevelHigh, "waiting for relay binlog pos to catch up with loader end binlog pos is timeout (exceeding %s), loader end binlog pos: %s, relay binlog pos: %s", "") + ErrWorkerWaitRelayCatchupTimeout = New(codeWorkerWaitRelayCatchupTimeout, ClassDMWorker, ScopeInternal, LevelHigh, "waiting for relay to catch up with loader is timeout (exceeding %s), loader: %s, relay: %s", "") ErrWorkerRelayIsPurging = New(codeWorkerRelayIsPurging, ClassDMWorker, ScopeInternal, LevelHigh, "relay log purger is purging, cannot start sub task %s", "Please try again later.") ErrWorkerHostPortNotValid = New(codeWorkerHostPortNotValid, ClassDMWorker, ScopeInternal, LevelHigh, "host:port '%s' not valid", "Please check configs in worker configuration file.") ErrWorkerNoStart = New(codeWorkerNoStart, ClassDMWorker, ScopeInternal, LevelHigh, "no mysql source is being handled in the worker", "") ErrWorkerAlreadyStart = New(codeWorkerAlreadyStarted, ClassDMWorker, ScopeInternal, LevelHigh, "mysql source handler worker already started", "") ErrWorkerSourceNotMatch = New(codeWorkerSourceNotMatch, ClassDMWorker, ScopeInternal, LevelHigh, "source of request does not match with source in worker", "") + ErrWorkerWaitRelayCatchupGTID = New(codeWorkerWaitRelayCatchupGTID, ClassDMWorker, ScopeInternal, LevelHigh, "cannot compare gtid between loader and relay, loader gtid: %s, relay gtid: %s", "") ErrWorkerFailToGetSubtaskConfigFromEtcd = New(codeWorkerFailToGetSubtaskConfigFromEtcd, ClassDMWorker, ScopeInternal, LevelMedium, "there is no relative subtask config for task %s in etcd", "") ErrWorkerFailToGetSourceConfigFromEtcd = New(codeWorkerFailToGetSourceConfigFromEtcd, ClassDMWorker, ScopeInternal, LevelMedium, "there is no relative source config for source %s in etcd", "") diff --git a/relay/config.go b/relay/config.go index d982ba81fc..ce4c84c8d2 100644 --- a/relay/config.go +++ b/relay/config.go @@ -16,19 +16,20 @@ package relay import ( "encoding/json" + "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/relay/retry" ) // Config is the configuration for Relay. type Config struct { - EnableGTID bool `toml:"enable-gtid" json:"enable-gtid"` - AutoFixGTID bool `toml:"auto-fix-gtid" json:"auto-fix-gtid"` - RelayDir string `toml:"relay-dir" json:"relay-dir"` - ServerID uint32 `toml:"server-id" json:"server-id"` - Flavor string `toml:"flavor" json:"flavor"` - Charset string `toml:"charset" json:"charset"` - From DBConfig `toml:"data-source" json:"data-source"` + EnableGTID bool `toml:"enable-gtid" json:"enable-gtid"` + AutoFixGTID bool `toml:"auto-fix-gtid" json:"auto-fix-gtid"` + RelayDir string `toml:"relay-dir" json:"relay-dir"` + ServerID uint32 `toml:"server-id" json:"server-id"` + Flavor string `toml:"flavor" json:"flavor"` + Charset string `toml:"charset" json:"charset"` + From config.DBConfig `toml:"data-source" json:"data-source"` // synchronous start point (if no meta saved before) // do not need to specify binlog-pos, because relay will fetch the whole file @@ -39,14 +40,6 @@ type Config struct { ReaderRetry retry.ReaderRetryConfig `toml:"reader-retry" json:"reader-retry"` } -// DBConfig is the DB configuration. -type DBConfig struct { - Host string `toml:"host" json:"host"` - User string `toml:"user" json:"user"` - Password string `toml:"password" json:"-"` // omit it for privacy - Port int `toml:"port" json:"port"` -} - func (c *Config) String() string { cfg, err := json.Marshal(c) if err != nil { @@ -54,3 +47,27 @@ func (c *Config) String() string { } return string(cfg) } + +// FromSourceCfg gen relay config from source config +func FromSourceCfg(sourceCfg *config.SourceConfig) *Config { + clone := sourceCfg.DecryptPassword() + cfg := &Config{ + EnableGTID: clone.EnableGTID, + AutoFixGTID: clone.AutoFixGTID, + Flavor: clone.Flavor, + RelayDir: clone.RelayDir, + ServerID: clone.ServerID, + Charset: clone.Charset, + From: clone.From, + BinLogName: clone.RelayBinLogName, + BinlogGTID: clone.RelayBinlogGTID, + ReaderRetry: retry.ReaderRetryConfig{ // we use config from TaskChecker now + BackoffRollback: clone.Checker.BackoffRollback.Duration, + BackoffMax: clone.Checker.BackoffMax.Duration, + BackoffMin: clone.Checker.BackoffMin.Duration, + BackoffJitter: clone.Checker.BackoffJitter, + BackoffFactor: clone.Checker.BackoffFactor, + }, + } + return cfg +} diff --git a/relay/relay.go b/relay/relay.go index c983fde852..7098ceb46e 100755 --- a/relay/relay.go +++ b/relay/relay.go @@ -15,6 +15,7 @@ package relay import ( "context" + "crypto/tls" "database/sql" "fmt" "os" @@ -36,6 +37,7 @@ import ( "github.com/pingcap/dm/dm/unit" "github.com/pingcap/dm/pkg/binlog" "github.com/pingcap/dm/pkg/binlog/common" + "github.com/pingcap/dm/pkg/conn" fr "github.com/pingcap/dm/pkg/func-rollback" "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" @@ -46,6 +48,7 @@ import ( "github.com/pingcap/dm/relay/retry" "github.com/pingcap/dm/relay/transformer" "github.com/pingcap/dm/relay/writer" + toolutils "github.com/pingcap/tidb-tools/pkg/utils" ) var ( @@ -90,6 +93,8 @@ type Process interface { Close() // IsClosed returns whether relay log process unit was closed IsClosed() bool + // SaveMeta save relay meta + SaveMeta(pos mysql.Position, gset gtid.Set) error } // Relay relays mysql binlog to local file. @@ -108,38 +113,16 @@ type Relay struct { sync.RWMutex info *pkgstreamer.RelayLogInfo } + + relayMetaHub *pkgstreamer.RelayMetaHub } // NewRealRelay creates an instance of Relay. func NewRealRelay(cfg *Config) Process { - syncerCfg := replication.BinlogSyncerConfig{ - ServerID: uint32(cfg.ServerID), - Flavor: cfg.Flavor, - Host: cfg.From.Host, - Port: uint16(cfg.From.Port), - User: cfg.From.User, - Password: cfg.From.Password, - Charset: cfg.Charset, - } - common.SetDefaultReplicationCfg(&syncerCfg, common.MaxBinlogSyncerReconnect) - - if !cfg.EnableGTID { - // for rawMode(true), we only parse FormatDescriptionEvent and RotateEvent - // if not need to support GTID mode, we can enable rawMode - syncerCfg.RawModeEnabled = true - } - - if cfg.Flavor == mysql.MariaDBFlavor { - // ref: https://mariadb.com/kb/en/library/annotate_rows_log_event/#slave-option-replicate-annotate-row-events - // ref: https://github.com/MariaDB/server/blob/bf71d263621c90cbddc7bde9bf071dae503f333f/sql/sql_repl.cc#L1809 - syncerCfg.DumpCommandFlag |= dumpFlagSendAnnotateRowsEvent - } - return &Relay{ - cfg: cfg, - syncerCfg: syncerCfg, - meta: NewLocalMeta(cfg.Flavor, cfg.RelayDir), - logger: log.With(zap.String("component", "relay log")), + cfg: cfg, + meta: NewLocalMeta(cfg.Flavor, cfg.RelayDir), + logger: log.With(zap.String("component", "relay log")), } } @@ -152,13 +135,17 @@ func (r *Relay) Init(ctx context.Context) (err error) { } }() - cfg := r.cfg.From - dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&readTimeout=%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, showStatusConnectionTimeout) - db, err := sql.Open("mysql", dbDSN) + err = r.setSyncConfig() if err != nil { - return terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeUpstream) + return err } - r.db = db + + db, err := conn.DefaultDBProvider.Apply(r.cfg.From) + if err != nil { + return terror.WithScope(err, terror.ScopeUpstream) + } + + r.db = db.DB rollbackHolder.Add(fr.FuncRollback{Name: "close-DB", Fn: r.closeDB}) if err2 := os.MkdirAll(r.cfg.RelayDir, 0755); err2 != nil { @@ -170,6 +157,9 @@ func (r *Relay) Init(ctx context.Context) (err error) { return err } + r.relayMetaHub = pkgstreamer.GetRelayMetaHub() + r.relayMetaHub.ClearMeta() + return reportRelayLogSpaceInBackground(r.cfg.RelayDir) } @@ -210,12 +200,6 @@ func (r *Relay) process(ctx context.Context) error { } if isNew { - // purge old relay log - err = r.purgeRelayDir() - if err != nil { - return err - } - // re-setup meta for new server err = r.reSetupMeta(ctx) if err != nil { @@ -376,7 +360,7 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser } else if err = latestGTID.Truncate(result.LatestGTIDs); err != nil { return err } - err = r.meta.Save(result.LatestPos, latestGTID) + err = r.SaveMeta(result.LatestPos, latestGTID) if err != nil { return terror.Annotatef(err, "save position %s, GTID sets %v after recovered", result.LatestPos, result.LatestGTIDs) } @@ -502,7 +486,7 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo } if needSavePos { - err = r.meta.Save(lastPos, lastGTID) + err = r.SaveMeta(lastPos, lastGTID) if err != nil { return terror.Annotatef(err, "save position %s, GTID sets %v into meta", lastPos, lastGTID) } @@ -586,7 +570,7 @@ func (r *Relay) doIntervalOps(ctx context.Context) { select { case <-flushTicker.C: if r.meta.Dirty() { - err := r.meta.Flush() + err := r.FlushMeta() if err != nil { r.logger.Error("flush meta", zap.Error(err)) } else { @@ -671,9 +655,23 @@ func (r *Relay) IsClosed() bool { return r.closed.Get() } +// SaveMeta save relay meta and update meta in RelayLogInfo +func (r *Relay) SaveMeta(pos mysql.Position, gset gtid.Set) error { + if err := r.meta.Save(pos, gset); err != nil { + return err + } + r.relayMetaHub.SetMeta(r.meta.UUID(), pos, gset) + return nil +} + +// FlushMeta flush relay meta +func (r *Relay) FlushMeta() error { + return r.meta.Flush() +} + // stopSync stops syncing, now it used by Close and Pause func (r *Relay) stopSync() { - if err := r.meta.Flush(); err != nil { + if err := r.FlushMeta(); err != nil { r.logger.Error("flush checkpoint", zap.Error(err)) } } @@ -792,25 +790,10 @@ func (r *Relay) Reload(newCfg *Config) error { } r.db = db - syncerCfg := replication.BinlogSyncerConfig{ - ServerID: uint32(r.cfg.ServerID), - Flavor: r.cfg.Flavor, - Host: newCfg.From.Host, - Port: uint16(newCfg.From.Port), - User: newCfg.From.User, - Password: newCfg.From.Password, - Charset: newCfg.Charset, - } - common.SetDefaultReplicationCfg(&syncerCfg, common.MaxBinlogSyncerReconnect) - - if !newCfg.EnableGTID { - // for rawMode(true), we only parse FormatDescriptionEvent and RotateEvent - // if not need to support GTID mode, we can enable rawMode - syncerCfg.RawModeEnabled = true + if err := r.setSyncConfig(); err != nil { + return err } - r.syncerCfg = syncerCfg - r.logger.Info("relay unit is updated") return nil @@ -837,3 +820,40 @@ func (r *Relay) ActiveRelayLog() *pkgstreamer.RelayLogInfo { defer r.activeRelayLog.RUnlock() return r.activeRelayLog.info } + +func (r *Relay) setSyncConfig() error { + var tlsConfig *tls.Config + var err error + if r.cfg.From.Security != nil { + tlsConfig, err = toolutils.ToTLSConfig(r.cfg.From.Security.SSLCA, r.cfg.From.Security.SSLCert, r.cfg.From.Security.SSLKey) + if err != nil { + return terror.ErrConnInvalidTLSConfig.Delegate(err) + } + if tlsConfig != nil { + tlsConfig.InsecureSkipVerify = true + } + } + + syncerCfg := replication.BinlogSyncerConfig{ + ServerID: uint32(r.cfg.ServerID), + Flavor: r.cfg.Flavor, + Host: r.cfg.From.Host, + Port: uint16(r.cfg.From.Port), + User: r.cfg.From.User, + Password: r.cfg.From.Password, + Charset: r.cfg.Charset, + TLSConfig: tlsConfig, + } + common.SetDefaultReplicationCfg(&syncerCfg, common.MaxBinlogSyncerReconnect) + + if !r.cfg.EnableGTID { + syncerCfg.RawModeEnabled = true + } + + if r.cfg.Flavor == mysql.MariaDBFlavor { + syncerCfg.DumpCommandFlag |= dumpFlagSendAnnotateRowsEvent + } + + r.syncerCfg = syncerCfg + return nil +} diff --git a/relay/relay_test.go b/relay/relay_test.go index ad47e3d05a..37d9f27b3d 100644 --- a/relay/relay_test.go +++ b/relay/relay_test.go @@ -29,6 +29,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/parser" + "github.com/siddontang/go-mysql/mysql" gmysql "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" @@ -58,6 +59,29 @@ func (t *testRelaySuite) SetUpSuite(c *C) { c.Assert(log.InitLogger(&log.Config{}), IsNil) } +func newRelayCfg(c *C, flavor string) *Config { + dbCfg := getDBConfigForTest() + return &Config{ + EnableGTID: false, // position mode, so auto-positioning can work + Flavor: flavor, + RelayDir: c.MkDir(), + ServerID: 12321, + From: config.DBConfig{ + Host: dbCfg.Host, + Port: dbCfg.Port, + User: dbCfg.User, + Password: dbCfg.Password, + }, + ReaderRetry: retry.ReaderRetryConfig{ + BackoffRollback: 200 * time.Millisecond, + BackoffMax: 1 * time.Second, + BackoffMin: 1 * time.Millisecond, + BackoffJitter: true, + BackoffFactor: 2, + }, + } +} + func getDBConfigForTest() config.DBConfig { host := os.Getenv("MYSQL_HOST") if host == "" { @@ -151,12 +175,10 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) { startPos = gmysql.Position{Name: filename, Pos: 123} parser2 = parser.New() - relayCfg = &Config{ - RelayDir: c.MkDir(), - Flavor: gmysql.MySQLFlavor, - } - r = NewRelay(relayCfg).(*Relay) + relayCfg = newRelayCfg(c, mysql.MySQLFlavor) + r = NewRelay(relayCfg).(*Relay) ) + c.Assert(r.Init(context.Background()), IsNil) // purge old relay dir f, err := os.Create(filepath.Join(r.cfg.RelayDir, "old_relay_log")) c.Assert(err, IsNil) @@ -205,7 +227,7 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) { // write a greater GTID sets in meta greaterGITDSet, err := gtid.ParserGTID(relayCfg.Flavor, greaterGITDSetStr) c.Assert(err, IsNil) - c.Assert(r.meta.Save(startPos, greaterGITDSet), IsNil) + c.Assert(r.SaveMeta(startPos, greaterGITDSet), IsNil) // invalid data truncated, meta updated c.Assert(r.tryRecoverLatestFile(context.Background(), parser2), IsNil) @@ -217,7 +239,7 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) { c.Assert(latestGTIDs.Equal(recoverGTIDSet), IsTrue) // verifyMetadata is not enough // no relay log file need to recover - c.Assert(r.meta.Save(minCheckpoint, latestGTIDs), IsNil) + c.Assert(r.SaveMeta(minCheckpoint, latestGTIDs), IsNil) c.Assert(r.tryRecoverLatestFile(context.Background(), parser2), IsNil) _, latestPos = r.meta.Pos() c.Assert(latestPos, DeepEquals, minCheckpoint) @@ -236,13 +258,10 @@ func (t *testRelaySuite) TestTryRecoverMeta(c *C) { startPos = gmysql.Position{Name: filename, Pos: 123} parser2 = parser.New() - relayCfg = &Config{ - RelayDir: c.MkDir(), - Flavor: gmysql.MySQLFlavor, - } - r = NewRelay(relayCfg).(*Relay) + relayCfg = newRelayCfg(c, mysql.MySQLFlavor) + r = NewRelay(relayCfg).(*Relay) ) - + c.Assert(r.Init(context.Background()), IsNil) recoverGTIDSet, err := gtid.ParserGTID(relayCfg.Flavor, recoverGTIDSetStr) c.Assert(err, IsNil) @@ -283,7 +302,7 @@ func (t *testRelaySuite) TestTryRecoverMeta(c *C) { f.Close() // recover with the subset of GTIDs (previous GTID set). - c.Assert(r.meta.Save(startPos, previousGTIDSet), IsNil) + c.Assert(r.SaveMeta(startPos, previousGTIDSet), IsNil) c.Assert(r.tryRecoverLatestFile(context.Background(), parser2), IsNil) _, latestPos = r.meta.Pos() c.Assert(latestPos, DeepEquals, gmysql.Position{Name: filename, Pos: g.LatestPos}) @@ -362,11 +381,8 @@ func (t *testRelaySuite) TestHandleEvent(c *C) { reader2 = &mockReader{} transformer2 = transformer.NewTransformer(parser.New()) writer2 = &mockWriter{} - relayCfg = &Config{ - RelayDir: c.MkDir(), - Flavor: gmysql.MariaDBFlavor, - } - r = NewRelay(relayCfg).(*Relay) + relayCfg = newRelayCfg(c, mysql.MariaDBFlavor) + r = NewRelay(relayCfg).(*Relay) eventHeader = &replication.EventHeader{ Timestamp: uint32(time.Now().Unix()), @@ -376,6 +392,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) { rotateEv, _ = event.GenRotateEvent(eventHeader, 123, []byte(binlogPos.Name), uint64(binlogPos.Pos)) queryEv, _ = event.GenQueryEvent(eventHeader, 123, 0, 0, 0, nil, nil, []byte("CREATE DATABASE db_relay_test")) ) + c.Assert(r.Init(context.Background()), IsNil) // NOTE: we can mock meta later. c.Assert(r.meta.Load(), IsNil) c.Assert(r.meta.AddDir("24ecd093-8cec-11e9-aa0d-0242ac170002", nil, nil), IsNil) @@ -528,7 +545,7 @@ func (t *testRelaySuite) TestProcess(c *C) { Flavor: gmysql.MySQLFlavor, RelayDir: c.MkDir(), ServerID: 12321, - From: DBConfig{ + From: config.DBConfig{ Host: dbCfg.Host, Port: dbCfg.Port, User: dbCfg.User, @@ -544,16 +561,10 @@ func (t *testRelaySuite) TestProcess(c *C) { } r = NewRelay(relayCfg).(*Relay) ) - db, err := openDBForTest() - c.Assert(err, IsNil) - r.db = db - defer func() { - r.db.Close() - r.db = nil - }() - ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup + err := r.Init(ctx) + c.Assert(err, IsNil) wg.Add(1) go func() { defer wg.Done() diff --git a/syncer/streamer_controller.go b/syncer/streamer_controller.go index c1d378a800..2702903dd5 100644 --- a/syncer/streamer_controller.go +++ b/syncer/streamer_controller.go @@ -46,11 +46,15 @@ type StreamerProducer interface { // Read local relay log type localBinlogReader struct { - reader *streamer.BinlogReader + reader *streamer.BinlogReader + EnableGTID bool } func (l *localBinlogReader) generateStreamer(location binlog.Location) (streamer.Streamer, error) { - return l.reader.StartSync(location.Position) + if l.EnableGTID { + return l.reader.StartSyncByGTID(location.GetGTID().Origin().Clone()) + } + return l.reader.StartSyncByPos(location.Position) } // Read remote binlog @@ -201,7 +205,7 @@ func (c *StreamerController) resetReplicationSyncer(tctx *tcontext.Context, loca if c.currentBinlogType == RemoteBinlog { c.streamerProducer = &remoteBinlogReader{replication.NewBinlogSyncer(c.syncCfg), tctx, c.syncCfg.Flavor, c.enableGTID} } else { - c.streamerProducer = &localBinlogReader{streamer.NewBinlogReader(tctx.L(), &streamer.BinlogReaderConfig{RelayDir: c.localBinlogDir, Timezone: c.timezone})} + c.streamerProducer = &localBinlogReader{streamer.NewBinlogReader(tctx.L(), &streamer.BinlogReaderConfig{RelayDir: c.localBinlogDir, Timezone: c.timezone, Flavor: c.syncCfg.Flavor}), c.enableGTID} } c.streamer, err = c.streamerProducer.generateStreamer(location) diff --git a/tests/handle_error/conf/source1.yaml b/tests/handle_error/conf/source1.yaml index 175e07df7a..692ce72eab 100644 --- a/tests/handle_error/conf/source1.yaml +++ b/tests/handle_error/conf/source1.yaml @@ -1,7 +1,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: false -enable-relay: false +enable-relay: true from: host: 127.0.0.1 user: root diff --git a/tests/handle_error/conf/source2.yaml b/tests/handle_error/conf/source2.yaml index 8850ed1849..28c0d00f6f 100644 --- a/tests/handle_error/conf/source2.yaml +++ b/tests/handle_error/conf/source2.yaml @@ -1,7 +1,7 @@ source-id: mysql-replica-02 flavor: '' enable-gtid: true -enable-relay: false +enable-relay: true from: host: 127.0.0.1 user: root diff --git a/tests/handle_error/run.sh b/tests/handle_error/run.sh index 06c7ad8001..2be1acc088 100644 --- a/tests/handle_error/run.sh +++ b/tests/handle_error/run.sh @@ -118,7 +118,7 @@ function DM_SKIP_ERROR_SHARDING_CASE() { run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 4 run_sql_tidb_with_retry "select count(1) from ${db}.${tb}" "count(1): 8" } @@ -278,7 +278,7 @@ function DM_REPLACE_ERROR_SHARDING_CASE() { run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ - "\"stage\": \"Running\"" 2 \ + "\"stage\": \"Running\"" 4 \ run_sql_tidb_with_retry "select count(1) from ${db}.${tb}" "count(1): 8" } @@ -347,7 +347,7 @@ function DM_REPLACE_ERROR_MULTIPLE_CASE() { run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ - "\"stage\": \"Running\"" 2 \ + "\"stage\": \"Running\"" 4 \ run_sql_tidb_with_retry "select count(1) from ${db}.${tb};" "count(1): 2" } diff --git a/tests/shardddl1/conf/source2.yaml b/tests/shardddl1/conf/source2.yaml index 100a7bd6b3..31bd5de10e 100644 --- a/tests/shardddl1/conf/source2.yaml +++ b/tests/shardddl1/conf/source2.yaml @@ -1,7 +1,7 @@ source-id: mysql-replica-02 flavor: '' enable-gtid: true -enable-relay: false +enable-relay: true from: host: 127.0.0.1 user: root diff --git a/tests/shardddl2/conf/source2.yaml b/tests/shardddl2/conf/source2.yaml index 100a7bd6b3..31bd5de10e 100644 --- a/tests/shardddl2/conf/source2.yaml +++ b/tests/shardddl2/conf/source2.yaml @@ -1,7 +1,7 @@ source-id: mysql-replica-02 flavor: '' enable-gtid: true -enable-relay: false +enable-relay: true from: host: 127.0.0.1 user: root diff --git a/tests/shardddl3/conf/source2.yaml b/tests/shardddl3/conf/source2.yaml index 100a7bd6b3..31bd5de10e 100644 --- a/tests/shardddl3/conf/source2.yaml +++ b/tests/shardddl3/conf/source2.yaml @@ -1,7 +1,7 @@ source-id: mysql-replica-02 flavor: '' enable-gtid: true -enable-relay: false +enable-relay: true from: host: 127.0.0.1 user: root