浅析Golang的层级时间轮实现方案

发布时间 2023-09-05 21:12:42作者: 3WLineCode

文章目录

  • 时间轮介绍
    • 简单时间轮
    • 层级时间轮
      • kafka中的实现细节
  • 基于go语言的层级时间轮实现

 

一、时间轮介绍

  工作中,我们经常遇到到延时任务这类需求(例如用户开始一个任务,15分钟后给他发送一个通知奖励;用户下单未付款,三分钟后发送一条提醒消息...)。一般情况下,我们使用time.Timer对象完成工作,例如:

       // other code...
        go func() {
        timer := time.NewTimer(30 * time.Second)
        <-timer.C
        //延时执行的业务逻辑
    }()        

  在go语言运行时内置实现中,Timer对象的实现是使用小根堆这个数据结构。小根堆在写入,删除一个元素时,时间复杂度为O(log n)。当服务进程中,有100W个延时任务需要执行时,这100W个延时任务,写入到go运行时内置的Timer对象就是一个比较耗时,低效的操作了。而基于时间轮的任务写入、删除延时任务操作,可以将时间复杂度降低至O(1),从而高效地管理和触发这些延时任务,以满足复杂的调度需求。

  常见的时间轮有两种,分别是简单时间轮层级时间轮。下面参考Kafka的时间轮设计以及实现,分别讨论相关数据结构以及实现。

  (1)简单时间轮

  时间轮(TimeWheel)是一个存储延时任务的环形队列。队列内每个元素称为一个时间格(TimeBucket),可以存放一个任务列表(TimerTaskList),这个任务列表(TimerTaskList)则是一个环形双向链表,它的每一项表示的是延时任务项(TimerTaskEntity),代表真实的延时任务。所以,简单时间轮有大致这样的层级关系 TimeWheel -> m * TimerTaskList/TimeBucket -> n * TimerTaskEntity。(ps.时间轮的环形队列底层数据结构可以用数组来实现;TimerTaskList采用环形双向链表可以在O(1)时间复杂度去插入/删除TimerTaskEntity

  时间轮的每一个时间格,代表了当前时间轮的基本时间跨度(tickMs)。所以,假设当前时间轮队列长度(wheelSize)为7,时间跨度为1秒,这个时间轮的总时间跨度(interval) = tickMs * wheelSize = 1s * 7 = 7s。(ps.下文时间轮队列长度统一简称队列长度。)

  最后,时间轮还存在一个表盘指针(currentTime),表示当前时间轮所处的时间。currentTime将整个时间轮划分为到期部分未到期部分。ps. currentTime当前指向的时间格也属于到期部分,表示正在执行的时间格

  当我们往时间轮写入一个任务时,就如下图所示

  这样,简单时间轮的雏形就出来了,用代码实现就如下所示:

// TimeWheel 时间轮对象
type TimeWheel struct {
    Buckets []Bucket // 时间格队列

    WheelSize int //时间轮格数量
    TickMs    int // 基本时间跨度

    CurrentTime int //表盘指针
    mu          sync.RWMutex
}

// Bucket 时间格
type Bucket struct {
    TaskList   *TimerTaskList
}

// TimerTaskList 任务列表,双向链表。这里直接使用go内置的List对象
type TimerTaskList = list.List

// TimerTaskEntity 抽象的具体任务
type TimerTaskEntity struct {
    DelayTime int
    Task      func()
}

  (1.1)简单时间轮的实现

  接下来,就是让这个时间轮跑起来。这里我们使用一个Ticker驱动这个时间轮,这个Ticker的执行周期为时间轮的基本时间跨度。我们在最开始,添加一个1s和3s,9s的延时任务,然后随着时间的推移,分别在2023/09/01 23:52:20,2023/09/01 23:52:22,2023/09/01 23:52:28 执行了三个延时任务。完整代码如下:

  1 // TimeWheel 时间轮对象
  2 type TimeWheel struct {
  3     Buckets []Bucket // 时间格队列
  4 
  5     WheelSize int //时间轮格数量
  6     TickMs    int // 基本时间跨度
  7 
  8     CurrentTime int //表盘指针
  9     mu          sync.RWMutex
 10 
 11     ticker    *time.Ticker
 12     startChan chan bool
 13     stopChan  chan bool
 14 }
 15 
 16 // Bucket 时间格
 17 type Bucket struct {
 18     TaskList *TimerTaskList
 19 }
 20 
 21 // TimerTaskList 任务列表,双向链表。这里直接使用go内置的List对象
 22 type TimerTaskList = list.List
 23 
 24 // TimerTaskEntity 抽象的具体任务
 25 type TimerTaskEntity struct {
 26     DelayTime int // 延时时间
 27     Task      func()
 28 }
 29 
 30 func NewTimeWheel(wheelSize, tickMs int) *TimeWheel {
 31     return &TimeWheel{
 32         Buckets:     make([]Bucket, wheelSize),
 33         WheelSize:   wheelSize,
 34         TickMs:      tickMs,
 35         CurrentTime: 0,
 36 
 37         startChan: make(chan bool, 1),
 38         stopChan:  make(chan bool, 1),
 39     }
 40 }
 41 
 42 func (t *TimeWheel) Start() {
 43     select {
 44     case t.startChan <- true:
 45 
 46     default:
 47         fmt.Println("timewheel is already running, exit")
 48         return
 49     }
 50     // 启动时间轮的内部定时器
 51     t.ticker = time.NewTicker(time.Duration(t.TickMs) * time.Second)
 52     go func() {
 53         for {
 54             select {
 55             case <-t.ticker.C:
 56                 t.handler()
 57             case <-t.stopChan:
 58                 return
 59             }
 60         }
 61     }()
 62 }
 63 
 64 func (t *TimeWheel) Stop() {
 65     select {
 66     case <-t.startChan:
 67         t.ticker.Stop()
 68         t.stopChan <- true
 69         return
 70     default:
 71         log.Println("timewheel has stopped,exit.")
 72         return
 73     }
 74 }
 75 
 76 func (t *TimeWheel) handler() {
 77     t.mu.Lock()
 78     defer t.mu.Unlock()
 79 
 80     // 时间轮转动
 81     t.CurrentTime++
 82     // 转动一圈后,指针复位
 83     if t.CurrentTime%t.WheelSize == 0 {
 84         t.CurrentTime = 0
 85     }
 86 
 87     //跳过没有任务的时间槽
 88     taskList := t.Buckets[t.CurrentTime].TaskList
 89     if taskList == nil {
 90         return
 91     }
 92     for e := taskList.Front(); e != nil; e = e.Next() {
 93         taskEntity, _ := e.Value.(*TimerTaskEntity)
 94         go taskEntity.Task()
 95     }
 96     // 删除时间槽的定时器链表
 97     t.Buckets[t.CurrentTime].TaskList = nil
 98 }
 99 
100 func (t *TimeWheel) AddTimerTaskEntity(entity *TimerTaskEntity) {
101 
102     t.mu.Lock()
103     defer t.mu.Unlock()
104     if len(t.startChan) == 0 {
105         log.Println(" timewheel has not been started")
106         return
107     }
108 
109     // 找到延时应该放在哪个时间格
110     index := (t.CurrentTime + entity.DelayTime) % t.WheelSize
111     bucket := t.Buckets[index]
112     if bucket.TaskList == nil {
113         bucket.TaskList = list.New()
114     }
115     // 延时任务放入时间格链表
116     bucket.TaskList.PushBack(entity)
117     t.Buckets[index] = bucket
118 }
简单时间轮Demo
 1 func TestTimeWheel(t *testing.T) {
 2     simpleTimeWheel := NewTimeWheel(10, 1)
 3     simpleTimeWheel.Start()
 4     simpleTimeWheel.AddTimerTaskEntity(&TimerTaskEntity{
 5         Task: func() {
 6             log.Println("this is delay 1 s task")
 7         },
 8         DelayTime: 1,
 9     })
10     simpleTimeWheel.AddTimerTaskEntity(&TimerTaskEntity{
11         Task: func() {
12             log.Println("this is delay 3 s task")
13         },
14         DelayTime: 3,
15     })
16     simpleTimeWheel.AddTimerTaskEntity(&TimerTaskEntity{
17         Task: func() {
18             log.Println("this is delay 9 s task")
19         },
20         DelayTime: 9,
21     })
22 
23     time.Sleep(time.Second * 20)
24     simpleTimeWheel.Stop()
25 
26 }
测试示例

   这里可以看到简单时间轮的局限性:时间轮一旦初始化完成,确定了时间轮的时间格长度后,就不能再添加超过总时间跨度的延时任务了(例如一个tickMs为1s,wheelSize为10的时间轮,假如写入一个15s的延时任务,就会跟5s延时任务有冲突)。此时解决方案之一是扩充wheelSize时间格长度,但是假如我要写入一个100W * tickMs后执行的延时任务。就需要把wheelSize扩充到100W,不仅浪费内存空间,而且整体拉低时间轮的执行效率。

  (2)层级时间轮

  针对简单时间轮的局限性,就引入层级时间轮的概念。当任务到期时间超过当前时间轮所表示的时间范围后,就尝试添把它加到上层时间轮中。上层时间轮为按需创建,且随着时间的推进,上层时间轮中的延时任务会被降级重新插入到下层时间轮中。ps.下文中数字越大,层数越高。例如1层时间轮的上层为2层时间轮)

  同样,上层时间轮也会有时间格(TimeBucket),任务列表(TimerTaskList),基本时间跨度(tickMs),队列长度(wheelSize),总时间跨度(interval)这些概念。但是第二层的基本时间跨度(tickMs-wheel2)第一层的总时间跨度(interval-wheel1)。假如时间轮当前时间轮和上层时间轮的队列长度都是相同的,那么上层时间轮总时间跨度为:

  •   interval-wheel2 = tickMs-wheel2 * wheelSize;
  • tickMs-wheel2 = interval-wheel1 = tickMs-wheel1 * wheelSize;

  举个例子,当前时间轮A基本时间跨度(tickMs)为1s,A的队列长度(wheelSize)为7。此时假如我们要添加一个15秒后执行的延时任务X。此时时间轮A已经不能满足条件,所以这个任务需要插入到上层时间轮B中。假设B和A的队列长度(wheelSize)相同。时间轮B的tickMs-B = tickMs-A * wheelSize-A = 7s。时间轮B的interval-B = tickMs-B * wheelSize-B = 49s。那么我们任务X将保存在什么地方呢?最终将保存到时间轮B中时间格2中所对应的任务列表(TimerTaskList)中。(队列的第三个元素,该时间格的时间跨度为[14s,21s) )。如下图:

  同理,假如我们要添加一个延时50s后执行的任务,就需要引入第三层的时间轮C。C的基本时间跨度 tickMs-C = tickMs-B * wheelSize-B。 (ps.这里可以想象一下手表的时,分,秒针。秒针的基本时间跨度为1s,队列长度为60,总时间跨度为秒针转一圈的时间60s;分针的基本时间跨度为1min = 1s * 60,队列长度为60,总时间跨度为分针转一圈的时间60min;时针的基本时间跨度为1h = 1min * 60,队列长度为12,总时间跨度为时针转一圈的时间12h。)

  接下来,就要让我们的层级时间轮运行起来了。随着时间的流逝,时间轮B表盘指针往前移动,当指向时间格2的时候,会把任务X重新提交到时间轮A的时间格1中(该时间格的时间跨度为[1s,2s) ,因为时间轮B走了两格,所以任务X的剩余时间为15s - 7s * 2 = 1s),这个行为称为时间轮的降级操作。之后再经历1s,任务X真正到期,最终被执行。

  这里还有些实现的细节需要注意下

  • 除了第一层时间轮以外,其余高层时间轮的起始时间(startMs)必须是创建此层时间轮时前一层时间轮的表盘指针(currentTime)。
  • 每一层时间轮的表盘指针(currentTime)必须为基本时间跨度(tickMs)的整数倍。如果不满足,将执行一个修剪操作,将currentTime修剪为tickMs整数倍,并以此为依据与时间格的到期时间范围对应起来。假设某一时刻的时间为timerMs,那么具体的修剪行为的公式为 currentTime = timerMs - ( timerMs % tickMs )。(ps. currentTime始终为tickMs的整数倍)例如在加入延时任务X后的第9秒这个时刻,时间轮B的currentTime为 9 - ( 9 % 7 ) = 7s。
  • 每一层时间轮中都包含一个引用(overflowWheel),它指向跟高一层的时间轮。所以客户端只有创建的第一层时间轮,根据该引用间接持有各个层级时间轮的引用。

  (2.1)关于“空推进”问题

  我们简单时间轮中,使用了一个ticker来驱动时间轮的运转,在TimeWheel的handler()函数中,可以看到这样一行代码:

func (t *TimeWheel) handler() {
        // other code ...
      

        //跳过没有任务的时间槽
    taskList := t.Buckets[t.CurrentTime].TaskList
    if taskList == nil {
        return
    }    
        
        // other code ...  
}    

  假设我们现在有一个tickMs为1s的,wheelSize为1000的时间轮,现在往这个时间轮里面写入两个延时任务。第一个延时任务为200s之后执行,第二个延时任务为850s之后执行。那么在执行第一个延时任务,需要让ticker驱动200次,才能执行到第一个任务,且这200次驱动推进中,前199次为“空推进”。第二个延时任务执行时,又需要649次空推进。这样会无辜消耗机器的性能资源。  

  (2.2)kafka中如何解决“空推进”

  针对上面“空推进”这个问题,Kafka的延时队列使用了JDK(JAVA语言)的一个叫做DelayQueue的队列来协助推进时间轮。DelayQueue如名字所示,是一个延迟队列,具有队列的所有特性。策略如下:

  • Kafka把所有任务项(TimerTaskEntity)对应的任务列表(TimerTaskList)都加入到DelayQueue中。每一个任务列表(TimerTaskList)都有一个expiration(到期时间),该时间为 时间轮当前时间(currentTime) + 延时任务的延时时间(TimerTaskEntity.DelayTime)。依据这个时间做一个排序,将expiration(到期时间)最短任务列表(TimerTaskList)排在DelayQueue的队头,其他任务列表按照超时时间依次入队。这样,DelayQueue队列中各个元素就是按照expiration排好序的任务列表(TimerTaskList)。
  • 然后再单独开启一个线程“ExpiredOperationReaper”(过期收割机线程?),通过队头出队的方式来获取DelayQueue队列中到期的任务(获取队头元素的时间复杂度为O(1),获取到队头元素后,会切换新的队头元素)。
  • 最后,根据ExpiredOperationReaper获取到任务列表(TimerTaskList),既可以根据它的expiration(到期时间来推进时间轮时间,又可以对任务列表(TimerTaskList)中的任务项(TimerTaskEntity)做相应操作(降级 or 执行)。 总结下就是,这里使用时间轮(TimeWheel)来完成对 任务项(TimerTaskEntity)插入,删除等操作,使用DelayQueue辅助来完成对时间轮“精准推进”操作。  

 

二、基于go语言的层级时间轮实现

  首先,实现延时队列(DelayQueue)。DelayQueue中的priorityQueue使用小根堆实现,配合过期时间作为延迟优先级,保证了队头元素是最早到期的延时任务。Poll函数完成“精准推进”操作。如下所示:

  1 type item struct {
  2     Value    interface{}
  3     Priority int64
  4     Index    int
  5 }
  6 
  7 type priorityQueue []*item
  8 
  9 func newPriorityQueue(capacity int) priorityQueue {
 10     return make(priorityQueue, 0, capacity)
 11 }
 12 
 13 func (pq priorityQueue) Len() int {
 14     return len(pq)
 15 }
 16 
 17 func (pq priorityQueue) Less(i, j int) bool {
 18     return pq[i].Priority < pq[j].Priority
 19 }
 20 
 21 func (pq priorityQueue) Swap(i, j int) {
 22     pq[i], pq[j] = pq[j], pq[i]
 23     pq[i].Index = i
 24     pq[j].Index = j
 25 }
 26 
 27 func (pq *priorityQueue) Push(x interface{}) {
 28     n := len(*pq)
 29     c := cap(*pq)
 30     if n+1 > c {
 31         npq := make(priorityQueue, n, c*2)
 32         copy(npq, *pq)
 33         *pq = npq
 34     }
 35     *pq = (*pq)[0 : n+1]
 36     item := x.(*item)
 37     item.Index = n
 38     (*pq)[n] = item
 39 }
 40 
 41 func (pq *priorityQueue) Pop() interface{} {
 42     n := len(*pq)
 43     c := cap(*pq)
 44     if n < (c/2) && c > 25 {
 45         npq := make(priorityQueue, n, c/2)
 46         copy(npq, *pq)
 47         *pq = npq
 48     }
 49     item := (*pq)[n-1]
 50     item.Index = -1
 51     *pq = (*pq)[0 : n-1]
 52     return item
 53 }
 54 
 55 func (pq *priorityQueue) PeekAndShift(max int64) (*item, int64) {
 56     if pq.Len() == 0 {
 57         return nil, 0
 58     }
 59 
 60     item := (*pq)[0]
 61     if item.Priority > max {
 62         return nil, item.Priority - max
 63     }
 64     heap.Remove(pq, 0)
 65 
 66     return item, 0
 67 }
 68 
 69 // DelayQueue 小根堆实现的优先级队列
 70 type DelayQueue struct {
 71     C chan interface{}
 72 
 73     mu sync.Mutex
 74     pq priorityQueue
 75 
 76     sleeping int32
 77     wakeupC  chan struct{}
 78 }
 79 
 80 func NewDelayQueue(size int) *DelayQueue {
 81     return &DelayQueue{
 82         C:       make(chan interface{}),
 83         pq:      newPriorityQueue(size),
 84         wakeupC: make(chan struct{}),
 85     }
 86 }
 87 
 88 // Offer 写入一个指定到期时间的元素到当前的延时队列
 89 func (dq *DelayQueue) Offer(elem interface{}, expiration int64) {
 90     item := &item{
 91         Value:    elem,
 92         Priority: expiration,
 93     }
 94 
 95     dq.mu.Lock()
 96     heap.Push(&dq.pq, item)
 97     index := item.Index
 98     dq.mu.Unlock()
 99 
100     // 假如新写入的元素是最早到期的元素,则尝试唤醒等待调用Poll函数的调用者
101     if index == 0 {
102         if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) {
103             dq.wakeupC <- struct{}{}
104         }
105     }
106 }
107 
108 // Poll 无限循环的获取一个元素
109 func (dq *DelayQueue) Poll(exitC chan struct{}, nowF func() int64) {
110     for {
111         // 获取当前时间
112         now := nowF()
113 
114         dq.mu.Lock()
115         // 查看并弹出到期元素
116         item, delta := dq.pq.PeekAndShift(now)
117         if item == nil {
118             // 假如没有到期元素,则重置延时队列睡眠状态。同时保证Poll 和 Offer操作原子性
119             atomic.StoreInt32(&dq.sleeping, 1)
120         }
121         dq.mu.Unlock()
122 
123         if item == nil {
124             if delta == 0 {
125                 // 没有元素到期
126                 select {
127                 case <-dq.wakeupC:
128                     // 等待第一个写入的元素
129                     continue
130                 case <-exitC:
131                     goto exit
132                 }
133             } else if delta > 0 {
134                 // delta > 0 , 延时队列中最少有一个item在等待处理中
135                 select {
136                 case <-dq.wakeupC:
137                     continue
138                 case <-time.After(time.Duration(delta) * time.Second):
139                     // 等待队列中"最早"的元素到期
140                     if atomic.SwapInt32(&dq.sleeping, 0) == 0 {
141                         <-dq.wakeupC
142                     }
143                     continue
144                 case <-exitC:
145                     goto exit
146                 }
147             }
148         }
149 
150         select {
151         case dq.C <- item.Value: //取到到期的元素,通过延时队列C channel 发送到期元素
152         case <-exitC:
153             goto exit
154         }
155     }
156 
157 exit:
158     // Reset the states
159     atomic.StoreInt32(&dq.sleeping, 0)
160 }
延时队列(DelayQueue)

  接下来是实现延时任务对象(TimerTaskEntity),以及时间格对象(Bucket)。如下所示:

 1 // TimerTaskEntity 延时任务
 2 type TimerTaskEntity struct {
 3     DelayTime int64 // 延时时间
 4     Task      func()
 5 
 6     b unsafe.Pointer // type: *bucket  保存当前延时任务所在的时间格,使用桶指针,可通过原子操作并发更新/读取
 7 
 8     element *list.Element // 延时任务所在的双向链表中的节点元素
 9 
10 }
11 
12 func (t *TimerTaskEntity) getBucket() *Bucket {
13     return (*Bucket)(atomic.LoadPointer(&t.b))
14 }
15 
16 func (t *TimerTaskEntity) setBucket(b *Bucket) {
17     atomic.StorePointer(&t.b, unsafe.Pointer(b))
18 }
19 
20 // Stop 停止延时任务的执行
21 func (t *TimerTaskEntity) Stop() bool {
22     stopped := false
23     for b := t.getBucket(); b != nil; b = t.getBucket() {
24         // 如果时间格尚未过期/执行,则从时间格中删除这个延时任务
25         stopped = b.Remove(t)
26     }
27     return stopped
28 }
29 
30 // Bucket 时间格
31 type Bucket struct {
32     expiration int64 // 时间格的到期时间,这个时间是时间格内存储定时任务的到期时间
33 
34     mu       sync.Mutex
35     TaskList *TimerTaskList
36 }
37 
38 // TimerTaskList 任务列表,双向链表。这里直接使用go内置的List对象
39 type TimerTaskList = list.List
40 
41 func NewBucket() *Bucket {
42     return &Bucket{
43         TaskList:   list.New(),
44         expiration: -1,
45     }
46 }
47 
48 func (b *Bucket) Expiration() int64 {
49     return atomic.LoadInt64(&b.expiration)
50 }
51 
52 func (b *Bucket) SetExpiration(expiration int64) bool {
53     return atomic.SwapInt64(&b.expiration, expiration) != expiration
54 }
55 
56 func (b *Bucket) Add(t *TimerTaskEntity) {
57     b.mu.Lock()
58 
59     e := b.TaskList.PushBack(t)
60     t.setBucket(b)
61     t.element = e
62 
63     b.mu.Unlock()
64 }
65 
66 func (b *Bucket) remove(t *TimerTaskEntity) bool {
67     // 检查当前延时任务是否属于当前桶
68     if t.getBucket() != b {
69         return false
70     }
71     b.TaskList.Remove(t.element)
72     t.setBucket(nil)
73     t.element = nil
74     return true
75 }
76 
77 func (b *Bucket) Remove(t *TimerTaskEntity) bool {
78     b.mu.Lock()
79     defer b.mu.Unlock()
80     return b.remove(t)
81 }
82 
83 // Flush 延时任务降级,重新插入到下层时间轮中
84 func (b *Bucket) Flush(reinsert func(*TimerTaskEntity)) {
85     b.mu.Lock()
86     defer b.mu.Unlock()
87 
88     for e := b.TaskList.Front(); e != nil; {
89         next := e.Next()
90         t := e.Value.(*TimerTaskEntity)
91         b.remove(t)
92         reinsert(t)
93         e = next
94     }
95     // 当前桶所有延时任务降级完成后,该桶过期时间重置为-1,该桶不再有效
96     b.SetExpiration(-1)
97 }
延时任务&时间格

  最后,基于任务对象(TimerTaskEntity),时间格对象(Bucket)还有延时队列(DelayQueue)完成对时间轮的实现。其中,一个延时任务的写入,会在写入到时间轮的同时,以过期时间为优先级参照写入到延时队列中。时间轮启动时,会开启两个协程分别执行从延时队列中取出最近一次到期的任务(DelayQueue的Poll函数);和根据获取到的任务的过期时间,去推进时间轮指针转动,以及去触发相应的延时任务(真实执行 or 任务降级)操作。如下:

 1 // truncate returns the result of rounding x toward zero to a multiple of m.
 2 // If m <= 0, Truncate returns x unchanged.
 3 func truncate(x, m int64) int64 {
 4     if m <= 0 {
 5         return x
 6     }
 7     return x - x%m
 8 }
 9 
