From 050e33cd219bd6da0857e1bf592c4fa877fab7ff Mon Sep 17 00:00:00 2001
From: glorv
Date: Thu, 2 Sep 2021 16:53:45 +0800
Subject: [PATCH 1/9] fix lightning populate chunks

---
 br/pkg/lightning/mydump/region.go      |  5 ++-
 br/pkg/lightning/mydump/region_test.go | 56 ++++++++++++++++++++++++++
 br/pkg/storage/s3.go                   | 23 +++++++++++
 br/pkg/storage/s3_test.go              |  9 +++++
 4 files changed, 92 insertions(+), 1 deletion(-)

diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go
index 07b7693a9a216..53a17a5bfef1e 100644
--- a/br/pkg/lightning/mydump/region.go
+++ b/br/pkg/lightning/mydump/region.go
@@ -268,7 +268,7 @@ func makeSourceFileRegion(
 	}
 	// If a csv file is overlarge, we need to split it into multiple regions.
 	// Note: We can only split a csv file whose format is strict.
-	if isCsvFile && dataFileSize > int64(cfg.Mydumper.MaxRegionSize) && cfg.Mydumper.StrictFormat {
+	if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize)*11/10 {
 		_, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store)
 		return regions, subFileSizes, err
 	}
@@ -359,6 +359,9 @@ func SplitLargeFile(
 		columns = parser.Columns()
 		startOffset, _ = parser.Pos()
 		endOffset = startOffset + maxRegionSize
+		if endOffset > dataFile.FileMeta.FileSize {
+			endOffset = dataFile.FileMeta.FileSize
+		}
 	}
 	for {
 		curRowsCnt := (endOffset - startOffset) / divisor
diff --git a/br/pkg/lightning/mydump/region_test.go b/br/pkg/lightning/mydump/region_test.go
index 657781c92c996..6ee26692d4e8c 100644
--- a/br/pkg/lightning/mydump/region_test.go
+++ b/br/pkg/lightning/mydump/region_test.go
@@ -331,3 +331,59 @@ func (s *testMydumpRegionSuite) TestSplitLargeFileWithCustomTerminator(c *C) {
 		c.Assert(regions[i].Chunk.EndOffset, Equals, offsets[i][1])
 	}
 }
+
+func (s *testMydumpRegionSuite) TestSplitLargeFileOnlyOneChunk(c *C) {
+	meta := &MDTableMeta{
+		DB:   "csv",
+		Name: "large_csv_file",
+	}
+	cfg := &config.Config{
+		Mydumper: config.MydumperRuntime{
+			ReadBlockSize: config.ReadBlockSize,
+			CSV: config.CSVConfig{
+				Separator:       ",",
+				Delimiter:       "",
+				Header:          true,
+				TrimLastSep:     false,
+				NotNull:         false,
+				Null:            "NULL",
+				BackslashEscape: true,
+			},
+			StrictFormat:  true,
+			Filter:        []string{"*.*"},
+			MaxRegionSize: 15,
+		},
+	}
+
+	dir := c.MkDir()
+
+	fileName := "test.csv"
+	filePath := filepath.Join(dir, fileName)
+
+	content := []byte("field1,field2\r\n123,456\r\n")
+	err := os.WriteFile(filePath, content, 0o644)
+	c.Assert(err, IsNil)
+
+	dataFileInfo, err := os.Stat(filePath)
+	c.Assert(err, IsNil)
+	fileSize := dataFileInfo.Size()
+	fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
+	colCnt := int64(2)
+	columns := []string{"field1", "field2"}
+	prevRowIdxMax := int64(0)
+	ioWorker := worker.NewPool(context.Background(), 4, "io")
+
+	store, err := storage.NewLocalStorage(dir)
+	c.Assert(err, IsNil)
+
+	offsets := [][]int64{{14, 24}}
+
+	_, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
+	c.Assert(err, IsNil)
+	c.Assert(regions, HasLen, len(offsets))
+	for i := range offsets {
+		c.Assert(regions[i].Chunk.Offset, Equals, offsets[i][0])
+		c.Assert(regions[i].Chunk.EndOffset, Equals, offsets[i][1])
+		c.Assert(regions[i].Chunk.Columns, DeepEquals, columns)
+	}
+}
diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go
index 8fe1b6b3ae24c..56a54848100a8 100644
--- a/br/pkg/storage/s3.go
+++ b/br/pkg/storage/s3.go
@@ -648,6 +648,17 @@ func (r *s3ObjectReader) Close() error {
 	return r.reader.Close()
 }
 
+// eofReader is an io.ReadCloser that always returns io.EOF
+type eofReader struct{}
+
+func (r eofReader) Read([]byte) (n int, err error) {
+	return 0, io.EOF
+}
+
+func (r eofReader) Close() error {
+	return nil
+}
+
 // Seek implement the io.Seeker interface.
 //
 // Currently, tidb-lightning depends on this method to read parquet file for s3 storage.
@@ -666,6 +677,18 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {
 
 	if realOffset == r.pos {
 		return realOffset, nil
+	} else if realOffset >= r.rangeInfo.Size {
+		// See: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
+		// because s3's GetObject interface doesn't all a range that matches zero lenghth data,
+		// so if the position is out of range, we need to always return io.EOF after the seek operation.
+
+		// close the current reader and replace it with one that always returns io.EOF
+		if err := r.reader.Close(); err != nil {
+			log.L().Warn("close s3 reader failed, will ignore this error", logutil.ShortError(err))
+		}
+
+		r.reader = eofReader{}
+		return r.rangeInfo.Size, nil
 	}
 
 	// if seek ahead no more than 64k, we discard these data
diff --git a/br/pkg/storage/s3_test.go b/br/pkg/storage/s3_test.go
index f1a4e42221afa..886e2e318487d 100644
--- a/br/pkg/storage/s3_test.go
+++ b/br/pkg/storage/s3_test.go
@@ -740,6 +740,15 @@ func (s *s3Suite) TestOpenSeek(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(n, Equals, 100)
 	c.Assert(slice, DeepEquals, someRandomBytes[990100:990200])
+
+	// test seek to the file end or bigger positions
+	for _, p := range []int64{1000000, 1000001, 2000000} {
+		offset, err = reader.Seek(p, io.SeekStart)
+		c.Assert(offset, Equals, int64(1000000))
+		c.Assert(err, IsNil)
+		_, err = reader.Read(slice)
+		c.Assert(err, Equals, io.EOF)
+	}
 }
 
 type limitedBytesReader struct {
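
The second region.go hunk above is the core fix: when a file qualifies for splitting but holds little data after the header, the first chunk's endOffset could point past the end of the file. Below is a minimal standalone sketch of that guard; clampChunkEnd is a hypothetical helper for illustration, not a function from the diff:

package main

import "fmt"

// clampChunkEnd mirrors the guard added to SplitLargeFile: a chunk starting
// at startOffset spans at most maxRegionSize bytes, but never extends past
// the end of the file.
func clampChunkEnd(startOffset, maxRegionSize, fileSize int64) int64 {
	endOffset := startOffset + maxRegionSize
	if endOffset > fileSize {
		endOffset = fileSize
	}
	return endOffset
}

func main() {
	// Mirrors TestSplitLargeFileOnlyOneChunk: a 24-byte file whose single
	// chunk starts at offset 14, with max-region-size = 15. The chunk must
	// end at 24, not at 14+15 = 29.
	fmt.Println(clampChunkEnd(14, 15, 24)) // 24
}
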
From c5068644cfa29d549f3b17a635d7f580736bd283 Mon Sep 17 00:00:00 2001
From: glorv
Date: Thu, 2 Sep 2021 17:06:00 +0800
Subject: [PATCH 2/9] fix comment

---
 br/pkg/storage/s3.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go
index 56a54848100a8..3a22d5e14c254 100644
--- a/br/pkg/storage/s3.go
+++ b/br/pkg/storage/s3.go
@@ -679,7 +679,7 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {
 		return realOffset, nil
 	} else if realOffset >= r.rangeInfo.Size {
 		// See: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
-		// because s3's GetObject interface doesn't all a range that matches zero lenghth data,
+		// because s3's GetObject interface doesn't allow requesting a range that matches zero-length data,
 		// so if the position is out of range, we need to always return io.EOF after the seek operation.

From d87cdb8074ebbe99ec330c48d35ca1ea70ec8d9d Mon Sep 17 00:00:00 2001
From: glorv
Date: Mon, 6 Sep 2021 10:05:34 +0800
Subject: [PATCH 3/9] resolve comments

---
 br/pkg/lightning/mydump/region.go |  3 +++
 br/pkg/storage/s3.go              | 13 +------------
 2 files changed, 4 insertions(+), 12 deletions(-)

diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go
index 53a17a5bfef1e..46a069f54ffe0 100644
--- a/br/pkg/lightning/mydump/region.go
+++ b/br/pkg/lightning/mydump/region.go
@@ -268,6 +268,9 @@ func makeSourceFileRegion(
 	}
 	// If a csv file is overlarge, we need to split it into multiple regions.
 	// Note: We can only split a csv file whose format is strict.
+	// We increase the check threshold by 1/10 of the `max-region-size` because the source file size dumped by tools
+	// like dumpling might slightly exceed the threshold when it is equal to `max-region-size`, so we can
+	// avoid splitting it into a lot of small chunks.
 	if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize)*11/10 {
 		_, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store)
 		return regions, subFileSizes, err
diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go
index 3a22d5e14c254..f664d51e0d970 100644
--- a/br/pkg/storage/s3.go
+++ b/br/pkg/storage/s3.go
@@ -648,17 +648,6 @@ func (r *s3ObjectReader) Close() error {
 	return r.reader.Close()
 }
 
-// eofReader is an io.ReadCloser that always returns io.EOF
-type eofReader struct{}
-
-func (r eofReader) Read([]byte) (n int, err error) {
-	return 0, io.EOF
-}
-
-func (r eofReader) Close() error {
-	return nil
-}
-
 // Seek implement the io.Seeker interface.
 //
 // Currently, tidb-lightning depends on this method to read parquet file for s3 storage.
@@ -687,7 +676,7 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {
 			log.L().Warn("close s3 reader failed, will ignore this error", logutil.ShortError(err))
 		}
 
-		r.reader = eofReader{}
+		r.reader = io.NopCloser(bytes.NewReader(nil))
 		return r.rangeInfo.Size, nil
 	}
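
Patch 3 drops the hand-rolled eofReader in favor of io.NopCloser over an empty bytes.Reader. A quick self-contained check of the swap, under the assumption that all the replacement needs to provide is EOF-on-read and a no-op Close:

package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	// io.NopCloser over an empty bytes.Reader: Read yields (0, io.EOF)
	// and Close is a no-op, which is exactly what eofReader provided.
	r := io.NopCloser(bytes.NewReader(nil))
	n, err := r.Read(make([]byte, 8))
	fmt.Println(n, err == io.EOF) // 0 true
	fmt.Println(r.Close())        // <nil>
}
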
From 30f32d11ddb66b45065374f4ed755729706fab32 Mon Sep 17 00:00:00 2001
From: glorv
Date: Mon, 6 Sep 2021 10:09:41 +0800
Subject: [PATCH 4/9] fix seek

---
 br/pkg/storage/s3.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go
index f664d51e0d970..2c07b5af2cad0 100644
--- a/br/pkg/storage/s3.go
+++ b/br/pkg/storage/s3.go
@@ -677,7 +677,8 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {
 		}
 
 		r.reader = io.NopCloser(bytes.NewReader(nil))
-		return r.rangeInfo.Size, nil
+		r.pos = r.rangeInfo.Size
+		return r.pos, nil
 	}

From b79e04e93982aa1f00c1f3bbc4e9350ab22f03cd Mon Sep 17 00:00:00 2001
From: glorv
Date: Mon, 6 Sep 2021 10:17:42 +0800
Subject: [PATCH 5/9] add check for the seek position

---
 br/pkg/storage/s3_test.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/br/pkg/storage/s3_test.go b/br/pkg/storage/s3_test.go
index 886e2e318487d..d55362055d3e2 100644
--- a/br/pkg/storage/s3_test.go
+++ b/br/pkg/storage/s3_test.go
@@ -746,8 +746,9 @@ func (s *s3Suite) TestOpenSeek(c *C) {
 		offset, err = reader.Seek(p, io.SeekStart)
 		c.Assert(offset, Equals, int64(1000000))
 		c.Assert(err, IsNil)
-		_, err = reader.Read(slice)
+		pos, err := reader.Read(slice)
 		c.Assert(err, Equals, io.EOF)
+		c.Assert(pos, Equals, int64(1000000))
 	}
 }
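
Patch 4 keeps r.pos in sync with the offset that Seek reports, and patch 5 tightens the test around it (the pos assertion added here compares a byte count against int64 and is dropped again in patch 9). A toy model of the contract being asserted; pinnedReader is a hypothetical type for illustration, and the real s3ObjectReader also handles backward and short forward seeks:

package main

import (
	"bytes"
	"fmt"
	"io"
)

// pinnedReader models only the past-the-end branch of Seek: any offset at
// or beyond size pins the position to size and swaps in an empty reader,
// so every subsequent Read reports io.EOF.
type pinnedReader struct {
	size, pos int64
	reader    io.ReadCloser
}

func (r *pinnedReader) Seek(offset int64, _ int) (int64, error) {
	if offset >= r.size {
		r.reader = io.NopCloser(bytes.NewReader(nil))
		r.pos = r.size
		return r.pos, nil
	}
	r.pos = offset
	return r.pos, nil
}

func main() {
	r := &pinnedReader{size: 1000000}
	for _, p := range []int64{1000000, 1000001, 2000000} {
		pos, _ := r.Seek(p, io.SeekStart)
		_, err := r.reader.Read(make([]byte, 100))
		fmt.Println(pos, err) // always: 1000000 EOF
	}
}
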
From 3e76255f329bbc55629202dbc18796c84d1614e8 Mon Sep 17 00:00:00 2001
From: glorv
Date: Mon, 6 Sep 2021 14:56:09 +0800
Subject: [PATCH 6/9] resolve comments

---
 br/pkg/lightning/mydump/region.go | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go
index 46a069f54ffe0..5424fb6151390 100644
--- a/br/pkg/lightning/mydump/region.go
+++ b/br/pkg/lightning/mydump/region.go
@@ -30,7 +30,11 @@ import (
 	"go.uber.org/zap"
 )
 
-const tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
+const (
+	tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
+	// the increment ration of large CSV file size threshold by `region-split-size`
+	largeCSVLowerThresholdRation = 10
+)
 
 type TableRegion struct {
 	EngineID int32
@@ -271,7 +275,7 @@ func makeSourceFileRegion(
 	// We increase the check threshold by 1/10 of the `max-region-size` because the source file size dumped by tools
 	// like dumpling might slightly exceed the threshold when it is equal to `max-region-size`, so we can
 	// avoid splitting it into a lot of small chunks.
-	if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize)*11/10 {
+	if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize + cfg.Mydumper.MaxRegionSize / largeCSVLowerThresholdRation) {
 		_, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store)
 		return regions, subFileSizes, err
 	}

From 84417741191539671d3487fe328edf3fbd979551 Mon Sep 17 00:00:00 2001
From: glorv
Date: Mon, 6 Sep 2021 15:21:21 +0800
Subject: [PATCH 7/9] fmt

---
 br/pkg/lightning/mydump/region.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go
index 9617db306e653..5f60d795d33c6 100644
--- a/br/pkg/lightning/mydump/region.go
+++ b/br/pkg/lightning/mydump/region.go
@@ -275,7 +275,7 @@ func makeSourceFileRegion(
 	// We increase the check threshold by 1/10 of the `max-region-size` because the source file size dumped by tools
 	// like dumpling might slightly exceed the threshold when it is equal to `max-region-size`, so we can
 	// avoid splitting it into a lot of small chunks.
-	if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize + cfg.Mydumper.MaxRegionSize / largeCSVLowerThresholdRation) {
+	if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) {
 		_, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store)
 		return regions, subFileSizes, err
 	}

From 01c4e53ad9648ea530ad34f2ee6ee70236f94695 Mon Sep 17 00:00:00 2001
From: glorv
Date: Mon, 6 Sep 2021 16:26:18 +0800
Subject: [PATCH 8/9] fix typo

---
 br/pkg/lightning/mydump/region.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go
index 5f60d795d33c6..4a3ce247a43a9 100644
--- a/br/pkg/lightning/mydump/region.go
+++ b/br/pkg/lightning/mydump/region.go
@@ -32,7 +32,7 @@ import (
 
 const (
 	tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
-	// the increment ration of large CSV file size threshold by `region-split-size`
+	// the increment ratio of large CSV file size threshold by `region-split-size`
 	largeCSVLowerThresholdRation = 10
 )

From 689f411088bb2d8085df6650948b86af0ec97b55 Mon Sep 17 00:00:00 2001
From: glorv
Date: Wed, 8 Sep 2021 10:51:32 +0800
Subject: [PATCH 9/9] fix unit test

---
 br/pkg/storage/s3_test.go | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/br/pkg/storage/s3_test.go b/br/pkg/storage/s3_test.go
index d55362055d3e2..413f5e8881da1 100644
--- a/br/pkg/storage/s3_test.go
+++ b/br/pkg/storage/s3_test.go
@@ -746,9 +746,8 @@ func (s *s3Suite) TestOpenSeek(c *C) {
 		offset, err = reader.Seek(p, io.SeekStart)
 		c.Assert(offset, Equals, int64(1000000))
 		c.Assert(err, IsNil)
-		pos, err := reader.Read(slice)
+		_, err = reader.Read(slice)
 		c.Assert(err, Equals, io.EOF)
-		c.Assert(pos, Equals, int64(1000000))
 	}
 }
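
After patches 6 through 8, a CSV file is split only when it exceeds max-region-size by more than a tenth. A small sketch of the final arithmetic; shouldSplit is a hypothetical standalone version of the inline condition, and the constant name is carried over from the patch:

package main

import "fmt"

// largeCSVLowerThresholdRation mirrors the constant added in patch 6.
const largeCSVLowerThresholdRation = 10

// shouldSplit reproduces the final check: split a strict-format CSV file
// only when its size exceeds maxRegionSize plus a 1/10 margin, so dumps
// that land exactly on max-region-size stay as a single chunk.
func shouldSplit(isCSVFile, strictFormat bool, dataFileSize, maxRegionSize int64) bool {
	return isCSVFile && strictFormat &&
		dataFileSize > maxRegionSize+maxRegionSize/largeCSVLowerThresholdRation
}

func main() {
	// With max-region-size = 100, the effective threshold is 110.
	fmt.Println(shouldSplit(true, true, 105, 100)) // false: within the margin
	fmt.Println(shouldSplit(true, true, 111, 100)) // true: must be split
}
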