claude
我来帮你设计一个完善的 Event Bus 系统和优雅的请求修改方案。
一、Event Bus 设计思路
1. 事件类型定义
/**
 * Closed hierarchy of events published by the application.
 *
 * Each subclass is a data class carrying the payload for one step: server
 * lifecycle, route registration, request handling, middleware execution,
 * response production, and error handling.
 */
sealed class AppEvent {
// Lifecycle events
data class ServerStarting(val config: ServerConfig) : AppEvent()
data class ServerStarted(val host: String, val port: Int) : AppEvent()
data class ServerStopping(val reason: String = "") : AppEvent()
data class ServerStopped(val graceful: Boolean) : AppEvent()
// Routing events
data class RouteAdded(val method: String, val path: String, val handler: Any) : AppEvent()
data class SubAppMounted(val mountPath: String, val subApp: App) : AppEvent()
// Request handling events (key point: these support modification)
// `method` and `path` are declared `var` so a listener can rewrite them;
// `originalMethod` and `originalPath` are `val` and serve as the baseline
// that `isModified` compares against. `timestamp` defaults to
// `System.currentTimeMillis()` evaluated when the event is constructed.
data class RequestReceived(
var method: String,
var path: String,
val originalMethod: String,
val originalPath: String,
val headers: Headers,
val timestamp: Long = System.currentTimeMillis()
) : AppEvent() {
// True when `method` or `path` differs from its `original*` counterpart.
val isModified: Boolean
get() = method != originalMethod || path != originalPath
}
data class RouteMatching(val request: Request, val availableRoutes: List<String>) : AppEvent()
data class RouteMatched(val request: Request, val route: String, val params: Map<String, String>) : AppEvent()
data class RouteNotFound(val request: Request) : AppEvent()
// Middleware events
data class MiddlewareExecuting(val name: String, val request: Request) : AppEvent()
data class MiddlewareExecuted(val name: String, val duration: Long) : AppEvent()
// Response events
data class ResponseGenerating(val request: Request, val statusCode: Int) : AppEvent()
data class ResponseGenerated(val request: Request, val response: Response) : AppEvent()
data class ResponseSending(val response: Response) : AppEvent()
data class ResponseSent(val response: Response, val bytesWritten: Long, val duration: Long) : AppEvent()
// Error events
// `request` is nullable so an exception can be reported without a request.
data class ExceptionThrown(val exception: Exception, val request: Request?, val phase: String) : AppEvent()
data class ErrorHandled(val exception: Exception, val response: Response) : AppEvent()
}2. Event Bus 核心实现
/**
 * Event listener with priority support: a handler plus its dispatch options.
 *
 * @param T the event type passed to [handler].
 * @property priority ordering key; `EventBus` keeps listeners sorted so that
 *   higher values run first. Defaults to 0.
 * @property async when `true`, `EventBus.emit` hands the handler to its
 *   executor instead of calling it inline. Defaults to `false`.
 * @property handler callback invoked with the emitted event.
 */
data class EventListener<T : AppEvent>(
val priority: Int = 0,
val async: Boolean = false,
val handler: (T) -> Unit
)
/**
 * In-process publish/subscribe bus for [AppEvent]s with priority ordering and
 * optional asynchronous delivery.
 *
 * Listeners are stored per concrete event class as immutable, priority-sorted
 * lists that are replaced atomically. Registering or removing a listener
 * (including from inside a handler) therefore never disturbs an emit that is
 * already iterating its snapshot.
 */
class EventBus {
    private val listeners = ConcurrentHashMap<KClass<out AppEvent>, List<EventListener<*>>>()
    private val executor = Executors.newVirtualThreadPerTaskExecutor()

    /**
     * Subscribes [handler] to events whose class is exactly [T].
     *
     * @param priority higher values run first; equal priorities keep registration order.
     * @param async when `true`, [emit] runs the handler on the bus executor.
     * @param handler callback invoked with each matching event.
     */
    inline fun <reified T : AppEvent> on(
        priority: Int = 0,
        async: Boolean = false,
        noinline handler: (T) -> Unit
    ) {
        register(T::class, EventListener(priority, async, handler))
    }

    /**
     * Subscribes [handler] for a single delivery; it is unsubscribed before it runs.
     *
     * @param priority higher values run first.
     * @param handler callback invoked with the first matching event only.
     */
    inline fun <reified T : AppEvent> once(
        priority: Int = 0,
        noinline handler: (T) -> Unit
    ) {
        registerOnce(T::class, priority, handler)
    }

    /**
     * Unsubscribes a listener.
     *
     * Removes the first entry equal to [listener] from every event class.
     * [EventListener] is a data class, so equality covers priority, async flag and handler.
     */
    fun <T : AppEvent> off(listener: EventListener<T>) {
        for (eventClass in listeners.keys) {
            listeners.computeIfPresent(eventClass) { _, current -> current - listener }
        }
    }

