分享 kafka(四) 可靠性探讨

early · 2021年09月25日 · 345 次阅读

本系列计划分四大部分:

本系列前三篇介绍了消息队列的要素,以及 kafka 的生产和消费过程,实现从抽象到实际的一次知识缝合。在串联知识点的过程中,跳过了一些关键点,缺了它们会有隔靴搔痒之感。

本文主要探讨可靠性方面的一些点,主要内容围绕:

  1. 在故障发生时,数据一致性靠谱吗?
  2. kafka 可以保证消息不丢失吗?

不变的阳谋

分布式系统可靠性的建设 (机器),和公司团队可靠性建设 (人),其底层逻辑是一致的。大道至简,大道相通。

一般情况是 20-%的人创造了 80+%的核心价值,但人一旦吃饱了就容易骄纵懈怠,战斗力就缩水了。想要搭建一个战斗力爆棚同时产出稳健的团队可以怎么做?在定制好良性竞争机制之后,核心点就是堆人 (才),在切割单点不可替代性后,还要建立纵向梯度。你虽然是首席,但二把手可以随时上,三把手也在等着。

分布式系统呢,如何稳健?实际上是一样的:

  • 堆足够量的机器,>=50% 作为副本存在
  • 将集群压力切割分散在众多机器上
  • 设立 leader 节点 (Master),数据同步给副本(一致性)
  • 副本可以随时替代 leader(可用性)

可用性的来源

当集群的压力分散隔离到多个机器上时,个别 broker 出现故障,并不会影响其他 Topic 的生产和消费 (broker 量较多)。因为数据同步给了副本,可以替代 leader 迅速恢复服务,以此便实现高可用。

聚焦到本文的主题,我们需要理清楚两个问题:

  1. 数据同步相关的细节
  2. 故障恢复时一致性问题

在此基础上,尝试探讨 kafka 是否能保证消息不丢失。

副本数据同步

回顾kafka(二) 消息的生产中数据同步的部分,每个 Partition 有一个 leader,有 x 个副本,生产者直接向 leader 创建消息,副本通过接口向 leader 轮询获取最新消息,实现数据同步。 涞源 1

当 ack=all 时,leader 会等所有 ISR 集合的节点拉到消息后才会返回成功,确保数据同步到了副本上。每次副本轮询时会带上想要的 offset(fetch_offset),这是副本上最新的消息偏移量,由此 leader 便可以知道每个副本现在同步到哪儿来了,意味着小于 fetch_offset 的消息都已经同步成功了。(类似 TCP 的 ack)

通过数据同步时延 (replica.lag.time.max.ms) 可以对 ISR 集合进行伸缩,将慢节点踢掉,如果速度恢复了会再加回来。其原理是,每当副本的 fetch_offset 等于 leader 的 LEO(下一条消息的偏移量),leader 会更新对应节点的 lastCaughtUpTimeMs 值 (完全追上后才会更新),kafka 通过定时任务扫每个节点的该值,如果 time.now-lastCaughtUpTimeMs > replica.lag.time.max.ms,则该副本落后了,会记录起来,通过另一定时任务将其踢掉。

ISR 本质上是个优质副本集合,也可能只有 leader 副本一个节点 (其他因为慢被踢掉了),这算是“少数服从多数的”一种实现 (PacificA)。类似 raft 的超过半数提交,实际上是通过最快的部分节点实现“少数服从多数”,从对比上看同等规模下 kafka 可以实现更少的副本节点 (副本越多生产越耗时),使用者也可以通过 min.insync.replicas 实现数量控制,这将灵活性交给了使用者。

消息可见性

kafka 消息的生产和消费都是围绕 leader 进行的,副本全程只是默默复制数据。那当消息写入 leader 时,哪些消息能被消费者看到?如果写入 leader 就被消费者看到,假设此时消息还未同步到副本,leader 节点又挂了,副本重新成为 leader 后,便会出现“幻读”,有些消费者消费到了该消息,有些则消费不到,因为新 leader 上没有。 涞源 1

