原文详见:
用 java 编写 websocket 服务器 - web api | mdn (mozilla.org)
首先,我们需要明白,websocket的本质是基于tcp的端到端双工通信协议,它是区别于http的一种协议。所以接到客户端发来的websocket连接请求时,需要进行验证,保证双方都支持websocket协议。
当然,在之前,我们需要监听端口来接受请求:
//创建一个监听 8080 端口的套接字连接
serversocket server = new serversocket(8080);
//监听并接受与该套接字的连接。该方法会阻塞,直到建立连接
socket accept = server.accept();
system.out.println("客户端已连接");
socket 方法
-
java.net.socket.getinputstream()
返回该 socket 的输入流。
-
java.net.socket.getoutputstream()
返回该 socket 的输出流。
//该连接的输入流
inputstream inputstream = accept.getinputstream();
//该连接的输出流
outputstream outputstream = accept.getoutputstream();
scanner scanner = new scanner(inputstream, "utf-8");
握手
当客户端连接到服务器时,它会发送 get 请求以从简单的 http 请求升级到 websocket 的连接。这被称为握手。
string data = scanner.usedelimiter("\\r\\n\\r\\n").next();
//创建正则表达式模式,匹配data中以get开始的字符串
pattern compile = pattern.compile("^get");
matcher get = compile.matcher(data);
//拿到请求头中的信息,与客户端”握手“
if(get.find()){
//获取 sec-websocket-key 请求标头的值,去除头部和尾部的所有空格
matcher match = pattern.compile("sec-websocket-key: (.*)").matcher(data);
if(match.find()){ //健壮性判断
//追加字符串“258eafa5-e914-47da-95ca-c5ab0dc85b11” ---》固定的,为了校验双方是否都支持websocket
string s = "258eafa5-e914-47da-95ca-c5ab0dc85b11";
//计算 sha-1 值及其 base64 编码
//将其作为 http 响应的一部分写回 sec-websocket-accept 响应标头的值
byte[] response = ("http/1.1 101 switching protocols\r\n"
+ "connection: upgrade\r\n"
+ "upgrade: websocket\r\n"
+ "sec-websocket-accept: "
+ base64.getencoder().encodetostring(messagedigest.getinstance("sha-1")
.digest((match.group(1) + s).getbytes("utf-8")))
+ "\r\n\r\n").getbytes("utf-8");
outputstream.write(response, 0, response.length);
system.out.println("握手成功");
解码消息
解码字节 = 编码字节 xor 密钥的第(编码字节位置 and 0x3)个字节
//假如传入了数据:
byte[] decoded = new byte[6];
byte[] encoded = new byte[] { (byte) 198, (byte) 131, (byte) 130, (byte) 182, (byte) 194, (byte) 135 };
byte[] key = new byte[] { (byte) 167, (byte) 225, (byte) 225, (byte) 210 };
for (int i = 0; i < encoded.length; i++) {
ecoded[i] = (byte) (encoded[i] ^ key[i & 0x3]);
}
system.out.println(arrays.tostring(decoded));
完整示例代码
package com.yuziqikk.websocketdemo.websocket_test;
import java.io.ioexception;
import java.io.inputstream;
import java.io.outputstream;
import java.net.serversocket;
import java.net.socket;
import java.security.messagedigest;
import java.security.nosuchalgorithmexception;
import java.util.arrays;
import java.util.base64;
import java.util.scanner;
import java.util.regex.matcher;
import java.util.regex.pattern;
/**
* @package: com.yuziqikk.websocketdemo.websocket_test
* @author: yuziqikk
* @created: 22/5/2024 上午10:32
**/
public class test {
public void test() throws ioexception, nosuchalgorithmexception {
//创建一个监听 8080 端口的套接字连接
serversocket server = new serversocket(8080);
//监听并接受与该套接字的连接。该方法会阻塞,直到建立连接
socket accept = server.accept();
system.out.println("客户端已连接");
//该连接的输入流
inputstream inputstream = accept.getinputstream();
//该连接的输出流
outputstream outputstream = accept.getoutputstream();
scanner scanner = new scanner(inputstream, "utf-8");
string data = scanner.usedelimiter("\\r\\n\\r\\n").next();
//创建正则表达式模式,匹配data中以get开始的字符串
pattern compile = pattern.compile("^get");
matcher get = compile.matcher(data);
//拿到请求头中的信息,与客户端”握手“
if(get.find()){
//获取 sec-websocket-key 请求标头的值,去除头部和尾部的所有空格
matcher match = pattern.compile("sec-websocket-key: (.*)").matcher(data);
if(match.find()){ //健壮性判断
//追加字符串“258eafa5-e914-47da-95ca-c5ab0dc85b11” ---》固定的,为了校验双方是否都支持websocket
string s = "258eafa5-e914-47da-95ca-c5ab0dc85b11";
//计算 sha-1 值及其 base64 编码
//将其作为 http 响应的一部分写回 sec-websocket-accept 响应标头的值
byte[] response = ("http/1.1 101 switching protocols\r\n"
+ "connection: upgrade\r\n"
+ "upgrade: websocket\r\n"
+ "sec-websocket-accept: "
+ base64.getencoder().encodetostring(messagedigest.getinstance("sha-1")
.digest((match.group(1) + s).getbytes("utf-8")))
+ "\r\n\r\n").getbytes("utf-8");
outputstream.write(response, 0, response.length);
system.out.println("握手成功");
//假如传入了数据:
byte[] decoded = new byte[6];
byte[] encoded = new byte[] { (byte) 198, (byte) 131, (byte) 130, (byte) 182, (byte) 194, (byte) 135 };
byte[] key = new byte[] { (byte) 167, (byte) 225, (byte) 225, (byte) 210 };
for (int i = 0; i < encoded.length; i++) {
decoded[i] = (byte) (encoded[i] ^ key[i & 0x3]);
}
//传输的消息为:
system.out.println(arrays.tostring(decoded));
}
}
}
}
springboot整合websocket
引入依赖
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-websocket</artifactid>
</dependency>
配置websocketconfig
@configuration
public class websocketconfig {
// 这个bean会自动注册使用@serverendpoint注解声明的websocket endpoint
@bean
public serverendpointexporter serverendpointexporter(){
return new serverendpointexporter();
}
}
编写核心代码
类注解
添加@component注解将类纳入springboot的管理中
添加@serverendpoint声明这是一个websocket服务器,路径参数则是处理该路径的请求(注意,这里使用/{user}是为了获得当前发起请求的用户名,后面也许这个信息应该放在请求体中?)
@component
@serverendpoint("/web/{user}")
成员变量
/**
* 与某个客户端的连接对话,需要通过它来给客户端发送消息
*/
private session session;
/**
* 标识当前连接客户端的用户名
*/
private string user;
/**
* 线程安全的hashmap,存每个客户端的连接,key 即是 name, value这里是类名,
当然,应该把session和user抽离出去作为一个单独的类,然后作为value
*/
private static concurrenthashmap<string, websocket> websocketset = new concurrenthashmap<>();
@onopen
/**
* 新的连接
* @param session
* @param user (从路径参数中取的值)
*/
@onopen
public void onopen(session session, @pathparam(value = "user") string user){
//把用户信息(用户名)保存到session
session.getuserproperties().put("user", user);
this.session = session;
this.user = user;
// name是用来表示唯一客户端,如果需要指定发送,需要指定发送通过user来区分
websocketset.put(user, this);
system.out.println("[websocket] 连接成功,当前连接人数为:={}" + websocketset.size());
}
@onmessage
/**
* 接收消息
* @param msg
*/
@onmessage
public void onmessage(string msg){
system.out.println("[websocket] 收到消息:{}" + msg);
/*
说明a要向b发送信息
注意,websocket发送的信息本质上一串字符串,所以校验规则可以自定义
这里是在发送的消息前带上了 user:{接受消息的用户名}
*/
if(msg.indexof("user:") == 0){
//截取用户名
string user = msg.substring("user:{".length(), msg.indexof("}"));
msg = msg.substring(msg.indexof("}") + 1);
appointsending(user, msg);
}else{
groupsending(msg);
}
}
私聊
/**
* 指定发送
* @param name
* @param message
*/
public void appointsending(string name,string message){
try {
websocketset.get(name).session.getbasicremote().sendtext(message);
}catch (exception e){
e.printstacktrace();
}
}
群发
/**
* 群发
* @param message
*/
public void groupsending(string message){
for (string name : websocketset.keyset()){
//过滤掉自己的用户名,这样就不会给自己也发送消息
if(name == session.getuserproperties().get("user")){
continue;
}
try {
websocketset.get(name).session.getbasicremote().sendtext(message);
}catch (exception e){
e.printstacktrace();
}
}
}
@onclose
/**
* 关闭连接
*/
@onclose
public void onclose(){
websocketset.remove(this.user);
system.out.println("[websocket] 退出成功,当前连接人数为:={}" + websocketset.size());
}
完整示例
package com.yuziqikk.websocketdemo;
import org.springframework.stereotype.component;
import javax.websocket.onclose;
import javax.websocket.onmessage;
import javax.websocket.onopen;
import javax.websocket.session;
import javax.websocket.server.pathparam;
import javax.websocket.server.serverendpoint;
import java.util.concurrent.concurrenthashmap;
/**
* @package: com.yuziqikk.websocketdemo
* @author: yuziqikk
* @created: 22/5/2024 上午9:20
**/
@component
@serverendpoint("/web/{user}")
public class websocket {
/**
* 与某个客户端的连接对话,需要通过它来给客户端发送消息
*/
private session session;
/**
* 标识当前连接客户端的用户名
*/
private string user;
/**
* 线程安全的hashmap,存每个客户端的连接,key 即是 name
*/
private static concurrenthashmap<string, websocket> websocketset = new concurrenthashmap<>();
/**
* 新的连接
* @param session
* @param user
*/
@onopen
public void onopen(session session, @pathparam(value = "user") string user){
//把用户信息(用户名)保存到session
session.getuserproperties().put("user", user);
this.session = session;
this.user = user;
// name是用来表示唯一客户端,如果需要指定发送,需要指定发送通过user来区分
websocketset.put(user, this);
system.out.println("[websocket] 连接成功,当前连接人数为:={}" + websocketset.size());
}
/**
* 关闭连接
*/
@onclose
public void onclose(){
websocketset.remove(this.user);
system.out.println("[websocket] 退出成功,当前连接人数为:={}" + websocketset.size());
}
/**
* 接受消息
* @param msg
*/
@onmessage
public void onmessage(string msg){
system.out.println("[websocket] 收到消息:{}" + msg);
//说明a要向b发送信息
if(msg.indexof("user:") == 0){
//截取用户名
string user = msg.substring("user:{".length(), msg.indexof("}"));
msg = msg.substring(msg.indexof("}") + 1);
appointsending(user, msg);
}else{
groupsending(msg);
}
}
/**
* 群发
* @param message
*/
public void groupsending(string message){
for (string name : websocketset.keyset()){
if(name == session.getuserproperties().get("user")){
continue;
}
try {
websocketset.get(name).session.getbasicremote().sendtext(message);
}catch (exception e){
e.printstacktrace();
}
}
}
/**
* 指定发送
* @param name
* @param message
*/
public void appointsending(string name,string message){
try {
websocketset.get(name).session.getbasicremote().sendtext(message);
}catch (exception e){
e.printstacktrace();
}
}
}
发表评论