Go語言中時間輪的實現


作者 | luozhiyun 責編 | 張文

來源 | https:http://www.luozhiyun.com/archives/444

最近在工作中有一個需求,簡單來說就是在短時間內會創建上百萬個定時任務,創建的時候會將對應的金額相加,防止超售,需要過半個小時再去核對數據,如果數據對不上就需要將加上的金額再減回去,

這個需求如果用 Go 內置的 Timer 來做的話性能比較低下,因為 Timer 是使用最小堆來實現的,創建和刪除的時間復雜度都為 O(log n)。如果使用時間輪的話則是 O(1)性能會好很多。

對于時間輪來說,我以前寫過一篇 java 版的時間輪算法分析:https:http://www.luozhiyun.com/archives/59,這次來看看 Go 語言的時間輪實現,順便大家有興趣的也可以對比一下兩者的區別,以及我寫文章的水平和一年多前有沒有提升,哈哈哈。

時間輪的運用其實是非常的廣泛的,在 Netty、Akka、Quartz、ZooKeeper、Kafka 等組件中都存在時間輪的蹤影,下面用 Go 實現的時間輪是以 Kafka 的代碼為原型來實現的。

完整代碼: https:http://github.com/devYun/timingwheel,

介紹

簡單時間輪

在時間輪中存儲任務的是一個環形隊列,底層采用數組實現,數組中的每個元素可以存放一個定時任務列表,定時任務列表是一個環形的雙向鏈表,鏈表中的每一項表示的都是定時任務項,其中封裝了真正的定時任務。

時間輪由多個時間格組成,每個時間格代表當前時間輪的基本時間跨度(tickMs)。時間輪的時間格個數是固定的,可用 wheelSize 來表示,那么整個時間輪的總體時間跨度(interval)可以通過公式tickMs×wheelSize計算得出,

時間輪還有一個表盤指針(currentTime),用來表示時間輪當前所處的時間,currentTime 是 tickMs 的整數倍,currentTime 指向的地方是表示到期的時間格,表示需要處理的時間格所對應的鏈表中的所有任務,

如下圖是一個 tickMs 為 1s,wheelSize 等于 10 的時間輪,每一格里面放的是一個定時任務鏈表,鏈表里面存有真正的任務項:


taskList

初始情況下表盤指針 currentTime 指向時間格0,若時間輪的 tickMs 為 1ms 且 wheelSize 等于10,那么 interval 則等于 10s。如下圖此時有一個定時為2s的任務插進來會存放到時間格為 2 的任務鏈表中,用紅色標記。隨著時間的不斷推移,指針 currentTime 不斷向前推進,如果過了 2s,那么 currentTime 會指向時間格 2 的位置,會將此時間格的任務鏈表獲取出來處理,


timewheel

如果當前的指針 currentTime 指向的是 2,此時如果插入一個 9s 的任務進來,那么新來的任務會服用原來的時間格鏈表,會存放到時間格 1 中


timewheelAdd9S

這里所講的時間輪都是簡單時間輪,只有一層,總體時間范圍在 currentTime 和 currentTime+interval 之間。如果現在有一個 15s 的定時任務是需要重新開啟一個時間輪,設置一個時間跨度至少為 15s 的時間輪才夠用,但是這樣擴充是沒有底線的,如果需要一個 1 萬秒的時間輪,那么就需要一個這么大的數組去存放,不僅占用很大的內存空間,而且也會因為需要遍歷這么大的數組從而拉低效率。

因此引入了層級時間輪的概念,

層級時間輪

如圖是一個兩層的時間輪,第二層時間輪也是由 10 個時間格組成,每個時間格的跨度是 10s。第二層的時間輪的 tickMs 為第一層時間輪的 interval,即 10s,每一層時間輪的 wheelSize 是固定的,都是 10,那么第二層的時間輪的總體時間跨度 interval 為 100s。

圖中展示了每個時間格對應的過期時間范圍, 我們可以清晰地看到, 第二層時間輪的第 0 個時間格的過期時間范圍是 [0,9],也就是說, 第二層時間輪的一個時間格就可以表示第一層時間輪的所有(10 個)時間格;

如果向該時間輪中添加一個 15s 的任務,那么當第一層時間輪容納不下時,進入第二層時間輪,并插入到過期時間為[10,19]的時間格中。


