claude

我来帮你设计一个完善的 Event Bus 系统和优雅的请求修改方案。

一、Event Bus 设计思路

1. 事件类型定义

sealed class AppEvent {
    // 生命周期事件
    data class ServerStarting(val config: ServerConfig) : AppEvent()
    data class ServerStarted(val host: String, val port: Int) : AppEvent()
    data class ServerStopping(val reason: String = "") : AppEvent()
    data class ServerStopped(val graceful: Boolean) : AppEvent()
    
    // 路由事件
    data class RouteAdded(val method: String, val path: String, val handler: Any) : AppEvent()
    data class SubAppMounted(val mountPath: String, val subApp: App) : AppEvent()
    
    // 请求处理事件(关键:支持修改)
    data class RequestReceived(
        var method: String,
        var path: String,
        val originalMethod: String,
        val originalPath: String,
        val headers: Headers,
        val timestamp: Long = System.currentTimeMillis()
    ) : AppEvent() {
        val isModified: Boolean
            get() = method != originalMethod || path != originalPath
    }
    
    data class RouteMatching(val request: Request, val availableRoutes: List<String>) : AppEvent()
    data class RouteMatched(val request: Request, val route: String, val params: Map<String, String>) : AppEvent()
    data class RouteNotFound(val request: Request) : AppEvent()
    
    // 中间件事件
    data class MiddlewareExecuting(val name: String, val request: Request) : AppEvent()
    data class MiddlewareExecuted(val name: String, val duration: Long) : AppEvent()
    
    // 响应事件
    data class ResponseGenerating(val request: Request, val statusCode: Int) : AppEvent()
    data class ResponseGenerated(val request: Request, val response: Response) : AppEvent()
    data class ResponseSending(val response: Response) : AppEvent()
    data class ResponseSent(val response: Response, val bytesWritten: Long, val duration: Long) : AppEvent()
    
    // 错误事件
    data class ExceptionThrown(val exception: Exception, val request: Request?, val phase: String) : AppEvent()
    data class ErrorHandled(val exception: Exception, val response: Response) : AppEvent()
}

2. Event Bus 核心实现

/**
 * Event listener with priority support
 */
data class EventListener<T : AppEvent>(
    val priority: Int = 0,
    val async: Boolean = false,
    val handler: (T) -> Unit
)

/**
 * Event Bus implementation with async support and ordering
 */
class EventBus {
    private val listeners = ConcurrentHashMap<KClass<out AppEvent>, MutableList<EventListener<*>>>()
    private val executor = Executors.newVirtualThreadPerTaskExecutor()
    
    /**
     * Subscribe to an event type
     */
    inline fun <reified T : AppEvent> on(
        priority: Int = 0,
        async: Boolean = false,
        noinline handler: (T) -> Unit
    ) {
        val eventClass = T::class
        listeners.computeIfAbsent(eventClass) { mutableListOf() }
            .add(EventListener(priority, async, handler))
        
        // Sort by priority (higher first)
        listeners[eventClass]?.sortByDescending { it.priority }
    }
    
    /**
     * Subscribe once (auto-unsubscribe after first execution)
     */
    inline fun <reified T : AppEvent> once(
        priority: Int = 0,
        noinline handler: (T) -> Unit
    ) {
        var listener: EventListener<T>? = null
        listener = EventListener(priority, false) { event ->
            handler(event)
            off(listener!!)
        }
        on<T>(priority, false) { event ->
            @Suppress("UNCHECKED_CAST")
            (listener as EventListener<T>).handler(event)
        }
    }
    
    /**
     * Unsubscribe a listener
     */
    fun <T : AppEvent> off(listener: EventListener<T>) {
        listeners.values.forEach { it.remove(listener) }
    }
    
    /**
     * Emit an event synchronously
     */
    fun <T : AppEvent> emit(event: T) {
        val eventClass = event::class
        val eventListeners = listeners[eventClass] ?: return
        
        eventListeners.forEach { listener ->
            @Suppress("UNCHECKED_CAST")
            val typedListener = listener as EventListener<T>
            
            if (typedListener.async) {
                executor.submit { 
                    safeExecute(typedListener, event)
                }
            } else {
                safeExecute(typedListener, event)
            }
        }
    }
    
