Flink Native Kubernetes支持Volume Mount

在之前的文章 Flink快速了解(4)——NativeKubernetes&HA
中讲到 Native Kubernetes在Flink 1.12版本中已经成为一个正式特性,使用起来也的确非常的简单、方便,但文末提到我碰到的一个问题:无法挂载volume。其实目前Flink Native Kubernetes这种方式提供的容器自定义能力还非常有限。从代码看,是通过一个个配置去支持的(见 KubernetesConfigOptions.java
),但k8s的Pod定义选项太多了,通过这种方式去支持,会一直疲于奔命,而且还要不断的和k8s版本关联。所以,目前社区有一个JIRA FLINK-15656: Support user-specified pod templates
,计划直接支持用户自定义pod template,但目前好像还没有明确的版本计划。另外,考虑到pod挂载volume是一个更加普遍化的高需求,所以还有一个单独的JIRA FLINK-15649: Support mounting volumes
,不过目前也没有明确的版本计划。

我看了一下这个JIRA,其实已经有人提了PR( #14283
)了,不过还没有被合进去。这个PR的代码非常简单,有兴趣的可以看下,我把这个代码合到我本地的1.12分支,然后把新增的3个class和修改的3个class文件加到官方1.12发布的包中测试了一下,是可以实现volume mount的。下面记录一下过程,有兴趣的可以自行编译,或者直接下载我编译好的( 点此下载
,密码: hi52
。怎么现在分享还必须设置密码了…)。

使用说明

这个PR增加的功能是给Flink Native Kubernetes部署模式下的JobManager和TaskManager增加volume mount的功能,支持 emptydir(默认)、hostpath、pvc三种。使用方式代码里面也写清楚了:

// KubernetesConfigOptions
public static final ConfigOption<String> JOBMANAGER_VOLUME_MOUNT =
key("kubernetes.jobmanager.volumemount")
.stringType()
.noDefaultValue()
.withDescription("Volume (pvc, emptydir, hostpath) mount information for the Job manager. " +
"Value can contain several commas-separated volume mounts. Each mount is defined by several : separated " +
"parameters - name used for mount, mounting path and volume specific parameters");
public static final ConfigOption<String> TASKMANAGER_VOLUME_MOUNT =
key("kubernetes.taskmanager.vlumemount")
.stringType()
.noDefaultValue()
.withDescription("Volume (pvc, emptydir, hostpath) mount information for the Task manager. " +
"Value can contain several commas-separated volume mounts. Each mount is defined by several : separated " +
"parameters - name used for mount, mounting path and volume specific parameters");

也可以从单元测试文件看使用方法:

// VolumeMountDecoratorTest
@Override
protected void setupFlinkConfig() {
super.setupFlinkConfig();
this.flinkConfig.setString(KubernetesConfigOptions.JOBMANAGER_VOLUME_MOUNT.key(),
VolumeMountDecorator.KUBERNETES_VOLUMES_PVC + ":pvc-mount1:/opt/pvcclaim/tes1/:testclaim1:false,"
+ VolumeMountDecorator.KUBERNETES_VOLUMES_PVC + ":pvc-mount2::testclaim:false:path1->/opt/pvcclaim/test/path1;path2->/opt/pvcclaim/test/path2,"
+ VolumeMountDecorator.KUBERNETES_VOLUMES_EMPTYDIR + ":edir-mount:/emptydirclaim:" + VolumeMountDecorator.EMPTYDIRDEFAULT + ","
+ VolumeMountDecorator.KUBERNETES_VOLUMES_HOSTPATH + ":hp-mount:/var/local/hp:/var/local/hp:DirectoryOrCreate");
}

emptydir和hostpath的使用非常简单就不说了。pvc的使用有两种方式:

-Dkubernetes.jobmanager.volumemount=pvc:<volume名称,自己起个名字>:<挂载路径>:<pvc名称>:<false|true>
-Dkubernetes.jobmanager.volumemount=pvc:<volume名称,自己起个名字>::<pvc名称>:<false|true>:<subPath>-><mountPath>

下面利用这个PR实现基于NFS的Flink Kubernetes HA。

  1. 先用修改过的 flink-dist_2.11-1.12.0.jar
    替换官方包里面 lib
    目录下的 flink-dist_2.11-1.12.0.jar
    (懒得自己编译的,可以直接下载上面我编译好的,我是在官方包的基础上增加和替换了PR涉及的几个class文件,所以改动量非常小),注意是替换你提交任务的flink包的对应jar,不是替换容器里面的。
  2. 准备好一个pvc,这里我使用的是nfs storage-class提供的一个pvc:
$ kubectl get pvc
NAME           STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   AGE
flink-ha-pvc   Bound    pvc-46537a5b-2adc-442e-ae59-52af4c681f2c   500Mi      RWX            nfs-storage    16h
  1. 以Application cluster的方式提交一个任务(涉及的镜像参见之前的文章):

    $ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=flink-application-cluster \
    -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
    -Dhigh-availability.storageDir=file:///opt/flink/flink-ha \
    -Dkubernetes.jobmanager.volumemount=pvc:jobmanager-ha:/opt/flink/flink-ha:flink-ha-pvc:false \
    -Dkubernetes.container.image=top-speed-windowing:1.12.0 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/TopSpeedWindowing.jar

检查一下:

$ kubectl get pod
NAME                                        READY   STATUS    RESTARTS   AGE
flink-application-cluster-9589dbf58-hm7xj   1/1     Running   0          76s
flink-application-cluster-taskmanager-1-1   1/1     Running   0          33s
$ kubectl describe pod flink-application-cluster-9589dbf58-hm7xj
Name:         flink-application-cluster-9589dbf58-hm7xj
Namespace:    default
Priority:     0
Node:         10.9.1.18/10.9.1.18
Start Time:   Thu, 24 Dec 2020 10:26:07 +0800
Labels:       app=flink-application-cluster
component=jobmanager
pod-template-hash=9589dbf58
type=flink-native-kubernetes
Annotations:  <none>
Status:       Running
IP:           172.20.0.165
IPs:
IP:           172.20.0.165
Controlled By:  ReplicaSet/flink-application-cluster-9589dbf58
Containers:
flink-job-manager:
Container ID:  docker://454fd2a6d3a913ce738f2e007f35e61d5068bfd9ad38d76bf900dbf1aaf9b70f
Image:         top-speed-windowing:1.12.0
Image ID:      docker://sha256:66d4aa5b13fc7c2ccce21685543fc2d079aac695d3480d9d27dbef2fc50ce875
Ports:         8081/TCP, 6123/TCP, 6124/TCP
Host Ports:    0/TCP, 0/TCP, 0/TCP
Command:
/docker-entrypoint.sh
Args:
native-k8s
$JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824 -Xms1073741824 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/opt/flink/log/jobmanager.log -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=1073741824b -D jobmanager.memory.jvm-overhead.max=201326592b
State:          Running
Started:      Thu, 24 Dec 2020 10:26:11 +0800
Ready:          True
Restart Count:  0
Limits:
cpu:     1
memory:  1600Mi
Requests:
cpu:     1
memory:  1600Mi
Environment:
_POD_IP_ADDRESS:   (v1:status.podIP)
Mounts:
/opt/flink/conf from flink-config-volume (rw)
/opt/flink/flink-ha from jobmanager-ha (rw)
/var/run/secrets/kubernetes.io/serviceaccount from default-token-pzw5h (ro)
Conditions:
Type              Status
Initialized       True
Ready             True
ContainersReady   True
PodScheduled      True
Volumes:
flink-config-volume:
Type:      ConfigMap (a volume populated by a ConfigMap)
Name:      flink-config-flink-application-cluster
Optional:  false
jobmanager-ha:
Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
ClaimName:  flink-ha-pvc
ReadOnly:   false
default-token-pzw5h:
Type:        Secret (a volume populated by a Secret)
SecretName:  default-token-pzw5h
Optional:    false
QoS Class:       Guaranteed
Node-Selectors:  <none>
Tolerations:     node.kubernetes.io/not-ready:NoExecute for 300s
node.kubernetes.io/unreachable:NoExecute for 300s
Events:
Type     Reason       Age                From                Message
----     ------       ----               ----                -------
Normal   Scheduled    <unknown>          default-scheduler   Successfully assigned default/flink-application-cluster-9589dbf58-hm7xj to 10.9.1.18
Warning  FailedMount  80s (x2 over 81s)  kubelet, 10.9.1.18  MountVolume.SetUp failed for volume "flink-config-volume" : configmap "flink-config-flink-application-cluster" not found
Normal   Pulled       78s                kubelet, 10.9.1.18  Container image "top-speed-windowing:1.12.0" already present on machine
Normal   Created      78s                kubelet, 10.9.1.18  Created container flink-job-manager
Normal   Started      77s                kubelet, 10.9.1.18  Started container flink-job-manager

