【Linux】线程概念 | 同步

news2024/11/16 21:24:50

除了线程互斥,我们还有线程同步,来康康吧

文章目录

  • 1.为什么需要同步
  • 2.生产消费模型
    • 2.1 生产者和消费者的关系
    • 2.2 以简单代码为例
    • 2.3 并发
  • 3.条件变量接口
    • 3.1 init/destroy
    • 3.2 pthread_cond_wait
    • 3.3 pthrea_cond_signal/broadcast
    • 3.4 代码示例
      • 3.4.1 小bug
      • 3.4.2 修正
      • 3.4.3 典型错误
  • 4.阻塞队列-生产消费模型实例
    • 4.1 成员变量
    • 4.2 push和pop
      • 4.2.1 运行测试
      • 4.2.2 进一步封装
      • 4.2.3 使用task分配运算任务
  • 5.阻塞队列-循环队列
    • 5.0 为啥用循环队列
    • 5.1 POSIX信号量
      • 5.1.1 sem_init/destroy
      • 5.1.2 sem_wait
      • 5.1.3 sem_post
      • 5.1.4 在循环队列里面使用
    • 5.2 成员变量
    • 5.3 构造/析构
    • 5.4 生产消费
    • 5.5 测试
  • 结语

1.为什么需要同步

在部分条件下,互斥是正确的,但是不合理。比如食堂打饭的时候,食堂阿姨一次只能给一个人打饭,于是就选择通过竞争来获得打饭的权利。此时就会出现有些线程因为优先级过低或者CPU调度问题,一直打不到饭,于是就出现了饥饿问题

这是因为我们对多线程访问同一个资源没有进行限制,全靠CPU调度来决定运行顺序;所以我们需要对线程进行一定的控制,这就是线程同步的概念

  • 饥饿问题:某一个线程一直无法申请到某种资源(比如因为优先级过低)
  • 同步:保证数据安全(临界资源访问)的前提下,让线程根据一定条件和顺序访问临界资源,从而避免饥饿问题
  • 竞态条件:因为时序问题(CPU调度)而导致程序异常;

2.生产消费模型

这个模型其实很简单,消费者去超市购买东西,生产者把商品投放到超市中。

  • 这时候就不需要消费者直接去找工厂问他xx东西又没有生产,他需要购买;而是转去超市里面购买xx东西;
  • 如果xx东西没有货了,超市就会通知生产者进行补货。如果超市里面的货架已经满了,就通知生产者不需要继续生产了;
  • 当商品没货了,超市会告知消费者这个东西没货,消费者会停止消费行为;而生产者补货了之后,超市就会通知消费者让他来购买

image-20230103184124025

在基础模式中,消费者要想知道一个东西有没有货,需要去超市里面看(相当于轮循检测)

我们可以引入一个通知方式,比如超市开放一个微信公众号,告知消费者xx物品是否有货,以及告知消费者什么时候需要补货,此时就不需要消费者和生产者不断询问超市关于一个商品的情况了!这就相当于线程同步

2.1 生产者和消费者的关系

下面提到的是普适情况

  • 消费者有多个,他们之间是竞争关系(互斥)竞争商品的购买
  • 生产者有多个,他们之间是竞争关系(互斥)竞争超市的货架
  • 消费者和生产者之间,既有互斥关系,也是同步关系(需要生产者供货了之后,消费者才能消费)这两个关系并不冲突!

除了上面提到的3种生产关系,还有下面俩点

