I will go through these review comments one by one, judge whether each is valid, and suggest a fix.

Issue-by-issue analysis

Issue #8.1: Virtual Executor Not Awaited on Shutdown

Verdict: partly valid, but the description is inaccurate

Look at the stop() method in the existing code (lines 131-152):

virtualExecutor?.shutdown()
// ...
virtualExecutor?.let {
    val terminated = it.awaitTermination(config.shutdownTimeout, TimeUnit.MILLISECONDS)
    if (!terminated) {
        it.shutdownNow()
    }
}
server.stop()

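The code does in fact call awaitTermination, so the review's claim that there is "no await" is inaccurate.

There is a real ordering problem, though: virtualExecutor.shutdown() is called before gracefulShutdownHandler.awaitShutdown(). This means a request that is still in its dispatch phase can try to submit a task to an executor that has already been shut down, and the submission fails with RejectedExecutionException.

The correct order is: first wait for the HTTP layer to stop accepting requests, then stop the executor.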
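The failure mode behind this ordering problem can be reproduced in isolation. The sketch below is only an illustration with a plain JDK thread pool, not the server's own code; rejectsAfterShutdown is a hypothetical helper name.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException

// Hypothetical helper: shuts an executor down first, then tries to submit a task,
// mirroring a request that is dispatched after virtualExecutor.shutdown() was called.
fun rejectsAfterShutdown(): Boolean {
    val executor = Executors.newFixedThreadPool(1)
    executor.shutdown() // no new tasks are accepted from here on
    return try {
        executor.execute { /* would be the request handler */ }
        false
    } catch (e: RejectedExecutionException) {
        true // the late submission is refused
    }
}
```

Draining the HTTP layer before calling shutdown() closes this window, because no request can reach the dispatch step once the drain has completed.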

Issue #8.2: SSE Executor Never Shut Down

Verdict: entirely correct; this is a real resource leak

sseExecutor is lazily initialized, but stop() never touches it:

private val sseExecutor by lazy { ... } // never shut down

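Once any SSE connection has been served, the pool outlives stop(): its threads and any in-flight SSE handlers are never shut down. In platform-thread mode, daemon=true keeps the pool from blocking JVM exit, but that is only a partial mitigation, because the handlers are cut off without any cleanup. Virtual threads are always daemon threads, so the same holds there: nothing blocks exit, and nothing gives the handlers a chance to finish.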
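The usual JDK pattern for closing such a pool is shutdown(), then a bounded awaitTermination(), then shutdownNow() as a last resort. A minimal sketch follows; shutdownGracefully is a hypothetical helper name, not part of the reviewed code.

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit

// Hypothetical helper. Returns true if all tasks finished within the timeout,
// false if the executor had to be forced down.
fun shutdownGracefully(executor: ExecutorService, timeoutMs: Long): Boolean {
    executor.shutdown() // stop accepting new tasks; queued and running tasks continue
    return try {
        if (executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
            true
        } else {
            executor.shutdownNow() // interrupt whatever is still running
            false
        }
    } catch (e: InterruptedException) {
        executor.shutdownNow()
        Thread.currentThread().interrupt() // preserve the caller's interrupt status
        false
    }
}
```

A stop() method could call this helper once per executor it owns, which keeps the timeout handling in one place.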

Issue #8.3: SSE Race Condition on Termination

Verdict: valid, but the severity is overstated

sseExecutor.execute {
    connection.use {
        try {
            handler.accept(it)
        } catch (e: IOException) { ... }
    }
}

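When stop() is called, a running SSE handler would be interrupted by shutdownNow() (once that call is added), which can cut the handler off in the middle of writing an event.

However, SSE is a one-way stream, and reconnecting after a dropped connection is standard client behavior, so "severity HIGH" is somewhat exaggerated; MEDIUM is more accurate.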

Issue #8.4: requestHandler Not Null-Checked

Verdict: correct, but the practical risk depends on how the class is used

private lateinit var requestHandler: (Request) -> Response

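Accessing a lateinit var before it is initialized throws UninitializedPropertyAccessException. In normal use start() is called first, but from a defensive-programming standpoint the criticism is reasonable.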
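The difference between the two declarations is easy to see in a stripped-down form. The classes below are hypothetical stand-ins that use String in place of Request and Response; they are a sketch of the idea, not the server's code.

```kotlin
// Stand-in for the original design: the handler is a lateinit property.
class LateinitServer {
    private lateinit var handler: (String) -> String
    fun start(h: (String) -> String) { handler = h }
    // Throws UninitializedPropertyAccessException if called before start().
    fun handle(path: String): String = handler(path)
}

// Stand-in for the defensive design: the handler is nullable and checked on use.
class NullableServer {
    @Volatile
    private var handler: ((String) -> String)? = null
    fun start(h: (String) -> String) { handler = h }
    // Falls back to a 503-style answer instead of throwing.
    fun handle(path: String): String =
        handler?.invoke(path) ?: "503 Server is starting"
}
```

The nullable variant trades a hard failure for a response the client can retry on, which is the behavior a server in its startup window usually wants.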

Issue #8.5: Redundant Executor Creation

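Verdict: valid; the two can be merged, but their responsibilities differ, so it is a trade-off

virtualExecutor (request handling, short tasks) and sseExecutor (SSE, long-running tasks) are indeed two separate executors even when both use virtual threads. Virtual threads are lightweight, so merging them is a reasonable cleanup, but this is not a serious problem.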

Issue #8.6: Multipart Factory Recreated

Verdict: entirely correct

fun adapt(exchange: HttpServerExchange, config: ServerConfig): Request {
    // ...
    multipartSupplier = if (isMultipart) {
        { parseMultipart(exchange, config) }  // rebuilds the factory on every call
    }
}

private fun parseMultipart(...): List<Part> {
    val parser = createFormParserFactory(config).createParser(exchange) // a new factory every time
}

Building a FormParserFactory involves initialization work such as registering parsers, so the factory should be cached.

Fix

The complete fixed code follows; every change carries a comment explaining the reason:

class UndertowServer(private val config: ServerConfig) : WebServer {
    private lateinit var server: Undertow
    private lateinit var gracefulShutdownHandler: GracefulShutdownHandler

    // FIX #8.4: made nullable so that access before start() no longer throws UninitializedPropertyAccessException
    private var requestHandler: ((Request) -> Response)? = null

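    // FIX #8.5: merge the two virtual executors into one
    // The original code created separate executors for request handling and SSE, which serves no purpose in virtual-thread mode
    // Virtual threads are lightweight, so shared scheduling does not hurt performance and it simplifies shutdown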
    private val sharedExecutor: ExecutorService? = if (config.useVirtualThreads) {
        logger.info("Using virtual threads")
        Executors.newVirtualThreadPerTaskExecutor()
    } else {
        null
    }

    // FIX #8.5: sseExecutor reuses sharedExecutor; with platform threads a cached thread pool is created lazily
    private val sseExecutor: ExecutorService by lazy {
        sharedExecutor ?: Executors.newCachedThreadPool { runnable ->
            Thread(runnable).apply {
                isDaemon = true
                name = "sse-worker-${System.currentTimeMillis()}"
            }
        }
    }

    // FIX #8.3: track the number of running SSE tasks with an AtomicInteger
    // so that stop() can wait for all SSE tasks to really finish instead of interrupting them by force
    private val activeSseTasks = AtomicInteger(0)

    override fun start(handler: (Request) -> Response) {
        this.requestHandler = handler
        server = buildServer()
        server.start()
    }

    override fun stop() {
        logger.info("Shutting down server...")

        // Step 1: stop accepting new HTTP requests
        gracefulShutdownHandler.shutdown()

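        // Step 2: wait for all in-flight requests in the HTTP layer to complete
        // FIX #8.1: the HTTP layer must drain before the executor is shut down
        // The original code called virtualExecutor.shutdown() before awaitShutdown,
        // so requests still in the dispatch phase hit RejectedExecutionException when submitting their task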
        val httpCompleted = gracefulShutdownHandler.awaitShutdown(config.shutdownTimeout)
        if (!httpCompleted) {
            logger.warn("HTTP shutdown timeout exceeded")
        }

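        // Step 3: shut down the request-handling executor (virtual-thread mode only)
        // The HTTP layer no longer accepts requests at this point, so no new tasks will be submitted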
        sharedExecutor?.let { exec ->
            exec.shutdown()
            val terminated = exec.awaitTermination(config.shutdownTimeout, TimeUnit.MILLISECONDS)
            if (!terminated) {
                logger.warn("Shared executor did not terminate, forcing shutdown")
                exec.shutdownNow()
            }
        }

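        // Step 4 (FIX #8.2 + #8.3): shut down the SSE executor and wait for all SSE tasks to finish
        // The original code never shut down sseExecutor, so:
        //   - with platform threads: the cached pool's threads outlive stop() (daemon=true is only a mitigation)
        //   - with virtual threads: SSE tasks may be interrupted once sharedExecutor is shut down
        // Note: a separate shutdown is needed only when sseExecutor is not shared with sharedExecutor
        if (sharedExecutor == null && sseExecutorInitialized) {
            // Wait for in-flight SSE tasks to finish gracefully
            // FIX #8.3: await before shutdownNow so SSE handlers get a chance to send a closing frame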
            val sseDrained = waitForSseTasks(config.shutdownTimeout)
            if (!sseDrained) {
                logger.warn("SSE tasks did not complete in time")
            }

            sseExecutor.shutdown()
            val sseTerminated = sseExecutor.awaitTermination(
                config.shutdownTimeout, TimeUnit.MILLISECONDS
            )
            if (!sseTerminated) {
                logger.warn("SSE executor did not terminate, forcing shutdown")
                sseExecutor.shutdownNow()
            }
        }

        // Step 5: finally stop Undertow itself
        server.stop()
    }

    // FIX #8.3: poll until activeSseTasks drops to zero
    // Simple sleep polling on the AtomicInteger is used here
    // In production this could be replaced with CompletableFuture or Phaser for more precise waiting
    private fun waitForSseTasks(timeoutMs: Long): Boolean {
        val deadline = System.currentTimeMillis() + timeoutMs
        while (activeSseTasks.get() > 0 && System.currentTimeMillis() < deadline) {
            Thread.sleep(50)
        }
        return activeSseTasks.get() == 0
    }

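    // ::prop.isInitialized works only for lateinit properties, not for lazy ones, so the Lazy
    // delegate is read via reflection (requires kotlin-reflect and import kotlin.reflect.jvm.isAccessible).
    // This avoids forcing the thread pool into existence when no SSE request was ever served.
    // A reflection-free alternative is to keep the Lazy in its own field and call isInitialized() on it.
    private val sseExecutorInitialized: Boolean
        get() {
            val property = this::sseExecutor
            property.isAccessible = true
            return (property.getDelegate() as Lazy<*>).isInitialized()
        }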

    private fun buildServer(): Undertow {
        val cpuCount = Runtime.getRuntime().availableProcessors()

        return Undertow.builder().apply {
            addHttpListener(config.port, config.host)
            setIoThreads(cpuCount)
            setWorkerThreads(
                if (config.useVirtualThreads) cpuCount else config.maxThreads
            )
            setServerOption(UndertowOptions.URL_CHARSET, "UTF-8")
            setServerOption(UndertowOptions.IDLE_TIMEOUT, config.idleTimeout.toInt())
            setServerOption(UndertowOptions.MAX_ENTITY_SIZE, config.maxRequestSize)
            setSocketOption(Options.TCP_NODELAY, true)
            setBufferSize(8 * 1024)
            setDirectBuffers(true)
            setHandler(buildHandlerChain())
        }.build()
    }

    private fun buildHandlerChain(): HttpHandler {
        var handler: HttpHandler = createRootHandler()
        if (config.maxConcurrentRequests > 0) {
            handler = RequestLimitingHandler(config.maxConcurrentRequests, handler)
        }
        handler = GracefulShutdownHandler(handler).also {
            gracefulShutdownHandler = it
        }
        return handler
    }

    private fun createRootHandler(): HttpHandler {
        return HttpHandler { exchange ->
            val requestStartNano = System.nanoTime()

            dispatchToWorker(exchange) {
                exchange.startBlocking()

                // FIX #8.4: respond with 503 when requestHandler is null
                // In theory start() runs before any request arrives, but the defensive check guarantees no crash
                val handler = requestHandler ?: run {
                    exchange.statusCode = 503
                    exchange.outputStream.bufferedWriter().use { it.write("Server is starting") }
                    return@dispatchToWorker
                }

                val request = UndertowRequestAdapter.adapt(exchange, config)
                val response = handler(request)

                val responseWriteStartNano = System.nanoTime()

                exchange.addExchangeCompleteListener { ex, next ->
                    try {
                        val now = System.nanoTime()
                        val total = (now - requestStartNano).nanoseconds
                        val io = (now - responseWriteStartNano).nanoseconds
                        val bytes = ex.responseBytesSent
                        response.onResponseSent?.invoke(total, io, bytes)
                    } finally {
                        next.proceed()
                    }
                }

                try {
                    writeResponse(response, exchange)
                } catch (err: Exception) {
                    logger.error("Failed to write response", err)
                    writeInternalError(exchange)
                }
            }
        }
    }

    private inline fun dispatchToWorker(
        exchange: HttpServerExchange,
        crossinline block: () -> Unit
    ) {
        when {
            sharedExecutor != null -> exchange.dispatch(sharedExecutor, Runnable { block() })
            exchange.isInIoThread -> exchange.dispatch(Runnable { block() })
            else -> block()
        }
    }

    private fun writeResponse(response: Response, exchange: HttpServerExchange) {
        exchange.statusCode = response.status
        response.headers.forEach { key, values ->
            values.forEach { value ->
                exchange.responseHeaders.add(HttpString(key), value)
            }
        }
        val body = response.materializedBody
            ?: throw IllegalStateException("Response body not materialized")

        when (body) {
            is RawResponseBody.Empty -> exchange.endExchange()
            is RawResponseBody.Bytes -> exchange.outputStream.use { it.write(body.bytes) }
            is RawResponseBody.Stream -> {
                body.input.use { input ->
                    exchange.outputStream.use { output -> input.copyTo(output) }
                }
            }
            is RawResponseBody.Sse -> handleSseResponse(body.handler, exchange)
        }
    }

    private fun handleSseResponse(handler: Consumer<SseConnection>, exchange: HttpServerExchange) {
        val writer = UndertowSseWriter(exchange)
        val connection = SseConnection(writer)

        // FIX #8.3: register the task when it starts; the count is decremented when it ends
        activeSseTasks.incrementAndGet()

        sseExecutor.execute {
            try {
                connection.use {
                    try {
                        handler.accept(it)
                    } catch (e: IOException) {
                        logger.debug("SSE connection closed by client: ${e.message}")
                    } catch (e: Exception) {
                        connection.close(SseCloseReason.Error(e))
                        logger.error("SSE error: ${e.message}", e)
                    }
                }
            } finally {
                // Whether the task ends normally or with an exception, always decrement the count
                activeSseTasks.decrementAndGet()
            }
        }
    }

    class UndertowSseWriter(exchange: HttpServerExchange) : SseWriter {
        private val writer = exchange.outputStream.bufferedWriter(Charsets.UTF_8)
        override fun write(text: String) = writer.write(text)
        override fun flush() = writer.flush()
        override fun close() = writer.close()
    }

    private fun writeInternalError(exchange: HttpServerExchange) {
        if (exchange.isResponseStarted) return
        exchange.statusCode = 500
        exchange.outputStream.bufferedWriter().use { it.write("Internal server error") }
    }
}

// FIX #8.6: FormParserFactory is now a lazily created singleton instead of being rebuilt for every multipart request
// FormParserFactory.Builder registers parser definitions internally, so construction is not free
object UndertowRequestAdapter {

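    // A @Volatile nullable field plus double-checked locking gives thread-safe lazy initialization
    // ServerConfig has to be passed in from outside, but it is fixed before adapt() is first called,
    // so the factory is built from that config on the first multipart parse and reused afterwards
    // Caveat: a different config passed in a later call is ignored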
    @Volatile
    private var formParserFactory: FormParserFactory? = null

    private fun getOrCreateFactory(config: ServerConfig): FormParserFactory {
        // Double-checked locking avoids taking the lock on every multipart request
        return formParserFactory ?: synchronized(this) {
            formParserFactory ?: createFormParserFactory(config).also {
                formParserFactory = it
            }
        }
    }

    private fun createFormParserFactory(config: ServerConfig): FormParserFactory {
        val multipart = MultiPartParserDefinition().apply {
            maxIndividualFileSize = config.maxFileSize
            setFileSizeThreshold(config.fileSizeThreshold)
        }
        return FormParserFactory.Builder()
            .addParser(multipart)
            .withDefaultCharset(StandardCharsets.UTF_8.name())
            .build()
    }

    fun adapt(exchange: HttpServerExchange, config: ServerConfig): Request {
        val headers = extractHeaders(exchange)
        val contentType = headers["content-type"]
        val isMultipart = contentType?.startsWith("multipart/form-data") == true

        return Request(
            method = exchange.requestMethod.toString(),
            path = UrlPath.normalize(exchange.requestPath),
            queryString = exchange.queryString ?: "",
            headers = headers,
            stream = exchange.inputStream,
            serverInfo = Request.ServerInfo(
                remoteAddr = exchange.sourceAddress.address.hostAddress,
                remoteHost = exchange.sourceAddress.hostName
                    ?: exchange.sourceAddress.address.hostAddress,
                remotePort = exchange.sourceAddress.port,
                scheme = exchange.requestScheme,
                serverName = exchange.hostName ?: "localhost",
                serverPort = exchange.hostPort,
                isSecure = exchange.requestScheme == "https",
                protocol = exchange.protocol.toString()
            ),
            multipartSupplier = if (isMultipart) {
                { parseMultipart(exchange, config) }
            } else {
                { emptyList() }
            }
        )
    }

    private fun parseMultipart(exchange: HttpServerExchange, config: ServerConfig): List<Part> {
        // FIX #8.6: use the cached factory instead of rebuilding it each time
        val parser = getOrCreateFactory(config).createParser(exchange)
            ?: error("Request is not multipart")

        val formData: FormData = try {
            parser.parseBlocking()
        } catch (err: IOException) {
            if (err.message?.contains("UT000036") == true) {
                throw IllegalStateException(
                    "Failed to parse multipart body. " +
                    "The request body may have been consumed earlier.",
                    err
                )
            }
            throw err
        }

        val parts = mutableListOf<Part>()
        for (fieldName in formData) {
            for (formValue in formData.get(fieldName)) {
                val part = if (formValue.isFileItem) {
                    FilePart(
                        name = fieldName,
                        filename = formValue.fileName ?: "unknown",
                        contentType = formValue.headers.getFirst(
                            io.undertow.util.Headers.CONTENT_TYPE
                        ),
                        size = formValue.fileItem.fileSize,
                        inputStream = formValue.fileItem.inputStream,
                    )
                } else {
                    FormField(fieldName, formValue.value)
                }
                parts.add(part)
            }
        }
        return parts
    }

    private fun extractHeaders(exchange: HttpServerExchange): Headers {
        val headers = Headers()
        for (headerValues: HeaderValues in exchange.requestHeaders) {
            val name = headerValues.headerName.toString()
            headerValues.forEach { headers.add(name, it) }
        }
        return headers
    }
}

One additional note

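The wording of #8.3 in the review is slightly off. It says "SSE tasks may be forcefully terminated", but in the original code sseExecutor is never shut down at all (#8.2), so tasks are not interrupted by shutdownNow(); they leak instead. The real risk behind #8.3 appears only after #8.2 is fixed (that is, once shutdownNow() is actually called): without the in-flight counting from #8.3, an SSE handler can be interrupted while it is writing data. The two issues are therefore coupled and must be fixed together.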
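The in-flight counting does not have to rely on sleep polling. As one possible alternative, the sketch below waits on a condition variable that is signalled when the count reaches zero; InFlightTracker and its method names are hypothetical and do not appear in the reviewed code.

```kotlin
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

// Hypothetical replacement for the AtomicInteger + Thread.sleep(50) loop.
class InFlightTracker {
    private val lock = ReentrantLock()
    private val drained = lock.newCondition()
    private var active = 0

    // Call before handing an SSE task to the executor.
    fun begin() {
        lock.withLock { active++ }
    }

    // Call in the task's finally block.
    fun end() {
        lock.withLock {
            active--
            if (active == 0) drained.signalAll()
        }
    }

    // Blocks until no task is active or the timeout elapses; true means fully drained.
    fun awaitDrained(timeoutMs: Long): Boolean {
        var remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs)
        return lock.withLock {
            while (active > 0 && remainingNanos > 0) {
                // awaitNanos returns an estimate of the time left, which also handles spurious wakeups
                remainingNanos = drained.awaitNanos(remainingNanos)
            }
            active == 0
        }
    }
}
```

Compared with polling, the waiter wakes up as soon as the last task calls end(), so shutdown is not delayed by up to one polling interval.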

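Also, in platform-thread mode sharedExecutor is null and sseExecutor is its own cached thread pool, so stop() must shut it down separately; in virtual-thread mode sseExecutor is the same object as sharedExecutor, so no second shutdown is needed. The code distinguishes the two cases with if (sharedExecutor == null).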
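Because stop() should not create the SSE pool just to close it, it needs a way to ask whether the lazy property has been initialized. One reflection-free way is to keep the Lazy delegate in its own field. The class below is a hypothetical, stripped-down sketch of that idea, not the server's code.

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Hypothetical holder showing how to test a lazy property without triggering it.
class LazyPool {
    // Keep the Lazy instance itself so isInitialized() can be called on it.
    private val poolDelegate = lazy { Executors.newCachedThreadPool() }
    val pool: ExecutorService by poolDelegate

    val isPoolCreated: Boolean
        get() = poolDelegate.isInitialized()

    // Shuts the pool down only if something actually used it.
    fun close() {
        if (poolDelegate.isInitialized()) {
            pool.shutdown()
        }
    }
}
```

Calling close() on an instance whose pool was never touched leaves the pool uncreated, which is the property the shutdown path relies on.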