我来实现一个不使用协程的超时中间件方案:我提供了三种不使用协程的超时中间件实现方案:
方案1:基于 ExecutorService(通用方案)
使用 Future.get(timeout) 实现超时控制,适用于任何 Servlet 容器。
优点:
不依赖 Servlet 3.0+ 特性
实现简单直接
线程池可配置
缺点:
需要额外的线程池
资源占用相对较高
方案2:基于 Servlet 异步支持(推荐)
使用 Servlet 3.0+ 的 AsyncContext,这是最标准的做法。
优点:
原生支持,性能最好
不需要额外线程池管理
Jetty 原生支持
使用前提:
@WebServlet(asyncSupported = true)方案3:带自定义线程池配置
可以精确控制线程池大小和行为。
关键点
原子操作:使用
AtomicBoolean确保超时和完成状态的线程安全响应检查:使用
response.isCommitted避免重复写入响应任务取消:超时后调用
future.cancel(true)尝试中断执行
使用建议
如果你使用的是 Jetty,推荐使用方案2(AsyncTimeoutMiddleware),这是最符合 Servlet 规范且性能最好的方式。
需要我进一步解释某个实现细节吗?
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicBoolean
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletResponse
/**
* 超时中间件(不使用协程)
* @param timeoutMillis 超时时间(毫秒)
*/
class TimeoutMiddleware(
private val timeoutMillis: Long = 30000L
) : Middleware {
private val executor = Executors.newCachedThreadPool()
override fun handle(ctx: Context, next: Next) {
val completed = AtomicBoolean(false)
val future = executor.submit {
try {
next.invoke()
completed.set(true)
} catch (e: Exception) {
if (!completed.get()) {
throw e
}
}
}
try {
// 等待执行完成,设置超时时间
future.get(timeoutMillis, TimeUnit.MILLISECONDS)
} catch (e: TimeoutException) {
// 超时了,取消任务
future.cancel(true)
if (completed.compareAndSet(false, true)) {
handleTimeout(ctx)
}
} catch (e: ExecutionException) {
// 执行过程中的异常
throw e.cause ?: e
} catch (e: InterruptedException) {
future.cancel(true)
Thread.currentThread().interrupt()
throw RuntimeException("Request interrupted", e)
}
}
private fun handleTimeout(ctx: Context) {
if (!ctx.response.isCommitted) {
ctx.response.status = 408
ctx.response.contentType = "application/json; charset=utf-8"
ctx.json(mapOf(
"error" to "Request Timeout",
"message" to "请求处理超时,超过 ${timeoutMillis}ms"
))
}
}
}
/**
* 使用 Servlet 异步支持的超时中间件(推荐方式)
*/
class AsyncTimeoutMiddleware(
private val timeoutMillis: Long = 30000L
) : Middleware {
override fun handle(ctx: Context, next: Next) {
val request = ctx.request
val response = ctx.response
// 启动异步处理
val asyncContext = request.startAsync()
asyncContext.timeout = timeoutMillis
// 设置超时监听器
asyncContext.addListener(object : javax.servlet.AsyncListener {
override fun onTimeout(event: javax.servlet.AsyncEvent) {
if (!response.isCommitted) {
response.status = 408
response.contentType = "application/json; charset=utf-8"
response.writer.write("""{"error":"Request Timeout","message":"请求超时"}""")
}
asyncContext.complete()
}
override fun onComplete(event: javax.servlet.AsyncEvent) {}
override fun onError(event: javax.servlet.AsyncEvent) {}
override fun onStartAsync(event: javax.servlet.AsyncEvent) {}
})
// 在新线程中执行后续中间件
asyncContext.start {
try {
next.invoke()
asyncContext.complete()
} catch (e: Exception) {
if (!response.isCommitted) {
response.status = 500
response.contentType = "application/json; charset=utf-8"
response.writer.write("""{"error":"Internal Server Error"}""")
}
asyncContext.complete()
throw e
}
}
}
}
/**
* 简单的超时中间件工厂函数
*/
fun timeout(timeoutMillis: Long = 30000L): Middleware {
return AsyncTimeoutMiddleware(timeoutMillis)
}
// ============ 使用示例 ============
fun main() {
val app = Application()
// 方式1:全局超时设置(30秒)
app.use(timeout(30000))
// 方式2:为特定路由设置不同的超时时间
app.use(timeout(5000)) // 5秒超时
// 业务路由
app.get("/api/slow") { ctx ->
// 模拟耗时操作
Thread.sleep(10000)
ctx.json(mapOf("message" to "completed"))
}
app.start(8080)
}
// ============ 高级版本:支持自定义超时处理 ============
class AdvancedTimeoutMiddleware(
private val timeoutMillis: Long = 30000L,
private val onTimeout: ((Context) -> Unit)? = null
) : Middleware {
private val executor = Executors.newCachedThreadPool()
override fun handle(ctx: Context, next: Next) {
val completed = AtomicBoolean(false)
val future = executor.submit {
try {
next.invoke()
completed.set(true)
} catch (e: Exception) {
if (!completed.get()) {
throw e
}
}
}
try {
future.get(timeoutMillis, TimeUnit.MILLISECONDS)
} catch (e: TimeoutException) {
future.cancel(true)
if (completed.compareAndSet(false, true)) {
if (onTimeout != null) {
onTimeout.invoke(ctx)
} else {
defaultTimeoutHandler(ctx)
}
}
} catch (e: ExecutionException) {
throw e.cause ?: e
}
}
private fun defaultTimeoutHandler(ctx: Context) {
if (!ctx.response.isCommitted) {
ctx.response.status = 408
ctx.response.contentType = "application/json; charset=utf-8"
ctx.json(mapOf(
"error" to "Request Timeout",
"message" to "请求超时",
"timeout" to timeoutMillis
))
}
}
}
// 使用示例:自定义超时处理
fun customTimeoutExample() {
val app = Application()
app.use(AdvancedTimeoutMiddleware(15000) { ctx ->
// 自定义超时响应
ctx.response.status = 503
ctx.json(mapOf(
"error" to "Service Unavailable",
"message" to "服务繁忙,请稍后重试"
))
// 添加日志
println("Request timeout: ${ctx.request.requestURI}")
})
}
// ============ 带线程池配置的版本 ============
class ConfigurableTimeoutMiddleware(
private val timeoutMillis: Long = 30000L,
private val threadPool: ExecutorService = Executors.newFixedThreadPool(100)
) : Middleware {
override fun handle(ctx: Context, next: Next) {
val completed = AtomicBoolean(false)
val future = threadPool.submit {
try {
next.invoke()
completed.set(true)
} catch (e: Exception) {
if (!completed.get()) {
throw e
}
}
}
try {
future.get(timeoutMillis, TimeUnit.MILLISECONDS)
} catch (e: TimeoutException) {
future.cancel(true)
if (completed.compareAndSet(false, true)) {
if (!ctx.response.isCommitted) {
ctx.response.status = 408
ctx.response.contentType = "application/json; charset=utf-8"
ctx.json(mapOf("error" to "Request Timeout"))
}
}
} catch (e: ExecutionException) {
throw e.cause ?: e
}
}
}
// ============ 基础类型定义(参考) ============
interface Middleware {
fun handle(ctx: Context, next: Next)
}
typealias Next = () -> Unit
interface Context {
val request: HttpServletRequest
val response: HttpServletResponse
fun json(data: Any)
}
class Application {
private val middlewares = mutableListOf<Middleware>()
fun use(middleware: Middleware) {
middlewares.add(middleware)
}
fun get(path: String, handler: (Context) -> Unit) {
// 路由注册逻辑
}
fun start(port: Int) {
// 启动服务器
}
}