design
TaskManager 架构设计文档
1. 架构概览
┌─────────────────────────────────────────────────────────────┐
│ TaskManager │
├─────────────────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ ┌─────────────────┐ │
│ │ Task │ │ Scheduler │ │ Thread Pool │ │
│ │ Registry │ │ Thread │ │ (Executor) │ │
│ └──────────────┘ └──────────────┘ └─────────────────┘ │
│ │ │ │ │
│ │ │ │ │
│ ┌──────▼──────────────────▼───────────────────▼─────────┐ │
│ │ Task Queue (Priority) │ │
│ └────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘2. 核心组件
2.1 TaskManager
职责:
任务注册和管理
生命周期控制(start/shutdown)
手动任务执行
查询接口
关键字段:
private val executor: ScheduledThreadPoolExecutor
private val schedulerThread: Thread
private val tasks: ConcurrentHashMap<String, TaskEntry>
private val taskQueue: PriorityBlockingQueue<TaskQueueEntry>2.2 Scheduler Thread
职责:
从优先级队列获取下一个待执行任务
等待到执行时间
提交任务到线程池
重新调度周期性任务
工作流程:
1. 从 taskQueue 取出下一个任务(阻塞)
2. 计算延迟时间并等待
3. 检查任务是否仍然启用
4. 提交到线程池执行
5. 如果是周期性任务,计算下次执行时间并重新入队
6. 重复2.3 Thread Pool (ScheduledThreadPoolExecutor)
职责:
执行任务
并发控制
线程管理
配置:
线程数:可配置(默认为 CPU 核心数)
守护线程:true
RemoveOnCancelPolicy:true
2.4 Task Registry
数据结构:
private data class TaskEntry(
val name: String,
val schedule: Schedule?,
val trigger: Trigger?,
val allowConcurrent: Boolean,
val enabled: AtomicBoolean,
val executing: AtomicBoolean,
val executionCount: AtomicLong,
val block: TaskContext.() -> Any?
)3. 调度机制
3.1 Trigger 接口
internal interface Trigger {
fun nextExecutionTime(lastExecution: Long?): Long?
}实现类:
CronTrigger: 基于 cron-utils 解析 cron 表达式FixedRateTrigger: 固定间隔(从上次开始计算)FixedDelayTrigger: 固定延迟(从上次结束计算)OnceTrigger: 一次性执行InitialDelayTrigger: 包装其他 Trigger 添加初始延迟
3.2 Priority Queue
使用 PriorityBlockingQueue<TaskQueueEntry> 按执行时间排序:
private data class TaskQueueEntry(
val entry: TaskEntry,
val nextExecutionTime: Long
) : Comparable<TaskQueueEntry>优势:
自动按时间排序
阻塞式获取(scheduler 线程可以等待)
线程安全
4. 并发控制
4.1 任务级并发控制
通过 TaskEntry.executing 标志:
if (!entry.allowConcurrent && !entry.executing.compareAndSet(false, true)) {
// Skip this execution
return
}逻辑:
allowConcurrent = false: 使用 CAS 检查,如果任务正在执行则跳过allowConcurrent = true: 直接执行,允许多个实例并发
4.2 线程安全
线程安全的数据结构:
ConcurrentHashMap: 任务注册表PriorityBlockingQueue: 任务队列AtomicBoolean/AtomicLong: 状态标志
同步点:
任务注册:ConcurrentHashMap 保证
队列操作:PriorityBlockingQueue 保证
执行状态:AtomicBoolean CAS 操作
5. Context 管理
5.1 全局 Context
companion object {
private val globalContext = ConcurrentHashMap<String, Any?>()
}在 TaskManager 初始化时从配置复制:
config.context.forEach { (key, value) ->
globalContext[key] = value
}5.2 任务 Context
每次执行时创建新的 TaskContextImpl:
val contextMap = ConcurrentHashMap<String, Any?>(globalContext)
contextMap.putAll(additionalContext)
val context = TaskContextImpl(
taskName = entry.name,
executionCount = executionCount,
states = contextMap
)特点:
继承全局 context
支持任务级覆盖
隔离性:每次执行独立的上下文实例
6. 生命周期管理
6.1 启动流程
TaskManager() 构造
↓
创建线程池
↓
创建调度线程(未启动)
↓
如果 autoStart = true
↓
start()
↓
启动调度线程
↓
调度所有启用的任务6.2 关闭流程
shutdown(awaitTermination)
↓
停止调度线程
↓
关闭线程池
↓
如果 awaitTermination = true
↓
等待所有任务完成(最多 30 秒)
↓
强制关闭(如果超时)7. 任务执行流程
┌─────────────────────────────────────────────────────────┐
│ Task Execution │
├─────────────────────────────────────────────────────────┤
│ │
│ 1. Check concurrent execution control │
│ if (!allowConcurrent && executing) → skip │
│ │
│ 2. Build context │
│ - Copy global context │
│ - Add additional context │
│ │
│ 3. Fire onTaskStart hook │
│ - Allow dynamic context injection │
│ │
│ 4. Execute task block │
│ - Catch all exceptions │
│ │
│ 5. Fire onTaskComplete hook │
│ - Pass execution result/error │
│ │
│ 6. Clear executing flag (if !allowConcurrent) │
│ │
└─────────────────────────────────────────────────────────┘8. 错误处理
8.1 任务执行错误
var error: Throwable? = null
try {
result = entry.block.invoke(context)
} catch (e: Throwable) {
error = e // Captured but not suppressed
}
// Fire onTaskComplete with error
config.onTaskComplete?.invoke(TaskExecution(..., error = error))
// Re-throw to propagate to caller
if (error != null) throw error策略:
捕获所有异常
通过 hook 通知用户
重新抛出(对于手动执行)
不影响调度器继续运行
8.2 Cron 表达式验证
在任务注册时立即验证:
try {
val parser = CronParser(cronDefinition)
val cron = parser.parse(expression)
cron.validate()
} catch (e: Exception) {
throw IllegalArgumentException("Invalid cron expression: $expression", e)
}9. 性能优化
9.1 避免锁竞争
使用无锁数据结构(
ConcurrentHashMap,AtomicBoolean)调度器单线程顺序处理
任务执行在独立线程池
9.2 内存管理
removeOnCancelPolicy = true: 自动清理已取消任务Context 按需创建,执行完自动回收
优先级队列自动移除已执行任务
9.3 线程利用
守护线程:不阻止 JVM 退出
线程池复用:避免频繁创建销毁
阻塞队列:调度器线程高效等待
10. 扩展性
10.1 自定义 Schedule
用户可以实现自己的 Schedule:
sealed class Schedule {
// ... 现有类型
data class Custom(val trigger: Trigger) : Schedule()
internal fun toTrigger(): Trigger? = when (this) {
is Custom -> trigger
// ...
}
}10.2 自定义监控
通过 hooks 完全自定义:
class MyMonitor {
fun onStart(event: TaskStartEvent) { }
fun onComplete(exec: TaskExecution) { }
}
val monitor = MyMonitor()
val tasks = TaskManager {
onTaskStart = monitor::onStart
onTaskComplete = monitor::onComplete
}11. 限制与权衡
11.1 当前限制
单机调度: 不支持分布式
无持久化: 任务定义不持久化
固定延迟近似: FixedDelayTrigger 实现简化
11.2 设计权衡
选择
原因
单调度线程
简化调度逻辑,避免竞争
优先级队列
高效获取下一任务
无任务持久化
保持简洁,用户自己实现
CAS 并发控制
无锁,高性能
全局 context
避免每次复制
12. 未来改进方向
真正的 FixedDelay: 跟踪任务完成时间
任务依赖: 支持任务间依赖关系
任务优先级: 不同优先级的任务
持久化插件: 可选的任务配置持久化
指标暴露: 内置 Metrics 支持
动态调整: 运行时修改任务配置
13. 总结
核心优势:
✅ 架构简洁清晰
✅ 线程安全
✅ 高性能(无锁设计)
✅ 易于扩展
✅ API 友好
适用场景:
✅ 单机定时任务
✅ 后台作业调度
✅ 周期性数据处理
✅ 维护任务自动化
不适用场景:
❌ 分布式调度
❌ 需要任务持久化
❌ 复杂的任务流编排