在实际程序中,消费者和生产者都是由线程承担的(2种角色

超市是内存中特定的一个数据结构,也是临界资源(1个交易场所

我们可以用321原则来快速记住这几条,这样就记住了生产消费模型!👍

2.2 以简单代码为例

在旧模式中,main函数调用另外一个函数,想获得返回值,需要等这个函数运行完毕;好比消费者购买东西,需要去找厂家并等待厂家生产……

image-20230103190310130

而在生产消费者模型中,main作为主线程,只需要把待处理的数据丢进缓冲区;而线程B从缓冲区中取出数据,处理完毕后放回缓冲区。main可以先执行其他代码,过一会再过来拿线程B处理好的结果。

这就实现了生产和消费的解耦

image-20230103190405101

2.3 并发

生产消费模型的并发,更多的体现在消费者在处理任务的同时,生产者可以生产任务;

线程切换的成本低于进程,由此便提高了数据处理的效率


接下来我们就要解决下面这些问题😁

1.如何让多个消费者线程等待呢?又如何让消费者线程被唤醒呢?
2.如何让多个生产者线程等待呢?又如何让生产者线程被唤醒呢?
3.如何衡量消费者和生产者所关心的条件是否就绪呢?

而前面提到的通知方式,在linux系统中,就是条件变量了!

3.条件变量接口

3.1 init/destroy

基本的接口和pthread库的其他接口很相似,都是一样的用法;其中attr也是设置条件变量的属性,这里置为nullptr即可

#include <pthread.h>

int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond,
                      const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

3.2 pthread_cond_wait

这两个接口都是让线程在一个条件变量下进行等待,其中timewait接口可以设置等待的时间(超时了就不等了)

条件变量也是临界资源,所以这里需要一个mutex锁来保证条件变量读写的原子性

#include <pthread.h>

int pthread_cond_timedwait(pthread_cond_t *restrict cond,
                           pthread_mutex_t *restrict mutex,
                           const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,
                      pthread_mutex_t *restrict mutex);

3.3 pthrea_cond_signal/broadcast

这个接口的作用是给在条件变量下等待的线程发信号

#include <pthread.h>

int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);

其中broadcast是给在当前条件变量等待的所有线程发信号,而signal是发送信号,只唤醒一个线程;

如果调用成功,这两个函数都会返回0;否则返回错误码

RETURN VALUE
       If  successful, the pthread_cond_broadcast() and pthread_cond_signal() functions shall return zero; otherwise, an error number shall
       be returned to indicate the error.

3.4 代码示例

下面这个代码可以很好的演示上面提到的多个接口

#include<iostream>
#include<string.h>
#include<signal.h>
#include<pthread.h>
#include<thread>
#include<unistd.h>
#include<sys/types.h>
#include<sys/syscall.h>
using namespace std;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;//锁
pthread_cond_t cond;//条件变量

volatile bool quit = false;

void*func(void* arg)
{
    while(!quit)//这里有bug,后续会提到
    {
        pthread_cond_wait(&cond,&mutex);
        cout << "thread is running... " << (char*)arg << endl;
    }
    cout << "thread quit... " << (char*)arg << endl;
}

int main()
{
    pthread_cond_init(&cond,nullptr);
    pthread_t t1,t2,t3;
    pthread_create(&t1,nullptr,func,(void*)"t1");
    pthread_create(&t2,nullptr,func,(void*)"t2");
    pthread_create(&t3,nullptr,func,(void*)"t3");

    char c;
    while(1)
    {
        cout << "[a/b]$ ";
        cin >> c;
        if(c=='a')
        {
            pthread_cond_signal(&cond);
        }
        else if(c=='b')
        {
            pthread_cond_broadcast(&cond);
        }
        else
        {
            quit = true;
            break;
        }
        usleep(500);
    }

    cout << "main break: " << quit << endl;
    sleep(1);
    pthread_cond_broadcast(&cond);

    pthread_join(t1,nullptr);
    pthread_join(t2,nullptr);
    pthread_join(t3,nullptr);

    return 0;
}

每次输入a,就signal会唤醒一个线程;每次输入b,会调用broadcast,唤醒当前所有线程

[muxue@bt-7274:~/git/linux/code/22-12-25_线程同步]$ ./test
[a/b]$ a
thread is running... t1
[a/b]$ a
thread is running... t2
[a/b]$ a
thread is running... t3
[a/b]$ b
thread is running... t1
thread is running... t2
thread is running... t3

3.4.1 小bug

上面的代码示例,会出现下面的问题,即我们输入除了a和b以外的所有字符,都应该会把全局变量quit改成true,让三个线程都退出

但观察到的现象却是只有一个线程退出了,其他线程阻塞等待了

[muxue@bt-7274:~/git/linux/code/22-12-25_线程同步]$ ./test
[a/b]$ q
main break: 1
thread is running... t3
thread quit... t3

这是因为pthread_cond_wait里面进行了独特的操作,即等待之前,它会释放锁,等待之后,他会重新申请锁

int pthread_cond_wait(pthread_cond_t *restrict cond,
                      pthread_mutex_t *restrict mutex)
{
	pthread_mutex_unlock(mutex);//先解锁
    //避免因为该线程拿着锁去休眠了,导致其他线程无法申请该锁
    
	//条件变量相关代码
	
    pthread_mutex_lock(mutex);//条件满足后,再加锁
}

第一个退出的线程,退出之前申请了锁却没有释放,于是就导致其他线程在条件满足后,没有办法申请锁,只能阻塞等待!

3.4.2 修正

修正的方法很简单,我们只需要在while(!quit)循环的退出条件满足之后,释放一下锁,就OK了!

void*func(void* arg)
{
    while(!quit)
    {
        pthread_cond_wait(&cond,&mutex);
        cout << "thread is running... " << (char*)arg << endl;
        
    }
    pthread_mutex_unlock(&mutex);//正确操作:需要在条件满足后,解锁
    cout << "thread quit... " << (char*)arg << endl;
}

此时就能看到,所有线程都正常退出了!

[muxue@bt-7274:~/git/linux/code/22-12-25_线程同步]$ ./test
[a/b]$ q
main break: 1
thread is running... t1
thread quit... t1
thread is running... t2
thread quit... t2
thread is running... t3
thread quit... t3
[muxue@bt-7274:~/git/linux/code/22-12-25_线程同步]$

3.4.3 典型错误

根据这点,就能引出一个比较典型的错误

pthread_mutex_lock(&mutex);
while (condition_is_false) {
    pthread_mutex_unlock(&mutex);
    pthread_cond_wait(&cond);//解锁和加锁的操作,该函数会帮我们完成
    pthread_mutex_lock(&mutex);//二次申请同一把锁,出现死锁!
}
pthread_mutex_unlock(&mutex);

两次申请同一把锁,就好比自己把自己绊倒了😂我们要避免写出这样的错误代码!


4.阻塞队列-生产消费模型实例

这个队列的作用,就是提供一个超市,供生产者和消费者进行数据的交换

  • 生产者,往队列里面push数据
  • 消费者,从队列里面pop数据

看起来有些类似于管道,同样的,生产者和消费者在读取阻塞队列的时候,不仅需要保证自己的操作是原子操作,还需要做到一定的访问控制;即消费者在队列空的时候不能继续pop,生产者在队列满的时候不能继续push

此时,我们还可以引入一个微信公众号,也就是一定的通知方式:不要让生产者、消费者疯狂检测阻塞队列,而是引入条件变量,在队列不为空的时候,通知消费者;在队列不为满的时候,通知生产者;这样就达到了线程之间的同步。

4.1 成员变量

要实现阻塞队列,我们首先需要理清楚需要什么成员变量,来保护该队列

  • 用于访问控制的锁,同一时刻只能有一个线程访问队列
  • 用户线程同步的条件变量,因为我们需要在不同的条件下通知不同的人,所以需要2个条件变量
  • 一个队列,为了方便,采用std::queue,这样就不用自己造轮子了

理清楚了之后,就可以来写成员变量啦;我采用了模板类型,这样阻塞队列就可以用来存放任何我们想要的类型了

template<class T>
class BlockQueue
{
private:
    queue<T> _bq;//队列
    size_t _size;//大小
    pthread_mutex_t _mutex;//锁
    pthread_cond_t _proInf;//通知生产者
    pthread_cond_t _conInf;//通知消费者
public:
    BlockQueue(int sz=5)
        :_size(sz)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_proInf,nullptr);
        pthread_cond_init(&_conInf,nullptr);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_proInf);
        pthread_cond_destroy(&_conInf);
    }
};

