当前位置: 代码网 > it编程>编程语言>Java > Spring boot 项目中如何进行kafka Stream app 开发

Spring boot 项目中如何进行kafka Stream app 开发

2025年09月26日 Java 我要评论
kafka streamkafka stream是apache kafka从0.10版本引入的一个新feature。它是提供了对存储于kafka内的数据进行流式处理和分析的功能。kafka strea

kafka stream

kafka stream是apache kafka从0.10版本引入的一个新feature。它是提供了对存储于kafka内的数据进行流式处理和分析的功能。

kafka stream的特点

  • kafka stream提供了一个非常简单而轻量的library,它可以非常方便地嵌入任意java应用中,也可以任意方式打包和部署
  • 除了kafka外,无任何外部依赖
  • 充分利用kafka分区机制实现水平扩展和顺序性保证
  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语processor(类似于storm的spout和bolt),以及高层抽象的dsl(类似于spark的map/group/reduce)

下面介绍spring boot 项目中进行kafka stream app 开发的详细过程。

1. 导入依赖

      <dependency>
        <groupid>org.apache.kafka</groupid>
        <artifactid>kafka-streams</artifactid>
        <version>3.6.2</version>
      </dependency>

2. 示例代码(伪代码)

这段伪代码只是为了举例设置的场景,业务场景并不一定合适

@slf4j
@component
public class mykafkastreamprocessor {
	@value("${spring.kafka.bootstrap-servers}")
	private string bootstrapservers;
	@postconstruct
	private void init () {
		string appid = "my-kafka-streams-app";
		mykafkastreams(appid);
		log.info("✅ kafka streams:{} 初始化完成,开始监听 topic: {}", appid, "source-topic");
	}
	public void mykafkastreams(string appid) {
		/*=======配置=======*/
		properties config  = new properties();
		config.put(streamsconfig.application_id_config, appid);
		config.put(streamsconfig.bootstrap_servers_config, bootstrapservers);
		streamsbuilder builder = new streamsbuilder();
		/*=======构建拓扑结构=======*/
		// 数据清洗
		kstream<string, order> stream = builder
			.stream("source-topic", consumed.with(serdes.string(), new jsonserde<>(order.class)))
			.mapvalues(value -> {
				// do something
				return value;
			});
		// 过滤出从app创建的订单 并进行处理
		stream.filter((k, v) -> order.getsource.equals("app"))
			.foreach((k, v) -> {
				// do something
			});
		// 发送到第一个topic
		stream.mapvalues(value -> json.tojsonstring(value), named.as("to-the-first-target-topic-processor"))
			.to("the-first-target-topic", produced.with(serdes.string(), serdes.string()));
		// 发送到第二个topic
		stream.filter((k, v) -> {
				// filter something
			})
			.mapvalues(value -> {
				// map to another object
			}, named.as("to-the-second-target-topic-processor"))
			.to("the-second-target-topic", produced.with(serdes.string(), serdes.string()));
		/*=======创建kafkastreams=======*/
		kafkastreams streams = new kafkastreams(builder.build(), config);
		/*=======设置异常处理器=======*/
		streams.setuncaughtexceptionhandler(new customstreamsuncaughtexceptionhandler());
		/*=======启动streams=======*/
		streams.start();
		/*=======添加jvm hook 确保streams安全退出=======*/
		runtime.getruntime().addshutdownhook(new thread(() -> {
			log.info("关闭 kafka streams:{}...", appid);
			streams.close();
			log.info("kafka streams:{}已经关闭!", appid);
		}));
	}
}
@slf4j
public class customstreamsuncaughtexceptionhandler implements streamsuncaughtexceptionhandler {
	/**
	 * inspect the exception received in a stream thread and respond with an action.
	 *
	 */
	@override
	public streamthreadexceptionresponse handle(throwable throwable) {
		log.error("kafka streams 线程发生未捕获异常: {}", exceptionutil.stacktracetostring(throwable));
		// 选择处理策略(以下三选一):
		// 1. 替换线程(继续运行)
		return streamthreadexceptionresponse.replace_thread;
		// 2. 关闭整个 streams 应用
		// return streamthreadexceptionresponse.shutdown_client;
		// 3. 关闭整个 jvm
		// return streamthreadexceptionresponse.shutdown_application;
	}
}

3. 一些注意事项和说明

  • kafka stream 的处理部分集中在构建的拓扑中,其他部分大同小异
  • 在配置部分 streamsconfig.application_id_config 这个参数是必须的,且不能重复,否则会启动失败,streamsconfig.bootstrap_servers_config 是kafka的ip与端口
  • 需要注意的是kafka的 kstream 与 java 中的 stream并不相同,在java中 stream只能被消费一次,但是kstream 可以被消费多次,在上面的demo中可以看到,同一个 kstream 被多次消费,且kstream中的数据是不可变的,也就是无论在上一个处理器(processor)对数据进行了何种处理,下一个处理器从kstream 中获取的数据依旧是原来的数据
  • 在kafka stream app中应该对可能会抛出的异常进行处理,而不是全部交给uncaughtexceptionhandleruncaughtexceptionhandler应该只处理哪些无法预料的异常
  • 如果kafka stream app 捕获未处理异常之后的处理策略也是替换线程,那么kafka stream app 中如果抛出未捕获异常,那么这个消费者组就会进入再平衡状态(preparingrebalance),老的消费者从消费者组中剔除,新的消费者加入消费者组,然后再开始消费,注意这种替换线程的处理策略可能导致消息重放,也就是原本的线程消费的offset没有提交导致新的线程会重复消费之前已经被消费的数据,如果业务会因为消息重放出现异常,建议做幂等

4. kafka stream 的一些方法说明

  • stream()
    • stream()方法是从源topic获取数据的方法,示例中第一个参数是字符串,也就是源topic的名称,
    • 第二个参数是 consumed ,用来定义对于消息的key与value反序列化的规则,
    • 示例中将key序列化为string, value 序列化为order对象,需要注意的是,如果在配置的config中没有设置适用于整个stream app的序列化与反序列化规则,那么后续的 to()中必须要指定序列化规则
  • filter()
    • filter()的用法与java stream 中的filter()一致,这里不做说明
  • mapvalues()
    • mapvalues()的用法,是只对消息的value进行操作,比如将value转换为其他对象。这个方法不会对key进行修改
  • map()
    • 与mapvalues()类似,但是可以修改消息的key
  • foreach()
    • 与java stream 的 foreach()类似,也是一个终结方法
  • to()
    • 终结方法,用于将数据发送到另外的topic,示例中第一个参数是目标topic, 第二个参数是produced,用于定义key和value的序列化规则
    • 除了stream()和to()之外,其他方法基本都可以传一个参数 named,这个参数是为每个处理器节点命名,如果不传则自动生成,但是在同一个kstream中 命名不能重复。这个名称不会影响功能,但是如果有一个名称可以在后续调试和监控中提供一点帮助

到此这篇关于spring boot 项目中如何进行kafka stream app 开发的文章就介绍到这了,更多相关spring boot kafka stream app 开发内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com