// Code:

package site.daydream.colleen

import com.cronutils.model.Cron
import com.cronutils.model.CronType
import com.cronutils.model.definition.CronDefinitionBuilder
import com.cronutils.model.time.ExecutionTime
import com.cronutils.parser.CronParser
import java.time.ZonedDateTime
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock
import kotlin.math.abs
import kotlin.time.Duration
import kotlin.time.DurationUnit

// ============= Core Data Models =============

/** State of a single task as tracked by the manager. */
enum class TaskState {
    /** Registered and not currently executing. */
    IDLE,

    /** An execution is in progress. */
    RUNNING,

    /** Switched off via `disable`; not scheduled. */
    DISABLED,
}

/** Outcome recorded for the most recent execution of a task. */
enum class ExecutionResult {
    /** The task body returned normally. */
    SUCCESS,

    /** The task body threw an exception. */
    FAILED,

    // NOTE(review): nothing visible in this file assigns TIMEOUT — confirm whether it is still used.
    TIMEOUT,
}

/** Lifecycle phases of a task manager, in the order they are entered. */
enum class ManagerState {
    /** Constructed but not yet started. */
    CREATED,

    /** Started; tasks are being scheduled and executed. */
    RUNNING,

    /** Shutdown has begun and is waiting for the executor to terminate. */
    STOPPING,

    /** Shutdown finished; the manager cannot be started again. */
    STOPPED,
}

/** Describes when a task should run. All durations are expressed in milliseconds. */
sealed class Schedule {
    /**
     * Run a single time after [delayMs] milliseconds.
     * When [autoRemove] is true the task is unregistered after that run.
     */
    data class Once(
        val delayMs: Long = 0,
        val autoRemove: Boolean = true,
    ) : Schedule()

    /** Run repeatedly with [intervalMs] milliseconds between the start of consecutive runs. */
    data class FixedRate(
        val intervalMs: Long,
    ) : Schedule()

    /** Run repeatedly, waiting [delayMs] milliseconds after each run finishes before the next one. */
    data class FixedDelay(
        val delayMs: Long,
    ) : Schedule()

    /** Run at the instants described by the cron [expression]. */
    data class Cron(
        val expression: String,
    ) : Schedule()
}

/**
 * Values handed to a task body on each execution.
 *
 * @property taskName name the task was registered under
 * @property executionTime epoch-millisecond timestamp captured when the context was built
 * @property injectedObjects objects made available to the task, keyed by name
 */
data class TaskContext(
    val taskName: String,
    val executionTime: Long,
    val injectedObjects: Map<String, Any>,
) {
    /** Returns the object stored under [key] when it is a [T]; otherwise `null`. */
    inline fun <reified T> get(key: String): T? {
        val candidate = injectedObjects[key]
        return if (candidate is T) candidate else null
    }

    /** Returns the first injected object (in map iteration order) that is a [T], or `null`. */
    inline fun <reified T> getByType(): T? =
        injectedObjects.values.firstOrNull { it is T } as T?
}

/** Outcome of one asynchronous task invocation; `duration` is in milliseconds. */
sealed class AsyncTaskResult<out T> {
    /** The task returned [value] after [duration] milliseconds. */
    data class Success<T>(
        val value: T,
        val duration: Long,
    ) : AsyncTaskResult<T>()

    /** The task threw [exception] after [duration] milliseconds. */
    data class Failed(
        val exception: Exception,
        val duration: Long,
    ) : AsyncTaskResult<Nothing>()
}

// Helper functions
/** Fixed-rate schedule whose interval is [duration] in whole milliseconds. */
fun fixedRate(duration: Duration): Schedule.FixedRate =
    Schedule.FixedRate(intervalMs = duration.inWholeMilliseconds)

/** Fixed-rate schedule with an interval of [ms] milliseconds. */
fun fixedRate(ms: Long): Schedule.FixedRate =
    Schedule.FixedRate(intervalMs = ms)

/** Fixed-delay schedule whose delay is [duration] in whole milliseconds. */
fun fixedDelay(duration: Duration): Schedule.FixedDelay =
    Schedule.FixedDelay(delayMs = duration.inWholeMilliseconds)

/** Fixed-delay schedule with a delay of [ms] milliseconds. */
fun fixedDelay(ms: Long): Schedule.FixedDelay =
    Schedule.FixedDelay(delayMs = ms)

/** Cron schedule for the given [expression]; the expression is not parsed here. */
fun cron(expression: String): Schedule.Cron =
    Schedule.Cron(expression = expression)
/**
 * One-shot schedule that fires after [duration] (in whole milliseconds).
 *
 * [duration] deliberately has no default value: with defaults on both overloads, calls such as
 * `once()` or `once(autoRemove = false)` match both equally and fail with an overload-resolution
 * ambiguity. Those calls now resolve to the `Long` overload below.
 *
 * @param autoRemove whether the task is unregistered after it has run
 */
fun once(duration: Duration, autoRemove: Boolean = true): Schedule.Once =
    Schedule.Once(duration.inWholeMilliseconds, autoRemove)

/**
 * One-shot schedule that fires after [ms] milliseconds (immediately by default).
 *
 * @param autoRemove whether the task is unregistered after it has run
 */
fun once(ms: Long = 0, autoRemove: Boolean = true): Schedule.Once = Schedule.Once(ms, autoRemove)

// ============= Task Information =============

