diff --git a/deploy/crd.yaml b/deploy/crd.yaml index 75cbf276..3349e961 100644 --- a/deploy/crd.yaml +++ b/deploy/crd.yaml @@ -131,6 +131,8 @@ spec: maxCheckpointRestoreAgeSeconds: type: integer minimum: 1 + fallbackWithoutState: + type: boolean jobManagerConfig: type: object properties: diff --git a/docs/crd.md b/docs/crd.md index afecaf74..4b32e747 100644 --- a/docs/crd.md +++ b/docs/crd.md @@ -172,3 +172,7 @@ Below is the list of fields in the custom resource and their description: * **tearDownVersionHash** `type:string` Used **only** with the BlueGreen deployment mode. This is set typically once a FlinkApplication successfully transitions to the `DualRunning` phase. Once set, the application version corresponding to the hash is torn down. On successful teardown, the FlinkApplication transitions to a `Running` phase. + + * **fallbackWithoutState** `type:bool` + If set to true, the operator will attempt to submit the job without a savepoint in the case where + a savepoint cannot be taken and there are no externalized checkpoints to recover from. diff --git a/integ/README.md b/integ/README.md index 09d43303..7f43318e 100644 --- a/integ/README.md +++ b/integ/README.md @@ -21,14 +21,7 @@ By default the tests create, use, and clean up the namespace `flinkoperatortest`. These tests use a sample Flink job [operator-test-app](/integ/operator-test-app/). The -tests currently use two images built from here: - -* `lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1` - -* `lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2` - -Those images are available on our private Dockerhub registry, and you -will either need to pull them locally or give Kubernetes access to the -registry. +tests currently use two images that are built before the integration tests run. 
### Setup @@ -123,4 +116,3 @@ Helpers: `kubectl patch FlinkApplication invalidcanceljob -p '{"metadata":{"finalizers":[]}}' --type=merge` - Set default namespace `kubectl config set-context --current --namespace=flinkoperatortest` - diff --git a/integ/blue_green_deployment_test.go b/integ/blue_green_deployment_test.go index c29855a2..25aaf7c7 100644 --- a/integ/blue_green_deployment_test.go +++ b/integ/blue_green_deployment_test.go @@ -10,7 +10,6 @@ import ( ) func WaitForUpdate(c *C, s *IntegSuite, name string, updateFn func(app *v1beta1.FlinkApplication), phase v1beta1.FlinkApplicationPhase, failurePhase v1beta1.FlinkApplicationPhase) *v1beta1.FlinkApplication { - // update with new image. app, err := s.Util.Update(name, updateFn) c.Assert(err, IsNil) diff --git a/integ/checkpoint_failure_test.go b/integ/checkpoint_failure_test.go index 5004bbf0..f498edbe 100644 --- a/integ/checkpoint_failure_test.go +++ b/integ/checkpoint_failure_test.go @@ -7,6 +7,7 @@ import ( "github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1" "github.com/prometheus/common/log" . 
"gopkg.in/check.v1" + coreV1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -19,12 +20,12 @@ func failingJobTest(s *IntegSuite, c *C, testName string, causeFailure func()) { config.ObjectMeta.Labels["integTest"] = testName - c.Assert(s.Util.CreateFlinkApplication(config), IsNil, - Commentf("Failed to create flink application")) - // Cause it to fail causeFailure() + c.Assert(s.Util.CreateFlinkApplication(config), IsNil, + Commentf("Failed to create flink application")) + c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil) // wait a bit for it to start failing @@ -63,25 +64,115 @@ func failingJobTest(s *IntegSuite, c *C, testName string, causeFailure func()) { log.Info("All pods torn down") } +// Tests that we correctly handle updating a job with a checkpoint timeout +func (s *IntegSuite) TestCheckpointTimeout(c *C) { + log.Info("Starting test TestCheckpointTimeout") + + failingJobTest(s, c, "checkpointtimeout", func() { + // cause checkpoints to take 120 seconds + err := s.Util.ExecuteCommand("minikube", "ssh", "echo 120000 >> /tmp/checkpoints/checkpoint_delay && sudo chmod 0644 /tmp/checkpoints/checkpoint_delay") + c.Assert(err, IsNil) + }) + log.Info("Completed test TestCheckpointTimeout") +} + +func appUpdate(app *v1beta1.FlinkApplication) *v1beta1.FlinkApplication { + app.Spec.Image = NewImage + skipFailureEnvVar := coreV1.EnvVar{Name: "SKIP_INDUCED_FAILURE", Value: "true"} + app.Spec.JobManagerConfig.EnvConfig.Env = append(app.Spec.JobManagerConfig.EnvConfig.Env, skipFailureEnvVar) + app.Spec.TaskManagerConfig.EnvConfig.Env = append(app.Spec.TaskManagerConfig.EnvConfig.Env, skipFailureEnvVar) + var maxCheckpointRestoreAgeSeconds int32 = 1 + app.Spec.MaxCheckpointRestoreAgeSeconds = &maxCheckpointRestoreAgeSeconds + return app +} + +func failingTaskTest(s *IntegSuite, c *C, testName string, fallbackWithoutState bool, deployShouldFail bool, causeFailure func()) { + config, 
err := s.Util.ReadFlinkApplication("test_app.yaml") + c.Assert(err, IsNil, Commentf("Failed to read test app yaml")) + config.Name = testName + "job" + config.Spec.DeleteMode = v1beta1.DeleteModeForceCancel + config.Spec.FallbackWithoutState = fallbackWithoutState + config.ObjectMeta.Labels["integTest"] = testName + + // Avoid external checkpoints to be used in recovery stage during update + err = s.Util.ExecuteCommand("minikube", "ssh", "echo 120000 >> /tmp/checkpoints/checkpoint_delay && sudo chmod 0644 /tmp/checkpoints/checkpoint_delay") + c.Assert(err, IsNil) + + c.Assert(s.Util.CreateFlinkApplication(config), IsNil, + Commentf("Failed to create flink application")) + + c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil) + + // Cause it to fail + causeFailure() + + // wait a bit for it to start failing + time.Sleep(5 * time.Second) + + // get app details + app, err := s.Util.GetFlinkApplication(config.Name) + c.Assert(err, IsNil) + + if deployShouldFail { + // Try to update it + app, err := s.Util.GetFlinkApplication(config.Name) + c.Assert(err, IsNil) + app = appUpdate(app) + _, err = s.Util.FlinkApps().Update(app) + c.Assert(err, IsNil) + + // because the checkpoint will fail, the app should move to deploy failed + c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationDeployFailed), IsNil) + + // And the job should not have been updated + newApp, err := s.Util.GetFlinkApplication(config.Name) + c.Assert(err, IsNil) + c.Assert(newApp.Status.JobStatus.JobID, Equals, app.Status.JobStatus.JobID) + } else { + // Try to update it with app that does not fail on checkpoint + newApp := WaitUpdateAndValidate(c, s, config.Name, func(app *v1beta1.FlinkApplication) { + appUpdate(app) + }, v1beta1.FlinkApplicationDeployFailed) + + // Check job updated and started without savepointPath + c.Assert(newApp.Status.JobStatus.JobID, Not(Equals), app.Status.JobStatus.JobID) + 
c.Assert(newApp.Spec.SavepointPath, Equals, "") + + // Check new app has no failures + endpoint := fmt.Sprintf("jobs/%s", newApp.Status.JobStatus.JobID) + _, err = s.Util.FlinkAPIGet(newApp, endpoint) + c.Assert(err, IsNil) + } + + // delete the application and ensure everything is cleaned up successfully + c.Assert(s.Util.FlinkApps().Delete(app.Name, &v1.DeleteOptions{}), IsNil) + + for { + pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). + List(v1.ListOptions{LabelSelector: "integTest=" + testName}) + c.Assert(err, IsNil) + if len(pods.Items) == 0 { + break + } + } + log.Info("All pods torn down") +} + // Tests that we correctly handle updating a job with task failures func (s *IntegSuite) TestJobWithTaskFailures(c *C) { log.Info("Starting test TestJobWithTaskFailures") - - failingJobTest(s, c, "taskfailure", func() { + failingTaskTest(s, c, "taskfailure", false, true, func() { err := s.Util.ExecuteCommand("minikube", "ssh", "touch /tmp/checkpoints/fail && chmod 0644 /tmp/checkpoints/fail") c.Assert(err, IsNil) }) log.Info("Completed test TestJobWithTaskFailures") } -// Tests that we correctly handle updating a job with a checkpoint timeout -func (s *IntegSuite) TestCheckpointTimeout(c *C) { - log.Info("Starting test TestCheckpointTimeout") - - failingJobTest(s, c, "checkpointtimeout", func() { - // cause checkpoints to take 120 seconds - err := s.Util.ExecuteCommand("minikube", "ssh", "echo 120000 >> /tmp/checkpoints/checkpoint_delay && sudo chmod 0644 /tmp/checkpoints/checkpoint_delay") +func (s *IntegSuite) TestSavepointCheckpointFailureFallback(c *C) { + log.Info("Starting test TestSavepointCheckpointFailureFallback") + failingTaskTest(s, c, "recoveryfallback", true, false, func() { + err := s.Util.ExecuteCommand("minikube", "ssh", "touch /tmp/checkpoints/fail && chmod 0644 /tmp/checkpoints/fail") c.Assert(err, IsNil) }) - log.Info("Completed test TestCheckpointTimeout") + log.Info("Completed test TestSavepointCheckpointFailureFallback") } 
diff --git a/integ/main_test.go b/integ/main_test.go index 4a3ce8c9..9e573656 100644 --- a/integ/main_test.go +++ b/integ/main_test.go @@ -77,12 +77,13 @@ func (s *IntegSuite) SetUpSuite(c *C) { if runDirect { config := controllerConfig.Config{ - LimitNamespace: namespace, - UseProxy: true, - ResyncPeriod: flyteConfig.Duration{Duration: 3 * time.Second}, - MaxErrDuration: flyteConfig.Duration{Duration: 60 * time.Second}, - MetricsPrefix: "flinkk8soperator", - ProxyPort: flyteConfig.Port{Port: 8001}, + LimitNamespace: namespace, + UseProxy: true, + ResyncPeriod: flyteConfig.Duration{Duration: 3 * time.Second}, + MaxErrDuration: flyteConfig.Duration{Duration: 6000 * time.Second}, + FlinkJobVertexTimeout: flyteConfig.Duration{Duration: 3 * time.Minute}, + MetricsPrefix: "flinkk8soperator", + ProxyPort: flyteConfig.Port{Port: 8001}, } log.Info("Running operator directly") diff --git a/integ/operator-test-app/Dockerfile b/integ/operator-test-app/Dockerfile index e42031b9..2182461c 100644 --- a/integ/operator-test-app/Dockerfile +++ b/integ/operator-test-app/Dockerfile @@ -9,7 +9,7 @@ ENV PATH=$FLINK_HOME/bin:$HADOOP_HOME/bin:$MAVEN_HOME/bin:$PATH COPY . 
/code # Configure Flink version -ENV FLINK_VERSION=1.11.6 \ +ENV FLINK_VERSION=1.8.1 \ HADOOP_SCALA_VARIANT=scala_2.12 # Install dependencies diff --git a/integ/operator-test-app/pom.xml b/integ/operator-test-app/pom.xml index be20aac8..68e0e141 100644 --- a/integ/operator-test-app/pom.xml +++ b/integ/operator-test-app/pom.xml @@ -19,12 +19,25 @@ org.apache.flink flink-java - 1.11.6 + 1.8.1 org.apache.flink flink-streaming-java_2.11 - 1.11.6 + 1.8.1 + + + org.apache.flink + flink-streaming-scala_2.11 + 1.8.1 + provided + + + + org.apache.flink + flink-clients_2.11 + 1.8.1 + provided diff --git a/integ/operator-test-app/src/main/java/com/lyft/OperatorTestApp.java b/integ/operator-test-app/src/main/java/com/lyft/OperatorTestApp.java index c423aff8..e1f3a413 100644 --- a/integ/operator-test-app/src/main/java/com/lyft/OperatorTestApp.java +++ b/integ/operator-test-app/src/main/java/com/lyft/OperatorTestApp.java @@ -89,7 +89,7 @@ public static class MaybeFail implements MapFunction { @Override public Long map(Long x) throws Exception { - if (new File("/checkpoints/fail").exists()) { + if (new File("/checkpoints/fail").exists() && !Settings.skipInducedFailure()) { throw new RuntimeException("FAILED!!!"); } diff --git a/integ/operator-test-app/src/main/java/com/lyft/Settings.java b/integ/operator-test-app/src/main/java/com/lyft/Settings.java new file mode 100644 index 00000000..16a6d9a0 --- /dev/null +++ b/integ/operator-test-app/src/main/java/com/lyft/Settings.java @@ -0,0 +1,9 @@ +package com.lyft; + +public class Settings { + private static final String SKIP_INDUCED_FAILURE = "SKIP_INDUCED_FAILURE"; + + public static boolean skipInducedFailure() { + return System.getenv(SKIP_INDUCED_FAILURE) != null && System.getenv(SKIP_INDUCED_FAILURE).equals("true"); + } +} diff --git a/integ/setup.sh b/integ/setup.sh index 5871435b..b2c06a86 100755 --- a/integ/setup.sh +++ b/integ/setup.sh @@ -3,21 +3,15 @@ # Test App Setup # TODO: upgrade flink test app from 1.8 -#cd 
integ/operator-test-app -#export TEST_APP_IMAGE=operator-test-app:$(git rev-parse HEAD) -#docker build -t $TEST_APP_IMAGE . -#docker tag $TEST_APP_IMAGE flink-test-app:local.1 -#docker tag $TEST_APP_IMAGE flink-test-app:local.2 -#minikube image load flink-test-app:local.1 -#minikube image load flink-test-app:local.2 -# -#cd ../../ - -docker pull lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1 -docker pull lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2 -minikube image load lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1 -minikube image load lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2 - +cd integ/operator-test-app +export TEST_APP_IMAGE=operator-test-app:$(git rev-parse HEAD) +docker build -t $TEST_APP_IMAGE . +docker tag $TEST_APP_IMAGE operator-test-app:local.1 +docker tag $TEST_APP_IMAGE operator-test-app:local.2 +minikube image load operator-test-app:local.1 +minikube image load operator-test-app:local.2 + +cd ../../ # Operator Setup diff --git a/integ/simple_test.go b/integ/simple_test.go index 553ea978..bc9346b2 100644 --- a/integ/simple_test.go +++ b/integ/simple_test.go @@ -14,7 +14,7 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -const NewImage = "lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2" +const NewImage = "operator-test-app:local.2" func updateAndValidate(c *C, s *IntegSuite, name string, updateFn func(app *v1beta1.FlinkApplication), failurePhase v1beta1.FlinkApplicationPhase) *v1beta1.FlinkApplication { app, err := s.Util.Update(name, updateFn) diff --git a/integ/test_app.yaml b/integ/test_app.yaml index 03189bf3..c190b205 100644 --- a/integ/test_app.yaml +++ b/integ/test_app.yaml @@ -6,7 +6,7 @@ metadata: labels: environment: development spec: - image: lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1 + image: operator-test-app:local.1 imagePullPolicy: IfNotPresent imagePullSecrets: - name: dockerhub diff --git 
a/pkg/apis/app/v1beta1/types.go b/pkg/apis/app/v1beta1/types.go index 0b3edf66..ec4d4aac 100644 --- a/pkg/apis/app/v1beta1/types.go +++ b/pkg/apis/app/v1beta1/types.go @@ -60,6 +60,7 @@ type FlinkApplicationSpec struct { ForceRollback bool `json:"forceRollback"` MaxCheckpointRestoreAgeSeconds *int32 `json:"maxCheckpointRestoreAgeSeconds,omitempty"` TearDownVersionHash string `json:"tearDownVersionHash,omitempty"` + FallbackWithoutState bool `json:"fallbackWithoutState"` } type FlinkConfig map[string]interface{} diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index 54d3e251..ae288bf7 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -193,7 +193,6 @@ func (s *FlinkStateMachine) handle(ctx context.Context, application *v1beta1.Fli updateApplication, appErr = s.handleApplicationDeleting(ctx, application) case v1beta1.FlinkApplicationDualRunning: updateApplication, appErr = s.handleDualRunning(ctx, application) - } if !v1beta1.IsRunningPhase(appPhase) { @@ -587,17 +586,30 @@ func (s *FlinkStateMachine) handleApplicationRecovering(ctx context.Context, app // (but if the JM is unavailable, our options there might be limited) // try to find an externalized checkpoint + failDeploy := false path, err := s.flinkController.FindExternalizedCheckpoint(ctx, app, app.Status.DeployHash) if err != nil { s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "RecoveryFailed", - "Failed to get externalized checkpoint config, could not recover. "+ - "Manual intervention is needed.") - return s.deployFailed(app) + "Failed to get externalized checkpoint config.") + failDeploy = true } else if path == "" { s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "RecoveryFailed", - "No externalized checkpoint found, could not recover. 
Make sure that "+ - "externalized checkpoints are enabled in your job's checkpoint configuration. Manual intervention "+ - "is needed to recover.") + "No externalized checkpoint found. Make sure that "+ + "externalized checkpoints are enabled in your job's checkpoint configuration.") + failDeploy = true + } + // try to continue without state if configured else fail + if failDeploy && app.Spec.FallbackWithoutState { + s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "RestoringWithoutExternalizedCheckpoint", + "FallbackWithoutState enabled. Proceeding without a checkpoint or savepoint.") + s.flinkController.UpdateLatestJobID(ctx, app, "") + s.updateApplicationPhase(app, v1beta1.FlinkApplicationSubmittingJob) + return statusChanged, nil + } + + if failDeploy { + s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "RecoveryFailed", + "Could not recover. Manual intervention is needed to recover.") return s.deployFailed(app) } diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go index 4e962ea7..500609be 100644 --- a/pkg/controller/flinkapplication/flink_state_machine_test.go +++ b/pkg/controller/flinkapplication/flink_state_machine_test.go @@ -406,6 +406,40 @@ func TestRestoreFromExternalizedCheckpoint(t *testing.T) { assert.Nil(t, err) } +func TestRestoreWithFallbackWithoutState(t *testing.T) { + updateInvoked := false + + app := v1beta1.FlinkApplication{ + Spec: v1beta1.FlinkApplicationSpec{ + FallbackWithoutState: true, + }, + Status: v1beta1.FlinkApplicationStatus{ + Phase: v1beta1.FlinkApplicationRecovering, + DeployHash: "blah", + SavepointTriggerID: "trigger", + }, + } + + stateMachineForTest := getTestStateMachine() + mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController) + + mockFlinkController.FindExternalizedCheckpointFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) { + return 
"", nil + } + + mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { + application := object.(*v1beta1.FlinkApplication) + assert.Equal(t, "", application.Status.SavepointPath) + assert.Equal(t, v1beta1.FlinkApplicationSubmittingJob, application.Status.Phase) + updateInvoked = true + return nil + } + err := stateMachineForTest.Handle(context.Background(), &app) + assert.True(t, updateInvoked) + assert.Nil(t, err) +} + func TestSubmittingToRunning(t *testing.T) { jobID := "j1"