可以看到已经正确挂载了。

这个PR能不能用于生产?

能不能用于生产我觉得主要考虑的就是这个PR的可靠程度和后期维护、升级了。从这两个角度考虑我觉得是没问题的。这个PR代码量少,而且简单,实质只是增加了几项配置而已,对已有代码几乎是没有改动的,新增的配置也都是可选配置项,代码的可控性几乎是百分百的。可能更应该关心的是这个PR后面会不会被合到官方分支吧。我个人觉得不一定吧,volume mount的功能几乎肯定会支持,但未必最终使用这个PR的代码。但用了其它代码,对使用者而言,顶多也就是换个jar包,修改下创建任务的命令而已。

另外我觉得最重要的是这个改动只影响提交任务的过程,就这个过程也只影响创建容器的过程,也就是影响面仅限Kubernetes相关的东西,并没有影响任何Flink运行的功能。所以使用这个PR的时候记得只替换宿主机安装包里面的jar即可,不要替换容器里面真正运行的那个jar。

在之前的文章 Flink快速了解(4)——NativeKubernetes&HA
中讲到 Native Kubernetes在Flink 1.12版本中已经成为一个正式特性,使用起来也的确非常的简单、方便,但文末提到我碰到的一个问题:无法挂载volume。其实目前Flink Native Kubernetes这种方式提供的容器自定义能力还非常有限。从代码看,是通过一个个配置去支持的(见 KubernetesConfigOptions.java
),但k8s的Pod定义选项太多了,通过这种方式去支持,会一直疲于奔命,而且还要不断的和k8s版本关联。所以,目前社区有一个JIRA FLINK-15656: Support user-specified pod templates
,计划直接支持用户自定义pod template,但目前好像还没有明确的版本计划。另外,考虑到pod挂载volume是一个更加普遍化的高需求,所以还有一个单独的JIRA FLINK-15649: Support mounting volumes
,不过目前也没有明确的版本计划。

我看了一下这个JIRA,其实已经有人提了PR( #14283
)了,不过还没有被合进去。这个PR的代码非常简单,有兴趣的可以看下,我把这个代码合到我本地的1.12分支,然后把新增的3个class和修改的3个class文件加到官方1.12发布的包中测试了一下,是可以实现volume mount的。下面记录一下过程,有兴趣的可以自行编译,或者直接下载我编译好的( 点此下载
,密码: hi52
。怎么现在分享还必须设置密码了…)。

使用说明

这个PR增加的功能是给Flink Native Kubernetes部署模式下的JobManager和TaskManager增加volume mount的功能,支持 emptydir(默认)、hostpath、pvc三种。使用方式代码里面也写清楚了:

// KubernetesConfigOptions
public static final ConfigOption<String> JOBMANAGER_VOLUME_MOUNT =
key("kubernetes.jobmanager.volumemount")
.stringType()
.noDefaultValue()
.withDescription("Volume (pvc, emptydir, hostpath) mount information for the Job manager. " +
"Value can contain several commas-separated volume mounts. Each mount is defined by several : separated " +
"parameters - name used for mount, mounting path and volume specific parameters");
public static final ConfigOption<String> TASKMANAGER_VOLUME_MOUNT =
key("kubernetes.taskmanager.vlumemount")
.stringType()
.noDefaultValue()
.withDescription("Volume (pvc, emptydir, hostpath) mount information for the Task manager. " +
"Value can contain several commas-separated volume mounts. Each mount is defined by several : separated " +
"parameters - name used for mount, mounting path and volume specific parameters");

