config: must set line terminator when use strict-format (#53444) #54939

Merged
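In short, as the diff below shows: Lightning now rejects strict-format = true when mydumper.csv.terminator is empty, IMPORT INTO likewise rejects split_file when lines_terminated_by is empty, and SplitLargeCSV closes its readers and parsers on every error path instead of leaking them. Without these guards, chunk splitting could compute boundaries that disagree with real row boundaries; the new region tests below show split points landing between the bytes of a \r\n pair.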
2 changes: 1 addition & 1 deletion lightning/pkg/importer/chunk_process.go
@@ -724,7 +724,7 @@ func (cr *chunkProcessor) deliverLoop(
rc.status.FinishedFileSize.Add(delta)
}
} else {
- deliverLogger.Warn("offset go back", zap.Int64("curr", highOffset),
+ deliverLogger.Error("offset go back", zap.Int64("curr", highOffset),
zap.Int64("start", lowOffset))
}
}
3 changes: 3 additions & 0 deletions lightning/tests/lightning_column_permutation/config.toml
@@ -1,3 +1,6 @@
[mydumper]
strict-format = true
max-region-size = 200

[mydumper.csv]
terminator = "\n"
4 changes: 4 additions & 0 deletions pkg/executor/importer/import.go
@@ -758,6 +758,10 @@ func (p *Plan) initOptions(ctx context.Context, seCtx sessionctx.Context, option
return exeerrors.ErrInvalidOptionVal.FastGenByArgs("skip_rows, should be <= 1 when split-file is enabled")
}

+ if p.SplitFile && len(p.LinesTerminatedBy) == 0 {
+ return exeerrors.ErrInvalidOptionVal.FastGenByArgs("lines_terminated_by, should not be empty when use split_file")
+ }

p.adjustOptions(targetNodeCPUCnt)
return nil
}
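With this check in place, a statement such as import into t from '/file.csv' with split_file; (no lines_terminated_by) fails fast with Error 8164, as exercised by the integration tests further below.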
2 changes: 1 addition & 1 deletion pkg/lightning/config/BUILD.bazel
@@ -42,7 +42,7 @@ go_test(
],
embed = [":config"],
flaky = True,
- shard_count = 48,
+ shard_count = 49,
deps = [
"@com_github_burntsushi_toml//:toml",
"@com_github_stretchr_testify//require",
5 changes: 5 additions & 0 deletions pkg/lightning/config/config.go
@@ -884,6 +884,11 @@ func (m *MydumperRuntime) adjust() error {
if err := m.CSV.adjust(); err != nil {
return err
}
+ if m.StrictFormat && len(m.CSV.Terminator) == 0 {
+ return common.ErrInvalidConfig.GenWithStack(
+ `mydumper.strict-format can not be used with empty mydumper.csv.terminator. Please set mydumper.csv.terminator to a non-empty value like "\r\n"`)
+ }

for _, rule := range m.FileRouters {
if filepath.IsAbs(rule.Path) {
relPath, err := filepath.Rel(m.SourceDir, rule.Path)
24 changes: 24 additions & 0 deletions pkg/lightning/config/config_test.go
@@ -81,6 +81,30 @@ func TestAdjustPdAddrAndPort(t *testing.T) {
require.Equal(t, "123.45.67.89:1234,56.78.90.12:3456", cfg.TiDB.PdAddr)
}

func TestStrictFormat(t *testing.T) {
ts, host, port := startMockServer(t, http.StatusOK,
`{"port":4444,"advertise-address":"","path":"123.45.67.89:1234,56.78.90.12:3456"}`,
)
defer ts.Close()

cfg := NewConfig()
cfg.TiDB.Host = host
cfg.TiDB.StatusPort = port
cfg.Mydumper.SourceDir = "."
cfg.TikvImporter.Backend = BackendLocal
cfg.TikvImporter.SortedKVDir = "."
cfg.TiDB.DistSQLScanConcurrency = 1
cfg.Mydumper.StrictFormat = true

err := cfg.Adjust(context.Background())
require.ErrorContains(t, err, "mydumper.strict-format can not be used with empty mydumper.csv.terminator")
t.Log(err.Error())

cfg.Mydumper.CSV.Terminator = "\r\n"
err = cfg.Adjust(context.Background())
require.NoError(t, err)
}

func TestPausePDSchedulerScope(t *testing.T) {
ts, host, port := startMockServer(t, http.StatusOK,
`{"port":4444,"advertise-address":"","path":"123.45.67.89:1234,56.78.90.12:3456"}`,
3 changes: 2 additions & 1 deletion pkg/lightning/mydump/csv_parser.go
@@ -97,7 +97,8 @@ type field struct {
quoted bool
}

- // NewCSVParser creates a CSV parser.
+ // NewCSVParser creates a CSV parser. The ownership of the reader is transferred
+ // to the parser.
func NewCSVParser(
ctx context.Context,
cfg *config.CSVConfig,
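The ownership note in that comment is worth making concrete. Below is a minimal sketch of a hypothetical caller, not code from this PR; the helper name, import paths, and the ReadSeekCloser type are assumptions based on the file layout and call sites visible in this diff. Before NewCSVParser is called, the caller still owns the reader and must close it on early failures; afterwards the reader belongs to the parser and is released only through parser.Close(), which is the discipline the SplitLargeCSV error paths below follow.

package example

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/storage"
	"github.com/pingcap/tidb/pkg/lightning/config"
	"github.com/pingcap/tidb/pkg/lightning/mydump"
	"github.com/pingcap/tidb/pkg/lightning/worker"
)

// openCSVParser is a hypothetical helper illustrating the reader-ownership
// contract of NewCSVParser.
func openCSVParser(
	ctx context.Context,
	cfg *config.CSVConfig,
	r storage.ReadSeekCloser,
	blockSize int64,
	ioWorkers *worker.Pool,
	charset, invalidCharReplace string,
) (mydump.Parser, error) {
	cc, err := mydump.NewCharsetConvertor(charset, invalidCharReplace)
	if err != nil {
		// The parser was never created, so we still own r and must close it.
		_ = r.Close()
		return nil, err
	}
	// From this call on, r belongs to the parser, even if the call returns
	// an error, so the caller must not close r directly anymore.
	parser, err := mydump.NewCSVParser(ctx, cfg, r, blockSize, ioWorkers, false, cc)
	if err != nil {
		return nil, err
	}
	return parser, nil // callers release r later via parser.Close()
}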
8 changes: 7 additions & 1 deletion pkg/lightning/mydump/region.go
@@ -416,13 +416,15 @@ func SplitLargeCSV(
// Create a utf8mb4 convertor to encode and decode data with the charset of CSV files.
charsetConvertor, err := NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
if err != nil {
+ _ = r.Close()
return nil, nil, err
}
parser, err := NewCSVParser(ctx, &cfg.CSV, r, cfg.ReadBlockSize, cfg.IOWorkers, true, charsetConvertor)
if err != nil {
return nil, nil, err
}
if err = parser.ReadColumns(); err != nil {
+ _ = parser.Close()
return nil, nil, err
}
if cfg.CSV.HeaderSchemaMatch {
@@ -433,6 +435,7 @@
if endOffset > dataFile.FileMeta.FileSize {
endOffset = dataFile.FileMeta.FileSize
}
+ _ = parser.Close()
}
divisor := int64(cfg.ColumnCnt)
for {
@@ -446,18 +449,21 @@
// Create a utf8mb4 convertor to encode and decode data with the charset of CSV files.
charsetConvertor, err := NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
if err != nil {
+ _ = r.Close()
return nil, nil, err
}
parser, err := NewCSVParser(ctx, &cfg.CSV, r, cfg.ReadBlockSize, cfg.IOWorkers, false, charsetConvertor)
if err != nil {
return nil, nil, err
}
if err = parser.SetPos(endOffset, 0); err != nil {
+ _ = parser.Close()
return nil, nil, err
}
_, pos, err := parser.ReadUntilTerminator()
if err != nil {
if !errors.ErrorEqual(err, io.EOF) {
+ _ = parser.Close()
return nil, nil, err
}
log.FromContext(ctx).Warn("file contains no terminator at end",
@@ -466,7 +472,7 @@
pos = dataFile.FileMeta.FileSize
}
endOffset = pos
- parser.Close()
+ _ = parser.Close()
}
regions = append(regions,
&TableRegion{
95 changes: 95 additions & 0 deletions pkg/lightning/mydump/region_test.go
@@ -487,3 +487,98 @@ func TestSplitLargeFileOnlyOneChunk(t *testing.T) {
require.Equal(t, columns, regions[i].Chunk.Columns)
}
}

func TestSplitLargeFileSeekInsideCRLF(t *testing.T) {
ctx := context.Background()
meta := &MDTableMeta{
DB: "csv",
Name: "large_csv_seek_inside_crlf",
}

dir := t.TempDir()

fileName := "test.csv"
filePath := filepath.Join(dir, fileName)

content := []byte("1\r\n2\r\n3\r\n4\r\n")
err := os.WriteFile(filePath, content, 0o644)
require.NoError(t, err)

dataFileInfo, err := os.Stat(filePath)
require.NoError(t, err)
fileSize := dataFileInfo.Size()
fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
ioWorker := worker.NewPool(context.Background(), 4, "io")

store, err := storage.NewLocalStorage(dir)
require.NoError(t, err)

// Without an explicit terminator, splitting produces the wrong result.

cfg := &config.Config{
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
},
StrictFormat: true,
Filter: []string{"*.*"},
MaxRegionSize: 2,
},
}
divideConfig := NewDataDivideConfig(cfg, 1, ioWorker, store, meta)

// These are in fact wrong results, kept here to demonstrate the bug: pos does
// not match the chunk offsets, and we may read more rows than expected because
// we use == rather than >= to stop reading.
offsets := [][]int64{{0, 3}, {3, 6}, {6, 9}, {9, 12}}
pos := []int64{2, 5, 8, 11}
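// An editorial note on the numbers above (inferred from the bytes, not
// asserted by the test): the file is "1\r\n2\r\n3\r\n4\r\n", 12 bytes. With
// an empty terminator the parser accepts a bare '\r' or '\n' as a line
// ending, so after reading row "1" it stops at pos 2, between the bytes of
// the CRLF pair, while the splitter's boundaries (3, 6, 9, 12) land after
// the '\n'; that is why pos never matches the offsets.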

regions, _, err := SplitLargeCSV(context.Background(), divideConfig, fileInfo)
require.NoError(t, err)
require.Len(t, regions, len(offsets))
for i := range offsets {
require.Equal(t, offsets[i][0], regions[i].Chunk.Offset)
require.Equal(t, offsets[i][1], regions[i].Chunk.EndOffset)
}

file, err := os.Open(filePath)
require.NoError(t, err)
parser, err := NewCSVParser(ctx, &cfg.Mydumper.CSV, file, 128, ioWorker, false, nil)
require.NoError(t, err)

for parser.ReadRow() == nil {
p, _ := parser.Pos()
require.Equal(t, pos[0], p)
pos = pos[1:]
}
require.NoError(t, parser.Close())

// set terminator to "\r\n"

cfg.Mydumper.CSV.Terminator = "\r\n"
divideConfig = NewDataDivideConfig(cfg, 1, ioWorker, store, meta)
// each pos value is now contained within one of the expectedOffsets ranges
expectedOffsets := [][]int64{{0, 6}, {6, 12}}
pos = []int64{3, 6, 9, 12}
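// With the terminator pinned to "\r\n", every row ("1\r\n", ...) is exactly
// 3 bytes: pos advances in steps of 3 and each 6-byte chunk holds two whole
// rows, so every pos falls inside its chunk's [offset, end offset] range.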

regions, _, err = SplitLargeCSV(context.Background(), divideConfig, fileInfo)
require.NoError(t, err)
require.Len(t, regions, len(expectedOffsets))
for i := range expectedOffsets {
require.Equal(t, expectedOffsets[i][0], regions[i].Chunk.Offset)
require.Equal(t, expectedOffsets[i][1], regions[i].Chunk.EndOffset)
}

file, err = os.Open(filePath)
require.NoError(t, err)
parser, err = NewCSVParser(ctx, &cfg.Mydumper.CSV, file, 128, ioWorker, false, nil)
require.NoError(t, err)

for parser.ReadRow() == nil {
p, _ := parser.Pos()
require.Equal(t, pos[0], p)
pos = pos[1:]
}
require.NoError(t, parser.Close())
}
2 changes: 2 additions & 0 deletions tests/integrationtest/r/executor/import_into.result
@@ -74,6 +74,8 @@ import into t from '/file.csv' with skip_rows=true;
Error 8164 (HY000): Invalid option value for skip_rows
import into t from '/file.csv' with split_file='aa';
Error 8164 (HY000): Invalid option value for split_file
+ import into t from '/file.csv' with split_file;
+ Error 8164 (HY000): Invalid option value for lines_terminated_by, should not be empty when use split_file
import into t from '/file.csv' with split_file, skip_rows=2;
Error 8164 (HY000): Invalid option value for skip_rows, should be <= 1 when split-file is enabled
import into t from '/file.csv' with disk_quota='aa';
2 changes: 2 additions & 0 deletions tests/integrationtest/t/executor/import_into.test
@@ -78,6 +78,8 @@ import into t from '/file.csv' with skip_rows=true;
-- error 8164
import into t from '/file.csv' with split_file='aa';
+ -- error 8164
+ import into t from '/file.csv' with split_file;
-- error 8164
import into t from '/file.csv' with split_file, skip_rows=2;
-- error 8164
import into t from '/file.csv' with disk_quota='aa';
15 changes: 13 additions & 2 deletions tests/realtikvtest/importintotest4/split_file_test.go
@@ -54,7 +54,7 @@ func (s *mockGCSSuite) TestSplitFile() {
})
// split into 3 engines(subtasks)
importSQL := fmt.Sprintf(`import into split_file.t FROM 'gs://split-file/1.csv?endpoint=%s'
- with split_file, __max_engine_size = '1'`, gcsEndpoint)
+ with split_file, lines_terminated_by = '\n', __max_engine_size = '1'`, gcsEndpoint)
result := s.tk.MustQuery(importSQL).Rows()
s.Len(result, 1)
jobID, err := strconv.Atoi(result[0][0].(string))
@@ -74,7 +74,18 @@
// skip 1 row
s.tk.MustExec("truncate table t")
importSQL = fmt.Sprintf(`import into split_file.t FROM 'gs://split-file/1.csv?endpoint=%s'
- with split_file, skip_rows = 1, __max_engine_size = '1'`, gcsEndpoint)
+ with split_file, lines_terminated_by = '\n', skip_rows = 1, __max_engine_size = '1'`, gcsEndpoint)
s.tk.MustQuery(importSQL)
s.tk.MustQuery("select * from t").Sort().Check(testkit.Rows(allData[1:]...))

s.tk.MustExec("create table t2 (a int primary key nonclustered, b varchar(100));")
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "split-file", Name: "2.csv"},
Content: []byte("1,2\r\n3,4\r\n5,6\r\n7,8\r\n9,10\r\n"),
})
config.MaxRegionSize = 9
importSQL = fmt.Sprintf(`import into split_file.t2 FROM 'gs://split-file/2.csv?endpoint=%s'
with split_file, lines_terminated_by='\r\n'`, gcsEndpoint)
s.tk.MustQuery(importSQL)
s.tk.MustExec("admin check table t2")
}
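A note on the byte math in the new t2 case (inferred from the file content, not asserted by the test): the first four rows of 2.csv are 5 bytes each ("1,2\r\n" and so on), so with config.MaxRegionSize = 9 the first candidate split point is byte 9, the '\n' inside the second row's CRLF pair; that is precisely the seek-inside-CRLF situation the region.go fix and the mandatory terminator guard against, and admin check table then verifies the imported rows and their index stay consistent.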