生产经验
生产者提高吞吐量
linger.ms:表示一个batch还未满时,时隔多久开始发送。默认是0ms,即缓冲区中一有数据就开始向kafka集群开始发送,此时批次大小的数据没有意义。一般修改linger.ms=5~100ms,给缓冲队列一些时间让他能够向其中存放数据,能够按照batch进行数据发送。也不要设的太大,避免数据量少的时候长时间不发送。batch.size:批次大小,一个batch中存放多少数据。compression.type:压缩方式snappy,压缩数据能够减小数据大小,提高数据吞吐效率。可以选择的压缩方式:- snappy
 - gzip
 - lz4
 - zstd
 
recordaccumulator:缓冲区大小,默认32mb,可以修改为64mb
数据有序
生产者处按照有序的顺序产生数据,写入不同的分区中,消费者在读取数据的时候,因为需要从不同的分区中读取数据,在同一分区内的数据是保持有序的,但是在不同分区之间的数据是无序的。
 在单分区中的有序也是有条件的。
 如果需要多分区中的数据依旧保持有序,那么需要在消费者端创建多个窗口,等待数据全部到达消费者端后,在消费者端进行统一排序,反而降低了效率。
数据乱序
- 在生产者端,默认每个broker最多缓存5个请求,就是最多可以同时等待五个请求收到回应
 - 如果12的请求发送成功了,3的消息发送请求被阻塞了,4的消息发送成功了。此时会导致broker分区中数据的乱序(应该1234,但实际上1243了)。为解决这个问题 
  
- 1.x版本之前保证单分区有序: 
    
max.in.flight.requests.per.connection=1表示缓存队列中最多只能缓存一个请求,那么分区内一定是有序的,因为失败的请求就会被阻塞,直到成功为止。此时甚至不需要开启幂等性
 - 1.x版本之后: 
    
- 如果没有开启幂等性:
max.in.flight.requests.per.connection=1 - 如果开启了幂等性:
max.in.flight.requests.per.connection需要设置小于等于5 - 原因:开启幂等性之后,kafka的服务端会缓存producer发来最近的5个请求的元数据。无论请求完成的时间顺序,服务端是根据这5个元数据内容对数据进行排序,必定能够保证这5个request的数据有序。
 - 元数据中包含有
sequence number,这个值是单调递增的,因此可以根据该值对数据进行排序。如果sequence number较大的数据先到达了,那么就会等待所有数据都到齐了之后,在服务器端对所有数据进行排序之后再落盘。 
 - 如果没有开启幂等性:
 
 - 1.x版本之前保证单分区有序: 
    
 
节点的服役
创建了一个新节点,修改完相关内容后,该节点并没有直接加入到kafka集群中
负载均衡操作
- 创建一个需要均衡的主题,通过创建一个json文件
topics_to_mv.json 
{
    "topics":[
        {"topic": "first"},
        {"topic": "second"}
    ],
    "version":1
}
 
- 生成一个负载均衡计划
bin/kafka-reassign-parititons.sh --bootstrap-server node1:9092 --topics-to-move-json-file topics_to_move.json --broker-list "1,2,3,4" --generate--topics-to-move-json-file指定按照哪个文件,对哪个主题进行负载均衡操作--broker-list指定需要对那几台broker进行负载均衡--generate生成负载均衡计划
 - 创建副本存储计划
increase_replication_factor.json

 
就是把之前生成的负载均衡计划给拿过来
 4. 执行这个副本存储计划
 bin/kafka-reassign-parititons.sh --bootstrap-server node1:9092 --reassignment-json-file increase_replication_factor.json --execute
--reassignment-json-file副本存储计划地址--execute执行
- 验证副本存储计划,用于判断这个计划是否已经执行完毕了
bin/kafka-reassign-parititons.sh --bootstrap-server node1:9092 --reassignment-json-file increase_replication_factor.json --verify--verify将之前的execute改成verify,验证对应的计划
 
节点的退役
和服役一个新节点基本一致,只需要在生成负载均衡计划的时候,将对应需要退役的节点的brokerid进行去除即可。
 这样操作就能够将这个节点上原先的数据转移到其他节点上,让其他节点重新负载均衡其上面的数据.