/**
 * Immutable snapshot of a task's execution counters.
 *
 * All `*Duration` values are in milliseconds.
 *
 * @property totalExecutions number of completed executions (successful or failed)
 * @property successCount executions that returned normally
 * @property failureCount executions that threw an exception
 * @property lastResult outcome of the most recent execution, or `null` if none has been recorded
 * @property lastException exception from the most recent failed execution, if any
 * @property lastExecutionDuration duration of the most recent execution
 * @property averageExecutionDuration mean duration over all recorded executions
 * @property minExecutionDuration shortest recorded duration
 * @property maxExecutionDuration longest recorded duration
 */
data class ExecutionStats(
    val totalExecutions: Long,
    val successCount: Long,
    val failureCount: Long,
    val lastResult: ExecutionResult?,
    val lastException: Exception?,
    val lastExecutionDuration: Long,
    val averageExecutionDuration: Long,
    val minExecutionDuration: Long,
    val maxExecutionDuration: Long
)

/**
 * Immutable, caller-facing snapshot of one scheduled task.
 *
 * @property name name the task was registered under
 * @property state task state at the time of the snapshot
 * @property enabled whether the task is enabled
 * @property schedule schedule the task was registered with
 * @property addedTime epoch milliseconds at which the task was created
 * @property lastExecuteTime epoch milliseconds of the most recent execution start, or `null` if it never ran
 * @property nextExecuteTime epoch milliseconds of the next run as last computed by the manager, or `null`
 * @property stats execution counters and durations
 */
data class TaskInfo(
    val name: String,
    val state: TaskState,
    val enabled: Boolean,
    val schedule: Schedule,
    val addedTime: Long,
    val lastExecuteTime: Long?,
    val nextExecuteTime: Long?,
    val stats: ExecutionStats
) {
    // Multi-line human-readable summary. trimMargin() runs after interpolation, so it strips the
    // leading "|" margin of the template lines and drops the blank first and last lines.
    override fun toString(): String = """
        |Task: $name
        |  State: $state (Enabled: $enabled)
        |  Schedule: $schedule
        |  Stats: ${stats.totalExecutions} total, ${stats.successCount} success, ${stats.failureCount} failed
        |  Duration: last=${stats.lastExecutionDuration}ms, avg=${stats.averageExecutionDuration}ms, min=${stats.minExecutionDuration}ms, max=${stats.maxExecutionDuration}ms
        |  LastResult: ${stats.lastResult}, LastError: ${stats.lastException?.message ?: "None"}
    """.trimMargin()
}

/**
 * Immutable snapshot of manager-wide counters.
 *
 * @property totalTasks number of registered tasks (scheduled and async combined)
 * @property enabledTasks scheduled tasks that are enabled
 * @property disabledTasks scheduled tasks that are not enabled
 * @property runningTasksCount tasks (scheduled and async) currently executing
 * @property totalExecutions executions completed across all tasks
 * @property totalSuccesses executions that returned normally
 * @property totalFailures executions that threw an exception
 * @property managerState lifecycle state of the manager at snapshot time
 * @property uptime milliseconds elapsed since the manager instance was constructed
 */
data class ManagerStats(
    val totalTasks: Int,
    val enabledTasks: Int,
    val disabledTasks: Int,
    val runningTasksCount: Int,
    val totalExecutions: Long,
    val totalSuccesses: Long,
    val totalFailures: Long,
    val managerState: ManagerState,
    val uptime: Long
) {
    // Multi-line human-readable summary; trimMargin() strips the "|" margin and the blank edge lines.
    override fun toString(): String = """
        |TaskManager Stats
        |  Tasks: total=$totalTasks, enabled=$enabledTasks, disabled=$disabledTasks, running=$runningTasksCount
        |  Executions: total=$totalExecutions, success=$totalSuccesses, failed=$totalFailures
        |  State: $managerState, uptime=${uptime}ms
    """.trimMargin()
}

// ============= Exception Definitions =============

/** Base type for every exception raised by the task manager itself. */
open class TaskManagerException(
    message: String,
    cause: Throwable? = null,
) : RuntimeException(message, cause)

/** A task is being registered under a [name] that is already taken. */
class TaskAlreadyExistsException(name: String) :
    TaskManagerException("Task '$name' already exists")

/** No task is registered under [name]. */
class TaskNotFoundException(name: String) :
    TaskManagerException("Task '$name' not found")

/** An operation that needs a running manager was called while it was not running. */
class ManagerNotStartedException :
    TaskManagerException("Manager not started")

/** The cron [expression] could not be parsed; [cause] carries the parser's error. */
class InvalidCronExpressionException(
    expression: String,
    cause: Throwable? = null,
) : TaskManagerException("Invalid cron expression: '$expression'", cause)

// ============= CRON Support =============

/** Thin wrapper around cron-utils configured for UNIX-style cron expressions. */
internal object CronSupport {
    private val cronParser = CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX))

    /**
     * Parses [expression] into a [Cron].
     *
     * @throws InvalidCronExpressionException wrapping whatever the parser threw
     */
    fun parseCron(expression: String): Cron =
        try {
            cronParser.parse(expression)
        } catch (e: Exception) {
            throw InvalidCronExpressionException(expression, e)
        }

    /**
     * Returns the epoch-millisecond instant of the next execution of [cron] relative to [from],
     * or [Long.MAX_VALUE] when the library reports no next execution.
     */
    fun getNextExecutionTime(cron: Cron, from: ZonedDateTime): Long =
        ExecutionTime.forCron(cron)
            .nextExecution(from)
            .map { it.toInstant().toEpochMilli() }
            .orElse(Long.MAX_VALUE)
}

