技术干货:Apache Pulsar 在移动云上的应用

微信扫一扫,分享到朋友圈

技术干货:Apache Pulsar 在移动云上的应用

业务背景

随着 移动云的快速发展 ,越来越多的客户对云原生消息中间件提出了 更多需求 ,从而可以将主要的精力聚焦在应用程序上,大致有以下方面:

  • 快速弹性伸缩,计算和存储资源能够按需扩展,以满足不同流量峰值和存储规格的要求,并且在线扩展时不需要均衡数据
  • 提供较高的安全防护,拥有身份认证和授权机制,确保数据的安全性
  • 能准确实时地发现问题,支持实例健康、吞吐量、消息堆积等维度的监控
  • 同时支持 IPv4/IPv6 双栈环境,满足不同网络环境下的诉求
  • 在实例级别做到租户资源隔离,提供更细粒度的安全防护
  • 支持跨区域复制服务,保证数据在集群间同步的稳定性和实时性

针对以上诉求,同时为了 统一公有云和私有云架构,移动云选择 Apache Pulsar 和 Kubernetes 来构建性能卓越、安全稳定、弹性伸缩、运维简便的云原生消息系统

整体架构

基于 Apache Pulsar 计算存储分离的云原生架构,我们将用于计算的 Kubernetes 集群和用于存储的 BookKeeper 集群物理分离,如下:

简单起见,这里我们以共享 Zookeeper 为例(可根据实例数量及实例资源大小,在 Kubernetes 中独享 Zookeeper 集群)以及直接使用 NodePort 的服务暴露方式将 Proxy 服务提供给客户端(也可根据需求选用合适的 LB 云服务或者使用开源的 LB,例如:Metallb: https://metallb.universe.tf/ )。

落地实践

:wrench: 怎么实现共享Bookie资源

我们期望 Kubernetes 中的多个 Pulsar 实例能够同时共享底层的 Bookie 存储资源,这样可以更快捷地实现计算存储分离。在 2.6.0 版本之前,Pulsar 实例在初始化元数据时,不支持设置 chroot 路径,并且只支持使用固定的 ledger 路径,不能使用已存在的 BookKeeper 集群。为此,我们通过优化 initialize-cluster-metadata 命令来支持设置 chroot 路径,以及在 broker 配置中添加 bookkeeperMetadataServiceUri 参数来指定 BookKeeper 集群的连接信息。

(详见:

  • PR-4502:

https://github.com/apache/pul…

  • PR-5935:

https://github.com/apache/pul…

  • PR-6998:

https://github.com/apache/pul…

这样就可以做到多个 Pulsar 实例共享已存在的 BookKeeper 集群,元数据结构大致如下:

[zk: localhost:2181(CONNECTED) 1] ls /pulsar
[pulsar1, pulsar2]
[zk: localhost:2181(CONNECTED) 2] ls /pulsar/pulsar1
[counters, stream, bookies, managed-ledgers, schemas, namespace, admin, loadbalance]
[zk: localhost:2181(CONNECTED) 3] ls /bookkeeper
[ledgers]
[zk: localhost:2181(CONNECTED) 4] ls /bookkeeper/ledgers
[00, idgen, LAYOUT, available, underreplication, INSTANCEID, cookies]

:wrench: 服务怎样暴露

Pulsar 通过引入可选的 Proxy 组件来解决客户端不能直连 Broker 以及直连可能带来的管理开销问题,例如在云环境中或者在 Kubernetes 集群中运行时,

(参考 PIP-1:

https://github.com/apache/pul… :-Pulsar-Proxy)

此外,Pulsar 官网提供了各个组件的 yaml 模板,

(参考 pulsar-helm-chart:

https://github.com/apache/pul…

这样可以很快捷地在 Kubernetes 集群上构建一个 Pulsar 集群,我们一开始采用了如下的架构:

期间有遇到一些小的问题,例如,Proxy 无法正常启动(在 Proxy 的 StatefulSet 中 initContainers 的条件是至少有一个 Broker 在运行),如下:

14:33:06.894 [main-EventThread] INFO  org.apache.pulsar.zookeeper.ZooKeeperChildrenCache - reloadCache called in zookeeperChildrenCache for path /loadbalance/brokers
14:33:36.900 [main-EventThread] WARN  org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader - Error updating broker info after broker list changed.
java.util.concurrent.TimeoutException: null
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) ~[?:1.8.0_191]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) ~[?:1.8.0_191]
at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97) ~[org.apache.pulsar-pulsar-zookeeper-utils-2.6.0-SNAPSHOT.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.updateBrokerList(ZookeeperCacheLoader.java:118) ~[org.apache.pulsar-pulsar-proxy-2.6.0-SNAPSHOT.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.lambda$new$0(ZookeeperCacheLoader.java:82) ~[org.apache.pulsar-pulsar-proxy-2.6.0-SNAPSHOT.jar:2.6.0-SNAPSHOT]
at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache.lambda$0(ZooKeeperChildrenCache.java:85) ~[org.apache.pulsar-pulsar-zookeeper-utils-2.6.0-SNAPSHOT.jar:2.6.0-SNAPSHOT]
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_191]
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) ~[?:1.8.0_191]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_191]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_191]
at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$22(ZooKeeperCache.java:434) ~[org.apache.pulsar-pulsar-zookeeper-utils-2.6.0-SNAPSHOT.jar:2.6.0-SNAPSHOT]
at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:618) [org.apache.pulsar-pulsar-zookeeper-2.6.0-SNAPSHOT.jar:2.6.0-SNAPSHOT]
at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:510) [org.apache.pulsar-pulsar-zookeeper-2.6.0-SNAPSHOT.jar:2.6.0-SNAPSHOT]

