store/copr: support retry for mpp query (#26462) #26480

Merged
2 commits merged on Jul 23, 2021
Changes from 1 commit
22 changes: 22 additions & 0 deletions executor/tiflash_test.go
@@ -513,6 +513,28 @@ func (s *tiflashTestSuite) TestMppEnum(c *C) {
tk.MustQuery("select t1.b from t t1 join t t2 on t1.a = t2.a order by t1.b").Check(testkit.Rows("aca", "bca", "zca"))
}

func (s *tiflashTestSuite) TestDispatchTaskRetry(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int not null primary key, b int not null)")
tk.MustExec("alter table t set tiflash replica 1")
tk.MustExec("insert into t values(1,0)")
tk.MustExec("insert into t values(2,0)")
tk.MustExec("insert into t values(3,0)")
tk.MustExec("insert into t values(4,0)")
tb := testGetTableByName(c, tk.Se, "test", "t")
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("set @@session.tidb_enforce_mpp=ON")
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout", "3*return(true)"), IsNil)
tk.MustQuery("select count(*) from t").Check(testkit.Rows("4"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppConnTimeout", "3*return(true)"), IsNil)
tk.MustQuery("select count(*) from t").Check(testkit.Rows("4"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppConnTimeout"), IsNil)
}

func (s *tiflashTestSuite) TestCancelMppTasks(c *C) {
testleak.BeforeTest()
defer testleak.AfterTest(c)()
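The test relies on pingcap/failpoint's counted terms: `3*return(true)` arms a failpoint for exactly three evaluations, so the first three MPP dispatch (or connection) RPCs fail and the fourth succeeds, which is what proves the retry path still produces the right result. A minimal, self-contained sketch of that term's behavior (the failpoint name here is made up for illustration; it calls `failpoint.Eval` directly so it runs without the code rewrite TiDB's test build applies):

```go
package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

func main() {
	const fp = "example/mppTimeout" // hypothetical failpoint name, not one from the PR
	// "3*return(true)" makes the failpoint fire (return true) for the first three
	// evaluations only; afterwards Eval reports it as inactive.
	if err := failpoint.Enable(fp, "3*return(true)"); err != nil {
		panic(err)
	}
	defer failpoint.Disable(fp)

	for attempt := 1; attempt <= 4; attempt++ {
		if val, err := failpoint.Eval(fp); err == nil && val.(bool) {
			fmt.Printf("attempt %d: injected rpc error, retrying\n", attempt)
			continue
		}
		fmt.Printf("attempt %d: succeeded\n", attempt)
		break
	}
}
```
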
3 changes: 2 additions & 1 deletion store/copr/batch_coprocessor.go
@@ -127,6 +127,7 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
 			storeTaskMap[taskStoreID] = batchTask
 		}
 	} else {
+		logutil.BgLogger().Info("detecting available mpp stores")
 		// decide the available stores
 		stores := cache.RegionCache.GetTiFlashStores()
 		var wg sync.WaitGroup
@@ -139,7 +140,7 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
 				aliveReq := tikvrpc.NewRequest(tikvrpc.CmdMPPAlive, &mpp.IsAliveRequest{}, kvrpcpb.Context{})
 				aliveReq.StoreTp = tikvrpc.TiFlash
 				alive := false
-				resp, err := kvStore.GetTiKVClient().SendRequest(ctx, s.GetAddr(), aliveReq, tikv.ReadTimeoutMedium)
+				resp, err := kvStore.GetTiKVClient().SendRequest(ctx, s.GetAddr(), aliveReq, 3*time.Second)
 				if err != nil {
 					logutil.BgLogger().Warn("Cannot detect store's availability", zap.String("store address", s.GetAddr()), zap.String("err message", err.Error()))
 				} else {
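The functional change here is only the probe timeout: each CmdMPPAlive request now gets a fixed 3-second budget instead of the general read timeout, so a single unreachable TiFlash node cannot stall MPP task balancing for long. A rough, generic stand-in for that probe-and-filter pattern (plain Go; the function and names are illustrative, not the PR's code):

```go
package main

import (
	"context"
	"log"
	"sync"
	"time"
)

// filterAlive probes every store address concurrently with a short, fixed timeout and
// keeps only the stores that answer in time; the probe callback stands in for the
// CmdMPPAlive RPC, and failing stores are simply skipped during task balancing.
func filterAlive(ctx context.Context, addrs []string, probe func(ctx context.Context, addr string) error) []string {
	var (
		mu    sync.Mutex
		alive []string
		wg    sync.WaitGroup
	)
	for _, addr := range addrs {
		wg.Add(1)
		go func(addr string) {
			defer wg.Done()
			probeCtx, cancel := context.WithTimeout(ctx, 3*time.Second) // bounded per-store probe
			defer cancel()
			if err := probe(probeCtx, addr); err != nil {
				log.Printf("cannot detect store's availability: %s: %v", addr, err)
				return
			}
			mu.Lock()
			alive = append(alive, addr)
			mu.Unlock()
		}(addr)
	}
	wg.Wait()
	return alive
}

func main() {
	stores := []string{"tiflash-0:3930", "tiflash-1:3930"} // placeholder addresses
	ok := filterAlive(context.Background(), stores, func(ctx context.Context, addr string) error {
		return nil // a real probe would send tikvrpc.CmdMPPAlive here
	})
	log.Println("alive stores:", ok)
}
```
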
47 changes: 35 additions & 12 deletions store/copr/mpp.go
@@ -33,6 +33,8 @@ import (
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// MPPClient servers MPP requests.
@@ -150,7 +152,12 @@ func (m *mppIterator) run(ctx context.Context) {
 		m.mu.Unlock()
 		m.wg.Add(1)
 		bo := backoff.NewBackoffer(ctx, copNextMaxBackoff)
-		go m.handleDispatchReq(ctx, bo, task)
+		go func(mppTask *kv.MPPDispatchRequest) {
+			defer func() {
+				m.wg.Done()
+			}()
+			m.handleDispatchReq(ctx, bo, mppTask)
+		}(task)
 	}
 	m.wg.Wait()
 	close(m.respChan)
@@ -174,9 +181,6 @@ func (m *mppIterator) sendToRespCh(resp *mppResponse) (exit bool) {
 // - dispatch all tasks at once, and connect tasks at second.
 // - dispatch tasks and establish connection at the same time.
 func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req *kv.MPPDispatchRequest) {
-	defer func() {
-		m.wg.Done()
-	}()
 	var regionInfos []*coprocessor.RegionInfo
 	originalTask, ok := req.Meta.(*batchCopTask)
 	if ok {
@@ -210,23 +214,36 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
 	// TODO: Handle dispatch task response correctly, including retry logic and cancel logic.
 	var rpcResp *tikvrpc.Response
 	var err error
+	var retry bool
 	// If copTasks is not empty, we should send request according to region distribution.
 	// Or else it's the task without region, which always happens in high layer task without table.
 	// In that case
 	if originalTask != nil {
 		sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient())
-		rpcResp, _, _, err = sender.SendReqToAddr(bo, originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium)
+		rpcResp, retry, _, err = sender.SendReqToAddr(bo, originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium)
 		// No matter what the rpc error is, we won't retry the mpp dispatch tasks.
 		// TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling.
 		// That's a hard job but we can try it in the future.
 		if sender.GetRPCError() != nil {
-			logutil.BgLogger().Error("mpp dispatch meet io error", zap.String("error", sender.GetRPCError().Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
+			logutil.BgLogger().Warn("mpp dispatch meet io error", zap.String("error", sender.GetRPCError().Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
 			// we return timeout to trigger tikv's fallback
-			m.sendError(derr.ErrTiFlashServerTimeout)
-			return
+			err = derr.ErrTiFlashServerTimeout
 		}
 	} else {
 		rpcResp, err = m.store.GetTiKVClient().SendRequest(ctx, req.Meta.GetAddress(), wrappedReq, tikv.ReadTimeoutMedium)
+		if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled {
+			retry = false
+		} else if err != nil {
+			if bo.Backoff(tikv.BoTiFlashRPC(), err) == nil {
+				retry = true
+			}
+		}
 	}
 
+	if retry {
+		logutil.BgLogger().Warn("mpp dispatch meet error and retrying", zap.Error(err), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
+		m.handleDispatchReq(ctx, bo, req)
+		return
+	}
+
 	if err != nil {
@@ -239,7 +256,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
 	realResp := rpcResp.Resp.(*mpp.DispatchTaskResponse)
 
 	if realResp.Error != nil {
-		logutil.BgLogger().Error("mpp dispatch response meet error", zap.String("error", realResp.Error.Msg))
+		logutil.BgLogger().Error("mpp dispatch response meet error", zap.String("error", realResp.Error.Msg), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
 		m.sendError(errors.New(realResp.Error.Msg))
 		return
 	}
@@ -314,9 +331,15 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques
 	rpcResp, err := m.store.GetTiKVClient().SendRequest(bo.GetCtx(), req.Meta.GetAddress(), wrappedReq, readTimeoutUltraLong)
 
 	if err != nil {
-		logutil.BgLogger().Error("establish mpp connection meet error", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
-		// we return timeout to trigger tikv's fallback
-		m.sendError(derr.ErrTiFlashServerTimeout)
+		logutil.BgLogger().Warn("establish mpp connection meet error, and retrying", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
+		err = bo.Backoff(tikv.BoTiFlashRPC(), err)
+		if err != nil {
+			logutil.BgLogger().Warn("establish mpp connection meet error and cannot retry", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
+			// we return timeout to trigger tikv's fallback
+			m.sendError(derr.ErrTiFlashServerTimeout)
+			return
+		}
+		m.establishMPPConns(bo, req, taskMeta)
 		return
 	}
 
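Both retry sites above share one shape: a transient RPC failure is retried under the backoffer's budget, a canceled query is never retried, and once the budget is exhausted the iterator reports ErrTiFlashServerTimeout so the optimizer can fall back to TiKV. A condensed, self-contained sketch of that control flow (plain Go with an ad-hoc backoff loop; the real code uses the store/tikv Backoffer and the BoTiFlashRPC config, and the error name below is a stand-in):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errServerTimeout stands in for derr.ErrTiFlashServerTimeout, the error that triggers
// the TiKV fallback once MPP retries are exhausted.
var errServerTimeout = errors.New("tiflash server timeout")

// sendWithRetry retries transient failures with exponential backoff until the budget is
// spent; cancellation is returned immediately, mirroring the context.Canceled /
// codes.Canceled check in handleDispatchReq.
func sendWithRetry(ctx context.Context, budget time.Duration, send func(context.Context) error) error {
	wait := 100 * time.Millisecond
	deadline := time.Now().Add(budget)
	for {
		err := send(ctx)
		if err == nil {
			return nil
		}
		if errors.Is(err, context.Canceled) {
			return err // the query was canceled: never retry
		}
		if time.Now().Add(wait).After(deadline) {
			return errServerTimeout // retry budget exhausted: let the caller fall back
		}
		time.Sleep(wait)
		wait *= 2
	}
}

func main() {
	attempts := 0
	err := sendWithRetry(context.Background(), 2*time.Second, func(context.Context) error {
		attempts++
		if attempts <= 3 {
			return errors.New("rpc error") // transient failure, like the injected failpoints
		}
		return nil
	})
	fmt.Println(attempts, err) // 4 <nil>
}
```

In the PR itself the budget comes from the per-task Backoffer created in run() (copNextMaxBackoff), and a dispatch that carries region info reuses the batch-cop sender's retry decision rather than a loop like this.
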
5 changes: 5 additions & 0 deletions store/mockstore/unistore/rpc.go
@@ -250,6 +250,11 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
 		})
 		resp.Resp, err = c.handleBatchCop(ctx, req.BatchCop(), timeout)
 	case tikvrpc.CmdMPPConn:
+		failpoint.Inject("mppConnTimeout", func(val failpoint.Value) {
+			if val.(bool) {
+				failpoint.Return(nil, errors.New("rpc error"))
+			}
+		})
 		resp.Resp, err = c.handleEstablishMPPConnection(ctx, req.EstablishMPPConn(), timeout, storeID)
 	case tikvrpc.CmdMPPTask:
 		failpoint.Inject("mppDispatchTimeout", func(val failpoint.Value) {
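These Inject markers are no-ops in a regular build; TiDB's test workflow rewrites them with failpoint-ctl (make failpoint-enable) before compiling, and only then does the Enable call in TestDispatchTaskRetry take effect. Roughly, and only as a hedged sketch rather than the generated code verbatim, the CmdMPPConn marker is rewritten into something like the fragment below, where `_curpkg_` is a generated helper that prefixes the full package path; that is why the test enables the failpoint under its full "github.com/pingcap/tidb/store/mockstore/unistore/..." name:

```go
// Approximate rewritten form (sketch only, not the generated code verbatim):
case tikvrpc.CmdMPPConn:
	if val, _err_ := failpoint.Eval(_curpkg_("mppConnTimeout")); _err_ == nil {
		if val.(bool) {
			return nil, errors.New("rpc error")
		}
	}
	resp.Resp, err = c.handleEstablishMPPConnection(ctx, req.EstablishMPPConn(), timeout, storeID)
```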