diff --git a/dm/worker/server.go b/dm/worker/server.go
index ebf8213264..09280f6115 100644
--- a/dm/worker/server.go
+++ b/dm/worker/server.go
@@ -20,6 +20,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/siddontang/go-mysql/mysql"
 	"github.com/siddontang/go/sync2"
 	"github.com/soheilhy/cmux"
 	"go.uber.org/zap"
@@ -30,6 +31,7 @@ import (
 	"github.com/pingcap/dm/dm/pb"
 	"github.com/pingcap/dm/pkg/log"
 	"github.com/pingcap/dm/pkg/terror"
+	"github.com/pingcap/dm/pkg/utils"
 )
 
 var (
@@ -257,6 +259,8 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (*
 		SourceID: s.worker.cfg.SourceID,
 	}
 
+	unifyMasterBinlogPos(resp, s.worker.cfg.EnableGTID)
+
 	if len(resp.SubTaskStatus) == 0 {
 		resp.Msg = "no sub task started"
 	}
@@ -464,3 +468,111 @@ func (s *Server) splitHostPort() (host, port string, err error) {
 	}
 	return
 }
+
+// unifyMasterBinlogPos eliminates different masterBinlog in one response: every
+// master binlog position reported by the relay status and by the sync sub task
+// statuses is rewritten to the latest one among them, and the flags derived
+// from it (RelayCatchUpMaster, Synced) are re-evaluated against that position.
+// see https://github.com/pingcap/dm/issues/727
+//
+// resp is modified in place. If any binlog position fails to decode, the error
+// is logged and resp is left untouched.
+func unifyMasterBinlogPos(resp *pb.QueryStatusResponse, enableGTID bool) {
+	if resp == nil {
+		return
+	}
+
+	var (
+		syncStatus         []*pb.SyncStatus
+		syncMasterBinlog   []*mysql.Position
+		latestMasterBinlog mysql.Position // not pointer, to make use of zero value and avoid nil check
+		relayMasterBinlog  *mysql.Position
+	)
+
+	// a missing or stopped relay takes no part in the unification
+	relayStatus := resp.RelayStatus
+	if relayStatus != nil && relayStatus.Stage == pb.Stage_Stopped {
+		relayStatus = nil
+	}
+
+	// uninitialized mysql.Position is less than any initialized mysql.Position
+	if relayStatus != nil {
+		var err error
+		relayMasterBinlog, err = utils.DecodeBinlogPosition(relayStatus.MasterBinlog)
+		if err != nil {
+			log.L().Error("failed to decode relay's master binlog position", zap.Stringer("response", resp), zap.Error(err))
+			return
+		}
+		latestMasterBinlog = *relayMasterBinlog
+	}
+
+	for _, stStatus := range resp.SubTaskStatus {
+		if stStatus == nil || stStatus.Unit != pb.UnitType_Sync {
+			continue
+		}
+		// Unit can be set without a sync payload in Status, so use a checked
+		// assertion instead of panicking on it
+		s, ok := stStatus.Status.(*pb.SubTaskStatus_Sync)
+		if !ok || s == nil || s.Sync == nil {
+			continue
+		}
+
+		position, err := utils.DecodeBinlogPosition(s.Sync.MasterBinlog)
+		if err != nil {
+			log.L().Error("failed to decode sync's master binlog position", zap.Stringer("response", resp), zap.Error(err))
+			return
+		}
+		if latestMasterBinlog.Compare(*position) < 0 {
+			latestMasterBinlog = *position
+		}
+		syncStatus = append(syncStatus, s.Sync)
+		syncMasterBinlog = append(syncMasterBinlog, position)
+	}
+
+	// decode everything the derived flags need before touching resp, so that a
+	// decode failure can't leave resp half rewritten
+
+	// re-check relay
+	var relayCatchUp bool
+	relayOutdated := relayStatus != nil && latestMasterBinlog.Compare(*relayMasterBinlog) != 0
+	// if enableGTID, modify output binlog position doesn't affect RelayCatchUpMaster, skip check
+	if relayOutdated && !enableGTID {
+		relayPos, err := utils.DecodeBinlogPosition(relayStatus.RelayBinlog)
+		if err != nil {
+			log.L().Error("failed to decode relay binlog position", zap.Stringer("response", resp), zap.Error(err))
+			return
+		}
+		relayCatchUp = latestMasterBinlog.Compare(*relayPos) == 0
+	}
+
+	// re-check syncer
+	var (
+		outdatedSync []*pb.SyncStatus
+		synced       []bool
+	)
+	for i, s := range syncStatus {
+		if latestMasterBinlog.Compare(*syncMasterBinlog[i]) == 0 {
+			continue
+		}
+		syncerPos, err := utils.DecodeBinlogPosition(s.SyncerBinlog)
+		if err != nil {
+			log.L().Error("failed to decode syncer binlog position", zap.Stringer("response", resp), zap.Error(err))
+			return
+		}
+		outdatedSync = append(outdatedSync, s)
+		synced = append(synced, latestMasterBinlog.Compare(*syncerPos) == 0)
+	}
+
+	// all positions decoded, now apply the unified master binlog position
+	masterBinlog := latestMasterBinlog.String()
+	if relayOutdated {
+		relayStatus.MasterBinlog = masterBinlog
+		if !enableGTID {
+			relayStatus.RelayCatchUpMaster = relayCatchUp
+		}
+	}
+	for i, s := range outdatedSync {
+		s.MasterBinlog = masterBinlog
+		s.Synced = synced[i]
+	}
+}
diff --git a/dm/worker/server_test.go b/dm/worker/server_test.go
index ab4c0fcdb5..1847d084a2 100644
--- a/dm/worker/server_test.go
+++ b/dm/worker/server_test.go
@@ -202,3 +202,116 @@ func (t *testServer) TestQueryError(c *C) {
 	c.Assert(resp.SubTaskError, HasLen, 1)
 	c.Assert(resp.SubTaskError[0].String(), Matches, `[\s\S]*mockSubtaskFail[\s\S]*`)
 }