timewheellevel2

隨著時間的流逝,當原本 15s 的任務還剩下 5s 的時候,這里就有一個時間輪降級的操作,此時第一層時間輪的總體時間跨度已足夠,此任務被添加到第一層時間輪到期時間為 5 的時間格中,之后再經歷 5s 后,此任務真正到期,最終執行相應的到期操作,

代碼實現

因為我們這個 Go 語言版本的時間輪代碼是仿照 Kafka 寫的,所以在具體實現時間輪 TimingWheel 時還有一些小細節:

  • 時間輪的時間格中每個鏈表會有一個 root 節點用于簡化邊界條件。它是一個附加的鏈表節點,該節點作為第一個節點,它的值域中并不存儲任何東西,只是為了操作的方便而引入的;

  • 除了第一層時間輪,其余高層時間輪的起始時間(startMs)都設置為創建此層時間輪時前面第一輪的 currentTime。每一層的 currentTime 都必須是 tickMs 的整數倍,如果不滿足則會將 currentTime 修剪為 tickMs 的整數倍,修剪方法為:currentTime = startMs – (startMs % tickMs);

  • Kafka 中的定時器只需持有 TimingWheel 的第一層時間輪的引用,并不會直接持有其他高層的時間輪,但每一層時間輪都會有一個引用(overflowWheel)指向更高一層的應用;

  • Kafka 中的定時器使用了 DelayQueue 來協助推進時間輪。在操作中會將每個使用到的時間格中每個鏈表都加入 DelayQueue,DelayQueue 會根據時間輪對應的過期時間 expiration 來排序,最短 expiration 的任務會被排在 DelayQueue 的隊頭,通過單獨線程來獲取 DelayQueue 中到期的任務;

結構體

type TimingWheel struct {http:// 時間跨度,單位是毫秒tick int64 http:// in millisecondshttp:// 時間輪個數wheelSize int64http:// 總跨度interval int64 http:// in millisecondshttp:// 當前指針指向時間currentTime int64 http:// in millisecondshttp:// 時間格列表buckets []*buckethttp:// 延遲隊列queue *delayqueue.DelayQueuehttp:// 上級的時間輪引用overflowWheel unsafe.Pointer http:// type: *TimingWheel
exitC chan struct{}waitGroup waitGroupWrapper}

tick、wheelSize、interval、currentTime 都比較好理解,buckets 字段代表的是時間格列表,queue 是一個延遲隊列,所有的任務都是通過延遲隊列來進行觸發,overflowWheel 是上層時間輪的引用。

