非常好,你提供的用户端 API 非常清晰,我基于它和对框架源码的深入分析,提供以下具体设计方案。
一、整体架构图
Code
用户层 框架核心层 服务器适配层
───────────────────── ───────────────────────────── ──────────────────────────
app.ws("/path") { } Router (新增 WS 路由类型) UndertowServer
app.wsUse("/path") { } Context (升级前复用) ├─ WebSocket 握手拦截
@Ws 注解 WebSocketConnection ├─ Undertow WebSocketChannel 适配
WebSocketMessage └─ WS Executor 管理
WsCloseReason
EventBus (新增 WS 事件)二、新增类型 & 文件清单
1. WebSocketMessage.kt(新增文件)
Kotlin
package io.github.cymoo.colleen
import java.nio.ByteBuffer
/**
* Represents an incoming WebSocket message.
*/
sealed class WebSocketMessage {
class Text(private val data: String) : WebSocketMessage() {
fun text(): String = data
}
class Binary(private val data: ByteArray) : WebSocketMessage() {
fun bytes(): ByteArray = data
fun buffer(): ByteBuffer = ByteBuffer.wrap(data)
}
}2. WsCloseReason.kt(新增文件)
Kotlin
package io.github.cymoo.colleen
/**
* WebSocket close reason, containing status code and optional description.
*/
data class WsCloseReason(
val code: Int,
val reason: String = ""
) {
companion object {
val NORMAL = WsCloseReason(1000, "Normal closure")
val GOING_AWAY = WsCloseReason(1001, "Going away")
}
}3. WebSocketConnection.kt(新增文件)
核心连接抽象,对标 SseConnection 的设计思路,但改为事件回调模式:
Kotlin
package io.github.cymoo.colleen
import java.io.IOException
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.Consumer
/**
* WebSocket connection handle.
*
* Provides:
* - Lifecycle callbacks: onOpen / onMessage / onClose / onError
* - Send API: text and binary
* - Access to upgrade-time context: pathParam, query, attribute, state
*
* Threading model:
* - Callbacks are invoked on the WebSocket IO thread (Undertow manages).
* - send() is thread-safe (synchronized internally).
*/
class WebSocketConnection internal constructor(
private val writer: WebSocketWriter,
private val context: Context // 升级前的 HTTP Context,可复用
) : AutoCloseable {
private val closed = AtomicBoolean(false)
val isClosed: Boolean get() = closed.get()
// === Callback holders ===
internal var onOpenHandler: (() -> Unit)? = null
internal var onMessageHandler: ((WebSocketMessage) -> Unit)? = null
internal var onCloseHandler: ((WsCloseReason) -> Unit)? = null
internal var onErrorHandler: ((Throwable) -> Unit)? = null
// === Lifecycle registration ===
fun onOpen(handler: () -> Unit) { onOpenHandler = handler }
fun onMessage(handler: (WebSocketMessage) -> Unit) { onMessageHandler = handler }
fun onClose(handler: (WsCloseReason) -> Unit) { onCloseHandler = handler }
fun onError(handler: (Throwable) -> Unit) { onErrorHandler = handler }
// === Send API ===
@Throws(IOException::class)
fun send(text: String) {
ensureOpen()
writer.sendText(text)
}
@Throws(IOException::class)
fun send(data: ByteArray) {
ensureOpen()
writer.sendBinary(data)
}
// === Context accessors (from upgrade-time HTTP request) ===
/** Path parameter from the WS route pattern */
fun pathParam(name: String): String? = context.pathParam(name)
/** Query parameter from the upgrade URL */
fun query(name: String): String? = context.query(name)
/** Retrieve state set by middleware before upgrade */
@Suppress("UNCHECKED_CAST")
fun <T> attribute(key: String): T? = context.getState<T>(key)
/** Access to service container */
fun <T : Any> getService(clazz: Class<T>): T = context.getService(clazz)
inline fun <reified T : Any> getService(): T = getService(T::class.java)
/** The full Context (for advanced use cases) */
val ctx: Context get() = context
// === Close ===
override fun close() {
close(WsCloseReason.NORMAL)
}
fun close(reason: WsCloseReason) {
if (closed.compareAndSet(false, true)) {
writer.close(reason.code, reason.reason)
}
}
private fun ensureOpen() {
if (closed.get()) throw IOException("WebSocket connection is closed")
}
}4. WebSocketWriter.kt(新增文件)
底层写入抽象(对标 SseWriter),由 Undertow 适配层实现:
Kotlin
package io.github.cymoo.colleen
import java.io.IOException
/**
* Low-level WebSocket writer abstraction.
*
* Implementations are provided by the server adapter (e.g., Undertow).
*/
interface WebSocketWriter {
@Throws(IOException::class)
fun sendText(text: String)
@Throws(IOException::class)
fun sendBinary(data: ByteArray)
fun close(code: Int, reason: String)
}5. Ws.kt(新增注解文件)
Kotlin
package io.github.cymoo.colleen
/**
* Marks a controller method as a WebSocket endpoint.
*
* Usage:
* ```
* @Ws("/echo")
* fun echo(conn: WebSocketConnection) { ... }
* ```
*/
@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class Ws(val value: String)三、已有文件的修改
6. Response.kt / CoreTypes.kt — 新增 RawResponseBody.WebSocket
Kotlin
// 在 RawResponseBody sealed class 中新增:
sealed class RawResponseBody {
// ... 现有: Empty, Bytes, Stream, Sse
data class WebSocket(
val handler: Consumer<WebSocketConnection>,
val wsConfig: WsEndpointConfig? = null
) : RawResponseBody()
}
// 在 ResponseBody sealed class 中新增:
sealed class ResponseBody {
// ... 现有类型
data class WebSocket(
val handler: Consumer<WebSocketConnection>
) : ResponseBody()
}7. Context.kt — 新增 ctx.ws() 方法
Kotlin
// 在 Context 类中新增:
/**
* Upgrades the HTTP connection to WebSocket.
*
* Must be called during route handling; the actual upgrade is
* performed by the server adapter after the handler returns.
*/
fun ws(handler: Consumer<WebSocketConnection>) = response.ws(handler)对应 Response 类中新增:
Kotlin
fun ws(handler: Consumer<WebSocketConnection>) {
// 不需要设置 Content-Type 等 header,握手由 Undertow 处理
body = ResponseBody.WebSocket(handler)
}8. Colleen.kt — 新增路由注册方法
Kotlin
// === WebSocket 路由 ===
/**
* Registers a WebSocket endpoint.
*
* The handler receives a WebSocketConnection and should register
* onOpen / onMessage / onClose / onError callbacks.
*/
fun ws(path: String, handler: Consumer<WebSocketConnection>) {
// 内部注册为一个特殊的 GET handler,在路由匹配后判断是否为 WS 升级
router.addWebSocketRoute(path, handler)
}
// === WebSocket 中间件 ===
/**
* Registers middleware that runs before WebSocket upgrade.
*
* This middleware receives the standard Context (HTTP upgrade request)
* and can reject the connection by throwing an exception.
*/
fun wsUse(path: String, middleware: Middleware) {
router.addWebSocketMiddleware(path, middleware)
}
// 也可以全局:
fun wsUse(middleware: Middleware) {
router.addWebSocketMiddleware("/*", middleware)
}9. Router.kt — 新增 WebSocket 路由类型
Kotlin
// 在 Router 中新增:
internal data class WebSocketRoute(
val pattern: String,
val handler: Consumer<WebSocketConnection>,
val middlewares: List<Middleware> = emptyList()
)
private val wsRoutes = mutableListOf<WebSocketRoute>()
fun addWebSocketRoute(path: String, handler: Consumer<WebSocketConnection>) {
wsRoutes.add(WebSocketRoute(normalizePath(path), handler))
}
fun addWebSocketMiddleware(path: String, middleware: Middleware) {
// 将中间件关联到匹配 path 的 WS 路由
wsMiddlewares.add(path to middleware)
}
/**
* Finds a matching WebSocket route for the given path.
* Returns null if no WS route matches (fallback to normal HTTP).
*/
fun matchWebSocketRoute(path: String): WebSocketRouteMatch? { ... }10. Event.kt — 新增 WebSocket 生命周期事件
Kotlin
// 在 Event sealed class 中新增:
/** Emitted when a WebSocket connection is successfully established. */
class WebSocketConnected(val connection: WebSocketConnection) : Event(false)
/** Emitted when a WebSocket connection is closed. */
class WebSocketDisconnected(
val connection: WebSocketConnection,
val reason: WsCloseReason
) : Event(false)11. Config.kt / ServerConfig — 新增 WebSocket 配置项
Kotlin
// 在 ServerConfig 中新增:
/** WebSocket idle timeout in milliseconds. Default: 5 minutes. */
var wsIdleTimeout: Long = 300_000
/** Maximum WebSocket message size in bytes. Default: 64 KB. */
var maxWebSocketMessageSize: Long = 64 * 102412. Scanner.kt — 支持 @Ws 注解扫描
修改控制器扫描逻辑,识别 @Ws 注解的方法,将其注册为 WebSocket 路由。方法签名必须为 (WebSocketConnection) -> Unit。
四、Undertow 适配层
13. UndertowServer.kt — 核心改动
Kotlin
// 新增 import
import io.undertow.websockets.WebSocketConnectionCallback
import io.undertow.websockets.WebSocketProtocolHandshakeHandler
import io.undertow.websockets.core.*
import io.undertow.websockets.spi.WebSocketHttpExchange
import java.nio.ByteBuffer
class UndertowServer(private val config: ServerConfig) : WebServer {
// ... 现有代码 ...
// === 新增 WebSocket 路由存储 ===
private val wsRoutes = ConcurrentHashMap<String, WsRouteEntry>()
internal data class WsRouteEntry(
val handler: Consumer<WebSocketConnection>,
val middlewares: List<Any> // 中间件链
)
/**
* 注册 WebSocket 路由(由 Colleen 调用)。
*/
fun registerWebSocketRoute(path: String, entry: WsRouteEntry) {
wsRoutes[path] = entry
}
// === 修改 createRootHandler,加入 WebSocket 升级检测 ===
private fun createRootHandler(): HttpHandler {
return HttpHandler { exchange ->
val requestStartNano = System.nanoTime()
dispatchToWorker(exchange) {
exchange.startBlocking()
val request = UndertowRequestAdapter.adapt(exchange, config)
val response = requestHandler(request)
// 检查是否为 WebSocket 升级响应
if (response.materializedBody is RawResponseBody.WebSocket) {
handleWebSocketUpgrade(response, exchange)
return@dispatchToWorker
}
// ... 原有 HTTP response 写入逻辑不变 ...
}
}
}
// === WebSocket 升级处理 ===
private fun handleWebSocketUpgrade(response: Response, exchange: HttpServerExchange) {
val wsBody = response.materializedBody as RawResponseBody.WebSocket
val callback = WebSocketConnectionCallback { wsExchange, channel ->
// 配置
channel.idleTimeout = config.wsIdleTimeout
channel.setMaxMessageSize(config.maxWebSocketMessageSize)
// 创建 Undertow WebSocket Writer
val writer = UndertowWebSocketWriter(channel)
// 这里 context 已经在 Colleen.handleRequest 中构建
// 我们需要把它传递过来(通过 response 的附加数据)
val context = response.wsContext
?: error("WebSocket context not available")
val connection = WebSocketConnection(writer, context)
// 注册 Undertow 回调 → 转发到用户回调
channel.receiveSetter.set(object : AbstractReceiveListener() {
override fun onFullTextMessage(
channel: WebSocketChannel,
message: BufferedTextMessage
) {
val msg = WebSocketMessage.Text(message.data)
connection.onMessageHandler?.invoke(msg)
}
override fun onFullBinaryMessage(
channel: WebSocketChannel,
message: BufferedBinaryMessage
) {
val data = message.data.resource
.fold(ByteArray(0)) { acc, buf ->
acc + buf.toByteArray()
}
val msg = WebSocketMessage.Binary(data)
connection.onMessageHandler?.invoke(msg)
}
override fun onCloseMessage(
cm: CloseMessage,
channel: WebSocketChannel
) {
val reason = WsCloseReason(cm.code, cm.reason ?: "")
connection.onCloseHandler?.invoke(reason)
// 触发事件
// eventBus.emit(Event.WebSocketDisconnected(connection, reason))
}
override fun onError(
channel: WebSocketChannel,
error: Throwable
) {
connection.onErrorHandler?.invoke(error)
}
})
channel.resumeReceives()
// 执行用户 handler(注册回调)
wsBody.handler.accept(connection)
// 触发 onOpen
connection.onOpenHandler?.invoke()
// 触发事件
// eventBus.emit(Event.WebSocketConnected(connection))
}
// 执行 Undertow WebSocket 握手
val handshakeHandler = WebSocketProtocolHandshakeHandler(callback)
handshakeHandler.handleRequest(exchange)
}
// === Undertow WebSocket Writer 实现 ===
class UndertowWebSocketWriter(
private val channel: WebSocketChannel
) : WebSocketWriter {
override fun sendText(text: String) {
WebSockets.sendTextBlocking(text, channel)
}
override fun sendBinary(data: ByteArray) {
WebSockets.sendBinaryBlocking(ByteBuffer.wrap(data), channel)
}
override fun close(code: Int, reason: String) {
WebSockets.sendCloseBlocking(CloseMessage(code, reason), channel)
}
}
}五、WebSocket 中间件复用机制
WS 中间件 完全复用现有的 Middleware 接口:
Kotlin
typealias Middleware = (ctx: Context, next: () -> Unit) -> Unit工作流程:
收到 HTTP 升级请求 → 路由匹配到 WS 路由
构建
Context(和普通 HTTP 请求一样)运行中间件链(
wsUse注册的 + 全局的)中间件可以:
检查 token / cookie → 设置 state
抛异常 → 返回 HTTP 401/403,不执行升级
中间件全部通过后 →
ctx.ws(handler)→ 设置ResponseBody.WebSocketUndertow 执行 WebSocket 握手
这意味着现有的 RequestLogger、CORS、Auth 等中间件可以直接用于 wsUse。
六、配置 DSL 扩展
Kotlin
app.config {
server {
wsIdleTimeout = 300_000 // 5 分钟
maxWebSocketMessageSize = 64 * 1024 // 64 KB
}
}七、完整文件变更清单
八、关键设计决策说明
WebSocketConnection持有Context引用 — 这样可以直接复用pathParam(),query(),getState(),getService()等能力,与你示例中的用法一致。回调注册式 API(而非 SSE 的阻塞式) — WebSocket 是双向通信,使用
onMessage回调比阻塞循环更自然。SseConnection的阻塞式适合单向推送,但不适合 WS。中间件完全复用 —
wsUse接受的就是标准Middleware,不需要任何新类型。undertow-core已包含 WebSocket 支持 — 不需要额外依赖undertow-websockets-jsr,io.undertow.websockets.core包已在undertow-core中。@Ws注解方法签名:fun method(conn: WebSocketConnection)— Scanner 识别后自动注册为 WS 路由。
请确认这个设计方案是否满足你的需求,或者有需要调整的地方。确认后我将创建 PR。