逐一回答:

1. 虚拟线程能解决阻塞 IO 线程的问题吗?

不能,两者是完全不同的线程。

虚拟线程负责的是 HTTP 请求处理(dispatchToWorker 那条路径),但 WebSocket 消息回调走的是 Undertow 的 XNIO IO 线程,这是 Undertow 内部的 native 线程,不受你的 virtualExecutor 管控。

HTTP 请求:   IO线程 → virtualExecutor(虚拟线程) ✅ 阻塞没问题
WS 回调:     IO线程 → dispatchMessage() → onMessage(cb) ❌ 还是在 IO 线程

虚拟线程对 WebSocket 这条路径没有任何影响。如果真的想让 onMessage 可以安全阻塞,需要框架层面把回调 dispatch 出去:

// 在 dispatchMessage 里改成投递到 executor
internal fun dispatchMessage(message: WsMessage) {
    if (isClosed) return
    val callbacks = synchronized(messageCallbacks) { ArrayList(messageCallbacks) }
    messageExecutor.execute {  // 投递到虚拟线程或 worker
        callbacks.forEach { cb ->
            runCatching { cb.accept(message) }.onFailure { dispatchError(it) }
        }
    }
}

不过这会引入新问题:消息顺序能否保证?如果用 newVirtualThreadPerTaskExecutor,两条消息可能乱序到达回调。需要权衡是否值得。

2. Ping/Pong 和连接数限制

Undertow 的 Ping/Pong 支持

Undertow 的 AbstractReceiveListeneronPingMessageonPongMessage 回调,但默认不自动回复 Ping,也不主动发 Ping,需要自己实现。好消息是代码量不大:

// 在 AbstractReceiveListener 里加:
override fun onPingMessage(channel: WebSocketChannel, message: BufferedBinaryMessage) {
    // 自动回复 Pong(协议要求)
    val data = message.data
    try {
        WebSockets.sendPongBlocking(data.resource.firstOrNull() ?: ByteBuffer.allocate(0), channel)
    } finally {
        data.free()
    }
}

主动发 Ping 则需要你在框架层起一个心跳定时器,大致结构:

// wsConfig 里加:
data class WsConfig(
    val idleTimeoutMs: Long = 60_000,
    val maxMessageSizeBytes: Long = 65536,
    val pingIntervalMs: Long = 0,   // 0 = 禁用
    val pingTimeoutMs: Long = 10_000,
)

// 连接建立后:
if (wsConfig.pingIntervalMs > 0) {
    val lastPong = AtomicLong(System.currentTimeMillis())
    // onPongMessage 里更新 lastPong
    
    heartbeatScheduler.scheduleAtFixedRate({
        if (System.currentTimeMillis() - lastPong.get() > wsConfig.pingTimeoutMs) {
            connection.close(WsCloseReason.Protocol(1001, "ping timeout"))
            return@scheduleAtFixedRate
        }
        runCatching { WebSockets.sendPingBlocking(ByteBuffer.allocate(0), wsChannel) }
    }, wsConfig.pingIntervalMs, wsConfig.pingIntervalMs, TimeUnit.MILLISECONDS)
}

heartbeatScheduler 可以是一个全局的 ScheduledExecutorService(1个线程),所有连接共用,开销很小。

连接数限制

Undertow 的 RequestLimitingHandler 是针对 HTTP 并发请求的,对 WebSocket 长连接不适用(连接建立后 HTTP exchange 就结束了)。需要自己在应用层计数:

private val activeConnections = AtomicInteger(0)

// handleWebSocketUpgrade 入口处:
val current = activeConnections.incrementAndGet()
if (current > wsConfig.maxConnections && wsConfig.maxConnections > 0) {
    activeConnections.decrementAndGet()
    exchange.statusCode = 503
    exchange.endExchange()
    return
}

// 连接关闭时:
connection.onClose { activeConnections.decrementAndGet() }

这两个都需要自己实现,没有现成配置项。

3. beforeUpgrade 和 WS 中间件重复?

你说得对,可以去掉这个建议。

回头看你的架构,WS 中间件在 HTTP 升级阶段运行,可以:

这已经完整覆盖了 beforeUpgrade 的语义,单独暴露一个钩子反而是 API 冗余。

4. 如何发送 1009 关闭帧

先看 Undertow 目前的行为:超出 maxTextBufferSize / maxBinaryBufferSize 时,Undertow 会直接关闭底层 TCP 连接,不发 close frame,然后触发 onError

要正确处理,需要在 onError 里识别这个错误并补发关闭帧:

override fun onError(channel: WebSocketChannel, error: Throwable) {
    if (isMessageTooLarge(error)) {
        // 补发 1009,再关闭
        runCatching {
            WebSockets.sendCloseBlocking(1009, "Message Too Big", channel)
        }
        connection.close(WsCloseReason.Protocol(1009, "Message Too Big"))
    } else if (error is IOException) {
        connection.close(WsCloseReason.ClientDisconnected)
    } else {
        connection.dispatchError(error)
        connection.close(WsCloseReason.Error(error))
    }
}

private fun isMessageTooLarge(error: Throwable): Boolean {
    // Undertow 抛出的是 IOException,message 里含有特定标识
    // 需要实测确认具体的异常类型/message,这里是示意
    return error.message?.contains("UT001015") == true  // Undertow 消息过大错误码
}

需要注意的是,如果 Undertow 在内部已经关闭了 TCP 连接,sendCloseBlocking 可能会抛异常,所以要用 runCatching 包裹,保证后续 connection.close() 一定能执行。

实际的错误码/message 需要你运行时抓一下具体异常,Undertow 的错误信息格式是 UT 开头加数字,可以在 Undertow 源码里搜 maxTextBufferSize