手动调整分区副本存储
实际生产的环境中,可能存在一些机器性能高,而一些机器性能低的情况。kafka默认的分区情况是不考虑机器资源的负载均衡的,其只考虑机器数量上的负载均衡问题。因此需要根据实际手动调整分区数量。
现在假设希望将所有的数据存储在node1和node2上
- 创建副本存储计划
increase-replication-factor.json 
{
   "version":1,
   "partitions":[
      {
         "topic":"test",
         "partition":0,
         "replicas":[0,1]
      },
      {
         "topic":"test",
         "partition":1,
         "replicas":[0,1]
      },
      {
         "topic":"test",
         "partition":2,
         "replicas":[1,0]
      },
   ]
}
 
就是让原先在2上的数据重新负载均衡到0,1上。
 2. 执行改副本存储计划bin/kafka-reassign-partitions.sh --bootstrap-servers node1:9092 --reassignment-json-file increase-replication-factor.json --execute
 和之前的节点退役重负载均衡问题类似
leader partition的负载均衡
- 一般情况下,kafka会自动把leader partition均匀分布到各个机器上,确保每台机器的读写吞吐量均匀。
 - 如果某些broker宕机了,会导致leader partition过度集中在少数几台broker上,导致这几台机器的压力过高。
 - 宕机的broker重启后会自动成为follower,读写请求较低,造成集群的负载不均衡。
 - 就是一开始大家都会有一些不同分区的leader,但是有些broker宕机了,这些leader就会被发放到其他的broker上,有些broker此时可能就会有多个leader副本,导致压力的增大
 - 为了解决这个问题,kafka有一个参数
auto.leader.rebalance.enable,开启后能够自动进行leader partition的均衡。该参数默认开启。 - 在生产环境中,该值一般不开启。因为很多时候虽然计算了不平衡率是不平衡的,但是实际上其分配的比例还是满足我们需要的,而自平衡需要浪费资源,为避免资源浪费可以关闭该功能。
 leader.imbalance.per.broker.percentage,每个broker允许的不平衡leader比例。如果每个broker上不平衡的leader都超过了这个值,就会由controller来进行leader的再平衡。该值默认是10%。如果开启了自动平衡,推荐将不平衡比例调大,确保不会轻易的进行自平衡。leader.imbalance.check.interval.seconds检查leader负载是否均衡的间隔时间,默认300s
不平衡比例计算
-  
按照正常情况,replicas中第一个副本所在的分区应该和leader是保持一致的,如果不同,说明发生过节点的宕机,它上面的leader被分配到其他节点上了。
 -  
计算某个节点的不平衡比例

 -  
0号分区本来应该是partition2的leader,因为他在partition2的replicas中排第一个,但实际上不是,因此有一个不平衡数
 -  
对于0号节点,ar总副本数为4,有一个不平衡数,因此1/4=25%>10%,0号节点不平衡
 
增加副本因子
某些主题可能在生产过程中重要等级发生了提升,需要对该主题增加副本数。同样是在json中制定号对应的重负载计划,然后按照计划进行执行。
消费者生产经验
分区分配
到底哪个consumer来消费哪个partition中的数据
 kafka提供了4种主流的分区分配策略
- range
 - roundrobin
 - sticky
 - cooperativesticky
 
通过修改配置参数partition.assignment.strategy 来修改对应的分区分配策略,默认策略为range+cooperativesticky
 kakfa可以同时使用多个分区分配策略
range- 针对同一个topic中的分区
 - 对同一个topic中的分区按照序号进行排序,并对消费者按照字母顺序进行排序
 - 通过
partition / consumer来决定每个消费者需要消费几个分区。如果这个结果除不尽,那么前面的消费者需要多消费几个分区 - 如果topic的数量非常多,假设有
n个,每个都除不尽,那么第0个分区就要比其他分区多处理n个分区,会造成数据倾斜 
roundrobin- 针对所有的topic中的所有分区而言
 - 通过轮询的分区策略,把所有的consumer和partition都列出来,然后按照hashcode进行排列,通过轮询的方法来分配partition到各个消费者。
 
sticky- 针对所有的topic中的所有分区
 - 首先会尽量均衡的放置分区到cg上,如果同一cg中的其他消费者出现问题,会尽量保持原有分配的分区不发生变换
 - 执行新的一次分配之前,考虑上一次分配的结果,尽量减少调整分配,节省开销
 
数据积压
这个问题的关键就是在于如何提高consumer对应的吞吐量
- 如果是kafka的消费能力不足,那么可以考虑增加topic的分区数量,同时提高消费者组中的消费者数量,保证
消费者数=分区数 - 如果是下游的数据处理的不及时,可以调高每批次拉取的数量。批次拉取的数据量过少的时候,会使得处理的数据小于生产的数据,造成数据的积压,确保
拉取数据/处理速度>生产速度 
kafka调优
场景: 100w日活,每人每天100条日志,每天日志总数1亿条
- 1150条/秒钟
 - 每条日志 0.5k-2k,取1k
 - 1mb/s
 - 高峰期大约有20倍,23000/s
 - 20mb/s
 - 平均没什么太大意义,主要要看峰值的状态
 