你可能想问,queue不是有封装size吗?哪这里我们还定义一个大小变量,会不会有些多余?

nope!实际上,这里的这个_size 就好比我们在C语言写顺序表的时候,成员capacity;其作用是来判断我们的队列有没有满的。


4.2 push和pop

对于一个队列,最重要的操作就是在队头出数据,队尾入数据

简单说来,就是需要在处理队列数据的时候进行加锁,保证原子性;

除此以外,生产者和消费者有不同的操作逻辑:

  • 生产
    • 判断是否符合生产条件(队列没有满)
    • 满,不生产;不满,生产;
    • 满了的时候,生产者应在条件变量中等待(等待消费者消费)
    • 不满的时候,生产者生产,并通知消费者来消费
  • 消费
    • 判断是否满足消费条件(队列不为空)
    • 空,不消费;不空,消费;
    • 空了的时候,消费者应该在条件变量中等待(等待生产者生产)
    • 不空的时候,消费者消费,并通知生产者继续生产

这样就实现了阻塞队列push和pop的基本逻辑;由此可以写出下面的代码

    //消费者消费
    T pop()
    {
        //加锁
        pthread_mutex_lock(&_mutex);
        //判断条件
        if(_size)//空,不消费
        {
            pthread_cond_wait(&_conInf,&_mutex);
        }

        //消费并通知生产者
        T tmp = _bq.front();
        _bq.pop();
        pthread_cond_signal(&_proInf);

        //解锁
        pthread_mutex_unlock(&_mutex);
        return tmp;
    }
    //生产者生产
    void Push(const T& in)
    {
        //加锁
        pthread_mutex_lock(&_mutex);
        //判断条件
        if(bq.size()>=_size)//满,不生产
        {
            pthread_cond_wait(&_proInf,&_mutex);
        }

        //生产并通知消费者
        _bq.push(in);
        pthread_cond_signal(&_conInf);

        //解锁
        pthread_mutex_unlock(&_mutex);
    }

4.2.1 运行测试

有了这个基本框架,我们就可以来测试一下代码啦!

先来一个生产者和消费者康康吧

#include "blockqueue.hpp"
#include <time.h>
#include <stdlib.h>

void *consume(void *args)
{
    BlockQueue<int> *bqp = (BlockQueue<int> *)args;
    while(1)
    {
        // 消费
        int ret = bqp->pop();
        cout << "consume " << pthread_self() << " 消费:" << ret << endl;
        sleep(1);
    }
}
void *produce(void *args)
{
    BlockQueue<int> *bqp = (BlockQueue<int> *)args;
    while (1)
    {
        // 制作
        cout << "########↓" << endl;
        int a = rand()%100;
        // 投放到超市
        bqp->push(a);
        cout << "produce " << pthread_self() << " 生产:" << a << endl;
        sleep(2);
    }
}

int main()
{
    srand((unsigned int)time(nullptr));
    pthread_t t1,t2;
    BlockQueue<int> bq(5);
    pthread_create(&t1,nullptr,produce,(void*)&bq);
    pthread_create(&t2,nullptr,consume,(void*)&bq);

    pthread_join(t1,nullptr);
    pthread_join(t2,nullptr);

    return 0;
}

可以看到,刚开始消费者并没有运行,而是等待生产者生产出数据了之后,再开始消费!我们的目的成功达成!

