一、相关概念:
1.耦合:耦合是指两个或两个以上的体系或两种运动形式间通过相互作用而彼此影响以至联合起来的现象。在软件工程中,对象之间的耦合度就是对象之间的依赖性。对象之间的耦合越高,维护成本越高,因此对象的设计应使类和构件之间的耦合最小。
2.耦合性:耦合性是程序结构中各个模块之间相互关联的度量。它取决于各个模块之间的接口的复杂程度、调用模块的方式以及哪些信息通过接口。
3.解耦:字面意思就是解除耦合关系。在软件工程中,降低耦合度即可以理解为解耦,模块间有依赖关系必然存在耦合,理论上的绝对零耦合是做不到的,但可以通过一些现有的方法将耦合度降至最低。设计的核心思想就是尽可能减少代码耦合,如果发现代码耦合,就要采取解耦技术。原则就是A功能的代码不要写在B的功能代码中,如果两者之间需要交互,可以通过接口,通过消息,甚至可以引入框架,但总之就是不要直接交叉写。
二、生产者和消费者问题简述
生产者是写入数据,消费者是读取数据,消费者读取一个数据之后,这个数据就没有了,相当于一个有限缓冲区的问题。如下图:
1.缓冲区的作用相当于解耦
如果两个进程之间直接交换数据的话,一个生产者进程对消费者进程给出数据,但是消费者进程还没有机会去读取,这时生产者进程就会发生阻塞,等待消费者进程来读取数据,两个进程之间相互影响,具有一定的耦合性。通过上面这样一个缓冲区来交换数据的话就可以做到解耦。
有了这样一个缓冲区,如果再增加一个消费者进程来读取数据,生产者进程的代码也不用修改,对生产者进程也没有什么很大的影响,只是生产者进程产生数据的速度会变快。也不会因为增加了一个消费者进程来读取数据,而这时生产者进程还没有产生数据,使消费者进程发生阻塞,因为生产者进程产生的数据已经放到了缓冲区中,消费者进程可以随时来读取。
当然如果缓冲区被生产者进程产生的数据写满了,这时生产者进程就会被阻塞住,反之,如果缓冲区为空,消费者进程想要读取数据,那么消费者进程就会被阻塞。所以,一定要注意缓冲区满和缓冲区空的情况,缓冲区满的时候生产者不可以写入数据,缓冲区空的时候消费者不可以读取数据。
2.生产者向缓冲区写入数据和消费者从缓冲区读取数据的过程
假如现在有n个生产者(写入操作),m个消费者(读取操作),它们都要去操作这块缓冲区空间,要注意不能有两个或多个生产者同时写数据,也不能两个或多个消费者同时读数据,因为如果两个或多个生产者或消费者同时写入或者读取数据,会操作同一块缓冲区空间的数据,这里的消费者读取数据是相当于把数据读走了,也就是把这个数据清理掉了,会对别的进程产生影响,所以不可以两个或多个消费者同时去读取缓冲区中数据。
首先生产者和消费者对缓冲区的访问是一个互斥性访问,所以不管是生产者往缓冲区写入数据还是消费者从缓冲区读取数据,都需要通过互斥锁来控制线程的同步。然后,因为两个或多个生产者同时向缓冲区写入数据,也不能有两个或多个消费者从缓冲区读取数据,所以这里需要通过信号量来控制同步。但是要注意设置互斥锁和信号量的先后顺序。
3.生产者和消费者对缓冲区设置互斥锁和信号量的思路
(1)设置互斥锁
对于互斥锁,无论是消费者还是生产者都应该先看看自己可不可以对缓冲区进行操作,如果是生产者要先看缓冲区有没有剩余空间,如果有剩余空间然后再去加锁,加了锁之后就可以保证只有自己在向缓冲区中写入数据;如果是消费者要先看缓冲区有没有数据,如果有数据再去加锁,加了锁之后就可以保证只有自己在读取缓冲区中的数据。比如,消费者不可以先对缓冲区加锁,因为如果加了所之后再去读取数据的时候发现缓冲区没有数据,消费者就会被阻塞,但是这时消费者已经对缓冲区加过锁了,生产者也不可以向缓冲区中写入数据,也会被阻塞。
(2)设置信号量
对于信号量,需要设置两个信号量,其中一个去控制生产者可不可以向缓冲区写入数据,另一个控制消费者可不可以从缓冲区读取数据。控制生产者的信号量的值要参考缓冲区空间的数目,要看缓冲区空间的数目是几,生产者信号量的值就是几,因为可能不只有一个生产者,会有多个生产者。就比如如果把控制生产者的信号量的值设置为1,生产者要向缓冲区写入数据,P操作就会把信号量的值改为0,进行写入数据的操作,这时如果另外一个生产者也要像缓冲区中写入数据,就会失败,因为此时信号量的值为0,不能再执行P操作了。控制消费者的信号量的值则应该为0,默认情况下缓冲区没有数据,所有消费者都读取不了数据。如下图:
4.生产者和消费者对缓冲区设置互斥锁和信号量的方法
对于生产者来讲,要看现在缓冲区有没有空间去写入数据,生产者首先要对sc_sem这个信号量进行P操作,如果P操作失败,就说明当前缓冲区满,需要等待,如果P操作成功,生产者每次向缓冲区中写入数据,sc_sem这个信号量的值就减1,执行完P操作之后,就要向缓冲区中写入数据,这时候要防止别的生产者也要向缓冲区写入数据,所以接下来就要定义一个互斥锁,执行加锁操作,生产者向缓冲区中写完数据之后,要进行解锁操作,这时候其它想要向缓冲区写入数据的生产者就可以继续写入数据了,当所有要向缓冲区写入数据的生产者都写完数据之后,对xf_sem这个信号量进行V操作,xf_sem这个信号量的值就加1,xf_sem这个信号量的值从0变为1,这时候消费者就可以工作(从缓冲区读取数据)了。代码的思路应该是这样的:
p(sc_sem)
lock(mutex)
write
unlock(mutex)
v(xf_sem)
对于消费者来讲,要看当前的缓冲区中有没有数据可以读取,消费者首先要对信号量xf_sem进行P操作,如果此时信号量xf_sem为0,也就意味着缓冲区是空的,P操作执行失败,消费者就会被阻塞,如果信号量xf_sem不为0,消费者才可以对信号量xf_sem进行P操作成功,信号量xf_sem的值减1,这时消费者才可以从缓冲区读取数据,这时要防止别的消费者也要从缓冲区中读取数据以及生产者想要向缓冲区中写入数据,所以要对缓冲区加锁,加锁之后,消费就可以从缓冲区中读取数据,读完数据之后进行解锁,这时其他想要从缓冲区读取数据的消费者就可以继续读取数据了,等全部想要读取数据的消费者读取完之后,缓冲区就有空闲的空间了,这时对信号量sc_sem进行V操作,信号量sc_sem的值就加1,这时候生产者就可以继续对信号量sc_sem进行P操作,重复上面生产者向缓冲区写入数据的过程。代码的思路应该是这样的:
p(xf_sem)
lock(mutex)
read
unlock(mutex)
v(sc_sem)
三、生产者和消费者问题代码示例
假设现在有2个生产者,3个消费者,有一个大小为30的缓冲区。每个生产者向缓冲区写入30次数据,有2个生产者,一共写入60次数据,每个消费者从缓冲区读取20次数据,有3个消费者,一共读取60次数据。生产者一共向缓冲区写入60次数据,消费者一共从缓冲区读取了60次数据,这个过程结束之后,缓冲区为空。
代码如下:
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#include<string.h>
#include<semaphore.h>
#define BUFF_MAX 30 //缓冲区的大小
int buff[BUFF_MAX]; //缓冲区
int in = 0;//写入数据的下标
int out = 0;//读取数据的下标
pthread_mutex_t mutex;//定义一个锁变量
sem_t sc_sem;//定义一个信号量,用来控制生产者
sem_t xf_sem;//定义一个信号量,用来控制消费者
void* sc_fun(void*arg)//生产者线程函数
{
for(int i=0;i<30;i++)//一个生产者线程写入30次,有2个生产者,一共写入60次数据
{
sem_wait(&sc_sem);//对控制生产者的信号量sc_sem进行P操作
pthread_mutex_lock(&mutex);//加锁
//随机产生一个数据再写入
buff[in]=rand()%100;//产生一个100以内的随机数,将其写入缓冲区
printf("生产者向缓存区中写入数据%d,在%d下标写入\n",buff[in],in);
in=(in+1)%BUFF_MAX;//更新in的下标
pthread_mutex_unlock(&mutex);//解锁
sem_post(&xf_sem);//对控制消费者的信号量xf_sem进行V操作
int n=rand()%3;//产生一个3以内的随机数
sleep(n);/*睡眠3秒以内,为了不让当前的生产者继续对信号量sc_sem进行P操作,
给其他生产者向缓冲区写入数据的机会,让结果看着更随机*/
}
}
void* xf_fun(void* arg)//消费者线程函数
{
for(int i=0;i<20;i++)//一个消费者线程读取20次,有3个消费者,一共读取60次数据
{
sem_wait(&xf_sem);//对控制消费者的信号量xf_sem进行P操作
pthread_mutex_lock(&mutex);//加锁
printf("消费者从缓冲区读取数据%d,在%d下标读取\n",buff[out],out);
out=(1+out)%BUFF_MAX;
pthread_mutex_unlock(&mutex);
sem_post(&sc_sem);
int n=rand()%3;
sleep(3);
}
}
int main()
{
pthread_mutex_init(&mutex,NULL);//初始化互斥锁
sem_init(&sc_sem,0,BUFF_MAX);//初始化sc_sem信号量
sem_init(&xf_sem,0,BUFF_MAX);//初始化xf_sem信号量
srand((int)time(NULL));//随机种子
pthread_t sc_id[2];//定义2个生产者线程
for(int i=0;i<2;i++)//创建并启动这两个生产者线程
{
pthread_create(&sc_id[i],NULL,sc_fun,NULL);
}
pthread_t xf_id[3];//定义3个消费者线程
for(int i=0;i<3;i++)
{
pthread_create(&xf_id[i],NULL,xf_fun,NULL);
}
for(int i=0;i<2;i++)
{
pthread_join(sc_id[i],NULL);//等待2个生产者线程结束
}
for(int i=0;i<3;i++)
{
pthread_join(xf_id[i],NULL);//等待3个消费者线程结束
}
sem_destroy(&sc_sem);//销毁生产者信号量
sem_destroy(&xf_sem);//销毁消费者信号量
pthread_mutex_destroy(&mutex);//销毁锁
exit(0);
}
部分运行结果如下: