From d8e9dc0ad154ad30b6f90354f97c2987b8b2230f Mon Sep 17 00:00:00 2001 From: Evan Zhou Date: Thu, 4 Jun 2020 14:10:06 +0800 Subject: [PATCH] use tidb cophandler to handle coprocessor request. (#400) --- go.mod | 4 +- go.sum | 5 + tikv/analyze.go | 280 ------------- tikv/closure_exec.go | 846 --------------------------------------- tikv/cop_handler.go | 368 ----------------- tikv/cop_handler_test.go | 388 ------------------ tikv/mvcc.go | 5 +- tikv/server.go | 13 +- tikv/topn.go | 139 ------- 9 files changed, 12 insertions(+), 2036 deletions(-) delete mode 100644 tikv/analyze.go delete mode 100644 tikv/closure_exec.go delete mode 100644 tikv/cop_handler.go delete mode 100644 tikv/cop_handler_test.go delete mode 100644 tikv/topn.go diff --git a/go.mod b/go.mod index 45ce624c..fc5b689b 100644 --- a/go.mod +++ b/go.mod @@ -14,10 +14,8 @@ require ( github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011 github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29 github.com/pingcap/log v0.0.0-20200511115504-543df19646ad - github.com/pingcap/parser v0.0.0-20200522094936-3b720a0512a6 github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181 - github.com/pingcap/tidb v1.1.0-beta.0.20200603101356-552e7709de0d - github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee + github.com/pingcap/tidb v1.1.0-beta.0.20200604055950-efc1c154d098 github.com/prometheus/client_golang v1.5.1 github.com/shirou/gopsutil v2.19.10+incompatible github.com/stretchr/testify v1.5.1 diff --git a/go.sum b/go.sum index 68b3bd07..013dfc40 100644 --- a/go.sum +++ b/go.sum @@ -91,6 +91,7 @@ github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmf github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e h1:Wf6HqHfScWJN9/ZjdUKyjop4mf3Qdd+1TvvltAvM3m8= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f h1:JOrtw2xFKzlg+cbHpyrpLDmnN1HqhBfnX7WDiW7eG2c= github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= @@ -403,6 +404,7 @@ github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7/go.mod h1:iWMfgwqYW+e8 github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3bPQ/0cuYh2H4rkg0tytX/07k= github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8= github.com/ngaut/unistore v0.0.0-20200603091253-e0b717679796/go.mod h1:9mpqZeS1CkNlgZwJ0LZXb+Qd7xVO5o55ngys7T1/oH8= +github.com/ngaut/unistore v0.0.0-20200604043635-5004cdad650f/go.mod h1:5Vec+R2BwOyugVQ8Id8uDmlIYbqodCvykM50IpaAjk4= github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= @@ -497,6 +499,8 @@ github.com/pingcap/tidb v1.1.0-beta.0.20200513065557-5a0787dfa915 h1:I2a/RvdAP5x github.com/pingcap/tidb v1.1.0-beta.0.20200513065557-5a0787dfa915/go.mod h1:khS9Z9YlbtxsaZsSsSahelgh5L16EtP30QADFmPiI/I= 
github.com/pingcap/tidb v1.1.0-beta.0.20200603101356-552e7709de0d h1:U4Bm4KdtCfOIH1L5QbWFeck32gjZhLb7dCoTkEc1vPY=
github.com/pingcap/tidb v1.1.0-beta.0.20200603101356-552e7709de0d/go.mod h1:wgu4vP3wq+x/l1X3ckOZFvyGKcVIBkq30NQVh0Y+qYA=
+github.com/pingcap/tidb v1.1.0-beta.0.20200604055950-efc1c154d098 h1:O930BM9xqiV+6JN+PpqC54b8lx8rQA9F4irShNcaXlA=
+github.com/pingcap/tidb v1.1.0-beta.0.20200604055950-efc1c154d098/go.mod h1:UMxsNE326wyfgFJCx6aerPRLj1/tGPYDBKS9T9NOHI8=
github.com/pingcap/tidb-tools v4.0.0-beta.1.0.20200306084441-875bd09aa3d5+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v4.0.0-rc.1.0.20200421113014-507d2bb3a15e+incompatible h1:+K5bqDYG5HT+GqLdx4GH5VmS84+xHgpHbGg6Xt6qQec=
github.com/pingcap/tidb-tools v4.0.0-rc.1.0.20200421113014-507d2bb3a15e+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
@@ -631,6 +635,7 @@ github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex
github.com/uber/jaeger-client-go v2.15.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
+github.com/uber/jaeger-lib v1.5.0 h1:OHbgr8l656Ub3Fw5k9SWnBfIEwvoHQ+W2y+Aa9D1Uyo=
github.com/uber/jaeger-lib v1.5.0/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
diff --git a/tikv/analyze.go b/tikv/analyze.go
deleted file mode 100644
index 61337331..00000000
--- a/tikv/analyze.go
+++ /dev/null
@@ -1,280 +0,0 @@
-// Copyright 2019-present PingCAP, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// See the License for the specific language governing permissions and
-// limitations under the License.
- -package tikv - -import ( - "math" - "time" - - "github.com/golang/protobuf/proto" - "github.com/juju/errors" - "github.com/ngaut/unistore/tikv/dbreader" - "github.com/pingcap/badger/y" - "github.com/pingcap/kvproto/pkg/coprocessor" - "github.com/pingcap/parser/ast" - "github.com/pingcap/parser/charset" - "github.com/pingcap/parser/model" - "github.com/pingcap/parser/mysql" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/statistics" - "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/chunk" - "github.com/pingcap/tidb/util/rowcodec" - "github.com/pingcap/tipb/go-tipb" - "golang.org/x/net/context" -) - -func (svr *Server) handleCopAnalyzeRequest(reqCtx *requestCtx, req *coprocessor.Request) *coprocessor.Response { - resp := &coprocessor.Response{} - if len(req.Ranges) == 0 { - return resp - } - if req.GetTp() != kv.ReqTypeAnalyze { - return resp - } - analyzeReq := new(tipb.AnalyzeReq) - err := proto.Unmarshal(req.Data, analyzeReq) - if err != nil { - resp.OtherError = err.Error() - return resp - } - ranges, err := svr.extractKVRanges(reqCtx.regCtx, req.Ranges, false) - if err != nil { - resp.OtherError = err.Error() - return resp - } - y.Assert(len(ranges) == 1) - if analyzeReq.Tp == tipb.AnalyzeType_TypeIndex { - resp, err = svr.handleAnalyzeIndexReq(reqCtx, ranges[0], analyzeReq, req.StartTs) - } else { - resp, err = svr.handleAnalyzeColumnsReq(reqCtx, ranges[0], analyzeReq, req.StartTs) - } - if err != nil { - resp = &coprocessor.Response{ - OtherError: err.Error(), - } - } - return resp -} - -func (svr *Server) handleAnalyzeIndexReq(reqCtx *requestCtx, ran kv.KeyRange, analyzeReq *tipb.AnalyzeReq, startTS uint64) (*coprocessor.Response, error) { - dbReader := reqCtx.getDBReader() - processor := &analyzeIndexProcessor{ - colLen: int(analyzeReq.IdxReq.NumColumns), - statsBuilder: statistics.NewSortedBuilder(flagsToStatementContext(analyzeReq.Flags), analyzeReq.IdxReq.BucketSize, 0, types.NewFieldType(mysql.TypeBlob)), - } - if analyzeReq.IdxReq.CmsketchDepth != nil && analyzeReq.IdxReq.CmsketchWidth != nil { - processor.cms = statistics.NewCMSketch(*analyzeReq.IdxReq.CmsketchDepth, *analyzeReq.IdxReq.CmsketchWidth) - } - err := dbReader.Scan(ran.StartKey, ran.EndKey, math.MaxInt64, startTS, processor) - if err != nil { - return nil, err - } - hg := statistics.HistogramToProto(processor.statsBuilder.Hist()) - var cm *tipb.CMSketch - if processor.cms != nil { - cm = statistics.CMSketchToProto(processor.cms) - } - data, err := proto.Marshal(&tipb.AnalyzeIndexResp{Hist: hg, Cms: cm}) - if err != nil { - return nil, errors.Trace(err) - } - return &coprocessor.Response{Data: data}, nil -} - -type analyzeIndexProcessor struct { - skipVal - - colLen int - statsBuilder *statistics.SortedBuilder - cms *statistics.CMSketch - rowBuf []byte -} - -func (p *analyzeIndexProcessor) Process(key, value []byte) error { - values, _, err := tablecodec.CutIndexKeyNew(key, p.colLen) - if err != nil { - return err - } - p.rowBuf = p.rowBuf[:0] - for _, val := range values { - p.rowBuf = append(p.rowBuf, val...) 
- if p.cms != nil { - p.cms.InsertBytes(p.rowBuf) - } - } - rowData := safeCopy(p.rowBuf) - err = p.statsBuilder.Iterate(types.NewBytesDatum(rowData)) - if err != nil { - return err - } - return nil -} - -type analyzeColumnsExec struct { - skipVal - reader *dbreader.DBReader - seekKey []byte - endKey []byte - startTS uint64 - - chk *chunk.Chunk - decoder *rowcodec.ChunkDecoder - req *chunk.Chunk - evalCtx *evalContext - fields []*ast.ResultField -} - -func (svr *Server) handleAnalyzeColumnsReq(reqCtx *requestCtx, ran kv.KeyRange, analyzeReq *tipb.AnalyzeReq, startTS uint64) (*coprocessor.Response, error) { - sc := flagsToStatementContext(analyzeReq.Flags) - sc.TimeZone = time.FixedZone("UTC", int(analyzeReq.TimeZoneOffset)) - evalCtx := &evalContext{sc: sc} - columns := analyzeReq.ColReq.ColumnsInfo - evalCtx.setColumnInfo(columns) - decoder, err := evalCtx.newRowDecoder() - if err != nil { - return nil, err - } - e := &analyzeColumnsExec{ - reader: reqCtx.getDBReader(), - seekKey: ran.StartKey, - endKey: ran.EndKey, - startTS: startTS, - chk: chunk.NewChunkWithCapacity(evalCtx.fieldTps, 1), - decoder: decoder, - evalCtx: evalCtx, - } - e.fields = make([]*ast.ResultField, len(columns)) - for i := range e.fields { - rf := new(ast.ResultField) - rf.Column = new(model.ColumnInfo) - rf.Column.FieldType = types.FieldType{Tp: mysql.TypeBlob, Flen: mysql.MaxBlobWidth, Charset: charset.CharsetUTF8, Collate: charset.CollationUTF8} - e.fields[i] = rf - } - - pkID := int64(-1) - numCols := len(columns) - if columns[0].GetPkHandle() { - pkID = columns[0].ColumnId - numCols-- - } - colReq := analyzeReq.ColReq - builder := statistics.SampleBuilder{ - Sc: sc, - RecordSet: e, - ColLen: numCols, - MaxBucketSize: colReq.BucketSize, - MaxFMSketchSize: colReq.SketchSize, - MaxSampleSize: colReq.SampleSize, - } - if pkID != -1 { - builder.PkBuilder = statistics.NewSortedBuilder(sc, builder.MaxBucketSize, pkID, types.NewFieldType(mysql.TypeBlob)) - } - if colReq.CmsketchWidth != nil && colReq.CmsketchDepth != nil { - builder.CMSketchWidth = *colReq.CmsketchWidth - builder.CMSketchDepth = *colReq.CmsketchDepth - } - collectors, pkBuilder, err := builder.CollectColumnStats() - if err != nil { - return nil, errors.Trace(err) - } - colResp := &tipb.AnalyzeColumnsResp{} - if pkID != -1 { - colResp.PkHist = statistics.HistogramToProto(pkBuilder.Hist()) - } - for _, c := range collectors { - colResp.Collectors = append(colResp.Collectors, statistics.SampleCollectorToProto(c)) - } - data, err := proto.Marshal(colResp) - if err != nil { - return nil, errors.Trace(err) - } - return &coprocessor.Response{Data: data}, nil -} - -// Fields implements the sqlexec.RecordSet Fields interface. 
-func (e *analyzeColumnsExec) Fields() []*ast.ResultField { - return e.fields -} - -func (e *analyzeColumnsExec) Next(ctx context.Context, req *chunk.Chunk) error { - req.Reset() - e.req = req - err := e.reader.Scan(e.seekKey, e.endKey, math.MaxInt64, e.startTS, e) - if err != nil { - return err - } - if req.NumRows() < req.Capacity() { - e.seekKey = e.endKey - } - return nil -} - -func (e *analyzeColumnsExec) Process(key, value []byte) error { - handle, err := tablecodec.DecodeRowKey(key) - if err != nil { - return errors.Trace(err) - } - err = e.decoder.DecodeToChunk(value, handle, e.chk) - if err != nil { - return errors.Trace(err) - } - row := e.chk.GetRow(0) - for i, tp := range e.evalCtx.fieldTps { - d := row.GetDatum(i, tp) - if d.IsNull() { - e.req.AppendNull(i) - continue - } - - value, err := tablecodec.EncodeValue(e.evalCtx.sc, nil, d) - if err != nil { - return err - } - e.req.AppendBytes(i, value) - } - e.chk.Reset() - if e.req.NumRows() == e.req.Capacity() { - e.seekKey = kv.Key(key).PrefixNext() - return dbreader.ScanBreak - } - return nil -} - -func (e *analyzeColumnsExec) NewChunk() *chunk.Chunk { - fields := make([]*types.FieldType, 0, len(e.fields)) - for _, field := range e.fields { - fields = append(fields, &field.Column.FieldType) - } - return chunk.NewChunkWithCapacity(fields, 1024) -} - -// Close implements the sqlexec.RecordSet Close interface. -func (e *analyzeColumnsExec) Close() error { - return nil -} diff --git a/tikv/closure_exec.go b/tikv/closure_exec.go deleted file mode 100644 index f3035533..00000000 --- a/tikv/closure_exec.go +++ /dev/null @@ -1,846 +0,0 @@ -// Copyright 2019-present PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package tikv - -import ( - "encoding/binary" - "fmt" - "math" - "sort" - - "github.com/juju/errors" - "github.com/ngaut/unistore/tikv/dbreader" - "github.com/pingcap/parser/model" - "github.com/pingcap/parser/mysql" - "github.com/pingcap/parser/terror" - "github.com/pingcap/tidb/expression" - "github.com/pingcap/tidb/expression/aggregation" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/sessionctx/stmtctx" - "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/chunk" - "github.com/pingcap/tidb/util/codec" - mockpkg "github.com/pingcap/tidb/util/mock" - "github.com/pingcap/tidb/util/rowcodec" - "github.com/pingcap/tipb/go-tipb" -) - -const chunkMaxRows = 1024 - -const ( - pkColNotExists = iota - pkColIsSigned - pkColIsUnsigned -) - -// buildClosureExecutor build a closureExecutor for the DAGRequest. 
-// Currently the composition of executors are: -// tableScan|indexScan [selection] [topN | limit | agg] -func (svr *Server) buildClosureExecutor(dagCtx *dagContext, dagReq *tipb.DAGRequest) (*closureExecutor, error) { - ce, err := svr.newClosureExecutor(dagCtx, dagReq) - if err != nil { - return nil, errors.Trace(err) - } - executors := dagReq.Executors - scanExec := executors[0] - if scanExec.Tp == tipb.ExecType_TypeTableScan { - ce.processor = &tableScanProcessor{closureExecutor: ce} - } else { - ce.processor = &indexScanProcessor{closureExecutor: ce} - } - if len(executors) == 1 { - return ce, nil - } - if secondExec := executors[1]; secondExec.Tp == tipb.ExecType_TypeSelection { - ce.selectionCtx.conditions, err = convertToExprs(ce.sc, ce.fieldTps, secondExec.Selection.Conditions) - if err != nil { - return nil, errors.Trace(err) - } - ce.processor = &selectionProcessor{closureExecutor: ce} - } - lastExecutor := executors[len(executors)-1] - switch lastExecutor.Tp { - case tipb.ExecType_TypeLimit: - ce.limit = int(lastExecutor.Limit.Limit) - case tipb.ExecType_TypeTopN: - err = svr.buildTopNProcessor(ce, lastExecutor.TopN) - case tipb.ExecType_TypeAggregation: - err = svr.buildHashAggProcessor(ce, dagCtx, lastExecutor.Aggregation) - case tipb.ExecType_TypeStreamAgg: - err = svr.buildStreamAggProcessor(ce, dagCtx, executors) - case tipb.ExecType_TypeSelection: - ce.processor = &selectionProcessor{closureExecutor: ce} - default: - panic("unknown executor type " + lastExecutor.Tp.String()) - } - if err != nil { - return nil, err - } - return ce, nil -} - -func convertToExprs(sc *stmtctx.StatementContext, fieldTps []*types.FieldType, pbExprs []*tipb.Expr) ([]expression.Expression, error) { - exprs := make([]expression.Expression, 0, len(pbExprs)) - for _, expr := range pbExprs { - e, err := expression.PBToExpr(expr, fieldTps, sc) - if err != nil { - return nil, errors.Trace(err) - } - exprs = append(exprs, e) - } - return exprs, nil -} - -func (svr *Server) newClosureExecutor(dagCtx *dagContext, dagReq *tipb.DAGRequest) (*closureExecutor, error) { - e := &closureExecutor{ - evalContext: dagCtx.evalCtx, - reqCtx: dagCtx.reqCtx, - outputOff: dagReq.OutputOffsets, - mvccStore: svr.mvccStore, - startTS: dagCtx.startTS, - limit: math.MaxInt64, - } - seCtx := mockpkg.NewContext() - seCtx.GetSessionVars().StmtCtx = e.sc - e.seCtx = seCtx - executors := dagReq.Executors - scanExec := executors[0] - switch scanExec.Tp { - case tipb.ExecType_TypeTableScan: - tblScan := executors[0].TblScan - e.unique = true - e.scanCtx.desc = tblScan.Desc - case tipb.ExecType_TypeIndexScan: - idxScan := executors[0].IdxScan - e.unique = idxScan.GetUnique() - e.scanCtx.desc = idxScan.Desc - e.initIdxScanCtx() - default: - panic(fmt.Sprintf("unknown first executor type %s", executors[0].Tp)) - } - ranges, err := svr.extractKVRanges(dagCtx.reqCtx.regCtx, dagCtx.keyRanges, e.scanCtx.desc) - if err != nil { - return nil, errors.Trace(err) - } - if dagReq.GetCollectRangeCounts() { - e.counts = make([]int64, len(ranges)) - } - e.kvRanges = ranges - e.scanCtx.chk = chunk.NewChunkWithCapacity(e.fieldTps, 32) - if e.idxScanCtx == nil { - e.scanCtx.decoder, err = e.evalContext.newRowDecoder() - if err != nil { - return nil, errors.Trace(err) - } - } - return e, nil -} - -func (e *closureExecutor) initIdxScanCtx() { - e.idxScanCtx = new(idxScanCtx) - e.idxScanCtx.columnLen = len(e.columnInfos) - e.idxScanCtx.pkStatus = pkColNotExists - lastColumn := e.columnInfos[len(e.columnInfos)-1] - // The PKHandle column info has been 
collected in ctx. - if lastColumn.GetPkHandle() { - if mysql.HasUnsignedFlag(uint(lastColumn.GetFlag())) { - e.idxScanCtx.pkStatus = pkColIsUnsigned - } else { - e.idxScanCtx.pkStatus = pkColIsSigned - } - e.idxScanCtx.columnLen-- - } else if lastColumn.ColumnId == model.ExtraHandleID { - e.idxScanCtx.pkStatus = pkColIsSigned - e.idxScanCtx.columnLen-- - } - - colInfos := make([]rowcodec.ColInfo, e.idxScanCtx.columnLen) - for i := range colInfos { - col := e.columnInfos[i] - colInfos[i] = rowcodec.ColInfo{ - ID: col.ColumnId, - Ft: e.fieldTps[i], - IsPKHandle: col.GetPkHandle(), - } - } - e.idxScanCtx.colInfos = colInfos - - colIDs := make(map[int64]int, len(colInfos)) - for i, col := range colInfos { - colIDs[col.ID] = i - } - e.scanCtx.newCollationIds = colIDs - - // We don't need to decode handle here, and colIDs >= 0 always. - e.scanCtx.newCollationRd = rowcodec.NewByteDecoder(colInfos, []int64{-1}, nil, nil) -} - -func (svr *Server) isCountAgg(pbAgg *tipb.Aggregation) bool { - if len(pbAgg.AggFunc) == 1 && len(pbAgg.GroupBy) == 0 { - aggFunc := pbAgg.AggFunc[0] - if aggFunc.Tp == tipb.ExprType_Count && len(aggFunc.Children) == 1 { - return true - } - } - return false -} - -func (svr *Server) tryBuildCountProcessor(e *closureExecutor, executors []*tipb.Executor) (bool, error) { - if len(executors) > 2 { - return false, nil - } - agg := executors[1].Aggregation - if !svr.isCountAgg(agg) { - return false, nil - } - child := agg.AggFunc[0].Children[0] - switch child.Tp { - case tipb.ExprType_ColumnRef: - _, idx, err := codec.DecodeInt(child.Val) - if err != nil { - return false, errors.Trace(err) - } - e.aggCtx.col = e.columnInfos[idx] - if e.aggCtx.col.PkHandle { - e.processor = &countStarProcessor{skipVal: skipVal(true), closureExecutor: e} - } else { - e.processor = &countColumnProcessor{closureExecutor: e} - } - case tipb.ExprType_Null, tipb.ExprType_ScalarFunc: - return false, nil - default: - e.processor = &countStarProcessor{skipVal: skipVal(true), closureExecutor: e} - } - return true, nil -} - -func (svr *Server) buildTopNProcessor(e *closureExecutor, topN *tipb.TopN) error { - heap, conds, err := svr.getTopNInfo(e.evalContext, topN) - if err != nil { - return errors.Trace(err) - } - - ctx := &topNCtx{ - heap: heap, - orderByExprs: conds, - sortRow: e.newTopNSortRow(), - } - - e.topNCtx = ctx - e.processor = &topNProcessor{closureExecutor: e} - return nil -} - -func (svr *Server) buildHashAggProcessor(e *closureExecutor, ctx *dagContext, agg *tipb.Aggregation) error { - aggs, groupBys, err := svr.getAggInfo(ctx, agg) - if err != nil { - return err - } - e.processor = &hashAggProcessor{ - closureExecutor: e, - aggExprs: aggs, - groupByExprs: groupBys, - groups: map[string]struct{}{}, - groupKeys: nil, - aggCtxsMap: map[string][]*aggregation.AggEvaluateContext{}, - } - return nil -} - -func (svr *Server) buildStreamAggProcessor(e *closureExecutor, ctx *dagContext, executors []*tipb.Executor) error { - ok, err := svr.tryBuildCountProcessor(e, executors) - if err != nil || ok { - return err - } - return svr.buildHashAggProcessor(e, ctx, executors[len(executors)-1].Aggregation) -} - -// closureExecutor is an execution engine that flatten the DAGRequest.Executors to a single closure `processor` that -// process key/value pairs. We can define many closures for different kinds of requests, try to use the specially -// optimized one for some frequently used query. 
-type closureExecutor struct { - *evalContext - reqCtx *requestCtx - outputOff []uint32 - mvccStore *MVCCStore - seCtx sessionctx.Context - kvRanges []kv.KeyRange - startTS uint64 - ignoreLock bool - lockChecked bool - scanCtx scanCtx - idxScanCtx *idxScanCtx - selectionCtx selectionCtx - aggCtx aggCtx - topNCtx *topNCtx - - rowCount int - unique bool - limit int - - oldChunks []tipb.Chunk - oldRowBuf []byte - processor closureProcessor - - counts []int64 -} - -type closureProcessor interface { - dbreader.ScanProcessor - Finish() error -} - -type scanCtx struct { - count int - limit int - chk *chunk.Chunk - desc bool - decoder *rowcodec.ChunkDecoder - - newCollationRd *rowcodec.BytesDecoder - newCollationIds map[int64]int -} - -type idxScanCtx struct { - pkStatus int - columnLen int - colInfos []rowcodec.ColInfo -} - -type aggCtx struct { - col *tipb.ColumnInfo -} - -type selectionCtx struct { - conditions []expression.Expression -} - -type topNCtx struct { - heap *topNHeap - orderByExprs []expression.Expression - sortRow *sortRow -} - -func (e *closureExecutor) execute() ([]tipb.Chunk, error) { - err := e.checkRangeLock() - if err != nil { - return nil, errors.Trace(err) - } - dbReader := e.reqCtx.getDBReader() - for i, ran := range e.kvRanges { - if e.unique && ran.IsPoint() { - val, err := dbReader.Get(ran.StartKey, e.startTS) - if err != nil { - return nil, errors.Trace(err) - } - if len(val) == 0 { - continue - } - if e.counts != nil { - e.counts[i]++ - } - err = e.processor.Process(ran.StartKey, val) - if err != nil { - return nil, errors.Trace(err) - } - } else { - oldCnt := e.rowCount - if e.scanCtx.desc { - err = dbReader.ReverseScan(ran.StartKey, ran.EndKey, math.MaxInt64, e.startTS, e.processor) - } else { - err = dbReader.Scan(ran.StartKey, ran.EndKey, math.MaxInt64, e.startTS, e.processor) - } - delta := int64(e.rowCount - oldCnt) - if e.counts != nil { - e.counts[i] += delta - } - if err != nil { - return nil, errors.Trace(err) - } - } - if e.rowCount == e.limit { - break - } - } - err = e.processor.Finish() - return e.oldChunks, err -} - -func (e *closureExecutor) checkRangeLock() error { - if !e.ignoreLock && !e.lockChecked { - for _, ran := range e.kvRanges { - err := e.mvccStore.CheckRangeLock(e.startTS, ran.StartKey, ran.EndKey, e.reqCtx.rpcCtx.ResolvedLocks) - if err != nil { - return err - } - } - e.lockChecked = true - } - return nil -} - -type countStarProcessor struct { - skipVal - *closureExecutor -} - -// countStarProcess is used for `count(*)`. -func (e *countStarProcessor) Process(key, value []byte) error { - e.rowCount++ - return nil -} - -func (e *countStarProcessor) Finish() error { - return e.countFinish() -} - -// countFinish is used for `count(*)`. -func (e *closureExecutor) countFinish() error { - d := types.NewIntDatum(int64(e.rowCount)) - rowData, err := codec.EncodeValue(e.sc, nil, d) - if err != nil { - return errors.Trace(err) - } - e.oldChunks = appendRow(e.oldChunks, rowData, 0) - return nil -} - -type countColumnProcessor struct { - skipVal - *closureExecutor -} - -func (e *countColumnProcessor) Process(key, value []byte) error { - if e.idxScanCtx != nil { - values, _, err := tablecodec.CutIndexKeyNew(key, e.idxScanCtx.columnLen) - if err != nil { - return errors.Trace(err) - } - if values[0][0] != codec.NilFlag { - e.rowCount++ - } - } else { - // Since the handle value doesn't affect the count result, we don't need to decode the handle. 
- isNull, err := e.scanCtx.decoder.ColumnIsNull(value, e.aggCtx.col.ColumnId, e.aggCtx.col.DefaultVal) - if err != nil { - return errors.Trace(err) - } - if !isNull { - e.rowCount++ - } - } - return nil -} - -func (e *countColumnProcessor) Finish() error { - return e.countFinish() -} - -type skipVal bool - -func (s skipVal) SkipValue() bool { - return bool(s) -} - -type tableScanProcessor struct { - skipVal - *closureExecutor -} - -func (e *tableScanProcessor) Process(key, value []byte) error { - if e.rowCount == e.limit { - return dbreader.ScanBreak - } - e.rowCount++ - err := e.tableScanProcessCore(key, value) - if e.scanCtx.chk.NumRows() == chunkMaxRows { - err = e.chunkToOldChunk(e.scanCtx.chk) - } - return err -} - -func (e *tableScanProcessor) Finish() error { - return e.scanFinish() -} - -func (e *closureExecutor) processCore(key, value []byte) error { - if e.idxScanCtx != nil { - return e.indexScanProcessCore(key, value) - } else { - return e.tableScanProcessCore(key, value) - } -} - -func (e *closureExecutor) hasSelection() bool { - return len(e.selectionCtx.conditions) > 0 -} - -func (e *closureExecutor) processSelection() (gotRow bool, err error) { - chk := e.scanCtx.chk - row := chk.GetRow(chk.NumRows() - 1) - gotRow = true - for _, expr := range e.selectionCtx.conditions { - wc := e.sc.WarningCount() - d, err := expr.Eval(row) - if err != nil { - return false, errors.Trace(err) - } - - if d.IsNull() { - gotRow = false - } else { - isBool, err := d.ToBool(e.sc) - isBool, err = expression.HandleOverflowOnSelection(e.sc, isBool, err) - if err != nil { - return false, errors.Trace(err) - } - gotRow = isBool != 0 - } - if !gotRow { - if e.sc.WarningCount() > wc { - // Deep-copy error object here, because the data it referenced is going to be truncated. 
- warns := e.sc.TruncateWarnings(int(wc)) - for i, warn := range warns { - warns[i].Err = e.copyError(warn.Err) - } - e.sc.AppendWarnings(warns) - } - chk.TruncateTo(chk.NumRows() - 1) - break - } - } - return -} - -func (e *closureExecutor) copyError(err error) error { - if err == nil { - return nil - } - var ret error - x := errors.Cause(err) - switch y := x.(type) { - case *terror.Error: - ret = y.ToSQLError() - default: - ret = errors.New(err.Error()) - } - return ret -} - -func (e *closureExecutor) tableScanProcessCore(key, value []byte) error { - handle, err := tablecodec.DecodeRowKey(key) - if err != nil { - return errors.Trace(err) - } - err = e.scanCtx.decoder.DecodeToChunk(value, handle, e.scanCtx.chk) - if err != nil { - return errors.Trace(err) - } - return nil -} - -func (e *closureExecutor) scanFinish() error { - return e.chunkToOldChunk(e.scanCtx.chk) -} - -type indexScanProcessor struct { - skipVal - *closureExecutor -} - -func (e *indexScanProcessor) Process(key, value []byte) error { - if e.rowCount == e.limit { - return dbreader.ScanBreak - } - e.rowCount++ - err := e.indexScanProcessCore(key, value) - if e.scanCtx.chk.NumRows() == chunkMaxRows { - err = e.chunkToOldChunk(e.scanCtx.chk) - } - return err -} - -func (e *indexScanProcessor) Finish() error { - return e.scanFinish() -} - -func (e *closureExecutor) indexScanProcessCore(key, value []byte) error { - if len(value) > tablecodec.MaxOldEncodeValueLen { - return e.indexScanProcessNewCollation(key, value) - } - return e.indexScanProcessOldCollation(key, value) -} - -func (e *closureExecutor) indexScanProcessNewCollation(key, value []byte) error { - colLen := e.idxScanCtx.columnLen - pkStatus := e.idxScanCtx.pkStatus - chk := e.scanCtx.chk - rd := e.scanCtx.newCollationRd - colIDs := e.scanCtx.newCollationIds - - vLen := len(value) - tailLen := int(value[0]) - values, err := rd.DecodeToBytesNoHandle(colIDs, value[1:vLen-tailLen]) - if err != nil { - return errors.Trace(err) - } - decoder := codec.NewDecoder(chk, e.sc.TimeZone) - for i, colVal := range values { - _, err = decoder.DecodeOne(colVal, i, e.fieldTps[i]) - if err != nil { - return errors.Trace(err) - } - } - - if tailLen < 8 { - if pkStatus != pkColNotExists { - _, err = decoder.DecodeOne(key[len(key)-9:], colLen, e.fieldTps[colLen]) - if err != nil { - return errors.Trace(err) - } - } - } else if pkStatus != pkColNotExists { - chk.AppendInt64(colLen, int64(binary.BigEndian.Uint64(value[vLen-tailLen:]))) - } - return nil -} - -func (e *closureExecutor) indexScanProcessOldCollation(key, value []byte) error { - colLen := e.idxScanCtx.columnLen - pkStatus := e.idxScanCtx.pkStatus - chk := e.scanCtx.chk - values, b, err := tablecodec.CutIndexKeyNew(key, colLen) - if err != nil { - return errors.Trace(err) - } - decoder := codec.NewDecoder(chk, e.sc.TimeZone) - for i, colVal := range values { - _, err = decoder.DecodeOne(colVal, i, e.fieldTps[i]) - if err != nil { - return errors.Trace(err) - } - } - if len(b) > 0 { - if pkStatus != pkColNotExists { - _, err = decoder.DecodeOne(b, colLen, e.fieldTps[colLen]) - if err != nil { - return errors.Trace(err) - } - } - } else if pkStatus != pkColNotExists { - chk.AppendInt64(colLen, int64(binary.BigEndian.Uint64(value))) - } - return nil -} - -func (e *closureExecutor) chunkToOldChunk(chk *chunk.Chunk) error { - var oldRow []types.Datum - for i := 0; i < chk.NumRows(); i++ { - oldRow = oldRow[:0] - for _, outputOff := range e.outputOff { - d := chk.GetRow(i).GetDatum(int(outputOff), e.fieldTps[outputOff]) - oldRow = 
append(oldRow, d) - } - var err error - e.oldRowBuf, err = codec.EncodeValue(e.sc, e.oldRowBuf[:0], oldRow...) - if err != nil { - return errors.Trace(err) - } - e.oldChunks = appendRow(e.oldChunks, e.oldRowBuf, i) - } - chk.Reset() - return nil -} - -type selectionProcessor struct { - skipVal - *closureExecutor -} - -func (e *selectionProcessor) Process(key, value []byte) error { - if e.rowCount == e.limit { - return dbreader.ScanBreak - } - err := e.processCore(key, value) - if err != nil { - return errors.Trace(err) - } - gotRow, err := e.processSelection() - if err != nil { - return err - } - if gotRow { - e.rowCount++ - if e.scanCtx.chk.NumRows() == chunkMaxRows { - err = e.chunkToOldChunk(e.scanCtx.chk) - } - } - return err -} - -func (e *selectionProcessor) Finish() error { - return e.scanFinish() -} - -type topNProcessor struct { - skipVal - *closureExecutor -} - -func (e *topNProcessor) Process(key, value []byte) (err error) { - if err = e.processCore(key, value); err != nil { - return err - } - if e.hasSelection() { - gotRow, err1 := e.processSelection() - if err1 != nil || !gotRow { - return err1 - } - } - - ctx := e.topNCtx - row := e.scanCtx.chk.GetRow(0) - for i, expr := range ctx.orderByExprs { - d, err := expr.Eval(row) - if err != nil { - return errors.Trace(err) - } - d.Copy(&ctx.sortRow.key[i]) - } - e.scanCtx.chk.Reset() - - if ctx.heap.tryToAddRow(ctx.sortRow) { - ctx.sortRow.data[0] = safeCopy(key) - ctx.sortRow.data[1] = safeCopy(value) - ctx.sortRow = e.newTopNSortRow() - } - return errors.Trace(ctx.heap.err) -} - -func (e *closureExecutor) newTopNSortRow() *sortRow { - return &sortRow{ - key: make([]types.Datum, len(e.evalContext.columnInfos)), - data: make([][]byte, 2), - } -} - -func (e *topNProcessor) Finish() error { - ctx := e.topNCtx - sort.Sort(&ctx.heap.topNSorter) - chk := e.scanCtx.chk - for _, row := range ctx.heap.rows { - err := e.processCore(row.data[0], row.data[1]) - if err != nil { - return err - } - if chk.NumRows() == chunkMaxRows { - if err = e.chunkToOldChunk(chk); err != nil { - return errors.Trace(err) - } - } - } - return e.chunkToOldChunk(chk) -} - -type hashAggProcessor struct { - skipVal - *closureExecutor - - aggExprs []aggregation.Aggregation - groupByExprs []expression.Expression - groups map[string]struct{} - groupKeys [][]byte - aggCtxsMap map[string][]*aggregation.AggEvaluateContext -} - -func (e *hashAggProcessor) Process(key, value []byte) (err error) { - err = e.processCore(key, value) - if err != nil { - return err - } - if e.hasSelection() { - gotRow, err1 := e.processSelection() - if err1 != nil || !gotRow { - return err1 - } - } - row := e.scanCtx.chk.GetRow(e.scanCtx.chk.NumRows() - 1) - gk, err := e.getGroupKey(row) - if _, ok := e.groups[string(gk)]; !ok { - e.groups[string(gk)] = struct{}{} - e.groupKeys = append(e.groupKeys, gk) - } - // Update aggregate expressions. - aggCtxs := e.getContexts(gk) - for i, agg := range e.aggExprs { - err = agg.Update(aggCtxs[i], e.sc, row) - if err != nil { - return errors.Trace(err) - } - } - e.scanCtx.chk.Reset() - return nil -} - -func (e *hashAggProcessor) getGroupKey(row chunk.Row) ([]byte, error) { - length := len(e.groupByExprs) - if length == 0 { - return nil, nil - } - key := make([]byte, 0, 32) - for _, item := range e.groupByExprs { - v, err := item.Eval(row) - if err != nil { - return nil, errors.Trace(err) - } - b, err := codec.EncodeValue(e.sc, nil, v) - if err != nil { - return nil, errors.Trace(err) - } - key = append(key, b...) 
- } - return key, nil -} - -func (e *hashAggProcessor) getContexts(groupKey []byte) []*aggregation.AggEvaluateContext { - aggCtxs, ok := e.aggCtxsMap[string(groupKey)] - if !ok { - aggCtxs = make([]*aggregation.AggEvaluateContext, 0, len(e.aggExprs)) - for _, agg := range e.aggExprs { - aggCtxs = append(aggCtxs, agg.CreateContext(e.sc)) - } - e.aggCtxsMap[string(groupKey)] = aggCtxs - } - return aggCtxs -} - -func (e *hashAggProcessor) Finish() error { - for i, gk := range e.groupKeys { - aggCtxs := e.getContexts(gk) - e.oldRowBuf = e.oldRowBuf[:0] - for i, agg := range e.aggExprs { - partialResults := agg.GetPartialResult(aggCtxs[i]) - var err error - e.oldRowBuf, err = codec.EncodeValue(e.sc, e.oldRowBuf, partialResults...) - if err != nil { - return err - } - } - e.oldRowBuf = append(e.oldRowBuf, gk...) - e.oldChunks = appendRow(e.oldChunks, e.oldRowBuf, i) - } - return nil -} diff --git a/tikv/cop_handler.go b/tikv/cop_handler.go deleted file mode 100644 index a2c76f04..00000000 --- a/tikv/cop_handler.go +++ /dev/null @@ -1,368 +0,0 @@ -// Copyright 2019-present PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package tikv - -import ( - "bytes" - "fmt" - "time" - - "github.com/golang/protobuf/proto" - "github.com/juju/errors" - "github.com/pingcap/kvproto/pkg/coprocessor" - "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/parser/model" - "github.com/pingcap/parser/mysql" - "github.com/pingcap/parser/terror" - "github.com/pingcap/tidb/expression" - "github.com/pingcap/tidb/expression/aggregation" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/sessionctx/stmtctx" - "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/chunk" - "github.com/pingcap/tidb/util/codec" - "github.com/pingcap/tidb/util/collate" - "github.com/pingcap/tidb/util/rowcodec" - "github.com/pingcap/tipb/go-tipb" -) - -var dummySlice = make([]byte, 0) - -type dagContext struct { - reqCtx *requestCtx - dagReq *tipb.DAGRequest - keyRanges []*coprocessor.KeyRange - evalCtx *evalContext - startTS uint64 -} - -func (svr *Server) handleCopChecksumRequest(reqCtx *requestCtx, req *coprocessor.Request) *coprocessor.Response { - resp := &tipb.ChecksumResponse{ - Checksum: 1, - TotalKvs: 1, - TotalBytes: 1, - } - data, err := resp.Marshal() - if err != nil { - return &coprocessor.Response{OtherError: fmt.Sprintf("marshal checksum response error: %v", err)} - } - return &coprocessor.Response{Data: data} -} - -func (svr *Server) handleCopDAGRequest(reqCtx *requestCtx, req *coprocessor.Request) *coprocessor.Response { - startTime := time.Now() - resp := &coprocessor.Response{} - dagCtx, dagReq, err := svr.buildDAG(reqCtx, req) - if err != nil { - resp.OtherError = err.Error() - return resp - } - closureExec, err := svr.buildClosureExecutor(dagCtx, dagReq) - if err != nil { - return buildResp(nil, nil, err, dagCtx.evalCtx.sc.GetWarnings(), time.Since(startTime)) - } - chunks, err := closureExec.execute() - return buildResp(chunks, closureExec.counts, err, dagCtx.evalCtx.sc.GetWarnings(), time.Since(startTime)) -} - 
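For reference, the handler deleted above follows a small, fixed lifecycle: unmarshal the DAG plan, run the flattened closure executor, and marshal the resulting row chunks into a coprocessor.Response. The sketch below restates that lifecycle in isolation; it is a simplified illustration under assumptions, not the deleted implementation. The package name, handleDAGSketch, and the execute callback (a stand-in for the closure executor built by buildClosureExecutor) are hypothetical.

// A minimal sketch of the DAG request lifecycle, assuming a caller-supplied
// execute callback in place of the real closure executor.
package sketch

import (
	"github.com/golang/protobuf/proto"
	"github.com/pingcap/kvproto/pkg/coprocessor"
	"github.com/pingcap/tipb/go-tipb"
)

func handleDAGSketch(req *coprocessor.Request,
	execute func(*tipb.DAGRequest) ([]tipb.Chunk, error)) *coprocessor.Response {
	// Decode the serialized DAG plan carried in the coprocessor request.
	dagReq := new(tipb.DAGRequest)
	if err := proto.Unmarshal(req.Data, dagReq); err != nil {
		return &coprocessor.Response{OtherError: err.Error()}
	}
	// Run the plan; stand-in for closureExecutor.execute().
	chunks, err := execute(dagReq)
	if err != nil {
		return &coprocessor.Response{OtherError: err.Error()}
	}
	// Pack the row chunks into a SelectResponse and serialize it.
	data, err := proto.Marshal(&tipb.SelectResponse{Chunks: chunks})
	if err != nil {
		return &coprocessor.Response{OtherError: err.Error()}
	}
	return &coprocessor.Response{Data: data}
}

Note that the real handler above also threads warnings, per-range output counts, lock errors, and execution time into the response via buildResp; the sketch omits those for brevity.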
-func (svr *Server) buildDAG(reqCtx *requestCtx, req *coprocessor.Request) (*dagContext, *tipb.DAGRequest, error) { - if len(req.Ranges) == 0 { - return nil, nil, errors.New("request range is null") - } - if req.GetTp() != kv.ReqTypeDAG { - return nil, nil, errors.Errorf("unsupported request type %d", req.GetTp()) - } - - dagReq := new(tipb.DAGRequest) - err := proto.Unmarshal(req.Data, dagReq) - if err != nil { - return nil, nil, errors.Trace(err) - } - sc := flagsToStatementContext(dagReq.Flags) - sc.TimeZone = time.FixedZone("UTC", int(dagReq.TimeZoneOffset)) - ctx := &dagContext{ - reqCtx: reqCtx, - dagReq: dagReq, - keyRanges: req.Ranges, - evalCtx: &evalContext{sc: sc}, - startTS: req.StartTs, - } - scanExec := dagReq.Executors[0] - if scanExec.Tp == tipb.ExecType_TypeTableScan { - ctx.evalCtx.setColumnInfo(scanExec.TblScan.Columns) - } else { - ctx.evalCtx.setColumnInfo(scanExec.IdxScan.Columns) - } - return ctx, dagReq, err -} - -func (svr *Server) getAggInfo(ctx *dagContext, pbAgg *tipb.Aggregation) ([]aggregation.Aggregation, []expression.Expression, error) { - length := len(pbAgg.AggFunc) - aggs := make([]aggregation.Aggregation, 0, length) - var err error - for _, expr := range pbAgg.AggFunc { - var aggExpr aggregation.Aggregation - aggExpr, err = aggregation.NewDistAggFunc(expr, ctx.evalCtx.fieldTps, ctx.evalCtx.sc) - if err != nil { - return nil, nil, errors.Trace(err) - } - aggs = append(aggs, aggExpr) - } - groupBys, err := convertToExprs(ctx.evalCtx.sc, ctx.evalCtx.fieldTps, pbAgg.GetGroupBy()) - if err != nil { - return nil, nil, errors.Trace(err) - } - - return aggs, groupBys, nil -} - -func (svr *Server) getTopNInfo(ctx *evalContext, topN *tipb.TopN) (heap *topNHeap, conds []expression.Expression, err error) { - pbConds := make([]*tipb.Expr, len(topN.OrderBy)) - for i, item := range topN.OrderBy { - pbConds[i] = item.Expr - } - heap = &topNHeap{ - totalCount: int(topN.Limit), - topNSorter: topNSorter{ - orderByItems: topN.OrderBy, - sc: ctx.sc, - }, - } - if conds, err = convertToExprs(ctx.sc, ctx.fieldTps, pbConds); err != nil { - return nil, nil, errors.Trace(err) - } - - return heap, conds, nil -} - -type evalContext struct { - colIDs map[int64]int - columnInfos []*tipb.ColumnInfo - fieldTps []*types.FieldType - sc *stmtctx.StatementContext -} - -func (e *evalContext) setColumnInfo(cols []*tipb.ColumnInfo) { - e.columnInfos = make([]*tipb.ColumnInfo, len(cols)) - copy(e.columnInfos, cols) - - e.colIDs = make(map[int64]int, len(e.columnInfos)) - e.fieldTps = make([]*types.FieldType, 0, len(e.columnInfos)) - for i, col := range e.columnInfos { - ft := fieldTypeFromPBColumn(col) - e.fieldTps = append(e.fieldTps, ft) - e.colIDs[col.GetColumnId()] = i - } -} - -func (e *evalContext) newRowDecoder() (*rowcodec.ChunkDecoder, error) { - var ( - handleColID int64 - cols = make([]rowcodec.ColInfo, 0, len(e.columnInfos)) - ) - for i := range e.columnInfos { - info := e.columnInfos[i] - ft := e.fieldTps[i] - col := rowcodec.ColInfo{ - ID: info.ColumnId, - Ft: ft, - IsPKHandle: info.PkHandle, - } - cols = append(cols, col) - if info.PkHandle { - handleColID = info.ColumnId - } - } - def := func(i int, chk *chunk.Chunk) error { - info := e.columnInfos[i] - if info.PkHandle || len(info.DefaultVal) == 0 { - chk.AppendNull(i) - return nil - } - decoder := codec.NewDecoder(chk, e.sc.TimeZone) - _, err := decoder.DecodeOne(info.DefaultVal, i, e.fieldTps[i]) - if err != nil { - return err - } - return nil - } - return rowcodec.NewChunkDecoder(cols, []int64{handleColID}, def, 
e.sc.TimeZone), nil -} - -// decodeRelatedColumnVals decodes data to Datum slice according to the row information. -func (e *evalContext) decodeRelatedColumnVals(relatedColOffsets []int, value [][]byte, row []types.Datum) error { - var err error - for _, offset := range relatedColOffsets { - row[offset], err = tablecodec.DecodeColumnValue(value[offset], e.fieldTps[offset], e.sc.TimeZone) - if err != nil { - return errors.Trace(err) - } - } - return nil -} - -// Flags are used by tipb.SelectRequest.Flags to handle execution mode, like how to handle truncate error. -const ( - // FlagIgnoreTruncate indicates if truncate error should be ignored. - // Read-only statements should ignore truncate error, write statements should not ignore truncate error. - FlagIgnoreTruncate uint64 = 1 - // FlagTruncateAsWarning indicates if truncate error should be returned as warning. - // This flag only matters if FlagIgnoreTruncate is not set, in strict sql mode, truncate error should - // be returned as error, in non-strict sql mode, truncate error should be saved as warning. - FlagTruncateAsWarning uint64 = 1 << 1 -) - -// flagsToStatementContext creates a StatementContext from a `tipb.SelectRequest.Flags`. -func flagsToStatementContext(flags uint64) *stmtctx.StatementContext { - sc := new(stmtctx.StatementContext) - sc.IgnoreTruncate = (flags & model.FlagIgnoreTruncate) > 0 - sc.TruncateAsWarning = (flags & model.FlagTruncateAsWarning) > 0 - sc.InInsertStmt = (flags & model.FlagInInsertStmt) > 0 - sc.InSelectStmt = (flags & model.FlagInSelectStmt) > 0 - sc.InDeleteStmt = (flags & model.FlagInUpdateOrDeleteStmt) > 0 - sc.OverflowAsWarning = (flags & model.FlagOverflowAsWarning) > 0 - sc.IgnoreZeroInDate = (flags & model.FlagIgnoreZeroInDate) > 0 - sc.DividedByZeroAsWarning = (flags & model.FlagDividedByZeroAsWarning) > 0 - return sc -} - -func buildResp(chunks []tipb.Chunk, counts []int64, err error, warnings []stmtctx.SQLWarn, dur time.Duration) *coprocessor.Response { - resp := &coprocessor.Response{} - selResp := &tipb.SelectResponse{ - Error: toPBError(err), - Chunks: chunks, - OutputCounts: counts, - } - if len(warnings) > 0 { - selResp.Warnings = make([]*tipb.Error, 0, len(warnings)) - for i := range warnings { - selResp.Warnings = append(selResp.Warnings, toPBError(warnings[i].Err)) - } - } - if locked, ok := errors.Cause(err).(*ErrLocked); ok { - resp.Locked = &kvrpcpb.LockInfo{ - Key: locked.Key, - PrimaryLock: locked.Primary, - LockVersion: locked.StartTS, - LockTtl: locked.TTL, - } - } - resp.ExecDetails = &kvrpcpb.ExecDetails{ - HandleTime: &kvrpcpb.HandleTime{ProcessMs: int64(dur / time.Millisecond)}, - } - data, err := proto.Marshal(selResp) - if err != nil { - resp.OtherError = err.Error() - return resp - } - resp.Data = data - return resp -} - -func toPBError(err error) *tipb.Error { - if err == nil { - return nil - } - perr := new(tipb.Error) - e := errors.Cause(err) - switch y := e.(type) { - case *terror.Error: - tmp := y.ToSQLError() - perr.Code = int32(tmp.Code) - perr.Msg = tmp.Message - case *mysql.SQLError: - perr.Code = int32(y.Code) - perr.Msg = y.Message - default: - perr.Code = int32(1) - perr.Msg = err.Error() - } - return perr -} - -// extractKVRanges extracts kv.KeyRanges slice from a SelectRequest. 
-func (svr *Server) extractKVRanges(regCtx *regionCtx, keyRanges []*coprocessor.KeyRange, descScan bool) (kvRanges []kv.KeyRange, err error) { - startKey := regCtx.rawStartKey() - endKey := regCtx.rawEndKey() - kvRanges = make([]kv.KeyRange, 0, len(keyRanges)) - for _, kran := range keyRanges { - if bytes.Compare(kran.GetStart(), kran.GetEnd()) >= 0 { - err = errors.Errorf("invalid range, start should be smaller than end: %v %v", kran.GetStart(), kran.GetEnd()) - return - } - - upperKey := kran.GetEnd() - if bytes.Compare(upperKey, startKey) <= 0 { - continue - } - lowerKey := kran.GetStart() - if len(endKey) != 0 && bytes.Compare(lowerKey, endKey) >= 0 { - break - } - r := kv.KeyRange{ - StartKey: kv.Key(maxStartKey(lowerKey, startKey)), - EndKey: kv.Key(minEndKey(upperKey, endKey)), - } - kvRanges = append(kvRanges, r) - } - if descScan { - reverseKVRanges(kvRanges) - } - return -} - -func reverseKVRanges(kvRanges []kv.KeyRange) { - for i := 0; i < len(kvRanges)/2; i++ { - j := len(kvRanges) - i - 1 - kvRanges[i], kvRanges[j] = kvRanges[j], kvRanges[i] - } -} - -const rowsPerChunk = 64 - -func appendRow(chunks []tipb.Chunk, data []byte, rowCnt int) []tipb.Chunk { - if rowCnt%rowsPerChunk == 0 { - chunks = append(chunks, tipb.Chunk{}) - } - cur := &chunks[len(chunks)-1] - cur.RowsData = append(cur.RowsData, data...) - return chunks -} - -func maxStartKey(rangeStartKey kv.Key, regionStartKey []byte) []byte { - if bytes.Compare([]byte(rangeStartKey), regionStartKey) > 0 { - return []byte(rangeStartKey) - } - return regionStartKey -} - -func minEndKey(rangeEndKey kv.Key, regionEndKey []byte) []byte { - if len(regionEndKey) == 0 || bytes.Compare([]byte(rangeEndKey), regionEndKey) < 0 { - return []byte(rangeEndKey) - } - return regionEndKey -} - -// fieldTypeFromPBColumn creates a types.FieldType from tipb.ColumnInfo. -func fieldTypeFromPBColumn(col *tipb.ColumnInfo) *types.FieldType { - return &types.FieldType{ - Tp: byte(col.GetTp()), - Flag: uint(col.Flag), - Flen: int(col.GetColumnLen()), - Decimal: int(col.GetDecimal()), - Elems: col.Elems, - Collate: mysql.Collations[uint8(collate.RestoreCollationIDIfNeeded(col.GetCollation()))], - } -} diff --git a/tikv/cop_handler_test.go b/tikv/cop_handler_test.go deleted file mode 100644 index 8e6ebff4..00000000 --- a/tikv/cop_handler_test.go +++ /dev/null @@ -1,388 +0,0 @@ -// Copyright 2019-present PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package tikv - -import ( - "errors" - "math" - "testing" - - "github.com/pingcap/kvproto/pkg/coprocessor" - "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/parser/mysql" - "github.com/pingcap/tidb/expression" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/sessionctx/stmtctx" - "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/codec" - "github.com/pingcap/tidb/util/rowcodec" - "github.com/pingcap/tipb/go-tipb" - "github.com/stretchr/testify/require" -) - -const ( - keyNumber = 3 - TableId = 0 - StartTs = 10 - TTL = 60000 - DagRequestStartTs = 100 -) - -// wrapper of test data, including encoded data, column types etc. -type data struct { - encodedTestKVDatas []*encodedTestKVData - colInfos []*tipb.ColumnInfo - rows map[int64][]types.Datum // handle -> row - colTypes map[int64]*types.FieldType // colId -> fieldType -} - -type encodedTestKVData struct { - encodedRowKey []byte - encodedRowValue []byte -} - -func initTestData(store *TestStore, encodedKVDatas []*encodedTestKVData) []error { - reqCtx := store.newReqCtx() - i := 0 - for _, kvData := range encodedKVDatas { - mutation := makeATestMutaion(kvrpcpb.Op_Put, kvData.encodedRowKey, - kvData.encodedRowValue) - req := &kvrpcpb.PrewriteRequest{ - Mutations: []*kvrpcpb.Mutation{mutation}, - PrimaryLock: kvData.encodedRowKey, - StartVersion: uint64(StartTs + i), - LockTtl: TTL, - } - err := store.MvccStore.Prewrite(reqCtx, req) - if err != nil { - return []error{err} - } - commitError := store.MvccStore.Commit(reqCtx, [][]byte{kvData.encodedRowKey}, - uint64(StartTs+i), uint64(StartTs+i+1)) - if commitError != nil { - return []error{commitError} - } - i += 2 - } - return nil -} - -func makeATestMutaion(op kvrpcpb.Op, key []byte, value []byte) *kvrpcpb.Mutation { - return &kvrpcpb.Mutation{ - Op: op, - Key: key, - Value: value, - } -} - -func prepareTestTableData(t *testing.T, keyNumber int, tableId int64) *data { - stmtCtx := new(stmtctx.StatementContext) - colIds := []int64{1, 2, 3} - colTypes := []*types.FieldType{ - types.NewFieldType(mysql.TypeLonglong), - types.NewFieldType(mysql.TypeString), - types.NewFieldType(mysql.TypeDouble), - } - colInfos := make([]*tipb.ColumnInfo, 3) - colTypeMap := map[int64]*types.FieldType{} - for i := 0; i < 3; i++ { - colInfos[i] = &tipb.ColumnInfo{ - ColumnId: colIds[i], - Tp: int32(colTypes[i].Tp), - } - colTypeMap[colIds[i]] = colTypes[i] - } - rows := map[int64][]types.Datum{} - encodedTestKVDatas := make([]*encodedTestKVData, keyNumber) - for i := 0; i < keyNumber; i++ { - datum := types.MakeDatums(i, "abc", 10.0) - rows[int64(i)] = datum - rowEncodedData, err := tablecodec.EncodeRow(stmtCtx, datum, colIds, nil, nil, &rowcodec.Encoder{}) - require.Nil(t, err) - rowKeyEncodedData := tablecodec.EncodeRowKeyWithHandle(tableId, kv.IntHandle(i)) - encodedTestKVDatas[i] = &encodedTestKVData{encodedRowKey: rowKeyEncodedData, encodedRowValue: rowEncodedData} - } - return &data{ - colInfos: colInfos, - encodedTestKVDatas: encodedTestKVDatas, - rows: rows, - colTypes: colTypeMap, - } -} - -func getTestPointRange(tableId int64, handle int64) kv.KeyRange { - startKey := tablecodec.EncodeRowKeyWithHandle(tableId, kv.IntHandle(handle)) - endKey := make([]byte, len(startKey)) - copy(endKey, startKey) - convertToPrefixNext(endKey) - return kv.KeyRange{ - StartKey: startKey, - EndKey: endKey, - } -} - -// convert this key to the smallest key which is larger than the key given. 
-// see tikv/src/coprocessor/util.rs for more detail. -func convertToPrefixNext(key []byte) []byte { - if key == nil || len(key) == 0 { - return []byte{0} - } - for i := len(key) - 1; i >= 0; i-- { - if key[i] == 255 { - key[i] = 0 - } else { - key[i] += 1 - return key - } - } - for i := 0; i < len(key); i++ { - key[i] = 255 - } - return append(key, 0) -} - -// return whether these two keys are equal. -func isPrefixNext(key []byte, expected []byte) bool { - key = convertToPrefixNext(key) - if len(key) != len(expected) { - return false - } - for i := 0; i < len(key); i++ { - if key[i] != expected[i] { - return false - } - } - return true -} - -// return a dag context according to dagReq and key ranges. -func newDagContext(store *TestStore, keyRanges []kv.KeyRange, dagReq *tipb.DAGRequest, startTs uint64) *dagContext { - sc := flagsToStatementContext(dagReq.Flags) - dagCtx := &dagContext{ - reqCtx: &requestCtx{ - svr: store.Svr, - regCtx: ®ionCtx{ - meta: &metapb.Region{ - StartKey: nil, - EndKey: nil, - }, - }, - rpcCtx: &kvrpcpb.Context{}, - }, - dagReq: dagReq, - evalCtx: &evalContext{sc: sc}, - startTS: startTs, - } - if dagReq.Executors[0].Tp == tipb.ExecType_TypeTableScan { - dagCtx.evalCtx.setColumnInfo(dagReq.Executors[0].TblScan.Columns) - } else { - dagCtx.evalCtx.setColumnInfo(dagReq.Executors[0].IdxScan.Columns) - } - dagCtx.keyRanges = make([]*coprocessor.KeyRange, len(keyRanges)) - for i, keyRange := range keyRanges { - dagCtx.keyRanges[i] = &coprocessor.KeyRange{ - Start: keyRange.StartKey, - End: keyRange.EndKey, - } - } - return dagCtx -} - -// build and execute the executors according to the dagRequest and dagContext, -// return the result chunk data, rows count and err if occurs. -func buildExecutorsAndExecute(store *TestStore, dagRequest *tipb.DAGRequest, - dagCtx *dagContext) ([]tipb.Chunk, int, error) { - closureExec, err := store.Svr.buildClosureExecutor(dagCtx, dagRequest) - if err != nil { - return nil, 0, err - } - if closureExec != nil { - chunks, err := closureExec.execute() - if err != nil { - return nil, 0, err - } - return chunks, closureExec.rowCount, nil - } - return nil, 0, errors.New("closureExec creation failed") -} - -// dagBuilder is used to build dag request -type dagBuilder struct { - startTs uint64 - executors []*tipb.Executor - outputOffsets []uint32 -} - -// return a default dagBuilder -func newDagBuilder() *dagBuilder { - return &dagBuilder{executors: make([]*tipb.Executor, 0)} -} - -func (dagBuilder *dagBuilder) setStartTs(startTs uint64) *dagBuilder { - dagBuilder.startTs = startTs - return dagBuilder -} - -func (dagBuilder *dagBuilder) setOutputOffsets(outputOffsets []uint32) *dagBuilder { - dagBuilder.outputOffsets = outputOffsets - return dagBuilder -} - -func (dagBuilder *dagBuilder) addTableScan(colInfos []*tipb.ColumnInfo, tableId int64) *dagBuilder { - dagBuilder.executors = append(dagBuilder.executors, &tipb.Executor{ - Tp: tipb.ExecType_TypeTableScan, - TblScan: &tipb.TableScan{ - Columns: colInfos, - TableId: tableId, - }, - }) - return dagBuilder -} - -func (dagBuilder *dagBuilder) addSelection(expr *tipb.Expr) *dagBuilder { - dagBuilder.executors = append(dagBuilder.executors, &tipb.Executor{ - Tp: tipb.ExecType_TypeSelection, - Selection: &tipb.Selection{ - Conditions: []*tipb.Expr{expr}, - XXX_unrecognized: nil, - }, - }) - return dagBuilder -} - -func (dagBuilder *dagBuilder) addLimit(limit uint64) *dagBuilder { - dagBuilder.executors = append(dagBuilder.executors, &tipb.Executor{ - Tp: tipb.ExecType_TypeLimit, - Limit: 
&tipb.Limit{Limit: limit}, - }) - return dagBuilder -} - -func (dagBuilder *dagBuilder) build() *tipb.DAGRequest { - return &tipb.DAGRequest{ - Executors: dagBuilder.executors, - OutputOffsets: dagBuilder.outputOffsets, - } -} - -// see tikv/src/coprocessor/util.rs for more detail -func TestIsPrefixNext(t *testing.T) { - require.True(t, isPrefixNext([]byte{}, []byte{0})) - require.True(t, isPrefixNext([]byte{0}, []byte{1})) - require.True(t, isPrefixNext([]byte{1}, []byte{2})) - require.True(t, isPrefixNext([]byte{255}, []byte{255, 0})) - require.True(t, isPrefixNext([]byte{255, 255, 255}, []byte{255, 255, 255, 0})) - require.True(t, isPrefixNext([]byte{1, 255}, []byte{2, 0})) - require.True(t, isPrefixNext([]byte{0, 1, 255}, []byte{0, 2, 0})) - require.True(t, isPrefixNext([]byte{0, 1, 255, 5}, []byte{0, 1, 255, 6})) - require.True(t, isPrefixNext([]byte{0, 1, 5, 255}, []byte{0, 1, 6, 0})) - require.True(t, isPrefixNext([]byte{0, 1, 255, 255}, []byte{0, 2, 0, 0})) - require.True(t, isPrefixNext([]byte{0, 255, 255, 255}, []byte{1, 0, 0, 0})) -} - -func TestPointGet(t *testing.T) { - // here would build mvccStore and server, and prepare - // three rows data, just like the test data of table_scan.rs. - // then init the store with the generated data. - data := prepareTestTableData(t, keyNumber, TableId) - store, err := NewTestStore("cop_handler_test_db", "cop_handler_test_log", nil) - defer CleanTestStore(store) - require.Nil(t, err) - errors := initTestData(store, data.encodedTestKVDatas) - require.Nil(t, errors) - - // point get should return nothing when handle is math.MinInt64 - handle := int64(math.MinInt64) - dagRequest := newDagBuilder(). - setStartTs(DagRequestStartTs). - addTableScan(data.colInfos, TableId). - setOutputOffsets([]uint32{0, 1}). - build() - dagCtx := newDagContext(store, []kv.KeyRange{getTestPointRange(TableId, handle)}, - dagRequest, DagRequestStartTs) - chunks, rowCount, err := buildExecutorsAndExecute(store, dagRequest, dagCtx) - require.Nil(t, err) - require.Equal(t, rowCount, 0) - - // point get should return one row when handle = 0 - handle = 0 - dagRequest = newDagBuilder(). - setStartTs(DagRequestStartTs). - addTableScan(data.colInfos, TableId). - setOutputOffsets([]uint32{0, 1}). - build() - dagCtx = newDagContext(store, []kv.KeyRange{getTestPointRange(TableId, handle)}, - dagRequest, DagRequestStartTs) - chunks, rowCount, err = buildExecutorsAndExecute(store, dagRequest, dagCtx) - require.Nil(t, err) - require.Equal(t, 1, rowCount) - returnedRow, err := codec.Decode(chunks[0].RowsData, 2) - require.Nil(t, err) - // returned row should has 2 cols - require.Equal(t, len(returnedRow), 2) - - // verify the returned rows value as input - expectedRow := data.rows[handle] - eq, err := returnedRow[0].CompareDatum(nil, &expectedRow[0]) - require.Nil(t, err) - require.Equal(t, eq, 0) - eq, err = returnedRow[1].CompareDatum(nil, &expectedRow[1]) - require.Nil(t, err) - require.Equal(t, eq, 0) -} - -func TestClosureExecutor(t *testing.T) { - data := prepareTestTableData(t, keyNumber, TableId) - store, err := NewTestStore("cop_handler_test_db", "cop_handler_test_log", nil) - defer CleanTestStore(store) - require.Nil(t, err) - errors := initTestData(store, data.encodedTestKVDatas) - require.Nil(t, errors) - - dagRequest := newDagBuilder(). - setStartTs(DagRequestStartTs). - addTableScan(data.colInfos, TableId). - addSelection(buildEQIntExpr(1, -1)). - addLimit(1). - setOutputOffsets([]uint32{0, 1}). 
-		build()
-
-	dagCtx := newDagContext(store, []kv.KeyRange{getTestPointRange(TableId, 1)},
-		dagRequest, DagRequestStartTs)
-	_, rowCount, err := buildExecutorsAndExecute(store, dagRequest, dagCtx)
-	require.Nil(t, err, "%v", err)
-	require.Equal(t, rowCount, 0)
-}
-
-func buildEQIntExpr(colID, val int64) *tipb.Expr {
-	return &tipb.Expr{
-		Tp:        tipb.ExprType_ScalarFunc,
-		Sig:       tipb.ScalarFuncSig_EQInt,
-		FieldType: expression.ToPBFieldType(types.NewFieldType(mysql.TypeLonglong)),
-		Children: []*tipb.Expr{
-			{
-				Tp:        tipb.ExprType_ColumnRef,
-				Val:       codec.EncodeInt(nil, colID),
-				FieldType: expression.ToPBFieldType(types.NewFieldType(mysql.TypeLonglong)),
-			},
-			{
-				Tp:        tipb.ExprType_Int64,
-				Val:       codec.EncodeInt(nil, val),
-				FieldType: expression.ToPBFieldType(types.NewFieldType(mysql.TypeLonglong)),
-			},
-		},
-	}
-}
diff --git a/tikv/mvcc.go b/tikv/mvcc.go
index 38fdac85..a43acf8e 100644
--- a/tikv/mvcc.go
+++ b/tikv/mvcc.go
@@ -1239,7 +1239,6 @@ func isResolved(startTS uint64, resolved []uint64) bool {
 }
 
 type kvScanProcessor struct {
-	skipVal
 	buf   []byte
 	pairs []*kvrpcpb.KvPair
 }
@@ -1252,6 +1251,10 @@ func (p *kvScanProcessor) Process(key, value []byte) (err error) {
 	return nil
 }
 
+func (p *kvScanProcessor) SkipValue() bool {
+	return false
+}
+
 func (store *MVCCStore) Scan(reqCtx *requestCtx, req *kvrpcpb.ScanRequest) []*kvrpcpb.KvPair {
 	var startKey, endKey []byte
 	if req.Reverse {
diff --git a/tikv/server.go b/tikv/server.go
index 6ba31b2d..d87dfdcd 100644
--- a/tikv/server.go
+++ b/tikv/server.go
@@ -15,7 +15,6 @@ package tikv
 
 import (
 	"context"
-	"fmt"
 	"io"
 	"sync"
 	"sync/atomic"
@@ -31,7 +30,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/kvproto/pkg/tikvpb"
 	"github.com/pingcap/log"
-	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/store/mockstore/unistore/cophandler"
 	"go.uber.org/zap"
 )
 
@@ -472,15 +471,7 @@ func (svr *Server) Coprocessor(ctx context.Context, req *coprocessor.Request) (*
 	if reqCtx.regErr != nil {
 		return &coprocessor.Response{RegionError: reqCtx.regErr}, nil
 	}
-	switch req.Tp {
-	case kv.ReqTypeDAG:
-		return svr.handleCopDAGRequest(reqCtx, req), nil
-	case kv.ReqTypeAnalyze:
-		return svr.handleCopAnalyzeRequest(reqCtx, req), nil
-	case kv.ReqTypeChecksum:
-		return svr.handleCopChecksumRequest(reqCtx, req), nil
-	}
-	return &coprocessor.Response{OtherError: fmt.Sprintf("unsupported request type %d", req.GetTp())}, nil
+	return cophandler.HandleCopRequest(reqCtx.getDBReader(), svr.mvccStore.lockStore, req), nil
 }
 
 func (svr *Server) CoprocessorStream(*coprocessor.Request, tikvpb.Tikv_CoprocessorStreamServer) error {
diff --git a/tikv/topn.go b/tikv/topn.go
deleted file mode 100644
index b312b7fd..00000000
--- a/tikv/topn.go
+++ /dev/null
@@ -1,139 +0,0 @@
-// Copyright 2019-present PingCAP, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package tikv
-
-import (
-	"container/heap"
-
-	"github.com/juju/errors"
-	"github.com/pingcap/tidb/sessionctx/stmtctx"
-	"github.com/pingcap/tidb/types"
-	tipb "github.com/pingcap/tipb/go-tipb"
-)
-
-type sortRow struct {
-	key  []types.Datum
-	data [][]byte
-}
-
-// topNSorter implements sort.Interface. When all rows have been processed, the topNSorter will sort the whole data in heap.
-type topNSorter struct {
-	orderByItems []*tipb.ByItem
-	rows         []*sortRow
-	err          error
-	sc           *stmtctx.StatementContext
-}
-
-func (t *topNSorter) Len() int {
-	return len(t.rows)
-}
-
-func (t *topNSorter) Swap(i, j int) {
-	t.rows[i], t.rows[j] = t.rows[j], t.rows[i]
-}
-
-func (t *topNSorter) Less(i, j int) bool {
-	for index, by := range t.orderByItems {
-		v1 := t.rows[i].key[index]
-		v2 := t.rows[j].key[index]
-
-		ret, err := v1.CompareDatum(t.sc, &v2)
-		if err != nil {
-			t.err = errors.Trace(err)
-			return true
-		}
-
-		if by.Desc {
-			ret = -ret
-		}
-
-		if ret < 0 {
-			return true
-		} else if ret > 0 {
-			return false
-		}
-	}
-
-	return false
-}
-
-// topNHeap holds the top n elements using heap structure. It implements heap.Interface.
-// When we insert a row, topNHeap will check if the row can become one of the top n element or not.
-type topNHeap struct {
-	topNSorter
-
-	// totalCount is equal to the limit count, which means the max size of heap.
-	totalCount int
-	// heapSize means the current size of this heap.
-	heapSize int
-}
-
-func (t *topNHeap) Len() int {
-	return t.heapSize
-}
-
-func (t *topNHeap) Push(x interface{}) {
-	t.rows = append(t.rows, x.(*sortRow))
-	t.heapSize++
-}
-
-func (t *topNHeap) Pop() interface{} {
-	return nil
-}
-
-func (t *topNHeap) Less(i, j int) bool {
-	for index, by := range t.orderByItems {
-		v1 := t.rows[i].key[index]
-		v2 := t.rows[j].key[index]
-
-		ret, err := v1.CompareDatum(t.sc, &v2)
-		if err != nil {
-			t.err = errors.Trace(err)
-			return true
-		}
-
-		if by.Desc {
-			ret = -ret
-		}
-
-		if ret > 0 {
-			return true
-		} else if ret < 0 {
-			return false
-		}
-	}
-
-	return false
-}
-
-// tryToAddRow tries to add a row to heap.
-// When this row is not less than any rows in heap, it will never become the top n element.
-// Then this function returns false.
-func (t *topNHeap) tryToAddRow(row *sortRow) bool {
-	success := false
-	if t.heapSize == t.totalCount {
-		t.rows = append(t.rows, row)
-		// When this row is less than the top element, it will replace it and adjust the heap structure.
-		if t.Less(0, t.heapSize) {
-			t.Swap(0, t.heapSize)
-			heap.Fix(t, 0)
-			success = true
-		}
-		t.rows = t.rows[:t.heapSize]
-	} else {
-		heap.Push(t, row)
-		success = true
-	}
-	return success
-}
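
Note on the key arithmetic exercised by the deleted TestIsPrefixNext: convertToPrefixNext treats the key as a base-256 integer and increments it with carry, producing the smallest key that sorts after every key prefixed by the input. A 0xff byte rolls over to 0x00 and carries left; an all-0xff (or empty) key grows by one 0x00 byte instead. Below is a minimal standalone sketch of that behavior, not part of this patch: the name prefixNext is illustrative only, and unlike the removed helper it copies the key rather than mutating it.

package main

import "fmt"

// prefixNext mirrors the removed convertToPrefixNext: increment the key as a
// base-256 integer, carrying through 0xff bytes; an all-0xff or empty key
// gets a 0x00 byte appended instead.
func prefixNext(key []byte) []byte {
	next := append([]byte{}, key...) // copy; the removed helper mutated its input
	for i := len(next) - 1; i >= 0; i-- {
		if next[i] != 0xff {
			next[i]++ // no carry needed, done
			return next
		}
		next[i] = 0x00 // 0xff rolls over to 0x00, carry continues leftward
	}
	copy(next, key) // every byte carried: restore and grow by one byte
	return append(next, 0x00)
}

func main() {
	fmt.Println(prefixNext([]byte{1, 255}))        // [2 0]
	fmt.Println(prefixNext([]byte{255, 255, 255})) // [255 255 255 0]
	fmt.Println(prefixNext(nil))                   // [0]
}

These are exactly the deleted test cases: {1, 255} → {2, 0} and {255, 255, 255} → {255, 255, 255, 0}.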
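Similarly, the removed topn.go implemented the standard bounded-heap top-N pattern: keep at most N rows in a heap ordered worst-first, and admit a new row only if it beats the current worst. A self-contained sketch of the same technique over plain ints, under the simplifying assumption of a single ascending sort key (the real code compared multi-column datums via CompareDatum, and the type names here are illustrative, not any TiDB API):

package main

import (
	"container/heap"
	"fmt"
	"sort"
)

// intTopN keeps the n smallest values seen so far. Like the removed
// topNHeap, it is a max-heap so the worst retained value sits at index 0.
type intTopN struct {
	limit int
	vals  []int
}

func (t *intTopN) Len() int           { return len(t.vals) }
func (t *intTopN) Less(i, j int) bool { return t.vals[i] > t.vals[j] } // max-heap order
func (t *intTopN) Swap(i, j int)      { t.vals[i], t.vals[j] = t.vals[j], t.vals[i] }
func (t *intTopN) Push(x interface{}) { t.vals = append(t.vals, x.(int)) }
func (t *intTopN) Pop() interface{} {
	v := t.vals[len(t.vals)-1]
	t.vals = t.vals[:len(t.vals)-1]
	return v
}

// tryToAdd mirrors the removed tryToAddRow: below capacity always admit;
// at capacity, replace the root only when the new value beats the worst.
func (t *intTopN) tryToAdd(v int) bool {
	if len(t.vals) < t.limit {
		heap.Push(t, v)
		return true
	}
	if v < t.vals[0] {
		t.vals[0] = v
		heap.Fix(t, 0) // restore the heap property after replacing the root
		return true
	}
	return false
}

func main() {
	top := &intTopN{limit: 3}
	for _, v := range []int{9, 4, 7, 1, 8, 2} {
		top.tryToAdd(v)
	}
	sort.Ints(top.vals)
	fmt.Println(top.vals) // [1 2 4]
}

Since this PR routes coprocessor requests through TiDB's cophandler, the equivalent top-N logic now lives on the TiDB side rather than in this repository.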