基础
java kafka消费者主要通过以下核心类实现:
- kafkaconsumer:消费者的核心类,用于创建消费者对象进行数据消费1
- consumerconfig:获取各种配置参数,如果不配置就使用默认值1
- consumerrecord:每条数据都要封装成一个consumerrecord对象才可以进行消费1
偏移量(offset)的含义
- offset 是 kafka 分区内部的消息序号,唯一标识一条消息在分区内的位置。
- 对于 consumer,offset 代表“下一个要消费的消息”。
- 提交 offset 是容错的关键,当 consumer 崩溃/重启/再均衡后,能从正确位置恢复消费。
- 如果 offset 提交得过早,可能会丢消息;如果太晚,可能会重复消费。
kafka提供两种主要的消费方式:
(1)手动提交offset方式
properties props = new properties(); props.put(consumerconfig.bootstrap_servers_config, "kafka-1:9092"); props.put(consumerconfig.key_deserializer_class_config, stringdeserializer.class.getname()); props.put(consumerconfig.value_deserializer_class_config, stringdeserializer.class.getname()); props.put(consumerconfig.enable_auto_commit_config, "false"); // 关闭自动提交 props.put(consumerconfig.group_id_config, "csdn"); kafkaconsumer<string, string> consumer = new kafkaconsumer<>(props); consumer.subscribe(arrays.aslist("testkafka")); while (true) { consumerrecords<string, string> records = consumer.poll(100); for (consumerrecord<string, string> record : records) { system.out.println("topic = " + record.topic() + " offset = " + record.offset() + " value = " + record.value()); } consumer.commitasync(); // 异步提交 // 或者使用 consumer.commitsync(); // 同步提交 }
(2)自动提交offset方式
properties props = new properties(); props.put(consumerconfig.enable_auto_commit_config, "true"); // 开启自动提交 props.put("auto.commit.interval.ms", "1000"); // 自动提交时间间隔 while (true) { consumerrecords<string, string> records = consumer.poll(100); for (consumerrecord<string, string> record : records) { system.out.println("topic = " + record.topic() + " offset = " + record.offset() + " value = " + record.value()); } // 不需要手动提交offset }
手动提交offset有两种具体实现:
- commitsync():同步提交,会失败重试,一直到提交成功1
- commitasync():异步提交,没有失败重试机制,但延迟较低1
消费者组(consumer group)
// 通过group.id配置指定消费者组 props.put(consumerconfig.group_id_config, "experiment");
消费者组的特性:
- 同一个组中的consumer订阅同样的topic,每个consumer接收topic一些分区中的消息4
- 同一个分区不能被一个组中的多个consumer消费4
broker连接
// 指定kafka集群的broker地址 props.put(consumerconfig.bootstrap_servers_config, "localhost:9092");
偏移量(offset)管理
// 记录分区的offset信息 map<topicpartition, offsetandmetadata> currentoffsets = new hashmap<>(); // 在处理消息时更新offset currentoffsets.put( new topicpartition(record.topic(), record.partition()), new offsetandmetadata(record.offset() + 1, "no metadata") ); // 提交特定的偏移量 consumer.commitasync(currentoffsets, null);
消费者通过pull(拉)模式从broker中读取数据:
- 轮询机制:通过
consumer.poll(timeout)
方法轮询获取消息 - 批量处理:poll方法返回的是一批数据,不是单条
- 心跳维护:消费者通过向groupcoordinator发送心跳来维持和群组以及分区的关系
在生产环境中,一般使用手动提交offset方式,因为:
- 手动提交offset取到的数据是可控的
- 可以通过控制提交offset和消费数据的顺序来保证数据的可靠性
- 虽然commitasync没有失败重试机制,但实际工作中用它比较多,因为延迟较低
kafkaconsumer类分析
kafkaconsumer
是 kafka 的客户端核心类之一,用于消费 kafka 集群中的消息。它支持高可靠的消息消费、自动容错、分区分配与再均衡、消费组(consumer group)等机制。
主要成员
- delegate:
kafkaconsumer
实际的大部分操作都委托给内部的consumerdelegate
对象。 - creator:用于创建
consumerdelegate
的工厂。
主要构造方法
- 支持多种方式初始化 consumer(通过 properties、map,及自定义反序列化器)。
- 构造过程会初始化配置、反序列化器、底层网络客户端等。
主要方法
- subscribe/assign/unsubscribe:主题和分区的订阅与管理。
- poll:拉取消息的核心方法。
- commitsync/commitasync:手动或自动提交消费位移(offset)。
- seek/seektobeginning/seektoend:手动控制消费位移。
- position/committed/beginningoffsets/endoffsets:查询当前偏移量、提交的偏移量、起始/末尾偏移量等。
- close/wakeup:关闭消费者、唤醒阻塞的 poll 等。
关键代码与核心算法
2.1 订阅与分区分配
- subscribe(collection<string> topics, consumerrebalancelistener listener)
- 通过订阅主题,kafkaconsumer 会自动和 broker 进行“组协调”,由服务器端分配分区。
- 可指定回调监听分区分配及撤销(consumerrebalancelistener)。
- assign(collection<topicpartition> partitions)
- 手动指定消费哪些分区,此时不参与组协调(不属于消费组)。
2.2 拉取消息
- poll(duration timeout)
- 这是消息获取的核心方法。内部流程大致是:
- 检查当前分区分配、偏移量状态,自动心跳维护。
- 向 broker 发送 fetch 请求,获取分配分区的消息数据。
- 更新本地偏移量、缓存消息,返回给用户。
- 处理组再均衡及回调。
poll
同时承担了心跳(维持消费组成员关系)、数据拉取、再均衡等多项职责。
- 这是消息获取的核心方法。内部流程大致是:
2.3 偏移量管理
- commitsync/commitasync
- 将消费到的 offset 提交到 kafka 的 __consumer_offsets 主题。
- commitsync 为同步,commitasync 为异步,有回调。
- offsetsfortimes、seek、position、committed
- 提供了丰富的偏移量控制与查询能力。比如按时间查 offset、手动指定 offset、获取当前 offset、已提交 offset 等。
2.4 消费组与再均衡
- kafkaconsumer 通过 group.id 配置参与消费组。
- 在订阅/拉取数据时自动和 broker 协作,进行分区分配(groupcoordinator/partitionassignor)。
- 当消费组成员变动(增减、订阅主题变更、分区数变更等)时,自动触发 group rebalance。
核心数据结构
3.1 subscriptionstate
- 跟踪当前订阅主题、分区、偏移量等。
3.2 consumerconfig
- 消费者的配置参数。
3.3 consumerrecords、consumerrecord
- 消费到的数据结构(批量和单条消息)。
3.4 topicpartition
- 主题 + 分区的封装对象。
3.5 offsetandmetadata
- 偏移量及其元数据,用于提交/查询 offset。
与 broker 的交互流程
- 启动/订阅
- consumer 通过 group.id 加入消费组,向 broker 发送 joingroup 请求。
- broker 分配分区,返回 assignment。
- consumer 通过 fetch 请求拉取分配到的分区数据。
- 心跳与再均衡
- consumer 定期发送心跳(heartbeat)给 broker,维持消费组成员关系。
- 发生成员变更时,broker 发起再均衡,consumer 收到分区分配变化的通知。
- 拉取消息
- consumer 向分区主副本 broker 发送 fetch 请求,拉取新消息。
- 提交偏移量
- consumer 通过 offsetcommit 请求,将消费进度(offset)提交到 __consumer_offsets 主题。
- 下次重启或再均衡后,会以 committed offset 作为消费起点。
消费组(consumer group)机制与关联
- 消费组:同一个 group.id 的多个消费者共同组成消费组。
- 一个分区只能被一个消费组内的 consumer 消费。
- 消费组间互不影响,可以实现广播(多个 group 消费同一 topic)。
- 分区分配算法:kafka 内置多种分配策略(如 rangeassignor、roundrobinassignor),也支持自定义。
- 组协调器(groupcoordinator):消费组内所有成员和 broker 的协调节点,负责分配分区和管理组成员状态。
- 组再均衡:消费组成员变动(上线、下线、订阅变更)时,broker 会触发 rebalance,重新分配分区。
总结
kafkaconsumer
封装了和 kafka broker 的全部交互,包括组管理、分区分配、消息拉取、偏移量提交等。- 关键流程:订阅分区 → 拉取消息 → 提交偏移量 → 处理再均衡。
- 数据结构如 topicpartition、offsetandmetadata 贯穿分区与 offset 的管理。
- 消费组机制保证了分布式消费的高可用、横向扩展性和容错。
网络连接分析
1. 消费组与 broker 的连接与交互
1.1 消费组与 broker 的网络连接
- kafkaconsumer 实际的网络连接是通过底层的
kafkaclient
(如networkclient
)实现的。 - 每一个 kafkaconsumer 对象会维护一组 tcp 连接,这些连接包括:
- 与每个被消费分区的 leader broker 的连接:用于拉取数据(fetch)。
- 与 groupcoordinator(消费组协调者)的连接:用于管理 group membership、分区再均衡、offset 提交等。
kafkaconsumer
→consumerdelegate
→kafkaconsumerdelegate
→kafkaclient
(通常是networkclient
)- 构造器中初始化 client
连接建立流程
- 启动时,通过
bootstrap.servers
配置连接部分 broker,自动发现集群。 - 加入消费组时,向 groupcoordinator 发送 joingroup、syncgroup、heartbeat 等请求,维持“组”状态。
2. 拉取数据的批量处理机制
拉取数据不是“一条一条”
- kafkaconsumer 一次 poll 拉取的是一批 record,而非单个。
- 批量拉取由参数控制:
fetch.min.bytes
:每次最少拉取的字节数fetch.max.bytes
:单次最大拉取的字节数max.poll.records
:每次 poll 返回的最大消息数
这样做的原因:
- 批量拉取能极大提升吞吐量,减少网络和序列化开销。
- 只有在消息极少/网络慢时,才可能一次只拉一个 record,实际场景几乎不会。
关键源码位置
- 拉取主逻辑在
kafkaconsumer.poll()
,内部委托到consumernetworkclient
→networkclient
。 - 数据结构:
consumerrecords<k, v>
(批量记录集合) - 相关参数解析见 kafkaconsumerconfig
数据拉取的底层流程
- poll() 方法被调用
- 根据分区分配情况,构造 fetchrequest
- 通过
networkclient
向每个 leader broker 发送 fetchrequest - broker 返回批量数据
- 反序列化为
consumerrecords<k, v>
,返回给用户
关键代码入口
kafkaconsumer.poll()
consumerdelegate.poll()
fetcher.fetchrecords()
- 底层 socket 通信:
networkclient.send()
3. 多线程实现与线程安全
kafkaconsumer 并不是多线程的
- 它本身不是线程安全的,所有方法必须在同一个线程中调用,除了
wakeup()
。 - 官方建议:一个线程一个 consumer 实例,或者用单线程 poll,其他线程异步处理数据。
源码注释说明
- kafkaconsumer.java 注释:“the consumer is not thread-safe...”
多线程架构推荐
- 如果需要多线程消费,加一层 queue,把消息分发到多个工作线程,由 poll 线程专门负责与 broker 通信。
4. 复杂和有意思的实现分析
4.1 消费组协调与再均衡
- 消费组成员通过心跳(heartbeat)、joingroup、syncgroup 维护 membership。
- 分区分配算法在
consumerpartitionassignor
中实现(range, roundrobin, sticky 等)。 - 再均衡期间,consumer 会暂停拉取、等待新分配。
代码位置
org.apache.kafka.clients.consumer.internals.consumercoordinator
org.apache.kafka.clients.consumer.internals.consumernetworkclient
org.apache.kafka.clients.consumer.internals.fetcher
4.2 批量拉取的背后——高效网络 io
- kafka 的 fetch api 支持“多分区合并拉取”,同一台 broker 上的多个分区会合并在一次网络请求里。
networkclient
负责 socket 通信,异步 io,支持高并发。selector
(nio)负责事件分发,提高并发和效率。
代码位置
org.apache.kafka.clients.networkclient
org.apache.kafka.common.network.selector
4.3 offset 管理的强一致性
- offset 提交实际上是把偏移量写入特殊的 topic(__consumer_offsets),由 groupcoordinator 管理。
- 消费组内 offset 的一致性和再均衡机制确保了“至少一次”语义。
5. 直接源码定位
功能 | 关键类或方法 |
---|---|
网络连接的建立 | kafkaconsumer → consumerdelegate → kafkaclient (networkclient) |
消费组协调 | consumercoordinator、groupcoordinator、joingroup/syncgroup |
消息批量拉取 | kafkaconsumer.poll()、fetcher.fetchrecords() |
多线程相关说明 | kafkaconsumer 注释、wakeup() |
offset 管理 | commitsync/commitasync、offsetcommitrequest、__consumer_offsets |
负载均衡与再均衡 | consumerpartitionassignor、consumercoordinator |
小结
- 每个 consumer 进程会与需要的 broker 建立 tcp 连接(1个消费组协调,n个分区 leader)。
- 每次 poll 拉取的是“批量数据”,不是一条!由参数决定批量大小。
- kafkaconsumer 本身不是多线程的,多线程要用 queue 解耦。
- 复杂逻辑:消费组协调、分区分配、批量拉取、offset 强一致性,源码分布见上表。
协作
详细说明各个组件及其关系:
classickafkaconsumer<k, v>
(客户端)- 角色: 这是用户应用程序直接与之交互的 kafka 消费者高级 api 的一个实现(遵循经典组协议)。它封装了消费消息的整个生命周期,包括连接到 kafka 集群、加入消费组、分配分区、拉取消息、提交位移等。
- 主要职责:
- 管理消费者的配置。
- 协调内部组件如
fetcher
和consumercoordinator
的工作。 - 向用户应用程序提供
poll()
方法来获取消息。 - 处理用户发起的位移提交请求。
- 管理消费者的生命周期(如
subscribe()
,assign()
,close()
)。 - 与
fetcher
的关系:classickafkaconsumer
拥有并管理一个fetcher
实例。当poll()
方法被调用时,如果需要拉取数据,classickafkaconsumer
会委托fetcher
来执行实际的数据拉取操作。 - 与
consumercoordinator
的关系:classickafkaconsumer
拥有并管理一个consumercoordinator
实例(前提是配置了group.id
)。consumercoordinator
负责所有与消费组协调相关的任务。 fetcher<k, v>
(客户端)- 角色: 负责从 kafka broker 拉取实际的消息数据。
- 主要职责:
- 根据当前分配给消费者的分区,构建
fetchrequest
。 - 执行请求合并: 将发往同一个 broker 的多个分区的拉取请求合并成一个网络请求。
- 通过
consumernetworkclient
(网络客户端的底层实现) 将fetchrequest
发送给相应的 broker。 - 接收
fetchresponse
,解析消息数据。 - 对消息进行反序列化。
- 管理拉取会话 (fetch session) 以优化拉取效率 (kip-227)。
- 将拉取到的数据缓冲起来,供
classickafkaconsumer
的poll()
方法消费。 - 与
classickafkaconsumer
的关系:fetcher
是classickafkaconsumer
的一个内部组件,由classickafkaconsumer
创建和控制。 - 与 broker 的关系:
fetcher
直接与存储 topic/partition 数据的 broker 进行网络通信,发送拉取请求并接收数据。 consumercoordinator
(客户端)- 角色: 负责代表消费者与 kafka 集群中的
groupcoordinator
(运行在某个 broker 上) 进行交互,以管理消费者在消费组中的成员资格和分区分配。 - 主要职责:
- 发现 groupcoordinator: 找到负责当前消费组的
groupcoordinator
broker。 - 加入消费组 (joingroup): 发送
joingrouprequest
给groupcoordinator
,表明消费者希望加入该组。 - 同步消费组 (syncgroup): 在
joingroup
成功后(通常由 leader 消费者执行),发送syncgrouprequest
以获取分配给该消费者的分区列表。非 leader 消费者也发送syncgrouprequest
来获取分配结果。 - 心跳 (heartbeat): 定期向
groupcoordinator
发送心跳,以表明消费者仍然存活,并维持其在消费组中的成员资格以及对所分配分区的占有。 - 位移提交 (offsetcommit): 将消费者处理过的消息的位移提交给
groupcoordinator
进行存储。 - 位移拉取 (offsetfetch): 从
groupcoordinator
获取已提交的位移。 - 处理 rebalance 相关的回调(通过
consumerrebalancelistener
)。 - 与
classickafkaconsumer
的关系:consumercoordinator
是classickafkaconsumer
的一个内部组件,由classickafkaconsumer
创建和控制(如果配置了group.id
)。 - 与
groupcoordinator
(broker 端) 的关系:consumercoordinator
通过网络与运行在某个 broker 上的groupcoordinator
服务进行通信。 groupcoordinator
(服务端,运行在 broker 上)- 角色: kafka broker 上的一个服务,负责管理一个或多个消费组。每个消费组都有一个对应的
groupcoordinator
。 - 主要职责:
- 处理来自
consumercoordinator
(客户端) 的请求,如joingroup
,syncgroup
,heartbeat
,offsetcommit
,offsetfetch
等。 - 维护消费组的元数据,包括组成员列表、每个成员的订阅信息、当前的分区分配方案、已提交的位移等。这些信息通常存储在内部的
__consumer_offsets
topic 中。 - 选举 leader consumer: 在消费组内选举一个 leader consumer,由 leader consumer 负责计算分区分配方案。
- 触发和协调 rebalance: 当消费组成员发生变化(加入/离开)、订阅的 topic 分区发生变化时,
groupcoordinator
会启动 rebalance 过程。 - 存储和管理消费位移。
- 与
consumercoordinator
(客户端) 的关系:groupcoordinator
是consumercoordinator
(客户端) 的服务端对应组件,接收并处理来自客户端的协调请求。 - 与 broker 的关系:
groupcoordinator
是 broker 进程内的一个模块/服务。 - broker (服务端)
- 角色: kafka 集群中的一个服务器节点。
- 主要职责:
- 存储数据: 存储 topic 的 partition 数据(消息日志)。
- 处理生产请求: 接收来自生产者的消息并写入相应的 partition。
- 处理拉取请求: 响应来自
fetcher
(消费者客户端) 的数据拉取请求,从磁盘读取消息并返回。 - 运行
groupcoordinator
服务: 部分 broker 会运行groupcoordinator
服务来管理消费组。 - 副本管理: 参与 partition 的副本同步和 leader 选举。
- 元数据管理: 维护集群元数据的一部分。
- 与
fetcher
的关系:fetcher
从 broker 拉取消息数据。 - 与
groupcoordinator
的关系:groupcoordinator
服务运行在 broker 内部。
fetcher
sendfetches 实现了基本发送请求
/** * set up a fetch request for any node that we have assigned partitions for which doesn't already have * an in-flight fetch or pending fetch data. * @return number of fetches sent */ public synchronized int sendfetches() { final map<node, fetchsessionhandler.fetchrequestdata> fetchrequests = preparefetchrequests(); sendfetchesinternal( fetchrequests, (fetchtarget, data, clientresponse) -> { synchronized (fetcher.this) { handlefetchsuccess(fetchtarget, data, clientresponse); } }, (fetchtarget, data, error) -> { synchronized (fetcher.this) { handlefetchfailure(fetchtarget, data, error); } }); return fetchrequests.size(); }
总结一下合并的流程:
- 准备阶段 (
preparefetchrequests()
- 位于abstractfetch
类中):fetcher
(通过继承的preparefetchrequests
方法) 遍历所有当前消费者订阅的、并且已经分配给此消费者的、并且已经知道 leader broker 的分区。- 对于每个这样的分区,它会确定其 leader broker (即
node
)。 - 它将所有需要从同一个
node
拉取数据的分区信息收集起来,形成一个fetchsessionhandler.fetchrequestdata
对象。 - 最终,
preparefetchrequests()
返回一个map<node, fetchsessionhandler.fetchrequestdata>
。这个 map 的结构本身就体现了合并:每个node
(broker) 对应一个fetchrequestdata
,这个fetchrequestdata
聚合了发往该 broker 的所有分区的拉取需求。
- 发送阶段 (
sendfetchesinternal()
- 位于fetcher
类中):sendfetchesinternal
方法接收这个map
。- 它遍历 map 中的每一个
node
(broker)。 - 对于每一个
node
,它使用对应的fetchsessionhandler.fetchrequestdata
来创建一个fetchrequest.builder
实例。这个builder
会被配置为包含该fetchrequestdata
中指定的所有 topicpartition 的拉取信息。 - 然后,为每个
node
发送一个(且仅一个)fetchrequest
。
合并的本质是将发往同一个 broker 的多个分区的拉取操作打包到一个网络请求中。 这样做的好处是:
- 减少网络开销: 显著减少了客户端和 broker 之间的网络往返次数。
- 提高吞吐量: broker 可以一次性处理来自一个消费者的多个分区的请求,提高了处理效率。
- 降低延迟: 减少了等待多个独立请求响应的时间。
到此这篇关于java kafka消费者实现过程的文章就介绍到这了,更多相关java kafka消费者内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论