/**
 * Lock-free accumulator for execution durations (count, sum, min, max).
 *
 * The four values are updated independently, so a concurrent reader may see a sample reflected
 * in some of them and not yet in others. The count is published last so that a reader observing
 * a non-zero [count] never sees the initial min/max sentinels.
 */
internal class StatsAccumulator {
    private val countValue = AtomicLong(0)
    private val sumValue = AtomicLong(0)
    private val minValue = AtomicLong(Long.MAX_VALUE)
    private val maxValue = AtomicLong(Long.MIN_VALUE)

    /** Records one sample of [duration]. */
    fun record(duration: Long) {
        sumValue.addAndGet(duration)
        minValue.updateAndGet { current -> minOf(current, duration) }
        maxValue.updateAndGet { current -> maxOf(current, duration) }
        // Incremented last: min/max/sum already include this sample once it becomes countable.
        countValue.incrementAndGet()
    }

    /** Number of samples recorded so far. */
    val count: Long get() = countValue.get()

    /** Integer mean of all samples, or 0 when nothing has been recorded. */
    val average: Long
        get() {
            // Read the count once so the zero check and the divisor are the same value.
            val samples = countValue.get()
            return if (samples == 0L) 0 else sumValue.get() / samples
        }

    /** Smallest sample, or 0 when nothing has been recorded. */
    val min: Long get() = if (count == 0L) 0 else minValue.get()

    /** Largest sample, or 0 when nothing has been recorded. */
    val max: Long get() = if (count == 0L) 0 else maxValue.get()
}

// ============= Internal Task Wrapper =============

/**
 * Mutable bookkeeping record for one scheduled task.
 *
 * Fields are written by executor threads and by callers of the manager API and are read from
 * other threads when snapshots are built, so every mutable field is `@Volatile`. This gives
 * per-field visibility only; a snapshot is not an atomic view across fields.
 *
 * @property name name the task is registered under
 * @property schedule schedule the task was registered with
 * @property enabled whether the task may currently be executed
 * @property allowOverlapping whether a new execution may start while another is in progress
 * @property execute the task body
 * @property contextProvider builds the [TaskContext] for each execution; may be `null`
 */
internal class InternalTask(
    val name: String,
    val schedule: Schedule,
    @Volatile
    var enabled: Boolean,
    val allowOverlapping: Boolean,
    val execute: (TaskContext) -> Unit,
    val contextProvider: (() -> TaskContext)?
) {
    @Volatile
    var state: TaskState = TaskState.IDLE

    @Volatile
    var isRunning: Boolean = false

    /** Epoch milliseconds at which this record was created. */
    val addedTime: Long = System.currentTimeMillis()

    @Volatile
    var lastExecuteTime: Long? = null

    @Volatile
    var nextExecuteTime: Long? = null

    // Fix 1: wall-clock time of the most recent cron-triggered run (0 = never), written by the cron poller.
    @Volatile
    var lastCronExecutionTime: Long = 0

    val totalExecutions = AtomicLong(0)
    val successCount = AtomicLong(0)
    val failureCount = AtomicLong(0)

    @Volatile
    var lastResult: ExecutionResult? = null

    @Volatile
    var lastException: Exception? = null

    @Volatile
    var lastExecutionDuration: Long = 0

    private val executionDurations = StatsAccumulator()

    // Parsed lazily and at most once; `lazy` is synchronized and retries if parsing threw.
    private val cachedCron: Cron? by lazy {
        (schedule as? Schedule.Cron)?.let { CronSupport.parseCron(it.expression) }
    }

    /**
     * Returns the parsed cron definition for a [Schedule.Cron] task, or `null` for any other
     * schedule type.
     *
     * @throws InvalidCronExpressionException if the expression cannot be parsed
     */
    fun getParsedCron(): Cron? = cachedCron

    /** Adds [duration] (milliseconds) to the average/min/max statistics. */
    fun recordDuration(duration: Long) {
        executionDurations.record(duration)
    }

    /** Builds an immutable snapshot of the current field values. */
    fun toTaskInfo(): TaskInfo = TaskInfo(
        name = name,
        state = state,
        enabled = enabled,
        schedule = schedule,
        addedTime = addedTime,
        lastExecuteTime = lastExecuteTime,
        nextExecuteTime = nextExecuteTime,
        stats = ExecutionStats(
            totalExecutions = totalExecutions.get(),
            successCount = successCount.get(),
            failureCount = failureCount.get(),
            lastResult = lastResult,
            lastException = lastException,
            lastExecutionDuration = lastExecutionDuration,
            averageExecutionDuration = executionDurations.average,
            minExecutionDuration = executionDurations.min,
            maxExecutionDuration = executionDurations.max
        )
    )
}

/**
 * Mutable bookkeeping record for one on-demand ("async") task.
 *
 * @property name name the task is registered under
 * @property execute the task body; receives the caller-supplied parameters
 */
internal class InternalAsyncTask<T>(
    val name: String,
    val execute: (Array<out Any?>) -> T,
) {
    @Volatile
    var state: TaskState = TaskState.IDLE

    @Volatile
    var isRunning: Boolean = false

    /** Epoch milliseconds at which this record was created. */
    val addedTime: Long = System.currentTimeMillis()

    /** Epoch milliseconds at which the most recent execution started, or `null` if it never ran. */
    var lastExecuteTime: Long? = null

    val totalExecutions = AtomicLong(0)
    val successCount = AtomicLong(0)
    val failureCount = AtomicLong(0)

    /** Exception thrown by the most recent failed execution, if any. */
    var lastException: Exception? = null

    /** Duration of the most recent execution in milliseconds. */
    var lastExecutionDuration: Long = 0

    private val executionDurations = StatsAccumulator()

    /** Adds [duration] (milliseconds) to the duration statistics. */
    fun recordDuration(duration: Long) = executionDurations.record(duration)
}

