非常好,你提供的用户端 API 非常清晰,我基于它和对框架源码的深入分析,提供以下具体设计方案。

一、整体架构图

Code

用户层                      框架核心层                        服务器适配层
─────────────────────      ─────────────────────────────    ──────────────────────────
app.ws("/path") { }        Router (新增 WS 路由类型)         UndertowServer
app.wsUse("/path") { }     Context (升级前复用)              ├─ WebSocket 握手拦截
@Ws 注解                    WebSocketConnection              ├─ Undertow WebSocketChannel 适配
                           WebSocketMessage                  └─ WS Executor 管理
                           WsCloseReason
                           EventBus (新增 WS 事件)

二、新增类型 & 文件清单

1. WebSocketMessage.kt(新增文件)

Kotlin

package io.github.cymoo.colleen

import java.nio.ByteBuffer

/**
 * Represents an incoming WebSocket message.
 */
sealed class WebSocketMessage {
    class Text(private val data: String) : WebSocketMessage() {
        fun text(): String = data
    }
    class Binary(private val data: ByteArray) : WebSocketMessage() {
        fun bytes(): ByteArray = data
        fun buffer(): ByteBuffer = ByteBuffer.wrap(data)
    }
}

2. WsCloseReason.kt(新增文件)

Kotlin

package io.github.cymoo.colleen

/**
 * WebSocket close reason, containing status code and optional description.
 */
data class WsCloseReason(
    val code: Int,
    val reason: String = ""
) {
    companion object {
        val NORMAL = WsCloseReason(1000, "Normal closure")
        val GOING_AWAY = WsCloseReason(1001, "Going away")
    }
}

3. WebSocketConnection.kt(新增文件)

核心连接抽象,对标 SseConnection 的设计思路,但改为事件回调模式:

Kotlin

package io.github.cymoo.colleen

import java.io.IOException
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.Consumer

/**
 * WebSocket connection handle.
 *
 * Provides:
 * - Lifecycle callbacks: onOpen / onMessage / onClose / onError
 * - Send API: text and binary
 * - Access to upgrade-time context: pathParam, query, attribute, state
 *
 * Threading model:
 * - Callbacks are invoked on the WebSocket IO thread (Undertow manages).
 * - send() is thread-safe (synchronized internally).
 */
class WebSocketConnection internal constructor(
    private val writer: WebSocketWriter,
    private val context: Context  // 升级前的 HTTP Context,可复用
) : AutoCloseable {

    private val closed = AtomicBoolean(false)
    val isClosed: Boolean get() = closed.get()

    // === Callback holders ===
    internal var onOpenHandler: (() -> Unit)? = null
    internal var onMessageHandler: ((WebSocketMessage) -> Unit)? = null
    internal var onCloseHandler: ((WsCloseReason) -> Unit)? = null
    internal var onErrorHandler: ((Throwable) -> Unit)? = null

    // === Lifecycle registration ===
    fun onOpen(handler: () -> Unit) { onOpenHandler = handler }
    fun onMessage(handler: (WebSocketMessage) -> Unit) { onMessageHandler = handler }
    fun onClose(handler: (WsCloseReason) -> Unit) { onCloseHandler = handler }
    fun onError(handler: (Throwable) -> Unit) { onErrorHandler = handler }

    // === Send API ===

    @Throws(IOException::class)
    fun send(text: String) {
        ensureOpen()
        writer.sendText(text)
    }

    @Throws(IOException::class)
    fun send(data: ByteArray) {
        ensureOpen()
        writer.sendBinary(data)
    }

    // === Context accessors (from upgrade-time HTTP request) ===

    /** Path parameter from the WS route pattern */
    fun pathParam(name: String): String? = context.pathParam(name)

    /** Query parameter from the upgrade URL */
    fun query(name: String): String? = context.query(name)

    /** Retrieve state set by middleware before upgrade */
    @Suppress("UNCHECKED_CAST")
    fun <T> attribute(key: String): T? = context.getState<T>(key)

    /** Access to service container */
    fun <T : Any> getService(clazz: Class<T>): T = context.getService(clazz)
    inline fun <reified T : Any> getService(): T = getService(T::class.java)

    /** The full Context (for advanced use cases) */
    val ctx: Context get() = context

    // === Close ===

    override fun close() {
        close(WsCloseReason.NORMAL)
    }

    fun close(reason: WsCloseReason) {
        if (closed.compareAndSet(false, true)) {
            writer.close(reason.code, reason.reason)
        }
    }

    private fun ensureOpen() {
        if (closed.get()) throw IOException("WebSocket connection is closed")
    }
}