+
+func (t *testServer) TestUnifyMasterBinlogPos(c *C) {
+	var (
+		pos1 = "(bin.000001, 3134)"
+		pos2 = "(bin.000001, 3234)"
+		pos3 = "(bin.000001, 3334)"
+		pos4 = "(bin.000001, 3434)"
+	)
+
+	// 1. should modify nothing
+	resp := &pb.QueryStatusResponse{
+		SubTaskStatus: []*pb.SubTaskStatus{{
+			Name:   "test",
+			Status: &pb.SubTaskStatus_Msg{Msg: "sub task not started"},
+		}},
+		RelayStatus: &pb.RelayStatus{
+			Stage: pb.Stage_Stopped,
+		},
+	}
+	resp2 := &pb.QueryStatusResponse{
+		SubTaskStatus: []*pb.SubTaskStatus{{
+			Name:   "test",
+			Status: &pb.SubTaskStatus_Msg{Msg: "sub task not started"},
+		}},
+		RelayStatus: &pb.RelayStatus{
+			MasterBinlog: pos1, RelayBinlog: pos1, RelayCatchUpMaster: true,
+		},
+	}
+	resp3 := &pb.QueryStatusResponse{
+		SubTaskStatus: []*pb.SubTaskStatus{{
+			Unit: pb.UnitType_Load,
+		}, {
+			Unit:   pb.UnitType_Sync,
+			Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos2, SyncerBinlog: pos2, Synced: true}},
+		}},
+		RelayStatus: &pb.RelayStatus{
+			Stage: pb.Stage_Stopped,
+		},
+	}
+	// no relay status at all, and a sync unit that carries no sync status
+	resp4 := &pb.QueryStatusResponse{
+		SubTaskStatus: []*pb.SubTaskStatus{{
+			Unit:   pb.UnitType_Sync,
+			Status: &pb.SubTaskStatus_Msg{Msg: "sub task not started"},
+		}},
+	}
+
+	for _, r := range []*pb.QueryStatusResponse{resp, resp2, resp3, resp4} {
+		// clone resp
+		data, err := r.Marshal()
+		c.Assert(err, IsNil)
+		originResp := &pb.QueryStatusResponse{}
+		err = originResp.Unmarshal(data)
+		c.Assert(err, IsNil)
+
+		unifyMasterBinlogPos(r, false)
+		c.Assert(r, DeepEquals, originResp)
+	}
+
+	// 2. could work on multiple status
+	resp = &pb.QueryStatusResponse{
+		SubTaskStatus: []*pb.SubTaskStatus{{
+			Unit: pb.UnitType_Load,
+		}, {
+			Unit:   pb.UnitType_Sync,
+			Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos2, SyncerBinlog: pos2, Synced: true}},
+		}, {
+			Unit:   pb.UnitType_Sync,
+			Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos4, SyncerBinlog: pos3, Synced: false}},
+		}},
+		RelayStatus: &pb.RelayStatus{
+			MasterBinlog: pos1, RelayBinlog: pos1, RelayCatchUpMaster: true,
+		},
+	}
+	unifyMasterBinlogPos(resp, false)
+
+	sync1 := resp.SubTaskStatus[1].Status.(*pb.SubTaskStatus_Sync).Sync
+	c.Assert(sync1.MasterBinlog, Equals, pos4)
+	c.Assert(sync1.Synced, IsFalse)
+	sync2 := resp.SubTaskStatus[2].Status.(*pb.SubTaskStatus_Sync).Sync
+	c.Assert(sync2.MasterBinlog, Equals, pos4)
+	c.Assert(sync2.Synced, IsFalse)
+	relay := resp.RelayStatus
+	c.Assert(relay.MasterBinlog, Equals, pos4)
+	c.Assert(relay.RelayCatchUpMaster, IsFalse)
+
+	// 3. test unifyMasterBinlogPos(..., enableGTID = true)
+	resp = &pb.QueryStatusResponse{
+		SubTaskStatus: []*pb.SubTaskStatus{{
+			Unit: pb.UnitType_Load,
+		}, {
+			Unit:   pb.UnitType_Sync,
+			Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos2, SyncerBinlog: pos2, Synced: true}},
+		}, {
+			Unit:   pb.UnitType_Sync,
+			Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos4, SyncerBinlog: pos3, Synced: false}},
+		}},
+		RelayStatus: &pb.RelayStatus{
+			MasterBinlog: pos1, RelayBinlog: pos1, RelayCatchUpMaster: true,
+		},
+	}
+	unifyMasterBinlogPos(resp, true)
+
+	sync1 = resp.SubTaskStatus[1].Status.(*pb.SubTaskStatus_Sync).Sync
+	c.Assert(sync1.MasterBinlog, Equals, pos4)
+	c.Assert(sync1.Synced, IsFalse)
+	sync2 = resp.SubTaskStatus[2].Status.(*pb.SubTaskStatus_Sync).Sync
+	c.Assert(sync2.MasterBinlog, Equals, pos4)
+	c.Assert(sync2.Synced, IsFalse)
+	relay = resp.RelayStatus
+	c.Assert(relay.MasterBinlog, Equals, pos4)
+	c.Assert(relay.RelayCatchUpMaster, IsTrue)
+}