This document will walk you through the steps of deploying the Flink Operator to a Kubernetes cluster and running a sample Flink job.
- Get a running Kubernetes cluster. You can verify the cluster info with

  ```bash
  kubectl cluster-info
  ```
- Clone the Flink Operator repo to your local machine:

  ```bash
  git clone git@github.com:GoogleCloudPlatform/flink-on-k8s-operator.git
  ```

  then switch to the repo directory; the scripts in the repo are needed for deployment. (This step is not needed if you choose to install through the Helm Chart.)

  To execute the install scripts, you'll also need kubectl v1.14+ and Go v1.12+ installed on your local machine.
- (Optional) Choose a Flink Operator image.

  By default, the deployment uses the image `gcr.io/flink-operator/flink-operator:latest`, but you can find and choose other released images by running the following command:

  ```bash
  gcloud container images list-tags gcr.io/flink-operator/flink-operator
  ```

  You can also follow the Developer Guide to build your own image from the source code and push it to a registry which your Kubernetes cluster can access.
You can deploy the Flink Operator to the Kubernetes cluster in one of the following two ways:
- Option 1: Make deploy

  Simply run

  ```bash
  make deploy
  ```

  from the source repo to deploy the operator. There are some flags which you can use to configure the deployment:

  ```bash
  make deploy \
      [IMG=<operator-image>] \
      [FLINK_OPERATOR_NAMESPACE=<namespace-to-deploy-operator>] \
      [RESOURCE_PREFIX=<kubernetes-resource-name-prefix>] \
      [WATCH_NAMESPACE=<namespace-to-watch>]
  ```

  - `IMG`: the Flink Operator image. The default value is `gcr.io/flink-operator/flink-operator:latest`.
  - `FLINK_OPERATOR_NAMESPACE`: the namespace of the operator. The default value is `flink-operator-system`.
  - `RESOURCE_PREFIX`: the prefix for cluster-scoped resource names, used to avoid conflicts. The default value is `flink-operator-`.
  - `WATCH_NAMESPACE`: the namespace of the `FlinkCluster` CRs which the operator watches. The default value is an empty string, which means all namespaces.

  Note: it is highly recommended to use the default values unless you want to deploy multiple instances of the operator in a cluster; see more details in the How-to section of this doc.
- Option 2: Helm Chart

  Follow the Helm Chart Installation Guide to install the operator through the Helm Chart.

  Note: for a Kubernetes cluster with a private cluster domain, you should add a CLUSTER_DOMAIN environment variable to the operator deployment.
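  For illustration, the relevant fragment of the operator Deployment might look like the following sketch; the container name `manager` and the domain value are assumptions, so adjust them to your environment:

  ```yaml
  # Sketch: fragment of the operator Deployment's pod template (not a complete manifest)
  spec:
    template:
      spec:
        containers:
          - name: manager              # assumed operator container name
            env:
              - name: CLUSTER_DOMAIN
                value: cluster.local   # replace with your private cluster domain
  ```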
After deploying the operator, you can verify that the CRD `flinkclusters.flinkoperator.k8s.io` has been created:

```bash
kubectl get crds | grep flinkclusters.flinkoperator.k8s.io
```

View the details of the CRD:

```bash
kubectl describe crds/flinkclusters.flinkoperator.k8s.io
```

Check the operator deployment:

```bash
kubectl get deployments -n flink-operator-system
```

Verify the operator Pod is up and running:

```bash
kubectl get pods -n flink-operator-system
```

Check the operator logs:

```bash
kubectl logs -n flink-operator-system -l app=flink-operator --all-containers
```

You should be able to see logs like:

```
INFO setup Starting manager
INFO controller-runtime.certwatcher Starting certificate watcher
INFO controller-runtime.controller Starting workers {"controller": "flinkcluster", "worker count": 1}
```
After deploying the Flink CRDs and the Flink Operator to a Kubernetes cluster, the operator serves as a control plane for Flink. In other words, where the cluster previously only understood the language of Kubernetes, it now also understands the language of Flink. You can then create custom resources representing Flink session clusters or job clusters; the operator detects the custom resources automatically, creates the actual clusters, optionally runs jobs, and updates status in the custom resources.
Create a sample Flink session cluster custom resource with

```bash
kubectl apply -f config/samples/flinkoperator_v1beta1_flinksessioncluster.yaml
```
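For reference, the sample manifest defines a small session cluster roughly of the following shape (abridged and approximate; see the file in the repo for the authoritative spec):

```yaml
apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: flinksessioncluster-sample
spec:
  image:
    name: flink:1.8.1        # image tag may differ in the actual sample file
  jobManager:
    accessScope: Cluster     # JobManager service reachable only inside the cluster
    ports:
      ui: 8081
  taskManager:
    replicas: 1              # replica count here is an assumption
```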
The operator will deploy the Flink session cluster's Pods, Services, etc. in the `default` namespace, and you can find them with

```bash
kubectl get pods,svc -n default
```

or verify the Pods are up and running with

```bash
kubectl get pods,svc -n default | grep "flinksessioncluster"
```
You can also create a sample Flink job cluster custom resource with

```bash
kubectl apply -f config/samples/flinkoperator_v1beta1_flinkjobcluster.yaml
```

and verify the Pod is up and running with

```bash
kubectl get pods,svc -n default | grep "flinkjobcluster"
```
By default, the Flink job cluster's TaskManager will be terminated once the sample job is completed (in this case it takes around 5 minutes for the Pod to terminate).
There are several ways to submit jobs to a session cluster.
- Flink web UI

  You can submit jobs through the Flink web UI. See instructions in the Monitoring section on how to set up a proxy to the Flink web UI.
- From within the cluster

  You can submit jobs through a client Pod in the same cluster, for example:

  ```bash
  cat <<EOF | kubectl apply --filename -
  apiVersion: batch/v1
  kind: Job
  metadata:
    name: my-job-submitter
  spec:
    template:
      spec:
        containers:
          - name: wordcount
            image: flink:1.8.1
            args:
              - /opt/flink/bin/flink
              - run
              - -m
              - flinksessioncluster-sample-jobmanager:8081
              - /opt/flink/examples/batch/WordCount.jar
              - --input
              - /opt/flink/README.txt
        restartPolicy: Never
  EOF
  ```
- From outside the cluster

  If you have configured the access scope of the JobManager as `External` or `VPC`, you can submit jobs from a machine which is in the scope, for example:

  ```bash
  flink run -m <jobmanager-service-ip>:8081 \
      examples/batch/WordCount.jar --input /opt/flink/README.txt
  ```

  Or if the access scope is `Cluster` (the default), you can first use port forwarding to establish a tunnel from a machine which has access to the Kubernetes API service (typically your local machine) to the JobManager service, for example:

  ```bash
  kubectl port-forward service/flinksessioncluster-sample-jobmanager 8081:8081
  ```

  then submit jobs through the tunnel, for example:

  ```bash
  flink run -m localhost:8081 \
      examples/batch/WordCount.jar --input ./README.txt
  ```
You can check the operator logs with

```bash
kubectl logs -n flink-operator-system -l app=flink-operator --all-containers -f --tail=1000
```
After deploying a Flink cluster with the operator, you can find the cluster custom resource with

```bash
kubectl get flinkclusters
```

and check the cluster status with

```bash
kubectl describe flinkclusters <CLUSTER-NAME>
```
In a job cluster, the job is automatically submitted by the operator. The operator creates a submitter for the Flink job; the job submitter itself is created as a Kubernetes Job. When the job submitter starts, it first checks the status of the Flink JobManager, submits the Flink job once it has confirmed that the JobManager is ready, and then terminates.

You can check the Flink job submission status and logs with

```bash
kubectl describe jobs <CLUSTER-NAME>-job-submitter
kubectl logs jobs/<CLUSTER-NAME>-job-submitter -f
```
In a session cluster, depending on how you submit the job, you can check the job status and logs accordingly.
You can also access the Flink web UI, REST API, and CLI by first creating a port forward from your local machine to the JobManager service UI port (8081 by default):

```bash
kubectl port-forward svc/[FLINK_CLUSTER_NAME]-jobmanager 8081:8081
```

then access the web UI with your browser at `http://localhost:8081`,

call the Flink REST API, e.g., to list jobs:

```bash
curl http://localhost:8081/jobs
```

or run the Flink CLI, e.g., to list jobs:

```bash
flink list -m localhost:8081
```
You can delete a Flink job or session cluster with the following command regardless of its current status; the operator will try to take a savepoint if possible and then cancel the job.

```bash
kubectl delete flinkclusters <name>
```
Undeploy the operator and CRDs from the Kubernetes cluster with

```bash
make undeploy [FLINK_OPERATOR_NAMESPACE=<namespace>]
```
By default, the Flink operator detects and processes all FlinkCluster resources created in a Kubernetes cluster. However, depending on the usage environment, such as a multi-tenant cluster, the namespaces to be managed by the operator may need to be limited. In this case, a dedicated operator must be deployed for each namespace, so multiple operators may be deployed in one cluster.
Deploy by specifying the namespace to manage and a prefix to avoid duplication of cluster-scoped resources:

```bash
make deploy \
    IMG=<operator-image> \
    FLINK_OPERATOR_NAMESPACE=<namespace-to-deploy-operator> \
    RESOURCE_PREFIX=<kubernetes-resource-name-prefix> \
    WATCH_NAMESPACE=<namespace-to-watch>
```
If you want to cancel a running Flink job, attach the control annotation to your FlinkCluster's metadata:

```yaml
metadata:
  annotations:
    flinkclusters.flinkoperator.k8s.io/user-control: job-cancel
```

You can attach the annotation with:

```bash
kubectl annotate flinkclusters <CLUSTER-NAME> flinkclusters.flinkoperator.k8s.io/user-control=job-cancel
```
When canceling, all the Pods that make up the Flink cluster are terminated by default. If you want to keep the cluster, configure `spec.job.cleanupPolicy.afterJobCancelled` according to the FlinkCluster Custom Resource Definition.
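For illustration, a fragment that keeps the cluster after cancellation might look like this; the `KeepCluster` value is an assumption, so verify the allowed values against the CRD:

```yaml
spec:
  job:
    cleanupPolicy:
      afterJobCancelled: KeepCluster   # assumed enum value; check the CRD
```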
When job cancellation is finished, the control annotation disappears, and the progress can be checked in the FlinkCluster status:

```bash
kubectl describe flinkcluster <CLUSTER-NAME>
```

```
...
Status:
  Control:
    Details:
      Job ID:    e689263060695231f62fa8b00f97b383
    Name:        job-cancel
    State:       Succeeded
    Update Time: 2020-04-03T10:04:50+09:00
```
A Flink cluster can be monitored with Prometheus in various ways. Here we introduce the method using the PodMonitor custom resource of the Prometheus operator. First, create a FlinkCluster with the metric exporter activated and its port exposed. Next, create a PodMonitor, which will be used to generate service discovery configurations, and register it to Prometheus. The exposed Flink metric port must be set as the endpoint of the PodMonitor. See the Prometheus API docs for details.
You can create a FlinkCluster with the Prometheus metric exporter activated, and a PodMonitor, like this:

```bash
kubectl apply -f examples/prometheus/flink_metric_cluster.yaml
kubectl apply -f examples/prometheus/pod-monitor.yaml
```
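For reference, the applied PodMonitor might look roughly like this sketch; the selector label and metric port name are assumptions, so adjust them to match your FlinkCluster:

```yaml
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: flink-pod-monitor
  labels:
    release: prometheus              # assumed label your Prometheus uses to pick up PodMonitors
spec:
  selector:
    matchLabels:
      cluster: flink-metric-cluster  # assumed label on the FlinkCluster's pods
  podMetricsEndpoints:
    - port: prom                     # must match the name of the exposed Flink metric port
```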
If the service discovery configuration is generated and registered successfully by the Prometheus operator, you can see the item named "flink-pod-monitor" in the "Service Discovery" section of your Prometheus web UI (`http://<Your-Prometheus-Web-UI-base-URL>/service-discovery`).
See this doc on how to manage savepoints with the operator.
To update a running Flink job's program or execution settings, you can create a new savepoint, terminate the job, and create a new FlinkCluster with the new program, settings, and savepoint. However, in some cases you may want to update the Flink job while maintaining its logical continuity with the FlinkCluster custom resource. In this case, you can simply update the FlinkCluster custom resource in place, and the Flink operator takes care of the process required to update the Flink job and cluster.
There are several points to note when using the job update feature:

- To use the job update feature, `savepointsDir` must be set, and the value of this field cannot be deleted when updating. This is because the Flink operator requires it to create a savepoint for job updates (see the sketch after this list).
- You can resume the Flink job from your desired savepoint by updating `fromSavepoint`. If you want to resume the updated job from the latest savepoint, `fromSavepoint` must be unspecified.
- `cancelRequested` and `savepointGeneration` cannot be updated at the same time as other fields due to functional characteristics.
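As a minimal sketch of the relevant job fields, with a hypothetical GCS bucket:

```yaml
spec:
  job:
    jarFile: ./examples/batch/WordCount.jar    # hypothetical job jar
    savepointsDir: gs://my-bucket/savepoints   # required for job updates; bucket is hypothetical
    # fromSavepoint: gs://my-bucket/savepoints/savepoint-123  # set to resume from a specific savepoint;
    # leave unspecified to resume the updated job from the latest savepoint
```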
There are some behavioral characteristics of updates:

- Whenever the FlinkCluster spec is updated, the Flink operator creates a ControllerRevision resource that stores the changed spec. ControllerRevisions can be used to check the editing history.
- If an update is triggered while the cluster is running, all components are re-created after being terminated. If an update is triggered in the terminated state, all components are re-created as well.
- When the job is to be updated, the Flink operator will restore the job from the latest savepoint available, either `savepointLocation` or `fromSavepoint` in the job status. If those are not available, the job is restarted from the beginning.
For example, you can create the wordcount job with Flink 1.9.2 and update it to Flink 1.9.3 like this:

```bash
kubectl apply -f examples/update/wordcount-1.9.2.yaml
kubectl apply -f examples/update/wordcount-1.9.3.yaml
```
In this example, the Flink cluster image, the job jar, and the job arguments are updated, and the TaskManager count is scaled from 1 to 2. You can check the list of revisions and their contents like this:

```bash
kubectl get controllerrevision
kubectl get controllerrevision <REVISION-NAME> -o yaml
```
The default logging configuration provided by the operator sends logs from the JobManager and TaskManager to stdout. This has the effect of making logging from Flink workloads running on Kubernetes behave like that of every other Kubernetes pod: your Flink logs should be stored wherever you generally expect to see your container logs in your environment.

Sometimes, however, this is not a good fit. An example of when you might want to customize logging behavior is to restore the visibility of logs in the Flink JobManager web interface. Or you might want to ship logs directly to a different sink, or use a different formatter.

You can use the `spec.logConfig` field to fully control the log4j and logback configuration. It is a string-to-string map whose keys and values become filenames and contents (respectively) in the folder `/opt/flink/conf` in each container.

The default Flink docker entrypoint expects this directory to contain two files: `log4j-console.properties` and `logback-console.xml`.
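A sketch of what this might look like, assuming log4j2-style properties (Flink versions before 1.11 ship log4j 1.x, whose syntax differs):

```yaml
spec:
  logConfig:
    log4j-console.properties: |
      # sketch only: send everything to the console (stdout)
      rootLogger.level = INFO
      rootLogger.appenderRef.console.ref = ConsoleAppender
      appender.console.name = ConsoleAppender
      appender.console.type = CONSOLE
      appender.console.layout.type = PatternLayout
      appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c - %m%n
    logback-console.xml: |
      <configuration>
        <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
          <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger{60} - %msg%n</pattern>
          </encoder>
        </appender>
        <root level="INFO">
          <appender-ref ref="console"/>
        </root>
      </configuration>
```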
An example of using this parameter to make logs visible in both the Flink UI and on stdout can be found here.
You can set various security-related attributes of the JobManager, TaskManager, and Job Pods using a PodSecurityContext object. It is possible to run the entrypoint of the container process as a different user or group, and to modify ownership of mounted volumes.
You can set the SecurityContext in the FlinkCluster spec, within the JobManager, TaskManager, and Job fields, like this:
```yaml
taskManager:
  ...
  securityContext:
    runAsUser: 9999
    runAsGroup: 1000
    fsGroup: 2000
```
You can set different SecurityContexts for the TaskManager and JobManager StatefulSets and for the Job, but all TaskManager Pods will share the same one. Examples and explanations of the available options can be found here.
If your deployment requires larger storage capacity or faster access to the state backend, you can use the `volumeClaimTemplates` option in the TaskManager config to create a new claim template and then mount it in `volumeMounts`. Check the FlinkCluster Custom Resource Definition and the StatefulSet docs for more info.
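For illustration, a TaskManager fragment with a hypothetical storage class `fast-ssd` and mount path `/flink-state`:

```yaml
taskManager:
  volumeClaimTemplates:
    - metadata:
        name: state-volume            # hypothetical claim name
      spec:
        accessModes: ["ReadWriteOnce"]
        storageClassName: fast-ssd    # hypothetical storage class
        resources:
          requests:
            storage: 100Gi
  volumeMounts:
    - name: state-volume              # mounts the claimed volume into each TaskManager Pod
      mountPath: /flink-state         # hypothetical mount path for the state backend
```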