Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

worker, utils: show same master status in one response #817

Merged
merged 12 commits into from
Jul 24, 2020
70 changes: 70 additions & 0 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"
"time"

"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go/sync2"
"github.com/soheilhy/cmux"
"go.uber.org/zap"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

var (
Expand Down Expand Up @@ -257,6 +259,8 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (*
SourceID: s.worker.cfg.SourceID,
}

unifyMasterBinlogPos(resp)

if len(resp.SubTaskStatus) == 0 {
resp.Msg = "no sub task started"
}
Expand Down Expand Up @@ -464,3 +468,69 @@ func (s *Server) splitHostPort() (host, port string, err error) {
}
return
}

// unifyMasterBinlogPos eliminates different masterBinlog in one response
// see https://github.com/pingcap/dm/issues/727
func unifyMasterBinlogPos(resp *pb.QueryStatusResponse) {
var (
syncStatus []*pb.SubTaskStatus_Sync
syncMasterBinlog []*mysql.Position
lastestMasterBinlog mysql.Position // not pointer, to make use of zero value and avoid nil check
relayMasterBinlog *mysql.Position
)

// uninitialized mysql.Position is less than any initialized mysql.Position
if resp.RelayStatus.Stage != pb.Stage_Stopped {
var err error
relayMasterBinlog, err = utils.DecodeBinlogPosition(resp.RelayStatus.MasterBinlog)
if err != nil {
log.L().Error("failed to decode relay's master binlog position", zap.Stringer("response", resp), zap.Error(err))
return
}
lastestMasterBinlog = *relayMasterBinlog
}

for _, stStatus := range resp.SubTaskStatus {
if stStatus.Unit == pb.UnitType_Sync {
s := stStatus.Status.(*pb.SubTaskStatus_Sync)
syncStatus = append(syncStatus, s)

position, err := utils.DecodeBinlogPosition(s.Sync.MasterBinlog)
if err != nil {
log.L().Error("failed to decode sync's master binlog position", zap.Stringer("response", resp), zap.Error(err))
return
}
if lastestMasterBinlog.Compare(*position) < 0 {
lastestMasterBinlog = *position
}
syncMasterBinlog = append(syncMasterBinlog, position)
}
}

// re-check relay
if resp.RelayStatus.Stage != pb.Stage_Stopped && lastestMasterBinlog.Compare(*relayMasterBinlog) != 0 {
relayPos, err := utils.DecodeBinlogPosition(resp.RelayStatus.RelayBinlog)
if err != nil {
log.L().Error("failed to decode relay binlog position", zap.Stringer("response", resp), zap.Error(err))
return
}
catchUp := lastestMasterBinlog.Compare(*relayPos) == 0
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved

resp.RelayStatus.MasterBinlog = lastestMasterBinlog.String()
resp.RelayStatus.RelayCatchUpMaster = catchUp
}
// re-check syncer
for i, sStatus := range syncStatus {
if lastestMasterBinlog.Compare(*syncMasterBinlog[i]) != 0 {
syncerPos, err := utils.DecodeBinlogPosition(sStatus.Sync.SyncerBinlog)
if err != nil {
log.L().Error("failed to decode syncer binlog position", zap.Stringer("response", resp), zap.Error(err))
return
}
synced := lastestMasterBinlog.Compare(*syncerPos) == 0

sStatus.Sync.MasterBinlog = lastestMasterBinlog.String()
sStatus.Sync.Synced = synced
}
}
}
78 changes: 78 additions & 0 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,81 @@ func (t *testServer) TestQueryError(c *C) {
c.Assert(resp.SubTaskError, HasLen, 1)
c.Assert(resp.SubTaskError[0].String(), Matches, `[\s\S]*mockSubtaskFail[\s\S]*`)
}

func (t *testServer) TestUnifyMasterBinlogPos(c *C) {
var (
pos1 = "(bin.000001, 3134)"
pos2 = "(bin.000001, 3234)"
pos3 = "(bin.000001, 3334)"
pos4 = "(bin.000001, 3434)"
)

// 1. should modify nothing
resp := &pb.QueryStatusResponse{
SubTaskStatus: []*pb.SubTaskStatus{{
Name: "test",
Status: &pb.SubTaskStatus_Msg{Msg: "sub task not started"},
}},
RelayStatus: &pb.RelayStatus{
Stage: pb.Stage_Stopped,
},
}
resp2 := &pb.QueryStatusResponse{
SubTaskStatus: []*pb.SubTaskStatus{{
Name: "test",
Status: &pb.SubTaskStatus_Msg{Msg: "sub task not started"},
}},
RelayStatus: &pb.RelayStatus{
MasterBinlog: pos1, RelayBinlog: pos1, RelayCatchUpMaster: true,
},
}
resp3 := &pb.QueryStatusResponse{
SubTaskStatus: []*pb.SubTaskStatus{{
Unit: pb.UnitType_Load,
}, {
Unit: pb.UnitType_Sync,
Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos2, SyncerBinlog: pos2, Synced: true}},
}},
RelayStatus: &pb.RelayStatus{
Stage: pb.Stage_Stopped,
},
}

for _, r := range []*pb.QueryStatusResponse{resp, resp2, resp3} {
// clone resp
bytes, _ := r.Marshal()
originReps := &pb.QueryStatusResponse{}
err := originReps.Unmarshal(bytes)
c.Assert(err, IsNil)

unifyMasterBinlogPos(resp)
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
c.Assert(r, DeepEquals, originReps)
}

// 2. could work on multiple status
resp = &pb.QueryStatusResponse{
SubTaskStatus: []*pb.SubTaskStatus{{
Unit: pb.UnitType_Load,
}, {
Unit: pb.UnitType_Sync,
Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos2, SyncerBinlog: pos2, Synced: true}},
}, {
Unit: pb.UnitType_Sync,
Status: &pb.SubTaskStatus_Sync{Sync: &pb.SyncStatus{MasterBinlog: pos4, SyncerBinlog: pos3, Synced: false}},
}},
RelayStatus: &pb.RelayStatus{
MasterBinlog: pos1, RelayBinlog: pos1, RelayCatchUpMaster: true,
},
}
unifyMasterBinlogPos(resp)

sync1 := resp.SubTaskStatus[1].Status.(*pb.SubTaskStatus_Sync).Sync
c.Assert(sync1.MasterBinlog, Equals, pos4)
c.Assert(sync1.Synced, IsFalse)
sync2 := resp.SubTaskStatus[2].Status.(*pb.SubTaskStatus_Sync).Sync
c.Assert(sync2.MasterBinlog, Equals, pos4)
c.Assert(sync2.Synced, IsFalse)
relay := resp.RelayStatus
c.Assert(relay.MasterBinlog, Equals, pos4)
c.Assert(relay.RelayCatchUpMaster, IsFalse)
}