diff --git a/deploy/crd.yaml b/deploy/crd.yaml
index 75cbf276..3349e961 100644
--- a/deploy/crd.yaml
+++ b/deploy/crd.yaml
@@ -131,6 +131,8 @@ spec:
maxCheckpointRestoreAgeSeconds:
type: integer
minimum: 1
+ fallbackWithoutState:
+ type: boolean
jobManagerConfig:
type: object
properties:
diff --git a/docs/crd.md b/docs/crd.md
index afecaf74..4b32e747 100644
--- a/docs/crd.md
+++ b/docs/crd.md
@@ -172,3 +172,7 @@ Below is the list of fields in the custom resource and their description:
* **tearDownVersionHash** `type:string`
Used **only** with the BlueGreen deployment mode. This is set typically once a FlinkApplication successfully transitions to the `DualRunning` phase.
Once set, the application version corresponding to the hash is torn down. On successful teardown, the FlinkApplication transitions to a `Running` phase.
+
+ * **fallbackWithoutState** `type:bool`
+ Can be set to true to submit the job without any restored state in the case where
+ a savepoint cannot be taken and there are no externalized checkpoints to recover from.
diff --git a/integ/README.md b/integ/README.md
index 09d43303..7f43318e 100644
--- a/integ/README.md
+++ b/integ/README.md
@@ -21,14 +21,7 @@ By default the tests create, use, and clean up the namespace
`flinkoperatortest`.
These tests use a sample Flink job [operator-test-app](/integ/operator-test-app/). The
-tests currently use two images built from here:
-
-* `lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1`
-* `lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2`
-
-Those images are available on our private Dockerhub registry, and you
-will either need to pull them locally or give Kubernetes access to the
-registry.
+tests currently use two images that are built locally before the integration tests run.
### Setup
@@ -123,4 +116,3 @@ Helpers:
`kubectl patch FlinkApplication invalidcanceljob -p '{"metadata":{"finalizers":[]}}' --type=merge`
- Set default namespace
`kubectl config set-context --current --namespace=flinkoperatortest`
-
diff --git a/integ/blue_green_deployment_test.go b/integ/blue_green_deployment_test.go
index c29855a2..25aaf7c7 100644
--- a/integ/blue_green_deployment_test.go
+++ b/integ/blue_green_deployment_test.go
@@ -10,7 +10,6 @@ import (
)
func WaitForUpdate(c *C, s *IntegSuite, name string, updateFn func(app *v1beta1.FlinkApplication), phase v1beta1.FlinkApplicationPhase, failurePhase v1beta1.FlinkApplicationPhase) *v1beta1.FlinkApplication {
-
// update with new image.
app, err := s.Util.Update(name, updateFn)
c.Assert(err, IsNil)
diff --git a/integ/checkpoint_failure_test.go b/integ/checkpoint_failure_test.go
index 5004bbf0..f498edbe 100644
--- a/integ/checkpoint_failure_test.go
+++ b/integ/checkpoint_failure_test.go
@@ -7,6 +7,7 @@ import (
"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
"github.com/prometheus/common/log"
. "gopkg.in/check.v1"
+ coreV1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@@ -19,12 +20,12 @@ func failingJobTest(s *IntegSuite, c *C, testName string, causeFailure func()) {
config.ObjectMeta.Labels["integTest"] = testName
- c.Assert(s.Util.CreateFlinkApplication(config), IsNil,
- Commentf("Failed to create flink application"))
-
// Cause it to fail
causeFailure()
+ c.Assert(s.Util.CreateFlinkApplication(config), IsNil,
+ Commentf("Failed to create flink application"))
+
c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)
// wait a bit for it to start failing
@@ -63,25 +64,115 @@ func failingJobTest(s *IntegSuite, c *C, testName string, causeFailure func()) {
log.Info("All pods torn down")
}
+// Tests that we correctly handle updating a job with a checkpoint timeout
+func (s *IntegSuite) TestCheckpointTimeout(c *C) {
+ log.Info("Starting test TestCheckpointTimeout")
+
+ failingJobTest(s, c, "checkpointtimeout", func() {
+ // cause checkpoints to take 120 seconds
+ err := s.Util.ExecuteCommand("minikube", "ssh", "echo 120000 >> /tmp/checkpoints/checkpoint_delay && sudo chmod 0644 /tmp/checkpoints/checkpoint_delay")
+ c.Assert(err, IsNil)
+ })
+ log.Info("Completed test TestCheckpointTimeout")
+}
+
+func appUpdate(app *v1beta1.FlinkApplication) *v1beta1.FlinkApplication {
+ app.Spec.Image = NewImage
+ skipFailureEnvVar := coreV1.EnvVar{Name: "SKIP_INDUCED_FAILURE", Value: "true"}
+ app.Spec.JobManagerConfig.EnvConfig.Env = append(app.Spec.JobManagerConfig.EnvConfig.Env, skipFailureEnvVar)
+ app.Spec.TaskManagerConfig.EnvConfig.Env = append(app.Spec.TaskManagerConfig.EnvConfig.Env, skipFailureEnvVar)
+ var maxCheckpointRestoreAgeSeconds int32 = 1
+ app.Spec.MaxCheckpointRestoreAgeSeconds = &maxCheckpointRestoreAgeSeconds
+ return app
+}
+
+func failingTaskTest(s *IntegSuite, c *C, testName string, fallbackWithoutState bool, deployShouldFail bool, causeFailure func()) {
+ config, err := s.Util.ReadFlinkApplication("test_app.yaml")
+ c.Assert(err, IsNil, Commentf("Failed to read test app yaml"))
+ config.Name = testName + "job"
+ config.Spec.DeleteMode = v1beta1.DeleteModeForceCancel
+ config.Spec.FallbackWithoutState = fallbackWithoutState
+ config.ObjectMeta.Labels["integTest"] = testName
+
+ // Prevent externalized checkpoints from being usable during the recovery stage of the update
+ err = s.Util.ExecuteCommand("minikube", "ssh", "echo 120000 >> /tmp/checkpoints/checkpoint_delay && sudo chmod 0644 /tmp/checkpoints/checkpoint_delay")
+ c.Assert(err, IsNil)
+
+ c.Assert(s.Util.CreateFlinkApplication(config), IsNil,
+ Commentf("Failed to create flink application"))
+
+ c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)
+
+ // Cause it to fail
+ causeFailure()
+
+ // wait a bit for it to start failing
+ time.Sleep(5 * time.Second)
+
+ // get app details
+ app, err := s.Util.GetFlinkApplication(config.Name)
+ c.Assert(err, IsNil)
+
+ if deployShouldFail {
+ // Try to update it
+ app, err := s.Util.GetFlinkApplication(config.Name)
+ c.Assert(err, IsNil)
+ app = appUpdate(app)
+ _, err = s.Util.FlinkApps().Update(app)
+ c.Assert(err, IsNil)
+
+ // because the checkpoint will fail, the app should move to deploy failed
+ c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationDeployFailed), IsNil)
+
+ // And the job should not have been updated
+ newApp, err := s.Util.GetFlinkApplication(config.Name)
+ c.Assert(err, IsNil)
+ c.Assert(newApp.Status.JobStatus.JobID, Equals, app.Status.JobStatus.JobID)
+ } else {
+ // Try to update it with app that does not fail on checkpoint
+ newApp := WaitUpdateAndValidate(c, s, config.Name, func(app *v1beta1.FlinkApplication) {
+ appUpdate(app)
+ }, v1beta1.FlinkApplicationDeployFailed)
+
+ // Check job updated and started without savepointPath
+ c.Assert(newApp.Status.JobStatus.JobID, Not(Equals), app.Status.JobStatus.JobID)
+ c.Assert(newApp.Spec.SavepointPath, Equals, "")
+
+ // Check new app has no failures
+ endpoint := fmt.Sprintf("jobs/%s", newApp.Status.JobStatus.JobID)
+ _, err = s.Util.FlinkAPIGet(newApp, endpoint)
+ c.Assert(err, IsNil)
+ }
+
+ // delete the application and ensure everything is cleaned up successfully
+ c.Assert(s.Util.FlinkApps().Delete(app.Name, &v1.DeleteOptions{}), IsNil)
+
+ for {
+ pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
+ List(v1.ListOptions{LabelSelector: "integTest=" + testName})
+ c.Assert(err, IsNil)
+ if len(pods.Items) == 0 {
+ break
+ }
+ }
+ log.Info("All pods torn down")
+}
+
// Tests that we correctly handle updating a job with task failures
func (s *IntegSuite) TestJobWithTaskFailures(c *C) {
log.Info("Starting test TestJobWithTaskFailures")
-
- failingJobTest(s, c, "taskfailure", func() {
+ failingTaskTest(s, c, "taskfailure", false, true, func() {
err := s.Util.ExecuteCommand("minikube", "ssh", "touch /tmp/checkpoints/fail && chmod 0644 /tmp/checkpoints/fail")
c.Assert(err, IsNil)
})
log.Info("Completed test TestJobWithTaskFailures")
}
-// Tests that we correctly handle updating a job with a checkpoint timeout
-func (s *IntegSuite) TestCheckpointTimeout(c *C) {
- log.Info("Starting test TestCheckpointTimeout")
-
- failingJobTest(s, c, "checkpointtimeout", func() {
- // cause checkpoints to take 120 seconds
- err := s.Util.ExecuteCommand("minikube", "ssh", "echo 120000 >> /tmp/checkpoints/checkpoint_delay && sudo chmod 0644 /tmp/checkpoints/checkpoint_delay")
+func (s *IntegSuite) TestSavepointCheckpointFailureFallback(c *C) {
+ log.Info("Starting test TestSavepointCheckpointFailureFallback")
+ failingTaskTest(s, c, "recoveryfallback", true, false, func() {
+ err := s.Util.ExecuteCommand("minikube", "ssh", "touch /tmp/checkpoints/fail && chmod 0644 /tmp/checkpoints/fail")
c.Assert(err, IsNil)
})
- log.Info("Completed test TestCheckpointTimeout")
+ log.Info("Completed test TestSavepointCheckpointFailureFallback")
}
diff --git a/integ/main_test.go b/integ/main_test.go
index 4a3ce8c9..9e573656 100644
--- a/integ/main_test.go
+++ b/integ/main_test.go
@@ -77,12 +77,13 @@ func (s *IntegSuite) SetUpSuite(c *C) {
if runDirect {
config := controllerConfig.Config{
- LimitNamespace: namespace,
- UseProxy: true,
- ResyncPeriod: flyteConfig.Duration{Duration: 3 * time.Second},
- MaxErrDuration: flyteConfig.Duration{Duration: 60 * time.Second},
- MetricsPrefix: "flinkk8soperator",
- ProxyPort: flyteConfig.Port{Port: 8001},
+ LimitNamespace: namespace,
+ UseProxy: true,
+ ResyncPeriod: flyteConfig.Duration{Duration: 3 * time.Second},
+ MaxErrDuration: flyteConfig.Duration{Duration: 6000 * time.Second},
+ FlinkJobVertexTimeout: flyteConfig.Duration{Duration: 3 * time.Minute},
+ MetricsPrefix: "flinkk8soperator",
+ ProxyPort: flyteConfig.Port{Port: 8001},
}
log.Info("Running operator directly")
diff --git a/integ/operator-test-app/Dockerfile b/integ/operator-test-app/Dockerfile
index e42031b9..2182461c 100644
--- a/integ/operator-test-app/Dockerfile
+++ b/integ/operator-test-app/Dockerfile
@@ -9,7 +9,7 @@ ENV PATH=$FLINK_HOME/bin:$HADOOP_HOME/bin:$MAVEN_HOME/bin:$PATH
COPY . /code
# Configure Flink version
-ENV FLINK_VERSION=1.11.6 \
+ENV FLINK_VERSION=1.8.1 \
HADOOP_SCALA_VARIANT=scala_2.12
# Install dependencies
diff --git a/integ/operator-test-app/pom.xml b/integ/operator-test-app/pom.xml
index be20aac8..68e0e141 100644
--- a/integ/operator-test-app/pom.xml
+++ b/integ/operator-test-app/pom.xml
@@ -19,12 +19,25 @@
org.apache.flink
flink-java
- 1.11.6
+ 1.8.1
org.apache.flink
flink-streaming-java_2.11
- 1.11.6
+ 1.8.1
+
+
+ org.apache.flink
+ flink-streaming-scala_2.11
+ 1.8.1
+ provided
+
+
+
+ org.apache.flink
+ flink-clients_2.11
+ 1.8.1
+ provided
diff --git a/integ/operator-test-app/src/main/java/com/lyft/OperatorTestApp.java b/integ/operator-test-app/src/main/java/com/lyft/OperatorTestApp.java
index c423aff8..e1f3a413 100644
--- a/integ/operator-test-app/src/main/java/com/lyft/OperatorTestApp.java
+++ b/integ/operator-test-app/src/main/java/com/lyft/OperatorTestApp.java
@@ -89,7 +89,7 @@ public static class MaybeFail implements MapFunction {
@Override
public Long map(Long x) throws Exception {
- if (new File("/checkpoints/fail").exists()) {
+ if (new File("/checkpoints/fail").exists() && !Settings.skipInducedFailure()) {
throw new RuntimeException("FAILED!!!");
}
diff --git a/integ/operator-test-app/src/main/java/com/lyft/Settings.java b/integ/operator-test-app/src/main/java/com/lyft/Settings.java
new file mode 100644
index 00000000..16a6d9a0
--- /dev/null
+++ b/integ/operator-test-app/src/main/java/com/lyft/Settings.java
@@ -0,0 +1,9 @@
+package com.lyft;
+
+public class Settings {
+ private static final String SKIP_INDUCED_FAILURE = "SKIP_INDUCED_FAILURE";
+
+ public static boolean skipInducedFailure() {
+ return System.getenv(SKIP_INDUCED_FAILURE) != null && System.getenv(SKIP_INDUCED_FAILURE).equals("true");
+ }
+}
diff --git a/integ/setup.sh b/integ/setup.sh
index 5871435b..b2c06a86 100755
--- a/integ/setup.sh
+++ b/integ/setup.sh
@@ -3,21 +3,15 @@
# Test App Setup
# TODO: upgrade flink test app from 1.8
-#cd integ/operator-test-app
-#export TEST_APP_IMAGE=operator-test-app:$(git rev-parse HEAD)
-#docker build -t $TEST_APP_IMAGE .
-#docker tag $TEST_APP_IMAGE flink-test-app:local.1
-#docker tag $TEST_APP_IMAGE flink-test-app:local.2
-#minikube image load flink-test-app:local.1
-#minikube image load flink-test-app:local.2
-#
-#cd ../../
-
-docker pull lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1
-docker pull lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2
-minikube image load lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1
-minikube image load lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2
-
+cd integ/operator-test-app
+export TEST_APP_IMAGE=operator-test-app:$(git rev-parse HEAD)
+docker build -t $TEST_APP_IMAGE .
+docker tag $TEST_APP_IMAGE operator-test-app:local.1
+docker tag $TEST_APP_IMAGE operator-test-app:local.2
+minikube image load operator-test-app:local.1
+minikube image load operator-test-app:local.2
+
+cd ../../
# Operator Setup
diff --git a/integ/simple_test.go b/integ/simple_test.go
index 553ea978..bc9346b2 100644
--- a/integ/simple_test.go
+++ b/integ/simple_test.go
@@ -14,7 +14,7 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
-const NewImage = "lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2"
+const NewImage = "operator-test-app:local.2"
func updateAndValidate(c *C, s *IntegSuite, name string, updateFn func(app *v1beta1.FlinkApplication), failurePhase v1beta1.FlinkApplicationPhase) *v1beta1.FlinkApplication {
app, err := s.Util.Update(name, updateFn)
diff --git a/integ/test_app.yaml b/integ/test_app.yaml
index 03189bf3..c190b205 100644
--- a/integ/test_app.yaml
+++ b/integ/test_app.yaml
@@ -6,7 +6,7 @@ metadata:
labels:
environment: development
spec:
- image: lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1
+ image: operator-test-app:local.1
imagePullPolicy: IfNotPresent
imagePullSecrets:
- name: dockerhub
diff --git a/pkg/apis/app/v1beta1/types.go b/pkg/apis/app/v1beta1/types.go
index 0b3edf66..ec4d4aac 100644
--- a/pkg/apis/app/v1beta1/types.go
+++ b/pkg/apis/app/v1beta1/types.go
@@ -60,6 +60,7 @@ type FlinkApplicationSpec struct {
ForceRollback bool `json:"forceRollback"`
MaxCheckpointRestoreAgeSeconds *int32 `json:"maxCheckpointRestoreAgeSeconds,omitempty"`
TearDownVersionHash string `json:"tearDownVersionHash,omitempty"`
+ FallbackWithoutState bool `json:"fallbackWithoutState"`
}
type FlinkConfig map[string]interface{}
diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go
index 54d3e251..ae288bf7 100644
--- a/pkg/controller/flinkapplication/flink_state_machine.go
+++ b/pkg/controller/flinkapplication/flink_state_machine.go
@@ -193,7 +193,6 @@ func (s *FlinkStateMachine) handle(ctx context.Context, application *v1beta1.Fli
updateApplication, appErr = s.handleApplicationDeleting(ctx, application)
case v1beta1.FlinkApplicationDualRunning:
updateApplication, appErr = s.handleDualRunning(ctx, application)
-
}
if !v1beta1.IsRunningPhase(appPhase) {
@@ -587,17 +586,30 @@ func (s *FlinkStateMachine) handleApplicationRecovering(ctx context.Context, app
// (but if the JM is unavailable, our options there might be limited)
// try to find an externalized checkpoint
+ failDeploy := false
path, err := s.flinkController.FindExternalizedCheckpoint(ctx, app, app.Status.DeployHash)
if err != nil {
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "RecoveryFailed",
- "Failed to get externalized checkpoint config, could not recover. "+
- "Manual intervention is needed.")
- return s.deployFailed(app)
+ "Failed to get externalized checkpoint config.")
+ failDeploy = true
} else if path == "" {
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "RecoveryFailed",
- "No externalized checkpoint found, could not recover. Make sure that "+
- "externalized checkpoints are enabled in your job's checkpoint configuration. Manual intervention "+
- "is needed to recover.")
+ "No externalized checkpoint found. Make sure that "+
+ "externalized checkpoints are enabled in your job's checkpoint configuration.")
+ failDeploy = true
+ }
+ // if fallbackWithoutState is configured, continue without restored state; otherwise fail the deploy
+ if failDeploy && app.Spec.FallbackWithoutState {
+ s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "RestoringWithoutExternalizedCheckpoint",
+ "FallbackWithoutState enabled. Proceeding without a checkpoint or savepoint.")
+ s.flinkController.UpdateLatestJobID(ctx, app, "")
+ s.updateApplicationPhase(app, v1beta1.FlinkApplicationSubmittingJob)
+ return statusChanged, nil
+ }
+
+ if failDeploy {
+ s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "RecoveryFailed",
+ "Could not recover. Manual intervention is needed to recover.")
return s.deployFailed(app)
}
diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go
index 4e962ea7..500609be 100644
--- a/pkg/controller/flinkapplication/flink_state_machine_test.go
+++ b/pkg/controller/flinkapplication/flink_state_machine_test.go
@@ -406,6 +406,40 @@ func TestRestoreFromExternalizedCheckpoint(t *testing.T) {
assert.Nil(t, err)
}
+func TestRestoreWithFallbackWithoutState(t *testing.T) {
+ updateInvoked := false
+
+ app := v1beta1.FlinkApplication{
+ Spec: v1beta1.FlinkApplicationSpec{
+ FallbackWithoutState: true,
+ },
+ Status: v1beta1.FlinkApplicationStatus{
+ Phase: v1beta1.FlinkApplicationRecovering,
+ DeployHash: "blah",
+ SavepointTriggerID: "trigger",
+ },
+ }
+
+ stateMachineForTest := getTestStateMachine()
+ mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)
+
+ mockFlinkController.FindExternalizedCheckpointFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) {
+ return "", nil
+ }
+
+ mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster)
+ mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error {
+ application := object.(*v1beta1.FlinkApplication)
+ assert.Equal(t, "", application.Status.SavepointPath)
+ assert.Equal(t, v1beta1.FlinkApplicationSubmittingJob, application.Status.Phase)
+ updateInvoked = true
+ return nil
+ }
+ err := stateMachineForTest.Handle(context.Background(), &app)
+ assert.True(t, updateInvoked)
+ assert.Nil(t, err)
+}
+
func TestSubmittingToRunning(t *testing.T) {
jobID := "j1"