
Flink on k8s: Persistent Storage for Checkpoints #57

nodejh opened this issue Aug 2, 2021 · 0 comments

When deploying a Flink application on k8s, achieving high availability for the job means persisting checkpoints, savepoints, and Job Manager metadata durably rather than keeping them on a single node; this is where the Pod's persistent storage configuration comes in.

In a traditional YARN deployment we usually store checkpoints and related data in HDFS, Hadoop's distributed file system. As long as HDFS stays healthy, the checkpoint data survives any Flink failure, and the job can recover from the checkpoints in HDFS.

On k8s we could likewise persist data with HDFS, but rather than deploying HDFS onto the cluster there is a better, simpler option: use a cloud elastic file system directly, such as Alibaba Cloud NAS or AWS EFS. These file systems mount into a Pod over the NFS protocol, so using them is as convenient as reading and writing local files.
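
Before wiring this into Flink, it can help to verify from a Linux host that the NAS export is even reachable. A minimal sketch, assuming the same placeholder mount target that appears in the PV below:

$ sudo mkdir -p /mnt/nas-test
$ sudo mount -t nfs -o vers=3,nolock,tcp,noresvport xxx.cn-hangzhou.nas.aliyuncs.com:/nas/flink /mnt/nas-test
$ sudo touch /mnt/nas-test/.write-test && sudo rm /mnt/nas-test/.write-test
$ sudo umount /mnt/nas-test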

flink-conf.yaml Configuration

First, let's look at the settings that must be specified in flink-conf.yaml to make a Flink job highly available:

state.backend: rocksdb
state.checkpoint-storage: filesystem
state.checkpoints.dir: file:///nas/flink/checkpoints
state.savepoints.dir: file:///nas/flink/savepoints
state.backend.incremental: true
kubernetes.cluster-id: flink-cluster-001
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///nas/flink/recovery

The state backend is rocksdb (RocksDBStateBackend), which keeps the running state in an embedded RocksDB database. On each checkpoint, the RocksDB database is snapshotted into the state.checkpoints.dir directory. The Job Manager likewise persists the data it needs for high availability to the file system (high-availability.storageDir, described below).

kubernetes.cluster-id, high-availability, and high-availability.storageDir are the k8s high-availability settings.

  • kubernetes.cluster-id is a custom name for the Flink cluster, used to identify the current job;
  • high-availability selects the HA implementation; since we deploy on k8s, it must be set to KubernetesHaServicesFactory;
  • high-availability.storageDir is where the Job Manager metadata is stored; it holds everything needed to recover a Job Manager.
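
With HA enabled, Flink also records leader and job metadata pointers in ConfigMaps derived from kubernetes.cluster-id. A quick way to confirm HA is active once the cluster is up (the exact ConfigMap names vary by Flink version, so grep is the safe filter):

$ kubectl get configmaps | grep flink-cluster-001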

On k8s, the directories behind state.checkpoints.dir, state.savepoints.dir, and high-availability.storageDir are what make automatic recovery after a failure possible, so they must be persisted.

That means mounting these directories onto NAS inside the Pod.

As mentioned above, NAS mounts over the NFS protocol, so in flink-conf.yaml we can point at NAS directories with the plain file:// prefix, exactly as with local files. If you have stored checkpoint data in HDFS before, you may remember that HDFS paths use the hdfs:// prefix instead.

Creating the PV / PVC

Before a Pod can mount NAS, a PV and a PVC must exist.

The YAML below creates a general-purpose NAS persistent volume named pv-flink and a claim named pvc-flink.

apiVersion: v1
kind: PersistentVolume
metadata:
  name: pv-flink
  labels:
    alicloud-pvname: pv-flink
spec:
  capacity:
    storage: 5Gi
  accessModes:
    - ReadWriteMany
  csi:
    driver: nasplugin.csi.alibabacloud.com
    volumeHandle: pv-flink
    volumeAttributes:
      server: "xxx.cn-hangzhou.nas.aliyuncs.com"
      path: "/nas/flink"
  mountOptions:
    - nolock,tcp,noresvport
    - vers=3

---

kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: pvc-flink
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  selector:
    matchLabels:
      alicloud-pvname: pv-flink

In pv-flink, which pvc-flink binds to, the NAS mount target is xxx.cn-hangzhou.nas.aliyuncs.com and the mounted subdirectory is /nas/flink.

Note that NAS comes in two classes, general-purpose and extreme; for an extreme NAS instance, volumeAttributes.path must be a subdirectory of /share.

The remaining fields are covered in the NAS documentation, 使用NAS静态存储卷 ("Using statically provisioned NAS volumes"), so we won't go through them here.
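
Assuming the two manifests above are saved together as pv-flink.yaml, creating and checking them looks like this; the claim should report STATUS Bound once it matches the PV:

$ kubectl apply -f pv-flink.yaml
$ kubectl get pv pv-flink
$ kubectl get pvc pvc-flink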

Mounting NAS in a Pod

Below is an example YAML for a Job Manager that mounts NAS:

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  parallelism: 1
  template:
    metadata:
      labels:
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          imagePullPolicy: Always
          image: my-flink-job:latest
          resources:
            limits:
              cpu: 1
              memory: 4Gi
            requests:
              cpu: 1
              memory: 4Gi
          env:
            - name: POD_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
          args: [ "standalone-job"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: volume-pvc-nas-flink
              mountPath: /nas/flink
          securityContext:
            runAsUser: 9999
      serviceAccountName: flink
      volumes:
        - name: volume-pvc-nas-flink
          persistentVolumeClaim:
            claimName: pvc-flink

With volumeMounts we mount the volume-pvc-nas-flink volume, which is backed by the NAS persistent volume claim, at the /nas/flink path.

The Pod's image, my-flink-job:latest, is built on the official Flink Docker image.

The official Flink Docker image creates a flink group and, within it, a flink user, both with ID 9999, and the image ultimately starts the Job Manager and Task Manager as the flink user.

Here is the relevant part of the official Flink Dockerfile:

......

ENV FLINK_HOME=/opt/flink
ENV PATH=$FLINK_HOME/bin:$PATH
RUN groupadd --system --gid=9999 flink && \
    useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=flink flink
WORKDIR $FLINK_HOME

......

Accordingly, in securityContext we set runAsUser to 9999 so that every process in the Pod runs as UID 9999, that is, as the flink user.

After mounting NAS there is one more crucial step: initializing the permissions on the NAS directory.

The NAS directory is mounted as root by default, but the Flink job runs as the user with ID 9999, so out of the box the job has no permission to write to the NAS directory.

There are two ways to fix this: run the Flink job as root, or change the ownership of the mounted NAS directory. Since running Pods as root is generally avoided for security reasons, we choose the second option.

Initializing NAS Directory Permissions

To initialize the NAS directory permissions we can use a k8s init container. An init container is a special container that runs before the application containers in a Pod start, which makes it a good place for setup work such as running initialization scripts or setting directory permissions.

The init container must mount the same NAS directory so that it can access it, and then change the directory's permissions by running a shell command via command.

The command that sets the owner of /nas/flink to the flink user is:

$ chown -R 9999:9999 /nas/flink

For the init container we use the alpine image, a Docker image based on Alpine Linux that is only about 5 MB.

The complete YAML looks like this:

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  parallelism: 1
  template:
    metadata:
      labels:
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      initContainers:
        - name: init-jobmanager
          image: alpine:3
          command:
            - chown
            - -R
            - 9999:9999
            - /nas/flink
          volumeMounts:
            - name: volume-pvc-nas-flink
              mountPath: /nas/flink
      containers:
        - name: jobmanager
          imagePullPolicy: Always
          image: my-flink-job:latest
          resources:
            limits:
              cpu: 1
              memory: 4Gi
            requests:
              cpu: 1
              memory: 4Gi
          env:
            - name: POD_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
          args: [ "standalone-job"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: volume-pvc-nas-flink
              mountPath: /nas/flink
          securityContext:
            runAsUser: 9999
      serviceAccountName: flink
      volumes:
        - name: volume-pvc-nas-flink
          persistentVolumeClaim:
            claimName: pvc-flink

With this in place, k8s runs the init-jobmanager container before the jobmanager container. init-jobmanager changes the owner of /nas/flink to the user with ID 9999, i.e. flink; once it finishes, jobmanager starts, and the Flink job inside it can read and write /nas/flink.

One important caveat: the Job Manager and Task Manager must mount NAS at the same mountPath, as the sketch below illustrates.
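
For completeness, here is a minimal sketch of the matching Task Manager manifest. The Deployment name and replica count are illustrative; the point is that the volume and mountPath mirror the Job Manager exactly:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      component: taskmanager
  template:
    metadata:
      labels:
        component: taskmanager
    spec:
      containers:
        - name: taskmanager
          image: my-flink-job:latest
          args: ["taskmanager"]
          securityContext:
            runAsUser: 9999
          volumeMounts:
            - name: volume-pvc-nas-flink
              mountPath: /nas/flink   # must match the Job Manager's mountPath
      volumes:
        - name: volume-pvc-nas-flink
          persistentVolumeClaim:
            claimName: pvc-flink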

Cleaning Up Checkpoint Files

With persistent storage configured, Flink durably stores its state data in the state.checkpoints.dir directory.

The structure of state.checkpoints.dir is as follows:

/user-defined-checkpoint-dir
    /{job-id}
        |
        + --shared/
        + --taskowned/
        + --chk-1/
        + --chk-2/
        + --chk-3/
        ...       

The shared directory holds files that may be referenced by more than one checkpoint, taskowned holds files that the Job Manager must never delete, and each chk-{n} directory contains the files of a single checkpoint, where n is the checkpoint ID.

The number of retained checkpoints is set with the state.checkpoints.num-retained parameter in flink-conf.yaml. It defaults to 1, meaning only one checkpoint is kept and Flink cleans up the surplus checkpoint files.
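
For example, to keep the three most recent checkpoints instead (the value 3 is illustrative; tune it to your recovery needs):

state.checkpoints.num-retained: 3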

As noted earlier, on k8s the Job Manager and Task Manager must use the same persistent-storage mountPath. If they differ, the two write their checkpoint files into different directories, and then even with state.checkpoints.num-retained set, Flink cannot automatically clean up stale checkpoints; the files pile up and eventually fill the disk.
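
Given the directory layout above, a quick spot check from any Pod that mounts the NAS shows whether cleanup is working; the number of chk-* directories per job should stay close to state.checkpoints.num-retained:

$ ls -d /nas/flink/checkpoints/*/chk-* | wc -l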

Common Problems

Wrong directory permissions

When configuring persistent storage, the most common problem is forgetting to initialize the NAS directory permissions, which keeps the application from starting.

The Job Manager error typically looks like this:

2021-08-02 17:51:23.718 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: Application failed unexpectedly.
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAndShutdownClusterAsync$0(ApplicationDispatcherBootstrap.java:170) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_292]
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) ~[?:1.8.0_292]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_292]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_292]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:257) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_292]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_292]
	at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159) [flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) [flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.1.jar:1.13.1]
Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_292]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_292]
	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_292]
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_292]
	... 13 more
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
	... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'DeliveryOssStream'.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	... 10 more
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'DeliveryOssStream'.
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at com.aliyun.actiontrail.delivery.stream.OssStream.start(OssStream.java:59) ~[?:?]
	at com.aliyun.actiontrail.delivery.Main.main(Main.java:143) ~[?:?]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_292]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_292]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_292]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_292]
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	... 10 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:380) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_292]
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) ~[?:1.8.0_292]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_292]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_292]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_292]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
Caused by: java.lang.RuntimeException: java.lang.Exception: Could not open output stream for state backend
	at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:95) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) ~[?:1.8.0_292]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) ~[?:1.8.0_292]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_292]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	... 4 more
Caused by: java.lang.Exception: Could not open output stream for state backend
	at org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:72) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:391) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$27(Dispatcher.java:954) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) ~[?:1.8.0_292]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) ~[?:1.8.0_292]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_292]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	... 4 more
Caused by: java.io.IOException: Mkdirs failed to create file:/nas/flink/recovery/default
	at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:262) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:64) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:391) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$27(Dispatcher.java:954) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) ~[?:1.8.0_292]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) ~[?:1.8.0_292]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_292]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	... 4 more
2021-08-02 17:51:23.721 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting StandaloneApplicationClusterEntryPoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..

The log is long, and the cause may not be obvious at first. The key error that prevents the Job Manager from starting is:

Caused by: java.io.IOException: Mkdirs failed to create file:/nas/flink/recovery/default

The root cause is that the Flink process has no permission to write to the file:/nas/flink/recovery/default directory.

The Job Manager then restarts over and over; the Task Manager does not restart, but it cannot do useful work either.

Logging into any Job Manager or Task Manager Pod and inspecting the processes shows that the Flink job runs as the flink user, while /nas/flink is owned by root, so the process has no permission to write to /nas/flink.

# Check which user the Flink job runs as
$ ps -aux
USER         PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
flink          1  7.7  2.5 7449300 1655960 ?     Ssl  17:54   0:05 /usr/local/openjdk-8/bin/java -Xmx4563402682 -Xms4563402682 -XX:MaxDirectMem
root         280  0.0  0.0   5752  3608 pts/0    Ss   17:55   0:00 bash
root         331  0.0  0.0   9392  2972 pts/0    R+   17:56   0:00 ps -aux

# Check the permissions on the /nas/flink directory
$ ls -l /nas
total 77
...
drwxr-xr-x   2 root root 4096 Aug  2 17:48 flink

We can also look up the flink user's ID with cat /etc/passwd: it is 9999, matching the UID 9999 that the official Docker image assigns to the flink user, as mentioned earlier.

$ cat /etc/passwd
root:x:0:0:root:/root:/bin/bash
daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin
bin:x:2:2:bin:/bin:/usr/sbin/nologin
sys:x:3:3:sys:/dev:/usr/sbin/nologin
sync:x:4:65534:sync:/bin:/bin/sync

......

flink:x:9999:9999::/opt/flink:/bin/sh

The fix, as discussed above, is to initialize the NAS directory permissions with an init container.

Checkpoint files pile up and fill the disk

Another common problem is checkpoint files accumulating until they fill the disk.

There are two possible causes. One is an overly large state.checkpoints.num-retained, though that is usually not the culprit. The more likely cause is that the Job Manager and Task Manager mount their persistent volumes at different mountPath values; in that case, check the corresponding YAML.

Summary

With Flink on k8s, we typically rely on persistent storage such as NAS to keep checkpoints, savepoints, and high-availability data.

The most common pitfall with persistent storage is the Flink job lacking permission to read and write it, which is why we initialize the directory permissions with an init container.
