线程
1 线程的基本概念
什么是线程
进程其实是一个容器,当我们在编程的时候实际上是在以线程为单位进行编程,包括处理器的调度也是以线程为单位的,一个进程可以有多个线程,一个进程的多个线程共享相同的进程空间,所以多线程之间的并发要比多进程之间的并发更加容易。除了共享进程空间的资源外,每个线程还有自己独有的数据:
- 进程中标识线程的线程标识符
- 寄存器信息
- 栈
- 信号屏蔽字
- 调度优先级和策略
- errno变量
- 其他私有数据
线程标准
线程是先有标准再有实现,所谓的pthread就是posix标准的实现
线程标识符
posix下的线程标识符就是pthread_t,这个数据可以是一个整数,也可以是一个结构体,所以不能单纯的将其当做一个整数来使用
- pthread_equal(3) 比较两个线程id,相同返回非0值,否则返回0值
int pthread_equal(pthread_t t1, pthread_t t2);
- pthread_self() 获取当前线程的线程id
pthread_t pthread_self(void);
2 线程的创建和终止
2.1 线程的创建
- pthread_create() 创建一个线程,成功返回0,失败返回errno
/* 1 thread是线程标识符
2 attr是线程的属性信息,由用户指定
3 start_routine是函数指针
4 arg是传递的参数
*/
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg);
- 示例
static void * func(void *p) {
puts("Thread is working...");
return NULL;
}
int main(int argc, char **argv) {
pthread_t tid;
int err;
puts("Begin!");
err = pthread_create(&tid, NULL, func, NULL);
if (err) {
fprintf(stderr, "pthread_create():%s", strerror(err));
exit(1);
}
puts("End!");
exit(0);
}
运行结果:可以看到有时候会输出线程中的语句,有时候不会输出,因为线程的调度策略是取决于CPU的,没输出的时候是因为main线程已经执行exit(0)推出了,这时进程也会退出所以不会执行线程的语句了,而输出就是因为在main线程执行exit(0)之前线程被调度了
root@VM-24-2-ubuntu:/home/ubuntu/linux# ./a.out
Begin!
End!
root@VM-24-2-ubuntu:/home/ubuntu/linux# ./a.out
Begin!
End!
root@VM-24-2-ubuntu:/home/ubuntu/linux# ./a.out
Begin!
End!
Thread is working...
2.2 线程的终止
终止方式
- 线程从启动例程返回,返回值就是线程的退出码
- 线程可以被同一进程中的其他线程取消
- 线程调用pthread_exit()函数
pthread_exit()函数
- 定义
void pthread_exit(void *retval)
pthread_join() 线程收尸,相当于wait操作
- 定义 等待子线程运行完
int pthread_join(pthread_t thread, void **retval);
2.3 栈清理
相关函数
注意:这两个其实是宏,需要成对出现,不然会出现语法错误
- pthread_cleanup_push() 类似装载钩子函数
void pthread_cleanup_push(void (*routine)(void *),
void *arg);
- pthread_cleanup_pop() 类似钩子函数被调用,不过进程中在其正常退出时会自己调用
void pthread_cleanup_pop(int execute);//传入0时表示只弹栈,但是不调用
示例
static void cleanup_func(void *p) {
puts(p);
}
static void *func(void *p) {
puts("Thread is working...");
pthread_cleanup_push(cleanup_func, "cleanup:1") ;
pthread_cleanup_push(cleanup_func, "cleanup:2") ;
pthread_cleanup_push(cleanup_func, "cleanup:3") ;
puts("push over");
pthread_cleanup_pop(1);
pthread_cleanup_pop(1);
pthread_cleanup_pop(1);
pthread_exit(NULL);
}
int main(int argc, char **argv) {
pthread_t tid;
int err;
puts("Begin!");
err = pthread_create(&tid, NULL, func, NULL);
if (err) {
fprintf(stderr, "pthread_create():%s", strerror(err));
exit(1);
}
pthread_join(tid, NULL);
puts("End!");
exit(0);
}
输出结果:
Begin!
Thread is working...
push over
cleanup:3
cleanup:2
cleanup:1
End!
2.4 线程取消
相关函数
- pthread_cancel() 给thread发送取消请求,成功返回0否则返回error number
int pthread_cancel(pthread_t thread);
- 取消的状态:允许和不允许
- 允许取消又分为异步cancel和推迟cancel(推迟至cancel点再响应)
- cancel点:POSIX定义的cancel点都是可能引发阻塞的系统调用
- pthread_setcancelstate() 设置取消状态,即是否允许被取消
int pthread_setcancelstate(int state, int *oldstate);
- pthread_setcanceltype() 设置取消方式:异步或推迟
int pthread_setcanceltype(int type, int *oldtype);
- pthread_testcancel() 这个函数不做任何事,就是一个取消点
void pthread_testcancel(void);
2.5 线程分离
-
pthread_detach() 分离一个线程
线程分离后即再也不能对该线程做其他操作,任由其自生自灭
int pthread_detach(pthread_t thread);
2.7 进程和线程原语的比较
2.8 线程竞争实例
判断质数
#define LEFT 30000000
#define RIGHT 30000200
static void *func(void *p) {
int j;
int num = *(int *)p;
for (j = 2; j < num / 2; j++) {
if (num % j == 0) {
return NULL;
}
}
printf("%d is a primer\n", num);
pthread_exit(NULL);
}
int main(int argc, char **argv) {
pthread_t tids[RIGHT - LEFT + 1];
int err;
int i, j;
for (i = LEFT; i < RIGHT; i++) {
err = pthread_create(&tids[i - LEFT], NULL, func, &i);
if (err) {
fprintf(stderr, "pthread_create():%s", strerror(err));
for (j = LEFT; j < i; j++) {
pthread_join(&tids[j - LEFT], NULL);
}
exit(1);
}
}
for (i = LEFT; i < RIGHT; i++) {
pthread_join(tids[i - LEFT], NULL);
}
exit(0);
}
输出结果:可以看到结果是有问题的,即产生了线程竞争的情况,因为在创建线程的时候i采用了地址传参,可能在线程还没有执行到num那条语句的时候,main线程又执行了i++,从而导致前面创建的线程的i发生了变化
30000109 is a primer
30000001 is a primer
30000109 is a primer
既然是因为地址传参,使得每次传参的地址一样导致的竞争,那么我们可以用一个结构体来保存我们的参数,每次传入不同的结构体指针,这样就能避免线程竞争同一块地址了
struct targ {
int n;
};
static void *func(void *p) {
int j, mark = 1;
int num = ((struct targ *)p)->n;
for (j = 2; j < num / 2; j++) {
if (num % j == 0) {
mark = 0;
break;
}
}
if (mark) {
printf("%d is a primer\n", num);
}
pthread_exit(p);
}
int main(int argc, char **argv) {
pthread_t tids[RIGHT - LEFT + 1];
int err;
int i, j;
struct targ *num;
void *p;
for (i = LEFT; i < RIGHT; i++) {
num = malloc(sizeof (*num));
if (num == NULL) {
perror("malloc():");
for (j = LEFT; j < i; j++) {
pthread_join(tids[j - LEFT], NULL);
}
exit(1);
}
num->n = i;
err = pthread_create(&tids[i - LEFT], NULL, func, num);
if (err) {
fprintf(stderr, "pthread_create():%s", strerror(err));
for (j = LEFT; j < i; j++) {
pthread_join(tids[j - LEFT], NULL);
}
exit(1);
}
}
for (i = LEFT; i < RIGHT; i++) {
pthread_join(tids[i - LEFT], &p);
free(p);
}
exit(0);
}
3 线程同步
3.1 互斥量
3.1.1 基本概念
相关函数
- pthread_mutex_init() 初始化互斥锁
/* Initialize a mutex. */
extern int pthread_mutex_init (pthread_mutex_t *__mutex,
const pthread_mutexattr_t *__mutexattr);
- pthread_mutex_destroy() 销毁互斥锁
/* Destroy a mutex. */
extern int pthread_mutex_destroy (pthread_mutex_t *__mutex);
- 互斥锁静态初始化,使用函数的方式称为动态初始化,静态初始化如下,使用默认的互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-
互斥锁的使用
lock和unlock之间的区域称为临界区,写代码时一定要注意临界区内的所有跳转语句,在可能再也回不去的地方要释放锁,防止死锁的发生:
- return
- break
- 函数调用
/* Try locking a mutex. 非阻塞,成功返回0*/
extern int pthread_mutex_trylock (pthread_mutex_t *__mutex)
__THROWNL __nonnull ((1));
/* Lock a mutex. 阻塞的*/
extern int pthread_mutex_lock (pthread_mutex_t *__mutex)
__THROWNL __nonnull ((1));
/* Unlock a mutex. */
extern int pthread_mutex_unlock (pthread_mutex_t *__mutex)
__THROWNL __nonnull ((1));
- pthread_once() 动态模块的单词初始化函数,保证init_routine函数只执行一次
/* Guarantee that the initialization function INIT_ROUTINE will be called
only once, even if pthread_once is executed several times with the
same ONCE_CONTROL argument. ONCE_CONTROL must point to a static or
extern variable initialized to PTHREAD_ONCE_INIT.
The initialization functions might throw exception which is why
this function is not marked with __THROW. */
extern int pthread_once (pthread_once_t *__once_control,
void (*__init_routine) (void)) __nonnull ((1, 2));
pthread_once_t once_control = PTHREAD_ONCE_INIT;
练习
4个线程分别顺序打印abcd,实现程序如下,可以看到C中的锁和java中不太一样,java中的锁要释放必须先获取这个锁,C中一个线程持有的互斥量可以被其他线程释放
#define THRNUM 4
static pthread_mutex_t mut[THRNUM];
static void *func(void *p) {
int i = (int) p;
int c = i + 'a';
while (1) {
pthread_mutex_lock(&mut[i]);
write(1, &c, 1);
sleep(1);
pthread_mutex_unlock(&mut[(i + 1) % THRNUM]);
}
pthread_exit(NULL);
}
int main(int argc, char **argv) {
int i, j, err;
pthread_t tids[THRNUM];
for (i = 0; i < THRNUM; i++) {
pthread_mutex_init(&mut[i], NULL);
pthread_mutex_lock(mut + i);
err = pthread_create(tids + i, NULL, func, (void *) i);
if (err) {
fprintf(stderr, "pthread_create():%s", strerror(err));
for (j = 0; j < i; j++) {
pthread_join(tids[j], NULL);
}
exit(1);
}
}
pthread_mutex_unlock(mut);
for (i = 0; i < THRNUM; i++) {
pthread_join(tids[i], NULL);
}
exit(0);
}
3.1.2 任务池(互斥量实现)
还是原来的求质数的问题,现在的做法是:
- num表示一个任务池
- 创建三个线程来抢任务执行
#define THRNUM 3
#define LEFT 30000000
#define RIGHT 30000200
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
static volatile int num = 0;//num > 0表示一个任务,=0表示任务被抢走了,=-1表示任务全放完了
static void *func(void *p) {
int i, mark;
while (1) {
mark = 1;
pthread_mutex_lock(&mut);
if (num == 0) {
pthread_mutex_unlock(&mut);
sched_yield();
continue;
}
if (num == -1) {
pthread_mutex_unlock(&mut);
break;
}
for (i = 2; i < num / 2; i++) {
if (num % i == 0) {
mark = 0;
break;
}
}
if (mark)
printf("[%d]%d is a primer\n", (int) p, num);
if (num != RIGHT) num = 0;
else num = -1;
pthread_mutex_unlock(&mut);
}
pthread_exit(NULL);
}
int main(int argc, char **argv) {
int i, j, err, cnt;
pthread_t tids[THRNUM];
for (i = 0; i < THRNUM; i++) {
err = pthread_create(tids + i, NULL, func, (void *) i);
if (err) {
fprintf(stderr, "pthread_create():%s", strerror(err));
for (j = 0; j < i; j++) {
pthread_join(tids[j], NULL);
}
exit(1);
}
}
cnt = LEFT;
//main线程负责给num赋值
while (num != -1) {
if (num != 0) continue;
pthread_mutex_lock(&mut);
num = cnt;
cnt++;
pthread_mutex_unlock(&mut);
}
for (i = 0; i < THRNUM; i++) {
pthread_join(tids[i], NULL);
}
exit(0);
}
3.1.3 令牌桶(互斥量实现)
头文件
#define MYTBF_MAX 1024
typedef void thd_mytbf_t;
//令牌桶初始化
thd_mytbf_t * mytbf_init(int cps, int burst);
//取令牌
int mytbf_fetchtoken(thd_mytbf_t *, int);
//归还令牌
int mytbf_returntoken(thd_mytbf_t *, int);
//令牌桶销毁
int mytbf_destory(thd_mytbf_t *);
实现
#include <malloc.h>
#include "pthread_mytbf.h"
#include <errno.h>
#include <unistd.h>
#include <signal.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
struct thd_mytbf_st {
int cps;
int burst;
volatile sig_atomic_t token;
int pos;
pthread_mutex_t mut;
};
static struct thd_mytbf_st * job[MYTBF_MAX];
static int inited;
static pthread_mutex_t mut_job = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t mut_init = PTHREAD_MUTEX_INITIALIZER;
static pthread_t tid_alrm;
//获取空的位置
static int get_free_pos_unlocked() {
int i;
for (i = 0; i < MYTBF_MAX; i++) {
if (job[i] == NULL) return i;
}
return -1;
}
//返回n1和n2的最小值
static int min(int n1, int n2) {
return n1 > n2 ? n2 : n1;
}
static void * thr_alrm_handled(void *) {
int i;
while (1) {
pthread_mutex_lock(&mut_job);
for (i = 0; i < MYTBF_MAX; i++) {
if (job[i] != NULL) {
pthread_mutex_lock(&job[i]->mut);
job[i]->token += job[i]->cps;
if (job[i]->token > job[i]->burst) {
job[i]->token = job[i]->burst;
}
pthread_mutex_unlock(&job[i]->mut);
}
}
pthread_mutex_unlock(&mut_job);
sleep(1);
}
}
//模块卸载
static void module_unload() {
int i;
pthread_cancel(tid_alrm);
pthread_join(tid_alrm, NULL);
//释放令牌
for (i = 0; i < MYTBF_MAX; i++) {
if (job[i] != NULL) {
pthread_mutex_destroy(&job[i]->mut);
free(job[i]);
job[i] = NULL;
}
}
pthread_mutex_destroy(&mut_job);
}
//模块加载
static void module_load() {
int err;
err = pthread_create(&tid_alrm, NULL, thr_alrm_handled, NULL);
if (err) {
fprintf(stderr, "pthread_create():%s", strerror(err));
exit(1);
}
atexit(module_unload);
}
//令牌桶初始化
thd_mytbf_t * mytbf_init(int cps, int burst) {
struct thd_mytbf_st *me;
int pos;
pthread_once_t once_control = PTHREAD_ONCE_INIT;
//初始化令牌
me = malloc(sizeof (*me));
if (me == NULL) {
return NULL;
}
me->token = 0;
me->burst = burst;
me->cps = cps;
pthread_mutex_init(&me->mut, NULL);
pthread_mutex_lock(&mut_job);
pos = get_free_pos_unlocked();
if (pos < 0) {
free(me);
pthread_mutex_unlock(&mut_job);
return NULL;
}
me->pos = pos;
job[pos] = me;
pthread_mutex_unlock(&mut_job);
pthread_once(&once_control, module_load);
return me;
}
//取令牌
int mytbf_fetchtoken(thd_mytbf_t *p, int size) {
if (size <= 0) return -EINVAL;
struct thd_mytbf_st *tbf = p;
int n;
pthread_mutex_lock(&tbf->mut);
while (tbf->token <= 0) {
pthread_mutex_unlock(&tbf->mut);
sched_yield();
pthread_mutex_lock(&tbf->mut);
}
n = min(size, tbf->token);
tbf->token -= n;
pthread_mutex_unlock(&tbf->mut);
return n;
}
//归还令牌
int mytbf_returntoken(thd_mytbf_t *p, int size) {
if (size <= 0) return -EINVAL;
struct thd_mytbf_st *me = p;
pthread_mutex_lock(&me->mut);
me->token += size;
if (me->token > me->burst) {
me->token = me->burst;
}
pthread_mutex_unlock(&me->mut);
return size;
}
//令牌桶销毁
int mytbf_destory(thd_mytbf_t *p) {
struct thd_mytbf_st *tbf = p;
pthread_mutex_lock(&mut_job);
job[tbf->pos] = NULL;
pthread_mutex_unlock(&mut_job);
pthread_mutex_destroy(&tbf->mut);
free(p);
return 0;
}
3.2 条件变量
相关函数
- pthread_cond_init() 初始化条件变量
/* Initialize condition variable COND using attributes ATTR, or use
the default values if later is NULL. */
extern int pthread_cond_init (pthread_cond_t *__restrict __cond,
const pthread_condattr_t *__restrict __cond_attr);
pthread_cont_t cond = PTHREAD_COND_INITIALIZER; //静态初始化
- pthread_cond_destroy() 销毁条件变量
/* Destroy condition variable COND. */
extern int pthread_cond_destroy (pthread_cond_t *__cond);
- pthread_cont_broadcast() 唤醒所有等待cond的进程
/* Wake up all threads waiting for condition variables COND. */
extern int pthread_cond_broadcast (pthread_cond_t *__cond);
- pthread_cond_signal() 唤醒一个等待cond的进程
/* Wake up one thread waiting for condition variable COND. */
extern int pthread_cond_signal (pthread_cond_t *__cond);
- pthread_cond_wait() 解锁mutex等待cond条件变量,当被唤醒的时候会自动再去抢mutex,该函数一般放在循环中,防止产生虚假唤醒,循环的条件就是等待的资源
/* Wait for condition variable COND to be signaled or broadcast.
MUTEX is assumed to be locked before.
This function is a cancellation point and therefore not marked with
__THROW. */
extern int pthread_cond_wait (pthread_cond_t *__restrict __cond,
pthread_mutex_t *__restrict __mutex);
- pthread_cond_timewait() 即超时等待
/* Wait for condition variable COND to be signaled or broadcast until
ABSTIME. MUTEX is assumed to be locked before. ABSTIME is an
absolute time specification; zero is the beginning of the epoch
(00:00:00 GMT, January 1, 1970).
This function is a cancellation point and therefore not marked with
__THROW. */
extern int pthread_cond_timedwait (pthread_cond_t *__restrict __cond,
pthread_mutex_t *__restrict __mutex,
const struct timespec *__restrict __abstime);
示例1 使用条件变量解决求质数的问题
#define THRNUM 3
#define LEFT 30000000
#define RIGHT 30000200
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static volatile int num = 0;//num > 0表示一个任务,=0表示任务被抢走了,=-1表示任务全放完了
static void *func(void *p) {
int i, mark;
while (1) {
mark = 1;
pthread_mutex_lock(&mut);
while (num == 0) {
pthread_cond_wait(&cond, &mut);
}
if (num == -1) {
pthread_mutex_unlock(&mut);
break;
}
for (i = 2; i < num / 2; i++) {
if (num % i == 0) {
mark = 0;
break;
}
}
if (mark)
printf("[%d]%d is a primer\n", (int) p, num);
num = 0;
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&mut);
}
pthread_exit(NULL);
}
int main(int argc, char **argv) {
int i, j, err, cnt;
pthread_t tids[THRNUM];
for (i = 0; i < THRNUM; i++) {
err = pthread_create(tids + i, NULL, func, (void *) i);
if (err) {
fprintf(stderr, "pthread_create():%s", strerror(err));
for (j = 0; j < i; j++) {
pthread_join(tids[j], NULL);
}
exit(1);
}
}
cnt = LEFT;
//main线程负责给num赋值
while (1) {
pthread_mutex_lock(&mut);
while (num != 0) {
pthread_cond_wait(&cond, &mut);
}
num = cnt;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mut);
cnt++;
if (cnt == RIGHT + 1) {
num = -1;
pthread_cond_broadcast(&cond);
break;
}
}
for (i = 0; i < THRNUM; i++) {
pthread_join(tids[i], NULL);
}
pthread_mutex_destroy(&mut);
pthread_cond_destroy(&cond);
exit(0);
}
示例2:使用条件变量解决打印abcd的问题
#define THRNUM 4
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int num;
static void *func(void *p) {
int i = (int) p;
int c = i + 'a';
while (1) {
pthread_mutex_lock(&mut);
while (i != num) {
pthread_cond_wait(&cond, &mut);
}
write(1, &c, 1);
num = (num + 1) % THRNUM;
sleep(1);
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&mut);
}
pthread_exit(NULL);
}
int main(int argc, char **argv) {
int i, j, err;
pthread_t tids[THRNUM];
for (i = 0; i < THRNUM; i++) {
err = pthread_create(tids + i, NULL, func, (void *) i);
if (err) {
fprintf(stderr, "pthread_create():%s", strerror(err));
for (j = 0; j < i; j++) {
pthread_join(tids[j], NULL);
}
exit(1);
}
}
for (i = 0; i < THRNUM; i++) {
pthread_join(tids[i], NULL);
}
pthread_mutex_destroy(&mut);
pthread_cond_destroy(&cond);
exit(0);
}
3.3 信号量
3.3.1 基本概念
相关函数
- sem_init() 初始化信号量sem的值为value,若pshared为0则表示该信号量是当前进程的局部信号量,否则就能够在多个进程间共享,成功返回0,失败返回-1并设置errno
/* Initialize semaphore object SEM to VALUE. If PSHARED then share it
with other processes. */
extern int sem_init (sem_t *__sem, int __pshared, unsigned int __value)
- sem_destroy() 销毁信号量sem
/* Free resources associated with semaphore object SEM. */
extern int sem_destroy (sem_t *__sem);
- sem_post()将信号量sem+1,若sem>0则将唤醒因调用sem_wait()阻塞的线程,成功返回0,失败返回-1
/* Post SEM. */
extern int sem_post (sem_t *__sem);
- sem_wait() 若信号量sem>0,则将其-1后直接返回,否则阻塞在此处直到有sem_post()唤醒或信号打断,成功返回0,失败返回-1
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
- sem_getvalue() 获取当前信号量的值并保存到sval中
/* Get current value of SEM and store it in *SVAL. */
extern int sem_getvalue (sem_t *__restrict __sem, int *__restrict __sval);
示例,信号量实现连续打印abcd
#define THRNUM 4
static sem_t sem;
static int num;
static void *func(void *p) {
int i = (int) p;
int c = i +'a';
while (1) {
sem_wait(&sem);
while (i != num) {
sem_post(&sem);
}
sleep(1);
write(1, &c, 1);
num = (num + 1) % THRNUM;
sem_post(&sem);
}
pthread_exit(NULL);
}
int main(int argc, char **argv) {
int i, j, err;
pthread_t tids[THRNUM];
//初始化信号量
sem_init(&sem, 0, 1);
for (i = 0; i < THRNUM; i++) {
err = pthread_create(tids + i, NULL, func, (void *) i);
if (err) {
fprintf(stderr, "pthread_create():%s", strerror(err));
//出错结束前给已经开启的线程收尸
for (j = 0; j < i; j++) {
pthread_join(tids[j], NULL);
}
exit(1);
}
}
for (i = 0; i < THRNUM; i++) {
pthread_join(tids[i], NULL);
}
sem_destroy(&sem);
exit(0);
}
3.3.2 互斥量+条件变量实现信号量
头文件
#ifndef LINUX_MYSEM_H
#define LINUX_MYSEM_H
/**
* 使用互斥量和条件变量来实现信号量
*/
typedef void mysem_t;
/**
* 初始化信号量
* @param initval 信号量的最大值
* @return 返回信号量结构体指针
*/
mysem_t * mysem_init(int initval);
/**
* 归还信号量
* @return
*/
int mysem_add(mysem_t *, int);
/**
* 减去信号量
* @return 0表示成功
*/
int mysem_sub(mysem_t *, int);
/**
* 销毁信号量
* @return 0表示成功
*/
int mysem_destroy(mysem_t *);
#endif //LINUX_MYSEM_H
实现
#include "mysem.h"
#include <pthread.h>
#include <stdio.h>
#include <malloc.h>
#include <errno.h>
struct mysem_st {
int value; //当前资源量
int max; //资源总量
pthread_mutex_t mut;
pthread_cond_t cond;
};
/**
* 初始化信号量
* @param initval 信号量的最大值
* @return 返回信号量结构体指针
*/
mysem_t * mysem_init(int initval) {
if (initval <= 0) return NULL;
struct mysem_st *me = malloc(sizeof (*me));
if (me == NULL) return NULL;
me->max = initval;
me->value = 0;
pthread_mutex_init(&me->mut, NULL);
pthread_cond_init(&me->cond, NULL);
return me;
}
/**
* 归还信号量
* @return
*/
int mysem_add(mysem_t *p, int cnt) {
if (cnt <= 0) return -EINVAL;
struct mysem_st *me = p;
pthread_mutex_lock(&me->mut);
me->value += cnt;
if (me->value > me->max) me->value = me->max;
pthread_cond_broadcast(&me->cond);
pthread_mutex_unlock(&me->mut);
return 0;
}
/**
* 减去信号量
* @return 0表示成功
*/
int mysem_sub(mysem_t *p, int cnt) {
struct mysem_st *me = p;
if (cnt <= 0 || cnt > me->max) return -EINVAL;
pthread_mutex_lock(&me->mut);
while (cnt > me->value) {
pthread_cond_wait(&me->cond, &me->mut);
}
me->value -= cnt;
pthread_mutex_unlock(&me->mut);
return 0;
}
/**
* 销毁信号量
* @return 0表示成功
*/
int mysem_destroy(mysem_t *p) {
struct mysem_st *me = p;
pthread_mutex_destroy(&me->mut);
pthread_cond_destroy(&me->cond);
free(me);
return 0;
}
3.4 读写锁
3.4.1 基本概念
相关函数
- 读写锁初始化和销毁函数
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
const pthread_rwlockattr_t *restrict attr);
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
- 读锁加锁函数
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
- 写锁加锁函数
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
- 读写锁解锁
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
读写锁的加锁模式
- 读锁加锁模式下,其他线程请求写锁会被阻塞,请求读锁会共享读锁
- 写锁加锁模式下,其他线程请求写锁或读锁都会被阻塞
- 写锁的优先级要比读锁高
3.4.2 信号量实现读写锁
头文件
typedef void rwlock;
//初始化读写锁
rwlock * rwLock_init();
//获取读锁
int rLock(rwlock *);
//获取写锁
int wLock(rwlock *);
//释放读锁
int rUnlock(rwlock *);
//释放写锁
int wUnlock(rwlock *);
//销毁读写锁
int rwLock_destroy(rwlock *);
实现
#include "ReadWriteLock.h"
#include <stdio.h>
#include <semaphore.h>
#include <malloc.h>
#include <errno.h>
struct rwlock_st {
sem_t r_sem;
sem_t w_sem;
int readers;
};
//初始化读写锁
rwlock * rwLock_init() {
struct rwlock_st *me = malloc(sizeof (*me));
if (me == NULL) return NULL;
sem_init(&me->w_sem, 0, 1);
sem_init(&me->r_sem, 0, 1);
me->readers = 0;
return me;
}
//获取读锁
int rLock(rwlock *p) {
if (p == NULL) return -EINVAL;
struct rwlock_st *me = p;
sem_wait(&me->r_sem);
me->readers++;
if (me->readers == 1) {
sem_wait(&me->w_sem);
}
sem_post(&me->r_sem);
return 0;
}
//获取写锁
int wLock(rwlock *p) {
if (p == NULL) return -EINVAL;
struct rwlock_st *me = p;
sem_wait(&me->w_sem);
return 0;
}
//释放读锁
int rUnlock(rwlock *p) {
if (p == NULL) return -EINVAL;
struct rwlock_st *me = p;
sem_wait(&me->r_sem);
me->readers--;
if (me->readers == 0) {
sem_post(&me->w_sem);
}
sem_post(&me->r_sem);
return 0;
}
//释放写锁
int wUnlock(rwlock *p) {
if (p == NULL) return -EINVAL;
struct rwlock_st *me = p;
sem_post(&me->w_sem);
return 0;
}
//销毁读写锁
int rwLock_destroy(rwlock *p) {
if (p == NULL) return -EINVAL;
struct rwlock_st*me = p;
sem_destroy(&me->r_sem);
sem_destroy(&me->w_sem);
free(p);
return 0;
}
4 属性变量
4.1 线程属性
相关函数
- 初始化和销毁属性,成功返回0,失败返回非0的error num
int pthread_attr_init(pthread_attr_t *attr);
int pthread_attr_destroy(pthread_attr_t *attr);
- 属性设置函数
pthread_attr_setaffinity_np(3), pthread_attr_setdetachstate(3), pthread_attr_setguardsize(3), pthread_attr_setinheritsched(3), pthread_attr_setschedparam(3), pthread_attr_setschedpolicy(3), pthread_attr_setscope(3), pthread_attr_setstack(3), pthread_attr_setstackaddr(3), pthread_attr_setstacksize(3), pthread_create(3), pthread_getattr_np(3),pthread_setattr_default_np(3)
4.2 互斥量属性
相关函数
- 初始化和销毁属性
int pthread_mutexattr_init(pthread_mutexattr_t *attr);
int pthread_mutexattr_destroy(pthread_mutexattr_t *attr);
- 获取和设置pshared值,该值为0则表示互斥量为进程私有,否则可以在进程间共享
int pthread_mutexattr_getpshared(const pthread_mutexattr_t *attr,
int *pshared);
int pthread_mutexattr_setpshared(pthread_mutexattr_t *attr,
int pshared);
- 设置互斥量类型
int pthread_mutexattr_gettype(const pthread_mutexattr_t *restrict attr,
int *restrict type);
int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int type);
4.3 条件变量属性
相关函数
- 初始化和销毁
int pthread_condattr_init(pthread_condattr_t *attr);
int pthread_condattr_destroy(pthread_condattr_t *attr);
5 线程重入
多线程IO
-
标准IO库中的函数都是多线程并发的,当多个线程调用这些函数的时候,会给缓冲区上锁,抢到锁的进行IO操作,没抢到的就会等待
-
代码示例,比如下面的代码会打印
aaaaa
ccccc
bbbbb
而不会打印
aaacc
cccbb
bbbaa
就是因为每个线程在调用puts时会给缓冲区上锁
void * func(void *p) {
puts(p);
pthread_exit(NULL);
}
int main(int argc, char **argv) {
pthread_t tid[3];
int err, i;
err = pthread_create(tid, NULL, func, "aaaaa");
if (err < 0) {
fprintf(stderr, "pthread_create():%s\n", strerror(err));
}
err = pthread_create(tid + 1, NULL, func, "bbbbb");
if (err < 0) {
fprintf(stderr, "pthread_create():%s\n", strerror(err));
}
err = pthread_create(tid + 2, NULL, func, "bbbbb");
if (err < 0) {
fprintf(stderr, "pthread_create():%s\n", strerror(err));
}
for (i = 0; i < 3; i++) {
pthread_join(tid[i], NULL);
}
exit(0);
}
- 如果有函数不支持多线程并发,会在名字上体现出来,比如加上_unlock后缀
线程与信号
- pthread_sigmask() 相当于sigprocmask()函数
int pthread_sigmask(int how, const sigset_t *set, sigset_t *oldset);
- sigwait() 等待信号
int sigwait(const sigset_t *set, int *sig);
- pthread_kill()给线程发送信号
int pthread_kill(pthread_t thread, int sig);
-
线程响应信号的机制
这里就需要对之前在信号章节提到的信号响应过程做一个补充了,一个进程中可以有多个线程,每个线程都有自己的mask和pending位图,同时每个进程还有自己pending位图,线程之间发送信号体现在线程的pending位图上,进程之间发送信号体现在进程的pending位图上,响应进程间的信号时就需要看这时是哪个线程从kernel切换到user了,发生切换的线程会将自己的mask和进程的pending做与运算查看当前进程收到哪些信号,之后再将自己的pending和mask做与运算查看自己收到了哪些线程发的信号
线程与fork()
这里要讨论就是在fork()的时候会不会将所有线程也都复制一份,这需要看线程的实现标准,在POSIX标准中,只会将调用fork()的线程复制一遍
===