Dubbo (Part 6): The Timing Wheel Algorithm

Published: 2023-09-03 20:41:32, Author: Stitches

Introduction to the timing wheel algorithm

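HashedWheelTimer, a timer based on the timing wheel algorithm, is widely used in frameworks such as Netty and Dubbo. For example, Dubbo uses monitoring mechanisms such as the RPC call timeout to make the system more fault tolerant. The consumer checks whether a request has timed out and, if it has, returns a timeout result to the application layer.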

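Dubbo's early implementation put every pending result (DefaultFuture) into one collection. A scheduled task then scanned all the futures at a fixed interval to find the ones that had timed out. This works, but every scan walks the whole collection even when almost nothing has expired, which is wasted work. The timing wheel was introduced to cut the cost of these timeout checks.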

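HashedWheelTimer is a ring structure that can be pictured as a clock face. Each slot represents one time interval (a tick) and stores its pending timer tasks in a doubly linked list. A pointer moves periodically: at every tick it advances to the next slot, walks that slot's task list, and runs the tasks that are due.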
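
To make the clock picture concrete, here is a deliberately simplified, single-threaded toy wheel. It is a sketch, not Dubbo's code. The class `ToyWheel`, its manual `tick()` method, and the use of an `ArrayDeque` per slot instead of a hand-written doubly linked list are all assumptions made for illustration. A task due `d` ticks from now goes into slot `(tick + d) % n` and must wait `d / n` full revolutions. Each call to `tick()` visits one slot, fires the tasks whose remaining rounds have reached zero, and decrements the others.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy, single-threaded timing wheel (illustration only, not Dubbo's implementation).
class ToyWheel {
    // One pending task: a label plus the number of full revolutions it must still wait.
    static final class Entry {
        final String name;
        long remainingRounds;

        Entry(String name, long remainingRounds) {
            this.name = name;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<ArrayDeque<Entry>> slots = new ArrayList<>();
    private long tick; // the "clock hand": index of the next slot to process

    ToyWheel(int slotCount) {
        for (int i = 0; i < slotCount; i++) {
            slots.add(new ArrayDeque<>());
        }
    }

    // Schedule a task to fire delayTicks ticks from now (0 = on the next call to tick()).
    void schedule(String name, long delayTicks) {
        int n = slots.size();
        long target = tick + delayTicks;
        slots.get((int) (target % n)).add(new Entry(name, delayTicks / n));
    }

    // Advance the hand by one slot and return the names of the tasks that fired.
    List<String> tick() {
        List<String> fired = new ArrayList<>();
        ArrayDeque<Entry> slot = slots.get((int) (tick % slots.size()));
        Iterator<Entry> it = slot.iterator();
        while (it.hasNext()) {
            Entry e = it.next();
            if (e.remainingRounds <= 0) {
                it.remove();          // due in this revolution: take it out and fire it
                fired.add(e.name);
            } else {
                e.remainingRounds--;  // not yet: wait one more revolution
            }
        }
        tick++;
        return fired;
    }
}
```

With 4 slots, a task scheduled 5 ticks ahead lands in slot 1 with one remaining round. The first visit to slot 1 only decrements the counter, and the second visit (the sixth call to `tick()`) fires the task.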

The implementation consists of three parts: the timer task HashedWheelTimeout, the clock slot HashedWheelBucket, and the clock engine HashedWheelTimer.


Timer task: HashedWheelTimeout

Dubbo programs against the Timeout interface, which the concrete timer task class implements:

public interface Timeout {

    /**
     * Returns the {@link Timer} that created this handle.
     */
    Timer timer();

    /**
     * Returns the {@link TimerTask} which is associated with this handle.
     */
    TimerTask task();

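    /**
     * Returns true if the task associated with this handle has expired.
     */
    boolean isExpired();

    /**
     * Returns true if the task associated with this handle has been cancelled.
     */
    boolean isCancelled();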

    /**
     * Attempts to cancel the {@link TimerTask} associated with this handle.
     * If the task has been executed or cancelled already, it will return with
     * no side effect.
     *
     * @return True if the cancellation completed successfully, otherwise false
     */
    boolean cancel();
}

The timer task class (excerpt):

private static final class HashedWheelTimeout implements Timeout {

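    // Task state: initial (0), cancelled (1), expired (2)
    private static final int ST_INIT = 0;
    private static final int ST_CANCELLED = 1;
    private static final int ST_EXPIRED = 2;

    // Atomically updates a single volatile field (state) without requiring the whole object to be atomic
    private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");

    // The timer (clock engine) this task belongs to
    private final HashedWheelTimer timer;
    // The actual work to run (an interface)
    private final TimerTask task;
    // Deadline in nanoseconds, relative to the timer's startTime
    private final long deadline;

    // Field updated atomically through STATE_UPDATER
    @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization"})
    private volatile int state = ST_INIT;

    // Number of full wheel rounds left before this task is due
    long remainingRounds;

    // Links to the neighbouring tasks in the bucket's doubly linked list
    HashedWheelTimeout next;
    HashedWheelTimeout prev;

    // The bucket (slot) this task currently sits in
    HashedWheelBucket bucket;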

    void remove() {
        HashedWheelBucket bucket = this.bucket;
        if (bucket != null) {
            bucket.remove(this);
        } else {
            // Not placed in a bucket yet: only decrement the timer's pending-timeout count
            timer.pendingTimeouts.decrementAndGet();
        }
    }

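    // Mark the task as expired and run it. Only the caller that wins the
    // ST_INIT -> ST_EXPIRED transition runs the task; exceptions are logged, not rethrown
    public void expire() {
        if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
            return;
        }

        try {
            task.run(this);
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
            }
        }
    }

    // Cancel: only flip the state here; the worker unlinks the task from its bucket on the next tick
    @Override
    public boolean cancel() {
        if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
            return false;
        }
        timer.cancelledTimeouts.add(this);
        return true;
    }

    public boolean compareAndSetState(int expected, int state) {
        return STATE_UPDATER.compareAndSet(this, expected, state);
    }
}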

Operations supported by HashedWheelTimeout:

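  • remove: calls remove on the bucket it belongs to in order to unlink itself; if the task has not been placed in a bucket yet, it instead decrements the owning timer's pendingTimeouts counter
  • expire: the task has reached its deadline; it is marked expired and its TimerTask is run
  • cancel: the submitter cancels the task; the cancelled task is added to the timer's cancelledTimeouts queue, and when the engine reaches its next tick it calls the task's remove method to unlink it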
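
All three operations hinge on a one-shot state transition. `cancel` and `expire` both try to move the state away from `ST_INIT`, so whichever compare-and-set succeeds first decides the task's fate, and the other call becomes a no-op. The following is a minimal sketch of that idea. It assumes a hypothetical `ToyTimeout` class that keeps its state in a plain `AtomicInteger` and runs a `Runnable` rather than a `TimerTask`. Dubbo itself uses an `AtomicIntegerFieldUpdater` on a `volatile int`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for HashedWheelTimeout's state handling (not Dubbo's class).
class ToyTimeout {
    static final int ST_INIT = 0;
    static final int ST_CANCELLED = 1;
    static final int ST_EXPIRED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_INIT);
    private final Runnable task;

    ToyTimeout(Runnable task) {
        this.task = task;
    }

    // Succeeds only if the task has neither run nor been cancelled yet.
    boolean cancel() {
        return state.compareAndSet(ST_INIT, ST_CANCELLED);
    }

    // Called by the worker at the deadline; runs the task only if it wins the transition.
    void expire() {
        if (!state.compareAndSet(ST_INIT, ST_EXPIRED)) {
            return; // already cancelled or already expired: nothing to do
        }
        try {
            task.run();
        } catch (Throwable t) {
            // Like HashedWheelTimeout.expire(): a failing task must not kill the worker.
            System.err.println("task failed: " + t);
        }
    }

    boolean isCancelled() {
        return state.get() == ST_CANCELLED;
    }

    boolean isExpired() {
        return state.get() == ST_EXPIRED;
    }
}
```

Because the winner is decided by a single CAS, a task cancelled just before its tick never runs. A task that has already run reports `false` from `cancel()`.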

Clock slot: HashedWheelBucket

private static final class HashedWheelBucket {

    /**
     * Used for the linked-list datastructure
     */
    private HashedWheelTimeout head;
    private HashedWheelTimeout tail;

    // Append a timeout to the tail of this bucket
    void addTimeout(HashedWheelTimeout timeout) {
        assert timeout.bucket == null;
        timeout.bucket = this;
        if (head == null) {
            head = tail = timeout;
        } else {
            tail.next = timeout;
            timeout.prev = tail;
            tail = timeout;
        }
    }

    // Unlink the given timeout and return its successor
    public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
        HashedWheelTimeout next = timeout.next;
        // Re-link the neighbours, if there are any
        if (timeout.prev != null) {
            timeout.prev.next = next;
        }
        if (timeout.next != null) {
            timeout.next.prev = timeout.prev;
        }

        // Removing the head
        if (timeout == head) {
            // ...which is also the tail: the bucket becomes empty
            if (timeout == tail) {
                tail = null;
                head = null;
            } else {
                head = next;
            }
        } else if (timeout == tail) {
            // if the timeout is the tail modify the tail to be the prev node.
            tail = timeout.prev;
        }
        // null out prev, next and bucket to allow for GC.
        timeout.prev = null;
        timeout.next = null;
        timeout.bucket = null;
        timeout.timer.pendingTimeouts.decrementAndGet();
        return next;
    }

    // Walk every task: run the due ones, drop the cancelled ones, and decrement the remaining rounds of the rest
    void expireTimeouts(long deadline) {
        HashedWheelTimeout timeout = head;

        // process all timeouts
        while (timeout != null) {
            HashedWheelTimeout next = timeout.next;
            if (timeout.remainingRounds <= 0) {
                next = remove(timeout);
                if (timeout.deadline <= deadline) {
                    timeout.expire(); // 执行此任务
                } else {
                    // The timeout was placed into a wrong slot. This should never happen.
                    throw new IllegalStateException(String.format(
                            "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                }
            } else if (timeout.isCancelled()) {
                next = remove(timeout);
            } else {
                timeout.remainingRounds--;
            }
            timeout = next;
        }
    }

    // Call pollTimeout repeatedly to drain the bucket, collecting every task that is neither expired nor cancelled
    void clearTimeouts(Set<Timeout> set) {
        for (; ; ) {
            HashedWheelTimeout timeout = pollTimeout();
            if (timeout == null) {
                return;
            }
            if (timeout.isExpired() || timeout.isCancelled()) {
                continue;
            }
            set.add(timeout);
        }
    }

    // Detach and return the head task, or null if the bucket is empty
    private HashedWheelTimeout pollTimeout() {
        HashedWheelTimeout head = this.head;
        if (head == null) {
            return null;
        }
        HashedWheelTimeout next = head.next;
        if (next == null) {
            tail = this.head = null;
        } else {
            this.head = next;
            next.prev = null;
        }

        // null out prev and next to allow for GC.
        head.next = null;
        head.prev = null;
        head.bucket = null;
        return head;
    }
}

Operations supported by HashedWheelBucket:

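  • addTimeout: appends a node at the tail, i.e. adds a task
  • pollTimeout: removes the head node and returns its task
  • remove: unlinks a given node by reference; the removed task has either been processed or been cancelled, so the timer's pendingTimeouts counter is decremented
  • clearTimeouts: calls pollTimeout repeatedly to collect every task that is neither expired nor cancelled
  • expireTimeouts: walks all nodes starting from the head; due tasks are unlinked with remove and then run via expire, cancelled tasks are simply removed with remove, and every other task has its remaining-rounds count decremented by one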
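
Every HashedWheelTimeout carries its own `prev`/`next` links and a back-reference to its bucket, so a task can be unlinked in O(1) without searching the slot. The sketch below keeps only that intrusive list logic. `ToyNode` and `ToyBucket` are hypothetical names, and timers, deadlines, and rounds are left out.

```java
// Hypothetical, stripped-down intrusive doubly linked list modelled on HashedWheelBucket.
class ToyNode {
    final String name;
    ToyNode prev, next;
    ToyBucket bucket; // back-reference, like HashedWheelTimeout.bucket

    ToyNode(String name) {
        this.name = name;
    }
}

class ToyBucket {
    ToyNode head, tail;

    // Append at the tail (cf. addTimeout).
    void add(ToyNode n) {
        n.bucket = this;
        if (head == null) {
            head = tail = n;
        } else {
            tail.next = n;
            n.prev = tail;
            tail = n;
        }
    }

    // Unlink an arbitrary node in O(1) and return its successor (cf. remove).
    ToyNode remove(ToyNode n) {
        ToyNode next = n.next;
        if (n.prev != null) n.prev.next = next;
        if (next != null) next.prev = n.prev;
        if (n == head) head = next;   // removed the head
        if (n == tail) tail = n.prev; // removed the tail
        n.prev = n.next = null;       // help GC, as the original does
        n.bucket = null;
        return next;
    }

    // Detach and return the head, or null if the bucket is empty (cf. pollTimeout).
    ToyNode poll() {
        ToyNode h = head;
        if (h != null) remove(h);
        return h;
    }

    // Names from head to tail, for inspection.
    String dump() {
        StringBuilder sb = new StringBuilder();
        for (ToyNode n = head; n != null; n = n.next) sb.append(n.name);
        return sb.toString();
    }
}
```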

Clock engine: HashedWheelTimer

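The clock engine runs at a steady rhythm. On every tick it selects the bucket for the current tick and iterates over its list from the head. For each task it checks whether the task belongs to the current revolution: if so, the task is taken out and run; otherwise its remaining-rounds count is decremented by one.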

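In addition, the engine keeps two blocking queues that buffer timer tasks. One receives tasks submitted from outside at arbitrary times, and the other buffers tasks that were cancelled. At the start of each tick the engine first removes the cancelled tasks from their buckets and then moves the newly submitted tasks into the buckets they belong to.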
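
The hand-off between submitters and the engine is a plain producer/consumer pattern. Any thread may `add` to a thread-safe queue, but only the worker `poll`s it. The worker also takes at most a fixed number of entries per tick (100000 in `transferTimeoutsToBuckets`), so a thread that keeps submitting in a tight loop cannot stall a tick. A minimal sketch follows. It assumes a hypothetical `QueueHandoff` class with `String` tasks and a `drainUpTo` helper standing in for the transfer loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of the timeouts queue hand-off (not Dubbo's class).
class QueueHandoff {
    // Producer side: any thread may add.
    final Queue<String> submitted = new LinkedBlockingQueue<>();

    // Worker side: move at most `limit` entries out of the queue during one tick.
    List<String> drainUpTo(int limit) {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < limit; i++) {
            String t = submitted.poll(); // non-blocking: returns null when the queue is empty
            if (t == null) {
                break;
            }
            batch.add(t);
        }
        return batch;
    }
}
```

Anything left over simply waits in the queue for the next tick.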

public class HashedWheelTimer implements Timer {
    // Number of HashedWheelTimer instances created so far
    private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
    // Atomically updates the workerState field of HashedWheelTimer
    private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
    // The engine core: a single worker thread drives the wheel
    private final Worker worker = new Worker();
    // The thread that runs the worker
    private final Thread workerThread;
    // Engine states
    private static final int WORKER_STATE_INIT = 0;
    private static final int WORKER_STATE_STARTED = 1;
    private static final int WORKER_STATE_SHUTDOWN = 2;
    // Field updated atomically through WORKER_STATE_UPDATER
    @SuppressWarnings({"unused", "FieldMayBeFinal"})
    private volatile int workerState;

    // Duration of one tick (one slot), in nanoseconds
    private final long tickDuration;

    // The wheel: array of buckets (slots)
    private final HashedWheelBucket[] wheel;
    // mask = n - 1, where n = wheel.length is a power of two, so ticks & mask selects
    // the bucket and is equivalent to ticks % n
    private final int mask;
    // Released once the worker thread has initialized startTime
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    // Queue of newly submitted timeouts, not yet placed in a bucket
    private final Queue<HashedWheelTimeout> timeouts = new LinkedBlockingQueue<>();
    // Queue of timeouts cancelled by their submitters
    private final Queue<HashedWheelTimeout> cancelledTimeouts = new LinkedBlockingQueue<>();
    // Number of pending timeouts (submitted but not yet run or removed)
    private final AtomicLong pendingTimeouts = new AtomicLong(0);
    // Upper bound on pendingTimeouts; a value <= 0 means no limit
    private final long maxPendingTimeouts;
    // Time (System.nanoTime()) at which the worker started; 0 means not yet initialized
    private volatile long startTime;
}
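
The `mask` field only works because the wheel length is a power of two, which is why Dubbo's constructor rounds the requested number of slots up to the next power of two. The hypothetical `WheelMask.normalize` below is an equivalent written with `Integer.highestOneBit`, not the original helper. `index` shows that `tick & mask` picks the same slot as `tick % n` for non-negative ticks.

```java
// Hypothetical helpers illustrating the power-of-two wheel and its mask (not Dubbo's code).
class WheelMask {
    // Smallest power of two >= requested; requested must be in 1..2^30.
    static int normalize(int requested) {
        if (requested <= 0 || requested > (1 << 30)) {
            throw new IllegalArgumentException("ticksPerWheel out of range: " + requested);
        }
        int highest = Integer.highestOneBit(requested);
        return highest == requested ? highest : highest << 1;
    }

    // Bucket index for a tick count, as in the Worker: (int) (tick & mask).
    static int index(long tick, int wheelLength) {
        int mask = wheelLength - 1; // all low bits set, e.g. 64 -> 0b111111
        return (int) (tick & mask);
    }
}
```

A bitwise AND is cheaper than a modulo, and it is evaluated for every task transfer and every tick. That is the usual reason given for insisting on a power-of-two wheel.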

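HashedWheelTimer is driven by a small state machine. Its initial state is WORKER_STATE_INIT. The transition from WORKER_STATE_INIT to WORKER_STATE_STARTED starts the worker thread, which runs the Worker core that continuously maintains the wheel.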

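To submit a task, callers use newTimeout(). Before the task is added to the timeouts queue, this method calls start() to make sure the engine is running. It then computes the task's deadline relative to startTime: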

// task: the timer task    delay: how long to wait before running it    unit: time unit of delay
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }
    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
    }
    // Make sure the engine (worker thread) has started
    start();

    // Add the timeout to the timeout queue which will be processed on the next tick.
    // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    // Guard against overflow.
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    // Add the task to the timeouts queue; the worker moves it into a bucket on the next tick
    timeouts.add(timeout);
    return timeout;
}
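
Deadlines are stored relative to `startTime`. The `delay > 0 && deadline < 0` guard catches a huge delay whose `long` addition wrapped around to a negative number. The hypothetical `DeadlineMath.relativeDeadline` below isolates that arithmetic. It takes the current `nanoTime` as a parameter so that the result is deterministic.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper reproducing the deadline formula from newTimeout().
class DeadlineMath {
    // now + delay - startTime, clamped to Long.MAX_VALUE if the addition overflowed.
    static long relativeDeadline(long nowNanos, long startTime, long delay, TimeUnit unit) {
        long deadline = nowNanos + unit.toNanos(delay) - startTime;
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE; // the sum wrapped around: treat it as "far in the future"
        }
        return deadline;
    }
}
```

For example, with `startTime = 1_000_000`, `now = 5_000_000` and a 2 ms delay, the relative deadline is 6,000,000 ns. `TimeUnit.toNanos` already saturates at `Long.MAX_VALUE`, so the wrap-around comes from adding `now` on top of that value.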

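The state machine is implemented in start(). It uses the AtomicIntegerFieldUpdater<HashedWheelTimer> field WORKER_STATE_UPDATER to atomically update a single field of HashedWheelTimer (workerState). For more on AtomicIntegerFieldUpdater, see: https://blog.csdn.net/anLA_/article/details/78662383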

public void start() {
    switch (WORKER_STATE_UPDATER.get(this)) {
        case WORKER_STATE_INIT:
            if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                workerThread.start();   // start the worker thread, which runs Worker
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }

    // Wait until the startTime is initialized by the worker.
    while (startTime == 0) {
        try {
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}
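
`start()` combines two idioms. First, a compare-and-set on a `volatile int` through `AtomicIntegerFieldUpdater` ensures that exactly one caller moves the state from INIT to STARTED and launches the thread. Second, a `CountDownLatch` makes every caller wait until the worker has published `startTime`. The runnable sketch below uses the same pattern in a hypothetical `ToyEngine` class. Its worker only records its start time and exits, and its `stop()` is heavily simplified.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical engine showing the start-once pattern of HashedWheelTimer.start() (not Dubbo's class).
class ToyEngine {
    private static final int INIT = 0;
    private static final int STARTED = 1;
    private static final int SHUTDOWN = 2;

    // Updates only the volatile int field "state" of a ToyEngine instance.
    private static final AtomicIntegerFieldUpdater<ToyEngine> STATE =
            AtomicIntegerFieldUpdater.newUpdater(ToyEngine.class, "state");

    private volatile int state = INIT;
    private volatile long startTime; // 0 means "not initialized yet"
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    private final Thread workerThread = new Thread(() -> {
        long now = System.nanoTime();
        startTime = (now == 0) ? 1 : now; // keep 0 reserved as the "unset" marker
        startTimeInitialized.countDown(); // wake every thread waiting in start()
        // A real worker would now loop over ticks while the state is STARTED.
    });

    // Returns true only for the call that actually launched the worker thread.
    boolean start() {
        boolean launched = false;
        switch (STATE.get(this)) {
            case INIT:
                if (STATE.compareAndSet(this, INIT, STARTED)) {
                    workerThread.start();
                    launched = true;
                }
                break;
            case STARTED:
                break;
            case SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid state");
        }
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Same choice as HashedWheelTimer.start(): startTime will be set very soon.
            }
        }
        return launched;
    }

    // Simplified: the real stop() also waits for the worker and returns the unprocessed timeouts.
    void stop() {
        STATE.set(this, SHUTDOWN);
    }

    long startTime() {
        return startTime;
    }
}
```

If several threads race into `start()` while the state is INIT, only one CAS succeeds. The losers fall through to the latch and return once `startTime` is visible.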

The Worker (the engine core):

private final class Worker implements Runnable {
    private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

    private long tick;

    @Override
    public void run() {
        // Initialize the startTime.
        startTime = System.nanoTime();
        if (startTime == 0) {
            // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
            startTime = 1;
        }

        // Release every thread blocked in start() waiting for startTime to be initialized
        startTimeInitialized.countDown();

        do {
            final long deadline = waitForNextTick();
            if (deadline > 0) {
                int idx = (int) (tick & mask);
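                // Unlink the tasks that were cancelled since the last tick
                processCancelledTasks();
                HashedWheelBucket bucket = wheel[idx];
                // Move newly submitted tasks into their buckets
                transferTimeoutsToBuckets();
                // Run the due tasks in the current bucket
                bucket.expireTimeouts(deadline);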
                tick++;
            }
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

        // Fill the unprocessedTimeouts so we can return them from stop() method.
        for (HashedWheelBucket bucket : wheel) {
            bucket.clearTimeouts(unprocessedTimeouts);
        }
        for (; ; ) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                break;
            }
            if (!timeout.isCancelled()) {
                unprocessedTimeouts.add(timeout);
            }
        }
        processCancelledTasks();
    }

    // Move newly submitted tasks from the timeouts queue into their buckets
    private void transferTimeoutsToBuckets() {
        // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
        // adds new timeouts in a loop.
        for (int i = 0; i < 100000; i++) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                // all processed
                break;
            }
            if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                // Was cancelled in the meantime.
                continue;
            }

            long calculated = timeout.deadline / tickDuration;
            timeout.remainingRounds = (calculated - tick) / wheel.length;

            // Ensure we don't schedule for past.
            final long ticks = Math.max(calculated, tick);
            int stopIndex = (int) (ticks & mask);

            HashedWheelBucket bucket = wheel[stopIndex];
            bucket.addTimeout(timeout);
        }
    }

    // Unlink every task waiting in the cancelledTimeouts queue
    private void processCancelledTasks() {
        for (; ; ) {
            HashedWheelTimeout timeout = cancelledTimeouts.poll();
            if (timeout == null) {
                // all processed
                break;
            }
            try {
                timeout.remove();
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("An exception was thrown while process a cancellation task", t);
                }
            }
        }
    }

    /**
     * calculate goal nanoTime from startTime and current tick number,
     * then wait until that goal has been reached.
     * In short: sleep until the current tick's time slot has elapsed.
     */
    private long waitForNextTick() {
        long deadline = tickDuration * (tick + 1);

        for (; ; ) {
            final long currentTime = System.nanoTime() - startTime;
            long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

            if (sleepTimeMs <= 0) {
                if (currentTime == Long.MIN_VALUE) {
                    return -Long.MAX_VALUE;
                } else {
                    return currentTime;
                }
            }
            if (isWindows()) {
                sleepTimeMs = sleepTimeMs / 10 * 10;
            }

            try {
                Thread.sleep(sleepTimeMs);
            } catch (InterruptedException ignored) {
                if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                    return Long.MIN_VALUE;
                }
            }
        }
    }

    Set<Timeout> unprocessedTimeouts() {
        return Collections.unmodifiableSet(unprocessedTimeouts);
    }
}
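
Two pieces of integer arithmetic in the Worker are easy to misread. First, `transferTimeoutsToBuckets` turns a relative deadline into an absolute tick `calculated`, stores how many full revolutions remain, and clamps already-overdue tasks to the current tick. Second, `waitForNextTick` rounds the remaining nanoseconds up to whole milliseconds (the `+ 999999`) so that it never wakes early. The hypothetical helpers below reproduce both formulas in isolation and leave out the Windows 10 ms rounding.

```java
// Hypothetical helpers reproducing the Worker's arithmetic (not Dubbo's code).
class WorkerMath {
    // Returns {stopIndex, remainingRounds}, as computed in transferTimeoutsToBuckets().
    // wheelLength must be a power of two.
    static long[] place(long deadline, long tickDuration, long tick, int wheelLength) {
        int mask = wheelLength - 1;
        long calculated = deadline / tickDuration;               // absolute tick at which the task is due
        long remainingRounds = (calculated - tick) / wheelLength;
        long ticks = Math.max(calculated, tick);                 // never schedule into the past
        int stopIndex = (int) (ticks & mask);
        return new long[] {stopIndex, remainingRounds};
    }

    // Milliseconds to sleep until the end of the current tick, rounded up (cf. waitForNextTick()).
    static long sleepMillis(long tickDuration, long tick, long elapsedNanos) {
        long deadline = tickDuration * (tick + 1);
        return (deadline - elapsedNanos + 999_999) / 1_000_000;
    }
}
```

Take a 100 ms tick and an 8-slot wheel. A task due 1.25 s after `startTime`, submitted at tick 0, gets `calculated = 12` and lands in bucket `12 & 7 = 4` with one remaining round: the visit at tick 4 decrements it, and the visit at tick 12 runs it. Java's `/` truncates toward zero, so an overdue task ends up with 0 (or negative) remaining rounds. `expireTimeouts` treats both cases the same way through its `<= 0` check.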

Putting it together, the timing wheel's scheduling loop runs as follows:

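  1. Wait for the next tick.

  2. The clock advances and the tick begins:

    1. Remove, one by one, the tasks that were cancelled externally and queued in cancelledTimeouts.
    2. Move the externally submitted tasks queued in timeouts into their buckets.
  3. Select the bucket for the current tick and run its due timer tasks.

  4. Check whether the engine has been shut down; if not, repeat the steps above, otherwise continue.

  5. Add the tasks that were never processed to the unprocessedTimeouts set, collecting them in two ways:

    1. Iterate over the buckets, calling clearTimeouts() on each one.
    2. Call poll() repeatedly on the timeouts queue to pick up tasks that were never moved into a bucket, skipping cancelled ones.
  6. Remove the tasks that were added to cancelledTimeouts after the last tick.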
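
Steps 5 and 6 only gather what is left when the timer stops. The sketch below models that collection. It assumes a hypothetical `ShutdownCollector` with its own `Task` and `State` types, and plain collections stand in for the buckets and the `timeouts` queue. Tasks still pending in a bucket or never transferred are returned, while expired and cancelled ones are skipped.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Hypothetical model of the tail of Worker.run() (not Dubbo's code).
class ShutdownCollector {
    enum State { INIT, CANCELLED, EXPIRED }

    static final class Task {
        final String name;
        State state;

        Task(String name, State state) {
            this.name = name;
            this.state = state;
        }
    }

    // Drain every bucket first, then the queue of tasks that never reached a bucket.
    static Set<String> collectUnprocessed(List<? extends Collection<Task>> buckets, Queue<Task> pending) {
        Set<String> unprocessed = new LinkedHashSet<>();
        for (Collection<Task> bucket : buckets) {
            for (Task t : bucket) {
                if (t.state == State.INIT) { // clearTimeouts() skips expired and cancelled tasks
                    unprocessed.add(t.name);
                }
            }
            bucket.clear();
        }
        Task task;
        while ((task = pending.poll()) != null) {
            if (task.state != State.CANCELLED) { // the queue loop only checks isCancelled()
                unprocessed.add(task.name);
            }
        }
        return unprocessed;
    }
}
```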
