From 02b3ea21293d9556144da8e23d2e1d464a003fa3 Mon Sep 17 00:00:00 2001 From: odysa Date: Mon, 20 Mar 2023 22:44:59 -0700 Subject: [PATCH] feat: telemetry (#7384) --- Cargo.lock | 447 +++++++++--------- Makefile.toml | 6 +- ci/scripts/common.env.sh | 2 + dashboard/proto/gen/meta.ts | 54 +++ integration_tests/scripts/run_demos.py | 7 +- proto/meta.proto | 12 + src/common/Cargo.toml | 2 + src/common/src/config.rs | 15 + src/common/src/lib.rs | 2 + src/common/src/system_param/mod.rs | 6 + src/common/src/system_param/reader.rs | 4 + src/common/src/telemetry/manager.rs | 163 +++++++ src/common/src/telemetry/mod.rs | 226 +++++++++ src/common/src/telemetry/report.rs | 112 +++++ src/compute/Cargo.toml | 2 + src/compute/src/lib.rs | 1 + src/compute/src/server.rs | 30 +- src/compute/src/telemetry.rs | 76 +++ src/frontend/src/lib.rs | 2 + src/frontend/src/session.rs | 57 ++- src/frontend/src/telemetry.rs | 91 ++++ .../backup_restore/meta_snapshot_builder.rs | 1 - src/meta/src/backup_restore/restore.rs | 1 + src/meta/src/lib.rs | 2 + src/meta/src/manager/env.rs | 3 + src/meta/src/rpc/server.rs | 27 ++ src/meta/src/rpc/service/mod.rs | 1 + src/meta/src/rpc/service/telemetry_service.rs | 63 +++ src/meta/src/storage/etcd_meta_store.rs | 5 + src/meta/src/storage/mem_meta_store.rs | 5 + src/meta/src/storage/meta_store.rs | 3 + src/meta/src/telemetry.rs | 160 +++++++ src/rpc_client/src/meta_client.rs | 25 +- src/storage/compactor/Cargo.toml | 3 + src/storage/compactor/src/lib.rs | 1 + src/storage/compactor/src/server.rs | 29 +- src/storage/compactor/src/telemetry.rs | 75 +++ src/workspace-hack/Cargo.toml | 2 - 38 files changed, 1458 insertions(+), 265 deletions(-) create mode 100644 src/common/src/telemetry/manager.rs create mode 100644 src/common/src/telemetry/mod.rs create mode 100644 src/common/src/telemetry/report.rs create mode 100644 src/compute/src/telemetry.rs create mode 100644 src/frontend/src/telemetry.rs create mode 100644 src/meta/src/rpc/service/telemetry_service.rs 
create mode 100644 src/meta/src/telemetry.rs create mode 100644 src/storage/compactor/src/telemetry.rs diff --git a/Cargo.lock b/Cargo.lock index e24e255464c89..18c884658b917 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -333,12 +333,11 @@ dependencies = [ [[package]] name = "async-lock" -version = "2.6.0" +version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8101efe8695a6c17e02911402145357e718ac92d3ff88ae8419e84b1707b685" +checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7" dependencies = [ "event-listener", - "futures-lite", ] [[package]] @@ -408,9 +407,9 @@ checksum = "7a40729d2133846d9ed0ea60a8b9541bccddab49cd30f0715a1da672fe9a2524" [[package]] name = "async-trait" -version = "0.1.64" +version = "0.1.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cd7fce9ba8c3c042128ce72d8b2ddbf3a05747efb67ea0313c635e10bda47a2" +checksum = "b84f9ebcc6c1f5b8cb160f6990096a5c127f423fcb6e1ccc46c370cbdfb75dfc" dependencies = [ "proc-macro2", "quote", @@ -885,9 +884,9 @@ dependencies = [ [[package]] name = "axum" -version = "0.6.8" +version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bd379e511536bad07447f899300aa526e9bae8e6f66dc5e5ca45d7587b7c1ec" +checksum = "13d8068b6ccb8b34db9de397c7043f91db8b4c66414952c6db944f238c4d3db3" dependencies = [ "async-trait", "axum-core", @@ -911,16 +910,15 @@ dependencies = [ "sync_wrapper", "tokio", "tower", - "tower-http", "tower-layer", "tower-service", ] [[package]] name = "axum-core" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cae3e661676ffbacb30f1a824089a8c9150e71017f7e1e38f2aa32009188d34" +checksum = "b2f958c80c248b34b9a877a643811be8dbca03ca5ba827f2b63baf3a81e5fc4e" dependencies = [ "async-trait", "bytes", @@ -1044,9 +1042,9 @@ dependencies = [ [[package]] name = "block-buffer" -version = "0.10.3" +version = "0.10.4" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "69cce20737498f97b993470a6e536b8523f0af7892a4f928cceb1ac5e52ebe7e" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" dependencies = [ "generic-array", ] @@ -1128,19 +1126,20 @@ checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" [[package]] name = "bytecheck" -version = "0.6.9" +version = "0.6.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d11cac2c12b5adc6570dad2ee1b87eff4955dac476fe12d81e5fdd352e52406f" +checksum = "13fe11640a23eb24562225322cd3e452b93a3d4091d62fab69c70542fcd17d1f" dependencies = [ "bytecheck_derive", "ptr_meta", + "simdutf8", ] [[package]] name = "bytecheck_derive" -version = "0.6.9" +version = "0.6.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13e576ebe98e605500b3c8041bb888e966653577172df6dd97398714eb30b9bf" +checksum = "e31225543cb46f81a7e224762764f4a6a0f097b1db0b175f69e8065efaa42de5" dependencies = [ "proc-macro2", "quote", @@ -1155,9 +1154,9 @@ checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" [[package]] name = "bytemuck" -version = "1.13.0" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c041d3eab048880cb0b86b256447da3f18859a163c3b8d8893f4e6368abe6393" +checksum = "17febce684fd15d89027105661fec94afb475cb995fbc59d2865198446ba2eea" [[package]] name = "byteorder" @@ -1216,9 +1215,9 @@ dependencies = [ [[package]] name = "camino" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6031a462f977dd38968b6f23378356512feeace69cef817e1a4475108093cec3" +checksum = "c530edf18f37068ac2d977409ed5cd50d53d73bc653c7647b48eb78976ac9ae2" dependencies = [ "serde", ] @@ -1280,9 +1279,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.23" +version = "0.4.24" 
source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16b0a3d9ed01224b22057780a37bb8c5dbfe1be8ba48678e7bf57ec4b385411f" +checksum = "4e3c5919066adf22df73762e50cffcde3a758f2a848b113b586d1f86728b673b" dependencies = [ "iana-time-zone", "js-sys", @@ -1691,9 +1690,9 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.6" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" +checksum = "cf2b3e8478797446514c91ef04bafcb59faba183e621ad488df88983cc14128c" dependencies = [ "cfg-if", "crossbeam-utils", @@ -1701,9 +1700,9 @@ dependencies = [ [[package]] name = "crossbeam-deque" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "715e8152b692bba2d374b53d4875445368fdf21a94751410af607a5ac677d1fc" +checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" dependencies = [ "cfg-if", "crossbeam-epoch", @@ -1712,14 +1711,14 @@ dependencies = [ [[package]] name = "crossbeam-epoch" -version = "0.9.13" +version = "0.9.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01a9af1f4c2ef74bb8aa1f7e19706bc72d03598c8a570bb5de72243c7a9d9d5a" +checksum = "46bd5f3f85273295a9d14aedfb86f6aadbff6d8f5295c4a9edb08e819dcf5695" dependencies = [ "autocfg", "cfg-if", "crossbeam-utils", - "memoffset 0.7.1", + "memoffset 0.8.0", "scopeguard", ] @@ -1735,9 +1734,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.14" +version = "0.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f" +checksum = "3c063cd8cc95f5c377ed0d4b49a4b21f632396ff690e8470c29b3359b346984b" dependencies = [ "cfg-if", ] @@ -1846,9 +1845,9 @@ dependencies = [ [[package]] name = "cxx" -version = "1.0.91" +version = "1.0.92" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "86d3488e7665a7a483b57e25bdd90d0aeb2bc7608c8d0346acf2ad3f1caf1d62" +checksum = "9a140f260e6f3f79013b8bfc65e7ce630c9ab4388c6a89c71e07226f49487b72" dependencies = [ "cc", "cxxbridge-flags", @@ -1858,9 +1857,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.91" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48fcaf066a053a41a81dfb14d57d99738b767febb8b735c3016e469fac5da690" +checksum = "da6383f459341ea689374bf0a42979739dc421874f112ff26f829b8040b8e613" dependencies = [ "cc", "codespan-reporting", @@ -1873,15 +1872,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.91" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2ef98b8b717a829ca5603af80e1f9e2e48013ab227b68ef37872ef84ee479bf" +checksum = "90201c1a650e95ccff1c8c0bb5a343213bdd317c6e600a93075bca2eff54ec97" [[package]] name = "cxxbridge-macro" -version = "1.0.91" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "086c685979a698443656e5cf7856c95c642295a38599f12fb1ff76fb28d19892" +checksum = "0b75aed41bb2e6367cae39e6326ef817a851db13c13e4f3263714ca3cfb8de56" dependencies = [ "proc-macro2", "quote", @@ -1900,12 +1899,12 @@ dependencies = [ [[package]] name = "darling" -version = "0.14.3" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0808e1bd8671fb44a113a14e13497557533369847788fa2ae912b6ebfce9fa8" +checksum = "7b750cb3417fd1b327431a470f388520309479ab0bf5e323505daf0290cd3850" dependencies = [ - "darling_core 0.14.3", - "darling_macro 0.14.3", + "darling_core 0.14.4", + "darling_macro 0.14.4", ] [[package]] @@ -1924,9 +1923,9 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.14.3" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"001d80444f28e193f30c2f293455da62dcf9a6b29918a4253152ae2b1de592cb" +checksum = "109c1ca6e6b7f82cc233a97004ea8ed7ca123a9af07a8230878fcfda9b158bf0" dependencies = [ "fnv", "ident_case", @@ -1949,11 +1948,11 @@ dependencies = [ [[package]] name = "darling_macro" -version = "0.14.3" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b36230598a2d5de7ec1c6f51f72d8a99a9208daff41de2084d06e3fd3ea56685" +checksum = "a4aab4dbc9f7611d8b55048a3a16d2d010c2c8334e46304b40ac1cc14bf3b48e" dependencies = [ - "darling_core 0.14.3", + "darling_core 0.14.4", "quote", "syn", ] @@ -2057,7 +2056,7 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c11bdc11a0c47bc7d37d582b5285da6849c96681023680b906673c5707af7b0f" dependencies = [ - "darling 0.14.3", + "darling 0.14.4", "proc-macro2", "quote", "syn", @@ -2174,9 +2173,9 @@ dependencies = [ [[package]] name = "dyn-clone" -version = "1.0.10" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9b0705efd4599c15a38151f4721f7bc388306f61084d3bfd50bd07fbca5cb60" +checksum = "68b0cf012f1230e43cd00ebb729c6bb58707ecfa8ad08b52ef3a4ccd2697fc30" [[package]] name = "easy-ext" @@ -2231,18 +2230,18 @@ dependencies = [ [[package]] name = "enum-iterator" -version = "1.3.0" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ea166b3f7dc1032f7866d13f8d8e02c8d87507b61750176b86554964dc6a7bf" +checksum = "706d9e7cf1c7664859d79cd524e4e53ea2b67ea03c98cc2870c5e539695d597e" dependencies = [ "enum-iterator-derive", ] [[package]] name = "enum-iterator-derive" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "828de45d0ca18782232dfb8f3ea9cc428e8ced380eb26a520baaacfc70de39ce" +checksum = "355f93763ef7b0ae1c43c4d8eccc9d5848d84ad1a1d8ce61c421d1ac85a19d05" dependencies = [ "proc-macro2", "quote", @@ -2475,9 +2474,9 @@ checksum = 
"673464e1e314dd67a0fd9544abc99e8eb28d0c7e3b69b033bcff9b2d00b87333" [[package]] name = "futures" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13e2792b0ff0340399d58445b88fd9770e3489eff258a4cbc1523418f12abf84" +checksum = "531ac96c6ff5fd7c62263c5e3c67a603af4fcaee2e1a0ae5565ba3a11e69e549" dependencies = [ "futures-channel", "futures-core", @@ -2512,9 +2511,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e5317663a9089767a1ec00a487df42e0ca174b61b4483213ac24448e4664df5" +checksum = "164713a5a0dcc3e7b4b1ed7d3b433cabc18025386f9339346e8daf15963cf7ac" dependencies = [ "futures-core", "futures-sink", @@ -2522,15 +2521,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec90ff4d0fe1f57d600049061dc6bb68ed03c7d2fbd697274c41805dcb3f8608" +checksum = "86d7a0c1aa76363dac491de0ee99faf6941128376f1cf96f07db7603b7de69dd" [[package]] name = "futures-executor" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8de0a35a6ab97ec8869e32a2473f4b1324459e14c29275d14b10cb1fd19b50e" +checksum = "1997dd9df74cdac935c76252744c1ed5794fac083242ea4fe77ef3ed60ba0f83" dependencies = [ "futures-core", "futures-task", @@ -2539,9 +2538,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfb8371b6fb2aeb2d280374607aeabfc99d95c72edfe51692e42d3d7f0d08531" +checksum = "89d422fa3cbe3b40dca574ab087abb5bc98258ea57eea3fd6f1fa7162c778b91" [[package]] name = "futures-lite" @@ -2560,9 +2559,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.26" +version = "0.3.27" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "95a73af87da33b5acf53acfebdc339fe592ecf5357ac7c0a7734ab9d8c876a70" +checksum = "3eb14ed937631bd8b8b8977f2c198443447a8355b6e3ca599f38c975e5a963b6" dependencies = [ "proc-macro2", "quote", @@ -2571,15 +2570,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f310820bb3e8cfd46c80db4d7fb8353e15dfff853a127158425f31e0be6c8364" +checksum = "ec93083a4aecafb2a80a885c9de1f0ccae9dbd32c2bb54b0c3a65690e0b8d2f2" [[package]] name = "futures-task" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcf79a1bf610b10f42aea489289c5a2c478a786509693b80cd39c44ccd936366" +checksum = "fd65540d33b37b16542a0438c12e6aeead10d4ac5d05bd3f805b8f35ab592879" [[package]] name = "futures-timer" @@ -2589,9 +2588,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" [[package]] name = "futures-util" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c1d6de3acfef38d2be4b1f543f553131788603495be83da675e180c8d6b7bd1" +checksum = "3ef6b17e481503ec85211fed8f39d1970f128935ca1f814cd32ac4a6842e84ab" dependencies = [ "futures-channel", "futures-core", @@ -2787,9 +2786,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.15" +version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f9f29bc9dda355256b2916cf526ab02ce0aeaaaf2bad60d65ef3f12f11dd0f4" +checksum = "5be7b54589b581f624f566bf5d8eb2bab1db736c51528720b6bd36b96b55924d" dependencies = [ "bytes", "fnv", @@ -3004,9 +3003,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.24" +version = "0.14.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"5e011372fa0b68db8350aa7a248930ecc7839bf46d8485577d69f117a75f164c" +checksum = "cc5e554ff619822309ffd57d8734d77cd5ce6238bc956f037ea06c58238c9899" dependencies = [ "bytes", "futures-channel", @@ -3132,9 +3131,9 @@ dependencies = [ [[package]] name = "indextree" -version = "4.5.0" +version = "4.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497f036ac2fae75c34224648a77802e5dd4e9cfb56f4713ab6b12b7160a0523b" +checksum = "c40411d0e5c63ef1323c3d09ce5ec6d84d71531e18daed0743fccea279d7deb6" [[package]] name = "indicatif" @@ -3189,9 +3188,9 @@ checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" [[package]] name = "io-lifetimes" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1abeb7a0dd0f8181267ff8adc397075586500b81b28a73e8a0208b00fc170fb3" +checksum = "cfa919a82ea574332e2de6e74b4c36e74d41982b335080fa59d4ef31be20fdf3" dependencies = [ "libc", "windows-sys 0.45.0", @@ -3259,9 +3258,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440" +checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" [[package]] name = "jni" @@ -3285,9 +3284,9 @@ checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" [[package]] name = "jobserver" -version = "0.1.25" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "068b1ee6743e4d11fb9c6a1e6064b3693a1b600e7f5f5988047d98b3dc9fb90b" +checksum = "936cfd212a0155903bcbc060e316fb6cc7cbf2e1907329391ebadc1fe0ce77c2" dependencies = [ "libc", ] @@ -3423,9 +3422,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.139" +version = "0.2.140" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79" +checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c" [[package]] name = "libflate" @@ -3590,9 +3589,9 @@ dependencies = [ [[package]] name = "madsim" -version = "0.2.17" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e3c98b41d46214f4ae435a95e246710ad7fb1100754f809dd7c18606a7607c4" +checksum = "c846a15d407458f1ac5da7da965810277229be9c96ed8082a3eaf2787ef81c23" dependencies = [ "ahash 0.7.6", "async-channel", @@ -3606,12 +3605,13 @@ dependencies = [ "madsim-macros", "naive-timer", "rand 0.8.5", + "rand_xoshiro", "rustversion", "serde", - "spin 0.9.5", + "spin 0.9.6", "tokio", "tokio-util", - "toml 0.7.2", + "toml 0.7.3", "tracing", "tracing-subscriber", ] @@ -3629,15 +3629,15 @@ dependencies = [ "bytes", "http", "madsim", - "spin 0.9.5", + "spin 0.9.6", "tracing", ] [[package]] name = "madsim-etcd-client" -version = "0.2.17" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb918383c4f5966f29760ec48820e1c2846739e4ae411c2a8aaa4466ce1421b7" +checksum = "72b3894525ac4b7d5732b2123f9d29d018005c96a218e5a7c38d1f42601b927d" dependencies = [ "etcd-client", "futures-util", @@ -3645,10 +3645,10 @@ dependencies = [ "madsim", "serde", "serde_with 2.3.1", - "spin 0.9.5", + "spin 0.9.6", "thiserror", "tokio", - "toml 0.7.2", + "toml 0.7.3", "tonic", "tracing", ] @@ -3659,7 +3659,7 @@ version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3d248e97b1a48826a12c3828d921e8548e714394bf17274dd0a93910dc946e1" dependencies = [ - "darling 0.14.3", + "darling 0.14.4", "proc-macro2", "quote", "syn", @@ -3682,7 +3682,7 @@ dependencies = [ "serde_derive", "serde_json", "slab", - "spin 0.9.5", + "spin 0.9.6", "thiserror", "tokio", "tracing", @@ -3700,9 +3700,9 @@ dependencies = [ [[package]] name = "madsim-tonic" -version = "0.2.14" +version = "0.2.18" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "420ca55ac297f5a3555cb03fdb085e7e91b1287dd872751a6b30dd3c3573277c" +checksum = "0a0d4e7468777e5885b6c3b88a97e3dd81547e0f3304324126c1a07ae89be470" dependencies = [ "async-stream", "chrono", @@ -3806,9 +3806,9 @@ dependencies = [ [[package]] name = "memoffset" -version = "0.7.1" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +checksum = "d61c719bcfbcf5d62b3a09efa6088de8c54bc0bfcd3ea7ae39fcc186108b8de1" dependencies = [ "autocfg", ] @@ -4018,15 +4018,6 @@ dependencies = [ "minimal-lexical", ] -[[package]] -name = "nom8" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae01545c9c7fc4486ab7debaf2aad7003ac19431791868fb2e8066df97fad2f8" -dependencies = [ - "memchr", -] - [[package]] name = "ntapi" version = "0.4.0" @@ -4186,7 +4177,7 @@ version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799" dependencies = [ - "proc-macro-crate 1.3.0", + "proc-macro-crate 1.3.1", "proc-macro2", "quote", "syn", @@ -4250,9 +4241,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "opendal" -version = "0.30.1" +version = "0.30.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b63493afc5a8fb8cafbfa87118ffd0aea076935a7ae2375da9e03c1f49aaf3" +checksum = "b3f51ae9f36eabb51da3a10c491feb406ecf2c65faf1edb0b580f9a1acb0f1e8" dependencies = [ "anyhow", "async-compat", @@ -4283,9 +4274,9 @@ dependencies = [ [[package]] name = "openidconnect" -version = "2.5.0" +version = "2.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32a0f47b0f1499d08c4a8480c963d49c5ec77f4249c2b6869780979415f45809" +checksum = 
"98dd5b7049bac4fdd2233b8c9767d42c05da8006fdb79cc903258556d2b18009" dependencies = [ "base64 0.13.1", "chrono", @@ -4342,9 +4333,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-src" -version = "111.25.0+1.1.1t" +version = "111.25.1+1.1.1t" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3173cd3626c43e3854b1b727422a276e568d9ec5fe8cec197822cf52cfb743d6" +checksum = "1ef9a9cc6ea7d9d5e7c4a913dc4b48d0e359eddf01af1dfec96ba7064b4aba10" dependencies = [ "cc", ] @@ -4606,9 +4597,9 @@ dependencies = [ [[package]] name = "paste" -version = "1.0.11" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d01a5bd0424d00070b0098dd17ebca6f961a959dead1dbcbbbc1d1cd8d3deeba" +checksum = "9f746c4065a8fa3fe23974dd82f15431cc8d40779821001404d10d2e79ca7d79" [[package]] name = "path-absolutize" @@ -4850,16 +4841,18 @@ dependencies = [ [[package]] name = "polling" -version = "2.5.2" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22122d5ec4f9fe1b3916419b76be1e80bcb93f618d071d2edf841b137b2a2bd6" +checksum = "7e1f879b2998099c2d69ab9605d145d5b661195627eccc680002c4918a7fb6fa" dependencies = [ "autocfg", + "bitflags", "cfg-if", + "concurrent-queue", "libc", "log", - "wepoll-ffi", - "windows-sys 0.42.0", + "pin-project-lite", + "windows-sys 0.45.0", ] [[package]] @@ -4990,9 +4983,9 @@ dependencies = [ [[package]] name = "prettyplease" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e97e3215779627f01ee256d2fad52f3d95e8e1c11e9fc6fd08f7cd455d5d5c78" +checksum = "4ebcd279d20a4a0a2404a33056388e950504d891c855c7975b9a8fef75f3bf04" dependencies = [ "proc-macro2", "syn", @@ -5009,12 +5002,12 @@ dependencies = [ [[package]] name = "proc-macro-crate" -version = "1.3.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "66618389e4ec1c7afe67d51a9bf34ff9236480f8d51e7489b7d5ab0303c13f34" +checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919" dependencies = [ "once_cell", - "toml_edit 0.18.1", + "toml_edit", ] [[package]] @@ -5421,6 +5414,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rand_xoshiro" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa" +dependencies = [ + "rand_core 0.6.4", +] + [[package]] name = "random-string" version = "1.0.0" @@ -5432,18 +5434,18 @@ dependencies = [ [[package]] name = "raw-cpuid" -version = "10.6.1" +version = "10.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c307f7aacdbab3f0adee67d52739a1d71112cc068d6fab169ddeb18e48877fad" +checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" dependencies = [ "bitflags", ] [[package]] name = "rayon" -version = "1.6.1" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6db3a213adf02b3bcfd2d3846bb41cb22857d131789e01df434fb7e7bc0759b7" +checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b" dependencies = [ "either", "rayon-core", @@ -5451,9 +5453,9 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.10.2" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "356a0625f1954f730c0201cdab48611198dc6ce21f4acff55089b5a78e6e835b" +checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d" dependencies = [ "crossbeam-channel", "crossbeam-deque", @@ -5883,6 +5885,7 @@ dependencies = [ "prost 0.11.8", "rand 0.8.5", "regex", + "reqwest", "risingwave_pb", "rust_decimal", "ryu", @@ -5898,6 +5901,7 @@ dependencies = [ "tracing", "twox-hash", "url", + "uuid", "workspace-hack", ] @@ -5962,6 +5966,7 @@ dependencies = [ name = "risingwave_compactor" version = 
"0.2.0-alpha" dependencies = [ + "anyhow", "async-trait", "clap 4.1.8", "madsim-tokio", @@ -5975,6 +5980,8 @@ dependencies = [ "risingwave_pb", "risingwave_rpc_client", "risingwave_storage", + "serde", + "serde_json", "tracing", "workspace-hack", ] @@ -6011,12 +6018,14 @@ dependencies = [ "risingwave_storage", "risingwave_stream", "risingwave_tracing", + "serde", "serde_json", "tempfile", "tikv-jemalloc-ctl", "tokio-stream", "tower", "tracing", + "uuid", "workspace-hack", ] @@ -6361,7 +6370,7 @@ dependencies = [ "prometheus", "random-string", "risingwave_common", - "spin 0.9.5", + "spin 0.9.6", "tempfile", "thiserror", "tracing", @@ -6635,7 +6644,7 @@ dependencies = [ "risingwave_tracing", "scopeguard", "sled", - "spin 0.9.5", + "spin 0.9.6", "sync-point", "sysinfo", "tempfile", @@ -6774,9 +6783,9 @@ checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" [[package]] name = "rsa" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89b3896c9b7790b70a9aa314a30e4ae114200992a19c96cbe0ca6070edd32ab8" +checksum = "55a77d189da1fee555ad95b7e50e7457d91c0e089ec68ca69ad2989413bbdab4" dependencies = [ "byteorder", "digest", @@ -6840,9 +6849,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.36.8" +version = "0.36.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f43abb88211988493c1abb44a70efa56ff0ce98f233b7b276146f1f3f7ba9644" +checksum = "fd5c6ff11fecd55b40746d1995a02f2eb375bf8c00d192d521ee09f42bef37bc" dependencies = [ "bitflags", "errno", @@ -6887,15 +6896,15 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.11" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5583e89e108996506031660fe09baa5011b9dd0341b89029313006d1fb508d70" +checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06" [[package]] name = "ryu" -version = "1.0.12" +version = "1.0.13" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde" +checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" [[package]] name = "same-file" @@ -6930,9 +6939,9 @@ dependencies = [ [[package]] name = "scheduled-thread-pool" -version = "0.2.6" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "977a7519bff143a44f842fd07e80ad1329295bd71686457f18e496736f4bf9bf" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" dependencies = [ "parking_lot 0.12.1", ] @@ -6951,9 +6960,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "scratch" -version = "1.0.3" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2" +checksum = "1792db035ce95be60c3f8853017b3999209281c24e2ba5bc8e59bf97a0c590c1" [[package]] name = "sct" @@ -6996,18 +7005,18 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.16" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58bc9567378fc7690d6b2addae4e60ac2eeea07becb2c64b9f218b53865cba2a" +checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" dependencies = [ "serde", ] [[package]] name = "serde" -version = "1.0.152" +version = "1.0.155" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb" +checksum = "71f2b4817415c6d4210bfe1c7bfcf4801b2d904cb4d0e1a8fdb651013c9e86b8" dependencies = [ "serde_derive", ] @@ -7024,9 +7033,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.152" +version = "1.0.155" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" +checksum 
= "d071a94a3fac4aff69d023a7f411e33f40f3483f8c5190b1953822b6b76d7630" dependencies = [ "proc-macro2", "quote", @@ -7035,9 +7044,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.93" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cad406b69c91885b5107daf2c29572f6c8cdb3c66826821e286c533490c0bc76" +checksum = "1c533a59c9d8a93a09c6ab31f0fd5e5f4dd1b8fc9434804029839884765d04ea" dependencies = [ "itoa", "ryu", @@ -7046,9 +7055,9 @@ dependencies = [ [[package]] name = "serde_path_to_error" -version = "0.1.9" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b04f22b563c91331a10074bda3dd5492e3cc39d56bd557e91c0af42b6c7341" +checksum = "db0969fff533976baadd92e08b1d102c5a3d8a8049eadfd69d4d1e3c5b2ed189" dependencies = [ "serde", ] @@ -7138,7 +7147,7 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7cbcd6104f8a4ab6af7f6be2a0da6be86b9de3c401f6e86bb856ab2af739232f" dependencies = [ - "darling 0.14.3", + "darling 0.14.4", "proc-macro2", "quote", "syn", @@ -7146,9 +7155,9 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.9.17" +version = "0.9.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fb06d4b6cdaef0e0c51fa881acb721bed3c924cfaa71d9c94a3b771dfdf6567" +checksum = "f82e6c8c047aa50a7328632d067bcae6ef38772a79e28daf32f735e0e4f3dd10" dependencies = [ "indexmap", "itoa", @@ -7393,9 +7402,9 @@ checksum = "5e9f0ab6ef7eb7353d9119c170a436d1bf248eea575ac42d19d12f4e34130831" [[package]] name = "socket2" -version = "0.4.7" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02e2d2db9033d13a1567121ddd7a095ee144db4e1ca1b1bda3419bc0da294ebd" +checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" dependencies = [ "libc", "winapi", @@ -7419,9 +7428,9 @@ checksum = 
"6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" [[package]] name = "spin" -version = "0.9.5" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dccf47db1b41fa1573ed27ccf5e08e3ca771cb994f776668c5ebda893b248fc" +checksum = "b5d6e0250b93c8427a177b849d144a96d5acc57006149479403d7861ab721e34" dependencies = [ "lock_api", ] @@ -7584,7 +7593,7 @@ version = "0.1.0" dependencies = [ "futures-util", "madsim-tokio", - "spin 0.9.5", + "spin 0.9.6", "thiserror", ] @@ -7652,18 +7661,18 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" [[package]] name = "thiserror" -version = "1.0.38" +version = "1.0.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0" +checksum = "a5ab016db510546d856297882807df8da66a16fb8c4101cb8b30054b0d5b2d9c" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.38" +version = "1.0.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f" +checksum = "5420d42e90af0c38c3290abcca25b9b3bdf379fc9f55c528f53a269d9c9a267e" dependencies = [ "proc-macro2", "quote", @@ -7830,9 +7839,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.25.0" +version = "1.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8e00990ebabbe4c14c08aca901caed183ecd5c09562a12c824bb53d3c3fd3af" +checksum = "03201d01c3c27a29c8a5cee5b55a93ddae1ccf6f08f65365c2c918f8c1b76f64" dependencies = [ "autocfg", "bytes", @@ -7846,7 +7855,7 @@ dependencies = [ "socket2", "tokio-macros", "tracing", - "windows-sys 0.42.0", + "windows-sys 0.45.0", ] [[package]] @@ -7991,22 +8000,16 @@ dependencies = [ [[package]] name = "toml" -version = "0.7.2" +version = "0.7.3" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7afcae9e3f0fe2c370fd4657108972cbb2fa9db1b9f84849cefd80741b01cb6" +checksum = "b403acf6f2bb0859c93c7f0d967cb4a75a7ac552100f9322faf64dc047669b21" dependencies = [ "serde", "serde_spanned", - "toml_datetime 0.6.1", - "toml_edit 0.19.4", + "toml_datetime", + "toml_edit", ] -[[package]] -name = "toml_datetime" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4553f467ac8e3d374bc9a177a26801e5d0f9b211aa1673fb137a403afd1c9cf5" - [[package]] name = "toml_datetime" version = "0.6.1" @@ -8018,25 +8021,14 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.18.1" +version = "0.19.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56c59d8dd7d0dcbc6428bf7aa2f0e823e26e43b3c9aca15bbc9475d23e5fa12b" -dependencies = [ - "indexmap", - "nom8", - "toml_datetime 0.5.1", -] - -[[package]] -name = "toml_edit" -version = "0.19.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a1eb0622d28f4b9c90adc4ea4b2b46b47663fde9ac5fafcb14a1369d5508825" +checksum = "08de71aa0d6e348f070457f85af8bd566e2bc452156a423ddf22861b3a953fae" dependencies = [ "indexmap", "serde", "serde_spanned", - "toml_datetime 0.6.1", + "toml_datetime", "winnow", ] @@ -8129,7 +8121,6 @@ dependencies = [ "pin-project-lite", "tokio", "tokio-util", - "tower", "tower-layer", "tower-service", ] @@ -8330,15 +8321,15 @@ dependencies = [ [[package]] name = "unicode-bidi" -version = "0.3.10" +version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54675592c1dbefd78cbd98db9bacd89886e1ca50692a0692baefffdeb92dd58" +checksum = "524b68aca1d05e03fdf03fcdce2c6c94b6daf6d16861ddaa7e4f2b6638a9052c" [[package]] name = "unicode-ident" -version = "1.0.6" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc" 
+checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4" [[package]] name = "unicode-normalization" @@ -8363,9 +8354,9 @@ checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" [[package]] name = "unsafe-libyaml" -version = "0.2.5" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc7ed8ba44ca06be78ea1ad2c3682a43349126c8818054231ee6f4748012aed2" +checksum = "ad2024452afd3874bf539695e04af6732ba06517424dbf958fdb16a01f3bef6c" [[package]] name = "untrusted" @@ -8617,15 +8608,6 @@ dependencies = [ "webpki", ] -[[package]] -name = "wepoll-ffi" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d743fdedc5c64377b5fc2bc036b01c7fd642205a0d96356034ae3404d49eb7fb" -dependencies = [ - "cc", -] - [[package]] name = "which" version = "4.4.0" @@ -8703,9 +8685,9 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" dependencies = [ "windows_aarch64_gnullvm", "windows_aarch64_msvc", @@ -8718,51 +8700,51 @@ dependencies = [ [[package]] name = "windows_aarch64_gnullvm" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" [[package]] name = "windows_aarch64_msvc" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" [[package]] name = "windows_i686_gnu" -version = "0.42.1" 
+version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" [[package]] name = "windows_i686_msvc" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" [[package]] name = "windows_x86_64_gnu" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" [[package]] name = "windows_x86_64_gnullvm" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" [[package]] name = "windows_x86_64_msvc" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" [[package]] name = "winnow" -version = "0.3.3" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "faf09497b8f8b5ac5d3bb4d05c0a99be20f26fd3d5f2db7b0716e946d5103658" +checksum = "ee7b2c67f962bf5042bfd8b6a916178df33a26eec343ae064cb8e069f638fa6f" dependencies = [ "memchr", ] @@ -8898,7 +8880,6 @@ dependencies = [ "tonic", "tonic-build", "tower", - "tower-http", "tracing", "tracing-core", "tracing-futures", diff --git a/Makefile.toml b/Makefile.toml index 3fd35d45138f5..011f29c089224 100644 --- 
a/Makefile.toml +++ b/Makefile.toml @@ -22,6 +22,8 @@ env_scripts = [ ''' #!@duckscript +set_env ENABLE_TELEMETRY "false" + is_sanitizer_enabled = get_env ENABLE_SANITIZER is_all_in_one_enabled = get_env ENABLE_ALL_IN_ONE is_hdfs_backend = get_env ENABLE_HDFS @@ -164,7 +166,7 @@ script = ''' #!/usr/bin/env bash set -e -if [[ -z "$1" ]]; then +if [[ -z "$1" ]]; then echo "Please pass a parameter to this script, defining which logs you want to follow" echo "Available logs are..." ls ${PREFIX_LOG} @@ -176,7 +178,7 @@ if [[ ! -f ${PREFIX_LOG}/$1 ]]; then echo "Available logs are..." ls ${PREFIX_LOG} exit 1 -fi +fi tail -f -n 5 ${PREFIX_LOG}/$1 ''' diff --git a/ci/scripts/common.env.sh b/ci/scripts/common.env.sh index 31e95d47737c9..dbaded683b351 100644 --- a/ci/scripts/common.env.sh +++ b/ci/scripts/common.env.sh @@ -3,6 +3,8 @@ export PROTOC_NO_VENDOR=true export CARGO_HOME=/risingwave/.cargo export RISINGWAVE_CI=true export RUST_BACKTRACE=1 +export ENABLE_TELEMETRY=false + if [ -n "${BUILDKITE_COMMIT:-}" ]; then export GIT_SHA=$BUILDKITE_COMMIT fi diff --git a/dashboard/proto/gen/meta.ts b/dashboard/proto/gen/meta.ts index fda7e6edeb8c0..22a8a13cc0f6e 100644 --- a/dashboard/proto/gen/meta.ts +++ b/dashboard/proto/gen/meta.ts @@ -71,6 +71,13 @@ export function subscribeTypeToJSON(object: SubscribeType): string { } } +export interface GetTelemetryInfoRequest { +} + +export interface TelemetryInfoResponse { + trackingId?: string | undefined; +} + export interface HeartbeatRequest { nodeId: number; /** Lightweight info piggybacked by heartbeat request. 
*/ @@ -576,6 +583,7 @@ export interface SystemParams { dataDirectory?: string | undefined; backupStorageUrl?: string | undefined; backupStorageDirectory?: string | undefined; + telemetryEnabled?: boolean | undefined; } export interface GetSystemParamsRequest { @@ -594,6 +602,48 @@ export interface SetSystemParamRequest { export interface SetSystemParamResponse { } +function createBaseGetTelemetryInfoRequest(): GetTelemetryInfoRequest { + return {}; +} + +export const GetTelemetryInfoRequest = { + fromJSON(_: any): GetTelemetryInfoRequest { + return {}; + }, + + toJSON(_: GetTelemetryInfoRequest): unknown { + const obj: any = {}; + return obj; + }, + + fromPartial, I>>(_: I): GetTelemetryInfoRequest { + const message = createBaseGetTelemetryInfoRequest(); + return message; + }, +}; + +function createBaseTelemetryInfoResponse(): TelemetryInfoResponse { + return { trackingId: undefined }; +} + +export const TelemetryInfoResponse = { + fromJSON(object: any): TelemetryInfoResponse { + return { trackingId: isSet(object.trackingId) ? String(object.trackingId) : undefined }; + }, + + toJSON(message: TelemetryInfoResponse): unknown { + const obj: any = {}; + message.trackingId !== undefined && (obj.trackingId = message.trackingId); + return obj; + }, + + fromPartial, I>>(object: I): TelemetryInfoResponse { + const message = createBaseTelemetryInfoResponse(); + message.trackingId = object.trackingId ?? undefined; + return message; + }, +}; + function createBaseHeartbeatRequest(): HeartbeatRequest { return { nodeId: 0, info: [] }; } @@ -2506,6 +2556,7 @@ function createBaseSystemParams(): SystemParams { dataDirectory: undefined, backupStorageUrl: undefined, backupStorageDirectory: undefined, + telemetryEnabled: undefined, }; } @@ -2521,6 +2572,7 @@ export const SystemParams = { dataDirectory: isSet(object.dataDirectory) ? String(object.dataDirectory) : undefined, backupStorageUrl: isSet(object.backupStorageUrl) ? 
String(object.backupStorageUrl) : undefined, backupStorageDirectory: isSet(object.backupStorageDirectory) ? String(object.backupStorageDirectory) : undefined, + telemetryEnabled: isSet(object.telemetryEnabled) ? Boolean(object.telemetryEnabled) : undefined, }; }, @@ -2535,6 +2587,7 @@ export const SystemParams = { message.dataDirectory !== undefined && (obj.dataDirectory = message.dataDirectory); message.backupStorageUrl !== undefined && (obj.backupStorageUrl = message.backupStorageUrl); message.backupStorageDirectory !== undefined && (obj.backupStorageDirectory = message.backupStorageDirectory); + message.telemetryEnabled !== undefined && (obj.telemetryEnabled = message.telemetryEnabled); return obj; }, @@ -2549,6 +2602,7 @@ export const SystemParams = { message.dataDirectory = object.dataDirectory ?? undefined; message.backupStorageUrl = object.backupStorageUrl ?? undefined; message.backupStorageDirectory = object.backupStorageDirectory ?? undefined; + message.telemetryEnabled = object.telemetryEnabled ?? 
undefined; return message; }, }; diff --git a/integration_tests/scripts/run_demos.py b/integration_tests/scripts/run_demos.py index 330ebee1065b7..cd171baf9d3b2 100644 --- a/integration_tests/scripts/run_demos.py +++ b/integration_tests/scripts/run_demos.py @@ -25,8 +25,8 @@ def run_demo(demo: str, format: str): demo_dir = os.path.join(project_dir, demo) print("Running demo: {}".format(demo)) - subprocess.run(["docker", "compose", "up", "-d"], - cwd=demo_dir, check=True) + subprocess.run(["docker", "compose", "up", "-d", "-e", + "ENABLE_TELEMETRY=false"], cwd=demo_dir, check=True) sleep(40) sql_files = ['create_source.sql', 'create_mv.sql', 'query.sql'] @@ -50,8 +50,7 @@ def run_iceberg_demo(): demo_dir = os.path.join(project_dir, demo) print("Running demo: iceberg-sink") - subprocess.run(["docker", "compose", "up", "-d"], - cwd=demo_dir, check=True) + subprocess.run(["docker", "compose", "up", "-d", "-e", "ENABLE_TELEMETRY=false"], cwd=demo_dir, check=True) sleep(40) subprocess.run(["docker", "compose", "exec", "spark", "bash", "/spark-script/run-sql-file.sh", "create-table"], diff --git a/proto/meta.proto b/proto/meta.proto index c30fd81d83f45..aa7bece1845bc 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -13,6 +13,17 @@ import "user.proto"; option java_package = "com.risingwave.proto"; option optimize_for = SPEED; +message GetTelemetryInfoRequest {} + +message TelemetryInfoResponse { + optional string tracking_id = 1; +} + +service TelemetryInfoService { + // Request telemetry info from meta node + rpc GetTelemetryInfo(GetTelemetryInfoRequest) returns (TelemetryInfoResponse); +} + message HeartbeatRequest { message ExtraInfo { oneof info { @@ -344,6 +355,7 @@ message SystemParams { optional string data_directory = 7; optional string backup_storage_url = 8; optional string backup_storage_directory = 9; + optional bool telemetry_enabled = 10; } message GetSystemParamsRequest {} diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index 
0bc938808559d..df19a127b985d 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -56,6 +56,7 @@ prometheus = { version = "0.13" } prost = "0.11" rand = "0.8" regex = "1" +reqwest = { version = "0.11", features = ["json"] } risingwave_pb = { path = "../prost" } rust_decimal = { version = "1", features = ["db-tokio-postgres"] } ryu = "1.0" @@ -78,6 +79,7 @@ toml = "0.5" tonic = { version = "0.2", package = "madsim-tonic" } tracing = "0.1" url = "2" +uuid = "1.2.2" [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 5b173fffd4a6b..5ec40114e262b 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -222,6 +222,9 @@ pub struct ServerConfig { /// >0 = open metrics pub metrics_level: u32, + #[serde(default = "default::server::telemetry_enabled")] + pub telemetry_enabled: bool, + #[serde(flatten)] pub unrecognized: HashMap, } @@ -498,6 +501,9 @@ pub struct SystemConfig { /// Remote directory for storing snapshots. 
#[serde(default = "default::system::backup_storage_directory")] pub backup_storage_directory: String, + + #[serde(default = "default::system::telemetry_enabled")] + pub telemetry_enabled: bool, } impl Default for SystemConfig { @@ -518,6 +524,7 @@ impl SystemConfig { data_directory: Some(self.data_directory), backup_storage_url: Some(self.backup_storage_url), backup_storage_directory: Some(self.backup_storage_directory), + telemetry_enabled: Some(self.telemetry_enabled), } } } @@ -588,6 +595,10 @@ mod default { pub fn metrics_level() -> u32 { 0 } + + pub fn telemetry_enabled() -> bool { + true + } } pub mod storage { @@ -776,5 +787,9 @@ mod default { pub fn backup_storage_directory() -> String { system_param::default::backup_storage_directory() } + + pub fn telemetry_enabled() -> bool { + system_param::default::telemetry_enabled() + } } } diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index cce75fade92c8..6e68b6240cc3b 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -54,6 +54,8 @@ pub mod monitor; pub mod row; pub mod session_config; pub mod system_param; +pub mod telemetry; + #[cfg(test)] pub mod test_utils; pub mod types; diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 712391fa6d811..f4409db479c8d 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -42,6 +42,7 @@ macro_rules! 
for_all_undeprecated_params { { data_directory, String, "hummock_001".to_string() }, { backup_storage_url, String, "memory".to_string() }, { backup_storage_directory, String, "backup".to_string() }, + { telemetry_enabled, bool, true}, $({ $field, $type, $default },)* } }; @@ -291,6 +292,10 @@ impl ValidateOnSet for OverrideValidateOnSet { // TODO Ok(()) } + + fn telemetry_enabled(_: &bool) -> Result<()> { + Ok(()) + } } for_all_undeprecated_params!(impl_default_from_other_params); @@ -315,6 +320,7 @@ mod tests { (DATA_DIRECTORY_KEY, "a"), (BACKUP_STORAGE_URL_KEY, "a"), (BACKUP_STORAGE_DIRECTORY_KEY, "a"), + (TELEMETRY_ENABLED_KEY, "false"), ]; // To kv - missing field. diff --git a/src/common/src/system_param/reader.rs b/src/common/src/system_param/reader.rs index bf6de6b672c81..931b0684d1b11 100644 --- a/src/common/src/system_param/reader.rs +++ b/src/common/src/system_param/reader.rs @@ -76,6 +76,10 @@ impl SystemParamsReader { self.prost.backup_storage_directory.as_ref().unwrap() } + pub fn telemetry_enabled(&self) -> bool { + self.prost.telemetry_enabled.unwrap() + } + pub fn to_kv(&self) -> Vec<(String, String)> { system_params_to_kv(&self.prost).unwrap() } diff --git a/src/common/src/telemetry/manager.rs b/src/common/src/telemetry/manager.rs new file mode 100644 index 0000000000000..cb341b295b1b3 --- /dev/null +++ b/src/common/src/telemetry/manager.rs @@ -0,0 +1,163 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use parking_lot::RwLock; +use tokio::select; +use tokio::sync::oneshot::{self, Sender}; +use tokio::sync::watch::Receiver; +use tokio::task::JoinHandle; + +use super::report::{start_telemetry_reporting, TelemetryInfoFetcher, TelemetryReportCreator}; +use crate::system_param::local_manager::SystemParamsReaderRef; +use crate::telemetry::telemetry_env_enabled; + +pub struct TelemetryManager +where + F: TelemetryReportCreator + Send + Sync + 'static, + I: TelemetryInfoFetcher + Send + Sync + 'static, +{ + core: Arc>>, + sys_params_change_rx: Receiver, +} + +impl TelemetryManager +where + F: TelemetryReportCreator + Send + Sync + 'static, + I: TelemetryInfoFetcher + Send + Sync + 'static, +{ + pub fn new( + sys_params_change_rx: Receiver, + info_fetcher: Arc, + report_creator: Arc, + ) -> Self { + Self { + core: Arc::new(RwLock::new(TelemetryManagerCore::new( + info_fetcher, + report_creator, + ))), + sys_params_change_rx, + } + } + + pub fn start_telemetry_reporting(&self) { + self.core.write().start(); + } + + pub fn watch_params_change(mut self) -> (JoinHandle<()>, Sender<()>) { + let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); + let handle = tokio::spawn(async move { + loop { + select! 
{ + Ok(_) = self.sys_params_change_rx.changed() => { + let telemetry_enabled = { + let params = self.sys_params_change_rx.borrow().load(); + // check both environment variable and system params + // if either is false, then stop telemetry + params.telemetry_enabled() && telemetry_env_enabled() + }; + + let telemetry_running = { + let core = self.core.read(); + core.telemetry_running() + }; + + match (telemetry_running, telemetry_enabled) { + (false, true) => { + tracing::info!("telemetry config changed to true, start reporting"); + self.core.write().start(); + } + (true, false) => { + tracing::info!("telemetry config changed to false, stop reporting"); + self.core.write().stop(); + } + _ => {} + }; + } + , + _ = &mut shutdown_rx =>{ + tracing::info!("Telemetry exit"); + return; + } + } + } + }); + (handle, shutdown_tx) + } +} + +struct TelemetryManagerCore +where + F: TelemetryReportCreator + Send + Sync + 'static, + I: TelemetryInfoFetcher + Send + Sync + 'static, +{ + telemetry_handle: Option>, + telemetry_shutdown_tx: Option>, + telemetry_running: Arc, + info_fetcher: Arc, + report_creator: Arc, +} + +impl TelemetryManagerCore +where + F: TelemetryReportCreator + Send + Sync + 'static, + I: TelemetryInfoFetcher + Send + Sync + 'static, +{ + fn new(info_fetcher: Arc, report_creator: Arc) -> Self { + Self { + telemetry_handle: None, + telemetry_shutdown_tx: None, + telemetry_running: Arc::new(AtomicBool::new(false)), + info_fetcher, + report_creator, + } + } + + fn telemetry_running(&self) -> bool { + self.telemetry_running.load(Ordering::Relaxed) + } + + fn start(&mut self) { + if self.telemetry_running() { + return; + } + + let (handle, tx) = + start_telemetry_reporting(self.info_fetcher.clone(), self.report_creator.clone()); + self.telemetry_handle = Some(handle); + self.telemetry_shutdown_tx = Some(tx); + self.telemetry_running.store(true, Ordering::Relaxed); + } + + fn stop(&mut self) { + match ( + self.telemetry_running.load(Ordering::Relaxed), + 
self.telemetry_shutdown_tx.take(), + self.telemetry_handle.take(), + ) { + (true, Some(shutdown_rx), Some(_)) => { + if let Err(()) = shutdown_rx.send(()) { + tracing::error!("telemetry mgr failed to send stop signal"); + } else { + self.telemetry_running.store(false, Ordering::Relaxed) + } + } + // do nothing if telemetry is not running + (false, None, None) => {} + _ => unreachable!("impossible telemetry handler"), + } + } +} diff --git a/src/common/src/telemetry/mod.rs b/src/common/src/telemetry/mod.rs new file mode 100644 index 0000000000000..469f81df9ca7c --- /dev/null +++ b/src/common/src/telemetry/mod.rs @@ -0,0 +1,226 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +pub mod manager; +pub mod report; + +use std::time::SystemTime; + +use anyhow::{anyhow, Result}; +use serde::{Deserialize, Serialize}; +use sysinfo::{System, SystemExt}; + +use crate::util::resource_util::cpu::total_cpu_available; +use crate::util::resource_util::memory::{total_memory_available_bytes, total_memory_used_bytes}; + +/// Url of telemetry backend +pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v1/report"; + +/// Telemetry reporting interval in seconds, 6 hours +pub const TELEMETRY_REPORT_INTERVAL: u64 = 6 * 60 * 60; + +/// Environment Variable that is default to be true +const TELEMETRY_ENV_ENABLE: &str = "ENABLE_TELEMETRY"; + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] +pub enum TelemetryNodeType { + Meta, + Compute, + Frontend, + Compactor, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct TelemetryReportBase { + /// tracking_id is persistent in etcd + pub tracking_id: String, + /// session_id is reset every time node restarts + pub session_id: String, + /// system_data is hardware and os info + pub system_data: SystemData, + /// up_time is how long the node has been running + pub up_time: u64, + /// time_stamp is when the report is created + pub time_stamp: u64, + /// node_type is the node that creates the report + pub node_type: TelemetryNodeType, +} + +pub trait TelemetryReport { + fn to_json(&self) -> Result; +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SystemData { + memory: Memory, + os: Os, + cpu: Cpu, +} + +#[derive(Debug, Serialize, Deserialize)] +struct Memory { + used: usize, + available: usize, + total: usize, +} + +#[derive(Debug, Serialize, Deserialize)] +struct Os { + name: String, + kernel_version: String, + version: String, +} + +#[derive(Debug, Serialize, Deserialize)] +struct Cpu { + // total number of cpu available as a float + available: f32, +} + +impl SystemData { + pub fn new() -> Self { + let mut sys = System::new(); + + let memory = { + let available = 
total_memory_available_bytes(); + let used = total_memory_used_bytes(); + Memory { + available, + used, + total: available + used, + } + }; + + let os = { + sys.refresh_system(); + Os { + name: sys.name().unwrap_or_default(), + kernel_version: sys.kernel_version().unwrap_or_default(), + version: sys.os_version().unwrap_or_default(), + } + }; + + let cpu = Cpu { + available: total_cpu_available(), + }; + + SystemData { memory, os, cpu } + } +} + +impl Default for SystemData { + fn default() -> Self { + Self::new() + } +} + +/// post a telemetry reporting request +pub async fn post_telemetry_report(url: &str, report_body: String) -> Result<(), anyhow::Error> { + let client = reqwest::Client::new(); + let res = client + .post(url) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body(report_body) + .send() + .await?; + if res.status().is_success() { + Ok(()) + } else { + Err(anyhow!( + "invalid telemetry resp, url {}, status {}", + url, + res.status() + )) + } +} + +/// check whether telemetry is enabled in environment variable +pub fn telemetry_env_enabled() -> bool { + // default to be true + get_bool_env(TELEMETRY_ENV_ENABLE).unwrap_or(true) +} + +pub fn get_bool_env(key: &str) -> Result { + let b = std::env::var(key) + .unwrap_or("true".to_string()) + .trim() + .to_ascii_lowercase() + .parse()?; + Ok(b) +} + +pub fn current_timestamp() -> u64 { + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("Clock might go backward") + .as_secs() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_system_data_new() { + let system_data = SystemData::new(); + + assert!(system_data.memory.available > 0); + assert!(system_data.memory.used > 0); + assert!(system_data.memory.total > 0); + assert!(!system_data.os.name.is_empty()); + assert!(!system_data.os.kernel_version.is_empty()); + assert!(!system_data.os.version.is_empty()); + assert!(system_data.cpu.available > 0.0); + } + + #[test] + fn test_get_bool_env_true() { + let key 
= "MY_ENV_VARIABLE_TRUE"; + std::env::set_var(key, "true"); + let result = get_bool_env(key).unwrap(); + assert!(result); + } + + #[test] + fn test_get_bool_env_false() { + let key = "MY_ENV_VARIABLE_FALSE"; + std::env::set_var(key, "false"); + let result = get_bool_env(key).unwrap(); + assert!(!result); + } + + #[test] + fn test_get_bool_env_default() { + let key = "MY_ENV_VARIABLE_NOT_SET"; + std::env::remove_var(key); + let result = get_bool_env(key).unwrap(); + assert!(result); + } + + #[test] + fn test_get_bool_env_case_insensitive() { + let key = "MY_ENV_VARIABLE_MIXED_CASE"; + std::env::set_var(key, "tRue"); + let result = get_bool_env(key).unwrap(); + assert!(result); + } + + #[test] + fn test_get_bool_env_invalid() { + let key = "MY_ENV_VARIABLE_INVALID"; + std::env::set_var(key, "not_a_bool"); + let result = get_bool_env(key); + assert!(result.is_err()); + } +} diff --git a/src/common/src/telemetry/report.rs b/src/common/src/telemetry/report.rs new file mode 100644 index 0000000000000..886019807cc2a --- /dev/null +++ b/src/common/src/telemetry/report.rs @@ -0,0 +1,112 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use anyhow::Result; +use tokio::sync::oneshot::Sender; +use tokio::task::JoinHandle; +use tokio::time::{interval, Duration}; +use uuid::Uuid; + +use super::{ + post_telemetry_report, TelemetryReport, TELEMETRY_REPORT_INTERVAL, TELEMETRY_REPORT_URL, +}; + +#[async_trait::async_trait] +pub trait TelemetryInfoFetcher { + async fn fetch_telemetry_info(&self) -> Result; +} + +pub trait TelemetryReportCreator { + // inject dependencies to impl structs if more metrics needed + fn create_report( + &self, + tracking_id: String, + session_id: String, + up_time: u64, + ) -> Result; + + fn report_type(&self) -> &str; +} + +pub fn start_telemetry_reporting( + info_fetcher: Arc, + report_creator: Arc, +) -> (JoinHandle<()>, Sender<()>) +where + F: TelemetryReportCreator + Send + Sync + 'static, + I: TelemetryInfoFetcher + Send + Sync + 'static, +{ + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); + + let join_handle = tokio::spawn(async move { + tracing::info!("start telemetry reporting"); + + let begin_time = std::time::Instant::now(); + let session_id = Uuid::new_v4().to_string(); + let mut interval = interval(Duration::from_secs(TELEMETRY_REPORT_INTERVAL)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + // fetch telemetry tracking_id from the meta node only at the beginning + // There is only one case tracking_id updated at the runtime ---- etcd data has been + // cleaned. There is no way that etcd has been cleaned but nodes are still running + let tracking_id = match info_fetcher.fetch_telemetry_info().await { + Ok(resp) => resp, + Err(err) => { + tracing::error!("Telemetry failed to get tracking_id, err {}", err); + return; + } + }; + + loop { + tokio::select! 
{ + _ = interval.tick() => {}, + _ = &mut shutdown_rx => { + tracing::info!("Telemetry exit"); + return; + } + } + + // create a report and serialize to json + let report_json = match report_creator + .create_report( + tracking_id.clone(), + session_id.clone(), + begin_time.elapsed().as_secs(), + ) + .map(|r| r.to_json()) + { + Ok(Ok(report_json)) => report_json, + Ok(Err(e)) => { + tracing::error!("Telemetry failed to serialize report to json, {}", e); + continue; + } + Err(e) => { + tracing::error!("Telemetry failed to create report {}", e); + continue; + } + }; + + let url = + (TELEMETRY_REPORT_URL.to_owned() + "/" + report_creator.report_type()).to_owned(); + + match post_telemetry_report(&url, report_json).await { + Ok(_) => tracing::info!("Telemetry post success, id {}", tracking_id), + Err(e) => tracing::error!("Telemetry post error, {}", e), + } + } + }); + (join_handle, shutdown_tx) +} diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml index 985f7a58f1175..74338843457f2 100644 --- a/src/compute/Cargo.toml +++ b/src/compute/Cargo.toml @@ -39,6 +39,7 @@ risingwave_source = { path = "../source" } risingwave_storage = { path = "../storage" } risingwave_stream = { path = "../stream" } risingwave_tracing = { path = "../tracing" } +serde = { version = "1", features = ["derive"] } serde_json = "1" tokio = { version = "0.2", package = "madsim-tokio", features = [ "rt", @@ -53,6 +54,7 @@ tokio-stream = "0.1" tonic = { version = "0.2", package = "madsim-tonic" } tower = { version = "0.4", features = ["util", "load-shed"] } tracing = "0.1" +uuid = "1.2.2" [target.'cfg(target_os = "linux")'.dependencies] tikv-jemalloc-ctl = "0.5" diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index cd1d399097652..108f3dcbc2017 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -28,6 +28,7 @@ pub mod memory_management; pub mod observer; pub mod rpc; pub mod server; +pub mod telemetry; use clap::Parser; use 
risingwave_common::config::AsyncStackTraceOption; diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 2be6c1dbf14ab..a527e0629870f 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -26,6 +26,8 @@ use risingwave_common::config::{ }; use risingwave_common::monitor::process_linux::monitor_process; use risingwave_common::system_param::local_manager::LocalSystemParamsManager; +use risingwave_common::telemetry::manager::TelemetryManager; +use risingwave_common::telemetry::telemetry_env_enabled; use risingwave_common::util::addr::HostAddr; use risingwave_common::{GIT_SHA, RW_VERSION}; use risingwave_common_service::metrics_manager::MetricsManager; @@ -71,6 +73,7 @@ use crate::rpc::service::monitor_service::{ AwaitTreeMiddlewareLayer, AwaitTreeRegistryRef, MonitorServiceImpl, }; use crate::rpc::service::stream_service::StreamServiceImpl; +use crate::telemetry::ComputeTelemetryCreator; use crate::ComputeNodeOpts; /// Bootstraps the compute-node. @@ -215,10 +218,10 @@ pub async fn compute_node_serve( )); monitor_cache(memory_collector, ®istry).unwrap(); let backup_reader = storage.backup_reader(); - let system_params_manager = system_params_manager.clone(); + let system_params_mgr = system_params_manager.clone(); tokio::spawn(async move { backup_reader - .watch_config_change(system_params_manager.watch_params()) + .watch_config_change(system_params_mgr.watch_params()) .await; }); } @@ -268,6 +271,8 @@ pub async fn compute_node_serve( // of lru manager. 
stream_mgr.set_watermark_epoch(watermark_epoch).await; + let telemetry_enabled = system_params.telemetry_enabled(); + let grpc_await_tree_reg = await_tree_config .map(|config| AwaitTreeRegistryRef::new(await_tree::Registry::new(config).into())); let dml_mgr = Arc::new(DmlManager::default()); @@ -310,7 +315,7 @@ pub async fn compute_node_serve( worker_id, state_store, dml_mgr, - system_params_manager, + system_params_manager.clone(), source_metrics, ); @@ -332,6 +337,25 @@ pub async fn compute_node_serve( let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr); let health_srv = HealthServiceImpl::new(); + let telemetry_manager = TelemetryManager::new( + system_params_manager.watch_params(), + Arc::new(meta_client.clone()), + Arc::new(ComputeTelemetryCreator::new()), + ); + + // if the toml config file or env variable disables telemetry, do not watch system params change + // because if any of configs disable telemetry, we should never start it + if config.server.telemetry_enabled && telemetry_env_enabled() { + // if all configs are true, start reporting + if telemetry_enabled { + telemetry_manager.start_telemetry_reporting(); + } + // if config and env are true, starting watching + sub_tasks.push(telemetry_manager.watch_params_change()); + } else { + tracing::info!("Telemetry didn't start due to config"); + } + let (shutdown_send, mut shutdown_recv) = tokio::sync::oneshot::channel::<()>(); let join_handle = tokio::spawn(async move { tonic::transport::Server::builder() diff --git a/src/compute/src/telemetry.rs b/src/compute/src/telemetry.rs new file mode 100644 index 0000000000000..aab7af87fc906 --- /dev/null +++ b/src/compute/src/telemetry.rs @@ -0,0 +1,76 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::Result; +use risingwave_common::telemetry::report::TelemetryReportCreator; +use risingwave_common::telemetry::{ + current_timestamp, SystemData, TelemetryNodeType, TelemetryReport, TelemetryReportBase, +}; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Copy)] +pub(crate) struct ComputeTelemetryCreator {} + +impl ComputeTelemetryCreator { + pub(crate) fn new() -> Self { + Self {} + } +} + +impl TelemetryReportCreator for ComputeTelemetryCreator { + fn create_report( + &self, + tracking_id: String, + session_id: String, + up_time: u64, + ) -> Result { + Ok(ComputeTelemetryReport::new( + tracking_id, + session_id, + up_time, + )) + } + + fn report_type(&self) -> &str { + "compute" + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct ComputeTelemetryReport { + #[serde(flatten)] + base: TelemetryReportBase, +} + +impl TelemetryReport for ComputeTelemetryReport { + fn to_json(&self) -> Result { + let json = serde_json::to_string(self)?; + Ok(json) + } +} + +impl ComputeTelemetryReport { + pub(crate) fn new(tracking_id: String, session_id: String, up_time: u64) -> Self { + Self { + base: TelemetryReportBase { + tracking_id, + session_id, + up_time, + system_data: SystemData::new(), + time_stamp: current_timestamp(), + node_type: TelemetryNodeType::Compute, + }, + } + } +} diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 3e521d7b147be..091ea516888e7 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -61,6 +61,8 @@ mod user; pub mod health_service; mod monitor; +mod telemetry; 
+ use std::ffi::OsString; use std::iter; use std::sync::Arc; diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 8a8357b46235e..e00d8f9b0b69a 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -35,6 +35,8 @@ use risingwave_common::error::{Result, RwError}; use risingwave_common::monitor::process_linux::monitor_process; use risingwave_common::session_config::ConfigMap; use risingwave_common::system_param::local_manager::LocalSystemParamsManager; +use risingwave_common::telemetry::manager::TelemetryManager; +use risingwave_common::telemetry::telemetry_env_enabled; use risingwave_common::types::DataType; use risingwave_common::util::addr::HostAddr; use risingwave_common::util::stream_cancel::{stream_tripwire, Trigger, Tripwire}; @@ -73,6 +75,7 @@ use crate::scheduler::SchedulerError::QueryCancelError; use crate::scheduler::{ DistributedQueryMetrics, HummockSnapshotManager, HummockSnapshotManagerRef, QueryManager, }; +use crate::telemetry::FrontendTelemetryCreator; use crate::user::user_authentication::md5_hash_with_salt; use crate::user::user_manager::UserInfoManager; use crate::user::user_service::{UserInfoReader, UserInfoWriter, UserInfoWriterImpl}; @@ -157,9 +160,7 @@ impl FrontendEnv { } } - pub async fn init( - opts: FrontendOpts, - ) -> Result<(Self, JoinHandle<()>, JoinHandle<()>, Sender<()>)> { + pub async fn init(opts: FrontendOpts) -> Result<(Self, Vec>, Vec>)> { let config = load_config(&opts.config_path, Some(opts.override_opts)); info!("Starting frontend node"); info!("> config: {:?}", config); @@ -197,6 +198,8 @@ impl FrontendEnv { Duration::from_secs(config.server.max_heartbeat_interval_secs as u64), vec![], ); + let mut join_handles = vec![heartbeat_join_handle]; + let mut shutdown_senders = vec![heartbeat_shutdown_sender]; let (catalog_updated_tx, catalog_updated_rx) = watch::channel(0); let catalog = Arc::new(RwLock::new(Catalog::default())); @@ -233,7 +236,10 @@ impl FrontendEnv { 
user_info_updated_rx, )); - let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params_reader)); + let telemetry_enabled = system_params_reader.telemetry_enabled(); + + let system_params_manager = + Arc::new(LocalSystemParamsManager::new(system_params_reader.clone())); let frontend_observer_node = FrontendObserverNode::new( worker_node_manager.clone(), catalog, @@ -241,12 +247,13 @@ impl FrontendEnv { user_info_manager, user_info_updated_tx, hummock_snapshot_manager.clone(), - system_params_manager, + system_params_manager.clone(), ); let observer_manager = ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node) .await; let observer_join_handle = observer_manager.start().await; + join_handles.push(observer_join_handle); meta_client.activate(&frontend_address).await?; @@ -261,6 +268,28 @@ impl FrontendEnv { let health_srv = HealthServiceImpl::new(); let host = opts.health_check_listener_addr.clone(); + + let telemetry_manager = TelemetryManager::new( + system_params_manager.watch_params(), + Arc::new(meta_client.clone()), + Arc::new(FrontendTelemetryCreator::new()), + ); + + // if the toml config file or env variable disables telemetry, do not watch system params + // change because if any of configs disable telemetry, we should never start it + if config.server.telemetry_enabled && telemetry_env_enabled() { + if telemetry_enabled { + telemetry_manager.start_telemetry_reporting(); + } + let (telemetry_join_handle, telemetry_shutdown_sender) = + telemetry_manager.watch_params_change(); + + join_handles.push(telemetry_join_handle); + shutdown_senders.push(telemetry_shutdown_sender); + } else { + tracing::info!("Telemetry didn't start due to config"); + } + tokio::spawn(async move { tonic::transport::Server::builder() .add_service(HealthServer::new(health_srv)) @@ -294,9 +323,8 @@ impl FrontendEnv { source_metrics, creating_streaming_job_tracker, }, - observer_join_handle, - heartbeat_join_handle, - 
heartbeat_shutdown_sender, + join_handles, + shutdown_senders, )) } @@ -545,9 +573,8 @@ impl SessionImpl { pub struct SessionManagerImpl { env: FrontendEnv, - _observer_join_handle: JoinHandle<()>, - _heartbeat_join_handle: JoinHandle<()>, - _heartbeat_shutdown_sender: Sender<()>, + _join_handles: Vec>, + _shutdown_senders: Vec>, number: AtomicI32, } @@ -669,13 +696,11 @@ impl SessionManager for SessionManagerImpl { impl SessionManagerImpl { pub async fn new(opts: FrontendOpts) -> Result { - let (env, join_handle, heartbeat_join_handle, heartbeat_shutdown_sender) = - FrontendEnv::init(opts).await?; + let (env, join_handles, shutdown_senders) = FrontendEnv::init(opts).await?; Ok(Self { env, - _observer_join_handle: join_handle, - _heartbeat_join_handle: heartbeat_join_handle, - _heartbeat_shutdown_sender: heartbeat_shutdown_sender, + _join_handles: join_handles, + _shutdown_senders: shutdown_senders, number: AtomicI32::new(0), }) } diff --git a/src/frontend/src/telemetry.rs b/src/frontend/src/telemetry.rs new file mode 100644 index 0000000000000..b69747df523fe --- /dev/null +++ b/src/frontend/src/telemetry.rs @@ -0,0 +1,91 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use risingwave_common::telemetry::report::TelemetryReportCreator; +use risingwave_common::telemetry::{ + current_timestamp, SystemData, TelemetryNodeType, TelemetryReport, TelemetryReportBase, +}; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Copy)] +pub(crate) struct FrontendTelemetryCreator {} + +impl FrontendTelemetryCreator { + pub(crate) fn new() -> Self { + Self {} + } +} + +impl TelemetryReportCreator for FrontendTelemetryCreator { + fn create_report( + &self, + tracking_id: String, + session_id: String, + up_time: u64, + ) -> anyhow::Result { + Ok(FrontendTelemetryReport::new( + tracking_id, + session_id, + up_time, + )) + } + + fn report_type(&self) -> &str { + "frontend" + } +} + +#[derive(Serialize, Deserialize)] +pub(crate) struct FrontendTelemetryReport { + #[serde(flatten)] + base: TelemetryReportBase, +} + +impl TelemetryReport for FrontendTelemetryReport { + fn to_json(&self) -> anyhow::Result { + let json = serde_json::to_string(self)?; + Ok(json) + } +} + +impl FrontendTelemetryReport { + pub(crate) fn new(tracking_id: String, session_id: String, up_time: u64) -> Self { + Self { + base: TelemetryReportBase { + tracking_id, + session_id, + system_data: SystemData::new(), + up_time, + time_stamp: current_timestamp(), + node_type: TelemetryNodeType::Frontend, + }, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_new() { + let report = + FrontendTelemetryReport::new("tracking_id".to_owned(), "session_id".to_owned(), 0); + + assert_eq!(report.base.tracking_id, "tracking_id"); + assert_eq!(report.base.session_id, "session_id"); + assert_eq!(report.base.up_time, 0); + assert_eq!(report.base.node_type, TelemetryNodeType::Frontend); + } +} diff --git a/src/meta/src/backup_restore/meta_snapshot_builder.rs b/src/meta/src/backup_restore/meta_snapshot_builder.rs index 723d3442da828..24a880f3c6dcf 100644 --- a/src/meta/src/backup_restore/meta_snapshot_builder.rs +++ 
b/src/meta/src/backup_restore/meta_snapshot_builder.rs @@ -98,7 +98,6 @@ impl MetaSnapshotBuilder { let system_param = SystemParams::get_at_snapshot::(&meta_store_snapshot) .await? .ok_or_else(|| anyhow!("system params not found in meta store"))?; - self.snapshot.metadata = ClusterMetadata { default_cf, hummock_version, diff --git a/src/meta/src/backup_restore/restore.rs b/src/meta/src/backup_restore/restore.rs index ee9efc1b02feb..1a039bfae3865 100644 --- a/src/meta/src/backup_restore/restore.rs +++ b/src/meta/src/backup_restore/restore.rs @@ -266,6 +266,7 @@ mod tests { data_directory: Some("data_directory".to_string()), backup_storage_url: Some("backup_storage_url".to_string()), backup_storage_directory: Some("backup_storage_directory".to_string()), + telemetry_enabled: Some(false), } } diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index f33ab0d32cac0..9da871e794057 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -46,6 +46,7 @@ mod model; mod rpc; pub mod storage; mod stream; +pub(crate) mod telemetry; use std::time::Duration; @@ -242,6 +243,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { periodic_space_reclaim_compaction_interval_sec: config .meta .periodic_space_reclaim_compaction_interval_sec, + telemetry_enabled: config.server.telemetry_enabled, periodic_ttl_reclaim_compaction_interval_sec: config .meta .periodic_ttl_reclaim_compaction_interval_sec, diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 4cbbe2bdf855c..868aaeef1a53b 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -100,6 +100,8 @@ pub struct MetaOpts { /// Schedule space_reclaim_compaction for all compaction groups with this interval. pub periodic_space_reclaim_compaction_interval_sec: u64, + /// telemetry enabled in config file or not + pub telemetry_enabled: bool, /// Schedule ttl_reclaim_compaction for all compaction groups with this interval. 
pub periodic_ttl_reclaim_compaction_interval_sec: u64, @@ -126,6 +128,7 @@ impl MetaOpts { security_group_id: None, connector_rpc_endpoint: None, periodic_space_reclaim_compaction_interval_sec: 60, + telemetry_enabled: false, periodic_ttl_reclaim_compaction_interval_sec: 60, max_compactor_task_multiplier: 2, } diff --git a/src/meta/src/rpc/server.rs b/src/meta/src/rpc/server.rs index 1cfd07ddc27f5..2b216b8f4906d 100644 --- a/src/meta/src/rpc/server.rs +++ b/src/meta/src/rpc/server.rs @@ -18,7 +18,11 @@ use std::time::Duration; use either::Either; use etcd_client::ConnectOptions; +use risingwave_common::config::MetaBackend; use risingwave_common::monitor::process_linux::monitor_process; +use risingwave_common::system_param::local_manager::LocalSystemParamsManager; +use risingwave_common::telemetry::manager::TelemetryManager; +use risingwave_common::telemetry::telemetry_env_enabled; use risingwave_common_service::metrics_manager::MetricsManager; use risingwave_pb::backup_service::backup_service_server::BackupServiceServer; use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer; @@ -31,6 +35,7 @@ use risingwave_pb::meta::notification_service_server::NotificationServiceServer; use risingwave_pb::meta::scale_service_server::ScaleServiceServer; use risingwave_pb::meta::stream_manager_service_server::StreamManagerServiceServer; use risingwave_pb::meta::system_params_service_server::SystemParamsServiceServer; +use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoServiceServer; use risingwave_pb::meta::SystemParams; use risingwave_pb::user::user_service_server::UserServiceServer; use risingwave_rpc_client::ComputeClientPool; @@ -61,9 +66,11 @@ use crate::rpc::service::hummock_service::HummockServiceImpl; use crate::rpc::service::meta_member_service::MetaMemberServiceImpl; use crate::rpc::service::stream_service::StreamServiceImpl; use crate::rpc::service::system_params_service::SystemParamsServiceImpl; +use 
crate::rpc::service::telemetry_service::TelemetryInfoServiceImpl; use crate::rpc::service::user_service::UserServiceImpl; use crate::storage::{EtcdMetaStore, MemStore, MetaStore, WrappedEtcdClient as EtcdClient}; use crate::stream::{GlobalStreamManager, SourceManager}; +use crate::telemetry::{MetaReportCreator, MetaTelemetryInfoFetcher}; use crate::{hummock, MetaResult}; #[derive(Debug)] @@ -493,6 +500,7 @@ pub async fn start_service_as_election_leader( ); let health_srv = HealthServiceImpl::new(); let backup_srv = BackupServiceImpl::new(backup_manager); + let telemetry_srv = TelemetryInfoServiceImpl::new(meta_store.clone()); let system_params_srv = SystemParamsServiceImpl::new(system_params_manager.clone()); if let Some(prometheus_addr) = address_info.prometheus_addr { @@ -545,6 +553,24 @@ pub async fn start_service_as_election_leader( }); sub_tasks.push((stream_abort_handler, abort_sender)); + let local_system_params_manager = LocalSystemParamsManager::new(system_params_reader.clone()); + + let mgr = TelemetryManager::new( + local_system_params_manager.watch_params(), + Arc::new(MetaTelemetryInfoFetcher::new(meta_store.clone())), + Arc::new(MetaReportCreator::new()), + ); + + // May start telemetry reporting + if let MetaBackend::Etcd = meta_store.meta_store_type() && env.opts.telemetry_enabled && telemetry_env_enabled(){ + if system_params_reader.telemetry_enabled(){ + mgr.start_telemetry_reporting(); + } + sub_tasks.push(mgr.watch_params_change()); + } else { + tracing::info!("Telemetry didn't start due to meta backend or config"); + } + let shutdown_all = async move { for (join_handle, shutdown_sender) in sub_tasks { if let Err(_err) = shutdown_sender.send(()) { @@ -578,6 +604,7 @@ pub async fn start_service_as_election_leader( .add_service(HealthServer::new(health_srv)) .add_service(BackupServiceServer::new(backup_srv)) .add_service(SystemParamsServiceServer::new(system_params_srv)) + .add_service(TelemetryInfoServiceServer::new(telemetry_srv)) 
.serve_with_shutdown(address_info.listen_addr, async move { tokio::select! { res = svc_shutdown_rx.changed() => { diff --git a/src/meta/src/rpc/service/mod.rs b/src/meta/src/rpc/service/mod.rs index dd775588405d3..33a9f20d9e7f8 100644 --- a/src/meta/src/rpc/service/mod.rs +++ b/src/meta/src/rpc/service/mod.rs @@ -23,6 +23,7 @@ pub mod notification_service; pub mod scale_service; pub mod stream_service; pub mod system_params_service; +pub mod telemetry_service; pub mod user_service; use std::pin::Pin; diff --git a/src/meta/src/rpc/service/telemetry_service.rs b/src/meta/src/rpc/service/telemetry_service.rs new file mode 100644 index 0000000000000..c91ea532952dc --- /dev/null +++ b/src/meta/src/rpc/service/telemetry_service.rs @@ -0,0 +1,63 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use anyhow::anyhow; +use risingwave_common::config::MetaBackend; +use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoService; +use risingwave_pb::meta::{GetTelemetryInfoRequest, TelemetryInfoResponse}; +use tonic::{Request, Response, Status}; +use uuid::Uuid; + +use crate::storage::MetaStore; +use crate::telemetry::{TELEMETRY_CF, TELEMETRY_KEY}; + +pub struct TelemetryInfoServiceImpl { + meta_store: Arc, +} + +impl TelemetryInfoServiceImpl { + pub fn new(meta_store: Arc) -> Self { + Self { meta_store } + } + + async fn get_tracking_id(&self) -> Option { + match self.meta_store.meta_store_type() { + MetaBackend::Etcd => match self.meta_store.get_cf(TELEMETRY_CF, TELEMETRY_KEY).await { + Ok(id) => Uuid::from_slice_le(&id) + .map_err(|e| anyhow!("failed to parse uuid, {}", e)) + .ok() + .map(|uuid| uuid.to_string()), + Err(_) => None, + }, + MetaBackend::Mem => None, + } + } +} + +#[async_trait::async_trait] +impl TelemetryInfoService for TelemetryInfoServiceImpl { + async fn get_telemetry_info( + &self, + _request: Request, + ) -> Result, Status> { + match self.get_tracking_id().await { + Some(tracking_id) => Ok(Response::new(TelemetryInfoResponse { + tracking_id: Some(tracking_id), + })), + None => Ok(Response::new(TelemetryInfoResponse { tracking_id: None })), + } + } +} diff --git a/src/meta/src/storage/etcd_meta_store.rs b/src/meta/src/storage/etcd_meta_store.rs index 1fd3f11d94bbb..ec0543942c9b4 100644 --- a/src/meta/src/storage/etcd_meta_store.rs +++ b/src/meta/src/storage/etcd_meta_store.rs @@ -19,6 +19,7 @@ use async_trait::async_trait; use etcd_client::{Compare, CompareOp, Error as EtcdError, GetOptions, Txn, TxnOp}; use futures::Future; use itertools::Itertools; +use risingwave_common::config::MetaBackend; use tokio::sync::Mutex; use super::{Key, MetaStore, MetaStoreError, MetaStoreResult, Snapshot, Transaction, Value}; @@ -187,6 +188,10 @@ impl EtcdMetaStore { impl MetaStore for EtcdMetaStore { type Snapshot = 
EtcdSnapshot; + fn meta_store_type(&self) -> MetaBackend { + MetaBackend::Etcd + } + async fn snapshot(&self) -> Self::Snapshot { EtcdSnapshot { client: self.client.clone(), diff --git a/src/meta/src/storage/mem_meta_store.rs b/src/meta/src/storage/mem_meta_store.rs index ab08d6a9c8df9..7b5df1536e896 100644 --- a/src/meta/src/storage/mem_meta_store.rs +++ b/src/meta/src/storage/mem_meta_store.rs @@ -16,6 +16,7 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use async_trait::async_trait; +use risingwave_common::config::MetaBackend; use tokio::sync::{OwnedRwLockReadGuard, RwLock}; use super::{ @@ -78,6 +79,10 @@ impl MemStore { impl MetaStore for MemStore { type Snapshot = MemSnapshot; + fn meta_store_type(&self) -> MetaBackend { + MetaBackend::Mem + } + async fn snapshot(&self) -> Self::Snapshot { let guard = self.inner.clone().read_owned().await; MemSnapshot(guard) diff --git a/src/meta/src/storage/meta_store.rs b/src/meta/src/storage/meta_store.rs index 0cc5315fd826e..8a9e4d4e88600 100644 --- a/src/meta/src/storage/meta_store.rs +++ b/src/meta/src/storage/meta_store.rs @@ -13,6 +13,7 @@ // limitations under the License. use async_trait::async_trait; +use risingwave_common::config::MetaBackend; use thiserror::Error; use crate::storage::transaction::Transaction; @@ -44,6 +45,8 @@ pub trait MetaStore: Clone + Sync + Send + 'static { async fn get_cf(&self, cf: &str, key: &[u8]) -> MetaStoreResult> { self.snapshot().await.get_cf(cf, key).await } + + fn meta_store_type(&self) -> MetaBackend; } // Error of metastore diff --git a/src/meta/src/telemetry.rs b/src/meta/src/telemetry.rs new file mode 100644 index 0000000000000..a2b69ad4cf844 --- /dev/null +++ b/src/meta/src/telemetry.rs @@ -0,0 +1,160 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use anyhow::anyhow; +use risingwave_common::telemetry::report::{TelemetryInfoFetcher, TelemetryReportCreator}; +use risingwave_common::telemetry::{ + current_timestamp, SystemData, TelemetryNodeType, TelemetryReport, TelemetryReportBase, +}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use crate::storage::MetaStore; + +/// Column in meta store +pub const TELEMETRY_CF: &str = "cf/telemetry"; +/// Meant to be `telemetry` in bytes, but only `0x6c` and `0x6d` are hex; the rest are decimal, so the key is really `JAlAmAJHO`. NOTE(review): fixing the literal changes the key ids are stored under. +pub const TELEMETRY_KEY: &[u8] = &[74, 65, 0x6c, 65, 0x6d, 65, 74, 72, 79]; + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct MetaTelemetryReport { + #[serde(flatten)] + base: TelemetryReportBase, +} + +impl MetaTelemetryReport { + pub(crate) fn new(tracking_id: String, session_id: String, up_time: u64) -> Self { + Self { + base: TelemetryReportBase { + tracking_id, + session_id, + system_data: SystemData::new(), + up_time, + time_stamp: current_timestamp(), + node_type: TelemetryNodeType::Meta, + }, + } + } +} + +impl TelemetryReport for MetaTelemetryReport { + fn to_json(&self) -> anyhow::Result { + let json = serde_json::to_string(self)?; + Ok(json) + } +} + +pub(crate) struct MetaTelemetryInfoFetcher { + meta_store: Arc, +} + +impl MetaTelemetryInfoFetcher { + pub(crate) fn new(meta_store: Arc) -> Self { + Self { meta_store } + } +} + +#[async_trait::async_trait] +impl TelemetryInfoFetcher for MetaTelemetryInfoFetcher { + async fn fetch_telemetry_info(&self) -> anyhow::Result { + let tracking_id = get_or_create_tracking_id(self.meta_store.clone()).await?; + + Ok(tracking_id) + } 
+} + +/// Return the `tracking_id` stored under `TELEMETRY_CF`/`TELEMETRY_KEY`, or generate a v4 UUID string and store it. NOTE(review): every `get_cf` error, not only a missing key, takes the create path and writes a fresh id. +async fn get_or_create_tracking_id( + meta_store: Arc, +) -> Result { + match meta_store.get_cf(TELEMETRY_CF, TELEMETRY_KEY).await { + Ok(bytes) => String::from_utf8(bytes).map_err(|e| anyhow!("failed to parse uuid, {}", e)), + Err(_) => { + let uuid = Uuid::new_v4().to_string(); + // persist the new id as the UTF-8 bytes of its string form + match meta_store + .put_cf( + TELEMETRY_CF, + TELEMETRY_KEY.to_vec(), + uuid.clone().into_bytes(), + ) + .await + { + Err(e) => Err(anyhow!("failed to create uuid, {}", e)), + Ok(_) => Ok(uuid), + } + } + } +} + +#[derive(Copy, Clone)] +pub(crate) struct MetaReportCreator {} + +impl MetaReportCreator { + pub(crate) fn new() -> Self { + Self {} + } +} + +impl TelemetryReportCreator for MetaReportCreator { + fn create_report( + &self, + tracking_id: String, + session_id: String, + up_time: u64, + ) -> anyhow::Result { + Ok(MetaTelemetryReport::new(tracking_id, session_id, up_time)) + } + + fn report_type(&self) -> &str { + "meta" + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::storage::MemStore; + + #[tokio::test] + async fn test_get_or_create_tracking_id_existing_id() { + let meta_store = Arc::new(MemStore::new()); + let uuid = Uuid::new_v4().to_string(); + meta_store + .put_cf( + TELEMETRY_CF, + TELEMETRY_KEY.to_vec(), + uuid.clone().into_bytes(), + ) + .await + .unwrap(); + let result = get_or_create_tracking_id(Arc::clone(&meta_store)) + .await + .unwrap(); + assert_eq!(result, uuid); + } + + #[tokio::test] + async fn test_get_or_create_tracking_id_new_id() { + let meta_store = Arc::new(MemStore::new()); + let result = get_or_create_tracking_id(Arc::clone(&meta_store)) + .await + .unwrap(); + assert!(String::from_utf8(result.into_bytes()).is_ok()); + } +} diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 72aa123d47be0..6d2fa541d8054 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ 
-27,6 +27,7 @@ use lru::LruCache; use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId}; use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE; use risingwave_common::system_param::reader::SystemParamsReader; +use risingwave_common::telemetry::report::TelemetryInfoFetcher; use risingwave_common::util::addr::HostAddr; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_hummock_sdk::compact::CompactorRuntimeConfig; @@ -58,6 +59,7 @@ use risingwave_pb::meta::reschedule_request::PbReschedule; use risingwave_pb::meta::scale_service_client::ScaleServiceClient; use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient; use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient; +use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient; use risingwave_pb::meta::*; use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_pb::user::update_user_request::UpdateField; @@ -799,6 +801,12 @@ impl MetaClient { Ok(resp.manifest.expect("should exist")) } + pub async fn get_telemetry_info(&self) -> Result { + let req = GetTelemetryInfoRequest {}; + let resp = self.inner.get_telemetry_info(req).await?; + Ok(resp) + } + pub async fn get_system_params(&self) -> Result { let req = GetSystemParamsRequest {}; let resp = self.inner.get_system_params(req).await?; @@ -993,6 +1001,17 @@ impl HummockMetaClient for MetaClient { } } +#[async_trait] +impl TelemetryInfoFetcher for MetaClient { + async fn fetch_telemetry_info(&self) -> anyhow::Result { + let resp = self.get_telemetry_info().await?; + let tracking_id = resp + .get_tracking_id() + .map_err(|e| anyhow::format_err!("failed to get tracking_id {:?}", e))?; + Ok(tracking_id.to_string()) + } +} + #[derive(Debug, Clone)] struct GrpcMetaClientCore { cluster_client: ClusterServiceClient, @@ -1005,6 +1024,7 @@ struct GrpcMetaClientCore { user_client: UserServiceClient, scale_client: ScaleServiceClient, 
backup_client: BackupServiceClient, + telemetry_client: TelemetryInfoServiceClient, system_params_client: SystemParamsServiceClient, } @@ -1020,7 +1040,9 @@ impl GrpcMetaClientCore { let user_client = UserServiceClient::new(channel.clone()); let scale_client = ScaleServiceClient::new(channel.clone()); let backup_client = BackupServiceClient::new(channel.clone()); + let telemetry_client = TelemetryInfoServiceClient::new(channel.clone()); let system_params_client = SystemParamsServiceClient::new(channel); + GrpcMetaClientCore { cluster_client, meta_member_client, @@ -1032,6 +1054,7 @@ impl GrpcMetaClientCore { user_client, scale_client, backup_client, + telemetry_client, system_params_client, } } @@ -1325,7 +1348,6 @@ impl GrpcMetaClient { ))) }) .await?; - Ok(channel) } @@ -1424,6 +1446,7 @@ macro_rules! for_all_meta_rpc { ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse } ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse} ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse} + ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse} ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse } ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse } } diff --git a/src/storage/compactor/Cargo.toml b/src/storage/compactor/Cargo.toml index e52bdb1c275c6..d66648e8f316a 100644 --- a/src/storage/compactor/Cargo.toml +++ b/src/storage/compactor/Cargo.toml @@ -15,6 +15,7 @@ ignored = ["workspace-hack"] normal = ["workspace-hack"] [dependencies] +anyhow = "1" async-trait = "0.1" clap = { version = "4", features = ["derive"] } prometheus = { version = "0.13" } @@ -26,6 +27,8 @@ risingwave_object_store = { path = "../../object_store" } risingwave_pb = { path = "../../prost" } risingwave_rpc_client = { path = "../../rpc_client" } 
risingwave_storage = { path = "../../storage" } +serde = { version = "1", features = ["derive"] } +serde_json = "1" tokio = { version = "0.2", package = "madsim-tokio", features = [ "fs", "rt", diff --git a/src/storage/compactor/src/lib.rs b/src/storage/compactor/src/lib.rs index df5984254799f..8d812b8845594 100644 --- a/src/storage/compactor/src/lib.rs +++ b/src/storage/compactor/src/lib.rs @@ -15,6 +15,7 @@ mod compactor_observer; mod rpc; mod server; +mod telemetry; use clap::Parser; use risingwave_common_proc_macro::OverrideConfig; diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index a1296a55999d0..f86bdfe99c43d 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -19,6 +19,8 @@ use std::time::Duration; use risingwave_common::config::load_config; use risingwave_common::monitor::process_linux::monitor_process; use risingwave_common::system_param::local_manager::LocalSystemParamsManager; +use risingwave_common::telemetry::manager::TelemetryManager; +use risingwave_common::telemetry::telemetry_env_enabled; use risingwave_common::util::addr::HostAddr; use risingwave_common::{GIT_SHA, RW_VERSION}; use risingwave_common_service::metrics_manager::MetricsManager; @@ -44,6 +46,7 @@ use tracing::info; use super::compactor_observer::observer_manager::CompactorObserverNode; use crate::rpc::CompactorServiceImpl; +use crate::telemetry::CompactorTelemetryCreator; use crate::CompactorOpts; /// Fetches and runs compaction tasks. 
@@ -108,10 +111,14 @@ pub async fn compactor_serve( storage_opts.meta_cache_capacity_mb * (1 << 20), )); + let telemetry_enabled = system_params_reader.telemetry_enabled(); + let filter_key_extractor_manager = Arc::new(FilterKeyExtractorManager::default()); let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params_reader)); - let compactor_observer_node = - CompactorObserverNode::new(filter_key_extractor_manager.clone(), system_params_manager); + let compactor_observer_node = CompactorObserverNode::new( + filter_key_extractor_manager.clone(), + system_params_manager.clone(), + ); let observer_manager = ObserverManager::new_with_meta_client(meta_client.clone(), compactor_observer_node).await; @@ -149,7 +156,7 @@ pub async fn compactor_serve( max_concurrent_task_number, })), }); - let sub_tasks = vec![ + let mut sub_tasks = vec![ MetaClient::start_heartbeat_loop( meta_client.clone(), Duration::from_millis(config.server.heartbeat_interval_ms as u64), @@ -162,6 +169,22 @@ pub async fn compactor_serve( ), ]; + let telemetry_manager = TelemetryManager::new( + system_params_manager.watch_params(), + Arc::new(meta_client.clone()), + Arc::new(CompactorTelemetryCreator::new()), + ); + // if the toml config file or env variable disables telemetry, do not watch system params change + // because if any of configs disable telemetry, we should never start it + if config.server.telemetry_enabled && telemetry_env_enabled() { + if telemetry_enabled { + telemetry_manager.start_telemetry_reporting(); + } + sub_tasks.push(telemetry_manager.watch_params_change()); + } else { + tracing::info!("Telemetry didn't start due to config"); + } + let (shutdown_send, mut shutdown_recv) = tokio::sync::oneshot::channel(); let join_handle = tokio::spawn(async move { tonic::transport::Server::builder() diff --git a/src/storage/compactor/src/telemetry.rs b/src/storage/compactor/src/telemetry.rs new file mode 100644 index 0000000000000..a2014959c61de --- /dev/null +++ 
b/src/storage/compactor/src/telemetry.rs
@@ -0,0 +1,90 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use risingwave_common::telemetry::report::TelemetryReportCreator;
+use risingwave_common::telemetry::{
+    current_timestamp, SystemData, TelemetryNodeType, TelemetryReport, TelemetryReportBase,
+};
+use serde::{Deserialize, Serialize};
+
+/// Field-less factory for the compactor node's telemetry reports.
+#[derive(Clone, Copy)]
+pub(crate) struct CompactorTelemetryCreator {}
+
+impl CompactorTelemetryCreator {
+    /// Creates a new creator; it carries no state.
+    pub(crate) fn new() -> Self {
+        Self {}
+    }
+}
+
+impl TelemetryReportCreator for CompactorTelemetryCreator {
+    /// Wraps the given ids and up time in a [`CompactorTelemetryReport`].
+    ///
+    /// This implementation always returns `Ok`.
+    fn create_report(
+        &self,
+        tracking_id: String,
+        session_id: String,
+        up_time: u64,
+    ) -> anyhow::Result<CompactorTelemetryReport> {
+        Ok(CompactorTelemetryReport::new(
+            tracking_id,
+            session_id,
+            up_time,
+        ))
+    }
+
+    /// Returns the report type label, `"compactor"`.
+    fn report_type(&self) -> &str {
+        "compactor"
+    }
+}
+
+/// Telemetry report describing a compactor node.
+///
+/// `#[serde(flatten)]` inlines the fields of `base` into the top level of the
+/// serialized object instead of nesting them under a `base` key.
+#[derive(Serialize, Deserialize)]
+pub(crate) struct CompactorTelemetryReport {
+    #[serde(flatten)]
+    base: TelemetryReportBase,
+}
+
+impl TelemetryReport for CompactorTelemetryReport {
+    /// Serializes the report to a JSON string; serialization errors are propagated.
+    fn to_json(&self) -> anyhow::Result<String> {
+        let json = serde_json::to_string(self)?;
+        Ok(json)
+    }
+}
+
+impl CompactorTelemetryReport {
+    /// Builds a report from the caller-supplied ids and up time.
+    ///
+    /// System data and the timestamp are obtained when the report is constructed,
+    /// and the node type is always `TelemetryNodeType::Compactor`.
+    pub(crate) fn new(tracking_id: String, session_id: String, up_time: u64) -> Self {
+        Self {
+            base: TelemetryReportBase {
+                tracking_id,
+                session_id,
+                system_data: SystemData::new(),
+                up_time,
+                time_stamp: current_timestamp(),
+                node_type: TelemetryNodeType::Compactor,
+            },
+        }
+    }
+}
diff
--git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 5c82906fbe736..1425899a42869 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -98,7 +98,6 @@ tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0c25710" tokio-util = { version = "0.7", features = ["codec", "io"] } tonic = { version = "0.8", features = ["gzip", "tls-webpki-roots"] } tower = { version = "0.4", features = ["balance", "buffer", "filter", "limit", "load-shed", "retry", "timeout", "util"] } -tower-http = { version = "0.3", features = ["add-extension", "cors", "map-response-body", "util"] } tracing = { version = "0.1", features = ["log", "release_max_level_trace"] } tracing-core = { version = "0.1" } tracing-futures = { version = "0.2" } @@ -195,7 +194,6 @@ tokio-util = { version = "0.7", features = ["codec", "io"] } tonic = { version = "0.8", features = ["gzip", "tls-webpki-roots"] } tonic-build = { version = "0.8" } tower = { version = "0.4", features = ["balance", "buffer", "filter", "limit", "load-shed", "retry", "timeout", "util"] } -tower-http = { version = "0.3", features = ["add-extension", "cors", "map-response-body", "util"] } tracing = { version = "0.1", features = ["log", "release_max_level_trace"] } tracing-core = { version = "0.1" } tracing-futures = { version = "0.2" }