    /**
     * Emits an event to every listener registered for its exact class, in priority order.
     *
     * Synchronous listeners run inline; async listeners are submitted to the
     * bus executor. Exceptions thrown by handlers are logged and swallowed.
     */
    fun <T : AppEvent> emit(event: T) {
        val registered = listeners[event::class] ?: return
        for (listener in registered) {
            @Suppress("UNCHECKED_CAST")
            val typed = listener as EventListener<T>
            if (typed.async) dispatchAsync(typed, event) else safeExecute(typed, event)
        }
    }

    /**
     * Emits an event and suspends until all async listeners have finished.
     *
     * Synchronous listeners run inline in priority order, so their effects on a
     * mutable event stay deterministic. Async listeners are started as child
     * coroutines of the caller and awaited together.
     */
    suspend fun <T : AppEvent> emitAwait(event: T) {
        val registered = listeners[event::class] ?: return
        coroutineScope {
            registered.mapNotNull { listener ->
                @Suppress("UNCHECKED_CAST")
                val typed = listener as EventListener<T>
                if (typed.async) {
                    async { safeExecute(typed, event) }
                } else {
                    safeExecute(typed, event)
                    null
                }
            }.awaitAll()
        }
    }

    /** Initiates shutdown of the executor used for async listeners. */
    fun shutdown() {
        executor.shutdown()
    }

    /** Adds [listener] for [eventClass], atomically replacing the sorted list. */
    @PublishedApi
    internal fun register(eventClass: KClass<out AppEvent>, listener: EventListener<*>) {
        listeners.compute(eventClass) { _, current ->
            // sortedByDescending is stable, so ties keep registration order.
            (current.orEmpty() + listener).sortedByDescending { it.priority }
        }
    }

    /** Registers a listener that removes itself and then calls [handler] at most once. */
    @PublishedApi
    internal fun <T : AppEvent> registerOnce(
        eventClass: KClass<T>,
        priority: Int,
        handler: (T) -> Unit
    ) {
        // Guards against two concurrent emits both reaching the handler.
        val fired = java.util.concurrent.atomic.AtomicBoolean(false)
        lateinit var registration: EventListener<T>
        registration = EventListener(priority, false) { event ->
            if (fired.compareAndSet(false, true)) {
                // Remove the instance that was actually registered, and do it
                // first so a throwing handler is still unsubscribed.
                off(registration)
                handler(event)
            }
        }
        register(eventClass, registration)
    }

    /** Submits [listener] to the executor, falling back to inline delivery once it rejects work. */
    private fun <T : AppEvent> dispatchAsync(listener: EventListener<T>, event: T) {
        try {
            executor.submit { safeExecute(listener, event) }
        } catch (e: java.util.concurrent.RejectedExecutionException) {
            // The executor was shut down: deliver inline rather than abort the
            // emit loop and starve the remaining listeners.
            safeExecute(listener, event)
        }
    }

    /** Runs [listener], logging any exception so one listener cannot break the others. */
    private fun <T : AppEvent> safeExecute(listener: EventListener<T>, event: T) {
        try {
            listener.handler(event)
        } catch (e: Exception) {
            // Log but don't throw to prevent one listener from breaking others
            println("Error in event listener: ${e.message}")
        }
    }
}
3. 集成到 App 类
/**
 * Application class that owns an [EventBus] and re-exposes its subscribe and
 * emit operations as members.
 */
class App {
    val eventBus = EventBus()

    // Convenience methods: each one forwards straight to eventBus.

    /** Subscribes [handler] to events of type [T] on [eventBus]. */
    inline fun <reified T : AppEvent> on(
        priority: Int = 0,
        async: Boolean = false,
        noinline handler: (T) -> Unit,
    ) {
        eventBus.on<T>(priority = priority, async = async, handler = handler)
    }

    /** Subscribes [handler] on [eventBus] for a single delivery of [T]. */
    inline fun <reified T : AppEvent> once(
        priority: Int = 0,
        noinline handler: (T) -> Unit,
    ) {
        eventBus.once<T>(priority = priority, handler = handler)
    }

    /** Publishes [event] through [eventBus]. */
    fun <T : AppEvent> emit(event: T) {
        eventBus.emit(event)
    }

    // ... existing code
}
二、优雅的请求修改方案
方案 1: 不可变 Request + MutableRequestContext(推荐)
/**
 * Request state that can still be rewritten during the pre-processing phase.
 *
 * Only [method] and [path] are `var`; every other property is `val`.
 * [originalMethod] and [originalPath] are stored alongside them but are not
 * passed on by [toRequest].
 */