[muxue@bt-7274:~/git/linux/code/23-01-08 blockqueue]$ ./test
########↓
produce 139716365768448 生产:41
########↓
produce 139716365768448 生产:11
consume 139716357375744 消费:41
########↓
produce 139716365768448 生产:19
consume 139716357375744 消费:11
########↓
produce 139716365768448 生产:19
consume 139716357375744 消费:19
########↓
produce 139716365768448 生产:25
consume 139716357375744 消费:19
########↓
produce 139716365768448 生产:63
consume 139716357375744 消费:25

如果增加线程到2生产2消费,则会看到下面的情况

[muxue@bt-7274:~/git/linux/code/23-01-08 blockqueue]$ ./test
########↓
produce 140483816179456 生产:51
consume 140483807786752 消费:51
########↓
produce 140483799394048 生产:38
consume 140483791001344 消费:38
########↓
produce 140483816179456 生产:37
consume 140483807786752 消费:37

每次被唤醒的生产者和消费者都是不一样的,交替唤醒

4.2.2 进一步封装

为了代码的可读性,我们可以对阻塞队列进一步封装

#include<iostream>
#include<queue>
#include<pthread.h>
#include<unistd.h>
using namespace std;

template<class T>
class BlockQueue
{
private:
    queue<T> _bq;//队列
    size_t _size;//大小
    pthread_mutex_t _mutex;//锁
    pthread_cond_t _proInf;//通知生产者
    pthread_cond_t _conInf;//通知消费者
public:
    BlockQueue(int sz=5)
        :_size(sz)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_proInf,nullptr);
        pthread_cond_init(&_conInf,nullptr);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_proInf);
        pthread_cond_destroy(&_conInf);
    }
    //消费者消费
    T pop()
    {
        //加锁
        lock();
        //判断条件
        if(isEmpty())//空,不消费
        {
            ConWait();
        }

        //消费并通知生产者
        T tmp = _bq.front();
        _bq.pop();
        WakeUpPro();

        //解锁
        unlock();
        return tmp;
    }
    //生产者生产
    void push(const T& in)
    {
        //加锁
        lock();
        //判断条件
        if(isFull())//满,不生产
        {
            ProWait();
        }

        //生产并通知消费者
        _bq.push(in);
        WakeUpCon();

        //解锁
        unlock();
    }
private:
    void lock()
    {
        pthread_mutex_lock(&_mutex);
    }
    void unlock()
    {
        pthread_mutex_unlock(&_mutex);
    }
    //唤醒消费者
    void WakeUpCon()
    {
        pthread_cond_signal(&_conInf);
    }
    //唤醒生产者
    void WakeUpPro()
    {
        pthread_cond_signal(&_proInf);
    }
    //消费者等待
    void ConWait()
    {
        pthread_cond_wait(&_conInf,&_mutex);
    }
    //生产者等待
    void ProWait()
    {
        pthread_cond_wait(&_proInf,&_mutex);
    }

    //判断条件
    bool isFull()
    {
        return _size == _bq.size();
    }
    bool isEmpty()
    {
        return _bq.empty();
    }
};

4.2.3 使用task分配运算任务

因为阻塞队列是用模板类型的,我们可以自己实现一个仿函数,来给生产者消费者分配任务

#pragma once

#include <iostream>
using namespace std;

class Task
{
public:
    Task(int one=0, int two=0, char op="+") 
        : _elem1(one), _elem2(two), _operator(op)
    {}
    // 仿函数
    int operator() ()
    {
        return run();
    }
    // 运行仿函数
    int run()
    {
        int result = 0;
        switch (_operator)
        {
        case '+':
            result = _elem1 + _elem2;
            break;
        case '-':
            result = _elem1 - _elem2;
            break;
        case '*':
            result = _elem1 * _elem2;
            break;
        case '/':
        {
            if (_elem2 == 0)
            {
                cout << "div zero, abort" << endl;
                result = -1;
            }
            else
            {
                result = _elem1 / _elem2;
            }
            break;
        } 
        case '%':
        {
            if (_elem2 == 0)
            {
                cout << "mod zero, abort" << endl;
                result = -1;
            }
            else
            {
                result = _elem1 % _elem2;
            }
            break;
        }
        default:
            cout << "unknown: " << _operator << endl;
            break;
        }
        return result;
    }

    // 获取元素,方便打印
    void get(int *e1, int *e2, char *op)
    {
        *e1 = _elem1;
        *e2 = _elem2;
        *op = _operator;
    }
private:
    int _elem1;
    int _elem2;
    char _operator;
};

测试一下

#include "blockqueue.hpp"
#include "task.hpp"
#include <time.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <string>
string ops ="+-*/%";

