當(dāng)前位置:首頁(yè) > IT技術(shù) > 其他 > 正文

時(shí)間輪定時(shí)器分析
2022-04-25 23:09:20


時(shí)間輪算法(Timing-Wheel)很早出現(xiàn)在linux kernel 2.6中。因效率非常高,很多應(yīng)用框架都實(shí)現(xiàn)了這個(gè)算法。還有些定時(shí)器使用最小堆實(shí)現(xiàn),但總體來(lái)說(shuō),時(shí)間輪算法在插入性能上更高。

時(shí)間輪分成多個(gè)層級(jí),每一層是一個(gè)圈,和時(shí)鐘類似

當(dāng)個(gè)位的指針轉(zhuǎn)完一圈到達(dá)0這個(gè)刻度之后,十位的指針轉(zhuǎn)1格;當(dāng)十位的轉(zhuǎn)完一圈,百位的轉(zhuǎn)1格,以此類推。這樣就能表示很長(zhǎng)的度數(shù)。

時(shí)間輪能表達(dá)的時(shí)間長(zhǎng)度,和圈的數(shù)量以及每一圈的長(zhǎng)度成正比。假設(shè)有5圈,每個(gè)圈60個(gè)刻度,每個(gè)刻度表示1毫秒,那么這個(gè)時(shí)間輪可以表示這么長(zhǎng):

60 x 60 x 60 x 60 x 60 = 777,600,000?(ms) ~ 216小時(shí)

現(xiàn)在用程序表示這個(gè)指針可能這樣的:

int ptr[5];

假如有一個(gè)update函數(shù)驅(qū)動(dòng)時(shí)間輪運(yùn)轉(zhuǎn)起來(lái),每調(diào)一次跳一格,那這個(gè)算法可能是這樣的:

  • ptr[0]加1,如果ptr[0]等于60,則使ptr[0]重置為0,可以寫為:ptr[0] = (ptr[0] + 1) % 60
  • 如果ptr[0]等于0的時(shí)候,說(shuō)明轉(zhuǎn)了一圈,此時(shí)ptr[1]要加1:ptr[1] = (ptr[1] + 1) % 60
  • 就這樣一直轉(zhuǎn)到第5圈,然后又重新開始。

這樣處理有點(diǎn)麻煩,而且還需要一個(gè)數(shù)組來(lái)表示。其實(shí)可以用一個(gè)uint32變量來(lái)劃分,比如:

| 6bit | 6bit | 6bit | 6bit |  8bit  |
111111 111111 111111 111111 11111111

也分5個(gè)圈,第1個(gè)占8位即256個(gè)槽位,后面4個(gè)分別占6位即64個(gè)槽位。這個(gè)變量只需不斷自增就行,比如前面8位滿了會(huì)變成0,后面自動(dòng)進(jìn)1。這樣就可以定義一些宏:

// 第1個(gè)輪占的位數(shù)
#define TVR_BITS 8
// 第1個(gè)輪的長(zhǎng)度
#define TVR_SIZE (1 << TVR_BITS)
// 第n個(gè)輪占的位數(shù)
#define TVN_BITS 6
// 第n個(gè)輪的長(zhǎng)度
#define TVN_SIZE (1 << TVN_BITS)
// 掩碼:取模或整除用
#define TVR_MASK (TVR_SIZE - 1)
#define TVN_MASK (TVN_SIZE - 1)

想取某一個(gè)圈的當(dāng)前指針位置是:

// 第1個(gè)圈的當(dāng)前指針位置
#define FIRST_INDEX(v) ((v) & TVR_MASK)
// 后面第n個(gè)圈的當(dāng)前指針位置
#define NTH_INDEX(v, n) (((v) >> (TVR_BITS + (n) * TVN_BITS)) & TVN_MASK)

核心的數(shù)組結(jié)構(gòu)是這樣的:

// 第1個(gè)輪
typedef struct tvroot {
clinknode_t vec[TVR_SIZE];
} tvroot_t;
// 后面幾個(gè)輪
typedef struct tvnum {
clinknode_t vec[TVN_SIZE];
} tvnum_t;

// 時(shí)間輪定時(shí)器
typedef struct timerwheel {
tvroot_t tvroot; // 第1個(gè)輪
tvnum_t tv[4]; // 后面4個(gè)輪
uint64_t lasttime; // 上一次的時(shí)間毫秒
uint32_t currtick; // 當(dāng)前的tick
uint16_t interval; // 每個(gè)時(shí)間點(diǎn)的毫秒間隔
uint16_t remainder; // 剩余的毫秒
} timerwheel_t;

這里要重點(diǎn)講一下clinknode_t,這是每一個(gè)圈的槽位的數(shù)據(jù),它其實(shí)是一個(gè)雙向循環(huán)鏈表:

時(shí)間輪定時(shí)器分析_結(jié)點(diǎn)

