前言
Netty 是一个基于 Java NIO(非阻塞 I/O)的网络应用框架,它简化了高性能、高可靠性网络服务器和客户端的开发过程,使开发者能够专注于业务逻辑的实现,而无需过多关注底层网络通信的复杂细节。Netty 适用于 Web 服务器、即时通讯、游戏服务器等场景的开发,已被广泛应用于诸多领域。
环境
JDK:64 位 JDK 1.8
Spring Boot:2.1.7.RELEASE
Netty:4.1.39.Final
实现功能
使用 Netty 同时监听多个端口,接收设备发送的数据,并向客户端回发数据。
服务端
相关配置
pom.xml
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.39.Final</version>
</dependency>
application.yml
spring:
  profiles:
    # 环境 dev|test|prod
    active: dev
netty-socket:
  ports: 8801,8802
  buffersize: 2048
配置类
import lombok.data;
import org.springframework.boot.context.properties.configurationproperties;
import org.springframework.stereotype.component;
/**
 * Holds the values bound from the {@code netty-socket} configuration prefix.
 *
 * <p>Getters and setters come from the {@code @data} annotation.
 *
 * @author qf
 * @since 2024/12/08 21:08
 */
@component
@configurationproperties(prefix = "netty-socket")
@data
public class socketconfig {
    /**
     * Ports the server binds, taken from the {@code ports} key.
     * The type is fully qualified because the import list of this file does not
     * bring the list type into scope.
     */
    private java.util.list<integer> ports;

    /**
     * Value of the {@code buffersize} key; the server passes it as the minimum,
     * initial and maximum size of the receive-buffer allocator.
     */
    private integer buffersize;
}
核心代码
启动类
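commandlinerunner:当应用程序启动完成时,commandlinerunner 接口实现类中的 run 方法会被调用。注意:下面给出的 nettyserver 本身并未实现该接口,需要在某个 commandlinerunner 实现类(或其它启动回调)的 run 方法中调用 nettyserver 的 start() 方法,监听才会真正开始。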
import cn.hutool.core.util.objectutil;
import io.netty.bootstrap.serverbootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.nioeventloopgroup;
import io.netty.channel.socket.nio.nioserversocketchannel;
import io.netty.handler.logging.loglevel;
import io.netty.handler.logging.logginghandler;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.beans.factory.annotation.value;
import org.springframework.stereotype.component;
import javax.annotation.predestroy;
import java.util.arraylist;
import java.util.list;
@slf4j
@component
public class nettyserver {
@value("${spring.profiles.active}")
private string active;
@autowired
private socketconfig socketconfig;
//用于关闭
private list<channelfuture> futures;
private eventloopgroup bossgroup = null;
private eventloopgroup workergroup = null;
public list<channelfuture> getfutures() {
if (objectutil.isempty(futures)) {
futures = new arraylist<>();
}
return futures;
}
public void start() throws exception {
bossgroup = new nioeventloopgroup(1);
workergroup = new nioeventloopgroup();
try {
integer buffersize = socketconfig.getbuffersize();
serverbootstrap bootstrap = new serverbootstrap();
bootstrap.group(bossgroup, workergroup)
.channel(nioserversocketchannel.class)
.childoption(channeloption.rcvbuf_allocator, new adaptiverecvbytebufallocator(buffersize, buffersize, buffersize))
.childhandler(new listenchannelinithandler());
if("dev".equals(active)){
bootstrap.handler(new logginghandler(loglevel.info));
}
for (integer port : socketconfig.getports()) {
channelfuture channelfuture = bootstrap.bind(port).sync();
getfutures().add(channelfuture);
channelfuture.addlistener(future -> {
if (future.issuccess()) {
log.info("端口:{} 监听成功!", port);
} else {
log.info("端口:{} 监听失败!", port);
}
});
channelfuture.channel().closefuture()
.addlistener((channelfuturelistener) listener -> channelfuture.channel().close());
}
} catch (exception e) {
log.info("netty监听数据时发生异常:", e);
} finally {
//异步使用stop()关闭
// bossgroup.shutdowngracefully();
// workergroup.shutdowngracefully();
}
}
@predestroy
public void stop() {
if (objectutil.isnotempty(futures)) {
try {
for (channelfuture future : futures) {
future.channel().close().addlistener(channelfuturelistener.close);
future.awaituninterruptibly();
}
if (objectutil.isnotempty(bossgroup) && objectutil.isnotempty(workergroup)) {
bossgroup.shutdowngracefully();
workergroup.shutdowngracefully();
}
futures = null;
log.info(" netty服务关闭成功! ");
} catch (exception e) {
log.error("关闭netty服务时发生异常: ", e);
}
}
}
}
端口枚举
/**
 * Listening ports known to the server, each paired with a display name.
 *
 * <p>NOTE(review): the setters let callers change a constant's port and name at
 * runtime; confirm whether anything relies on that before tightening it.
 */
public enum portenum {
    test1(8801, "测试1"),
    test2(8802, "测试2");

    private int port;
    private string name;

    /**
     * @param portnumber  TCP port number
     * @param displayname human-readable label
     */
    portenum(int portnumber, string displayname) {
        port = portnumber;
        name = displayname;
    }

    /** @return the current display name */
    public string getname() {
        return this.name;
    }

    /** @param name replacement display name */
    public void setname(string name) {
        this.name = name;
    }

    /** @return the current port number */
    public int getport() {
        return this.port;
    }

    /** @param port replacement port number */
    public void setport(int port) {
        this.port = port;
    }
}
端口处理器
import io.netty.buffer.bytebuf;
import io.netty.buffer.unpooled;
import io.netty.channel.channelhandlercontext;
import io.netty.channel.channelinboundhandleradapter;
import io.netty.channel.channelinitializer;
import io.netty.channel.socket.socketchannel;
import io.netty.handler.codec.delimiterbasedframedecoder;
import io.netty.handler.codec.string.stringdecoder;
import io.netty.handler.codec.string.stringencoder;
import lombok.extern.slf4j.slf4j;
import java.net.inetsocketaddress;
import static com.qf.netty.portenum.*;
/**
* 多端口处理器
*
* @author qf
* @since 2024/12/05 19:05
*/
@slf4j
public class listenchannelinithandler extends channelinitializer<socketchannel> {
@override
protected void initchannel(socketchannel socketchannel) throws exception {
int port = socketchannel.localaddress().getport();
if (test1.getport() == port) {
//使用自定义分割符
customizesplithandler(socketchannel, true, "$");
socketchannel.pipeline().addlast(new stringdecoder());
socketchannel.pipeline().addlast(new stringencoder());
// 添加自定义的业务处理器
socketchannel.pipeline().addlast(new deviceaservicehandler());
} else if (test2.getport() == port) {
customizesplithandler(socketchannel, false, "#");
socketchannel.pipeline().addlast(new stringdecoder());
socketchannel.pipeline().addlast(new stringencoder());
socketchannel.pipeline().addlast(new devicebservicehandler());
} else {
log.error("监听的端口号暂无业务处理:{}", port);
}
// 添加异常处理器
socketchannel.pipeline().addlast(new channelinboundhandleradapter() {
@override
public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception {
// 获取远程地址
inetsocketaddress remoteaddress = (inetsocketaddress) ctx.channel().remoteaddress();
string clientip = remoteaddress.getaddress().gethostaddress();
// 记录异常信息和客户端ip地址
log.error("监听设备时出现异常,客户端ip: {}", clientip, cause);
ctx.close();
}
});
}
/**
* 自定义分隔符处理器
*
* @param socketchannel socketchannel
* @param stripdelimiter 是否去除分隔符
* @param split 分隔符
*/
private static void customizesplithandler(socketchannel socketchannel, boolean stripdelimiter, string split) {
bytebuf buffer = unpooled.copiedbuffer(split.getbytes());
try {
socketchannel.pipeline().addlast(new delimiterbasedframedecoder(2048, stripdelimiter, buffer));
} catch (exception e) {
buffer.release();
log.error("监听工地微站设备时出现异常:", e);
}
}
}
业务处理器
import io.netty.channel.channelhandlercontext;
import io.netty.channel.simplechannelinboundhandler;
import lombok.extern.slf4j.slf4j;
/**
 * Business handler for decoded text frames: logs each frame and answers with a
 * fixed greeting.
 *
 * @author qf
 * @since 2024/12/09 19:36
 */
@slf4j
public class deviceaservicehandler extends simplechannelinboundhandler<string> {
    /**
     * Logs the received text and writes "helloa~" back on the same channel.
     * Real processing (parsing, persistence and so on) would go here.
     *
     * @param ctx context of the channel the frame arrived on
     * @param msg decoded frame text
     */
    @override
    protected void channelread0(channelhandlercontext ctx, string msg) throws exception {
        // Parameterized logging instead of string concatenation.
        log.info("发送的数据为------->:{}", msg);
        ctx.writeandflush("helloa~");
    }
}
import io.netty.channel.channelhandlercontext;
import io.netty.channel.simplechannelinboundhandler;
import lombok.extern.slf4j.slf4j;
/**
 * Business handler for decoded text frames: logs each frame and answers with a
 * fixed greeting.
 *
 * @author qf
 * @since 2024/12/09 19:36
 */
@slf4j
public class devicebservicehandler extends simplechannelinboundhandler<string> {
    /**
     * Logs the received text and writes "hellob~" back on the same channel.
     * Real processing (parsing, persistence and so on) would go here.
     *
     * @param ctx context of the channel the frame arrived on
     * @param msg decoded frame text
     */
    @override
    protected void channelread0(channelhandlercontext ctx, string msg) throws exception {
        // Parameterized logging instead of string concatenation.
        log.info("发送的数据为------->:{}", msg);
        ctx.writeandflush("hellob~");
    }
}
客户端
import io.netty.bootstrap.bootstrap;
import io.netty.channel.channelinitializer;
import io.netty.channel.nio.nioeventloopgroup;
import io.netty.channel.socket.nio.niosocketchannel;
import io.netty.handler.codec.string.stringencoder;
import java.net.inetsocketaddress;
/**
 * Demo client for port 8801: connects to localhost, sends one message ending in
 * '$', then stays connected until the channel closes.
 *
 * @author qf
 * @since 2024/10/11 18:01
 */
public class hello1client {
    /**
     * Runs the client. Blocks until the connection is closed, then shuts the
     * event-loop group down so its threads do not keep the JVM alive.
     *
     * @param args unused
     * @throws interruptedexception if interrupted while connecting or waiting for close
     */
    public static void main(string[] args) throws interruptedexception {
        // The group's event loop handles read events for data sent by the server.
        nioeventloopgroup group = new nioeventloopgroup();
        try {
            new bootstrap()
                    .group(group)
                    .channel(niosocketchannel.class)
                    .handler(new channelinitializer<niosocketchannel>() {
                        @override // invoked once the connection is established
                        protected void initchannel(niosocketchannel ch) throws exception {
                            // Encodes outgoing strings into byte buffers.
                            ch.pipeline().addlast(new stringencoder());
                            ch.pipeline().addlast(new echoclienthandler());
                        }
                    })
                    .connect(new inetsocketaddress("localhost", 8801))
                    .sync() // blocks until the connection is established
                    .channel()
                    .writeandflush("hello, world$")
                    .channel()
                    .closefuture()
                    .sync(); // blocks until the connection is closed
        } finally {
            group.shutdowngracefully();
        }
    }
}
import io.netty.bootstrap.bootstrap;
import io.netty.channel.channelinitializer;
import io.netty.channel.nio.nioeventloopgroup;
import io.netty.channel.socket.nio.niosocketchannel;
import io.netty.handler.codec.string.stringencoder;
import java.net.inetsocketaddress;
/**
 * Demo client for port 8802: connects to localhost, sends one message ending in
 * '#', then stays connected until the channel closes.
 */
public class hello2client {
    /**
     * Runs the client. Blocks until the connection is closed, then shuts the
     * event-loop group down so its threads do not keep the JVM alive.
     *
     * @param args unused
     * @throws interruptedexception if interrupted while connecting or waiting for close
     */
    public static void main(string[] args) throws interruptedexception {
        // The group's event loop handles read events for data sent by the server.
        nioeventloopgroup group = new nioeventloopgroup();
        try {
            new bootstrap()
                    .group(group)
                    .channel(niosocketchannel.class)
                    .handler(new channelinitializer<niosocketchannel>() {
                        @override // invoked once the connection is established
                        protected void initchannel(niosocketchannel ch) throws exception {
                            // Encodes outgoing strings into byte buffers.
                            ch.pipeline().addlast(new stringencoder());
                            ch.pipeline().addlast(new echoclienthandler());
                        }
                    })
                    .connect(new inetsocketaddress("localhost", 8802))
                    .sync() // blocks until the connection is established
                    .channel()
                    .writeandflush("hello, world#")
                    .channel()
                    .closefuture()
                    .sync(); // blocks until the connection is closed
        } finally {
            group.shutdowngracefully();
        }
    }
}
import io.netty.buffer.bytebuf;
import io.netty.buffer.unpooled;
import io.netty.channel.channelhandlercontext;
import io.netty.channel.channelinboundhandleradapter;
import io.netty.util.charsetutil;
/**
 * Client-side handler: sends a fixed greeting once the channel becomes active
 * and prints whatever the server sends back.
 *
 * @author qf
 * @since 2024/10/11 18:05
 */
public class echoclienthandler extends channelinboundhandleradapter {
    /** Greeting written to the server when the channel becomes active. */
    private final bytebuf message;

    /** Prepares the greeting buffer with the UTF-8 bytes of "hello netty - ". */
    public echoclienthandler() {
        message = unpooled.buffer(256);
        message.writebytes("hello netty - ".getbytes(charsetutil.utf_8));
    }

    /** Sends the greeting as soon as the connection is usable. */
    @override
    public void channelactive(channelhandlercontext ctx) {
        ctx.writeandflush(message);
    }

    /**
     * Prints the received bytes as UTF-8 text and releases the buffer.
     *
     * <p>The message is not passed further along the pipeline, so this handler
     * is responsible for releasing it. If echoing via {@code ctx.write(msg)} is
     * ever restored, the release must go, because the write takes ownership.
     * The former one-second sleep was dropped: it stalled the event-loop thread
     * and, with echoing disabled, paced nothing.
     *
     * @param msg inbound message, expected to be a byte buffer
     */
    @override
    public void channelread(channelhandlercontext ctx, object msg) {
        bytebuf received = (bytebuf) msg;
        try {
            system.out.println(received.tostring(charsetutil.utf_8));
        } finally {
            received.release();
        }
    }

    /** Flushes anything written during the read burst. */
    @override
    public void channelreadcomplete(channelhandlercontext ctx) {
        ctx.flush();
    }

    /** Prints the failure and closes the connection. */
    @override
    public void exceptioncaught(channelhandlercontext ctx, throwable cause) {
        cause.printstacktrace();
        ctx.close();
    }
}
到此,这篇关于 Java 使用 Netty 实现同时多端口监听的文章就介绍到这了。更多相关 Java Netty 多端口监听的内容,请搜索代码网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持代码网!
发表评论