Skip to content

Commit

Permalink
config: must set line terminator when use strict-format (#53444) (#54939
Browse files Browse the repository at this point in the history
)

close #37338
  • Loading branch information
ti-chi-bot authored Jul 29, 2024
1 parent b416986 commit 6a33993
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 6 deletions.
2 changes: 1 addition & 1 deletion lightning/pkg/importer/chunk_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ func (cr *chunkProcessor) deliverLoop(
rc.status.FinishedFileSize.Add(delta)
}
} else {
deliverLogger.Warn("offset go back", zap.Int64("curr", highOffset),
deliverLogger.Error("offset go back", zap.Int64("curr", highOffset),
zap.Int64("start", lowOffset))
}
}
Expand Down
3 changes: 3 additions & 0 deletions lightning/tests/lightning_column_permutation/config.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[mydumper]
strict-format = true
max-region-size = 200

[mydumper.csv]
terminator = "\n"
4 changes: 4 additions & 0 deletions pkg/executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,10 @@ func (p *Plan) initOptions(ctx context.Context, seCtx sessionctx.Context, option
return exeerrors.ErrInvalidOptionVal.FastGenByArgs("skip_rows, should be <= 1 when split-file is enabled")
}

if p.SplitFile && len(p.LinesTerminatedBy) == 0 {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs("lines_terminated_by, should not be empty when use split_file")
}

