我来实现一个不使用协程的超时中间件方案:我提供了三种不使用协程的超时中间件实现方案:

方案1:基于 ExecutorService(通用方案)

使用 Future.get(timeout) 实现超时控制,适用于任何 Servlet 容器。

优点

缺点

方案2:基于 Servlet 异步支持(推荐)

使用 Servlet 3.0+ 的 AsyncContext,这是最标准的做法。

优点

使用前提

@WebServlet(asyncSupported = true)

方案3:带自定义线程池配置

可以精确控制线程池大小和行为。

关键点

  1. 原子操作:使用 AtomicBoolean 确保超时和完成状态的线程安全

  2. 响应检查:使用 response.isCommitted 避免重复写入响应

  3. 任务取消:超时后调用 future.cancel(true) 尝试中断执行

使用建议

如果你使用的是 Jetty,推荐使用方案2(AsyncTimeoutMiddleware),这是最符合 Servlet 规范且性能最好的方式。

需要我进一步解释某个实现细节吗?


import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicBoolean
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletResponse

/**
 * 超时中间件(不使用协程)
 * @param timeoutMillis 超时时间(毫秒)
 */
class TimeoutMiddleware(
    private val timeoutMillis: Long = 30000L
) : Middleware {
    
    private val executor = Executors.newCachedThreadPool()
    
    override fun handle(ctx: Context, next: Next) {
        val completed = AtomicBoolean(false)
        val future = executor.submit {
            try {
                next.invoke()
                completed.set(true)
            } catch (e: Exception) {
                if (!completed.get()) {
                    throw e
                }
            }
        }
        
        try {
            // 等待执行完成,设置超时时间
            future.get(timeoutMillis, TimeUnit.MILLISECONDS)
        } catch (e: TimeoutException) {
            // 超时了,取消任务
            future.cancel(true)
            if (completed.compareAndSet(false, true)) {
                handleTimeout(ctx)
            }
        } catch (e: ExecutionException) {
            // 执行过程中的异常
            throw e.cause ?: e
        } catch (e: InterruptedException) {
            future.cancel(true)
            Thread.currentThread().interrupt()
            throw RuntimeException("Request interrupted", e)
        }
    }
    
    private fun handleTimeout(ctx: Context) {
        if (!ctx.response.isCommitted) {
            ctx.response.status = 408
            ctx.response.contentType = "application/json; charset=utf-8"
            ctx.json(mapOf(
                "error" to "Request Timeout",
                "message" to "请求处理超时,超过 ${timeoutMillis}ms"
            ))
        }
    }
}

/**
 * 使用 Servlet 异步支持的超时中间件(推荐方式)
 */
class AsyncTimeoutMiddleware(
    private val timeoutMillis: Long = 30000L
) : Middleware {
    
    override fun handle(ctx: Context, next: Next) {
        val request = ctx.request
        val response = ctx.response
        
        // 启动异步处理
        val asyncContext = request.startAsync()
        asyncContext.timeout = timeoutMillis
        
        // 设置超时监听器
        asyncContext.addListener(object : javax.servlet.AsyncListener {
            override fun onTimeout(event: javax.servlet.AsyncEvent) {
                if (!response.isCommitted) {
                    response.status = 408
                    response.contentType = "application/json; charset=utf-8"
                    response.writer.write("""{"error":"Request Timeout","message":"请求超时"}""")
                }
                asyncContext.complete()
            }
            
            override fun onComplete(event: javax.servlet.AsyncEvent) {}
            override fun onError(event: javax.servlet.AsyncEvent) {}
            override fun onStartAsync(event: javax.servlet.AsyncEvent) {}
        })
        
        // 在新线程中执行后续中间件
        asyncContext.start {
            try {
                next.invoke()
                asyncContext.complete()
            } catch (e: Exception) {
                if (!response.isCommitted) {
                    response.status = 500
                    response.contentType = "application/json; charset=utf-8"
                    response.writer.write("""{"error":"Internal Server Error"}""")
                }
                asyncContext.complete()
                throw e
            }
        }
    }
}

/**
 * 简单的超时中间件工厂函数
 */
fun timeout(timeoutMillis: Long = 30000L): Middleware {
    return AsyncTimeoutMiddleware(timeoutMillis)
}

// ============ 使用示例 ============

fun main() {
    val app = Application()
    
    // 方式1:全局超时设置(30秒)
    app.use(timeout(30000))
    
    // 方式2:为特定路由设置不同的超时时间
    app.use(timeout(5000))  // 5秒超时
    
    // 业务路由
    app.get("/api/slow") { ctx ->
        // 模拟耗时操作
        Thread.sleep(10000)
        ctx.json(mapOf("message" to "completed"))
    }
    
    app.start(8080)
}

// ============ 高级版本:支持自定义超时处理 ============

class AdvancedTimeoutMiddleware(
    private val timeoutMillis: Long = 30000L,
    private val onTimeout: ((Context) -> Unit)? = null
) : Middleware {
    
    private val executor = Executors.newCachedThreadPool()
    
    override fun handle(ctx: Context, next: Next) {
        val completed = AtomicBoolean(false)
        val future = executor.submit {
            try {
                next.invoke()
                completed.set(true)
            } catch (e: Exception) {
                if (!completed.get()) {
                    throw e
                }
            }
        }
        
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS)
        } catch (e: TimeoutException) {
            future.cancel(true)
            if (completed.compareAndSet(false, true)) {
                if (onTimeout != null) {
                    onTimeout.invoke(ctx)
                } else {
                    defaultTimeoutHandler(ctx)
                }
            }
        } catch (e: ExecutionException) {
            throw e.cause ?: e
        }
    }
    
    private fun defaultTimeoutHandler(ctx: Context) {
        if (!ctx.response.isCommitted) {
            ctx.response.status = 408
            ctx.response.contentType = "application/json; charset=utf-8"
            ctx.json(mapOf(
                "error" to "Request Timeout",
                "message" to "请求超时",
                "timeout" to timeoutMillis
            ))
        }
    }
}

// 使用示例:自定义超时处理
fun customTimeoutExample() {
    val app = Application()
    
    app.use(AdvancedTimeoutMiddleware(15000) { ctx ->
        // 自定义超时响应
        ctx.response.status = 503
        ctx.json(mapOf(
            "error" to "Service Unavailable",
            "message" to "服务繁忙,请稍后重试"
        ))
        // 添加日志
        println("Request timeout: ${ctx.request.requestURI}")
    })
}

// ============ 带线程池配置的版本 ============

class ConfigurableTimeoutMiddleware(
    private val timeoutMillis: Long = 30000L,
    private val threadPool: ExecutorService = Executors.newFixedThreadPool(100)
) : Middleware {
    
    override fun handle(ctx: Context, next: Next) {
        val completed = AtomicBoolean(false)
        val future = threadPool.submit {
            try {
                next.invoke()
                completed.set(true)
            } catch (e: Exception) {
                if (!completed.get()) {
                    throw e
                }
            }
        }
        
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS)
        } catch (e: TimeoutException) {
            future.cancel(true)
            if (completed.compareAndSet(false, true)) {
                if (!ctx.response.isCommitted) {
                    ctx.response.status = 408
                    ctx.response.contentType = "application/json; charset=utf-8"
                    ctx.json(mapOf("error" to "Request Timeout"))
                }
            }
        } catch (e: ExecutionException) {
            throw e.cause ?: e
        }
    }
}

// ============ 基础类型定义(参考) ============

interface Middleware {
    fun handle(ctx: Context, next: Next)
}

typealias Next = () -> Unit

interface Context {
    val request: HttpServletRequest
    val response: HttpServletResponse
    
    fun json(data: Any)
}

class Application {
    private val middlewares = mutableListOf<Middleware>()
    
    fun use(middleware: Middleware) {
        middlewares.add(middleware)
    }
    
    fun get(path: String, handler: (Context) -> Unit) {
        // 路由注册逻辑
    }
    
    fun start(port: Int) {
        // 启动服务器
    }
}