我来仔细分析这些 review 意见,逐一判断其合理性并给出修复建议。
逐条分析
Issue #8.1: Virtual Executor Not Awaited on Shutdown
结论:部分有道理,但描述有误
看现有代码的 stop() 方法(第131-152行):
virtualExecutor?.shutdown()
// ...
virtualExecutor?.let {
val terminated = it.awaitTermination(config.shutdownTimeout, TimeUnit.MILLISECONDS)
if (!terminated) {
it.shutdownNow()
}
}
server.stop()实际上代码已经调用了 awaitTermination,所以 review 说"没有 await"是描述有误的。
但确实存在一个逻辑问题:virtualExecutor.shutdown() 在 gracefulShutdownHandler.awaitShutdown() 之前调用。这意味着:
新请求提交给
virtualExecutor会被拒绝(RejectedExecutionException)但
gracefulShutdownHandler还在接收请求并尝试 dispatch
正确顺序应该是:先等 HTTP 层停止接收,再停 executor。
Issue #8.2: SSE Executor Never Shut Down
结论:完全正确,这是真实的资源泄漏
sseExecutor 是 lazy 初始化的,但 stop() 方法完全没有处理它:
private val sseExecutor by lazy { ... } // 从不关闭如果有 SSE 连接发生过,JVM 退出时这个线程池会阻止 clean exit(非虚拟线程情况下的 daemon=true 只是次要缓解,虚拟线程时更严重)。
Issue #8.3: SSE Race Condition on Termination
结论:有道理,但严重程度被高估
sseExecutor.execute {
connection.use {
try {
handler.accept(it)
} catch (e: IOException) { ... }
}
}当 stop() 被调用时,正在执行的 SSE handler 会被 shutdownNow() 中断(如果加了的话),可能导致:
SSE 连接未能发送 close frame
handler 内部状态损坏
但 SSE 本身是单向流,客户端断线重连是标准行为,所以"严重程度 HIGH"有些夸大,实际是 MEDIUM。
Issue #8.4: requestHandler Not Null-Checked
结论:正确,但实际风险取决于使用方式
private lateinit var requestHandler: (Request) -> Responselateinit var 在未初始化时访问会抛 UninitializedPropertyAccessException。虽然正常使用下 start() 先被调用,但防御性编程角度这个批评合理。
Issue #8.5: Redundant Executor Creation
结论:有道理,可合并,但两者职责有差异需权衡
virtualExecutor(请求处理,短任务)和 sseExecutor(SSE,长任务)确实都用虚拟线程时是两个独立 executor。虚拟线程本身是轻量级的,合并是合理优化,但不是严重问题。
Issue #8.6: Multipart Factory Recreated
结论:完全正确
fun adapt(exchange: HttpServerExchange, config: ServerConfig): Request {
// ...
multipartSupplier = if (isMultipart) {
{ parseMultipart(exchange, config) } // 每次调用都重建 factory
}
}
private fun parseMultipart(...): List<Part> {
val parser = createFormParserFactory(config).createParser(exchange) // 每次都 new
}FormParserFactory 的构建涉及 parser 注册等初始化工作,应该缓存。
修复方案
现在给出完整修复代码,重点解决上述问题:以下是对各 issue 的详细分析和完整修复方案。先用一张图梳理 shutdown 时序的问题所在:
现在给出完整的修复代码,每处改动都有注释说明原因:
class UndertowServer(private val config: ServerConfig) : WebServer {
private lateinit var server: Undertow
private lateinit var gracefulShutdownHandler: GracefulShutdownHandler
// FIX #8.4: 改为可空类型,避免 lateinit 在 start() 前被访问时抛 UninitializedPropertyAccessException
private var requestHandler: ((Request) -> Response)? = null
// FIX #8.5: 合并两个 virtual executor 为一个
// 原代码为请求处理和 SSE 分别创建了两个 executor,在虚拟线程模式下没有意义
// 虚拟线程是轻量级的,统一调度不影响性能,同时简化 shutdown 逻辑
private val sharedExecutor: ExecutorService? = if (config.useVirtualThreads) {
logger.info("Using virtual threads")
Executors.newVirtualThreadPerTaskExecutor()
} else {
null
}
// FIX #8.5: sseExecutor 复用 sharedExecutor,非虚拟线程时 lazy 创建 cached 线程池
private val sseExecutor: ExecutorService by lazy {
sharedExecutor ?: Executors.newCachedThreadPool { runnable ->
Thread(runnable).apply {
isDaemon = true
name = "sse-worker-${System.currentTimeMillis()}"
}
}
}
// FIX #8.3: 用 AtomicInteger 追踪正在执行的 SSE 任务数
// 使得 stop() 可以 await 所有 SSE 任务真正结束,而非暴力中断
private val activeSseTasks = AtomicInteger(0)
override fun start(handler: (Request) -> Response) {
this.requestHandler = handler
server = buildServer()
server.start()
}
override fun stop() {
logger.info("Shutting down server...")
// Step 1: 停止接受新的 HTTP 请求
gracefulShutdownHandler.shutdown()
// Step 2: 等待 HTTP 层所有在途请求完成
// FIX #8.1: 必须先等 HTTP 层 drain,再关 executor
// 原代码在 awaitShutdown 之前就调用了 virtualExecutor.shutdown()
// 导致仍在 dispatch 阶段的请求提交 task 时抛 RejectedExecutionException
val httpCompleted = gracefulShutdownHandler.awaitShutdown(config.shutdownTimeout)
if (!httpCompleted) {
logger.warn("HTTP shutdown timeout exceeded")
}
// Step 3: 关闭请求处理 executor(虚拟线程模式下)
// 此时 HTTP 层已停止接受新请求,不会再有新 task 提交
sharedExecutor?.let { exec ->
exec.shutdown()
val terminated = exec.awaitTermination(config.shutdownTimeout, TimeUnit.MILLISECONDS)
if (!terminated) {
logger.warn("Shared executor did not terminate, forcing shutdown")
exec.shutdownNow()
}
}
// FIX #8.2 + #8.3: 关闭 SSE executor 并等待所有 SSE 任务完成
// 原代码完全没有关闭 sseExecutor,导致:
// - 非虚拟线程下:cached 线程池的线程阻止 JVM 干净退出(daemon=true 只是缓解)
// - 虚拟线程下:sharedExecutor 关闭后 SSE 任务可能被中断
// 注意:只在 sseExecutor 未与 sharedExecutor 共享时才需要单独关闭
if (sharedExecutor == null && ::sseExecutor.isInitialized()) {
// 等待 in-flight SSE 任务优雅结束
// FIX #8.3: 在 shutdownNow 之前先 await,给 SSE handler 机会发送结束帧
val sseDrained = waitForSseTasks(config.shutdownTimeout)
if (!sseDrained) {
logger.warn("SSE tasks did not complete in time")
}
sseExecutor.shutdown()
val sseTerminated = sseExecutor.awaitTermination(
config.shutdownTimeout, TimeUnit.MILLISECONDS
)
if (!sseTerminated) {
logger.warn("SSE executor did not terminate, forcing shutdown")
sseExecutor.shutdownNow()
}
}
// Step 5: 最后停止 Undertow 本身
server.stop()
}
// FIX #8.3: 轮询等待 activeSseTasks 归零
// 使用 CountDownLatch 或 AtomicInteger + sleep 均可,这里用简单的 sleep 轮询
// 生产环境可替换为 CompletableFuture 或 Phaser 实现更精确的等待
private fun waitForSseTasks(timeoutMs: Long): Boolean {
val deadline = System.currentTimeMillis() + timeoutMs
while (activeSseTasks.get() > 0 && System.currentTimeMillis() < deadline) {
Thread.sleep(50)
}
return activeSseTasks.get() == 0
}
// lazy 属性的 isInitialized 需要用 ::sseExecutor.isInitialized() 判断
// 避免在没有 SSE 请求时强制初始化线程池
private val sseExecutorInitialized get() =
UndertowServer::sseExecutor.getDelegate().isInitialized()
private fun buildServer(): Undertow {
val cpuCount = Runtime.getRuntime().availableProcessors()
return Undertow.builder().apply {
addHttpListener(config.port, config.host)
setIoThreads(cpuCount)
setWorkerThreads(
if (config.useVirtualThreads) cpuCount else config.maxThreads
)
setServerOption(UndertowOptions.URL_CHARSET, "UTF-8")
setServerOption(UndertowOptions.IDLE_TIMEOUT, config.idleTimeout.toInt())
setServerOption(UndertowOptions.MAX_ENTITY_SIZE, config.maxRequestSize)
setSocketOption(Options.TCP_NODELAY, true)
setBufferSize(8 * 1024)
setDirectBuffers(true)
setHandler(buildHandlerChain())
}.build()
}
private fun buildHandlerChain(): HttpHandler {
var handler: HttpHandler = createRootHandler()
if (config.maxConcurrentRequests > 0) {
handler = RequestLimitingHandler(config.maxConcurrentRequests, handler)
}
handler = GracefulShutdownHandler(handler).also {
gracefulShutdownHandler = it
}
return handler
}
private fun createRootHandler(): HttpHandler {
return HttpHandler { exchange ->
val requestStartNano = System.nanoTime()
dispatchToWorker(exchange) {
exchange.startBlocking()
// FIX #8.4: requestHandler 为 null 时返回 503
// 理论上 start() 先于任何请求,但防御性检查保证不会崩溃
val handler = requestHandler ?: run {
exchange.statusCode = 503
exchange.outputStream.bufferedWriter().use { it.write("Server is starting") }
return@dispatchToWorker
}
val request = UndertowRequestAdapter.adapt(exchange, config)
val response = handler(request)
val responseWriteStartNano = System.nanoTime()
exchange.addExchangeCompleteListener { ex, next ->
try {
val now = System.nanoTime()
val total = (now - requestStartNano).nanoseconds
val io = (now - responseWriteStartNano).nanoseconds
val bytes = ex.responseBytesSent
response.onResponseSent?.invoke(total, io, bytes)
} finally {
next.proceed()
}
}
try {
writeResponse(response, exchange)
} catch (err: Exception) {
logger.error("Failed to write response", err)
writeInternalError(exchange)
}
}
}
}
private inline fun dispatchToWorker(
exchange: HttpServerExchange,
crossinline block: () -> Unit
) {
when {
sharedExecutor != null -> exchange.dispatch(sharedExecutor, Runnable { block() })
exchange.isInIoThread -> exchange.dispatch(Runnable { block() })
else -> block()
}
}
private fun writeResponse(response: Response, exchange: HttpServerExchange) {
exchange.statusCode = response.status
response.headers.forEach { key, values ->
values.forEach { value ->
exchange.responseHeaders.add(HttpString(key), value)
}
}
val body = response.materializedBody
?: throw IllegalStateException("Response body not materialized")
when (body) {
is RawResponseBody.Empty -> exchange.endExchange()
is RawResponseBody.Bytes -> exchange.outputStream.use { it.write(body.bytes) }
is RawResponseBody.Stream -> {
body.input.use { input ->
exchange.outputStream.use { output -> input.copyTo(output) }
}
}
is RawResponseBody.Sse -> handleSseResponse(body.handler, exchange)
}
}
private fun handleSseResponse(handler: Consumer<SseConnection>, exchange: HttpServerExchange) {
val writer = UndertowSseWriter(exchange)
val connection = SseConnection(writer)
// FIX #8.3: 注册 task 开始,结束时递减计数
activeSseTasks.incrementAndGet()
sseExecutor.execute {
try {
connection.use {
try {
handler.accept(it)
} catch (e: IOException) {
logger.debug("SSE connection closed by client: ${e.message}")
} catch (e: Exception) {
connection.close(SseCloseReason.Error(e))
logger.error("SSE error: ${e.message}", e)
}
}
} finally {
// 无论正常结束还是异常,都保证计数正确递减
activeSseTasks.decrementAndGet()
}
}
}
class UndertowSseWriter(exchange: HttpServerExchange) : SseWriter {
private val writer = exchange.outputStream.bufferedWriter(Charsets.UTF_8)
override fun write(text: String) = writer.write(text)
override fun flush() = writer.flush()
override fun close() = writer.close()
}
private fun writeInternalError(exchange: HttpServerExchange) {
if (exchange.isResponseStarted) return
exchange.statusCode = 500
exchange.outputStream.bufferedWriter().use { it.write("Internal server error") }
}
}
// FIX #8.6: FormParserFactory 改为 lazy 单例,不再每次 multipart 请求时重建
// FormParserFactory.Builder 内部会注册 parser 定义,构建代价不可忽视
object UndertowRequestAdapter {
// lazy + @Volatile 保证线程安全的延迟初始化
// 注意:ServerConfig 在此处需要外部传入,但 config 在 adapt() 调用前已固定
// 所以将 factory 作为 lateinit 属性,在第一次 adapt() 时由 config 初始化
@Volatile
private var formParserFactory: FormParserFactory? = null
private fun getOrCreateFactory(config: ServerConfig): FormParserFactory {
// Double-checked locking,避免每次 multipart 请求都加锁
return formParserFactory ?: synchronized(this) {
formParserFactory ?: createFormParserFactory(config).also {
formParserFactory = it
}
}
}
private fun createFormParserFactory(config: ServerConfig): FormParserFactory {
val multipart = MultiPartParserDefinition().apply {
maxIndividualFileSize = config.maxFileSize
setFileSizeThreshold(config.fileSizeThreshold)
}
return FormParserFactory.Builder()
.addParser(multipart)
.withDefaultCharset(StandardCharsets.UTF_8.name())
.build()
}
fun adapt(exchange: HttpServerExchange, config: ServerConfig): Request {
val headers = extractHeaders(exchange)
val contentType = headers["content-type"]
val isMultipart = contentType?.startsWith("multipart/form-data") == true
return Request(
method = exchange.requestMethod.toString(),
path = UrlPath.normalize(exchange.requestPath),
queryString = exchange.queryString ?: "",
headers = headers,
stream = exchange.inputStream,
serverInfo = Request.ServerInfo(
remoteAddr = exchange.sourceAddress.address.hostAddress,
remoteHost = exchange.sourceAddress.hostName
?: exchange.sourceAddress.address.hostAddress,
remotePort = exchange.sourceAddress.port,
scheme = exchange.requestScheme,
serverName = exchange.hostName ?: "localhost",
serverPort = exchange.hostPort,
isSecure = exchange.requestScheme == "https",
protocol = exchange.protocol.toString()
),
multipartSupplier = if (isMultipart) {
{ parseMultipart(exchange, config) }
} else {
{ emptyList() }
}
)
}
private fun parseMultipart(exchange: HttpServerExchange, config: ServerConfig): List<Part> {
// FIX #8.6: 使用缓存的 factory 而非每次重建
val parser = getOrCreateFactory(config).createParser(exchange)
?: error("Request is not multipart")
val formData: FormData = try {
parser.parseBlocking()
} catch (err: IOException) {
if (err.message?.contains("UT000036") == true) {
throw IllegalStateException(
"Failed to parse multipart body. " +
"The request body may have been consumed earlier.",
err
)
}
throw err
}
val parts = mutableListOf<Part>()
for (fieldName in formData) {
for (formValue in formData.get(fieldName)) {
val part = if (formValue.isFileItem) {
FilePart(
name = fieldName,
filename = formValue.fileName ?: "unknown",
contentType = formValue.headers.getFirst(
io.undertow.util.Headers.CONTENT_TYPE
),
size = formValue.fileItem.fileSize,
inputStream = formValue.fileItem.inputStream,
)
} else {
FormField(fieldName, formValue.value)
}
parts.add(part)
}
}
return parts
}
private fun extractHeaders(exchange: HttpServerExchange): Headers {
val headers = Headers()
for (headerValues: HeaderValues in exchange.requestHeaders) {
val name = headerValues.headerName.toString()
headerValues.forEach { headers.add(name, it) }
}
return headers
}
}一个额外补充
Review 中 #8.3 的表述略有偏差,它说"SSE tasks may be forcefully terminated",但实际上在原代码里 sseExecutor 根本就没被关闭(#8.2),所以任务不会被 shutdownNow() 强制中断——而是会泄漏。#8.3 真正的风险是:加上 #8.2 的修复后(即正确地 shutdownNow()),如果没有 #8.3 的 in-flight 计数机制,才会出现"正在写数据的 SSE handler 被中断"。所以这两个 issue 是联动的,必须一起修复。
另外,sseExecutor 在平台线程模式下与 sharedExecutor 是两个不同的对象,stop() 里需要分别处理;在虚拟线程模式下两者是同一个对象(sharedExecutor),不需要二次关闭——代码中通过 if (sharedExecutor == null) 来区分这两种情况。