也可以从单元测试文件看使用方法:

// VolumeMountDecoratorTest
@Override
protected void setupFlinkConfig() {
super.setupFlinkConfig();
this.flinkConfig.setString(KubernetesConfigOptions.JOBMANAGER_VOLUME_MOUNT.key(),
VolumeMountDecorator.KUBERNETES_VOLUMES_PVC + ":pvc-mount1:/opt/pvcclaim/tes1/:testclaim1:false,"
+ VolumeMountDecorator.KUBERNETES_VOLUMES_PVC + ":pvc-mount2::testclaim:false:path1->/opt/pvcclaim/test/path1;path2->/opt/pvcclaim/test/path2,"
+ VolumeMountDecorator.KUBERNETES_VOLUMES_EMPTYDIR + ":edir-mount:/emptydirclaim:" + VolumeMountDecorator.EMPTYDIRDEFAULT + ","
+ VolumeMountDecorator.KUBERNETES_VOLUMES_HOSTPATH + ":hp-mount:/var/local/hp:/var/local/hp:DirectoryOrCreate");
}

emptydir和hostpath的使用非常简单就不说了。pvc的使用有两种方式:

-Dkubernetes.jobmanager.volumemount=pvc:<volume名称,自己起个名字>:<挂载路径>:<pvc名称>:<false|true>
-Dkubernetes.jobmanager.volumemount=pvc:<volume名称,自己起个名字>::<pvc名称>:<false|true>:<subPath>-><mountPath>

下面利用这个PR实现基于NFS的Flink Kubernetes HA。

  1. 先用修改过的 flink-dist_2.11-1.12.0.jar
    替换官方包里面 lib
    目录下的 flink-dist_2.11-1.12.0.jar
    (懒得自己编译的,可以直接下载上面我编译好的,我是在官方包的基础上增加和替换了PR涉及的几个class文件,所以改动量非常小),注意是替换你提交任务的flink包的对应jar,不是替换容器里面的。
  2. 准备好一个pvc,这里我使用的是nfs storage-class提供的一个pvc:
$ kubectl get pvc
NAME           STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   AGE
flink-ha-pvc   Bound    pvc-46537a5b-2adc-442e-ae59-52af4c681f2c   500Mi      RWX            nfs-storage    16h
  1. 以Application cluster的方式提交一个任务(涉及的镜像参见之前的文章):

    $ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=flink-application-cluster \
    -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
    -Dhigh-availability.storageDir=file:///opt/flink/flink-ha \
    -Dkubernetes.jobmanager.volumemount=pvc:jobmanager-ha:/opt/flink/flink-ha:flink-ha-pvc:false \
    -Dkubernetes.container.image=top-speed-windowing:1.12.0 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/TopSpeedWindowing.jar

检查一下:

