Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: introduce param to skip CSV header parsing #41128

Merged
merged 7 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,14 +479,15 @@ type PostRestore struct {

type CSVConfig struct {
// Separator, Delimiter and Terminator should all be in utf8mb4 encoding.
Separator string `toml:"separator" json:"separator"`
Delimiter string `toml:"delimiter" json:"delimiter"`
Terminator string `toml:"terminator" json:"terminator"`
Null string `toml:"null" json:"null"`
Header bool `toml:"header" json:"header"`
TrimLastSep bool `toml:"trim-last-separator" json:"trim-last-separator"`
NotNull bool `toml:"not-null" json:"not-null"`
BackslashEscape bool `toml:"backslash-escape" json:"backslash-escape"`
Separator string `toml:"separator" json:"separator"`
Delimiter string `toml:"delimiter" json:"delimiter"`
Terminator string `toml:"terminator" json:"terminator"`
Null string `toml:"null" json:"null"`
Header bool `toml:"header" json:"header"`
HeaderSchemaMatch bool `toml:"header-schema-match" json:"header-schema-match"`
TrimLastSep bool `toml:"trim-last-separator" json:"trim-last-separator"`
NotNull bool `toml:"not-null" json:"not-null"`
BackslashEscape bool `toml:"backslash-escape" json:"backslash-escape"`
// hide these options for lightning configuration file, they can only be used by LOAD DATA
// https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-field-line-handling
StartingBy string `toml:"-" json:"-"`
Expand Down Expand Up @@ -743,13 +744,14 @@ func NewConfig() *Config {
Mydumper: MydumperRuntime{
ReadBlockSize: ReadBlockSize,
CSV: CSVConfig{
Separator: ",",
Delimiter: `"`,
Header: true,
NotNull: false,
Null: `\N`,
BackslashEscape: true,
TrimLastSep: false,
Separator: ",",
Delimiter: `"`,
Header: true,
HeaderSchemaMatch: true,
NotNull: false,
Null: `\N`,
BackslashEscape: true,
TrimLastSep: false,
},
StrictFormat: false,
MaxRegionSize: MaxRegionSize,
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/lightning/mydump/csv_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,9 @@ func (parser *CSVParser) ReadColumns() error {
if err != nil {
return errors.Trace(err)
}
if !parser.cfg.HeaderSchemaMatch {
return nil
}
parser.columns = make([]string, 0, len(columns))
for _, colName := range columns {
colName, _, err = parser.unescapeString(colName)
Expand Down
84 changes: 78 additions & 6 deletions br/pkg/lightning/mydump/csv_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,13 @@ func TestSyntaxErrorCSV(t *testing.T) {

func TestTSV(t *testing.T) {
cfg := config.CSVConfig{
Separator: "\t",
Delimiter: "",
BackslashEscape: false,
NotNull: false,
Null: "",
Header: true,
Separator: "\t",
Delimiter: "",
BackslashEscape: false,
NotNull: false,
Null: "",
Header: true,
HeaderSchemaMatch: true,
}

parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`a b c d e f
Expand Down Expand Up @@ -577,6 +578,7 @@ func TestCsvWithWhiteSpaceLine(t *testing.T) {
require.Nil(t, parser.Close())

cfg.Header = true
cfg.HeaderSchemaMatch = true
data = " \r\na,b,c\r\n0,,abc\r\n"
parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(data), int64(config.ReadBlockSize), ioWorkers, true, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -609,6 +611,7 @@ func TestEmpty(t *testing.T) {
// Try again with headers.

cfg.Header = true
cfg.HeaderSchemaMatch = true

parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(""), int64(config.ReadBlockSize), ioWorkers, true, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -1292,3 +1295,72 @@ func BenchmarkReadRowUsingEncodingCSV(b *testing.B) {
}
require.Equal(b, b.N, rowsCount)
}

// TestHeaderSchemaMatch exercises the combinations of the Header and
// HeaderSchemaMatch CSV options:
//   - header=true,  header-schema-match=true:  first line is consumed and
//     becomes the parser's column list;
//   - header=true,  header-schema-match=false: first line is consumed but
//     the column list stays nil;
//   - header=false: the first line is parsed as ordinary data.
func TestHeaderSchemaMatch(t *testing.T) {
	cfg := config.MydumperRuntime{
		CSV: config.CSVConfig{
			Separator: ",",
			Delimiter: `"`,
		},
	}

	csvInput := `id,val1,val2,val3
1,111,aaa,1.0
2,222,bbb,2.0
3,333,ccc,3.0
4,444,ddd,4.0`

	// Datums for the four data rows (everything below the header line).
	bodyRows := [][]types.Datum{
		{types.NewStringDatum("1"), types.NewStringDatum("111"), types.NewStringDatum("aaa"), types.NewStringDatum("1.0")},
		{types.NewStringDatum("2"), types.NewStringDatum("222"), types.NewStringDatum("bbb"), types.NewStringDatum("2.0")},
		{types.NewStringDatum("3"), types.NewStringDatum("333"), types.NewStringDatum("ccc"), types.NewStringDatum("3.0")},
		{types.NewStringDatum("4"), types.NewStringDatum("444"), types.NewStringDatum("ddd"), types.NewStringDatum("4.0")},
	}

	// The header line parsed as a plain data row (used when header=false).
	headerAsRow := []types.Datum{
		types.NewStringDatum("id"), types.NewStringDatum("val1"), types.NewStringDatum("val2"), types.NewStringDatum("val3"),
	}

	type caseSpec struct {
		header      bool
		schemaMatch bool
		wantRows    [][]types.Datum
		wantCols    []string
	}

	cases := []caseSpec{
		{
			header:      true,
			schemaMatch: true,
			wantRows:    bodyRows,
			wantCols:    []string{"id", "val1", "val2", "val3"},
		},
		{
			header:      true,
			schemaMatch: false,
			wantRows:    bodyRows,
			wantCols:    nil,
		},
		{
			header:      false,
			schemaMatch: true,
			wantRows:    append([][]types.Datum{headerAsRow}, bodyRows...),
			wantCols:    nil,
		},
	}

	for _, c := range cases {
		msg := fmt.Sprintf("header = %v, header-schema-match = %v", c.header, c.schemaMatch)
		cfg.CSV.Header = c.header
		cfg.CSV.HeaderSchemaMatch = c.schemaMatch
		convertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
		assert.NoError(t, err)
		parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(csvInput), int64(config.ReadBlockSize), ioWorkers, c.header, convertor)
		assert.NoError(t, err)
		for i, want := range c.wantRows {
			rowMsg := fmt.Sprintf("row = %d, header = %v, header-schema-match = %v", i+1, c.header, c.schemaMatch)
			e := parser.ReadRow()
			assert.NoErrorf(t, e, "row = %d, error = %s", i+1, errors.ErrorStack(e))
			assert.Equal(t, int64(i)+1, parser.LastRow().RowID, rowMsg)
			assert.Equal(t, want, parser.LastRow().Row, rowMsg)
		}
		// After the expected rows the parser must be at EOF, and the column
		// list must reflect the header-schema-match setting.
		assert.ErrorIsf(t, errors.Cause(parser.ReadRow()), io.EOF, msg)
		assert.Equal(t, c.wantCols, parser.Columns(), msg)
	}
}
4 changes: 3 additions & 1 deletion br/pkg/lightning/mydump/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,9 @@ func SplitLargeFile(
if err = parser.ReadColumns(); err != nil {
return 0, nil, nil, err
}
columns = parser.Columns()
if cfg.Mydumper.CSV.HeaderSchemaMatch {
columns = parser.Columns()
}
startOffset, _ = parser.Pos()
endOffset = startOffset + maxRegionSize
if endOffset > dataFile.FileMeta.FileSize {
Expand Down
75 changes: 40 additions & 35 deletions br/pkg/lightning/mydump/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,14 @@ func TestMakeSourceFileRegion(t *testing.T) {
ReadBlockSize: config.ReadBlockSize,
MaxRegionSize: 1,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
Separator: ",",
Delimiter: "",
Header: true,
HeaderSchemaMatch: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
Expand Down Expand Up @@ -230,13 +231,14 @@ func TestCompressedMakeSourceFileRegion(t *testing.T) {
ReadBlockSize: config.ReadBlockSize,
MaxRegionSize: 1,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
Separator: ",",
Delimiter: "",
Header: true,
HeaderSchemaMatch: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
Expand Down Expand Up @@ -284,13 +286,14 @@ func TestSplitLargeFile(t *testing.T) {
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
Separator: ",",
Delimiter: "",
Header: true,
HeaderSchemaMatch: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
Expand Down Expand Up @@ -342,13 +345,14 @@ func TestSplitLargeFileNoNewLineAtEOF(t *testing.T) {
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
Separator: ",",
Delimiter: "",
Header: true,
HeaderSchemaMatch: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
Expand Down Expand Up @@ -447,13 +451,14 @@ func TestSplitLargeFileOnlyOneChunk(t *testing.T) {
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
Separator: ",",
Delimiter: "",
Header: true,
HeaderSchemaMatch: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
Expand Down
32 changes: 18 additions & 14 deletions br/pkg/lightning/restore/table_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ func (s *tableRestoreSuite) TestPopulateChunks() {

// set csv header to true, this will cause check columns fail
s.cfg.Mydumper.CSV.Header = true
s.cfg.Mydumper.CSV.HeaderSchemaMatch = true
s.cfg.Mydumper.StrictFormat = true
regionSize := s.cfg.Mydumper.MaxRegionSize
s.cfg.Mydumper.MaxRegionSize = 5
Expand Down Expand Up @@ -455,6 +456,7 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() {
cfg.Mydumper.MaxRegionSize = 40

cfg.Mydumper.CSV.Header = true
cfg.Mydumper.CSV.HeaderSchemaMatch = true
cfg.Mydumper.StrictFormat = true
rc := &Controller{cfg: cfg, ioWorkers: worker.NewPool(context.Background(), 1, "io"), store: store}

Expand Down Expand Up @@ -2135,13 +2137,14 @@ func (s *tableRestoreSuite) TestSchemaIsValid() {
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Header: ca.hasHeader,
NotNull: false,
Null: `\N`,
BackslashEscape: true,
TrimLastSep: false,
Separator: ",",
Delimiter: `"`,
Header: ca.hasHeader,
HeaderSchemaMatch: true,
NotNull: false,
Null: `\N`,
BackslashEscape: true,
TrimLastSep: false,
},
IgnoreColumns: ca.ignoreColumns,
},
Expand Down Expand Up @@ -2170,13 +2173,14 @@ func (s *tableRestoreSuite) TestGBKEncodedSchemaIsValid() {
DataCharacterSet: "gb18030",
DataInvalidCharReplace: string(utf8.RuneError),
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Header: true,
NotNull: false,
Null: `\N`,
BackslashEscape: true,
TrimLastSep: false,
Separator: ",",
Delimiter: `"`,
Header: true,
HeaderSchemaMatch: true,
NotNull: false,
Null: `\N`,
BackslashEscape: true,
TrimLastSep: false,
},
IgnoreColumns: nil,
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Fixture schema for the lightning_config_skip_csv_header integration test
-- (see the sibling CSV/TOML files in this directory).
-- NOTE(review): the CSV fixture's header line ("aaa,bbb") does not match
-- these column names (id, val1) — presumably intentional, so that the
-- import only succeeds when header-schema-match is disabled; confirm
-- against the test script.
CREATE TABLE testtbl (
id INTEGER PRIMARY KEY,
val1 VARCHAR(40) NOT NULL,
INDEX `idx_val1` (`val1`)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
aaa,bbb
1,"aaa01"
2,"aaa01"
3,"aaa02"
4,"aaa02"
5,"aaa05"
9 changes: 9 additions & 0 deletions br/tests/lightning_config_skip_csv_header/err_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[lightning]
# Run the pre-import environment checks before starting the task.
check-requirements=true

[mydumper.csv]
# The first line of each CSV data file is a header line.
header = true
# Use the header fields as column names matched against the target table.
# NOTE(review): with this directory's fixture data the header ("aaa,bbb")
# does not match the table columns, so this config is presumably expected
# to make the import fail — hence the "err_config" name; confirm against
# the test script.
header-schema-match = true

[tikv-importer]
# Resolve duplicate keys by removing the conflicting rows.
duplicate-resolution = 'remove'
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[lightning]
# Run the pre-import environment checks before starting the task.
check-requirements=true

[mydumper.csv]
# The first line of each CSV data file is a header line.
# header-schema-match is deliberately left unset here to exercise its
# default value (true, per the change in config.go in this PR).
header = true

[tikv-importer]
# Resolve duplicate keys by removing the conflicting rows.
duplicate-resolution = 'remove'
9 changes: 9 additions & 0 deletions br/tests/lightning_config_skip_csv_header/normal_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[lightning]
# Run the pre-import environment checks before starting the task.
check-requirements=true

[mydumper.csv]
# The first line of each CSV data file is a header line.
header = true
# Skip header parsing: consume the header line but do not treat its fields
# as column names, so the header does not have to match the table schema.
header-schema-match = false

[tikv-importer]
# Resolve duplicate keys by removing the conflicting rows.
duplicate-resolution = 'remove'
Loading