当前位置: 代码网 > it编程>编程语言>Java > 使用java项目搭建一个netty服务

使用java项目搭建一个netty服务

2024年10月30日 Java 我要评论
映入依赖,只要保证有这个依赖,就不需要单独引入依赖,支持多个端口直连,支持多个实现层解析数据, <groupid>org.springframework.boot</groupid

映入依赖,只要保证有这个依赖,就不需要单独引入依赖,支持多个端口直连,支持多个实现层解析数据,

  <groupid>org.springframework.boot</groupid>
  <artifactid>spring-boot-starter-data-redis</artifactid>
  <version>3.3.4</version>

yml配置

# tcp设备对接
iot:
  device:
    port1: 1883
    port2: 1885
package com.cqcloud.platform.handler;

import org.springframework.beans.factory.annotation.value;
import org.springframework.stereotype.component;

import com.cqcloud.platform.service.iotnbiotmqttservice;
import com.cqcloud.platform.service.iotpushservice;
import com.cqcloud.platform.service.impl.iotnbiotserviceimpl;
import com.cqcloud.platform.service.impl.iotpushserviceimpl;

import io.netty.bootstrap.serverbootstrap;
import io.netty.channel.channelfuture;
import io.netty.channel.channelinitializer;
import io.netty.channel.channelpipeline;
import io.netty.channel.nio.nioeventloopgroup;
import io.netty.channel.socket.socketchannel;
import io.netty.channel.socket.nio.nioserversocketchannel;
import jakarta.annotation.postconstruct;
/**
 * @author weimeilayer@gmail.com ✨
 * @date 💓💕 2022年3月8日🐬🐇 💓💕
 */
@component
public class nettytcpserver {
	/**
	 * 用于自设备1协议端口
	 */
    private static int port1;
    /**
     * 来自设备2协议端口
     */
    private static int port2;

    @value("${iot.device.port1}")
    public int port1value;

    @value("${iot.device.port2}")
    public int port2value;
    
    @postconstruct
    public void init() {
        port1 = port1value;
        port2 = port2value;
    }

	public void start() throws exception {
		final nioeventloopgroup bossgroup = new nioeventloopgroup();
	    final nioeventloopgroup workergroup = new nioeventloopgroup();
	    try {
	        serverbootstrap bootstrap = new serverbootstrap();
	        // 创建 mqttservice 和 mqttpushservice 实例
	        iotnbiotmqttservice iotnbiotmqttservice = new iotnbiotserviceimpl();
	        iotpushservice iotpushservice = new iotpushserviceimpl();

	        bootstrap.group(bossgroup, workergroup)
	            .channel(nioserversocketchannel.class)
	            .childhandler(new channelinitializer<socketchannel>() {
	                @override
	                protected void initchannel(socketchannel ch) {
	                    channelpipeline pipeline = ch.pipeline();
	                    // 直接使用 bytebuf,无需编码器和解码器
	                    // 根据端口注入不同的服务
	                    if (ch.localaddress().getport() == port1) {
	                        pipeline.addlast(new tcpiotnbserverhandler(iotnbiotmqttservice)); // 业务逻辑处理器
	                    } else if (ch.localaddress().getport() == port2) {
	                        pipeline.addlast(new tcpiotserverhandler(iotpushservice)); // 新处理器
	                    }
	                }
	            });

	        // 绑定第一个端口并启动
	        channelfuture future1 = bootstrap.bind(port1).sync();
	        // 绑定第二个端口并启动
	        channelfuture future2 = bootstrap.bind(port2).sync();

	        // 等待服务器关闭
	        future1.channel().closefuture().sync();
	        future2.channel().closefuture().sync();
	    } finally {
	        // 优雅地关闭线程池
	        workergroup.shutdowngracefully();
	        bossgroup.shutdowngracefully();
	    }
	}
}

启动类需要

public static void main(string[] args) throws ioexception {
		configurableenvironment env = new springapplication(dynamicyearningapplication.class).run(args).getenvironment();
		string envport = env.getproperty("server.port");
		string port = objects.isnull(envport) ? "8000" : envport;
		string envcontext = env.getproperty("server.servlet.context-path");
		string contextpath = objects.isnull(envcontext) ? "" : envcontext;
		string path = port + contextpath + "/doc.html";
		string externalapi = inetaddress.getlocalhost().gethostaddress();
		console.log("access urls:\n\t-------------------------------------------------------------------------\n\tlocal-swagger: \t\thttp://127.0.0.1:{}\n\texternal-swagger: \thttp://{}:{}\n\t-------------------------------------------------------------------------",path, externalapi, path);
		// 加上以下代码
		nettytcpserver server = new nettytcpserver();
		try {
			server.start();
		} catch (exception e) {
			e.printstacktrace();
		}
	}

创建tcpiotserverhandler

package com.cqcloud.platform.handler;

