[STRMCMP-1658] Enable Deploys to Fallback without State #287

Merged: 26 commits (May 5, 2023)

Commits:
42eb35f  add fields (sethsaperstein-lyft, Mar 17, 2023)
8932cf3  merge master (sethsaperstein-lyft, Apr 17, 2023)
f94219f  integ test failing due to vertices timeout of 0 (sethsaperstein-lyft, Apr 18, 2023)
caadae6  about to make changes to rid of new state in graph (sethsaperstein-lyft, Apr 25, 2023)
68ad78f  revamped to not modify state. test passing (sethsaperstein-lyft, Apr 25, 2023)
5e1bc6b  fix image name (sethsaperstein-lyft, Apr 25, 2023)
a397152  update logging (sethsaperstein-lyft, Apr 25, 2023)
841924c  unit test (sethsaperstein-lyft, Apr 25, 2023)
f05c41d  fix import for lint (sethsaperstein-lyft, Apr 25, 2023)
d004ec2  skip tests (sethsaperstein-lyft, May 3, 2023)
cdc027b  change order of status check and fail (sethsaperstein-lyft, May 3, 2023)
d2cfc6d  Revert "skip tests" (sethsaperstein-lyft, May 3, 2023)
30d4644  Revert "Revert "skip tests"" (sethsaperstein-lyft, May 3, 2023)
c92aae4  skip test (sethsaperstein-lyft, May 3, 2023)
d9350f4  fix test with task failure now that there's the vertex monitoring (sethsaperstein-lyft, May 4, 2023)
924725b  remove skips (sethsaperstein-lyft, May 4, 2023)
9aacdbf  lint and refactor (sethsaperstein-lyft, May 4, 2023)
2845963  Revert "remove skips" (sethsaperstein-lyft, May 4, 2023)
25b4859  limit checkpoint lookback (sethsaperstein-lyft, May 4, 2023)
8781624  Revert "Revert "remove skips"" (sethsaperstein-lyft, May 4, 2023)
062390f  add checkpoint timeouts (sethsaperstein-lyft, May 5, 2023)
d97fbb5  Revert "Revert "Revert "remove skips""" (sethsaperstein-lyft, May 5, 2023)
bcb53bc  don't allow checkpoints to trigger (sethsaperstein-lyft, May 5, 2023)
fb202d4  Revert "Revert "Revert "Revert "remove skips"""" (sethsaperstein-lyft, May 5, 2023)
f3a3775  remove redundant line (sethsaperstein-lyft, May 5, 2023)
3b2e806  lint (sethsaperstein-lyft, May 5, 2023)
2 changes: 2 additions & 0 deletions deploy/crd.yaml
@@ -131,6 +131,8 @@ spec:
maxCheckpointRestoreAgeSeconds:
  type: integer
  minimum: 1
fallbackWithoutState:
  type: boolean
jobManagerConfig:
  type: object
  properties:
4 changes: 4 additions & 0 deletions docs/crd.md
@@ -172,3 +172,7 @@ Below is the list of fields in the custom resource and their description:
* **tearDownVersionHash** `type:string`
Used **only** with the BlueGreen deployment mode. This is set typically once a FlinkApplication successfully transitions to the `DualRunning` phase.
Once set, the application version corresponding to the hash is torn down. On successful teardown, the FlinkApplication transitions to a `Running` phase.

* **fallbackWithoutState** `type:bool`
Can be set to true to allow a deploy to submit the job without a savepoint when a savepoint cannot be taken and there are no externalized checkpoints to recover from.
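For illustration only, a minimal sketch of how the flag could be set on an application (spec layout assumed from integ/test_app.yaml in this PR; not part of the diff):

```yaml
# Hypothetical FlinkApplication spec excerpt; field placement assumed from deploy/crd.yaml above
spec:
  image: operator-test-app:local.1
  maxCheckpointRestoreAgeSeconds: 3600  # optional, documented above
  fallbackWithoutState: true            # allow a stateless submit when no savepoint or externalized checkpoint can be recovered
```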
10 changes: 1 addition & 9 deletions integ/README.md
@@ -21,14 +21,7 @@ By default the tests create, use, and clean up the namespace
`flinkoperatortest`.

These tests use a sample Flink job [operator-test-app](/integ/operator-test-app/). The
tests currently use two images built from here:

* `lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1`
* `lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2`

Those images are available on our private Dockerhub registry, and you
will either need to pull them locally or give Kubernetes access to the
registry.
tests currently use two images built before the integration test is run.

### Setup

@@ -123,4 +116,3 @@ Helpers:
`kubectl patch FlinkApplication invalidcanceljob -p '{"metadata":{"finalizers":[]}}' --type=merge`
- Set default namespace
`kubectl config set-context --current --namespace=flinkoperatortest`

1 change: 0 additions & 1 deletion integ/blue_green_deployment_test.go
@@ -10,7 +10,6 @@ import (
)

func WaitForUpdate(c *C, s *IntegSuite, name string, updateFn func(app *v1beta1.FlinkApplication), phase v1beta1.FlinkApplicationPhase, failurePhase v1beta1.FlinkApplicationPhase) *v1beta1.FlinkApplication {

// update with new image.
app, err := s.Util.Update(name, updateFn)
c.Assert(err, IsNil)
117 changes: 104 additions & 13 deletions integ/checkpoint_failure_test.go
@@ -7,6 +7,7 @@ import (
"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
"github.com/prometheus/common/log"
. "gopkg.in/check.v1"
coreV1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

@@ -19,12 +20,12 @@ func failingJobTest(s *IntegSuite, c *C, testName string, causeFailure func()) {

config.ObjectMeta.Labels["integTest"] = testName

c.Assert(s.Util.CreateFlinkApplication(config), IsNil,
Commentf("Failed to create flink application"))

// Cause it to fail
causeFailure()

c.Assert(s.Util.CreateFlinkApplication(config), IsNil,
Commentf("Failed to create flink application"))

c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)

// wait a bit for it to start failing
@@ -63,25 +64,115 @@ func failingJobTest(s *IntegSuite, c *C, testName string, causeFailure func()) {
log.Info("All pods torn down")
}

// Tests that we correctly handle updating a job with a checkpoint timeout
func (s *IntegSuite) TestCheckpointTimeout(c *C) {
log.Info("Starting test TestCheckpointTimeout")

failingJobTest(s, c, "checkpointtimeout", func() {
// cause checkpoints to take 120 seconds
err := s.Util.ExecuteCommand("minikube", "ssh", "echo 120000 >> /tmp/checkpoints/checkpoint_delay && sudo chmod 0644 /tmp/checkpoints/checkpoint_delay")
c.Assert(err, IsNil)
})
log.Info("Completed test TestCheckpointTimeout")
}

func appUpdate(app *v1beta1.FlinkApplication) *v1beta1.FlinkApplication {
app.Spec.Image = NewImage
skipFailureEnvVar := coreV1.EnvVar{Name: "SKIP_INDUCED_FAILURE", Value: "true"}
app.Spec.JobManagerConfig.EnvConfig.Env = append(app.Spec.JobManagerConfig.EnvConfig.Env, skipFailureEnvVar)
app.Spec.TaskManagerConfig.EnvConfig.Env = append(app.Spec.TaskManagerConfig.EnvConfig.Env, skipFailureEnvVar)
var maxCheckpointRestoreAgeSeconds int32 = 1
app.Spec.MaxCheckpointRestoreAgeSeconds = &maxCheckpointRestoreAgeSeconds
return app
}

func failingTaskTest(s *IntegSuite, c *C, testName string, fallbackWithoutState bool, deployShouldFail bool, causeFailure func()) {
config, err := s.Util.ReadFlinkApplication("test_app.yaml")
c.Assert(err, IsNil, Commentf("Failed to read test app yaml"))
config.Name = testName + "job"
config.Spec.DeleteMode = v1beta1.DeleteModeForceCancel
config.Spec.FallbackWithoutState = fallbackWithoutState
config.ObjectMeta.Labels["integTest"] = testName

// Prevent externalized checkpoints from being used for recovery during the update
err = s.Util.ExecuteCommand("minikube", "ssh", "echo 120000 >> /tmp/checkpoints/checkpoint_delay && sudo chmod 0644 /tmp/checkpoints/checkpoint_delay")
c.Assert(err, IsNil)

c.Assert(s.Util.CreateFlinkApplication(config), IsNil,
Commentf("Failed to create flink application"))

c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)

// Cause it to fail
causeFailure()

// wait a bit for it to start failing
Review thread:

Contributor: Is this comment incorrect because on line 115 you assert error is nil?

Contributor: Oh! You're not verifying that the app is failing here, are you?

Author: Lines have moved around since, but if you are referring to s.Util.GetFlinkApplication, that gets the k8s FlinkApplication CR, and the err corresponds to errors retrieving the CR object rather than anything related to the status of the actual job. On line 135, s.Util.FlinkAPIGet(newApp, endpoint) gets the actual Flink job and its status to show that the job itself is healthy.

time.Sleep(5 * time.Second)

// get app details
app, err := s.Util.GetFlinkApplication(config.Name)
c.Assert(err, IsNil)

if deployShouldFail {
// Try to update it
app, err := s.Util.GetFlinkApplication(config.Name)
c.Assert(err, IsNil)
app = appUpdate(app)
_, err = s.Util.FlinkApps().Update(app)
c.Assert(err, IsNil)

// because the checkpoint will fail, the app should move to deploy failed
c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationDeployFailed), IsNil)

// And the job should not have been updated
newApp, err := s.Util.GetFlinkApplication(config.Name)
c.Assert(err, IsNil)
c.Assert(newApp.Status.JobStatus.JobID, Equals, app.Status.JobStatus.JobID)
} else {
// Try to update it with app that does not fail on checkpoint
newApp := WaitUpdateAndValidate(c, s, config.Name, func(app *v1beta1.FlinkApplication) {
appUpdate(app)
}, v1beta1.FlinkApplicationDeployFailed)

// Check job updated and started without savepointPath
c.Assert(newApp.Status.JobStatus.JobID, Not(Equals), app.Status.JobStatus.JobID)
c.Assert(newApp.Spec.SavepointPath, Equals, "")

// Check new app has no failures
endpoint := fmt.Sprintf("jobs/%s", newApp.Status.JobStatus.JobID)
_, err = s.Util.FlinkAPIGet(newApp, endpoint)
c.Assert(err, IsNil)
}

// delete the application and ensure everything is cleaned up successfully
c.Assert(s.Util.FlinkApps().Delete(app.Name, &v1.DeleteOptions{}), IsNil)

for {
pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
List(v1.ListOptions{LabelSelector: "integTest=" + testName})
c.Assert(err, IsNil)
if len(pods.Items) == 0 {
break
}
}
log.Info("All pods torn down")
}

// Tests that we correctly handle updating a job with task failures
func (s *IntegSuite) TestJobWithTaskFailures(c *C) {
log.Info("Starting test TestJobWithTaskFailures")

failingJobTest(s, c, "taskfailure", func() {
failingTaskTest(s, c, "taskfailure", false, true, func() {
err := s.Util.ExecuteCommand("minikube", "ssh", "touch /tmp/checkpoints/fail && chmod 0644 /tmp/checkpoints/fail")
c.Assert(err, IsNil)
})
log.Info("Completed test TestJobWithTaskFailures")
}

// Tests that we correctly handle updating a job with a checkpoint timeout
func (s *IntegSuite) TestCheckpointTimeout(c *C) {
log.Info("Starting test TestCheckpointTimeout")

failingJobTest(s, c, "checkpointtimeout", func() {
// cause checkpoints to take 120 seconds
err := s.Util.ExecuteCommand("minikube", "ssh", "echo 120000 >> /tmp/checkpoints/checkpoint_delay && sudo chmod 0644 /tmp/checkpoints/checkpoint_delay")
func (s *IntegSuite) TestSavepointCheckpointFailureFallback(c *C) {
log.Info("Starting test TestSavepointCheckpointFailureFallback")
failingTaskTest(s, c, "recoveryfallback", true, false, func() {
err := s.Util.ExecuteCommand("minikube", "ssh", "touch /tmp/checkpoints/fail && chmod 0644 /tmp/checkpoints/fail")
c.Assert(err, IsNil)
})
log.Info("Completed test TestCheckpointTimeout")
log.Info("Completed test TestSavepointCheckpointFailureFallback")
}
13 changes: 7 additions & 6 deletions integ/main_test.go
@@ -77,12 +77,13 @@ func (s *IntegSuite) SetUpSuite(c *C) {

if runDirect {
config := controllerConfig.Config{
LimitNamespace: namespace,
UseProxy: true,
ResyncPeriod: flyteConfig.Duration{Duration: 3 * time.Second},
MaxErrDuration: flyteConfig.Duration{Duration: 60 * time.Second},
MetricsPrefix: "flinkk8soperator",
ProxyPort: flyteConfig.Port{Port: 8001},
LimitNamespace: namespace,
UseProxy: true,
ResyncPeriod: flyteConfig.Duration{Duration: 3 * time.Second},
MaxErrDuration: flyteConfig.Duration{Duration: 6000 * time.Second},
FlinkJobVertexTimeout: flyteConfig.Duration{Duration: 3 * time.Minute},
MetricsPrefix: "flinkk8soperator",
ProxyPort: flyteConfig.Port{Port: 8001},
}

log.Info("Running operator directly")
2 changes: 1 addition & 1 deletion integ/operator-test-app/Dockerfile
@@ -9,7 +9,7 @@ ENV PATH=$FLINK_HOME/bin:$HADOOP_HOME/bin:$MAVEN_HOME/bin:$PATH
COPY . /code

# Configure Flink version
ENV FLINK_VERSION=1.11.6 \
ENV FLINK_VERSION=1.8.1 \
Author: Returning this back to 1.8.1, which corresponds to the image in Dockerhub. Attempted to update this in the integration test PR, but reverting for now so as not to deal with further configuration options that are non-trivial given the 8 GB total memory for GitHub Actions.

HADOOP_SCALA_VARIANT=scala_2.12

# Install dependencies
17 changes: 15 additions & 2 deletions integ/operator-test-app/pom.xml
@@ -19,12 +19,25 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.11.6</version>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.11.6</version>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.8.1</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.8.1</version>
<scope>provided</scope>
</dependency>
</dependencies>

@@ -89,7 +89,7 @@ public static class MaybeFail implements MapFunction<Long, Long> {

@Override
public Long map(Long x) throws Exception {
if (new File("/checkpoints/fail").exists()) {
if (new File("/checkpoints/fail").exists() && !Settings.skipInducedFailure()) {
throw new RuntimeException("FAILED!!!");
}

@@ -0,0 +1,9 @@
package com.lyft;

public class Settings {
    private static final String SKIP_INDUCED_FAILURE = "SKIP_INDUCED_FAILURE";

    public static boolean skipInducedFailure() {
        return System.getenv(SKIP_INDUCED_FAILURE) != null && System.getenv(SKIP_INDUCED_FAILURE).equals("true");
    }
}
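For context, the integration test injects this flag through the operator's env config in appUpdate above. A rough, hypothetical YAML equivalent (the envConfig field names are assumed from the Go types in this PR, not confirmed by the diff):

```yaml
# Hypothetical excerpt; jobManagerConfig/taskManagerConfig envConfig layout is an assumption
spec:
  jobManagerConfig:
    envConfig:
      env:
        - name: SKIP_INDUCED_FAILURE
          value: "true"
  taskManagerConfig:
    envConfig:
      env:
        - name: SKIP_INDUCED_FAILURE
          value: "true"
```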
24 changes: 9 additions & 15 deletions integ/setup.sh
@@ -3,21 +3,15 @@
# Test App Setup

# TODO: upgrade flink test app from 1.8
#cd integ/operator-test-app
#export TEST_APP_IMAGE=operator-test-app:$(git rev-parse HEAD)
#docker build -t $TEST_APP_IMAGE .
#docker tag $TEST_APP_IMAGE flink-test-app:local.1
#docker tag $TEST_APP_IMAGE flink-test-app:local.2
#minikube image load flink-test-app:local.1
#minikube image load flink-test-app:local.2
#
#cd ../../

docker pull lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1
docker pull lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2
minikube image load lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1
minikube image load lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2

cd integ/operator-test-app
Author: Building the test app image as part of the integration test, as opposed to relying on the remote image, whose contents may not match the integ/operator-test-app contents.

export TEST_APP_IMAGE=operator-test-app:$(git rev-parse HEAD)
docker build -t $TEST_APP_IMAGE .
docker tag $TEST_APP_IMAGE operator-test-app:local.1
docker tag $TEST_APP_IMAGE operator-test-app:local.2
minikube image load operator-test-app:local.1
minikube image load operator-test-app:local.2

cd ../../

# Operator Setup

2 changes: 1 addition & 1 deletion integ/simple_test.go
@@ -14,7 +14,7 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const NewImage = "lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2"
const NewImage = "operator-test-app:local.2"

func updateAndValidate(c *C, s *IntegSuite, name string, updateFn func(app *v1beta1.FlinkApplication), failurePhase v1beta1.FlinkApplicationPhase) *v1beta1.FlinkApplication {
app, err := s.Util.Update(name, updateFn)
2 changes: 1 addition & 1 deletion integ/test_app.yaml
@@ -6,7 +6,7 @@ metadata:
labels:
environment: development
spec:
image: lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1
image: operator-test-app:local.1
imagePullPolicy: IfNotPresent
imagePullSecrets:
- name: dockerhub
1 change: 1 addition & 0 deletions pkg/apis/app/v1beta1/types.go
@@ -60,6 +60,7 @@ type FlinkApplicationSpec struct {
ForceRollback bool `json:"forceRollback"`
MaxCheckpointRestoreAgeSeconds *int32 `json:"maxCheckpointRestoreAgeSeconds,omitempty"`
TearDownVersionHash string `json:"tearDownVersionHash,omitempty"`
FallbackWithoutState bool `json:"fallbackWithoutState"`
}

type FlinkConfig map[string]interface{}
26 changes: 19 additions & 7 deletions pkg/controller/flinkapplication/flink_state_machine.go
@@ -193,7 +193,6 @@ func (s *FlinkStateMachine) handle(ctx context.Context, application *v1beta1.Fli
updateApplication, appErr = s.handleApplicationDeleting(ctx, application)
case v1beta1.FlinkApplicationDualRunning:
updateApplication, appErr = s.handleDualRunning(ctx, application)

}

if !v1beta1.IsRunningPhase(appPhase) {
@@ -587,17 +586,30 @@ func (s *FlinkStateMachine) handleApplicationRecovering(ctx context.Context, app
// (but if the JM is unavailable, our options there might be limited)

// try to find an externalized checkpoint
failDeploy := false
path, err := s.flinkController.FindExternalizedCheckpoint(ctx, app, app.Status.DeployHash)
if err != nil {
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "RecoveryFailed",
"Failed to get externalized checkpoint config, could not recover. "+
"Manual intervention is needed.")
return s.deployFailed(app)
"Failed to get externalized checkpoint config.")
failDeploy = true
} else if path == "" {
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "RecoveryFailed",
"No externalized checkpoint found, could not recover. Make sure that "+
"externalized checkpoints are enabled in your job's checkpoint configuration. Manual intervention "+
"is needed to recover.")
"No externalized checkpoint found. Make sure that "+
"externalized checkpoints are enabled in your job's checkpoint configuration.")
failDeploy = true
}
// try to continue without state if configured else fail
if failDeploy && app.Spec.FallbackWithoutState {
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "RestoringWithoutExternalizedCheckpoint",
"FallbackWithoutState enabled. Proceeding without a checkpoint or savepoint.")
s.flinkController.UpdateLatestJobID(ctx, app, "")
s.updateApplicationPhase(app, v1beta1.FlinkApplicationSubmittingJob)
return statusChanged, nil
}

if failDeploy {
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "RecoveryFailed",
"Could not recover. Manual intervention is needed to recover.")
return s.deployFailed(app)
}
