设计背景
- 在小型项目中,状态机的跳转往往依赖于某个事件的状态,因此监控某个事件是否处于超时状态就至关重要;
注意事项
- 超时机制应该能够准确的判断出事件是否真正超时,并预留出设置自定义超时处理的接口;
- 超时机制不应该占用过多的系统资源;
- 超时机制应该作为一个独立组件,与各层级(驱动层/协议层/应用层等)之间解耦,只预留出简洁的操作接口;
相关设计思路
- 根据前文描述,创建两个task;monitor_task用于轮询是否有event超时,handle_task用于执行event的超时处理;
- 对于monitor_task轮询event是否超时的操作,借助时间轮算法思想,以最大超时时间max_timeout作为参考设定时间轮周期,根据实际情况划分周期的刻度;定义一个时间指针,该指针每一次跳动一个刻度,周而复始;时钟指针指向的刻度即为超时刻度,将该刻度下的event事件交由handle_task去做处理;比如:
- max_timeout为3000 ms,则设定时间轮周期为4000 ms;设定8个刻度,则刻度间时间差为500 ms;于是monitor_task的运行周期为500 ms,handle_task的运行周期为<=500ms;
- 因为设定为8个刻度,因此需要声明8个数组,该数组用于保存具体的event;当时间指针指向某个刻度,则表示该刻度下的数组中的event已经超时;event相关定义如下:
/*描述一个event*/ typedef struct { int8_t flag; //event标志位——表明是否有效 uint32_t id; //id int32_t (*callback)(void* arg); //超时执行函数 } TIMEOUT_EVENT_UNIT_t; /*描述一个event数组*/ typedef struct { uint8_t count; //数组的元素数量 TIMEOUT_EVENT_UNIT_t unit[EVENT_NUM]; } TIMEOUT_EVENT_t;
- monitor_task中的监控处理和handle_task中的超时处理如下:
static void *monitor_task(void *arg) { while (1) { pthread_mutex_lock(&g_thread_mutex); g_timeout_sec ++; if (g_timeout_sec == TIMEOUT_SEC_NUM) { /*周而复始*/ g_timeout_sec = TIMEOUT_SEC_0_5; } p_global_scale = &g_event[g_timeout_sec]; //时间指针指向下一刻度,该刻度下event超时 p_global_timeout = p_global_scale; //超时指针指向该刻度 pthread_mutex_unlock(&g_thread_mutex); usleep(MOITOR_TASK_CYCLE_MS*MS_TO_US); } pthread_exit(NULL); return NULL; } static void *handle_task(void *arg) { int32_t i = 0; while (1) { pthread_mutex_lock(&g_thread_mutex); for (i = 0; i < EVENT_NUM; i ++) { if (p_global_timeout->count == 0) { break; } if (p_global_timeout->unit[i].flag && p_global_timeout->unit[i].callback != NULL) { p_global_timeout->unit[i].callback(&p_global_timeout->unit[i].id); p_global_timeout->unit[i].flag = 0; p_global_timeout->count --; } } pthread_mutex_unlock(&g_thread_mutex); usleep(HANDLE_TASK_CYCLE_MS*MS_TO_US); } pthread_exit(NULL); return NULL; }
- 对于event的插入和删除操作,有以下两种方案:
- event插入时找到对应的数组,在数组中找到空闲位置插入;删除时,找到对应的event将其删除;两者都需要进行遍历操作;
int32_t unit_timeout_event_add(uint32_t id, TIMEOUT_SEC_e time, int32_t (*callback)(void* arg)) { /* ...... */ p_event = g_event[num]; for (i = 0; i < EVENT_NUM; i ++) { if (p_event[i].flag == 0) { p_event[i].flag = 1; p_event[i].id = id; p_event[i].callback = callback; goto __ADD_SUCCESS; } } /* ...... */ printf("%s err!\n", __func__); return -1; __ADD_SUCCESS: return 0; } int32_t unit_timeout_event_delete(uint32_t id) { int i, j; /* ...... */ for (i = 0; i < SCALE_NUM; i++) { for (j = 0; j < EVENT_NUM; j ++) { if (g_event[i][j].flag && g_event[i][j].id == id) { g_event[i][j].flag = 0; } } } /* ...... */ return 0; }
- 将event重新映射,在找到对应的数组后,通过重映射id直接插入即可;删除时,通过重映射id直接删除即可;
typedef enum { TIMEOUT_ID_1 = 0, TIMEOUT_ID_2, /* ...... */ TIMEOUT_ID_NUM, } TIMEOUT_ID_e; int32_t unit_timeout_event_add(TIMEOUT_ID_e id, TIMEOUT_SEC_e time, int32_t (*callback)(void* arg)) { /* ...... */ p_event->unit[id].flag = 1; p_event->unit[id].id = id; p_event->unit[id].callback = callback; p_event->count ++; /* ...... */ return 0; } int32_t unit_timeout_event_delete(TIMEOUT_ID_e id) { /* ...... */ for (i = 0; i < SCALE_NUM; i ++) { if (g_event[i].unit[id].flag == 1) { g_event[i].unit[id].flag = 0; g_event[i].count --; } } /* ...... */ return 0; }
- 总的来说,方案2相较于方案1采用了空间换时间的操作,即通过重新映射event id,减少了遍历操作即减少了系统资源的占用,但同样减少了可监控的event数量;
疑惑
- 为什么max_timeout为3000 ms,刻度间时间差为500 ms,却设置时间轮周期为4000 ms(比max_timeout多了两个刻度周期)?是因为存在以下两种情况需要规避:
- monitor_task阻塞499ms,即将获取到调度权,假设此时时间指针指向刻度0;恰巧此时插入了一个超时event,超时时间设置为500 ms;所以这个时候这个event应该插入到刻度2而非刻度1,以确保不会被误判;但此时存在event实际上在500ms超时了,但却在500~1000ms才感知到的问题,对于这个问题只能根据实际情况,通过增加刻度来减小误差;
- 在上述情况下,假设设置时间轮周期为3000 ms(6个刻度),同时假设此时时间指针指向刻度0;恰巧此时插入了一个超时event,超时时间设置为3000 ms,此时event插入至刻度0;随后handle_task立马执行,所以event被误判为超时;
流程图
参考链接
- 任务调度:时间轮算法经典案例解析及应用实现
- RPC实现原理之核心技术-时间轮
代码链接
- gitee or github