在raft协议中, 会初始化三个计时器是和选举有关的:
voteTimer:这个timer负责定期的检查,如果当前的state的状态是候选者(STATE_CANDIDATE),那么就会发起选举
electionTimer:在一定时间内如果leader没有与 Follower 进行通信时,Follower 就可以认为leader已经不能正常担任leader的职责,那么就会进行选举,在选举之前会先发起预投票,如果没有得到半数以上节点的反馈,则候选者就会识趣的放弃参选。所以这个timer负责预投票
stepDownTimer:定时检查是否需要重新选举leader,如果当前的leader没有获得超过半数的Follower响应,那么这个leader就应该下台然后重新选举。
public boolean init(final NodeOptions opts) {
....
// Init timers
//设置投票计时器
this.voteTimer = new RepeatedTimer("JRaft-VoteTimer", this.options.getElectionTimeoutMs()) {
@Override
protected void onTrigger() {
//处理投票超时
handleVoteTimeout();
}
@Override
protected int adjustTimeout(final int timeoutMs) {
//在一定范围内返回一个随机的时间戳
return randomTimeout(timeoutMs);
}
};
//设置预投票计时器
//当leader在规定的一段时间内没有与 Follower 舰船进行通信时,
// Follower 就可以认为leader已经不能正常担任旗舰的职责,则 Follower 可以去尝试接替leader的角色。
// 这段通信超时被称为 Election Timeout
//候选者在发起投票之前,先发起预投票
this.electionTimer = new RepeatedTimer("JRaft-ElectionTimer", this.options.getElectionTimeoutMs()) {
@Override
protected void onTrigger() {
handleElectionTimeout();
}
@Override
protected int adjustTimeout(final int timeoutMs) {
//在一定范围内返回一个随机的时间戳
//为了避免同时发起选举而导致失败
return randomTimeout(timeoutMs);
}
};
//leader下台的计时器
//定时检查是否需要重新选举leader
this.stepDownTimer = new RepeatedTimer("JRaft-StepDownTimer", this.options.getElectionTimeoutMs() >> 1) {
@Override
protected void onTrigger() {
handleStepDownTimeout();
}
};
....
if (!this.conf.isEmpty()) {
//新启动的node需要重新选举
stepDown(this.currTerm, false, new Status());
}
....
}
如果一个系统存在大量的任务调度,时间轮可以高效的利用线程资源来进行批量化调度。把大批量的调度任务全部都绑定时间轮上,通过时间轮进行所有任务的管理,触发以及运行。能够高效地管理各种延时任务,周期任务,通知任务等。时间轮(TimingWheel)算法应用范围非常广泛,各种操作系统的定时任务调度都有用到,我们熟悉的 Linux Crontab,以及 Java 开发过程中常用的 Dubbo、Netty、Akka、Quartz、ZooKeeper 、Kafka 等,几乎所有和 时间任务调度 都采用了时间轮的思想。
如何实现一个简易的C++时间轮
在介绍这个之前,先介绍linux定时任务的基本实现方式
一般有两个常见的比较有效的方法。一个是用 Linux 内部定时器alarm;另一个是用 sleep 或 usleep 函数让进程睡眠一段时间
如果不要求很精确的话,用 alarm() 和 signal() 就够了
unsigned int alarm(unsigned int seconds)
专门为SIGALRM信号而设,在指定的时间seconds秒后,将向进程本身发送SIGALRM信号,又称为闹钟时间。进程调用alarm后,任何以前的alarm()调用都将无效。如果参数seconds为零,那么进程内将不再包含任何闹钟时间。如果调用alarm()前,进程中已经设置了闹钟时间,则返回上一个闹钟时间的剩余时间,否则返回0。
void sig_alarm()
{
exit(0);
}
int main(int argc, char *argv[])
{
signal(SIGALRM, sig_alarm);
alarm(10);
sleep(15);
printf("Hello World!\n");
return 0;
}
所以当main()程序挂起10秒钟时,signal函数调用SIGALRM信号的处理函数sig_alarm,并且sig_alarm执行exit(0)使得程序直接退出。因此,printf(“Hello World!\n”)语句是没有被执行的。
如果你只做一般的定时,到了时间去执行一个任务,sleep是最简单的。但是sleep会让程序挂起,肯定不好。我们要的是可以继续执行下面的任务,时间一到,就出发信号,进行相应任务处理。除非用一个死循环专门用来定时调度。
不过,如果有多个定时任务,就需要对定时器做一些封装了。先介绍一下基于升序时间链表的计时器。
//Time_list.h
#include <netinet/in.h> //sockaddr_in
#include <list>
#include <functional>
#define BUFFER_SIZE 64
class util_timer; //前向声明
//客户端数据
struct client_data {
sockaddr_in address; //socket地址
int sockfd; //socket文件描述符
char buf[BUFFER_SIZE]; //数据缓存区
util_timer* timer; //定时器
};
//定时器类
class util_timer {
public:
time_t expire; //任务超时时间(绝对时间)
std::function<void(client_data*)> callBackFunc; //回调函数
client_data* userData; //用户数据
};
class Timer_list {
public:
explicit Timer_list();
~Timer_list();
public:
void add_timer(util_timer* timer); //添加定时器
void del_timer(util_timer* timer); //删除定时器
void adjust_timer(util_timer* timer); //调整定时器
void tick(); //处理链表上到期的任务
private:
std::list<util_timer*> m_timer_list; //定时器链表
};
//Timer_list.cpp
#include "Timer_list.h"
#include <time.h>
Timer_list::Timer_list() {
}
Timer_list::~Timer_list() {
m_timer_list.clear();
}
void Timer_list::add_timer(util_timer* timer) { //将定时器添加到链表
if (!timer) return;
else {
auto item = m_timer_list.begin();
while (item != m_timer_list.end()) {
if (timer->expire < (*item)->expire) {
m_timer_list.insert(item, timer);
return;
}
item++;
}
m_timer_list.emplace_back(timer);
}
}
void Timer_list::del_timer(util_timer* timer) { //将定时器从链表删除
if (!timer) return;
else {
auto item = m_timer_list.begin();
while (item != m_timer_list.end()) {
if (timer == *item) {
m_timer_list.erase(item);
return;
}
item++;
}
}
}
void Timer_list::adjust_timer(util_timer *timer) { //调整定时器在链表中的位置
del_timer(timer);
add_timer(timer);
}
void Timer_list::tick() { //SIGALRM信号触发,处理链表上到期的任务
if (m_timer_list.empty()) return;
time_t cur = time(nullptr);
//检测当前定时器链表中到期的任务。
while (!m_timer_list.empty()) {
util_timer* temp = m_timer_list.front();
if (cur < temp->expire) break;
temp->callBackFunc(temp->userData);
m_timer_list.pop_front();
}
}
怎么运用呢服务端?
首先要设置signal(SIGALRM, sig_alarm); alarm(10)这样每10秒钟就会触发信号处理函数,函数里面调用tick处理链表上到期的任务。并且再次发送alarm,周期执行
在主循环不断接受新链接,把新链接的客户端数据绑定到计时器上(计时器包括超时时间,回调函数,用户数据),计时器加入到链表中。
如果要处理不活跃的连接,每次连接有活动,还要更新过期时间=当前时间+10s,调整在链表的位置
//SIGALRM 信号的处理函数
void timer_handler() {
timer_list.tick(); //调用升序定时器链表类的tick() 处理链表上到期的任务
alarm(TIMESLOT); //再次发出 SIGALRM 信号
}
//定时器回调函数 删除socket上注册的事件并关闭此socket
void timer_callback(client_data* user_data) {
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, user_data->sockfd, 0);
if (user_data) {
close(user_data->sockfd);
cout << "close fd : " << user_data->sockfd << endl;
}
}
因为在有序链表插入节点的时间复杂度为O(n),而且是单链表,意味着链表越长,插一个节点所要找到合适位置的时间开销就会越大,这样下来,时间效率是比较低的。
很显然,对于时间轮而言,要提高精度,就要使si的值足够小; 要提高执行效率,则要求N值足够大,使定时器尽可能的分布在不同的槽。
#define BUFFER_SIZE 64
class tw_timer;
/* 绑定socket和定时器 */
struct client_data {
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
tw_timer *timer;
};
/* 定时器类 */
class tw_timer {
public:
tw_timer(int rot, int ts) {
next = nullptr;
prev = nullptr;
rotation = rot;
time_slot = ts;
}
public:
int rotation; /* 记录定时器在时间轮转多少圈后生效 */
int time_slot; /* 记录定时器属于时间轮上的哪个槽(对应的链表,下同) */
void (*cb_func)(client_data *); /* 定时器回调函数 */
client_data *user_data; /* 客户端数据 */
tw_timer *next; /* 指向下一个定时器 */
tw_timer *prev; /* 指向前一个定时器 */
};
定时器类和之前的不一样了。包含了前驱后驱指针,槽,回调函数。
下面是时间轮代码
**class time_wheel {
public:
time_wheel();
~time_wheel();
/* 根据定时值timeout创建一个定时器,并把它插入到合适的槽中 */
tw_timer *add_timer(int timeout);
/* 删除目标定时器timer */
void del_timer(tw_timer *timer);
/* SI时间到后,调用该函数,时间轮向前滚动一个槽的间隔 */
void tick();
private:
static const int N = 60; /* 时间轮上槽的数量 */
static const int SI = 1; /* 每1 s时间轮转动一次,即槽间隔为1 s */
tw_timer* slots[N]; /* 时间轮的槽,其中每个元素指向一个定时器链表,链表无序 */
int cur_slot; /* 时间轮的当前槽 */
};**
time_wheel::time_wheel() {
cur_slot = 0;
for (int i = 0; i < N; i++) {
slots[i] = nullptr;
}
}
time_wheel::~time_wheel() {
/* 遍历每个槽,并销毁其中的定时器 */
for (int i = 0; i < N; i++) {
/* 释放链表上的每个节点 */
tw_timer *tmp = slots[i];
while (tmp) {
slots[i] = tmp->next;
delete tmp;
tmp = slots[i];
}
}
}
tw_timer *time_wheel::add_timer(int timeout) {
if (timeout < 0) {
return nullptr;
}
int ticks = 0;
/* 下面根据带插入定时器的超时值计算它将在时间轮转动多少个滴答后被触发,并将该滴答
* 数存储于变量ticks中。如果待插入定时器的超时值小于时间轮的槽间隔SI,则将ticks
* 向上折合为1,否则就将ticks向下折合为timeout/SI */
if (timeout < SI) {
ticks = 1;
} else {
ticks = timeout / SI;
}
/* 计算待插入的定时器在时间轮转动多少圈后被触发 */
int rotation = ticks / N;
/* 计算待插入的定时器应该被插入到哪个槽中 */
int ts = (cur_slot + (ticks % N)) % N;
/* 创建新的定时器,它在时间轮转动ratation圈之后被触发,且位于第ts个槽上 */
tw_timer *timer = new tw_timer(rotation, ts);
/* 如果第ts个槽中无任何定时器,则把新建的定时器插入其中,并将该定时器设置为该槽的头结点 */
if (slots[ts] == nullptr) {
printf("add timer, rotation is %d,cur_slot is %d\n", rotation, ts, cur_slot);
slots[ts] = timer;
} else {
/* 头插法在链表中插入节点 */
timer->next = slots[ts];
slots[ts]->prev = timer;
slots[ts] = timer;
}
return timer;
}
void time_wheel::del_timer(tw_timer *timer) {
if (timer == nullptr) {
return;
}
int ts = timer->time_slot;
/* slots[ts]是目标定时器所在槽的头结点。如果目标定时器就是该头结点,则需要
* 重置第ts个槽的头结点 */
if (timer == slots[ts]) {
slots[ts] = slots[ts]->next;
if (slots[ts]) {
slots[ts]->prev = nullptr;
}
delete timer;
} else {
timer->prev->next = timer->next;
if (timer->next) {
timer->next->prev = timer->prev;
}
delete timer;
}
}
void time_wheel::tick() {
tw_timer *tmp = slots[cur_slot]; /* 取得时间轮上当前槽的头结点 */
printf("current slots is %d\n", cur_slot);
while (tmp) {
printf("tick the timer once\n");
/* 如果定时器的ratation值大于0,则它在这一轮中不起作用 */
if (tmp->rotation > 0) {
tmp->rotation--;
tmp = tmp->next;
}
/* 否则说明定时器已经到期,于是执行定时任务,然后删除该定时器 */
else {
tmp->cb_func(tmp->user_data);
if (tmp == slots[cur_slot]) {
printf("delete header in cur_slot\n");
slots[cur_slot] = tmp->next;
delete tmp;
if (slots[cur_slot]) {
slots[cur_slot]->prev = nullptr;
}
tmp = slots[cur_slot];
} else {
tmp->prev->next = tmp->next;
if (tmp->next) {
tmp->next->prev = tmp->prev;
}
tw_timer *tmp2 = tmp->next;
delete tmp;
tmp = tmp2;
}
}
}
/* 更新时间轮的当前槽,以反映时间轮的转动 */
cur_slot = cur_slot + 1;
cur_slot = cur_slot % N;
}
时间轮还可以用多个不同精度的时间轮(内存优化),最小堆进行优化。