10 func timeToS(t time.Time) int64 {
11     return t.UnixNano() / int64(time.Second)
12 }
13 
14 func sToTime(t int64) time.Time {
15     return time.Unix(0, t*int64(time.Second)).UTC()
16 }
17 
18 type waitGroupWrapper struct {
19     sync.WaitGroup
20 }
21 
22 func (w *waitGroupWrapper) Wrap(cb func()) {
23     w.Add(1)
24     go func() {
25         cb()
26         w.Done()
27     }()
28 }
时间相关转换函数
  1 type TimingWheel struct {
  2     tickMs    int64 //基本时间跨度
  3     wheelSize int64 // 时间轮队列长度
  4 
  5     interval    int64       // 总跨度
  6     currentTime int64       //表盘指针
  7     buckets     []*Bucket   // 时间格队列
  8     queue       *DelayQueue // 延时队列
  9 
 10     // 上层时间轮引用 可以通过Add函数去并发的读写
 11     overflowWheel unsafe.Pointer // type: *TimingWheel
 12 
 13     exitC     chan struct{}
 14     waitGroup waitGroupWrapper
 15 }
 16 
 17 func NewTimingWheel(tickMs time.Duration, wheelSize int64) *TimingWheel {
 18     tick := int64(tickMs / time.Second)
 19     if tickMs <= 0 {
 20         panic(errors.New("tickMs must be greater than or equal to 1s"))
 21     }
 22 
 23     startMs := timeToS(time.Now().UTC())
 24     return newTimingWheel(tick, wheelSize, startMs, NewDelayQueue(int(wheelSize)))
 25 }
 26 
 27 func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *DelayQueue) *TimingWheel {
 28     buckets := make([]*Bucket, wheelSize)
 29     for i := range buckets {
 30         buckets[i] = NewBucket()
 31     }
 32 
 33     return &TimingWheel{
 34         tickMs:      tickMs,
 35         wheelSize:   wheelSize,
 36         currentTime: truncate(startMs, tickMs),
 37         interval:    tickMs * wheelSize,
 38         buckets:     buckets,
 39         queue:       queue,
 40         exitC:       make(chan struct{}),
 41     }
 42 }
 43 
 44 // 增加延时任务至时间轮
 45 func (tw *TimingWheel) add(t *TimerTaskEntity) bool {
 46     currentTime := atomic.LoadInt64(&tw.currentTime)
 47     if t.DelayTime < currentTime+tw.tickMs {
 48         return false
 49     } else if t.DelayTime < currentTime+tw.interval {
 50         // 写入到当前时间轮
 51         virtualID := t.DelayTime / tw.tickMs
 52         b := tw.buckets[virtualID%tw.wheelSize]
 53         b.Add(t)
 54 
 55         // 当前时间格写入到延时队列 ps. 延时队列是小根堆,时间格的过期时间是优先级参照,所以,越早到期的时间格越在队头
 56         if b.SetExpiration(t.DelayTime) {
 57             tw.queue.Offer(b, b.Expiration())
 58         }
 59         return true
 60     } else {
 61         // 超出当前时间轮最大范畴,写入到上层时间轮
 62         overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
 63         // 上层时间轮不存在 则创建新的时间轮。新时间轮的ticketMs = 当前时间轮的interval ;上层时间轮的开始时间startMs为当前时间轮表盘指针时间
 64         if overflowWheel == nil {
 65             atomic.CompareAndSwapPointer(&tw.overflowWheel, nil, unsafe.Pointer(newTimingWheel(tw.interval, tw.wheelSize, currentTime, tw.queue)))
 66             overflowWheel = atomic.LoadPointer(&tw.overflowWheel)
 67         }
 68         return (*TimingWheel)(overflowWheel).add(t)
 69     }
 70 }
 71 
 72 func (tw *TimingWheel) addOrRun(t *TimerTaskEntity) {
 73     if !tw.add(t) {
 74         // 如果无法添加,则表示该延时任务执行时间小于当前时间轮表盘指针指向的时间(换句话说,该延时任务已过期),则立即执行
 75         go t.Task()
 76     }
 77 }
 78 
 79 // advanceClock 推进时间轮时钟到指定的过期时间
 80 func (tw *TimingWheel) advanceClock(expiration int64) {
 81     currentTime := atomic.LoadInt64(&tw.currentTime)
 82     if expiration >= currentTime+tw.tickMs {
 83         currentTime = truncate(expiration, tw.tickMs)
 84         atomic.StoreInt64(&tw.currentTime, currentTime)
 85 
 86         overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
 87         if overflowWheel != nil {
 88             (*TimingWheel)(overflowWheel).advanceClock(currentTime)
 89         }
 90     }
 91 }
 92 
 93 // Start 时间轮启动
 94 // 开启两个协程
 95 //   - 协程1不断地从延时队列中取出最近一次到期的任务;
 96 //   - 协程2根据获取到的任务的过期时间,去推进时间轮指针转动,以及去触发相应的延时任务(真实执行 or 任务降级)
 97 func (tw *TimingWheel) Start() {
 98     tw.waitGroup.Wrap(func() {
 99         tw.queue.Poll(tw.exitC, func() int64 {
100             return timeToS(time.Now().UTC())
101         })
102     })
103 
104     tw.waitGroup.Wrap(func() {
105         for {
106             select {
107             case elem := <-tw.queue.C:
108                 b := elem.(*Bucket)
109                 tw.advanceClock(b.Expiration())
110                 b.Flush(tw.addOrRun)
111             case <-tw.exitC:
112                 return
113             }
114         }
115     })
116 }
117 
118 func (tw *TimingWheel) Stop() {
119     close(tw.exitC)
120     tw.waitGroup.Wait()
121 }
122 
123 func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *TimerTaskEntity {
124     t := &TimerTaskEntity{
125         DelayTime: timeToS(time.Now().UTC().Add(d)),
126         Task:      f,
127     }
128     tw.addOrRun(t)
129     return t
130 }
时间轮相关实现

  最后看一下执行结果:

 1 func TestTimingWheel_AfterFunc(t *testing.T) {
 2     tw := NewTimingWheel(time.Second, 10)
 3     tw.Start()
 4     log.Printf("启动时间轮 \n")
 5     defer tw.Stop()
 6 
 7     durations := []time.Duration{
 8         //1 * time.Second,
 9         5 * time.Second,
10         2 * time.Second,
11         //50 * time.Second,
12         1 * time.Minute,
13     }
14 
15     for _, d := range durations {
16         dTime := d
17         tw.AfterFunc(d, func() {
18             log.Printf("this is delay %v task \n", dTime)
19         })
20 
21         if dTime == 5*time.Second {
22             tw.AfterFunc(dTime, func() {
23                 log.Printf("this is delay %v task2 \n", dTime)
24             })
25         }
26 
27     }
28 
29     time.Sleep(time.Second * 100)
30 }
测试示例

   

   资料参考:

  https://book.douban.com/subject/30437872

  http://russellluo.com/2018/10/golang-implementation-of-hierarchical-timing-wheels.html