type bucket struct {http:// 任務的過期時間expiration int64
mu sync.Mutexhttp:// 相同過期時間的任務隊列timers *list.List}

bucket 里面實際上封裝的是時間格里面的任務隊列,里面放入的是相同過期時間的任務,到期后會將隊列 timers 拿出來進行處理。這里有個有意思的地方是由于會有多個線程并發的訪問 bucket,所以需要用到原子類來獲取 int64 位的值,為了保證 32 位系統上面讀取 64 位數據的一致性,需要進行 64 位對齊,具體的可以看這篇:https:http://www.luozhiyun.com/archives/429,講的是對內存對齊的思考。

type Timer struct {http:// 到期時間expiration int64 http:// in millisecondshttp:// 要被執行的具體任務task func()http:// Timer所在bucket的指針b unsafe.Pointer http:// type: *buckethttp:// bucket列表中對應的元素element *list.Element}

Timer 是時間輪的最小執行單元,是定時任務的封裝,到期后會調用 task 來執行任務。


Group 37

初始化時間輪

例如現在初始化一個tick是1s,wheelSize是10的時間輪:

func main() { tw := timingwheel.NewTimingWheel(time.Second, 10) tw.Start() }func NewTimingWheel(tick time.Duration, wheelSize int64) *TimingWheel { http:// 將傳入的tick轉化成毫秒 tickMs := int64(tick / time.Millisecond) http:// 如果小于零,那么panic if tickMs <= 0 { panic(errors.New("tick must be greater than or equal to 1ms")) } http:// 設置開始時間 startMs := timeToMs(time.Now().UTC()) http:// 初始化TimingWheel return newTimingWheel( tickMs, wheelSize, startMs, delayqueue.New(int(wheelSize)), )}func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *delayqueue.DelayQueue) *TimingWheel { http:// 初始化buckets的大小 buckets := make([]*bucket, wheelSize) for i := range buckets { buckets[i] = newBucket() } http:// 實例化TimingWheel return &TimingWheel{ tick: tickMs, wheelSize: wheelSize, http:// currentTime必須是tickMs的倍數,所以這里使用truncate進行修剪 currentTime: truncate(startMs, tickMs), interval: tickMs * wheelSize, buckets: buckets, queue: queue, exitC: make(chan struct{}), }}

初始化十分簡單,大家可以看看上面的代碼注釋即可,

啟動時間輪

下面我們看看 start 方法:

func (tw *TimingWheel) Start() { http:// Poll會執行一個無限循環,將到期的元素放入到queue的C管道中 tw.waitGroup.Wrap(func() { tw.queue.Poll(tw.exitC, func() int64 { return timeToMs(time.Now().UTC()) }) }) http:// 開啟無限循環獲取queue中C的數據 tw.waitGroup.Wrap(func() { for { select { http:// 從隊列里面出來的數據都是到期的bucket case elem := <-tw.queue.C: b := elem.(*bucket) http:// 時間輪會將當前時間 currentTime 往前移動到 bucket的到期時間 tw.advanceClock(b.Expiration()) http:// 取出bucket隊列的數據,并調用addOrRun方法執行 b.Flush(tw.addOrRun) case <-tw.exitC: return } } })}

這里使用了 util 封裝的一個 Wrap 方法,這個方法會起一個 goroutines 異步執行傳入的函數,具體的可以到我上面給出的鏈接去看源碼,

Start 方法會啟動兩個 goroutines,第一個goroutines用來調用延遲隊列的 queue 的 Poll 方法,這個方法會一直循環獲取隊列里面的數據,然后將到期的數據放入到 queue 的 C 管道中;第二個 goroutines 會無限循環獲取 queue 中 C 的數據,如果 C 中有數據表示已經到期,那么會先調用 advanceClock 方法將當前時間 currentTime 往前移動到 bucket的到期時間,然后再調用 Flush 方法取出 bucket 中的隊列,并調用 addOrRun 方法執行,

func (tw *TimingWheel) advanceClock(expiration int64) {currentTime := atomic.LoadInt64(&tw.currentTime)http:// 過期時間大于等于(當前時間+tick)if expiration >= currentTime+tw.tick {http:// 將currentTime設置為expiration,從而推進currentTimecurrentTime = truncate(expiration, tw.tick)atomic.StoreInt64(&tw.currentTime, currentTime)
http:// Try to advance the clock of the overflow wheel if presenthttp:// 如果有上層時間輪,那么遞歸調用上層時間輪的引用overflowWheel := atomic.LoadPointer(&tw.overflowWheel)if overflowWheel != nil {(*TimingWheel)(overflowWheel).advanceClock(currentTime)}}}

advanceClock 方法會根據到期時間來重新設置 currentTime,從而推進時間輪前進,

func (b *bucket) Flush(reinsert func(*Timer)) {var ts []*Timer
b.mu.Lock()http:// 循環獲取bucket隊列節點for e := b.timers.Front(); e != nil; {next := e.Next()
t := e.Value.(*Timer)http:// 將頭節點移除bucket隊列b.remove(t)ts = append(ts, t)
e = next}b.mu.Unlock()
b.SetExpiration(-1) http:// TODO: Improve the coordination with b.Add()
for _, t := range ts {reinsert(t)}}

Flush 方法會根據 bucket 里面 timers 列表進行遍歷插入到 ts 數組中,然后調用 reinsert 方法,這里是調用的 addOrRun 方法,

func (tw *TimingWheel) addOrRun(t *Timer) {http:// 如果已經過期,那么直接執行if !tw.add(t) {http:// 異步執行定時任務go t.task()}}

addOrRun 會調用 add 方法檢查傳入的定時任務 Timer 是否已經到期,如果到期那么異步調用 task 方法直接執行。add 方法我們下面會接著分析,

整個 start 執行流程如圖:


timewheel_start

  1. start 方法會啟動一個 goroutines 調用 poll 來處理 DelayQueue 中到期的數據,并將數據放入到管道 C 中;

  2. start 方法啟動第二個 goroutines 方法會循環獲取 DelayQueue 中管道C的數據,管道 C 中實際上存放的是一個 bucket,然后遍歷bucket的timers列表,如果任務已經到期,那么異步執行,沒有到期則重新放入到 DelayQueue 中。

add task

func main() {tw := timingwheel.NewTimingWheel(time.Second, 10)tw.Start()http:// 添加任務tw.AfterFunc(time.Second*15, func() {fmt.Println("The timer fires")exitC <- time.Now().UTC()})}

我們通過 AfterFunc 方法添加一個 15s 的定時任務,如果到期了,那么執行傳入的函數。

func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *Timer {t := &Timer{expiration: timeToMs(time.Now().UTC().Add(d)),task: f,}tw.addOrRun(t)return t}

AfterFunc 方法會根據傳入的任務到期時間,以及到期需要執行的函數封裝成 Timer,調用 addOrRun 方法,addOrRun 方法我們上面已經看過了,會根據到期時間來決定是否需要執行定時任務。

下面我們來看一下 add 方法:

func (tw *TimingWheel) add(t *Timer) bool {currentTime := atomic.LoadInt64(&tw.currentTime)http:// 已經過期if t.expiration < currentTime+tw.tick {http:// Already expiredreturn falsehttp:// 到期時間在第一層環內} else if t.expiration < currentTime+tw.interval {http:// Put it into its own buckethttp:// 獲取時間輪的位置virtualID := t.expiration / tw.tickb := tw.buckets[virtualID%tw.wheelSize]http:// 將任務放入到bucket隊列中b.Add(t)http:// 如果是相同的時間,那么返回false,防止被多次插入到隊列中if b.SetExpiration(virtualID * tw.tick) {http:// 將該bucket加入到延遲隊列中tw.queue.Offer(b, b.Expiration())}
return true} else {http:// Out of the interval. Put it into the overflow wheelhttp:// 如果放入的到期時間超過第一層時間輪,那么放到上一層中去overflowWheel := atomic.LoadPointer(&tw.overflowWheel)if overflowWheel == nil {atomic.CompareAndSwapPointer(&tw.overflowWheel,nil,http:// 需要注意的是,這里tick變成了intervalunsafe.Pointer(newTimingWheel(tw.interval,tw.wheelSize,currentTime,tw.queue,)),)overflowWheel = atomic.LoadPointer(&tw.overflowWheel)}http:// 往上遞歸return (*TimingWheel)(overflowWheel).add(t)}}

add 方法根據到期時間來分成了三部分,第一部分是小于當前時間+tick,表示已經到期,那么返回 false 執行任務即可;

第二部分的判斷會根據 expiration 是否小于時間輪的跨度,如果小于的話表示該定時任務可以放入到當前時間輪中,通過取模找到 buckets 對應的時間格并放入到 bucket 隊列中,SetExpiration 方法會根據傳入的參數來判斷是否已經執行過延遲隊列的 Offer 方法,防止重復插入;

第三部分表示該定時任務的時間跨度超過了當前時間輪,需要升級到上一層的時間輪中。需要注意的是,上一層的時間輪的 tick 是當前時間輪的 interval,延遲隊列還是同一個,然后設置為指針 overflowWheel,并調用 add 方法往上層遞歸。

結尾

到這里時間輪已經講完了,不過還有需要注意的地方,我們在用上面的時間輪實現中,使用了 DelayQueue 加環形隊列的方式實現了時間輪,對定時任務項的插入和刪除操作而言,TimingWheel 時間復雜度為 O(1),在 DelayQueue 中的隊列使用的是優先隊列,時間復雜度是 O(log n),但是由于 buckets 列表實際上是非常小的,所以并不會影響性能,

Reference

  • https:http://github.com/RussellLuo/timingwheel

  • https:http://zhuanlan.zhihu.com/p/121483218

  • https:http://github.com/apache/kafka/tree/3cdc78e6bb1f83973a14ce1550fe3874f7348b05/core/src/main/scala/kafka/utils/timer

0 条回复 A文章作者 M管理員
    暫無討論,說說你的看法吧