diff --git a/br/pkg/lightning/config/BUILD.bazel b/br/pkg/lightning/config/BUILD.bazel index dbb7d17d6e317..40a265ebeaf2d 100644 --- a/br/pkg/lightning/config/BUILD.bazel +++ b/br/pkg/lightning/config/BUILD.bazel @@ -43,7 +43,7 @@ go_test( ], embed = [":config"], flaky = True, - shard_count = 47, + shard_count = 48, deps = [ "//br/pkg/lightning/common", "@com_github_burntsushi_toml//:toml", diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index 2ab49db444285..329f564ddee7d 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -870,6 +870,11 @@ func (m *MydumperRuntime) adjust() error { if err := m.CSV.adjust(); err != nil { return err } + if m.StrictFormat && len(m.CSV.Terminator) == 0 { + return common.ErrInvalidConfig.GenWithStack( + `mydumper.strict-format can not be used with empty mydumper.csv.terminator. Please set mydumper.csv.terminator to a non-empty value like "\r\n"`) + } + for _, rule := range m.FileRouters { if filepath.IsAbs(rule.Path) { relPath, err := filepath.Rel(m.SourceDir, rule.Path) diff --git a/br/pkg/lightning/config/config_test.go b/br/pkg/lightning/config/config_test.go index 0452121c1cb3c..d148b471b549f 100644 --- a/br/pkg/lightning/config/config_test.go +++ b/br/pkg/lightning/config/config_test.go @@ -82,6 +82,30 @@ func TestAdjustPdAddrAndPort(t *testing.T) { require.Equal(t, "123.45.67.89:1234", cfg.TiDB.PdAddr) } +func TestStrictFormat(t *testing.T) { + ts, host, port := startMockServer(t, http.StatusOK, + `{"port":4444,"advertise-address":"","path":"123.45.67.89:1234,56.78.90.12:3456"}`, + ) + defer ts.Close() + + cfg := NewConfig() + cfg.TiDB.Host = host + cfg.TiDB.StatusPort = port + cfg.Mydumper.SourceDir = "." + cfg.TikvImporter.Backend = BackendLocal + cfg.TikvImporter.SortedKVDir = "." 
+ cfg.TiDB.DistSQLScanConcurrency = 1 + cfg.Mydumper.StrictFormat = true + + err := cfg.Adjust(context.Background()) + require.ErrorContains(t, err, "mydumper.strict-format can not be used with empty mydumper.csv.terminator") + t.Log(err.Error()) + + cfg.Mydumper.CSV.Terminator = "\r\n" + err = cfg.Adjust(context.Background()) + require.NoError(t, err) +} + func TestPausePDSchedulerScope(t *testing.T) { ts, host, port := startMockServer(t, http.StatusOK, `{"port":4444,"advertise-address":"","path":"123.45.67.89:1234,56.78.90.12:3456"}`, diff --git a/br/pkg/lightning/importer/chunk_process.go b/br/pkg/lightning/importer/chunk_process.go index f8d56c823e595..c9510548b866b 100644 --- a/br/pkg/lightning/importer/chunk_process.go +++ b/br/pkg/lightning/importer/chunk_process.go @@ -724,7 +724,7 @@ func (cr *chunkProcessor) deliverLoop( rc.status.FinishedFileSize.Add(delta) } } else { - deliverLogger.Warn("offset go back", zap.Int64("curr", highOffset), + deliverLogger.Error("offset go back", zap.Int64("curr", highOffset), zap.Int64("start", lowOffset)) } } diff --git a/br/pkg/lightning/mydump/csv_parser.go b/br/pkg/lightning/mydump/csv_parser.go index 5d202689a0884..5be4a0a9a5c59 100644 --- a/br/pkg/lightning/mydump/csv_parser.go +++ b/br/pkg/lightning/mydump/csv_parser.go @@ -98,7 +98,8 @@ type field struct { quoted bool } -// NewCSVParser creates a CSV parser. +// NewCSVParser creates a CSV parser. The ownership of the reader is transferred +// to the parser. func NewCSVParser( ctx context.Context, cfg *config.CSVConfig, diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go index 0e24530f2364e..dd5a48075f35e 100644 --- a/br/pkg/lightning/mydump/region.go +++ b/br/pkg/lightning/mydump/region.go @@ -417,6 +417,7 @@ func SplitLargeCSV( // Create a utf8mb4 convertor to encode and decode data with the charset of CSV files. 
charsetConvertor, err := NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace) if err != nil { + _ = r.Close() return nil, nil, err } parser, err := NewCSVParser(ctx, &cfg.CSV, r, cfg.ReadBlockSize, cfg.IOWorkers, true, charsetConvertor) @@ -424,6 +425,7 @@ func SplitLargeCSV( return nil, nil, err } if err = parser.ReadColumns(); err != nil { + _ = parser.Close() return nil, nil, err } if cfg.CSV.HeaderSchemaMatch { @@ -434,6 +436,7 @@ func SplitLargeCSV( if endOffset > dataFile.FileMeta.FileSize { endOffset = dataFile.FileMeta.FileSize } + _ = parser.Close() } divisor := int64(cfg.ColumnCnt) for { @@ -447,6 +450,7 @@ func SplitLargeCSV( // Create a utf8mb4 convertor to encode and decode data with the charset of CSV files. charsetConvertor, err := NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace) if err != nil { + _ = r.Close() return nil, nil, err } parser, err := NewCSVParser(ctx, &cfg.CSV, r, cfg.ReadBlockSize, cfg.IOWorkers, false, charsetConvertor) @@ -454,11 +458,13 @@ func SplitLargeCSV( return nil, nil, err } if err = parser.SetPos(endOffset, 0); err != nil { + _ = parser.Close() return nil, nil, err } _, pos, err := parser.ReadUntilTerminator() if err != nil { if !errors.ErrorEqual(err, io.EOF) { + _ = parser.Close() return nil, nil, err } log.FromContext(ctx).Warn("file contains no terminator at end", @@ -467,7 +473,7 @@ func SplitLargeCSV( pos = dataFile.FileMeta.FileSize } endOffset = pos - parser.Close() + _ = parser.Close() } regions = append(regions, &TableRegion{ diff --git a/br/pkg/lightning/mydump/region_test.go b/br/pkg/lightning/mydump/region_test.go index d947f1783242c..032eefe77badf 100644 --- a/br/pkg/lightning/mydump/region_test.go +++ b/br/pkg/lightning/mydump/region_test.go @@ -487,3 +487,98 @@ func TestSplitLargeFileOnlyOneChunk(t *testing.T) { require.Equal(t, columns, regions[i].Chunk.Columns) } } + +func TestSplitLargeFileSeekInsideCRLF(t *testing.T) { + ctx := context.Background() + meta := 
&MDTableMeta{ + DB: "csv", + Name: "large_csv_seek_inside_crlf", + } + + dir := t.TempDir() + + fileName := "test.csv" + filePath := filepath.Join(dir, fileName) + + content := []byte("1\r\n2\r\n3\r\n4\r\n") + err := os.WriteFile(filePath, content, 0o644) + require.NoError(t, err) + + dataFileInfo, err := os.Stat(filePath) + require.NoError(t, err) + fileSize := dataFileInfo.Size() + fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}} + ioWorker := worker.NewPool(context.Background(), 4, "io") + + store, err := storage.NewLocalStorage(dir) + require.NoError(t, err) + + // if we don't set terminator, it will get the wrong result + + cfg := &config.Config{ + Mydumper: config.MydumperRuntime{ + ReadBlockSize: config.ReadBlockSize, + CSV: config.CSVConfig{ + Separator: ",", + }, + StrictFormat: true, + Filter: []string{"*.*"}, + MaxRegionSize: 2, + }, + } + divideConfig := NewDataDivideConfig(cfg, 1, ioWorker, store, meta) + + // in fact this is the wrong result, just to show the bug. pos mismatch with + // offsets. and we might read more rows than expected because we use == rather + // than >= to stop reading. 
+ offsets := [][]int64{{0, 3}, {3, 6}, {6, 9}, {9, 12}} + pos := []int64{2, 5, 8, 11} + + regions, _, err := SplitLargeCSV(context.Background(), divideConfig, fileInfo) + require.NoError(t, err) + require.Len(t, regions, len(offsets)) + for i := range offsets { + require.Equal(t, offsets[i][0], regions[i].Chunk.Offset) + require.Equal(t, offsets[i][1], regions[i].Chunk.EndOffset) + } + + file, err := os.Open(filePath) + require.NoError(t, err) + parser, err := NewCSVParser(ctx, &cfg.Mydumper.CSV, file, 128, ioWorker, false, nil) + require.NoError(t, err) + + for parser.ReadRow() == nil { + p, _ := parser.Pos() + require.Equal(t, pos[0], p) + pos = pos[1:] + } + require.NoError(t, parser.Close()) + + // set terminator to "\r\n" + + cfg.Mydumper.CSV.Terminator = "\r\n" + divideConfig = NewDataDivideConfig(cfg, 1, ioWorker, store, meta) + // pos is contained in expectedOffsets + expectedOffsets := [][]int64{{0, 6}, {6, 12}} + pos = []int64{3, 6, 9, 12} + + regions, _, err = SplitLargeCSV(context.Background(), divideConfig, fileInfo) + require.NoError(t, err) + require.Len(t, regions, len(expectedOffsets)) + for i := range expectedOffsets { + require.Equal(t, expectedOffsets[i][0], regions[i].Chunk.Offset) + require.Equal(t, expectedOffsets[i][1], regions[i].Chunk.EndOffset) + } + + file, err = os.Open(filePath) + require.NoError(t, err) + parser, err = NewCSVParser(ctx, &cfg.Mydumper.CSV, file, 128, ioWorker, false, nil) + require.NoError(t, err) + + for parser.ReadRow() == nil { + p, _ := parser.Pos() + require.Equal(t, pos[0], p) + pos = pos[1:] + } + require.NoError(t, parser.Close()) +} diff --git a/br/tests/lightning_column_permutation/config.toml b/br/tests/lightning_column_permutation/config.toml index 7bbff7f805a46..845873d84c8b9 100644 --- a/br/tests/lightning_column_permutation/config.toml +++ b/br/tests/lightning_column_permutation/config.toml @@ -1,3 +1,6 @@ [mydumper] strict-format = true max-region-size = 200 + +[mydumper.csv] +terminator = "\n" diff 
--git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go index 2dd931e828658..6c2e01933b1b8 100644 --- a/pkg/executor/importer/import.go +++ b/pkg/executor/importer/import.go @@ -712,6 +712,9 @@ func (p *Plan) initOptions(seCtx sessionctx.Context, options []*plannercore.Load } p.adjustOptions() + if p.SplitFile && len(p.LinesTerminatedBy) == 0 { + return exeerrors.ErrInvalidOptionVal.FastGenByArgs("lines_terminated_by, should not be empty when using split_file") + } return nil } diff --git a/tests/realtikvtest/importintotest4/split_file_test.go b/tests/realtikvtest/importintotest4/split_file_test.go index 6d8a5bf7e4665..1e88ba562dea8 100644 --- a/tests/realtikvtest/importintotest4/split_file_test.go +++ b/tests/realtikvtest/importintotest4/split_file_test.go @@ -53,7 +53,7 @@ func (s *mockGCSSuite) TestSplitFile() { }) // split into 3 engines(subtasks) importSQL := fmt.Sprintf(`import into split_file.t FROM 'gs://split-file/1.csv?endpoint=%s' - with split_file, __max_engine_size = '1'`, gcsEndpoint) + with split_file, lines_terminated_by = '\n', __max_engine_size = '1'`, gcsEndpoint) result := s.tk.MustQuery(importSQL).Rows() s.Len(result, 1) jobID, err := strconv.Atoi(result[0][0].(string)) @@ -73,7 +73,18 @@ func (s *mockGCSSuite) TestSplitFile() { // skip 1 row s.tk.MustExec("truncate table t") importSQL = fmt.Sprintf(`import into split_file.t FROM 'gs://split-file/1.csv?endpoint=%s' - with split_file, skip_rows = 1, __max_engine_size = '1'`, gcsEndpoint) + with split_file, lines_terminated_by = '\n', skip_rows = 1, __max_engine_size = '1'`, gcsEndpoint) s.tk.MustQuery(importSQL) s.tk.MustQuery("select * from t").Sort().Check(testkit.Rows(allData[1:]...)) + + s.tk.MustExec("create table t2 (a int primary key nonclustered, b varchar(100));") + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "split-file", Name: "2.csv"}, + Content: []byte("1,2\r\n3,4\r\n5,6\r\n7,8\r\n9,10\r\n"), + }) + 
config.MaxRegionSize = 9 + importSQL = fmt.Sprintf(`import into split_file.t2 FROM 'gs://split-file/2.csv?endpoint=%s' + with split_file, lines_terminated_by='\r\n'`, gcsEndpoint) + s.tk.MustQuery(importSQL) + s.tk.MustExec("admin check table t2") }