$ kubectl get pod
NAME                                        READY   STATUS    RESTARTS   AGE
flink-application-cluster-9589dbf58-hm7xj   1/1     Running   0          76s
flink-application-cluster-taskmanager-1-1   1/1     Running   0          33s
$ kubectl describe pod flink-application-cluster-9589dbf58-hm7xj
Name:         flink-application-cluster-9589dbf58-hm7xj
Namespace:    default
Priority:     0
Node:         10.9.1.18/10.9.1.18
Start Time:   Thu, 24 Dec 2020 10:26:07 +0800
Labels:       app=flink-application-cluster
component=jobmanager
pod-template-hash=9589dbf58
type=flink-native-kubernetes
Annotations:  <none>
Status:       Running
IP:           172.20.0.165
IPs:
IP:           172.20.0.165
Controlled By:  ReplicaSet/flink-application-cluster-9589dbf58
Containers:
flink-job-manager:
Container ID:  docker://454fd2a6d3a913ce738f2e007f35e61d5068bfd9ad38d76bf900dbf1aaf9b70f
Image:         top-speed-windowing:1.12.0
Image ID:      docker://sha256:66d4aa5b13fc7c2ccce21685543fc2d079aac695d3480d9d27dbef2fc50ce875
Ports:         8081/TCP, 6123/TCP, 6124/TCP
Host Ports:    0/TCP, 0/TCP, 0/TCP
Command:
/docker-entrypoint.sh
Args:
native-k8s
$JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824 -Xms1073741824 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/opt/flink/log/jobmanager.log -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=1073741824b -D jobmanager.memory.jvm-overhead.max=201326592b
State:          Running
Started:      Thu, 24 Dec 2020 10:26:11 +0800
Ready:          True
Restart Count:  0
Limits:
cpu:     1
memory:  1600Mi
Requests:
cpu:     1
memory:  1600Mi
Environment:
_POD_IP_ADDRESS:   (v1:status.podIP)
Mounts:
/opt/flink/conf from flink-config-volume (rw)
/opt/flink/flink-ha from jobmanager-ha (rw)
/var/run/secrets/kubernetes.io/serviceaccount from default-token-pzw5h (ro)
Conditions:
Type              Status
Initialized       True
Ready             True
ContainersReady   True
PodScheduled      True
Volumes:
flink-config-volume:
Type:      ConfigMap (a volume populated by a ConfigMap)
Name:      flink-config-flink-application-cluster
Optional:  false
jobmanager-ha:
Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
ClaimName:  flink-ha-pvc
ReadOnly:   false
default-token-pzw5h:
Type:        Secret (a volume populated by a Secret)
SecretName:  default-token-pzw5h
Optional:    false
QoS Class:       Guaranteed
Node-Selectors:  <none>
Tolerations:     node.kubernetes.io/not-ready:NoExecute for 300s
node.kubernetes.io/unreachable:NoExecute for 300s
Events:
Type     Reason       Age                From                Message
----     ------       ----               ----                -------
Normal   Scheduled    <unknown>          default-scheduler   Successfully assigned default/flink-application-cluster-9589dbf58-hm7xj to 10.9.1.18
Warning  FailedMount  80s (x2 over 81s)  kubelet, 10.9.1.18  MountVolume.SetUp failed for volume "flink-config-volume" : configmap "flink-config-flink-application-cluster" not found
Normal   Pulled       78s                kubelet, 10.9.1.18  Container image "top-speed-windowing:1.12.0" already present on machine
Normal   Created      78s                kubelet, 10.9.1.18  Created container flink-job-manager
Normal   Started      77s                kubelet, 10.9.1.18  Started container flink-job-manager

可以看到已经正确挂载了。

这个PR能不能用于生产?

能不能用于生产我觉得主要考虑的就是这个PR的可靠程度和后期维护、升级了。从这两个角度考虑我觉得是没问题的。这个PR代码量少,而且简单,实质只是增加了几项配置而已,对已有代码几乎是没有改动的,新增的配置也都是可选配置项,代码的可控性几乎是百分百的。可能更应该关心的是这个PR后面会不会被合到官方分支吧。我个人觉得不一定吧,volume mount的功能几乎肯定会支持,但未必最终使用这个PR的代码。但用了其它代码,对使用者而言,顶多也就是换个jar包,修改下创建任务的命令而已。

另外我觉得最重要的是这个改动只影响提交任务的过程,就这个过程也只影响创建容器的过程,也就是影响面仅限Kubernetes相关的东西,并没有影响任何Flink运行的功能。所以使用这个PR的时候记得只替换宿主机安装包里面的jar即可,不要替换容器里面真正运行的那个jar。

不过,如果你只想完全用官方的东西,那完全可以像之前版本一样,使用非Native的方式在Kubernetes上面部署Flink,不过我还是喜欢Native的东西,更加简单。

倪彦春的博客
我还没有学会写个人说明!
上一篇

摸透原理|一文带你了解 Redis 列表底层的实现方式

你也可能喜欢

评论已经被关闭。

插入图片