    /**
     * Emit an event and wait for all async listeners
     */
    suspend fun <T : AppEvent> emitAwait(event: T) {
        val eventClass = event::class
        val eventListeners = listeners[eventClass] ?: return
        
        coroutineScope {
            eventListeners.map { listener ->
                @Suppress("UNCHECKED_CAST")
                val typedListener = listener as EventListener<T>
                
                async {
                    safeExecute(typedListener, event)
                }
            }.awaitAll()
        }
    }
    
    private fun <T : AppEvent> safeExecute(listener: EventListener<T>, event: T) {
        try {
            listener.handler(event)
        } catch (e: Exception) {
            // Log but don't throw to prevent one listener from breaking others
            println("Error in event listener: ${e.message}")
        }
    }
    
    fun shutdown() {
        executor.shutdown()
    }
}

3. 集成到 App 类

class App {
    val eventBus = EventBus()
    
    // 便捷方法
    inline fun <reified T : AppEvent> on(
        priority: Int = 0,
        async: Boolean = false,
        noinline handler: (T) -> Unit
    ) = eventBus.on<T>(priority, async, handler)
    
    inline fun <reified T : AppEvent> once(
        priority: Int = 0,
        noinline handler: (T) -> Unit
    ) = eventBus.once<T>(priority, handler)
    
    fun <T : AppEvent> emit(event: T) = eventBus.emit(event)
    
    // ... existing code
}

二、优雅的请求修改方案

方案 1: 不可变 Request + MutableRequestContext(推荐)

/**
 * Mutable request context for pre-processing phase
 */
data class MutableRequestContext(
    var method: String,
    var path: String,
    val originalMethod: String,
    val originalPath: String,
    val queryString: String,
    val headers: Headers,
    val stream: InputStream?,
    val metadata: RequestMetadata,
    private val multipartSupplier: () -> List<Part>
) {
    /**
     * Create immutable Request from this context
     */
    fun toRequest(): Request = Request(
        method = method,
        path = path,
        queryString = queryString,
        headers = headers,
        stream = stream,
        metadata = metadata,
        multipartSupplier = multipartSupplier
    )
}

/**
 * Immutable request (original design)
 */
data class Request(
    val method: String,
    val path: String,
    val queryString: String = "",
    val headers: Headers = Headers(),
    val stream: InputStream? = null,
    val metadata: RequestMetadata = RequestMetadata(),
    private val multipartSupplier: () -> List<Part> = { emptyList() },
)

方案 2: 使用 Pipeline Phases

enum class RequestPhase {
    PRE_PROCESSING,  // 可以修改 method/path
    PROCESSING,      // 只读
    POST_PROCESSING  // 只读
}

data class Context(
    val request: Request,
    val response: Response = Response(),
    val app: App,
    val params: MutableMap<String, String> = mutableMapOf(),
    val state: MutableMap<String, Any> = mutableMapOf(),
    internal var phase: RequestPhase = RequestPhase.PRE_PROCESSING
) {
    // 只在 PRE_PROCESSING 阶段允许修改
    internal var mutableMethod: String = request.method
        set(value) {
            require(phase == RequestPhase.PRE_PROCESSING) {
                "Cannot modify request method after pre-processing phase"
            }
            field = value
        }
    
    internal var mutablePath: String = request.path
        set(value) {
            require(phase == RequestPhase.PRE_PROCESSING) {
                "Cannot modify request path after pre-processing phase"
            }
            field = value
        }
    
    val effectiveMethod: String
        get() = if (phase == RequestPhase.PRE_PROCESSING) mutableMethod else request.method
        
    val effectivePath: String
        get() = if (phase == RequestPhase.PRE_PROCESSING) mutablePath else request.path
}

推荐实现:结合 Event Bus