对于此,kafka 提供了分区高水位的概念,一方面定义消息的可见性,另一方面也辅助了消息同步到副本。其值计算逻辑很简单,HW = max(currentHW, min(副本 LEO))。也就是说分区高水位取决于最慢节点的最新一次 fetch_offset 值,这直接衡量了副本的同步情况。

比高水位大的消息对消费者不可见,也被称为未提交消息。副本在拉消息接口中,leader 也会将高水位值返回给副本,副本接着更新自己的高水位 (min(leader-HW, 自己 LEO))。但分区高水位是指 leader 高水位,副本高水位用于故障恢复时日志截断。

总结下两个概念:

  • 分区高水位 HW,也是 leader 节点高水位值,控制消费者可见性,避免“幻读”,也直接记录副本同步情况。由节点列表中 (在 ISR 中且未同步延迟) 最慢的节点决定。leader 更新公式:max(currentHW, min(副本 LEO)),副本公式:min(leader-HW, 自己 LEO)
  • LEO(Log End Offset),末端位移,下一条将要插入消息的 offset。

故障恢复

每个 broker 和 zookeeper 有心跳,当某个 leader 没响应了,controller 可以通过 watch 机制得知,并开启故障转移。(见kafka(二) 消息的生产)

当发生 leader 切换时,数据一致性便会经受考验。在 kafka0.11 之前,当 leader 发生切换时,副本会将自己本地的日志按照高水位 (副本自己的) 截断删除,然后将 fetch_offset 设置为高水位值向新 leader 发起同步请求,这里面可能会出现数据一致性问题。

高水位的缺陷

根源在于 leader 的 HW 更新和副本 HW 更新存在时间差。下面是几个回合 HW 的更新,右边是 leader,左边是某个副本: 涞源 2

图上共有三排,第一排时序:

  1. leader B 中有两条消息 m1 m2
  2. 副本 A 带上 fetch_offset=1 拉 m2 这条消息
  3. 此时 B、A 的 LEO 均为 2,HW 为 1
  4. 副本 A 带上 fetch_offset=2 继续拉消息
  5. B 将 HW 修改为 2,A 需要等接口返回后才能将 HW 修改为 2,此时会出现时间差。如果 B 中没有新消息则会延迟返回,时间差还会进一步拉大。

假设在这个时间差之间,A 崩溃了,时序如下:

  1. A 奔溃,此时其 HW 为 1(上图第二排)
  2. A 恢复后,则会将 m2 删除
  3. 以 fetch_offset=1 向 leader 拉消息。假设此时 B 也奔溃了
  4. A 会成为新 leader
  5. B 恢复后成为副本,其 HW 为 2,比 leader 高,会将 HW 改为 1 并截断 m2,fetch_offset=1 向 A 拉消息
  6. 最终结果是:m2 这条消息丢了

假设 leader 等副本更新完 HW 后,才更新自己的 HW,则可以避免上面的问题,但这需要多增加一轮数据同步请求。而且换一个时序,也会出现数据不一致问题。 涞源 2

  1. A 为 leader,LEO=HW=1。B 为副本,LEO=HW=1
  2. 假设 A、B 同时挂掉
  3. B 先恢复,成为新 leader
  4. 生产者向 B 创建了一条新消息 m3,此时 HW=LEO=2
  5. A 恢复称为副本,HW 和 A 一样,不需要截断,fetch_offset=2 也拉不到新消息
  6. 最终结果是:A 和 B 之间数据不一致。

上面的问题会出现在 min.insync.replicas=1 的情况下,也就是某时刻 ISR 中只有 leader 节点,由于 min.insync.replicas=1,此时也可以生产消息,数据只提交到了 leader 节点,便返回成功给生产者。

