LinkedBlockingQueue 的开销
坦白说开销不大,LinkedBlockingQueue 本身很轻量,空队列只有一个锁和两个指针。10000 个连接的内存占用可以忽略不计。
真正的开销是每个连接一个虚拟线程常驻 —— 虽然虚拟线程本身很便宜(初始栈几百字节),但 10000 个连接就有 10000 个虚拟线程阻塞在 queue.take(),这个规模其实也完全没问题,这正是虚拟线程的设计场景。
所以如果你决定走"框架内置"路线,这个方案是可行的。
框架只提供扩展点,应用层自己实现
这个思路更值得推荐。核心问题是:框架怎么暴露扩展点?
最自然的方式是允许用户自定义 dispatchMessage 的执行策略,抽一个 WsExecutor 接口:
fun interface WsMessageExecutor {
fun execute(connection: WsConnection, message: WsMessage, dispatch: Runnable)
}注册到 WsConfig:
data class WsConfig(
val idleTimeoutMs: Long = 60_000,
val maxMessageSizeBytes: Long = 65536,
val messageExecutor: WsMessageExecutor = WsMessageExecutor { _, _, dispatch -> dispatch.run() }
// 默认行为:在 IO 线程直接执行(保持现有行为)
)dispatchMessage 改成:
internal fun dispatchMessage(message: WsMessage) {
if (isClosed) return
val callbacks = synchronized(messageCallbacks) { ArrayList(messageCallbacks) }
wsConfig.messageExecutor.execute(this, message) {
callbacks.forEach { cb ->
runCatching { cb.accept(message) }.onFailure { dispatchError(it) }
}
}
}这样应用层想要虚拟线程 + 顺序保证,自己实现一个 executor 挂上去就行:
// 应用层实现,框架不感知
class SequentialVirtualWsExecutor : WsMessageExecutor, AutoCloseable {
// 每个连接一个队列+虚拟线程,用 connection 作 key
private val channels = ConcurrentHashMap<WsConnection, ExecutorService>()
override fun execute(connection: WsConnection, message: WsMessage, dispatch: Runnable) {
val executor = channels.computeIfAbsent(connection) {
// 单线程虚拟线程executor,保证该连接消息顺序
Executors.newSingleThreadExecutor(Thread.ofVirtual().factory())
}
executor.execute(dispatch)
// 连接关闭时清理
if (connection.isClosed) {
channels.remove(connection)?.shutdown()
}
}
override fun close() {
channels.values.forEach { it.shutdown() }
channels.clear()
}
}
// 注册
val wsConfig = WsConfig(
messageExecutor = SequentialVirtualWsExecutor()
)Ping/Pong 和连接数限制同理,也不需要框架感知,通过已有的回调机制在应用层组合实现:
app.ws("/chat") { conn ->
connectionCounter.incrementAndGet()
conn.onClose { connectionCounter.decrementAndGet() }
// ping/pong 逻辑也在这里注册
}这个设计的好处是框架核心保持稳定,各种策略(顺序、并发、限流)都是外挂的,用户按需取用,框架本身不需要为每种场景都内置一套机制。