Claude
我仔细review了这个设计,整体思路很好,但有几个关键点需要重构和改进。让我帮你完善:
核心问题与改进
1. TaskContext 设计需要重构
原设计的 TaskContext 过于简单,应该提供更好的类型安全:
interface TaskContext {
operator fun <T> get(key: String): T?
operator fun set(key: String, value: Any)
// 提供类型安全的访问
fun <T> getOrThrow(key: String): T
fun <T> getOrDefault(key: String, default: T): T
}2. 参数设计需要优化
task() 方法中 every 和 delay 的语义不够清晰:
// 改进版:明确语义
fun task(
name: String,
cron: String? = null,
every: Duration? = null, // 固定间隔(从上次开始计算)
fixedDelay: Duration? = null, // 固定延迟(从上次结束计算)
initialDelay: Duration = Duration.ZERO, // 首次执行延迟
concurrent: Boolean = false,
block: TaskContext.() -> Any?
)3. 异常处理缺失
需要添加异常处理机制:
fun task(
name: String,
// ... 其他参数
onError: ((Throwable) -> Unit)? = null,
retryPolicy: RetryPolicy? = null,
block: TaskContext.() -> Any?
)4. 生命周期管理不完整
需要添加更细粒度的控制:
fun pause(name: String)
fun resume(name: String)
fun remove(name: String) // 彻底删除任务
fun cancel(name: String) // 取消调度但保留定义
// 查询状态
fun status(name: String): TaskStatus
fun isRunning(name: String): Boolean5. Cron 表达式验证
注册时应该验证 cron 表达式:
tasks.task("cleanup", cron = "invalid") { } // 应该立即抛出异常6. 并发控制需要明确
concurrent: Boolean 参数含义需要文档化:
false: 同一任务的多个实例不能并发执行true: 允许并发执行
但实际场景可能需要更细粒度的控制:
enum class ConcurrencyPolicy {
SKIP, // 跳过本次执行
QUEUE, // 排队等待
PARALLEL // 并行执行
}最终推荐架构
// ============ 核心 API ============
class TaskManager(
config: TaskManagerConfig.() -> Unit = {}
) {
fun task(
name: String,
cron: String? = null,
every: Duration? = null,
fixedDelay: Duration? = null,
initialDelay: Duration = Duration.ZERO,
concurrencyPolicy: ConcurrencyPolicy = ConcurrencyPolicy.SKIP,
onError: ((Throwable) -> Unit)? = null,
block: TaskContext.() -> Any?
)
// 执行 API
fun run(name: String): Future<Any?>
fun run(name: String, contextOverride: TaskContext.() -> Unit): Future<Any?>
fun runBlocking(name: String): Any?
fun runBlocking(name: String, contextOverride: TaskContext.() -> Unit): Any?
// 控制 API
fun pause(name: String)
fun resume(name: String)
fun cancel(name: String)
fun remove(name: String)
// 查询 API
fun status(name: String): TaskStatus
fun isRunning(name: String): Boolean
fun listTasks(): List<String>
// 生命周期
fun shutdown(awaitTermination: Boolean = true)
}
// ============ 配置 ============
class TaskManagerConfig {
var threads: Int = Runtime.getRuntime().availableProcessors()
var threadNamePrefix: String = "task-manager"
var defaultConcurrencyPolicy: ConcurrencyPolicy = ConcurrencyPolicy.SKIP
val context: MutableMap<String, Any> = ConcurrentHashMap()
// 全局错误处理
var onError: ((String, Throwable) -> Unit)? = null
}
// ============ Context ============
interface TaskContext {
operator fun <T> get(key: String): T?
operator fun set(key: String, value: Any)
fun <T> getOrThrow(key: String): T
fun <T> getOrDefault(key: String, default: T): T
// 任务元信息
val taskName: String
val executionCount: Long
val lastExecutionTime: Long?
}
// ============ 枚举 ============
enum class TaskStatus {
REGISTERED,
SCHEDULED,
RUNNING,
PAUSED,
CANCELLED,
COMPLETED,
FAILED
}
enum class ConcurrencyPolicy {
SKIP, // 跳过本次执行(如果已在运行)
QUEUE, // 排队等待
PARALLEL // 并行执行
}完整使用示例
val tasks = TaskManager {
threads = 8
threadNamePrefix = "my-tasks"
context["db"] = database
onError = { taskName, error ->
logger.error("Task $taskName failed", error)
}
}
// 心跳任务 - 每5秒
tasks.task("heartbeat", every = 5.seconds) {
println("heartbeat at ${System.currentTimeMillis()}")
}
// 清理任务 - cron表达式
tasks.task(
name = "cleanup",
cron = "0 0 * * * ?", // 每小时整点
concurrencyPolicy = ConcurrencyPolicy.SKIP,
onError = { error ->
notifyAdmin("Cleanup failed: ${error.message}")
}
) {
val db = getOrThrow<Database>("db")
db.cleanup()
}
// 数据同步 - 固定延迟(上次结束后延迟)
tasks.task(
name = "sync",
fixedDelay = 10.seconds,
initialDelay = 1.minutes
) {
syncData()
}
// 手动任务 - 仅供手动触发
tasks.task("rebuild-index") {
val db = getOrThrow<Database>("db")
db.rebuildIndex()
"Index rebuilt successfully"
}
// 手动执行
tasks.run("rebuild-index")
// 带额外 context
val future = tasks.run("cleanup") {
this["targetTable"] = "users"
}
// 阻塞等待结果
val result = tasks.runBlocking("rebuild-index")
println(result)
// 控制任务
tasks.pause("heartbeat")
Thread.sleep(10000)
tasks.resume("heartbeat")
// 查询状态
println(tasks.status("cleanup"))
println(tasks.listTasks())
// 优雅关闭
tasks.shutdown(awaitTermination = true)关键改进点总结
类型安全: Context 支持泛型
语义明确:
fixedDelayvsevery区分清晰异常处理: 全局 + 任务级错误处理
生命周期完整: pause/resume/cancel/remove
并发策略: 从 boolean 改为枚举,更灵活
元信息暴露: TaskContext 提供执行次数等信息
查询能力: 状态查询和任务列表
验证前置: cron 表达式在注册时验证
需要我提供完整的实现代码吗(约300-400行)?