Claude

我仔细review了这个设计,整体思路很好,但有几个关键点需要重构和改进。让我帮你完善:

核心问题与改进

1. TaskContext 设计需要重构

原设计的 TaskContext 过于简单,应该提供更好的类型安全:

interface TaskContext {
    operator fun <T> get(key: String): T?
    operator fun set(key: String, value: Any)
    
    // 提供类型安全的访问
    fun <T> getOrThrow(key: String): T
    fun <T> getOrDefault(key: String, default: T): T
}

2. 参数设计需要优化

task() 方法中 everydelay 的语义不够清晰:

// 改进版:明确语义
fun task(
    name: String,
    cron: String? = null,
    every: Duration? = null,        // 固定间隔(从上次开始计算)
    fixedDelay: Duration? = null,   // 固定延迟(从上次结束计算)
    initialDelay: Duration = Duration.ZERO,  // 首次执行延迟
    concurrent: Boolean = false,
    block: TaskContext.() -> Any?
)

3. 异常处理缺失

需要添加异常处理机制:

fun task(
    name: String,
    // ... 其他参数
    onError: ((Throwable) -> Unit)? = null,
    retryPolicy: RetryPolicy? = null,
    block: TaskContext.() -> Any?
)

4. 生命周期管理不完整

需要添加更细粒度的控制:

fun pause(name: String)
fun resume(name: String)
fun remove(name: String)  // 彻底删除任务
fun cancel(name: String)  // 取消调度但保留定义

// 查询状态
fun status(name: String): TaskStatus
fun isRunning(name: String): Boolean

5. Cron 表达式验证

注册时应该验证 cron 表达式:

tasks.task("cleanup", cron = "invalid") { }  // 应该立即抛出异常

6. 并发控制需要明确

concurrent: Boolean 参数含义需要文档化:

但实际场景可能需要更细粒度的控制:

enum class ConcurrencyPolicy {
    SKIP,      // 跳过本次执行
    QUEUE,     // 排队等待
    PARALLEL   // 并行执行
}

最终推荐架构

// ============ 核心 API ============

class TaskManager(
    config: TaskManagerConfig.() -> Unit = {}
) {
    
    fun task(
        name: String,
        cron: String? = null,
        every: Duration? = null,
        fixedDelay: Duration? = null,
        initialDelay: Duration = Duration.ZERO,
        concurrencyPolicy: ConcurrencyPolicy = ConcurrencyPolicy.SKIP,
        onError: ((Throwable) -> Unit)? = null,
        block: TaskContext.() -> Any?
    )
    
    // 执行 API
    fun run(name: String): Future<Any?>
    fun run(name: String, contextOverride: TaskContext.() -> Unit): Future<Any?>
    fun runBlocking(name: String): Any?
    fun runBlocking(name: String, contextOverride: TaskContext.() -> Unit): Any?
    
    // 控制 API
    fun pause(name: String)
    fun resume(name: String)
    fun cancel(name: String)
    fun remove(name: String)
    
    // 查询 API
    fun status(name: String): TaskStatus
    fun isRunning(name: String): Boolean
    fun listTasks(): List<String>
    
    // 生命周期
    fun shutdown(awaitTermination: Boolean = true)
}

// ============ 配置 ============

class TaskManagerConfig {
    var threads: Int = Runtime.getRuntime().availableProcessors()
    var threadNamePrefix: String = "task-manager"
    var defaultConcurrencyPolicy: ConcurrencyPolicy = ConcurrencyPolicy.SKIP
    val context: MutableMap<String, Any> = ConcurrentHashMap()
    
    // 全局错误处理
    var onError: ((String, Throwable) -> Unit)? = null
}

// ============ Context ============

interface TaskContext {
    operator fun <T> get(key: String): T?
    operator fun set(key: String, value: Any)
    
    fun <T> getOrThrow(key: String): T
    fun <T> getOrDefault(key: String, default: T): T
    
    // 任务元信息
    val taskName: String
    val executionCount: Long
    val lastExecutionTime: Long?
}

// ============ 枚举 ============

enum class TaskStatus {
    REGISTERED,
    SCHEDULED,
    RUNNING,
    PAUSED,
    CANCELLED,
    COMPLETED,
    FAILED
}

enum class ConcurrencyPolicy {
    SKIP,      // 跳过本次执行(如果已在运行)
    QUEUE,     // 排队等待
    PARALLEL   // 并行执行
}

完整使用示例

val tasks = TaskManager {
    threads = 8
    threadNamePrefix = "my-tasks"
    context["db"] = database
    
    onError = { taskName, error ->
        logger.error("Task $taskName failed", error)
    }
}

// 心跳任务 - 每5秒
tasks.task("heartbeat", every = 5.seconds) {
    println("heartbeat at ${System.currentTimeMillis()}")
}

// 清理任务 - cron表达式
tasks.task(
    name = "cleanup",
    cron = "0 0 * * * ?",  // 每小时整点
    concurrencyPolicy = ConcurrencyPolicy.SKIP,
    onError = { error ->
        notifyAdmin("Cleanup failed: ${error.message}")
    }
) {
    val db = getOrThrow<Database>("db")
    db.cleanup()
}

// 数据同步 - 固定延迟(上次结束后延迟)
tasks.task(
    name = "sync",
    fixedDelay = 10.seconds,
    initialDelay = 1.minutes
) {
    syncData()
}

// 手动任务 - 仅供手动触发
tasks.task("rebuild-index") {
    val db = getOrThrow<Database>("db")
    db.rebuildIndex()
    "Index rebuilt successfully"
}

// 手动执行
tasks.run("rebuild-index")

// 带额外 context
val future = tasks.run("cleanup") {
    this["targetTable"] = "users"
}

// 阻塞等待结果
val result = tasks.runBlocking("rebuild-index")
println(result)

// 控制任务
tasks.pause("heartbeat")
Thread.sleep(10000)
tasks.resume("heartbeat")

// 查询状态
println(tasks.status("cleanup"))
println(tasks.listTasks())

// 优雅关闭
tasks.shutdown(awaitTermination = true)

关键改进点总结

  1. 类型安全: Context 支持泛型

  2. 语义明确: fixedDelay vs every 区分清晰

  3. 异常处理: 全局 + 任务级错误处理

  4. 生命周期完整: pause/resume/cancel/remove

  5. 并发策略: 从 boolean 改为枚举,更灵活

  6. 元信息暴露: TaskContext 提供执行次数等信息

  7. 查询能力: 状态查询和任务列表

  8. 验证前置: cron 表达式在注册时验证

需要我提供完整的实现代码吗(约300-400行)?