data class MutableRequestContext(
    var method: String,
    var path: String,
    val originalMethod: String,
    val originalPath: String,
    val queryString: String,
    val headers: Headers,
    val stream: InputStream?,
    val metadata: RequestMetadata,
    private val multipartSupplier: () -> List<Part>,
) {
    /**
     * Builds a [Request] from the current values of this context.
     *
     * @return a request carrying the current [method] and [path] together
     *   with the query string, headers, stream, metadata and multipart supplier.
     */
    fun toRequest(): Request {
        return Request(
            method = method,
            path = path,
            queryString = queryString,
            headers = headers,
            stream = stream,
            metadata = metadata,
            multipartSupplier = multipartSupplier,
        )
    }
}
/**
 * Immutable request (original design).
 *
 * Every property is declared `val`. Only `method` and `path` are required;
 * the remaining parameters default to an empty query string, `Headers()`,
 * no stream, `RequestMetadata()`, and a supplier returning an empty part list.
 */
data class Request(
val method: String,
val path: String,
val queryString: String = "",
val headers: Headers = Headers(),
val stream: InputStream? = null,
val metadata: RequestMetadata = RequestMetadata(),
private val multipartSupplier: () -> List<Part> = { emptyList() },
)方案 2: 使用 Pipeline Phases
/** Stages of request handling; the per-entry comments say whether method/path may still change. */
enum class RequestPhase {
PRE_PROCESSING, // method/path may be modified
PROCESSING, // read-only
POST_PROCESSING // read-only
}
/**
 * Per-request context that pairs the immutable [request] with the response,
 * route parameters, free-form state and the current [RequestPhase].
 *
 * The method and path can be overridden through [mutableMethod] and
 * [mutablePath], but only while [phase] is [RequestPhase.PRE_PROCESSING].
 * Both are declared in the class body, so they are not part of the data
 * class's `equals`/`hashCode`/`copy`. A copy starts again from the values of
 * its own [request].
 */
data class Context(
    val request: Request,
    val response: Response = Response(),
    val app: App,
    val params: MutableMap<String, String> = mutableMapOf(),
    val state: MutableMap<String, Any> = mutableMapOf(),
    internal var phase: RequestPhase = RequestPhase.PRE_PROCESSING
) {
    // Modification is only allowed during the PRE_PROCESSING phase.

    /**
     * Method override, initialised from [request].
     *
     * @throws IllegalArgumentException when assigned outside PRE_PROCESSING.
     */
    internal var mutableMethod: String = request.method
        set(value) {
            require(phase == RequestPhase.PRE_PROCESSING) {
                "Cannot modify request method after pre-processing phase"
            }
            field = value
        }

    /**
     * Path override, initialised from [request].
     *
     * @throws IllegalArgumentException when assigned outside PRE_PROCESSING.
     */
    internal var mutablePath: String = request.path
        set(value) {
            require(phase == RequestPhase.PRE_PROCESSING) {
                "Cannot modify request path after pre-processing phase"
            }
            field = value
        }

    /**
     * Method to route on. Always reflects [mutableMethod], so a rewrite made
     * during pre-processing stays in effect in later phases instead of
     * reverting to the original request method.
     */
    val effectiveMethod: String
        get() = mutableMethod

    /**
     * Path to route on. Always reflects [mutablePath], so a rewrite made
     * during pre-processing stays in effect in later phases instead of
     * reverting to the original request path.
     */
    val effectivePath: String
        get() = mutablePath
}
推荐实现:结合 Event Bus
/**
 * Builds the request handler function: it publishes lifecycle events around
 * routing and returns the response held by the request's [Context].
 *
 * Exceptions from routing or materialization are reported via
 * [AppEvent.ExceptionThrown]. They are rethrown only when `mountPath` is
 * non-empty and `config.error.propagateSubAppExceptions` is set; otherwise
 * they go to `handleError`.
 */