internal fun createHttpHandler(): (Request) -> Response {
    return { originalRequest ->
        // Phase 1: Pre-processing (可修改)
        val preProcessEvent = AppEvent.RequestReceived(
            method = originalRequest.method,
            path = originalRequest.path,
            originalMethod = originalRequest.method,
            originalPath = originalRequest.path,
            headers = originalRequest.headers
        )
        
        eventBus.emit(preProcessEvent)
        
        // 创建最终的不可变 Request
        val finalRequest = if (preProcessEvent.isModified) {
            originalRequest.copy(
                method = preProcessEvent.method,
                path = preProcessEvent.path
            )
        } else {
            originalRequest
        }
        
        // Phase 2: Processing (不可修改)
        val ctx = Context(request = finalRequest, app = this)
        
        emit(AppEvent.RouteMatching(finalRequest, router.getRoutes()))
        
        try {
            router.handleRequest(ctx)
            
            emit(AppEvent.ResponseGenerating(finalRequest, ctx.response.statusCode))
            ctx.response.materialize(ctx)
            emit(AppEvent.ResponseGenerated(finalRequest, ctx.response))
            
        } catch (e: Exception) {
            emit(AppEvent.ExceptionThrown(e, finalRequest, "processing"))
            
            if (mountPath != "" && config.error.propagateSubAppExceptions) {
                throw e
            } else {
                handleError(e, ctx)
                ctx.response.materialize(ctx)
                emit(AppEvent.ErrorHandled(e, ctx.response))
            }
        }
        
        emit(AppEvent.ResponseSending(ctx.response))
        ctx.response
    }
}

fun listen(port: Int = 8080, host: String = "127.0.0.1", callback: Runnable = Runnable {}) {
    if (host != config.server.host) config.server.host = host
    if (port != config.server.port) config.server.port = port
    
    emit(AppEvent.ServerStarting(config.server))
    
    val server = UndertowServer(config.server)
    webServer = server
    
    Runtime.getRuntime().addShutdownHook(
        Thread({ shutdown() }, "shutdown")
    )
    
    try {
        server.start(createHttpHandler())
        emit(AppEvent.ServerStarted(host, port))
    } catch (e: Exception) {
        logger.error("Failed to start server", e)
        emit(AppEvent.ExceptionThrown(e, null, "startup"))
        shutdown()
        throw e
    }
    
    try {
        callback.run()
    } catch (e: Exception) {
        logger.error("Startup callback failed", e)
        shutdown()
        throw e
    }
}

private fun shutdown() {
    emit(AppEvent.ServerStopping())
    shutdownHooks.forEach { 
        try { 
            it.run() 
        } catch (e: Exception) {
            logger.error("Shutdown hook failed", e)
        }
    }
    webServer?.stop()
    eventBus.shutdown()
    emit(AppEvent.ServerStopped(graceful = true))
}

三、使用示例

val app = App()

// 方法重写中间件
app.on<AppEvent.RequestReceived>(priority = 100) { event ->
    // X-HTTP-Method-Override header support
    val override = event.headers["X-HTTP-Method-Override"]
    if (override != null && event.method == "POST") {
        event.method = override.uppercase()
        println("Method overridden: ${event.originalMethod} -> ${event.method}")
    }
}

// URL 重写
app.on<AppEvent.RequestReceived>(priority = 90) { event ->
    if (event.path.startsWith("/old-api/")) {
        event.path = event.path.replace("/old-api/", "/api/v2/")
    }
}

// 性能监控
app.on<AppEvent.ResponseSent> { event ->
    println("Response sent: ${event.bytesWritten} bytes in ${event.duration}ms")
}

// 错误追踪
app.on<AppEvent.ExceptionThrown>(async = true) { event ->
    // Send to error tracking service
    errorTracker.report(event.exception, event.request)
}

app.listen(8080)

这个设计的优势:

  1. 类型安全:使用 sealed class 确保事件类型安全

  2. 优先级控制:通过 priority 参数控制执行顺序

  3. 异步支持:async 参数支持异步处理

  4. 不可变性:Request 保持不可变,修改仅在预处理阶段

  5. 清晰的生命周期:事件明确标识请求处理的各个阶段