方案对比

推荐:每连接虚拟线程 + 队列方案。

理由是"自定义executor"把复杂度转移给了用户,但绝大多数用户并不需要定制,只需要一个"能安全阻塞"的环境。虚拟线程方案开销确实轻量,且彻底消除了"不能在回调里阻塞"这个隐患,用户心智负担最低。

唯一前提是要求 Java 21+,但既然 HTTP 处理已经用了虚拟线程,这个前提已经成立。

所有修改汇总

1. WsConnection — 核心改动

引入虚拟线程消费循环dispatchMessage 改为入队,IO线程不再执行回调。

class WsConnection internal constructor(...) {

    // 新增:消息派发队列,容量可配置防止积压
    private val messageQueue = LinkedBlockingQueue<Any>() // Any 用于区分 message/error

    // 新增:每连接专属虚拟线程
    private val dispatchThread: Thread = Thread.ofVirtual().start {
        while (true) {
            val event = try {
                messageQueue.poll(5, TimeUnit.SECONDS) ?: if (isClosed) break else continue
            } catch (e: InterruptedException) {
                break
            }
            when (event) {
                is WsMessage -> {
                    val callbacks = synchronized(messageCallbacks) { ArrayList(messageCallbacks) }
                    callbacks.forEach { cb ->
                        runCatching { cb.accept(event) }.onFailure { doDispatchError(it) }
                    }
                }
                is Throwable -> doDispatchError(event)
            }
        }
    }

    // 修改:IO线程只入队,立即返回
    internal fun dispatchMessage(message: WsMessage) {
        if (isClosed) return
        messageQueue.offer(message)
    }

    // 修改:error 也走队列,保证和 message 的相对顺序
    internal fun dispatchError(error: Throwable) {
        if (isClosed) return
        messageQueue.offer(error)
    }

    // 实际执行 error 回调,只在虚拟线程里调用
    private fun doDispatchError(error: Throwable) {
        val callbacks = synchronized(errorCallbacks) { ArrayList(errorCallbacks) }
        callbacks.forEach { cb -> runCatching { cb.accept(error) } }
    }

    // 修改:close 时中断虚拟线程
    fun close(reason: WsCloseReason) {
        if (!closed.compareAndSet(false, true)) return
        dispatchThread.interrupt() // 新增:唤醒虚拟线程使其退出
        // ... 其余逻辑不变
    }
}

原因

2. UndertowServer.handleWebSocketUpgrade — Ping/Pong 回复

补充 onPingMessage 回调,自动回复 Pong(WebSocket 协议要求)。

// 在 AbstractReceiveListener 里新增:
override fun onPingMessage(channel: WebSocketChannel, message: BufferedBinaryMessage) {
    val data = message.data
    try {
        WebSockets.sendPongBlocking(
            data.resource.firstOrNull() ?: ByteBuffer.allocate(0),
            channel
        )
    } finally {
        data.free()
    }
}

原因:RFC 6455 要求收到 Ping 必须回复 Pong,否则对端会认为连接异常。Undertow 不自动处理,必须手动实现。不加这个,使用了心跳机制的客户端(浏览器通常不发 Ping,但服务端代理、负载均衡器会)会误判连接断开。

3. UndertowServer.handleWebSocketUpgrade — 1009 消息过大处理

onError 里识别消息过大错误,补发 1009 关闭帧。

override fun onError(channel: WebSocketChannel, error: Throwable) {
    if (isMessageTooLarge(error)) {
        runCatching {
            WebSockets.sendCloseBlocking(1009, "Message Too Big", channel)
        }
        connection.close(WsCloseReason.Protocol(1009, "Message Too Big"))
    } else if (error is IOException) {
        connection.close(WsCloseReason.ClientDisconnected)
    } else {
        connection.dispatchError(error)
        connection.close(WsCloseReason.Error(error))
    }
}

private fun isMessageTooLarge(error: Throwable): Boolean {
    // 需实测确认 Undertow 具体异常,以实际抓到的为准
    return error is IOException && error.message?.contains("UT001015") == true
}

原因:超出 maxMessageSizeBytes 时 Undertow 直接断开 TCP,客户端收不到任何说明。发 1009 是协议规范要求,客户端可以据此做出正确响应(如缩小消息、提示用户),而不是困惑地面对一个莫名断开的连接。

4. WsConfig — 补充 Ping 主动发送配置(可选)

框架提供配置项,默认关闭,用户按需开启:

data class WsConfig(
    val idleTimeoutMs: Long = 60_000,
    val maxMessageSizeBytes: Long = 65536,
    // 新增:主动 Ping 心跳,0 表示禁用
    val pingIntervalMs: Long = 0,
    val pingTimeoutMs: Long = 10_000,
)

配合 UndertowServer 里一个共享的 ScheduledExecutorService(单线程即可,所有连接共用)在连接建立时注册心跳任务,连接关闭时取消。具体实现参考之前讨论的结构,此处不赘述。

原因:纯靠 idleTimeout 在某些网络环境下不可靠(NAT、透明代理会静默丢弃长时间无数据的连接),主动 Ping 能更早发现僵尸连接。做成可选配置而非强制,是因为并非所有场景都需要,且客户端有时也会主动发 Ping。