可以通过将 podManagementPolicy 策略由 Parallel 改为 OrderedReady(或者将 Proxy 的 initContainers 条件修改为指定 Broker 的副本数都在运行)来临时解决这个问题。这其实是 Proxy 的一个死锁问题导致的,由 Masahiro Sakamoto 发现并修复,

(详见 PR-7690:

https://github.com/apache/pul…

将在 2.6.2 和 2.7.0 版本中发布。此外,在不断的压测过程中,Proxy 会偶现内存持续增长,直至 Pod 重启(Error in writing to inbound channel. Closing java.nio.channels.ClosedChannelException: null),在做了一些验证和评估之后,我们决定通过其他方式来实现 Pod 内的 Broker 服务暴露,大致原因如下(供参考):

  • Proxy 节点会消耗额外的 CPU 和内存资源
  • 业务流量经过 Proxy 转发到 Broker 会增加额外的网络开销
  • 生产速率过快会导致 Proxy 出现 OOM,集群稳定性降低
  • Proxy 本身不具备负载均衡的能力,对实例的弹性伸缩不友好

其中 Client 配置 multi-hosts 的实现方式是配置几个 Proxy Url,实际就只用这几个 Proxy,可简单通过以下步骤验证:

1.2个 broker: broker3:6650,broker4:6650

2.2个 proxy: proxy1:6650,proxy2:6650

  1. 创建多个分区的topic用来生产消费(确保其 namespace 的 bundle 数量是 broker 的整数倍)
  2. client url: 配置为 proxy1:6650 时,proxy1 上有相应的负载(TCP连接),proxy2没有负载
  3. client url: 配置为 proxy1:6650, proxy2:6650 时,两个 proxy 上面均有负载

:wrench: 能否做到直连 Broker

由于 Broker 注册在 Zookeeper 中的服务地址是 podIP或者 pod 域名,Kubernetes 集群外的客户端是不能直接访问的,因此需要一种对外部客户端可见的服务地址。Pulsar 在 2.6.0 版本中引入 PIP-61: https://github.com/apache/pul… :-Advertised-multiple-addresses 来支持广播多个 URL 监听地址,例如,可设置如下内/外监听:

advertisedListeners=internal:pulsar://broker-0.broker-headless.pulsardev.svc.cluster.local.:6650,external:pulsar://10.192.6.23:38068

这里我们使用 pod 域名(broker-0.broker-headless.pulsardev.svc.cluster.local.)作为 Broker 间的通讯地址,使用 Pod 所在的实际 Worker 节点 IP 和 预先分配的 NodePort 端口作为外部的通讯地址。

StatefulSet 中每个 Pod 的 DNS 格式为:statefulSetName-{0..N1}.serviceName.namespace.svc.cluster.local.

  • statefulSetName 为 StatefulSet 的名字
  • 0..N-1 为 Pod 所在的序号,从0开始到 N-1
  • serviceName 为 Headless Service 的名字
  • namespace 为服务所在的 namespace,Headless Service 和 StatefulSet 必须要在相同的 namespace
  • .svc.cluster.local. 为 Cluster Domain

为了使集群外部的客户端能够直连 Broker 所在的 Pod,我们在 ConfigMap 中维护了 Worker 节点名字和 IP 的映射关系以及预先分配好的 NodePort 端口,这样在 StatefulSet 的 containers 启动脚本中,我们就可以通过命令 bin/apply-config-from-env.py conf/broker.conf; 将需要暴露的实际 Worker 节点 IP 和 预先分配的 NodePort 端口写到 Broker 配置 advertisedListeners 中,这样 Broker 启动后注册在 Zookeeper 中的外部通讯地址( external:pulsar://10.192.6.23:38068)对集群外部的客户端就是可见的了。其中,比较关键的一步是通过环境变量将 Pod 信息呈现给容器,

(参考 Pod-Info:

https://kubernetes.io/docs/ta…

例如,在 Broker 的 StatefulSet 的 yaml 文件中添加如下配置:

env:
- name: PULSAR_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: PULSAR_POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name

这样在 bin/apply-config-from-env.py 中,我们就可以通过上述信息根据 ConfigMap 中的节点和端口信息得到要暴露的实际 URL,写到 Broker 配置中,进而注册到 Zookeeper 中来发现实际的服务地址。

此外,PIP-61:

https://github.com/apache/pul… :-Advertised-multiple-addresses

目前只支持 TCP 协议的服务发现,当需要在集群外部访问 HTTP 协议时,advertisedListeners 暂无法提供帮助。为了解决这个问题,我们在 Broker 中定制化了一个 webServiceExternalUrl 配置,然后通过上述类似的方法将需要暴露的实际 Worker 节点 IP 和 预先分配的 NodePort 端口(HTTP 协议)注册到 Zookeeper 中,这样对集群外部的 Admin Client 就是可见的了。在2.6.0 版本中,客户端在使用 advertisedListenerName 时,Broker 返回的地址是错误的,我们对此进行了修复。

(详见 PR-7737:

https://github.com/apache/pul… )。

另外,我们在使用该特性的过程中修复了获取 bundle 时的空指针问题,以及支持客户端 Shell 指定监听名字,

(详见:

  • PR-7620:

https://github.com/apache/pul…

  • PR-7621:

https://github.com/apache/pul…

并在 2.6.1 版本中发布。

注:Service 在转发请求时,需要打上对应 Pod 节点名字的 selector 标签,例如:

>  selector:
>    app: pulsar
>    component: broker
>    statefulset.kubernetes.io/pod-name: broker-0

以上是我们对于直连 Broker 的探索,相比 Proxy 的方案有如下优势:

  • 减少了计算资源的额外开销
  • 提高了 Pulsar 集群的吞吐量和稳定性
  • 能够较好地支持 Pulsar 实例的弹性伸缩
  • 可以使用 Broker 的机制做一些集群优化(例如,通过 Broker 限流来避免 OOM 的发生)

:wrench: 怎样解决 IPv4/IPv6 双栈

对于移动云的场景,越来越多的用户期待云产品能够支持 IPv4/IPv6 双栈,以满足多种场景下的应用需求。

在 2.6.0 版本之前,Pulsar 只支持 IPv4 环境的部署,为此我们增加了对 IPv6 的支持。

(详见PR-5713:

https://github.com/apache/pul… )。

此外,Kubernetes 从 1.16+ 版本增加了对 Pods 和 Services 的双栈支持。

(参考 Dual-Stack:

https://kubernetes.io/docs/co…

基于以上特性,我们只需要为 Kubernetes 中的 Pulsar 实例增加 IPv6 的 Service 即可(spec.ipFamily 设置为 IPv6)。然后,通过前面类似的服务暴露方案,将对集群外客户端可见的 IPv6 服务地址注册到 Zookeeper 中即可,如下:

advertisedListeners=internal:pulsar://broker-0.broker-headless.pulsardev.svc.cluster.local.:6650,external:pulsar://10.192.6.23:38068,external-ipv6:pulsar://[fc66:5210:a152:12:0:101:bbbb:f027]:39021

值得注意的是,系统属性 java.net.preferIPv4Stack 默认是 false,在支持 IPv6 的双栈系统上,使用 Java 的 Socket 会默认通过底层 native 方法创建一个 IPv6 Socket(可以同时支持 IPv4 和 IPv6 主机通信),当 TCP 客户端的 java.net.preferIPv4Stack 属性设置为 true 时,如果要创建一个 host 为 IPv6 的 Socket,会抛出异常 java.net.SocketException: Protocol family unavailable。目前,Pulsar 客户端连接时优先使用 IPv4,当前的环境变量和脚本中,该属性都设置为了 true。

(详见PR-209:

https://github.com/apache/pul…

因此,在支持 IPv6 的双栈时,需要将这些脚本中(即 bin 目录下的 pulsar,pulsar-admin,pulsar-client,pulsar-perf)的属性 java.net.preferIPv4Stack 设置为 false。其中,Broker 启动时会使用到 bin/pulsar 脚本,需要确保 Broker 启动后是同时监听 IPv4/IPv6 的端口,大致如下:

[root@k8s1 ~]# kubectl exec -it broker-0 -n pulsardev -- /bin/bash
[pulsar@broker-0 pulsar]$ netstat -anp
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp6       0      0 :::8080                 :::*                    LISTEN      1/java
tcp6       0      0 :::6650                 :::*                    LISTEN

上述 Pod 里执行的结果中 ::: 代表同时监听了 IPv4/IPv6,如果是0.0.0.0: ,则只支持 IPv4。期间在使用过程中,我们还修复和优化了一些问题,例如,客户端不支持 Mult-Hosts 的 IPv6 地址等。

(详见 PR-8120:

https://github.com/apache/pul…

:wrench: 如何简捷地管理实例

为了满足移动云用户对于管理简洁可控的需求,我们还定制化了一些管理上的功能,部分列举如下:

  • PR-6456: 支持 Broker 可配置禁用自动创建订阅

https://github.com/apache/pul…

  • PR-6637: 支持在 Namespace 级别设置自动创建订阅

https://github.com/apache/pul…

  • PR-6383: 支持强制删除订阅

https://github.com/apache/pul…

  • PR-7993、PR-8103: 支持强制删除 Namespace/Tenant

未来规划

移动云消息队列 Pulsar 目前已进入公测阶段,后续规划部分如下:

  • 增加移动云周边 Connector 生态的支持
  • 增加跨域复制的支持
  • 优化 HTTP 协议的暴露监听
  • 优化 Broker 级别的限流机制
  • 增加对传统消息队列功能的支持和优化
  • 多个 Pulsar 实例共享 Bookie 存储隔离优化
  • 发布更多的技术博客

作者信息

孙方彬

Apache Pulsar Committer,中国移动云能力中心消息中间件团队负责人。

小福利: 点击 即可申请免费体验移动云 Pulsar

微信扫一扫,分享到朋友圈

技术干货:Apache Pulsar 在移动云上的应用

HDFS

上一篇

你用5G了吗?预计2022年全球5G手机出货量将超7.5亿

下一篇

你也可能喜欢

技术干货:Apache Pulsar 在移动云上的应用

长按储存图像,分享给朋友