statistics: ease the impact of stats feedback on cluster (#15503) #18770

Merged · 2 commits · Jul 27, 2020
2 changes: 1 addition & 1 deletion config/config.go
@@ -414,7 +414,7 @@ var defaultConf = Config{
RunAutoAnalyze: true,
StmtCountLimit: 5000,
FeedbackProbability: 0.05,
QueryFeedbackLimit: 1024,
QueryFeedbackLimit: 512,
PseudoEstimateRatio: 0.8,
ForcePriority: "NO_PRIORITY",
BindInfoLease: "3s",
2 changes: 1 addition & 1 deletion config/config.toml.example
@@ -193,7 +193,7 @@ run-auto-analyze = true
feedback-probability = 0.05

# The max number of query feedback that cache in memory.
query-feedback-limit = 1024
query-feedback-limit = 512

# Pseudo stats will be used if the ratio between the modify count and
# row count in statistics of a table is greater than it.
3 changes: 3 additions & 0 deletions executor/analyze_test.go
@@ -361,10 +361,13 @@ func (s *testSuite1) testAnalyzeIncremental(tk *testkit.TestKit, c *C) {
// Test analyze incremental with feedback.
tk.MustExec("insert into t values (3,3)")
oriProbability := statistics.FeedbackProbability.Load()
oriMinLogCount := handle.MinLogScanCount
defer func() {
statistics.FeedbackProbability.Store(oriProbability)
handle.MinLogScanCount = oriMinLogCount
}()
statistics.FeedbackProbability.Store(1)
handle.MinLogScanCount = 0
is := s.dom.InfoSchema()
table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
133 changes: 102 additions & 31 deletions statistics/feedback.go
@@ -79,6 +79,64 @@ func NewQueryFeedback(physicalID int64, hist *Histogram, expected int64, desc bo
}
}

// QueryFeedbackKey is the key for a group of feedbacks on the same index/column.
type QueryFeedbackKey struct {
PhysicalID int64
HistID int64
Tp int
}

// QueryFeedbackMap is the collection of feedbacks.
type QueryFeedbackMap struct {
Size int
Feedbacks map[QueryFeedbackKey][]*QueryFeedback
}

// NewQueryFeedbackMap builds a feedback collection.
func NewQueryFeedbackMap() *QueryFeedbackMap {
return &QueryFeedbackMap{Feedbacks: make(map[QueryFeedbackKey][]*QueryFeedback)}
}

// Append adds a feedback into the map.
func (m *QueryFeedbackMap) Append(q *QueryFeedback) {
k := QueryFeedbackKey{
PhysicalID: q.PhysicalID,
HistID: q.Hist.ID,
Tp: q.Tp,
}
m.append(k, []*QueryFeedback{q})
return
}

// MaxQueryFeedbackCount is the max number of feedbacks that are cached in memory.
var MaxQueryFeedbackCount = atomic.NewInt64(1 << 9)

func (m *QueryFeedbackMap) append(k QueryFeedbackKey, qs []*QueryFeedback) bool {
remained := MaxQueryFeedbackCount.Load() - int64(m.Size)
if remained <= 0 {
return false
}
s, ok := m.Feedbacks[k]
if !ok || s == nil {
s = make([]*QueryFeedback, 0, 8)
}
l := mathutil.MinInt64(int64(len(qs)), remained)
s = append(s, qs[:l]...)
m.Feedbacks[k] = s
m.Size = m.Size + int(l)
return true
}

// Merge combines 2 collections of feedbacks.
func (m *QueryFeedbackMap) Merge(r *QueryFeedbackMap) {
for k, qs := range r.Feedbacks {
if !m.append(k, qs) {
break
}
}
return
}

var (
// MaxNumberOfRanges is the max number of ranges before split to collect feedback.
MaxNumberOfRanges = 20
@@ -202,7 +260,7 @@ func (q *QueryFeedback) Actual() int64 {
// Update updates the query feedback. `startKey` is the start scan key of the partial result, used to find
// the range for update. `counts` is the scan counts of each range, used to update the feedback count info.
func (q *QueryFeedback) Update(startKey kv.Key, counts []int64) {
// Older version do not have the counts info.
// Older versions do not have the counts info.
if len(counts) == 0 {
q.Invalidate()
return
@@ -248,6 +306,43 @@ func (q *QueryFeedback) Update(startKey kv.Key, counts []int64) {
return
}

// NonOverlappedFeedbacks extracts a set of feedbacks which are not overlapped with each other.
func NonOverlappedFeedbacks(sc *stmtctx.StatementContext, fbs []Feedback) ([]Feedback, bool) {
// Sort feedbacks by end point and start point incrementally, then pick every feedback that is not overlapped
// with the previous chosen feedbacks.
var existsErr bool
sort.Slice(fbs, func(i, j int) bool {
res, err := fbs[i].Upper.CompareDatum(sc, fbs[j].Upper)
if err != nil {
existsErr = true
}
if existsErr || res != 0 {
return res < 0
}
res, err = fbs[i].Lower.CompareDatum(sc, fbs[j].Lower)
if err != nil {
existsErr = true
}
return res < 0
})
if existsErr {
return fbs, false
}
resFBs := make([]Feedback, 0, len(fbs))
previousEnd := &types.Datum{}
for _, fb := range fbs {
res, err := previousEnd.CompareDatum(sc, fb.Lower)
if err != nil {
return fbs, false
}
if res <= 0 {
resFBs = append(resFBs, fb)
previousEnd = fb.Upper
}
}
return resFBs, true
}

// BucketFeedback stands for all the feedback for a bucket.
type BucketFeedback struct {
feedback []Feedback // All the feedback info in the same bucket.
@@ -482,39 +577,15 @@ func (b *BucketFeedback) mergeFullyContainedFeedback(sc *stmtctx.StatementContex
if len(feedbacks) == 0 {
return 0, 0, false
}
// Sort feedbacks by end point and start point incrementally, then pick every feedback that is not overlapped
// with the previous chosen feedbacks.
var existsErr bool
sort.Slice(feedbacks, func(i, j int) bool {
res, err := feedbacks[i].Upper.CompareDatum(sc, feedbacks[j].Upper)
if err != nil {
existsErr = true
}
if existsErr || res != 0 {
return res < 0
}
res, err = feedbacks[i].Lower.CompareDatum(sc, feedbacks[j].Lower)
if err != nil {
existsErr = true
}
return res < 0
})
if existsErr {
sortedFBs, ok := NonOverlappedFeedbacks(sc, feedbacks)
if !ok {
return 0, 0, false
}
previousEnd := &types.Datum{}
var sumFraction, sumCount float64
for _, fb := range feedbacks {
res, err := previousEnd.CompareDatum(sc, fb.Lower)
if err != nil {
return 0, 0, false
}
if res <= 0 {
fraction, _ := getOverlapFraction(fb, bkt)
sumFraction += fraction
sumCount += float64(fb.Count)
previousEnd = fb.Upper
}
for _, fb := range sortedFBs {
fraction, _ := getOverlapFraction(fb, bkt)
sumFraction += fraction
sumCount += float64(fb.Count)
}
return sumFraction, sumCount, true
}
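The core of this change is the new QueryFeedbackMap: feedbacks are grouped by (PhysicalID, HistID, Tp), and the collection refuses to grow past MaxQueryFeedbackCount (lowered from 1 << 10 to 1 << 9, matching the query-feedback-limit default drop from 1024 to 512). NonOverlappedFeedbacks is the old de-overlapping logic from mergeFullyContainedFeedback, extracted for reuse. The stand-alone Go sketch below only models the capping behaviour of append — the type, field, and limit names are illustrative assumptions, not TiDB code:

```go
package main

import "fmt"

type key struct {
	physicalID int64
	histID     int64
	tp         int
}

type feedback struct{ actual int64 }

// feedbackMap models statistics.QueryFeedbackMap: a bounded, keyed collection.
type feedbackMap struct {
	size      int
	limit     int
	feedbacks map[key][]*feedback
}

func newFeedbackMap(limit int) *feedbackMap {
	return &feedbackMap{limit: limit, feedbacks: make(map[key][]*feedback)}
}

// append mirrors QueryFeedbackMap.append: once the global size reaches the
// limit, no further feedback is accepted, so memory per handle stays bounded.
func (m *feedbackMap) append(k key, qs []*feedback) bool {
	remained := m.limit - m.size
	if remained <= 0 {
		return false
	}
	if len(qs) > remained {
		qs = qs[:remained]
	}
	m.feedbacks[k] = append(m.feedbacks[k], qs...)
	m.size += len(qs)
	return true
}

func main() {
	m := newFeedbackMap(2) // think of 2 as a tiny query-feedback-limit
	k := key{physicalID: 1, histID: 1, tp: 0}
	fmt.Println(m.append(k, []*feedback{{10}, {20}, {30}})) // true, but only 2 kept
	fmt.Println(m.size)                                     // 2
	fmt.Println(m.append(k, []*feedback{{40}}))             // false, cap reached
}
```

Merge applies the same bounded append key by key and stops as soon as the cap is hit, so combining per-session collectors into the handle's map also stays within the limit.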
19 changes: 8 additions & 11 deletions statistics/handle/handle.go
@@ -61,8 +61,8 @@ type Handle struct {
schemaVersion int64
}

// It can be read by multiply readers at the same time without acquire lock, but it can be
// written only after acquire the lock.
// It can be read by multiple readers at the same time without acquiring lock, but it can be
// written only after acquiring the lock.
statsCache struct {
sync.Mutex
atomic.Value
@@ -78,7 +78,7 @@ type Handle struct {
// globalMap contains all the delta map from collectors when we dump them to KV.
globalMap tableDeltaMap
// feedback is used to store query feedback info.
feedback []*statistics.QueryFeedback
feedback *statistics.QueryFeedbackMap

lease atomic2.Duration
}
@@ -90,7 +90,7 @@ func (h *Handle) Clear() {
for len(h.ddlEventCh) > 0 {
<-h.ddlEventCh
}
h.feedback = h.feedback[:0]
h.feedback = statistics.NewQueryFeedbackMap()
h.mu.ctx.GetSessionVars().InitChunkSize = 1
h.mu.ctx.GetSessionVars().MaxChunkSize = 1
h.mu.ctx.GetSessionVars().ProjectionConcurrency = 0
@@ -100,16 +100,13 @@ func (h *Handle) Clear() {
h.mu.Unlock()
}

// MaxQueryFeedbackCount is the max number of feedback that cache in memory.
var MaxQueryFeedbackCount = atomic2.NewInt64(1 << 10)

// NewHandle creates a Handle for update stats.
func NewHandle(ctx sessionctx.Context, lease time.Duration) *Handle {
handle := &Handle{
ddlEventCh: make(chan *util.Event, 100),
listHead: &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)},
globalMap: make(tableDeltaMap),
feedback: make([]*statistics.QueryFeedback, 0, MaxQueryFeedbackCount.Load()),
feedback: statistics.NewQueryFeedbackMap(),
}
handle.lease.Store(lease)
// It is safe to use it concurrently because the exec won't touch the ctx.
@@ -132,10 +129,10 @@ func (h *Handle) SetLease(lease time.Duration) {
h.lease.Store(lease)
}

// GetQueryFeedback gets the query feedback. It is only use in test.
func (h *Handle) GetQueryFeedback() []*statistics.QueryFeedback {
// GetQueryFeedback gets the query feedback. It is only used in test.
func (h *Handle) GetQueryFeedback() *statistics.QueryFeedbackMap {
defer func() {
h.feedback = h.feedback[:0]
h.feedback = statistics.NewQueryFeedbackMap()
}()
return h.feedback
}
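Because Handle.feedback is now a *statistics.QueryFeedbackMap rather than a flat slice, callers of GetQueryFeedback receive the grouped form. A hedged sketch of a small adapter — flattenFeedbacks is a hypothetical helper, not part of this PR — for call sites (mostly tests) that still want the old flat view:

```go
package example

import "github.com/pingcap/tidb/statistics"

// flattenFeedbacks turns the keyed QueryFeedbackMap returned by
// Handle.GetQueryFeedback back into a single slice of feedbacks.
func flattenFeedbacks(m *statistics.QueryFeedbackMap) []*statistics.QueryFeedback {
	fbs := make([]*statistics.QueryFeedback, 0, m.Size)
	for _, qs := range m.Feedbacks {
		fbs = append(fbs, qs...)
	}
	return fbs
}
```

Note that GetQueryFeedback still drains the cache: the deferred call swaps in a fresh map from statistics.NewQueryFeedbackMap after the current map is returned.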