config: must set line terminator when use strict-format (#53444) #56880

Closed
1 change: 1 addition & 1 deletion br/pkg/lightning/config/BUILD.bazel
@@ -41,7 +41,7 @@ go_test(
"configlist_test.go",
],
flaky = True,
shard_count = 46,  # one new test (TestStrictFormat) on top of the branch's 45
deps = [
":config",
"//br/pkg/lightning/common",
120 changes: 120 additions & 0 deletions br/pkg/lightning/config/config.go
@@ -680,6 +680,126 @@ type MydumperRuntime struct {
DataInvalidCharReplace string `toml:"data-invalid-char-replace" json:"data-invalid-char-replace"`
}

func (m *MydumperRuntime) adjust() error {
if err := m.CSV.adjust(); err != nil {
return err
}
if m.StrictFormat && len(m.CSV.Terminator) == 0 {
return common.ErrInvalidConfig.GenWithStack(
`mydumper.strict-format can not be used with empty mydumper.csv.terminator. Please set mydumper.csv.terminator to a non-empty value like "\r\n"`)
}

for _, rule := range m.FileRouters {
if filepath.IsAbs(rule.Path) {
relPath, err := filepath.Rel(m.SourceDir, rule.Path)
if err != nil {
return common.ErrInvalidConfig.Wrap(err).
GenWithStack("cannot find relative path for file route path %s", rule.Path)
}
// ".." means that this path is not in source dir, so we should return an error
if strings.HasPrefix(relPath, "..") {
return common.ErrInvalidConfig.GenWithStack(
"file route path '%s' is not in source dir '%s'", rule.Path, m.SourceDir)
}
rule.Path = relPath
}
}

// enable default file route rule if no rules are set
if len(m.FileRouters) == 0 {
m.DefaultFileRules = true
}

if len(m.DataCharacterSet) == 0 {
m.DataCharacterSet = defaultCSVDataCharacterSet
}
charset, err1 := ParseCharset(m.DataCharacterSet)
if err1 != nil {
return common.ErrInvalidConfig.Wrap(err1).GenWithStack("invalid `mydumper.data-character-set`")
}
if charset == GBK || charset == GB18030 {
log.L().Warn(
"incompatible strings may be encountered during the transcoding process and will be replaced, please be aware of the risk of not being able to retain the original information",
zap.String("source-character-set", charset.String()),
zap.ByteString("invalid-char-replacement", []byte(m.DataInvalidCharReplace)))
}
if m.BatchImportRatio < 0.0 || m.BatchImportRatio >= 1.0 {
m.BatchImportRatio = DefaultBatchImportRatio
}
if m.ReadBlockSize <= 0 {
m.ReadBlockSize = ReadBlockSize
}
if len(m.CharacterSet) == 0 {
m.CharacterSet = "auto"
}

if len(m.IgnoreColumns) != 0 {
// Lowercase the column names because TiDB compares columns using Name.L.
for _, ig := range m.IgnoreColumns {
cols := make([]string, len(ig.Columns))
for i, col := range ig.Columns {
cols[i] = strings.ToLower(col)
}
ig.Columns = cols
}
}
return m.adjustFilePath()
}

// adjustFilePath checks and adjusts the file path.
func (m *MydumperRuntime) adjustFilePath() error {
var u *url.URL

// An absolute Windows path like "C:\Users\XYZ" would be interpreted as
// a URL with scheme "C" and opaque data "\Users\XYZ".
// Therefore, we only perform URL parsing if we are sure the path is not
// an absolute Windows path.
// Here we use filepath.VolumeName, which identifies the "C:" part of the
// path. On Linux this function always returns an empty string.
// On Windows, a drive letter can only be a single letter from "A:" to "Z:",
// so this won't mistake "S3:" for a Windows path.
if len(filepath.VolumeName(m.SourceDir)) == 0 {
var err error
u, err = url.Parse(m.SourceDir)
if err != nil {
return common.ErrInvalidConfig.Wrap(err).GenWithStack("cannot parse `mydumper.data-source-dir` %s", m.SourceDir)
}
} else {
u = &url.URL{}
}

// convert path and relative path to a valid file url
if u.Scheme == "" {
if m.SourceDir == "" {
return common.ErrInvalidConfig.GenWithStack("`mydumper.data-source-dir` is not set")
}
if !common.IsDirExists(m.SourceDir) {
return common.ErrInvalidConfig.GenWithStack("'%s': `mydumper.data-source-dir` does not exist", m.SourceDir)
}
absPath, err := filepath.Abs(m.SourceDir)
if err != nil {
return common.ErrInvalidConfig.Wrap(err).GenWithStack("covert data-source-dir '%s' to absolute path failed", m.SourceDir)
}
u.Path = filepath.ToSlash(absPath)
u.Scheme = "file"
m.SourceDir = u.String()
}

found := false
for _, t := range supportedStorageTypes {
if u.Scheme == t {
found = true
break
}
}
if !found {
return common.ErrInvalidConfig.GenWithStack(
"unsupported data-source-dir url '%s', supported storage types are %s",
m.SourceDir, strings.Join(supportedStorageTypes, ","))
}
return nil
}
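// A side note on the guard above (a sketch, not part of this change):
// url.Parse accepts an absolute Windows path and misreads the drive
// letter as a URL scheme:
//
//	u, _ := url.Parse(`C:\Users\XYZ`)
//	// u.Scheme == "c", u.Opaque == `\Users\XYZ`
//
// filepath.VolumeName(`C:\Users\XYZ`) returns "C:" on Windows and "" on
// Linux, so URL parsing is only skipped where that ambiguity can occur.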

// AllIgnoreColumns is a slice of IgnoreColumns.
type AllIgnoreColumns []*IgnoreColumns

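For anyone hitting the new error above: a minimal sketch of the [mydumper]
section that satisfies the check (the data-source-dir value is a placeholder):

	[mydumper]
	data-source-dir = "/data/export"
	strict-format = true

	[mydumper.csv]
	# must be non-empty whenever strict-format = true
	terminator = "\r\n"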
24 changes: 24 additions & 0 deletions br/pkg/lightning/config/config_test.go
@@ -83,6 +83,30 @@ func TestAdjustPdAddrAndPort(t *testing.T) {
require.Equal(t, "123.45.67.89:1234", cfg.TiDB.PdAddr)
}

func TestStrictFormat(t *testing.T) {
ts, host, port := startMockServer(t, http.StatusOK,
`{"port":4444,"advertise-address":"","path":"123.45.67.89:1234,56.78.90.12:3456"}`,
)
defer ts.Close()

cfg := NewConfig()
cfg.TiDB.Host = host
cfg.TiDB.StatusPort = port
cfg.Mydumper.SourceDir = "."
cfg.TikvImporter.Backend = BackendLocal
cfg.TikvImporter.SortedKVDir = "."
cfg.TiDB.DistSQLScanConcurrency = 1
cfg.Mydumper.StrictFormat = true

err := cfg.Adjust(context.Background())
require.ErrorContains(t, err, "mydumper.strict-format can not be used with empty mydumper.csv.terminator")
t.Log(err.Error())

cfg.Mydumper.CSV.Terminator = "\r\n"
err = cfg.Adjust(context.Background())
require.NoError(t, err)
}

func TestPausePDSchedulerScope(t *testing.T) {
ts, host, port := startMockServer(t, http.StatusOK,
`{"port":4444,"advertise-address":"","path":"123.45.67.89:1234,56.78.90.12:3456"}`,
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/chunk_process.go
@@ -541,7 +541,7 @@ func (cr *chunkProcessor) deliverLoop(
rc.status.FinishedFileSize.Add(delta)
}
} else {
deliverLogger.Warn("offset go back", zap.Int64("curr", highOffset),
deliverLogger.Error("offset go back", zap.Int64("curr", highOffset),
zap.Int64("start", lowOffset))
}
}
3 changes: 2 additions & 1 deletion br/pkg/lightning/mydump/csv_parser.go
@@ -98,7 +98,8 @@ type field struct {
quoted bool
}

// NewCSVParser creates a CSV parser.
// NewCSVParser creates a CSV parser. The ownership of the reader is transferred
// to the parser.
func NewCSVParser(
ctx context.Context,
cfg *config.CSVConfig,
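The clarified doc comment changes the cleanup contract for callers. A minimal
sketch of what it implies (the caller shown is hypothetical; the error-path
handling mirrors region.go in this PR):

	parser, err := mydump.NewCSVParser(ctx, &cfg.CSV, r, cfg.ReadBlockSize, ioWorkers, false, nil)
	if err != nil {
		return err // r already belongs to the parser; the caller must not close it
	}
	defer func() { _ = parser.Close() }() // Close also releases the underlying reader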
8 changes: 7 additions & 1 deletion br/pkg/lightning/mydump/region.go
@@ -442,13 +442,15 @@ func SplitLargeCSV(
// Create a utf8mb4 convertor to encode and decode data with the charset of CSV files.
charsetConvertor, err := NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
if err != nil {
_ = r.Close()
return nil, nil, err
}
parser, err := NewCSVParser(ctx, &cfg.CSV, r, cfg.ReadBlockSize, cfg.IOWorkers, true, charsetConvertor)
if err != nil {
return nil, nil, err
}
if err = parser.ReadColumns(); err != nil {
_ = parser.Close()
return nil, nil, err
}
if cfg.CSV.HeaderSchemaMatch {
@@ -459,6 +461,7 @@
if endOffset > dataFile.FileMeta.FileSize {
endOffset = dataFile.FileMeta.FileSize
}
_ = parser.Close()
}
divisor := int64(cfg.ColumnCnt)
for {
@@ -472,18 +475,21 @@
// Create a utf8mb4 convertor to encode and decode data with the charset of CSV files.
charsetConvertor, err := NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
if err != nil {
_ = r.Close()
return nil, nil, err
}
parser, err := NewCSVParser(ctx, &cfg.CSV, r, cfg.ReadBlockSize, cfg.IOWorkers, false, charsetConvertor)
if err != nil {
return nil, nil, err
}
if err = parser.SetPos(endOffset, 0); err != nil {
_ = parser.Close()
return nil, nil, err
}
_, pos, err := parser.ReadUntilTerminator()
if err != nil {
if !errors.ErrorEqual(err, io.EOF) {
_ = parser.Close()
return nil, nil, err
}
log.FromContext(ctx).Warn("file contains no terminator at end",
@@ -492,7 +498,7 @@
pos = dataFile.FileMeta.FileSize
}
endOffset = pos
parser.Close()
_ = parser.Close()
}
regions = append(regions,
&TableRegion{
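To make the off-by-one concrete, a standalone sketch using the same bytes as
the new test below:

	package main

	import "fmt"

	func main() {
		// Four CRLF-terminated rows, as in TestSplitLargeFileSeekInsideCRLF.
		data := []byte("1\r\n2\r\n3\r\n4\r\n")
		for i, b := range data {
			fmt.Printf("%2d: %q\n", i, rune(b))
		}
		// Without an explicit terminator the parser reports row ends at the
		// '\r' (pos 2, 5, 8, 11), while the splitter puts chunk boundaries
		// after the '\n' (3, 6, 9, 12); a reader that stops only when
		// pos == EndOffset never matches and reads past its chunk.
	}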
95 changes: 95 additions & 0 deletions br/pkg/lightning/mydump/region_test.go
@@ -480,3 +480,98 @@ func TestSplitLargeFileOnlyOneChunk(t *testing.T) {
require.Equal(t, columns, regions[i].Chunk.Columns)
}
}

func TestSplitLargeFileSeekInsideCRLF(t *testing.T) {
ctx := context.Background()
meta := &MDTableMeta{
DB: "csv",
Name: "large_csv_seek_inside_crlf",
}

dir := t.TempDir()

fileName := "test.csv"
filePath := filepath.Join(dir, fileName)

content := []byte("1\r\n2\r\n3\r\n4\r\n")
err := os.WriteFile(filePath, content, 0o644)
require.NoError(t, err)

dataFileInfo, err := os.Stat(filePath)
require.NoError(t, err)
fileSize := dataFileInfo.Size()
fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
ioWorker := worker.NewPool(context.Background(), 4, "io")

store, err := storage.NewLocalStorage(dir)
require.NoError(t, err)

// If we don't set the terminator, splitting will produce the wrong result.

cfg := &config.Config{
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
},
StrictFormat: true,
Filter: []string{"*.*"},
MaxRegionSize: 2,
},
}
divideConfig := NewDataDivideConfig(cfg, 1, ioWorker, store, meta)

// In fact these are the wrong results; they are asserted only to demonstrate
// the bug: pos does not match the offsets, and we might read more rows than
// expected because we use == rather than >= to stop reading.
offsets := [][]int64{{0, 3}, {3, 6}, {6, 9}, {9, 12}}
pos := []int64{2, 5, 8, 11}

regions, _, err := SplitLargeCSV(context.Background(), divideConfig, fileInfo)
require.NoError(t, err)
require.Len(t, regions, len(offsets))
for i := range offsets {
require.Equal(t, offsets[i][0], regions[i].Chunk.Offset)
require.Equal(t, offsets[i][1], regions[i].Chunk.EndOffset)
}

file, err := os.Open(filePath)
require.NoError(t, err)
parser, err := NewCSVParser(ctx, &cfg.Mydumper.CSV, file, 128, ioWorker, false, nil)
require.NoError(t, err)

for parser.ReadRow() == nil {
p, _ := parser.Pos()
require.Equal(t, pos[0], p)
pos = pos[1:]
}
require.NoError(t, parser.Close())

// set terminator to "\r\n"

cfg.Mydumper.CSV.Terminator = "\r\n"
divideConfig = NewDataDivideConfig(cfg, 1, ioWorker, store, meta)
// every pos value now lands on a chunk boundary in expectedOffsets
expectedOffsets := [][]int64{{0, 6}, {6, 12}}
pos = []int64{3, 6, 9, 12}

regions, _, err = SplitLargeCSV(context.Background(), divideConfig, fileInfo)
require.NoError(t, err)
require.Len(t, regions, len(expectedOffsets))
for i := range expectedOffsets {
require.Equal(t, expectedOffsets[i][0], regions[i].Chunk.Offset)
require.Equal(t, expectedOffsets[i][1], regions[i].Chunk.EndOffset)
}

file, err = os.Open(filePath)
require.NoError(t, err)
parser, err = NewCSVParser(ctx, &cfg.Mydumper.CSV, file, 128, ioWorker, false, nil)
require.NoError(t, err)

for parser.ReadRow() == nil {
p, _ := parser.Pos()
require.Equal(t, pos[0], p)
pos = pos[1:]
}
require.NoError(t, parser.Close())
}
3 changes: 3 additions & 0 deletions br/tests/lightning_column_permutation/config.toml
@@ -1,3 +1,6 @@
[mydumper]
strict-format = true
max-region-size = 200

[mydumper.csv]
terminator = "\n"