void *consumer(void *args)
{
    BlockQueue<Task> *bqp = (BlockQueue<Task> *)args;
    while (1)
    {
        Task t = bqp->pop();
        int result = t();    //处理任务
        int one, two;
        char op;
        t.get(&one, &two, &op);
        cout << "consumer [" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 消费了一个任务: " << one << op << two << " = " << result << endl;
    }
}
void *producer(void *args)
{
    BlockQueue<Task> *bqp = (BlockQueue<Task> *)args;
    while (1)
    {
        // 制作任务
        int one = rand() % 50;
        int two = rand() % 20;
        char op = ops[rand() % ops.size()];
        Task t(one, two, op);
        // 投放给消费者生产
        bqp->push(t);
        cout << "producter[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 生产了一个任务: " << one << op << two << " = ?" << endl;
        sleep(1);
    }
}

void test2()
{
    pthread_t t1,t2;
    BlockQueue<Task> bq(5);
    pthread_create(&t1,nullptr,producer,(void*)&bq);
    sleep(1);
    pthread_create(&t2,nullptr,consumer,(void*)&bq);

    pthread_join(t1,nullptr);
    pthread_join(t2,nullptr);
}

int main()
{
    srand((unsigned long)time(nullptr));//乘一个数字添加随机性
    test2();

    return 0;
}

运行,可以看到生产者生产了问题之后,消费者会去解答。此时我们只需要在线程中取回运算好的结果,就OK了!

[muxue@bt-7274:~/git/linux/code/23-01-08 blockqueue]$ ./test
producter[140703422265088] 1673319321 生产了一个任务: 43-16 = ?
consumer [140703413872384] 1673319322 消费了一个任务: 43-16 = 27
producter[140703422265088] 1673319322 生产了一个任务: 45/12 = ?
consumer [140703413872384] 1673319322 消费了一个任务: 45/12 = 3
producter[140703422265088] 1673319323 生产了一个任务: 17/7 = ?
consumer [140703413872384] 1673319323 消费了一个任务: 17/7 = 2
producter[140703422265088] 1673319324 生产了一个任务: 49-14 = ?
consumer [140703413872384] 1673319324 消费了一个任务: 49-14 = 35
producter[140703422265088] 1673319325 生产了一个任务: 4%4 = ?
consumer [140703413872384] 1673319325 消费了一个任务: 4%4 = 0

5.阻塞队列-循环队列

5.0 为啥用循环队列

上面的队列是封装了queue,下面我们要利用环形队列的方式来实现一个功能相同的环形队列。

所谓循环队列,就是在一个一维数组中,通过头尾两个指针,来标识队列的头尾。如果数据数量超出空间末尾,则在空间的开头放入,并移动尾指针

这部分可以看我的博客 循环队列

用上循环队列,就有一个显著的优势:因为我们访问的(假设是数组)是不同下标位置,其并非同一块内存空间,所以是可以同时访问的!这样就进一步显现了生产消费的并发属性

这就相当于把循环队列这个临界资源分成了一小块一小块;只有满/空的时候,头尾指针会指向同一块空间,其余时间都是不冲突的!

注意:这需要程序猿来保证,获取了信号量之后,访问的肯定是临界资源中的不同区域,否则是会出问题的!

5.1 POSIX信号量

这里又要重新认识一下信号量了,在先前的博客中,简单提到了信号量的概念,其本质上是一个计数器

  • p操作:申请资源
  • v操作:归还资源

对信号量的操作是原子的,不会因为线程切换而发生错误和冲突。

循环队列需要有两个标识符,来标识当前数据的头尾。注意,信号量可不能做下标使用,这里我们可以用int类型,加锁来解决原子性问题;


信号量在环形队列中的作用,还是用于标识 空间剩余/数据数量

因为这是一个阻塞队列:

  • 生产者放入数据,对应的是空间信号量,只有有空间的时候,才能往环形队列里面放入
  • 消费者取出数据,对应的是数据信号量,没有数据也就不能取了

此时,先前介绍的semop函数在此环节不太适合,在此介绍两个来自pthread库的新接口;这些接口在编译的时候都需要带上-lpthread

5.1.1 sem_init/destroy

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);

int sem_destroy(sem_t *sem);

因为都是pthread库的,其使用方法还是很相似的,我们需要对信号量进行初始化,并给定一个value作为信号量的初始值;

init函数的pshared参数做一定讲解,在man手册中的介绍是这样的

image-20230117123549827

如果我们的信号量需要在线程中共享,那就将该参数设置为0;

如果信号量需要在进程中共享,其就应该处在共享内存区域,可以对添加了共享内存的进程之间共享;

阻塞队列是给线程使用的,所以我们设置成0就可以了。

5.1.2 sem_wait

这个接口和锁/条件变量的wait是一样的,其作用是申请一个信号量,如果信号量为0,则在此处等待,直到信号量非0

NAME
       sem_wait, sem_timedwait, sem_trywait - lock a semaphore

SYNOPSIS
       #include <semaphore.h>
       int sem_wait(sem_t *sem);
       int sem_trywait(sem_t *sem);
       int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);

       Link with -pthread.

可以简单理解为,这个接口的作用就是让信号量-1

trywait则是非阻塞检测;timewait是设置等待的时间,超时就不等了。这点和条件变量的接口是一样的

5.1.3 sem_post

这个接口的作用就是释放一个信号量,相当于给这个信号量+1

NAME
       sem_post - unlock a semaphore

SYNOPSIS
       #include <semaphore.h>
       int sem_post(sem_t *sem);

