在 spring boot 中使用 kotlin 配置 websocket 的完整流程如下(包含基础配置、安全增强和性能优化):
一、添加依赖 (build.gradle.kts 或 pom.xml)
// build.gradle.kts
dependencies {
implementation("org.springframework.boot:spring-boot-starter-websocket")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin") // json 支持
}
二、基础 websocket 配置
1. 启用 websocket 支持
@configuration
@enablewebsocket
class websocketconfig : websocketconfigurer {
@autowired
lateinit var mywebsockethandler: mywebsockethandler
@autowired
lateinit var handshakeinterceptor: authhandshakeinterceptor
override fun registerwebsockethandlers(registry: websockethandlerregistry) {
registry.addhandler(mywebsockethandler, "/ws")
.addinterceptors(handshakeinterceptor)
.setallowedorigins("*") // 生产环境应限制域名
}
}
三、核心组件实现
1. websocket 消息处理器
@component
class mywebsockethandler : textwebsockethandler() {
private val sessions = concurrenthashmap<string, websocketsession>()
private val logger = loggerfactory.getlogger(this::class.java)
// 连接建立
override fun afterconnectionestablished(session: websocketsession) {
val userid = session.attributes["userid"] as? string
userid?.let {
sessions[it] = session
logger.info("user $it connected, total: ${sessions.size}")
} ?: run {
session.close(closestatus.bad_data)
}
}
// 处理文本消息
override fun handletextmessage(session: websocketsession, message: textmessage) {
val payload = message.payload
logger.debug("received message: $payload")
// 示例:广播消息
sessions.values.foreach {
it.sendmessage(textmessage("broadcast: $payload"))
}
}
// 连接关闭
override fun afterconnectionclosed(session: websocketsession, status: closestatus) {
sessions.values.remove(session)
logger.info("connection closed: ${status.reason}")
}
// 主动推送方法
fun sendtouser(userid: string, message: string) {
sessions[userid]?.sendmessage(textmessage(message))
}
}
2. 握手拦截器(安全校验)
@component
class authhandshakeinterceptor : handshakeinterceptor {
override fun beforehandshake(
request: serverhttprequest,
response: serverhttpresponse,
wshandler: websockethandler,
attributes: mutablemap<string, any>
): boolean {
// 1. 校验 token
val token = request.uri.query?.split("token=")?.get(1)
if (!validatetoken(token)) {
response.statuscode = httpstatus.unauthorized
return false
}
// 2. 获取用户信息
val userid = parseuseridfromtoken(token)
attributes["userid"] = userid
// 3. 记录握手日志
loghandshake(request.remoteaddress?.hostname)
return true
}
private fun validatetoken(token: string?): boolean {
// 实现 jwt 校验逻辑
return token?.startswith("valid_") == true
}
}
四、客户端连接示例(javascript + kotlin)
1. web 前端连接
const socket = new websocket(`ws://${location.host}/ws?token=valid_123`);
socket.onmessage = (event) => {
console.log('received:', event.data);
};
// 发送心跳包保持连接
setinterval(() => {
socket.send(json.stringify({ type: "heartbeat" }));
}, 30000);
2. android 客户端(kotlin)
class websocketclient(url: string) : websocketlistener() {
private val client = okhttpclient()
private var ws: websocket? = null
init {
val request = request.builder().url(url).build()
ws = client.newwebsocket(request, this)
}
override fun onmessage(websocket: websocket, text: string) {
println("received: $text")
}
fun sendmessage(msg: string) {
ws?.send(msg)
}
}
五、高级配置
1. 消息压缩配置
@bean
fun websocketcontainer(): servletservercontainerfactorybean {
return servletservercontainerfactorybean().apply {
setmaxtextmessagebuffersize(8192)
setmaxbinarymessagebuffersize(8192)
setasyncsendtimeout(30000l) // 30秒异步发送超时
}
}
2. 集群支持(redis 广播)
@configuration
@enableredisrepositories
class redispubsubconfig {
@bean
fun redistemplate(connectionfactory: redisconnectionfactory): redistemplate<string, string> {
return redistemplate<string, string>().apply {
setconnectionfactory(connectionfactory)
}
}
@bean
fun topiclisteneradapter(handler: messagelistener): channeltopic {
return channeltopic("websocket-messages")
}
}
六、监控与调试
1. 端点监控
@restcontroller
class websocketmetricscontroller(
private val handler: mywebsockethandler
) {
@getmapping("/metrics/websocket")
fun getmetrics(): map<string, any> {
return mapof(
"activeconnections" to handler.getsessioncount(),
"lastmessagetime" to handler.getlastactivity()
)
}
}
2. 日志配置 (logback-spring.xml)
<logger name="org.springframework.web.socket" level="debug"/> <logger name="com.example.websocket" level="trace"/>
七、常见问题解决方案
| 问题现象 | 解决方案 |
|---|---|
| 连接频繁断开 | 添加心跳机制,调整 setasyncsendtimeout |
| 跨域失败 | 精确配置 .setallowedorigins("https://your-domain.com") |
| 消息顺序错乱 | 使用 @sendtouser(destination = "/queue", broadcast = false) 点对点发送 |
| 内存泄漏 | 定期检查 sessions map,添加连接超时清理逻辑 |
| 高并发时性能下降 | 启用异步消息处理,使用 redis pub/sub 分流消息 |
八、安全增强建议
启用 wss 协议:
# nginx 配置示例 location /ws { proxy_pass http://backend; proxy_http_version 1.1; proxy_set_header upgrade $http_upgrade; proxy_set_header connection "upgrade"; proxy_set_header x-real-ip $remote_addr; proxy_ssl_protocols tlsv1.2 tlsv1.3; }限流防护:
@bean fun websocketratelimiter(): websockethandlerdecoratorfactory { return object : websockethandlerdecoratorfactory { override fun decorate(handler: websockethandler): websockethandler { val ratelimiter = ratelimiter.create(100) // 100次/秒 return object : websockethandlerdecorator(handler) { override fun handlemessage(session: websocketsession, message: websocketmessage<*>) { if (!ratelimiter.tryacquire()) { session.close(closestatus.policy_violation) return } super.handlemessage(session, message) } } } } }
九、性能测试建议
使用 jmeter 压测:
<!-- websocket 压测计划示例 --> <websocketsampler> <connecttime>5000</connecttime> <responsetimeout>10000</responsetimeout> <payload>{ "type": "stress", "data": "test" }</payload> </websocketsampler>监控指标:
• 单节点最大连接数
• 消息往返延迟 (rtt)
• 内存占用增长率
通过以上配置,可以实现一个高性能、安全可靠的企业级 websocket 服务,支持从开发到生产的全生命周期管理。
总结
到此这篇关于spring配置websocket的文章就介绍到这了,更多相关spring配置websocket内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论