服务端Skynet(二)——消息调度机制
文章目录
- 服务端Skynet(二)——消息调度机制
- 1、提前了解知识
- 1.1、互斥锁(mutex lock : **mut**ual **ex**clusion lock)
- 1.2、自旋锁(spinlock)
- 1.3、读写锁(readers–writer lock)
- 1.4、条件变量(condition variables)
- 2、消息的数据结构
- 3、消费消息流程
- 4、生成消息流程
- 5、监管机制
参考文献:
skynet设计综述
skynet源码赏析
1、提前了解知识
1.1、互斥锁(mutex lock : mutual exclusion lock)
- 概念:互斥锁,一条线程加锁锁住临界区,另一条线程尝试访问该临界区的时候,会发生阻塞,并进入休眠状态。临界区是锁lock和unlock之间的代码片段,一般是多条线程能够共同访问的部分。
- 具体说明:假设一台机器上的cpu有两个核心core0和core1,现在有线程A、B、C,此时core0运行线程A,core1运行线程B,此时线程B使用Mutex锁,锁住一个临界区,当线程A试图访问该临界区时,因为线程B已经将其锁住,因此线程A被挂起,进入休眠状态,此时core0进行上下文切换,将线程A放入休眠队列中,然后core0运行线程C,当线程B完成临界区的流程并执行解锁之后,线程A又会被唤醒,core0重新运行线程A
1.2、自旋锁(spinlock)
- 概念:自旋锁,一条线程加锁锁住临界区,另一条线程尝试访问该临界区的时候,会发生阻塞,但是不会进入休眠状态,并且不断轮询该锁,直至原来锁住临界区的线程解锁。
- 具体说明:假设一台机器上有两个核心core0和core1,现在有线程A、B、C,此时core0运行线程A,core1运行线程B,此时线程B调用spin lock锁住临界区,当线程A尝试访问该临界区时,因为B已经加锁,此时线程A会阻塞,并且不断轮询该锁,不会交出core0的使用权,当线程B释放锁时,A开始执行临界区逻辑
1.3、读写锁(readers–writer lock)
- 概述:读写锁,一共三种状态
- 读状态时加锁,此时为共享锁,当一个线程加了读锁时,其他线程如果也尝试以读模式进入临界区,那么不会发生阻塞,直接访问临界区
- 写状态时加锁,此时为独占锁,当某个线程加了写锁,那么其他线程尝试访问该临界区(不论是读还是写),都会阻塞等待
- 不加锁
- 注意:
- 某线程加读取锁时,允许其他线程以读模式进入,此时如果有一个线程尝试以写模式访问临界区时,该线程会被阻塞,而其后尝试以读方式访问该临界区的线程也会被阻塞
- 读写锁适合在读远大于写的情形中使用
1.4、条件变量(condition variables)
- 概述:条件变量是利用线程间共享的变量进行同步的一种机制,是在多线程程序中用来实现"等待–>唤醒"逻辑常用的方法,用于维护一个条件(与是条件变量不同的概念),线程可以使用条件变量来等待某个条件为真,注意理解并不是等待条件变量为真。当条件不满足时,线程将自己加入等待队列,同时释放持有的互斥锁; 当一个线程唤醒一个或多个等待线程时,此时条件不一定为真(虚假唤醒)。
- 具体说明:应用程序A中包含两个线程t1和t2。t1需要在bool变量test_cond为true时才能继续执行,而test_cond的值是由t2来改变的。t1在test_cond为false时调用cond_wait进行等待,t2在改变test_cond的值后,调用cond_signal,唤醒在等待中的t1,告诉t1 test_cond的值变了,这样t1便可继续往下执行。
2、消息的数据结构
skynet 中一共支持两种不同的消息:
- skynet_message 本地消息
- remote_message 远程消息
//skynet_mq.h
struct skynet_message {
uint32_t source; //源地址(发送的)
int session; //当一个服务向另一个服务发请求是会生成一个session(包含请求数据的结构体),当响应端处理完毕之后,将结果放到session 原样返回
void * data;
size_t sz;
};
//skynet_harbor.h
#define GLOBALNAME_LENGTH 16
#define REMOTE_MAX 256
//remote_name 远程节点(skynet) 名称
struct remote_name {
char name[GLOBALNAME_LENGTH];
uint32_t handle;
};
struct remote_message {
struct remote_name destination;
const void * message;
size_t sz;
int type;
};
3、消费消息流程
skynet在启动时,会创建若干条worker线程(由配置指定),这些worker线程被创建以后,会不断得从global_mq里pop出一个次级消息队列来,每个worker线程,每次只pop一个次级消息队列,然后再从次级消息队列中,pop一到若干条消息出来(受权重值影响),最后消息将作为参数传给对应服务的callback函数(每个服务只有一个专属的次级消息队列),当callback执行完时,worker线程会将次级消息队列push回global_mq里,这样就完成了消息的消费。在这个过程中,因为每个worker线程会从global_mq里pop一个次级消息队列出来,此时其他worker线程就不能从global_mq里pop出同一个次级消息队列,也就是说,一个服务不能在多个worker线程内调用callback函数,从而保证了线程安全。大致框图如下:
其中线程池中的前三个线程是 monitor, timer 和 socket 线程。其中,monitor 线程主要负责检查每个服务是否陷入了死循环,socket 线程负责网络相关操作,timer 线程则负责定时器。worker 具有不同的权重值,每个 worker 会不断从全局消息队列中取出某个服务的次级消息队列,并根据权重值的不同从消息队列中取出若干个消息,然后调用服务所属的 callback 函数消费消息。创建流程源码:
//skynet_start.c
static void
start(int thread) {
pthread_t pid[thread+3];
struct monitor *m = skynet_malloc(sizeof(*m));
memset(m, 0, sizeof(*m));
m->count = thread;
m->sleep = 0;
m->m = skynet_malloc(thread * sizeof(struct skynet_monitor *));
int i;
for (i=0;i<thread;i++) {
m->m[i] = skynet_monitor_new();
}
if (pthread_mutex_init(&m->mutex, NULL)) {
fprintf(stderr, "Init mutex error");
exit(1);
}
if (pthread_cond_init(&m->cond, NULL)) {
fprintf(stderr, "Init cond error");
exit(1);
}
//创建monitor, timer 和 socket 线程
create_thread(&pid[0], thread_monitor, m);
create_thread(&pid[1], thread_timer, m);
create_thread(&pid[2], thread_socket, m);
//worker 线程的权重值
/*
-1:从次级消息队列取出一个消息进行处理
0:从次级消息队列取出所有消息进行处理
当权重>0时,假设次级消息队列的长度为mq_length,将mq_length转成二进制数值以后,向右移动weight(权重值)位,结果N则是,该线程一次消费次级消息队列的消息数:
1:从次级消息队列取出一半的消息进行处理
2:从次级消息队列取出四分之一的消息进行处理
3:从次级消息队列取出八分之一的消息进行处理
这样做的目的,大概是希望避免过多的worker线程为了等待spinlock解锁,而陷入阻塞状态(因为一些线程,一次消费多条甚至全部次级消息队列的消息,因此在消费期间,不会对global_mq进行入队和出队操作,入队和出队操作时加自旋锁的,因此就不会尝试去访问spinlock锁住的临界区,该线程就在相当一段时间内不会陷入阻塞),进而提升服务器的并发处理能力。这里还有一个细节值得注意,就是前四条线程,每次只是pop一个次级消息队列的消息出来,这样做也在一定程度上保证了没有服务会被饿死。
*/
static int weight[] = {
-1, -1, -1, -1, 0, 0, 0, 0,
1, 1, 1, 1, 1, 1, 1, 1,
2, 2, 2, 2, 2, 2, 2, 2,
3, 3, 3, 3, 3, 3, 3, 3, };
//创建相应数量的 worker 线程
struct worker_parm wp[thread];
for (i=0;i<thread;i++) {
wp[i].m = m;
wp[i].id = i;
if (i < sizeof(weight)/sizeof(weight[0])) {
wp[i].weight= weight[i];
} else {
wp[i].weight = 0;
}
create_thread(&pid[i+3], thread_worker, &wp[i]);
}
for (i=0;i<thread+3;i++) {
pthread_join(pid[i], NULL);
}
free_monitor(m);
}
在多条线程,同时运作时,每条worker线程都要从global_mq中pop一条次级消息队列出来,对global_mq进行pop和push操作的时候,会用自旋锁锁住临界区
// skynet_mq.c
void
skynet_globalmq_push(struct message_queue * queue) {
struct global_queue *q= Q;
SPIN_LOCK(q) //自旋锁
assert(queue->next == NULL);
if(q->tail) {
q->tail->next = queue;
q->tail = queue;
} else {
q->head = q->tail = queue;
}
SPIN_UNLOCK(q)
}
struct message_queue *
skynet_globalmq_pop() {
struct global_queue *q = Q;
SPIN_LOCK(q)
struct message_queue *mq = q->head;
if(mq) {
q->head = mq->next;
if(q->head == NULL) {
assert(mq == q->tail);
q->tail = NULL;
}
mq->next = NULL;
}
SPIN_UNLOCK(q)
return mq;
}
正如本节概述所说,一个worker线程被创建出来以后,则是不断尝试从global_mq中pop一个次级消息队列,并从次级消息队列中pop消息,进而通过服务的callback函数来消费该消息:
// skynet_start.c
static void
wakeup(struct monitor *m, int busy) {
if (m->sleep >= m->count - busy) {
// signal sleep worker, "spurious wakeup" is harmless
pthread_cond_signal(&m->cond);
}
}
static void *
thread_timer(void *p) {
struct monitor * m = p;
skynet_initthread(THREAD_TIMER);
for (;;) {
skynet_updatetime();
CHECK_ABORT
wakeup(m,m->count-1);
usleep(2500);
}
// wakeup socket thread
skynet_socket_exit();
// wakeup all worker thread
pthread_mutex_lock(&m->mutex);
m->quit = 1;
pthread_cond_broadcast(&m->cond);
pthread_mutex_unlock(&m->mutex);
return NULL;
}
static void *
thread_worker(void *p) {
struct worker_parm *wp = p;
int id = wp->id;
int weight = wp->weight;
struct monitor *m = wp->m;
struct skynet_monitor *sm = m->m[id];
skynet_initthread(THREAD_WORKER);
struct message_queue * q = NULL;
while (!m->quit) {
q = skynet_context_message_dispatch(sm, q, weight);
if (q == NULL) {
if (pthread_mutex_lock(&m->mutex) == 0) {
++ m->sleep;
// "spurious wakeup" is harmless,
// because skynet_context_message_dispatch() can be call at any time.
if (!m->quit)
pthread_cond_wait(&m->cond, &m->mutex);
-- m->sleep;
if (pthread_mutex_unlock(&m->mutex)) {
fprintf(stderr, "unlock mutex error");
exit(1);
}
}
}
}
return NULL;
}
// skynet_server.c
struct message_queue *
skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
if (q == NULL) {
q = skynet_globalmq_pop();
if (q==NULL)
return NULL;
}
uint32_t handle = skynet_mq_handle(q);
struct skynet_context * ctx = skynet_handle_grab(handle);
if (ctx == NULL) {
struct drop_t d = { handle };
skynet_mq_release(q, drop_message, &d);
return skynet_globalmq_pop();
}
int i,n=1;
struct skynet_message msg;
for (i=0;i<n;i++) {
if (skynet_mq_pop(q,&msg)) {
skynet_context_release(ctx);
return skynet_globalmq_pop();
} else if (i==0 && weight >= 0) {
n = skynet_mq_length(q);
n >>= weight;
}
int overload = skynet_mq_overload(q);
if (overload) {
skynet_error(ctx, "May overload, message queue length = %d", overload);
}
skynet_monitor_trigger(sm, msg.source , handle);
if (ctx->cb == NULL) {
skynet_free(msg.data);
} else {
dispatch_message(ctx, &msg);
}
skynet_monitor_trigger(sm, 0,0);
}
assert(q == ctx->queue);
struct message_queue *nq = skynet_globalmq_pop();
if (nq) {
// If global mq is not empty , push q back, and return next queue (nq)
// Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
skynet_globalmq_push(q);
q = nq;
}
skynet_context_release(ctx);
return q;
}
static void
dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
assert(ctx->init);
CHECKCALLING_BEGIN(ctx)
pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
int type = msg->sz >> MESSAGE_TYPE_SHIFT;
size_t sz = msg->sz & MESSAGE_TYPE_MASK;
if (ctx->logfile) {
skynet_log_output(ctx->logfile, msg->source, type, msg->session, msg->data, sz);
}
if (!ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz)) {
skynet_free(msg->data);
}
CHECKCALLING_END(ctx)
}
整个worker线程的消费流程是:
a) worker线程每次,从global_mq中弹出一个次级消息队列,如果次级消息队列为空,则该worker线程投入睡眠,timer线程每隔2.5毫秒会唤醒一条睡眠中的worker线程,并重新尝试从全局消息队列中pop一个次级消息队列出来,当次级消息队列不为空时,进入下一步
b) 根据次级消息的handle,找出其所属的服务(一个skynet_context实例)指针,从次级消息队列中,pop出n条消息(受weight值影响),并且将其作为参数,传给skynet_context的cb函数,并调用它
c) 当完成callback函数调用时,就从global_mq中再pop一个次级消息队列中,供下一次使用,并将本次使用的次级消息队列push回global_mq的尾部
d) 返回第a步
线程安全:
1、整个消费流程,每条worker线程,从global_mq取出的次级消息队列都是唯一的,并且有且只有一个服务与之对应,取出之后,在该worker线程完成所有callback调用之前,不会push回global_mq中,也就是说,在这段时间内,其他worker线程不能拿到这个次级消息队列所对应的服务,并调用callback函数,也就是说一个服务不可能同时在多条worker线程内执行callback函数,从而保证了线程安全。
2、不论是global_mq也好,次级消息队列也好,他们在入队和出队操作时,都有加上spinlock,这样多个线程同时访问mq的时候,第一个访问者会进入临界区并锁住,其他线程会阻塞等待,直至该锁解除,这样也保证了线程安全。global_mq会同时被多个worker线程访问,这个很好理解,因为worker线程总是在不断尝试驱动不同的服务,要驱动服务,首先要取出至少一个消息,要获得消息,就要取出一个次级消息队列,而这个次级消息队列要从全局消息队列里取。虽然一个服务的callback函数,只能在一个worker线程内被调用,但是在多个worker线程中,可以向同一个次级消息队列push消息,即便是该次级消息队列所对应的服务正在执行callback函数,由于次级消息队列不是skynet_context的成员(skynet_context只是包含了次级消息队列的指针),因此改变次级消息队列不等于改变skynet_context上的数据,不会影响到该服务自身内存的数据,次级消息队列在进行push和pop操作的时候,会加上一个spinlock,当多个worker线程同时向同一个次级消息队列push消息时,第一个访问的worker线程,能够进入临界区,其他worker线程就阻塞等待,直至该临界区解锁,这样保证了线程安全。
3、我们在通过handle从skynet_context list里获取skynet_context的过程中(比如派发消息时,要要先获取skynet_context指针,再调用该服务的callback函数),需要加上一个读写锁,因为在skynet运作的过程中,获取skynet_context,比创建skynet_context的情况要多得多,因此这里用了读写锁:
struct skynet_context *
skynet_handle_grab(uint32_t handle) {
struct handle_storage *s = H;
struct skynet_context * result = NULL;
rwlock_rlock(&s->lock);
uint32_t hash = handle & (s->slot_size-1);
struct skynet_context * ctx = s->slot[hash];
if (ctx && skynet_context_handle(ctx) == handle) {
result = ctx;
skynet_context_grab(result);
}
rwlock_runlock(&s->lock);
return result;
}
这里加上读写锁的意义在于,多个worker线程,同时从skynet_context列表中获取context指针时,没有一条线程是会被阻塞的,这样提高了并发的效率,而此时,尝试往skyent_context里表中,添加新的服务的线程将会被阻塞住,因为添加新的服务可能会导致skynet_context列表(也就是代码里的slot列表)可能会被resize,因此读的时候不允许写入,写的时候不允许读取,保证了线程安全。
4、生成消息流程
skynet 中不同的服务运行在不同的上下文当中,彼此之间的交互只能通过消息队列进行转发。不同服务之间转发消息的接口为 skynet_send ,其定义如下:
//skynet_server.c
int
skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
if ((sz & MESSAGE_TYPE_MASK) != sz) {
skynet_error(context, "The message to %x is too large", destination);
if (type & PTYPE_TAG_DONTCOPY) {
skynet_free(data);
}
return -2;
}
//type类型确定消息格式
// PTYPE_TAG_ALLOCSESSION 分配 新session *session = skynet_context_newsession(context);
// PTYPE_TAG_DONTCOPY 不要拷贝data,在data上直接处理
_filter_args(context, type, &session, (void **)&data, &sz);
//消息发送端
if (source == 0) {
source = context->handle;
}
//消息接收端
if (destination == 0) {
if (data) {
skynet_error(context, "Destination address can't be 0");
skynet_free(data);
return -1;
}
return session;
}
/*
skynet_harbor_send 和 skynet_context_push --> skynet_mq_push
skynet发消息的实质是往服务的次级消息队列压入消息
*/
if (skynet_harbor_message_isremote(destination)) {
struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
rmsg->destination.handle = destination;
rmsg->message = data;
rmsg->sz = sz & MESSAGE_TYPE_MASK;
rmsg->type = sz >> MESSAGE_TYPE_SHIFT;
skynet_harbor_send(rmsg, source, session);
} else {
struct skynet_message smsg;
smsg.source = source;
smsg.session = session;
smsg.data = data;
smsg.sz = sz;
if (skynet_context_push(destination, &smsg)) {
skynet_free(data);
return -1;
}
}
return session;
}
5、监管机制
1、定义结构体
//skynet_monitor.c
struct skynet_monitor {
ATOM_INT version; //版本
int check_version; //旧版本
uint32_t source; //源地址
uint32_t destination; //目标地址
};
//skynet_start.c
struct monitor {
int count; //monitor监管的 工作线程数量
struct skynet_monitor ** m; //数组 存放所有的skynet_monitor 一个worker对应一个skynet_monitor
pthread_cond_t cond;
pthread_mutex_t mutex;
int sleep;
int quit;
};
2、运行函数
/*
skynet_start.c thread_monitor
每隔一段时间(5s) 对每个worker线程都执行一次 skynet_monitor_check
工作流程:
worker 从global_mq取出次级消息队列进行消费 执行dispatch_message(ctx, &msg); 会先调用skynet_monitor_trigger函数 添加skynet_monitor
此时skynet_monitor状态:
skynet_monitor->version = 1; skynet_monitor->check_version = 0;
当monitor 对skynet_monitor执行 skynet_monitor_check函数 此时skynet_monitor_check(第一次)中 sm->version == sm->check_version == 1
当worker 陷入死循环达到(第二次)skynet_monitor_check 因为sm->version == sm->check_version成立 返回一条错误日志
当worker 在(第二次)skynet_monitor_check之前的时间(5s)处理完消息,此时sm->source 和 sm->destination 都设置为0
*/
static void *
thread_monitor(void *p) {
struct monitor * m = p;
int i;
int n = m->count;
skynet_initthread(THREAD_MONITOR);
for (;;) {
CHECK_ABORT
for (i=0;i<n;i++) {
skynet_monitor_check(m->m[i]);
}
for (i=0;i<5;i++) {
CHECK_ABORT
sleep(1);
}
}
return NULL;
}
// skynet_monitor.c skynet_monitor_check
void
skynet_monitor_check(struct skynet_monitor *sm) {
if (sm->version == sm->check_version) {
if (sm->destination) { //检查目标地址是否为0
skynet_context_endless(sm->destination);
skynet_error(NULL, "A message from [ :%08x ] to [ :%08x ] maybe in an endless loop (version = %d)", sm->source , sm->destination, sm->version);
}
} else {
sm->check_version = sm->version;
}
}
/*
那worker怎么添加monitor?
thread_worker --> skynet_context_message_dispatch --> skynet_monitor_trigger(sm, msg.source , handle);
*/
struct message_queue *
skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
if (q == NULL) {
q = skynet_globalmq_pop();
if (q==NULL)
return NULL;
}
uint32_t handle = skynet_mq_handle(q);
struct skynet_context * ctx = skynet_handle_grab(handle);
if (ctx == NULL) {
struct drop_t d = { handle };
skynet_mq_release(q, drop_message, &d);
return skynet_globalmq_pop();
}
int i,n=1;
struct skynet_message msg;
for (i=0;i<n;i++) {
if (skynet_mq_pop(q,&msg)) {
skynet_context_release(ctx);
return skynet_globalmq_pop();
} else if (i==0 && weight >= 0) {
n = skynet_mq_length(q);
n >>= weight;
}
int overload = skynet_mq_overload(q);
if (overload) {
skynet_error(ctx, "May overload, message queue length = %d", overload);
}
skynet_monitor_trigger(sm, msg.source , handle);
if (ctx->cb == NULL) {
skynet_free(msg.data);
} else {
dispatch_message(ctx, &msg);
}
skynet_monitor_trigger(sm, 0,0);
}
assert(q == ctx->queue);
struct message_queue *nq = skynet_globalmq_pop();
if (nq) {
// If global mq is not empty , push q back, and return next queue (nq)
// Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
skynet_globalmq_push(q);
q = nq;
}
skynet_context_release(ctx);
return q;
}
void
skynet_monitor_trigger(struct skynet_monitor *sm, uint32_t source, uint32_t destination) {
sm->source = source;
sm->destination = destination;
ATOM_FINC(&sm->version);
}