当前位置: 代码网 > it编程>编程语言>Java > SpringBoot3中Spring WebFlux SSE服务器发送事件的实现步骤

SpringBoot3中Spring WebFlux SSE服务器发送事件的实现步骤

2024年11月22日 Java 我要评论
chatgpt 刚出的时候,让大伙很好奇的是它是如何实现的逐字输出的?答案就是 sse (服务器发送事件)。随着实时数据和响应式编程的需求不断增加,服务器发送事件(server-sent events

chatgpt 刚出的时候,让大伙很好奇的是它是如何实现的逐字输出的?答案就是 sse (服务器发送事件)。随着实时数据和响应式编程的需求不断增加,服务器发送事件(server-sent events,简称 sse)在现代 web 应用程序中越来越受欢迎。sse 提供了一种轻量级的服务器推送数据给客户端的方式,适合用于监控、实时通知、股票价格更新等场景。

在 spring boot 3 中,结合响应式编程的理念,sse 的实现变得更加简洁和高效。本文将详细介绍如何使用 spring boot 3 来实现 sse 服务端推送,并讨论响应式编程在此过程中的重要性和优势。

1. 什么是 sse?

服务器发送事件(sse) 是一种从服务器向客户端推送数据的技术,属于 html5 的一部分。与传统的 http 请求-响应模型不同,sse 是单向的,服务器可以持续不断地向客户端发送数据,而客户端通过一次长连接持续接收这些更新。

相比 websocket,sse 有以下特点:

  • 单向通信:sse 仅允许服务器向客户端推送数据,客户端无法向服务器发送数据。
  • 基于 http 协议:sse 是建立在 http 协议之上的,浏览器原生支持,不需要额外的协议处理。
  • 自动重连:sse 支持自动重连,当连接意外断开时,客户端会自动尝试重新连接服务器。

2. spring boot 3 响应式编程与 sse

spring boot 3 提供了对响应式编程的全面支持,基于 project reactor 实现异步、非阻塞的流式数据处理。而响应式编程非常适合实现 sse,因为它允许我们以非阻塞的方式持续推送数据,而不会阻塞服务器的资源。

spring webflux 是 spring boot 3 中用于构建响应式应用的核心框架,它可以无缝集成 sse,为我们提供简单高效的服务器推送功能。

为什么选择响应式编程实现 sse?

传统的阻塞式编程在处理长连接(如 sse)时可能会占用大量服务器资源。响应式编程通过非阻塞 i/o 操作,不仅可以高效处理长时间的连接,还能在有新数据时立即推送给客户端。响应式流(如 flux)天然适合于这种流式数据推送场景。

3. 实现 sse 的基本步骤

我们将通过以下步骤实现一个简单的 sse 服务端推送应用:

  • 创建 spring boot 项目并引入 webflux 依赖。
  • 实现服务端推送 sse 事件流。
  • 编写客户端接收 sse 数据。
  • 测试与优化。

3.1 创建 spring boot 项目

首先,创建一个新的 spring boot 3 项目,并确保引入了 spring-boot-starter-webflux 依赖。可以通过 maven 或 gradle 配置:

maven 依赖

<dependencies>
    <dependency>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter-webflux</artifactid>
    </dependency>
</dependencies>

3.2 实现服务端推送sse 事件流

在 spring webflux 中,sse 通过返回 flux<serversentevent<t>> 这种响应流来实现。下面我们实现一个简单的 sse 控制器,它会每隔一段时间向客户端推送当前的时间信息。

示例控制器

package com.coderjia.boot3webflux.controller;
import org.springframework.http.codec.serversentevent;
import org.springframework.stereotype.controller;
import org.springframework.web.bind.annotation.getmapping;
import reactor.core.publisher.flux;
import java.time.duration;
import java.time.localtime;
/**
 * @author coderjia
 * @create 2024/10/27 下午 07:03
 * @description
 **/
@controller
public class ssecontroller {
    @getmapping("/sse/stream")
    public flux<serversentevent<string>> streamsse() {
        return flux.interval(duration.ofseconds(1))
                .map(sequence -> serversentevent.<string>builder()
                        .id(string.valueof(sequence))
                        .event("periodic-event")
                        .data("current time: " + localtime.now())
                        .build());
    }
}

解释

flux.interval(duration.ofseconds(1)):创建一个每秒发出事件的响应式流。

serversentevent.builder():构建 serversentevent 对象,它可以包含 ideventdata 等信息,符合 sse 规范。

map():将流中的每个事件映射为 serversentevent,并附带当前的时间信息。

3.3 客户端接收 sse 数据

客户端可以使用 javascript 原生的 eventsource api 来接收服务器发送的 sse 数据流。

示例 html + javascript 客户端

resources/static/index.html

<!doctype html>
<html lang="en">
<head>
    <meta charset="utf-8">
    <title>sse example</title>
</head>
<body>
<h1>server-sent events (sse) example</h1>
<div id="messages"></div>
<script src="https://unpkg.com/axios/dist/axios.min.js"></script>
<script>
    const http = axios.create({
        baseurl: 'http://localhost:8080/',
        timeout: 100000,
        responsetype: 'stream',
        ondownloadprogress: function (progressevent) {
            // 获取 messages 元素
            const messageselement = document.getelementbyid("messages");
            // 清除现有内容
            messageselement.innerhtml = "";
            // 添加新内容
            const newelement = document.createelement("div");
            newelement.innerhtml = progressevent.event.currenttarget.responsetext + "<br/>";
            messageselement.appendchild(newelement);
        },
    });
    http.get('/sse/stream')
        .then(function (response) {
            // 处理成功情况
            console.log(response);
        })
        .catch(function (error) {
            // 处理错误情况
            console.log(error);
        })
        .finally(function () {
            // 总是会执行
        });
</script>
</body>
</html>

解释

  • eventsource("/sse/stream"):eventsource 是浏览器提供的一个用于和服务器建立连接,接收服务器发送事件的接口。在客户端发起与服务器的 sse 长连接。服务器通过 /sse/stream 推送事件。
  • onmessage:处理服务器发送的消息,并将消息显示在页面上。
  • onerror:当连接发生错误时关闭连接,避免持续消耗资源。

4. 测试 sse

运行 spring boot 应用,并访问 /sse/stream,可以看到服务器每秒钟向客户端推送一次当前时间信息。

接口请求

header 里的 content-type 为 text/event-stream

content-type

可以通过浏览器打开 http://localhost:8080/,在页面中将会每秒钟显示一次服务器推送的数据流。这就验证了 sse 在 spring boot 3 中的实现。

浏览器展示

5. 优化与扩展

5.1 增加随机数据推送

为了模拟更真实的场景,可以增加一些随机数据或实时数据更新。假设我们希望推送随机的股票价格,我们可以这样修改:

@getmapping("/sse/stocks")
public flux<serversentevent<string>> streamstockprices() {
    return flux.interval(duration.ofseconds(1))
            .map(sequence -> serversentevent.<string>builder()
                    .id(string.valueof(sequence))
                    .event("stock-update")
                    .data("stock price: $" + threadlocalrandom.current().nextint(100, 200))
                    .build());
}

在这个例子中,每秒推送一次随机的股票价格更新。

5.2 增加心跳检测(ping)

sse 连接如果长时间没有数据传输,可能会被中断。为此,sse 规范推荐发送 “ping” 消息来保持连接活跃。可以通过 serversenteventcomment() 来发送心跳信息:

@getmapping("/sse/stream-with-ping")
public flux<serversentevent<string>> streamwithping() {
    return flux.interval(duration.ofseconds(1))
            .map(sequence -> {
                if (sequence % 5 == 0) {  // 每5秒发送一次心跳
                    return serversentevent.<string>builder()
                            .comment("ping")
                            .build();
                } else {
                    return serversentevent.<string>builder()
                            .data("current time: " + localtime.now())
                            .build();
                }
            });
}

5.3 使用 mediatype.text_event_stream 响应

虽然 serversentevent 是处理 sse 的标准类,但你也可以直接返回 flux<t>,spring 会自动将其转换为事件流。如果你想简化代码,可以这样写:

@getmapping(value = "/sse/simple", produces = mediatype.text_event_stream_value)
public flux<string> simplesse() {
    return flux.interval(duration.ofseconds(1))
            .map(sequence -> "current time: " + localtime.now());
}

这里直接返回 flux<string>,spring webflux 会自动推送数据。

6. sse 与 websocket 的对比

sse 和 websocket 都是实时通信的重要技术,但它们有不同的适用场景:

  • sse:单向通信,服务器推送数据到客户端,适合轻量级的通知、监控、消息更新等场景。使用简单,基于 http。
  • websocket:双向通信,适合复杂的交互场景,如实时聊天、在线游戏等。websocket 是基于 tcp 的全双工连接,相对更复杂。

对于简单的实时更新场景,如股票价格更新、推送通知等,sse 更加轻量且易于实现。

7. 总结

spring boot 3 提供了简单、强大的 sse 实现,结合响应式编程的特性,使得我们可以轻松构建高效的服务器推送应用。在实际项目中,sse 非常适合用于推送实时数据或监控信息,尤其在需要轻量且可靠的单向通信时。通过 spring webflux 和 project reactor,sse 的实现可以以非阻塞的方式运行,极大提升了应用的并发处理能力。

希望这篇博客对你理解 spring boot 3 中的 sse 服务端推送有所帮助,如果有任何问题或想法,欢迎讨论!

到此这篇关于springboot3中spring webflux sse服务器发送事件的实现步骤的文章就介绍到这了,更多相关springboot sse服务器发送事件内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com