前言
大家好,由于工作上业务的需要,在java项目中引入了socket通信,特此记录一下,用以备份,本文章中的socket通信实现了,服务端与客户端的双向通讯,以及二者之间的心跳通信,服务端重启之后,客户端的自动重连功能。
原理
socket通信是计算机网络中常用的一种通信机制,它是基于tcp/ip协议实现的,提供了两个应用程序之间通过网络进行数据交换的能力。socket本质上是一种抽象概念,为网络服务提供了一组api接口。
- socket通信模型
socket通信模型通常包括客户端和服务器端两部分。
服务器端:负责在特定的端口监听来自客户端的连接请求,当一个请求到达时,服务器会与客户端建立连接,并为客户端提供相应的服务。
客户端:主动向服务器的特定ip地址和端口发起连接请求,连接成功后,客户端可以通过建立的连接向服务器发送请求并接收响应。
- socket通信过程
socket通信过程一般包括以下几个步骤:
- 服务器监听:
服务器通过socket()函数创建一个socket,并通过bind()函数将其绑定到一个ip地址和端口上。然后,服务器调用listen()函数开始监听该端口上的连接请求。
- 客户端请求连接:
客户端也通过socket()函数创建一个socket,然后调用connect()函数尝试与服务器的指定ip地址和端口建立连接。
- 服务器接受连接:
服务器在接收到客户端的连接请求后,通过accept()函数接受这个连接。如果成功,accept()函数会返回一个新的socket(通常称为“子socket”),用于与该客户端进行通信。
数据传输:连接建立成功后,客户端和服务器就可以通过新建立的socket进行数据传输了。数据传输可以是单向的也可以是双向的。应用程序可以使用send(), write(), recv(), read()等函数进行数据发送和接收操作。
- 断开连接:
当通信结束后,客户端和服务器都可以调用close()函数来关闭自己持有的socket,从而断开两者之间的连接。
tcp vs udp
在实际使用中,基于socket的通信方式主要有两种:基于tcp和基于udp。
tcp socket:提供可靠、面向连接、基于字节流的通信方式。适用对数据完整性和顺序有要求的应用场景。
udp socket:提供无连接、不保证可靠性、基于消息(数据报)的通信方式。适用于对实时性要求高、容忍部分数据丢失或乱序的应用场景。
代码实现
服务端
服务端主体逻辑:和每个接入的客户端都会使用独立线程建立起长连接,二者之间使用心跳保持联系,使用clientsockets 存储了每个客户端的信息便于和客户端建立起联系。
package com.example.demo2.server.socket;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.beans.factory.disposablebean;
import org.springframework.stereotype.component;
import javax.annotation.postconstruct;
import java.io.*;
import java.net.serversocket;
import java.net.socket;
import java.util.map;
import java.util.concurrent.*;
/**
* @author kim
*/
@component
public class tcpserver implements disposablebean {
private static final logger logger = loggerfactory.getlogger(tcpserver.class);
private final socketserverconfig config;
private serversocket serversocket;
private executorservice executorservice;
private volatile boolean running = true;
// 存储客户端连接
private final map<string, socket> clientsockets = new concurrenthashmap<>();
public tcpserver(socketserverconfig config) {
this.config = config;
}
@postconstruct
public void start() throws ioexception {
executorservice = executors.newfixedthreadpool(config.getmaxthreads());
serversocket = new serversocket(config.getport());
logger.info("平台socket服务已启动, 监听端口为 {}", config.getport());
new thread(this::acceptconnections).start();
}
private void acceptconnections() {
while (running) {
try {
socket clientsocket = serversocket.accept();
string clientaddress = clientsocket.getinetaddress().gethostaddress();
clientsockets.put(clientaddress, clientsocket);
executorservice.execute(new clienthandler(clientsocket, clientaddress));
} catch (ioexception e) {
if (running) {
logger.error("connection accept error", e);
}
}
}
}
// 用于发送消息到特定客户端
public void sendmessagetoclient(string clientaddress, string message) throws ioexception {
socket socket = clientsockets.get(clientaddress);
if (socket != null && !socket.isclosed()) {
printwriter out = new printwriter(socket.getoutputstream(), true);
out.println(message);
logger.info("sent message to {}: {}", clientaddress, message);
} else {
logger.warn("client {} is not connected or socket is closed", clientaddress);
}
}
@override
public void destroy() throws exception {
running = false;
executorservice.shutdown();
for (socket socket : clientsockets.values()) {
if (!socket.isclosed()) {
socket.close();
}
}
if (serversocket != null && !serversocket.isclosed()) {
serversocket.close();
}
logger.info("tcp server stopped");
}
private class clienthandler implements runnable {
private final socket clientsocket;
private final string clientaddress;
clienthandler(socket socket, string address) {
this.clientsocket = socket;
this.clientaddress = address;
}
@override
public void run() {
try (bufferedreader in = new bufferedreader(
new inputstreamreader(clientsocket.getinputstream()));
printwriter out = new printwriter(
clientsocket.getoutputstream(), true)) {
logger.info("client connected: {}", clientaddress);
string input;
while ((input = in.readline()) != null) {
logger.debug("received: {}", input);
out.println(input);
logger.info("client connected: {}", clientaddress);
}
} catch (ioexception e) {
logger.warn("client connection closed: {}", e.getmessage());
} finally {
try {
clientsockets.remove(clientaddress);
clientsocket.close();
} catch (ioexception e) {
logger.error("error closing socket", e);
}
}
}
}
}
配置类
package com.example.demo2.server.socket;
import lombok.data;
import org.springframework.boot.context.properties.configurationproperties;
import org.springframework.context.annotation.configuration;
@configuration
@configurationproperties(prefix = "socket.server")
@data
public class socketserverconfig {
private int port;
private int maxthreads;
// getters and setters
}
配置文件
server:
port: 8080
socket:
server:
port: 8088
maxthreads: 50
向客户端发送测试信息
@getmapping("/send")
public string sendmessage(string clientaddress) throws ioexception {
tcpserver.sendmessagetoclient("192.168.3.8","77777777777");
return "success";
}
服务端发送日志