5.1.4 在循环队列里面使用

知道了这几个接口,对于我们环形队列的使用就比较明了了

  • 当生产者开始生产的时候,申请一个空间信号量(剩余空间-1)生产完毕后,释放一个数量信号量(剩余数量+1)
  • 当消费者开始消费的时候,申请一个数量信号量(剩余数量-1)生产完毕后,释放一个空间信号量(剩余空间+1)

除了这两个信号量,生产消费的时候,还需要操作头尾指针,指向队列正确的位置

5.2 成员变量

前面铺垫了那么多,现在就可以来试试水了!

    vector<T> _rq;//队列
    sem_t _spaceSem;//空间信号量
    sem_t _valueSem;//数据信号量
    pthread_mutex_t _proMutex;//生产者的锁
    pthread_mutex_t _conMutex;//消费者的锁

    size_t _rear;//尾指针
    size_t _front;//头指针

因为信号量就可以充当条件变量的角色,所以这里就不需要条件变量来通知生产者/消费者了

5.3 构造/析构

接下来要做的,就是写构造/析构了

    RingQueue(int capa = 5)
        :_rq(capa),
        _rear(0),
        _front(0)
    {
        sem_init(&_spaceSem,0,capa);//空间
        sem_init(&_valueSem,0,0);//数量为0

        pthread_mutex_init(&_proMutex,nullptr);
        pthread_mutex_init(&_conMutex,nullptr);
    }
    ~RingQueue()
    {
        sem_destroy(&_spaceSem);
        sem_destroy(&_valueSem);

        pthread_mutex_destroy(&_proMutex);
        pthread_mutex_destroy(&_conMutex);
    }

5.4 生产消费

首先我们需要知道,当队列满/空的时候,分别对应啥情况

  • 满:生产太多了,生产者得休息
  • 空:消费太多了,消费者得缓缓

因为有信号量帮我们做了访问控制,所以我们不需要判断循环队列什么时候为满,什么时候为空:

  • 满的时候:数量=队列容量,空间信号量=0,无法申请空间,无法生产
  • 此时生产者会在空间信号量里面等待,不会继续生产;消费者继续消费
  • 空的时候:空间=队列容量,数量信号量=0,没有可以消费的
  • 此时消费者会在数量信号量里面等待,不会继续消费;生产者继续生产

这也是信号量作为访问控制的一大特征,当你申请成功了,就代表你肯定有临界资源的访问权限了;再加上我们给访问临界区加了锁,自然也不会出现被其他线程抢了的情况🎉

    //生产者
    void Push(T& in)
    {
        sem_wait(&_spaceSem);//获取空间
        pthread_mutex_lock(&_proMutex);

        _rq[_rear] = in; //生产
        _rear++;         // 写入位置后移
        _rear %= _rq.size(); // 更新下标,避免越界

        pthread_mutex_unlock(&_proMutex);
        sem_post(&_valueSem);//释放数量
    }
    //消费
    T pop()
    {
        sem_wait(&_valueSem);
        pthread_mutex_lock(&_conMutex);

        T tmp = _rq[_front];
        _front++;
        _front %= _rq.size();// 更新下标,保证环形特征

        pthread_mutex_unlock(&_conMutex);
        sem_post(&_spaceSem);

        return tmp;
    }

5.5 测试

string ops ="+-*/%";

void *consumer(void *args)
{
    RingQueue<Task> *bqp = (RingQueue<Task> *)args;
    while (1)
    {
        Task t = bqp->pop();
        int result = t();    //处理任务
        int one, two;
        char op;
        t.get(&one, &two, &op);
        cout << "consumer [" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 消费了一个任务: " << one << op << two << " = " << result << endl;
    }
}
void *producer(void *args)
{
    RingQueue<Task> *bqp = (RingQueue<Task> *)args;
    while (1)
    {
        // 制作任务
        int one = rand() % 50;
        int two = rand() % 20;
        char op = ops[rand() % ops.size()];
        Task t(one, two, op);
        // 投放给消费者生产
        bqp->push(t);
        cout << "producter[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 生产了一个任务: " << one << op << two << " = ?" << endl;
        sleep(1);
    }
}

void test2()
{
    pthread_t t1,t2;
    RingQueue<Task> bq(5);
    pthread_create(&t1,nullptr,producer,(void*)&bq);
    sleep(1);
    pthread_create(&t2,nullptr,consumer,(void*)&bq);

    pthread_join(t1,nullptr);
    pthread_join(t2,nullptr);
}

int main()
{
    srand((unsigned long)time(nullptr));//乘一个数字添加随机性
    test2();

    return 0;
}

刚开始运行的时候,发现了这个错误

image-20230117141917662

这是因为在task中,我们重写了构造函数,编译器就不会生成默认构造了

    Task(int one, int two, char op) 
        : _elem1(one), _elem2(two), _operator(op)
    {}

而创建环形队列的时候,会自动调用构造函数。此时发现没有匹配的构造函数(无参),就会报错!

解决办法是,添加上一个无参构造,直接使用default关键字即可

    Task() = default;//使用系统默认生成的无参构造

