为什么要使用 kafka 消息队列?
解耦、削峰:传统的方式上游发送数据下游需要实时接收,如果上游在某些业务场景:例如上午十点会流量激增至顶峰,那么下游资源可能会扛不住压力。但如果使用消息队列,就可以将消息暂存在消息管道中,下游可以按照自己的速度逐步处理;
可扩展:通过横向扩展生产者、消费者和 broker, kafka 可以轻松处理巨大的消息流;
高吞吐、低延迟:在一台普通的服务器上既可以达到 10w/s 的吞吐速率;
容灾性:kafka 通过副本 replication 的设置和 leader/follower 的容灾机制保障了消息的安全性。
kafka 的高吞吐、低延迟是如何实现的?
1. 顺序读写
kafka 使用磁盘顺序读写来提升性能
顺序读写和随机读写性能对比:
顺序读 | 随机读 | 顺序写 | 随机写 | |
---|---|---|---|---|
机械硬盘 | 84.0mb/s | 0.033mb/s (512 字节) | 79.0mb/s | 0.083mb/s (512 字节) |
固态硬盘 | 220.7mb/s | 5.296mb/s (512 字节) | 77.2mb/s | 10.203mb/s (512 字节) |
从数据可以看出磁盘的顺序读写速度远高于随机读写的速度,这是因为传统的磁头探针结构,随机读写时需要频繁寻道,也就需要磁头和探针频繁的转动,而机械结构的磁头和探针的位置调整是十分费时的,这就严重影响到硬盘的寻址速度,进而影响到随机写入速度。
kafka 的 message 是不断追加到本地磁盘文件末尾的,而不是随机的写入,这使得 kafka 写入吞吐量得到了显著提升 。每一个 partition 其实都是一个文件 ,收到消息后 kafka 会把数据插入到文件末尾。
2. 页缓存(pagecache)
pagecache 是系统级别的缓存,它把尽可能多的空闲内存当作磁盘缓存使用来进一步提高 io 效率;
pagecache 同时可以避免在 jvm 内部缓存数据,避免不必要的 gc、以及内存空间占用。对于 in-process cache,如果 kafka 重启,它会失效,而操作系统管理的 pagecache 依然可以继续使用。
- producer 把消息发到 broker 后,数据并不是直接落入磁盘的,而是先进入 pagecache。pagecache 中的数据会被内核中的处理线程采用同步或异步的方式定期刷盘至磁盘。
- consumer 消费消息时,会先从 pagecache 获取消息,获取不到才回去磁盘读取,并且会预读出一些相邻的块放入 pagecache,以方便下一次读取。
- 如果 kafka producer 的生产速率与 consumer 的消费速率相差不大,那么几乎只靠对 broker pagecache 的读写就能完成整个生产和消费过程,磁盘访问非常少
3. 零拷贝
正常过程:
- 操作系统将数据从磁盘上读入到内核空间的读缓冲区中
- 应用程序(也就是 kafka)从内核空间的读缓冲区将数据拷贝到用户空间的缓冲区中
- 应用程序将数据从用户空间的缓冲区再写回到内核空间的 socket 缓冲区中
- 操作系统将 socket 缓冲区中的数据拷贝到 nic 缓冲区中,然后通过网络发送给客户端
在这个过程中,可以发现, 数据从磁盘到最终发出去,要经历 4 次拷贝,而在这四次拷贝过程中, 有两次拷贝是浪费的。
1. 从内核空间拷贝到用户空间;
2. 从用户空间再次拷贝到内核空间;
除此之外,由于用户空间和内核空间的切换,会带来 cpu 上下文切换,对于 cpu 的性能也会造成影响;
零拷贝省略了数据在内核空间和用户空间之间的重复穿梭;用户态和内核态切换时产生中断,耗时;
4. 分区分段索引
kafka 的 message 是按 topic 分类存储的,topic 中的数据又是按照一个一个的 partition 即分区存储到不同 broker 节点。每个 partition 对应了操作系统上的一个文件夹,partition 实际上又是按照 segment 分段存储的。符合分布式系统分区分桶的设计思想。
通过这种分区分段的设计,kafka 的 message 消息实际上是分布式存储在一个一个小的 segment 中的,每次文件操作也是直接操作的 segment。为了进一步的查询优化,kafka 又默认为分段后的数据文件建立了索引文件,就是文件系统上的.index 文件。这种分区分段 + 索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度。
5. 批量处理
kafka 发送消息不是一条一条发送的,而是批量发送,很大的提高了发送消息的吞吐量。
假设发送一条消息的时间是 1ms,而此时的吞吐量就是 1000tps。但是假如我们将消息批量发送,1000 条消息需要 10ms,而此时的吞吐量就达到了 1000*100tps。而且这样也很大程度的减少了请求 broker 的次数,提升了总体的效率。
kafka 架构
基本概念
名词 | 概念 |
---|---|
producer | 生产者(发送消息) |
consumer | 消费者(接收消息) |
consumergroup | 消费者组,可以并行消费同一 topic 中的消息 |
broker | 一个独立的 kafka 服务器被称为 broker。broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker 为消费者提供服务,对读取分区的请求作出相应,返回已经提交到磁盘上的消息。可起到负载均衡、容错的作用。 |
topic | 主题,一个队列,可理解为按照消息的逻辑分类将消息划分为不同的 topic |
partition | topic 的物理分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序队列。可起到提高可扩展性,应对高并发场景的作用。 |
replica | 副本,为保证集群的高可用性,kafka 提供副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower |
leader | 每个分区多个副本的主节点,生产者发送数据的对象,以及消费者消费数据的对象都是 leader |
offset | 对于 kafka 中的分区而言,它的每条消息都有唯一的 offset,用来表示消息在分区中对应的位置。 |
架构图
q1:topic 的分区及副本在 broker 上是如何分配的呢?
这里涉及到两个参数:
startindex:第一个分区的第一个副本放置位置(p0-r0)
nextreplicashift:其他分区的副本的放置是依次后移的,间隔距离就是 nextreplicashift 值。
q2:kafka 的架构是基于什么设计思想呢?
分治思想:
1. topic 分治:对于 kafka 的 topic,我们在创建之初可以设置多个 partition 来存放数据,对于同一个 topic 的数据,每条数据的 key 通过哈希取模被路由到不同的 partition 中(如果没有设置 key,则根据消息本身取模),以此达到分治的目的。
2. partition 分治:为了方便数据的消费,kafka 将原始的数据转化为” 索引 + 数据 “的形式进行分治,将一个 partition 对应一个文件转变为一个 partition 对应多个人不同类型的文件,分别为:
- .index 文件:索引文件,用来记录 log 文件中真实消息的相对偏移量和物理位置,为了减少索引文件的大小,使用了稀疏索引
- .log 文件:用来记录 producer 写入的真实消息,即消息体本身;
- .timeindex 文件:时间索引文件,类比.index 文件,用来记录 log 文件中真实消息写入的时间情况,跟 offset 索引文件功能一样,只不过这个以时间作为索引,用来快速定位目标时间点的数据;
3. 底层文件分治:不能将 partition 全部文件都放入一套 ”.index+.log+.timeindex“ 文件中,因此需要对文件进行拆分。kafka 对单个.index 文件、.timeindex 文件、.log 文件的大小都有限定(通过不同参数配置),且这 3 个文件互为一组。当.log 文件的大小达到阈值则会自动拆分形成一组新的文件,这种将数据拆分成多个的小文件叫做 segment,一个 log 文件代表一个 segment。
kafka 工作流程
生产流程:
- 先从 zk 获取对应分区的 leader 在哪个 broker
- broker 进程上的 leader 将消息写入到本地 log 中
- follower 从 leader 上拉取消息,写入到本地 log,并向 leader 发送 ack
- leader 接收到所有的 isr 中的 replica 的 ack 后,并向生产者返回 ack
消费流程:
- 每个 consumer 都可以根据分配策略,获得要消费的分区
- 获取到 consumer 对应的 leader 处于哪个 broker 以及 offset
- 拉取数据
- 消费者提交 offset
分区策略
相信上面的内容已经让大家大致了解了消息生产及消费的过程:一个 topic 内的消息会被发送到不同的分区以供不同的消费者拉取消息。
那么在这个过程中就涉及到了两个问题:
- 生产者按照什么策略将数据分配到分区中呢?
- 消费者按照什么策略去不同的分区拉取消息呢?
生产者分区策略
生产者写入消息到 topic,kafka 将依据不同的策略将数据分配到不同的分区中:
1. 轮询分区策略
即按消息顺序进行分区顺序分配,是默认的策略,也是使用最多的策略,可以最大限度保证所有消息平均分配到一个分区;
key 为 null,则使用轮询算法均衡地分配分区;
2. 按 key 分区分配策略
key 不为 null,key.hash () % n
但是按照 key 决定分区有可能会造成数据倾斜
3. 随机分区策略
随机分区,不建议使用
4. 自定义分区策略
根据业务需要制定以分区策略
消费者分区策略
同一时刻,一条消息只能被组中的一个消费者实例消费:
- 消费者数 = 分区数:一个分区对应一个消费者
- 消费者数 < 分区数:一个消费者对应多个分区
- 消费者数 > 分区数:多出来的消费者将不会消费任何消息
分区分配策略:保障每个消费者尽量能够均衡地消费分区的数据,不能出现某个消费者消费分区的数量特别多,某个消费者消费的分区特别少
1. range 分配策略(范围分配策略):kafka 默认的分配策略
计算公式:
n = 分区数量 / 消费者数量
m = 分区数量 % 消费者数量
前 m 个消费者消费 n+1 个,剩余消费者消费 n 个
以上图为例:n=8/3=2m=8%3=2 因此前 2 个消费者消费 2+1=3 个分区,剩下 1 个消费者消费 2 个分区
2. roundrobin 分配策略(轮询分配策略)
消费者挨个分配消费的分区:
如下图,3 个消费者共同消费 8 个分区
第一轮:consumer0-->a-partition0;consumer1-->a-partition1;consumer2-->a-partition2
第二轮:consumer0-->a-partition3;consumer1-->b-partition0;consumer2-->b-partition1
第三轮:consumer0-->b-partition2;consumer1-->b-partition3
3. striky 粘性分配策略
在没有发生 rebalance 跟轮询分配策略是一致的
发生了 rebalance(例如 consumer2 故障宕机),轮询分配策略,重新走一遍轮询分配的过程。而粘性会保证跟上一次的尽量一致,只是将新的需要分配的分区,均匀的分配到现有可用的消费者中即可,这样就减少了上下文的切换
副本的 ack 机制
producer 是不断地往 kafka 中写入数据,写入数据会有一个返回结果,表示是否写入成功。这里对应有一个 acks 的配置。
- acks = 0:生产者只管写入,不管是否写入成功,可能会数据丢失。性能是最好的
- acks = 1:生产者会等到 leader 分区写入成功后,返回成功,接着发送下一条
- acks = -1/all:确保消息写入到 leader 分区、还确保消息写入到对应副本都成功后,接着发送下一条,性能是最差的
根据业务情况来选择 ack 机制,是要求性能最高,一部分数据丢失影响不大,可以选择 0/1。如果要求数据一定不能丢失,就得配置为 - 1/all。
分区中是有 leader 和 follower 的概念,为了确保消费者消费的数据是一致的,只能从分区 leader 去读写消息,follower 做的事情就是同步数据。
q&a:
1. offset 存在哪里?
0.9 版本前默认存在 zk,但是由于频繁访问 zk,zk 需要一个一个节点更新 offset,不能批量或分组更新,导致 offset 更新成了瓶颈。
在新版 kafka 以及之后的版本,kafka 消费的 offset 都会默认存放在 kafka 集群中的一个叫 __consumer_offsets 的 topic 中。offset 以消息形式发送到该 topic 并保存在 broker 中。这样 consumer 提交 offset 时,只需连接到 broker,不用访问 zk,避免了 zk 节点更新瓶颈。
2.leader 选举策略?
2 种 leader:①broker 的 leader 即 controller leader ② partition 的 leader
- controller leader:当 broker 启动的时候,都会创建 kafkacontroller 对象,但是集群中只能有一个 leader 对外提供服务,这些每个节点上的 kafkacontroller 会在指定的 zookeeper 路径下创建临时节点,只有第一个成功创建的节点的 kafkacontroller 才可以成为 leader,其余的都是 follower。当 leader 故障后,所有的 follower 会收到通知,再次竞争在该路径下创建节点从而选举新的 leader
- partition leader :由 controller leader 执行
- 从 zookeeper 中读取当前分区的所有 isr (in-sync replicas) 集合
- 调用配置的分区选择算法选择分区的 leader
3. 分区可以提高扩展性以及吞吐量,那分区越多越好吗?
分区并不是越多越好,分区过多存在着以下弊端:
- producer 内存成本增大、consumer 线程开销增大 partition 是 kafka 管理数据的基本逻辑组织单元,越多的 partition 意味着越多的数据存储文件(一个 partition 对应至少 3 个数据文件)
- 分区增多数据连续性下降
- 可用性降低
4. 与数据库相比 kafka 的优势?
- 业务场景不同,底层数据结构不同,kafka 数据存储对于功能要求较少,因此读写更快 kafka 存储数据(消息本身)的文件的数据结构是数组,数组的特点就是:数据间位置连续,如果按照顺序读取,或者追加写入的话,其时间复杂度为 o (1),效率最高。
5. 消费偏移的更新方式 无论是 kafka 默认 api,还是 java 的 api,offset 的更新方式都有两种:自动提交和手动提交
- 自动提交(默认方式)
kafka 中偏移量的自动提交是由参数 enable_auto_commit 和 auto_commit_interval_ms 控制的,当 enable_auto_commit=true 时,kafka 在消费的过程中会以频率为 auto_commit_interval_ms 向 kafka 自带的 topic (__consumer_offsets) 进行偏移量提交,具体提交到哪个 partation:math.abs (groupid.hashcode ()) % numpartitions。
这种方式也被称为 at most once,fetch 到消息后就可以更新 offset,无论是否消费成功。
2. 手动提交
鉴于 kafka 自动提交 offset 的不灵活性和不精确性 (只能是按指定频率的提交),kafka 提供了手动提交 offset 策略。手动提交能对偏移量更加灵活精准地控制,以保证消息不被重复消费以及消息不被丢失。
对于手动提交 offset 主要有 3 种方式:
同步提交:提交失败的时候一直尝试提交,直到遇到无法重试的情况下才会结束,同步方式下消费者线程在拉取消息会被阻塞,在 broker 对提交的请求做出响应之前,会一直阻塞直到偏移量提交操作成功或者在提交过程中发生异常,限制了消息的吞吐量。
异步提交:异步手动提交 offset 时,消费者线程不会阻塞,提交失败的时候也不会进行重试,并且可以配合回调函数在 broker 做出响应的时候记录错误信息。 对于异步提交,由于不会进行失败重试,当消费者异常关闭或者触发了再均衡前,如果偏移量还未提交就会造成偏移量丢失。
异步 + 同步:针对异步提交偏移量丢失的问题,通过对消费者进行异步批次提交并且在关闭时同步提交的方式,这样即使上一次的异步提交失败,通过同步提交还能够进行补救,同步会一直重试,直到提交成功。 通过 finally 在最后不管是否异常都会触发 consumer.commit () 来同步补救一次,确保偏移量不会丢失。
发表评论