假设 min.insync.replicas>1,上面问题理论上不会出现。(严肃性故障下文讨论)

  1. 如果 ISR 中只有 leader 副本,此时 kafka 会返回错误,告诉生产者现在不能生产,这样不会有数据问题。
  2. 如果 ISR 中有其他副本,leader 会等其他节点 fetch_offset>[消息的 offset] 时,才会返回成功给生产者,此时 HW 均已经更新了。 ## Leader Epoch 上面数据问题的根源在于,故障恢复后 HW 被新 leader 更新,但副本的 HW 还是原来的,一旦同步便会产生问题。如果能将 HW 按照变更前的状态返回给副本,则可以避免问题。

Leader Epoch 便是起这样的作用:

  1. Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。
  2. 起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。
  3. 组合起来 Leader Epoch 便类似于: 。epoch 为当前版本,leader 变更后会 +1,startOffset 是当前新 leader 第一条消息的 offset。
  4. broker 会在内存和磁盘中存放 leader 的 Epoch 值。

当有副本重启后归来,带上自己保存的 leader epoch 值,先向 leader 发起一轮请求,获取当前 leader 的 epoch 值。

  1. 如果两者 epoch 值一样,则 leader 返回当前自己的 LEO 给副本。此时副本 LEO 理论上<=leader LEO,不需要截断,正常同步数据。
  2. 如果两个 epoch 不一样,说明副本持有的 epoch 是上一轮老 leader 的 (可能落后多轮),此时新 leader 根据老 epoch 返回 [老 epoch+1] 记录的 startOffset 给副本。(假设只差一轮的话,则返回当前 epoch 的 startOffset)
  3. 副本根据 startOffset 进行截断逻辑。如果返回的 startOffset 大于等于自己的 HW,则不会删除消息,原消息得到保留。否则截断从 leader 重新拉取,这避免了不一致。

涞源 2

消息能不丢么

上文大致梳理了数据同步,以及故障恢复的一些细节。现在我们需要问一个更大的问题:

kafka 能保证消息不丢么,或者说要做到消息不丢失,该如何配置 kafka? 前文提到了,当:

  • 生产者得到 kafka 生产成功回复时,才认为成功,否则重试
  • ack=all,消息同步到 ISR 中时,才返回成功。
  • replication.factor >= 3,副本数量多一些
  • min.insync.replicas > 1, ISR 数量超过 1 个才提供服务。
  • replication.factor > min.insync.replicas,如果相等只要一个节点挂了,便会拒绝服务
  • unclean.leader.election.enable=false,必须要 ISR 的成员才能称为新 leader

上面几个条件同时生效时,kafka 的明确告知生产成功的消息是有保障的,至少落到了某个 ISR 节点中,等待重启完毕便可以恢复数据。

但这是有前提的,因为 kafka 的消息是写入内存便认为提交,副本同步也是一样:

  • 副本告诉 leader 消息已经同步成功,实际上可能还在内存中没有刷盘,奔溃后数据丢失
  • 副本告诉 leader 消息同步成功,此时已经刷盘,但这个副本磁盘故障,数据会丢失

当出现了上面的情况时,kafka 依然会丢失消息,只不过副本数量越多,上面参数配置的越苛刻,丢失数据的概率会减小,毕竟没有向 MySQL 那样同步刷 redolog 到磁盘。

这个问题的答案是,当上面的一系列参数都配置正确,kafka 对"已提交"的消息,可以在一定程度上保证数据不丢失:

  • 已提交。上面 ack=all 等参数配置,且 kafka 明确返回成功
  • 一定程度。kafka 的多个副本至少有一个没出问题 (刷盘前 crash 或磁盘故障等)

参考资料

early kafka(一) 消息队列的本质 提及了此话题。 09月25日 18:19
early kafka(二) 消息的生产 提及了此话题。 09月25日 18:20
early kafka(三) 消息的消费 提及了此话题。 09月25日 18:20
需要 登录 后方可回复, 如果你还没有账号请 注册新账号