config: must set line terminator when use strict-format (#53444) #57071

Merged

br/pkg/lightning/config/BUILD.bazel (2 changes: 1 addition & 1 deletion)

@@ -41,7 +41,7 @@ go_test(
         "configlist_test.go",
     ],
     flaky = True,
-    shard_count = 45,
+    shard_count = 46,
     deps = [
         ":config",
         "//br/pkg/lightning/common",

br/pkg/lightning/config/config.go (11 changes: 9 additions & 2 deletions)

@@ -1172,7 +1172,9 @@ func (cfg *Config) Adjust(ctx context.Context) error {
 	if err := cfg.CheckAndAdjustTiDBPort(ctx, mustHaveInternalConnections); err != nil {
 		return err
 	}
-	cfg.AdjustMydumper()
+	if err := cfg.AdjustMydumper(); err != nil {
+		return err
+	}
 	cfg.AdjustCheckPoint()
 	return cfg.CheckAndAdjustFilePath()
 }
@@ -1435,7 +1437,11 @@ func (cfg *Config) AdjustCheckPoint() {
 }
 
 // AdjustMydumper adjusts the mydumper config.
-func (cfg *Config) AdjustMydumper() {
+func (cfg *Config) AdjustMydumper() error {
+	if cfg.Mydumper.StrictFormat && len(cfg.Mydumper.CSV.Terminator) == 0 {
+		return common.ErrInvalidConfig.GenWithStack("" +
+			`mydumper.strict-format can not be used with empty mydumper.csv.terminator. Please set mydumper.csv.terminator to a non-empty value like "\r\n"`)
+	}
 	if cfg.Mydumper.BatchImportRatio < 0.0 || cfg.Mydumper.BatchImportRatio >= 1.0 {
 		cfg.Mydumper.BatchImportRatio = DefaultBatchImportRatio
 	}
@@ -1456,6 +1462,7 @@ func (cfg *Config) AdjustMydumper() {
 			ig.Columns = cols
 		}
 	}
+	return nil
 }
 
 // CheckAndAdjustSecurity checks and adjusts the security config.
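
As a usage sketch (not part of the diff): the new check surfaces through Adjust/AdjustMydumper as a regular error. The snippet below assumes only the exported names visible above (config.NewConfig, the Mydumper.StrictFormat and Mydumper.CSV.Terminator fields, and the new error return of AdjustMydumper); the import path is the package's usual one.

package main

import (
	"fmt"

	"github.com/pingcap/tidb/br/pkg/lightning/config"
)

func main() {
	cfg := config.NewConfig()
	cfg.Mydumper.StrictFormat = true

	// strict-format with an empty terminator is now rejected up front.
	if err := cfg.AdjustMydumper(); err != nil {
		fmt.Println(err) // ErrInvalidConfig: mydumper.strict-format can not be used with empty mydumper.csv.terminator ...
	}

	// An explicit terminator satisfies the check.
	cfg.Mydumper.CSV.Terminator = "\r\n"
	if err := cfg.AdjustMydumper(); err != nil {
		fmt.Println("unexpected:", err)
	}
}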

br/pkg/lightning/config/config_test.go (24 changes: 24 additions & 0 deletions)

@@ -83,6 +83,30 @@ func TestAdjustPdAddrAndPort(t *testing.T) {
 	require.Equal(t, "123.45.67.89:1234", cfg.TiDB.PdAddr)
 }
 
+func TestStrictFormat(t *testing.T) {
+	ts, host, port := startMockServer(t, http.StatusOK,
+		`{"port":4444,"advertise-address":"","path":"123.45.67.89:1234,56.78.90.12:3456"}`,
+	)
+	defer ts.Close()
+
+	cfg := config.NewConfig()
+	cfg.TiDB.Host = host
+	cfg.TiDB.StatusPort = port
+	cfg.Mydumper.SourceDir = "."
+	cfg.TikvImporter.Backend = config.BackendLocal
+	cfg.TikvImporter.SortedKVDir = "."
+	cfg.TiDB.DistSQLScanConcurrency = 1
+	cfg.Mydumper.StrictFormat = true
+
+	err := cfg.Adjust(context.Background())
+	require.ErrorContains(t, err, "mydumper.strict-format can not be used with empty mydumper.csv.terminator")
+	t.Log(err.Error())
+
+	cfg.Mydumper.CSV.Terminator = "\r\n"
+	err = cfg.Adjust(context.Background())
+	require.NoError(t, err)
+}
+
 func TestPausePDSchedulerScope(t *testing.T) {
 	ts, host, port := startMockServer(t, http.StatusOK,
 		`{"port":4444,"advertise-address":"","path":"123.45.67.89:1234,56.78.90.12:3456"}`,

br/pkg/lightning/mydump/csv_parser.go (3 changes: 2 additions & 1 deletion)

@@ -98,7 +98,8 @@ type field struct {
 	quoted bool
 }
 
-// NewCSVParser creates a CSV parser.
+// NewCSVParser creates a CSV parser. The ownership of the reader is transferred
+// to the parser.
 func NewCSVParser(
 	ctx context.Context,
 	cfg *config.CSVConfig,
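
A sketch of the ownership rule the new doc comment establishes, inferred from the comment and the call sites in region.go below. The ReadSeekCloser reader type is an assumption, as is cleanup on the constructor's error path, which those call sites leave to NewCSVParser itself:

package main

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/lightning/config"
	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
	"github.com/pingcap/tidb/br/pkg/lightning/worker"
)

// consumeCSV reads every row of r through a CSV parser.
func consumeCSV(ctx context.Context, cfg *config.CSVConfig, r mydump.ReadSeekCloser, pool *worker.Pool) error {
	parser, err := mydump.NewCSVParser(ctx, cfg, r, config.ReadBlockSize, pool, false, nil)
	if err != nil {
		// Note: the call sites in region.go do not close r on this path,
		// which suggests ownership transfers on the call itself.
		return err
	}
	// The parser owns r now: close the parser, never r directly.
	defer func() { _ = parser.Close() }()
	for parser.ReadRow() == nil {
		// consume rows
	}
	return nil
}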

br/pkg/lightning/mydump/region.go (8 changes: 7 additions & 1 deletion)

@@ -442,13 +442,15 @@ func SplitLargeCSV(
 	// Create a utf8mb4 convertor to encode and decode data with the charset of CSV files.
 	charsetConvertor, err := NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
 	if err != nil {
+		_ = r.Close()
 		return nil, nil, err
 	}
 	parser, err := NewCSVParser(ctx, &cfg.CSV, r, cfg.ReadBlockSize, cfg.IOWorkers, true, charsetConvertor)
 	if err != nil {
 		return nil, nil, err
 	}
 	if err = parser.ReadColumns(); err != nil {
+		_ = parser.Close()
 		return nil, nil, err
 	}
 	if cfg.CSV.HeaderSchemaMatch {
@@ -459,6 +461,7 @@
 		if endOffset > dataFile.FileMeta.FileSize {
 			endOffset = dataFile.FileMeta.FileSize
 		}
+		_ = parser.Close()
 	}
 	divisor := int64(cfg.ColumnCnt)
 	for {
@@ -472,18 +475,21 @@
 			// Create a utf8mb4 convertor to encode and decode data with the charset of CSV files.
 			charsetConvertor, err := NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
 			if err != nil {
+				_ = r.Close()
 				return nil, nil, err
 			}
 			parser, err := NewCSVParser(ctx, &cfg.CSV, r, cfg.ReadBlockSize, cfg.IOWorkers, false, charsetConvertor)
 			if err != nil {
 				return nil, nil, err
 			}
 			if err = parser.SetPos(endOffset, 0); err != nil {
+				_ = r.Close()
 				return nil, nil, err
 			}
 			_, pos, err := parser.ReadUntilTerminator()
 			if err != nil {
 				if !errors.ErrorEqual(err, io.EOF) {
+					_ = r.Close()
 					return nil, nil, err
 				}
 				log.FromContext(ctx).Warn("file contains no terminator at end",
@@ -492,7 +498,7 @@
 				pos = dataFile.FileMeta.FileSize
 			}
 			endOffset = pos
-			parser.Close()
+			_ = parser.Close()
 		}
 		regions = append(regions,
 			&TableRegion{
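
The early close calls above keep readers from leaking on error paths; the behavior this PR actually guards against is how chunk boundaries interact with CRLF terminators, which the new test below demonstrates. As an illustration (offsets and positions taken directly from the test's expectations), the fixture's byte layout is:

// Illustration only: byte offsets in "1\r\n2\r\n3\r\n4\r\n".
//
//   index: 0    1     2     3    4     5     6    7     8     9    10    11
//   byte:  '1'  '\r'  '\n'  '2'  '\r'  '\n'  '3'  '\r'  '\n'  '4'  '\r'  '\n'
//
// With max-region-size = 2 and no terminator configured, splitting yields
// chunks {0,3}, {3,6}, {6,9}, {9,12}, while the parser reports row positions
// 2, 5, 8, 11: inside the CRLF pairs, mismatching the chunk boundaries.
// With terminator = "\r\n", the chunks become {0,6} and {6,12} and the row
// positions 3, 6, 9, 12, so every boundary falls on a whole row.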

br/pkg/lightning/mydump/region_test.go (103 changes: 103 additions & 0 deletions)

@@ -480,3 +480,106 @@ func TestSplitLargeFileOnlyOneChunk(t *testing.T) {
 		require.Equal(t, columns, regions[i].Chunk.Columns)
 	}
 }
+
+func TestSplitLargeFileSeekInsideCRLF(t *testing.T) {
+	ctx := context.Background()
+	meta := &MDTableMeta{
+		DB:   "csv",
+		Name: "large_csv_seek_inside_crlf",
+	}
+
+	dir := t.TempDir()
+
+	fileName := "test.csv"
+	filePath := filepath.Join(dir, fileName)
+
+	content := []byte("1\r\n2\r\n3\r\n4\r\n")
+	err := os.WriteFile(filePath, content, 0o644)
+	require.NoError(t, err)
+
+	dataFileInfo, err := os.Stat(filePath)
+	require.NoError(t, err)
+	fileSize := dataFileInfo.Size()
+	fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
+	ioWorker := worker.NewPool(context.Background(), 4, "io")
+
+	store, err := storage.NewLocalStorage(dir)
+	require.NoError(t, err)
+
+	// If we don't set the terminator, splitting produces the wrong result.
+
+	cfg := &config.Config{
+		Mydumper: config.MydumperRuntime{
+			ReadBlockSize: config.ReadBlockSize,
+			CSV: config.CSVConfig{
+				Separator: ",",
+			},
+			StrictFormat:  true,
+			Filter:        []string{"*.*"},
+			MaxRegionSize: 2,
+		},
+	}
+
+	// This is in fact the wrong result, recorded here to show the bug: pos
+	// mismatches the offsets, and we might read more rows than expected
+	// because we use == rather than >= to stop reading.
+	offsets := [][]int64{{0, 3}, {3, 6}, {6, 9}, {9, 12}}
+	pos := []int64{2, 5, 8, 11}
+
+	divideConfig := NewDataDivideConfig(cfg, 2, ioWorker, store, meta)
+
+	regions, _, err := SplitLargeCSV(
+		context.Background(),
+		divideConfig,
+		fileInfo,
+	)
+	require.NoError(t, err)
+	require.Len(t, regions, len(offsets))
+	for i := range offsets {
+		require.Equal(t, offsets[i][0], regions[i].Chunk.Offset)
+		require.Equal(t, offsets[i][1], regions[i].Chunk.EndOffset)
+	}
+
+	file, err := os.Open(filePath)
+	require.NoError(t, err)
+	parser, err := NewCSVParser(ctx, &cfg.Mydumper.CSV, file, 128, ioWorker, false, nil)
+	require.NoError(t, err)
+
+	for parser.ReadRow() == nil {
+		p, _ := parser.Pos()
+		require.Equal(t, pos[0], p)
+		pos = pos[1:]
+	}
+	require.NoError(t, parser.Close())
+
+	// Now set the terminator to "\r\n".
+
+	cfg.Mydumper.CSV.Terminator = "\r\n"
+	// Each pos is contained in expectedOffsets.
+	expectedOffsets := [][]int64{{0, 6}, {6, 12}}
+	pos = []int64{3, 6, 9, 12}
+
+	regions, _, err = SplitLargeCSV(
+		context.Background(),
+		divideConfig,
+		fileInfo,
+	)
+	require.NoError(t, err)
+	require.Len(t, regions, len(expectedOffsets))
+	for i := range expectedOffsets {
+		require.Equal(t, expectedOffsets[i][0], regions[i].Chunk.Offset)
+		require.Equal(t, expectedOffsets[i][1], regions[i].Chunk.EndOffset)
+	}
+
+	file, err = os.Open(filePath)
+	require.NoError(t, err)
+	parser, err = NewCSVParser(ctx, &cfg.Mydumper.CSV, file, 128, ioWorker, false, nil)
+	require.NoError(t, err)
+
+	for parser.ReadRow() == nil {
+		p, _ := parser.Pos()
+		require.Equal(t, pos[0], p)
+		pos = pos[1:]
+	}
+	require.NoError(t, parser.Close())
+}

br/tests/lightning_column_permutation/config.toml (3 changes: 3 additions & 0 deletions)

@@ -1,3 +1,6 @@
+[mydumper]
+strict-format = true
+max-region-size = 200
 
 [mydumper.csv]
 terminator = "\n"

br/web/package-lock.json (50 changes: 35 additions & 15 deletions)

(Generated file; diff not rendered.)

tools/check/check-bazel-prepare.sh (6 changes: 4 additions & 2 deletions)

@@ -25,7 +25,9 @@ make bazel_prepare
 after_checksum=`find . -type f \( -name *.bazel -o -name *.bzl \) -exec md5sum {} \;| sort -k 2`
 if [ "$before_checksum" != "$after_checksum" ]
 then
-  echo "Please run \`make bazel_prepare\` to update \`.bazel\` files"
-  diff <(echo "$before_checksum") <(echo "$after_checksum")
+  echo "Please run \`make bazel_prepare\` to update \`.bazel\` files, or just apply the following git diff (run \`git apply -\` and paste the following contents, ending with Ctrl-D):"
+  git diff
+  echo -e "\n\nChecksum diff:"
+  diff <(echo "$before_checksum") <(echo "$after_checksum") || true
   exit 1
 fi