design

TaskManager 架构设计文档

1. 架构概览

┌─────────────────────────────────────────────────────────────┐
│                        TaskManager                           │
├─────────────────────────────────────────────────────────────┤
│  ┌──────────────┐  ┌──────────────┐  ┌─────────────────┐   │
│  │   Task       │  │  Scheduler   │  │  Thread Pool    │   │
│  │   Registry   │  │   Thread     │  │   (Executor)    │   │
│  └──────────────┘  └──────────────┘  └─────────────────┘   │
│         │                  │                   │             │
│         │                  │                   │             │
│  ┌──────▼──────────────────▼───────────────────▼─────────┐  │
│  │              Task Queue (Priority)                     │  │
│  └────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘

2. 核心组件

2.1 TaskManager

职责:

关键字段:

private val executor: ScheduledThreadPoolExecutor
private val schedulerThread: Thread
private val tasks: ConcurrentHashMap<String, TaskEntry>
private val taskQueue: PriorityBlockingQueue<TaskQueueEntry>

2.2 Scheduler Thread

职责:

工作流程:

1. 从 taskQueue 取出下一个任务(阻塞)
2. 计算延迟时间并等待
3. 检查任务是否仍然启用
4. 提交到线程池执行
5. 如果是周期性任务,计算下次执行时间并重新入队
6. 重复

2.3 Thread Pool (ScheduledThreadPoolExecutor)

职责:

配置:

2.4 Task Registry

数据结构:

private data class TaskEntry(
    val name: String,
    val schedule: Schedule?,
    val trigger: Trigger?,
    val allowConcurrent: Boolean,
    val enabled: AtomicBoolean,
    val executing: AtomicBoolean,
    val executionCount: AtomicLong,
    val block: TaskContext.() -> Any?
)

3. 调度机制

3.1 Trigger 接口

internal interface Trigger {
    fun nextExecutionTime(lastExecution: Long?): Long?
}

实现类:

3.2 Priority Queue

使用 PriorityBlockingQueue<TaskQueueEntry> 按执行时间排序:

private data class TaskQueueEntry(
    val entry: TaskEntry,
    val nextExecutionTime: Long
) : Comparable<TaskQueueEntry>

优势:

4. 并发控制

4.1 任务级并发控制

通过 TaskEntry.executing 标志:

if (!entry.allowConcurrent && !entry.executing.compareAndSet(false, true)) {
    // Skip this execution
    return
}

逻辑:

4.2 线程安全

线程安全的数据结构:

同步点:

5. Context 管理

5.1 全局 Context

companion object {
    private val globalContext = ConcurrentHashMap<String, Any?>()
}

在 TaskManager 初始化时从配置复制:

config.context.forEach { (key, value) ->
    globalContext[key] = value
}

5.2 任务 Context

每次执行时创建新的 TaskContextImpl

val contextMap = ConcurrentHashMap<String, Any?>(globalContext)
contextMap.putAll(additionalContext)

val context = TaskContextImpl(
    taskName = entry.name,
    executionCount = executionCount,
    states = contextMap
)

特点:

6. 生命周期管理

6.1 启动流程

TaskManager() 构造
    ↓
创建线程池
    ↓
创建调度线程(未启动)
    ↓
如果 autoStart = true
    ↓
start()
    ↓
启动调度线程
    ↓
调度所有启用的任务

6.2 关闭流程

shutdown(awaitTermination)
    ↓
停止调度线程
    ↓
关闭线程池
    ↓
如果 awaitTermination = true
    ↓
等待所有任务完成(最多 30 秒)
    ↓
强制关闭(如果超时)

7. 任务执行流程

┌─────────────────────────────────────────────────────────┐
│                    Task Execution                        │
├─────────────────────────────────────────────────────────┤
│                                                          │
│  1. Check concurrent execution control                  │
│     if (!allowConcurrent && executing) → skip           │
│                                                          │
│  2. Build context                                        │
│     - Copy global context                               │
│     - Add additional context                            │
│                                                          │
│  3. Fire onTaskStart hook                               │
│     - Allow dynamic context injection                   │
│                                                          │
│  4. Execute task block                                   │
│     - Catch all exceptions                              │
│                                                          │
│  5. Fire onTaskComplete hook                            │
│     - Pass execution result/error                       │
│                                                          │
│  6. Clear executing flag (if !allowConcurrent)          │
│                                                          │
└─────────────────────────────────────────────────────────┘

8. 错误处理

8.1 任务执行错误

var error: Throwable? = null
try {
    result = entry.block.invoke(context)
} catch (e: Throwable) {
    error = e  // Captured but not suppressed
}

// Fire onTaskComplete with error
config.onTaskComplete?.invoke(TaskExecution(..., error = error))

// Re-throw to propagate to caller
if (error != null) throw error

策略:

8.2 Cron 表达式验证

在任务注册时立即验证:

try {
    val parser = CronParser(cronDefinition)
    val cron = parser.parse(expression)
    cron.validate()
} catch (e: Exception) {
    throw IllegalArgumentException("Invalid cron expression: $expression", e)
}

9. 性能优化

9.1 避免锁竞争

9.2 内存管理

9.3 线程利用

10. 扩展性

10.1 自定义 Schedule

用户可以实现自己的 Schedule:

sealed class Schedule {
    // ... 现有类型
    
    data class Custom(val trigger: Trigger) : Schedule()
    
    internal fun toTrigger(): Trigger? = when (this) {
        is Custom -> trigger
        // ...
    }
}

10.2 自定义监控

通过 hooks 完全自定义:

class MyMonitor {
    fun onStart(event: TaskStartEvent) { }
    fun onComplete(exec: TaskExecution) { }
}

val monitor = MyMonitor()
val tasks = TaskManager {
    onTaskStart = monitor::onStart
    onTaskComplete = monitor::onComplete
}

11. 限制与权衡

11.1 当前限制

  1. 单机调度: 不支持分布式

  2. 无持久化: 任务定义不持久化

  3. 固定延迟近似: FixedDelayTrigger 实现简化

11.2 设计权衡

选择

原因

单调度线程

简化调度逻辑,避免竞争

优先级队列

高效获取下一任务

无任务持久化

保持简洁,用户自己实现

CAS 并发控制

无锁,高性能

全局 context

避免每次复制

12. 未来改进方向

  1. 真正的 FixedDelay: 跟踪任务完成时间

  2. 任务依赖: 支持任务间依赖关系

  3. 任务优先级: 不同优先级的任务

  4. 持久化插件: 可选的任务配置持久化

  5. 指标暴露: 内置 Metrics 支持

  6. 动态调整: 运行时修改任务配置

13. 总结

核心优势:

适用场景:

不适用场景: