LinkedBlockingQueue 的开销

坦白说开销不大,LinkedBlockingQueue 本身很轻量,空队列只有一个锁和两个指针。10000 个连接的内存占用可以忽略不计。

真正的开销是每个连接一个虚拟线程常驻 —— 虽然虚拟线程本身很便宜(初始栈几百字节),但 10000 个连接就有 10000 个虚拟线程阻塞在 queue.take(),这个规模其实也完全没问题,这正是虚拟线程的设计场景。

所以如果你决定走"框架内置"路线,这个方案是可行的。

框架只提供扩展点,应用层自己实现

这个思路更值得推荐。核心问题是:框架怎么暴露扩展点?

最自然的方式是允许用户自定义 dispatchMessage 的执行策略,抽一个 WsExecutor 接口:

fun interface WsMessageExecutor {
    fun execute(connection: WsConnection, message: WsMessage, dispatch: Runnable)
}

注册到 WsConfig

data class WsConfig(
    val idleTimeoutMs: Long = 60_000,
    val maxMessageSizeBytes: Long = 65536,
    val messageExecutor: WsMessageExecutor = WsMessageExecutor { _, _, dispatch -> dispatch.run() }
    // 默认行为:在 IO 线程直接执行(保持现有行为)
)

dispatchMessage 改成:

internal fun dispatchMessage(message: WsMessage) {
    if (isClosed) return
    val callbacks = synchronized(messageCallbacks) { ArrayList(messageCallbacks) }
    wsConfig.messageExecutor.execute(this, message) {
        callbacks.forEach { cb ->
            runCatching { cb.accept(message) }.onFailure { dispatchError(it) }
        }
    }
}

这样应用层想要虚拟线程 + 顺序保证,自己实现一个 executor 挂上去就行:

// 应用层实现,框架不感知
class SequentialVirtualWsExecutor : WsMessageExecutor, AutoCloseable {
    // 每个连接一个队列+虚拟线程,用 connection 作 key
    private val channels = ConcurrentHashMap<WsConnection, ExecutorService>()

    override fun execute(connection: WsConnection, message: WsMessage, dispatch: Runnable) {
        val executor = channels.computeIfAbsent(connection) {
            // 单线程虚拟线程executor,保证该连接消息顺序
            Executors.newSingleThreadExecutor(Thread.ofVirtual().factory())
        }
        executor.execute(dispatch)
        
        // 连接关闭时清理
        if (connection.isClosed) {
            channels.remove(connection)?.shutdown()
        }
    }

    override fun close() {
        channels.values.forEach { it.shutdown() }
        channels.clear()
    }
}

// 注册
val wsConfig = WsConfig(
    messageExecutor = SequentialVirtualWsExecutor()
)

Ping/Pong 和连接数限制同理,也不需要框架感知,通过已有的回调机制在应用层组合实现:

app.ws("/chat") { conn ->
    connectionCounter.incrementAndGet()
    
    conn.onClose { connectionCounter.decrementAndGet() }
    
    // ping/pong 逻辑也在这里注册
}

这个设计的好处是框架核心保持稳定,各种策略(顺序、并发、限流)都是外挂的,用户按需取用,框架本身不需要为每种场景都内置一套机制。