This is an automated cherry-pick of #53444
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
lance6716 authored and ti-chi-bot committed Oct 28, 2024
1 parent cc04dd7 commit 32da5e1
Showing 12 changed files with 2,104 additions and 3 deletions.
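
In short: when mydumper.strict-format = true, Lightning now refuses to start if mydumper.csv.terminator is empty, because strict-format mode splits large CSV files by scanning for the line terminator, and an implicit "\r"/"\n" terminator lets a split point land inside a "\r\n" pair (see TestSplitLargeFileSeekInsideCRLF below). A minimal sketch of a configuration that passes the new check (illustrative values, not taken from this commit):

[mydumper]
strict-format = true

[mydumper.csv]
# Required when strict-format = true; an empty value now fails Config.Adjust().
terminator = "\r\n"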
4 changes: 4 additions & 0 deletions br/pkg/lightning/config/BUILD.bazel
@@ -41,7 +41,11 @@ go_test(
"configlist_test.go",
],
flaky = True,
<<<<<<< HEAD:br/pkg/lightning/config/BUILD.bazel
shard_count = 45,
=======
shard_count = 49,
>>>>>>> 9164182d0b2 (config: must set line terminator when use strict-format (#53444)):pkg/lightning/config/BUILD.bazel
deps = [
":config",
"//br/pkg/lightning/common",
123 changes: 123 additions & 0 deletions br/pkg/lightning/config/config.go
@@ -680,6 +680,129 @@ type MydumperRuntime struct {
DataInvalidCharReplace string `toml:"data-invalid-char-replace" json:"data-invalid-char-replace"`
}

<<<<<<< HEAD:br/pkg/lightning/config/config.go
=======
func (m *MydumperRuntime) adjust() error {
if err := m.CSV.adjust(); err != nil {
return err
}
if m.StrictFormat && len(m.CSV.Terminator) == 0 {
return common.ErrInvalidConfig.GenWithStack(
`mydumper.strict-format can not be used with empty mydumper.csv.terminator. Please set mydumper.csv.terminator to a non-empty value like "\r\n"`)
}

for _, rule := range m.FileRouters {
if filepath.IsAbs(rule.Path) {
relPath, err := filepath.Rel(m.SourceDir, rule.Path)
if err != nil {
return common.ErrInvalidConfig.Wrap(err).
GenWithStack("cannot find relative path for file route path %s", rule.Path)
}
// ".." means that this path is not in source dir, so we should return an error
if strings.HasPrefix(relPath, "..") {
return common.ErrInvalidConfig.GenWithStack(
"file route path '%s' is not in source dir '%s'", rule.Path, m.SourceDir)
}
rule.Path = relPath
}
}

// enable default file route rule if no rules are set
if len(m.FileRouters) == 0 {
m.DefaultFileRules = true
}

if len(m.DataCharacterSet) == 0 {
m.DataCharacterSet = defaultCSVDataCharacterSet
}
charset, err1 := ParseCharset(m.DataCharacterSet)
if err1 != nil {
return common.ErrInvalidConfig.Wrap(err1).GenWithStack("invalid `mydumper.data-character-set`")
}
if charset == GBK || charset == GB18030 {
log.L().Warn(
"incompatible strings may be encountered during the transcoding process and will be replaced, please be aware of the risk of not being able to retain the original information",
zap.String("source-character-set", charset.String()),
zap.ByteString("invalid-char-replacement", []byte(m.DataInvalidCharReplace)))
}
if m.BatchImportRatio < 0.0 || m.BatchImportRatio >= 1.0 {
m.BatchImportRatio = DefaultBatchImportRatio
}
if m.ReadBlockSize <= 0 {
m.ReadBlockSize = ReadBlockSize
}
if len(m.CharacterSet) == 0 {
m.CharacterSet = "auto"
}

if len(m.IgnoreColumns) != 0 {
// Lowercase the column names because we use Name.L to compare columns in TiDB.
for _, ig := range m.IgnoreColumns {
cols := make([]string, len(ig.Columns))
for i, col := range ig.Columns {
cols[i] = strings.ToLower(col)
}
ig.Columns = cols
}
}
return m.adjustFilePath()
}

// adjustFilePath checks and adjusts the file path.
func (m *MydumperRuntime) adjustFilePath() error {
var u *url.URL

// An absolute Windows path like "C:\Users\XYZ" would be interpreted as
// a URL with scheme "C" and opaque data "\Users\XYZ".
// Therefore, we only perform URL parsing if we are sure the path is not
// an absolute Windows path.
// Here we use `filepath.VolumeName`, which can identify the "C:" part
// of the path. On Linux this function always returns an empty string.
// On Windows, a drive letter can only be a single letter from "A:" to "Z:",
// so this won't mistake "S3:" for a Windows path.
if len(filepath.VolumeName(m.SourceDir)) == 0 {
var err error
u, err = url.Parse(m.SourceDir)
if err != nil {
return common.ErrInvalidConfig.Wrap(err).GenWithStack("cannot parse `mydumper.data-source-dir` %s", m.SourceDir)
}
} else {
u = &url.URL{}
}

// convert an absolute or relative local path to a valid file:// URL
if u.Scheme == "" {
if m.SourceDir == "" {
return common.ErrInvalidConfig.GenWithStack("`mydumper.data-source-dir` is not set")
}
if !common.IsDirExists(m.SourceDir) {
return common.ErrInvalidConfig.GenWithStack("'%s': `mydumper.data-source-dir` does not exist", m.SourceDir)
}
absPath, err := filepath.Abs(m.SourceDir)
if err != nil {
return common.ErrInvalidConfig.Wrap(err).GenWithStack("convert data-source-dir '%s' to absolute path failed", m.SourceDir)
}
u.Path = filepath.ToSlash(absPath)
u.Scheme = "file"
m.SourceDir = u.String()
}

found := false
for _, t := range supportedStorageTypes {
if u.Scheme == t {
found = true
break
}
}
if !found {
return common.ErrInvalidConfig.GenWithStack(
"unsupported data-source-dir url '%s', supported storage types are %s",
m.SourceDir, strings.Join(supportedStorageTypes, ","))
}
return nil
}

>>>>>>> 9164182d0b2 (config: must set line terminator when use strict-format (#53444)):pkg/lightning/config/config.go
// AllIgnoreColumns is a slice of IgnoreColumns.
type AllIgnoreColumns []*IgnoreColumns

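
As context for the Windows-path guard in adjustFilePath above: a small, self-contained sketch (standard library behavior only; not code from this commit) of why url.Parse alone cannot tell a drive letter from a URL scheme:

package example

import (
	"fmt"
	"net/url"
	"path/filepath"
)

func schemeOrDrive() {
	// filepath.VolumeName extracts the drive part on Windows ("C:");
	// on Linux it returns "" for every input.
	fmt.Println(filepath.VolumeName(`C:\Users\XYZ`)) // "C:" on Windows, "" elsewhere

	// url.Parse reads the same path as a URL with scheme "c" and opaque
	// data `\Users\XYZ`, which is exactly the misparse the VolumeName check avoids.
	if u, err := url.Parse(`C:\Users\XYZ`); err == nil {
		fmt.Println(u.Scheme) // "c"
	}

	// A genuine storage URL parses normally and passes the scheme check.
	if u, err := url.Parse("s3://bucket/prefix"); err == nil {
		fmt.Println(u.Scheme) // "s3"
	}
}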
24 changes: 24 additions & 0 deletions br/pkg/lightning/config/config_test.go
@@ -83,6 +83,30 @@ func TestAdjustPdAddrAndPort(t *testing.T) {
require.Equal(t, "123.45.67.89:1234", cfg.TiDB.PdAddr)
}

func TestStrictFormat(t *testing.T) {
ts, host, port := startMockServer(t, http.StatusOK,
`{"port":4444,"advertise-address":"","path":"123.45.67.89:1234,56.78.90.12:3456"}`,
)
defer ts.Close()

cfg := NewConfig()
cfg.TiDB.Host = host
cfg.TiDB.StatusPort = port
cfg.Mydumper.SourceDir = "."
cfg.TikvImporter.Backend = BackendLocal
cfg.TikvImporter.SortedKVDir = "."
cfg.TiDB.DistSQLScanConcurrency = 1
cfg.Mydumper.StrictFormat = true

err := cfg.Adjust(context.Background())
require.ErrorContains(t, err, "mydumper.strict-format can not be used with empty mydumper.csv.terminator")
t.Log(err.Error())

cfg.Mydumper.CSV.Terminator = "\r\n"
err = cfg.Adjust(context.Background())
require.NoError(t, err)
}

func TestPausePDSchedulerScope(t *testing.T) {
ts, host, port := startMockServer(t, http.StatusOK,
`{"port":4444,"advertise-address":"","path":"123.45.67.89:1234,56.78.90.12:3456"}`,
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/chunk_process.go
@@ -541,7 +541,7 @@ func (cr *chunkProcessor) deliverLoop(
rc.status.FinishedFileSize.Add(delta)
}
} else {
- deliverLogger.Warn("offset go back", zap.Int64("curr", highOffset),
+ deliverLogger.Error("offset go back", zap.Int64("curr", highOffset),
zap.Int64("start", lowOffset))
}
}
3 changes: 2 additions & 1 deletion br/pkg/lightning/mydump/csv_parser.go
@@ -98,7 +98,8 @@ type field struct {
quoted bool
}

- // NewCSVParser creates a CSV parser.
+ // NewCSVParser creates a CSV parser. The ownership of the reader is transferred
+ // to the parser.
func NewCSVParser(
ctx context.Context,
cfg *config.CSVConfig,
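
Because the parser owns the reader, a caller closes the raw reader only on paths where the parser was never constructed, and closes the parser (never the bare reader) afterwards; the region.go hunks below apply this discipline to every early return. A hedged sketch of the calling pattern (the NewCSVParser signature is taken from this diff; import paths and the helper name readCSV are assumptions for illustration):

package example

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/lightning/config"
	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
	"github.com/pingcap/tidb/br/pkg/lightning/worker"
	"github.com/pingcap/tidb/br/pkg/storage"
)

// readCSV sketches the close discipline this commit enforces: before the
// parser exists the caller still owns r and must close it on error paths;
// once NewCSVParser succeeds, closing the parser releases r as well.
func readCSV(ctx context.Context, cfg *config.Config, r storage.ReadSeekCloser, pool *worker.Pool) error {
	cc, err := mydump.NewCharsetConvertor(cfg.Mydumper.DataCharacterSet, cfg.Mydumper.DataInvalidCharReplace)
	if err != nil {
		_ = r.Close() // parser not constructed yet: the caller owns r
		return err
	}
	parser, err := mydump.NewCSVParser(ctx, &cfg.Mydumper.CSV, r,
		int64(cfg.Mydumper.ReadBlockSize), pool, false, cc)
	if err != nil {
		return err // note: the diff's own error paths do not close r after this call
	}
	defer func() { _ = parser.Close() }() // also closes r
	// ... consume rows with parser.ReadRow() ...
	return nil
}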
8 changes: 7 additions & 1 deletion br/pkg/lightning/mydump/region.go
@@ -442,13 +442,15 @@ func SplitLargeCSV(
// Create a utf8mb4 convertor to encode and decode data with the charset of CSV files.
charsetConvertor, err := NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
if err != nil {
+ _ = r.Close()
return nil, nil, err
}
parser, err := NewCSVParser(ctx, &cfg.CSV, r, cfg.ReadBlockSize, cfg.IOWorkers, true, charsetConvertor)
if err != nil {
return nil, nil, err
}
if err = parser.ReadColumns(); err != nil {
+ _ = parser.Close()
return nil, nil, err
}
if cfg.CSV.HeaderSchemaMatch {
@@ -459,6 +461,7 @@
if endOffset > dataFile.FileMeta.FileSize {
endOffset = dataFile.FileMeta.FileSize
}
+ _ = parser.Close()
}
divisor := int64(cfg.ColumnCnt)
for {
@@ -472,18 +475,21 @@
// Create a utf8mb4 convertor to encode and decode data with the charset of CSV files.
charsetConvertor, err := NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
if err != nil {
+ _ = r.Close()
return nil, nil, err
}
parser, err := NewCSVParser(ctx, &cfg.CSV, r, cfg.ReadBlockSize, cfg.IOWorkers, false, charsetConvertor)
if err != nil {
return nil, nil, err
}
if err = parser.SetPos(endOffset, 0); err != nil {
+ _ = parser.Close()
return nil, nil, err
}
_, pos, err := parser.ReadUntilTerminator()
if err != nil {
if !errors.ErrorEqual(err, io.EOF) {
+ _ = parser.Close()
return nil, nil, err
}
log.FromContext(ctx).Warn("file contains no terminator at end",
Expand All @@ -492,7 +498,7 @@ func SplitLargeCSV(
pos = dataFile.FileMeta.FileSize
}
endOffset = pos
- parser.Close()
+ _ = parser.Close()
}
regions = append(regions,
&TableRegion{
95 changes: 95 additions & 0 deletions br/pkg/lightning/mydump/region_test.go
@@ -480,3 +480,98 @@ func TestSplitLargeFileOnlyOneChunk(t *testing.T) {
require.Equal(t, columns, regions[i].Chunk.Columns)
}
}

func TestSplitLargeFileSeekInsideCRLF(t *testing.T) {
ctx := context.Background()
meta := &MDTableMeta{
DB: "csv",
Name: "large_csv_seek_inside_crlf",
}

dir := t.TempDir()

fileName := "test.csv"
filePath := filepath.Join(dir, fileName)

content := []byte("1\r\n2\r\n3\r\n4\r\n")
err := os.WriteFile(filePath, content, 0o644)
require.NoError(t, err)

dataFileInfo, err := os.Stat(filePath)
require.NoError(t, err)
fileSize := dataFileInfo.Size()
fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
ioWorker := worker.NewPool(context.Background(), 4, "io")

store, err := storage.NewLocalStorage(dir)
require.NoError(t, err)

// if we don't set the terminator, splitting produces the wrong result

cfg := &config.Config{
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
},
StrictFormat: true,
Filter: []string{"*.*"},
MaxRegionSize: 2,
},
}
divideConfig := NewDataDivideConfig(cfg, 1, ioWorker, store, meta)

// In fact this is the wrong result, asserted here only to show the bug:
// pos does not match the offsets, and we might read more rows than expected
// because we use == rather than >= to stop reading.
offsets := [][]int64{{0, 3}, {3, 6}, {6, 9}, {9, 12}}
pos := []int64{2, 5, 8, 11}

regions, _, err := SplitLargeCSV(context.Background(), divideConfig, fileInfo)
require.NoError(t, err)
require.Len(t, regions, len(offsets))
for i := range offsets {
require.Equal(t, offsets[i][0], regions[i].Chunk.Offset)
require.Equal(t, offsets[i][1], regions[i].Chunk.EndOffset)
}

file, err := os.Open(filePath)
require.NoError(t, err)
parser, err := NewCSVParser(ctx, &cfg.Mydumper.CSV, file, 128, ioWorker, false, nil)
require.NoError(t, err)

for parser.ReadRow() == nil {
p, _ := parser.Pos()
require.Equal(t, pos[0], p)
pos = pos[1:]
}
require.NoError(t, parser.Close())

// set terminator to "\r\n"

cfg.Mydumper.CSV.Terminator = "\r\n"
divideConfig = NewDataDivideConfig(cfg, 1, ioWorker, store, meta)
// pos is contained in expectedOffsets
expectedOffsets := [][]int64{{0, 6}, {6, 12}}
pos = []int64{3, 6, 9, 12}

regions, _, err = SplitLargeCSV(context.Background(), divideConfig, fileInfo)
require.NoError(t, err)
require.Len(t, regions, len(expectedOffsets))
for i := range expectedOffsets {
require.Equal(t, expectedOffsets[i][0], regions[i].Chunk.Offset)
require.Equal(t, expectedOffsets[i][1], regions[i].Chunk.EndOffset)
}

file, err = os.Open(filePath)
require.NoError(t, err)
parser, err = NewCSVParser(ctx, &cfg.Mydumper.CSV, file, 128, ioWorker, false, nil)
require.NoError(t, err)

for parser.ReadRow() == nil {
p, _ := parser.Pos()
require.Equal(t, pos[0], p)
pos = pos[1:]
}
require.NoError(t, parser.Close())
}
3 changes: 3 additions & 0 deletions br/tests/lightning_column_permutation/config.toml
@@ -1,3 +1,6 @@
[mydumper]
strict-format = true
max-region-size = 200

[mydumper.csv]
terminator = "\n"
