From 74b2e55c7630735040d1e5da5635da0c5407ef61 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Fri, 14 Feb 2020 12:27:46 +0800 Subject: [PATCH] *: add metrics for unified read pool (#6534) Signed-off-by: Yilin Chen --- Cargo.lock | 9 ++- Cargo.toml | 5 +- cmd/Cargo.toml | 6 +- cmd/src/server.rs | 48 ++++++++-------- components/backup/Cargo.toml | 2 +- components/engine/Cargo.toml | 2 +- components/engine_rocks/Cargo.toml | 2 +- components/pd_client/Cargo.toml | 2 +- components/sst_importer/Cargo.toml | 2 +- components/tidb_query/Cargo.toml | 5 +- components/tikv_util/Cargo.toml | 2 +- src/coprocessor/metrics.rs | 92 ++++++++++++++++++++++++++++++ src/coprocessor/mod.rs | 2 +- src/coprocessor/readpool_impl.rs | 91 +---------------------------- src/coprocessor/tracker.rs | 2 +- src/read_pool.rs | 68 +++++++++++++++++++--- src/storage/mod.rs | 2 +- 17 files changed, 200 insertions(+), 142 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bab2aeb44f2..176556fa16e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -568,6 +568,7 @@ dependencies = [ "log", "nix 0.11.1", "pd_client", + "prometheus", "protobuf 2.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "raft 0.6.0-alpha (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.7.2", @@ -2739,7 +2740,7 @@ dependencies = [ [[package]] name = "prometheus" version = "0.8.0" -source = "git+https://github.com/pingcap/rust-prometheus.git?rev=d919ccd35976b9b84b8d03c07138c1cc05a36087#d919ccd35976b9b84b8d03c07138c1cc05a36087" +source = "git+https://github.com/tikv/rust-prometheus.git?rev=d919ccd35976b9b84b8d03c07138c1cc05a36087#d919ccd35976b9b84b8d03c07138c1cc05a36087" dependencies = [ "cfg-if", "fnv", @@ -2755,7 +2756,7 @@ dependencies = [ [[package]] name = "prometheus-static-metric" version = "0.3.0" -source = "git+https://github.com/pingcap/rust-prometheus.git?rev=d919ccd35976b9b84b8d03c07138c1cc05a36087#d919ccd35976b9b84b8d03c07138c1cc05a36087" +source = "git+https://github.com/tikv/rust-prometheus.git?rev=d919ccd35976b9b84b8d03c07138c1cc05a36087#d919ccd35976b9b84b8d03c07138c1cc05a36087" dependencies = [ "lazy_static", "proc-macro2 1.0.4", @@ -5117,12 +5118,14 @@ checksum = "e66366e18dc58b46801afbf2ca7661a9f59cc8c5962c29892b6039b4f86fa992" [[package]] name = "yatp" version = "0.0.1" -source = "git+https://github.com/tikv/yatp.git?rev=4e3ac06b948f1c2dffe92986a760ef561d0bdbb9#4e3ac06b948f1c2dffe92986a760ef561d0bdbb9" +source = "git+https://github.com/tikv/yatp.git#fe59cbc61657c5293d3537db5cfae3c9e83c9583" dependencies = [ "crossbeam-deque", "dashmap", + "lazy_static", "num_cpus", "parking_lot_core 0.7.0", + "prometheus", "rand 0.7.2", ] diff --git a/Cargo.toml b/Cargo.toml index 871b58fd0ae..7cfc12d9999 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -131,17 +131,16 @@ vlog = "0.1.4" git = "https://github.com/pingcap/murmur3.git" [dependencies.prometheus] -git = "https://github.com/pingcap/rust-prometheus.git" +git = "https://github.com/tikv/rust-prometheus.git" rev = "d919ccd35976b9b84b8d03c07138c1cc05a36087" features = ["nightly", "push", "process"] [dependencies.prometheus-static-metric] -git = "https://github.com/pingcap/rust-prometheus.git" +git = "https://github.com/tikv/rust-prometheus.git" rev = "d919ccd35976b9b84b8d03c07138c1cc05a36087" [dependencies.yatp] git = "https://github.com/tikv/yatp.git" -rev = "4e3ac06b948f1c2dffe92986a760ef561d0bdbb9" [dev-dependencies] panic_hook = { path = "components/panic_hook" } diff --git a/cmd/Cargo.toml b/cmd/Cargo.toml index 1bb2b709914..25faac90905 100644 --- a/cmd/Cargo.toml +++ b/cmd/Cargo.toml @@ -57,9 +57,13 @@ toml = "0.4" url = "2" vlog = "0.1.4" +[dependencies.prometheus] +git = "https://github.com/tikv/rust-prometheus.git" +rev = "d919ccd35976b9b84b8d03c07138c1cc05a36087" +features = ["nightly", "push", "process"] + [dependencies.yatp] git = "https://github.com/tikv/yatp.git" -rev = "4e3ac06b948f1c2dffe92986a760ef561d0bdbb9" [target.'cfg(unix)'.dependencies] signal = "0.6" diff --git a/cmd/src/server.rs b/cmd/src/server.rs index 05e659d24fc..3081c4348aa 100644 --- a/cmd/src/server.rs +++ b/cmd/src/server.rs @@ -29,23 +29,21 @@ use std::{ thread::JoinHandle, time::Duration, }; -use tikv::config::ConfigHandler; -use tikv::raftstore::coprocessor::config::SplitCheckConfigManager; -use tikv::raftstore::router::ServerRaftStoreRouter; -use tikv::raftstore::store::config::RaftstoreConfigManager; use tikv::{ - config::{ConfigController, DBConfigManger, DBType, TiKvConfig}, + config::{ConfigController, ConfigHandler, DBConfigManger, DBType, TiKvConfig}, coprocessor, import::{ImportSSTService, SSTImporter}, raftstore::{ - coprocessor::{CoprocessorHost, RegionInfoAccessor}, + coprocessor::{config::SplitCheckConfigManager, CoprocessorHost, RegionInfoAccessor}, + router::ServerRaftStoreRouter, store::{ + config::RaftstoreConfigManager, fsm, fsm::store::{RaftBatchSystem, RaftRouter, StoreMeta, PENDING_VOTES_CAP}, new_compaction_listener, LocalReader, PdTask, SnapManagerBuilder, SplitCheckRunner, }, }, - read_pool::{ReadPool, ReadPoolRunner}, + read_pool::{build_yatp_read_pool, ReadPool}, server::{ config::Config as ServerConfig, create_raft_storage, @@ -65,10 +63,6 @@ use tikv_util::{ time::Monitor, worker::{FutureScheduler, FutureWorker, Worker}, }; -use yatp::{ - pool::CloneRunnerBuilder, - queue::{multilevel, QueueType}, -}; /// Run a TiKV server. Returns when the server is shutdown by the user, in which /// case the server will be properly stopped. @@ -81,6 +75,7 @@ pub fn run_tikv(config: TiKvConfig) { let _m = Monitor::default(); tikv.init_fs(); + tikv.init_yatp(); tikv.init_engines(); let gc_worker = tikv.init_gc_worker(); let server_config = tikv.init_servers(&gc_worker); @@ -281,6 +276,17 @@ impl TiKVServer { .unwrap(); } + fn init_yatp(&self) { + let yatp_enabled = self.config.readpool.unify_read_pool; + if !yatp_enabled { + return; + } + + yatp::metrics::set_namespace(Some("tikv")); + prometheus::register(Box::new(yatp::metrics::MULTILEVEL_LEVEL0_CHANCE.clone())).unwrap(); + prometheus::register(Box::new(yatp::metrics::MULTILEVEL_LEVEL_ELAPSED.clone())).unwrap(); + } + fn init_engines(&mut self) { let block_cache = self.config.storage.block_cache.build_shared_cache(); @@ -403,18 +409,10 @@ impl TiKVServer { let pd_sender = pd_worker.scheduler(); let unified_read_pool = if self.config.readpool.unify_read_pool { - let unified_read_pool_cfg = &self.config.readpool.unified; - let mut builder = yatp::Builder::new("unified-read-pool"); - builder - .min_thread_count(unified_read_pool_cfg.min_thread_count) - .max_thread_count(unified_read_pool_cfg.max_thread_count); - let multilevel_builder = multilevel::Builder::new(Default::default()); - let read_pool_runner = ReadPoolRunner::new(engines.engine.clone(), Default::default()); - let runner_builder = - multilevel_builder.runner_builder(CloneRunnerBuilder(read_pool_runner)); - Some(builder.build_with_queue_and_runner( - QueueType::Multilevel(multilevel_builder), - runner_builder, + Some(build_yatp_read_pool( + &self.config.readpool.unified, + pd_sender.clone(), + engines.engine.clone(), )) } else { None @@ -423,12 +421,12 @@ impl TiKVServer { let storage_read_pool = if self.config.readpool.unify_read_pool { ReadPool::from(unified_read_pool.as_ref().unwrap().remote().clone()) } else { - let cop_read_pools = storage::build_read_pool( + let storage_read_pools = storage::build_read_pool( &self.config.readpool.storage, pd_sender.clone(), engines.engine.clone(), ); - ReadPool::from(cop_read_pools) + ReadPool::from(storage_read_pools) }; let storage = create_raft_storage( diff --git a/components/backup/Cargo.toml b/components/backup/Cargo.toml index 9c7f9ac4a59..ea0611ac35a 100644 --- a/components/backup/Cargo.toml +++ b/components/backup/Cargo.toml @@ -39,7 +39,7 @@ tokio-threadpool = "0.1" url = "2.0" [dependencies.prometheus] -git = "https://github.com/pingcap/rust-prometheus.git" +git = "https://github.com/tikv/rust-prometheus.git" rev = "d919ccd35976b9b84b8d03c07138c1cc05a36087" default-features = false features = ["nightly", "push", "process"] diff --git a/components/engine/Cargo.toml b/components/engine/Cargo.toml index 354a1beea5d..81c31094749 100644 --- a/components/engine/Cargo.toml +++ b/components/engine/Cargo.toml @@ -30,7 +30,7 @@ tikv_util = { path = "../tikv_util" } time = "0.1" [dependencies.prometheus] -git = "https://github.com/pingcap/rust-prometheus.git" +git = "https://github.com/tikv/rust-prometheus.git" rev = "d919ccd35976b9b84b8d03c07138c1cc05a36087" features = ["nightly", "push", "process"] diff --git a/components/engine_rocks/Cargo.toml b/components/engine_rocks/Cargo.toml index 4b36c93ef66..d9d09b0f3ef 100644 --- a/components/engine_rocks/Cargo.toml +++ b/components/engine_rocks/Cargo.toml @@ -29,7 +29,7 @@ tikv_util = { path = "../tikv_util" } time = "0.1" [dependencies.prometheus] -git = "https://github.com/pingcap/rust-prometheus.git" +git = "https://github.com/tikv/rust-prometheus.git" rev = "d919ccd35976b9b84b8d03c07138c1cc05a36087" features = ["nightly", "push", "process"] diff --git a/components/pd_client/Cargo.toml b/components/pd_client/Cargo.toml index fac7645f899..871f475811d 100644 --- a/components/pd_client/Cargo.toml +++ b/components/pd_client/Cargo.toml @@ -27,7 +27,7 @@ tokio-timer = "0.2" txn_types = { path = "../txn_types" } [dependencies.prometheus] -git = "https://github.com/pingcap/rust-prometheus.git" +git = "https://github.com/tikv/rust-prometheus.git" rev = "d919ccd35976b9b84b8d03c07138c1cc05a36087" features = ["nightly", "push", "process"] diff --git a/components/sst_importer/Cargo.toml b/components/sst_importer/Cargo.toml index 2ff23df5812..ddfa558e6c3 100644 --- a/components/sst_importer/Cargo.toml +++ b/components/sst_importer/Cargo.toml @@ -28,7 +28,7 @@ tokio-sync = "0.1.7" uuid = { version = "0.7", features = [ "serde", "v4" ] } [dependencies.prometheus] -git = "https://github.com/pingcap/rust-prometheus.git" +git = "https://github.com/tikv/rust-prometheus.git" rev = "d919ccd35976b9b84b8d03c07138c1cc05a36087" default-features = false diff --git a/components/tidb_query/Cargo.toml b/components/tidb_query/Cargo.toml index cee9b6379fd..0c10b854322 100644 --- a/components/tidb_query/Cargo.toml +++ b/components/tidb_query/Cargo.toml @@ -52,17 +52,16 @@ uuid = { version = "0.8.1", features = ["v4"] } static_assertions = { version = "1.0", features = ["nightly"] } [dependencies.prometheus] -git = "https://github.com/pingcap/rust-prometheus.git" +git = "https://github.com/tikv/rust-prometheus.git" rev = "d919ccd35976b9b84b8d03c07138c1cc05a36087" features = ["nightly", "push", "process"] [dependencies.prometheus-static-metric] -git = "https://github.com/pingcap/rust-prometheus.git" +git = "https://github.com/tikv/rust-prometheus.git" rev = "d919ccd35976b9b84b8d03c07138c1cc05a36087" [dependencies.yatp] git = "https://github.com/tikv/yatp.git" -rev = "4e3ac06b948f1c2dffe92986a760ef561d0bdbb9" [dev-dependencies] profiler = { path = "../profiler" } diff --git a/components/tikv_util/Cargo.toml b/components/tikv_util/Cargo.toml index b75a515ae3a..183a74ddeef 100644 --- a/components/tikv_util/Cargo.toml +++ b/components/tikv_util/Cargo.toml @@ -53,7 +53,7 @@ url = "2" zeroize = { version = "0.10", features = [ "alloc" ] } [dependencies.prometheus] -git = "https://github.com/pingcap/rust-prometheus.git" +git = "https://github.com/tikv/rust-prometheus.git" rev = "d919ccd35976b9b84b8d03c07138c1cc05a36087" features = ["nightly", "push", "process"] diff --git a/src/coprocessor/metrics.rs b/src/coprocessor/metrics.rs index ec5ff94fc72..25c948776f1 100644 --- a/src/coprocessor/metrics.rs +++ b/src/coprocessor/metrics.rs @@ -1,5 +1,12 @@ // Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0. +use std::cell::RefCell; +use std::mem; + +use crate::storage::{FlowStatistics, FlowStatsReporter, Statistics}; +use tikv_util::collections::HashMap; + +use prometheus::local::*; use prometheus::*; lazy_static! { @@ -61,3 +68,88 @@ lazy_static! { ) .unwrap(); } + +pub struct CopLocalMetrics { + pub local_copr_req_histogram_vec: LocalHistogramVec, + pub local_copr_req_handle_time: LocalHistogramVec, + pub local_copr_req_wait_time: LocalHistogramVec, + pub local_copr_scan_keys: LocalHistogramVec, + pub local_copr_rocksdb_perf_counter: LocalIntCounterVec, + local_scan_details: HashMap<&'static str, Statistics>, + local_cop_flow_stats: HashMap, +} + +thread_local! { + pub static TLS_COP_METRICS: RefCell = RefCell::new( + CopLocalMetrics { + local_copr_req_histogram_vec: + COPR_REQ_HISTOGRAM_VEC.local(), + local_copr_req_handle_time: + COPR_REQ_HANDLE_TIME.local(), + local_copr_req_wait_time: + COPR_REQ_WAIT_TIME.local(), + local_copr_scan_keys: + COPR_SCAN_KEYS.local(), + local_copr_rocksdb_perf_counter: + COPR_ROCKSDB_PERF_COUNTER.local(), + local_scan_details: + HashMap::default(), + local_cop_flow_stats: + HashMap::default(), + } + ); +} + +pub fn tls_flush(reporter: &R) { + TLS_COP_METRICS.with(|m| { + // Flush Prometheus metrics + let mut m = m.borrow_mut(); + m.local_copr_req_histogram_vec.flush(); + m.local_copr_req_handle_time.flush(); + m.local_copr_req_wait_time.flush(); + m.local_copr_scan_keys.flush(); + m.local_copr_rocksdb_perf_counter.flush(); + + for (cmd, stat) in m.local_scan_details.drain() { + for (cf, cf_details) in stat.details().iter() { + for (tag, count) in cf_details.iter() { + COPR_SCAN_DETAILS + .with_label_values(&[cmd, *cf, *tag]) + .inc_by(*count as i64); + } + } + } + + // Report PD metrics + if m.local_cop_flow_stats.is_empty() { + // Stats to report to PD is empty, ignore. + return; + } + + let mut read_stats = HashMap::default(); + mem::swap(&mut read_stats, &mut m.local_cop_flow_stats); + + reporter.report_read_stats(read_stats); + }); +} + +pub fn tls_collect_scan_details(cmd: &'static str, stats: &Statistics) { + TLS_COP_METRICS.with(|m| { + m.borrow_mut() + .local_scan_details + .entry(cmd) + .or_insert_with(Default::default) + .add(stats); + }); +} + +pub fn tls_collect_read_flow(region_id: u64, statistics: &Statistics) { + TLS_COP_METRICS.with(|m| { + let map = &mut m.borrow_mut().local_cop_flow_stats; + let flow_stats = map + .entry(region_id) + .or_insert_with(crate::storage::FlowStatistics::default); + flow_stats.add(&statistics.write.flow_stats); + flow_stats.add(&statistics.data.flow_stats); + }); +} diff --git a/src/coprocessor/mod.rs b/src/coprocessor/mod.rs index d0d366ffec7..cd881b01a3d 100644 --- a/src/coprocessor/mod.rs +++ b/src/coprocessor/mod.rs @@ -25,7 +25,7 @@ pub mod dag; mod endpoint; mod error; pub mod local_metrics; -mod metrics; +pub(crate) mod metrics; pub mod readpool_impl; mod statistics; mod tracker; diff --git a/src/coprocessor/readpool_impl.rs b/src/coprocessor/readpool_impl.rs index 19dd4e4185c..158cf485789 100644 --- a/src/coprocessor/readpool_impl.rs +++ b/src/coprocessor/readpool_impl.rs @@ -1,48 +1,13 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. -use std::cell::RefCell; -use std::mem; use std::sync::{Arc, Mutex}; use crate::config::CoprReadPoolConfig; use crate::storage::kv::{destroy_tls_engine, set_tls_engine}; -use crate::storage::{Engine, FlowStatistics, FlowStatsReporter, Statistics}; -use tikv_util::collections::HashMap; +use crate::storage::{Engine, FlowStatsReporter}; use tikv_util::future_pool::{Builder, Config, FuturePool}; use super::metrics::*; -use prometheus::local::*; - -pub struct CopLocalMetrics { - pub local_copr_req_histogram_vec: LocalHistogramVec, - pub local_copr_req_handle_time: LocalHistogramVec, - pub local_copr_req_wait_time: LocalHistogramVec, - pub local_copr_scan_keys: LocalHistogramVec, - pub local_copr_rocksdb_perf_counter: LocalIntCounterVec, - local_scan_details: HashMap<&'static str, Statistics>, - local_cop_flow_stats: HashMap, -} - -thread_local! { - pub static TLS_COP_METRICS: RefCell = RefCell::new( - CopLocalMetrics { - local_copr_req_histogram_vec: - COPR_REQ_HISTOGRAM_VEC.local(), - local_copr_req_handle_time: - COPR_REQ_HANDLE_TIME.local(), - local_copr_req_wait_time: - COPR_REQ_WAIT_TIME.local(), - local_copr_scan_keys: - COPR_SCAN_KEYS.local(), - local_copr_rocksdb_perf_counter: - COPR_ROCKSDB_PERF_COUNTER.local(), - local_scan_details: - HashMap::default(), - local_cop_flow_stats: - HashMap::default(), - } - ); -} pub fn build_read_pool( config: &CoprReadPoolConfig, @@ -95,57 +60,3 @@ pub fn build_read_pool_for_test( }) .collect() } - -fn tls_flush(reporter: &R) { - TLS_COP_METRICS.with(|m| { - // Flush Prometheus metrics - let mut m = m.borrow_mut(); - m.local_copr_req_histogram_vec.flush(); - m.local_copr_req_handle_time.flush(); - m.local_copr_req_wait_time.flush(); - m.local_copr_scan_keys.flush(); - m.local_copr_rocksdb_perf_counter.flush(); - - for (cmd, stat) in m.local_scan_details.drain() { - for (cf, cf_details) in stat.details().iter() { - for (tag, count) in cf_details.iter() { - COPR_SCAN_DETAILS - .with_label_values(&[cmd, *cf, *tag]) - .inc_by(*count as i64); - } - } - } - - // Report PD metrics - if m.local_cop_flow_stats.is_empty() { - // Stats to report to PD is empty, ignore. - return; - } - - let mut read_stats = HashMap::default(); - mem::swap(&mut read_stats, &mut m.local_cop_flow_stats); - - reporter.report_read_stats(read_stats); - }); -} - -pub fn tls_collect_scan_details(cmd: &'static str, stats: &Statistics) { - TLS_COP_METRICS.with(|m| { - m.borrow_mut() - .local_scan_details - .entry(cmd) - .or_insert_with(Default::default) - .add(stats); - }); -} - -pub fn tls_collect_read_flow(region_id: u64, statistics: &Statistics) { - TLS_COP_METRICS.with(|m| { - let map = &mut m.borrow_mut().local_cop_flow_stats; - let flow_stats = map - .entry(region_id) - .or_insert_with(crate::storage::FlowStatistics::default); - flow_stats.add(&statistics.write.flow_stats); - flow_stats.add(&statistics.data.flow_stats); - }); -} diff --git a/src/coprocessor/tracker.rs b/src/coprocessor/tracker.rs index d005e158228..ebb2b29fbc0 100644 --- a/src/coprocessor/tracker.rs +++ b/src/coprocessor/tracker.rs @@ -6,7 +6,7 @@ use crate::storage::kv::{PerfStatisticsDelta, PerfStatisticsInstant}; use tikv_util::time::{self, Duration, Instant}; -use crate::coprocessor::readpool_impl::*; +use super::metrics::*; use crate::coprocessor::*; use crate::storage::Statistics; diff --git a/src/read_pool.rs b/src/read_pool.rs index 25129128e19..121ccf9b61f 100644 --- a/src/read_pool.rs +++ b/src/read_pool.rs @@ -2,14 +2,18 @@ use futures::sync::oneshot; use futures::{future, Future}; use futures03::prelude::*; use kvproto::kvrpcpb::CommandPri; +use std::cell::Cell; use std::future::Future as StdFuture; +use std::time::Duration; use tikv_util::future_pool::{self, FuturePool}; -use yatp::pool::{Local, Runner}; -use yatp::queue::Extras; +use tikv_util::time::Instant; +use yatp::pool::{CloneRunnerBuilder, Local, Runner}; +use yatp::queue::{multilevel, Extras, QueueType}; use yatp::task::future::{Runner as FutureRunner, TaskCell}; use yatp::Remote; -use crate::storage::kv::{destroy_tls_engine, set_tls_engine, Engine}; +use crate::config::UnifiedReadPoolConfig; +use crate::storage::kv::{destroy_tls_engine, set_tls_engine, Engine, FlowStatsReporter}; #[derive(Clone)] pub enum ReadPool { @@ -81,12 +85,13 @@ impl ReadPool { } #[derive(Clone)] -pub struct ReadPoolRunner { +pub struct ReadPoolRunner { engine: Option, + reporter: R, inner: FutureRunner, } -impl Runner for ReadPoolRunner { +impl Runner for ReadPoolRunner { type TaskCell = TaskCell; fn start(&mut self, local: &mut Local) { @@ -95,7 +100,11 @@ impl Runner for ReadPoolRunner { } fn handle(&mut self, local: &mut Local, task_cell: Self::TaskCell) -> bool { - self.inner.handle(local, task_cell) + let finished = self.inner.handle(local, task_cell); + if finished { + self.flush_metrics_on_tick(); + } + finished } fn pause(&mut self, local: &mut Local) -> bool { @@ -108,17 +117,60 @@ impl Runner for ReadPoolRunner { fn end(&mut self, local: &mut Local) { self.inner.end(local); + self.flush_metrics(); unsafe { destroy_tls_engine::() } } } -impl ReadPoolRunner { - pub fn new(engine: E, inner: FutureRunner) -> Self { +impl ReadPoolRunner { + pub fn new(engine: E, inner: FutureRunner, reporter: R) -> Self { ReadPoolRunner { engine: Some(engine), + reporter, inner, } } + + // Do nothing if no tick passed + fn flush_metrics_on_tick(&self) { + const TICK_INTERVAL: Duration = Duration::from_secs(1); + + thread_local! { + static THREAD_LAST_TICK_TIME: Cell = Cell::new(Instant::now_coarse()); + } + + THREAD_LAST_TICK_TIME.with(|tls_last_tick| { + let now = Instant::now_coarse(); + let last_tick = tls_last_tick.get(); + if now.duration_since(last_tick) < TICK_INTERVAL { + return; + } + tls_last_tick.set(now); + self.flush_metrics(); + }) + } + + fn flush_metrics(&self) { + crate::storage::metrics::tls_flush(&self.reporter); + crate::coprocessor::metrics::tls_flush(&self.reporter); + } +} + +pub fn build_yatp_read_pool( + config: &UnifiedReadPoolConfig, + reporter: R, + engine: E, +) -> yatp::ThreadPool { + let pool_name = "unified-read-pool"; + let mut builder = yatp::Builder::new(pool_name); + builder + .min_thread_count(config.min_thread_count) + .max_thread_count(config.max_thread_count); + let multilevel_builder = + multilevel::Builder::new(multilevel::Config::default().name(Some(pool_name))); + let read_pool_runner = ReadPoolRunner::new(engine, Default::default(), reporter); + let runner_builder = multilevel_builder.runner_builder(CloneRunnerBuilder(read_pool_runner)); + builder.build_with_queue_and_runner(QueueType::Multilevel(multilevel_builder), runner_builder) } impl From> for ReadPool { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 3039652f48d..f3a2e649c2f 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -13,10 +13,10 @@ pub mod config; pub mod errors; pub mod kv; pub mod lock_manager; +pub(crate) mod metrics; pub mod mvcc; pub mod txn; -mod metrics; mod read_pool; mod types;