From 6a3399337772f6bd473009cea75a9ac380c55667 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 29 Jul 2024 17:25:17 +0800 Subject: [PATCH] config: must set line terminator when use strict-format (#53444) (#54939) close pingcap/tidb#37338 --- lightning/pkg/importer/chunk_process.go | 2 +- .../lightning_column_permutation/config.toml | 3 + pkg/executor/importer/import.go | 4 + pkg/lightning/config/BUILD.bazel | 2 +- pkg/lightning/config/config.go | 5 + pkg/lightning/config/config_test.go | 24 +++++ pkg/lightning/mydump/csv_parser.go | 3 +- pkg/lightning/mydump/region.go | 8 +- pkg/lightning/mydump/region_test.go | 95 +++++++++++++++++++ .../r/executor/import_into.result | 2 + .../t/executor/import_into.test | 2 + .../importintotest4/split_file_test.go | 15 ++- 12 files changed, 159 insertions(+), 6 deletions(-) diff --git a/lightning/pkg/importer/chunk_process.go b/lightning/pkg/importer/chunk_process.go index 4f071ccae038a..4c9ce9047bc2a 100644 --- a/lightning/pkg/importer/chunk_process.go +++ b/lightning/pkg/importer/chunk_process.go @@ -724,7 +724,7 @@ func (cr *chunkProcessor) deliverLoop( rc.status.FinishedFileSize.Add(delta) } } else { - deliverLogger.Warn("offset go back", zap.Int64("curr", highOffset), + deliverLogger.Error("offset go back", zap.Int64("curr", highOffset), zap.Int64("start", lowOffset)) } } diff --git a/lightning/tests/lightning_column_permutation/config.toml b/lightning/tests/lightning_column_permutation/config.toml index 7bbff7f805a46..845873d84c8b9 100644 --- a/lightning/tests/lightning_column_permutation/config.toml +++ b/lightning/tests/lightning_column_permutation/config.toml @@ -1,3 +1,6 @@ [mydumper] strict-format = true max-region-size = 200 + +[mydumper.csv] +terminator = "\n" diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go index 589f7a9df6000..07889f31bf622 100644 --- a/pkg/executor/importer/import.go +++ b/pkg/executor/importer/import.go @@ -758,6 +758,10 @@ func (p *Plan) initOptions(ctx context.Context, seCtx sessionctx.Context, option return exeerrors.ErrInvalidOptionVal.FastGenByArgs("skip_rows, should be <= 1 when split-file is enabled") } + if p.SplitFile && len(p.LinesTerminatedBy) == 0 { + return exeerrors.ErrInvalidOptionVal.FastGenByArgs("lines_terminated_by, should not be empty when use split_file") + } + p.adjustOptions(targetNodeCPUCnt) return nil } diff --git a/pkg/lightning/config/BUILD.bazel b/pkg/lightning/config/BUILD.bazel index 1eb10ff294821..459bc46dda9fa 100644 --- a/pkg/lightning/config/BUILD.bazel +++ b/pkg/lightning/config/BUILD.bazel @@ -42,7 +42,7 @@ go_test( ], embed = [":config"], flaky = True, - shard_count = 48, + shard_count = 49, deps = [ "@com_github_burntsushi_toml//:toml", "@com_github_stretchr_testify//require", diff --git a/pkg/lightning/config/config.go b/pkg/lightning/config/config.go index 3e52545ea6e05..991535c6543ad 100644 --- a/pkg/lightning/config/config.go +++ b/pkg/lightning/config/config.go @@ -884,6 +884,11 @@ func (m *MydumperRuntime) adjust() error { if err := m.CSV.adjust(); err != nil { return err } + if m.StrictFormat && len(m.CSV.Terminator) == 0 { + return common.ErrInvalidConfig.GenWithStack( + `mydumper.strict-format can not be used with empty mydumper.csv.terminator. 
Please set mydumper.csv.terminator to a non-empty value like "\r\n"`) + } + for _, rule := range m.FileRouters { if filepath.IsAbs(rule.Path) { relPath, err := filepath.Rel(m.SourceDir, rule.Path) diff --git a/pkg/lightning/config/config_test.go b/pkg/lightning/config/config_test.go index 165a3b5023bff..be46653682b75 100644 --- a/pkg/lightning/config/config_test.go +++ b/pkg/lightning/config/config_test.go @@ -81,6 +81,30 @@ func TestAdjustPdAddrAndPort(t *testing.T) { require.Equal(t, "123.45.67.89:1234,56.78.90.12:3456", cfg.TiDB.PdAddr) } +func TestStrictFormat(t *testing.T) { + ts, host, port := startMockServer(t, http.StatusOK, + `{"port":4444,"advertise-address":"","path":"123.45.67.89:1234,56.78.90.12:3456"}`, + ) + defer ts.Close() + + cfg := NewConfig() + cfg.TiDB.Host = host + cfg.TiDB.StatusPort = port + cfg.Mydumper.SourceDir = "." + cfg.TikvImporter.Backend = BackendLocal + cfg.TikvImporter.SortedKVDir = "." + cfg.TiDB.DistSQLScanConcurrency = 1 + cfg.Mydumper.StrictFormat = true + + err := cfg.Adjust(context.Background()) + require.ErrorContains(t, err, "mydumper.strict-format can not be used with empty mydumper.csv.terminator") + t.Log(err.Error()) + + cfg.Mydumper.CSV.Terminator = "\r\n" + err = cfg.Adjust(context.Background()) + require.NoError(t, err) +} + func TestPausePDSchedulerScope(t *testing.T) { ts, host, port := startMockServer(t, http.StatusOK, `{"port":4444,"advertise-address":"","path":"123.45.67.89:1234,56.78.90.12:3456"}`, diff --git a/pkg/lightning/mydump/csv_parser.go b/pkg/lightning/mydump/csv_parser.go index c82f09adb0436..8d48496089bd9 100644 --- a/pkg/lightning/mydump/csv_parser.go +++ b/pkg/lightning/mydump/csv_parser.go @@ -97,7 +97,8 @@ type field struct { quoted bool } -// NewCSVParser creates a CSV parser. +// NewCSVParser creates a CSV parser. The ownership of the reader is transferred +// to the parser. func NewCSVParser( ctx context.Context, cfg *config.CSVConfig, diff --git a/pkg/lightning/mydump/region.go b/pkg/lightning/mydump/region.go index 496854134c2c2..fa8863079b764 100644 --- a/pkg/lightning/mydump/region.go +++ b/pkg/lightning/mydump/region.go @@ -416,6 +416,7 @@ func SplitLargeCSV( // Create a utf8mb4 convertor to encode and decode data with the charset of CSV files. charsetConvertor, err := NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace) if err != nil { + _ = r.Close() return nil, nil, err } parser, err := NewCSVParser(ctx, &cfg.CSV, r, cfg.ReadBlockSize, cfg.IOWorkers, true, charsetConvertor) @@ -423,6 +424,7 @@ func SplitLargeCSV( return nil, nil, err } if err = parser.ReadColumns(); err != nil { + _ = parser.Close() return nil, nil, err } if cfg.CSV.HeaderSchemaMatch { @@ -433,6 +435,7 @@ func SplitLargeCSV( if endOffset > dataFile.FileMeta.FileSize { endOffset = dataFile.FileMeta.FileSize } + _ = parser.Close() } divisor := int64(cfg.ColumnCnt) for { @@ -446,6 +449,7 @@ func SplitLargeCSV( // Create a utf8mb4 convertor to encode and decode data with the charset of CSV files. 
charsetConvertor, err := NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace) if err != nil { + _ = r.Close() return nil, nil, err } parser, err := NewCSVParser(ctx, &cfg.CSV, r, cfg.ReadBlockSize, cfg.IOWorkers, false, charsetConvertor) @@ -453,11 +457,13 @@ func SplitLargeCSV( return nil, nil, err } if err = parser.SetPos(endOffset, 0); err != nil { + _ = parser.Close() return nil, nil, err } _, pos, err := parser.ReadUntilTerminator() if err != nil { if !errors.ErrorEqual(err, io.EOF) { + _ = parser.Close() return nil, nil, err } log.FromContext(ctx).Warn("file contains no terminator at end", @@ -466,7 +472,7 @@ func SplitLargeCSV( pos = dataFile.FileMeta.FileSize } endOffset = pos - parser.Close() + _ = parser.Close() } regions = append(regions, &TableRegion{ diff --git a/pkg/lightning/mydump/region_test.go b/pkg/lightning/mydump/region_test.go index 1c2f70c1a2d6b..835934cfc7a00 100644 --- a/pkg/lightning/mydump/region_test.go +++ b/pkg/lightning/mydump/region_test.go @@ -487,3 +487,98 @@ func TestSplitLargeFileOnlyOneChunk(t *testing.T) { require.Equal(t, columns, regions[i].Chunk.Columns) } } + +func TestSplitLargeFileSeekInsideCRLF(t *testing.T) { + ctx := context.Background() + meta := &MDTableMeta{ + DB: "csv", + Name: "large_csv_seek_inside_crlf", + } + + dir := t.TempDir() + + fileName := "test.csv" + filePath := filepath.Join(dir, fileName) + + content := []byte("1\r\n2\r\n3\r\n4\r\n") + err := os.WriteFile(filePath, content, 0o644) + require.NoError(t, err) + + dataFileInfo, err := os.Stat(filePath) + require.NoError(t, err) + fileSize := dataFileInfo.Size() + fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}} + ioWorker := worker.NewPool(context.Background(), 4, "io") + + store, err := storage.NewLocalStorage(dir) + require.NoError(t, err) + + // if we don't set terminator, it will get the wrong result + + cfg := &config.Config{ + Mydumper: config.MydumperRuntime{ + ReadBlockSize: config.ReadBlockSize, + CSV: config.CSVConfig{ + Separator: ",", + }, + StrictFormat: true, + Filter: []string{"*.*"}, + MaxRegionSize: 2, + }, + } + divideConfig := NewDataDivideConfig(cfg, 1, ioWorker, store, meta) + + // in fact this is the wrong result, just to show the bug. pos mismatch with + // offsets. and we might read more rows than expected because we use == rather + // than >= to stop reading. 
+ offsets := [][]int64{{0, 3}, {3, 6}, {6, 9}, {9, 12}} + pos := []int64{2, 5, 8, 11} + + regions, _, err := SplitLargeCSV(context.Background(), divideConfig, fileInfo) + require.NoError(t, err) + require.Len(t, regions, len(offsets)) + for i := range offsets { + require.Equal(t, offsets[i][0], regions[i].Chunk.Offset) + require.Equal(t, offsets[i][1], regions[i].Chunk.EndOffset) + } + + file, err := os.Open(filePath) + require.NoError(t, err) + parser, err := NewCSVParser(ctx, &cfg.Mydumper.CSV, file, 128, ioWorker, false, nil) + require.NoError(t, err) + + for parser.ReadRow() == nil { + p, _ := parser.Pos() + require.Equal(t, pos[0], p) + pos = pos[1:] + } + require.NoError(t, parser.Close()) + + // set terminator to "\r\n" + + cfg.Mydumper.CSV.Terminator = "\r\n" + divideConfig = NewDataDivideConfig(cfg, 1, ioWorker, store, meta) + // pos is contained in expectedOffsets + expectedOffsets := [][]int64{{0, 6}, {6, 12}} + pos = []int64{3, 6, 9, 12} + + regions, _, err = SplitLargeCSV(context.Background(), divideConfig, fileInfo) + require.NoError(t, err) + require.Len(t, regions, len(expectedOffsets)) + for i := range expectedOffsets { + require.Equal(t, expectedOffsets[i][0], regions[i].Chunk.Offset) + require.Equal(t, expectedOffsets[i][1], regions[i].Chunk.EndOffset) + } + + file, err = os.Open(filePath) + require.NoError(t, err) + parser, err = NewCSVParser(ctx, &cfg.Mydumper.CSV, file, 128, ioWorker, false, nil) + require.NoError(t, err) + + for parser.ReadRow() == nil { + p, _ := parser.Pos() + require.Equal(t, pos[0], p) + pos = pos[1:] + } + require.NoError(t, parser.Close()) +} diff --git a/tests/integrationtest/r/executor/import_into.result b/tests/integrationtest/r/executor/import_into.result index ce7896e9cd4fb..5c5250765c6c9 100644 --- a/tests/integrationtest/r/executor/import_into.result +++ b/tests/integrationtest/r/executor/import_into.result @@ -74,6 +74,8 @@ import into t from '/file.csv' with skip_rows=true; Error 8164 (HY000): Invalid option value for skip_rows import into t from '/file.csv' with split_file='aa'; Error 8164 (HY000): Invalid option value for split_file +import into t from '/file.csv' with split_file; +Error 8164 (HY000): Invalid option value for lines_terminated_by, should not be empty when use split_file import into t from '/file.csv' with split_file, skip_rows=2; Error 8164 (HY000): Invalid option value for skip_rows, should be <= 1 when split-file is enabled import into t from '/file.csv' with disk_quota='aa'; diff --git a/tests/integrationtest/t/executor/import_into.test b/tests/integrationtest/t/executor/import_into.test index 6c1dec4ae4e21..68b3a4fa679b2 100644 --- a/tests/integrationtest/t/executor/import_into.test +++ b/tests/integrationtest/t/executor/import_into.test @@ -78,6 +78,8 @@ import into t from '/file.csv' with skip_rows=true; -- error 8164 import into t from '/file.csv' with split_file='aa'; -- error 8164 +import into t from '/file.csv' with split_file; +-- error 8164 import into t from '/file.csv' with split_file, skip_rows=2; -- error 8164 import into t from '/file.csv' with disk_quota='aa'; diff --git a/tests/realtikvtest/importintotest4/split_file_test.go b/tests/realtikvtest/importintotest4/split_file_test.go index ac51dbe8e2206..6f4559a980bd0 100644 --- a/tests/realtikvtest/importintotest4/split_file_test.go +++ b/tests/realtikvtest/importintotest4/split_file_test.go @@ -54,7 +54,7 @@ func (s *mockGCSSuite) TestSplitFile() { }) // split into 3 engines(subtasks) importSQL := fmt.Sprintf(`import into split_file.t FROM 
'gs://split-file/1.csv?endpoint=%s' - with split_file, __max_engine_size = '1'`, gcsEndpoint) + with split_file, lines_terminated_by = '\n', __max_engine_size = '1'`, gcsEndpoint) result := s.tk.MustQuery(importSQL).Rows() s.Len(result, 1) jobID, err := strconv.Atoi(result[0][0].(string)) @@ -74,7 +74,18 @@ func (s *mockGCSSuite) TestSplitFile() { // skip 1 row s.tk.MustExec("truncate table t") importSQL = fmt.Sprintf(`import into split_file.t FROM 'gs://split-file/1.csv?endpoint=%s' - with split_file, skip_rows = 1, __max_engine_size = '1'`, gcsEndpoint) + with split_file, lines_terminated_by = '\n', skip_rows = 1, __max_engine_size = '1'`, gcsEndpoint) s.tk.MustQuery(importSQL) s.tk.MustQuery("select * from t").Sort().Check(testkit.Rows(allData[1:]...)) + + s.tk.MustExec("create table t2 (a int primary key nonclustered, b varchar(100));") + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "split-file", Name: "2.csv"}, + Content: []byte("1,2\r\n3,4\r\n5,6\r\n7,8\r\n9,10\r\n"), + }) + config.MaxRegionSize = 9 + importSQL = fmt.Sprintf(`import into split_file.t2 FROM 'gs://split-file/2.csv?endpoint=%s' + with split_file, lines_terminated_by='\r\n'`, gcsEndpoint) + s.tk.MustQuery(importSQL) + s.tk.MustExec("admin check table t2") }
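
For reference, the short standalone sketch below is not part of the patch; it only illustrates why strict-format / split_file needs an explicit line terminator. When a CSV file is cut into fixed-size regions, each raw cut point has to be pushed forward to the end of the row it falls into, and that is only possible when the row terminator is known. The splitting loop here is a hypothetical approximation of what SplitLargeCSV does via ReadUntilTerminator, applied to the CRLF fixture from TestSplitLargeFileSeekInsideCRLF; it is not Lightning's actual code.

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Same content as the new test fixture: four rows terminated by "\r\n".
	data := "1\r\n2\r\n3\r\n4\r\n"
	const maxRegionSize = 2   // mirrors MaxRegionSize = 2 in the test config
	const terminator = "\r\n" // must be known for the alignment step below

	for start := 0; start < len(data); {
		end := start + maxRegionSize
		if end > len(data) {
			end = len(data)
		}
		// Terminator-aware alignment: extend the region to the end of the
		// row that the raw cut point lands in.
		if idx := strings.Index(data[end:], terminator); idx >= 0 {
			end += idx + len(terminator)
		} else {
			end = len(data)
		}
		fmt.Printf("region [%d, %d): %q\n", start, end, data[start:end])
		start = end
	}
}

Running this prints regions [0, 6) and [6, 12), which matches expectedOffsets in TestSplitLargeFileSeekInsideCRLF once the terminator is set to "\r\n". The first half of that test shows the failure mode the new config check guards against: with no terminator configured, the seek point can land inside a "\r\n" pair, and the computed chunk end offsets (3, 6, 9, 12) no longer agree with the row-end positions the parser reports (2, 5, 8, 11).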