本系列计划分四大部分:
上一篇文章简要探讨了消息队列的抽象模型,可简要分为两部分:
本文将聚焦第一部分,消息的生产。以一个工具使用者的视角展开,侧重于理解其运行机制。如上面所述,这将涉及两个方面:
client 发起一次 RPC,将数据发送到 kafka,kafka 将数据保存起来后返回,这便是生产消息的全过程。不过这个过程非常的眼熟,因为这和往数据库插入一条数据没有任何本质差别。
这个过程中,有几个疑问不可绕过:
本文尝试逐一回答上面的问题,这需要先从分布式集群的角度来切入。
作为一个分布式集群,kafka 需要具备高吞吐、高可用、灵活扩展 (扩容) 的能力,要达到这些目的,需要:
我们知道这需要一种数据分片策略,这种分片在 Elasticsearch 叫 Shard,在 HBase 中叫 Region,kafka 中则叫 Partition(分区):
为了方便我们将生产 Client 称为 producer。当生产一条消息时 (某个 Topic),它只会属于某一个分片,这个归属是怎么确定的呢?procducer 怎么知道该发送到哪个机器上?
这类问题的解决方案一般有两种:
kafka 选择的是第二种方式,Producer 可以通过 bootstrap.servers 中任意一个 kafka 实例,拉取到所有元信息,和生产有关的比如:某个 Topic 有多少个 Partition,每个 Partition 的 leader 的地址,这些元信息 Producer 会定时轮询更新。
每个 kafka 节点都有完整的元信息,Producer 可以通过任意节点拉取,源头维护于 Zookeeper 之中,当集群中的 Partition 等元信息发生变更,Controller 节点会逐一推送给其他 Broker 最新信息 [4]。zookeeper 的作用其实主要是两个,一是作为存储,二是基于其 Watch 能力做事件驱动 (例如元信息更新推送)。
当要生产一条消息时:
也就是说,负载均衡策略实际上是 Producer 在决定。
kafka 的 Producer 实现相对复杂,需要关注 kafka 集群的细节,也要处理不少边界情况,例如 Partition 发生重选举后 leader 节点变化等,不同的语言要重复写一遍;好处是 kafka 本身不关心这些细节,实现上清爽很多,也有利于灵活性和性能提升。
其实现主要有几部分:
好多中大型公司在暴露给业务部门 kafka 时,都会额外做一层 proxy,只给业务暴露极简单的生产 API,将细节屏蔽,通过专门的代理层实现上面所述的功能。一方面让业务部门简单接入,一方面从运维层面提升 Topic 生产消费的管控能力。
解决了 client 端的疑问,接下来我们从 server 端的角度,看 kafka 收到请求后都做了些什么,以及背后的存储。
作为一个提供 RPC 接口的 server,kafka 和其他服务器一样,有着高吞吐、低延迟处理请求的需求,这和其他 Web 服务器没有任何差别。实际上 kafka 也和其他 web 服务器一样,在服务端实现了 Reactor 模型提供高效率的并发模型:
大概理清楚了宏观的处理流程,关键点还有日志的存储,有两个点:
对于日志的存放形式,我们在下一篇文章中再详细讨论,无非是组织形式,当然还会重点考虑读取便捷度和性能。
日志最后会被持久化到磁盘中,这里有个常见的权衡:
MySQL 为了在事务中的解决方案是:
MySQL 在权衡中选择了可靠性,这也导致其单机更新能力极限一般在 万/s,瓶颈非常明显。而 kafka 为了更牛逼的吞吐能力,选择直接写入 PageCache 就返回成功,定时或条件触发时批量刷盘。 [6]
也有参数可以控制刷盘机制,否则由 OS 决定刷盘时机:
对于数据可靠性的补偿,kafka 提供 request.required.acks 的配置,可以设定当消息被复制到多个节点后才返回成功,这样数据可靠性就能明显提升,因为多个节点在某个特殊时机下同时故障导致数据丢失的概率会大大降低。
我们知道 kafka 会通过数据复制的方式,将数据同步到副本 partition 上,一方面当 leader 故障时,副本能够顶上提供服务,另一方面当 leader 磁盘故障时,数据有备份避免丢失。
这里再次出现一个权衡点,也就是到底如何将数据同步给副本?这也有两种常规的手法:
kafka 本身的目标是超高量级吞吐,自然不会选择同步复制,但纯异步复制也显得不靠谱,特别是当对数据可靠性有一定要求时。kafka 从两种模式中取长补短,设计了一种新的异步模式,为不同的数据可靠性提供选择空间。
大概总结思路是:
同步写副本数据太慢,那就让 follower 节点通过接口异步找 leader 节点拉数据。
只有数据写入副本后,才能提供数据可靠性保障,但等所有副本同步完成后再返回成功则太慢了,特别是会受慢节点拖累。
kafka 提出了一个 ISR 副本集合的概念,其本质 leader 是维护一个“优质“的副本集合,是否优质的标准是落后 leader 的时间 (replica.lag.time.max.ms),当节点满足“优质"条件则加入 ISR,如果同步变慢了则剔除出 ISR。 (如果担心 ISR 数量太少不可靠,可以通过 min.insync.replicas 兜底,如果小于此数量,kafka 会拒绝生产)
当对数据可靠性要求高时,可以设置消息同步到所有 ISR 节点后才算成功 (request.required.acks=all),否则给 Producer 返回超时或错误。当 leader 节点故障后,新的 leader 节点可以从 ISR 中诞生。
当对数据可靠性要求不高时,可以设置 request.required.acks=1,此时写入 leader 成功就算成功提交。当然有些场景可以更激进地设置成 0。all,1,0 这三个值对应的吞吐能力依次大幅提升。
梳理完整个消息生产流程,以及副本同步机制,离高可用以及故障恢复还有些距离。还得问以下几个问题:
这里所说的故障,一般是指网络分区,比如某个节点连接不上了,要么网络出现问题,要么可能宕机了。这种情况的监测,kafka 是依靠 zookeeper 来实现的:
那谁负责订阅这些变更消息呢?一般分布式系统中,都会有一个角色来统一负责,这种节点在 kafka 这里被叫做控制器 (Controller),除了故障检测,还会负责 Topic/分区等注册更新,以及上文提到的集群元信息通知。
当 Controller 发现节点故障后,会启动对应节点上的 partition leader 选举,让副本站出来当选 leader 对外提供服务。
具体策略就是遍历对应 partition 的副本列表,如果在 ISR 队列中则直接发消息通知其成为新 leader,并分别通知其他 Broker。如果 ISR 为空,当 unclean.leader.election.enable=true 则选择副本列表 (AR) 第一个为新 leader,否则要等挂了的节点重启后才能完成选举。Producer 可以通过任意 Broker 获得对应 partition 新的 leader 地址 (轮询或生产消息时)。[7]
当 Controller 挂了怎么办?每个节点在启动时都会去 zookeeper 检测/controler 节点是否注册情况,如果没有则会尝试自己注册,第一个注册成功的则为 Controller。如果没能成为 Controller,则会订阅/controller 的变更消息,当 Controller 挂掉后,zk 会删除节点,此时其他所有节点都会收到消息,并竞争成为新的 Controller。