metrics: add tidb_rm_pool_concurrency #41113

Merged (11 commits) on Feb 7, 2023
2 changes: 1 addition & 1 deletion .bazelrc
@@ -20,7 +20,7 @@ build:race --@io_bazel_rules_go//go/config:race --test_env=GORACE=halt_on_error=

test --test_env=TZ=Asia/Shanghai
test --test_output=errors --test_summary=testcase
-test:ci --color=yes
+test:ci --color=yes --spawn_strategy=local
test:ci --verbose_failures --test_verbose_timeout_warnings
test:ci --test_env=GO_TEST_WRAP_TESTV=1
test:ci --experimental_ui_max_stdouterr_bytes=104857600
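Note: --spawn_strategy=local tells Bazel to run test actions directly on the host instead of inside a sandbox, which is a common way to sidestep sandbox-related flakiness in CI.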
1 change: 1 addition & 0 deletions metrics/metrics.go
@@ -220,6 +220,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TTLPhaseTime)

prometheus.MustRegister(EMACPUUsageGauge)
+prometheus.MustRegister(PoolConcurrencyCounter)

prometheus.MustRegister(HistoricalStatsCounter)
prometheus.MustRegister(PlanReplayerTaskCounter)
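For orientation, here is a minimal, self-contained sketch of the flow this hunk extends: register the gauge once at startup, then expose it on scrape. The full metric name tidb_rm_pool_concurrency is assembled from Namespace, Subsystem, and Name; the port and pool name below are illustrative, not TiDB's.

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// poolConcurrency stands in for metrics.PoolConcurrencyCounter.
var poolConcurrency = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Namespace: "tidb",
	Subsystem: "rm",
	Name:      "pool_concurrency",
	Help:      "the current concurrency of the pool",
}, []string{"type"})

func main() {
	// Mirrors RegisterMetrics: register once, then the handler
	// exposes every registered collector on each scrape.
	prometheus.MustRegister(poolConcurrency)
	poolConcurrency.WithLabelValues("example-pool").Set(8)

	// GET :2112/metrics now includes:
	//   tidb_rm_pool_concurrency{type="example-pool"} 8
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":2112", nil)
}
```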
8 changes: 8 additions & 0 deletions metrics/resourcemanager.go
@@ -24,4 +24,12 @@ var (
Name: "ema_cpu_usage",
Help: "exponential moving average of CPU usage",
})
+// PoolConcurrencyCounter is the gauge tracking the current concurrency of each pool
+PoolConcurrencyCounter = prometheus.NewGaugeVec(
+prometheus.GaugeOpts{
+Namespace: "tidb",
+Subsystem: "rm",
+Name: "pool_concurrency",
+Help: "the current concurrency of the pool",
+}, []string{LblType})
)
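Because PoolConcurrencyCounter is a GaugeVec keyed by LblType, each pool name materializes its own time series. A throwaway-registry sketch of that behavior, assuming LblType resolves to the label name "type" (the pool names are made up):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Private registry so the sketch leaves the global one untouched.
	reg := prometheus.NewRegistry()
	gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "tidb",
		Subsystem: "rm",
		Name:      "pool_concurrency",
		Help:      "the current concurrency of the pool",
	}, []string{"type"})
	reg.MustRegister(gauge)

	// Every distinct label value is an independent series.
	gauge.WithLabelValues("pool-a").Set(4)
	gauge.WithLabelValues("pool-b").Set(16)

	mfs, _ := reg.Gather()
	for _, mf := range mfs {
		for _, m := range mf.GetMetric() {
			fmt.Println(mf.GetName(), m.GetLabel()[0].GetValue(), m.GetGauge().GetValue())
		}
	}
}
```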
2 changes: 2 additions & 0 deletions util/gpool/spmc/BUILD.bazel
@@ -11,13 +11,15 @@ go_library(
importpath = "github.com/pingcap/tidb/util/gpool/spmc",
visibility = ["//visibility:public"],
deps = [
"//metrics",
"//resourcemanager",
"//resourcemanager/pooltask",
"//resourcemanager/util",
"//util/gpool",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@com_github_prometheus_client_golang//prometheus",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
32 changes: 19 additions & 13 deletions util/gpool/spmc/spmcpool.go
@@ -21,11 +21,13 @@ import (
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/resourcemanager"
"github.com/pingcap/tidb/resourcemanager/pooltask"
"github.com/pingcap/tidb/resourcemanager/util"
"github.com/pingcap/tidb/util/gpool"
"github.com/pingcap/tidb/util/logutil"
"github.com/prometheus/client_golang/prometheus"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
)
@@ -47,13 +49,14 @@ type Pool[T any, U any, C any, CT any, TF pooltask.Context[CT]] struct {
workers *loopQueue[T, U, C, CT, TF]
options *Options
gpool.BasePool
-taskManager pooltask.TaskManager[T, U, C, CT, TF]
-waitingTask atomicutil.Uint32
-capacity atomic.Int32
-running atomic.Int32
-state atomic.Int32
-waiting atomic.Int32 // waiting is the number of goroutines that are waiting for the pool to be available.
-heartbeatDone atomic.Bool
+taskManager pooltask.TaskManager[T, U, C, CT, TF]
+waitingTask atomicutil.Uint32
+capacity atomic.Int32
+running atomic.Int32
+state atomic.Int32
+waiting atomic.Int32 // waiting is the number of goroutines that are waiting for the pool to be available.
+heartbeatDone atomic.Bool
+concurrencyMetrics prometheus.Gauge
}

// NewSPMCPool creates a single-producer, multiple-consumer goroutine pool.
@@ -63,12 +66,13 @@ func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name stri
opts.ExpiryDuration = gpool.DefaultCleanIntervalTime
}
result := &Pool[T, U, C, CT, TF]{
-BasePool: gpool.NewBasePool(),
-taskCh: make(chan *pooltask.TaskBox[T, U, C, CT, TF], 128),
-stopCh: make(chan struct{}),
-lock: gpool.NewSpinLock(),
-taskManager: pooltask.NewTaskManager[T, U, C, CT, TF](size),
-options: opts,
+BasePool: gpool.NewBasePool(),
+taskCh: make(chan *pooltask.TaskBox[T, U, C, CT, TF], 128),
+stopCh: make(chan struct{}),
+lock: gpool.NewSpinLock(),
+taskManager: pooltask.NewTaskManager[T, U, C, CT, TF](size),
+concurrencyMetrics: metrics.PoolConcurrencyCounter.WithLabelValues(name),
+options: opts,
}
result.SetName(name)
result.state.Store(int32(gpool.OPENED))
@@ -78,6 +82,7 @@ func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name stri
}
}
result.capacity.Add(size)
+result.concurrencyMetrics.Set(float64(size))
result.workers = newWorkerLoopQueue[T, U, C, CT, TF](int(size))
result.cond = sync.NewCond(result.lock)
err := resourcemanager.InstanceResourceManager.Register(result, name, component)
@@ -141,6 +146,7 @@ func (p *Pool[T, U, C, CT, TF]) Tune(size int) {
}
p.SetLastTuneTs(time.Now())
p.capacity.Store(int32(size))
+p.concurrencyMetrics.Set(float64(size))
if size > capacity {
for i := 0; i < size-capacity; i++ {
if tid, boostTask := p.taskManager.Overclock(size); boostTask != nil {
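The pattern applied in both NewSPMCPool and Tune is that every write to the pool's capacity is paired with a gauge update, so the exported metric cannot drift from the tuned value. A stripped-down sketch of that invariant (hypothetical miniPool, not TiDB's actual types):

```go
package main

import (
	"fmt"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

// miniPool reduces spmc.Pool to the two fields this PR touches.
type miniPool struct {
	capacity           atomic.Int32
	concurrencyMetrics prometheus.Gauge
}

func newMiniPool(name string, size int32, vec *prometheus.GaugeVec) *miniPool {
	p := &miniPool{concurrencyMetrics: vec.WithLabelValues(name)}
	p.capacity.Store(size)
	p.concurrencyMetrics.Set(float64(size)) // initial value, as in NewSPMCPool
	return p
}

func (p *miniPool) Tune(size int32) {
	p.capacity.Store(size)
	p.concurrencyMetrics.Set(float64(size)) // gauge stays in lock-step, as in Tune
}

func main() {
	vec := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "tidb", Subsystem: "rm", Name: "pool_concurrency",
		Help: "the current concurrency of the pool",
	}, []string{"type"})
	p := newMiniPool("example-pool", 8, vec)
	p.Tune(32)
	fmt.Println(p.capacity.Load()) // 32; the gauge now reads 32 as well
}
```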