internal fun createHttpHandler(): (Request) -> Response = { incoming ->
    // Phase 1: pre-processing. Listeners may rewrite method and path on the event.
    val received = AppEvent.RequestReceived(
        method = incoming.method,
        path = incoming.path,
        originalMethod = incoming.method,
        originalPath = incoming.path,
        headers = incoming.headers
    )
    eventBus.emit(received)

    // Freeze the outcome into the final immutable Request.
    val request = when {
        received.isModified -> incoming.copy(method = received.method, path = received.path)
        else -> incoming
    }

    // Phase 2: processing. The request is no longer modifiable.
    val context = Context(request = request, app = this)
    emit(AppEvent.RouteMatching(request, router.getRoutes()))
    try {
        router.handleRequest(context)
        emit(AppEvent.ResponseGenerating(request, context.response.statusCode))
        context.response.materialize(context)
        emit(AppEvent.ResponseGenerated(request, context.response))
    } catch (failure: Exception) {
        emit(AppEvent.ExceptionThrown(failure, request, "processing"))
        val propagate = mountPath != "" && config.error.propagateSubAppExceptions
        if (propagate) {
            throw failure
        }
        handleError(failure, context)
        context.response.materialize(context)
        emit(AppEvent.ErrorHandled(failure, context.response))
    }

    emit(AppEvent.ResponseSending(context.response))
    context.response
}
/**
 * Starts the web server on [host]:[port] and then runs [callback].
 *
 * The server config is updated when the arguments differ from it, a JVM
 * shutdown hook calling `shutdown()` is registered, and
 * [AppEvent.ServerStarting] / [AppEvent.ServerStarted] are emitted around the
 * start. If the start or the callback throws, the failure is logged,
 * `shutdown()` is called and the exception is rethrown.
 *
 * @param port port to listen on; defaults to 8080.
 * @param host host to bind to; defaults to "127.0.0.1".
 * @param callback invoked after the server has started; defaults to a no-op.
 */
fun listen(port: Int = 8080, host: String = "127.0.0.1", callback: Runnable = Runnable {}) {
    // Only write to the config when the caller asked for something different.
    if (config.server.host != host) {
        config.server.host = host
    }
    if (config.server.port != port) {
        config.server.port = port
    }

    emit(AppEvent.ServerStarting(config.server))

    val undertow = UndertowServer(config.server).also { webServer = it }

    val shutdownThread = Thread({ shutdown() }, "shutdown")
    Runtime.getRuntime().addShutdownHook(shutdownThread)

    try {
        undertow.start(createHttpHandler())
        emit(AppEvent.ServerStarted(host, port))
    } catch (startFailure: Exception) {
        logger.error("Failed to start server", startFailure)
        emit(AppEvent.ExceptionThrown(startFailure, null, "startup"))
        shutdown()
        throw startFailure
    }

    try {
        callback.run()
    } catch (callbackFailure: Exception) {
        logger.error("Startup callback failed", callbackFailure)
        shutdown()
        throw callbackFailure
    }
}
/**
 * Stops the application: emits [AppEvent.ServerStopping], runs the shutdown
 * hooks, stops the web server, emits [AppEvent.ServerStopped] and finally
 * shuts down the event bus.
 *
 * Failures in hooks or in stopping the server are logged and do not abort the
 * remaining steps. They are reported through `ServerStopped.graceful = false`.
 */
private fun shutdown() {
    emit(AppEvent.ServerStopping())

    var graceful = true
    shutdownHooks.forEach { hook ->
        try {
            hook.run()
        } catch (e: Exception) {
            graceful = false
            logger.error("Shutdown hook failed", e)
        }
    }

    try {
        webServer?.stop()
    } catch (e: Exception) {
        // Same policy as for hooks: log and continue so the bus is still shut down.
        graceful = false
        logger.error("Failed to stop web server", e)
    }

    // Publish the final event while the bus still accepts async work.
    // Emitting after eventBus.shutdown() would have async listeners rejected.
    emit(AppEvent.ServerStopped(graceful = graceful))
    eventBus.shutdown()
}
三、使用示例
val app = App()

// Method-override middleware
app.on<AppEvent.RequestReceived>(priority = 100) { event ->
    // X-HTTP-Method-Override header support
    val override = event.headers["X-HTTP-Method-Override"]
    if (override != null && event.method == "POST") {
        event.method = override.uppercase()
        println("Method overridden: ${event.originalMethod} -> ${event.method}")
    }
}

// URL rewriting
app.on<AppEvent.RequestReceived>(priority = 90) { event ->
    val legacyPrefix = "/old-api/"
    if (event.path.startsWith(legacyPrefix)) {
        // Swap only the leading prefix. String.replace would also rewrite any
        // later "/old-api/" occurrence inside the path.
        event.path = "/api/v2/" + event.path.removePrefix(legacyPrefix)
    }
}

// Performance monitoring
app.on<AppEvent.ResponseSent> { event ->
    println("Response sent: ${event.bytesWritten} bytes in ${event.duration}ms")
}

// Error tracking
app.on<AppEvent.ExceptionThrown>(async = true) { event ->
    // Send to error tracking service
    errorTracker.report(event.exception, event.request)
}

app.listen(8080)
这个设计的优势:
类型安全:使用 sealed class 确保事件类型安全
优先级控制:通过 priority 参数控制执行顺序
异步支持:async 参数支持异步处理
不可变性:Request 保持不可变,修改仅在预处理阶段
清晰的生命周期:事件明确标识请求处理的各个阶段