diff --git a/.bazelrc b/.bazelrc
index 11c7c8f9ad2ba..6d31d4e95ac74 100644
--- a/.bazelrc
+++ b/.bazelrc
@@ -20,7 +20,7 @@ build:race --@io_bazel_rules_go//go/config:race --test_env=GORACE=halt_on_error=
 test --test_env=TZ=Asia/Shanghai
 test --test_output=errors --test_summary=testcase
-test:ci --color=yes
+test:ci --color=yes --spawn_strategy=local
 test:ci --verbose_failures --test_verbose_timeout_warnings
 test:ci --test_env=GO_TEST_WRAP_TESTV=1
 test:ci --experimental_ui_max_stdouterr_bytes=104857600
diff --git a/metrics/metrics.go b/metrics/metrics.go
index f8bbf11b6ef6d..6767d2dfc6adb 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -220,6 +220,7 @@ func RegisterMetrics() {
 	prometheus.MustRegister(TTLPhaseTime)
 	prometheus.MustRegister(EMACPUUsageGauge)
+	prometheus.MustRegister(PoolConcurrencyCounter)
 	prometheus.MustRegister(HistoricalStatsCounter)
 	prometheus.MustRegister(PlanReplayerTaskCounter)
diff --git a/metrics/resourcemanager.go b/metrics/resourcemanager.go
index d45b8833e1225..c6715512500d4 100644
--- a/metrics/resourcemanager.go
+++ b/metrics/resourcemanager.go
@@ -24,4 +24,12 @@ var (
 		Name:      "ema_cpu_usage",
 		Help:      "exponential moving average of CPU usage",
 	})
+	// PoolConcurrencyCounter records the concurrency of the pool
+	PoolConcurrencyCounter = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "tidb",
+			Subsystem: "rm",
+			Name:      "pool_concurrency",
+			Help:      "The concurrency of the pool",
+		}, []string{LblType})
 )
diff --git a/util/gpool/spmc/BUILD.bazel b/util/gpool/spmc/BUILD.bazel
index db4d724052666..68028a5573b34 100644
--- a/util/gpool/spmc/BUILD.bazel
+++ b/util/gpool/spmc/BUILD.bazel
@@ -11,6 +11,7 @@ go_library(
     importpath = "github.com/pingcap/tidb/util/gpool/spmc",
     visibility = ["//visibility:public"],
     deps = [
+        "//metrics",
        "//resourcemanager",
         "//resourcemanager/pooltask",
         "//resourcemanager/util",
@@ -18,6 +19,7 @@
         "//util/logutil",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_log//:log",
+        "@com_github_prometheus_client_golang//prometheus",
         "@org_uber_go_atomic//:atomic",
         "@org_uber_go_zap//:zap",
     ],
diff --git a/util/gpool/spmc/spmcpool.go b/util/gpool/spmc/spmcpool.go
index 35b258957fd62..d1a3bbb80c4aa 100644
--- a/util/gpool/spmc/spmcpool.go
+++ b/util/gpool/spmc/spmcpool.go
@@ -21,11 +21,13 @@ import (
 	"time"
 
 	"github.com/pingcap/log"
+	"github.com/pingcap/tidb/metrics"
 	"github.com/pingcap/tidb/resourcemanager"
 	"github.com/pingcap/tidb/resourcemanager/pooltask"
 	"github.com/pingcap/tidb/resourcemanager/util"
 	"github.com/pingcap/tidb/util/gpool"
 	"github.com/pingcap/tidb/util/logutil"
+	"github.com/prometheus/client_golang/prometheus"
 	atomicutil "go.uber.org/atomic"
 	"go.uber.org/zap"
 )
@@ -47,13 +49,14 @@ type Pool[T any, U any, C any, CT any, TF pooltask.Context[CT]] struct {
 	workers *loopQueue[T, U, C, CT, TF]
 	options *Options
 	gpool.BasePool
-	taskManager   pooltask.TaskManager[T, U, C, CT, TF]
-	waitingTask   atomicutil.Uint32
-	capacity      atomic.Int32
-	running       atomic.Int32
-	state         atomic.Int32
-	waiting       atomic.Int32 // waiting is the number of goroutines that are waiting for the pool to be available.
-	heartbeatDone atomic.Bool
+	taskManager        pooltask.TaskManager[T, U, C, CT, TF]
+	waitingTask        atomicutil.Uint32
+	capacity           atomic.Int32
+	running            atomic.Int32
+	state              atomic.Int32
+	waiting            atomic.Int32 // waiting is the number of goroutines that are waiting for the pool to be available.
+	heartbeatDone      atomic.Bool
+	concurrencyMetrics prometheus.Gauge
 }
 
 // NewSPMCPool create a single producer, multiple consumer goroutine pool.
@@ -63,12 +66,13 @@ func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name stri
 		opts.ExpiryDuration = gpool.DefaultCleanIntervalTime
 	}
 	result := &Pool[T, U, C, CT, TF]{
-		BasePool:    gpool.NewBasePool(),
-		taskCh:      make(chan *pooltask.TaskBox[T, U, C, CT, TF], 128),
-		stopCh:      make(chan struct{}),
-		lock:        gpool.NewSpinLock(),
-		taskManager: pooltask.NewTaskManager[T, U, C, CT, TF](size),
-		options:     opts,
+		BasePool:           gpool.NewBasePool(),
+		taskCh:             make(chan *pooltask.TaskBox[T, U, C, CT, TF], 128),
+		stopCh:             make(chan struct{}),
+		lock:               gpool.NewSpinLock(),
+		taskManager:        pooltask.NewTaskManager[T, U, C, CT, TF](size),
+		concurrencyMetrics: metrics.PoolConcurrencyCounter.WithLabelValues(name),
+		options:            opts,
 	}
 	result.SetName(name)
 	result.state.Store(int32(gpool.OPENED))
@@ -78,6 +82,7 @@ func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name stri
 		}
 	}
 	result.capacity.Add(size)
+	result.concurrencyMetrics.Set(float64(size))
 	result.workers = newWorkerLoopQueue[T, U, C, CT, TF](int(size))
 	result.cond = sync.NewCond(result.lock)
 	err := resourcemanager.InstanceResourceManager.Register(result, name, component)
@@ -141,6 +146,7 @@ func (p *Pool[T, U, C, CT, TF]) Tune(size int) {
 	}
 	p.SetLastTuneTs(time.Now())
 	p.capacity.Store(int32(size))
+	p.concurrencyMetrics.Set(float64(size))
 	if size > capacity {
 		for i := 0; i < size-capacity; i++ {
 			if tid, boostTask := p.taskManager.Overclock(size); boostTask != nil {
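
Note on the metric wiring (illustrative, not part of the patch): the diff follows the usual client_golang pattern of one GaugeVec keyed by pool name, where each pool caches the Gauge bound to its own label value at construction time, so Tune only pays for a cheap Set call with no label lookup. Below is a minimal self-contained sketch of that pattern; the pool type and the names poolConcurrency and demo_pool are simplified stand-ins, not the real spmc.Pool.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// poolConcurrency mirrors metrics.PoolConcurrencyCounter: a GaugeVec keyed
// by a "type" label, yielding one time series per pool.
var poolConcurrency = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "tidb",
		Subsystem: "rm",
		Name:      "pool_concurrency",
		Help:      "The concurrency of the pool",
	}, []string{"type"})

// pool is a hypothetical stand-in for spmc.Pool; only the metric wiring is shown.
type pool struct {
	capacity           int32
	concurrencyMetrics prometheus.Gauge // bound once to this pool's label value
}

// newPool binds the pool's gauge at construction time, as NewSPMCPool does
// in the diff, so later updates need no label lookup.
func newPool(name string, size int32) *pool {
	p := &pool{
		capacity:           size,
		concurrencyMetrics: poolConcurrency.WithLabelValues(name),
	}
	p.concurrencyMetrics.Set(float64(size))
	return p
}

// tune resizes the pool and keeps the gauge in step, mirroring Pool.Tune.
func (p *pool) tune(size int32) {
	p.capacity = size
	p.concurrencyMetrics.Set(float64(size))
}

func main() {
	// Register the vector once at startup, as RegisterMetrics does in the diff.
	prometheus.MustRegister(poolConcurrency)

	p := newPool("demo_pool", 4)
	p.tune(8)
	fmt.Println(`tidb_rm_pool_concurrency{type="demo_pool"} should now read 8`)
}

A Gauge (rather than a Counter) is the right choice here despite the PoolConcurrencyCounter name: Tune can shrink the pool as well as grow it, so the recorded value must be able to move in both directions.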