diff --git a/cmd/benchfilesort/README.md b/cmd/benchfilesort/README.md deleted file mode 100644 index 79dd784a5709f..0000000000000 --- a/cmd/benchfilesort/README.md +++ /dev/null @@ -1,121 +0,0 @@ -## BenchFileSort - -BenchFileSort is a command line tool to test the performance of util/filesort. - -### Quick Start (Examples) - -Step 1 - Generate the synthetic data - -``` -./benchfilesort gen -keySize 8 -valSize 16 -scale 1000 -``` - -Expected output: - -``` -Generating... -Done! -Data placed in: /path/to/data.out -Time used: xxxx ms -================================= -``` - -Step 2 - Load the data and run the benchmark - -``` -./benchfilesort run -bufSize 50 -nWorkers 1 -inputRatio 100 -outputRatio 50 -``` - -Expected output: - -``` -Loading... - number of rows = 1000, key size = 8, value size = 16 - load 1000 rows -Done! -Loaded 1000 rows -Time used: xxxx ms -================================= -Inputing... -Done! -Input 1000 rows -Time used: xxxx s -================================= -Outputing... -Done! -Output 500 rows -Time used: xxxx ms -================================= -Closing... -Done! -Time used: xxxx ms -================================= -``` - -For performance tuning purpose, `Input` time and `Output` time are two KPIs you should focus on. -`Close` time reflects the GC performance, which might be noteworthy sometimes. - -### Commands and Arguments - -#### `gen` command - -The `gen` command generate the synthetic data for the benchmark. - -You can specify how many rows you want to generate, the key size -and value size for each row. - -The generated data is located in `$dir/data.out` (`$dir` is specified -by the `dir` argument). 
- -The `gen` command supports the following arguments: - -* `dir` (default: current working directory) - Specify the home directory of generated data - -* `keySize` (default: 8) - Specify the key size for generated rows - -* `valSize` (default: 8) - Specify the value size for generated rows - -* `scale` (default: 100) - Specify how many rows to generate - -* `cpuprofile` (default: "") - Turn on the CPU profile - -#### `run` command - -The `run` command load the synthetic data and run the benchmark. - -You can specify the home directory of the synthetic data. - -The benchmark will use predefined amount of memory, which is controlled -by the `bufSize` argument, to run the test. - -You can control how many rows to input into and output from, which are -defined by the `inputRatio` and `outputRatio` arguments. - -The `run` command supports the following arguments: - -* `dir` (default: current working directory) - Specify the home directory of synthetic data - -* `bufSize` (default: 500000) - Specify the amount of memory used by the benchmark - -* `nWorkers` (default: 1) - Specify the number of workers used in async sorting - -* `inputRatio` (default: 100) - Specify the percentage of rows to input: - - `# of rows to input = # of total rows * inputRatio / 100` - -* `outputRatio` (default: 100) - Specify the percentage of rows to output: - - `# of rows to output = # of rows to input * outputRatio / 100` - -* `cpuprofile` (default: "") - Turn on the CPU profile diff --git a/cmd/benchfilesort/main.go b/cmd/benchfilesort/main.go deleted file mode 100644 index 9a518cf0d5d4b..0000000000000 --- a/cmd/benchfilesort/main.go +++ /dev/null @@ -1,439 +0,0 @@ -// Copyright 2017 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "encoding/binary" - "flag" - "fmt" - "math/rand" - "os" - "path/filepath" - "runtime/pprof" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/tidb/parser/terror" - "github.com/pingcap/tidb/sessionctx/stmtctx" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/codec" - "github.com/pingcap/tidb/util/filesort" - "github.com/pingcap/tidb/util/logutil" -) - -type comparableRow struct { - key []types.Datum - val []types.Datum - handle int64 -} - -var ( - genCmd = flag.NewFlagSet("gen", flag.ExitOnError) - runCmd = flag.NewFlagSet("run", flag.ExitOnError) - - logLevel = "warn" - cpuprofile string - tmpDir string - keySize int - valSize int - bufSize int - scale int - nWorkers int - inputRatio int - outputRatio int -) - -// #nosec G404 -func nextRow(r *rand.Rand, keySize int, valSize int) *comparableRow { - key := make([]types.Datum, keySize) - for i := range key { - key[i] = types.NewDatum(r.Int()) - } - - val := make([]types.Datum, valSize) - for j := range val { - val[j] = types.NewDatum(r.Int()) - } - - handle := r.Int63() - return &comparableRow{key: key, val: val, handle: handle} -} - -func encodeRow(b []byte, row *comparableRow) ([]byte, error) { - var ( - err error - head = make([]byte, 8) - body []byte - ) - sc := &stmtctx.StatementContext{TimeZone: time.Local} - body, err = codec.EncodeKey(sc, body, row.key...) - if err != nil { - return b, errors.Trace(err) - } - body, err = codec.EncodeKey(sc, body, row.val...) 
- if err != nil { - return b, errors.Trace(err) - } - body, err = codec.EncodeKey(sc, body, types.NewIntDatum(row.handle)) - if err != nil { - return b, errors.Trace(err) - } - - binary.BigEndian.PutUint64(head, uint64(len(body))) - - b = append(b, head...) - b = append(b, body...) - - return b, nil -} - -func decodeRow(fd *os.File) (*comparableRow, error) { - var ( - err error - n int - head = make([]byte, 8) - dcod = make([]types.Datum, 0, keySize+valSize+1) - ) - - n, err = fd.Read(head) - if n != 8 { - return nil, errors.New("incorrect header") - } - if err != nil { - return nil, errors.Trace(err) - } - - rowSize := int(binary.BigEndian.Uint64(head)) - rowBytes := make([]byte, rowSize) - - n, err = fd.Read(rowBytes) - if n != rowSize { - return nil, errors.New("incorrect row") - } - if err != nil { - return nil, errors.Trace(err) - } - - dcod, err = codec.Decode(rowBytes, keySize+valSize+1) - if err != nil { - return nil, errors.Trace(err) - } - - return &comparableRow{ - key: dcod[:keySize], - val: dcod[keySize : keySize+valSize], - handle: dcod[keySize+valSize:][0].GetInt64(), - }, nil -} - -func encodeMeta(b []byte, scale int, keySize int, valSize int) []byte { - meta := make([]byte, 8) - - binary.BigEndian.PutUint64(meta, uint64(scale)) - b = append(b, meta...) - binary.BigEndian.PutUint64(meta, uint64(keySize)) - b = append(b, meta...) - binary.BigEndian.PutUint64(meta, uint64(valSize)) - b = append(b, meta...) 
- - return b -} - -func decodeMeta(fd *os.File) error { - meta := make([]byte, 24) - if n, err := fd.Read(meta); err != nil || n != 24 { - if n != 24 { - return errors.New("incorrect meta data") - } - return errors.Trace(err) - } - - scale = int(binary.BigEndian.Uint64(meta[:8])) - if scale <= 0 { - return errors.New("number of rows must be positive") - } - - keySize = int(binary.BigEndian.Uint64(meta[8:16])) - if keySize <= 0 { - return errors.New("key size must be positive") - } - - valSize = int(binary.BigEndian.Uint64(meta[16:])) - if valSize <= 0 { - return errors.New("value size must be positive") - } - - return nil -} - -/* - * The synthetic data is exported as a binary format. - * The encoding format is: - * 1) Meta Data - * Three 64-bit integers represent scale size, key size and value size. - * 2) Row Data - * Each row is encoded as: - * One 64-bit integer represent the row size in bytes, followed by the - * the actual row bytes. - */ -// #nosec G404 -func export() error { - var outputBytes []byte - - fileName := filepath.Join(tmpDir, "data.out") - outputFile, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) - if err != nil { - return errors.Trace(err) - } - defer terror.Call(outputFile.Close) - - outputBytes = encodeMeta(outputBytes, scale, keySize, valSize) - - seed := rand.NewSource(time.Now().UnixNano()) - r := rand.New(seed) - - for i := 1; i <= scale; i++ { - outputBytes, err = encodeRow(outputBytes, nextRow(r, keySize, valSize)) - if err != nil { - return errors.Trace(err) - } - _, err = outputFile.Write(outputBytes) - if err != nil { - return errors.Trace(err) - } - outputBytes = outputBytes[:0] - } - - return nil -} - -func load(ratio int) ([]*comparableRow, error) { - var ( - err error - fd *os.File - ) - - fileName := filepath.Join(tmpDir, "data.out") - fd, err = os.Open(fileName) - if os.IsNotExist(err) { - return nil, errors.New("data file (data.out) does not exist") - } - if err != nil { - return nil, errors.Trace(err) 
- } - defer terror.Call(fd.Close) - - err = decodeMeta(fd) - if err != nil { - return nil, errors.Trace(err) - } - - cLogf("\tnumber of rows = %d, key size = %d, value size = %d", scale, keySize, valSize) - - var ( - row *comparableRow - rows = make([]*comparableRow, 0, scale) - ) - - totalRows := int(float64(scale) * (float64(ratio) / 100.0)) - cLogf("\tload %d rows", totalRows) - for i := 1; i <= totalRows; i++ { - row, err = decodeRow(fd) - if err != nil { - return nil, errors.Trace(err) - } - rows = append(rows, row) - } - - return rows, nil -} - -func driveGenCmd() { - err := genCmd.Parse(os.Args[2:]) - terror.MustNil(err) - // Sanity checks - if keySize <= 0 { - log.Fatal("key size must be positive") - } - if valSize <= 0 { - log.Fatal("value size must be positive") - } - if scale <= 0 { - log.Fatal("scale must be positive") - } - if _, err = os.Stat(tmpDir); err != nil { - if os.IsNotExist(err) { - log.Fatal("tmpDir does not exist") - } - log.Fatal(err.Error()) - } - - cLog("Generating...") - start := time.Now() - err = export() - terror.MustNil(err) - cLog("Done!") - cLogf("Data placed in: %s", filepath.Join(tmpDir, "data.out")) - cLog("Time used: ", time.Since(start)) - cLog("=================================") -} - -func driveRunCmd() { - err := runCmd.Parse(os.Args[2:]) - terror.MustNil(err) - // Sanity checks - if bufSize <= 0 { - log.Fatal("buffer size must be positive") - } - if nWorkers <= 0 { - log.Fatal("the number of workers must be positive") - } - if inputRatio < 0 || inputRatio > 100 { - log.Fatal("input ratio must between 0 and 100 (inclusive)") - } - if outputRatio < 0 || outputRatio > 100 { - log.Fatal("output ratio must between 0 and 100 (inclusive)") - } - if _, err = os.Stat(tmpDir); err != nil { - if os.IsNotExist(err) { - log.Fatal("tmpDir does not exist") - } - terror.MustNil(err) - } - - var ( - dir string - profile *os.File - fs *filesort.FileSorter - ) - cLog("Loading...") - start := time.Now() - data, err := load(inputRatio) - 
terror.MustNil(err) - cLog("Done!") - cLogf("Loaded %d rows", len(data)) - cLog("Time used: ", time.Since(start)) - cLog("=================================") - - sc := new(stmtctx.StatementContext) - fsBuilder := new(filesort.Builder) - byDesc := make([]bool, keySize) - for i := 0; i < keySize; i++ { - byDesc[i] = false - } - dir, err = os.MkdirTemp(tmpDir, "benchfilesort_test") - terror.MustNil(err) - fs, err = fsBuilder.SetSC(sc).SetSchema(keySize, valSize).SetBuf(bufSize).SetWorkers(nWorkers).SetDesc(byDesc).SetDir(dir).Build() - terror.MustNil(err) - - if cpuprofile != "" { - profile, err = os.Create(cpuprofile) - terror.MustNil(err) - } - - cLog("Inputing...") - start = time.Now() - for _, r := range data { - err = fs.Input(r.key, r.val, r.handle) - terror.MustNil(err) - } - cLog("Done!") - cLogf("Input %d rows", len(data)) - cLog("Time used: ", time.Since(start)) - cLog("=================================") - - cLog("Outputing...") - totalRows := int(float64(len(data)) * (float64(outputRatio) / 100.0)) - start = time.Now() - if cpuprofile != "" { - err = pprof.StartCPUProfile(profile) - terror.MustNil(err) - } - for i := 0; i < totalRows; i++ { - _, _, _, err = fs.Output() - terror.MustNil(err) - } - if cpuprofile != "" { - pprof.StopCPUProfile() - } - cLog("Done!") - cLogf("Output %d rows", totalRows) - cLog("Time used: ", time.Since(start)) - cLog("=================================") - - cLog("Closing...") - start = time.Now() - err = fs.Close() - terror.MustNil(err) - cLog("Done!") - cLog("Time used: ", time.Since(start)) - cLog("=================================") -} - -func init() { - err := logutil.InitLogger(logutil.NewLogConfig(logLevel, logutil.DefaultLogFormat, "", logutil.EmptyFileLogConfig, false)) - terror.MustNil(err) - cwd, err1 := os.Getwd() - terror.MustNil(err1) - - genCmd.StringVar(&tmpDir, "dir", cwd, "where to store the generated rows") - genCmd.IntVar(&keySize, "keySize", 8, "the size of key") - genCmd.IntVar(&valSize, "valSize", 8, "the 
size of value") - genCmd.IntVar(&scale, "scale", 100, "how many rows to generate") - genCmd.StringVar(&cpuprofile, "cpuprofile", "", "write cpu profile to file") - - runCmd.StringVar(&tmpDir, "dir", cwd, "where to load the generated rows") - runCmd.IntVar(&bufSize, "bufSize", 500000, "how many rows held in memory at a time") - runCmd.IntVar(&nWorkers, "nWorkers", 1, "how many workers used in async sorting") - runCmd.IntVar(&inputRatio, "inputRatio", 100, "input percentage") - runCmd.IntVar(&outputRatio, "outputRatio", 100, "output percentage") - runCmd.StringVar(&cpuprofile, "cpuprofile", "", "write cpu profile to file") -} - -func main() { - flag.Parse() - - if len(os.Args) == 1 { - fmt.Printf("Usage:\n\n") - fmt.Printf("\tbenchfilesort command [arguments]\n\n") - fmt.Printf("The commands are:\n\n") - fmt.Println("\tgen\t", "generate rows") - fmt.Println("\trun\t", "run tests") - fmt.Println("") - fmt.Println("Checkout benchfilesort/README for more information.") - return - } - - switch os.Args[1] { - case "gen": - driveGenCmd() - case "run": - driveRunCmd() - default: - fmt.Printf("%q is not valid command.\n", os.Args[1]) - os.Exit(2) - } -} - -func cLogf(format string, args ...interface{}) { - str := fmt.Sprintf(format, args...) - fmt.Println("\033[0;32m" + str + "\033[0m") -} - -func cLog(args ...interface{}) { - str := fmt.Sprint(args...) - fmt.Println("\033[0;32m" + str + "\033[0m") -} diff --git a/util/filesort/filesort.go b/util/filesort/filesort.go deleted file mode 100644 index db3ec718aa415..0000000000000 --- a/util/filesort/filesort.go +++ /dev/null @@ -1,625 +0,0 @@ -// Copyright 2017 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package filesort - -import ( - "container/heap" - "encoding/binary" - "io" - "os" - "path/filepath" - "sort" - "strconv" - "sync" - "sync/atomic" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/tidb/parser/terror" - "github.com/pingcap/tidb/sessionctx/stmtctx" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/codec" -) - -type comparableRow struct { - key []types.Datum - val []types.Datum - handle int64 -} - -type item struct { - index int // source file index - value *comparableRow -} - -// rowHeap maintains a min-heap property of comparableRows. -type rowHeap struct { - sc *stmtctx.StatementContext - ims []*item - byDesc []bool - err error -} - -var headSize = 8 - -func lessThan(sc *stmtctx.StatementContext, i []types.Datum, j []types.Datum, byDesc []bool) (bool, error) { - for k := range byDesc { - v1 := i[k] - v2 := j[k] - - ret, err := v1.CompareDatum(sc, &v2) - if err != nil { - return false, errors.Trace(err) - } - - if byDesc[k] { - ret = -ret - } - - if ret < 0 { - return true, nil - } else if ret > 0 { - return false, nil - } - } - return false, nil -} - -// Len implements heap.Interface Len interface. -func (rh *rowHeap) Len() int { return len(rh.ims) } - -// Swap implements heap.Interface Swap interface. -func (rh *rowHeap) Swap(i, j int) { rh.ims[i], rh.ims[j] = rh.ims[j], rh.ims[i] } - -// Less implements heap.Interface Less interface. 
-func (rh *rowHeap) Less(i, j int) bool { - l := rh.ims[i].value.key - r := rh.ims[j].value.key - ret, err := lessThan(rh.sc, l, r, rh.byDesc) - if rh.err == nil { - rh.err = err - } - return ret -} - -// Push pushes an element into rowHeap. -func (rh *rowHeap) Push(x interface{}) { - rh.ims = append(rh.ims, x.(*item)) -} - -// Pop pops the last element from rowHeap. -func (rh *rowHeap) Pop() interface{} { - old := rh.ims - n := len(old) - x := old[n-1] - rh.ims = old[0 : n-1] - return x -} - -// FileSorter sorts the given rows according to the byDesc order. -// FileSorter can sort rows that exceed predefined memory capacity. -type FileSorter struct { - sc *stmtctx.StatementContext - byDesc []bool - - workers []*Worker - nWorkers int // number of workers used in async sorting - cWorker int // the next worker to which the sorting job is sent - - mu sync.Mutex - tmpDir string - files []string - nFiles int - cursor int // required when performing full in-memory sort - - rowHeap *rowHeap - fds []*os.File - rowBytes []byte - head []byte - dcod []types.Datum - keySize int - valSize int - maxRowSize int - - wg sync.WaitGroup - closed bool - fetched bool - external bool // mark the necessity of performing external file sort -} - -// Worker sorts file asynchronously. -type Worker struct { - ctx *FileSorter - busy int32 - keySize int - valSize int - rowSize int - bufSize int - buf []*comparableRow - head []byte - err error -} - -// Builder builds a new FileSorter. -type Builder struct { - sc *stmtctx.StatementContext - keySize int - valSize int - bufSize int - nWorkers int - byDesc []bool - tmpDir string -} - -// SetSC sets StatementContext instance which is required in row comparison. -func (b *Builder) SetSC(sc *stmtctx.StatementContext) *Builder { - b.sc = sc - return b -} - -// SetSchema sets the schema of row, including key size and value size. 
-func (b *Builder) SetSchema(keySize, valSize int) *Builder { - b.keySize = keySize - b.valSize = valSize - return b -} - -// SetBuf sets the number of rows FileSorter can hold in memory at a time. -func (b *Builder) SetBuf(bufSize int) *Builder { - b.bufSize = bufSize - return b -} - -// SetWorkers sets the number of workers used in async sorting. -func (b *Builder) SetWorkers(nWorkers int) *Builder { - b.nWorkers = nWorkers - return b -} - -// SetDesc sets the ordering rule of row comparison. -func (b *Builder) SetDesc(byDesc []bool) *Builder { - b.byDesc = byDesc - return b -} - -// SetDir sets the working directory for FileSorter. -func (b *Builder) SetDir(tmpDir string) *Builder { - b.tmpDir = tmpDir - return b -} - -// Build creates a FileSorter instance using given data. -func (b *Builder) Build() (*FileSorter, error) { - // Sanity checks - if b.sc == nil { - return nil, errors.New("StatementContext is nil") - } - if b.keySize != len(b.byDesc) { - return nil, errors.New("mismatch in key size and byDesc slice") - } - if b.keySize <= 0 { - return nil, errors.New("key size is not positive") - } - if b.valSize <= 0 { - return nil, errors.New("value size is not positive") - } - if b.bufSize <= 0 { - return nil, errors.New("buffer size is not positive") - } - _, err := os.Stat(b.tmpDir) - if err != nil { - if os.IsNotExist(err) { - return nil, errors.New("tmpDir does not exist") - } - return nil, errors.Trace(err) - } - - ws := make([]*Worker, b.nWorkers) - for i := range ws { - ws[i] = &Worker{ - keySize: b.keySize, - valSize: b.valSize, - rowSize: b.keySize + b.valSize + 1, - bufSize: b.bufSize / b.nWorkers, - buf: make([]*comparableRow, 0, b.bufSize/b.nWorkers), - head: make([]byte, headSize), - } - } - - rh := &rowHeap{sc: b.sc, - ims: make([]*item, 0), - byDesc: b.byDesc, - } - - fs := &FileSorter{sc: b.sc, - workers: ws, - nWorkers: b.nWorkers, - cWorker: 0, - - head: make([]byte, headSize), - dcod: make([]types.Datum, 0, b.keySize+b.valSize+1), - keySize: 
b.keySize, - valSize: b.valSize, - - tmpDir: b.tmpDir, - files: make([]string, 0), - byDesc: b.byDesc, - rowHeap: rh, - } - - for i := 0; i < b.nWorkers; i++ { - fs.workers[i].ctx = fs - } - - return fs, nil -} - -func (fs *FileSorter) getUniqueFileName() string { - fs.mu.Lock() - defer fs.mu.Unlock() - ret := filepath.Join(fs.tmpDir, strconv.Itoa(fs.nFiles)) - fs.nFiles++ - return ret -} - -func (fs *FileSorter) appendFileName(fn string) { - fs.mu.Lock() - defer fs.mu.Unlock() - fs.files = append(fs.files, fn) -} - -func (fs *FileSorter) closeAllFiles() error { - var reportErr error - for _, fd := range fs.fds { - err := fd.Close() - if reportErr == nil { - reportErr = err - } - } - err := os.RemoveAll(fs.tmpDir) - if reportErr == nil { - reportErr = err - } - if reportErr != nil { - return errors.Trace(reportErr) - } - return nil -} - -// internalSort performs full in-memory sort. -func (fs *FileSorter) internalSort() (*comparableRow, error) { - w := fs.workers[fs.cWorker] - - if !fs.fetched { - sort.Sort(w) - if w.err != nil { - return nil, errors.Trace(w.err) - } - fs.fetched = true - } - if fs.cursor < len(w.buf) { - r := w.buf[fs.cursor] - fs.cursor++ - return r, nil - } - return nil, nil -} - -// externalSort performs external file sort. 
-func (fs *FileSorter) externalSort() (*comparableRow, error) { - if !fs.fetched { - // flush all remaining content to file (if any) - for _, w := range fs.workers { - if atomic.LoadInt32(&(w.busy)) == 0 && len(w.buf) > 0 { - fs.wg.Add(1) - go w.flushToFile() - } - } - - // wait for all workers to finish - fs.wg.Wait() - - // check errors from workers - for _, w := range fs.workers { - if w.err != nil { - return nil, errors.Trace(w.err) - } - if w.rowSize > fs.maxRowSize { - fs.maxRowSize = w.rowSize - } - } - - heap.Init(fs.rowHeap) - if fs.rowHeap.err != nil { - return nil, errors.Trace(fs.rowHeap.err) - } - - fs.rowBytes = make([]byte, fs.maxRowSize) - - err := fs.openAllFiles() - if err != nil { - return nil, errors.Trace(err) - } - - for id := range fs.fds { - row, err := fs.fetchNextRow(id) - if err != nil { - return nil, errors.Trace(err) - } - if row == nil { - return nil, errors.New("file is empty") - } - - im := &item{ - index: id, - value: row, - } - - heap.Push(fs.rowHeap, im) - if fs.rowHeap.err != nil { - return nil, errors.Trace(fs.rowHeap.err) - } - } - - fs.fetched = true - } - - if fs.rowHeap.Len() > 0 { - im := heap.Pop(fs.rowHeap).(*item) - if fs.rowHeap.err != nil { - return nil, errors.Trace(fs.rowHeap.err) - } - - row, err := fs.fetchNextRow(im.index) - if err != nil { - return nil, errors.Trace(err) - } - if row != nil { - nextIm := &item{ - index: im.index, - value: row, - } - - heap.Push(fs.rowHeap, nextIm) - if fs.rowHeap.err != nil { - return nil, errors.Trace(fs.rowHeap.err) - } - } - - return im.value, nil - } - - return nil, nil -} - -func (fs *FileSorter) openAllFiles() error { - for _, fname := range fs.files { - fd, err := os.Open(fname) - if err != nil { - return errors.Trace(err) - } - fs.fds = append(fs.fds, fd) - } - return nil -} - -// fetchNextRow fetches the next row given the source file index. 
-func (fs *FileSorter) fetchNextRow(index int) (*comparableRow, error) { - n, err := fs.fds[index].Read(fs.head) - if err == io.EOF { - return nil, nil - } - if err != nil { - return nil, errors.Trace(err) - } - if n != headSize { - return nil, errors.New("incorrect header") - } - rowSize := int(binary.BigEndian.Uint64(fs.head)) - - n, err = fs.fds[index].Read(fs.rowBytes) - if err != nil { - return nil, errors.Trace(err) - } - if n != rowSize { - return nil, errors.New("incorrect row") - } - - fs.dcod, err = codec.Decode(fs.rowBytes, fs.keySize+fs.valSize+1) - if err != nil { - return nil, errors.Trace(err) - } - - return &comparableRow{ - key: fs.dcod[:fs.keySize], - val: fs.dcod[fs.keySize : fs.keySize+fs.valSize], - handle: fs.dcod[fs.keySize+fs.valSize:][0].GetInt64(), - }, nil -} - -// Input adds one row into FileSorter. -// Caller should not call Input after calling Output. -func (fs *FileSorter) Input(key []types.Datum, val []types.Datum, handle int64) error { - if fs.closed { - return errors.New("FileSorter has been closed") - } - if fs.fetched { - return errors.New("call input after output") - } - - assigned := false - abortTime := time.Duration(1) * time.Minute // 1 minute - cooldownTime := time.Duration(100) * time.Millisecond // 100 milliseconds - row := &comparableRow{ - key: key, - val: val, - handle: handle, - } - - origin := time.Now() - // assign input row to some worker in a round-robin way - for { - for i := 0; i < fs.nWorkers; i++ { - wid := (fs.cWorker + i) % fs.nWorkers - if atomic.LoadInt32(&(fs.workers[wid].busy)) == 0 { - fs.workers[wid].input(row) - assigned = true - fs.cWorker = wid - break - } - } - if assigned { - break - } - - // all workers are busy now, cooldown and retry - time.Sleep(cooldownTime) - - if time.Since(origin) >= abortTime { - // weird: all workers are busy for at least 1 min - // choose to abort for safety - return errors.New("can not make progress since all workers are busy") - } - } - return nil -} - -// Output gets 
the next sorted row. -func (fs *FileSorter) Output() ([]types.Datum, []types.Datum, int64, error) { - var ( - r *comparableRow - err error - ) - if fs.closed { - return nil, nil, 0, errors.New("FileSorter has been closed") - } - - if fs.external { - r, err = fs.externalSort() - } else { - r, err = fs.internalSort() - } - - if err != nil { - return nil, nil, 0, errors.Trace(err) - } else if r != nil { - return r.key, r.val, r.handle, nil - } else { - return nil, nil, 0, nil - } -} - -// Close terminates the input or output process and discards all remaining data. -func (fs *FileSorter) Close() error { - if fs.closed { - return nil - } - fs.wg.Wait() - for _, w := range fs.workers { - w.buf = w.buf[:0] - } - fs.closed = true - err := fs.closeAllFiles() - if err != nil { - return errors.Trace(err) - } - return nil -} - -func (w *Worker) Len() int { return len(w.buf) } - -func (w *Worker) Swap(i, j int) { w.buf[i], w.buf[j] = w.buf[j], w.buf[i] } - -func (w *Worker) Less(i, j int) bool { - l := w.buf[i].key - r := w.buf[j].key - ret, err := lessThan(w.ctx.sc, l, r, w.ctx.byDesc) - if w.err == nil { - w.err = errors.Trace(err) - } - return ret -} - -func (w *Worker) input(row *comparableRow) { - w.buf = append(w.buf, row) - - if len(w.buf) > w.bufSize { - atomic.StoreInt32(&(w.busy), int32(1)) - w.ctx.wg.Add(1) - w.ctx.external = true - go w.flushToFile() - } -} - -// flushToFile flushes the buffer to file if it is full. -func (w *Worker) flushToFile() { - defer w.ctx.wg.Done() - var ( - outputByte []byte - prevLen int - ) - - sort.Sort(w) - if w.err != nil { - return - } - - fileName := w.ctx.getUniqueFileName() - - outputFile, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) - if err != nil { - w.err = errors.Trace(err) - return - } - defer terror.Call(outputFile.Close) - sc := &stmtctx.StatementContext{TimeZone: time.Local} - for _, row := range w.buf { - prevLen = len(outputByte) - outputByte = append(outputByte, w.head...) 
- outputByte, err = codec.EncodeKey(sc, outputByte, row.key...) - if err != nil { - w.err = errors.Trace(err) - return - } - outputByte, err = codec.EncodeKey(sc, outputByte, row.val...) - if err != nil { - w.err = errors.Trace(err) - return - } - outputByte, err = codec.EncodeKey(sc, outputByte, types.NewIntDatum(row.handle)) - if err != nil { - w.err = errors.Trace(err) - return - } - - if len(outputByte)-prevLen-headSize > w.rowSize { - w.rowSize = len(outputByte) - prevLen - headSize - } - binary.BigEndian.PutUint64(w.head, uint64(len(outputByte)-prevLen-headSize)) - for i := 0; i < headSize; i++ { - outputByte[prevLen+i] = w.head[i] - } - } - - _, err = outputFile.Write(outputByte) - if err != nil { - w.err = errors.Trace(err) - return - } - - w.ctx.appendFileName(fileName) - w.buf = w.buf[:0] - atomic.StoreInt32(&(w.busy), int32(0)) -} diff --git a/util/filesort/filesort_test.go b/util/filesort/filesort_test.go deleted file mode 100644 index 43c7146b018d3..0000000000000 --- a/util/filesort/filesort_test.go +++ /dev/null @@ -1,392 +0,0 @@ -// Copyright 2017 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
-

package filesort

import (
	"math/rand"
	"os"
	"testing"
	"time"

	"github.com/pingcap/tidb/sessionctx/stmtctx"
	"github.com/pingcap/tidb/types"
	"github.com/stretchr/testify/require"
)

// TestLessThan checks lessThan's ordering of single- and multi-column keys,
// in both ascending (byDesc=false) and descending (byDesc=true) directions.
func TestLessThan(t *testing.T) {
	t.Parallel()

	sc := new(stmtctx.StatementContext)

	d0 := types.NewDatum(0)
	d1 := types.NewDatum(1)

	tests := []struct {
		i      []types.Datum
		j      []types.Datum
		byDesc []bool
		ret    bool
	}{
		{[]types.Datum{d0}, []types.Datum{d0}, []bool{false}, false},
		{[]types.Datum{d0}, []types.Datum{d1}, []bool{false}, true},
		{[]types.Datum{d1}, []types.Datum{d0}, []bool{false}, false},
		{[]types.Datum{d0}, []types.Datum{d0}, []bool{true}, false},
		{[]types.Datum{d0}, []types.Datum{d1}, []bool{true}, false},
		{[]types.Datum{d1}, []types.Datum{d0}, []bool{true}, true},
		{[]types.Datum{d0, d0}, []types.Datum{d1, d1}, []bool{false, false}, true},
		{[]types.Datum{d0, d1}, []types.Datum{d1, d1}, []bool{false, false}, true},
		{[]types.Datum{d0, d0}, []types.Datum{d1, d1}, []bool{false, false}, true},
		{[]types.Datum{d0, d0}, []types.Datum{d0, d1}, []bool{false, false}, true},
		{[]types.Datum{d0, d1}, []types.Datum{d0, d1}, []bool{false, false}, false},
		{[]types.Datum{d0, d1}, []types.Datum{d0, d0}, []bool{false, false}, false},
		{[]types.Datum{d1, d0}, []types.Datum{d0, d1}, []bool{false, false}, false},
		{[]types.Datum{d1, d1}, []types.Datum{d0, d1}, []bool{false, false}, false},
		{[]types.Datum{d1, d1}, []types.Datum{d0, d0}, []bool{false, false}, false},
	}

	for _, test := range tests {
		ret, err := lessThan(sc, test.i, test.j, test.byDesc)
		require.NoError(t, err)
		require.Equal(t, test.ret, ret)
	}
}

// TestInMemory inputs fewer rows than bufSize (so a single buffer suffices)
// and verifies that Output yields the rows in non-decreasing key order.
func TestInMemory(t *testing.T) {
	t.Parallel()

	seed := rand.NewSource(time.Now().UnixNano())
	r := rand.New(seed)

	sc := new(stmtctx.StatementContext)
	keySize := r.Intn(10) + 1 // random int in range [1, 10]
	valSize := r.Intn(20) + 1 // random int in range [1, 20]
	bufSize := 40             // hold up to 40 items per file
	byDesc := make([]bool, keySize)
	for i := range byDesc {
		byDesc[i] = r.Intn(2) == 0
	}

	var (
		err    error
		fs     *FileSorter
		pkey   []types.Datum
		key    []types.Datum
		tmpDir string
		ret    bool
	)

	tmpDir, err = os.MkdirTemp("", "util_filesort_test")
	require.NoError(t, err)

	fsBuilder := new(Builder)
	fs, err = fsBuilder.SetSC(sc).SetSchema(keySize, valSize).SetBuf(bufSize).SetWorkers(1).SetDesc(byDesc).SetDir(tmpDir).Build()
	require.NoError(t, err)
	defer func() {
		err := fs.Close()
		require.NoError(t, err)
	}()

	nRows := r.Intn(bufSize-1) + 1 // random int in range [1, bufSize - 1]
	for i := 1; i <= nRows; i++ {
		err = fs.Input(nextRow(r, keySize, valSize))
		require.NoError(t, err)
	}

	pkey, _, _, err = fs.Output()
	require.NoError(t, err)

	// Each subsequent key must not compare less than its predecessor
	// under the chosen byDesc ordering.
	for i := 1; i < nRows; i++ {
		key, _, _, err = fs.Output()
		require.NoError(t, err)
		ret, err = lessThan(sc, key, pkey, byDesc)
		require.NoError(t, err)
		require.False(t, ret)
		pkey = key
	}
}

// TestMultipleFiles exercises every Builder validation error in sequence, then
// sorts a larger row count so the sorter may spill across external files.
// NOTE(review): nRows can be as small as 2, which does not guarantee more rows
// than one buffer holds — confirm whether multi-file spilling is always hit.
func TestMultipleFiles(t *testing.T) {
	t.Parallel()

	seed := rand.NewSource(time.Now().UnixNano())
	r := rand.New(seed)

	sc := new(stmtctx.StatementContext)
	keySize := r.Intn(10) + 1 // random int in range [1, 10]
	valSize := r.Intn(20) + 1 // random int in range [1, 20]
	bufSize := 40             // hold up to 40 items per file
	byDesc := make([]bool, keySize)
	for i := range byDesc {
		byDesc[i] = r.Intn(2) == 0
	}

	var (
		err    error
		fs     *FileSorter
		pkey   []types.Datum
		key    []types.Datum
		tmpDir string
		ret    bool
	)

	tmpDir, err = os.MkdirTemp("", "util_filesort_test")
	require.NoError(t, err)

	fsBuilder := new(Builder)

	// Test for basic function.
	// Each Build call below is expected to fail until the corresponding
	// builder field has been set; the error strings pin Build's validation order.
	_, err = fsBuilder.Build()
	require.EqualError(t, err, "StatementContext is nil")
	fsBuilder.SetSC(sc)
	_, err = fsBuilder.Build()
	require.EqualError(t, err, "key size is not positive")
	fsBuilder.SetDesc(byDesc)
	_, err = fsBuilder.Build()
	require.EqualError(t, err, "mismatch in key size and byDesc slice")
	fsBuilder.SetSchema(keySize, valSize)
	_, err = fsBuilder.Build()
	require.EqualError(t, err, "buffer size is not positive")
	fsBuilder.SetBuf(bufSize)
	_, err = fsBuilder.Build()
	require.EqualError(t, err, "tmpDir does not exist")
	fsBuilder.SetDir(tmpDir)

	fs, err = fsBuilder.SetWorkers(1).Build()
	require.NoError(t, err)
	defer func() {
		err := fs.Close()
		require.NoError(t, err)
	}()

	nRows := (r.Intn(bufSize) + 1) * (r.Intn(10) + 2)
	for i := 1; i <= nRows; i++ {
		err = fs.Input(nextRow(r, keySize, valSize))
		require.NoError(t, err)
	}

	pkey, _, _, err = fs.Output()
	require.NoError(t, err)
	for i := 1; i < nRows; i++ {
		key, _, _, err = fs.Output()
		require.NoError(t, err)
		ret, err = lessThan(sc, key, pkey, byDesc)
		require.NoError(t, err)
		require.False(t, ret)
		pkey = key
	}
}

// TestMultipleWorkers runs the same spill-to-files scenario as
// TestMultipleFiles but with 4 async sort workers (SetWorkers(4)).
func TestMultipleWorkers(t *testing.T) {
	t.Parallel()

	seed := rand.NewSource(time.Now().UnixNano())
	r := rand.New(seed)

	sc := new(stmtctx.StatementContext)
	keySize := r.Intn(10) + 1 // random int in range [1, 10]
	valSize := r.Intn(20) + 1 // random int in range [1, 20]
	bufSize := 40             // hold up to 40 items per file
	byDesc := make([]bool, keySize)
	for i := range byDesc {
		byDesc[i] = r.Intn(2) == 0
	}

	var (
		err    error
		fs     *FileSorter
		pkey   []types.Datum
		key    []types.Datum
		tmpDir string
		ret    bool
	)

	tmpDir, err = os.MkdirTemp("", "util_filesort_test")
	require.NoError(t, err)

	fsBuilder := new(Builder)
	fs, err = fsBuilder.SetSC(sc).SetSchema(keySize, valSize).SetBuf(bufSize).SetWorkers(4).SetDesc(byDesc).SetDir(tmpDir).Build()
	require.NoError(t, err)
	defer func() {
		err := fs.Close()
		require.NoError(t, err)
	}()

	nRows := (r.Intn(bufSize) + 1) * (r.Intn(10) + 2)
	for i := 1; i <= nRows; i++ {
		err = fs.Input(nextRow(r, keySize, valSize))
		require.NoError(t, err)
	}

	pkey, _, _, err = fs.Output()
	require.NoError(t, err)
	for i := 1; i < nRows; i++ {
		key, _, _, err = fs.Output()
		require.NoError(t, err)
		ret, err = lessThan(sc, key, pkey, byDesc)
		require.NoError(t, err)
		require.False(t, ret)
		pkey = key
	}
}

// TestClose verifies that Close removes the sorter's temporary directory,
// that Input/Output after Close fail with "FileSorter has been closed", and
// that calling Close again on a closed sorter still succeeds.
func TestClose(t *testing.T) {
	t.Parallel()

	seed := rand.NewSource(time.Now().UnixNano())
	r := rand.New(seed)

	sc := new(stmtctx.StatementContext)
	keySize := 2
	valSize := 2
	bufSize := 40
	byDesc := []bool{false, false}

	var (
		err     error
		fs0     *FileSorter
		fs1     *FileSorter
		tmpDir0 string
		tmpDir1 string
		errmsg  = "FileSorter has been closed"
	)

	// Prepare two FileSorter instances for tests
	fsBuilder := new(Builder)
	tmpDir0, err = os.MkdirTemp("", "util_filesort_test")
	require.NoError(t, err)
	fs0, err = fsBuilder.SetSC(sc).SetSchema(keySize, valSize).SetBuf(bufSize).SetWorkers(1).SetDesc(byDesc).SetDir(tmpDir0).Build()
	require.NoError(t, err)
	defer func() {
		err := fs0.Close()
		require.NoError(t, err)
	}()

	tmpDir1, err = os.MkdirTemp("", "util_filesort_test")
	require.NoError(t, err)
	fs1, err = fsBuilder.SetSC(sc).SetSchema(keySize, valSize).SetBuf(bufSize).SetWorkers(1).SetDesc(byDesc).SetDir(tmpDir1).Build()
	require.NoError(t, err)
	defer func() {
		err := fs1.Close()
		require.NoError(t, err)
	}()

	// 1. Close after some Input
	err = fs0.Input(nextRow(r, keySize, valSize))
	require.NoError(t, err)

	err = fs0.Close()
	require.NoError(t, err)

	// Close must have removed the temporary directory.
	_, err = os.Stat(tmpDir0)
	require.True(t, os.IsNotExist(err))

	_, _, _, err = fs0.Output()
	require.EqualError(t, err, errmsg)

	err = fs0.Input(nextRow(r, keySize, valSize))
	require.EqualError(t, err, errmsg)

	// Closing an already-closed sorter must succeed (idempotent).
	err = fs0.Close()
	require.NoError(t, err)

	// 2. Close after some Output
	err = fs1.Input(nextRow(r, keySize, valSize))
	require.NoError(t, err)
	err = fs1.Input(nextRow(r, keySize, valSize))
	require.NoError(t, err)

	_, _, _, err = fs1.Output()
	require.NoError(t, err)

	err = fs1.Close()
	require.NoError(t, err)

	_, err = os.Stat(tmpDir1)
	require.True(t, os.IsNotExist(err))

	_, _, _, err = fs1.Output()
	require.EqualError(t, err, errmsg)

	err = fs1.Input(nextRow(r, keySize, valSize))
	require.EqualError(t, err, errmsg)

	err = fs1.Close()
	require.NoError(t, err)
}

// TestMismatchedUsage verifies the sorter's usage contract: draining Output
// past the last row yields a nil key with no error, and calling Input after
// Output has begun fails with "call input after output".
func TestMismatchedUsage(t *testing.T) {
	t.Parallel()

	seed := rand.NewSource(time.Now().UnixNano())
	r := rand.New(seed)

	sc := new(stmtctx.StatementContext)
	keySize := 2
	valSize := 2
	bufSize := 40
	byDesc := []bool{false, false}

	var (
		err    error
		fs0    *FileSorter
		fs1    *FileSorter
		key    []types.Datum
		tmpDir string
		errmsg = "call input after output"
	)

	// Prepare two FileSorter instances for tests
	fsBuilder := new(Builder)
	tmpDir, err = os.MkdirTemp("", "util_filesort_test")
	require.NoError(t, err)
	fs0, err = fsBuilder.SetSC(sc).SetSchema(keySize, valSize).SetBuf(bufSize).SetWorkers(1).SetDesc(byDesc).SetDir(tmpDir).Build()
	require.NoError(t, err)
	defer func() {
		err := fs0.Close()
		require.NoError(t, err)
	}()

	tmpDir, err = os.MkdirTemp("", "util_filesort_test")
	require.NoError(t, err)
	fs1, err = fsBuilder.SetSC(sc).SetSchema(keySize, valSize).SetBuf(bufSize).SetWorkers(1).SetDesc(byDesc).SetDir(tmpDir).Build()
	require.NoError(t, err)
	defer func() {
		err := fs1.Close()
		require.NoError(t, err)
	}()

	// 1. call Output after fetched all rows
	err = fs0.Input(nextRow(r, keySize, valSize))
	require.NoError(t, err)

	key, _, _, err = fs0.Output()
	require.NoError(t, err)
	require.NotNil(t, key)

	// A nil key with nil error signals that all rows have been fetched.
	key, _, _, err = fs0.Output()
	require.NoError(t, err)
	require.Nil(t, key)

	// 2. call Input after Output
	err = fs1.Input(nextRow(r, keySize, valSize))
	require.NoError(t, err)

	key, _, _, err = fs1.Output()
	require.NoError(t, err)
	require.NotNil(t, key)

	err = fs1.Input(nextRow(r, keySize, valSize))
	require.EqualError(t, err, errmsg)
}
diff --git a/util/filesort/main_test.go b/util/filesort/main_test.go
deleted file mode 100644
index 6a3378a2187d7..0000000000000
--- a/util/filesort/main_test.go
+++ /dev/null
@@ -1,44 +0,0 @@
-// Copyright 2021 PingCAP, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package filesort
-
-import (
-	"math/rand"
-	"testing"
-
-	"github.com/pingcap/tidb/types"
-	"github.com/pingcap/tidb/util/testbridge"
-	"go.uber.org/goleak"
-)
-
-func TestMain(m *testing.M) {
-	testbridge.WorkaroundGoCheckFlags()
-	goleak.VerifyTestMain(m)
-}
-
-func nextRow(r *rand.Rand, keySize int, valSize int) (key []types.Datum, val []types.Datum, handle int64) {
-	key = make([]types.Datum, keySize)
-	for i := range key {
-		key[i] = types.NewDatum(r.Int())
-	}
-
-	val = make([]types.Datum, valSize)
-	for j := range val {
-		val[j] = types.NewDatum(r.Int())
-	}
-
-	handle = r.Int63()
-	return
-}