p.adjustOptions(targetNodeCPUCnt)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/lightning/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ go_test(
],
embed = [":config"],
flaky = True,
shard_count = 48,
shard_count = 49,
deps = [
"@com_github_burntsushi_toml//:toml",
"@com_github_stretchr_testify//require",
Expand Down
5 changes: 5 additions & 0 deletions pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,11 @@ func (m *MydumperRuntime) adjust() error {
if err := m.CSV.adjust(); err != nil {
return err
}
if m.StrictFormat && len(m.CSV.Terminator) == 0 {
return common.ErrInvalidConfig.GenWithStack(
`mydumper.strict-format can not be used with empty mydumper.csv.terminator. Please set mydumper.csv.terminator to a non-empty value like "\r\n"`)
}

for _, rule := range m.FileRouters {
if filepath.IsAbs(rule.Path) {
relPath, err := filepath.Rel(m.SourceDir, rule.Path)
Expand Down
24 changes: 24 additions & 0 deletions pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,30 @@ func TestAdjustPdAddrAndPort(t *testing.T) {
require.Equal(t, "123.45.67.89:1234,56.78.90.12:3456", cfg.TiDB.PdAddr)
}

func TestStrictFormat(t *testing.T) {
ts, host, port := startMockServer(t, http.StatusOK,
`{"port":4444,"advertise-address":"","path":"123.45.67.89:1234,56.78.90.12:3456"}`,
)
defer ts.Close()

cfg := NewConfig()
cfg.TiDB.Host = host
cfg.TiDB.StatusPort = port
cfg.Mydumper.SourceDir = "."
cfg.TikvImporter.Backend = BackendLocal
cfg.TikvImporter.SortedKVDir = "."
cfg.TiDB.DistSQLScanConcurrency = 1
cfg.Mydumper.StrictFormat = true

err := cfg.Adjust(context.Background())
require.ErrorContains(t, err, "mydumper.strict-format can not be used with empty mydumper.csv.terminator")
t.Log(err.Error())

cfg.Mydumper.CSV.Terminator = "\r\n"
err = cfg.Adjust(context.Background())
require.NoError(t, err)
}

func TestPausePDSchedulerScope(t *testing.T) {
ts, host, port := startMockServer(t, http.StatusOK,
`{"port":4444,"advertise-address":"","path":"123.45.67.89:1234,56.78.90.12:3456"}`,
Expand Down
3 changes: 2 additions & 1 deletion pkg/lightning/mydump/csv_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ type field struct {
quoted bool
}

// NewCSVParser creates a CSV parser.
// NewCSVParser creates a CSV parser. The ownership of the reader is transferred
// to the parser.
func NewCSVParser(
ctx context.Context,
cfg *config.CSVConfig,
Expand Down
8 changes: 7 additions & 1 deletion pkg/lightning/mydump/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,13 +416,15 @@ func SplitLargeCSV(
// Create a utf8mb4 convertor to encode and decode data with the charset of CSV files.
charsetConvertor, err := NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
if err != nil {
_ = r.Close()
return nil, nil, err
}
parser, err := NewCSVParser(ctx, &cfg.CSV, r, cfg.ReadBlockSize, cfg.IOWorkers, true, charsetConvertor)
if err != nil {
return nil, nil, err
}
if err = parser.ReadColumns(); err != nil {
_ = parser.Close()
return nil, nil, err
}
if cfg.CSV.HeaderSchemaMatch {
Expand All @@ -433,6 +435,7 @@ func SplitLargeCSV(
if endOffset > dataFile.FileMeta.FileSize {
endOffset = dataFile.FileMeta.FileSize
}
_ = parser.Close()
}
divisor := int64(cfg.ColumnCnt)
for {
Expand All @@ -446,18 +449,21 @@ func SplitLargeCSV(
// Create a utf8mb4 convertor to encode and decode data with the charset of CSV files.
charsetConvertor, err := NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
if err != nil {
_ = r.Close()
return nil, nil, err
}
parser, err := NewCSVParser(ctx, &cfg.CSV, r, cfg.ReadBlockSize, cfg.IOWorkers, false, charsetConvertor)
if err != nil {
return nil, nil, err
}
if err = parser.SetPos(endOffset, 0); err != nil {
_ = parser.Close()
return nil, nil, err
}
_, pos, err := parser.ReadUntilTerminator()
if err != nil {
if !errors.ErrorEqual(err, io.EOF) {
_ = parser.Close()
return nil, nil, err
}
log.FromContext(ctx).Warn("file contains no terminator at end",
Expand All @@ -466,7 +472,7 @@ func SplitLargeCSV(
pos = dataFile.FileMeta.FileSize
}
endOffset = pos
parser.Close()
_ = parser.Close()
}
regions = append(regions,
&TableRegion{
Expand Down
95 changes: 95 additions & 0 deletions pkg/lightning/mydump/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,3 +487,98 @@ func TestSplitLargeFileOnlyOneChunk(t *testing.T) {
require.Equal(t, columns, regions[i].Chunk.Columns)
}
}

func TestSplitLargeFileSeekInsideCRLF(t *testing.T) {
ctx := context.Background()
meta := &MDTableMeta{
DB: "csv",
Name: "large_csv_seek_inside_crlf",
}

dir := t.TempDir()

fileName := "test.csv"
filePath := filepath.Join(dir, fileName)

content := []byte("1\r\n2\r\n3\r\n4\r\n")
err := os.WriteFile(filePath, content, 0o644)
require.NoError(t, err)

dataFileInfo, err := os.Stat(filePath)
require.NoError(t, err)
fileSize := dataFileInfo.Size()
fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
ioWorker := worker.NewPool(context.Background(), 4, "io")

store, err := storage.NewLocalStorage(dir)
require.NoError(t, err)

// if we don't set terminator, it will get the wrong result

cfg := &config.Config{
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
},
StrictFormat: true,
Filter: []string{"*.*"},
MaxRegionSize: 2,
},
}
divideConfig := NewDataDivideConfig(cfg, 1, ioWorker, store, meta)

// in fact this is the wrong result, just to show the bug. pos mismatch with
// offsets. and we might read more rows than expected because we use == rather
// than >= to stop reading.
offsets := [][]int64{{0, 3}, {3, 6}, {6, 9}, {9, 12}}
pos := []int64{2, 5, 8, 11}

regions, _, err := SplitLargeCSV(context.Background(), divideConfig, fileInfo)
require.NoError(t, err)
require.Len(t, regions, len(offsets))
for i := range offsets {
require.Equal(t, offsets[i][0], regions[i].Chunk.Offset)
require.Equal(t, offsets[i][1], regions[i].Chunk.EndOffset)
}

file, err := os.Open(filePath)
require.NoError(t, err)
parser, err := NewCSVParser(ctx, &cfg.Mydumper.CSV, file, 128, ioWorker, false, nil)
require.NoError(t, err)

for parser.ReadRow() == nil {
p, _ := parser.Pos()
require.Equal(t, pos[0], p)
pos = pos[1:]
}
require.NoError(t, parser.Close())

// set terminator to "\r\n"

cfg.Mydumper.CSV.Terminator = "\r\n"
divideConfig = NewDataDivideConfig(cfg, 1, ioWorker, store, meta)
// pos is contained in expectedOffsets
expectedOffsets := [][]int64{{0, 6}, {6, 12}}
pos = []int64{3, 6, 9, 12}

regions, _, err = SplitLargeCSV(context.Background(), divideConfig, fileInfo)
require.NoError(t, err)
require.Len(t, regions, len(expectedOffsets))
for i := range expectedOffsets {
require.Equal(t, expectedOffsets[i][0], regions[i].Chunk.Offset)
require.Equal(t, expectedOffsets[i][1], regions[i].Chunk.EndOffset)
}

file, err = os.Open(filePath)
require.NoError(t, err)
parser, err = NewCSVParser(ctx, &cfg.Mydumper.CSV, file, 128, ioWorker, false, nil)
require.NoError(t, err)

for parser.ReadRow() == nil {
p, _ := parser.Pos()
require.Equal(t, pos[0], p)
pos = pos[1:]
}
require.NoError(t, parser.Close())
}
2 changes: 2 additions & 0 deletions tests/integrationtest/r/executor/import_into.result
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ import into t from '/file.csv' with skip_rows=true;
Error 8164 (HY000): Invalid option value for skip_rows
import into t from '/file.csv' with split_file='aa';
Error 8164 (HY000): Invalid option value for split_file
import into t from '/file.csv' with split_file;
Error 8164 (HY000): Invalid option value for lines_terminated_by, should not be empty when use split_file
import into t from '/file.csv' with split_file, skip_rows=2;
Error 8164 (HY000): Invalid option value for skip_rows, should be <= 1 when split-file is enabled
import into t from '/file.csv' with disk_quota='aa';
Expand Down
2 changes: 2 additions & 0 deletions tests/integrationtest/t/executor/import_into.test
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ import into t from '/file.csv' with skip_rows=true;
-- error 8164
import into t from '/file.csv' with split_file='aa';
-- error 8164
import into t from '/file.csv' with split_file;
-- error 8164
import into t from '/file.csv' with split_file, skip_rows=2;
-- error 8164
import into t from '/file.csv' with disk_quota='aa';
Expand Down
15 changes: 13 additions & 2 deletions tests/realtikvtest/importintotest4/split_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *mockGCSSuite) TestSplitFile() {
})
// split into 3 engines(subtasks)
importSQL := fmt.Sprintf(`import into split_file.t FROM 'gs://split-file/1.csv?endpoint=%s'
with split_file, __max_engine_size = '1'`, gcsEndpoint)
with split_file, lines_terminated_by = '\n', __max_engine_size = '1'`, gcsEndpoint)
result := s.tk.MustQuery(importSQL).Rows()
s.Len(result, 1)
jobID, err := strconv.Atoi(result[0][0].(string))
Expand All @@ -74,7 +74,18 @@ func (s *mockGCSSuite) TestSplitFile() {
// skip 1 row
s.tk.MustExec("truncate table t")
importSQL = fmt.Sprintf(`import into split_file.t FROM 'gs://split-file/1.csv?endpoint=%s'
with split_file, skip_rows = 1, __max_engine_size = '1'`, gcsEndpoint)
with split_file, lines_terminated_by = '\n', skip_rows = 1, __max_engine_size = '1'`, gcsEndpoint)
s.tk.MustQuery(importSQL)
s.tk.MustQuery("select * from t").Sort().Check(testkit.Rows(allData[1:]...))

s.tk.MustExec("create table t2 (a int primary key nonclustered, b varchar(100));")
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "split-file", Name: "2.csv"},
Content: []byte("1,2\r\n3,4\r\n5,6\r\n7,8\r\n9,10\r\n"),
})
config.MaxRegionSize = 9
importSQL = fmt.Sprintf(`import into split_file.t2 FROM 'gs://split-file/2.csv?endpoint=%s'
with split_file, lines_terminated_by='\r\n'`, gcsEndpoint)
s.tk.MustQuery(importSQL)
s.tk.MustExec("admin check table t2")
}

0 comments on commit 6a33993

Please sign in to comment.