工作场景:使用netty长连接实时获取第三方接口的车辆定位数据
开发环境:jdk8
netty基本介绍
一、什么是netty
netty是由jboss提供的一个java开源框架,现为github上的独立项目。它是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络io程序。netty主要针对在tcp协议下,面向clients端的高并发应用,或者peer-to-peer场景下的大量数据持续传输的应用。
netty提供了一套完整的api,用于处理网络io操作,如tcp和udp套接字。它封装了底层的网络编程细节,使得开发者可以更加专注于业务逻辑的实现。netty使用了一种高效的线程模型,可以处理大量的并发连接,并且具有很好的伸缩性。
netty在多个领域都有广泛的应用,如rpc框架、游戏行业、大数据领域等。它支持多种传输类型和协议,如阻塞和非阻塞、基于bio和nio的udp传输、本地传输(in-vm传输)、http通道等。同时,netty还提供了丰富的编解码器,用于处理各种协议的编解码操作。
netty的整体结构包括核心层和协议支持层。核心层提供了底层网络通信的通用抽象和实现,包括可扩展的事件模型、通用的通信api、支持零拷贝的bytebuf等。协议支持层则覆盖了主流协议的编解码实现,如http、ssl、protobuf等。
总的来说,netty是一个功能强大、易于使用的网络应用框架,它可以帮助开发者快速构建高性能、高可靠性的网络应用程序。
二、netty核心组件
netty的核心组件主要包括以下几个部分:
- channels:channel是netty网络通信的抽象,用于进行i/o操作。它可以被看作是java nio的一个基本抽象,代表了与硬件设备、文件、网络socket等实体的开放连接,或者是一个能够完成读、写等i/o操作的程序。channel可以被打开或关闭,连接或断开。
- callbacks(回调):callback是一个方法,它是提供给另一个方法的引用,使得另一个方法可以在适当的时候回过头来调用这个callback方法。callback在很多编程情形中被广泛使用,是用于通知相关方某个操作已经完成最常用的方法之一。
- futures:在netty中,futures用于异步i/o操作的结果。当一个异步操作开始时,会立即返回一个future,这个future会在操作完成时得到结果或者异常。
- handlers:handlers是netty中处理i/o事件或拦截i/o操作的组件。netty提供了许多内置的handler,如channelinboundhandler、channeloutboundhandler等,这些handler可以处理各种i/o事件,如连接建立、数据接收、异常处理等。
- bootstrap与serverbootstrap:bootstrap和serverbootstrap是netty程序的引导类,主要用于配置各种参数并启动整个netty服务。它们都继承自abstractbootstrap抽象类,不同的是,bootstrap用于客户端引导,而serverbootstrap用于服务端引导。
- eventloopgroup:eventloopgroup可以理解为一个线程池,用于处理i/o操作。在服务端程序中,一般会绑定两个eventloopgroup,一个用于处理accept事件(即新的连接请求),另一个用于处理读写事件。
以上这些组件共同构成了netty的核心框架,使得开发者可以更加专注于业务逻辑的实现,而无需过多关心底层的网络通信细节。
三、springboot与netty整合
1. 添加依赖
在springboot项目的pom.xml文件中,我们需要添加netty的依赖。netty的官方maven仓库地址为:https://mvnrepository.com/artifact/io.netty/netty-all
<dependencies>
<!-- mybatis-plus -->
<dependency>
<groupid>com.baomidou</groupid>
<artifactid>mybatis-plus-boot-starter</artifactid>
</dependency>
<!-- mysql -->
<dependency>
<groupid>mysql</groupid>
<artifactid>mysql-connector-java</artifactid>
</dependency>
<!-- 数据源 -->
<dependency>
<groupid>com.alibaba</groupid>
<artifactid>druid-spring-boot-starter</artifactid>
</dependency>
<!-- netty -->
<dependency>
<groupid>io.netty</groupid>
<artifactid>netty-all</artifactid>
</dependency>
<dependency>
<groupid>org.apache.commons</groupid>
<artifactid>commons-pool2</artifactid>
</dependency>
<dependency>
<groupid>com.baomidou</groupid>
<artifactid>mybatis-plus-boot-starter</artifactid>
</dependency>
<dependency>
<groupid>com.baomidou</groupid>
<artifactid>dynamic-datasource-spring-boot-starter</artifactid>
<version>${mybatis-plus.version}</version>
</dependency>
</dependencies>2.创建netty服务端
@component
public class nettyserver {
//负责处理接受进来的链接
private eventloopgroup bossgroup;
//负责处理已经被接收的连接上的i/o操作
private eventloopgroup workergroup;
//在这个场景中,它表示服务器的绑定操作的结果
private channelfuture future;
@postconstruct
public void startserver() throws exception {
bossgroup = new nioeventloopgroup();
workergroup = new nioeventloopgroup();
try {
//创建serverbootstrap,这个类封装了服务器端的网络配置,使得我们可以轻松地设置服务器参数
serverbootstrap bootstrap = new serverbootstrap();
bootstrap.group(bossgroup, workergroup)
.channel(nioserversocketchannel.class)
.childhandler(new nettyserverinitializer());
// 绑定端口并开始接受进来的连接
future = bootstrap.bind(7000).sync();
// 等待服务器套接字关闭
future.channel().closefuture().sync();
} finally {
workergroup.shutdowngracefully();
bossgroup.shutdowngracefully();
}
}
@predestroy
public void stopserver() {
if (future != null && !future.isdone()) {
future.cancel(true);
}
workergroup.shutdowngracefully();
bossgroup.shutdowngracefully();
}
}
代码解析
类注解
@component: 这是spring框架的注解,表示这个类是一个组件,spring会扫描到这个类并将其作为bean注册到spring容器中。因此,这个类可以被其他spring管理的bean自动装配(如果需要的话)。
类成员变量
bossgroup和workergroup: 这两个是eventloopgroup的实例,用于处理网络事件。bossgroup主要负责接收进来的连接,而workergroup负责处理已经被接收的连接上的i/o操作。future: 这是一个channelfuture的实例,代表了一个异步的i/o操作的结果。在这个场景中,它表示服务器的绑定操作的结果。
startserver 方法
- 该方法使用
@postconstruct注解,这意味着当spring容器实例化这个bean并完成依赖注入后,会自动调用这个方法。 - 在这个方法中,首先创建了两个
nioeventloopgroup实例,一个用于boss,一个用于worker。 - 然后,使用
serverbootstrap类来配置和启动服务器。这个类封装了服务器端的网络配置,使得我们可以轻松地设置服务器参数。 - 通过
group方法设置boss和worker的eventloopgroup。 - 通过
channel方法指定使用nioserversocketchannel作为服务器的通道实现。 - 通过
childhandler方法设置一个新的连接被接受后如何处理。这里使用了nettyserverinitializer(这个类没有在提供的代码段中定义,但我们可以假设它是一个channelinitializer的实现,用于配置新的channel)。 - 使用
bind方法绑定服务器到指定的端口(这里是7000),并使用sync方法阻塞直到绑定完成。 - 最后,使用
closefuture().sync()方法阻塞当前线程,直到服务器套接字关闭。 - 在
finally块中,无论是否发生异常,都会优雅地关闭eventloopgroup。
stopserver 方法
- 该方法使用
@predestroy注解,意味着当spring容器销毁这个bean之前,会自动调用这个方法。 - 在这个方法中,首先检查
future是否已经完成(即服务器是否已经关闭)。如果没有,就调用cancel(true)方法来尝试取消这个操作。但是,需要注意的是,这里的cancel可能并不总是能立即停止服务器,它更多的是尝试停止服务器,而不是强制停止。
3.创建字符解析器,用于解析收到的消息
public class nettyserverinitializer extends channelinitializer<socketchannel> {
@override
protected void initchannel(socketchannel ch){
channelpipeline pipeline = ch.pipeline();
// 添加一个字符串解码器,用于将接收到的bytebuf转换成字符串
// 这里假设使用的是utf-8字符集
pipeline.addlast("decoder", new stringdecoder(charsetutil.utf_8));
// 添加一个字符串编码器,用于将发送的字符串转换成bytebuf
// 这样服务器发送字符串时,客户端可以直接接收到字符串
pipeline.addlast("encoder", new stringencoder(charsetutil.utf_8));
// 添加自定义的channelinboundhandleradapter来处理业务逻辑
pipeline.addlast("handler", new mychannelhandler());
}
}这段代码定义了一个nettyserverinitializer类,它继承自channelinitializer<socketchannel>,并覆盖了initchannel方法。在netty中,channelinitializer是一个特殊的处理器,它的主要目的是帮助用户配置一个新的channel的channelpipeline。当一个新的连接被接受时,netty会自动调用channelinitializer的initchannel方法来设置这个新连接的channelpipeline。
具体来说,initchannel方法会在以下情况下被调用:
- 当
serverbootstrap的bind方法被调用并成功绑定到某个端口后,开始监听传入的连接。 - 一旦有客户端连接到服务器,
serverbootstrap会接受这个连接,并创建一个新的socketchannel来表示这个连接。 - 对于这个新的
socketchannel,netty会调用之前设置的channelinitializer(在这个例子中是nettyserverinitializer)的initchannel方法。 initchannel方法内部会配置这个新socketchannel的channelpipeline,添加解码器、编码器、业务处理器等。- 一旦
initchannel方法执行完毕,这个channelinitializer的使命就完成了,并且会从channelpipeline中移除自身,因为它只负责初始化工作,不参与后续的数据处理。
所以,总结来说,nettyserverinitializer的initchannel方法会在一个新的客户端连接被服务器接受时运行,用于初始化这个新连接的channelpipeline。
4.创建handler处理接受到的消息
public class mychannelhandler extends channelinboundhandleradapter {
@override
public void channelread(channelhandlercontext ctx, object msg) throws exception {
// 在这里处理接收到的数据
system.out.println("msg = " + msg);
}
@override
public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception {
// 在这里处理异常
}
}四、开发中遇到的问题(暂未解决)
1.字符解析器定义之后,接收到的消息仍然乱码
2.项目启动后,可以访问netty的端口号,但是访问不了项目的端口号(已解决)
// future.channel().closefuture().sync();把这段代码屏蔽就可以
到此这篇关于springboot整合netty服务端的方法示例的文章就介绍到这了,更多相关springboot整合netty服务端内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论