运行,可以看到生产消费稳定跑起来了!

[muxue@bt-7274:~/git/linux/code/23-01-17 ringqueue]$ ./test
producter[139758380586752] 1673936332 生产了一个任务: 24-8 = ?
producter[139758380586752] 1673936333 生产了一个任务: 34/19 = ?
consumer [139758372194048] 1673936333 消费了一个任务: 24-8 = 16
consumer [139758372194048] 1673936333 消费了一个任务: 34/19 = 1
producter[139758380586752] 1673936334 生产了一个任务: 18*16 = ?
consumer [139758372194048] 1673936334 消费了一个任务: 18*16 = 288
producter[139758380586752] 1673936335 生产了一个任务: 48+15 = ?
consumer [139758372194048] 1673936335 消费了一个任务: 48+15 = 63

增多线程,也能正常运行!

[muxue@bt-7274:~/git/linux/code/23-01-17 ringqueue]$ ./test
producter[140344921630464] 1673936659 生产了一个任务: 44-15 = ?
consumer [140344913237760] 1673936660 消费了一个任务: 44-15 = 29
producter[140344921630464] 1673936660 生产了一个任务: 33+14 = ?
consumer [140344913237760] 1673936660 消费了一个任务: 33+14 = 47
producter[140344904845056] 1673936661 生产了一个任务: 14/12 = ?
consumer [140344896452352] 1673936661 消费了一个任务: 14/12 = 1
producter[140344921630464] 1673936661 生产了一个任务: 24-16 = ?
consumer [140344913237760] 1673936661 消费了一个任务: 24-16 = 8
producter[140344904845056] 1673936662 生产了一个任务: 3-3 = ?
consumer [140344896452352] 1673936662 消费了一个任务: 3-3 = 0
producter[140344921630464] 1673936662 生产了一个任务: 36*7 = ?
consumer [140344913237760] 1673936662 消费了一个任务: 36*7 = 252
producter[140344904845056] 1673936663 生产了一个任务: 28+8 = ?
consumer [140344896452352] 1673936663 消费了一个任务: 28+8 = 36
producter[140344921630464] 1673936663 生产了一个任务: 26-5 = ?
consumer [140344913237760] 1673936663 消费了一个任务: 26-5 = 21

结语

关于线程同步的知识点,大概就是这些了。博客写的满满当当,在理解了接口的基本命名和使用逻辑后,感觉就没有那么难了

加油哦!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/184591.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python数据可视化(三)绘制统计图形大全

3.1 柱状图以 Python 代码的形式讲解柱状图的绘制原理&#xff0c;这里重点讲解 bar()函数的使用方法。代码&#xff1a;import matplotlib as mpl import matplotlib.pyplot as plt mpl.rcParams["font.sans-serif"]["SimHei"] mpl.rcParams["axes.u…

JavaScript Hashmap散列算法

文章目录前言一、什么是散列表二、为何使用散列算法三、实现散列算法1.字典结构2.散列函数3.put 设置/更新4. 获取值四、使用HashMap处理冲突1.分离链接2.线性探查总结前言 一、什么是散列表 散列表是字典(Dictionary)的一种实现. 集合以[值, 值]形式存储, 字典则以[键, 值]对…

不让袁树雄上春晚,导演于蕾是真英明

自从央视兔年春晚结束后&#xff0c;互联网上面就出现各种吐槽声音&#xff0c;尤其是关于热歌《早安隆回》的话题。说起歌曲《早安隆回》&#xff0c;这是2022年最火的歌曲之一&#xff0c;民间的呼声也一直很高&#xff0c;都希望能够登上央视春晚舞台。 不过随着央视春晚的结…

在甲骨文云容器实例(Container Instances)上部署Alist

在甲骨文云容器实例上部署Oracle Linux 8 Desktop加强版创建容器实例查看容器实例的公共 IP 地址查看密码使用Alist甲骨文云推出了容器实例&#xff0c;这是一项无服务器计算服务&#xff0c;可以即时运行容器&#xff0c;而无需管理任何服务器。今天我们尝试一下通过容器实例部…

数据结构与算法(一)(Python版)

python基础知识整理二 文章目录算法分析运行时间检测大O表示法“变位词”判断问题解法一: O(logn)解法二&#xff1a;暴力法 O(n!)解法三 O(n)Python数据类型的性能示例判断是否是素数求素数个数基本结构——线性结构栈抽象数据类型以及Python实现用Python实现抽象数据结构栈栈…

HQChart实战教程58-K线主图仿tradingview

HQChart实战教程58-K线主图仿tradingview 需求效果图实现思路步骤1. 写透明成交量柱子指标2. 调整成交量柱子和K线图显示区域HQChart插件源码地址完整的demo例子需求 主图K线图和成交量柱子图在一个同窗口显示,柱子图高度为主图窗口高度的1/4,并且成交量柱子图使用透明色 效…

【论文翻译】Deep High-Resolution Representation Learningfor Visual Recognition