4. WebSocketWriter.kt(新增文件)

底层写入抽象(对标 SseWriter),由 Undertow 适配层实现:

Kotlin

package io.github.cymoo.colleen

import java.io.IOException

/**
 * Low-level WebSocket writer abstraction.
 *
 * Implementations are provided by the server adapter (e.g., Undertow).
 */
interface WebSocketWriter {
    @Throws(IOException::class)
    fun sendText(text: String)

    @Throws(IOException::class)
    fun sendBinary(data: ByteArray)

    fun close(code: Int, reason: String)
}

5. Ws.kt(新增注解文件)

Kotlin

package io.github.cymoo.colleen

/**
 * Marks a controller method as a WebSocket endpoint.
 *
 * Usage:
 * ```
 * @Ws("/echo")
 * fun echo(conn: WebSocketConnection) { ... }
 * ```
 */
@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class Ws(val value: String)

三、已有文件的修改

6. Response.kt / CoreTypes.kt — 新增 RawResponseBody.WebSocket

Kotlin

// 在 RawResponseBody sealed class 中新增:
sealed class RawResponseBody {
    // ... 现有: Empty, Bytes, Stream, Sse
    data class WebSocket(
        val handler: Consumer<WebSocketConnection>,
        val wsConfig: WsEndpointConfig? = null
    ) : RawResponseBody()
}

// 在 ResponseBody sealed class 中新增:
sealed class ResponseBody {
    // ... 现有类型
    data class WebSocket(
        val handler: Consumer<WebSocketConnection>
    ) : ResponseBody()
}

7. Context.kt — 新增 ctx.ws() 方法

Kotlin

// 在 Context 类中新增:

/**
 * Upgrades the HTTP connection to WebSocket.
 *
 * Must be called during route handling; the actual upgrade is
 * performed by the server adapter after the handler returns.
 */
fun ws(handler: Consumer<WebSocketConnection>) = response.ws(handler)

对应 Response 类中新增:

Kotlin

fun ws(handler: Consumer<WebSocketConnection>) {
    // 不需要设置 Content-Type 等 header,握手由 Undertow 处理
    body = ResponseBody.WebSocket(handler)
}

8. Colleen.kt — 新增路由注册方法

Kotlin

// === WebSocket 路由 ===

/**
 * Registers a WebSocket endpoint.
 *
 * The handler receives a WebSocketConnection and should register
 * onOpen / onMessage / onClose / onError callbacks.
 */
fun ws(path: String, handler: Consumer<WebSocketConnection>) {
    // 内部注册为一个特殊的 GET handler,在路由匹配后判断是否为 WS 升级
    router.addWebSocketRoute(path, handler)
}

// === WebSocket 中间件 ===

/**
 * Registers middleware that runs before WebSocket upgrade.
 *
 * This middleware receives the standard Context (HTTP upgrade request)
 * and can reject the connection by throwing an exception.
 */
fun wsUse(path: String, middleware: Middleware) {
    router.addWebSocketMiddleware(path, middleware)
}

// 也可以全局:
fun wsUse(middleware: Middleware) {
    router.addWebSocketMiddleware("/*", middleware)
}

9. Router.kt — 新增 WebSocket 路由类型

Kotlin

// 在 Router 中新增:

internal data class WebSocketRoute(
    val pattern: String,
    val handler: Consumer<WebSocketConnection>,
    val middlewares: List<Middleware> = emptyList()
)

private val wsRoutes = mutableListOf<WebSocketRoute>()

fun addWebSocketRoute(path: String, handler: Consumer<WebSocketConnection>) {
    wsRoutes.add(WebSocketRoute(normalizePath(path), handler))
}

