方案对比
推荐:每连接虚拟线程 + 队列方案。
理由是"自定义executor"把复杂度转移给了用户,但绝大多数用户并不需要定制,只需要一个"能安全阻塞"的环境。虚拟线程方案开销确实轻量,且彻底消除了"不能在回调里阻塞"这个隐患,用户心智负担最低。
唯一前提是要求 Java 21+,但既然 HTTP 处理已经用了虚拟线程,这个前提已经成立。
所有修改汇总
1. WsConnection — 核心改动
引入虚拟线程消费循环,dispatchMessage 改为入队,IO线程不再执行回调。
class WsConnection internal constructor(...) {
// 新增:消息派发队列,容量可配置防止积压
private val messageQueue = LinkedBlockingQueue<Any>() // Any 用于区分 message/error
// 新增:每连接专属虚拟线程
private val dispatchThread: Thread = Thread.ofVirtual().start {
while (true) {
val event = try {
messageQueue.poll(5, TimeUnit.SECONDS) ?: if (isClosed) break else continue
} catch (e: InterruptedException) {
break
}
when (event) {
is WsMessage -> {
val callbacks = synchronized(messageCallbacks) { ArrayList(messageCallbacks) }
callbacks.forEach { cb ->
runCatching { cb.accept(event) }.onFailure { doDispatchError(it) }
}
}
is Throwable -> doDispatchError(event)
}
}
}
// 修改:IO线程只入队,立即返回
internal fun dispatchMessage(message: WsMessage) {
if (isClosed) return
messageQueue.offer(message)
}
// 修改:error 也走队列,保证和 message 的相对顺序
internal fun dispatchError(error: Throwable) {
if (isClosed) return
messageQueue.offer(error)
}
// 实际执行 error 回调,只在虚拟线程里调用
private fun doDispatchError(error: Throwable) {
val callbacks = synchronized(errorCallbacks) { ArrayList(errorCallbacks) }
callbacks.forEach { cb -> runCatching { cb.accept(error) } }
}
// 修改:close 时中断虚拟线程
fun close(reason: WsCloseReason) {
if (!closed.compareAndSet(false, true)) return
dispatchThread.interrupt() // 新增:唤醒虚拟线程使其退出
// ... 其余逻辑不变
}
}原因:
dispatchMessage原来在 IO 线程直接执行回调,用户一旦在onMessage里做阻塞操作(DB查询、HTTP调用)就会阻塞 Undertow IO 线程,影响整个服务器error 也走同一队列,保证 error 和 message 的相对顺序,避免"消息还没处理,error已经先到"的混乱
2. UndertowServer.handleWebSocketUpgrade — Ping/Pong 回复
补充 onPingMessage 回调,自动回复 Pong(WebSocket 协议要求)。
// 在 AbstractReceiveListener 里新增:
override fun onPingMessage(channel: WebSocketChannel, message: BufferedBinaryMessage) {
val data = message.data
try {
WebSockets.sendPongBlocking(
data.resource.firstOrNull() ?: ByteBuffer.allocate(0),
channel
)
} finally {
data.free()
}
}原因:RFC 6455 要求收到 Ping 必须回复 Pong,否则对端会认为连接异常。Undertow 不自动处理,必须手动实现。不加这个,使用了心跳机制的客户端(浏览器通常不发 Ping,但服务端代理、负载均衡器会)会误判连接断开。
3. UndertowServer.handleWebSocketUpgrade — 1009 消息过大处理
在 onError 里识别消息过大错误,补发 1009 关闭帧。
override fun onError(channel: WebSocketChannel, error: Throwable) {
if (isMessageTooLarge(error)) {
runCatching {
WebSockets.sendCloseBlocking(1009, "Message Too Big", channel)
}
connection.close(WsCloseReason.Protocol(1009, "Message Too Big"))
} else if (error is IOException) {
connection.close(WsCloseReason.ClientDisconnected)
} else {
connection.dispatchError(error)
connection.close(WsCloseReason.Error(error))
}
}
private fun isMessageTooLarge(error: Throwable): Boolean {
// 需实测确认 Undertow 具体异常,以实际抓到的为准
return error is IOException && error.message?.contains("UT001015") == true
}原因:超出 maxMessageSizeBytes 时 Undertow 直接断开 TCP,客户端收不到任何说明。发 1009 是协议规范要求,客户端可以据此做出正确响应(如缩小消息、提示用户),而不是困惑地面对一个莫名断开的连接。
4. WsConfig — 补充 Ping 主动发送配置(可选)
框架提供配置项,默认关闭,用户按需开启:
data class WsConfig(
val idleTimeoutMs: Long = 60_000,
val maxMessageSizeBytes: Long = 65536,
// 新增:主动 Ping 心跳,0 表示禁用
val pingIntervalMs: Long = 0,
val pingTimeoutMs: Long = 10_000,
)配合 UndertowServer 里一个共享的 ScheduledExecutorService(单线程即可,所有连接共用)在连接建立时注册心跳任务,连接关闭时取消。具体实现参考之前讨论的结构,此处不赘述。
原因:纯靠 idleTimeout 在某些网络环境下不可靠(NAT、透明代理会静默丢弃长时间无数据的连接),主动 Ping 能更早发现僵尸连接。做成可选配置而非强制,是因为并非所有场景都需要,且客户端有时也会主动发 Ping。