From 140f8f8688487e9554131ef0d374b9262d9c2c59 Mon Sep 17 00:00:00 2001
From: Jack Kleeman
Date: Fri, 3 May 2024 19:01:05 +0100
Subject: [PATCH] Canary pods need to be created via Jobs (#7)

For the PIA canary to work well, the canary pod needs to be created via the
same apiserver instance that the controller manager uses; this assumes that
controller manager requests are not load balanced across all 3 apiserver
replicas. We can use a Job object so that it is the controller manager, not
the operator, that creates the pods.
---
 .../restate-operator-helm/templates/rbac.yaml |   3 +-
 src/controller.rs                             |  15 +-
 src/reconcilers/compute.rs                    | 155 ++++++++++++++----
 3 files changed, 133 insertions(+), 40 deletions(-)

diff --git a/charts/restate-operator-helm/templates/rbac.yaml b/charts/restate-operator-helm/templates/rbac.yaml
index 837c3ab..d174e87 100644
--- a/charts/restate-operator-helm/templates/rbac.yaml
+++ b/charts/restate-operator-helm/templates/rbac.yaml
@@ -47,6 +47,7 @@ rules:
   - statefulsets
   - persistentvolumeclaims
   - pods
+  - jobs
   - securitygrouppolicies
   - secretproviderclasses
   verbs:
@@ -64,7 +65,7 @@
 - resources:
   - statefulsets
   - networkpolicies
-  - pods
+  - jobs
   - securitygrouppolicies
   - secretproviderclasses
   verbs:
diff --git a/src/controller.rs b/src/controller.rs
index 128ea8e..b34e8f3 100644
--- a/src/controller.rs
+++ b/src/controller.rs
@@ -8,13 +8,15 @@ use std::sync::Arc;
 use chrono::{DateTime, Utc};
 use futures::StreamExt;
 use k8s_openapi::api::apps::v1::StatefulSet;
+use k8s_openapi::api::batch::v1::Job;
 use k8s_openapi::api::core::v1::{
-    EnvVar, Namespace, PersistentVolumeClaim, Pod, PodDNSConfig, ResourceRequirements, Service,
+    EnvVar, Namespace, PersistentVolumeClaim, PodDNSConfig, ResourceRequirements, Service,
     ServiceAccount,
 };
 use k8s_openapi::api::networking::v1;
 use k8s_openapi::api::networking::v1::{NetworkPolicy, NetworkPolicyPeer, NetworkPolicyPort};
 use k8s_openapi::apimachinery::pkg::apis::meta::v1::{APIGroup, ObjectMeta};
+
 use kube::core::object::HasStatus;
 use kube::core::PartialObjectMeta;
 use kube::runtime::reflector::{ObjectRef, Store};
@@ -417,7 +419,7 @@ async fn reconcile(rc: Arc<RestateCluster>, ctx: Arc<Context>) -> Result<Action, Error> {
     }
 }

-fn error_policy(_rc: Arc<RestateCluster>, _error: &Error, _ctx: Arc<Context>) -> Action {
+fn error_policy<C>(_rc: Arc<RestateCluster>, _error: &Error, _ctx: C) -> Action {
     Action::requeue(Duration::from_secs(30))
 }
@@ -724,7 +726,7 @@
     let svcacc_api = Api::<ServiceAccount>::all(client.clone());
     let np_api = Api::<NetworkPolicy>::all(client.clone());
     let pia_api = Api::<PodIdentityAssociation>::all(client.clone());
-    let pod_api = Api::<Pod>::all(client.clone());
+    let job_api = Api::<Job>::all(client.clone());
     let sgp_api = Api::<SecurityGroupPolicy>::all(client.clone());
     let spc_api = Api::<SecretProviderClass>::all(client.clone());
@@ -785,9 +787,10 @@
             // avoid apply loops that seem to happen with crds
             .predicate_filter(changed_predicate.combine(status_predicate));

-        controller
-            .owns_stream(pia_watcher)
-            .owns(pod_api, cfg.clone())
+        controller.owns_stream(pia_watcher).owns(
+            job_api,
+            Config::default().labels("app.kubernetes.io/name=restate-pia-canary"),
+        )
     } else {
         controller
     };
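Note: the controller now watches Jobs rather than Pods, and the watch is label-filtered so that only the canary Job can wake the reconciler; the `requeue_after: None` cases later in this patch rely on that watch to re-trigger reconciliation when the Job's status changes. Below is a minimal sketch of such a label-filtered ownership watch in kube-rs, assuming a recent kube with `watcher::Config`-based watches and the `derive` feature; the `RestateClusterSpec` stand-in and `build` helper are illustrative, not part of this patch:

    use k8s_openapi::api::batch::v1::Job;
    use kube::runtime::{controller::Controller, watcher};
    use kube::{Api, Client, CustomResource};
    use schemars::JsonSchema;
    use serde::{Deserialize, Serialize};

    // Stand-in for the operator's real cluster-scoped CRD.
    #[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)]
    #[kube(group = "restate.dev", version = "v1", kind = "RestateCluster")]
    pub struct RestateClusterSpec {}

    fn build(client: Client) -> Controller<RestateCluster> {
        let rc_api = Api::<RestateCluster>::all(client.clone());
        let job_api = Api::<Job>::all(client);
        Controller::new(rc_api, watcher::Config::default())
            // Only Jobs carrying the canary label feed the watch; a status
            // change on the canary Job maps back (via its owner reference)
            // to the owning RestateCluster and enqueues a reconcile.
            .owns(
                job_api,
                watcher::Config::default().labels("app.kubernetes.io/name=restate-pia-canary"),
            )
    }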
diff --git a/src/reconcilers/compute.rs b/src/reconcilers/compute.rs
index 0547105..b154c51 100644
--- a/src/reconcilers/compute.rs
+++ b/src/reconcilers/compute.rs
@@ -1,9 +1,9 @@
 use std::collections::{BTreeMap, HashSet};
 use std::convert::Into;
 use std::path::PathBuf;
-use std::time::Duration;

 use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetStatus};
+use k8s_openapi::api::batch::v1::{Job, JobSpec};
 use k8s_openapi::api::core::v1::{
     Container, ContainerPort, EnvVar, HTTPGetAction, PersistentVolumeClaim,
     PersistentVolumeClaimSpec, Pod, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
@@ -13,14 +13,14 @@ use k8s_openapi::api::core::v1::{
 use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
 use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
 use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
-use kube::api::{DeleteParams, Preconditions, PropagationPolicy};
+use kube::api::{DeleteParams, ListParams, Preconditions, PropagationPolicy};
 use kube::core::PartialObjectMeta;
 use kube::runtime::reflector::{ObjectRef, Store};
 use kube::{
     api::{Patch, PatchParams},
     Api, ResourceExt,
 };
-use tracing::{debug, warn};
+use tracing::{debug, error, warn};

 use crate::podidentityassociations::{PodIdentityAssociation, PodIdentityAssociationSpec};
 use crate::reconcilers::{label_selector, mandatory_labels, object_meta};
@@ -342,6 +342,7 @@ pub async fn reconcile_compute(
     let svc_api: Api<Service> = Api::namespaced(ctx.client.clone(), namespace);
     let svcacc_api: Api<ServiceAccount> = Api::namespaced(ctx.client.clone(), namespace);
     let pia_api: Api<PodIdentityAssociation> = Api::namespaced(ctx.client.clone(), namespace);
+    let job_api: Api<Job> = Api::namespaced(ctx.client.clone(), namespace);
     let pod_api: Api<Pod> = Api::namespaced(ctx.client.clone(), namespace);
     let sgp_api: Api<SecurityGroupPolicy> = Api::namespaced(ctx.client.clone(), namespace);
@@ -385,9 +386,7 @@ pub async fn reconcile_compute(
         return Err(Error::NotReady { reason: "PodIdentityAssociationNotSynced".into(), message: "Waiting for the AWS ACK controller to provision the Pod Identity Association with IAM".into(), requeue_after: None });
     }

-    if !check_pia(namespace, base_metadata, &pod_api).await? {
-        return Err(Error::NotReady { reason: "PodIdentityAssociationCanaryFailed".into(), message: "Canary pod did not receive Pod Identity credentials; PIA webhook may need to catch up".into(), requeue_after: Some(Duration::from_secs(2)) });
-    }
+    check_pia(namespace, base_metadata, &job_api, &pod_api).await?;

     // Pods MUST roll when these change, so we will apply these parameters as annotations to the pod meta
     let pod_annotations = pod_annotations.get_or_insert_with(Default::default);
@@ -402,6 +401,7 @@
         }
         (Some(_), None) => {
             delete_pod_identity_association(namespace, &pia_api, "restate").await?;
+            delete_job(namespace, &job_api, "restate-pia-canary").await?;
         }
         (None, Some(aws_pod_identity_association_role_arn)) => {
             warn!("Ignoring AWS pod identity association role ARN {aws_pod_identity_association_role_arn} as the operator is not configured with --aws-pod-identity-association-cluster");
@@ -523,8 +523,9 @@ async fn apply_pod_identity_association(
 async fn check_pia(
     namespace: &str,
     base_metadata: &ObjectMeta,
+    job_api: &Api<Job>,
     pod_api: &Api<Pod>,
-) -> Result<bool, Error> {
+) -> Result<(), Error> {
     let name = "restate-pia-canary";
     let params: PatchParams = PatchParams::apply("restate-operator").force();
@@ -537,24 +538,38 @@ async fn check_pia(
     }

     debug!(
-        "Applying PodIdentityAssociation canary Pod in namespace {}",
+        "Applying PodIdentityAssociation canary Job in namespace {}",
         namespace
     );
-    let created = pod_api
+    let created = job_api
         .patch(
             name,
             &params,
-            &Patch::Apply(&Pod {
-                metadata: object_meta(base_metadata, name),
-                spec: Some(PodSpec {
-                    service_account_name: Some("restate".into()),
-                    containers: vec![Container {
-                        name: "canary".into(),
-                        image: Some("hello-world:linux".into()),
-                        ..Default::default()
-                    }],
-                    restart_policy: Some("Never".into()),
+            &Patch::Apply(&Job {
+                metadata,
+                spec: Some(JobSpec {
+                    // single-use job that we delete on failure; don't want to wait 10 seconds for retries
+                    backoff_limit: Some(1),
+                    template: PodTemplateSpec {
+                        metadata: None,
+                        spec: Some(PodSpec {
+                            service_account_name: Some("restate".into()),
+                            containers: vec![Container {
+                                name: "canary".into(),
+                                image: Some("busybox:uclibc".into()),
+                                command: Some(vec![
+                                    "grep".into(),
+                                    "-q".into(),
+                                    "AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE".into(),
+                                    "/proc/self/environ".into(),
+                                ]),
+                                ..Default::default()
+                            }],
+                            restart_policy: Some("Never".into()),
+                            ..Default::default()
+                        }),
+                    },
                     ..Default::default()
                 }),
                 status: None,
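Note: the canary works because the EKS Pod Identity webhook mutates matching pods at admission time, injecting the AWS_CONTAINER_* environment variables (and the eks-pod-identity-token volume checked below). The busybox container greps its own environment for one of those variables and exits non-zero when it is absent, so Job success is equivalent to "credentials were injected". The same check as a tiny Rust program, purely for illustration (the patch ships busybox instead, presumably to keep the canary image small):

    use std::process::ExitCode;

    fn main() -> ExitCode {
        // The EKS Pod Identity webhook injects this variable into pods whose
        // service account is covered by a Pod Identity Association; seeing it
        // proves the webhook processed the pod at admission time.
        match std::env::var_os("AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE") {
            Some(_) => ExitCode::SUCCESS, // canary passes and the Job completes
            None => ExitCode::FAILURE,    // canary fails; the Job hits backoff_limit
        }
    }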
@@ -562,28 +577,90 @@
             }),
         )
         .await?;

-    if let Some(spec) = created.spec {
-        if let Some(volumes) = spec.volumes {
-            if volumes.iter().any(|v| v.name == "eks-pod-identity-token") {
-                debug!(
-                    "PodIdentityAssociation canary check succeeded in namespace {}",
-                    namespace
-                );
-                // leave pod in place as a signal that we passed the check
-                return Ok(true);
-            }
+    if let Some(conditions) = created.status.and_then(|s| s.conditions) {
+        for condition in conditions {
+            if condition.status != "True" {
+                continue;
+            }
+            match condition.type_.as_str() {
+                "Complete" => {
+                    debug!(
+                        "PodIdentityAssociation canary check succeeded in namespace {}",
+                        namespace
+                    );
+                    return Ok(());
+                }
+                "Failed" => {
+                    error!(
+                        "PodIdentityAssociation canary check failed in namespace {}, deleting Job",
+                        namespace
+                    );
+
+                    delete_job(namespace, job_api, name).await?;
+
+                    return Err(Error::NotReady {
+                        reason: "PodIdentityAssociationCanaryFailed".into(),
+                        message: "Canary pod did not receive Pod Identity credentials; PIA webhook may need to catch up".into(),
+                        // job watch will cover this
+                        requeue_after: None,
+                    });
+                }
+                _ => {}
+            }
         }
     }
+
+    // if we are here then the job hasn't succeeded or failed yet; let's try to figure things out a
+    // bit quicker, because it takes time for pods to schedule etc.
+
+    let pods = pod_api
+        .list(&ListParams::default().labels(&format!(
+            "batch.kubernetes.io/job-name={name},batch.kubernetes.io/controller-uid={}",
+            created.metadata.uid.unwrap()
+        )))
+        .await?;
+
+    if let Some(pod) = pods.items.first() {
+        if pod
+            .spec
+            .as_ref()
+            .and_then(|s| s.volumes.as_ref())
+            .map(|vs| vs.iter().any(|v| v.name == "eks-pod-identity-token"))
+            .unwrap_or(false)
+        {
+            debug!(
+                "PodIdentityAssociation canary check succeeded via pod lookup in namespace {}",
+                namespace
+            );
+            return Ok(());
+        }
+
+        debug!(
+            "PodIdentityAssociation canary check failed via pod lookup in namespace {}, deleting Job",
+            namespace
+        );
+        delete_job(namespace, job_api, name).await?;
+
+        return Err(Error::NotReady {
+            reason: "PodIdentityAssociationCanaryFailed".into(),
+            message: "Canary pod did not receive Pod Identity credentials; PIA webhook may need to catch up".into(),
+            // job watch will cover this
+            requeue_after: None,
+        });
+    }

+    // no pods; we generally expect this immediately after creating the job
     debug!(
-        "PodIdentityAssociation canary check failed in namespace {}, deleting canary Pod",
+        "PodIdentityAssociation canary Job not yet succeeded in namespace {}",
         namespace
     );
-    // delete pod to try again next time
-    pod_api.delete(name, &Default::default()).await?;
-
-    Ok(false)
+    Err(Error::NotReady {
+        reason: "PodIdentityAssociationCanaryPending".into(),
+        message: "Canary Job has not yet succeeded; PIA webhook may need to catch up".into(),
+        // job watch will cover this
+        requeue_after: None,
+    })
 }

 fn is_pod_identity_association_synced(pia: PodIdentityAssociation) -> bool {
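Note: the status handling above is a three-way triage: a Job condition of type Complete with status "True" means the canary passed, a true Failed condition means credentials were never injected, and anything else is treated as pending (with the pod lookup as a fast path, since Job conditions can lag pod scheduling). The `!= "True"` guard matters because a condition's status can also be "False" or "Unknown", and neither is decisive. The same triage factored into a standalone helper, as a sketch (the `CanaryOutcome` and `triage` names are illustrative, not from the patch):

    use k8s_openapi::api::batch::v1::JobStatus;

    #[derive(Debug, PartialEq)]
    enum CanaryOutcome {
        Succeeded,
        Failed,
        Pending,
    }

    fn triage(status: &JobStatus) -> CanaryOutcome {
        for condition in status.conditions.iter().flatten() {
            // Ignore False/Unknown conditions; only a True condition counts.
            if condition.status != "True" {
                continue;
            }
            match condition.type_.as_str() {
                "Complete" => return CanaryOutcome::Succeeded,
                "Failed" => return CanaryOutcome::Failed,
                _ => {}
            }
        }
        CanaryOutcome::Pending
    }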
@@ -618,6 +695,18 @@ async fn delete_pod_identity_association(
     }
 }

+async fn delete_job(namespace: &str, job_api: &Api<Job>, name: &str) -> Result<(), Error> {
+    debug!(
+        "Ensuring Job {} in namespace {} does not exist",
+        name, namespace
+    );
+    match job_api.delete(name, &DeleteParams::default()).await {
+        Err(kube::Error::Api(kube::error::ErrorResponse { code: 404, .. })) => Ok(()),
+        Err(err) => Err(err.into()),
+        Ok(_) => Ok(()),
+    }
+}
+
 async fn apply_security_group_policy(
     namespace: &str,
     pia_api: &Api<PodIdentityAssociation>,
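Note: delete_job treats a 404 as success, which keeps the cleanup path idempotent: "ensure the Job does not exist" can run on every reconcile without erroring once the Job is gone. The pattern generalises to any resource type; a sketch under that assumption (the `delete_if_exists` helper is hypothetical, not part of the patch):

    use kube::api::DeleteParams;
    use kube::Api;
    use serde::de::DeserializeOwned;
    use std::fmt::Debug;

    // Delete a named object, treating "already gone" (HTTP 404) as success.
    async fn delete_if_exists<K>(api: &Api<K>, name: &str) -> Result<(), kube::Error>
    where
        K: Clone + DeserializeOwned + Debug,
    {
        match api.delete(name, &DeleteParams::default()).await {
            Err(kube::Error::Api(e)) if e.code == 404 => Ok(()),
            Err(err) => Err(err),
            Ok(_) => Ok(()),
        }
    }

One caveat worth flagging: `DeleteParams::default()` defers to the server's default propagation policy, and for Jobs that legacy default orphans the Job's pods; if leftover canary pods ever matter, `DeleteParams::background()` would let the garbage collector remove them as well.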