Skip to content

Commit

Permalink
Merge branch 'main' into feature/error-boundary
Browse files Browse the repository at this point in the history
  • Loading branch information
nightkr authored Aug 7, 2024
2 parents 5a7cefa + f83043d commit d2142ae
Show file tree
Hide file tree
Showing 12 changed files with 94 additions and 81 deletions.
7 changes: 5 additions & 2 deletions examples/event_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@ async fn main() -> anyhow::Result<()> {
let event_stream = watcher(events, conf).default_backoff().applied_objects();
let mut event_stream = pin!(event_stream);

println!("{0:<6} {1:<15} {2:<55} {3}", "AGE", "REASON", "OBJECT", "MESSAGE");
#[allow(clippy::print_literal)] // for consistency
{
println!("{0:<6} {1:<15} {2:<55} {3}", "AGE", "REASON", "OBJECT", "MESSAGE");
}
while let Some(ev) = event_stream.try_next().await? {
let age = ev.creation_timestamp().map(format_creation).unwrap_or_default();
let reason = ev.reason.unwrap_or_default();
let obj = ev.regarding.map(format_objref).flatten().unwrap_or_default();
let obj = ev.regarding.and_then(format_objref).unwrap_or_default();
let note = ev.note.unwrap_or_default();
println!("{0:<6} {1:<15} {2:<55} {3}", age, reason, obj, note);
}
Expand Down
4 changes: 2 additions & 2 deletions examples/pod_shell_crossterm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ async fn main() -> anyhow::Result<()> {
message = stdin.next() => {
match message {
Some(Ok(message)) => {
input.write(&message).await?;
input.write_all(&message).await?;
}
_ => {
break;
Expand All @@ -99,7 +99,7 @@ async fn main() -> anyhow::Result<()> {
message = output.next() => {
match message {
Some(Ok(message)) => {
stdout.write(&message).await?;
stdout.write_all(&message).await?;
stdout.flush().await?;
},
_ => {
Expand Down
6 changes: 3 additions & 3 deletions examples/request_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ fn print_table(summaries: Vec<NodeSummary>) {
.iter()
.map(|summary| summary.name.len())
.max()
.unwrap_or_else(|| 0)
.unwrap_or(0)
.max(NAME.len());
max_name_width + 4
};
Expand All @@ -120,7 +120,7 @@ fn print_table(summaries: Vec<NodeSummary>) {
.get("memory")
.map(|mem| {
let mem = mem.0.trim_end_matches("Ki");
mem.parse::<usize>().ok().unwrap_or_else(|| 1)
mem.parse::<usize>().ok().unwrap_or(1)
})
.unwrap_or_else(|| 1);

Expand All @@ -129,7 +129,7 @@ fn print_table(summaries: Vec<NodeSummary>) {
let cpu_total = summary
.allocatable
.get("cpu")
.map(|mem| mem.0.parse::<usize>().ok().unwrap_or_else(|| 1))
.map(|mem| mem.0.parse::<usize>().ok().unwrap_or(1))
.unwrap_or_else(|| 1);

let name = summary.name;
Expand Down
5 changes: 2 additions & 3 deletions kube-client/src/client/client_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,11 +506,10 @@ mod test {
{
let owner = pod
.owner_references()
.to_vec()
.into_iter()
.iter()
.find(|r| r.kind == Node::kind(&()))
.ok_or("Not found")?;
let _: Node = client.fetch(&owner).await?;
let _: Node = client.fetch(owner).await?;
}

Ok(())
Expand Down
11 changes: 10 additions & 1 deletion kube-client/src/client/config_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,21 @@ impl ConfigExt for Config {
&self,
connector: H,
) -> Result<hyper_rustls::HttpsConnector<H>> {
use hyper_rustls::FixedServerNameResolver;

use crate::client::tls::rustls_tls;

let rustls_config = self.rustls_client_config()?;
let mut builder = hyper_rustls::HttpsConnectorBuilder::new()
.with_tls_config(rustls_config)
.https_or_http();
if let Some(tsn) = self.tls_server_name.as_ref() {
builder = builder.with_server_name(tsn.clone());
builder = builder.with_server_name_resolver(FixedServerNameResolver::new(
tsn.clone()
.try_into()
.map_err(rustls_tls::Error::InvalidServerName)
.map_err(Error::RustlsTls)?,
));
}
Ok(builder.enable_http1().wrap_connector(connector))
}
Expand Down
8 changes: 6 additions & 2 deletions kube-client/src/client/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod rustls_tls {
use rustls::{
self,
client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier},
pki_types::{CertificateDer, PrivateKeyDer, ServerName},
pki_types::{CertificateDer, InvalidDnsNameError, PrivateKeyDer, ServerName},
ClientConfig, DigitallySignedStruct,
};
use thiserror::Error;
Expand Down Expand Up @@ -38,8 +38,12 @@ pub mod rustls_tls {
AddRootCertificate(#[source] Box<dyn std::error::Error + Send + Sync>),

/// No valid native root CA certificates found
#[error("No valid native root CA certificates found")]
#[error("no valid native root CA certificates found")]
NoValidNativeRootCA(#[source] std::io::Error),

/// Invalid server name
#[error("invalid server name: {0}")]
InvalidServerName(#[source] InvalidDnsNameError),
}

/// Create `rustls::ClientConfig`.
Expand Down
10 changes: 6 additions & 4 deletions kube-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,18 @@ mod test {
use tower::ServiceBuilder;

// hard disabled test atm due to k3d rustls issues: https://github.com/kube-rs/kube/issues?q=is%3Aopen+is%3Aissue+label%3Arustls
#[cfg(feature = "when_rustls_works_with_k3d")]
#[tokio::test]
#[allow(dead_code)]
// #[tokio::test]
#[ignore = "needs cluster (lists pods)"]
#[cfg(feature = "rustls-tls")]
async fn custom_client_rustls_configuration() -> Result<(), Box<dyn std::error::Error>> {
use hyper_util::rt::TokioExecutor;

let config = Config::infer().await?;
let https = config.rustls_https_connector()?;
let service = ServiceBuilder::new()
.layer(config.base_uri_layer())
.service(hyper::Client::builder().build(https));
.service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https));
let client = Client::new(service, config.default_namespace);
let pods: Api<Pod> = Api::default_namespaced(client);
pods.list(&Default::default()).await?;
Expand All @@ -180,7 +182,7 @@ mod test {

#[tokio::test]
#[ignore = "needs cluster (lists api resources)"]
#[cfg(feature = "discovery")]
#[cfg(feature = "client")]
async fn group_discovery_oneshot() -> Result<(), Box<dyn std::error::Error>> {
use crate::{core::DynamicObject, discovery};
let client = Client::try_default().await?;
Expand Down
82 changes: 41 additions & 41 deletions kube-core/src/subresource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,47 @@ impl Request {
}
}

// ----------------------------------------------------------------------------
// Portforward subresource
// ----------------------------------------------------------------------------
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
impl Request {
/// Request to forward ports of a pod
pub fn portforward(&self, name: &str, ports: &[u16]) -> Result<http::Request<Vec<u8>>, Error> {
if ports.is_empty() {
return Err(Error::Validation("ports cannot be empty".into()));
}
if ports.len() > 128 {
return Err(Error::Validation(
"the number of ports cannot be more than 128".into(),
));
}

if ports.len() > 1 {
let mut seen = std::collections::HashSet::with_capacity(ports.len());
for port in ports.iter() {
if seen.contains(port) {
return Err(Error::Validation(format!(
"ports must be unique, found multiple {port}"
)));
}
seen.insert(port);
}
}

let base_url = format!("{}/{}/portforward?", self.url_path, name);
let mut qp = form_urlencoded::Serializer::new(base_url);
qp.append_pair(
"ports",
&ports.iter().map(|p| p.to_string()).collect::<Vec<_>>().join(","),
);

let req = http::Request::get(qp.finish());
req.body(vec![]).map_err(Error::BuildRequest)
}
}

// ----------------------------------------------------------------------------
// tests
// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -412,44 +453,3 @@ mod test {
);
}
}

// ----------------------------------------------------------------------------
// Portforward subresource
// ----------------------------------------------------------------------------
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
impl Request {
/// Request to forward ports of a pod
pub fn portforward(&self, name: &str, ports: &[u16]) -> Result<http::Request<Vec<u8>>, Error> {
if ports.is_empty() {
return Err(Error::Validation("ports cannot be empty".into()));
}
if ports.len() > 128 {
return Err(Error::Validation(
"the number of ports cannot be more than 128".into(),
));
}

if ports.len() > 1 {
let mut seen = std::collections::HashSet::with_capacity(ports.len());
for port in ports.iter() {
if seen.contains(port) {
return Err(Error::Validation(format!(
"ports must be unique, found multiple {port}"
)));
}
seen.insert(port);
}
}

let base_url = format!("{}/{}/portforward?", self.url_path, name);
let mut qp = form_urlencoded::Serializer::new(base_url);
qp.append_pair(
"ports",
&ports.iter().map(|p| p.to_string()).collect::<Vec<_>>().join(","),
);

let req = http::Request::get(qp.finish());
req.body(vec![]).map_err(Error::BuildRequest)
}
}
3 changes: 3 additions & 0 deletions kube-derive/src/custom_resource.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Generated by darling macros, out of our control
#![allow(clippy::manual_unwrap_or_default)]

use darling::{FromDeriveInput, FromMeta};
use proc_macro2::{Ident, Literal, Span, TokenStream};
use quote::ToTokens;
Expand Down
4 changes: 2 additions & 2 deletions kube-derive/tests/crd_schema_test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![recursion_limit = "256"]

use assert_json_diff::assert_json_eq;
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use chrono::{DateTime, Utc};
use kube_derive::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -135,7 +135,7 @@ fn test_serialized_matches_expected() {
nullable: None,
nullable_skipped_with_default: None,
nullable_with_default: None,
timestamp: TimeZone::from_utc_datetime(&Utc, &NaiveDateTime::from_timestamp_opt(0, 0).unwrap()),
timestamp: DateTime::from_timestamp(0, 0).unwrap(),
complex_enum: ComplexEnum::VariantOne { int: 23 },
untagged_enum_person: UntaggedEnumPerson::GenderAndAge(GenderAndAge {
age: 42,
Expand Down
10 changes: 5 additions & 5 deletions kube-runtime/src/controller/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ mod tests {
let mut runner = Box::pin(
// The debounce period needs to zero because a debounce period > 0
// will lead to the second request to be discarded.
Runner::new(scheduler(sched_rx), 0, |_| {
Runner::new(scheduler(sched_rx), 0, |()| {
count += 1;
// Panic if this ref is already held, to simulate some unsafe action..
let mutex_ref = rc.borrow_mut();
Expand Down Expand Up @@ -234,7 +234,7 @@ mod tests {
// pause();
let (mut sched_tx, sched_rx) = mpsc::unbounded();
let (result_tx, result_rx) = oneshot::channel();
let mut runner = Runner::new(scheduler(sched_rx), 0, |msg: &u8| futures::future::ready(*msg));
let mut runner = Runner::new(scheduler(sched_rx), 0, |msg: &u8| std::future::ready(*msg));
// Start a background task that starts listening /before/ we enqueue the message
// We can't just use Stream::poll_next(), since that bypasses the waker system
Handle::current().spawn(async move { result_tx.send(runner.next().await).unwrap() });
Expand Down Expand Up @@ -277,7 +277,7 @@ mod tests {
0,
|msg| {
assert!(*is_ready.lock().unwrap());
future::ready(*msg)
std::future::ready(*msg)
},
)
.delay_tasks_until(ready.get()),
Expand Down Expand Up @@ -314,7 +314,7 @@ mod tests {
0,
|msg| {
assert!(*is_ready.lock().unwrap());
future::ready(*msg)
std::future::ready(*msg)
},
)
.delay_tasks_until(ready.get()),
Expand Down Expand Up @@ -352,7 +352,7 @@ mod tests {
panic!("run_msg should never be invoked if readiness gate fails");
// It's "useless", but it helps to direct rustc to the correct types
#[allow(unreachable_code)]
future::ready(())
std::future::ready(())
},
)
.delay_tasks_until(ready.get()),
Expand Down
25 changes: 9 additions & 16 deletions kube-runtime/src/reflector/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ pub(crate) mod test {
watcher::{Error, Event},
WatchStreamExt,
};
use std::{sync::Arc, task::Poll};
use std::{pin::pin, sync::Arc, task::Poll};

use crate::reflector;
use futures::{pin_mut, poll, stream, StreamExt};
use futures::{poll, stream, StreamExt};
use k8s_openapi::api::core::v1::Pod;

fn testpod(name: &str) -> Pod {
Expand All @@ -174,8 +174,7 @@ pub(crate) mod test {
]);

let (reader, writer) = reflector::store_shared(10);
let reflect = st.reflect_shared(writer);
pin_mut!(reflect);
let mut reflect = pin!(st.reflect_shared(writer));

// Prior to any polls, we should have an empty store.
assert_eq!(reader.len(), 0);
Expand Down Expand Up @@ -234,10 +233,8 @@ pub(crate) mod test {
let _bar = Arc::new(bar);

let (_, writer) = reflector::store_shared(10);
let subscriber = writer.subscribe().unwrap();
let reflect = st.reflect_shared(writer);
pin_mut!(reflect);
pin_mut!(subscriber);
let mut subscriber = pin!(writer.subscribe().unwrap());
let mut reflect = pin!(st.reflect_shared(writer));

// Deleted events should be skipped by subscriber.
assert!(matches!(
Expand Down Expand Up @@ -307,9 +304,8 @@ pub(crate) mod test {
let _bar = Arc::new(bar);

let (_, writer) = reflector::store_shared(10);
let subscriber = writer.subscribe().unwrap();
let mut subscriber = pin!(writer.subscribe().unwrap());
let mut reflect = Box::pin(st.reflect_shared(writer));
pin_mut!(subscriber);

assert!(matches!(
poll!(reflect.next()),
Expand Down Expand Up @@ -373,12 +369,9 @@ pub(crate) mod test {
let bar = Arc::new(bar);

let (_, writer) = reflector::store_shared(1);
let subscriber = writer.subscribe().unwrap();
let subscriber_slow = writer.subscribe().unwrap();
let reflect = st.reflect_shared(writer);
pin_mut!(reflect);
pin_mut!(subscriber);
pin_mut!(subscriber_slow);
let mut subscriber = pin!(writer.subscribe().unwrap());
let mut subscriber_slow = pin!(writer.subscribe().unwrap());
let mut reflect = pin!(st.reflect_shared(writer));

assert_eq!(poll!(subscriber.next()), Poll::Pending);
assert_eq!(poll!(subscriber_slow.next()), Poll::Pending);
Expand Down

0 comments on commit d2142ae

Please sign in to comment.