新結(jié)點(diǎn)可以往head的前面加,也可以往head的后面加,相當(dāng)于加到鏈表頭和鏈表尾。初始情況下head的前后指針指向自己。鏈表中的結(jié)點(diǎn)就是定時(shí)器結(jié)點(diǎn)。

雙向鏈表的代碼如下:

#pragma once
/**
* 循環(huán)雙向鏈表
*/

// 鏈表結(jié)點(diǎn)
typedef struct clinknode {
struct clinknode *next;
struct clinknode *prev;
} clinknode_t;

// 初始化鏈表頭:前后都指向自己
static inline void clinklist_init(clinknode_t *head) {
head->prev = head;
head->next = head;
}

// 插入結(jié)點(diǎn)到鏈表的前面,因?yàn)槭茄h(huán)鏈表,其實(shí)是在head的后面
static inline void clinklist_add_front(clinknode_t *head, clinknode_t *node) {
node->prev = head;
node->next = head->next;
head->next->prev = node;
head->next = node;
}

// 插入結(jié)點(diǎn)到鏈表的后面,因?yàn)槭茄h(huán)鏈表,所以其實(shí)是在head的前面
static inline void clinklist_add_back(clinknode_t *head, clinknode_t *node) {
node->prev = head->prev;
node->next = head;
node->prev->next = node;
head->prev = node;
}

// 判斷鏈表是否為空:循環(huán)鏈表為空是頭的下一個(gè)和上一個(gè)都指向自己
static inline int clinklist_is_empty(clinknode_t *head) {
return head == head->next;
}

// 從鏈表中移除自己,同時(shí)會(huì)重設(shè)結(jié)點(diǎn)
static inline void clinklist_remote(clinknode_t *node) {
node->next->prev = node->prev;
node->prev->next = node->next;
clinklist_init(node);
}

// 將鏈表1的結(jié)點(diǎn)取出來(lái),放到鏈表2
static inline void clinklist_splice(clinknode_t *head1, clinknode_t *head2) {
if (!clinklist_is_empty(head1)) {
clinknode_t *first = head1->next; // 第1個(gè)結(jié)點(diǎn)
clinknode_t *last = head1->prev; // 最后1個(gè)結(jié)點(diǎn)
clinknode_t *at = head2->next; // 插在第2個(gè)鏈表的這個(gè)結(jié)點(diǎn)前面
first->prev = head2;
head2->next = first;
last->next = at;
at->prev = last;
clinklist_init(head1);
}
}

整個(gè)定時(shí)器的代碼如下:

timerwheel.h

#pragma once
/**
* 定時(shí)器模塊
*/
#include <stdio.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include "clinklist.h"


// 時(shí)間輪定時(shí)器

// 第1個(gè)輪占的位數(shù)
#define TVR_BITS 8
// 第1個(gè)輪的長(zhǎng)度
#define TVR_SIZE (1 << TVR_BITS)
// 第n個(gè)輪占的位數(shù)
#define TVN_BITS 6
// 第n個(gè)輪的長(zhǎng)度
#define TVN_SIZE (1 << TVN_BITS)
// 掩碼:取模或整除用
#define TVR_MASK (TVR_SIZE - 1)
#define TVN_MASK (TVN_SIZE - 1)

// 定時(shí)器回調(diào)函數(shù)
typedef void (*timer_cb_t)(void*);

// 定時(shí)器結(jié)點(diǎn)
typedef struct timernode {
struct linknode *next; // 下一個(gè)結(jié)點(diǎn)
struct linknode *prev; // 上一個(gè)結(jié)點(diǎn)
void *userdata; // 用戶數(shù)據(jù)
timer_cb_t callback; // 回調(diào)函數(shù)
uint32_t expire; // 到期時(shí)間
} timernode_t;

// 第1個(gè)輪
typedef struct tvroot {
clinknode_t vec[TVR_SIZE];
} tvroot_t;

// 后面幾個(gè)輪
typedef struct tvnum {
clinknode_t vec[TVN_SIZE];
} tvnum_t;



// 時(shí)間輪定時(shí)器
typedef struct timerwheel {
tvroot_t tvroot; // 第1個(gè)輪
tvnum_t tv[4]; // 后面4個(gè)輪
uint64_t lasttime; // 上一次的時(shí)間毫秒
uint32_t currtick; // 當(dāng)前的tick
uint16_t interval; // 每個(gè)時(shí)間點(diǎn)的毫秒間隔
uint16_t remainder; // 剩余的毫秒
} timerwheel_t;


