本文主要学习https://blog.csdn.net/cao1315020626/article/details/112590786?ops_request_misc=%257b%2522request%255fid%2522%253a%2522171886470616800197016195%2522%252c%2522scm%2522%253a%252220140713.130102334…%2522%257d&request_id=171886470616800197016195&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2alltop_positive~default-1-112590786-null-null.142v100pc_search_result_base3&utm_term=kafka&spm=1018.2226.3001.4187
基于上文加上上文博主没讲到的地方和自己的一些理解
1 kafka基本概念
kafka是一种消息队列,同样具有消息队列的好处
解耦合
耦合的状态表示当你实现某个功能的时候,是直接接入当前接口,而利用消息队列,可以将相应的消息发送到消息队列,这样的话,如果接口出了问题,将不会影响到当前的功能。
异步处理
异步处理替代了之前的同步处理,异步处理不需要让流程走完就返回结果,可以将消息发送到消息队列中,然后返回结果,剩下让其他业务处理接口从消息队列中拉取消费处理即可。
流量削峰
高流量的时候,使用消息队列作为中间件可以将流量的高峰保存在消息队列中,从而防止了系统的高请求,减轻服务器的请求处理压力。
1.1 kafka消费模式
主要有两种一对一和一对多,一般来说,都是用一对多模式(至少我目前没用过一对一)
一对一模式:
生产者将消息发送到生产队列,消费者进行消费,消费完毕后该消息删除,整个生产消费过程是非阻塞的,类似与我们go中有缓冲区的channel
一对多模式:
一个生产者对应多个消费者,生产者将消息生产到生产队列的topic中,消费者订阅某一个或者多个topic进行消费,消费后并不会删除消息,默认保存一段时间,具体消费方式设计到分区等,后面会说。
1.2kafka基础架构
producer:消息生产者,向kafka中发布消息的角色。
consumer:消息消费者,即从kafka中拉取消息消费的客户端。
consumer group:消费者组,消费者组则是一组中存在多个消费者,消费者消费broker中当前topic的不同分区中的消息,消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。某一个分区中的消息只能够一个消费者组中的一个消费者所消费
broker:经纪人,一台kafka服务器就是一个broker,一个集群由多个broker组成,一个broker可以容纳多个topic。
topic:主题,可以理解为一个队列,生产者和消费者都是面向一个topic
partition:分区,为了实现扩展性,一个非常大的topic可以分布到多个broker上,一个topic可以分为多个partition,每个partition是一个有序的队列(分区有序,不能保证全局有序),一个分区只能被同一个消费者组中的一个消费者消费,这确保了每条消息只能被消费者组中的一个消费者处理。不同消费者组中的消费者可以独立消费不同的分区,但一个分区只能被同一消费者组中的一个消费者消费。
replica:副本replication,为保证集群中某个节点发生故障,节点上的partition数据不丢失,kafka可以正常的工作,kafka提供了副本机制,一个topic的每个分区有若干个副本,一个leader和多个follower
leader:每个分区多个副本的主角色,生产者发送数据的对象,以及消费者消费数据的对象都是leader。
follower:每个分区多个副本的从角色,实时的从leader中同步数据,保持和leader数据的同步,leader发生故障的时候,某个follower会成为新的leader。
offset:kafka的分区中的每一个消息的位置都由offset来表示
1.3 kafka工作流程
kafka中,topic可以视为一个逻辑地址,实际上topic是分布到多个partition上的,partition通常有一个log文件,消息一般是直接新加到log文件的末端,消费者每次消费都会更新offset,以便清楚自己下次消费的位置,所以一个topic通常对应多个partition,然后一个partition通常对应一个log(有可能分裂)
1.4 kafka的文件存储(这个用到的比较少)
kafka中一般是一个topic对应多个partition,一个partition对应多个segment,一个segment对应两个文件,一个index,一个log,log就是用来存储我们的消息的,index是用于大量数据情况下快速定位的
.index文件存储的消息的offset+真实的起始偏移量。.log中存放的是真实的数据。
1.5 kafka生产者分区
分区的原因
1.方便在集群中扩展:每个partition通过调整以适应它所在的机器,而一个topic又可以有多个partition组成,因此整个集群可以适应适合的数据
2。可以提高并发:以partition为单位进行读写。类似于多路。
分区的原则
1.指明partition(这里的指明是指第几个分区)的情况下,直接将指明的值作为partition的值
2.没有指明partition的情况下,但是存在值key,此时将key的hash值与topic的partition总数进行取余得到partition值
3.值与partition均无的情况下,第一次调用时随机生成一个整数,后面每次调用在这个整数上自增,将这个值与topic可用的partition总数取余得到partition值,即round-robin算法。
1.6 kafka的ack机制
为了确保数据能落盘,producer发送消息后,需要收到相应leader的ack,收到后再进行下一轮发送
1.61 何时发送ack
leader发送ack一般是有两种方式,一种是半数follower同步完成,还有一种是全部follower同步完成,kafka采用的是第二种,主要是因为网络对kafka影响较小,所以等一半还是等全部差距不大。
关于故障处理一般是通过维护一个isr,如果leader发现某一follower长时间未回应,则将其提出isr,如果leader掉线,则再isr中重新选举
1.62 三种ack
ack 0 : producer不需要等到leader的ack直接进行下一轮发送,速度非常快,但存在数据丢失的风险
ack 1 : producer仅需要等待leader的ack,可能会出现follower未同步消息的情况
ack -1 : producer需要等到leader和follower的ack,可能会出现数据重复的情况
1.63 数据一致性
leo : 每个副本的最后一个offset(即最后一个消息)
hw : 消费者能见到的最小的leo,即isr中最小的leo
leader故障 : 会从isr中重新选取一个leader,follower截掉hw以上的数据,开始和leader同步数据
follower故障 : 暂时踢出isr队列,恢复后戒掉hw以上的数据,开始和leader同步
2 消费者分区
consumer采用pull拉的方式来从broker中读取数据。
push推的模式很难适应消费速率不同的消费者,因为消息发送率是由broker决定的,它的目标是尽可能以最快的速度传递消息,但是这样容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull方式则可以让consumer根据自己的消费处理能力以适当的速度消费消息。
2.1 分配模式
一个consumer group中有多个consumer,一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由那个consumer消费的问题。
kafka的两种分配策略:
round-robin循环
range
round-robin
主要采用轮询的方式分配所有的分区,该策略主要实现的步骤:
假设存在三个topic:t0/t1/t2,分别拥有1/2/3个分区,共有6个分区,分别为t0-0/t1-0/t1-1/t2-0/t2-1/t2-2,这里假设我们有三个consumer,c0、c1、c2,订阅情况为c0:t0,c1:t0、t1,c2:t0/t1/t2。
此时round-robin采取的分配方式,则是按照分区的字典对分区和消费者进行排序,然后对分区进行循环遍历,遇到自己订阅的则消费,否则向下轮询下一个消费者。即按照分区轮询消费者,继而消息被消费。
分区在循环遍历消费者,自己被当前消费者订阅,则消息与消费者共同向下(消息被消费),否则消费者向下消息继续遍历(消息没有被消费)。轮询的方式会导致每个consumer所承载的分区数量不一致,从而导致各个consumer压力不均。上面的c2因为订阅的比较多,导致承受的压力也相对较大。
range
range的重分配策略,首先计算各个consumer将会承载的分区数量,然后将指定数量的分区分配给该consumer。假设存在两个consumer,c0和c1,两个topic,t0和t1,这两个topic分别都有三个分区,那么总共的分区有6个,t0-0,t0-1,t0-2,t1-0,t1-1,t1-2。分配方式如下:
range按照topic一次进行分配,即消费者遍历topic,t0,含有三个分区,同时有两个订阅了该topic的消费者,将这些分区和消费者按照字典序排列。
按照平均分配的方式计算每个consumer会得到多少个分区,如果没有除尽,多出来的分区则按照字典序挨个分配给消费者。按照此方式以此分配每一个topic给订阅的消费者,最后完成topic分区的分配。
按照range的方式进行分配,本质上是以此遍历每个topic,然后将这些topic按照其订阅的consumer数进行平均分配,多出来的则按照consumer的字典序挨个分配,这种方式会导致在前面的consumer得到更多的分区,导致各个consumer的压力不均衡。
2.2 消费者offset存储
由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复以后,需要从故障前的位置继续消费,所以consumer需要实时记录自己消费到了那个offset,以便故障恢复后继续消费。
3 kafka高吞吐低延迟如何实现的
kafka使用磁盘顺序读写来提升性能
顺序读写和随机读写性能对比:
顺序读随机读顺序写随机写机械硬盘84.0mb/s0.033mb/s (512字节)79.0mb/s0.083mb/s (512字节)固态硬盘220.7mb/s5.296mb/s(512字节)77.2mb/s10.203mb/s(512字节)
从数据可以看出磁盘的顺序读写速度远高于随机读写的速度,这是因为传统的磁头探针结构,随机读写时需要频繁寻道,也就需要磁头和探针频繁的转动,而机械结构的磁头和探针的位置调整是十分费时的,这就严重影响到硬盘的寻址速度,进而影响到随机写入速度。
kafka的message是不断追加到本地磁盘文件末尾的,而不是随机的写入,这使得kafka写入吞吐量得到了显著提升 。每一个partition其实都是一个文件 ,收到消息后kafka会把数据插入到文件末尾。
2.页缓存(pagecache)
pagecache是系统级别的缓存,它把尽可能多的空闲内存当作磁盘缓存使用来进一步提高io效率;
pagecache同时可以避免在jvm内部缓存数据,避免不必要的gc、以及内存空间占用。对于in-process cache,如果kafka重启,它会失效,而操作系统管理的pagecache依然可以继续使用。
producer把消息发到broker后,数据并不是直接落入磁盘的,而是先进入pagecache。pagecache中的数据会被内核中的处理线程采用同步或异步的方式定期刷盘至磁盘。
consumer消费消息时,会先从pagecache获取消息,获取不到才回去磁盘读取,并且会预读出一些相邻的块放入pagecache,以方便下一次读取。
如果kafka producer的生产速率与consumer的消费速率相差不大,那么几乎只靠对broker pagecache的读写就能完成整个生产和消费过程,磁盘访问非常少
3.零拷贝
正常过程:
操作系统将数据从磁盘上读入到内核空间的读缓冲区中
应用程序(也就是kafka)从内核空间的读缓冲区将数据拷贝到用户空间的缓冲区中
应用程序将数据从用户空间的缓冲区再写回到内核空间的socket缓冲区中
操作系统将socket缓冲区中的数据拷贝到nic缓冲区中,然后通过网络发送给客户端
在这个过程中,可以发现, 数据从磁盘到最终发出去,要经历4次拷贝,而在这四次拷贝过程中, 有两次拷贝是浪费的。
1.从内核空间拷贝到用户空间;
2.从用户空间再次拷贝到内核空间;
除此之外,由于用户空间和内核空间的切换,会带来cpu上下文切换,对于cpu的性能也会造成影响;
零拷贝省略了数据在内核空间和用户空间之间的重复穿梭;用户态和内核态切换时产生中断,耗时;
4.分区分段索引
kafka的message是按topic分类存储的,topic中的数据又是按照一个一个的partition即分区存储到不同broker节点。每个partition对应了操作系统上的一个文件夹,partition实际上又是按照segment分段存储的。符合分布式系统分区分桶的设计思想。
通过这种分区分段的设计,kafka的message消息实际上是分布式存储在一个一个小的segment中的,每次文件操作也是直接操作的segment。为了进一步的查询优化,kafka又默认为分段后的数据文件建立了索引文件,就是文件系统上的.index文件。这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度。
5.批量处理
kafka发送消息不是一条一条发送的,而是批量发送,很大的提高了发送消息的吞吐量。
假设发送一条消息的时间是1ms,而此时的吞吐量就是1000tps。但是假如我们将消息批量发送,1000条消息需要10ms,而此时的吞吐量就达到了1000*100tps。而且这样也很大程度的减少了请求broker的次数,提升了总体的效率。
4 事务
4.1 消费者事务
为了按跨分区跨会话的事务,需要引入一个全局唯一的transaction id(tid),消费者的pid会和tid绑定
还会有一个tc,将每一个事务信息写入一个topic中,这样即使消费者断电,也可以继续从上次消费的地方继续消费
4.2 生产者事务
暂时无法保证
5 go操作kafka
下面代码主要学习www.liwenzhou.com
go get github.com/segmentio/kafka-go
首先安装依赖
发送消息
// writebyconn 基于conn发送消息
func writebyconn() {
topic := "my-topic"
partition := 0
// 连接至kafka集群的leader节点
conn, err := kafka.dialleader(context.background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.fatal("failed to dial leader:", err)
}
// 设置发送消息的超时时间
conn.setwritedeadline(time.now().add(10 * time.second))
// 发送消息
_, err = conn.writemessages(
kafka.message{value: []byte("one!")},
kafka.message{value: []byte("two!")},
kafka.message{value: []byte("three!")},
)
if err != nil {
log.fatal("failed to write messages:", err)
}
// 关闭连接
if err := conn.close(); err != nil {
log.fatal("failed to close writer:", err)
}
}
消费消息
// readbyconn 连接至kafka后接收消息
func readbyconn() {
// 指定要连接的topic和partition
topic := "my-topic"
partition := 0
// 连接至kafka的leader节点
conn, err := kafka.dialleader(context.background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.fatal("failed to dial leader:", err)
}
// 设置读取超时时间
conn.setreaddeadline(time.now().add(10 * time.second))
// 读取一批消息,得到的batch是一系列消息的迭代器
batch := conn.readbatch(10e3, 1e6) // fetch 10kb min, 1mb max
// 遍历读取消息
b := make([]byte, 10e3) // 10kb max per message
for {
n, err := batch.read(b)
if err != nil {
break
}
fmt.println(string(b[:n]))
}
// 关闭batch
if err := batch.close(); err != nil {
log.fatal("failed to close batch:", err)
}
// 关闭连接
if err := conn.close(); err != nil {
log.fatal("failed to close connection:", err)
}
}
创建topic
// createtopicbyconn 创建topic
func createtopicbyconn() {
// 指定要创建的topic名称
topic := "my-topic"
// 连接至任意kafka节点
conn, err := kafka.dial("tcp", "localhost:9092")
if err != nil {
panic(err.error())
}
defer conn.close()
// 获取当前控制节点信息
controller, err := conn.controller()
if err != nil {
panic(err.error())
}
var controllerconn *kafka.conn
// 连接至leader节点
controllerconn, err = kafka.dial("tcp", net.joinhostport(controller.host, strconv.itoa(controller.port)))
if err != nil {
panic(err.error())
}
defer controllerconn.close()
topicconfigs := []kafka.topicconfig{
{
topic: topic,
numpartitions: 1,
replicationfactor: 1,
},
}
// 创建topic
err = controllerconn.createtopics(topicconfigs...)
if err != nil {
panic(err.error())
}
}
获取topic列表
conn, err := kafka.dial("tcp", "localhost:9092")
if err != nil {
panic(err.error())
}
defer conn.close()
partitions, err := conn.readpartitions()
if err != nil {
panic(err.error())
}
m := map[string]struct{}{}
// 遍历所有分区取topic
for _, p := range partitions {
m[p.topic] = struct{}{}
}
for k := range m {
fmt.println(k)
}
6 kafka结合redis实现分布式锁
6.1 使用redis实现分布式锁:
在redis中使用set命令来尝试获取锁,在设置过程中可以添加过期时间以防止死锁。
使用set命令的nx(只在键不存在时设置)选项,确保只有一个客户端能够成功设置锁。
释放锁时,使用del命令删除锁。
6.2 使用kafka实现锁的分发:
当一个客户端成功获取锁时,向一个专门的kafka主题发送消息,表示锁已经被获取。
其他客户端订阅该主题以获取锁状态。
6.3 结合redis和kafka:
当客户端成功获取锁后,在redis中设置锁,并通过kafka发送获取锁的消息。
其他客户端在收到获取锁的消息后,尝试获取锁时首先检查redis中的锁状态。
代码示例
package main
import (
"fmt"
"github.com/shopify/sarama"
"github.com/go-redis/redis"
"log"
"time"
)
var (
redisclient *redis.client
kafkabrokers = []string{"localhost:9092"}
)
func init() {
redisclient = redis.newclient(&redis.options{
addr: "localhost:6379",
password: "",
db: 0,
})
}
func acquirelock(lockkey string) bool {
result, err := redisclient.setnx(lockkey, "locked", 10*time.second).result()
if err != nil {
log.fatal(err)
}
return result
}
func releaselock(lockkey string) {
err := redisclient.del(lockkey).err()
if err != nil {
log.fatal(err)
}
}
func main() {
config := sarama.newconfig()
config.producer.requiredacks = sarama.waitforlocal
config.producer.return.successes = true
producer, err := sarama.newsyncproducer(kafkabrokers, config)
if err != nil {
log.fatal(err)
}
defer func() {
if err := producer.close(); err != nil {
log.fatal(err)
}
}()
lockkey := "my_lock_key"
if acquirelock(lockkey) {
defer releaselock(lockkey)
// 发送获取锁消息到kafka
message := &sarama.producermessage{
topic: "lock_topic",
value: sarama.stringencoder("lock acquired for key: " + lockkey),
}
_, _, err := producer.sendmessage(message)
if err != nil {
log.fatal(err)
}
// 处理业务逻辑
fmt.println("lock acquired. performing critical section operations...")
time.sleep(5 * time.second)
fmt.println("critical section operations completed.")
} else {
fmt.println("failed to acquire lock. another process may be holding the lock.")
}
}
发表评论