Skip to content

Commit

Permalink
lightning: sample once parquet file (#56205) (#57921)
Browse files Browse the repository at this point in the history
close #56104
  • Loading branch information
ti-chi-bot authored Dec 6, 2024
1 parent f8d5c54 commit e5fd514
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 24 deletions.
13 changes: 1 addition & 12 deletions br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1770,19 +1770,8 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
allTasks = append(allTasks, task{tr: tr, cp: cp})

if len(cp.Engines) == 0 {
for i, fi := range tableMeta.DataFiles {
for _, fi := range tableMeta.DataFiles {
totalDataSizeToRestore += fi.FileMeta.FileSize
if fi.FileMeta.Type == mydump.SourceTypeParquet {
numberRows, err := mydump.ReadParquetFileRowCountByFile(ctx, rc.store, fi.FileMeta)
if err != nil {
return errors.Trace(err)
}
if m, ok := metric.FromContext(ctx); ok {
m.RowsCounter.WithLabelValues(metric.StateTotalRestore, tableName).Add(float64(numberRows))
}
fi.FileMeta.Rows = numberRows
tableMeta.DataFiles[i] = fi
}
}
} else {
for _, eng := range cp.Engines {
Expand Down
41 changes: 31 additions & 10 deletions br/pkg/lightning/mydump/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/metric"
"github.com/pingcap/tidb/br/pkg/storage"
regexprrouter "github.com/pingcap/tidb/pkg/util/regexpr-router"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
Expand Down Expand Up @@ -201,6 +202,8 @@ type mdLoaderSetup struct {
dbIndexMap map[string]int
tableIndexMap map[filter.Table]int
setupCfg *MDLoaderSetupConfig

sampledParquetRowSizes map[string]float64
}

// NewMyDumpLoader constructs a MyDumper loader that scanns the data source and constructs a set of metadatas.
Expand Down Expand Up @@ -277,6 +280,8 @@ func NewMyDumpLoaderWithStore(ctx context.Context, cfg *config.Config,
dbIndexMap: make(map[string]int),
tableIndexMap: make(map[filter.Table]int),
setupCfg: mdLoaderSetupCfg,

sampledParquetRowSizes: make(map[string]float64),
}

if err := setup.setup(ctx); err != nil {
Expand Down Expand Up @@ -489,12 +494,29 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size
}
s.tableDatas = append(s.tableDatas, info)
case SourceTypeParquet:
parquestDataSize, err2 := SampleParquetDataSize(ctx, info.FileMeta, s.loader.GetStore())
if err2 != nil {
logger.Error("fail to sample parquet data size", zap.String("category", "loader"),
zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type), zap.Error(err2))
} else {
info.FileMeta.RealSize = parquestDataSize
tableName := info.TableName.String()
if s.sampledParquetRowSizes[tableName] == 0 {
s.sampledParquetRowSizes[tableName], err = SampleParquetRowSize(ctx, info.FileMeta, s.loader.GetStore())
if err != nil {
logger.Error("fail to sample parquet row size", zap.String("category", "loader"),
zap.String("schema", res.Schema), zap.String("table", res.Name),
zap.Stringer("type", res.Type), zap.Error(err))
return errors.Trace(err)
}
}
if s.sampledParquetRowSizes[tableName] != 0 {
totalRowCount, err := ReadParquetFileRowCountByFile(ctx, s.loader.GetStore(), info.FileMeta)
if err != nil {
logger.Error("fail to get file total row count", zap.String("category", "loader"),
zap.String("schema", res.Schema), zap.String("table", res.Name),
zap.Stringer("type", res.Type), zap.Error(err))
return errors.Trace(err)
}
info.FileMeta.RealSize = int64(float64(totalRowCount) * s.sampledParquetRowSizes[tableName])
info.FileMeta.Rows = totalRowCount
if m, ok := metric.FromContext(ctx); ok {
m.RowsCounter.WithLabelValues(metric.StateTotalRestore, tableName).Add(float64(totalRowCount))
}
}
s.tableDatas = append(s.tableDatas, info)
}
Expand Down Expand Up @@ -780,8 +802,8 @@ func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store
return float64(tot) / float64(pos), nil
}

// SampleParquetDataSize samples the data size of the parquet file.
func SampleParquetDataSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (int64, error) {
// SampleParquetRowSize samples row size of the parquet file.
func SampleParquetRowSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error) {
totalRowCount, err := ReadParquetFileRowCountByFile(ctx, store, fileMeta)
if totalRowCount == 0 || err != nil {
return 0, err
Expand Down Expand Up @@ -820,6 +842,5 @@ func SampleParquetDataSize(ctx context.Context, fileMeta SourceFileMeta, store s
break
}
}
size := int64(float64(totalRowCount) / float64(rowCount) * float64(rowSize))
return size, nil
return float64(rowSize) / float64(rowCount), nil
}
8 changes: 6 additions & 2 deletions br/pkg/lightning/mydump/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1159,12 +1159,16 @@ func testSampleParquetDataSize(t *testing.T, count int) {
err = store.WriteFile(ctx, fileName, bf.Bytes())
require.NoError(t, err)

size, err := md.SampleParquetDataSize(ctx, md.SourceFileMeta{
rowSize, err := md.SampleParquetRowSize(ctx, md.SourceFileMeta{
Path: fileName,
}, store)
require.NoError(t, err)
rowCount, err := md.ReadParquetFileRowCountByFile(ctx, store, md.SourceFileMeta{
Path: fileName,
})
require.NoError(t, err)
// expected error within 10%, so delta = totalRowSize / 10
require.InDelta(t, totalRowSize, size, float64(totalRowSize)/10)
require.InDelta(t, totalRowSize, int64(rowSize*float64(rowCount)), float64(totalRowSize)/10)
}
func TestSampleParquetDataSize(t *testing.T) {
t.Run("count=1000", func(t *testing.T) { testSampleParquetDataSize(t, 1000) })
Expand Down

0 comments on commit e5fd514

Please sign in to comment.