fun addWebSocketMiddleware(path: String, middleware: Middleware) {
    // 将中间件关联到匹配 path 的 WS 路由
    wsMiddlewares.add(path to middleware)
}

/**
 * Finds a matching WebSocket route for the given path.
 * Returns null if no WS route matches (fallback to normal HTTP).
 */
fun matchWebSocketRoute(path: String): WebSocketRouteMatch? { ... }

10. Event.kt — 新增 WebSocket 生命周期事件

Kotlin

// 在 Event sealed class 中新增:

/** Emitted when a WebSocket connection is successfully established. */
class WebSocketConnected(val connection: WebSocketConnection) : Event(false)

/** Emitted when a WebSocket connection is closed. */
class WebSocketDisconnected(
    val connection: WebSocketConnection,
    val reason: WsCloseReason
) : Event(false)

11. Config.kt / ServerConfig — 新增 WebSocket 配置项

Kotlin

// 在 ServerConfig 中新增:

/** WebSocket idle timeout in milliseconds. Default: 5 minutes. */
var wsIdleTimeout: Long = 300_000

/** Maximum WebSocket message size in bytes. Default: 64 KB. */
var maxWebSocketMessageSize: Long = 64 * 1024

12. Scanner.kt — 支持 @Ws 注解扫描

修改控制器扫描逻辑,识别 @Ws 注解的方法,将其注册为 WebSocket 路由。方法签名必须为 (WebSocketConnection) -> Unit

四、Undertow 适配层

13. UndertowServer.kt — 核心改动

Kotlin

// 新增 import
import io.undertow.websockets.WebSocketConnectionCallback
import io.undertow.websockets.WebSocketProtocolHandshakeHandler
import io.undertow.websockets.core.*
import io.undertow.websockets.spi.WebSocketHttpExchange
import java.nio.ByteBuffer

class UndertowServer(private val config: ServerConfig) : WebServer {

    // ... 现有代码 ...

    // === 新增 WebSocket 路由存储 ===
    private val wsRoutes = ConcurrentHashMap<String, WsRouteEntry>()

    internal data class WsRouteEntry(
        val handler: Consumer<WebSocketConnection>,
        val middlewares: List<Any> // 中间件链
    )

    /**
     * 注册 WebSocket 路由(由 Colleen 调用)。
     */
    fun registerWebSocketRoute(path: String, entry: WsRouteEntry) {
        wsRoutes[path] = entry
    }

    // === 修改 createRootHandler,加入 WebSocket 升级检测 ===

    private fun createRootHandler(): HttpHandler {
        return HttpHandler { exchange ->
            val requestStartNano = System.nanoTime()

            dispatchToWorker(exchange) {
                exchange.startBlocking()

                val request = UndertowRequestAdapter.adapt(exchange, config)
                val response = requestHandler(request)

                // 检查是否为 WebSocket 升级响应
                if (response.materializedBody is RawResponseBody.WebSocket) {
                    handleWebSocketUpgrade(response, exchange)
                    return@dispatchToWorker
                }

                // ... 原有 HTTP response 写入逻辑不变 ...
            }
        }
    }

    // === WebSocket 升级处理 ===

