From c07899159341cc30f452e93a89679de6706e77b9 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Thu, 6 May 2021 13:15:52 +0800 Subject: [PATCH 1/3] cherry pick #24410 to release-5.0 Signed-off-by: ti-srebot --- go.mod | 7 + go.sum | 5 + store/copr/mpp.go | 7 + store/mockstore/unistore/tikv/server.go | 1056 +++++++++++++++++++++++ store/tikv/region_cache.go | 5 + store/tikv/region_request_test.go | 24 - 6 files changed, 1080 insertions(+), 24 deletions(-) create mode 100644 store/mockstore/unistore/tikv/server.go diff --git a/go.mod b/go.mod index 33241944dad8b..247c577286674 100644 --- a/go.mod +++ b/go.mod @@ -48,10 +48,17 @@ require ( github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 +<<<<<<< HEAD github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8 github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8 github.com/pingcap/parser v0.0.0-20210325072920-0d17053a8a69 github.com/pingcap/sysutil v0.0.0-20210221112134-a07bda3bde99 +======= + github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e + github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 + github.com/pingcap/parser v0.0.0-20210427084954-8e8ed7927bde + github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 +>>>>>>> c269b1170... store/copr: invalidate stale regions for Mpp query. (#24410) github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible github.com/pingcap/tipb v0.0.0-20210326161441-1164ca065d1b github.com/prometheus/client_golang v1.5.1 diff --git a/go.sum b/go.sum index bb2dfd283410c..70e6d55615961 100644 --- a/go.sum +++ b/go.sum @@ -460,8 +460,13 @@ github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20200810113304-6157337686b1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +<<<<<<< HEAD github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8 h1:t72qxPxunoKykkAuO5glpWGdoP+RmvKvX0lvmyFV0fI= github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +======= +github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e h1:oUMZ6X/Kpaoxfejh9/jQ+4UZ5xk9MRYcouWJ0oXRKNE= +github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +>>>>>>> c269b1170... store/copr: invalidate stale regions for Mpp query. (#24410) github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8 h1:M+DNpOu/I3uDmwee6vcnoPd6GgSMqND4gxvDQ/W584U= diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 13488ae5b3e03..a6ff19042f4dd 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -243,6 +243,13 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer, m.sendError(errors.New(realResp.Error.Msg)) return } + if len(realResp.RetryRegions) > 0 { + for _, retry := range realResp.RetryRegions { + id := tikv.NewRegionVerID(retry.Id, retry.RegionEpoch.ConfVer, retry.RegionEpoch.Version) + logutil.BgLogger().Info("invalid region because tiflash detected stale region", zap.String("region id", id.String())) + m.store.GetRegionCache().InvalidateCachedRegionWithReason(id, tikv.EpochNotMatch) + } + } failpoint.Inject("mppNonRootTaskError", func(val failpoint.Value) { if val.(bool) && !req.IsRoot { time.Sleep(1 * time.Second) diff --git a/store/mockstore/unistore/tikv/server.go b/store/mockstore/unistore/tikv/server.go new file mode 100644 index 0000000000000..f571ff4fe963f --- /dev/null +++ b/store/mockstore/unistore/tikv/server.go @@ -0,0 +1,1056 @@ +// Copyright 2019-present PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tikv + +import ( + "context" + "io" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/coprocessor" + "github.com/pingcap/kvproto/pkg/coprocessor_v2" + deadlockPb "github.com/pingcap/kvproto/pkg/deadlock" + "github.com/pingcap/kvproto/pkg/errorpb" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/mpp" + "github.com/pingcap/kvproto/pkg/tikvpb" + "github.com/pingcap/log" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/mockstore/unistore/client" + "github.com/pingcap/tidb/store/mockstore/unistore/cophandler" + "github.com/pingcap/tidb/store/mockstore/unistore/tikv/dbreader" + "github.com/pingcap/tidb/store/mockstore/unistore/tikv/pberror" + "github.com/pingcap/tidb/store/mockstore/unistore/util/lockwaiter" + "github.com/pingcap/tipb/go-tipb" + "go.uber.org/zap" +) + +var _ tikvpb.TikvServer = new(Server) + +// Server implements the tikvpb.TikvServer interface. +type Server struct { + mvccStore *MVCCStore + regionManager RegionManager + innerServer InnerServer + RPCClient client.Client + refCount int32 + stopped int32 +} + +// NewServer returns a new server. +func NewServer(rm RegionManager, store *MVCCStore, innerServer InnerServer) *Server { + return &Server{ + mvccStore: store, + regionManager: rm, + innerServer: innerServer, + } +} + +// Stop stops the server. +func (svr *Server) Stop() { + atomic.StoreInt32(&svr.stopped, 1) + for { + if atomic.LoadInt32(&svr.refCount) == 0 { + break + } + time.Sleep(time.Millisecond * 10) + } + + if err := svr.mvccStore.Close(); err != nil { + log.Error("close mvcc store failed", zap.Error(err)) + } + if err := svr.regionManager.Close(); err != nil { + log.Error("close region manager failed", zap.Error(err)) + } + if err := svr.innerServer.Stop(); err != nil { + log.Error("close inner server failed", zap.Error(err)) + } +} + +// GetStoreIDByAddr gets a store id by the store address. +func (svr *Server) GetStoreIDByAddr(addr string) (uint64, error) { + return svr.regionManager.GetStoreIDByAddr(addr) +} + +// GetStoreAddrByStoreID gets a store address by the store id. +func (svr *Server) GetStoreAddrByStoreID(storeID uint64) (string, error) { + return svr.regionManager.GetStoreAddrByStoreID(storeID) +} + +type requestCtx struct { + svr *Server + regCtx RegionCtx + regErr *errorpb.Error + buf []byte + reader *dbreader.DBReader + method string + startTime time.Time + rpcCtx *kvrpcpb.Context + storeAddr string + storeID uint64 + asyncMinCommitTS uint64 + onePCCommitTS uint64 +} + +func newRequestCtx(svr *Server, ctx *kvrpcpb.Context, method string) (*requestCtx, error) { + atomic.AddInt32(&svr.refCount, 1) + if atomic.LoadInt32(&svr.stopped) > 0 { + atomic.AddInt32(&svr.refCount, -1) + return nil, ErrRetryable("server is closed") + } + req := &requestCtx{ + svr: svr, + method: method, + startTime: time.Now(), + rpcCtx: ctx, + } + req.regCtx, req.regErr = svr.regionManager.GetRegionFromCtx(ctx) + storeAddr, storeID, regErr := svr.regionManager.GetStoreInfoFromCtx(ctx) + req.storeAddr = storeAddr + req.storeID = storeID + if regErr != nil { + req.regErr = regErr + } + return req, nil +} + +// For read-only requests that doesn't acquire latches, this function must be called after all locks has been checked. +func (req *requestCtx) getDBReader() *dbreader.DBReader { + if req.reader == nil { + mvccStore := req.svr.mvccStore + txn := mvccStore.db.NewTransaction(false) + req.reader = dbreader.NewDBReader(req.regCtx.RawStart(), req.regCtx.RawEnd(), txn) + } + return req.reader +} + +func (req *requestCtx) finish() { + atomic.AddInt32(&req.svr.refCount, -1) + if req.reader != nil { + req.reader.Close() + } +} + +// KvGet implements implements the tikvpb.TikvServer interface. +func (svr *Server) KvGet(ctx context.Context, req *kvrpcpb.GetRequest) (*kvrpcpb.GetResponse, error) { + reqCtx, err := newRequestCtx(svr, req.Context, "KvGet") + if err != nil { + return &kvrpcpb.GetResponse{Error: convertToKeyError(err)}, nil + } + defer reqCtx.finish() + if reqCtx.regErr != nil { + return &kvrpcpb.GetResponse{RegionError: reqCtx.regErr}, nil + } + err = svr.mvccStore.CheckKeysLock(req.GetVersion(), req.Context.ResolvedLocks, req.Key) + if err != nil { + return &kvrpcpb.GetResponse{Error: convertToKeyError(err)}, nil + } + reader := reqCtx.getDBReader() + val, err := reader.Get(req.Key, req.GetVersion()) + if err != nil { + return &kvrpcpb.GetResponse{ + Error: convertToKeyError(err), + }, nil + } + val = safeCopy(val) + return &kvrpcpb.GetResponse{ + Value: val, + }, nil +} + +// KvScan implements implements the tikvpb.TikvServer interface. +func (svr *Server) KvScan(ctx context.Context, req *kvrpcpb.ScanRequest) (*kvrpcpb.ScanResponse, error) { + reqCtx, err := newRequestCtx(svr, req.Context, "KvScan") + if err != nil { + return &kvrpcpb.ScanResponse{Pairs: []*kvrpcpb.KvPair{{Error: convertToKeyError(err)}}}, nil + } + defer reqCtx.finish() + if reqCtx.regErr != nil { + return &kvrpcpb.ScanResponse{RegionError: reqCtx.regErr}, nil + } + pairs := svr.mvccStore.Scan(reqCtx, req) + return &kvrpcpb.ScanResponse{ + Pairs: pairs, + }, nil +} + +// KvPessimisticLock implements implements the tikvpb.TikvServer interface. +func (svr *Server) KvPessimisticLock(ctx context.Context, req *kvrpcpb.PessimisticLockRequest) (*kvrpcpb.PessimisticLockResponse, error) { + reqCtx, err := newRequestCtx(svr, req.Context, "PessimisticLock") + if err != nil { + return &kvrpcpb.PessimisticLockResponse{Errors: []*kvrpcpb.KeyError{convertToKeyError(err)}}, nil + } + defer reqCtx.finish() + if reqCtx.regErr != nil { + return &kvrpcpb.PessimisticLockResponse{RegionError: reqCtx.regErr}, nil + } + resp := &kvrpcpb.PessimisticLockResponse{} + waiter, err := svr.mvccStore.PessimisticLock(reqCtx, req, resp) + resp.Errors, resp.RegionError = convertToPBErrors(err) + if waiter == nil { + return resp, nil + } + result := waiter.Wait() + svr.mvccStore.DeadlockDetectCli.CleanUpWaitFor(req.StartVersion, waiter.LockTS, waiter.KeyHash) + svr.mvccStore.lockWaiterManager.CleanUp(waiter) + if result.WakeupSleepTime == lockwaiter.WaitTimeout { + return resp, nil + } + if result.DeadlockResp != nil { + log.Error("deadlock found", zap.Stringer("entry", &result.DeadlockResp.Entry)) + errLocked := err.(*ErrLocked) + deadlockErr := &ErrDeadlock{ + LockKey: errLocked.Key, + LockTS: errLocked.Lock.StartTS, + DeadlockKeyHash: result.DeadlockResp.DeadlockKeyHash, + } + resp.Errors, resp.RegionError = convertToPBErrors(deadlockErr) + return resp, nil + } + if result.WakeupSleepTime == lockwaiter.WakeUpThisWaiter { + if req.Force { + req.WaitTimeout = lockwaiter.LockNoWait + _, err := svr.mvccStore.PessimisticLock(reqCtx, req, resp) + resp.Errors, resp.RegionError = convertToPBErrors(err) + if err == nil { + return resp, nil + } + if _, ok := err.(*ErrLocked); !ok { + resp.Errors, resp.RegionError = convertToPBErrors(err) + return resp, nil + } + log.Warn("wakeup force lock request, try lock still failed", zap.Error(err)) + } + } + // The key is rollbacked, we don't have the exact commitTS, but we can use the server's latest. + // Always use the store latest ts since the waiter result commitTs may not be the real conflict ts + conflictCommitTS := svr.mvccStore.getLatestTS() + err = &ErrConflict{ + StartTS: req.GetForUpdateTs(), + ConflictTS: waiter.LockTS, + ConflictCommitTS: conflictCommitTS, + } + resp.Errors, _ = convertToPBErrors(err) + return resp, nil +} + +// KVPessimisticRollback implements implements the tikvpb.TikvServer interface. +func (svr *Server) KVPessimisticRollback(ctx context.Context, req *kvrpcpb.PessimisticRollbackRequest) (*kvrpcpb.PessimisticRollbackResponse, error) { + reqCtx, err := newRequestCtx(svr, req.Context, "PessimisticRollback") + if err != nil { + return &kvrpcpb.PessimisticRollbackResponse{Errors: []*kvrpcpb.KeyError{convertToKeyError(err)}}, nil + } + defer reqCtx.finish() + if reqCtx.regErr != nil { + return &kvrpcpb.PessimisticRollbackResponse{RegionError: reqCtx.regErr}, nil + } + err = svr.mvccStore.PessimisticRollback(reqCtx, req) + resp := &kvrpcpb.PessimisticRollbackResponse{} + resp.Errors, resp.RegionError = convertToPBErrors(err) + return resp, nil +} + +// KvTxnHeartBeat implements implements the tikvpb.TikvServer interface. +func (svr *Server) KvTxnHeartBeat(ctx context.Context, req *kvrpcpb.TxnHeartBeatRequest) (*kvrpcpb.TxnHeartBeatResponse, error) { + reqCtx, err := newRequestCtx(svr, req.Context, "TxnHeartBeat") + if err != nil { + return &kvrpcpb.TxnHeartBeatResponse{Error: convertToKeyError(err)}, nil + } + defer reqCtx.finish() + if reqCtx.regErr != nil { + return &kvrpcpb.TxnHeartBeatResponse{RegionError: reqCtx.regErr}, nil + } + lockTTL, err := svr.mvccStore.TxnHeartBeat(reqCtx, req) + resp := &kvrpcpb.TxnHeartBeatResponse{LockTtl: lockTTL} + resp.Error, resp.RegionError = convertToPBError(err) + return resp, nil +} + +// KvCheckTxnStatus implements implements the tikvpb.TikvServer interface. +func (svr *Server) KvCheckTxnStatus(ctx context.Context, req *kvrpcpb.CheckTxnStatusRequest) (*kvrpcpb.CheckTxnStatusResponse, error) { + reqCtx, err := newRequestCtx(svr, req.Context, "KvCheckTxnStatus") + if err != nil { + return &kvrpcpb.CheckTxnStatusResponse{Error: convertToKeyError(err)}, nil + } + defer reqCtx.finish() + if reqCtx.regErr != nil { + return &kvrpcpb.CheckTxnStatusResponse{RegionError: reqCtx.regErr}, nil + } + txnStatus, err := svr.mvccStore.CheckTxnStatus(reqCtx, req) + ttl := uint64(0) + if txnStatus.lockInfo != nil { + ttl = txnStatus.lockInfo.LockTtl + } + resp := &kvrpcpb.CheckTxnStatusResponse{ + LockTtl: ttl, + CommitVersion: txnStatus.commitTS, + Action: txnStatus.action, + LockInfo: txnStatus.lockInfo, + } + resp.Error, resp.RegionError = convertToPBError(err) + return resp, nil +} + +// KvCheckSecondaryLocks implements implements the tikvpb.TikvServer interface. +func (svr *Server) KvCheckSecondaryLocks(ctx context.Context, req *kvrpcpb.CheckSecondaryLocksRequest) (*kvrpcpb.CheckSecondaryLocksResponse, error) { + reqCtx, err := newRequestCtx(svr, req.Context, "KvCheckSecondaryLocks") + if err != nil { + return &kvrpcpb.CheckSecondaryLocksResponse{Error: convertToKeyError(err)}, nil + } + defer reqCtx.finish() + if reqCtx.regErr != nil { + return &kvrpcpb.CheckSecondaryLocksResponse{RegionError: reqCtx.regErr}, nil + } + locksStatus, err := svr.mvccStore.CheckSecondaryLocks(reqCtx, req.Keys, req.StartVersion) + resp := &kvrpcpb.CheckSecondaryLocksResponse{} + if err == nil { + resp.Locks = locksStatus.locks + resp.CommitTs = locksStatus.commitTS + } else { + resp.Error, resp.RegionError = convertToPBError(err) + } + return resp, nil +} + +// KvPrewrite implements implements the tikvpb.TikvServer interface. +func (svr *Server) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) { + reqCtx, err := newRequestCtx(svr, req.Context, "KvPrewrite") + if err != nil { + return &kvrpcpb.PrewriteResponse{Errors: []*kvrpcpb.KeyError{convertToKeyError(err)}}, nil + } + defer reqCtx.finish() + if reqCtx.regErr != nil { + return &kvrpcpb.PrewriteResponse{RegionError: reqCtx.regErr}, nil + } + err = svr.mvccStore.Prewrite(reqCtx, req) + resp := &kvrpcpb.PrewriteResponse{} + if reqCtx.asyncMinCommitTS > 0 { + resp.MinCommitTs = reqCtx.asyncMinCommitTS + } + if reqCtx.onePCCommitTS > 0 { + resp.OnePcCommitTs = reqCtx.onePCCommitTS + } + resp.Errors, resp.RegionError = convertToPBErrors(err) + return resp, nil +} + +// KvCommit implements implements the tikvpb.TikvServer interface. +func (svr *Server) KvCommit(ctx context.Context, req *kvrpcpb.CommitRequest) (*kvrpcpb.CommitResponse, error) { + reqCtx, err := newRequestCtx(svr, req.Context, "KvCommit") + if err != nil { + return &kvrpcpb.CommitResponse{Error: convertToKeyError(err)}, nil + } + defer reqCtx.finish() + if reqCtx.regErr != nil { + return &kvrpcpb.CommitResponse{RegionError: reqCtx.regErr}, nil + } + resp := new(kvrpcpb.CommitResponse) + err = svr.mvccStore.Commit(reqCtx, req.Keys, req.GetStartVersion(), req.GetCommitVersion()) + if err != nil { + resp.Error, resp.RegionError = convertToPBError(err) + } + return resp, nil +} + +// RawGetKeyTTL implements implements the tikvpb.TikvServer interface. +func (svr *Server) RawGetKeyTTL(ctx context.Context, req *kvrpcpb.RawGetKeyTTLRequest) (*kvrpcpb.RawGetKeyTTLResponse, error) { + // TODO + return &kvrpcpb.RawGetKeyTTLResponse{}, nil +} + +// KvImport implements implements the tikvpb.TikvServer interface. +func (svr *Server) KvImport(context.Context, *kvrpcpb.ImportRequest) (*kvrpcpb.ImportResponse, error) { + // TODO + return &kvrpcpb.ImportResponse{}, nil +} + +// KvCleanup implements implements the tikvpb.TikvServer interface. +func (svr *Server) KvCleanup(ctx context.Context, req *kvrpcpb.CleanupRequest) (*kvrpcpb.CleanupResponse, error) { + reqCtx, err := newRequestCtx(svr, req.Context, "KvCleanup") + if err != nil { + return &kvrpcpb.CleanupResponse{Error: convertToKeyError(err)}, nil + } + defer reqCtx.finish() + if reqCtx.regErr != nil { + return &kvrpcpb.CleanupResponse{RegionError: reqCtx.regErr}, nil + } + err = svr.mvccStore.Cleanup(reqCtx, req.Key, req.StartVersion, req.CurrentTs) + resp := new(kvrpcpb.CleanupResponse) + if committed, ok := err.(ErrAlreadyCommitted); ok { + resp.CommitVersion = uint64(committed) + } else if err != nil { + log.Error("cleanup failed", zap.Error(err)) + resp.Error, resp.RegionError = convertToPBError(err) + } + return resp, nil +} + +// KvBatchGet implements implements the tikvpb.TikvServer interface. +func (svr *Server) KvBatchGet(ctx context.Context, req *kvrpcpb.BatchGetRequest) (*kvrpcpb.BatchGetResponse, error) { + reqCtx, err := newRequestCtx(svr, req.Context, "KvBatchGet") + if err != nil { + return &kvrpcpb.BatchGetResponse{Pairs: []*kvrpcpb.KvPair{{Error: convertToKeyError(err)}}}, nil + } + defer reqCtx.finish() + if reqCtx.regErr != nil { + return &kvrpcpb.BatchGetResponse{RegionError: reqCtx.regErr}, nil + } + pairs := svr.mvccStore.BatchGet(reqCtx, req.Keys, req.GetVersion()) + return &kvrpcpb.BatchGetResponse{ + Pairs: pairs, + }, nil +} + +// KvBatchRollback implements implements the tikvpb.TikvServer interface. +func (svr *Server) KvBatchRollback(ctx context.Context, req *kvrpcpb.BatchRollbackRequest) (*kvrpcpb.BatchRollbackResponse, error) { + reqCtx, err := newRequestCtx(svr, req.Context, "KvBatchRollback") + if err != nil { + return &kvrpcpb.BatchRollbackResponse{Error: convertToKeyError(err)}, nil + } + defer reqCtx.finish() + if reqCtx.regErr != nil { + return &kvrpcpb.BatchRollbackResponse{RegionError: reqCtx.regErr}, nil + } + resp := new(kvrpcpb.BatchRollbackResponse) + err = svr.mvccStore.Rollback(reqCtx, req.Keys, req.StartVersion) + resp.Error, resp.RegionError = convertToPBError(err) + return resp, nil +} + +// KvScanLock implements implements the tikvpb.TikvServer interface. +func (svr *Server) KvScanLock(ctx context.Context, req *kvrpcpb.ScanLockRequest) (*kvrpcpb.ScanLockResponse, error) { + reqCtx, err := newRequestCtx(svr, req.Context, "KvScanLock") + if err != nil { + return &kvrpcpb.ScanLockResponse{Error: convertToKeyError(err)}, nil + } + defer reqCtx.finish() + if reqCtx.regErr != nil { + return &kvrpcpb.ScanLockResponse{RegionError: reqCtx.regErr}, nil + } + log.Debug("kv scan lock") + locks, err := svr.mvccStore.ScanLock(reqCtx, req.MaxVersion, int(req.Limit)) + return &kvrpcpb.ScanLockResponse{Error: convertToKeyError(err), Locks: locks}, nil +} + +// KvResolveLock implements implements the tikvpb.TikvServer interface. +func (svr *Server) KvResolveLock(ctx context.Context, req *kvrpcpb.ResolveLockRequest) (*kvrpcpb.ResolveLockResponse, error) { + reqCtx, err := newRequestCtx(svr, req.Context, "KvResolveLock") + if err != nil { + return &kvrpcpb.ResolveLockResponse{Error: convertToKeyError(err)}, nil + } + defer reqCtx.finish() + if reqCtx.regErr != nil { + return &kvrpcpb.ResolveLockResponse{RegionError: reqCtx.regErr}, nil + } + resp := &kvrpcpb.ResolveLockResponse{} + if len(req.TxnInfos) > 0 { + for _, txnInfo := range req.TxnInfos { + log.S().Debugf("kv resolve lock region:%d txn:%v", reqCtx.regCtx.Meta().Id, txnInfo.Txn) + err := svr.mvccStore.ResolveLock(reqCtx, nil, txnInfo.Txn, txnInfo.Status) + if err != nil { + resp.Error, resp.RegionError = convertToPBError(err) + break + } + } + } else { + log.S().Debugf("kv resolve lock region:%d txn:%v", reqCtx.regCtx.Meta().Id, req.StartVersion) + err := svr.mvccStore.ResolveLock(reqCtx, req.Keys, req.StartVersion, req.CommitVersion) + resp.Error, resp.RegionError = convertToPBError(err) + } + return resp, nil +} + +// KvGC implements implements the tikvpb.TikvServer interface. +func (svr *Server) KvGC(ctx context.Context, req *kvrpcpb.GCRequest) (*kvrpcpb.GCResponse, error) { + reqCtx, err := newRequestCtx(svr, req.Context, "KvGC") + if err != nil { + return &kvrpcpb.GCResponse{Error: convertToKeyError(err)}, nil + } + defer reqCtx.finish() + svr.mvccStore.UpdateSafePoint(req.SafePoint) + return &kvrpcpb.GCResponse{}, nil +} + +// KvDeleteRange implements implements the tikvpb.TikvServer interface. +func (svr *Server) KvDeleteRange(ctx context.Context, req *kvrpcpb.DeleteRangeRequest) (*kvrpcpb.DeleteRangeResponse, error) { + reqCtx, err := newRequestCtx(svr, req.Context, "KvDeleteRange") + if err != nil { + return &kvrpcpb.DeleteRangeResponse{Error: convertToKeyError(err).String()}, nil + } + defer reqCtx.finish() + if reqCtx.regErr != nil { + return &kvrpcpb.DeleteRangeResponse{RegionError: reqCtx.regErr}, nil + } + err = svr.mvccStore.dbWriter.DeleteRange(req.StartKey, req.EndKey, reqCtx.regCtx) + if err != nil { + log.Error("delete range failed", zap.Error(err)) + } + return &kvrpcpb.DeleteRangeResponse{}, nil +} + +// RawKV commands. + +// RawGet implements implements the tikvpb.TikvServer interface. +func (svr *Server) RawGet(context.Context, *kvrpcpb.RawGetRequest) (*kvrpcpb.RawGetResponse, error) { + return &kvrpcpb.RawGetResponse{}, nil +} + +// RawPut implements implements the tikvpb.TikvServer interface. +func (svr *Server) RawPut(context.Context, *kvrpcpb.RawPutRequest) (*kvrpcpb.RawPutResponse, error) { + return &kvrpcpb.RawPutResponse{}, nil +} + +// RawDelete implements implements the tikvpb.TikvServer interface. +func (svr *Server) RawDelete(context.Context, *kvrpcpb.RawDeleteRequest) (*kvrpcpb.RawDeleteResponse, error) { + return &kvrpcpb.RawDeleteResponse{}, nil +} + +// RawScan implements implements the tikvpb.TikvServer interface. +func (svr *Server) RawScan(context.Context, *kvrpcpb.RawScanRequest) (*kvrpcpb.RawScanResponse, error) { + return &kvrpcpb.RawScanResponse{}, nil +} + +// RawBatchDelete implements implements the tikvpb.TikvServer interface. +func (svr *Server) RawBatchDelete(context.Context, *kvrpcpb.RawBatchDeleteRequest) (*kvrpcpb.RawBatchDeleteResponse, error) { + return &kvrpcpb.RawBatchDeleteResponse{}, nil +} + +// RawBatchGet implements implements the tikvpb.TikvServer interface. +func (svr *Server) RawBatchGet(context.Context, *kvrpcpb.RawBatchGetRequest) (*kvrpcpb.RawBatchGetResponse, error) { + return &kvrpcpb.RawBatchGetResponse{}, nil +} + +// RawBatchPut implements implements the tikvpb.TikvServer interface. +func (svr *Server) RawBatchPut(context.Context, *kvrpcpb.RawBatchPutRequest) (*kvrpcpb.RawBatchPutResponse, error) { + return &kvrpcpb.RawBatchPutResponse{}, nil +} + +// RawBatchScan implements implements the tikvpb.TikvServer interface. +func (svr *Server) RawBatchScan(context.Context, *kvrpcpb.RawBatchScanRequest) (*kvrpcpb.RawBatchScanResponse, error) { + return &kvrpcpb.RawBatchScanResponse{}, nil +} + +// RawDeleteRange implements implements the tikvpb.TikvServer interface. +func (svr *Server) RawDeleteRange(context.Context, *kvrpcpb.RawDeleteRangeRequest) (*kvrpcpb.RawDeleteRangeResponse, error) { + return &kvrpcpb.RawDeleteRangeResponse{}, nil +} + +// SQL push down commands. + +// Coprocessor implements implements the tikvpb.TikvServer interface. +func (svr *Server) Coprocessor(_ context.Context, req *coprocessor.Request) (*coprocessor.Response, error) { + reqCtx, err := newRequestCtx(svr, req.Context, "Coprocessor") + if err != nil { + return &coprocessor.Response{OtherError: convertToKeyError(err).String()}, nil + } + defer reqCtx.finish() + if reqCtx.regErr != nil { + return &coprocessor.Response{RegionError: reqCtx.regErr}, nil + } + return cophandler.HandleCopRequest(reqCtx.getDBReader(), svr.mvccStore.lockStore, req), nil +} + +// CoprocessorStream implements implements the tikvpb.TikvServer interface. +func (svr *Server) CoprocessorStream(*coprocessor.Request, tikvpb.Tikv_CoprocessorStreamServer) error { + // TODO + return nil +} + +// RegionError represents a region error +type RegionError struct { + err *errorpb.Error +} + +// Error implements Error method. +func (regionError *RegionError) Error() string { + return regionError.err.Message +} + +// BatchCoprocessor implements implements the tikvpb.TikvServer interface. +func (svr *Server) BatchCoprocessor(req *coprocessor.BatchRequest, batchCopServer tikvpb.Tikv_BatchCoprocessorServer) error { + reqCtxs := make([]*requestCtx, 0, len(req.Regions)) + defer func() { + for _, ctx := range reqCtxs { + ctx.finish() + } + }() + for _, ri := range req.Regions { + cop := coprocessor.Request{ + Tp: kv.ReqTypeDAG, + Data: req.Data, + StartTs: req.StartTs, + Ranges: ri.Ranges, + } + regionCtx := *req.Context + regionCtx.RegionEpoch = ri.RegionEpoch + regionCtx.RegionId = ri.RegionId + cop.Context = ®ionCtx + + reqCtx, err := newRequestCtx(svr, ®ionCtx, "Coprocessor") + if err != nil { + return err + } + reqCtxs = append(reqCtxs, reqCtx) + if reqCtx.regErr != nil { + return &RegionError{err: reqCtx.regErr} + } + copResponse := cophandler.HandleCopRequestWithMPPCtx(reqCtx.getDBReader(), svr.mvccStore.lockStore, &cop, nil) + err = batchCopServer.Send(&coprocessor.BatchResponse{Data: copResponse.Data}) + if err != nil { + return err + } + } + return nil +} + +func (mrm *MockRegionManager) removeMPPTaskHandler(taskID int64, storeID uint64) error { + set := mrm.getMPPTaskSet(storeID) + if set == nil { + return errors.New("cannot find mpp task set for store") + } + set.mu.Lock() + defer set.mu.Unlock() + if _, ok := set.taskHandlers[taskID]; ok { + delete(set.taskHandlers, taskID) + return nil + } + return errors.New("cannot find mpp task") +} + +// DispatchMPPTask implements implements the tikvpb.TikvServer interface. +func (svr *Server) DispatchMPPTask(_ context.Context, _ *mpp.DispatchTaskRequest) (*mpp.DispatchTaskResponse, error) { + panic("todo") +} + +func (svr *Server) executeMPPDispatch(ctx context.Context, req *mpp.DispatchTaskRequest, storeAddr string, storeID uint64, handler *cophandler.MPPTaskHandler) error { + var reqCtx *requestCtx + if len(req.Regions) > 0 { + kvContext := &kvrpcpb.Context{ + RegionId: req.Regions[0].RegionId, + RegionEpoch: req.Regions[0].RegionEpoch, + // this is a hack to reuse task id in kvContext to pass mpp task id + TaskId: uint64(handler.Meta.TaskId), + Peer: &metapb.Peer{StoreId: storeID}, + } + var err error + reqCtx, err = newRequestCtx(svr, kvContext, "Mpp") + if err != nil { + return errors.Trace(err) + } + } + copReq := &coprocessor.Request{ + Tp: kv.ReqTypeDAG, + Data: req.EncodedPlan, + StartTs: req.Meta.StartTs, + } + for _, regionMeta := range req.Regions { + copReq.Ranges = append(copReq.Ranges, regionMeta.Ranges...) + } + var dbreader *dbreader.DBReader + if reqCtx != nil { + dbreader = reqCtx.getDBReader() + } + go func() { + resp := cophandler.HandleCopRequestWithMPPCtx(dbreader, svr.mvccStore.lockStore, copReq, &cophandler.MPPCtx{ + RPCClient: svr.RPCClient, + StoreAddr: storeAddr, + TaskHandler: handler, + Ctx: ctx, + }) + handler.Err = svr.RemoveMPPTaskHandler(req.Meta.TaskId, storeID) + if len(resp.OtherError) > 0 { + handler.Err = errors.New(resp.OtherError) + } + if reqCtx != nil { + reqCtx.finish() + } + }() + return nil +} + +// DispatchMPPTaskWithStoreID implements implements the tikvpb.TikvServer interface. +func (svr *Server) DispatchMPPTaskWithStoreID(ctx context.Context, req *mpp.DispatchTaskRequest, storeID uint64) (*mpp.DispatchTaskResponse, error) { + mppHandler, err := svr.CreateMPPTaskHandler(req.Meta, storeID) + if err != nil { + return nil, errors.Trace(err) + } + storeAddr, err := svr.GetStoreAddrByStoreID(storeID) + if err != nil { + return nil, err + } + err = svr.executeMPPDispatch(ctx, req, storeAddr, storeID, mppHandler) + resp := &mpp.DispatchTaskResponse{} + if err != nil { + resp.Error = &mpp.Error{Msg: err.Error()} + } + return resp, nil +} + +// CancelMPPTask implements implements the tikvpb.TikvServer interface. +func (svr *Server) CancelMPPTask(_ context.Context, _ *mpp.CancelTaskRequest) (*mpp.CancelTaskResponse, error) { + panic("todo") +} + +// GetMPPTaskHandler implements implements the tikvpb.TikvServer interface. +func (svr *Server) GetMPPTaskHandler(taskID int64, storeID uint64) (*cophandler.MPPTaskHandler, error) { + if mrm, ok := svr.regionManager.(*MockRegionManager); ok { + set := mrm.getMPPTaskSet(storeID) + if set == nil { + return nil, errors.New("cannot find mpp task set for store") + } + set.mu.Lock() + defer set.mu.Unlock() + if handler, ok := set.taskHandlers[taskID]; ok { + return handler, nil + } + return nil, nil + } + return nil, errors.New("Only mock region mgr supports get mpp task") +} + +// RemoveMPPTaskHandler implements implements the tikvpb.TikvServer interface. +func (svr *Server) RemoveMPPTaskHandler(taskID int64, storeID uint64) error { + if mrm, ok := svr.regionManager.(*MockRegionManager); ok { + err := mrm.removeMPPTaskHandler(taskID, storeID) + return errors.Trace(err) + } + return errors.New("Only mock region mgr supports remove mpp task") +} + +// CreateMPPTaskHandler implements implements the tikvpb.TikvServer interface. +func (svr *Server) CreateMPPTaskHandler(meta *mpp.TaskMeta, storeID uint64) (*cophandler.MPPTaskHandler, error) { + if mrm, ok := svr.regionManager.(*MockRegionManager); ok { + set := mrm.getMPPTaskSet(storeID) + if set == nil { + return nil, errors.New("cannot find mpp task set for store") + } + set.mu.Lock() + defer set.mu.Unlock() + if handler, ok := set.taskHandlers[meta.TaskId]; ok { + return handler, errors.Errorf("Task %d has been created", meta.TaskId) + } + handler := &cophandler.MPPTaskHandler{ + TunnelSet: make(map[int64]*cophandler.ExchangerTunnel), + Meta: meta, + RPCClient: svr.RPCClient, + } + set.taskHandlers[meta.TaskId] = handler + return handler, nil + } + return nil, errors.New("Only mock region mgr supports get mpp task") +} + +// EstablishMPPConnection implements implements the tikvpb.TikvServer interface. +func (svr *Server) EstablishMPPConnection(*mpp.EstablishMPPConnectionRequest, tikvpb.Tikv_EstablishMPPConnectionServer) error { + panic("todo") +} + +// EstablishMPPConnectionWithStoreID implements implements the tikvpb.TikvServer interface. +func (svr *Server) EstablishMPPConnectionWithStoreID(req *mpp.EstablishMPPConnectionRequest, server tikvpb.Tikv_EstablishMPPConnectionServer, storeID uint64) error { + var ( + mppHandler *cophandler.MPPTaskHandler + err error + ) + maxRetryTime := 5 + for i := 0; i < maxRetryTime; i++ { + mppHandler, err = svr.GetMPPTaskHandler(req.SenderMeta.TaskId, storeID) + if err != nil { + return errors.Trace(err) + } + if mppHandler == nil { + time.Sleep(time.Second) + } else { + break + } + } + if mppHandler == nil { + return errors.New("tatsk not found") + } + ctx1, cancel := context.WithCancel(context.Background()) + defer cancel() + tunnel, err := mppHandler.HandleEstablishConn(ctx1, req) + if err != nil { + return errors.Trace(err) + } + var sendError error + for sendError == nil { + chunk, err := tunnel.RecvChunk() + if err != nil { + sendError = server.Send(&mpp.MPPDataPacket{Error: &mpp.Error{Msg: err.Error()}}) + break + } + if chunk == nil { + // todo return io.EOF error? + break + } + res := tipb.SelectResponse{ + Chunks: []tipb.Chunk{*chunk}, + } + raw, err := res.Marshal() + if err != nil { + sendError = server.Send(&mpp.MPPDataPacket{Error: &mpp.Error{Msg: err.Error()}}) + break + } + sendError = server.Send(&mpp.MPPDataPacket{Data: raw}) + } + return sendError +} + +// Raft commands (tikv <-> tikv). + +// Raft implements implements the tikvpb.TikvServer interface. +func (svr *Server) Raft(stream tikvpb.Tikv_RaftServer) error { + return svr.innerServer.Raft(stream) +} + +// Snapshot implements implements the tikvpb.TikvServer interface. +func (svr *Server) Snapshot(stream tikvpb.Tikv_SnapshotServer) error { + return svr.innerServer.Snapshot(stream) +} + +// BatchRaft implements implements the tikvpb.TikvServer interface. +func (svr *Server) BatchRaft(stream tikvpb.Tikv_BatchRaftServer) error { + return svr.innerServer.BatchRaft(stream) +} + +// Region commands. + +// SplitRegion implements implements the tikvpb.TikvServer interface. +func (svr *Server) SplitRegion(ctx context.Context, req *kvrpcpb.SplitRegionRequest) (*kvrpcpb.SplitRegionResponse, error) { + reqCtx, err := newRequestCtx(svr, req.Context, "SplitRegion") + if err != nil { + return &kvrpcpb.SplitRegionResponse{RegionError: &errorpb.Error{Message: err.Error()}}, nil + } + defer reqCtx.finish() + return svr.regionManager.SplitRegion(req), nil +} + +// ReadIndex implements implements the tikvpb.TikvServer interface. +func (svr *Server) ReadIndex(context.Context, *kvrpcpb.ReadIndexRequest) (*kvrpcpb.ReadIndexResponse, error) { + // TODO: + return &kvrpcpb.ReadIndexResponse{}, nil +} + +// transaction debugger commands. + +// MvccGetByKey implements implements the tikvpb.TikvServer interface. +func (svr *Server) MvccGetByKey(ctx context.Context, req *kvrpcpb.MvccGetByKeyRequest) (*kvrpcpb.MvccGetByKeyResponse, error) { + reqCtx, err := newRequestCtx(svr, req.Context, "MvccGetByKey") + if err != nil { + return &kvrpcpb.MvccGetByKeyResponse{Error: err.Error()}, nil + } + defer reqCtx.finish() + if reqCtx.regErr != nil { + return &kvrpcpb.MvccGetByKeyResponse{RegionError: reqCtx.regErr}, nil + } + resp := new(kvrpcpb.MvccGetByKeyResponse) + mvccInfo, err := svr.mvccStore.MvccGetByKey(reqCtx, req.GetKey()) + if err != nil { + resp.Error = err.Error() + } + resp.Info = mvccInfo + return resp, nil +} + +// MvccGetByStartTs implements implements the tikvpb.TikvServer interface. +func (svr *Server) MvccGetByStartTs(ctx context.Context, req *kvrpcpb.MvccGetByStartTsRequest) (*kvrpcpb.MvccGetByStartTsResponse, error) { + reqCtx, err := newRequestCtx(svr, req.Context, "MvccGetByStartTs") + if err != nil { + return &kvrpcpb.MvccGetByStartTsResponse{Error: err.Error()}, nil + } + defer reqCtx.finish() + if reqCtx.regErr != nil { + return &kvrpcpb.MvccGetByStartTsResponse{RegionError: reqCtx.regErr}, nil + } + resp := new(kvrpcpb.MvccGetByStartTsResponse) + mvccInfo, key, err := svr.mvccStore.MvccGetByStartTs(reqCtx, req.StartTs) + if err != nil { + resp.Error = err.Error() + } + resp.Info = mvccInfo + resp.Key = key + return resp, nil +} + +// UnsafeDestroyRange implements implements the tikvpb.TikvServer interface. +func (svr *Server) UnsafeDestroyRange(ctx context.Context, req *kvrpcpb.UnsafeDestroyRangeRequest) (*kvrpcpb.UnsafeDestroyRangeResponse, error) { + start, end := req.GetStartKey(), req.GetEndKey() + svr.mvccStore.DeleteFileInRange(start, end) + return &kvrpcpb.UnsafeDestroyRangeResponse{}, nil +} + +// GetWaitForEntries tries to get the waitFor entries +// deadlock detection related services +func (svr *Server) GetWaitForEntries(ctx context.Context, + req *deadlockPb.WaitForEntriesRequest) (*deadlockPb.WaitForEntriesResponse, error) { + // TODO + return &deadlockPb.WaitForEntriesResponse{}, nil +} + +// Detect will handle detection rpc from other nodes +func (svr *Server) Detect(stream deadlockPb.Deadlock_DetectServer) error { + for { + req, err := stream.Recv() + if err != nil { + if err == io.EOF { + break + } + return err + } + if !svr.mvccStore.DeadlockDetectSvr.IsLeader() { + log.Warn("detection requests received on non leader node") + break + } + resp := svr.mvccStore.DeadlockDetectSvr.Detect(req) + if resp != nil { + if sendErr := stream.Send(resp); sendErr != nil { + log.Error("send deadlock response failed", zap.Error(sendErr)) + break + } + } + } + return nil +} + +// CheckLockObserver implements implements the tikvpb.TikvServer interface. +func (svr *Server) CheckLockObserver(context.Context, *kvrpcpb.CheckLockObserverRequest) (*kvrpcpb.CheckLockObserverResponse, error) { + // TODO: implement Observer + return &kvrpcpb.CheckLockObserverResponse{IsClean: true}, nil +} + +// PhysicalScanLock implements implements the tikvpb.TikvServer interface. +func (svr *Server) PhysicalScanLock(ctx context.Context, req *kvrpcpb.PhysicalScanLockRequest) (*kvrpcpb.PhysicalScanLockResponse, error) { + resp := &kvrpcpb.PhysicalScanLockResponse{} + resp.Locks = svr.mvccStore.PhysicalScanLock(req.StartKey, req.MaxTs, int(req.Limit)) + return resp, nil +} + +// RegisterLockObserver implements implements the tikvpb.TikvServer interface. +func (svr *Server) RegisterLockObserver(context.Context, *kvrpcpb.RegisterLockObserverRequest) (*kvrpcpb.RegisterLockObserverResponse, error) { + // TODO: implement Observer + return &kvrpcpb.RegisterLockObserverResponse{}, nil +} + +// RemoveLockObserver implements implements the tikvpb.TikvServer interface. +func (svr *Server) RemoveLockObserver(context.Context, *kvrpcpb.RemoveLockObserverRequest) (*kvrpcpb.RemoveLockObserverResponse, error) { + // TODO: implement Observer + return &kvrpcpb.RemoveLockObserverResponse{}, nil +} + +// CheckLeader implements implements the tikvpb.TikvServer interface. +func (svr *Server) CheckLeader(context.Context, *kvrpcpb.CheckLeaderRequest) (*kvrpcpb.CheckLeaderResponse, error) { + panic("unimplemented") +} + +// RawCompareAndSwap implements the tikvpb.TikvServer interface. +func (svr *Server) RawCompareAndSwap(context.Context, *kvrpcpb.RawCASRequest) (*kvrpcpb.RawCASResponse, error) { + panic("implement me") +} + +// CoprocessorV2 implements the tikvpb.TikvServer interface. +func (svr *Server) CoprocessorV2(context.Context, *coprocessor_v2.RawCoprocessorRequest) (*coprocessor_v2.RawCoprocessorResponse, error) { + panic("implement me") +} + +// GetStoreSafeTS implements the tikvpb.TikvServer interface. +func (svr *Server) GetStoreSafeTS(context.Context, *kvrpcpb.StoreSafeTSRequest) (*kvrpcpb.StoreSafeTSResponse, error) { + return &kvrpcpb.StoreSafeTSResponse{}, nil +} + +func convertToKeyError(err error) *kvrpcpb.KeyError { + if err == nil { + return nil + } + causeErr := errors.Cause(err) + switch x := causeErr.(type) { + case *ErrLocked: + return &kvrpcpb.KeyError{ + Locked: x.Lock.ToLockInfo(x.Key), + } + case ErrRetryable: + return &kvrpcpb.KeyError{ + Retryable: x.Error(), + } + case *ErrKeyAlreadyExists: + return &kvrpcpb.KeyError{ + AlreadyExist: &kvrpcpb.AlreadyExist{ + Key: x.Key, + }, + } + case *ErrConflict: + return &kvrpcpb.KeyError{ + Conflict: &kvrpcpb.WriteConflict{ + StartTs: x.StartTS, + ConflictTs: x.ConflictTS, + ConflictCommitTs: x.ConflictCommitTS, + Key: x.Key, + }, + } + case *ErrDeadlock: + return &kvrpcpb.KeyError{ + Deadlock: &kvrpcpb.Deadlock{ + LockKey: x.LockKey, + LockTs: x.LockTS, + DeadlockKeyHash: x.DeadlockKeyHash, + }, + } + case *ErrCommitExpire: + return &kvrpcpb.KeyError{ + CommitTsExpired: &kvrpcpb.CommitTsExpired{ + StartTs: x.StartTs, + AttemptedCommitTs: x.CommitTs, + Key: x.Key, + MinCommitTs: x.MinCommitTs, + }, + } + case *ErrTxnNotFound: + return &kvrpcpb.KeyError{ + TxnNotFound: &kvrpcpb.TxnNotFound{ + StartTs: x.StartTS, + PrimaryKey: x.PrimaryKey, + }, + } + default: + return &kvrpcpb.KeyError{ + Abort: err.Error(), + } + } +} + +func convertToPBError(err error) (*kvrpcpb.KeyError, *errorpb.Error) { + if regErr := extractRegionError(err); regErr != nil { + return nil, regErr + } + return convertToKeyError(err), nil +} + +func convertToPBErrors(err error) ([]*kvrpcpb.KeyError, *errorpb.Error) { + if err != nil { + if regErr := extractRegionError(err); regErr != nil { + return nil, regErr + } + return []*kvrpcpb.KeyError{convertToKeyError(err)}, nil + } + return nil, nil +} + +func extractRegionError(err error) *errorpb.Error { + if pbError, ok := err.(*pberror.PBError); ok { + return pbError.RequestErr + } + return nil +} diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index af6d91ccae03e..4ab671beaf575 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -1516,6 +1516,11 @@ type RegionVerID struct { ver uint64 } +// NewRegionVerID creates a region ver id, which used for invalidating regions. +func NewRegionVerID(id, confVer, ver uint64) RegionVerID { + return RegionVerID{id, confVer, ver} +} + // GetID returns the id of the region func (r *RegionVerID) GetID() uint64 { return r.id diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go index 3b07becd366e8..6cc1274cfe4c9 100644 --- a/store/tikv/region_request_test.go +++ b/store/tikv/region_request_test.go @@ -470,30 +470,6 @@ func (s *mockTikvGrpcServer) ReadIndex(context.Context, *kvrpcpb.ReadIndexReques return nil, errors.New("unreachable") } -func (s *mockTikvGrpcServer) VerGet(context.Context, *kvrpcpb.VerGetRequest) (*kvrpcpb.VerGetResponse, error) { - return nil, errors.New("unreachable") -} - -func (s *mockTikvGrpcServer) VerBatchGet(context.Context, *kvrpcpb.VerBatchGetRequest) (*kvrpcpb.VerBatchGetResponse, error) { - return nil, errors.New("unreachable") -} - -func (s *mockTikvGrpcServer) VerMut(context.Context, *kvrpcpb.VerMutRequest) (*kvrpcpb.VerMutResponse, error) { - return nil, errors.New("unreachable") -} - -func (s *mockTikvGrpcServer) VerBatchMut(context.Context, *kvrpcpb.VerBatchMutRequest) (*kvrpcpb.VerBatchMutResponse, error) { - return nil, errors.New("unreachable") -} - -func (s *mockTikvGrpcServer) VerScan(context.Context, *kvrpcpb.VerScanRequest) (*kvrpcpb.VerScanResponse, error) { - return nil, errors.New("unreachable") -} - -func (s *mockTikvGrpcServer) VerDeleteRange(context.Context, *kvrpcpb.VerDeleteRangeRequest) (*kvrpcpb.VerDeleteRangeResponse, error) { - return nil, errors.New("unreachable") -} - func (s *mockTikvGrpcServer) CheckLeader(context.Context, *kvrpcpb.CheckLeaderRequest) (*kvrpcpb.CheckLeaderResponse, error) { return nil, errors.New("unreachable") } From e8271f4ce9ca9265fcc11262e70f03cb777be15e Mon Sep 17 00:00:00 2001 From: Han Fei Date: Thu, 13 May 2021 18:40:08 +0800 Subject: [PATCH 2/3] resolve --- go.mod | 9 +- go.sum | 9 +- store/mockstore/unistore/tikv/server.go | 1056 ----------------------- store/tikv/region_request_test.go | 24 + 4 files changed, 27 insertions(+), 1071 deletions(-) delete mode 100644 store/mockstore/unistore/tikv/server.go diff --git a/go.mod b/go.mod index 247c577286674..5e8a788a3230a 100644 --- a/go.mod +++ b/go.mod @@ -48,17 +48,10 @@ require ( github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 -<<<<<<< HEAD - github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8 + github.com/pingcap/kvproto v0.0.0-20210513093146-12df7bbc771e github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8 github.com/pingcap/parser v0.0.0-20210325072920-0d17053a8a69 github.com/pingcap/sysutil v0.0.0-20210221112134-a07bda3bde99 -======= - github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e - github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 - github.com/pingcap/parser v0.0.0-20210427084954-8e8ed7927bde - github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 ->>>>>>> c269b1170... store/copr: invalidate stale regions for Mpp query. (#24410) github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible github.com/pingcap/tipb v0.0.0-20210326161441-1164ca065d1b github.com/prometheus/client_golang v1.5.1 diff --git a/go.sum b/go.sum index 70e6d55615961..9377866c064cf 100644 --- a/go.sum +++ b/go.sum @@ -460,13 +460,8 @@ github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20200810113304-6157337686b1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -<<<<<<< HEAD -github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8 h1:t72qxPxunoKykkAuO5glpWGdoP+RmvKvX0lvmyFV0fI= -github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -======= -github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e h1:oUMZ6X/Kpaoxfejh9/jQ+4UZ5xk9MRYcouWJ0oXRKNE= -github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= ->>>>>>> c269b1170... store/copr: invalidate stale regions for Mpp query. (#24410) +github.com/pingcap/kvproto v0.0.0-20210513093146-12df7bbc771e h1:VRM4I6zWODFNAnWtV/R6+FeZtJ2D7bMtLdn83wlNOKo= +github.com/pingcap/kvproto v0.0.0-20210513093146-12df7bbc771e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8 h1:M+DNpOu/I3uDmwee6vcnoPd6GgSMqND4gxvDQ/W584U= diff --git a/store/mockstore/unistore/tikv/server.go b/store/mockstore/unistore/tikv/server.go deleted file mode 100644 index f571ff4fe963f..0000000000000 --- a/store/mockstore/unistore/tikv/server.go +++ /dev/null @@ -1,1056 +0,0 @@ -// Copyright 2019-present PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package tikv - -import ( - "context" - "io" - "sync/atomic" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/coprocessor" - "github.com/pingcap/kvproto/pkg/coprocessor_v2" - deadlockPb "github.com/pingcap/kvproto/pkg/deadlock" - "github.com/pingcap/kvproto/pkg/errorpb" - "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/kvproto/pkg/mpp" - "github.com/pingcap/kvproto/pkg/tikvpb" - "github.com/pingcap/log" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/store/mockstore/unistore/client" - "github.com/pingcap/tidb/store/mockstore/unistore/cophandler" - "github.com/pingcap/tidb/store/mockstore/unistore/tikv/dbreader" - "github.com/pingcap/tidb/store/mockstore/unistore/tikv/pberror" - "github.com/pingcap/tidb/store/mockstore/unistore/util/lockwaiter" - "github.com/pingcap/tipb/go-tipb" - "go.uber.org/zap" -) - -var _ tikvpb.TikvServer = new(Server) - -// Server implements the tikvpb.TikvServer interface. -type Server struct { - mvccStore *MVCCStore - regionManager RegionManager - innerServer InnerServer - RPCClient client.Client - refCount int32 - stopped int32 -} - -// NewServer returns a new server. -func NewServer(rm RegionManager, store *MVCCStore, innerServer InnerServer) *Server { - return &Server{ - mvccStore: store, - regionManager: rm, - innerServer: innerServer, - } -} - -// Stop stops the server. -func (svr *Server) Stop() { - atomic.StoreInt32(&svr.stopped, 1) - for { - if atomic.LoadInt32(&svr.refCount) == 0 { - break - } - time.Sleep(time.Millisecond * 10) - } - - if err := svr.mvccStore.Close(); err != nil { - log.Error("close mvcc store failed", zap.Error(err)) - } - if err := svr.regionManager.Close(); err != nil { - log.Error("close region manager failed", zap.Error(err)) - } - if err := svr.innerServer.Stop(); err != nil { - log.Error("close inner server failed", zap.Error(err)) - } -} - -// GetStoreIDByAddr gets a store id by the store address. -func (svr *Server) GetStoreIDByAddr(addr string) (uint64, error) { - return svr.regionManager.GetStoreIDByAddr(addr) -} - -// GetStoreAddrByStoreID gets a store address by the store id. -func (svr *Server) GetStoreAddrByStoreID(storeID uint64) (string, error) { - return svr.regionManager.GetStoreAddrByStoreID(storeID) -} - -type requestCtx struct { - svr *Server - regCtx RegionCtx - regErr *errorpb.Error - buf []byte - reader *dbreader.DBReader - method string - startTime time.Time - rpcCtx *kvrpcpb.Context - storeAddr string - storeID uint64 - asyncMinCommitTS uint64 - onePCCommitTS uint64 -} - -func newRequestCtx(svr *Server, ctx *kvrpcpb.Context, method string) (*requestCtx, error) { - atomic.AddInt32(&svr.refCount, 1) - if atomic.LoadInt32(&svr.stopped) > 0 { - atomic.AddInt32(&svr.refCount, -1) - return nil, ErrRetryable("server is closed") - } - req := &requestCtx{ - svr: svr, - method: method, - startTime: time.Now(), - rpcCtx: ctx, - } - req.regCtx, req.regErr = svr.regionManager.GetRegionFromCtx(ctx) - storeAddr, storeID, regErr := svr.regionManager.GetStoreInfoFromCtx(ctx) - req.storeAddr = storeAddr - req.storeID = storeID - if regErr != nil { - req.regErr = regErr - } - return req, nil -} - -// For read-only requests that doesn't acquire latches, this function must be called after all locks has been checked. -func (req *requestCtx) getDBReader() *dbreader.DBReader { - if req.reader == nil { - mvccStore := req.svr.mvccStore - txn := mvccStore.db.NewTransaction(false) - req.reader = dbreader.NewDBReader(req.regCtx.RawStart(), req.regCtx.RawEnd(), txn) - } - return req.reader -} - -func (req *requestCtx) finish() { - atomic.AddInt32(&req.svr.refCount, -1) - if req.reader != nil { - req.reader.Close() - } -} - -// KvGet implements implements the tikvpb.TikvServer interface. -func (svr *Server) KvGet(ctx context.Context, req *kvrpcpb.GetRequest) (*kvrpcpb.GetResponse, error) { - reqCtx, err := newRequestCtx(svr, req.Context, "KvGet") - if err != nil { - return &kvrpcpb.GetResponse{Error: convertToKeyError(err)}, nil - } - defer reqCtx.finish() - if reqCtx.regErr != nil { - return &kvrpcpb.GetResponse{RegionError: reqCtx.regErr}, nil - } - err = svr.mvccStore.CheckKeysLock(req.GetVersion(), req.Context.ResolvedLocks, req.Key) - if err != nil { - return &kvrpcpb.GetResponse{Error: convertToKeyError(err)}, nil - } - reader := reqCtx.getDBReader() - val, err := reader.Get(req.Key, req.GetVersion()) - if err != nil { - return &kvrpcpb.GetResponse{ - Error: convertToKeyError(err), - }, nil - } - val = safeCopy(val) - return &kvrpcpb.GetResponse{ - Value: val, - }, nil -} - -// KvScan implements implements the tikvpb.TikvServer interface. -func (svr *Server) KvScan(ctx context.Context, req *kvrpcpb.ScanRequest) (*kvrpcpb.ScanResponse, error) { - reqCtx, err := newRequestCtx(svr, req.Context, "KvScan") - if err != nil { - return &kvrpcpb.ScanResponse{Pairs: []*kvrpcpb.KvPair{{Error: convertToKeyError(err)}}}, nil - } - defer reqCtx.finish() - if reqCtx.regErr != nil { - return &kvrpcpb.ScanResponse{RegionError: reqCtx.regErr}, nil - } - pairs := svr.mvccStore.Scan(reqCtx, req) - return &kvrpcpb.ScanResponse{ - Pairs: pairs, - }, nil -} - -// KvPessimisticLock implements implements the tikvpb.TikvServer interface. -func (svr *Server) KvPessimisticLock(ctx context.Context, req *kvrpcpb.PessimisticLockRequest) (*kvrpcpb.PessimisticLockResponse, error) { - reqCtx, err := newRequestCtx(svr, req.Context, "PessimisticLock") - if err != nil { - return &kvrpcpb.PessimisticLockResponse{Errors: []*kvrpcpb.KeyError{convertToKeyError(err)}}, nil - } - defer reqCtx.finish() - if reqCtx.regErr != nil { - return &kvrpcpb.PessimisticLockResponse{RegionError: reqCtx.regErr}, nil - } - resp := &kvrpcpb.PessimisticLockResponse{} - waiter, err := svr.mvccStore.PessimisticLock(reqCtx, req, resp) - resp.Errors, resp.RegionError = convertToPBErrors(err) - if waiter == nil { - return resp, nil - } - result := waiter.Wait() - svr.mvccStore.DeadlockDetectCli.CleanUpWaitFor(req.StartVersion, waiter.LockTS, waiter.KeyHash) - svr.mvccStore.lockWaiterManager.CleanUp(waiter) - if result.WakeupSleepTime == lockwaiter.WaitTimeout { - return resp, nil - } - if result.DeadlockResp != nil { - log.Error("deadlock found", zap.Stringer("entry", &result.DeadlockResp.Entry)) - errLocked := err.(*ErrLocked) - deadlockErr := &ErrDeadlock{ - LockKey: errLocked.Key, - LockTS: errLocked.Lock.StartTS, - DeadlockKeyHash: result.DeadlockResp.DeadlockKeyHash, - } - resp.Errors, resp.RegionError = convertToPBErrors(deadlockErr) - return resp, nil - } - if result.WakeupSleepTime == lockwaiter.WakeUpThisWaiter { - if req.Force { - req.WaitTimeout = lockwaiter.LockNoWait - _, err := svr.mvccStore.PessimisticLock(reqCtx, req, resp) - resp.Errors, resp.RegionError = convertToPBErrors(err) - if err == nil { - return resp, nil - } - if _, ok := err.(*ErrLocked); !ok { - resp.Errors, resp.RegionError = convertToPBErrors(err) - return resp, nil - } - log.Warn("wakeup force lock request, try lock still failed", zap.Error(err)) - } - } - // The key is rollbacked, we don't have the exact commitTS, but we can use the server's latest. - // Always use the store latest ts since the waiter result commitTs may not be the real conflict ts - conflictCommitTS := svr.mvccStore.getLatestTS() - err = &ErrConflict{ - StartTS: req.GetForUpdateTs(), - ConflictTS: waiter.LockTS, - ConflictCommitTS: conflictCommitTS, - } - resp.Errors, _ = convertToPBErrors(err) - return resp, nil -} - -// KVPessimisticRollback implements implements the tikvpb.TikvServer interface. -func (svr *Server) KVPessimisticRollback(ctx context.Context, req *kvrpcpb.PessimisticRollbackRequest) (*kvrpcpb.PessimisticRollbackResponse, error) { - reqCtx, err := newRequestCtx(svr, req.Context, "PessimisticRollback") - if err != nil { - return &kvrpcpb.PessimisticRollbackResponse{Errors: []*kvrpcpb.KeyError{convertToKeyError(err)}}, nil - } - defer reqCtx.finish() - if reqCtx.regErr != nil { - return &kvrpcpb.PessimisticRollbackResponse{RegionError: reqCtx.regErr}, nil - } - err = svr.mvccStore.PessimisticRollback(reqCtx, req) - resp := &kvrpcpb.PessimisticRollbackResponse{} - resp.Errors, resp.RegionError = convertToPBErrors(err) - return resp, nil -} - -// KvTxnHeartBeat implements implements the tikvpb.TikvServer interface. -func (svr *Server) KvTxnHeartBeat(ctx context.Context, req *kvrpcpb.TxnHeartBeatRequest) (*kvrpcpb.TxnHeartBeatResponse, error) { - reqCtx, err := newRequestCtx(svr, req.Context, "TxnHeartBeat") - if err != nil { - return &kvrpcpb.TxnHeartBeatResponse{Error: convertToKeyError(err)}, nil - } - defer reqCtx.finish() - if reqCtx.regErr != nil { - return &kvrpcpb.TxnHeartBeatResponse{RegionError: reqCtx.regErr}, nil - } - lockTTL, err := svr.mvccStore.TxnHeartBeat(reqCtx, req) - resp := &kvrpcpb.TxnHeartBeatResponse{LockTtl: lockTTL} - resp.Error, resp.RegionError = convertToPBError(err) - return resp, nil -} - -// KvCheckTxnStatus implements implements the tikvpb.TikvServer interface. -func (svr *Server) KvCheckTxnStatus(ctx context.Context, req *kvrpcpb.CheckTxnStatusRequest) (*kvrpcpb.CheckTxnStatusResponse, error) { - reqCtx, err := newRequestCtx(svr, req.Context, "KvCheckTxnStatus") - if err != nil { - return &kvrpcpb.CheckTxnStatusResponse{Error: convertToKeyError(err)}, nil - } - defer reqCtx.finish() - if reqCtx.regErr != nil { - return &kvrpcpb.CheckTxnStatusResponse{RegionError: reqCtx.regErr}, nil - } - txnStatus, err := svr.mvccStore.CheckTxnStatus(reqCtx, req) - ttl := uint64(0) - if txnStatus.lockInfo != nil { - ttl = txnStatus.lockInfo.LockTtl - } - resp := &kvrpcpb.CheckTxnStatusResponse{ - LockTtl: ttl, - CommitVersion: txnStatus.commitTS, - Action: txnStatus.action, - LockInfo: txnStatus.lockInfo, - } - resp.Error, resp.RegionError = convertToPBError(err) - return resp, nil -} - -// KvCheckSecondaryLocks implements implements the tikvpb.TikvServer interface. -func (svr *Server) KvCheckSecondaryLocks(ctx context.Context, req *kvrpcpb.CheckSecondaryLocksRequest) (*kvrpcpb.CheckSecondaryLocksResponse, error) { - reqCtx, err := newRequestCtx(svr, req.Context, "KvCheckSecondaryLocks") - if err != nil { - return &kvrpcpb.CheckSecondaryLocksResponse{Error: convertToKeyError(err)}, nil - } - defer reqCtx.finish() - if reqCtx.regErr != nil { - return &kvrpcpb.CheckSecondaryLocksResponse{RegionError: reqCtx.regErr}, nil - } - locksStatus, err := svr.mvccStore.CheckSecondaryLocks(reqCtx, req.Keys, req.StartVersion) - resp := &kvrpcpb.CheckSecondaryLocksResponse{} - if err == nil { - resp.Locks = locksStatus.locks - resp.CommitTs = locksStatus.commitTS - } else { - resp.Error, resp.RegionError = convertToPBError(err) - } - return resp, nil -} - -// KvPrewrite implements implements the tikvpb.TikvServer interface. -func (svr *Server) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) { - reqCtx, err := newRequestCtx(svr, req.Context, "KvPrewrite") - if err != nil { - return &kvrpcpb.PrewriteResponse{Errors: []*kvrpcpb.KeyError{convertToKeyError(err)}}, nil - } - defer reqCtx.finish() - if reqCtx.regErr != nil { - return &kvrpcpb.PrewriteResponse{RegionError: reqCtx.regErr}, nil - } - err = svr.mvccStore.Prewrite(reqCtx, req) - resp := &kvrpcpb.PrewriteResponse{} - if reqCtx.asyncMinCommitTS > 0 { - resp.MinCommitTs = reqCtx.asyncMinCommitTS - } - if reqCtx.onePCCommitTS > 0 { - resp.OnePcCommitTs = reqCtx.onePCCommitTS - } - resp.Errors, resp.RegionError = convertToPBErrors(err) - return resp, nil -} - -// KvCommit implements implements the tikvpb.TikvServer interface. -func (svr *Server) KvCommit(ctx context.Context, req *kvrpcpb.CommitRequest) (*kvrpcpb.CommitResponse, error) { - reqCtx, err := newRequestCtx(svr, req.Context, "KvCommit") - if err != nil { - return &kvrpcpb.CommitResponse{Error: convertToKeyError(err)}, nil - } - defer reqCtx.finish() - if reqCtx.regErr != nil { - return &kvrpcpb.CommitResponse{RegionError: reqCtx.regErr}, nil - } - resp := new(kvrpcpb.CommitResponse) - err = svr.mvccStore.Commit(reqCtx, req.Keys, req.GetStartVersion(), req.GetCommitVersion()) - if err != nil { - resp.Error, resp.RegionError = convertToPBError(err) - } - return resp, nil -} - -// RawGetKeyTTL implements implements the tikvpb.TikvServer interface. -func (svr *Server) RawGetKeyTTL(ctx context.Context, req *kvrpcpb.RawGetKeyTTLRequest) (*kvrpcpb.RawGetKeyTTLResponse, error) { - // TODO - return &kvrpcpb.RawGetKeyTTLResponse{}, nil -} - -// KvImport implements implements the tikvpb.TikvServer interface. -func (svr *Server) KvImport(context.Context, *kvrpcpb.ImportRequest) (*kvrpcpb.ImportResponse, error) { - // TODO - return &kvrpcpb.ImportResponse{}, nil -} - -// KvCleanup implements implements the tikvpb.TikvServer interface. -func (svr *Server) KvCleanup(ctx context.Context, req *kvrpcpb.CleanupRequest) (*kvrpcpb.CleanupResponse, error) { - reqCtx, err := newRequestCtx(svr, req.Context, "KvCleanup") - if err != nil { - return &kvrpcpb.CleanupResponse{Error: convertToKeyError(err)}, nil - } - defer reqCtx.finish() - if reqCtx.regErr != nil { - return &kvrpcpb.CleanupResponse{RegionError: reqCtx.regErr}, nil - } - err = svr.mvccStore.Cleanup(reqCtx, req.Key, req.StartVersion, req.CurrentTs) - resp := new(kvrpcpb.CleanupResponse) - if committed, ok := err.(ErrAlreadyCommitted); ok { - resp.CommitVersion = uint64(committed) - } else if err != nil { - log.Error("cleanup failed", zap.Error(err)) - resp.Error, resp.RegionError = convertToPBError(err) - } - return resp, nil -} - -// KvBatchGet implements implements the tikvpb.TikvServer interface. -func (svr *Server) KvBatchGet(ctx context.Context, req *kvrpcpb.BatchGetRequest) (*kvrpcpb.BatchGetResponse, error) { - reqCtx, err := newRequestCtx(svr, req.Context, "KvBatchGet") - if err != nil { - return &kvrpcpb.BatchGetResponse{Pairs: []*kvrpcpb.KvPair{{Error: convertToKeyError(err)}}}, nil - } - defer reqCtx.finish() - if reqCtx.regErr != nil { - return &kvrpcpb.BatchGetResponse{RegionError: reqCtx.regErr}, nil - } - pairs := svr.mvccStore.BatchGet(reqCtx, req.Keys, req.GetVersion()) - return &kvrpcpb.BatchGetResponse{ - Pairs: pairs, - }, nil -} - -// KvBatchRollback implements implements the tikvpb.TikvServer interface. -func (svr *Server) KvBatchRollback(ctx context.Context, req *kvrpcpb.BatchRollbackRequest) (*kvrpcpb.BatchRollbackResponse, error) { - reqCtx, err := newRequestCtx(svr, req.Context, "KvBatchRollback") - if err != nil { - return &kvrpcpb.BatchRollbackResponse{Error: convertToKeyError(err)}, nil - } - defer reqCtx.finish() - if reqCtx.regErr != nil { - return &kvrpcpb.BatchRollbackResponse{RegionError: reqCtx.regErr}, nil - } - resp := new(kvrpcpb.BatchRollbackResponse) - err = svr.mvccStore.Rollback(reqCtx, req.Keys, req.StartVersion) - resp.Error, resp.RegionError = convertToPBError(err) - return resp, nil -} - -// KvScanLock implements implements the tikvpb.TikvServer interface. -func (svr *Server) KvScanLock(ctx context.Context, req *kvrpcpb.ScanLockRequest) (*kvrpcpb.ScanLockResponse, error) { - reqCtx, err := newRequestCtx(svr, req.Context, "KvScanLock") - if err != nil { - return &kvrpcpb.ScanLockResponse{Error: convertToKeyError(err)}, nil - } - defer reqCtx.finish() - if reqCtx.regErr != nil { - return &kvrpcpb.ScanLockResponse{RegionError: reqCtx.regErr}, nil - } - log.Debug("kv scan lock") - locks, err := svr.mvccStore.ScanLock(reqCtx, req.MaxVersion, int(req.Limit)) - return &kvrpcpb.ScanLockResponse{Error: convertToKeyError(err), Locks: locks}, nil -} - -// KvResolveLock implements implements the tikvpb.TikvServer interface. -func (svr *Server) KvResolveLock(ctx context.Context, req *kvrpcpb.ResolveLockRequest) (*kvrpcpb.ResolveLockResponse, error) { - reqCtx, err := newRequestCtx(svr, req.Context, "KvResolveLock") - if err != nil { - return &kvrpcpb.ResolveLockResponse{Error: convertToKeyError(err)}, nil - } - defer reqCtx.finish() - if reqCtx.regErr != nil { - return &kvrpcpb.ResolveLockResponse{RegionError: reqCtx.regErr}, nil - } - resp := &kvrpcpb.ResolveLockResponse{} - if len(req.TxnInfos) > 0 { - for _, txnInfo := range req.TxnInfos { - log.S().Debugf("kv resolve lock region:%d txn:%v", reqCtx.regCtx.Meta().Id, txnInfo.Txn) - err := svr.mvccStore.ResolveLock(reqCtx, nil, txnInfo.Txn, txnInfo.Status) - if err != nil { - resp.Error, resp.RegionError = convertToPBError(err) - break - } - } - } else { - log.S().Debugf("kv resolve lock region:%d txn:%v", reqCtx.regCtx.Meta().Id, req.StartVersion) - err := svr.mvccStore.ResolveLock(reqCtx, req.Keys, req.StartVersion, req.CommitVersion) - resp.Error, resp.RegionError = convertToPBError(err) - } - return resp, nil -} - -// KvGC implements implements the tikvpb.TikvServer interface. -func (svr *Server) KvGC(ctx context.Context, req *kvrpcpb.GCRequest) (*kvrpcpb.GCResponse, error) { - reqCtx, err := newRequestCtx(svr, req.Context, "KvGC") - if err != nil { - return &kvrpcpb.GCResponse{Error: convertToKeyError(err)}, nil - } - defer reqCtx.finish() - svr.mvccStore.UpdateSafePoint(req.SafePoint) - return &kvrpcpb.GCResponse{}, nil -} - -// KvDeleteRange implements implements the tikvpb.TikvServer interface. -func (svr *Server) KvDeleteRange(ctx context.Context, req *kvrpcpb.DeleteRangeRequest) (*kvrpcpb.DeleteRangeResponse, error) { - reqCtx, err := newRequestCtx(svr, req.Context, "KvDeleteRange") - if err != nil { - return &kvrpcpb.DeleteRangeResponse{Error: convertToKeyError(err).String()}, nil - } - defer reqCtx.finish() - if reqCtx.regErr != nil { - return &kvrpcpb.DeleteRangeResponse{RegionError: reqCtx.regErr}, nil - } - err = svr.mvccStore.dbWriter.DeleteRange(req.StartKey, req.EndKey, reqCtx.regCtx) - if err != nil { - log.Error("delete range failed", zap.Error(err)) - } - return &kvrpcpb.DeleteRangeResponse{}, nil -} - -// RawKV commands. - -// RawGet implements implements the tikvpb.TikvServer interface. -func (svr *Server) RawGet(context.Context, *kvrpcpb.RawGetRequest) (*kvrpcpb.RawGetResponse, error) { - return &kvrpcpb.RawGetResponse{}, nil -} - -// RawPut implements implements the tikvpb.TikvServer interface. -func (svr *Server) RawPut(context.Context, *kvrpcpb.RawPutRequest) (*kvrpcpb.RawPutResponse, error) { - return &kvrpcpb.RawPutResponse{}, nil -} - -// RawDelete implements implements the tikvpb.TikvServer interface. -func (svr *Server) RawDelete(context.Context, *kvrpcpb.RawDeleteRequest) (*kvrpcpb.RawDeleteResponse, error) { - return &kvrpcpb.RawDeleteResponse{}, nil -} - -// RawScan implements implements the tikvpb.TikvServer interface. -func (svr *Server) RawScan(context.Context, *kvrpcpb.RawScanRequest) (*kvrpcpb.RawScanResponse, error) { - return &kvrpcpb.RawScanResponse{}, nil -} - -// RawBatchDelete implements implements the tikvpb.TikvServer interface. -func (svr *Server) RawBatchDelete(context.Context, *kvrpcpb.RawBatchDeleteRequest) (*kvrpcpb.RawBatchDeleteResponse, error) { - return &kvrpcpb.RawBatchDeleteResponse{}, nil -} - -// RawBatchGet implements implements the tikvpb.TikvServer interface. -func (svr *Server) RawBatchGet(context.Context, *kvrpcpb.RawBatchGetRequest) (*kvrpcpb.RawBatchGetResponse, error) { - return &kvrpcpb.RawBatchGetResponse{}, nil -} - -// RawBatchPut implements implements the tikvpb.TikvServer interface. -func (svr *Server) RawBatchPut(context.Context, *kvrpcpb.RawBatchPutRequest) (*kvrpcpb.RawBatchPutResponse, error) { - return &kvrpcpb.RawBatchPutResponse{}, nil -} - -// RawBatchScan implements implements the tikvpb.TikvServer interface. -func (svr *Server) RawBatchScan(context.Context, *kvrpcpb.RawBatchScanRequest) (*kvrpcpb.RawBatchScanResponse, error) { - return &kvrpcpb.RawBatchScanResponse{}, nil -} - -// RawDeleteRange implements implements the tikvpb.TikvServer interface. -func (svr *Server) RawDeleteRange(context.Context, *kvrpcpb.RawDeleteRangeRequest) (*kvrpcpb.RawDeleteRangeResponse, error) { - return &kvrpcpb.RawDeleteRangeResponse{}, nil -} - -// SQL push down commands. - -// Coprocessor implements implements the tikvpb.TikvServer interface. -func (svr *Server) Coprocessor(_ context.Context, req *coprocessor.Request) (*coprocessor.Response, error) { - reqCtx, err := newRequestCtx(svr, req.Context, "Coprocessor") - if err != nil { - return &coprocessor.Response{OtherError: convertToKeyError(err).String()}, nil - } - defer reqCtx.finish() - if reqCtx.regErr != nil { - return &coprocessor.Response{RegionError: reqCtx.regErr}, nil - } - return cophandler.HandleCopRequest(reqCtx.getDBReader(), svr.mvccStore.lockStore, req), nil -} - -// CoprocessorStream implements implements the tikvpb.TikvServer interface. -func (svr *Server) CoprocessorStream(*coprocessor.Request, tikvpb.Tikv_CoprocessorStreamServer) error { - // TODO - return nil -} - -// RegionError represents a region error -type RegionError struct { - err *errorpb.Error -} - -// Error implements Error method. -func (regionError *RegionError) Error() string { - return regionError.err.Message -} - -// BatchCoprocessor implements implements the tikvpb.TikvServer interface. -func (svr *Server) BatchCoprocessor(req *coprocessor.BatchRequest, batchCopServer tikvpb.Tikv_BatchCoprocessorServer) error { - reqCtxs := make([]*requestCtx, 0, len(req.Regions)) - defer func() { - for _, ctx := range reqCtxs { - ctx.finish() - } - }() - for _, ri := range req.Regions { - cop := coprocessor.Request{ - Tp: kv.ReqTypeDAG, - Data: req.Data, - StartTs: req.StartTs, - Ranges: ri.Ranges, - } - regionCtx := *req.Context - regionCtx.RegionEpoch = ri.RegionEpoch - regionCtx.RegionId = ri.RegionId - cop.Context = ®ionCtx - - reqCtx, err := newRequestCtx(svr, ®ionCtx, "Coprocessor") - if err != nil { - return err - } - reqCtxs = append(reqCtxs, reqCtx) - if reqCtx.regErr != nil { - return &RegionError{err: reqCtx.regErr} - } - copResponse := cophandler.HandleCopRequestWithMPPCtx(reqCtx.getDBReader(), svr.mvccStore.lockStore, &cop, nil) - err = batchCopServer.Send(&coprocessor.BatchResponse{Data: copResponse.Data}) - if err != nil { - return err - } - } - return nil -} - -func (mrm *MockRegionManager) removeMPPTaskHandler(taskID int64, storeID uint64) error { - set := mrm.getMPPTaskSet(storeID) - if set == nil { - return errors.New("cannot find mpp task set for store") - } - set.mu.Lock() - defer set.mu.Unlock() - if _, ok := set.taskHandlers[taskID]; ok { - delete(set.taskHandlers, taskID) - return nil - } - return errors.New("cannot find mpp task") -} - -// DispatchMPPTask implements implements the tikvpb.TikvServer interface. -func (svr *Server) DispatchMPPTask(_ context.Context, _ *mpp.DispatchTaskRequest) (*mpp.DispatchTaskResponse, error) { - panic("todo") -} - -func (svr *Server) executeMPPDispatch(ctx context.Context, req *mpp.DispatchTaskRequest, storeAddr string, storeID uint64, handler *cophandler.MPPTaskHandler) error { - var reqCtx *requestCtx - if len(req.Regions) > 0 { - kvContext := &kvrpcpb.Context{ - RegionId: req.Regions[0].RegionId, - RegionEpoch: req.Regions[0].RegionEpoch, - // this is a hack to reuse task id in kvContext to pass mpp task id - TaskId: uint64(handler.Meta.TaskId), - Peer: &metapb.Peer{StoreId: storeID}, - } - var err error - reqCtx, err = newRequestCtx(svr, kvContext, "Mpp") - if err != nil { - return errors.Trace(err) - } - } - copReq := &coprocessor.Request{ - Tp: kv.ReqTypeDAG, - Data: req.EncodedPlan, - StartTs: req.Meta.StartTs, - } - for _, regionMeta := range req.Regions { - copReq.Ranges = append(copReq.Ranges, regionMeta.Ranges...) - } - var dbreader *dbreader.DBReader - if reqCtx != nil { - dbreader = reqCtx.getDBReader() - } - go func() { - resp := cophandler.HandleCopRequestWithMPPCtx(dbreader, svr.mvccStore.lockStore, copReq, &cophandler.MPPCtx{ - RPCClient: svr.RPCClient, - StoreAddr: storeAddr, - TaskHandler: handler, - Ctx: ctx, - }) - handler.Err = svr.RemoveMPPTaskHandler(req.Meta.TaskId, storeID) - if len(resp.OtherError) > 0 { - handler.Err = errors.New(resp.OtherError) - } - if reqCtx != nil { - reqCtx.finish() - } - }() - return nil -} - -// DispatchMPPTaskWithStoreID implements implements the tikvpb.TikvServer interface. -func (svr *Server) DispatchMPPTaskWithStoreID(ctx context.Context, req *mpp.DispatchTaskRequest, storeID uint64) (*mpp.DispatchTaskResponse, error) { - mppHandler, err := svr.CreateMPPTaskHandler(req.Meta, storeID) - if err != nil { - return nil, errors.Trace(err) - } - storeAddr, err := svr.GetStoreAddrByStoreID(storeID) - if err != nil { - return nil, err - } - err = svr.executeMPPDispatch(ctx, req, storeAddr, storeID, mppHandler) - resp := &mpp.DispatchTaskResponse{} - if err != nil { - resp.Error = &mpp.Error{Msg: err.Error()} - } - return resp, nil -} - -// CancelMPPTask implements implements the tikvpb.TikvServer interface. -func (svr *Server) CancelMPPTask(_ context.Context, _ *mpp.CancelTaskRequest) (*mpp.CancelTaskResponse, error) { - panic("todo") -} - -// GetMPPTaskHandler implements implements the tikvpb.TikvServer interface. -func (svr *Server) GetMPPTaskHandler(taskID int64, storeID uint64) (*cophandler.MPPTaskHandler, error) { - if mrm, ok := svr.regionManager.(*MockRegionManager); ok { - set := mrm.getMPPTaskSet(storeID) - if set == nil { - return nil, errors.New("cannot find mpp task set for store") - } - set.mu.Lock() - defer set.mu.Unlock() - if handler, ok := set.taskHandlers[taskID]; ok { - return handler, nil - } - return nil, nil - } - return nil, errors.New("Only mock region mgr supports get mpp task") -} - -// RemoveMPPTaskHandler implements implements the tikvpb.TikvServer interface. -func (svr *Server) RemoveMPPTaskHandler(taskID int64, storeID uint64) error { - if mrm, ok := svr.regionManager.(*MockRegionManager); ok { - err := mrm.removeMPPTaskHandler(taskID, storeID) - return errors.Trace(err) - } - return errors.New("Only mock region mgr supports remove mpp task") -} - -// CreateMPPTaskHandler implements implements the tikvpb.TikvServer interface. -func (svr *Server) CreateMPPTaskHandler(meta *mpp.TaskMeta, storeID uint64) (*cophandler.MPPTaskHandler, error) { - if mrm, ok := svr.regionManager.(*MockRegionManager); ok { - set := mrm.getMPPTaskSet(storeID) - if set == nil { - return nil, errors.New("cannot find mpp task set for store") - } - set.mu.Lock() - defer set.mu.Unlock() - if handler, ok := set.taskHandlers[meta.TaskId]; ok { - return handler, errors.Errorf("Task %d has been created", meta.TaskId) - } - handler := &cophandler.MPPTaskHandler{ - TunnelSet: make(map[int64]*cophandler.ExchangerTunnel), - Meta: meta, - RPCClient: svr.RPCClient, - } - set.taskHandlers[meta.TaskId] = handler - return handler, nil - } - return nil, errors.New("Only mock region mgr supports get mpp task") -} - -// EstablishMPPConnection implements implements the tikvpb.TikvServer interface. -func (svr *Server) EstablishMPPConnection(*mpp.EstablishMPPConnectionRequest, tikvpb.Tikv_EstablishMPPConnectionServer) error { - panic("todo") -} - -// EstablishMPPConnectionWithStoreID implements implements the tikvpb.TikvServer interface. -func (svr *Server) EstablishMPPConnectionWithStoreID(req *mpp.EstablishMPPConnectionRequest, server tikvpb.Tikv_EstablishMPPConnectionServer, storeID uint64) error { - var ( - mppHandler *cophandler.MPPTaskHandler - err error - ) - maxRetryTime := 5 - for i := 0; i < maxRetryTime; i++ { - mppHandler, err = svr.GetMPPTaskHandler(req.SenderMeta.TaskId, storeID) - if err != nil { - return errors.Trace(err) - } - if mppHandler == nil { - time.Sleep(time.Second) - } else { - break - } - } - if mppHandler == nil { - return errors.New("tatsk not found") - } - ctx1, cancel := context.WithCancel(context.Background()) - defer cancel() - tunnel, err := mppHandler.HandleEstablishConn(ctx1, req) - if err != nil { - return errors.Trace(err) - } - var sendError error - for sendError == nil { - chunk, err := tunnel.RecvChunk() - if err != nil { - sendError = server.Send(&mpp.MPPDataPacket{Error: &mpp.Error{Msg: err.Error()}}) - break - } - if chunk == nil { - // todo return io.EOF error? - break - } - res := tipb.SelectResponse{ - Chunks: []tipb.Chunk{*chunk}, - } - raw, err := res.Marshal() - if err != nil { - sendError = server.Send(&mpp.MPPDataPacket{Error: &mpp.Error{Msg: err.Error()}}) - break - } - sendError = server.Send(&mpp.MPPDataPacket{Data: raw}) - } - return sendError -} - -// Raft commands (tikv <-> tikv). - -// Raft implements implements the tikvpb.TikvServer interface. -func (svr *Server) Raft(stream tikvpb.Tikv_RaftServer) error { - return svr.innerServer.Raft(stream) -} - -// Snapshot implements implements the tikvpb.TikvServer interface. -func (svr *Server) Snapshot(stream tikvpb.Tikv_SnapshotServer) error { - return svr.innerServer.Snapshot(stream) -} - -// BatchRaft implements implements the tikvpb.TikvServer interface. -func (svr *Server) BatchRaft(stream tikvpb.Tikv_BatchRaftServer) error { - return svr.innerServer.BatchRaft(stream) -} - -// Region commands. - -// SplitRegion implements implements the tikvpb.TikvServer interface. -func (svr *Server) SplitRegion(ctx context.Context, req *kvrpcpb.SplitRegionRequest) (*kvrpcpb.SplitRegionResponse, error) { - reqCtx, err := newRequestCtx(svr, req.Context, "SplitRegion") - if err != nil { - return &kvrpcpb.SplitRegionResponse{RegionError: &errorpb.Error{Message: err.Error()}}, nil - } - defer reqCtx.finish() - return svr.regionManager.SplitRegion(req), nil -} - -// ReadIndex implements implements the tikvpb.TikvServer interface. -func (svr *Server) ReadIndex(context.Context, *kvrpcpb.ReadIndexRequest) (*kvrpcpb.ReadIndexResponse, error) { - // TODO: - return &kvrpcpb.ReadIndexResponse{}, nil -} - -// transaction debugger commands. - -// MvccGetByKey implements implements the tikvpb.TikvServer interface. -func (svr *Server) MvccGetByKey(ctx context.Context, req *kvrpcpb.MvccGetByKeyRequest) (*kvrpcpb.MvccGetByKeyResponse, error) { - reqCtx, err := newRequestCtx(svr, req.Context, "MvccGetByKey") - if err != nil { - return &kvrpcpb.MvccGetByKeyResponse{Error: err.Error()}, nil - } - defer reqCtx.finish() - if reqCtx.regErr != nil { - return &kvrpcpb.MvccGetByKeyResponse{RegionError: reqCtx.regErr}, nil - } - resp := new(kvrpcpb.MvccGetByKeyResponse) - mvccInfo, err := svr.mvccStore.MvccGetByKey(reqCtx, req.GetKey()) - if err != nil { - resp.Error = err.Error() - } - resp.Info = mvccInfo - return resp, nil -} - -// MvccGetByStartTs implements implements the tikvpb.TikvServer interface. -func (svr *Server) MvccGetByStartTs(ctx context.Context, req *kvrpcpb.MvccGetByStartTsRequest) (*kvrpcpb.MvccGetByStartTsResponse, error) { - reqCtx, err := newRequestCtx(svr, req.Context, "MvccGetByStartTs") - if err != nil { - return &kvrpcpb.MvccGetByStartTsResponse{Error: err.Error()}, nil - } - defer reqCtx.finish() - if reqCtx.regErr != nil { - return &kvrpcpb.MvccGetByStartTsResponse{RegionError: reqCtx.regErr}, nil - } - resp := new(kvrpcpb.MvccGetByStartTsResponse) - mvccInfo, key, err := svr.mvccStore.MvccGetByStartTs(reqCtx, req.StartTs) - if err != nil { - resp.Error = err.Error() - } - resp.Info = mvccInfo - resp.Key = key - return resp, nil -} - -// UnsafeDestroyRange implements implements the tikvpb.TikvServer interface. -func (svr *Server) UnsafeDestroyRange(ctx context.Context, req *kvrpcpb.UnsafeDestroyRangeRequest) (*kvrpcpb.UnsafeDestroyRangeResponse, error) { - start, end := req.GetStartKey(), req.GetEndKey() - svr.mvccStore.DeleteFileInRange(start, end) - return &kvrpcpb.UnsafeDestroyRangeResponse{}, nil -} - -// GetWaitForEntries tries to get the waitFor entries -// deadlock detection related services -func (svr *Server) GetWaitForEntries(ctx context.Context, - req *deadlockPb.WaitForEntriesRequest) (*deadlockPb.WaitForEntriesResponse, error) { - // TODO - return &deadlockPb.WaitForEntriesResponse{}, nil -} - -// Detect will handle detection rpc from other nodes -func (svr *Server) Detect(stream deadlockPb.Deadlock_DetectServer) error { - for { - req, err := stream.Recv() - if err != nil { - if err == io.EOF { - break - } - return err - } - if !svr.mvccStore.DeadlockDetectSvr.IsLeader() { - log.Warn("detection requests received on non leader node") - break - } - resp := svr.mvccStore.DeadlockDetectSvr.Detect(req) - if resp != nil { - if sendErr := stream.Send(resp); sendErr != nil { - log.Error("send deadlock response failed", zap.Error(sendErr)) - break - } - } - } - return nil -} - -// CheckLockObserver implements implements the tikvpb.TikvServer interface. -func (svr *Server) CheckLockObserver(context.Context, *kvrpcpb.CheckLockObserverRequest) (*kvrpcpb.CheckLockObserverResponse, error) { - // TODO: implement Observer - return &kvrpcpb.CheckLockObserverResponse{IsClean: true}, nil -} - -// PhysicalScanLock implements implements the tikvpb.TikvServer interface. -func (svr *Server) PhysicalScanLock(ctx context.Context, req *kvrpcpb.PhysicalScanLockRequest) (*kvrpcpb.PhysicalScanLockResponse, error) { - resp := &kvrpcpb.PhysicalScanLockResponse{} - resp.Locks = svr.mvccStore.PhysicalScanLock(req.StartKey, req.MaxTs, int(req.Limit)) - return resp, nil -} - -// RegisterLockObserver implements implements the tikvpb.TikvServer interface. -func (svr *Server) RegisterLockObserver(context.Context, *kvrpcpb.RegisterLockObserverRequest) (*kvrpcpb.RegisterLockObserverResponse, error) { - // TODO: implement Observer - return &kvrpcpb.RegisterLockObserverResponse{}, nil -} - -// RemoveLockObserver implements implements the tikvpb.TikvServer interface. -func (svr *Server) RemoveLockObserver(context.Context, *kvrpcpb.RemoveLockObserverRequest) (*kvrpcpb.RemoveLockObserverResponse, error) { - // TODO: implement Observer - return &kvrpcpb.RemoveLockObserverResponse{}, nil -} - -// CheckLeader implements implements the tikvpb.TikvServer interface. -func (svr *Server) CheckLeader(context.Context, *kvrpcpb.CheckLeaderRequest) (*kvrpcpb.CheckLeaderResponse, error) { - panic("unimplemented") -} - -// RawCompareAndSwap implements the tikvpb.TikvServer interface. -func (svr *Server) RawCompareAndSwap(context.Context, *kvrpcpb.RawCASRequest) (*kvrpcpb.RawCASResponse, error) { - panic("implement me") -} - -// CoprocessorV2 implements the tikvpb.TikvServer interface. -func (svr *Server) CoprocessorV2(context.Context, *coprocessor_v2.RawCoprocessorRequest) (*coprocessor_v2.RawCoprocessorResponse, error) { - panic("implement me") -} - -// GetStoreSafeTS implements the tikvpb.TikvServer interface. -func (svr *Server) GetStoreSafeTS(context.Context, *kvrpcpb.StoreSafeTSRequest) (*kvrpcpb.StoreSafeTSResponse, error) { - return &kvrpcpb.StoreSafeTSResponse{}, nil -} - -func convertToKeyError(err error) *kvrpcpb.KeyError { - if err == nil { - return nil - } - causeErr := errors.Cause(err) - switch x := causeErr.(type) { - case *ErrLocked: - return &kvrpcpb.KeyError{ - Locked: x.Lock.ToLockInfo(x.Key), - } - case ErrRetryable: - return &kvrpcpb.KeyError{ - Retryable: x.Error(), - } - case *ErrKeyAlreadyExists: - return &kvrpcpb.KeyError{ - AlreadyExist: &kvrpcpb.AlreadyExist{ - Key: x.Key, - }, - } - case *ErrConflict: - return &kvrpcpb.KeyError{ - Conflict: &kvrpcpb.WriteConflict{ - StartTs: x.StartTS, - ConflictTs: x.ConflictTS, - ConflictCommitTs: x.ConflictCommitTS, - Key: x.Key, - }, - } - case *ErrDeadlock: - return &kvrpcpb.KeyError{ - Deadlock: &kvrpcpb.Deadlock{ - LockKey: x.LockKey, - LockTs: x.LockTS, - DeadlockKeyHash: x.DeadlockKeyHash, - }, - } - case *ErrCommitExpire: - return &kvrpcpb.KeyError{ - CommitTsExpired: &kvrpcpb.CommitTsExpired{ - StartTs: x.StartTs, - AttemptedCommitTs: x.CommitTs, - Key: x.Key, - MinCommitTs: x.MinCommitTs, - }, - } - case *ErrTxnNotFound: - return &kvrpcpb.KeyError{ - TxnNotFound: &kvrpcpb.TxnNotFound{ - StartTs: x.StartTS, - PrimaryKey: x.PrimaryKey, - }, - } - default: - return &kvrpcpb.KeyError{ - Abort: err.Error(), - } - } -} - -func convertToPBError(err error) (*kvrpcpb.KeyError, *errorpb.Error) { - if regErr := extractRegionError(err); regErr != nil { - return nil, regErr - } - return convertToKeyError(err), nil -} - -func convertToPBErrors(err error) ([]*kvrpcpb.KeyError, *errorpb.Error) { - if err != nil { - if regErr := extractRegionError(err); regErr != nil { - return nil, regErr - } - return []*kvrpcpb.KeyError{convertToKeyError(err)}, nil - } - return nil, nil -} - -func extractRegionError(err error) *errorpb.Error { - if pbError, ok := err.(*pberror.PBError); ok { - return pbError.RequestErr - } - return nil -} diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go index 6cc1274cfe4c9..3b07becd366e8 100644 --- a/store/tikv/region_request_test.go +++ b/store/tikv/region_request_test.go @@ -470,6 +470,30 @@ func (s *mockTikvGrpcServer) ReadIndex(context.Context, *kvrpcpb.ReadIndexReques return nil, errors.New("unreachable") } +func (s *mockTikvGrpcServer) VerGet(context.Context, *kvrpcpb.VerGetRequest) (*kvrpcpb.VerGetResponse, error) { + return nil, errors.New("unreachable") +} + +func (s *mockTikvGrpcServer) VerBatchGet(context.Context, *kvrpcpb.VerBatchGetRequest) (*kvrpcpb.VerBatchGetResponse, error) { + return nil, errors.New("unreachable") +} + +func (s *mockTikvGrpcServer) VerMut(context.Context, *kvrpcpb.VerMutRequest) (*kvrpcpb.VerMutResponse, error) { + return nil, errors.New("unreachable") +} + +func (s *mockTikvGrpcServer) VerBatchMut(context.Context, *kvrpcpb.VerBatchMutRequest) (*kvrpcpb.VerBatchMutResponse, error) { + return nil, errors.New("unreachable") +} + +func (s *mockTikvGrpcServer) VerScan(context.Context, *kvrpcpb.VerScanRequest) (*kvrpcpb.VerScanResponse, error) { + return nil, errors.New("unreachable") +} + +func (s *mockTikvGrpcServer) VerDeleteRange(context.Context, *kvrpcpb.VerDeleteRangeRequest) (*kvrpcpb.VerDeleteRangeResponse, error) { + return nil, errors.New("unreachable") +} + func (s *mockTikvGrpcServer) CheckLeader(context.Context, *kvrpcpb.CheckLeaderRequest) (*kvrpcpb.CheckLeaderResponse, error) { return nil, errors.New("unreachable") } From 149e5a8ee78dd04299a8a02b9efc17f0d2930b36 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Fri, 18 Jun 2021 16:13:48 +0800 Subject: [PATCH 3/3] address comments --- store/copr/mpp.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/store/copr/mpp.go b/store/copr/mpp.go index a6ff19042f4dd..49b2e7f609d19 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -243,12 +243,10 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer, m.sendError(errors.New(realResp.Error.Msg)) return } - if len(realResp.RetryRegions) > 0 { - for _, retry := range realResp.RetryRegions { - id := tikv.NewRegionVerID(retry.Id, retry.RegionEpoch.ConfVer, retry.RegionEpoch.Version) - logutil.BgLogger().Info("invalid region because tiflash detected stale region", zap.String("region id", id.String())) - m.store.GetRegionCache().InvalidateCachedRegionWithReason(id, tikv.EpochNotMatch) - } + for _, retry := range realResp.RetryRegions { + id := tikv.NewRegionVerID(retry.Id, retry.RegionEpoch.ConfVer, retry.RegionEpoch.Version) + logutil.BgLogger().Info("invalid region because tiflash detected stale region", zap.String("region id", id.String())) + m.store.GetRegionCache().InvalidateCachedRegionWithReason(id, tikv.EpochNotMatch) } failpoint.Inject("mppNonRootTaskError", func(val failpoint.Value) { if val.(bool) && !req.IsRoot {