客户端接收日志

客户端
客户端主体逻辑,使用自己设计的心跳机制,监听服务端状态,如果服务端断开连接,则客户端会尝试重新建立联系。
package com.example.demo1.socketclient;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.beans.factory.disposablebean;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.applicationarguments;
import org.springframework.boot.applicationrunner;
import org.springframework.stereotype.service;
import java.io.bufferedreader;
import java.io.ioexception;
import java.io.inputstreamreader;
import java.io.printwriter;
import java.net.inetsocketaddress;
import java.net.socket;
import java.net.sockettimeoutexception;
import java.util.concurrent.executorservice;
import java.util.concurrent.executors;
import java.util.concurrent.atomic.atomicboolean;
@service
public class tcpclientservice implements applicationrunner, disposablebean {
private static final logger logger = loggerfactory.getlogger(tcpclientservice.class);
private final socketclientconfig config;
private socket socket;
private printwriter out;
private bufferedreader in;
private final atomicboolean running = new atomicboolean(true);
private final executorservice executor = executors.newsinglethreadexecutor();
private final messagelistener messagelistener;
@autowired
public tcpclientservice(socketclientconfig config, messagelistener messagelistener) {
this.config = config;
this.messagelistener = messagelistener;
}
@override
public void run(applicationarguments args) throws exception {
initializeconnection();
}
@override
public void destroy() throws exception {
running.set(false);
closeresources();
executor.shutdown();
}
private synchronized void initializeconnection() {
new thread(() -> {
while (running.get()) {
try {
socket = new socket();
socket.setkeepalive(true);
socket.setsotimeout(config.getheartbeattimeout());
socket.connect(new inetsocketaddress(config.gethost(), config.getport()), config.gettimeout());
out = new printwriter(socket.getoutputstream(), true);
in = new bufferedreader(new inputstreamreader(socket.getinputstream()));
logger.info("connected to server {}:{}", config.gethost(), config.getport());
executor.execute(this::listentoserver);
startheartbeat();
while (!socket.isclosed() && socket.isconnected()) {
thread.sleep(1000);
}
} catch (exception e) {
logger.warn("connection error: {}", e.getmessage());
} finally {
closeresources();
if (running.get()) {
logger.info("attempting to reconnect in 5 seconds...");
sleepsafely(5000);
}
}
}
}, "tcp-client-connector").start();
}
private void listentoserver() {
try {
string response;
while (running.get() && !socket.isclosed()) {
try {
response = in.readline();
if (response == null) {
logger.warn("server closed connection");
break; // 终止循环,表示连接已关闭
}
logger.debug("received server message: {}", response);
messagelistener.onmessage(response);
} catch (sockettimeoutexception e) {
logger.debug("socket read timeout");
} catch (ioexception e) {
if (!socket.isclosed()) {
logger.warn("connection lost: {}", e.getmessage());
break; // 终止循环,表示连接已中断
}
}
}
} finally {
closeresources(); // 确保资源关闭
}
}
private void startheartbeat() {
new thread(() -> {
while (running.get() && !socket.isclosed()) {
try {
sendmessageinternal("heartbeat");
sleepsafely(config.getheartbeatinterval());
} catch (exception e) {
logger.warn("heartbeat failed: {}", e.getmessage());
break;
}
}
}, "heartbeat-thread").start();
}
public synchronized void sendmessage(string message) throws ioexception {
if (socket == null || !socket.isconnected()) {
throw new ioexception("not connected to server");
}
out.println(message);
logger.debug("sent message: {}", message);
}
private synchronized void sendmessageinternal(string message) {
try {
if (socket != null && socket.isconnected()) {
out.println(message);
}
} catch (exception e) {
logger.warn("failed to send heartbeat");
}
}
private synchronized void closeresources() {
try {
if (out != null) out.close();
if (in != null) in.close();
if (socket != null) socket.close();
} catch (ioexception e) {
logger.warn("error closing resources: {}", e.getmessage());
}
}
private void sleepsafely(long millis) {
try {
thread.sleep(millis);
} catch (interruptedexception e) {
thread.currentthread().interrupt();
}
}
public interface messagelistener {
void onmessage(string message);
}
}
消息监听:监听服务发送的消息
package com.example.demo1.socketclient;
import org.apache.commons.lang3.stringutils;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.stereotype.component;
@component
public class servermessagehandler implements tcpclientservice.messagelistener {
private static final logger logger = loggerfactory.getlogger(servermessagehandler.class);
@override
public void onmessage(string message) {
if(stringutils.isnotempty(message)){
if (!message.contains("heartbeat")){
//处理其他逻辑
system.out.println("接收服务端消息成功:"+message);
}else{
//心跳消息
system.out.println(message);
}
}
}
}
配置类
package com.example.demo1.socketclient;
import lombok.data;
import org.springframework.boot.context.properties.configurationproperties;
import org.springframework.context.annotation.configuration;
@configuration
@configurationproperties(prefix = "socket.client")
@data
public class socketclientconfig {
private string host;
private int port;
private int timeout;
private int heartbeatinterval;
private int heartbeattimeout;
// getters and setters
}
发送测试方法
@getmapping("/send")
public responseentity<string> sendmessage() {
try {
tcpclient.sendmessage("客户端发送信息");
return responseentity.ok("message sent");
} catch (ioexception e) {
return responseentity.status(503).body("server unavailable");
}
}
配置文件
socket:
client:
host: 192.168.3.8 #服务端ip地址
port: 8088 #监听端口
timeout: 5000
heartbeat-interval: 3000 # 心跳间隔30秒
heartbeat-timeout: 6000 # 心跳超时60秒
server:
port: 8082
客户端发送信息后,服务端会接收到信息。

总结
以上就是java接入socket通信服务端与客户端的全部代码,二者实现了互相通信,具体的业务场景则需要小伙伴们在此基础上额外的设计逻辑了,有其他疑问或者想要测试demo的可以后台私我,看到会回复。
到此这篇关于java整合socket通信的文章就介绍到这了,更多相关java整合socket通信内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论