硬件选择
- 服务器台数 = 
2 * (生产者峰值速率 * 副本数 /100) + 1= 2 * (20 m/s * 2 / 100) + 1 = 3 台 - 磁盘 
  
- kafka 按照顺序读写,在顺序读写的方面,固态和机械差不多,可以选机械硬盘
 - 1 亿 * 1k * 2(副本) * 3(天数) / 0.7 = 1 t
 - 三台服务器总的磁盘大小需要>1t
 
 - 内存 
  
- kafka内存 = 堆内存 (kafka内部配置) + 页缓存 (服务器内存)
 - 堆内存: 10-15g 就是修改
kafka-server-start.sh中的配置 - 页缓存 使用segment来存储(1g),一般放25%。 
页缓存=分区数 * 1g * 25% / 服务器台数 - 一台机器应该 10-15g + 1-2g
 
 - cpu 选择 
  
num.io.threads=8负责写磁盘的线程数,占总核数的50%num.replica.fetchers=1副本拉取线程数 占总核数1/6num.network.threads=3数据传输线程 占总核数1/3- 还会有很多线程独立在运行中
 - 一般建议32个cpu核
 
 - 网络选择
网络带宽 =峰值吞吐量一般选择千兆网卡 
自动创建主题
auto.create.topics.enable 如果没有主题的时候就向其中发送消息,系统会自动创建一个主题,该主题是非预期的,增加了主题管理和维护上的难度,因此该值一般置为false
kafka 总体调优
吞吐量
- 增加生产者吞吐量 
  
buffer.memorybatch.sizelinger.mscompression.type
 - 增加分区数量
 - 消费者端提高吞吐量 
  
fetch.max.bytesmax.poll.records
 - 增加下游消费者的处理能力
 
数据精准一次
- 生产者 
  
- ack=-1
 - 幂等性+事务
 
 - broker端 
  
- 分区副本数大于等于2
 - isr应答的最小副本数大于等于2
 
 - 消费者 
  
- 事务+手动offset
 - 消费者的目的地必须支持事务
 
 
合理设置分区数
- 创建只有一个分区的topic
 - 测试该topic的producer和consumer的吞吐量
 - 分区数 = 
总吞吐量 / min(producer吞吐量,consumer吞吐量) - 虽然可以计算,但是分区数也不是越多越好,一般设置在3-10个,可以先进行压测,再灵活调整分区个数
 
单条日志大于1m
kafka单条日志的默认上限是1m,如果消息大小超过1m了,那么会导致kafka可能卡住
message.max.bytes:默认1m,broker端接受每批次消息的最大值max.request.size默认1m,生产者向broker发送请求每个消息的最大值replica.fetch.max.bytes默认1m,副本同步时每个批次消息的最大值fetch.max.bytes消费者端一次拉取的最大数据量,但是这个一个软上限 默认50m
服务器宕机
- 尝试重启
 - 重启不行,增加内存、cpu、磁盘、网络等方法
 - 如果将整个节点误删除,如果有多的副本,可以重新服役一个节点后再均衡
 
集群压测
kafka自带了压测的脚本
- 生产者 
kafka-producer-perf-test.sh - 消费者 
kafka-consumer-perf-test.sh 
生产者压测
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 100000 --throughput 10000 --producer-props bootstrap.servers=node1:9092 batch.size=16384 linger.ms=0
| 参数 | 描述 | 
|---|---|
| topic | 主题 | 
| record-size | 单条信息的大小,单位为字节 | 
| num-records | 总共发送的信息条数 | 
| throughput | 每秒多少条消息,设为-1表示不设限,尽可能快的生产数据,可测出生产者最大吞吐量 | 
| producer-props | 配置生产者的一些参数 | 
消费者压测
kafka-consumer-perf-test.sh --bootstrap-servers:node1:9092 -topic test --message 1000000 --consumer.config config/consumer.properties
| 参数 | 描述 | 
|---|---|
| topic | 主题 | 
| message | 发送的消息条数 | 
| consumer.config | 对应的消费者配置,与生产者压测不同的是,这个需要写在一个properties文件中,通过读取文件的方式加载配置 | 
            
                                            
                                            
发表评论