// 初始化時(shí)間輪,interval為每幀的間隔,currtime為當(dāng)前時(shí)間
void timerwheel_init(timerwheel_t *tw, uint16_t interval, uint64_t currtime);
// 初始化時(shí)間結(jié)點(diǎn):cb為回調(diào),ud為用戶數(shù)據(jù)
void timerwheel_node_init(timernode_t *node, timer_cb_t cb, void *ud);
// 增加時(shí)間結(jié)點(diǎn),ticks為觸發(fā)間隔(注意是以interval為單位)
void timerwheel_add(timerwheel_t *tw, timernode_t *node, uint32_t ticks);
// 刪除結(jié)點(diǎn)
int timerwheel_del(timerwheel_t *tw, timernode_t *node);
// 更新時(shí)間輪
void timerwheel_update(timerwheel_t *tw, uint64_t currtime);

timerwheel.c

#include "timerwheel.h"

#define FIRST_INDEX(v) ((v) & TVR_MASK)
#define NTH_INDEX(v, n) (((v) >> (TVR_BITS + (n) * TVN_BITS)) & TVN_MASK)

void timerwheel_init(timerwheel_t *tw, uint16_t interval, uint64_t currtime) {
memset(tw, 0, sizeof(*tw));
tw->interval = interval;
tw->lasttime = currtime;
int i, j;
for (i = 0; i < TVR_SIZE; ++i) {
clinklist_init(tw->tvroot.vec + i);
}
for (i = 0; i < 4; ++i) {
for (j = 0; j < TVN_SIZE; ++j)
clinklist_init(tw->tv[i].vec + j);
}
}

void timerwheel_node_init(timernode_t *node, timer_cb_t cb, void *ud) {
node->next = 0;
node->prev = 0;
node->userdata = ud;
node->callback = cb;
node->expire = 0;
}

static void _timerwheel_add(timerwheel_t *tw, timernode_t *node) {
uint32_t expire = node->expire;
uint32_t idx = expire - tw->currtick;
clinknode_t *head;
if (idx < TVR_SIZE) {
head = tw->tvroot.vec + FIRST_INDEX(expire);
} else {
int i;
uint64_t sz;
for (i = 0; i < 4; ++i) {
sz = (1ULL << (TVR_BITS + (i+1) * TVN_BITS));
if (idx < sz) {
idx = NTH_INDEX(expire, i);
head = tw->tv[i].vec + idx;
break;
}
}
}
clinklist_add_back(head, (clinknode_t*)node);
}

void timerwheel_add(timerwheel_t *tw, timernode_t *node, uint32_t ticks) {
node->expire = tw->currtick + ((ticks > 0) ? ticks : 1);
_timerwheel_add(tw, node);
}

int timerwheel_del(timerwheel_t *tw, timernode_t *node) {
if (!clinklist_is_empty((clinknode_t*)node)) {
clinklist_remote((clinknode_t*)node);
return 1;
}
return 0;
}

void _timerwheel_cascade(timerwheel_t *tw, tvnum_t *tv, int idx) {
clinknode_t head;
clinklist_init(&head);
clinklist_splice(tv->vec + idx, &head);
while (!clinklist_is_empty(&head)) {
timernode_t *node = (timernode_t*)head.next;
clinklist_remote(head.next);
_timerwheel_add(tw, node);
}
}

void _timerwheel_tick(timerwheel_t *tw) {
++tw->currtick;

uint32_t currtick = tw->currtick;
int index = (currtick & TVR_MASK);
if (index == 0) {
int i = 0;
int idx;
do {
idx = NTH_INDEX(tw->currtick, i);
_timerwheel_cascade(tw, &(tw->tv[i]), idx);
} while (idx == 0 && ++i < 4);
}

clinknode_t head;
clinklist_init(&head);
clinklist_splice(tw->tvroot.vec + index, &head);
while (!clinklist_is_empty(&head)) {
timernode_t *node = (timernode_t*)head.next;
clinklist_remote(head.next);
if (node->callback) {
node->callback(node->userdata);
}
}
}

void timerwheel_update(timerwheel_t *tw, uint64_t currtime) {
if (currtime > tw->lasttime) {
int diff = currtime - tw->lasttime + tw->remainder;
int intv = tw->interval;
tw->lasttime = currtime;

while (diff >= intv) {
diff -= intv;
_timerwheel_tick(tw);
}
tw->remainder = diff;
}
}

核心函數(shù)就兩個(gè):

  • _timerwheel_add 把定時(shí)器結(jié)點(diǎn)加到時(shí)間輪里。
  • _timerwheel_tick 指針往前走一步,如果指針到達(dá)0,則上一個(gè)圈的指針也會(huì)跳一步,此時(shí)要把上一個(gè)圈的鏈表取出來(lái),重新加到當(dāng)前圈里(_timerwheel_cascade)

這份代碼參考了2.6內(nèi)核的timer,其中涉及到一些位運(yùn)算,不過(guò)通過(guò)上面的講解,應(yīng)該不難理解。有興趣了解的還是看代碼吧,代碼才是最好的文檔:)




本文摘自 :https://blog.51cto.com/u

開通會(huì)員,享受整站包年服務(wù)立即開通 >