3.12生产者消费者模型
生产者消费者模型中的对象:
1、生产者
2、消费者
3、容器
若容器已满,生产者阻塞在这,通知消费者去消费;若容器已空,则消费者阻塞,通知生产者去生产。生产者可以有多个,消费者也可以有多个。容器中的数据是多个线程共享的,线程同步问题涉及到互斥量、读写锁等。
条件变量、信号量等。
/*
生产者消费者模型(粗略的版本),此处不考虑容器存满
*/
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
// 创建一个互斥量
pthread_mutex_t mutex;
struct Node{
int num;
struct Node *next;
};
// 头结点
struct Node * head = NULL;
void * producer(void * arg) {
// 不断的创建新的节点(头插法),添加到链表中
while(1) {
pthread_mutex_lock(&mutex);
//malloc默认返回void *,类型转换为struct Node *
struct Node * newNode = (struct Node *)malloc(sizeof(struct Node));
newNode->next = head;
head = newNode;
newNode->num = rand() % 1000;
printf("add node, num : %d, tid : %ld\n", newNode->num, pthread_self());
pthread_mutex_unlock(&mutex);
usleep(100);
}
return NULL;
}
void * customer(void * arg) {
while(1) {
pthread_mutex_lock(&mutex);
// 保存头结点的指针,记录要删除的结点信息
struct Node * tmp = head;
// 判断是否有数据,从头部开始删除
if(head != NULL) {
// 有数据
head = head->next;
printf("del node, num : %d, tid : %ld\n", tmp->num, pthread_self());
free(tmp);
pthread_mutex_unlock(&mutex);
usleep(100);
} else {
// 没有数据,需进行解锁,不然讲产生死锁
pthread_mutex_unlock(&mutex);
}
}
return NULL;
}
int main() {
//初始化互斥量
pthread_mutex_init(&mutex, NULL);
// 创建5个生产者线程,和5个消费者线程
pthread_t ptids[5], ctids[5];
for(int i = 0; i < 5; i++) {
pthread_create(&ptids[i], NULL, producer, NULL);
pthread_create(&ctids[i], NULL, customer, NULL);
}
//分离后的子线程可自行释放资源,不需其他线程对其进行回收释放
for(int i = 0; i < 5; i++) {
pthread_detach(ptids[i]);
pthread_detach(ctids[i]);
}
while(1) {
sleep(10);
}
//释放互斥量
pthread_mutex_destroy(&mutex);
pthread_exit(NULL);
return 0;
}
显示结果:
后续课程解决当前生产者消费者模型所存在的问题!互斥量、条件变量、信号量
3.13条件变量
条件变量,某个条件满足后引起阻塞或某个条件满足后解除阻塞。配合互斥锁实现线程同步。
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
// 创建一个互斥量,解决线程同步问题
pthread_mutex_t mutex;
// 创建条件变量,解决没有数据时,阻塞
pthread_cond_t cond;
struct Node{
int num;
struct Node *next;
};
// 头结点
struct Node * head = NULL;
void * producer(void * arg) {
// 不断的创建新的节点,添加到链表中
while(1) {
pthread_mutex_lock(&mutex);
struct Node * newNode = (struct Node *)malloc(sizeof(struct Node));
newNode->next = head;
head = newNode;
newNode->num = rand() % 1000;
printf("add node, num : %d, tid : %ld\n", newNode->num, pthread_self());
// 只要生产了一个,就通知消费者消费,解除阻塞
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
usleep(100);
}
return NULL;
}
void * customer(void * arg) {
while(1) {
pthread_mutex_lock(&mutex);
// 保存头结点的指针
struct Node * tmp = head;
// 判断是否有数据
if(head != NULL) {
// 有数据
head = head->next;
printf("del node, num : %d, tid : %ld\n", tmp->num, pthread_self());
free(tmp);
pthread_mutex_unlock(&mutex);
usleep(100);
} else {
// 没有数据,需要等待
// 当这个函数调用阻塞的时候,会对互斥锁进行解锁,当不阻塞的,继续向下执行,会重新加锁。
pthread_cond_wait(&cond, &mutex);
pthread_mutex_unlock(&mutex);
}
}
return NULL;
}
int main() {
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond, NULL);
// 创建5个生产者线程,和5个消费者线程
pthread_t ptids[5], ctids[5];
for(int i = 0; i < 5; i++) {
pthread_create(&ptids[i], NULL, producer, NULL);
pthread_create(&ctids[i], NULL, customer, NULL);
}
//实现线程分离
for(int i = 0; i < 5; i++) {
pthread_detach(ptids[i]);
pthread_detach(ctids[i]);
}
while(1) {
sleep(10);
}
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
pthread_exit(NULL);
return 0;
}
说明:
1、消费线程进来,拿到互斥锁。
2、如果内容为空,则阻塞,同时释放互斥锁(这就完成了拿去和释放动作),无论有几个消费线程抢到锁,都会阻塞在这里。
3、生产线程抢到锁,于是生产。只要生产了就会通知消费线程。同时下一步解除锁。
4、!!!!== wait解除阻塞,但是同时也重新拿到了锁!!!==很重要!!!
5、此时再次评估条件,满足条件,拿着锁向下执行
void* costom(void* arg){
while(1){
pthread_mutex_lock(&mutex);
struct Node* tmp = head;
if(head != NULL) {
head = head->next;
printf("delete node ,num : %d,tid:%ld\n",tmp->num,pthread_self());
free(tmp);
pthread_mutex_unlock(&mutex);
usleep(100);
}else{
//没有数据 需要等待
// // 当这个函数调用阻塞的时候,会对互斥锁进行解锁,当不阻塞的,继续向下执行,会重新加锁。
pthread_cond_wait(&cond,&mutex);
//!!!!!!!!!!所以这一步必不可少
pthread_mutex_unlock(&mutex);
//收到信号,回到这里,生产者一释放锁,wait就重新拿到锁。但拿到锁以后,出else,到while入口还有一步拿锁,此时的lock无法执行,就会阻塞在那里。所以一定要在wait后面释放锁。
}
}
return NULL;
}
3.14信号量(semaphore)
信号量,信号灯,灯亮则表示资源可用,灯灭则表示资源不可用。信号量主要用于阻塞线程,但不保证线程安全。
生产者信号量初始化为8,每调用一次sem_wait(&psem),生产者信号量-1,每生产一个,消费者信号量加一,当生产者信号量为0时,阻塞。
消费者部分:每调用一次sem_wait(&csem),消费者信号量-1,每消费一个,生产者信号量加一,当消费者信号量为0时,阻塞。
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <semaphore.h>
// 创建一个互斥量
pthread_mutex_t mutex;
// 创建两个信号量
sem_t psem;
sem_t csem;
struct Node{
int num;
struct Node *next;
};
// 头结点
struct Node * head = NULL;
void * producer(void * arg) {
// 不断的创建新的节点,添加到链表中
while(1) {
sem_wait(&psem);
pthread_mutex_lock(&mutex);
struct Node * newNode = (struct Node *)malloc(sizeof(struct Node));
newNode->next = head;
head = newNode;
newNode->num = rand() % 1000;
printf("add node, num : %d, tid : %ld\n", newNode->num, pthread_self());
pthread_mutex_unlock(&mutex);
sem_post(&csem);
}
return NULL;
}
void * customer(void * arg) {
while(1) {
sem_wait(&csem);
pthread_mutex_lock(&mutex);
// 保存头结点的指针
struct Node * tmp = head;
head = head->next;
printf("del node, num : %d, tid : %ld\n", tmp->num, pthread_self());
free(tmp);
pthread_mutex_unlock(&mutex);
sem_post(&psem);
}
return NULL;
}
int main() {
pthread_mutex_init(&mutex, NULL);
sem_init(&psem, 0, 8);
sem_init(&csem, 0, 0);
// 创建5个生产者线程,和5个消费者线程
pthread_t ptids[5], ctids[5];
for(int i = 0; i < 5; i++) {
pthread_create(&ptids[i], NULL, producer, NULL);
pthread_create(&ctids[i], NULL, customer, NULL);
}
for(int i = 0; i < 5; i++) {
pthread_detach(ptids[i]);
pthread_detach(ctids[i]);
}
while(1) {
sleep(10);
}
pthread_mutex_destroy(&mutex);
pthread_exit(NULL);
return 0;
}
C++实现生产者消费者模型
参考佬博客如下:
https://blog.csdn.net/qq_42214953/article/details/104878720