逐一回答:
1. 虚拟线程能解决阻塞 IO 线程的问题吗?
不能,两者是完全不同的线程。
虚拟线程负责的是 HTTP 请求处理(dispatchToWorker 那条路径),但 WebSocket 消息回调走的是 Undertow 的 XNIO IO 线程,这是 Undertow 内部的 native 线程,不受你的 virtualExecutor 管控。
HTTP 请求: IO线程 → virtualExecutor(虚拟线程) ✅ 阻塞没问题
WS 回调: IO线程 → dispatchMessage() → onMessage(cb) ❌ 还是在 IO 线程虚拟线程对 WebSocket 这条路径没有任何影响。如果真的想让 onMessage 可以安全阻塞,需要框架层面把回调 dispatch 出去:
// 在 dispatchMessage 里改成投递到 executor
internal fun dispatchMessage(message: WsMessage) {
if (isClosed) return
val callbacks = synchronized(messageCallbacks) { ArrayList(messageCallbacks) }
messageExecutor.execute { // 投递到虚拟线程或 worker
callbacks.forEach { cb ->
runCatching { cb.accept(message) }.onFailure { dispatchError(it) }
}
}
}不过这会引入新问题:消息顺序能否保证?如果用 newVirtualThreadPerTaskExecutor,两条消息可能乱序到达回调。需要权衡是否值得。
2. Ping/Pong 和连接数限制
Undertow 的 Ping/Pong 支持
Undertow 的 AbstractReceiveListener 有 onPingMessage 和 onPongMessage 回调,但默认不自动回复 Ping,也不主动发 Ping,需要自己实现。好消息是代码量不大:
// 在 AbstractReceiveListener 里加:
override fun onPingMessage(channel: WebSocketChannel, message: BufferedBinaryMessage) {
// 自动回复 Pong(协议要求)
val data = message.data
try {
WebSockets.sendPongBlocking(data.resource.firstOrNull() ?: ByteBuffer.allocate(0), channel)
} finally {
data.free()
}
}主动发 Ping 则需要你在框架层起一个心跳定时器,大致结构:
// wsConfig 里加:
data class WsConfig(
val idleTimeoutMs: Long = 60_000,
val maxMessageSizeBytes: Long = 65536,
val pingIntervalMs: Long = 0, // 0 = 禁用
val pingTimeoutMs: Long = 10_000,
)
// 连接建立后:
if (wsConfig.pingIntervalMs > 0) {
val lastPong = AtomicLong(System.currentTimeMillis())
// onPongMessage 里更新 lastPong
heartbeatScheduler.scheduleAtFixedRate({
if (System.currentTimeMillis() - lastPong.get() > wsConfig.pingTimeoutMs) {
connection.close(WsCloseReason.Protocol(1001, "ping timeout"))
return@scheduleAtFixedRate
}
runCatching { WebSockets.sendPingBlocking(ByteBuffer.allocate(0), wsChannel) }
}, wsConfig.pingIntervalMs, wsConfig.pingIntervalMs, TimeUnit.MILLISECONDS)
}heartbeatScheduler 可以是一个全局的 ScheduledExecutorService(1个线程),所有连接共用,开销很小。
连接数限制
Undertow 的 RequestLimitingHandler 是针对 HTTP 并发请求的,对 WebSocket 长连接不适用(连接建立后 HTTP exchange 就结束了)。需要自己在应用层计数:
private val activeConnections = AtomicInteger(0)
// handleWebSocketUpgrade 入口处:
val current = activeConnections.incrementAndGet()
if (current > wsConfig.maxConnections && wsConfig.maxConnections > 0) {
activeConnections.decrementAndGet()
exchange.statusCode = 503
exchange.endExchange()
return
}
// 连接关闭时:
connection.onClose { activeConnections.decrementAndGet() }这两个都需要自己实现,没有现成配置项。
3. beforeUpgrade 和 WS 中间件重复?
你说得对,可以去掉这个建议。
回头看你的架构,WS 中间件在 HTTP 升级阶段运行,可以:
检查
Authorizationheader 返回 401往
states里注入用户信息供后续使用
这已经完整覆盖了 beforeUpgrade 的语义,单独暴露一个钩子反而是 API 冗余。
4. 如何发送 1009 关闭帧
先看 Undertow 目前的行为:超出 maxTextBufferSize / maxBinaryBufferSize 时,Undertow 会直接关闭底层 TCP 连接,不发 close frame,然后触发 onError。
要正确处理,需要在 onError 里识别这个错误并补发关闭帧:
override fun onError(channel: WebSocketChannel, error: Throwable) {
if (isMessageTooLarge(error)) {
// 补发 1009,再关闭
runCatching {
WebSockets.sendCloseBlocking(1009, "Message Too Big", channel)
}
connection.close(WsCloseReason.Protocol(1009, "Message Too Big"))
} else if (error is IOException) {
connection.close(WsCloseReason.ClientDisconnected)
} else {
connection.dispatchError(error)
connection.close(WsCloseReason.Error(error))
}
}
private fun isMessageTooLarge(error: Throwable): Boolean {
// Undertow 抛出的是 IOException,message 里含有特定标识
// 需要实测确认具体的异常类型/message,这里是示意
return error.message?.contains("UT001015") == true // Undertow 消息过大错误码
}需要注意的是,如果 Undertow 在内部已经关闭了 TCP 连接,sendCloseBlocking 可能会抛异常,所以要用 runCatching 包裹,保证后续 connection.close() 一定能执行。
实际的错误码/message 需要你运行时抓一下具体异常,Undertow 的错误信息格式是 UT 开头加数字,可以在 Undertow 源码里搜 maxTextBufferSize。