+-
你管这破玩意叫 Pulsar

这两年 Pulsar 发展比较快,有好多大公司引入了 Pulsar,相关的资料和课程也多了,今天一起来了解一下 Pulsar 这款中间件。


下图是几款消息中间件的历史:



2012年 Pulsar 在 Yahoo 内部开发,2016 年开源并捐献给 Apache,2018 成为 Apache 顶级项目。


1. 架构


Pulsar 的架构图如下:



总结一下,Pulsar 有下面的几个特性。


1.1 计算存储分离


Pulsar 采用计算和存储相分离的架构,Broker 集群负责把 producer 发出的消息发送给 consumer,同时承担负载均衡的作用。


Pulsar 用 Apache BookKeeper 作为持久化存储,Broker 持有 BookKeeper client,把未确认的消息发送到 BookKeeper 进行保存。


BookKeeper 是一个分布式的 WAL(Write Ahead Log)系统,Pulsar 使用 BookKeeper 有下面几个便利:


  • 可以为 topic 创建多个 ledgers:ledger 是一个只追加的数据结构,并且只有一个 writer,这个 writer 负责多个 BookKeeper 存储节点(就是 Bookies)的写入。Ledger 的条目会被复制到多个 bookies;

  • Broker 可以创建、关闭和删除 Ledger,也可以追加内容到 Ledger;
  • Ledger 被关闭后,只能以只读状态打开,除非要明确地写数据或者是因为 writer挂掉导致的关闭;
  • Ledger 只能有 writer 这一个进程写入,这样写入不会有冲突,所以写入效率很高。如果 writer 挂了,Ledger 会启动恢复进程来确定 Ledger 最终状态和最后提交的日志,保证之后所有 Ledger 进程读取到相同的内容;
  • 除了保存消息数据外,还会保存 cursors,也就是消费端订阅消费的位置。这样所有 cursors 消费完一个 Ledger 的消息后这个 Ledger 就可以被删除,这样可以实现 ledgers 的定期翻滚从头写。

  • 1.2 节点对等


    从架构图可以看出,broker 节点不保存数据,所有 broker 节点都是对等的。如果一个 broker 宕机了,不会丢失任何数据,只需要把它服务的 topic 迁移到一个新的 broker 上就行。


    broker 的 topic 拥有多个逻辑分区,同时每个分区又有多个 segment。


    writer 写数据时,首先会选择 Bookies,比如图中的 segment1。选择了 Bookie1、Bookie2、Bookie4,然后并发地写下去。这样这 3 个节点并没有主从关系,协调完全依赖于 writer,因此它们也是对等的。


    1.3 扩展和扩容


    在遇到双十一等大流量的场景时,必须增加 consumer。


    这时因为 broker 不存储任何数据,可以方便的增加 broker。broker 集群会有一个或多个 broker 做消息负载均衡。当新的broker 加入后,流量会自动从压力大的 broker 上迁移过来。


    对于 BookKeeper,如果对存储要求变高,比如之前存储 2 个副本现在需要存储 4 个副本,这时可以单独扩展 bookies 而不用考虑 broker。因为节点对等,之前节点的 segment 又堆放整齐,加入新节点并不用搬移数据。writer 会感知新的节点并优先选择使用。


    1.4 容错机制


    对于 broker,因为不保存任何数据,如果节点宕机了就相当于客户端断开,重新连接其他的 broker 就可以了。


    对于 BookKeeper,保存了多份副本并且这些副本都是对等的。因为没有主从关系,所以当一个节点宕机后,不用立即恢复。后台有一个线程会检查宕机节点的数据备份进行恢复。


    2. BookKeeper 简介


    从上一节的讲解看出,Apache Bookkeeper 是一个易扩展、高可用、运维简单的分布式存储系统。这节再看一下 Bookkeeper 的其他三个特性。


    2.1 客户端数量


    我们知道,在 Kafka 中,客户端只能从 leader 节点读取数据。但在 BookKeeper 中,客户端可以从任何一个 bookie 副本读取数据,这有三个好处:


  • 增加了读高可用;
  • 把客户端流量平均分配到了不同的 bookie;
  • 可以通过增加客户端数量来提高读取效率。

  • 客户端和服务器通信采用 Netty 实现异步 I/O。网络 I/O 使用单个 TCP 连接进行多路复用,这就以很少的资源消耗实现了非常高的吞吐量。


    2.3 I/O 隔离


    为什么要做 I/O 隔离?


    在大多数消息系统中,如果 consumer 处理慢,可能会导致消息积压。这迫使存储系统从持久存储介质中读取数据。


    当存储系统 I/O 组件共享写入、追尾读、追赶读的单一路径时,就会出现 I/O 抖动及页面缓存的换入换出。


    写入和追尾读对可预测的低延迟有较高要求,而追赶读则对吞吐量的要求比较高,分离这三个路径很重要。


    在 BookKeeper 中,bookie 使用 3 条独立的 I/O 路径,分别用于写入、追尾读、追赶读。如下图:



    3. 多租户


    Pulsar 可以使用多租户来管理大集群。Pulsar 的租户可以跨集群分布,每个租户都可以有单独的认证和授权机制。租户也是存储配额、消息 TTL 和隔离策略的管理单元。


    Pulsar 的多租户性质主要体现在 topic 的 URL 中,其结构如下:


  • persistent://tenant/namespace/topic


    可以看到,租户是 topic 的最基本单位。


    假如一个公司有三个部门,tenant1、tenant2、tenant3,可以分配三个租户,这三个租户互不干扰。如下图所示:



    如果消息平台不支持租户,那部门之间想要隔离,就要给每个部门部署一套集群,运维成本非常高。


    4. 消息模型


    4.1 消息结构


    首先看一下 Pulsar 的消息结构,如下图:



    消息流由多个独立的 segment 组成(这里的 segment 就是上面讲的 ledger)。


    segment 又包含独立的 entry。entry 又由独立的 message 组成(这里的message就是consumer发来的消息)。


    可以看到,一个 message 的 id 组成包括 ledger-id、entry-id、batch-index、partition-index。


    需要注意两点:


  • segment 和 entry 都是 BookKeeper 里面的概念;
  • Pulsar 作为消息平台时,一个 message 就是一个 entry。当 Pulsar 作为流平台时,为了提高吞吐量会开启 batch,这样多个 message 组成一个 entry。

  • 4.2 创建过程


    消息的创建过程如下图:



    消息创建后主要经历下面几步:


  • 选择一个 partition;

  • 发送到管理这个 partition 的 broker;

  • broker 将消息并发的发送给 N 个 bookie,这个 N 是可以配置的。broker 持有BookKeeper 的客户端,也就是 writer,writer 收到写请求后,会并发地写入 N 个 bookie。上图中 N=3;

  • bookie 写完消息后会给 broker 一个回复,broker 收到指定数量的确认消息后就会认为写 BookKeeper 成功。这个数量是这个配置的,比如M。M 越大,写 BookKeeper 延迟越大,数据一致性越高。因此这个配置要对一致性和延迟到进行。


  • 5 消费模型
    5.1 概要


    Pulsar 的消费模型如下图:



    producer 将消息发送给 topic,topic 下有多个 partition,partition 下面又有多个broker。


    broker 负责接收消息并把消息分配给给 consumer,并把消息写到 BookKeeper。


    broker还具有限流功能,可以根据限流阈值对producer的消息进行限流。


    consumer 并不能直接从 broker 中获取消息,consumer 和 broker 之间有一个  Subscription。Consumer 通过 Subscription 获取消息。


    5.2 subscription


    subscription 有 4 种类型:


    1. 独占模式(Exclusive)

    同一 个topic 只能有一个消费者,如果多个消费者,就会出错。

    2. 灾备模式(Failover)

    同一个 topic 可以有多个消费者,但是只能有一个消费者消费。

    其他消费者作为故障转移备用。如果当前消费者出了故障,就从备用消费者中选择一个进行消费。

    如下图:


    3. 共享模式(Shared)

    同一个 topic 可以由多个消费者订阅和消费。 消 息通过  r ound robin  轮询机制分发给不同的消费者,并且每个 消息仅会被分发给一个消费者。 当消费者断开,发送给它的没有被消费的消息还会被重新分发给其它存活的消费者。 如下图:


    4. Key_Shared

    消息和消费者都会绑定一个 key,消息只会发送给绑定同一个 key 的消费者。如果有新消费者建立连接或者有消费者断开连接,就需要更新一些消息的 key。如下图:


    跟 Shared 模式相比,Key_Shared 的好处是既可以让消费者并发地消费消息,又能保证同一Key下的消息顺序。


    5.3 Cursor


    当多个 consumer 订阅同一个 topic 时,subscription 为每一个 consumer 分配一个 Cursor,这样多个 Consumer 之间就不会相互影响了。如下图:



    subscription 会维护一个消息的 ACK 状态,consumer 处理完消息后会给 broker 返回 ACK,表示消息已经处理完成。如果 broker 一直没有收到 ACK,就会把消息发送到其他 consumer。


    如果客户端想要重新消费 Cursor 以前的消息,Cursor 是支持 reset 的。reset 之后, Cursor 就回退回去了,这时 consumer 可以从新的 Cursor 位置进行消费。


    Cursor 的位置是会实时写入 BookKeeper 的,这必定会有一定的性能损耗。因此,Pulsar 提供了一种非持久化的 Subscription(Non-durable Exclusive)。Pulsar 的Reader 接口内嵌了 Non—durable Exclusive Cursor,它读取消息不会返回 ACK。


    6. broker 代理


    通过前面的讲解可以看到,consumer 和 producer 只需要跟 broker 进行交互,而不用跟底层的 BookKeeper 交互。


    事实上,broker 还有一层代理,consumer 和 producer 直接跟代理进行交互。如下图:



    7. Zookeeper


    Pulsar 提供了 System topic 用来保存策略之类的元数据,尽量减少对 ZooKeeper 的依赖。


    ZooKeeper 也保存一些策略相关的元数据,还保存了 broker 和 BookKeeper 集群相关的配置元数据,比如服务发现相关的元数据。


    8. 总结


    Pulsar 是一款非常优秀的中间件,实现了计算和存储相分离,支持多租户、扩展和扩容、容错都是非常容易的。


    - EOF -

    推荐阅读   点击标题可跳转

    1、90% 的 Java 程序员,都扛不住这波消息中间件的面试四连炮!

    2、哥们,你们的系统架构中为什么要引入消息中间件?

    3、消息中间件 MQ 如何处理消费失败的消息?


    看完本文有收获?请转发分享给更多人

    关注「ImportNew」,提升Java技能

    点赞和在看就是最大的支持❤️