Skip to content

Commit

Permalink
lightning: sample once parquet file (#56205)
Browse files Browse the repository at this point in the history
close #56104
  • Loading branch information
zeminzhou authored Nov 18, 2024
1 parent 4a6bf46 commit 0a9a231
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 24 deletions.
13 changes: 1 addition & 12 deletions lightning/pkg/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1488,19 +1488,8 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
allTasks = append(allTasks, task{tr: tr, cp: cp})

if len(cp.Engines) == 0 {
for i, fi := range tableMeta.DataFiles {
for _, fi := range tableMeta.DataFiles {
totalDataSizeToRestore += fi.FileMeta.FileSize
if fi.FileMeta.Type == mydump.SourceTypeParquet {
numberRows, err := mydump.ReadParquetFileRowCountByFile(ctx, rc.store, fi.FileMeta)
if err != nil {
return errors.Trace(err)
}
if m, ok := metric.FromContext(ctx); ok {
m.RowsCounter.WithLabelValues(metric.StateTotalRestore, tableName).Add(float64(numberRows))
}
fi.FileMeta.Rows = numberRows
tableMeta.DataFiles[i] = fi
}
}
} else {
for _, eng := range cp.Engines {
Expand Down
41 changes: 31 additions & 10 deletions pkg/lightning/mydump/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/lightning/metric"
regexprrouter "github.com/pingcap/tidb/pkg/util/regexpr-router"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"go.uber.org/zap"
Expand Down Expand Up @@ -244,6 +245,8 @@ type mdLoaderSetup struct {
dbIndexMap map[string]int
tableIndexMap map[filter.Table]int
setupCfg *MDLoaderSetupConfig

sampledParquetRowSizes map[string]float64
}

// NewLoader constructs a MyDumper loader that scanns the data source and constructs a set of metadatas.
Expand Down Expand Up @@ -320,6 +323,8 @@ func NewLoaderWithStore(ctx context.Context, cfg LoaderConfig,
dbIndexMap: make(map[string]int),
tableIndexMap: make(map[filter.Table]int),
setupCfg: mdLoaderSetupCfg,

sampledParquetRowSizes: make(map[string]float64),
}

if err := setup.setup(ctx); err != nil {
Expand Down Expand Up @@ -532,12 +537,29 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size
}
s.tableDatas = append(s.tableDatas, info)
case SourceTypeParquet:
parquestDataSize, err2 := SampleParquetDataSize(ctx, info.FileMeta, s.loader.GetStore())
if err2 != nil {
logger.Error("fail to sample parquet data size", zap.String("category", "loader"),
zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type), zap.Error(err2))
} else {
info.FileMeta.RealSize = parquestDataSize
tableName := info.TableName.String()
if s.sampledParquetRowSizes[tableName] == 0 {
s.sampledParquetRowSizes[tableName], err = SampleParquetRowSize(ctx, info.FileMeta, s.loader.GetStore())
if err != nil {
logger.Error("fail to sample parquet row size", zap.String("category", "loader"),
zap.String("schema", res.Schema), zap.String("table", res.Name),
zap.Stringer("type", res.Type), zap.Error(err))
return errors.Trace(err)
}
}
if s.sampledParquetRowSizes[tableName] != 0 {
totalRowCount, err := ReadParquetFileRowCountByFile(ctx, s.loader.GetStore(), info.FileMeta)
if err != nil {
logger.Error("fail to get file total row count", zap.String("category", "loader"),
zap.String("schema", res.Schema), zap.String("table", res.Name),
zap.Stringer("type", res.Type), zap.Error(err))
return errors.Trace(err)
}
info.FileMeta.RealSize = int64(float64(totalRowCount) * s.sampledParquetRowSizes[tableName])
info.FileMeta.Rows = totalRowCount
if m, ok := metric.FromContext(ctx); ok {
m.RowsCounter.WithLabelValues(metric.StateTotalRestore, tableName).Add(float64(totalRowCount))
}
}
s.tableDatas = append(s.tableDatas, info)
}
Expand Down Expand Up @@ -823,8 +845,8 @@ func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store
return float64(tot) / float64(pos), nil
}

// SampleParquetDataSize samples the data size of the parquet file.
func SampleParquetDataSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (int64, error) {
// SampleParquetRowSize samples row size of the parquet file.
func SampleParquetRowSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error) {
totalRowCount, err := ReadParquetFileRowCountByFile(ctx, store, fileMeta)
if totalRowCount == 0 || err != nil {
return 0, err
Expand Down Expand Up @@ -863,6 +885,5 @@ func SampleParquetDataSize(ctx context.Context, fileMeta SourceFileMeta, store s
break
}
}
size := int64(float64(totalRowCount) / float64(rowCount) * float64(rowSize))
return size, nil
return float64(rowSize) / float64(rowCount), nil
}
8 changes: 6 additions & 2 deletions pkg/lightning/mydump/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1159,12 +1159,16 @@ func testSampleParquetDataSize(t *testing.T, count int) {
err = store.WriteFile(ctx, fileName, bf.Bytes())
require.NoError(t, err)

size, err := md.SampleParquetDataSize(ctx, md.SourceFileMeta{
rowSize, err := md.SampleParquetRowSize(ctx, md.SourceFileMeta{
Path: fileName,
}, store)
require.NoError(t, err)
rowCount, err := md.ReadParquetFileRowCountByFile(ctx, store, md.SourceFileMeta{
Path: fileName,
})
require.NoError(t, err)
// expected error within 10%, so delta = totalRowSize / 10
require.InDelta(t, totalRowSize, size, float64(totalRowSize)/10)
require.InDelta(t, totalRowSize, int64(rowSize*float64(rowCount)), float64(totalRowSize)/10)
}

func TestSampleParquetDataSize(t *testing.T) {
Expand Down

0 comments on commit 0a9a231

Please sign in to comment.