1.使用st-thread
我们用一个简单的demo研究一下st框架。
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include "st.h"
static void *_thread(void *arg) {
printf("thread: %lu\n", pthread_self());
return NULL;
}
int main(int argc, char *argv[]) {
if (st_init() < 0) {
perror("st_init");
exit(1);
}
for (unsigned int i=0; i<10; i++) {
if (st_thread_create(_thread, NULL, 0, 0) == NULL) {
perror("st_thread_create");
exit(1);
}
}
st_thread_exit(NULL);
return 0;
}
out:
thread: 140727908825184
thread: 140727908825184
thread: 140727908825184
thread: 140727908825184
thread: 140727908825184
thread: 140727908825184
thread: 140727908825184
thread: 140727908825184
thread: 140727908825184
thread: 140727908825184
2.初始化st_init
这里主要做了几件事:
- 选择io复用函数类型
- 配置系统相关设置:屏蔽SIGPIPE和配置文件描述符限制
- 初始化全局变量_st_this_vp
- 创建ide线程
- 创建primorial线程
int st_init(void)
{
_st_thread_t *thread;
if (_st_active_count) {
/* Already initialized */
return 0;
}
// 初始化空闲栈队列
ST_INIT_CLIST(&_st_free_stacks);
/* We can ignore return value here */
// 选择使用哪种IO多路复用函数,eg:_st_select_eventsys
st_set_eventsys(ST_EVENTSYS_DEFAULT);
// 屏蔽SIGPIPE信号和调整文件描述符限制
pthread_once(&io_once_control, (void (*)(void))_st_io_init);
memset(&_st_this_vp, 0, sizeof(_st_vp_t));
// 线程队列初始化
ST_INIT_CLIST(&_ST_RUNQ);
ST_INIT_CLIST(&_ST_IOQ);
ST_INIT_CLIST(&_ST_ZOMBIEQ);
#ifdef DEBUG
ST_INIT_CLIST(&_ST_THREADQ);
#endif
// io多路复用函数对应的初始化,eg:_st_select_init
if ((*_st_eventsys->init)() < 0)
return -1;
// 获取内存页大小,一般4096
_st_this_vp.pagesize = getpagesize();
// 获取时间,微妙
_st_this_vp.last_clock = st_utime();
// 创建ide线程:主要用来处理io事件和定时器
_st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start,
NULL, 0, 0);
if (!_st_this_vp.idle_thread)
return -1;
_st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD;
_st_active_count--;
// 从run队列移除ide线程:因为只有没有线程调用时,才会调用ide线程
_ST_DEL_RUNQ(_st_this_vp.idle_thread);
// 初始化primorial线程,primorial线程用来标记系统进程
thread = (_st_thread_t *) calloc(1, sizeof(_st_thread_t) +
(ST_KEYS_MAX * sizeof(void *)));
if (!thread)
return -1;
_st_this_vp.primorial_thread = thread;
thread->private_data = (void **) (thread + 1);
thread->state = _ST_ST_RUNNING;
thread->flags = _ST_FL_PRIMORDIAL;
// 当前运行的是primorial线程,当primorial线程退出时,整个进程也会终止。
_ST_SET_CURRENT_THREAD(thread);
_st_active_count++;
#ifdef DEBUG
_ST_ADD_THREADQ(thread);
#endif
return 0;
}
2.1 全局结构 _st_this_vp
全局的相关信息都保存在_st_this_vp,主要保存了:
- primorial线程:和系统进程共存,线程退出,进程也就退出了。
- **idle线程:**在没有线程运行会被调度,主要任务是用IO多路复用函数等待IO事件和处理定时器
- **run线程队列:**运行线程,我们的工作线程。
- **io线程队列:**当线程需要等待io事件时,会被放到io线程队列,当等待的io事件发生或者超时和中断时,会从队列移除加入到run队列中。
- **zombie线程队列:**线程结束时,如果设置了joinable,就会被加入到这个队列。
- **sleep线程:**处理定时器,基于二叉树保存定时器
typedef struct _st_vp {
_st_thread_t *primorial_thread;
_st_thread_t *idle_thread; /* Idle thread for this vp */
st_utime_t last_clock; /* The last time we went into vp_check_clock() */
_st_clist_t run_q; /* run queue for this vp */
_st_clist_t io_q; /* io queue for this vp */
_st_clist_t zombie_q; /* zombie queue for this vp */
#ifdef DEBUG
_st_clist_t thread_q; /* all threads of this vp */
#endif
int pagesize;
_st_thread_t *sleep_q; /* sleep queue for this vp */
int sleepq_size; /* number of threads on sleep queue */
#ifdef ST_SWITCH_CB
st_switch_cb_t switch_out_cb; /* called when a thread is switched out */
st_switch_cb_t switch_in_cb; /* called when a thread is switched in */
#endif
} _st_vp_t;
2.2 线程结构 _st_thread_t
线程的相关信息都保存在_st_thread,主要保存了:
- 线程状态
- 线程启动函数
- 线程所在的队列:run sleep和zombie等队列
- 线程堆栈:
- 线程上下文 :jmp_buf,线程的跳转主要通过setjmp/longjmp跳转,jmp_buf保存了线程的上下文信息
struct _st_thread {
int state; /* 线程状态 */
int flags; /* Thread's flags */
void *(*start)(void *arg); /* 线程启动函数 */
void *arg; /* 线程启动参数 */
void *retval; /* 线程启动函数返回值 */
_st_stack_t *stack; /* 记录堆栈信息 */
_st_clist_t links; /* 用于插入到 run/sleep/zombie 线程队列 */
_st_clist_t wait_links; /* 用于插入到 mutex/condvar 等待队列 */
#ifdef DEBUG
_st_clist_t tlink; /* For putting on thread queue */
#endif
st_utime_t due; /* Wakeup time when thread is sleeping */
_st_thread_t *left; /* 记录sleep定时器二叉树左节点 */
_st_thread_t *right; /* 记录sleep定时器二叉树右节点 */
int heap_index; /* 堆节点 */
void **private_data; /* 线程私有数据 */
_st_cond_t *term; /* joinable类型的线程 */
jmp_buf context; /* 线程上下文:线程的跳转主要通过setjmp/longjmp跳转,jmp_buf保存了线程的上下文信息 */
};
2.3 线程状态
// 线程状态
#define _ST_ST_RUNNING 0 // 执行中
#define _ST_ST_RUNNABLE 1 // 可执行状态,等待调度
#define _ST_ST_IO_WAIT 2 // 等待IO事件
#define _ST_ST_LOCK_WAIT 3 // 等待互斥锁
#define _ST_ST_COND_WAIT 4 // 等待条件变量
#define _ST_ST_SLEEPING 5 // sleep
#define _ST_ST_ZOMBIE 6 // 线程已结束,待其它线程调用st_thread_join收尸
#define _ST_ST_SUSPENDED 7 // 暂停,只能调用st_thread_interrupt唤醒
// 线程flag
#define _ST_FL_PRIMORDIAL 0x01 // 原生线程,非创建的,没有分配私有栈
#define _ST_FL_IDLE_THREAD 0x02 // 空闲线程,用于epoll,处理定时器
#define _ST_FL_ON_SLEEPQ 0x04 // 线程sleep中,如调用st_usleep、st_cond_timedwait等
#define _ST_FL_INTERRUPT 0x08 // 线程被st_thread_interrupt()中断
#define _ST_FL_TIMEDOUT 0x10 // 定时器超时
线程状态转换:
3.创建线程 st_thread_create
_st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg,
int joinable, int stk_size)
{
_st_thread_t *thread;
_st_stack_t *stack;
void **ptds;
char *sp;
// 1.创建线程栈
if (stk_size == 0)
stk_size = ST_DEFAULT_STACK_SIZE;
// 页对齐 eg: 64 * 1024 = 65536
stk_size = ((stk_size + _ST_PAGE_SIZE - 1) / _ST_PAGE_SIZE) * _ST_PAGE_SIZE;
stack = _st_stack_new(stk_size);
if (!stack)
return NULL;
/* Allocate thread object and per-thread data off the stack */
// 2.分配线程对象
sp = stack->stk_top;
sp = sp - (ST_KEYS_MAX * sizeof(void *));
ptds = (void **) sp;
sp = sp - sizeof(_st_thread_t);
thread = (_st_thread_t *) sp;
// 栈指针64位对齐
if ((unsigned long)sp & 0x3f)
sp = sp - ((unsigned long)sp & 0x3f);
stack->sp = sp - _ST_STACK_PAD_SIZE;
memset(thread, 0, sizeof(_st_thread_t));
memset(ptds, 0, ST_KEYS_MAX * sizeof(void *));
// 3.初始化线程
thread->private_data = ptds;
thread->stack = stack;
thread->start = start;
thread->arg = arg;
// 4.初始化线程上下文
#ifndef __ia64__
_ST_INIT_CONTEXT(thread, stack->sp, _st_thread_main);
#else
_ST_INIT_CONTEXT(thread, stack->sp, stack->bsp, _st_thread_main);
#endif
/* If thread is joinable, allocate a termination condition variable */
if (joinable) {
thread->term = st_cond_new();
if (thread->term == NULL) {
_st_stack_free(thread->stack);
return NULL;
}
}
// 5.设置线程状态
thread->state = _ST_ST_RUNNABLE;
_st_active_count++;
_ST_ADD_RUNQ(thread);
#ifdef DEBUG
_ST_ADD_THREADQ(thread);
#endif
#ifndef NVALGRIND
thread->stack->valgrind_stack_id =
VALGRIND_STACK_REGISTER(thread->stack->stk_top, thread->stack->stk_bottom);
#endif
return thread;
}
3.1线程堆栈 _st_stack_t
typedef struct _st_stack {
_st_clist_t links; /* 空闲栈链表 */
char *vaddr; /* 内存分配的起始位置 */
int vaddr_size; /* 栈 总大小 */
int stk_size; /* 栈 可用部分大小 */
char *stk_bottom; /* 私有栈 结束位置 */
char *stk_top; /* 私有栈 起始位置 */
void *sp; /* 栈指针 */
#ifdef __ia64__
void *bsp; /* Register stack backing store pointer */
#endif
#ifndef NVALGRIND
/* id returned by VALGRIND_STACK_REGISTER */
/* http://valgrind.org/docs/manual/manual-core-adv.html */
unsigned int valgrind_stack_id;
#endif
} _st_stack_t;
线程上下文 jmp_buf
/* Calling environment, plus possibly a saved signal mask. */
struct __jmp_buf_tag
{
/* NOTE: The machine-dependent definitions of `__sigsetjmp'
assume that a `jmp_buf' begins with a `__jmp_buf' and that
`__mask_was_saved' follows it. Do not move these members
or add others before it. */
__jmp_buf __jmpbuf; /* Calling environment. */
int __mask_was_saved; /* Saved the signal mask? */
__sigset_t __saved_mask; /* Saved signal mask. */
};
__BEGIN_NAMESPACE_STD
typedef struct __jmp_buf_tag jmp_buf[1];
栈创建
_st_stack_t *_st_stack_new(int stack_size)
{
_st_clist_t *qp;
_st_stack_t *ts;
int extra;
// 如果_st_free_stacks非空,则从里面取
for (qp = _st_free_stacks.next; qp != &_st_free_stacks; qp = qp->next) {
ts = _ST_THREAD_STACK_PTR(qp);
if (ts->stk_size >= stack_size) {
/* Found a stack that is big enough */
ST_REMOVE_LINK(&ts->links);
_st_num_free_stacks--;
ts->links.next = NULL;
ts->links.prev = NULL;
return ts;
}
}
/* Make a new thread stack object. */
if ((ts = (_st_stack_t *)calloc(1, sizeof(_st_stack_t))) == NULL)
return NULL;
// eg: _st_randomize_stacks:0 extra:0
extra = _st_randomize_stacks ? _ST_PAGE_SIZE : 0;
// eg: stack_size:65536 REDZONE:pagesize(4096)*2=8192
// vaddr_size = 16 * 4096 + 2*4096 = 18 *4096
ts->vaddr_size = stack_size + 2*REDZONE + extra;
// 申请内存,返回起始地址
ts->vaddr = _st_new_stk_segment(ts->vaddr_size);
if (!ts->vaddr) {
free(ts);
return NULL;
}
// 栈总大小
ts->stk_size = stack_size;
// 栈
ts->stk_bottom = ts->vaddr + REDZONE;
ts->stk_top = ts->stk_bottom + stack_size;
#ifdef DEBUG
mprotect(ts->vaddr, REDZONE, PROT_NONE);
mprotect(ts->stk_top + extra, REDZONE, PROT_NONE);
#endif
if (extra) {
long offset = (random() % extra) & ~0xf;
ts->stk_bottom += offset;
ts->stk_top += offset;
}
return ts;
}
到此,我们可以分析出,栈空间布局:
4.线程调度
4.1 setjmp和longjmp函数
这里线程的调度是基于 setjmp和longjmp,和goto类似,不过goto是函数内部跳转,这个跳转范围更大。所以讲解一下用法,
#include <setjmp.h>
int setjmp(jmp_buf env);
返回值:若直接调用则返回0,若从longjmp调用返回则返回非0值的longjmp中的val值
void longjmp(jmp_buf env,int val);
调用此函数则返回到语句setjmp所在的地方,其中env 就是setjmp中的 env,而val 则是使setjmp的返回值变为val。
当检查到一个错误时,则以两个参数调用longjmp函数,第一个就是在调用setjmp时所用的env,第二个参数是具有非0值的val,它将成为从setjmp处返回的值。
使用第二个参数的原因是对于一个setjmp可以有多个longjmp。
我们看个demo
#include <stdio.h>
#include <setjmp.h>
static jmp_buf buf;
void second(void) {
printf("second\n"); // 打印
longjmp(buf,1); // 跳回setjmp的调用处 - 使得setjmp返回值为1
}
void first(void) {
second();
printf("first\n"); // 不可能执行到此行
}
int main() {
if ( ! setjmp(buf) ) {
first(); // 进入此行前,setjmp返回0
} else { // 当longjmp跳转回,setjmp返回1,因此进入此行
printf("main\n"); // 打印
}
return 0;
}
out:
second
main
注意到虽然first()子程序被调用,"first"不可能被打印。"main"被打印,因为条件语句if ( ! setjmp(buf) )被执行第二次。
使用setjmp和longjmp要注意以下几点:
1、setjmp与longjmp结合使用时,它们必须有严格的先后执行顺序,也即先调用setjmp函数,之后再调用longjmp函数,以恢复到先前被保存的“程序执行点”。否则,如果在setjmp调用之前,执行longjmp函数,将导致程序的执行流变的不可预测,很容易导致程序崩溃而退出
\2. longjmp必须在setjmp调用之后,而且longjmp必须在setjmp的作用域之内。具体来说,在一个函数中使用setjmp来初始化一个全局标号,然后只要该函数未曾返回,那么在其它任何地方都可以通过longjmp调用来跳转到 setjmp的下一条语句执行。实际上setjmp函数将发生调用处的局部环境保存在了一个jmp_buf的结构当中,只要主调函数中对应的内存未曾释放 (函数返回时局部内存就失效了),那么在调用longjmp的时候就可以根据已保存的jmp_buf参数恢复到setjmp的地方执行。
4.2 线程上下文初始化
可以看到线程上下文对象就是jmp_buf,我们看看怎么初始化的。
_ST_INIT_CONTEXT(thread, stack->sp, _st_thread_main);
展开如下:
{
if (setjmp(thread->context))
_st_thread_main();
thread->context[0].__jmpbuf[JB_RSP]
}
和上面的demo是不是很像,这里主要设置了setjmp返回0,那什么时候进入_st_thread_main呢?答案就在st_thread_exit
4.3 线程退出st_thread_exit
void st_thread_exit(void *retval)
{
_st_thread_t *thread = _ST_CURRENT_THREAD();
printf("st_thread_exit current thread: %p\n", thread);
thread->retval = retval;
_st_thread_cleanup(thread);
_st_active_count--;
if (thread->term) {
/* Put thread on the zombie queue */
thread->state = _ST_ST_ZOMBIE;
_ST_ADD_ZOMBIEQ(thread);
/* Notify on our termination condition variable */
st_cond_signal(thread->term);
/* Switch context and come back later */
_ST_SWITCH_CONTEXT(thread);
/* Continue the cleanup */
st_cond_destroy(thread->term);
thread->term = NULL;
}
#ifdef DEBUG
_ST_DEL_THREADQ(thread);
#endif
#ifndef NVALGRIND
if (!(thread->flags & _ST_FL_PRIMORDIAL)) {
VALGRIND_STACK_DEREGISTER(thread->stack->valgrind_stack_id);
}
#endif
if (!(thread->flags & _ST_FL_PRIMORDIAL)) {
_st_stack_free(thread->stack);
}
// 启动线程调度
_ST_SWITCH_CONTEXT(thread);
free(thread);
(*_st_eventsys->free)();
}
这里主要就是干了两件件事:
- 退出当前线程
- 线程调度:_ST_SWITCH_CONTEXT
我们展开看一下_ST_SWITCH_CONTEXT,代码如下:
if (!setjmp(_thread->context)) {
_st_vp_schedule();
}
这里设置了当前线程的jmp_buf ,返回0,启动调度_st_vp_schedule
void _st_vp_schedule(void)
{
_st_thread_t *thread;
// 查找RUN队列,空就调度idle_thread
if (_ST_RUNQ.next != &_ST_RUNQ) {
// 从run队列取出next线程
thread = _ST_THREAD_PTR(_ST_RUNQ.next);
_ST_DEL_RUNQ(thread);
} else {
// 如果空就切换到idle线程
thread = _st_this_vp.idle_thread;
}
ST_ASSERT(thread->state == _ST_ST_RUNNABLE);
// 切换到thread线程
thread->state = _ST_ST_RUNNING;
_ST_RESTORE_CONTEXT(thread);
}
这里展开一下_ST_RESTORE_CONTEXT
_st_this_thread = (_thread)
longjmp((_thread)->context, 1)
就是设置了一下当前线程,然后通过longjmp跳转到_st_thread_main,执行线程的启动函数,到这里调度的基本原理就清楚了。
总结一下:
- 每个线程通过jmp_buf 保存信息,然后把自己切出去,后面调度的时候,再通过longjmp跳转回来。
- 调度是非抢占式的,需要用户自己把握,如果,某个线程长时间占用CPU,别的线程就无法被调度。
- 所有的IO和sleep操作需要用st自己的接口,不然无法调度。
SRS流媒体服务器架构设计
音视频开发 视频教程:https://ke.qq.com/course/3202131?flowToken=1031864(免费订阅不迷路)
音视频开发学习资料、教学视频,免费分享有需要的可以自行添加学习交流群:739729163 领取