To use Google Cloud Storage (GCS) as remote storage for checkpoints, savepoints, or job JARs, you can create a custom Docker image that is based on the official Flink image and adds the GCS connector to it.
Edit the Flink, Scala, and GCS connector versions in the properties file as needed. Then run the following command to build and push the image:
make build push IMAGE_TAG=gcr.io/[MY_PROJECT]/flink:[FLINK_VERSION]-scala_[SCALA_VERSION]-gcs
Before you create a Flink cluster with the custom image, you need to prepare several things:
- Create a service account with the required GCS IAM roles on the bucket, or use an existing service account, and download its key file (JSON) to your local machine.
- Create a Kubernetes Secret object from your service account key file:
kubectl create secret generic gcp-service-account-secret --from-file gcp_service_account_key.json=[/PATH/TO/KEY]
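If you do not yet have a service account and key file, the first step above can be sketched with the gcloud CLI as follows. This is a minimal sketch under assumptions, not a prescribed setup: the project ID, service account name, bucket name, and the `roles/storage.objectAdmin` role are hypothetical placeholders, and the `run` helper only prints each command unless `DRY_RUN=0` is set.

```shell
#!/usr/bin/env bash
# Sketch: create a service account, grant it access to the bucket, download a key.
# All names below are hypothetical placeholders; replace them with your own values.
set -euo pipefail

PROJECT_ID="${PROJECT_ID:-my-project}"
SA_NAME="${SA_NAME:-flink-gcs}"
BUCKET="${BUCKET:-my-bucket}"
KEY_FILE="${KEY_FILE:-gcp_service_account_key.json}"
SA_EMAIL="${SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"

# Print each command by default; set DRY_RUN=0 to actually execute it.
run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then
    echo "+ $*"
  else
    "$@"
  fi
}

# 1. Create the service account.
run gcloud iam service-accounts create "${SA_NAME}" --project "${PROJECT_ID}"

# 2. Grant it access to the bucket.
#    roles/storage.objectAdmin is an assumed role; use the narrowest role your job needs.
run gcloud storage buckets add-iam-policy-binding "gs://${BUCKET}" \
  --member "serviceAccount:${SA_EMAIL}" --role "roles/storage.objectAdmin"

# 3. Download a JSON key for the service account.
run gcloud iam service-accounts keys create "${KEY_FILE}" --iam-account "${SA_EMAIL}"
```

The resulting key file is what you pass as [/PATH/TO/KEY] when creating the Secret.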
The GCS connector uses the Hadoop core-site.xml as its configuration file, so you need to create a ConfigMap that makes the file available to the Flink TaskManager containers:
- Create a ConfigMap from the core-site.xml file:
kubectl create configmap hadoop-configmap --from-file [/PATH/TO/CORE-SITE.XML]
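If you do not already have a core-site.xml, the following sketch writes a minimal one that you can pass to the command above. It rests on assumptions: the property names are those used by the Hadoop GCS connector in the versions we are aware of, and some of them (in particular the authentication properties) differ between connector releases, so check them against the connector version you built into the image. The key path assumes the mountPath and keyFile values used in the cluster spec in this guide.

```shell
#!/usr/bin/env bash
# Sketch: generate a minimal core-site.xml for the GCS connector.
set -euo pipefail

# Assumed key location: gcpConfig mountPath + keyFile from the FlinkCluster spec.
KEY_PATH="/etc/gcp/keys/gcp_service_account_key.json"

cat > core-site.xml <<EOF
<?xml version="1.0"?>
<configuration>
  <!-- Register the GCS connector as the handler for gs:// URIs. -->
  <property>
    <name>fs.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
  </property>
  <property>
    <name>fs.AbstractFileSystem.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
  </property>
  <!-- Authenticate with the mounted service account key.
       These property names vary by connector version (assumption). -->
  <property>
    <name>google.cloud.auth.service.account.enable</name>
    <value>true</value>
  </property>
  <property>
    <name>google.cloud.auth.service.account.json.keyfile</name>
    <value>${KEY_PATH}</value>
  </property>
</configuration>
EOF
```

The file is written to the current directory, so [/PATH/TO/CORE-SITE.XML] becomes ./core-site.xml.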
Now you can create a Flink cluster with the Secret and ConfigMap created in the previous steps.
apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: my-flinkjobcluster
spec:
  image:
    name: gcr.io/[MY_PROJECT]/flink:[FLINK_VERSION]-scala_[SCALA_VERSION]-gcs
  jobManager:
    ...
  taskManager:
    ...
  job:
    jarFile: gs://my-bucket/my-job.jar
    savepoint: gs://my-bucket/path-to-savepoints/savepoint-1234
    savepointsDir: gs://my-bucket/path-to-savepoints
    autoSavepointSeconds: 300
  hadoopConfig:
    configMapName: hadoop-configmap
    mountPath: /etc/hadoop/conf
  gcpConfig:
    serviceAccount:
      secretName: gcp-service-account-secret
      keyFile: gcp_service_account_key.json
      mountPath: /etc/gcp/keys
Then run:
kubectl apply -f my_flinkjobcluster.yaml
The operator automatically mounts the service account key file at the specified path and sets the environment variable GOOGLE_APPLICATION_CREDENTIALS to the key file's path in the JobManager, TaskManager, and Job containers.