diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index 271491e1cd..722c6d5b5e 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -18,8 +18,10 @@ package org.apache.flink.kubernetes.operator.controller; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; import org.apache.flink.kubernetes.operator.exception.ReconciliationException; @@ -49,6 +51,7 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; /** Controller that runs the main reconcile loop for Flink deployments. 
*/ @@ -68,6 +71,9 @@ public class FlinkDeploymentController private final MetricManager metricManager; private FlinkControllerConfig controllerConfig; + private final ConcurrentHashMap, FlinkDeploymentStatus> statusCache = + new ConcurrentHashMap<>(); + public FlinkDeploymentController( FlinkConfigManager configManager, KubernetesClient kubernetesClient, @@ -86,26 +92,29 @@ public FlinkDeploymentController( @Override public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) { LOG.info("Deleting FlinkDeployment"); + OperatorUtils.updateStatusFromCache(flinkApp, statusCache); try { observerFactory.getOrCreate(flinkApp).observe(flinkApp, context); } catch (DeploymentFailedException dfe) { // ignore during cleanup } metricManager.onRemove(flinkApp); + statusCache.remove( + Tuple2.of(flinkApp.getMetadata().getNamespace(), flinkApp.getMetadata().getName())); return reconcilerFactory.getOrCreate(flinkApp).cleanup(flinkApp, context); } @Override public UpdateControl reconcile(FlinkDeployment flinkApp, Context context) { LOG.info("Starting reconciliation"); - FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp); - + OperatorUtils.updateStatusFromCache(flinkApp, statusCache); try { observerFactory.getOrCreate(flinkApp).observe(flinkApp, context); if (!validateDeployment(flinkApp)) { metricManager.onUpdate(flinkApp); + OperatorUtils.patchAndCacheStatus(kubernetesClient, flinkApp, statusCache); return ReconciliationUtils.toUpdateControl( - configManager.getOperatorConfiguration(), originalCopy, flinkApp, false); + configManager.getOperatorConfiguration(), flinkApp, false); } reconcilerFactory.getOrCreate(flinkApp).reconcile(flinkApp, context); } catch (DeploymentFailedException dfe) { @@ -116,8 +125,9 @@ public UpdateControl reconcile(FlinkDeployment flinkApp, Contex LOG.info("End of reconciliation"); metricManager.onUpdate(flinkApp); + OperatorUtils.patchAndCacheStatus(kubernetesClient, flinkApp, statusCache); return 
ReconciliationUtils.toUpdateControl( - configManager.getOperatorConfiguration(), originalCopy, flinkApp, true); + configManager.getOperatorConfiguration(), flinkApp, true); } private void handleDeploymentFailed(FlinkDeployment flinkApp, DeploymentFailedException dfe) { @@ -151,16 +161,8 @@ public List prepareEventSources(EventSourceContext @Override public Optional updateErrorStatus( FlinkDeployment flinkApp, RetryInfo retryInfo, RuntimeException e) { - LOG.warn( - "Attempt count: {}, last attempt: {}", - retryInfo.getAttemptCount(), - retryInfo.isLastAttempt()); - - ReconciliationUtils.updateForReconciliationError( - flinkApp, - (e instanceof ReconciliationException) ? e.getCause().toString() : e.toString()); - metricManager.onUpdate(flinkApp); - return Optional.of(flinkApp); + return ReconciliationUtils.updateErrorStatus( + kubernetesClient, flinkApp, retryInfo, e, metricManager, statusCache); } public void setControllerConfig(FlinkControllerConfig config) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java index c7288ef74b..1ff81a8930 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java @@ -17,9 +17,11 @@ package org.apache.flink.kubernetes.operator.controller; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob; +import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus; import org.apache.flink.kubernetes.operator.exception.ReconciliationException; 
import org.apache.flink.kubernetes.operator.metrics.MetricManager; import org.apache.flink.kubernetes.operator.observer.Observer; @@ -54,6 +56,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.stream.Collectors; @@ -78,6 +81,9 @@ public class FlinkSessionJobController private Map> informers; private FlinkControllerConfig controllerConfig; + private final ConcurrentHashMap, FlinkSessionJobStatus> statusCache = + new ConcurrentHashMap<>(); + public FlinkSessionJobController( FlinkConfigManager configManager, KubernetesClient kubernetesClient, @@ -102,11 +108,13 @@ public void init(FlinkControllerConfig config) { public UpdateControl reconcile( FlinkSessionJob flinkSessionJob, Context context) { LOG.info("Starting reconciliation"); - FlinkSessionJob originalCopy = ReconciliationUtils.clone(flinkSessionJob); + OperatorUtils.updateStatusFromCache(flinkSessionJob, statusCache); observer.observe(flinkSessionJob, context); if (!validateSessionJob(flinkSessionJob, context)) { metricManager.onUpdate(flinkSessionJob); - return ReconciliationUtils.toUpdateControl(flinkSessionJob, flinkSessionJob); + OperatorUtils.patchAndCacheStatus(kubernetesClient, flinkSessionJob, statusCache); + return ReconciliationUtils.toUpdateControl( + configManager.getOperatorConfiguration(), flinkSessionJob, false); } try { @@ -116,31 +124,27 @@ public UpdateControl reconcile( throw new ReconciliationException(e); } metricManager.onUpdate(flinkSessionJob); - return ReconciliationUtils.toUpdateControl(originalCopy, flinkSessionJob) - .rescheduleAfter( - configManager.getOperatorConfiguration().getReconcileInterval().toMillis()); + OperatorUtils.patchAndCacheStatus(kubernetesClient, flinkSessionJob, statusCache); + return ReconciliationUtils.toUpdateControl( + configManager.getOperatorConfiguration(), flinkSessionJob, true); } @Override public DeleteControl 
cleanup(FlinkSessionJob sessionJob, Context context) { LOG.info("Deleting FlinkSessionJob"); metricManager.onRemove(sessionJob); + statusCache.remove( + Tuple2.of( + sessionJob.getMetadata().getNamespace(), + sessionJob.getMetadata().getName())); return reconciler.cleanup(sessionJob, context); } @Override public Optional updateErrorStatus( FlinkSessionJob flinkSessionJob, RetryInfo retryInfo, RuntimeException e) { - LOG.warn( - "Attempt count: {}, last attempt: {}", - retryInfo.getAttemptCount(), - retryInfo.isLastAttempt()); - - ReconciliationUtils.updateForReconciliationError( - flinkSessionJob, - (e instanceof ReconciliationException) ? e.getCause().toString() : e.toString()); - metricManager.onUpdate(flinkSessionJob); - return Optional.of(flinkSessionJob); + return ReconciliationUtils.updateErrorStatus( + kubernetesClient, flinkSessionJob, retryInfo, e, metricManager, statusCache); } @Override diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java index c0cc6ad383..6440776eb7 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java @@ -17,6 +17,7 @@ package org.apache.flink.kubernetes.operator.reconciler; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; @@ -34,7 +35,10 @@ import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState; import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus; 
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException; +import org.apache.flink.kubernetes.operator.metrics.MetricManager; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; +import org.apache.flink.kubernetes.operator.utils.OperatorUtils; import org.apache.flink.util.Preconditions; import com.fasterxml.jackson.core.JsonProcessingException; @@ -42,6 +46,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.reconciler.RetryInfo; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +56,8 @@ import java.time.Duration; import java.time.Instant; -import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; /** Reconciliation utilities. */ public class ReconciliationUtils { @@ -123,37 +130,39 @@ public static T clone(T object) { } } - public static UpdateControl toUpdateControl( - CR originalCopy, CR current) { - - current.setSpec(originalCopy.getSpec()); + public static < + SPEC extends AbstractFlinkSpec, + STATUS extends CommonStatus, + R extends CustomResource> + UpdateControl toUpdateControl( + FlinkOperatorConfiguration operatorConfiguration, + R current, + boolean reschedule) { - var statusChanged = !Objects.equals(originalCopy.getStatus(), current.getStatus()); + STATUS status = current.getStatus(); - return statusChanged ? 
UpdateControl.updateStatus(current) : UpdateControl.noUpdate(); - } - - public static UpdateControl toUpdateControl( - FlinkOperatorConfiguration operatorConfiguration, - FlinkDeployment originalCopy, - FlinkDeployment current, - boolean reschedule) { - UpdateControl updateControl = toUpdateControl(originalCopy, current); + // Status update is handled manually independently, we only use UpdateControl to reschedule + // reconciliation + UpdateControl updateControl = UpdateControl.noUpdate(); if (!reschedule) { return updateControl; } - if (isJobUpgradeInProgress(current)) { + if (isJobUpgradeInProgress(current.getSpec(), status)) { return updateControl.rescheduleAfter(0); } - Duration rescheduleAfter = - current.getStatus() - .getJobManagerDeploymentStatus() - .rescheduleAfter(current, operatorConfiguration); - - return updateControl.rescheduleAfter(rescheduleAfter.toMillis()); + if (status instanceof FlinkDeploymentStatus) { + return updateControl.rescheduleAfter( + ((FlinkDeploymentStatus) status) + .getJobManagerDeploymentStatus() + .rescheduleAfter((FlinkDeployment) current, operatorConfiguration) + .toMillis()); + } else { + return updateControl.rescheduleAfter( + operatorConfiguration.getReconcileInterval().toMillis()); + } } public static boolean isUpgradeModeChangedToLastStateAndHADisabledPreviously( @@ -185,9 +194,8 @@ public static FlinkDeploymentSpec getDeployedSpec(FlinkDeployment deployment) { } } - private static boolean isJobUpgradeInProgress(FlinkDeployment current) { - ReconciliationStatus reconciliationStatus = - current.getStatus().getReconciliationStatus(); + private static boolean isJobUpgradeInProgress(AbstractFlinkSpec spec, CommonStatus status) { + var reconciliationStatus = status.getReconciliationStatus(); if (reconciliationStatus == null) { return false; @@ -197,13 +205,12 @@ private static boolean isJobUpgradeInProgress(FlinkDeployment current) { return true; } - if (current.getSpec().getJob() == null - || 
reconciliationStatus.getLastReconciledSpec() == null) { + if (spec.getJob() == null || reconciliationStatus.getLastReconciledSpec() == null) { return false; } - return current.getSpec().getJob().getState() == JobState.RUNNING - && current.getStatus().getError() == null + return spec.getJob().getState() == JobState.RUNNING + && status.getError() == null && reconciliationStatus.deserializeLastReconciledSpec().getJob().getState() == JobState.SUSPENDED; } @@ -349,4 +356,42 @@ boolean applyValidationErrorAndResetSpec(CR deployment, String validationError) return true; } } + + /** + * Update the resource error status and metrics when the operator encountered an exception + * during reconciliation. + * + * @param client Kubernetes Client used for status updates + * @param resource Flink Resource to be updated + * @param retryInfo Current RetryInformation + * @param e Exception that caused the retry + * @param metricManager Metric manager to be updated + * @param statusCache Cache containing the latest status updates for this resource type + * @return This always returns Empty optional currently, due to the status update logic + */ + public static < + SPEC extends AbstractFlinkSpec, + STATUS extends CommonStatus, + R extends CustomResource> + Optional updateErrorStatus( + KubernetesClient client, + R resource, + RetryInfo retryInfo, + RuntimeException e, + MetricManager metricManager, + ConcurrentHashMap, STATUS> statusCache) { + LOG.warn( + "Attempt count: {}, last attempt: {}", + retryInfo.getAttemptCount(), + retryInfo.isLastAttempt()); + OperatorUtils.updateStatusFromCache(resource, statusCache); + ReconciliationUtils.updateForReconciliationError( + resource, + (e instanceof ReconciliationException) ? 
e.getCause().toString() : e.toString()); + metricManager.onUpdate(resource); + OperatorUtils.patchAndCacheStatus(client, resource, statusCache); + + // Status was updated already, no need to return anything + return Optional.empty(); + } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java index 4b7cb60560..8cffeda4f5 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java @@ -18,31 +18,40 @@ package org.apache.flink.kubernetes.operator.utils; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob; +import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.utils.Constants; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.apps.DeploymentList; +import io.fabric8.kubernetes.client.CustomResource; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; +import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.Collections; import 
java.util.HashSet; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; /** Operator SDK related utility functions. */ public class OperatorUtils { + private static final Logger LOG = LoggerFactory.getLogger(OperatorUtils.class); + private static final String NAMESPACES_SPLITTER_KEY = ","; public static InformerEventSource createJmDepInformerEventSource( @@ -95,4 +104,71 @@ public static Optional getSecondaryResource( : null; return context.getSecondaryResource(FlinkDeployment.class, identifier); } + + /** + * Update the status of the provided kubernetes resource on the k8s cluster. We use patch + * together with null resourceVersion to try to guarantee that the status update succeeds even + * if the underlying resource spec changed in the meantime. This is necessary for the correct + * operator behavior. + * + * @param client Kubernetes Client used for the status patch + * @param resource Resource for which status update should be performed + * @param statusCache Cache containing the latest status updates for this resource type + */ + @SneakyThrows + public static > void patchAndCacheStatus( + KubernetesClient client, + T resource, + ConcurrentHashMap, S> statusCache) { + + Class resourceClass = (Class) resource.getClass(); + String namespace = resource.getMetadata().getNamespace(); + String name = resource.getMetadata().getName(); + + // This is necessary so the client wouldn't fail if the underlying resource spec was updated + // in the meantime + resource.getMetadata().setResourceVersion(null); + + Exception err = null; + for (int i = 0; i < 3; i++) { + // In any case we retry the status update 3 times to avoid some intermittent + // connectivity errors if any + try { + client.resources(resourceClass) + .inNamespace(namespace) + .withName(name) + .patchStatus(resource); + statusCache.put( + Tuple2.of(namespace, name), + ReconciliationUtils.clone(resource.getStatus())); + return; + } catch (Exception e) { +
LOG.error("Error while patching status, retrying {}/3...", (i + 1), e); + Thread.sleep(1000); + err = e; + } + } + throw err; + } + + /** + * Update the custom resource status based on the in-memory cache to ensure that any status + * updates that we made previously are always visible in the reconciliation loop. This is + * required due to our custom status patching logic. + * + *

If the cache doesn't have a status stored, we do no update. This happens when the operator + * reconciles a resource for the first time after a restart. + * + * @param resource Resource for which the status should be updated from the cache + * @param statusCache Cache containing the latest status updates for this resource type + */ + public static > void updateStatusFromCache( + T resource, ConcurrentHashMap, S> statusCache) { + String namespace = resource.getMetadata().getNamespace(); + String name = resource.getMetadata().getName(); + var cachedStatus = statusCache.get(Tuple2.of(namespace, name)); + if (cachedStatus != null) { + resource.setStatus(ReconciliationUtils.clone(cachedStatus)); + } + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java index b53f068431..d1829a5e71 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java @@ -59,15 +59,22 @@ import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; import io.fabric8.kubernetes.api.model.apps.DeploymentStatus; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.mockwebserver.utils.ResponseProvider; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.RetryInfo; +import okhttp3.Headers; +import okhttp3.mockwebserver.RecordedRequest; +import org.junit.jupiter.api.Assertions; import java.lang.reflect.Field; +import java.net.HttpURLConnection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; /** Testing utilities. 
*/ public class TestUtils { @@ -368,4 +375,42 @@ public static FlinkDeploymentController createTestController( new FlinkControllerConfig(controller, Collections.emptySet())); return controller; } + + /** Testing ResponseProvider. */ + public static class ValidatingResponseProvider implements ResponseProvider { + + private final AtomicBoolean validated = new AtomicBoolean(false); + + private final Consumer validator; + private final T returnValue; + + public ValidatingResponseProvider(T returnValue, Consumer validator) { + this.validator = validator; + this.returnValue = returnValue; + } + + public void assertValidated() { + Assertions.assertTrue(validated.get()); + } + + @Override + public int getStatusCode(RecordedRequest recordedRequest) { + return HttpURLConnection.HTTP_CREATED; + } + + @Override + public Headers getHeaders() { + return new Headers.Builder().build(); + } + + @Override + public void setHeaders(Headers headers) {} + + @Override + public Object getBody(RecordedRequest recordedRequest) { + validator.accept(recordedRequest); + validated.set(true); + return returnValue; + } + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java index 2f59d5c1d7..a972da020c 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java @@ -26,7 +26,6 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; -import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; import io.fabric8.kubernetes.client.KubernetesClient; @@ -62,6 +61,7 @@ public void setup() { context = flinkService.getContext(); testController = TestUtils.createTestController(configManager, kubernetesClient, flinkService); + kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace(); } @ParameterizedTest @@ -79,13 +79,14 @@ public void verifyApplicationJmRecovery( .key(), "true"); } - FlinkDeploymentStatus status = appCluster.getStatus(); testController.reconcile(appCluster, context); testController.reconcile(appCluster, context); testController.reconcile(appCluster, context); - assertEquals(JobManagerDeploymentStatus.READY, status.getJobManagerDeploymentStatus()); + assertEquals( + JobManagerDeploymentStatus.READY, + appCluster.getStatus().getJobManagerDeploymentStatus()); // Remove deployment flinkService.setPortReady(false); @@ -96,27 +97,35 @@ public void verifyApplicationJmRecovery( appCluster, TestUtils.createContextWithFailedJobManagerDeployment()); testController.reconcile( appCluster, TestUtils.createContextWithFailedJobManagerDeployment()); - assertEquals(JobManagerDeploymentStatus.ERROR, status.getJobManagerDeploymentStatus()); + assertEquals( + JobManagerDeploymentStatus.ERROR, + appCluster.getStatus().getJobManagerDeploymentStatus()); testController.reconcile(appCluster, context); if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_14) || enabled) { assertEquals( - JobManagerDeploymentStatus.DEPLOYING, status.getJobManagerDeploymentStatus()); + JobManagerDeploymentStatus.DEPLOYING, + appCluster.getStatus().getJobManagerDeploymentStatus()); } else { assertEquals( - JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus()); + JobManagerDeploymentStatus.MISSING, + appCluster.getStatus().getJobManagerDeploymentStatus()); } flinkService.setPortReady(true); testController.reconcile(appCluster, context); testController.reconcile(appCluster, 
context); if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_14) || enabled) { - assertEquals(JobManagerDeploymentStatus.READY, status.getJobManagerDeploymentStatus()); - assertEquals(JobStatus.RUNNING.name(), status.getJobStatus().getState()); + assertEquals( + JobManagerDeploymentStatus.READY, + appCluster.getStatus().getJobManagerDeploymentStatus()); + assertEquals( + JobStatus.RUNNING.name(), appCluster.getStatus().getJobStatus().getState()); } else { assertEquals( - JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus()); - assertEquals(JobStatus.FAILED.name(), status.getJobStatus().getState()); + JobManagerDeploymentStatus.MISSING, + appCluster.getStatus().getJobManagerDeploymentStatus()); + assertEquals(JobStatus.FAILED.name(), appCluster.getStatus().getJobStatus().getState()); } // Remove deployment @@ -129,17 +138,24 @@ public void verifyApplicationJmRecovery( if (upgradeMode == UpgradeMode.SAVEPOINT) { // If deployment goes missing during an upgrade we should throw an error as savepoint // information cannot be recovered with complete certainty - assertEquals(JobManagerDeploymentStatus.ERROR, status.getJobManagerDeploymentStatus()); + assertEquals( + JobManagerDeploymentStatus.ERROR, + appCluster.getStatus().getJobManagerDeploymentStatus()); } else { flinkService.setPortReady(true); testController.reconcile(appCluster, context); testController.reconcile(appCluster, context); testController.reconcile(appCluster, context); - assertEquals(JobManagerDeploymentStatus.READY, status.getJobManagerDeploymentStatus()); - assertEquals("RUNNING", status.getJobStatus().getState()); + assertEquals( + JobManagerDeploymentStatus.READY, + appCluster.getStatus().getJobManagerDeploymentStatus()); + assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState()); assertEquals( appCluster.getSpec(), - status.getReconciliationStatus().deserializeLastReconciledSpec()); + appCluster + .getStatus() + .getReconciliationStatus() + 
.deserializeLastReconciledSpec()); } } @@ -156,13 +172,13 @@ public void verifySessionJmRecovery(FlinkVersion flinkVersion, boolean enabled) .key(), "true"); } - FlinkDeploymentStatus status = appCluster.getStatus(); - testController.reconcile(appCluster, context); testController.reconcile(appCluster, context); testController.reconcile(appCluster, context); - assertEquals(JobManagerDeploymentStatus.READY, status.getJobManagerDeploymentStatus()); + assertEquals( + JobManagerDeploymentStatus.READY, + appCluster.getStatus().getJobManagerDeploymentStatus()); // Remove deployment flinkService.setPortReady(false); @@ -172,16 +188,21 @@ public void verifySessionJmRecovery(FlinkVersion flinkVersion, boolean enabled) if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_14) || enabled) { assertEquals( - JobManagerDeploymentStatus.DEPLOYING, status.getJobManagerDeploymentStatus()); + JobManagerDeploymentStatus.DEPLOYING, + appCluster.getStatus().getJobManagerDeploymentStatus()); testController.reconcile(appCluster, context); testController.reconcile(appCluster, context); - assertEquals(JobManagerDeploymentStatus.READY, status.getJobManagerDeploymentStatus()); + assertEquals( + JobManagerDeploymentStatus.READY, + appCluster.getStatus().getJobManagerDeploymentStatus()); } else { assertEquals( - JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus()); + JobManagerDeploymentStatus.MISSING, + appCluster.getStatus().getJobManagerDeploymentStatus()); testController.reconcile(appCluster, context); assertEquals( - JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus()); + JobManagerDeploymentStatus.MISSING, + appCluster.getStatus().getJobManagerDeploymentStatus()); } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 
b1c073995c..5095afd5d3 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -43,13 +43,11 @@ import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.source.EventSource; -import okhttp3.mockwebserver.RecordedRequest; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; -import java.net.HttpURLConnection; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -84,6 +82,7 @@ public void setup() { context = flinkService.getContext(); testController = TestUtils.createTestController(configManager, kubernetesClient, flinkService); + kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace(); } @ParameterizedTest @@ -94,7 +93,7 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) { UpdateControl updateControl; updateControl = testController.reconcile(appCluster, context); - assertTrue(updateControl.isUpdateStatus()); + assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( configManager @@ -111,14 +110,14 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) { assertNull(appCluster.getStatus().getReconciliationStatus().getLastStableSpec()); updateControl = testController.reconcile(appCluster, context); - assertTrue(updateControl.isUpdateStatus()); + assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( configManager.getOperatorConfiguration().getRestApiReadyDelay().toMillis()), updateControl.getScheduleDelay()); updateControl = testController.reconcile(appCluster, context); - 
assertTrue(updateControl.isUpdateStatus()); + assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( configManager.getOperatorConfiguration().getReconcileInterval().toMillis()), @@ -137,7 +136,7 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) { // Send in invalid update appCluster.getSpec().setJob(null); updateControl = testController.reconcile(appCluster, context); - assertTrue(updateControl.isUpdateStatus()); + assertFalse(updateControl.isUpdateStatus()); reconciliationStatus = appCluster.getStatus().getReconciliationStatus(); assertEquals( @@ -159,13 +158,17 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) { @Test public void verifyFailedDeployment() throws Exception { + var validatingResponseProvider = + new TestUtils.ValidatingResponseProvider<>( + new EventBuilder().withNewMetadata().endMetadata().build(), + r -> { + assertTrue(r.getBody().readUtf8().contains(TestUtils.DEPLOYMENT_ERROR)); + }); mockServer .expect() .post() .withPath("/api/v1/namespaces/flink-operator-test/events") - .andReturn( - HttpURLConnection.HTTP_CREATED, - new EventBuilder().withNewMetadata().endMetadata().build()) + .andReply(validatingResponseProvider) .once(); FlinkDeployment appCluster = TestUtils.buildApplicationCluster(); @@ -175,19 +178,18 @@ public void verifyFailedDeployment() throws Exception { updateControl = testController.reconcile( appCluster, TestUtils.createContextWithFailedJobManagerDeployment()); - assertTrue(updateControl.isUpdateStatus()); + assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( configManager.getOperatorConfiguration().getReconcileInterval().toMillis()), updateControl.getScheduleDelay()); - RecordedRequest recordedRequest = mockServer.getLastRequest(); - assertEquals("POST", recordedRequest.getMethod()); - assertTrue(recordedRequest.getBody().readUtf8().contains(TestUtils.DEPLOYMENT_ERROR)); assertEquals( JobManagerDeploymentStatus.ERROR, 
appCluster.getStatus().getJobManagerDeploymentStatus()); + validatingResponseProvider.assertValidated(); + // Validate status assertNotNull(appCluster.getStatus().getError()); @@ -208,16 +210,25 @@ public void verifyFailedDeployment() throws Exception { @Test public void verifyInProgressDeploymentWithCrashLoopBackoff() throws Exception { + String crashLoopMessage = "container fails"; + + var validatingResponseProvider = + new TestUtils.ValidatingResponseProvider<>( + new EventBuilder().withNewMetadata().endMetadata().build(), + r -> { + String recordedRequestBody = r.getBody().readUtf8(); + assertTrue( + recordedRequestBody.contains( + DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF)); + assertTrue(recordedRequestBody.contains(crashLoopMessage)); + }); mockServer .expect() .post() .withPath("/api/v1/namespaces/flink-operator-test/events") - .andReturn( - HttpURLConnection.HTTP_CREATED, - new EventBuilder().withNewMetadata().endMetadata().build()) + .andReply(validatingResponseProvider) .once(); - String crashLoopMessage = "container fails"; flinkService.setJmPodList(TestUtils.createFailedPodList(crashLoopMessage)); FlinkDeployment appCluster = TestUtils.buildApplicationCluster(); @@ -227,18 +238,12 @@ public void verifyInProgressDeploymentWithCrashLoopBackoff() throws Exception { updateControl = testController.reconcile( appCluster, TestUtils.createContextWithInProgressDeployment()); - assertTrue(updateControl.isUpdateStatus()); + assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( configManager.getOperatorConfiguration().getReconcileInterval().toMillis()), updateControl.getScheduleDelay()); - RecordedRequest recordedRequest = mockServer.getLastRequest(); - assertEquals("POST", recordedRequest.getMethod()); - String recordedRequestBody = recordedRequest.getBody().readUtf8(); - assertTrue( - recordedRequestBody.contains(DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF)); - assertTrue(recordedRequestBody.contains(crashLoopMessage)); 
assertEquals( JobManagerDeploymentStatus.ERROR, appCluster.getStatus().getJobManagerDeploymentStatus()); @@ -262,6 +267,7 @@ public void verifyInProgressDeploymentWithCrashLoopBackoff() throws Exception { .rescheduleAfter(appCluster, configManager.getOperatorConfiguration()) .toMillis(), updateControl.getScheduleDelay().get()); + validatingResponseProvider.assertValidated(); } @ParameterizedTest @@ -420,18 +426,30 @@ public void verifyStatelessUpgrade(FlinkVersion flinkVersion) { @ParameterizedTest @EnumSource(FlinkVersion.class) - public void testUpgradeNotReadyCluster(FlinkVersion flinkVersion) { + public void testUpgradeNotReadyClusterSession(FlinkVersion flinkVersion) { testUpgradeNotReadyCluster(TestUtils.buildSessionCluster(flinkVersion), true); + } - FlinkDeployment appCluster = TestUtils.buildApplicationCluster(flinkVersion); + @ParameterizedTest + @EnumSource(FlinkVersion.class) + public void testUpgradeNotReadyClusterStateless(FlinkVersion flinkVersion) { + var appCluster = TestUtils.buildApplicationCluster(flinkVersion); appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS); testUpgradeNotReadyCluster(appCluster, true); + } - appCluster = TestUtils.buildApplicationCluster(flinkVersion); + @ParameterizedTest + @EnumSource(FlinkVersion.class) + public void testUpgradeNotReadyClusterLastState(FlinkVersion flinkVersion) { + var appCluster = TestUtils.buildApplicationCluster(flinkVersion); appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE); testUpgradeNotReadyCluster(appCluster, true); + } - appCluster = TestUtils.buildApplicationCluster(flinkVersion); + @ParameterizedTest + @EnumSource(FlinkVersion.class) + public void testUpgradeNotReadyClusterSavepoint(FlinkVersion flinkVersion) { + var appCluster = TestUtils.buildApplicationCluster(flinkVersion); appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT); appCluster .getSpec() @@ -451,7 +469,7 @@ public void verifyReconcileWithBadConfig() { // reconcile() finishes. 
appCluster.getSpec().getFlinkConfiguration().put(RestOptions.PORT.key(), "8088"); updateControl = testController.reconcile(appCluster, context); - assertTrue(updateControl.isUpdateStatus()); + assertFalse(updateControl.isUpdateStatus()); assertEquals( JobManagerDeploymentStatus.DEPLOYING, appCluster.getStatus().getJobManagerDeploymentStatus()); @@ -463,7 +481,7 @@ public void verifyReconcileWithBadConfig() { assertEquals( "JobManager replicas should not be configured less than one.", appCluster.getStatus().getError()); - assertTrue(updateControl.isUpdateStatus()); + assertFalse(updateControl.isUpdateStatus()); assertEquals( JobManagerDeploymentStatus.DEPLOYED_NOT_READY, appCluster.getStatus().getJobManagerDeploymentStatus()); @@ -489,14 +507,14 @@ public void verifyReconcileWithAChangedOperatorMode() { UpdateControl updateControl; updateControl = testController.reconcile(appCluster, context); - assertTrue(updateControl.isUpdateStatus()); + assertFalse(updateControl.isUpdateStatus()); assertEquals( JobManagerDeploymentStatus.DEPLOYING, appCluster.getStatus().getJobManagerDeploymentStatus()); updateControl = testController.reconcile(appCluster, context); JobStatus jobStatus = appCluster.getStatus().getJobStatus(); - assertTrue(updateControl.isUpdateStatus()); + assertFalse(updateControl.isUpdateStatus()); assertEquals( JobManagerDeploymentStatus.DEPLOYED_NOT_READY, appCluster.getStatus().getJobManagerDeploymentStatus()); @@ -508,7 +526,7 @@ public void verifyReconcileWithAChangedOperatorMode() { appCluster.getSpec().setJob(null); // Validation fails and JobObserver should still be used updateControl = testController.reconcile(appCluster, context); - assertTrue(updateControl.isUpdateStatus()); + assertFalse(updateControl.isUpdateStatus()); assertEquals( JobManagerDeploymentStatus.READY, appCluster.getStatus().getJobManagerDeploymentStatus()); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java index 359f050f0a..51315263b1 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java @@ -69,6 +69,7 @@ public void setup() { new FlinkConfigManager(new Configuration()), kubernetesClient, flinkService); + kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace(); } @ParameterizedTest @@ -78,8 +79,6 @@ public void testRollbackWithSavepoint(FlinkVersion flinkVersion) throws Exceptio dep.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT); var flinkConfiguration = dep.getSpec().getFlinkConfiguration(); flinkConfiguration.put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), "sd"); - var jobStatus = dep.getStatus().getJobStatus(); - var reconStatus = dep.getStatus().getReconciliationStatus(); List savepoints = new ArrayList<>(); testRollback( @@ -87,10 +86,19 @@ public void testRollbackWithSavepoint(FlinkVersion flinkVersion) throws Exceptio () -> { dep.getSpec().getJob().setParallelism(9999); testController.reconcile(dep, context); - savepoints.add(jobStatus.getSavepointInfo().getLastSavepoint().getLocation()); + savepoints.add( + dep.getStatus() + .getJobStatus() + .getSavepointInfo() + .getLastSavepoint() + .getLocation()); assertEquals( JobState.SUSPENDED, - reconStatus.deserializeLastReconciledSpec().getJob().getState()); + dep.getStatus() + .getReconciliationStatus() + .deserializeLastReconciledSpec() + .getJob() + .getState()); testController.reconcile(dep, context); // Trigger rollback by delaying the recovery @@ -113,7 +121,6 @@ public void testRollbackWithSavepoint(FlinkVersion flinkVersion) throws Exceptio public void testRollbackWithLastState(FlinkVersion flinkVersion) throws Exception { var dep = TestUtils.buildApplicationCluster(flinkVersion); 
dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE); - var reconStatus = dep.getStatus().getReconciliationStatus(); testRollback( dep, @@ -122,7 +129,11 @@ public void testRollbackWithLastState(FlinkVersion flinkVersion) throws Exceptio testController.reconcile(dep, context); assertEquals( JobState.SUSPENDED, - reconStatus.deserializeLastReconciledSpec().getJob().getState()); + dep.getStatus() + .getReconciliationStatus() + .deserializeLastReconciledSpec() + .getJob() + .getState()); testController.reconcile(dep, context); // Trigger rollback by delaying the recovery @@ -143,7 +154,6 @@ public void testRollbackWithLastState(FlinkVersion flinkVersion) throws Exceptio public void testRollbackFailureWithLastState(FlinkVersion flinkVersion) throws Exception { var dep = TestUtils.buildApplicationCluster(flinkVersion); dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE); - var reconStatus = dep.getStatus().getReconciliationStatus(); testRollback( dep, @@ -152,7 +162,11 @@ public void testRollbackFailureWithLastState(FlinkVersion flinkVersion) throws E testController.reconcile(dep, context); assertEquals( JobState.SUSPENDED, - reconStatus.deserializeLastReconciledSpec().getJob().getState()); + dep.getStatus() + .getReconciliationStatus() + .deserializeLastReconciledSpec() + .getJob() + .getState()); testController.reconcile(dep, context); // Trigger rollback by delaying the recovery @@ -179,7 +193,6 @@ public void testRollbackFailureWithLastState(FlinkVersion flinkVersion) throws E public void testRollbackStateless(FlinkVersion flinkVersion) throws Exception { var dep = TestUtils.buildApplicationCluster(); dep.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS); - var reconStatus = dep.getStatus().getReconciliationStatus(); testRollback( dep, @@ -188,7 +201,11 @@ public void testRollbackStateless(FlinkVersion flinkVersion) throws Exception { testController.reconcile(dep, context); assertEquals( JobState.SUSPENDED, - 
reconStatus.deserializeLastReconciledSpec().getJob().getState()); + dep.getStatus() + .getReconciliationStatus() + .deserializeLastReconciledSpec() + .getJob() + .getState()); testController.reconcile(dep, context); // Trigger rollback by delaying the recovery @@ -248,18 +265,19 @@ public void testRollback( testController.reconcile(deployment, context); // Validate reconciliation status - var reconciliationStatus = deployment.getStatus().getReconciliationStatus(); testController.reconcile(deployment, context); testController.reconcile(deployment, context); // Validate stable job - assertTrue(reconciliationStatus.isLastReconciledSpecStable()); + assertTrue(deployment.getStatus().getReconciliationStatus().isLastReconciledSpecStable()); triggerRollback.run(); - assertFalse(reconciliationStatus.isLastReconciledSpecStable()); - assertEquals(ReconciliationState.ROLLING_BACK, reconciliationStatus.getState()); + assertFalse(deployment.getStatus().getReconciliationStatus().isLastReconciledSpecStable()); + assertEquals( + ReconciliationState.ROLLING_BACK, + deployment.getStatus().getReconciliationStatus().getState()); assertEquals( "Deployment is not ready within the configured timeout, rolling back.", deployment.getStatus().getError()); @@ -269,23 +287,31 @@ public void testRollback( } testController.reconcile(deployment, context); - assertEquals(ReconciliationState.ROLLED_BACK, reconciliationStatus.getState()); + assertEquals( + ReconciliationState.ROLLED_BACK, + deployment.getStatus().getReconciliationStatus().getState()); deployment.getSpec().setLogConfiguration(null); testController.reconcile(deployment, context); testController.reconcile(deployment, context); - assertEquals(ReconciliationState.ROLLED_BACK, reconciliationStatus.getState()); - assertFalse(reconciliationStatus.isLastReconciledSpecStable()); + assertEquals( + ReconciliationState.ROLLED_BACK, + deployment.getStatus().getReconciliationStatus().getState()); + 
assertFalse(deployment.getStatus().getReconciliationStatus().isLastReconciledSpecStable()); validateAndRecover.run(); // Test update testController.reconcile(deployment, context); - assertEquals(deployment.getSpec(), reconciliationStatus.deserializeLastReconciledSpec()); + assertEquals( + deployment.getSpec(), + deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec()); testController.reconcile(deployment, context); testController.reconcile(deployment, context); - assertTrue(reconciliationStatus.isLastReconciledSpecStable()); - assertEquals(ReconciliationState.DEPLOYED, reconciliationStatus.getState()); + assertTrue(deployment.getStatus().getReconciliationStatus().isLastReconciledSpecStable()); + assertEquals( + ReconciliationState.DEPLOYED, + deployment.getStatus().getReconciliationStatus().getState()); assertNull(deployment.getStatus().getError()); if (deployment.getSpec().getJob() != null) { @@ -293,8 +319,11 @@ public void testRollback( deployment.getSpec().getJob().setParallelism(1); testController.reconcile(deployment, context); testController.reconcile(deployment, context); - assertTrue(reconciliationStatus.isLastReconciledSpecStable()); - assertEquals(ReconciliationState.DEPLOYED, reconciliationStatus.getState()); + assertTrue( + deployment.getStatus().getReconciliationStatus().isLastReconciledSpecStable()); + assertEquals( + ReconciliationState.DEPLOYED, + deployment.getStatus().getReconciliationStatus().getState()); assertNull(deployment.getStatus().getError()); deployment.getSpec().getJob().setState(JobState.RUNNING); @@ -303,21 +332,29 @@ public void testRollback( Thread.sleep(200); testController.reconcile(deployment, context); testController.reconcile(deployment, context); - assertTrue(reconciliationStatus.isLastReconciledSpecStable()); - assertEquals(ReconciliationState.DEPLOYED, reconciliationStatus.getState()); + assertTrue( + deployment.getStatus().getReconciliationStatus().isLastReconciledSpecStable()); + assertEquals( + 
ReconciliationState.DEPLOYED, + deployment.getStatus().getReconciliationStatus().getState()); assertNull(deployment.getStatus().getError()); // Verify suspending a rolled back job triggerRollback.run(); testController.reconcile(deployment, context); - assertEquals(ReconciliationState.ROLLED_BACK, reconciliationStatus.getState()); + assertEquals( + ReconciliationState.ROLLED_BACK, + deployment.getStatus().getReconciliationStatus().getState()); testController.reconcile(deployment, context); testController.reconcile(deployment, context); deployment.getSpec().getJob().setState(JobState.SUSPENDED); testController.reconcile(deployment, context); - assertTrue(reconciliationStatus.isLastReconciledSpecStable()); - assertEquals(ReconciliationState.DEPLOYED, reconciliationStatus.getState()); + assertTrue( + deployment.getStatus().getReconciliationStatus().isLastReconciledSpecStable()); + assertEquals( + ReconciliationState.DEPLOYED, + deployment.getStatus().getReconciliationStatus().getState()); assertNull(deployment.getStatus().getError()); } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java index bfc86d12b9..fce1fa9379 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java @@ -24,7 +24,6 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.crd.spec.JobState; -import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import 
com.fasterxml.jackson.core.JsonProcessingException; @@ -37,7 +36,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; /** Test for {@link org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils}. */ public class ReconciliationUtilsTest { @@ -46,30 +44,6 @@ public class ReconciliationUtilsTest { FlinkOperatorConfiguration.fromConfiguration( new Configuration(), Collections.emptySet()); - @Test - public void testStatusChanged() { - FlinkDeployment previous = TestUtils.buildApplicationCluster(); - FlinkDeployment current = ReconciliationUtils.clone(previous); - - UpdateControl updateControl = - ReconciliationUtils.toUpdateControl( - operatorConfiguration, previous, current, false); - - assertFalse(updateControl.isUpdateResource()); - assertFalse(updateControl.isUpdateStatus()); - assertTrue(updateControl.getScheduleDelay().isEmpty()); - - // status changed - current.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING); - updateControl = - ReconciliationUtils.toUpdateControl(operatorConfiguration, previous, current, true); - assertFalse(updateControl.isUpdateResource()); - assertTrue(updateControl.isUpdateStatus()); - assertEquals( - operatorConfiguration.getProgressCheckInterval().toMillis(), - updateControl.getScheduleDelay().get()); - } - @Test public void testRescheduleUpgradeImmediately() { FlinkDeployment app = TestUtils.buildApplicationCluster(); @@ -81,10 +55,10 @@ public void testRescheduleUpgradeImmediately() { ReconciliationUtils.updateForSpecReconciliationSuccess(current, JobState.SUSPENDED); UpdateControl updateControl = - ReconciliationUtils.toUpdateControl(operatorConfiguration, app, current, true); + ReconciliationUtils.toUpdateControl(operatorConfiguration, current, true); assertFalse(updateControl.isUpdateResource()); - assertTrue(updateControl.isUpdateStatus()); + 
assertFalse(updateControl.isUpdateStatus()); assertEquals(0, updateControl.getScheduleDelay().get()); } diff --git a/pom.xml b/pom.xml index bd1019d4b5..e2a6cb7ffc 100644 --- a/pom.xml +++ b/pom.xml @@ -66,7 +66,7 @@ under the License. 3.2.0 5.0.0 - 2.1.2 + 2.1.4 5.12.1 1.18.22