// ============= TaskManager Main Class =============

/**
 * Registers, schedules, and executes named tasks on a scheduled thread pool.
 *
 * @property maxConcurrent number of threads in the scheduled thread pool
 * @property allowOverlapping passed to every scheduled task; when false, an execution is skipped
 *   while a previous execution of the same task is still in progress
 * @property threadName prefix used for worker thread names
 * @property shutdownTimeout how long `stop()` waits for the executor to terminate before forcing shutdown
 * @property onError invoked with a task snapshot and the exception when a scheduled task fails
 */
class TaskManager(
    val maxConcurrent: Int = 5,
    val allowOverlapping: Boolean = false,
    val threadName: String = "task-manager",
    val shutdownTimeout: Duration = Duration.parse("30s"),
    val onError: ((task: TaskInfo, exception: Exception) -> Unit)? = null
) {
    // Guards task registration and lifecycle transitions.
    private val lock = ReentrantReadWriteLock()

    // Scheduled tasks and on-demand ("async") tasks, keyed by name.
    private val tasks = ConcurrentHashMap<String, InternalTask>()
    private val asyncTasks = ConcurrentHashMap<String, InternalAsyncTask<*>>()

    // Volatile so that reads made outside [lock] observe the latest lifecycle transition.
    @Volatile
    private var managerState = ManagerState.CREATED

    // Objects exposed to task bodies through TaskContext.
    private val injectedObjects = ConcurrentHashMap<String, Any>()

    // Manager-wide execution counters.
    private val totalExecutions = AtomicLong(0)
    private val totalSuccesses = AtomicLong(0)
    private val totalFailures = AtomicLong(0)

    // Captured at construction time; stats() reports uptime relative to this instant.
    private val startTime = System.currentTimeMillis()

    // Created by start(); must not be touched before the manager has been started.
    private lateinit var scheduler: ScheduledExecutorService

    // Handle of the scheduled future per task name, used for cancellation.
    private val scheduledFutures = ConcurrentHashMap<String, ScheduledFuture<*>>()

    // Fix 2: a Phaser is used instead of a CountDownLatch so it can be reused.
    // One party is registered; stop()/stopNow() arrive on it, which releases await().
    private val awaitPhaser = Phaser(1)

    companion object {
        // Process-wide sequence used to number worker threads.
        private val threadCounter = AtomicLong(0)

        // Fix 11: process-wide sequence used to give each manager a unique instance id.
        private val managerCounter = AtomicLong(0)

        /** Returns a fresh [Builder] with default settings. */
        fun builder(): Builder = Builder()

        /** DSL entry point: `TaskManager { maxConcurrent = 8 }` configures a builder and builds it. */
        operator fun invoke(block: Builder.() -> Unit): TaskManager {
            val configured = builder()
            configured.block()
            return configured.build()
        }
    }

    /** Mutable configuration holder; each setter returns the builder so calls can be chained. */
    class Builder {
        var maxConcurrent: Int = 5
        var allowOverlapping: Boolean = false
        var threadName: String = "task-manager"
        var shutdownTimeout: Duration = Duration.parse("30s")
        var exceptionHandler: ((task: TaskInfo, exception: Exception) -> Unit)? = null

        fun maxConcurrent(value: Int): Builder {
            maxConcurrent = value
            return this
        }

        fun allowOverlapping(value: Boolean): Builder {
            allowOverlapping = value
            return this
        }

        fun threadName(value: String): Builder {
            threadName = value
            return this
        }

        fun shutdownTimeout(value: Duration): Builder {
            shutdownTimeout = value
            return this
        }

        fun exceptionHandler(handler: (task: TaskInfo, exception: Exception) -> Unit): Builder {
            exceptionHandler = handler
            return this
        }

        /** Creates a [TaskManager] from the current settings. */
        fun build(): TaskManager = TaskManager(
            maxConcurrent = maxConcurrent,
            allowOverlapping = allowOverlapping,
            threadName = threadName,
            shutdownTimeout = shutdownTimeout,
            onError = exceptionHandler,
        )
    }

    // Fix 11: unique thread names — per-manager sequence number embedded in worker thread names.
    private val instanceId = managerCounter.incrementAndGet()

    // ========== Lifecycle Management ==========

    /**
     * Starts the manager: creates the thread pool and schedules every enabled task.
     *
     * Calling `start` on a manager that is already running does nothing (tasks are not scheduled
     * a second time). Pool creation, the state change, and the initial scheduling all happen under
     * the write lock, so a concurrent `add` cannot schedule the same task twice.
     *
     * @param await when true, blocks the caller until the manager has been stopped
     * @throws IllegalStateException if the manager is stopping or has been stopped
     */
    fun start(await: Boolean = false) {
        lock.writeLock().withLock {
            when (managerState) {
                ManagerState.RUNNING -> Unit // already started; nothing to schedule again

                ManagerState.CREATED -> {
                    scheduler = Executors.newScheduledThreadPool(maxConcurrent) { runnable ->
                        Thread(runnable, "$threadName-$instanceId-${threadCounter.incrementAndGet()}")
                    }
                    managerState = ManagerState.RUNNING

                    tasks.values
                        .filter { it.enabled && it.state != TaskState.DISABLED }
                        .forEach { scheduleTask(it) }
                }

                ManagerState.STOPPING, ManagerState.STOPPED ->
                    throw IllegalStateException("Cannot restart a stopped manager")
            }
        }

        if (await) {
            awaitPhaser.awaitAdvance(0)
        }
    }

    /**
     * Blocks until the await phaser has advanced past phase 0, which happens when
     * `stop()` or `stopNow()` completes. Returns immediately if that has already happened.
     */
    fun await() {
        awaitPhaser.awaitAdvance(0)
    }

    /**
     * Gracefully stops a running manager.
     *
     * Pending schedules are cancelled and the executor is shut down; in-flight executions get up
     * to [shutdownTimeout] to finish before the executor is shut down forcibly. Does nothing
     * unless the manager is currently running.
     *
     * The write lock is NOT held while waiting for termination. In-flight tasks may need the lock
     * themselves (e.g. a one-shot task unregistering itself); holding it here would stall them
     * until the timeout expires.
     *
     * If the calling thread is interrupted while waiting, the executor is shut down forcibly and
     * the interrupt flag is restored.
     */
    fun stop() {
        val executor = lock.writeLock().withLock {
            if (managerState != ManagerState.RUNNING) return
            managerState = ManagerState.STOPPING

            scheduledFutures.values.forEach { it.cancel(false) }
            scheduledFutures.clear()

            scheduler.shutdown()
            scheduler
        }

        try {
            val terminated = executor.awaitTermination(
                shutdownTimeout.toLong(DurationUnit.MILLISECONDS),
                TimeUnit.MILLISECONDS
            )
            if (!terminated) {
                executor.shutdownNow()
            }
        } catch (e: InterruptedException) {
            executor.shutdownNow()
            Thread.currentThread().interrupt()
        } finally {
            lock.writeLock().withLock {
                // stopNow() may have finished the shutdown in the meantime; arrive only once.
                if (managerState != ManagerState.STOPPED) {
                    managerState = ManagerState.STOPPED
                    awaitPhaser.arrive()
                }
            }
        }
    }

    /**
     * Stops the manager immediately: cancels every scheduled future with interruption and shuts
     * the executor down without waiting. Does nothing unless the manager is running or stopping.
     */
    fun stopNow() {
        val guard = lock.writeLock()
        guard.lock()
        try {
            val stoppable = managerState == ManagerState.RUNNING || managerState == ManagerState.STOPPING
            if (!stoppable) return

            for (future in scheduledFutures.values) {
                future.cancel(true)
            }
            scheduledFutures.clear()

            scheduler.shutdownNow()
            managerState = ManagerState.STOPPED
            awaitPhaser.arrive()
        } finally {
            guard.unlock()
        }
    }

    /** True while the manager is in the RUNNING state. */
    fun isRunning(): Boolean {
        val current = lock.readLock().withLock { managerState }
        return current == ManagerState.RUNNING
    }

    // ========== Context Injection ==========

    /** Makes [obj] available to task bodies under [key], replacing any previous value. */
    fun injectObject(key: String, obj: Any) {
        injectedObjects.put(key, obj)
    }

    /** Injects every key/value pair in [pairs], in order. */
    fun injectObjects(vararg pairs: Pair<String, Any>) {
        injectedObjects.putAll(pairs)
    }

    /** Removes the object stored under [key], if any. */
    fun removeInjectedObject(key: String) {
        injectedObjects.remove(key)
    }

    /** Returns the object stored under [key], or `null`. */
    fun getInjectedObject(key: String): Any? {
        return injectedObjects.get(key)
    }

    /** Removes every injected object. */
    fun clearInjectedObjects() {
        injectedObjects.clear()
    }

    /** Returns a copy of the current injected objects. */
    fun getAllInjectedObjects(): Map<String, Any> {
        return injectedObjects.toMap()
    }

    // ========== Task Management ==========

    /**
     * Registers a new task and, if the manager is running, schedules it immediately.
     *
     * The schedule is validated before the task is registered, and a task whose scheduling fails
     * is unregistered again. A failed `add` therefore never leaves a registered-but-unscheduled
     * task behind.
     *
     * @throws TaskAlreadyExistsException if [name] is already registered
     * @throws InvalidCronExpressionException if [schedule] is a cron schedule that cannot be parsed
     */
    fun add(name: String, schedule: Schedule, execute: (TaskContext) -> Unit) {
        lock.writeLock().withLock {
            if (tasks.containsKey(name)) throw TaskAlreadyExistsException(name)

            val task = InternalTask(
                name, schedule, true, allowOverlapping, execute,
                contextProvider = {
                    TaskContext(name, System.currentTimeMillis(), injectedObjects.toMap())
                }
            )

            // Parse a cron expression up front so an invalid one is rejected here, not at start().
            task.getParsedCron()

            tasks[name] = task
            if (managerState == ManagerState.RUNNING) {
                try {
                    scheduleTask(task)
                } catch (e: Exception) {
                    // Roll back the registration; the caller sees the original failure.
                    tasks.remove(name, task)
                    throw e
                }
            }
        }
    }

    /**
     * Registers a task under [name], replacing any task already registered under that name.
     *
     * The replacement is validated and scheduled before the previous task's schedule is
     * cancelled. If either step fails, the previous task and its schedule are left in place.
     *
     * @throws InvalidCronExpressionException if [schedule] is a cron schedule that cannot be parsed
     */
    fun addOrReplace(name: String, schedule: Schedule, execute: (TaskContext) -> Unit) {
        lock.writeLock().withLock {
            val replacement = InternalTask(
                name, schedule, true, allowOverlapping, execute,
                contextProvider = {
                    TaskContext(name, System.currentTimeMillis(), injectedObjects.toMap())
                }
            )

            // Reject an invalid cron expression before touching the existing registration.
            replacement.getParsedCron()

            val previousFuture = scheduledFutures.remove(name)
            val previousTask = tasks.put(name, replacement)

            try {
                if (managerState == ManagerState.RUNNING) scheduleTask(replacement)
            } catch (e: Exception) {
                // Restore whatever was registered before and surface the failure.
                if (previousTask != null) tasks[name] = previousTask else tasks.remove(name, replacement)
                if (previousFuture != null) scheduledFutures[name] = previousFuture
                throw e
            }

            previousFuture?.cancel(false)
        }
    }

    /**
     * Registers an on-demand task that is run explicitly via `runAsync`.
     *
     * @throws TaskAlreadyExistsException if an async task named [name] already exists
     */
    fun <T> addAsync(name: String, execute: (Array<out Any?>) -> T) {
        lock.writeLock().withLock {
            val existing = asyncTasks.putIfAbsent(name, InternalAsyncTask(name, execute))
            if (existing != null) throw TaskAlreadyExistsException(name)
        }
    }

    /**
     * Re-enables a disabled task and schedules it if the manager is running.
     * Does nothing when the task is already enabled.
     *
     * @throws TaskNotFoundException if no task is registered under [name]
     */
    fun enable(name: String) {
        lock.writeLock().withLock {
            val task = tasks[name] ?: throw TaskNotFoundException(name)
            if (!task.enabled) {
                task.enabled = true
                task.state = TaskState.IDLE
                if (managerState == ManagerState.RUNNING) {
                    scheduleTask(task)
                }
            }
        }
    }

    /**
     * Disables a task and cancels its pending schedule without interrupting a run in progress.
     * Does nothing when the task is already disabled.
     *
     * @throws TaskNotFoundException if no task is registered under [name]
     */
    fun disable(name: String) {
        lock.writeLock().withLock {
            val task = tasks[name] ?: throw TaskNotFoundException(name)
            if (task.enabled) {
                task.enabled = false
                task.state = TaskState.DISABLED

                // Fix 3: cancel the task's scheduled future as well.
                scheduledFutures.remove(name)?.cancel(false)
            }
        }
    }

    /**
     * Unregisters a task and cancels its pending schedule without interrupting a run in progress.
     *
     * @throws TaskNotFoundException if no task is registered under [name]
     */
    fun remove(name: String) {
        lock.writeLock().withLock {
            if (tasks.remove(name) == null) throw TaskNotFoundException(name)
            scheduledFutures.remove(name)?.cancel(false)
        }
    }

    /**
     * Unregisters an async task.
     *
     * @throws TaskNotFoundException if no async task is registered under [name]
     */
    fun removeAsync(name: String) {
        lock.writeLock().withLock {
            val removed = asyncTasks.remove(name)
            if (removed == null) throw TaskNotFoundException(name)
        }
    }

    // ========== Task Execution ==========

    /**
     * Triggers an execution of the named task outside its regular schedule.
     *
     * @param wait when true the task runs on the calling thread and this call returns once it is
     *   done; otherwise it is submitted to the manager's executor
     * @throws ManagerNotStartedException if the manager is not running
     * @throws TaskNotFoundException if no task is registered under [name]
     * @throws IllegalStateException if the task is disabled
     */
    fun runNow(name: String, wait: Boolean = false) {
        if (managerState != ManagerState.RUNNING) throw ManagerNotStartedException()

        val task = lock.readLock().withLock { tasks[name] } ?: throw TaskNotFoundException(name)

        // Fix 4: check the enabled flag again at the moment of execution.
        check(task.enabled) { "Task '$name' is disabled" }

        if (wait) {
            executeTask(task)
        } else {
            scheduler.submit { executeTask(task) }
        }
    }

    /**
     * Runs the named async task with [params].
     *
     * @param wait when true the task runs on the calling thread and an already-completed future
     *   is returned; otherwise it is submitted to the manager's executor
     * @return a future holding the task's [AsyncTaskResult]
     * @throws ManagerNotStartedException if the manager is not running
     * @throws TaskNotFoundException if no async task is registered under [name]
     */
    @Suppress("UNCHECKED_CAST")
    fun <T> runAsync(name: String, vararg params: Any?, wait: Boolean = false): Future<AsyncTaskResult<T>> {
        if (managerState != ManagerState.RUNNING) throw ManagerNotStartedException()

        val registered = lock.readLock().withLock { asyncTasks[name] } ?: throw TaskNotFoundException(name)
        val task = registered as InternalAsyncTask<T>

        if (wait) {
            return CompletableFuture.completedFuture(executeAsyncTask(task, params))
        }
        return scheduler.submit(Callable { executeAsyncTask(task, params) })
    }

    /**
     * Submits [task] to the executor according to its schedule and records the resulting future
     * under the task's name. Does nothing for a disabled task.
     */
    private fun scheduleTask(task: InternalTask) {
        if (!task.enabled) return

        val future = when (val sched = task.schedule) {
            is Schedule.Once -> {
                scheduler.schedule({
                    executeTask(task)
                    // Fix 7: a one-shot task unregisters itself after running.
                    if (sched.autoRemove) {
                        lock.writeLock().withLock {
                            // Remove only if this exact task is still registered. The name may have
                            // been re-registered while this run was in flight, and that newer task
                            // (and its future) must not be dropped.
                            if (tasks.remove(task.name, task)) {
                                scheduledFutures.remove(task.name)
                            }
                        }
                    }
                }, sched.delayMs, TimeUnit.MILLISECONDS)
            }

            // First run is immediate, then every intervalMs.
            is Schedule.FixedRate -> scheduler.scheduleAtFixedRate(
                { executeTask(task) }, 0, sched.intervalMs, TimeUnit.MILLISECONDS
            )

            // First run after delayMs, then delayMs after each run completes.
            is Schedule.FixedDelay -> scheduler.scheduleWithFixedDelay(
                { executeTask(task) }, sched.delayMs, sched.delayMs, TimeUnit.MILLISECONDS
            )

            is Schedule.Cron -> scheduleCronTask(task)
        }

        scheduledFutures[task.name] = future
        updateNextExecuteTime(task)
    }

    /**
     * Fix 1: cron scheduling without duplicate or missed executions.
     *
     * A one-second poller tracks the next due instant of the cron expression. When the clock
     * reaches or passes that instant, the due time is advanced first and the task is then
     * executed. A slot therefore fires once, up to about one second after it is due, and timer
     * jitter or a busy pool cannot make the poller skip it.
     *
     * If the task is already running when a slot comes due, that slot is skipped.
     *
     * @throws IllegalStateException if [task] does not have a cron schedule
     */
    private fun scheduleCronTask(task: InternalTask): ScheduledFuture<*> {
        val cron = checkNotNull(task.getParsedCron()) { "Task '${task.name}' has no cron schedule" }

        // Only touched by the poller below; successive runs of a periodic task never overlap.
        var nextDue = CronSupport.getNextExecutionTime(cron, ZonedDateTime.now())

        return scheduler.scheduleAtFixedRate({
            val now = System.currentTimeMillis()
            if (task.enabled && now >= nextDue) {
                // Advance before running so a slow execution cannot re-trigger the same slot.
                nextDue = CronSupport.getNextExecutionTime(cron, ZonedDateTime.now())
                task.nextExecuteTime = nextDue

                if (!task.isRunning) {
                    task.lastCronExecutionTime = now
                    executeTask(task)
                }
            }
        }, 0, 1, TimeUnit.SECONDS)
    }

    /**
     * Stores an estimate of the task's next run time (epoch milliseconds): "now" plus the
     * schedule's delay or interval, or the next cron match. A cron task without a parsed
     * expression gets [Long.MAX_VALUE].
     */
    private fun updateNextExecuteTime(task: InternalTask) {
        val now = System.currentTimeMillis()
        val upcoming: Long = when (val sched = task.schedule) {
            is Schedule.Cron -> {
                val parsed = task.getParsedCron()
                if (parsed != null) {
                    CronSupport.getNextExecutionTime(parsed, ZonedDateTime.now())
                } else {
                    Long.MAX_VALUE
                }
            }

            is Schedule.Once -> now + sched.delayMs
            is Schedule.FixedRate -> now + sched.intervalMs
            is Schedule.FixedDelay -> now + sched.delayMs
        }
        task.nextExecuteTime = upcoming
    }

    /**
     * Runs one execution of [task] on the calling thread and records its outcome.
     *
     * The execution is skipped when the task is disabled, or when it is already running and
     * overlapping is not allowed. Exceptions thrown by the task body are recorded and passed to
     * the error handler; they do not propagate to the caller.
     */
    private fun executeTask(task: InternalTask) {
        // Check and claim in one step so two threads (e.g. the scheduler and runNow) cannot both
        // pass the overlap guard.
        synchronized(task) {
            if (!task.enabled || (task.isRunning && !task.allowOverlapping)) return
            task.isRunning = true
            task.state = TaskState.RUNNING
        }

        val startTime = System.currentTimeMillis()
        task.lastExecuteTime = startTime

        try {
            val context = task.contextProvider?.invoke() ?: TaskContext(task.name, startTime, emptyMap())

            task.execute(context)

            val duration = System.currentTimeMillis() - startTime
            task.lastExecutionDuration = duration
            task.recordDuration(duration)
            task.totalExecutions.incrementAndGet()
            task.successCount.incrementAndGet()
            task.lastResult = ExecutionResult.SUCCESS
            task.lastException = null

            totalExecutions.incrementAndGet()
            totalSuccesses.incrementAndGet()
        } catch (e: InterruptedException) {
            // Keep the interrupt visible to the executor after recording the failure.
            Thread.currentThread().interrupt()
            handleTaskException(task, e)
        } catch (e: Exception) {
            handleTaskException(task, e)
        } finally {
            task.isRunning = false
            // Runs on every exit path, including an Error escaping the task body. A task disabled
            // while it was running stays DISABLED instead of being reset to IDLE.
            task.state = if (task.enabled) TaskState.IDLE else TaskState.DISABLED
        }
    }

    /**
     * Records a failed execution of [task] and notifies the error handler, if one is configured.
     *
     * The duration is measured from the task's recorded start time. An exception thrown by the
     * handler itself is printed to stderr and not rethrown.
     */
    private fun handleTaskException(task: InternalTask, e: Exception) {
        val duration = System.currentTimeMillis() - (task.lastExecuteTime ?: System.currentTimeMillis())
        task.lastExecutionDuration = duration
        task.recordDuration(duration)
        task.totalExecutions.incrementAndGet()
        task.failureCount.incrementAndGet()
        // A task disabled while it was running must stay DISABLED rather than be reset to IDLE.
        task.state = if (task.enabled) TaskState.IDLE else TaskState.DISABLED
        task.lastResult = ExecutionResult.FAILED
        task.lastException = e

        totalExecutions.incrementAndGet()
        totalFailures.incrementAndGet()

        // Fix 6: exceptions thrown by the error handler are reported instead of silently dropped.
        try {
            onError?.invoke(task.toTaskInfo(), e)
        } catch (handlerEx: Exception) {
            System.err.println("Error in exception handler for task '${task.name}': ${handlerEx.message}")
            handlerEx.printStackTrace()
        }
    }

    /**
     * Runs one invocation of an async task on the calling thread and wraps the outcome.
     *
     * Exceptions thrown by the task body are captured in [AsyncTaskResult.Failed] rather than
     * propagated. If the body was interrupted, the thread's interrupt flag is restored.
     *
     * @return [AsyncTaskResult.Success] with the returned value, or [AsyncTaskResult.Failed]
     */
    private fun <T> executeAsyncTask(task: InternalAsyncTask<T>, params: Array<out Any?>): AsyncTaskResult<T> {
        // Local start time: concurrent invocations of the same task overwrite task.lastExecuteTime,
        // so it cannot be used to time this particular run.
        val startTime = System.currentTimeMillis()
        task.isRunning = true
        task.state = TaskState.RUNNING
        task.lastExecuteTime = startTime

        return try {
            val result = task.execute(params)
            val duration = System.currentTimeMillis() - startTime

            task.lastExecutionDuration = duration
            task.recordDuration(duration)
            task.totalExecutions.incrementAndGet()
            task.successCount.incrementAndGet()
            task.lastException = null

            totalExecutions.incrementAndGet()
            totalSuccesses.incrementAndGet()

            AsyncTaskResult.Success(result, duration)
        } catch (e: Exception) {
            if (e is InterruptedException) Thread.currentThread().interrupt()

            val duration = System.currentTimeMillis() - startTime
            task.lastExecutionDuration = duration
            task.recordDuration(duration)
            task.totalExecutions.incrementAndGet()
            task.failureCount.incrementAndGet()
            task.lastException = e

            totalExecutions.incrementAndGet()
            totalFailures.incrementAndGet()

            AsyncTaskResult.Failed(e, duration)
        } finally {
            task.isRunning = false
            // Reset on every exit path so an Error from the task body cannot leave it RUNNING.
            task.state = TaskState.IDLE
        }
    }

    // ========== Query API ==========

    /** Snapshot of the scheduled task registered under [name], or `null` if there is none. */
    fun get(name: String): TaskInfo? = lock.readLock().withLock {
        val task = tasks[name]
        task?.toTaskInfo()
    }

    /** True when a scheduled task is registered under [name]. */
    fun exists(name: String): Boolean = lock.readLock().withLock { tasks[name] != null }

    /** True when an async task is registered under [name]. */
    fun existsAsync(name: String): Boolean = lock.readLock().withLock { asyncTasks[name] != null }

    /**
     * True when the task registered under [name] is currently executing. A scheduled task with
     * that name takes precedence over an async task with the same name.
     */
    fun isRunning(name: String): Boolean = lock.readLock().withLock {
        val scheduled = tasks[name]
        if (scheduled != null) {
            scheduled.isRunning
        } else {
            asyncTasks[name]?.isRunning ?: false
        }
    }

    /** True when a scheduled task named [name] exists and is enabled. */
    fun isEnabled(name: String): Boolean = lock.readLock().withLock {
        val task = tasks[name]
        task != null && task.enabled
    }

    /** Snapshots of all scheduled tasks. */
    fun getAll(): List<TaskInfo> = lock.readLock().withLock {
        tasks.values.map(InternalTask::toTaskInfo)
    }

    /** Snapshots of all scheduled tasks currently in [state]. */
    fun getAll(state: TaskState): List<TaskInfo> = lock.readLock().withLock {
        tasks.values.mapNotNull { task -> if (task.state == state) task.toTaskInfo() else null }
    }

    /** Builds a snapshot of manager-wide counters under the read lock. */
    fun stats(): ManagerStats = lock.readLock().withLock {
        val scheduled = tasks.values
        val onDemand = asyncTasks.values

        val enabledCount = scheduled.count { it.enabled }
        val runningScheduled = scheduled.count { it.isRunning }
        val runningOnDemand = onDemand.count { it.isRunning }

        ManagerStats(
            totalTasks = tasks.size + asyncTasks.size,
            enabledTasks = enabledCount,
            disabledTasks = tasks.size - enabledCount,
            runningTasksCount = runningScheduled + runningOnDemand,
            totalExecutions = totalExecutions.get(),
            totalSuccesses = totalSuccesses.get(),
            totalFailures = totalFailures.get(),
            managerState = managerState,
            uptime = System.currentTimeMillis() - startTime,
        )
    }

    /** Number of registered tasks, scheduled and async combined. */
    fun taskCount(): Int = lock.readLock().withLock {
        val scheduledCount = tasks.size
        val asyncCount = asyncTasks.size
        scheduledCount + asyncCount
    }
}

// ============= Extension Functions =============

/** Alias for [TaskManager.add]: registers [block] under [name] with the given [schedule]. */
fun TaskManager.scheduleTask(name: String, schedule: Schedule, block: (TaskContext) -> Unit) {
    add(name, schedule, block)
}

/** Alias for [TaskManager.addAsync]: registers [block] as an async task under [name]. */
fun <T> TaskManager.asyncTask(name: String, block: (Array<out Any?>) -> T) {
    addAsync(name, block)
}