分享 kafka(二) 消息的生产

early · 2021年09月25日 · 最后由 Rei 回复于 2021年09月25日 · 518 次阅读
本帖已被管理员设置为精华贴

本系列计划分四大部分:

上一篇文章简要探讨了消息队列的抽象模型,可简要分为两部分:

  • 生产。生产 RPC + 存储
  • 消费。读取 + 消费 RPC

本文将聚焦第一部分,消息的生产。以一个工具使用者的视角展开,侧重于理解其运行机制。如上面所述,这将涉及两个方面:

  • 其一为生产 RPC,我们将从生产 client 和 kafka 两端透视整个 RPC 的过程。
  • 其二为储存,我们需要从分布式系统的角度,来梳理 kafka 在高可用、可扩展的、高吞吐等方面的思路。

回归本质

client 发起一次 RPC,将数据发送到 kafka,kafka 将数据保存起来后返回,这便是生产消息的全过程。不过这个过程非常的眼熟,因为这和往数据库插入一条数据没有任何本质差别。

这个过程中,有几个疑问不可绕过:

  1. kafka 作为集群存在,生产 Client 怎么知道该将数据发送到哪个实例?
  2. kafka 收到生产请求后,是怎么处理数据的?怎么存储的?
  3. 如何尽可能实现高吞吐、高可扩展···?

本文尝试逐一回答上面的问题,这需要先从分布式集群的角度来切入。

数据分布

作为一个分布式集群,kafka 需要具备高吞吐、高可用、灵活扩展 (扩容) 的能力,要达到这些目的,需要:

  • 将海量的写入请求均匀地分散到集群中的机器上
  • 当写入压力进一步增大时,能通过简单扩容的方式解决 (增加 partition 数量)
  • 当某台机器故障 (宕机),集群要能快速恢复服务(备选)
  • 当某台机器磁盘故障,要能避免数据丢失 (备份)

我们知道这需要一种数据分片策略,这种分片在 Elasticsearch 叫 Shard,在 HBase 中叫 Region,kafka 中则叫 Partition(分区):

  • 每个 Topic 的数据分散在多个 Partition 上,实现数据分片 (每个 Topic 有自己的 partition)
  • Partition 可以分散到不同的机器上 (Broker),实现负载均衡
  • Partition(leader) 可设置副本 (follower),并分布在其他机器上,避免数据丢失
  • 当某个 Partition 故障后,副本可迅速取代之,实现快速恢复

图片来自网络

Producer

为了方便我们将生产 Client 称为 producer。当生产一条消息时 (某个 Topic),它只会属于某一个分片,这个归属是怎么确定的呢? procducer 怎么知道该发送到哪个机器上?

这类问题的解决方案一般有两种:

  • 有 proxy 专门负责这类判定、转发,client 对细节无感知 (类似 MySQL 中间件代理)
  • Producer(client) 端掌握 server 端的详细信息,实现重型 Client(类似 redis cluster)

kafka 选择的是第二种方式,Producer 可以通过 bootstrap.servers 中任意一个 kafka 实例,拉取到所有元信息,和生产有关的比如: 某个 Topic 有多少个 Partition,每个 Partition 的 leader 的地址,这些元信息 Producer 会定时轮询更新。

每个 kafka 节点都有完整的元信息,Producer 可以通过任意节点拉取,源头维护于 Zookeeper 之中,当集群中的 Partition 等元信息发生变更,Controller 节点会逐一推送给其他 Broker 最新信息 [4]。zookeeper 的作用其实主要是两个,一是作为存储,二是基于其 Watch 能力做事件驱动 (例如元信息更新推送)。

当要生产一条消息时:

  1. Producer 会根据策略先决定好这条消息归属于某个 Partition,策略一般有 轮训、随机、基于某个 key( Key-ordering) 这三种。
  2. 将该消息直接发往 kafka 目标 Partition 的 leader 所在节点。

也就是说,负载均衡策略实际上是 Producer 在决定。

kafka 的 Producer 实现相对复杂,需要关注 kafka 集群的细节,也要处理不少边界情况,例如 Partition 发生重选举后 leader 节点变化等,不同的语言要重复写一遍;好处是 kafka 本身不关心这些细节,实现上清爽很多,也有利于灵活性和性能提升。