    private fun handleWebSocketUpgrade(response: Response, exchange: HttpServerExchange) {
        val wsBody = response.materializedBody as RawResponseBody.WebSocket

        val callback = WebSocketConnectionCallback { wsExchange, channel ->
            // 配置
            channel.idleTimeout = config.wsIdleTimeout
            channel.setMaxMessageSize(config.maxWebSocketMessageSize)

            // 创建 Undertow WebSocket Writer
            val writer = UndertowWebSocketWriter(channel)

            // 这里 context 已经在 Colleen.handleRequest 中构建
            // 我们需要把它传递过来(通过 response 的附加数据)
            val context = response.wsContext
                ?: error("WebSocket context not available")

            val connection = WebSocketConnection(writer, context)

            // 注册 Undertow 回调 → 转发到用户回调
            channel.receiveSetter.set(object : AbstractReceiveListener() {
                override fun onFullTextMessage(
                    channel: WebSocketChannel,
                    message: BufferedTextMessage
                ) {
                    val msg = WebSocketMessage.Text(message.data)
                    connection.onMessageHandler?.invoke(msg)
                }

                override fun onFullBinaryMessage(
                    channel: WebSocketChannel,
                    message: BufferedBinaryMessage
                ) {
                    val data = message.data.resource
                        .fold(ByteArray(0)) { acc, buf ->
                            acc + buf.toByteArray()
                        }
                    val msg = WebSocketMessage.Binary(data)
                    connection.onMessageHandler?.invoke(msg)
                }

                override fun onCloseMessage(
                    cm: CloseMessage,
                    channel: WebSocketChannel
                ) {
                    val reason = WsCloseReason(cm.code, cm.reason ?: "")
                    connection.onCloseHandler?.invoke(reason)
                    // 触发事件
                    // eventBus.emit(Event.WebSocketDisconnected(connection, reason))
                }

                override fun onError(
                    channel: WebSocketChannel,
                    error: Throwable
                ) {
                    connection.onErrorHandler?.invoke(error)
                }
            })

            channel.resumeReceives()

            // 执行用户 handler(注册回调)
            wsBody.handler.accept(connection)

            // 触发 onOpen
            connection.onOpenHandler?.invoke()

            // 触发事件
            // eventBus.emit(Event.WebSocketConnected(connection))
        }

        // 执行 Undertow WebSocket 握手
        val handshakeHandler = WebSocketProtocolHandshakeHandler(callback)
        handshakeHandler.handleRequest(exchange)
    }

    // === Undertow WebSocket Writer 实现 ===

    class UndertowWebSocketWriter(
        private val channel: WebSocketChannel
    ) : WebSocketWriter {

        override fun sendText(text: String) {
            WebSockets.sendTextBlocking(text, channel)
        }

        override fun sendBinary(data: ByteArray) {
            WebSockets.sendBinaryBlocking(ByteBuffer.wrap(data), channel)
        }

        override fun close(code: Int, reason: String) {
            WebSockets.sendCloseBlocking(CloseMessage(code, reason), channel)
        }
    }
}

五、WebSocket 中间件复用机制

WS 中间件 完全复用现有的 Middleware 接口

Kotlin

typealias Middleware = (ctx: Context, next: () -> Unit) -> Unit

工作流程

  1. 收到 HTTP 升级请求 → 路由匹配到 WS 路由

  2. 构建 Context(和普通 HTTP 请求一样)

  3. 运行中间件链(wsUse 注册的 + 全局的)

  4. 中间件可以:

    • 检查 token / cookie → 设置 state

    • 抛异常 → 返回 HTTP 401/403,不执行升级

  5. 中间件全部通过后 → ctx.ws(handler) → 设置 ResponseBody.WebSocket

  6. Undertow 执行 WebSocket 握手

这意味着现有的 RequestLogger、CORS、Auth 等中间件可以直接用于 wsUse

六、配置 DSL 扩展

Kotlin

app.config {
    server {
        wsIdleTimeout = 300_000              // 5 分钟
        maxWebSocketMessageSize = 64 * 1024  // 64 KB
    }
}

七、完整文件变更清单

八、关键设计决策说明

  1. WebSocketConnection 持有 Context 引用 — 这样可以直接复用 pathParam(), query(), getState(), getService() 等能力,与你示例中的用法一致。

  2. 回调注册式 API(而非 SSE 的阻塞式) — WebSocket 是双向通信,使用 onMessage 回调比阻塞循环更自然。SseConnection 的阻塞式适合单向推送,但不适合 WS。

  3. 中间件完全复用wsUse 接受的就是标准 Middleware,不需要任何新类型。

  4. undertow-core 已包含 WebSocket 支持 — 不需要额外依赖 undertow-websockets-jsrio.undertow.websockets.core 包已在 undertow-core 中。

  5. @Ws 注解方法签名fun method(conn: WebSocketConnection) — Scanner 识别后自动注册为 WS 路由。

请确认这个设计方案是否满足你的需求,或者有需要调整的地方。确认后我将创建 PR。