import com.baomidou.mybatisplus.core.conditions.query.lambdaquerywrapper;
import com.baomidou.mybatisplus.core.toolkit.stringpool;
import com.cqcloud.platform.entity.iotcommandrecords;
import com.cqcloud.platform.service.iotpushservice;
import com.cqcloud.platform.utils.deviceactionparser;
import io.netty.buffer.bytebuf;
import io.netty.buffer.unpooled;
import io.netty.channel.channelhandlercontext;
import io.netty.channel.simplechannelinboundhandler;
import lombok.extern.slf4j.slf4j;

import java.util.hashmap;
import java.util.list;
import java.util.map;
import java.util.optional;

/**
 * 设备协议
 * @author weimeilayer@gmail.com ✨
 * @date 💓💕 2022年3月8日 🐬🐇 💓💕
 */
@slf4j
public class tcpiotserverhandler extends simplechannelinboundhandler<bytebuf>  {

	// 接口注入
	private final iotpushservice iotpushservice;

	public tcpiotserverhandler(iotpushservice iotpushservice) {
		this.iotpushservice = iotpushservice;
	}
	@override
	protected void channelread0(channelhandlercontext ctx, bytebuf in) throws exception {
		byte[] bytearray;
		if (in.readablebytes() <= 0) {
			in.release();
			return;
		}
		bytearray = new byte[in.readablebytes()];
		in.readbytes(bytearray);
		if (bytearray.length <= 0) {
			in.release();
			return;
		}
		// 将消息传递给 iotpushservice
		iotpushservice.pushmessagearrived(bytearray);
	}
	// 发送响应的统一辅助方法
	private void sendresponse(channelhandlercontext ctx, string hexresponse) {
		byte[] responsebytes = hexstringtobytearray(hexresponse);
		bytebuf responsebuffer = unpooled.copiedbuffer(responsebytes);
		ctx.writeandflush(responsebuffer);
	}
	
	@override
	public void exceptioncaught(channelhandlercontext ctx, throwable cause) {
		// 打印异常堆栈跟踪,便于调试和错误排查
		cause.printstacktrace();
		// 关闭当前的通道,释放相关资源
		ctx.close();
	}
}

创建 tcpiotnbserverhandler

package com.cqcloud.platform.handler;

import com.cqcloud.platform.service.iotnbiotmqttservice;

import io.netty.buffer.bytebuf;
import io.netty.buffer.unpooled;
import io.netty.channel.channelhandlercontext;
import io.netty.channel.simplechannelinboundhandler;

/**
 * nb-iot cat1数据格协议
 * 
 * @author weimeilayer@gmail.com
 * @date 💓💕2022年3月8日🐬🐇💓💕
 */
public class tcpiotnbserverhandler extends simplechannelinboundhandler<bytebuf> {
	private final iotnbiotmqttservice iotnbiotmqttservice;

	// 构造函数注入 mqttservice
	public tcpiotnbserverhandler(iotnbiotmqttservice iotnbiotmqttservice) {
		this.iotnbiotmqttservice = iotnbiotmqttservice;
	}

	@override
	public void channelread0(channelhandlercontext ctx,bytebuf in) {
		byte[] bytearray;
		if (in.readablebytes() <= 0) {
			in.release();
			return;
		}
		bytearray = new byte[in.readablebytes()];
		in.readbytes(bytearray);
		if (bytearray.length <= 0) {
			in.release();
			return;
		}
	    // 将 byte[] 数据传递给 iotnbiotmqttservice
	    iotnbiotmqttservice.messagearrived(bytearray); 
		//发送固定事件默认回复
        sendresponse(ctx);
	}
	
	// 发送响应的统一辅助方法
    private void sendresponse(channelhandlercontext ctx) {
    	// 回复客户端--向设备回复aaaa8001(设备将保持20秒不休眠),平台尽量在10秒
    	byte[] responsebytes = new byte[] { (byte) 0xaa, (byte) 0xaa, (byte) 0x80, (byte) 0x01 };
        bytebuf responsebuffer = unpooled.copiedbuffer(responsebytes);
        ctx.writeandflush(responsebuffer);
    }
    
    //将响应消息转换为字节数组
    public static byte[] hexstringtobytearray(string s) {
        int len = s.length();
        byte[] data = new byte[len / 2];
        for (int i = 0; i < len; i += 2) {
            data[i / 2] = (byte) ((character.digit(s.charat(i), 16) << 4)
                    + character.digit(s.charat(i + 1), 16));
        }
        return data;
    }
	@override
	public void exceptioncaught(channelhandlercontext ctx, throwable cause) {
		cause.printstacktrace();
		ctx.close();
	}
}

创建接口类iotpushservice

package com.cqcloud.platform.service;

/**
 * @author weimeilayer@gmail.com
 * @date 💓💕2022年3月8日🐬🐇💓💕
 */
public interface iotpushservice {
	public void pushmessagearrived(byte[] message);
}

