Skip to content

Commit

Permalink
statistics: move StatsUsage and TableDelta into new package (#47238)
Browse files Browse the repository at this point in the history
ref #46905
  • Loading branch information
hawkingrei authored Sep 25, 2023
1 parent 29ba912 commit 48f6b35
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 129 deletions.
1 change: 1 addition & 0 deletions statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ go_test(
"//statistics",
"//statistics/handle/globalstats",
"//statistics/handle/internal",
"//statistics/handle/usage",
"//testkit",
"//testkit/testsetup",
"//types",
Expand Down
12 changes: 6 additions & 6 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ type Handle struct {
statsCache *cache.StatsCachePointer

// tableDelta contains all the delta map from collectors when we dump them to KV.
tableDelta *tableDelta
tableDelta *usage.TableDelta

// statsUsage contains all the column stats usage information from collectors when we dump them to KV.
statsUsage *statsUsage
statsUsage *usage.StatsUsage

// StatsLoad is used to load stats concurrently
StatsLoad StatsLoad
Expand Down Expand Up @@ -146,8 +146,8 @@ func (h *Handle) Clear() {
<-h.ddlEventCh
}
h.listHead.ClearForTest()
h.tableDelta.reset()
h.statsUsage.reset()
h.tableDelta.Reset()
h.statsUsage.Reset()
}

type sessionPool interface {
Expand Down Expand Up @@ -176,8 +176,8 @@ func NewHandle(_, initStatsCtx sessionctx.Context, lease time.Duration, pool ses
return nil, err
}
handle.statsCache = statsCache
handle.tableDelta = newTableDelta()
handle.statsUsage = newStatsUsage()
handle.tableDelta = usage.NewTableDelta()
handle.statsUsage = usage.NewStatsUsage()
handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency)
handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
Expand Down
143 changes: 22 additions & 121 deletions statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,114 +37,15 @@ import (
"github.com/pingcap/tidb/util/sqlexec"
)

// tableDelta accumulates change information for tables, keyed by table ID.
// All of its methods are thread-safe.
type tableDelta struct {
	// lock guards delta.
	lock sync.Mutex
	// delta maps a table ID to the change recorded for that table.
	delta map[int64]variable.TableDelta
}

// newTableDelta returns a tableDelta whose delta map is allocated and empty.
func newTableDelta() *tableDelta {
	td := new(tableDelta)
	td.delta = make(map[int64]variable.TableDelta)
	return td
}

// reset discards every recorded delta by swapping in a fresh, empty map.
func (m *tableDelta) reset() {
	// Allocate outside the critical section; only the swap needs the lock.
	empty := make(map[int64]variable.TableDelta)
	m.lock.Lock()
	m.delta = empty
	m.lock.Unlock()
}

// getDeltaAndReset hands the current delta map to the caller and replaces it
// with a fresh, empty one. The returned map is no longer referenced by m.
func (m *tableDelta) getDeltaAndReset() map[int64]variable.TableDelta {
	empty := make(map[int64]variable.TableDelta)
	m.lock.Lock()
	defer m.lock.Unlock()
	var taken map[int64]variable.TableDelta
	taken, m.delta = m.delta, empty
	return taken
}

// update folds one change for table id into the delta map while holding the
// lock. The arithmetic itself is delegated to updateTableDeltaMap; colSize may
// be nil.
func (m *tableDelta) update(id, delta, count int64, colSize *map[int64]int64) {
	m.lock.Lock()
	defer m.lock.Unlock()
	updateTableDeltaMap(m.delta, id, delta, count, colSize)
}

// updateTableDeltaMap adds delta and count to the entry stored under id in m,
// and adds every value found in *colSize to the matching key of the entry's
// ColSize map. A missing entry starts from the zero value, and the entry's
// ColSize map is always allocated, even when colSize is nil.
// It does no locking; callers must serialize access to m themselves.
func updateTableDeltaMap(m map[int64]variable.TableDelta, id, delta, count int64, colSize *map[int64]int64) {
	entry := m[id]
	if entry.ColSize == nil {
		entry.ColSize = make(map[int64]int64)
	}

	// Ranging over a nil map is a no-op, so a nil colSize needs no special case.
	var extra map[int64]int64
	if colSize != nil {
		extra = *colSize
	}
	for k, n := range extra {
		entry.ColSize[k] += n
	}

	entry.Delta += delta
	entry.Count += count
	m[id] = entry
}

// merge adds every entry of deltaMap into m. An empty or nil deltaMap returns
// immediately without taking the lock.
func (m *tableDelta) merge(deltaMap map[int64]variable.TableDelta) {
	if len(deltaMap) == 0 {
		return
	}
	m.lock.Lock()
	defer m.lock.Unlock()
	for tableID := range deltaMap {
		// Copy into a fresh local so the pointer passed below refers to this
		// iteration's value only.
		src := deltaMap[tableID]
		updateTableDeltaMap(m.delta, tableID, src.Delta, src.Count, &src.ColSize)
	}
}

// statsUsage records, for each (tableID, columnID), the last time the column
// stats were used (needed).
// All of its methods are thread-safe.
type statsUsage struct {
	// lock guards usage.
	lock sync.RWMutex
	// usage maps a table item to the most recent time it was needed.
	usage map[model.TableItemID]time.Time
}

// newStatsUsage returns a statsUsage whose usage map is allocated and empty.
func newStatsUsage() *statsUsage {
	su := new(statsUsage)
	su.usage = make(map[model.TableItemID]time.Time)
	return su
}

// reset discards every recorded usage time by swapping in a fresh, empty map.
func (m *statsUsage) reset() {
	// Allocate outside the critical section; only the swap needs the lock.
	empty := make(map[model.TableItemID]time.Time)
	m.lock.Lock()
	m.usage = empty
	m.lock.Unlock()
}

// getUsageAndReset hands the current usage map to the caller and replaces it
// with a fresh, empty one. The returned map is no longer referenced by m.
func (m *statsUsage) getUsageAndReset() map[model.TableItemID]time.Time {
	empty := make(map[model.TableItemID]time.Time)
	m.lock.Lock()
	defer m.lock.Unlock()
	var taken map[model.TableItemID]time.Time
	taken, m.usage = m.usage, empty
	return taken
}

// merge folds other into m, keeping the later timestamp for each item: an
// entry is written when the item is absent from m or when m's time is strictly
// before the incoming one. An empty or nil other returns without locking.
func (m *statsUsage) merge(other map[model.TableItemID]time.Time) {
	if len(other) == 0 {
		return
	}
	m.lock.Lock()
	defer m.lock.Unlock()
	for item, usedAt := range other {
		known, seen := m.usage[item]
		if seen && !known.Before(usedAt) {
			// The stored time is already at least as recent; keep it.
			continue
		}
		m.usage[item] = usedAt
	}
}

func merge(s *SessionStatsCollector, deltaMap *tableDelta, colMap *statsUsage) {
deltaMap.merge(s.mapper.getDeltaAndReset())
colMap.merge(s.statsUsage.getUsageAndReset())
func merge(s *SessionStatsCollector, deltaMap *usage.TableDelta, colMap *usage.StatsUsage) {
deltaMap.Merge(s.mapper.GetDeltaAndReset())
colMap.Merge(s.statsUsage.GetUsageAndReset())
}

// SessionStatsCollector is a list item that holds the delta mapper. If you want to write or read mapper, you must lock it.
type SessionStatsCollector struct {
mapper *tableDelta
statsUsage *statsUsage
mapper *usage.TableDelta
statsUsage *usage.StatsUsage
next *SessionStatsCollector
sync.Mutex

Expand All @@ -155,8 +56,8 @@ type SessionStatsCollector struct {
// NewSessionStatsCollector initializes a new SessionStatsCollector.
func NewSessionStatsCollector() *SessionStatsCollector {
return &SessionStatsCollector{
mapper: newTableDelta(),
statsUsage: newStatsUsage(),
mapper: usage.NewTableDelta(),
statsUsage: usage.NewStatsUsage(),
}
}

Expand All @@ -171,15 +72,15 @@ func (s *SessionStatsCollector) Delete() {
func (s *SessionStatsCollector) Update(id int64, delta int64, count int64, colSize *map[int64]int64) {
s.Lock()
defer s.Unlock()
s.mapper.update(id, delta, count, colSize)
s.mapper.Update(id, delta, count, colSize)
}

// ClearForTest clears the mapper for test.
func (s *SessionStatsCollector) ClearForTest() {
s.Lock()
defer s.Unlock()
s.mapper = newTableDelta()
s.statsUsage = newStatsUsage()
s.mapper = usage.NewTableDelta()
s.statsUsage = usage.NewStatsUsage()
s.next = nil
s.deleted = false
}
Expand All @@ -188,17 +89,17 @@ func (s *SessionStatsCollector) ClearForTest() {
func (s *SessionStatsCollector) UpdateColStatsUsage(colMap map[model.TableItemID]time.Time) {
s.Lock()
defer s.Unlock()
s.statsUsage.merge(colMap)
s.statsUsage.Merge(colMap)
}

// NewSessionStatsCollector allocates a stats collector for a session.
func (h *Handle) NewSessionStatsCollector() *SessionStatsCollector {
h.listHead.Lock()
defer h.listHead.Unlock()
newCollector := &SessionStatsCollector{
mapper: newTableDelta(),
mapper: usage.NewTableDelta(),
next: h.listHead.next,
statsUsage: newStatsUsage(),
statsUsage: usage.NewStatsUsage(),
}
h.listHead.next = newCollector
return newCollector
Expand Down Expand Up @@ -302,8 +203,8 @@ const (
// sweepList will loop over the list, merge each session's local stats into handle
// and remove closed session's collector.
func (h *Handle) sweepList() {
deltaMap := newTableDelta()
colMap := newStatsUsage()
deltaMap := usage.NewTableDelta()
colMap := usage.NewStatsUsage()
prev := h.listHead
prev.Lock()
for curr := prev.next; curr != nil; curr = curr.next {
Expand All @@ -321,17 +222,17 @@ func (h *Handle) sweepList() {
}
}
prev.Unlock()
h.tableDelta.merge(deltaMap.getDeltaAndReset())
h.statsUsage.merge(colMap.getUsageAndReset())
h.tableDelta.Merge(deltaMap.GetDeltaAndReset())
h.statsUsage.Merge(colMap.GetUsageAndReset())
}

// DumpStatsDeltaToKV sweeps the whole list and updates the global map, then dumps every table held in the map to KV.
// If the mode is `DumpDelta`, it will only dump the delta info whose `Modify Count / Table Count` is greater than a ratio.
func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error {
h.sweepList()
deltaMap := h.tableDelta.getDeltaAndReset()
deltaMap := h.tableDelta.GetDeltaAndReset()
defer func() {
h.tableDelta.merge(deltaMap)
h.tableDelta.Merge(deltaMap)
}()

se, err := h.pool.Get()
Expand All @@ -351,7 +252,7 @@ func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error {
return errors.Trace(err)
}
if updated {
updateTableDeltaMap(deltaMap, id, -item.Delta, -item.Count, nil)
usage.UpdateTableDeltaMap(deltaMap, id, -item.Delta, -item.Count, nil)
}
if err = h.dumpTableStatColSizeToKV(id, item); err != nil {
delete(deltaMap, id)
Expand Down Expand Up @@ -516,9 +417,9 @@ func (h *Handle) DumpColStatsUsageToKV() error {
return nil
}
h.sweepList()
colMap := h.statsUsage.getUsageAndReset()
colMap := h.statsUsage.GetUsageAndReset()
defer func() {
h.statsUsage.merge(colMap)
h.statsUsage.Merge(colMap)
}()
type pair struct {
lastUsedAt string
Expand Down
3 changes: 2 additions & 1 deletion statistics/handle/update_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ package handle
import (
"testing"

"github.com/pingcap/tidb/statistics/handle/usage"
"github.com/stretchr/testify/require"
)

func TestInsertAndDelete(t *testing.T) {
h := Handle{
listHead: &SessionStatsCollector{mapper: newTableDelta()},
listHead: &SessionStatsCollector{mapper: usage.NewTableDelta()},
}
var items []*SessionStatsCollector
for i := 0; i < 5; i++ {
Expand Down
8 changes: 7 additions & 1 deletion statistics/handle/usage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "usage",
srcs = ["index_usage.go"],
srcs = [
"index_usage.go",
"stats_usage.go",
"table_delta.go",
],
importpath = "github.com/pingcap/tidb/statistics/handle/usage",
visibility = ["//visibility:public"],
deps = [
"//kv",
"//parser/model",
"//sessionctx",
"//sessionctx/variable",
"//types",
"//util/sqlexec",
"@com_github_pingcap_errors//:errors",
Expand Down
66 changes: 66 additions & 0 deletions statistics/handle/usage/stats_usage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package usage

import (
"sync"
"time"

"github.com/pingcap/tidb/parser/model"
)

// StatsUsage records, for each (tableID, columnID), the last time the column
// stats were used (needed).
// All of its methods are thread-safe.
type StatsUsage struct {
	// lock guards usage.
	lock sync.RWMutex
	// usage maps a table item to the most recent time it was needed.
	usage map[model.TableItemID]time.Time
}

// NewStatsUsage returns a StatsUsage whose usage map is allocated and empty.
func NewStatsUsage() *StatsUsage {
	su := new(StatsUsage)
	su.usage = make(map[model.TableItemID]time.Time)
	return su
}

// Reset discards every recorded usage time by swapping in a fresh, empty map.
func (m *StatsUsage) Reset() {
	// Allocate outside the critical section; only the swap needs the lock.
	empty := make(map[model.TableItemID]time.Time)
	m.lock.Lock()
	m.usage = empty
	m.lock.Unlock()
}

// GetUsageAndReset hands the current usage map to the caller and replaces it
// with a fresh, empty one. The returned map is no longer referenced by m.
func (m *StatsUsage) GetUsageAndReset() map[model.TableItemID]time.Time {
	empty := make(map[model.TableItemID]time.Time)
	m.lock.Lock()
	defer m.lock.Unlock()
	var taken map[model.TableItemID]time.Time
	taken, m.usage = m.usage, empty
	return taken
}

// Merge folds other into the StatsUsage, keeping the later timestamp for each
// item: an entry is written when the item is absent or when the stored time is
// strictly before the incoming one. An empty or nil other returns without
// locking.
func (m *StatsUsage) Merge(other map[model.TableItemID]time.Time) {
	if len(other) == 0 {
		return
	}
	m.lock.Lock()
	defer m.lock.Unlock()
	for item, usedAt := range other {
		known, seen := m.usage[item]
		if seen && !known.Before(usedAt) {
			// The stored time is already at least as recent; keep it.
			continue
		}
		m.usage[item] = usedAt
	}
}
Loading

0 comments on commit 48f6b35

Please sign in to comment.