Skip to content

Commit

Permalink
Make datanode load statslog lazy if SkipBFStatsLog is true (#23779)
Browse files Browse the repository at this point in the history
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia authored May 5, 2023
1 parent d7f73b0 commit c72db04
Show file tree
Hide file tree
Showing 7 changed files with 368 additions and 10 deletions.
105 changes: 97 additions & 8 deletions internal/datanode/channel_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ import (
"errors"
"fmt"
"math"
"runtime"
"strings"
"sync"
"time"

"github.com/panjf2000/ants/v2"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand All @@ -35,6 +38,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/concurrency"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
Expand Down Expand Up @@ -94,6 +98,8 @@ type Channel interface {
// getTotalMemorySize returns the sum of memory sizes of segments.
getTotalMemorySize() int64
forceToSync()

close()
}

// ChannelMeta contains channel meta and the latest segments infos of the channel.
Expand All @@ -111,13 +117,22 @@ type ChannelMeta struct {

metaService *metaService
chunkManager storage.ChunkManager
workerPool *concurrency.Pool

closed *atomic.Bool
}

var _ Channel = &ChannelMeta{}

func newChannel(channelName string, collID UniqueID, schema *schemapb.CollectionSchema, rc types.RootCoord, cm storage.ChunkManager) *ChannelMeta {
metaService := newMetaService(rc, collID)

pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0), ants.WithPreAlloc(false), ants.WithNonblocking(false))
if err != nil {
// shall no happen here
panic(err)
}

channel := ChannelMeta{
collectionID: collID,
collSchema: schema,
Expand All @@ -133,6 +148,8 @@ func newChannel(channelName string, collID UniqueID, schema *schemapb.Collection

metaService: metaService,
chunkManager: cm,
workerPool: pool,
closed: atomic.NewBool(false),
}

return &channel
Expand Down Expand Up @@ -294,13 +311,70 @@ func (c *ChannelMeta) filterSegments(partitionID UniqueID) []*Segment {
}

func (c *ChannelMeta) InitPKstats(ctx context.Context, s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error {
if Params.DataNodeCfg.SkipBFStatsLoad {
// mark segment lazy loading
s.setLoadingLazy(true)
c.submitLoadStatsTask(s, statsBinlogs, ts)
return nil
}
return c.initPKstats(ctx, s, statsBinlogs, ts)
}

func (c *ChannelMeta) submitLoadStatsTask(s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) {
log := log.Ctx(context.TODO()).With(
zap.Int64("segmentID", s.segmentID),
zap.Int64("collectionID", s.collectionID),
)
if c.closed.Load() {
// stop retry and resubmit if channel meta closed
return
}
// do submitting in a goroutine in case of task pool is full
go func() {
c.workerPool.Submit(func() (any, error) {
stats, err := c.loadStats(context.Background(), s.segmentID, s.collectionID, statsBinlogs, ts)
if err != nil {
// TODO if not retryable, add rebuild statslog logic
log.Warn("failed to lazy load statslog for segment", zap.Error(err))
if c.retryableLoadError(err) {
log.Info("retry load statslog")
c.submitLoadStatsTask(s, statsBinlogs, ts)
}
return nil, err
}
// get segment lock here
// it's ok that segment is dropped here
c.segMu.Lock()
defer c.segMu.Unlock()
s.historyStats = append(s.historyStats, stats...)
s.setLoadingLazy(false)

log.Info("lazy loading segment statslog complete")

return nil, nil
})
}()
}

func (c *ChannelMeta) retryableLoadError(err error) bool {
switch {
case errors.Is(err, errBinlogCorrupted):
return false
case errors.Is(err, storage.ErrNoSuchKey):
return false
default:
return true
}
}

func (c *ChannelMeta) loadStats(ctx context.Context, segmentID int64, collectionID int64, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) ([]*storage.PkStatistics, error) {
startTs := time.Now()
log := log.With(zap.Int64("segmentID", s.segmentID))
log.Info("begin to init pk bloom filter", zap.Int("stats bin logs", len(statsBinlogs)))
schema, err := c.getCollectionSchema(s.collectionID, ts)
log := log.With(zap.Int64("segmentID", segmentID))
log.Info("begin to init pk bloom filter", zap.Int("statsBinLogsLen", len(statsBinlogs)))
schema, err := c.getCollectionSchema(collectionID, ts)
if err != nil {
log.Warn("failed to initPKBloomFilter, get schema return error", zap.Error(err))
return err
return nil, err
}

// get pkfield id
Expand All @@ -326,14 +400,14 @@ func (c *ChannelMeta) InitPKstats(ctx context.Context, s *Segment, statsBinlogs
// no stats log to parse, initialize a new BF
if len(bloomFilterFiles) == 0 {
log.Warn("no stats files to load")
return nil
return nil, nil
}

// read historical PK filter
values, err := c.chunkManager.MultiRead(ctx, bloomFilterFiles)
if err != nil {
log.Warn("failed to load bloom filter files", zap.Error(err))
return err
return nil, err
}
blobs := make([]*Blob, 0)
for i := 0; i < len(values); i++ {
Expand All @@ -343,19 +417,30 @@ func (c *ChannelMeta) InitPKstats(ctx context.Context, s *Segment, statsBinlogs
stats, err := storage.DeserializeStats(blobs)
if err != nil {
log.Warn("failed to deserialize bloom filter files", zap.Error(err))
return err
return nil, WrapBinlogCorruptedErr(strings.Join(bloomFilterFiles, ","))
}
var size uint
result := make([]*storage.PkStatistics, 0, len(stats))
for _, stat := range stats {
pkStat := &storage.PkStatistics{
PkFilter: stat.BF,
MinPK: stat.MinPk,
MaxPK: stat.MaxPk,
}
size += stat.BF.Cap()
s.historyStats = append(s.historyStats, pkStat)
result = append(result, pkStat)
}

log.Info("Successfully load pk stats", zap.Any("time", time.Since(startTs)), zap.Uint("size", size))
return result, nil
}

func (c *ChannelMeta) initPKstats(ctx context.Context, s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error {
stats, err := c.loadStats(ctx, s.segmentID, s.collectionID, statsBinlogs, ts)
if err != nil {
return err
}
s.historyStats = stats

return nil
}
Expand Down Expand Up @@ -817,3 +902,7 @@ func (c *ChannelMeta) getTotalMemorySize() int64 {
}
return res
}

func (c *ChannelMeta) close() {
c.closed.Store(true)
}
Loading

0 comments on commit c72db04

Please sign in to comment.