创建iotnbiotmqttservice 类

package com.cqcloud.platform.service;

/**
 * @author weimeilayer@gmail.com
 * @date 💓💕2022年3月8日🐬🐇💓💕
 */
public interface iotnbiotmqttservice {

	public void messagearrived(byte[] message);
}

创建实现类iotnbiotserviceimpl

package com.cqcloud.platform.service.impl;

import org.springframework.stereotype.service;

import com.cqcloud.platform.service.iotnbiotmqttservice;
import com.cqcloud.platform.utils.dataparser;

import lombok.allargsconstructor;

/**
 * @author weimeilayer@gmail.com
 * @date 💓💕2022年3月8日🐬🐇💓💕
 */
@service
@allargsconstructor
public class iotnbiotserviceimpl implements iotnbiotmqttservice {

	@override
	public void messagearrived(byte[] message) {
		// 将 byte 数组转换为十六进制字符串  
		string convertdata = printbytearray(message);
		// 打印字节数组内容
		system.out.println("来自于xxx数据格式协议的1883端口的数据字节数组内容:"+ convertdata);
        //调用解析方法
        dispatchmessage(convertdata);
	}
	
	// 将 byte[] 转换为十六进制字符串的辅助方法  
    public static string bytestohex(byte[] bytes) {  
        stringbuilder hex = new stringbuilder();  
        for (byte b : bytes) {  
            // 将每个字节转换为两位的十六进制表示  
            hex.append(string.format("%02x", b));  
        }  
        return hex.tostring();  
    }  

	public static string printbytearray(byte[] bytearray) {
		stringbuilder hexstring = new stringbuilder();
		for (byte b : bytearray) {
			// 将字节转换为无符号的十六进制字符串,去掉空格
			hexstring.append(string.format("%02x", b & 0xff));
		}
		system.out.println("byte array (hex): " + hexstring.tostring());
		return hexstring.tostring();
	}
	
	public void dispatchmessage(string bytearray) {
        string prefix = bytearray.substring(0, 2);  
        // 根据 messageid 进行判断
          system.out.println("来自于数据格式协议来自于1883端口的数据处理消息:" +bytearray);
	}
}

创建 iotpushserviceimpl

package com.cqcloud.platform.service.impl;

import org.springframework.stereotype.service;

import com.cqcloud.platform.service.iotpushservice;
import com.cqcloud.platform.utils.deviceactionparser;

import lombok.allargsconstructor;

/**
 * 发送指令实现类
 * @author weimeilayer@gmail.com
 * @date 💓💕2022年3月8日🐬🐇💓💕
 */
@service
@allargsconstructor
public class iotpushserviceimpl implements iotpushservice {

	@override
	public void pushmessagearrived(byte[] message) {
		// 解析字节数组
		system.out.println("来自物联网平台的设备协议于1885端口的数据设备返回的的内容处理");
		//打印数据
		printbytearray(message);
		//调用解析方法
		dispatchmessage(message);
	}

	//设备回复的接受内容
	public static void dispatchmessage(byte[] bytearray) {
       
	}
	
	public static void printbytearray(byte[] bytearray) {
		stringbuilder hexstring = new stringbuilder();
		for (byte b : bytearray) {
			// 将字节转换为无符号的十六进制字符串,去掉空格
			hexstring.append(string.format("%02x", b & 0xff));
		}
		system.out.println("byte array (hex): " + hexstring.tostring());
	}

	 // 将十六进制字符串转换为字节数组的实用方法
    public static byte[] stringtobytes(string s) {
        int len = s.length();
        byte[] data = new byte[len / 2];
        for (int i = 0; i < len; i += 2) {
            data[i / 2] = (byte) ((character.digit(s.charat(i), 16) << 4)
                                  + character.digit(s.charat(i+1), 16));
        }
        return data;
    }
	// 提取设备类型的十六进制字符串
    private static string extractdevicetypehex(byte[] bytearray) {
        // 转换为十六进制字符串
        string hexstring = bytestohex(bytearray);
        // 提取设备类型
        return hexstring.substring(10, 12); // 设备类型的位数
    }

    // 辅助方法:将字节数组转换为十六进制字符串
    private static string bytestohex(byte[] bytes) {
        stringbuilder hexstring = new stringbuilder();
        for (byte b : bytes) {
            string hex = integer.tohexstring(0xff & b);
            if (hex.length() == 1) {
                hexstring.append('0'); // 确保每个字节都为两位
            }
            hexstring.append(hex);
        }
        return hexstring.tostring().touppercase(); // 返回大写格式
    }

    // 将十六进制字符串转换为 byte
    private static byte hexstringtobyte(string hex) {
        return (byte) integer.parseint(hex, 16);
    }
}

然后使用网络根据助手请求。

以上就是使用java项目搭建一个netty服务的详细内容,更多关于java搭建netty服务的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com