其实现主要有几部分:

  • 定时任务,例如刷新集群元数据
  • 生产能力,例如负载均衡、自动重试、批量提交、server 端连接管理等等

好多中大型公司在暴露给业务部门 kafka 时,都会额外做一层 proxy,只给业务暴露极简单的生产 API,将细节屏蔽,通过专门的代理层实现上面所述的功能。一方面让业务部门简单接入,一方面从运维层面提升 Topic 生产消费的管控能力。

Server

解决了 client 端的疑问,接下来我们从 server 端的角度,看 kafka 收到请求后都做了些什么,以及背后的存储。

作为一个提供 RPC 接口的 server,kafka 和其他服务器一样,有着高吞吐、低延迟处理请求的需求,这和其他 Web 服务器没有任何差别。实际上 kafka 也和其他 web 服务器一样,在服务端实现了 Reactor 模型提供高效率的并发模型: 涞源1

  • Acceptor 主要处理新 tcp 连接,核心就是执行 accept 调用,得到一个 socket,轮询分发给网络线程 (读取出完整的请求数据)
  • 网络线程将请求数据读取完毕后,会写入共享请求队列 (也会将返回数据回写 socket)

涞源1

  • IO 线程池从共享请求队列中取消息,执行真正的处理逻辑,例如检查、储存,将消息追加写入文件 (内存)
  • 处理完毕后,将响应写入响应队列,等待网络线程将处理结果返回 (correlationId 区别请求)
  • 当请求不能立即返回时,会写入 Purgatory 中,等待条件满足后才返回 (时间轮)。例如设置 ack=all 时,需要等 ISR 里其他节点拉到这条消息后,才能返回成功。[3]

日志储存

大概理清楚了宏观的处理流程,关键点还有日志的存储,有两个点:

  1. 日志以什么形式被存放的?
  2. 日志被保存到了哪里?

对于日志的存放形式,我们在下一篇文章中再详细讨论,无非是组织形式,当然还会重点考虑读取便捷度和性能。

日志最后会被持久化到磁盘中,这里有个常见的权衡:

  1. 性能。因为写磁盘是一个极高成本的事情,如果每条消息都直接刷盘,则 kafka 的吞吐能力会受到极大限制。
  2. 可靠性。但如果只刷到 PageCache(内存) 中,当机器故障,未刷入磁盘的数据就丢了 (可能已经告诉 Producer 成功了) 。

MySQL 为了在事务中的解决方案是:

  1. 在事务提交时,以顺序写的方式写入事务日志,默认直接刷盘,顺序写性能相对较好。如果这个失败了,那直接告诉请求方失败。[5]
  2. 如果写顺序日志成功了,但由于宕机导致更新失败,则在启动流程中解决,做回滚或恢复。

MySQL 在权衡中选择了可靠性,这也导致其单机更新能力极限一般在 万/s,瓶颈非常明显。而 kafka 为了更牛逼的吞吐能力,选择直接写入 PageCache 就返回成功,定时或条件触发时批量刷盘。 [6]

也有参数可以控制刷盘机制,否则由 OS 决定刷盘时机:

  • log.flush.interval.messages //多少条消息刷盘 1 次
  • log.flush.interval.ms //隔多长时间刷盘 1 次
  • log.flush.scheduler.interval.ms //周期性的刷盘

对于数据可靠性的补偿,kafka 提供 request.required.acks 的配置,可以设定当消息被复制到多个节点后才返回成功,这样数据可靠性就能明显提升,因为多个节点在某个特殊时机下同时故障导致数据丢失的概率会大大降低。

数据复制

我们知道 kafka 会通过数据复制的方式,将数据同步到副本 partition 上,一方面当 leader 故障时,副本能够顶上提供服务,另一方面当 leader 磁盘故障时,数据有备份避免丢失。

这里再次出现一个权衡点,也就是到底如何将数据同步给副本? 这也有两种常规的手法:

  1. 同步复制。类似 Raft 协议这样,当有写入请求时,同步调用副本写入数据,等 x 个副本响应成功写入后,才算成功,此时数据可靠性很高,可见性能会很差,且易受最慢节点拖累。
  2. 异步复制。类似 MySQL 的 binlog 机制,副本自己来消费 binlog 写入本地,leader 节点写入过程中完全不管 follower 的同步情况。 此时写入流程不受影响,但 leader 和 follower 之间数据同步常出现延迟,且 MySQL 副本还会提供服务,这也是其节点级 “幻读” 的原因。

