一.线程同步概念
线程同步:
协同步调,对公共区域数据按序互斥访问。防止数据混乱,产生与时间有关的错误。
数据混乱的原因:
1.资源共享(独享资源则不会)
2.调度随机(意味着数据访问会出现竞争)—线程间竞争
3.线程间缺乏必要同步机制,针对此点,如果引入某种措施,能够使得线程间按照某种规则访问公共区,就实现了线程同步
二.互斥锁(互斥量)
锁的使用:
建议锁,本身不具备强制性。 对公共数据进行保护。所有线程应该在访问公共数据前先上锁再访问。
如下图,T1 T2都使用锁,那么在访问此区域前,若T1上锁,T2就会阻塞在锁上,但T3不使用锁,也可以直接访问此区域,说明锁不是强制的,只是实现线程同步的一种手段,一种建议锁。
互斥锁主要应用函数:
pthread_mutex_init 函数
pthread_mutex_destory 函数
pthread_mutex_lock 函数
pthread_mutex_trylock 函数
pthread_mutex_unlock 函数
以上5个函数的返回值都是:成功返回0,失败返回错误号
pthread_mutex_t 类型,其本质是一个结构体。为简化理解,可忽略其实现细节,简单当成整数看待,只有两种取值:0,1 -------1代表未上锁,0代表已上锁。
使用mutex(互斥量、互斥锁)一般步骤:
-
pthread_mutex_t lock; 创建锁
-
pthread_mutex_init; 初始化 1
-
pthread_mutex_lock;加锁 1-- --> 0
-
访问共享数据(stdout)
-
pthrad_mutext_unlock();解锁 0++ --> 1
-
pthead_mutex_destroy;销毁锁
初始化的两种方式:
动态初始化:
pthread_mutex_init(&mutex, NULL);
静态初始化:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
利用pthread_mutex_t 互斥锁,实现线程间同步例子
1.#include <stdio.h>
2.#include <string.h>
3.#include <pthread.h>
4.#include <stdlib.h>
5.#include <unistd.h>
6.
7.pthread_mutex_t mutex; // 全局区定义一把互斥锁
7.
9.void *tfn(void *arg)
10.{
8. srand(time(NULL));
9.
10. while (1) {
11. pthread_mutex_lock(&mutex); // 加锁
12. printf("hello ");
13. sleep(rand() % 3); // 模拟长时间操作共享资源,导致cpu易主,产生与时间有关的错误
14. printf("world\n");
15. pthread_mutex_unlock(&mutex); // 解锁
16. sleep(rand() % 3);
17. }
18.
19. return NULL;
23.}
20.
25.int main(void)
26.{
21. pthread_t tid;
22. srand(time(NULL));
23. int ret = pthread_mutex_init(&mutex, NULL); // 在创建线程前初始化互斥锁
24.
25. if(ret != 0){
26. fprintf(stderr, "mutex init error:%s\n", strerror(ret));
27. exit(1);
28. }
29.
30. pthread_create(&tid, NULL, tfn, NULL);
31. while (1) {
32. pthread_mutex_lock(&mutex); // 加锁
33. printf("HELLO ");
34. sleep(rand() % 3);
35. printf("WORLD\n");
36. pthread_mutex_unlock(&mutex); // 解锁
37. sleep(rand() % 3);
38. }
39. pthread_join(tid, NULL);
40.
41. pthread_mutex_destory(&mutex); // 销毁互斥锁
42.
43. return 0;
49.}
编译运行,结果如下:
可以看到,主线程和子线程在访问共享区时就没有交叉输出的情况了。
互斥锁使用技巧:
尽量保证锁的粒度, 越小越好。(访问共享数据前,加锁。访问结束【立即】解锁。)
互斥锁,本质是结构体。 我们可以看成整数。 初值为 1。(pthread_mutex_init() 函数调用成功。)
加锁: --操作, 其它线程在pthread_mutex_lock()这个函数上。
解锁: ++操作, 唤醒阻塞在锁上的线程。
try锁:不阻塞的上锁函数pthread_mutex_trylock,它表示尝试加锁,成功直接上锁,失败则直接返回错误号 EBUSY
三.读写锁
读写锁:
锁只有一把。但上锁方式不同:
读方式给数据加锁——读锁。
以写方式给数据加锁——写锁。
读共享,写独占。
写锁优先级高。
当一个线程上写锁时,无论其他线程以何种方式上锁,都会被阻塞。
当上读锁时,其他线程若以读锁上锁时,可以共享读锁,不会被阻塞。
此即读时共享。
另外,当多个线程同时到达,想要上锁时,优先分配给写锁。
相较于互斥量而言,当读线程多的时候,可以提高访问效率
读写锁操作函数
pthread_rwlock_t rwlock;
pthread_rwlock_init(&rwlock, NULL);
pthread_rwlock_rdlock(&rwlock); 上读锁
pthread_rwlock_wrlock(&rwlock); 上写锁
pthread_rwlock_unlock(&rwlock);
pthread_rwlock_destroy(&rwlock);
以上函数都是成功返回0,失败返回错误号。
pthread_rwlock_t 类型 用于定义一个读写锁变量
pthread_rwlock_t rwlock
四.死锁的产生
死锁:
是使用锁不恰当导致的现象:
1. 对一个锁反复lock。
2. 两个线程,各自持有一把锁,请求另一把。(线程本身持有资源,缺少另外的资源执行,而另外的线程持有它所需的资源,呈环状等待。本身又不会主动释放资源,所以一直阻塞)
五.静态初始化条件变量和互斥量
条件变量本身不是锁! 但是通常结合互斥锁mutex来使用。
条件变量结构体类型: pthread_cond_t cond;
初始化条件变量:
- pthread_cond_init(&cond, NULL); 动态初始化。
- pthread_cond_t cond = PTHREAD_COND_INITIALIZER; 静态初始化。(与初始化互斥锁类似)
相关操作函数:
pthread_cond_wait(&cond, &mutex);
此函数作用比较复杂,一旦被调用,就会先阻塞,再释放已经持有的锁,直到该函数被其他线程唤醒后再次上锁。常用在pthread_mutex_lock上锁之后。
作用:
1) 阻塞等待
2) 解锁已经加锁成功的互斥量 (pthread_mutex_unlock(&mutex)),12两步为一个原子操作
3) 当条件满足(被使用pthread_cond_wait唤醒时),函数返回,解除阻塞并重新申请获取互斥锁。 (相当于, pthread_mutex_lock(&mutex);)
原本线程的调度是随机抢占的,任何一个线程抢到锁后都会先执行。
而利用条件变量,即使某线程率先抢到了锁,若条件不满足,也不能向下执行,需要先放锁给其他线程使用,提供了一种控制线程的手段。下文的生产者消费者模型就是这种情况的一种。
pthread_cond_signal(): 唤醒阻塞在条件变量上的 (至少)一个线程。
pthread_cond_broadcast(): 唤醒阻塞在条件变量上的 所有线程。
六.条件变量的生产者消费者模型分析
生产者线程:不断生产数据
消费者线程:不断消费数据(条件:共享区内有数据)
代码如下(多个消费者,一个生产者):
1.#include <stdio.h>
2.#include <stdlib.h>
3.#include <string.h>
4.#include <unistd.h>
5.#include <errno.h>
6.#include <pthread.h>
7.
8.void err_thread(int ret, char *str)
9.{
8. if (ret != 0) {
9. fprintf(stderr, "%s:%s\n", str, strerror(ret));
10. pthread_exit(NULL);
11. }
14.}
12.
16.struct msg {
13. int num;
14. struct msg *next;
19.};
15.
21.struct msg *head;
16.
23.pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 定义/初始化一个互斥量
24.pthread_cond_t has_data = PTHREAD_COND_INITIALIZER; // 定义/初始化一个条件变量
17.
26.void *produser(void *arg)
27.{
18. while (1) {
19. struct msg *mp = malloc(sizeof(struct msg));
20.
21. mp->num = rand() % 1000 + 1; // 模拟生产一个数据`
22. printf("--produce %d\n", mp->num);
23.
24. pthread_mutex_lock(&mutex); // 加锁 互斥量
25. mp->next = head; // 写公共区域
26. head = mp;
27. pthread_mutex_unlock(&mutex); // 解锁 互斥量
28.
29. pthread_cond_signal(&has_data); // 唤醒阻塞在条件变量 has_data上的线程.
30.
31. sleep(rand() % 3);
32. }
33.
34. return NULL;
45.}
35.
47.void *consumer(void *arg)
48.{
36. while (1) {
37. struct msg *mp;
38.
39. pthread_mutex_lock(&mutex); // 加锁 互斥量
41. while(head == NULL) {
42. pthread_cond_wait(&has_data, &mutex); // 阻塞等待条件变量, 解锁
43. } // pthread_cond_wait 返回时, 重写加锁 mutex
44.
45. mp = head;
46. head = mp->next;
47.
48. pthread_mutex_unlock(&mutex); // 解锁 互斥量
49. printf("---------consumer id = %lu :%d\n", pthread_self(), mp->num);
50.
51. free(mp);
52. sleep(rand()%3);
53. }
54.
55. return NULL;
68.}
56.
70.int main(int argc, char *argv[])
71.{
57. int ret;
58. pthread_t pid, cid;
59.
60. srand(time(NULL));
61.
62. ret = pthread_create(&pid, NULL, produser, NULL); // 生产者
63. if (ret != 0)
64. err_thread(ret, "pthread_create produser error");
65.
66. ret = pthread_create(&cid, NULL, consumer, NULL); // 消费者
67. if (ret != 0)
68. err_thread(ret, "pthread_create consuer error");
69.
70. ret = pthread_create(&cid, NULL, consumer, NULL); // 消费者
71. if (ret != 0)
72. err_thread(ret, "pthread_create consuer error");
73.
74. ret = pthread_create(&cid, NULL, consumer, NULL); // 消费者
75. if (ret != 0)
76. err_thread(ret, "pthread_create consuer error");
77.
78. pthread_join(pid, NULL);
79. pthread_join(cid, NULL);
80.
81. return 0;
97.}
生产者此时生产了数据后,会同时唤醒两个因条件变量阻塞的消费者,若结果A消费者拿到锁,开始消费数据,B消费者就阻塞在锁上。
A消费完数据,把锁归还,B被唤醒,然而此时已经没有数据供B消费了。所以这里有个逻辑错误,消费者阻塞在条件变量那里应该使用**while循环。**这样A消费完数据后,B做的第一件事不是去拿锁,而是判定条件变量。
七.信号量概念及其相关操作函数
信号量: 应用于线程、进程间同步。
相当于 初始化值为 N 的互斥量。
N值:可以同时访问共享数据区的线程数。
sem_t sem; 定义类型。
函数:
1.int sem_init(sem_t*sem, int pshared, unsigned int value);
参数:
sem: 信号量
pshared: 0: 用于线程间同步 1: 用于进程间同步
value:N值。(指定同时访问的线程数)
2.sem_destroy();
销毁sem信号量
3.sem_wait(); (对比 pthread_mutex_lock)
一次调用,做一次-- 操作, 当信号量的值为 0 时,再次 - - 就会阻塞。
4.sem_post(); (对比 pthread_mutex_unlock)
一次调用,做一次++ 操作. 当信号量的值为 N 时, 再次 ++ 就会阻塞。
互斥量即N值为1的信号量。
八.信号量实现的生产者消费者模型
共享区:固定数量的数据区,如int[N]数组
生产者:不断生产,在产品数量为5时阻塞(sam_post()),不再生产。
消费者:不断消费,在产品数量为0时(sam_wait())阻塞。
所以就需要两种信号量sam1,sam2,sam1表示剩余空闲格子,初值为N;sam2表示产品数量,初值为0;
生产者生产一个,sam1–,sam2++
消费者消费一个,sam1++,sam2–;
代码如下:
1./*信号量实现 生产者 消费者问题*/
2.
3.#include <stdlib.h>
4.#include <unistd.h>
5.#include <pthread.h>
6.#include <stdio.h>
7.#include <semaphore.h>
8.
9.#define NUM 5
10.
11.int queue[NUM]; //全局数组实现环形队列
12.sem_t blank_number, product_number; //空格子信号量, 产品信号量
13.
14.void *producer(void *arg)
15.{
16. int i = 0;
17.
18. while (1) {
19. sem_wait(&blank_number); //生产者将空格子数--,为0则阻塞等待
20. queue[i] = rand() % 1000 + 1; //生产一个产品
21. printf("----Produce---%d\n", queue[i]);
22. sem_post(&product_number); //将产品数++
23.
24. i = (i+1) % NUM; //借助下标实现环形
25. sleep(rand()%1);
26. }
27.}
28.
29.void *consumer(void *arg)
30.{
31. int i = 0;
32.
33. while (1) {
34. sem_wait(&product_number); //消费者将产品数--,为0则阻塞等待
35. printf("-Consume---%d\n", queue[i]);
36. queue[i] = 0; //消费一个产品
37. sem_post(&blank_number); //消费掉以后,将空格子数++
38.
39. i = (i+1) % NUM;
40. sleep(rand()%3);
41. }
42.}
43.
44.int main(int argc, char *argv[])
45.{
46. pthread_t pid, cid;
47.
48. sem_init(&blank_number, 0, NUM); //初始化空格子信号量为5, 线程间共享 -- 0
49. sem_init(&product_number, 0, 0); //产品数为0
50.
51. pthread_create(&pid, NULL, producer, NULL);
52. pthread_create(&cid, NULL, consumer, NULL);
53.
54. pthread_join(pid, NULL);
55. pthread_join(cid, NULL);
56.
57. sem_destroy(&blank_number);
58. sem_destroy(&product_number);
59.
60. return 0;
61.}
编译运行,结果如下: