Canary pods need to be created via Jobs (#7)
For the PIA canary to work well, the pod creation needs to happen via the same apiserver instance that the controller manager uses. I really hope that controller manager requests are not load balanced across all 3.
We can use a Job object so that the controller manager creates the pods.
jackkleeman authored May 3, 2024
1 parent 69c83ae commit 140f8f8
Showing 3 changed files with 133 additions and 40 deletions.
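
To make the approach concrete before the per-file diffs: the operator server-side-applies a one-shot Job, and the kube controller manager (not the operator) then creates the canary pod, so the pod-creation request goes through whichever apiserver instance the controller manager is connected to. The following is a minimal kube-rs sketch condensed from the src/reconcilers/compute.rs changes below; the canary_job and canary_succeeded helper names are illustrative, not part of the operator.

use k8s_openapi::api::batch::v1::{Job, JobSpec};
use k8s_openapi::api::core::v1::{Container, PodSpec, PodTemplateSpec};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::api::{Api, Patch, PatchParams};

// Build a single-shot canary Job. The kube controller manager turns this template into a
// pod, so the pod-creation request is made against the apiserver it talks to.
fn canary_job(name: &str) -> Job {
    Job {
        metadata: ObjectMeta {
            name: Some(name.into()),
            ..Default::default()
        },
        spec: Some(JobSpec {
            // single retry only; a failed canary is deleted and re-applied on the next reconcile
            backoff_limit: Some(1),
            template: PodTemplateSpec {
                metadata: None,
                spec: Some(PodSpec {
                    service_account_name: Some("restate".into()),
                    containers: vec![Container {
                        name: "canary".into(),
                        image: Some("busybox:uclibc".into()),
                        // exit 0 only if the EKS Pod Identity webhook injected credentials
                        command: Some(vec![
                            "grep".into(),
                            "-q".into(),
                            "AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE".into(),
                            "/proc/self/environ".into(),
                        ]),
                        ..Default::default()
                    }],
                    restart_policy: Some("Never".into()),
                    ..Default::default()
                }),
            },
            ..Default::default()
        }),
        status: None,
    }
}

// Apply the Job with server-side apply and report whether it has completed.
// Usage: let jobs: Api<Job> = Api::namespaced(client, namespace);
async fn canary_succeeded(jobs: &Api<Job>, name: &str) -> Result<bool, kube::Error> {
    let params = PatchParams::apply("restate-operator").force();
    let created = jobs
        .patch(name, &params, &Patch::Apply(&canary_job(name)))
        .await?;
    Ok(created
        .status
        .and_then(|s| s.conditions)
        .unwrap_or_default()
        .iter()
        .any(|c| c.type_ == "Complete" && c.status == "True"))
}
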
3 changes: 2 additions & 1 deletion charts/restate-operator-helm/templates/rbac.yaml
@@ -47,6 +47,7 @@ rules:
- statefulsets
- persistentvolumeclaims
- pods
- jobs
- securitygrouppolicies
- secretproviderclasses
verbs:
@@ -64,7 +65,7 @@ rules:
- resources:
- statefulsets
- networkpolicies
- pods
- jobs
- securitygrouppolicies
- secretproviderclasses
verbs:
15 changes: 9 additions & 6 deletions src/controller.rs
@@ -8,13 +8,15 @@ use std::sync::Arc;
use chrono::{DateTime, Utc};
use futures::StreamExt;
use k8s_openapi::api::apps::v1::StatefulSet;
use k8s_openapi::api::batch::v1::Job;
use k8s_openapi::api::core::v1::{
EnvVar, Namespace, PersistentVolumeClaim, Pod, PodDNSConfig, ResourceRequirements, Service,
EnvVar, Namespace, PersistentVolumeClaim, PodDNSConfig, ResourceRequirements, Service,
ServiceAccount,
};
use k8s_openapi::api::networking::v1;
use k8s_openapi::api::networking::v1::{NetworkPolicy, NetworkPolicyPeer, NetworkPolicyPort};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{APIGroup, ObjectMeta};

use kube::core::object::HasStatus;
use kube::core::PartialObjectMeta;
use kube::runtime::reflector::{ObjectRef, Store};
@@ -417,7 +419,7 @@ async fn reconcile(rc: Arc<RestateCluster>, ctx: Arc<Context>) -> Result<Action>
}
}

fn error_policy(_rc: Arc<RestateCluster>, _error: &Error, _ctx: Arc<Context>) -> Action {
fn error_policy<K, C>(_rc: Arc<K>, _error: &Error, _ctx: C) -> Action {
Action::requeue(Duration::from_secs(30))
}

@@ -724,7 +726,7 @@ pub async fn run(state: State) {
let svcacc_api = Api::<ServiceAccount>::all(client.clone());
let np_api = Api::<NetworkPolicy>::all(client.clone());
let pia_api = Api::<PodIdentityAssociation>::all(client.clone());
let pod_api = Api::<Pod>::all(client.clone());
let job_api = Api::<Job>::all(client.clone());
let sgp_api = Api::<SecurityGroupPolicy>::all(client.clone());
let spc_api = Api::<SecretProviderClass>::all(client.clone());

@@ -785,9 +787,10 @@ pub async fn run(state: State) {
// avoid apply loops that seem to happen with crds
.predicate_filter(changed_predicate.combine(status_predicate));

controller
.owns_stream(pia_watcher)
.owns(pod_api, cfg.clone())
controller.owns_stream(pia_watcher).owns(
job_api,
Config::default().labels("app.kubernetes.io/name=restate-pia-canary"),
)
} else {
controller
};
155 changes: 122 additions & 33 deletions src/reconcilers/compute.rs
@@ -1,9 +1,9 @@
use std::collections::{BTreeMap, HashSet};
use std::convert::Into;
use std::path::PathBuf;
use std::time::Duration;

use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetStatus};
use k8s_openapi::api::batch::v1::{Job, JobSpec};
use k8s_openapi::api::core::v1::{
Container, ContainerPort, EnvVar, HTTPGetAction, PersistentVolumeClaim,
PersistentVolumeClaimSpec, Pod, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
@@ -13,14 +13,14 @@ use k8s_openapi::api::core::v1::{
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
use kube::api::{DeleteParams, Preconditions, PropagationPolicy};
use kube::api::{DeleteParams, ListParams, Preconditions, PropagationPolicy};
use kube::core::PartialObjectMeta;
use kube::runtime::reflector::{ObjectRef, Store};
use kube::{
api::{Patch, PatchParams},
Api, ResourceExt,
};
use tracing::{debug, warn};
use tracing::{debug, error, warn};

use crate::podidentityassociations::{PodIdentityAssociation, PodIdentityAssociationSpec};
use crate::reconcilers::{label_selector, mandatory_labels, object_meta};
@@ -342,6 +342,7 @@ pub async fn reconcile_compute(
let svc_api: Api<Service> = Api::namespaced(ctx.client.clone(), namespace);
let svcacc_api: Api<ServiceAccount> = Api::namespaced(ctx.client.clone(), namespace);
let pia_api: Api<PodIdentityAssociation> = Api::namespaced(ctx.client.clone(), namespace);
let job_api: Api<Job> = Api::namespaced(ctx.client.clone(), namespace);
let pod_api: Api<Pod> = Api::namespaced(ctx.client.clone(), namespace);
let sgp_api: Api<SecurityGroupPolicy> = Api::namespaced(ctx.client.clone(), namespace);

@@ -385,9 +386,7 @@ pub async fn reconcile_compute(
return Err(Error::NotReady { reason: "PodIdentityAssociationNotSynced".into(), message: "Waiting for the AWS ACK controller to provision the Pod Identity Association with IAM".into(), requeue_after: None });
}

if !check_pia(namespace, base_metadata, &pod_api).await? {
return Err(Error::NotReady { reason: "PodIdentityAssociationCanaryFailed".into(), message: "Canary pod did not receive Pod Identity credentials; PIA webhook may need to catch up".into(), requeue_after: Some(Duration::from_secs(2)) });
}
check_pia(namespace, base_metadata, &job_api, &pod_api).await?;

// Pods MUST roll when these change, so we will apply these parameters as annotations to the pod meta
let pod_annotations = pod_annotations.get_or_insert_with(Default::default);
@@ -402,6 +401,7 @@
}
(Some(_), None) => {
delete_pod_identity_association(namespace, &pia_api, "restate").await?;
delete_job(namespace, &job_api, "restate-pia-canary").await?;
}
(None, Some(aws_pod_identity_association_role_arn)) => {
warn!("Ignoring AWS pod identity association role ARN {aws_pod_identity_association_role_arn} as the operator is not configured with --aws-pod-identity-association-cluster");
@@ -523,8 +523,9 @@ async fn apply_pod_identity_association(
async fn check_pia(
namespace: &str,
base_metadata: &ObjectMeta,
job_api: &Api<Job>,
pod_api: &Api<Pod>,
) -> Result<bool, Error> {
) -> Result<(), Error> {
let name = "restate-pia-canary";
let params: PatchParams = PatchParams::apply("restate-operator").force();

@@ -537,53 +538,129 @@
}

debug!(
"Applying PodIdentityAssociation canary Pod in namespace {}",
"Applying PodIdentityAssociation canary Job in namespace {}",
namespace
);

let created = pod_api
let created = job_api
.patch(
name,
&params,
&Patch::Apply(&Pod {
metadata: object_meta(base_metadata, name),
spec: Some(PodSpec {
service_account_name: Some("restate".into()),
containers: vec![Container {
name: "canary".into(),
image: Some("hello-world:linux".into()),
..Default::default()
}],
restart_policy: Some("Never".into()),
&Patch::Apply(&Job {
metadata,
spec: Some(JobSpec {
// single-use job that we delete on failure; don't want to wait 10 seconds for retries
backoff_limit: Some(1),
template: PodTemplateSpec {
metadata: None,
spec: Some(PodSpec {
service_account_name: Some("restate".into()),
containers: vec![Container {
name: "canary".into(),
image: Some("busybox:uclibc".into()),
command: Some(vec![
"grep".into(),
"-q".into(),
"AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE".into(),
"/proc/self/environ".into(),
]),
..Default::default()
}],
restart_policy: Some("Never".into()),
..Default::default()
}),
},
..Default::default()
}),
status: None,
}),
)
.await?;

if let Some(spec) = created.spec {
if let Some(volumes) = spec.volumes {
if volumes.iter().any(|v| v.name == "eks-pod-identity-token") {
debug!(
"PodIdentityAssociation canary check succeeded in namespace {}",
namespace
);
// leave pod in place as a signal that we passed the check
return Ok(true);
if let Some(conditions) = created.status.and_then(|s| s.conditions) {
for condition in conditions {
if condition.status != "True" {
continue;
}
match condition.type_.as_str() {
"Complete" => {
debug!(
"PodIdentityAssociation canary check succeeded in namespace {}",
namespace
);
return Ok(());
}
"Failed" => {
error!(
"PodIdentityAssociation canary check failed in namespace {}, deleting Job",
namespace
);

delete_job(namespace, job_api, name).await?;

return Err(Error::NotReady {
reason: "PodIdentityAssociationCanaryFailed".into(),
message: "Canary pod did not receive Pod Identity credentials; PIA webhook may need to catch up".into(),
// job watch will cover this
requeue_after: None,
});
}
_ => {}
}
}
}

// if we are here then the job hasn't succeeded or failed yet; let's try to figure things out a bit quicker
// because it takes time for pods to schedule etc

let pods = pod_api
.list(&ListParams::default().labels(&format!(
"batch.kubernetes.io/job-name={name},batch.kubernetes.io/controller-uid={}",
created.metadata.uid.unwrap()
)))
.await?;

if let Some(pod) = pods.items.first() {
if pod
.spec
.as_ref()
.and_then(|s| s.volumes.as_ref())
.map(|vs| vs.iter().any(|v| v.name == "eks-pod-identity-token"))
.unwrap_or(false)
{
debug!(
"PodIdentityAssociation canary check succeeded via pod lookup in namespace {}",
namespace
);
return Ok(());
}

debug!(
"PodIdentityAssociation canary check failed via pod lookup in namespace {}, deleting Job",
namespace
);
delete_job(namespace, job_api, name).await?;

return Err(Error::NotReady {
reason: "PodIdentityAssociationCanaryFailed".into(),
message: "Canary pod did not receive Pod Identity credentials; PIA webhook may need to catch up".into(),
// job watch will cover this
requeue_after: None,
});
}

// no pods; we generally expect this immediately after creating the job
debug!(
"PodIdentityAssociation canary check failed in namespace {}, deleting canary Pod",
"PodIdentityAssociation canary Job not yet succeeded in namespace {}",
namespace
);

// delete pod to try again next time
pod_api.delete(name, &Default::default()).await?;

Ok(false)
Err(Error::NotReady {
reason: "PodIdentityAssociationCanaryPending".into(),
message: "Canary Job has not yet succeeded; PIA webhook may need to catch up".into(),
// job watch will cover this
requeue_after: None,
})
}

fn is_pod_identity_association_synced(pia: PodIdentityAssociation) -> bool {
@@ -618,6 +695,18 @@ async fn delete_pod_identity_association(
}
}

async fn delete_job(namespace: &str, job_api: &Api<Job>, name: &str) -> Result<(), Error> {
debug!(
"Ensuring Job {} in namespace {} does not exist",
name, namespace
);
match job_api.delete(name, &DeleteParams::default()).await {
Err(kube::Error::Api(kube::error::ErrorResponse { code: 404, .. })) => Ok(()),
Err(err) => Err(err.into()),
Ok(_) => Ok(()),
}
}

async fn apply_security_group_policy(
namespace: &str,
pia_api: &Api<SecurityGroupPolicy>,
