From 9e4449ba571855c548f1599c12b0560d22bfbf7b Mon Sep 17 00:00:00 2001 From: "m.nabokikh" Date: Mon, 10 Apr 2023 20:21:11 +0200 Subject: [PATCH] feat(kubernetes_logs): use kube-apiserver cache for list requests Signed-off-by: m.nabokikh --- Cargo.lock | 94 ++++++++++--------- Cargo.toml | 4 +- src/sources/kubernetes_logs/mod.rs | 27 +++++- .../sources/base/kubernetes_logs.cue | 10 ++ 4 files changed, 84 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a7c1c451700d1..1750ee01c75c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1333,6 +1333,12 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" +[[package]] +name = "base64" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5" + [[package]] name = "base64" version = "0.21.0" @@ -4319,12 +4325,13 @@ checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd" [[package]] name = "json-patch" -version = "0.2.6" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f995a3c8f2bc3dd52a18a583e90f9ec109c047fa1603a853e46bcda14d2e279d" +checksum = "1f54898088ccb91df1b492cc80029a6fdf1c48ca0db7c6822a8babad69c94658" dependencies = [ "serde", "serde_json", + "thiserror", "treediff", ] @@ -4360,7 +4367,7 @@ dependencies = [ "env_logger 0.10.0", "futures 0.3.28", "indoc", - "k8s-openapi", + "k8s-openapi 0.16.0", "k8s-test-framework", "rand 0.8.5", "regex", @@ -4379,6 +4386,20 @@ dependencies = [ "base64 0.13.1", "bytes 1.4.0", "chrono", + "serde", + "serde-value", + "serde_json", +] + +[[package]] +name = "k8s-openapi" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd990069640f9db34b3b0f7a1afc62a05ffaa3be9b66aa3c313f58346df7f788" +dependencies = [ + "base64 0.21.0", + "bytes 1.4.0", + "chrono", "http", "percent-encoding", "serde", @@ -4391,7 +4412,7 @@ dependencies = [ name = "k8s-test-framework" version = "0.1.0" dependencies = [ - "k8s-openapi", + "k8s-openapi 0.16.0", "log", "serde_json", "tempfile", @@ -4438,11 +4459,11 @@ dependencies = [ [[package]] name = "kube" -version = "0.75.0" +version = "0.82.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bb19108692aeafebb108fd0a1c381c06ac4c03859652599420975165e939b8a" +checksum = "ca82ee1786dc8770d1ad4e319003e3d68cd86bc1204ed9e40f591ffef8e6492c" dependencies = [ - "k8s-openapi", + "k8s-openapi 0.18.0", "kube-client", "kube-core", "kube-runtime", @@ -4450,11 +4471,11 @@ dependencies = [ [[package]] name = "kube-client" -version = "0.75.0" +version = "0.82.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97e1a80ecd1b1438a2fc004549e155d47250b9e01fbfcf4cfbe9c8b56a085593" +checksum = "90b1d8deb705ef2463b2ce142b0ff98c815f8f0ac393d13c8f4c2b26491daf66" dependencies = [ - "base64 0.13.1", + "base64 0.20.0", "bytes 1.4.0", "chrono", "dirs-next", @@ -4463,10 +4484,10 @@ dependencies = [ "http", "http-body", "hyper", + "hyper-openssl", "hyper-timeout", - "hyper-tls", "jsonpath_lib", - "k8s-openapi", + "k8s-openapi 0.18.0", "kube-core", "openssl", "pem", @@ -4474,27 +4495,26 @@ dependencies = [ "secrecy", "serde", "serde_json", - "serde_yaml 0.8.26", + "serde_yaml 0.9.19", "thiserror", "tokio", - "tokio-native-tls", "tokio-util", "tower", - "tower-http 0.3.5", + "tower-http", "tracing 0.1.37", ] [[package]] name = "kube-core" -version = "0.75.0" +version = "0.82.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4d780f2bb048eeef64a4c6b2582d26a0fe19e30b4d3cc9e081616e1779c5d47" +checksum = "b16c1653fd0bda69a6bdb363167edbb72d28817db340d2fe8cb89dc07d354e05" dependencies = [ "chrono", "form_urlencoded", "http", "json-patch", - "k8s-openapi", + "k8s-openapi 0.18.0", "once_cell", "serde", "serde_json", @@ -4503,16 +4523,17 @@ dependencies = [ [[package]] name = "kube-runtime" -version = "0.75.0" +version = "0.82.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7769af142ee2e46bfa44bd393cf7f40b9d8b80d2e11f6317399551ed17760beb" +checksum = "ed8442b2f1d6c1d630677ade9e5d5ebe793dec099a75fb582d56d77b8eb8cee8" dependencies = [ "ahash 0.8.2", + "async-trait", "backoff", "derivative", "futures 0.3.28", "json-patch", - "k8s-openapi", + "k8s-openapi 0.18.0", "kube-client", "parking_lot", "pin-project", @@ -8744,26 +8765,6 @@ dependencies = [ "tracing 0.1.37", ] -[[package]] -name = "tower-http" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858" -dependencies = [ - "base64 0.13.1", - "bitflags", - "bytes 1.4.0", - "futures-core", - "futures-util", - "http", - "http-body", - "http-range-header", - "pin-project-lite", - "tower-layer", - "tower-service", - "tracing 0.1.37", -] - [[package]] name = "tower-http" version = "0.4.0" @@ -8771,6 +8772,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d1d42a9b3f3ec46ba828e8d376aec14592ea199f70a06a548587ecd1c4ab658" dependencies = [ "async-compression", + "base64 0.20.0", "bitflags", "bytes 1.4.0", "futures-core", @@ -8778,11 +8780,13 @@ dependencies = [ "http", "http-body", "http-range-header", + "mime", "pin-project-lite", "tokio", "tokio-util", "tower-layer", "tower-service", + "tracing 0.1.37", ] [[package]] @@ -8961,9 +8965,9 @@ dependencies = [ [[package]] name = "treediff" -version = "3.0.2" +version = "4.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "761e8d5ad7ce14bb82b7e61ccc0ca961005a275a060b9644a2431aa11553c2ff" +checksum = "52984d277bdf2a751072b5df30ec0377febdb02f7696d64c2d7d54630bac4303" dependencies = [ "serde_json", ] @@ -9455,7 +9459,7 @@ dependencies = [ "infer 0.13.0", "inventory", "itertools", - "k8s-openapi", + "k8s-openapi 0.18.0", "kube", "lapin", "libc", @@ -9536,7 +9540,7 @@ dependencies = [ "tonic 0.9.1", "tonic-build", "tower", - "tower-http 0.4.0", + "tower-http", "tower-test", "tracing 0.1.37", "tracing-core 0.1.30", diff --git a/Cargo.toml b/Cargo.toml index 68188e5c49928..39b3610cbfef5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -271,8 +271,8 @@ indexmap = { version = "~1.9.3", default-features = false, features = ["serde"] infer = { version = "0.13.0", default-features = false, optional = true} indoc = { version = "2.0.1", default-features = false } inventory = { version = "0.3.5", default-features = false } -k8s-openapi = { version = "0.16.0", default-features = false, features = ["api", "v1_19"], optional = true } -kube = { version = "0.75.0", default-features = false, features = ["client", "native-tls", "runtime"], optional = true } +k8s-openapi = { version = "0.18.0", default-features = false, features = ["api", "v1_26"], optional = true } +kube = { version = "0.82.0", default-features = false, features = ["client", "openssl-tls", "runtime"], optional = true } listenfd = { version = "1.0.1", default-features = false, optional = true } logfmt = { version = "0.0.2", default-features = false, optional = true } lru = { version = "0.10.0", default-features = false, optional = true } diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index 69f833c99d177..db8b8c31b8fb5 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -19,7 +19,7 @@ use futures_util::Stream; use k8s_openapi::api::core::v1::{Namespace, Node, Pod}; use k8s_paths_provider::K8sPathsProvider; use kube::{ - api::{Api, ListParams}, + api::Api, config::{self, KubeConfigOptions}, runtime::{ reflector::{self}, @@ -219,6 +219,12 @@ pub struct Config { #[configurable(metadata(docs::examples = "/path/to/.kube/config"))] kube_config_file: Option, + /// Replaces `MostRecent` list requests semantic with [resource version][resource_version] `Any`, + /// which makes vector using cached resources from kube-apiserver instead of an etcd quorum read. + /// + /// [resource_version]: https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list + use_apiserver_cache: bool, + /// How long to delay removing metadata entries from the cache when a pod deletion event /// event is received from the watch stream. /// @@ -271,6 +277,7 @@ impl Default for Config { ingestion_timestamp_field: None, timezone: None, kube_config_file: None, + use_apiserver_cache: false, delay_deletion_ms: default_delay_deletion_ms(), log_namespace: None, } @@ -519,6 +526,7 @@ struct Source { max_line_bytes: usize, fingerprint_lines: usize, glob_minimum_cooldown: Duration, + use_apiserver_cache: bool, ingestion_timestamp_field: Option, delay_deletion: Duration, } @@ -595,6 +603,7 @@ impl Source { max_line_bytes: config.max_line_bytes, fingerprint_lines: config.fingerprint_lines, glob_minimum_cooldown, + use_apiserver_cache: config.use_apiserver_cache, ingestion_timestamp_field, delay_deletion, }) @@ -625,6 +634,7 @@ impl Source { max_line_bytes, fingerprint_lines, glob_minimum_cooldown, + use_apiserver_cache, ingestion_timestamp_field, delay_deletion, } = self; @@ -633,11 +643,18 @@ impl Source { let pods = Api::::all(client.clone()); + let list_semantic = if use_apiserver_cache { + watcher::ListSemantic::Any + } else { + watcher::ListSemantic::MostRecent + }; + let pod_watcher = watcher( pods, - ListParams { + watcher::Config { field_selector: Some(field_selector), label_selector: Some(label_selector), + list_semantic: list_semantic.clone(), ..Default::default() }, ) @@ -658,8 +675,9 @@ impl Source { let namespaces = Api::::all(client.clone()); let ns_watcher = watcher( namespaces, - ListParams { + watcher::Config { label_selector: Some(namespace_label_selector), + list_semantic: list_semantic.clone(), ..Default::default() }, ) @@ -680,8 +698,9 @@ impl Source { let nodes = Api::::all(client); let node_watcher = watcher( nodes, - ListParams { + watcher::Config { field_selector: Some(node_selector), + list_semantic: list_semantic, ..Default::default() }, ) diff --git a/website/cue/reference/components/sources/base/kubernetes_logs.cue b/website/cue/reference/components/sources/base/kubernetes_logs.cue index 567ae2ec96ef9..ea56bc05ad4b0 100644 --- a/website/cue/reference/components/sources/base/kubernetes_logs.cue +++ b/website/cue/reference/components/sources/base/kubernetes_logs.cue @@ -399,4 +399,14 @@ base: components: sources: kubernetes_logs: configuration: { required: false type: string: examples: ["local", "America/New_York", "EST5EDT"] } + use_apiserver_cache: { + description: """ + Replaces `MostRecent` list requests semantic with [resource version][resource_version] `Any`, + which makes vector using cached resources from kube-apiserver instead of an etcd quorum read. + + [resource_version]: https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list + """ + required: false + type: bool: default: false + } }