映入依赖,只要保证有这个依赖,就不需要单独引入依赖,支持多个端口直连,支持多个实现层解析数据,
<groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-data-redis</artifactid> <version>3.3.4</version>
yml配置
# tcp设备对接
iot:
device:
port1: 1883
port2: 1885
package com.cqcloud.platform.handler;
import org.springframework.beans.factory.annotation.value;
import org.springframework.stereotype.component;
import com.cqcloud.platform.service.iotnbiotmqttservice;
import com.cqcloud.platform.service.iotpushservice;
import com.cqcloud.platform.service.impl.iotnbiotserviceimpl;
import com.cqcloud.platform.service.impl.iotpushserviceimpl;
import io.netty.bootstrap.serverbootstrap;
import io.netty.channel.channelfuture;
import io.netty.channel.channelinitializer;
import io.netty.channel.channelpipeline;
import io.netty.channel.nio.nioeventloopgroup;
import io.netty.channel.socket.socketchannel;
import io.netty.channel.socket.nio.nioserversocketchannel;
import jakarta.annotation.postconstruct;
/**
* @author weimeilayer@gmail.com ✨
* @date 💓💕 2022年3月8日🐬🐇 💓💕
*/
@component
public class nettytcpserver {
/**
* 用于自设备1协议端口
*/
private static int port1;
/**
* 来自设备2协议端口
*/
private static int port2;
@value("${iot.device.port1}")
public int port1value;
@value("${iot.device.port2}")
public int port2value;
@postconstruct
public void init() {
port1 = port1value;
port2 = port2value;
}
public void start() throws exception {
final nioeventloopgroup bossgroup = new nioeventloopgroup();
final nioeventloopgroup workergroup = new nioeventloopgroup();
try {
serverbootstrap bootstrap = new serverbootstrap();
// 创建 mqttservice 和 mqttpushservice 实例
iotnbiotmqttservice iotnbiotmqttservice = new iotnbiotserviceimpl();
iotpushservice iotpushservice = new iotpushserviceimpl();
bootstrap.group(bossgroup, workergroup)
.channel(nioserversocketchannel.class)
.childhandler(new channelinitializer<socketchannel>() {
@override
protected void initchannel(socketchannel ch) {
channelpipeline pipeline = ch.pipeline();
// 直接使用 bytebuf,无需编码器和解码器
// 根据端口注入不同的服务
if (ch.localaddress().getport() == port1) {
pipeline.addlast(new tcpiotnbserverhandler(iotnbiotmqttservice)); // 业务逻辑处理器
} else if (ch.localaddress().getport() == port2) {
pipeline.addlast(new tcpiotserverhandler(iotpushservice)); // 新处理器
}
}
});
// 绑定第一个端口并启动
channelfuture future1 = bootstrap.bind(port1).sync();
// 绑定第二个端口并启动
channelfuture future2 = bootstrap.bind(port2).sync();
// 等待服务器关闭
future1.channel().closefuture().sync();
future2.channel().closefuture().sync();
} finally {
// 优雅地关闭线程池
workergroup.shutdowngracefully();
bossgroup.shutdowngracefully();
}
}
}启动类需要
public static void main(string[] args) throws ioexception {
configurableenvironment env = new springapplication(dynamicyearningapplication.class).run(args).getenvironment();
string envport = env.getproperty("server.port");
string port = objects.isnull(envport) ? "8000" : envport;
string envcontext = env.getproperty("server.servlet.context-path");
string contextpath = objects.isnull(envcontext) ? "" : envcontext;
string path = port + contextpath + "/doc.html";
string externalapi = inetaddress.getlocalhost().gethostaddress();
console.log("access urls:\n\t-------------------------------------------------------------------------\n\tlocal-swagger: \t\thttp://127.0.0.1:{}\n\texternal-swagger: \thttp://{}:{}\n\t-------------------------------------------------------------------------",path, externalapi, path);
// 加上以下代码
nettytcpserver server = new nettytcpserver();
try {
server.start();
} catch (exception e) {
e.printstacktrace();
}
}
创建tcpiotserverhandler
package com.cqcloud.platform.handler;
import com.baomidou.mybatisplus.core.conditions.query.lambdaquerywrapper;
import com.baomidou.mybatisplus.core.toolkit.stringpool;
import com.cqcloud.platform.entity.iotcommandrecords;
import com.cqcloud.platform.service.iotpushservice;
import com.cqcloud.platform.utils.deviceactionparser;
import io.netty.buffer.bytebuf;
import io.netty.buffer.unpooled;
import io.netty.channel.channelhandlercontext;
import io.netty.channel.simplechannelinboundhandler;
import lombok.extern.slf4j.slf4j;
import java.util.hashmap;
import java.util.list;
import java.util.map;
import java.util.optional;
/**
* 设备协议
* @author weimeilayer@gmail.com ✨
* @date 💓💕 2022年3月8日 🐬🐇 💓💕
*/
@slf4j
public class tcpiotserverhandler extends simplechannelinboundhandler<bytebuf> {
// 接口注入
private final iotpushservice iotpushservice;
public tcpiotserverhandler(iotpushservice iotpushservice) {
this.iotpushservice = iotpushservice;
}
@override
protected void channelread0(channelhandlercontext ctx, bytebuf in) throws exception {
byte[] bytearray;
if (in.readablebytes() <= 0) {
in.release();
return;
}
bytearray = new byte[in.readablebytes()];
in.readbytes(bytearray);
if (bytearray.length <= 0) {
in.release();
return;
}
// 将消息传递给 iotpushservice
iotpushservice.pushmessagearrived(bytearray);
}
// 发送响应的统一辅助方法
private void sendresponse(channelhandlercontext ctx, string hexresponse) {
byte[] responsebytes = hexstringtobytearray(hexresponse);
bytebuf responsebuffer = unpooled.copiedbuffer(responsebytes);
ctx.writeandflush(responsebuffer);
}
@override
public void exceptioncaught(channelhandlercontext ctx, throwable cause) {
// 打印异常堆栈跟踪,便于调试和错误排查
cause.printstacktrace();
// 关闭当前的通道,释放相关资源
ctx.close();
}
}创建 tcpiotnbserverhandler
package com.cqcloud.platform.handler;
import com.cqcloud.platform.service.iotnbiotmqttservice;
import io.netty.buffer.bytebuf;
import io.netty.buffer.unpooled;
import io.netty.channel.channelhandlercontext;
import io.netty.channel.simplechannelinboundhandler;
/**
* nb-iot cat1数据格协议
*
* @author weimeilayer@gmail.com
* @date 💓💕2022年3月8日🐬🐇💓💕
*/
public class tcpiotnbserverhandler extends simplechannelinboundhandler<bytebuf> {
private final iotnbiotmqttservice iotnbiotmqttservice;
// 构造函数注入 mqttservice
public tcpiotnbserverhandler(iotnbiotmqttservice iotnbiotmqttservice) {
this.iotnbiotmqttservice = iotnbiotmqttservice;
}
@override
public void channelread0(channelhandlercontext ctx,bytebuf in) {
byte[] bytearray;
if (in.readablebytes() <= 0) {
in.release();
return;
}
bytearray = new byte[in.readablebytes()];
in.readbytes(bytearray);
if (bytearray.length <= 0) {
in.release();
return;
}
// 将 byte[] 数据传递给 iotnbiotmqttservice
iotnbiotmqttservice.messagearrived(bytearray);
//发送固定事件默认回复
sendresponse(ctx);
}
// 发送响应的统一辅助方法
private void sendresponse(channelhandlercontext ctx) {
// 回复客户端--向设备回复aaaa8001(设备将保持20秒不休眠),平台尽量在10秒
byte[] responsebytes = new byte[] { (byte) 0xaa, (byte) 0xaa, (byte) 0x80, (byte) 0x01 };
bytebuf responsebuffer = unpooled.copiedbuffer(responsebytes);
ctx.writeandflush(responsebuffer);
}
//将响应消息转换为字节数组
public static byte[] hexstringtobytearray(string s) {
int len = s.length();
byte[] data = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
data[i / 2] = (byte) ((character.digit(s.charat(i), 16) << 4)
+ character.digit(s.charat(i + 1), 16));
}
return data;
}
@override
public void exceptioncaught(channelhandlercontext ctx, throwable cause) {
cause.printstacktrace();
ctx.close();
}
}
创建接口类iotpushservice
package com.cqcloud.platform.service;
/**
* @author weimeilayer@gmail.com
* @date 💓💕2022年3月8日🐬🐇💓💕
*/
public interface iotpushservice {
public void pushmessagearrived(byte[] message);
}
创建iotnbiotmqttservice 类
package com.cqcloud.platform.service;
/**
* @author weimeilayer@gmail.com
* @date 💓💕2022年3月8日🐬🐇💓💕
*/
public interface iotnbiotmqttservice {
public void messagearrived(byte[] message);
}
创建实现类iotnbiotserviceimpl
package com.cqcloud.platform.service.impl;
import org.springframework.stereotype.service;
import com.cqcloud.platform.service.iotnbiotmqttservice;
import com.cqcloud.platform.utils.dataparser;
import lombok.allargsconstructor;
/**
* @author weimeilayer@gmail.com
* @date 💓💕2022年3月8日🐬🐇💓💕
*/
@service
@allargsconstructor
public class iotnbiotserviceimpl implements iotnbiotmqttservice {
@override
public void messagearrived(byte[] message) {
// 将 byte 数组转换为十六进制字符串
string convertdata = printbytearray(message);
// 打印字节数组内容
system.out.println("来自于xxx数据格式协议的1883端口的数据字节数组内容:"+ convertdata);
//调用解析方法
dispatchmessage(convertdata);
}
// 将 byte[] 转换为十六进制字符串的辅助方法
public static string bytestohex(byte[] bytes) {
stringbuilder hex = new stringbuilder();
for (byte b : bytes) {
// 将每个字节转换为两位的十六进制表示
hex.append(string.format("%02x", b));
}
return hex.tostring();
}
public static string printbytearray(byte[] bytearray) {
stringbuilder hexstring = new stringbuilder();
for (byte b : bytearray) {
// 将字节转换为无符号的十六进制字符串,去掉空格
hexstring.append(string.format("%02x", b & 0xff));
}
system.out.println("byte array (hex): " + hexstring.tostring());
return hexstring.tostring();
}
public void dispatchmessage(string bytearray) {
string prefix = bytearray.substring(0, 2);
// 根据 messageid 进行判断
system.out.println("来自于数据格式协议来自于1883端口的数据处理消息:" +bytearray);
}
}
创建 iotpushserviceimpl
package com.cqcloud.platform.service.impl;
import org.springframework.stereotype.service;
import com.cqcloud.platform.service.iotpushservice;
import com.cqcloud.platform.utils.deviceactionparser;
import lombok.allargsconstructor;
/**
* 发送指令实现类
* @author weimeilayer@gmail.com
* @date 💓💕2022年3月8日🐬🐇💓💕
*/
@service
@allargsconstructor
public class iotpushserviceimpl implements iotpushservice {
@override
public void pushmessagearrived(byte[] message) {
// 解析字节数组
system.out.println("来自物联网平台的设备协议于1885端口的数据设备返回的的内容处理");
//打印数据
printbytearray(message);
//调用解析方法
dispatchmessage(message);
}
//设备回复的接受内容
public static void dispatchmessage(byte[] bytearray) {
}
public static void printbytearray(byte[] bytearray) {
stringbuilder hexstring = new stringbuilder();
for (byte b : bytearray) {
// 将字节转换为无符号的十六进制字符串,去掉空格
hexstring.append(string.format("%02x", b & 0xff));
}
system.out.println("byte array (hex): " + hexstring.tostring());
}
// 将十六进制字符串转换为字节数组的实用方法
public static byte[] stringtobytes(string s) {
int len = s.length();
byte[] data = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
data[i / 2] = (byte) ((character.digit(s.charat(i), 16) << 4)
+ character.digit(s.charat(i+1), 16));
}
return data;
}
// 提取设备类型的十六进制字符串
private static string extractdevicetypehex(byte[] bytearray) {
// 转换为十六进制字符串
string hexstring = bytestohex(bytearray);
// 提取设备类型
return hexstring.substring(10, 12); // 设备类型的位数
}
// 辅助方法:将字节数组转换为十六进制字符串
private static string bytestohex(byte[] bytes) {
stringbuilder hexstring = new stringbuilder();
for (byte b : bytes) {
string hex = integer.tohexstring(0xff & b);
if (hex.length() == 1) {
hexstring.append('0'); // 确保每个字节都为两位
}
hexstring.append(hex);
}
return hexstring.tostring().touppercase(); // 返回大写格式
}
// 将十六进制字符串转换为 byte
private static byte hexstringtobyte(string hex) {
return (byte) integer.parseint(hex, 16);
}
}
然后使用网络根据助手请求。

以上就是使用java项目搭建一个netty服务的详细内容,更多关于java搭建netty服务的资料请关注代码网其它相关文章!
发表评论