代码:
package site.daydream.colleen
import com.cronutils.model.Cron
import com.cronutils.model.CronType
import com.cronutils.model.definition.CronDefinitionBuilder
import com.cronutils.model.time.ExecutionTime
import com.cronutils.parser.CronParser
import java.time.ZonedDateTime
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock
import kotlin.math.abs
import kotlin.time.Duration
import kotlin.time.DurationUnit
// ============= Core Data Models =============
enum class TaskState {
IDLE, RUNNING, DISABLED
}
enum class ExecutionResult {
SUCCESS, FAILED, TIMEOUT
}
enum class ManagerState {
CREATED, RUNNING, STOPPING, STOPPED
}
sealed class Schedule {
data class Once(val delayMs: Long = 0, val autoRemove: Boolean = true) : Schedule()
data class FixedRate(val intervalMs: Long) : Schedule()
data class FixedDelay(val delayMs: Long) : Schedule()
data class Cron(val expression: String) : Schedule()
}
data class TaskContext(
val taskName: String,
val executionTime: Long,
val injectedObjects: Map<String, Any>
) {
inline fun <reified T> get(key: String): T? = injectedObjects[key] as? T
inline fun <reified T> getByType(): T? = injectedObjects.values.filterIsInstance<T>().firstOrNull()
}
sealed class AsyncTaskResult<out T> {
data class Success<T>(val value: T, val duration: Long) : AsyncTaskResult<T>()
data class Failed(val exception: Exception, val duration: Long) : AsyncTaskResult<Nothing>()
}
// Helper functions
fun fixedRate(duration: Duration) = Schedule.FixedRate(duration.inWholeMilliseconds)
fun fixedRate(ms: Long) = Schedule.FixedRate(ms)
fun fixedDelay(duration: Duration) = Schedule.FixedDelay(duration.inWholeMilliseconds)
fun fixedDelay(ms: Long) = Schedule.FixedDelay(ms)
fun cron(expression: String) = Schedule.Cron(expression)
fun once(duration: Duration = Duration.ZERO, autoRemove: Boolean = true) =
Schedule.Once(duration.inWholeMilliseconds, autoRemove)
fun once(ms: Long = 0, autoRemove: Boolean = true) = Schedule.Once(ms, autoRemove)
// ============= Task Information =============
data class ExecutionStats(
val totalExecutions: Long,
val successCount: Long,
val failureCount: Long,
val lastResult: ExecutionResult?,
val lastException: Exception?,
val lastExecutionDuration: Long,
val averageExecutionDuration: Long,
val minExecutionDuration: Long,
val maxExecutionDuration: Long
)
data class TaskInfo(
val name: String,
val state: TaskState,
val enabled: Boolean,
val schedule: Schedule,
val addedTime: Long,
val lastExecuteTime: Long?,
val nextExecuteTime: Long?,
val stats: ExecutionStats
) {
override fun toString(): String = """
|Task: $name
| State: $state (Enabled: $enabled)
| Schedule: $schedule
| Stats: ${stats.totalExecutions} total, ${stats.successCount} success, ${stats.failureCount} failed
| Duration: last=${stats.lastExecutionDuration}ms, avg=${stats.averageExecutionDuration}ms, min=${stats.minExecutionDuration}ms, max=${stats.maxExecutionDuration}ms
| LastResult: ${stats.lastResult}, LastError: ${stats.lastException?.message ?: "None"}
""".trimMargin()
}
data class ManagerStats(
val totalTasks: Int,
val enabledTasks: Int,
val disabledTasks: Int,
val runningTasksCount: Int,
val totalExecutions: Long,
val totalSuccesses: Long,
val totalFailures: Long,
val managerState: ManagerState,
val uptime: Long
) {
override fun toString(): String = """
|TaskManager Stats
| Tasks: total=$totalTasks, enabled=$enabledTasks, disabled=$disabledTasks, running=$runningTasksCount
| Executions: total=$totalExecutions, success=$totalSuccesses, failed=$totalFailures
| State: $managerState, uptime=${uptime}ms
""".trimMargin()
}
// ============= Exception Definitions =============
open class TaskManagerException(message: String, cause: Throwable? = null) : RuntimeException(message, cause)
class TaskAlreadyExistsException(name: String) : TaskManagerException("Task '$name' already exists")
class TaskNotFoundException(name: String) : TaskManagerException("Task '$name' not found")
class ManagerNotStartedException : TaskManagerException("Manager not started")
class InvalidCronExpressionException(expression: String, cause: Throwable? = null) :
TaskManagerException("Invalid cron expression: '$expression'", cause)
// ============= CRON Support =============
internal object CronSupport {
private val cronParser = CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX))
fun parseCron(expression: String): Cron {
return try {
cronParser.parse(expression)
} catch (e: Exception) {
throw InvalidCronExpressionException(expression, e)
}
}
fun getNextExecutionTime(cron: Cron, from: ZonedDateTime): Long {
val executionTime = ExecutionTime.forCron(cron)
val nextExecution = executionTime.nextExecution(from)
return if (nextExecution.isPresent) {
nextExecution.get().toInstant().toEpochMilli()
} else {
Long.MAX_VALUE
}
}
}
internal class StatsAccumulator {
private val countValue = AtomicLong(0)
private val sumValue = AtomicLong(0)
private val minValue = AtomicLong(Long.MAX_VALUE)
private val maxValue = AtomicLong(Long.MIN_VALUE)
fun record(duration: Long) {
countValue.incrementAndGet()
sumValue.addAndGet(duration)
minValue.getAndUpdate { cur -> if (duration < cur) duration else cur }
maxValue.getAndUpdate { cur -> if (duration > cur) duration else cur }
}
val count: Long get() = countValue.get()
val average: Long get() = if (this.count == 0L) 0 else sumValue.get() / this.count
val min: Long get() = if (count == 0L) 0 else minValue.get()
val max: Long get() = if (count == 0L) 0 else maxValue.get()
}
// ============= Internal Task Wrapper =============
internal class InternalTask(
val name: String,
val schedule: Schedule,
var enabled: Boolean,
val allowOverlapping: Boolean,
val execute: (TaskContext) -> Unit,
val contextProvider: (() -> TaskContext)?
) {
@Volatile
var state: TaskState = TaskState.IDLE
@Volatile
var isRunning: Boolean = false
val addedTime: Long = System.currentTimeMillis()
var lastExecuteTime: Long? = null
var nextExecuteTime: Long? = null
// 🔧 修复1: CRON任务防重复执行
@Volatile
var lastCronExecutionTime: Long = 0
val totalExecutions = AtomicLong(0)
val successCount = AtomicLong(0)
val failureCount = AtomicLong(0)
var lastResult: ExecutionResult? = null
var lastException: Exception? = null
var lastExecutionDuration: Long = 0
private val executionDurations = StatsAccumulator()
private var parsedCron: Cron? = null
fun getParsedCron(): Cron? {
if (parsedCron == null && schedule is Schedule.Cron) {
parsedCron = CronSupport.parseCron(schedule.expression)
}
return parsedCron
}
fun recordDuration(duration: Long) {
executionDurations.record(duration)
}
fun toTaskInfo(): TaskInfo = TaskInfo(
name = name,
state = state,
enabled = enabled,
schedule = schedule,
addedTime = addedTime,
lastExecuteTime = lastExecuteTime,
nextExecuteTime = nextExecuteTime,
stats = ExecutionStats(
totalExecutions = totalExecutions.get(),
successCount = successCount.get(),
failureCount = failureCount.get(),
lastResult = lastResult,
lastException = lastException,
lastExecutionDuration = lastExecutionDuration,
averageExecutionDuration = executionDurations.average,
minExecutionDuration = executionDurations.min,
maxExecutionDuration = executionDurations.max
)
)
}
internal class InternalAsyncTask<T>(
val name: String,
val execute: (Array<out Any?>) -> T
) {
@Volatile
var state: TaskState = TaskState.IDLE
@Volatile
var isRunning: Boolean = false
val addedTime: Long = System.currentTimeMillis()
var lastExecuteTime: Long? = null
val totalExecutions = AtomicLong(0)
val successCount = AtomicLong(0)
val failureCount = AtomicLong(0)
var lastException: Exception? = null
var lastExecutionDuration: Long = 0
private val executionDurations = StatsAccumulator()
fun recordDuration(duration: Long) {
executionDurations.record(duration)
}
}
// ============= TaskManager Main Class =============
class TaskManager(
val maxConcurrent: Int = 5,
val allowOverlapping: Boolean = false,
val threadName: String = "task-manager",
val shutdownTimeout: Duration = Duration.parse("30s"),
val onError: ((task: TaskInfo, exception: Exception) -> Unit)? = null
) {
private val lock = ReentrantReadWriteLock()
private val tasks = ConcurrentHashMap<String, InternalTask>()
private val asyncTasks = ConcurrentHashMap<String, InternalAsyncTask<*>>()
private var managerState = ManagerState.CREATED
private val injectedObjects = ConcurrentHashMap<String, Any>()
private val totalExecutions = AtomicLong(0)
private val totalSuccesses = AtomicLong(0)
private val totalFailures = AtomicLong(0)
private val startTime = System.currentTimeMillis()
private lateinit var scheduler: ScheduledExecutorService
private val scheduledFutures = ConcurrentHashMap<String, ScheduledFuture<*>>()
// 🔧 修复2: 使用Phaser代替CountDownLatch以支持重用
private val awaitPhaser = Phaser(1)
companion object {
private val threadCounter = AtomicLong(0)
private val managerCounter = AtomicLong(0) // 🔧 修复11: 唯一实例ID
fun builder(): Builder = Builder()
operator fun invoke(block: Builder.() -> Unit): TaskManager = builder().apply(block).build()
}
class Builder {
var maxConcurrent: Int = 5
var allowOverlapping: Boolean = false
var threadName: String = "task-manager"
var shutdownTimeout: Duration = Duration.parse("30s")
var exceptionHandler: ((task: TaskInfo, exception: Exception) -> Unit)? = null
fun maxConcurrent(value: Int) = apply { this.maxConcurrent = value }
fun allowOverlapping(value: Boolean) = apply { this.allowOverlapping = value }
fun threadName(value: String) = apply { this.threadName = value }
fun shutdownTimeout(value: Duration) = apply { this.shutdownTimeout = value }
fun exceptionHandler(handler: (task: TaskInfo, exception: Exception) -> Unit) =
apply { this.exceptionHandler = handler }
fun build(): TaskManager = TaskManager(
maxConcurrent, allowOverlapping, threadName, shutdownTimeout, exceptionHandler
)
}
// 🔧 修复11: 唯一线程名
private val instanceId = managerCounter.incrementAndGet()
// ========== Lifecycle Management ==========
fun start(await: Boolean = false) {
lock.writeLock().withLock {
if (managerState == ManagerState.RUNNING) return@withLock
if (managerState != ManagerState.CREATED) {
throw IllegalStateException("Cannot restart a stopped manager")
}
scheduler = Executors.newScheduledThreadPool(maxConcurrent) { runnable ->
Thread(runnable, "$threadName-$instanceId-${threadCounter.incrementAndGet()}")
}
managerState = ManagerState.RUNNING
}
val tasksToSchedule = lock.readLock().withLock {
tasks.values.filter { it.enabled && it.state != TaskState.DISABLED }.toList()
}
tasksToSchedule.forEach { scheduleTask(it) }
if (await) {
awaitPhaser.awaitAdvance(0)
}
}
fun await() {
awaitPhaser.awaitAdvance(0)
}
fun stop() {
lock.writeLock().withLock {
if (managerState != ManagerState.RUNNING) return@withLock
managerState = ManagerState.STOPPING
scheduledFutures.values.forEach { it.cancel(false) }
scheduledFutures.clear()
scheduler.shutdown()
val terminated = scheduler.awaitTermination(
shutdownTimeout.toLong(DurationUnit.MILLISECONDS),
TimeUnit.MILLISECONDS
)
if (!terminated) {
scheduler.shutdownNow()
}
managerState = ManagerState.STOPPED
awaitPhaser.arrive()
}
}
fun stopNow() {
lock.writeLock().withLock {
if (managerState != ManagerState.RUNNING && managerState != ManagerState.STOPPING) return
scheduledFutures.values.forEach { it.cancel(true) }
scheduledFutures.clear()
scheduler.shutdownNow()
managerState = ManagerState.STOPPED
awaitPhaser.arrive()
}
}
fun isRunning(): Boolean = lock.readLock().withLock { managerState == ManagerState.RUNNING }
// ========== Context Injection ==========
fun injectObject(key: String, obj: Any) {
injectedObjects[key] = obj
}
fun injectObjects(vararg pairs: Pair<String, Any>) = pairs.forEach { (k, v) -> injectedObjects[k] = v }
fun removeInjectedObject(key: String) {
injectedObjects.remove(key)
}
fun getInjectedObject(key: String): Any? = injectedObjects[key]
fun clearInjectedObjects() {
injectedObjects.clear()
}
fun getAllInjectedObjects(): Map<String, Any> = injectedObjects.toMap()
// ========== Task Management ==========
fun add(name: String, schedule: Schedule, execute: (TaskContext) -> Unit) {
lock.writeLock().withLock {
if (tasks.containsKey(name)) throw TaskAlreadyExistsException(name)
val task = InternalTask(
name, schedule, true, allowOverlapping, execute,
contextProvider = {
TaskContext(name, System.currentTimeMillis(), injectedObjects.toMap())
}
)
tasks[name] = task
if (managerState == ManagerState.RUNNING) scheduleTask(task)
}
}
fun addOrReplace(name: String, schedule: Schedule, execute: (TaskContext) -> Unit) {
lock.writeLock().withLock {
scheduledFutures[name]?.cancel(false)
scheduledFutures.remove(name)
val task = InternalTask(
name, schedule, true, allowOverlapping, execute,
contextProvider = {
TaskContext(name, System.currentTimeMillis(), injectedObjects.toMap())
}
)
tasks[name] = task
if (managerState == ManagerState.RUNNING) scheduleTask(task)
}
}
fun <T> addAsync(name: String, execute: (Array<out Any?>) -> T) {
lock.writeLock().withLock {
if (asyncTasks.containsKey(name)) throw TaskAlreadyExistsException(name)
asyncTasks[name] = InternalAsyncTask(name, execute)
}
}
fun enable(name: String) {
lock.writeLock().withLock {
val task = tasks[name] ?: throw TaskNotFoundException(name)
if (task.enabled) return
task.enabled = true
task.state = TaskState.IDLE
if (managerState == ManagerState.RUNNING) scheduleTask(task)
}
}
fun disable(name: String) {
lock.writeLock().withLock {
val task = tasks[name] ?: throw TaskNotFoundException(name)
if (!task.enabled) return
task.enabled = false
task.state = TaskState.DISABLED
// 🔧 修复3: 取消任务的future
scheduledFutures[name]?.cancel(false)
scheduledFutures.remove(name)
}
}
fun remove(name: String) {
lock.writeLock().withLock {
tasks.remove(name) ?: throw TaskNotFoundException(name)
scheduledFutures[name]?.cancel(false)
scheduledFutures.remove(name)
}
}
fun removeAsync(name: String) {
lock.writeLock().withLock {
asyncTasks.remove(name) ?: throw TaskNotFoundException(name)
}
}
// ========== Task Execution ==========
fun runNow(name: String, wait: Boolean = false) {
if (managerState != ManagerState.RUNNING) throw ManagerNotStartedException()
val task = lock.readLock().withLock {
tasks[name] ?: throw TaskNotFoundException(name)
}
// 🔧 修复4: 在执行时再次检查状态
if (!task.enabled) throw IllegalStateException("Task '$name' is disabled")
if (wait) {
executeTask(task)
} else {
scheduler.submit { executeTask(task) }
}
}
@Suppress("UNCHECKED_CAST")
fun <T> runAsync(name: String, vararg params: Any?, wait: Boolean = false): Future<AsyncTaskResult<T>> {
if (managerState != ManagerState.RUNNING) throw ManagerNotStartedException()
val task = lock.readLock().withLock {
asyncTasks[name] ?: throw TaskNotFoundException(name)
} as InternalAsyncTask<T>
return if (wait) {
CompletableFuture.completedFuture(executeAsyncTask(task, params))
} else {
scheduler.submit(Callable { executeAsyncTask(task, params) })
}
}
private fun scheduleTask(task: InternalTask) {
if (!task.enabled) return
val future = when (val sched = task.schedule) {
is Schedule.Once -> {
scheduler.schedule({
executeTask(task)
// 🔧 修复7: Once任务执行后自动处理
if (sched.autoRemove) {
lock.writeLock().withLock {
tasks.remove(task.name)
scheduledFutures.remove(task.name)
}
}
}, sched.delayMs, TimeUnit.MILLISECONDS)
}
is Schedule.FixedRate -> scheduler.scheduleAtFixedRate(
{ executeTask(task) }, 0, sched.intervalMs, TimeUnit.MILLISECONDS
)
is Schedule.FixedDelay -> scheduler.scheduleWithFixedDelay(
{ executeTask(task) }, sched.delayMs, sched.delayMs, TimeUnit.MILLISECONDS
)
is Schedule.Cron -> scheduleCronTask(task)
}
scheduledFutures[task.name] = future
updateNextExecuteTime(task)
}
/**
* 🔧 修复1: CRON调度防重复执行
*/
private fun scheduleCronTask(task: InternalTask): ScheduledFuture<*> {
val cron = task.getParsedCron()!!
return scheduler.scheduleAtFixedRate({
if (!task.enabled || task.isRunning) return@scheduleAtFixedRate
val now = ZonedDateTime.now()
val nextTime = CronSupport.getNextExecutionTime(cron, now)
val currentTime = System.currentTimeMillis()
// 在下次执行时间的前后1秒窗口内且未执行过时才执行
if (abs(currentTime - nextTime) < 1000 &&
task.lastCronExecutionTime < nextTime - 1000
) {
task.lastCronExecutionTime = currentTime
executeTask(task)
}
}, 0, 1, TimeUnit.SECONDS)
}
private fun updateNextExecuteTime(task: InternalTask) {
task.nextExecuteTime = when (val sched = task.schedule) {
is Schedule.Once -> System.currentTimeMillis() + sched.delayMs
is Schedule.FixedRate -> System.currentTimeMillis() + sched.intervalMs
is Schedule.FixedDelay -> System.currentTimeMillis() + sched.delayMs
is Schedule.Cron -> {
val cron = task.getParsedCron()
cron?.let { CronSupport.getNextExecutionTime(it, ZonedDateTime.now()) } ?: Long.MAX_VALUE
}
}
}
private fun executeTask(task: InternalTask) {
// 🔧 双重检查状态
if (!task.enabled || (task.isRunning && !task.allowOverlapping)) return
task.isRunning = true
task.state = TaskState.RUNNING
task.lastExecuteTime = System.currentTimeMillis()
try {
val startTime = System.currentTimeMillis()
val context = task.contextProvider?.invoke() ?: TaskContext(task.name, startTime, emptyMap())
task.execute(context)
val duration = System.currentTimeMillis() - startTime
task.lastExecutionDuration = duration
task.recordDuration(duration)
task.totalExecutions.incrementAndGet()
task.successCount.incrementAndGet()
task.state = TaskState.IDLE
task.lastResult = ExecutionResult.SUCCESS
task.lastException = null
totalExecutions.incrementAndGet()
totalSuccesses.incrementAndGet()
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
handleTaskException(task, e)
} catch (e: Exception) {
handleTaskException(task, e)
} finally {
task.isRunning = false
}
}
private fun handleTaskException(task: InternalTask, e: Exception) {
val duration = System.currentTimeMillis() - (task.lastExecuteTime ?: System.currentTimeMillis())
task.lastExecutionDuration = duration
task.recordDuration(duration)
task.totalExecutions.incrementAndGet()
task.failureCount.incrementAndGet()
task.state = TaskState.IDLE
task.lastResult = ExecutionResult.FAILED
task.lastException = e
totalExecutions.incrementAndGet()
totalFailures.incrementAndGet()
// 🔧 修复6: 异常处理器的异常不再被完全吞没
try {
onError?.invoke(task.toTaskInfo(), e)
} catch (handlerEx: Exception) {
System.err.println("Error in exception handler for task '${task.name}': ${handlerEx.message}")
handlerEx.printStackTrace()
}
}
private fun <T> executeAsyncTask(task: InternalAsyncTask<T>, params: Array<out Any?>): AsyncTaskResult<T> {
task.isRunning = true
task.state = TaskState.RUNNING
task.lastExecuteTime = System.currentTimeMillis()
return try {
val startTime = System.currentTimeMillis()
val result = task.execute(params)
val duration = System.currentTimeMillis() - startTime
task.lastExecutionDuration = duration
task.recordDuration(duration)
task.totalExecutions.incrementAndGet()
task.successCount.incrementAndGet()
task.state = TaskState.IDLE
task.lastException = null
totalExecutions.incrementAndGet()
totalSuccesses.incrementAndGet()
AsyncTaskResult.Success(result, duration)
} catch (e: Exception) {
val duration = System.currentTimeMillis() - task.lastExecuteTime!!
task.lastExecutionDuration = duration
task.recordDuration(duration)
task.totalExecutions.incrementAndGet()
task.failureCount.incrementAndGet()
task.state = TaskState.IDLE
task.lastException = e
totalExecutions.incrementAndGet()
totalFailures.incrementAndGet()
AsyncTaskResult.Failed(e, duration)
} finally {
task.isRunning = false
}
}
// ========== Query API ==========
fun get(name: String): TaskInfo? = lock.readLock().withLock { tasks[name]?.toTaskInfo() }
fun exists(name: String): Boolean = lock.readLock().withLock { tasks.containsKey(name) }
fun existsAsync(name: String): Boolean = lock.readLock().withLock { asyncTasks.containsKey(name) }
fun isRunning(name: String): Boolean = lock.readLock().withLock {
tasks[name]?.isRunning ?: asyncTasks[name]?.isRunning ?: false
}
fun isEnabled(name: String): Boolean = lock.readLock().withLock { tasks[name]?.enabled ?: false }
fun getAll(): List<TaskInfo> = lock.readLock().withLock { tasks.values.map { it.toTaskInfo() } }
fun getAll(state: TaskState): List<TaskInfo> = lock.readLock().withLock {
tasks.values.filter { it.state == state }.map { it.toTaskInfo() }
}
fun stats(): ManagerStats = lock.readLock().withLock {
val enabledCount = tasks.values.count { it.enabled }
val runningCount = tasks.values.count { it.isRunning } + asyncTasks.values.count { it.isRunning }
ManagerStats(
totalTasks = tasks.size + asyncTasks.size,
enabledTasks = enabledCount,
disabledTasks = tasks.size - enabledCount,
runningTasksCount = runningCount,
totalExecutions = totalExecutions.get(),
totalSuccesses = totalSuccesses.get(),
totalFailures = totalFailures.get(),
managerState = managerState,
uptime = System.currentTimeMillis() - startTime
)
}
fun taskCount(): Int = lock.readLock().withLock { tasks.size + asyncTasks.size }
}
// ============= Extension Functions =============
fun TaskManager.scheduleTask(name: String, schedule: Schedule, block: (TaskContext) -> Unit) =
this.add(name, schedule, block)
fun <T> TaskManager.asyncTask(name: String, block: (Array<out Any?>) -> T) =
this.addAsync(name, block)