kafka 本身的目标是超高量级吞吐,自然不会选择同步复制,但纯异步复制也显得不靠谱,特别是当对数据可靠性有一定要求时。kafka 从两种模式中取长补短,设计了一种新的异步模式,为不同的数据可靠性提供选择空间。

大概总结思路是:

  1. 异步同步数据。

同步写副本数据太慢,那就让 follower 节点通过接口异步找 leader 节点拉数据。

  1. 有限副本集合 ISR。

只有数据写入副本后,才能提供数据可靠性保障,但等所有副本同步完成后再返回成功则太慢了,特别是会受慢节点拖累。

kafka 提出了一个 ISR 副本集合的概念,其本质 leader 是维护一个 “优质 “的副本集合,是否优质的标准是落后 leader 的时间 (replica.lag.time.max.ms),当节点满足 “优质"条件则加入 ISR,如果同步变慢了则剔除出 ISR。 (如果担心 ISR 数量太少不可靠,可以通过 min.insync.replicas 兜底,如果小于此数量,kafka 会拒绝生产)

当对数据可靠性要求高时,可以设置消息同步到所有 ISR 节点后才算成功 (request.required.acks=all),否则给 Producer 返回超时或错误。当 leader 节点故障后,新的 leader 节点可以从 ISR 中诞生。

当对数据可靠性要求不高时,可以设置 request.required.acks=1,此时写入 leader 成功就算成功提交。当然有些场景可以更激进地设置成 0。all,1,0 这三个值对应的吞吐能力依次大幅提升。

故障转移

梳理完整个消息生产流程,以及副本同步机制,离高可用以及故障恢复还有些距离。还得问以下几个问题:

  1. 故障怎么检测?
  2. 谁负责故障恢复?
  3. 怎么恢复?

这里所说的故障,一般是指网络分区,比如某个节点连接不上了,要么网络出现问题,要么可能宕机了。这种情况的监测,kafka 是依靠 zookeeper 来实现的:

  • 每个节点在启动时会到 zookerper 注册创建一个临时节点
  • 当某个节点故障后,也会被 zookeeper 的心跳检测到,此时会将之前注册的临时节点删除
  • zookeeper 提供节点/目录变更消息订阅通知
  • 订阅了相关变更消息的节点,当故障发生时即可检测到

那谁负责订阅这些变更消息呢? 一般分布式系统中,都会有一个角色来统一负责,这种节点在 kafka 这里被叫做控制器 (Controller),除了故障检测,还会负责 Topic/分区等注册更新,以及上文提到的集群元信息通知。

当 Controller 发现节点故障后,会启动对应节点上的 partition leader 选举,让副本站出来当选 leader 对外提供服务。

具体策略就是遍历对应 partition 的副本列表,如果在 ISR 队列中则直接发消息通知其成为新 leader,并分别通知其他 Broker。如果 ISR 为空,当 unclean.leader.election.enable=true 则选择副本列表 (AR) 第一个为新 leader,否则要等挂了的节点重启后才能完成选举。Producer 可以通过任意 Broker 获得对应 partition 新的 leader 地址 (轮询或生产消息时)。[7]

当 Controller 挂了怎么办? 每个节点在启动时都会去 zookeeper 检测/controler 节点是否注册情况,如果没有则会尝试自己注册,第一个注册成功的则为 Controller。如果没能成为 Controller,则会订阅/controller 的变更消息,当 Controller 挂掉后,zk 会删除节点,此时其他所有节点都会收到消息,并竞争成为新的 Controller。

参考资料

early kafka(四) 可靠性探讨 提及了此话题。 09月25日 18:19
early kafka(一) 消息队列的本质 提及了此话题。 09月25日 18:19
early kafka(三) 消息的消费 提及了此话题。 09月25日 18:20

题外话:可以试试 GeekNote 的合集功能整理系列文章 https://geeknote.net/GeekNote/collections/27

xiaoronglv 将本帖设为了精华贴。 09月27日 22:32
需要 登录 后方可回复, 如果你还没有账号请 注册新账号