closeCallbacksCollected 的作用及 onClose / close 的逻辑详解

核心问题:两个线程的竞争

线程A(调用 close())          线程B(调用 onClose() 注册回调)
─────────────────────          ─────────────────────────────────
closed.CAS → true
                               ← 此时 isClosed == true,但 close() 还没收集回调
channel.close(...)             onClose(callback) 被调用
// 还没进入 synchronized 块    // 该把回调加入列表,还是立即执行?
synchronized(closeCallbacks) {
    ...

问题的本质是:closed 标志 和 "回调列表已被清空" 这两件事,不是同时发生的closed.compareAndSet 设置之后,close() 还需要一段时间才能进入 synchronized 块、收集并清空回调列表。

closeCallbacksCollected 就是用来精确标记 "close() 已经完成了回调收集" 这个时刻的。

onClose() 的逻辑

fun onClose(callback: Consumer<WsCloseReason>) {
    synchronized(closeCallbacks) {
        if (closeCallbacksCollected) {
            // ① 情况A:close() 已经收集完回调了
            //    列表已清空,不能再往里加了,直接立即执行
            runCatching { callback.accept(closeReason.get()) }
        } else {
            // ② 情况B:close() 还没收集(或者根本还没被调用)
            //    老老实实加入列表,等 close() 来触发
            closeCallbacks.add(callback)
        }
    }
}

关键:整个判断和操作都在 synchronized(closeCallbacks)。这保证了和 close() 里同一把锁的互斥。

close() 的逻辑

fun close(reason: WsCloseReason) {
    // 第一步:CAS 保证幂等,只有第一个调用者能继续
    if (!closed.compareAndSet(false, true)) return

    // 第二步:发送 WebSocket 关闭帧(网络 I/O,放在锁外面)
    runCatching { channel.close(code, msg) }

    // 第三步:在锁内原子地完成三件事
    synchronized(closeCallbacks) {
        closeReason.compareAndSet(WsCloseReason.Normal, reason) // 设置最终原因
        callbacks = ArrayList(closeCallbacks)                    // 收集回调快照
        closeCallbacks.clear()                                   // 清空列表
        closeCallbacksCollected = true                           // ← 竖起旗帜
    }

    // 第四步:锁外依次执行回调
    callbacks.forEach { cb -> runCatching { cb.accept(finalReason) } }
}

为什么不能直接用 isClosed 来判断?

假设去掉 closeCallbacksCollected,在 onClose() 里改成:

// ❌ 错误写法
if (isClosed) {
    callback.accept(closeReason.get())  // closeReason 可能还是默认值 Normal!
}

竞态时序:

close() 线程                         onClose() 线程
────────────────────────────────     ────────────────────────────
closed.CAS → true
channel.close(...)(耗时)
                                     isClosed == true ✓
                                     closeReason.get() → Normal  ← 读到旧值!
synchronized(closeCallbacks) {
    closeReason.set(Error(...))       ← 真正的原因还没写进去
    ...
}

结论:closeCallbacksCollected 充当了一个"内存屏障式的完成信号",它只在 synchronized 块内部被设为 true,确保任何读到它为 true 的线程,一定也能读到正确的 closeReason(因为同一把锁保证了 happens-before)。

完整的时序矩阵

这个设计保证了:每一个 onClose 回调,不管注册时机早晚,都会被执行恰好一次,且拿到正确的关闭原因。