上次說(shuō)到,基于排序鏈表的定時(shí)器存在一個(gè)問(wèn)題:添加定時(shí)器的效率偏低。這次我們用時(shí)間輪來(lái)解決該問(wèn)題。
如圖就是一個(gè)時(shí)間輪:
在時(shí)間輪內(nèi),指針指向輪子上的一個(gè)槽。它以恒定的速率順時(shí)針轉(zhuǎn)動(dòng)。沒(méi)轉(zhuǎn)動(dòng)一步就指向下一個(gè)槽,每次轉(zhuǎn)動(dòng)稱(chēng)之為一個(gè)tick。一個(gè)滴答的時(shí)間稱(chēng)為時(shí)間輪的槽間隔si(slot interval),它實(shí)際上就是心搏時(shí)間。時(shí)間輪共有N個(gè)槽,因此它運(yùn)轉(zhuǎn)一周的時(shí)間是N*si。每個(gè)槽指向一個(gè)定時(shí)器鏈表,每條鏈表上的定時(shí)器具有相同的特征:它們的定時(shí)時(shí)間相差N*si的整數(shù)倍。時(shí)間輪正式利用這個(gè)關(guān)系將定時(shí)器散列到不同的鏈表中。加入現(xiàn)在指針指向槽cs,我們要添加一個(gè)定時(shí)時(shí)間為ti的定時(shí)器,則該定時(shí)器將被插入槽ts(timer slot)對(duì)應(yīng)的鏈表中:
ts = (cs + (ti / si)) % N
基于排序鏈表的定時(shí)器使用唯一的鏈表來(lái)管理所有定時(shí)器,所以插入操作的效率隨著定時(shí)器數(shù)目的增多而降低。而時(shí)間輪使用哈希表的思想,將定時(shí)器散列到不同的鏈表上。這樣每條鏈表上的定時(shí)器數(shù)目都將明顯少于原來(lái)的排序鏈表上的定時(shí)器數(shù)目,插入操作的效率基本不受定時(shí)器數(shù)目的影響。
很顯然,對(duì)時(shí)間輪而言,要提高定時(shí)精度,就要使si值足夠小;要提高執(zhí)行效率,則要求N值足夠大。
上圖描述的是一個(gè)簡(jiǎn)單的時(shí)間輪,僅僅一個(gè)輪子。而復(fù)雜的時(shí)間輪可能有多個(gè)輪子,不同輪子擁有不同的粒度。
下面是一個(gè)簡(jiǎn)單時(shí)間輪的實(shí)現(xiàn)代碼:
#ifndef TIME_WHEEL_TIMER_H#define TIME_WHEEL_TIMER_H#include <time.h>#include <netinet/in.h>#include <stdio.h>#include <assert.h>const int BUFFER_SIZE = 1024;class tw_timer;//綁定socket和定時(shí)器struct client_data { sockaddr_in addr_; int sockfd_; char buf_[BUFFER_SIZE]; tw_timer* timer_;};//定時(shí)器類(lèi)class tw_timer {public: tw_timer(int rot, int ts) : next_(NULL), PRev_(NULL), rotation_(rot), time_slot_(ts) {} public: void (*timeout_callback_)(client_data*); //定時(shí)器回調(diào)函數(shù)public: int rotation_; //記錄定時(shí)器在時(shí)間輪轉(zhuǎn)多少圈后生效,因?yàn)橛械亩〞r(shí)值比較大 int time_slot_; //記錄定時(shí)器對(duì)應(yīng)于時(shí)間輪上的哪個(gè)槽(對(duì)應(yīng)的鏈表) client_data *user_data_; //客戶數(shù)據(jù) tw_timer* next_; //指向上一個(gè)定時(shí)器 tw_timer* prev_; //指向下一個(gè)定時(shí)器};class time_wheel {public: time_wheel() : cur_slot_(0) { memset(slots_, 0, sizeof(slots_)); //清零每個(gè)槽指針 } ~time_wheel(){ //遍歷每個(gè)槽,并銷(xiāo)毀其中的定時(shí)器 for(int i=0; i<DEFAULT_SLOTS_NUM; ++i){ tw_timer* tmp = slots_[i]; while(tmp != NULL){ slots_[i] = tmp->next_; delete tmp; tmp = slots_[i]; } } }public: tw_timer* add_timer(int timeout); tw_timer* adjust_timer(tw_timer* timer, int timeout); void del_timer(tw_timer* timer); void tick();private: static const int DEFAULT_SLOTS_NUM = 60; static const int SI = 1; tw_timer* slots_[DEFAULT_SLOTS_NUM]; int cur_slot_;};//根據(jù)定時(shí)值timeout創(chuàng)建一個(gè)定時(shí)器,并把它插入合適的槽中tw_timer* time_wheel::add_timer(int timeout){ if(timeout < 0) return NULL; //下面根據(jù)待插入定時(shí)器的超時(shí)值計(jì)算它將在時(shí)間輪轉(zhuǎn)動(dòng)多少個(gè)滴答后被觸發(fā),并將該滴答數(shù)存儲(chǔ)于變量ticks中。 //如果待插入定時(shí)器的超時(shí)值小于時(shí)間輪的槽間隔SI,則將ticks折合為1,下一次它就被觸發(fā)。否則將ticks向下折合為timeout/SI int ticks = 0; if(timeout < SI) ticks = 1; else ticks = timeout / SI; //計(jì)算待插入定時(shí)器在時(shí)間輪轉(zhuǎn)多少圈后被觸發(fā) int rotation = ticks / DEFAULT_SLOTS_NUM; //計(jì)算待插入的定時(shí)器應(yīng)該被插入哪個(gè)槽中 int ts = (cur_slot_ + (ticks % DEFAULT_SLOTS_NUM)) % DEFAULT_SLOTS_NUM; //創(chuàng)建新的定時(shí)器,它在時(shí)間輪轉(zhuǎn)動(dòng)rotation圈之后被觸發(fā),且位于第ts個(gè)槽上 tw_timer* timer = new tw_timer(rotation, ts); //如果第ts個(gè)槽中無(wú)任何定時(shí)器,則把新建的定時(shí)器插入其中,并將該定時(shí)器設(shè)置為該槽的頭結(jié)點(diǎn) if(slots_[ts] == NULL){ printf("add timer, rotation is %d, ts is %d, cur_slot_ is %d/n", rotation, ts, cur_slot_); slots_[ts] = timer; } else{ //否則,將定時(shí)器插入第ts個(gè)槽中 timer->next_ = slots_[ts]; slots_[ts]->prev_ = timer; slots_[ts] = timer; } return timer;}//調(diào)整定時(shí)器,延長(zhǎng)壽命tw_timer* time_wheel::adjust_timer(tw_timer* timer, int timeout){ assert(timer != NULL && timeout >= 0); printf("adjust timer/n"); del_timer(timer); //延長(zhǎng)壽命我們需要?jiǎng)h掉之前的,從新添加一個(gè)新的 return add_timer(timeout);}//刪除目標(biāo)定時(shí)器timervoid time_wheel::del_timer(tw_timer* timer){ if(timer == NULL) return ; int ts = timer->time_slot_; //slots_[ts]是目標(biāo)定時(shí)器所在槽的頭結(jié)點(diǎn)。如果目標(biāo)定時(shí)器就是該頭結(jié)點(diǎn),則需要重置第ts個(gè)槽的頭結(jié)點(diǎn) if(timer == slots_[ts]){ slots_[ts] = slots_[ts]->next_; if(slots_[ts] != NULL) slots_[ts]->prev_ = NULL; delete timer; } else{ timer->prev_->next_ = timer->next_; if(timer->next_ != NULL){ timer->next_->prev_ = timer->prev_; } delete timer; }}//SI時(shí)間到后,調(diào)用該函數(shù),先檢驗(yàn)時(shí)間輪對(duì)應(yīng)的槽的所有timer是否到期了,進(jìn)行相應(yīng)的處理。然后時(shí)間輪向前滾動(dòng)一個(gè)槽的間隔。void time_wheel::tick(){ tw_timer* tmp = slots_[cur_slot_]; //取得時(shí)間輪上當(dāng)前槽的頭結(jié)點(diǎn) printf("current slot is %d/n", cur_slot_); while(tmp != NULL){ printf("tick the timer once/n"); //如果定時(shí)器的rotation值大于0,則它在這一輪補(bǔ)齊作用,它壽命還長(zhǎng)著呢 if(tmp->rotation_ > 0){ tmp->rotation_--; tmp = tmp->next_; //繼續(xù)查找下一個(gè)timer } else{ //否則,說(shuō)明定時(shí)器已經(jīng)到期,于是執(zhí)行定時(shí)任務(wù),然后刪除該定時(shí)器 tmp->timeout_callback_(tmp->user_data_); if(tmp == slots_[cur_slot_]){ printf("delete header in cur_slot/n"); slots_[cur_slot_] = tmp->next_; delete tmp; if(slots_[cur_slot_] != NULL) slots_[cur_slot_]->prev_ == NULL; tmp = slots_[cur_slot_]; } else{ tmp->prev_->next_ = tmp->next_; if(tmp->next_ != NULL) tmp->next_->prev_ = tmp->prev_; tw_timer* tmp2 = tmp->next_; delete tmp; tmp = tmp2; } } } //更新時(shí)間輪的當(dāng)前槽,向前走一步,以反映時(shí)間輪的轉(zhuǎn)動(dòng) cur_slot_ = ++cur_slot_ % DEFAULT_SLOTS_NUM; //similar to cycle queue}#endif下面是測(cè)試代碼,類(lèi)似上篇博客中升序鏈表的測(cè)試代碼,僅有部分不同:
#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <assert.h>#include <stdio.h>#include <signal.h>#include <unistd.h>#include <errno.h>#include <string.h>#include <fcntl.h>#include <stdlib.h>#include <sys/epoll.h>#include <pthread.h>#include "time_wheel_timer.h"#define FD_LIMIT 65535#define MAX_EVENT_NUMBER 1024#define TIME_SLOT 5static int pipefd[2];static time_wheel timer_lst;static int epollfd = 0;int setnonblocking( int fd ){ int old_option = fcntl( fd, F_GETFL ); int new_option = old_option | O_NONBLOCK; fcntl( fd, F_SETFL, new_option ); return old_option;}void addfd(int fd ){ epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event ); setnonblocking( fd );}void sig_handler( int sig ){ int save_errno = errno; int msg = sig; send( pipefd[1], ( char* )&msg, 1, 0 ); errno = save_errno;}void addsig( int sig ){ struct sigaction sa; memset( &sa, '/0', sizeof( sa ) ); sa.sa_handler = sig_handler; sa.sa_flags |= SA_RESTART; sigfillset( &sa.sa_mask ); assert( sigaction( sig, &sa, NULL ) != -1 );}void timer_handler(){ timer_lst.tick(); alarm( TIME_SLOT );}void cb_func( client_data* user_data ){ epoll_ctl( epollfd, EPOLL_CTL_DEL, user_data->sockfd_, 0 ); assert( user_data ); close( user_data->sockfd_ ); printf( "close fd %d/n", user_data->sockfd_ );}int main( int argc, char* argv[] ){ if( argc <= 2 ) { printf( "usage: %s ip_address port_number/n", basename( argv[0] ) ); return 1; } const char* ip = argv[1]; int port = atoi( argv[2] ); int ret = 0; struct sockaddr_in address; bzero( &address, sizeof( address ) ); address.sin_family = AF_INET; inet_pton( AF_INET, ip, &address.sin_addr ); address.sin_port = htons( port ); int listenfd = socket( PF_INET, SOCK_STREAM, 0 ); assert( listenfd >= 0 ); int on = 1; ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); assert(ret != -1); ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) ); assert( ret != -1 ); ret = listen( listenfd, 5 ); assert( ret != -1 ); epoll_event events[ MAX_EVENT_NUMBER ]; epollfd = epoll_create( 5 ); assert( epollfd != -1 ); addfd(listenfd ); ret = socketpair( PF_UNIX, SOCK_STREAM, 0, pipefd ); assert( ret != -1 ); setnonblocking( pipefd[1] ); addfd(pipefd[0] ); // add all the interesting signals here addsig( SIGALRM ); addsig( SIGTERM ); bool stop_server = false; client_data* users = new client_data[FD_LIMIT]; bool timeout = false; alarm( TIME_SLOT ); while( !stop_server ) { int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 ); if ( ( number < 0 ) && ( errno != EINTR ) ) { printf( "epoll failure/n" ); break; } for ( int i = 0; i < number; i++ ) { int sockfd = events[i].data.fd; if( sockfd == listenfd ) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof( client_address ); int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength ); addfd(connfd); users[connfd].addr_ = client_address; users[connfd].sockfd_ = connfd; tw_timer* timer = timer_lst.add_timer(3 * TIME_SLOT); timer->user_data_ = &users[connfd]; timer->timeout_callback_ = cb_func; users[connfd].timer_ = timer; } else if( ( sockfd == pipefd[0] ) && ( events[i].events & EPOLLIN ) ) { int sig; char signals[1024]; ret = recv( pipefd[0], signals, sizeof( signals ), 0 ); if( ret == -1 ) { // handle the error continue; } else if( ret == 0 ) { continue; } else { for( int i = 0; i < ret; ++i ) { switch( signals[i] ) { case SIGALRM: { timeout = true; break; } case SIGTERM: { stop_server = true; } } } } } else if( events[i].events & EPOLLIN ) { memset( users[sockfd].buf_, '/0', BUFFER_SIZE ); ret = recv( sockfd, users[sockfd].buf_, BUFFER_SIZE-1, 0 ); printf( "get %d bytes of client data %s from %d/n", ret, users[sockfd].buf_, sockfd ); tw_timer* timer = users[sockfd].timer_; if( ret < 0 ) { if( errno != EAGAIN ) { cb_func( &users[sockfd] ); if( timer ) { timer_lst.del_timer( timer ); } } } else if( ret == 0 ) { cb_func( &users[sockfd] ); if( timer ) { timer_lst.del_timer( timer ); } } else { //send( sockfd, users[sockfd].buf, BUFFER_SIZE-1, 0 ); if( timer ) { //下面這些注釋代碼是和上篇博客升序鏈表不同的地方之一 // time_t cur = time( NULL ); //timer->expire = cur + 3 * TIMESLOT; // printf( "adjust timer once/n" ); //timer_lst.adjust_timer( timer ); tw_timer* new_timer = timer_lst.adjust_timer(timer, 3*TIME_SLOT); new_timer->user_data_ = &users[sockfd]; new_timer->timeout_callback_ = cb_func; users[sockfd].timer_ = new_timer; } } } else { // others } } if( timeout ) { timer_handler(); timeout = false; } } close( listenfd ); close( pipefd[1] ); close( pipefd[0] ); close( epollfd ); delete [] users; return 0;}對(duì)于時(shí)間輪而言,添加一個(gè)定時(shí)器的時(shí)間復(fù)雜度是O(1),刪除一個(gè)定時(shí)器的時(shí)間復(fù)雜度也是O(1)(因?yàn)槭请p向鏈表直接利用prev指針),執(zhí)行一個(gè)定時(shí)器的時(shí)間復(fù)雜度是O(n)(遍歷某個(gè)槽的鏈表所有節(jié)點(diǎn),因?yàn)橛械墓?jié)點(diǎn)輪數(shù)不是當(dāng)前輪,所以我們不能憑借類(lèi)似升序鏈表那樣只遍歷部分鏈表就知道后面的節(jié)點(diǎn)時(shí)間未到)。但實(shí)際上執(zhí)行一個(gè)定時(shí)器任務(wù)的效率比O(n)好的多,因?yàn)闀r(shí)間輪將所有定時(shí)器散列到不同的鏈表上。時(shí)間輪的槽越多,每條鏈表上定時(shí)器數(shù)量越少。當(dāng)采用多個(gè)輪子實(shí)現(xiàn)時(shí)間輪,執(zhí)行一個(gè)定時(shí)器的時(shí)間復(fù)雜度接近O(1)。
新聞熱點(diǎn)
疑難解答
圖片精選
網(wǎng)友關(guān)注