先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里p7
深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年最新大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!
由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新
如果你需要这些资料,可以添加v获取:vip204888 (备注大数据)
正文
名称 | 解释 |
broker | 消息中间件处理节点,一个kafka节点就是一个broker,一个或者多个broker可以组成一个kafka集群 |
topic | kafka根据topic对消息进行归类,发布到kafka集群的每条消息都需要指定一个topic |
producer | 消息生产者,向broker发送消息的客户端 |
consumer | 消息消费者,从broker读取消息的客户端 |
consumergroup | 每个consumer属于一个特定的consumer group,一条消息可以被多个不同的consumer group消费,但是一个consumer group中只能有一个consumer能够消费该消息 |
partition | 物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的 |
offset | partition中每条消息的唯一编号 |
①、producer(生产者)
消息生产者,向broker发送消息,也称为发布者
②、comsumer(消费者)
读取消息的客户端
③、consumer group(消费者组)
一个consumer group由多个consumer组成,消费者组可以消费某个分区中的所有消息,消费的消息不会立马被删除。也称为订阅者
④、topic(主题)
逻辑上的区分,通过topic将消息进行分类,不同topic会被订阅该topic的消费者消费
特点:topic的一个分区只能被consumer group的一个consumer消费;同一条消息可以被多个消费者组消费,但同一个分区只能被某个消费者组中的一个消费者消费。
问题:topic消息非常多,消息会被保存在log日志文件中,文件过大
解决:分区
⑥、partition(分区)
将一个topic中的消息分区来存储,有序序列,真正存放消息的消息队列
⑦、offset(偏移量)
分区中的每条消息都有唯一的编号,用来唯一标识这一条消息(message)
⑧、leader、follower(副本)
每个分区都可以设置自己对应的副本(replication-factor参数),有一个主副本(leader)、多个从副本(follower)
每个副本的职责是什么?
- leader:处理读写请求,负责当前分区的数据读写
- follower:同步数据,保持数据一致性
为什么要设置多副本?
单一职责。leader负责和生产消费者交互,follower负责副本拷贝,副本是为了保证消息存储安全性,当其中一个leader挂掉,则会从follower中选举出新的leader,提高了容灾能力,但是副本也会占用存储空间
⑨、isr(副本集)
动态集合,保存正在同步的副本集,是与leader同步的副本。如果某个副本不能正常同步数据或落后的数据比较多,会从副本集中把节点中剔除,当追赶上来了在重新加入。kafka默认的follower副本能够落后leader副本的最长时间间隔是10s
kafka工作流程?
生产者生产好消息之后调用send()方法发送到broker端,broker将收到的消息存储的对应topic中的patition中,而broker中的消息实际上是存储在了commit-log文件中,消费者监听定时循环拉取消息
一、生产者发送消息流程
参考代码:
package com.example;
import org.apache.kafka.clients.producer.kafkaproducer;
import org.apache.kafka.clients.producer.producerrecord;
import org.apache.kafka.clients.producer.recordmetadata;
import java.util.properties;
import java.util.concurrent.executionexception;
import java.util.concurrent.future;
public class myproductor {
public static void main(string[] args) throws executionexception, interruptedexception {
//kafka的配置
properties properties = new properties();
//kafka服务器地址和端口
properties.put("bootstrap.servers", "localhost:9092");
//producer的压缩算法使用的是gzip
//为什么要压缩?
properties.put("compression.type","gzip");
//指定发送消息的key和value的序列化类型
properties.put("key.serializer", "org.apache.kafka.common,serialization.stringserializer");
properties.put("value.serializer", "org.apache.kafka.common,serialization.stringserializer");
//补充:为什么要序列化/反序列化?
//实例化一个生产者对象,指定发送的主题、key、value、分区号等
kafkaproducer<object, object> producer = new kafkaproducer<>(properties);
//发送100条消息
for (int i = 0; i < 100; i++) {
//调用send方法,向kafka发送数据,并返回一个future对象,通过该对象来获取结果
future<recordmetadata> result = producer.send(new producerrecord<>("my-topic", integer.tostring(i),
integer.tostring(i)));
recordmetadata recordmetadata = result.get();
}
//关闭生产者对象
producer.close();
}
}
第一步、生产者配置参数
指定生产消息要达到的kafka服务器地址,压缩方式、序列化方式
①、为什么要进行压缩?
producer生产的每个消息都经过gzip压缩,在传输的过程中能够节省网络传输带宽和broker磁盘占用
②、为什么要进行序列化/反序列化?
数据在网络传输过程中都是以字节流的形式传输的,在生产者发送消息的时候需要将消息先进行序列化
第二步、拦截器
生产者在发送消息前会对请求的消息进行拦截,起到过滤和处理的作用。
我们可以自定义拦截器,拦截器中定义自己需要的逻辑,满足个性化配置。比方说对消息进行加密解密、消息格式转换、消息路由等等
第三步、序列化器
数据在网络传输过程中都是以字节流的形式传输的,在生产者发送消息的时候需要将消息先进行序列化
第四步、分区器
- 如果producerrecord对象提供了分区号,使用提供的分区号
- 如果没有提供分区号,提供了key,则使用key序列化后的值的hash值对分区数量取模
- 如果没有提供分区号、key,采用轮询方式分配分区号(默认)
第五步、send()发送消息
通过上面的操作生产者已经知道该往哪个主题、哪个分区发送这条消息了。
第六步、获取发送消息响应
①、如果消息发送成功:broker收到消息之后会返回一个future类型recordmetadata对象,可以通过该对象来获取发送的结果,对象中记录了此条消息发送到的topic、partition、offset。
②、消息发送失败:错误消息。在收到错误消息之后会有尝试机制,尝试重新发送消息
但直接使用send(msg)会出现问题,调用之后会立即返回,如果因为网络等外界因素影响导致消息没有发送到broker,出现生产者程序丢失数据问题,只能通过处理返回的future对象处理才能感知到。
对应的解决方案是我们可以使用send(msg,callbakc)的方式发哦是那个消息并设置回调函数
在发送消息后,会立即调用回调函数来处理发送结果,回调函数中定义了处理逻辑
二、broker收发消息流程
1. 分区机制(主题-分区-消息)
前文中提到生产者发送到broker的消息都是基于topic进行分类的(逻辑上),而topic中的消息是以partition为单位存储的(物理上),每条消息都有自己的offset
①、 分区中的数据存储在哪儿?
每个partition都有一个commit log文件
②、 为什么要分区(好处)存储?
如果commitlog文件很大的话可能导致一台服务器无法承担所有的数据量,机器无法存储,分区之后可以把不同的分区放在不同的机器上,相当于是分布式存储
- 每个消费者并行消费
- 提高可用性,增加若干副本
2. 消息存储
每一个partition都对应了一个commit log文件,日志文件中存储了消息等信息,新到达的消息以追加的方式写入分区的末尾,然后以先入先出的顺序读取。
①、 分区中的消息会一直存储吗?
如果不停的一致向日志文件中写入消息,日志文件大小也是有上限的,所以kafka会定期的清理磁盘,有两种方式:
- 时间:kafka默认保留最近一周的消息(根据配置文中的日志保留时间设置的:log.retention.hours)
- 大小:kakfa在配置文件中配置单个消息的大小为1mb,如果生产者发送的消息超过1mb,不会接收消息
②、follower副本数据什么时候同步更新的?
- 数据传输阶段:leader副本将消息发送给follower副本。这个过程中,leader副本会将消息按照一定的批次大小发送给follower副本,follower副本会接收并写入本地日志。一旦follower副本成功写入消息到本地日志,就会向leader副本发送确认消息。
- 确认阶段:leader副本在收到来自所有follower副本的确认消息后,就会认为消息已经成功复制到所有的副本中。然后向生产者发送成功响应,表示消息已被成功接收和复制。
注意的是,follower副本的数据同步是异步进行的,即follower副本不需要等待数据同步完成才返回成功响应。这样可以提高消息的处理速度和吞吐量。但也意味着,在数据同步过程中,follower副本可能会滞后于leader副本一段时间,这个时间间隔称为追赶(lag)。kafka提供了配置参数来控制同步和追赶的速度,以平衡数据的一致性和性能的需求。
三、消费者消费消息流程
- 配置消费者客户端参数
- 创建消费者实例并指定订阅的主题
- 拉取消息并消费
- 提交消费offset
参考代码:
网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
需要这份系统化的资料的朋友,可以添加v获取:vip204888 (备注大数据)
一个人可以走的很快,但一群人才能走的更远!不论你是正从事it行业的老鸟或是对it行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
vert/af5ad230764909a79135b316b63d936e.png)
- 配置消费者客户端参数
- 创建消费者实例并指定订阅的主题
- 拉取消息并消费
- 提交消费offset
参考代码:
网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
需要这份系统化的资料的朋友,可以添加v获取:vip204888 (备注大数据)
[外链图片转存中…(img-xunwayb2-1713141767104)]
一个人可以走的很快,但一群人才能走的更远!不论你是正从事it行业的老鸟或是对it行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
发表评论