目录 摘要 介绍 2.相关工作 3 HIGH-RESOLUTION NETWORKS 3.1 并行多分辨率卷积 3.2 重复的多分辨率融合融合模块 3.3 表示头 3.4实例化 3.5分析 4 人体姿态估计 5语义分割 8.总结 摘要 高分辨率表示对于位置敏感的视觉问题是必不可少的&#xff0c;例如人体姿势估计…

垃圾收集器必问系列—G1

本文已收录至Github&#xff0c;推荐阅读 &#x1f449; Java随想录 人生下来不是为了拖着锁链&#xff0c;而是为了展开双翼。——雨果 文章目录基于Region的堆内存布局可预测的停顿时间模型跨Region引用对象对象引用关系改变运作过程CMS VS G1相关参数Garbage First&#xff…

重要的数据表

重要的数据表目录概述需求&#xff1a;设计思路实现思路分析1.-- 组织结构数据库.参考资料和推荐阅读Survive by day and develop by night. talk for import biz , show your perfect code,full busy&#xff0c;skip hardness,make a better result,wait for change,challeng…

51单片机学习笔记-6串口通信

6 串口通信 [toc] 注&#xff1a;笔记主要参考B站江科大自化协教学视频“51单片机入门教程-2020版 程序全程纯手打 从零开始入门”。 6.1 串口通信原理 串口是一种应用十分广泛的通讯接口&#xff0c;串口成本低、容易使用、通信线路简单&#xff0c;可实现两个设备的互相通…

层级选择器

<!DOCTYPE html> <html> <head> <meta charset"utf-8"> <title>层级选择器</title> <style type"text/css"> /*需求&#xff1a;需要选中前三个段落标签*/ /*下面两个选择器之间加…

JavaScript 发布订阅者模式和观察者模式及区别

一、发布订阅模式 发布订阅模式其实是一种对象间一对多的依赖关系&#xff0c;当一个对象的状态发生改变时&#xff0c;所有依赖于它的对象都将得到状态改变的通知。 多方订阅&#xff0c;一方发布&#xff0c;订阅放会收到通知 举例&#xff1a;教学楼中每个教室都有一个广…

Iterator_fail-fast和Iterator_fail-safe~

初识fail-fast&#xff1a; fail-fast 机制是java集合(Collection)中的一种错误机制&#xff0c;当多个线程对同一个集合的内容进行操作时&#xff0c;就可能会产生fail-fast事件&#xff0c;它只是一种错误检测机制&#xff0c;只能被用来检测错误&#xff0c;因为JDK并不保证…

蓝桥杯2022Python组

蓝桥杯2022Python组 1.排列字母 用一个sorted就好了&#xff0c;没啥好说的 s WHERETHEREISAWILLTHEREISAWAY s sorted(s) # 变成列表形式了 print(.join(s))2.寻找整数 这题我刚开始以为答案只能是11和17的倍数&#xff0c;就在他们的倍数里面找&#xff0c;发现不对&…

STL——STL简介、STL六大组件

一、STL是什么 STL(standard template library)&#xff1a; C标准模板库&#xff0c;是C标准库的重要组成部分&#xff0c;不仅是一个可复用的组件库&#xff0c;还是一个包罗数据结构与算法的软件框架。 通俗来讲&#xff1a; STL就是将常见的数据结构&#xff08;顺序表、…

Superset权限管理

文章目录1.Superset角色1&#xff09;Admin2&#xff09;Alpha3&#xff09;Gamma4&#xff09;Sql_lab5&#xff09;Public2.实操自定义授权1&#xff09;权限集2&#xff09;实操1.Superset角色 Superset的默认角色有&#xff1a;Admin、Alpha、Gamma、sql_lab、Public 1&a…

【信息资源建设】

考试分为单选、判断改错、名词解释、简答、综合论述 1.1 如果按价值观念划分&#xff0c;则可将信息分为有用信息和无用信息 信息的特性&#xff1a;普遍性、客观性、时效性、传递性、共享性、变换性、转化性、可伪性。 1.1.2 &#xff08;必考&#xff09;OECD的知识分类…

【docker概念和实践 4】(4) 本地镜像提交到本地仓库

一、说明 registry是一个镜像&#xff0c;该镜像专门生成镜像仓库的容器&#xff0c;registry是基于http协议&#xff0c;那就是说&#xff0c;在单机、局域网、或者互联网上都可以建立registry数据仓库&#xff0c;存放自己构建的镜像。本篇专门介绍如何在本地单机上建立容器仓…

Python中的集合(set and frozenset)语法汇总

集合的基本语法知识目前有两种集合类型&#xff1a;set和frozenset。可变集合&#xff1a;set()set类型是可变的&#xff0c; 其内容可以使用 add() 和 remove() 这样的方法来改变&#xff0c;因为是可变的&#xff0c;所以没有哈希值&#xff0c;且不能被用作字典的键或其它集…

java线上项目排查,Arthas简单上手

Arthas 是Alibaba开源的Java诊断工具。参考&#xff1a;Arthas 用户文档 — Arthas 3.5.4 文档 当你遇到以下类似问题而束手无策时&#xff0c;Arthas可以帮助你解决&#xff1a; 这个类从哪个 jar 包加载的&#xff1f;为什么会报各种类相关的 Exception&#xff1f;我改的代…