Ping/Pong 和连接数限制是否应该框架支持?
建议框架提供,但作为可选配置,而不是强制。
理由是这两个功能虽然在应用层能实现,但属于基础设施关注点而非业务逻辑,每个用这个框架的人都要自己写一遍是不合理的。参考 Netty、Spring WebSocket 的做法,都是框架内置但默认关闭:
data class WsConfig(
val pingIntervalMs: Long = 0, // 0 = 禁用,用户选择开启
val pingTimeoutMs: Long = 10_000,
val maxConnections: Int = 0, // 0 = 不限制
)这样高级用户可以自己实现定制逻辑,普通用户配个参数就解决了。
WebSocket + 虚拟线程且保证顺序
你说得对,有一个方案可以同时满足:每个连接绑定一个专属虚拟线程,消息投递到该连接的 BlockingQueue,由虚拟线程串行消费。
IO线程: onFullTextMessage → queue.offer(msg) // 非阻塞,极快
虚拟线程: while(true) { msg = queue.take(); dispatch(msg) } // 可以安全阻塞顺序天然保证,因为单个连接只有一个消费线程。虚拟线程阻塞在 queue.take() 或业务 IO 时,不占用任何 carrier thread。
具体实现改动集中在 WsConnection 和 handleWebSocketUpgrade 两处:
class WsConnection internal constructor(...) {
// 每个连接独享的消息队列
private val messageQueue = LinkedBlockingQueue<WsMessage>()
private val dispatchThread: Thread // 虚拟线程
init {
dispatchThread = Thread.ofVirtual().start {
while (!isClosed) {
val msg = try {
// poll + 超时,避免连接关闭后线程永久挂起
messageQueue.poll(5, TimeUnit.SECONDS) ?: continue
} catch (e: InterruptedException) {
break
}
val callbacks = synchronized(messageCallbacks) { ArrayList(messageCallbacks) }
callbacks.forEach { cb ->
runCatching { cb.accept(msg) }.onFailure { dispatchError(it) }
}
}
}
}
internal fun dispatchMessage(message: WsMessage) {
if (isClosed) return
messageQueue.offer(message) // IO 线程只做入队,立即返回
}
fun close(reason: WsCloseReason) {
if (!closed.compareAndSet(false, true)) return
dispatchThread.interrupt() // 唤醒虚拟线程让它退出
// ... 其余逻辑不变
}
}几个细节值得注意:
队列积压问题:如果业务处理慢,队列会无限增长。可以用 ArrayBlockingQueue(capacity) 替换,配合 wsConfig.messageQueueSize,积压时触发背压或直接关闭连接。
dispatchError 的线程:error 回调目前还在 IO 线程触发,如果想一致,也可以把 error 包装成特殊消息投入同一个队列,用 sealed class 区分:
private sealed class DispatchEvent {
data class Message(val msg: WsMessage) : DispatchEvent()
data class Error(val err: Throwable) : DispatchEvent()
}虚拟线程的数量:每个 WsConnection 一个虚拟线程,10000 个连接就是 10000 个虚拟线程,这完全在虚拟线程的设计范围内,不需要担心。