Skip to content

Commit

Permalink
feat(kubernetes_logs): use kube-apiserver cache for list requests
Browse files Browse the repository at this point in the history
Signed-off-by: m.nabokikh <maksim.nabokikh@flant.com>
  • Loading branch information
nabokihms committed Apr 10, 2023
1 parent 887d6d7 commit 9e4449b
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 51 deletions.
94 changes: 49 additions & 45 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ indexmap = { version = "~1.9.3", default-features = false, features = ["serde"]
infer = { version = "0.13.0", default-features = false, optional = true}
indoc = { version = "2.0.1", default-features = false }
inventory = { version = "0.3.5", default-features = false }
k8s-openapi = { version = "0.16.0", default-features = false, features = ["api", "v1_19"], optional = true }
kube = { version = "0.75.0", default-features = false, features = ["client", "native-tls", "runtime"], optional = true }
k8s-openapi = { version = "0.18.0", default-features = false, features = ["api", "v1_26"], optional = true }
kube = { version = "0.82.0", default-features = false, features = ["client", "openssl-tls", "runtime"], optional = true }
listenfd = { version = "1.0.1", default-features = false, optional = true }
logfmt = { version = "0.0.2", default-features = false, optional = true }
lru = { version = "0.10.0", default-features = false, optional = true }
Expand Down
27 changes: 23 additions & 4 deletions src/sources/kubernetes_logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use futures_util::Stream;
use k8s_openapi::api::core::v1::{Namespace, Node, Pod};
use k8s_paths_provider::K8sPathsProvider;
use kube::{
api::{Api, ListParams},
api::Api,
config::{self, KubeConfigOptions},
runtime::{
reflector::{self},
Expand Down Expand Up @@ -219,6 +219,12 @@ pub struct Config {
#[configurable(metadata(docs::examples = "/path/to/.kube/config"))]
kube_config_file: Option<PathBuf>,

/// Replaces `MostRecent` list requests semantic with [resource version][resource_version] `Any`,
/// which makes vector using cached resources from kube-apiserver instead of an etcd quorum read.
///
/// [resource_version]: https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
use_apiserver_cache: bool,

/// How long to delay removing metadata entries from the cache when a pod deletion event
/// event is received from the watch stream.
///
Expand Down Expand Up @@ -271,6 +277,7 @@ impl Default for Config {
ingestion_timestamp_field: None,
timezone: None,
kube_config_file: None,
use_apiserver_cache: false,
delay_deletion_ms: default_delay_deletion_ms(),
log_namespace: None,
}
Expand Down Expand Up @@ -519,6 +526,7 @@ struct Source {
max_line_bytes: usize,
fingerprint_lines: usize,
glob_minimum_cooldown: Duration,
use_apiserver_cache: bool,
ingestion_timestamp_field: Option<OwnedTargetPath>,
delay_deletion: Duration,
}
Expand Down Expand Up @@ -595,6 +603,7 @@ impl Source {
max_line_bytes: config.max_line_bytes,
fingerprint_lines: config.fingerprint_lines,
glob_minimum_cooldown,
use_apiserver_cache: config.use_apiserver_cache,
ingestion_timestamp_field,
delay_deletion,
})
Expand Down Expand Up @@ -625,6 +634,7 @@ impl Source {
max_line_bytes,
fingerprint_lines,
glob_minimum_cooldown,
use_apiserver_cache,
ingestion_timestamp_field,
delay_deletion,
} = self;
Expand All @@ -633,11 +643,18 @@ impl Source {

let pods = Api::<Pod>::all(client.clone());

let list_semantic = if use_apiserver_cache {
watcher::ListSemantic::Any
} else {
watcher::ListSemantic::MostRecent
};

let pod_watcher = watcher(
pods,
ListParams {
watcher::Config {
field_selector: Some(field_selector),
label_selector: Some(label_selector),
list_semantic: list_semantic.clone(),
..Default::default()
},
)
Expand All @@ -658,8 +675,9 @@ impl Source {
let namespaces = Api::<Namespace>::all(client.clone());
let ns_watcher = watcher(
namespaces,
ListParams {
watcher::Config {
label_selector: Some(namespace_label_selector),
list_semantic: list_semantic.clone(),
..Default::default()
},
)
Expand All @@ -680,8 +698,9 @@ impl Source {
let nodes = Api::<Node>::all(client);
let node_watcher = watcher(
nodes,
ListParams {
watcher::Config {
field_selector: Some(node_selector),
list_semantic: list_semantic,
..Default::default()
},
)
Expand Down
10 changes: 10 additions & 0 deletions website/cue/reference/components/sources/base/kubernetes_logs.cue
Original file line number Diff line number Diff line change
Expand Up @@ -399,4 +399,14 @@ base: components: sources: kubernetes_logs: configuration: {
required: false
type: string: examples: ["local", "America/New_York", "EST5EDT"]
}
use_apiserver_cache: {
description: """
Replaces `MostRecent` list requests semantic with [resource version][resource_version] `Any`,
which makes vector using cached resources from kube-apiserver instead of an etcd quorum read.
[resource_version]: https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
"""
required: false
type: bool: default: false
}
}

0 comments on commit 9e4449b

Please sign in to comment.