Linux知识点 -- Linux多线程(三)

news2024/11/20 13:40:11

Linux知识点 – Linux多线程(三)

文章目录

  • Linux知识点 -- Linux多线程(三)
  • 一、线程同步
    • 1.概念理解
    • 2.条件变量
    • 3.使用条件变量进行线程同步
  • 二、生产者消费者模型
    • 1.概念
    • 2.基于BlockingQueue的生产者消费者模型
    • 3.单生产者单消费者模型
    • 4.多生产者多消费者模型
    • 5.锁的封装
  • 三、POSIX信号量
    • 1.信号量的概念与使用
    • 2.信号量的使用场景
    • 3.信号量接口
    • 4.基于环形队列的生产消费模型
    • 5.基于环形队列的生产消费模型实现
    • 6.信号量的意义


一、线程同步

1.概念理解

持有锁的线程会频繁进入临界区申请临界资源,造成其他进程饥饿的问题;
这本身是没有错的,但是不合理;
线程同步:就是线程按照一定的顺序,进行临界资源的访问;主要就是为了解决访问临界资源和理性的问题;在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题;

2.条件变量

  • 当我们申请临界资源前,需要先做临界资源是否存在的检测,做检测的本质也是访问临界资源;

  • 对临界资源的检测,也一定是要在加锁和解锁之间的;

  • 常规的方式检测条件是否就绪,注定了我们必须要频繁申请和释放锁,我们可以使用条件变量来完成检测:
    (1)资源未就绪的时候,不要让线程再频繁检测,让线程等待;
    (2)当条件就绪时,通知对应的线程,让其进行资源的申请和访问;

  • 初始化条件变量:
    在这里插入图片描述

  • 条件不满足时,等待:
    在这里插入图片描述

  • 条件满足时,发通知:
    在这里插入图片描述
    broadcast是将等待的线程全部唤醒;
    signal是将特定的线程唤醒;

注:pthread库返回值都是成功返回0,失败返回错误码;

3.使用条件变量进行线程同步

按照一定顺序控制线程:

#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <pthread.h>

using namespace std;

#define TNUM 4//共四个线程
typedef void (*func_t) (const string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond);//定义函数指针

class ThreadData
{
public:
    ThreadData(const string& name, func_t func, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
        : _name(name)
        , _func(func)
        , _pmtx(pmtx)
        , _pcond(pcond)
    {}
public:
    string _name;//线程名
    func_t _func;//线程回调的函数
    pthread_mutex_t* _pmtx;//锁
    pthread_cond_t* _pcond;//条件变量
};

void func1(const string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
    while(true)
    {
        pthread_cond_wait(pcond, pmtx);//默认该线程执行的时候,wait代码被执行,当前线程会立即被阻塞
                                        //阻塞就是将当前进程放进一个队列中去等待,并且再等待条件满足后被唤醒
        cout << name << "running -- 播放" << endl;
    }
}

void func2(const string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
    while(true)
    {
        pthread_cond_wait(pcond, pmtx);
        cout << name << "running -- 下载" << endl;
    }
}

void func3(const string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
    while(true)
    {
        pthread_cond_wait(pcond, pmtx);
        cout << name << "running -- 刷新" << endl;
    }
}

void func4(const string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
    while(true)
    {
        pthread_cond_wait(pcond, pmtx);
        cout << name << "running -- 扫描" << endl;
    }
}

//每一个线程都进入Entry接口,在entry接口内调用自己的函数
void* Entry(void* args)
{
    ThreadData* td = (ThreadData*)args;//td在每一个线程自己私有的栈空间中保存
    td->_func(td->_name, td->_pmtx, td->_pcond);//这是一个函数,调用完就返回这里
    delete td;//需要在td使用完后进行销毁
    return nullptr;
}

int main()
{
    pthread_mutex_t mtx;    //锁
    pthread_cond_t cond;    //条件变量
    pthread_mutex_init(&mtx, nullptr);
    pthread_cond_init(&cond, nullptr);

    pthread_t tids[TNUM];
    func_t funcs[TNUM] = {func1, func2, func3, func4};
    for(int i = 0; i < TNUM; i++)
    {
        string name = "Thread ";
        name += to_string(i + 1);
        ThreadData* td = new ThreadData(name, funcs[i], &mtx, &cond);
        pthread_create(tids + i, nullptr, Entry, (void*)td);
    }

    sleep(5);//主线程sleep,新线程创建出来都在wait

    while(true)
    {
        cout << "resume thread run code ..." << endl;
        pthread_cond_signal(&cond);//唤醒在指定条件变量下等待的线程,不用指定线程,因为wait的时候线程已经在队列中排队了
        sleep(1);
    }

    for(int i = 0; i < TNUM; i++)
    {
        pthread_join(tids[i], nullptr);
        cout << "thread: " << tids[i] << "quit" << endl;
    }

    pthread_mutex_destroy(&mtx);
    pthread_cond_destroy(&cond);

    return 0;
}

上面的代码创建了局部的锁和条件变量,创建了四个新线程,将线程名、回调的函数地址、锁和条件变量的地址都放进了一个类对象中;
在创建线程的函数中,每个线程都调用的是一个Entry入口函数,在Entry接口内调用自己的函数;
在线程执行的函数中,默认该线程执行的时候,wait代码被执行,当前线程会立即被阻塞;阻塞就是将当前进程放进一个队列中去等待,并且再等待条件满足后被唤醒;
当主线程执行到pthread_cond_signal函数时,会唤醒在指定条件变量下等待的线程;不用指定线程,因为wait的时候线程已经在队列中排队了;

运行结果:
在这里插入图片描述
主线程在等待了5s后,开始调用新线程执行任务,并且新线程是按照一定的顺序被唤醒的;

如果使用pthread_cond_broadcast接口一次唤醒一批线程:

int main()
{
    pthread_mutex_t mtx;    //锁
    pthread_cond_t cond;    //条件变量
    pthread_mutex_init(&mtx, nullptr);
    pthread_cond_init(&cond, nullptr);

    pthread_t tids[TNUM];
    func_t funcs[TNUM] = {func1, func2, func3, func4};
    for(int i = 0; i < TNUM; i++)
    {
        string name = "Thread ";
        name += to_string(i + 1);
        ThreadData* td = new ThreadData(name, funcs[i], &mtx, &cond);
        pthread_create(tids + i, nullptr, Entry, (void*)td);
    }

    sleep(5);//主线程sleep,新线程创建出来都在wait

    while(true)
    {
        cout << "resume thread run code ..." << endl;
        //pthread_cond_signal(&cond);//唤醒在指定条件变量下等待的线程,不用指定线程,因为wait的时候线程已经在队列中排队了
        pthread_cond_broadcast(&cond);//一次唤醒一批线程
        sleep(1);
    }

    for(int i = 0; i < TNUM; i++)
    {
        pthread_join(tids[i], nullptr);
        cout << "thread: " << tids[i] << "quit" << endl;
    }

    pthread_mutex_destroy(&mtx);
    pthread_cond_destroy(&cond);

    return 0;
}

运行结果:
在这里插入图片描述
等待队列中所有的线程被一次全部唤醒;

回调函数临界区加入加锁和解锁:
wait一定要在加锁和解锁之间进行;
加入了quit标志位,任务执行完后线程退出;
在这里插入图片描述

#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <pthread.h>

using namespace std;

#define TNUM 4//共四个线程
typedef void (*func_t) (const string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond);//定义函数指针
volatile bool quit = false;//加入quit标志位

class ThreadData
{
public:
    ThreadData(const string& name, func_t func, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
        : _name(name)
        , _func(func)
        , _pmtx(pmtx)
        , _pcond(pcond)
    {}
public:
    string _name;//线程名
    func_t _func;//线程回调的函数
    pthread_mutex_t* _pmtx;//锁
    pthread_cond_t* _pcond;//条件变量
};

void func1(const string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
    while(!quit)//加入退出判断
    {
        //wait一定要在加锁和解锁之间进行
        pthread_mutex_lock(pmtx);
        //if(临界资源未就绪) 等待
        pthread_cond_wait(pcond, pmtx);//默认该线程执行的时候,wait代码被执行,当前线程会立即被阻塞
                                        //阻塞就是将当前进程放进一个队列中去等待,并且再等待条件满足后被唤醒
        cout << name << "running -- 播放" << endl;
        pthread_mutex_unlock(pmtx);
    }
}

void func2(const string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
    while(!quit)
    {
        pthread_mutex_lock(pmtx);
        pthread_cond_wait(pcond, pmtx);
        cout << name << "running -- 下载" << endl;
        pthread_mutex_unlock(pmtx);
    }
}

void func3(const string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
    while(!quit)
    {
        pthread_mutex_lock(pmtx);
        pthread_cond_wait(pcond, pmtx);
        cout << name << "running -- 刷新" << endl;
        pthread_mutex_unlock(pmtx);
    }
}

void func4(const string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
    while(!quit)
    {
        pthread_mutex_lock(pmtx);
        pthread_cond_wait(pcond, pmtx);
        cout << name << "running -- 扫描" << endl;
        pthread_mutex_unlock(pmtx);
    }
}

//每一个线程都进入Entry接口,在entry接口内调用自己的函数
void* Entry(void* args)
{
    ThreadData* td = (ThreadData*)args;//td在每一个线程自己私有的栈空间中保存
    td->_func(td->_name, td->_pmtx, td->_pcond);//这是一个函数,调用完就返回这里
    delete td;//需要在td使用完后进行销毁
    return nullptr;
}

int main()
{
    pthread_mutex_t mtx;    //锁
    pthread_cond_t cond;    //条件变量
    pthread_mutex_init(&mtx, nullptr);
    pthread_cond_init(&cond, nullptr);

    pthread_t tids[TNUM];
    func_t funcs[TNUM] = {func1, func2, func3, func4};
    for(int i = 0; i < TNUM; i++)
    {
        string name = "Thread ";
        name += to_string(i + 1);
        ThreadData* td = new ThreadData(name, funcs[i], &mtx, &cond);
        pthread_create(tids + i, nullptr, Entry, (void*)td);
    }

    sleep(5);//主线程sleep,新线程创建出来都在wait

    int cnt = 10;
    while(cnt)
    {
        cout << "resume thread run code ..."  << cnt-- << endl;
        pthread_cond_signal(&cond);//唤醒在指定条件变量下等待的线程,不用指定线程,因为wait的时候线程已经在队列中排队了
        //pthread_cond_broadcast(&cond);//一次唤醒一批线程
        sleep(1);
    }

    cout << "control done" << endl;
    quit = true;
    pthread_cond_broadcast(&cond);//再唤醒一下线程,让其检测quit信号

    for(int i = 0; i < TNUM; i++)
    {
        pthread_join(tids[i], nullptr);
        cout << "thread: " << tids[i] << "quit" << endl;
    }

    pthread_mutex_destroy(&mtx);
    pthread_cond_destroy(&cond);

    return 0;
}

运行结果:
在这里插入图片描述

二、生产者消费者模型

1.概念

生产者消费者模型就是一种多线程运作的模型,就像超市一样,生产者生产了商品运送到超市售卖,而消费者从超市里购买商品;
在这里插入图片描述
其中,生产者和消费者都是给线程进行了角色化,不同的线程执行不同的职能,超市则是一个数据的缓冲区,商品就是数据;

  • 3种关系:
    生产者和生产者:竞争、互斥的关系;
    消费者和消费者:竞争、互斥的关系;
    生产者和消费者:互斥和同步的关系;
  • 2种角色:
    生产者、消费者;
  • 一个交易场所:
    超市;

这个模型能够让生产者和消费者线程之间实现解耦,提高效率;
当生产者生产了商品,就能够给消费者同步信息,唤醒消费者线程;
当消费者消费之后,就能给生产者同步信息,唤醒生产者线程,继续生产;
可以让生产者和消费者线程互相同步;
在逻辑层面上解耦消费者和生产者,能够提高效率
重点是给线程赋予了角色;
需要消除生产中的状态,避免数据不一致;

  • 生产过程:
    生产者将商品生产出来放到仓库,消费者从仓库取走商品;
    生产和消费的过程不仅于此,生产者生产数据,消费者使用数据都需要花时间;

2.基于BlockingQueue的生产者消费者模型

  • BlockingQueue:阻塞队列
    当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;
    当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出;

    在这里插入图片描述

3.单生产者单消费者模型

BlockQueue.hpp:

#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>

using namespace std;

const int gDefaultCap = 5;

template <class T>
class BlockQueue
{
private:
    bool isQueueEmpty()
    {
        return _bq.size() == 0;
    }

    bool isQueueFull()
    {
        return _bq.size() == _capacity;
    }

public:
    BlockQueue(int capacity = gDefaultCap)
        : _capacity(capacity)
    {
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_Empty, nullptr);
        pthread_cond_init(&_Full, nullptr);
    }

    void push(const T &in) // 生产者放数据
    {
        pthread_mutex_lock(&_mtx);
        // 1.先检测当前的临界资源是否满足访问条件
        // pthread_cond_wait是在临界区中的,此时进程是持有锁的,如果去等待了,锁怎么办?
        // pthread_cond_wait第二个参数是一个锁,当此进程成功挂起后,传入的锁,会被自动释放
        // 当此进程被唤醒时,从哪里阻塞的,就从那里唤醒,被唤醒的时候,此进程还是在临界区内部的
        // 当被唤醒的时候,pthread_cond_wait会帮助此线程获取锁

        // pthread_cond_wait:只要是一个函数,就有可能调用失败,也有可能存在伪唤醒的情况
        // 因此条件变量的使用规范:使用while循环持续进行条件检测
        // 这样在访问临界资源时,就能100%确定资源是就绪的
        while (isQueueFull())
        {
            pthread_cond_wait(&_Full, &_mtx);
        }

        // 2.访问临界资源
        _bq.push(in);

        // 加入控制策略:当队列中数据量过半后,才唤醒消费者线程
        if (_bq.size() >= _capacity / 2)
        {
            pthread_cond_signal(&_Empty); // 生产者放了数据后,就唤起消费者线程,通知其消费
        }
        pthread_mutex_unlock(&_mtx);
        // 发信号在解锁之前和之后都是可以的
    }

    void pop(T *out)
    {
        pthread_mutex_lock(&_mtx);

        while (isQueueEmpty())
        {
            pthread_cond_wait(&_Empty, &_mtx);
        }
        *out = _bq.front();
        _bq.pop();
        pthread_mutex_unlock(&_mtx);
        pthread_cond_signal(&_Full); // 消费者取走数据后,就唤起生产者线程,通知其生产
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_Empty);
        pthread_cond_destroy(&_Full);
    }

private:
    queue<T> _bq;          // 阻塞队列
    int _capacity;         // 容量上限
    pthread_mutex_t _mtx;  // 通过互斥锁保证队列安全
    pthread_cond_t _Empty; // 同来表示bq 是否为空的条件
    pthread_cond_t _Full;  // 同来表示bq 是否为满的条件
};
  • 生产者生产数据的流程:
    1.先检测当前的临界资源是否满足访问条件;
    2.访问临界资源;

  • pthread_cond_wait是在临界区中的,此时进程是持有锁的,如果去等待了,锁怎么办?
    pthread_cond_wait第二个参数是一个锁,当此进程成功挂起后,传入的锁,会被自动释放;

  • 当此进程被唤醒时,从哪里阻塞的,就从哪里唤醒,被唤醒的时候,此进程还是在临界区内部的;
    当被唤醒的时候,pthread_cond_wait会帮助此线程获取锁;

  • pthread_cond_wait:只要是一个函数,就有可能调用失败,也有可能存在伪唤醒的情况;
    因此条件变量的使用规范:使用while循环持续进行条件检测;
    这样在访问临界资源时,就能100%确定资源是就绪的;

ConPod.cc:

#include"BlockQueue.hpp"

using namespace std;

void* consumer(void* args)
{
    BlockQueue<int>* bqueue = (BlockQueue<int>*)args;
    while(true)
    {
        int a;
        bqueue->pop(&a);
        cout << "消费一个数据:" << a << endl;
        sleep(1);
    }

    return nullptr;
}

void* productor(void* args)
{
    BlockQueue<int>* bqueue = (BlockQueue<int>*)args;
    int a = 1;
    while(true)
    {
        bqueue->push(a++);
        cout << "生产一个数据:" << a << endl;
    }

    return nullptr;
}


int main()
{
    BlockQueue<int>* bqueue = new BlockQueue<int>();

    pthread_t c, p;
    pthread_create(&c, nullptr, consumer, bqueue);
    pthread_create(&p, nullptr, productor, bqueue);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);

    delete bqueue;

    return 0;
}

运行结果:在这里插入图片描述

  • 注:效率高:在于利用缓冲区,提高了生产和消费线程的并发度;

4.多生产者多消费者模型

Task.hpp:

#pragma once

#include <iostream>
#include <functional>

typedef std::function<int(int, int)> func_t;

class Task
{

public:
    Task(){}
    Task(int x, int y, func_t func):x_(x), y_(y), func_(func)
    {}
    int operator ()()
    {
        return func_(x_, y_);
    }
public:
    int x_;
    int y_;
    func_t func_;
};

封装一个Task类,队列中存储这个类,类中能够调用回调函数;

BlockQueue.hpp:(同上)

ConPod.cc:

#include"BlockQueue.hpp"
#include"Task.hpp"
#include <ctime>
using namespace std;

int myAdd(int x, int y)
{
    return x + y;
}

void* consumer(void* args)
{
    BlockQueue<Task>* bqueue = (BlockQueue<Task>*)args;
    while(true)
    {
        //获取任务
        Task t;
        bqueue->pop(&t);
        //完成任务
        cout << pthread_self() << "consumer: " << t.x_ << "+" << t.y_ << "=" << t() <<  endl;
        sleep(1);
    }

    return nullptr;
}

void* productor(void* args)
{
    BlockQueue<Task>* bqueue = (BlockQueue<Task>*)args;
    int a = 1;
    while(true)
    {
        //制作任务
        int x = rand() % 10 + 1;
        usleep(rand()%1000);
        int y = rand() % 5 + 1;
        Task t(x, y, myAdd);
        //生产任务
        bqueue->push(t);
        cout << pthread_self() << "productor: " << t.x_ << "+" << t.y_ << "=?" << endl;
        sleep(1);
    }

    return nullptr;
}


int main()
{
    srand((uint64_t)time(nullptr) ^ getpid());
    BlockQueue<Task>* bqueue = new BlockQueue<Task>();

    pthread_t c[2], p[2];
    pthread_create(c, nullptr, consumer, bqueue);
    pthread_create(c + 1, nullptr, consumer, bqueue);
    pthread_create(p, nullptr, consumer, bqueue);
    pthread_create(p + 1, nullptr, productor, bqueue);

    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);

    delete bqueue;

    return 0;
}

结果:
在这里插入图片描述

  • 注:多生产和多消费模型中,生产者线程生产商品可以是并发的,但是向仓库中输送商品的行为是互斥的;
    同理,消费者线程获取商品的行为是互斥的,但是处理任务的行为可以是并发的;

5.锁的封装

lockGuard.hpp

#pragma once

#include <iostream>
#include <pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t *mtx)
        : _pmtx(mtx)
    {
    }

    void lock()
    {
        pthread_mutex_lock(_pmtx);
    }

    void unlock()
    {
        pthread_mutex_unlock(_pmtx);
    }

    ~Mutex()
    {}

private:
    pthread_mutex_t *_pmtx;
};


class lockGuard
{
public:
    lockGuard(pthread_mutex_t *mtx)
        : _mtx(mtx)
    {
        _mtx.lock();
    }

    ~lockGuard()
    {
        _mtx.unlock();
    }
private:
    Mutex _mtx;
};
  • RAII风格的加锁方式:
    这里面的两个类成员中并没有真实的锁,只是传入锁的地址来进行对象的构造,进而在构造的时候就进行加锁操作,在对象析构的时候自动进行解锁;

BlockQueue.hpp:

#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include"lockGuard.hpp"

using namespace std;

const int gDefaultCap = 5;

template <class T>
class BlockQueue
{
private:
    bool isQueueEmpty()
    {
        return _bq.size() == 0;
    }

    bool isQueueFull()
    {
        return _bq.size() == _capacity;
    }

public:
    BlockQueue(int capacity = gDefaultCap)
        : _capacity(capacity)
    {
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_Empty, nullptr);
        pthread_cond_init(&_Full, nullptr);
    }

    void push(const T &in) // 生产者放数据
    {
        lockGuard lockguard(&_mtx);//自动调用构造函数,加锁

        while (isQueueFull())
        {
            pthread_cond_wait(&_Full, &_mtx);
        }
        
        _bq.push(in);

        if (_bq.size() >= _capacity / 2)
        {
            pthread_cond_signal(&_Empty);}
        //自动调用析构函数,解锁
    }

    void pop(T *out)
    {
        lockGuard lockguard(&_mtx);//自动调用构造函数,加锁

        while (isQueueEmpty())
        {
            pthread_cond_wait(&_Empty, &_mtx);
        }
        *out = _bq.front();
        _bq.pop();
        pthread_cond_signal(&_Full); 
        //自动调用析构函数,解锁
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_Empty);
        pthread_cond_destroy(&_Full);
    }

private:
    queue<T> _bq;          // 阻塞队列
    int _capacity;         // 容量上限
    pthread_mutex_t _mtx;  // 通过互斥锁保证队列安全
    pthread_cond_t _Empty; // 同来表示bq 是否为空的条件
    pthread_cond_t _Full;  // 同来表示bq 是否为满的条件
};

构造lockGuard对象的时候,就已经加锁完成了;
析构的时候,自动解锁;

三、POSIX信号量

1.信号量的概念与使用

共享资源:任何一个时刻只有一个执行流在进行访问,共享资源是被当作整体使用的,执行流之间都是互斥的;
如果一个共享资源不被当做一个整体,而让不同的执行流访问不同的区域,就可以多执行流并发访问了,不同执行流只有在访问同一个区域的时候才需要进行互斥;
当前共享资源中还有多少份资源,特定的执行流使用可以是否可以得到一个共享资源,这些都可以通过信号量来实现;

  • 信号量的本质,就是一个计数器;
    访问临界资源的时候,必须先申请信号量资源(sem- -,预定资源,P操作),使用完毕信号量资源(sem++, 释放资源,V操作);

2.信号量的使用场景

(1)有共享资源;
(2)共享资源可以被局部性访问;
(3)需要对局部性资源的数量进行描述;

3.信号量接口

  • 信号量初始化:
    在这里插入图片描述
    参数:
    sem:信号量对象
    pshared:是否共享
    value:初始默认值(计数器的值)

  • 申请信号量(P操作)
    在这里插入图片描述
    wait是会默认阻塞一个进程,直到申请到信号量,将信号量–;

  • 释放信号量(V操作)
    在这里插入图片描述
    将信号量++;

4.基于环形队列的生产消费模型

  • 使用数组实现环形队列:
    在这里插入图片描述
    在这里插入图片描述
    (1)当下标走到数组尾部的时候,下一个下标是数组头部,为了实现这一点,每次下标变动时,下标值都需要 %= n;
    (2)环形队列的逻辑结构是环形的,物理结构是数组;
    (3)生产消费模型需要两个下标,一个生产者,一个消费者;
    (4)两个下标重合的时候,队列既有可能是空的,也有可能是满的;
    判空/判满的方法:1.计数器;2.专门浪费一个位置;
    (5)当生产者和消费者指向同一个位置时,线程之间具有互斥同步的关系;
    当生产者和消费者指向不同位置时,让他们并发执行;

    (6)生产者不能将消费者套圈;
    消费者不能超过生产者;
    队列为空,一定要先让生产者运行;
    队列为满,一定要先让消费者运行;
    (7)生产者最关心的是空间资源 -> spaceSem 剩余空间信号量,初值为N;
    消费者最关心的是数据资源 -> dataSem 剩余数据信号量,初值为0;
    (8)当生产者生产了一个数据后,空间资源被占用,但是数据资源多了一个;
    (9)生产者生产资源前,要先申请空间信号量(spaceSem - -),之后在特定位置生产资源,生产完成后,释放数据信号量(dataSem++);
    消费者消费资源前,要先申请数据信号量(dataSem - -),之后消费特定位置的资源,消费完成后,释放空间信号量(spaceSem++);

    (10)当生产线程申请信号量失败,证明空间已满,进程就会被挂起;

5.基于环形队列的生产消费模型实现

  • 多生产多消费模型的意义:
    并不是将任务或者数据放在交易场所或者取出就是生产和消费,生产数据或任务和拿到数据或任务之后的处理,才是生产和消费,这才是最耗时的;
    多生产多消费模型的意义在于能够并发处理任务;
    生产的本质:将私有的任务或数据,放到公共空间中;
    消费的本质:将公共空间中的任务或数据,拿到并私有;

sem.hpp
信号量的封装,初始化对象时,就调用构造进行信号量的初始化;
对象销毁时,就自动调用析构,销毁信号量 ;

#ifndef _SEM_HPP_
#define _SEM_HPP_

#include<iostream>
#include<semaphore.h>

class Sem
{
public:
    Sem(int val)
    {
        sem_init(&_sem, 0, val);
    }

    void p()
    {
        sem_wait(&_sem);
    }

    void v()
    {
        sem_post(&_sem);
    }

    ~Sem()
    {
        sem_destroy(&_sem);
    }
private:
    sem_t _sem;
};

#endif

ringQueue.hpp

  • 单生产者和单消费者在队列为空或为满的时候,需要进行信号量的申请,因此信号量自动就形成了两者的互斥关系,一定会有一方竞争失败;
  • 如果改成多生产多消费模型,就会有生产者之间和消费者之间的关系,因此需要两把锁,生产和消费各一把;
  • 多生产:当一个生产者线程访问一个下标时,加锁,其他线程来访问时就需要等待;
  • 加锁和申请信号量的先后:信号量一定是安全的,具有原子性,资源是要配发给线程的,资源配发的越快,运行效率越高,因此先申请信号量,再加锁,加锁区域的粒度越小越好;
#ifndef _RING_QUEUE_HPP_
#define _RING_QUEUE_HPP_

#include <iostream>
#include <vector>
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include <cstdlib>
#include "sem.hpp"

const int g_default_num = 5;

using namespace std;

template <class T>
class RingQueue
{
public:
    RingQueue(int default_num = g_default_num)
        : _ring_queue(default_num)
        , _num(default_num)
        , _c_step(0)
        , _p_step(0)
        , _space_sem(default_num)
        , _data_sem(0)
    {
        pthread_mutex_init(&_clock, nullptr);
        pthread_mutex_init(&_plock, nullptr);
    }

    ~RingQueue()
    {
        pthread_mutex_destroy(&_clock);
        pthread_mutex_destroy(&_plock);
    }

    // 生产者:空间资源,生产者们的临界资源是下标
    // 加锁和申请信号量的先后:信号量一定是安全的,具有原子性,
    // 资源是要配发给线程的,资源配发的越快,运行效率越高,因此先申请信号量,再加锁
    // 加锁的粒度越小越好
    void push(const T &in)
    {
        // 先申请空间信号量
        _space_sem.p();
        // 多生产进程访问时,当一个生产者线程访问一个下标时,加锁,其他线程来访问时就需要等待
        pthread_mutex_lock(&_plock);
        // 成功竞争到锁的线程继续执行下面操作
        // 放入数据
        _ring_queue[_p_step++] = in;
        _p_step %= _num;
        // 生产完后,解锁
        pthread_mutex_unlock(&_plock);
        // 释放数据信号量
        _data_sem.v();
    }

    void pop(T *out)
    {
        _data_sem.p();
        pthread_mutex_lock(&_clock);
        *out = _ring_queue[_c_step++];
        _c_step %= _num;
        pthread_mutex_unlock(&_clock);
        _space_sem.v();
    }

private:
    vector<T> _ring_queue;
    int _num;
    int _c_step;            // 消费下标
    int _p_step;            // 生产下标
    Sem _space_sem;         // 空间信号量
    Sem _data_sem;          // 数据信号量
    pthread_mutex_t _clock; // 多消费者进程的锁
    pthread_mutex_t _plock; // 多生产者进程的锁
};

#endif

ConPod.cc

#include"ringQueue.hpp"

void* consumer(void* args)
{
    RingQueue<int>* rq = (RingQueue<int>*)args;
    while(true)
    {
        sleep(1);
        int x = 0;
        //从环形队列中获取任务或数据
        rq->pop(&x);
        //进行一定的处理
        cout << "消费:" << x << "[" << pthread_self() << "]" << endl;
    }
}

void* procudtor(void* args)
{
    RingQueue<int>* rq = (RingQueue<int>*)args;
    while(true)
    {
        //构建数据或任务对象
        int x = rand() % 100 + 1;
        //放入环形队列
        rq->push(x);
        cout << "生产:" << x << "[" << pthread_self() << "]" << endl;
    }
}


int main()
{
    srand((uint64_t)time(nullptr) ^ getpid());
    RingQueue<int>* rq = new RingQueue<int>();

    pthread_t c[3], p[2];

    pthread_create(c, nullptr, consumer, (void*)rq);
    pthread_create(c + 1, nullptr, consumer, (void*)rq);
    pthread_create(c + 2, nullptr, consumer, (void*)rq);
    
    pthread_create(p, nullptr, procudtor, (void*)rq);
    pthread_create(p + 1, nullptr, procudtor, (void*)rq);

    for(int i = 0; i < 3; i++)
    {
        pthread_join(c[i], nullptr);
    }

    for(int i = 0; i < 2; i++)
    {
        pthread_join(p[i], nullptr);
    }

    return 0;
}

运行结果:
在这里插入图片描述

6.信号量的意义

信号量的本质是一个计数器,它的意义在于可以不用进入临界区,就可以得知资源的情况,甚至可以减少临界区内部的判断;
申请锁和释放锁的过程,本质在于我们并不清楚临界资源的情况;
信号量要预设临界资源的情况,而且在pv变化过程中,我们在外部就能够知晓临界资源的情况;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/919057.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何创建「录取查询系统」?

随着高校招生规模的不断扩大&#xff0c;学校录取工作变得越来越繁琐。为了提高效率和准确性&#xff0c;许多学校开始采用电子化的录取查询系统。易查分作为一款功能强大的在线查询工具&#xff0c;可以帮助学校快速搭建自己的「录取查询系统」。 好消息&#xff01;博主给大家…

【C++代码】二分查找,移除元素

题目&#xff1a;二分查找 给定一个 n 个元素有序的&#xff08;升序&#xff09;整型数组 nums 和一个目标值 target &#xff0c;写一个函数搜索 nums 中的 target&#xff0c;如果目标值存在返回下标&#xff0c;否则返回 -1。 题解 在升序数组 nums \textit{nums} nums …

Linux安装mysql ( ARM架构,rpm包)

下载对应的mysql 阿里源&#xff1a;阿里巴巴开源镜像站-OPSX镜像站-阿里云开发者社区 华为开源镜像站_软件开发服务_华为云 华为源&#xff1a; 华为开源镜像站_软件开发服务_华为云 选择华为鲲鹏镜像 https://repo.huaweicloud.com/kunpeng/yum/el/7/aarch64/ 下载mysql wg…

【3ds Max】练习——制作衣柜

目录 步骤 一、制作衣柜顶部 二、制作衣柜门板 三、制作衣柜底部 四、制作柜子腿部 五、制作柜子底板 步骤 一、制作衣柜顶部 1. 首先创建一个平面&#xff0c;然后将图片素材拖入平面 2. 平面大小和图片尺寸比例保持一致 3. 单机鼠标右键&#xff0c;选择对象属性 勾选…

简历考察点2_《CiCi-基于Vue3.0的智能音乐分享平台》

&#xff08;1&#xff09;项目初始化和推荐页面开发&#xff1a; 重点&#xff1a;轮播图、Scroll、下拉加载方法实现、 问题一&#xff1a;轮播图实现 ① 获取轮播图数据&#xff1a;虽然找到接口了&#xff0c;但是由于XHR请求在浏览器端会有跨域的限制&#xff0c;不能直…

8_分类算法-k近邻算法(KNN)

文章目录 1 KNN算法1.1 KNN算法原理1.2 KNN过程1.3 KNN三要素1.4 KNN分类预测规则1.5 KNN回归预测规则1.6 KNN算法实现方式&#xff08;重点&#xff09;1.7 k近邻算法优缺点 2 KD-Tree2.1 KD Tree构建方式2.2 KD Tree查找最近邻2.3 KNN参数说明 1 KNN算法 定义&#xff1a;如…

fineReport10问题笔记

1. word导出相关问题 1.1 导出文字为图片 fineReport技术文档 1&#xff09;文本控制 选中单元格&#xff0c;点击「单元格属性>样式>对齐」&#xff0c;文本控制设置有四种&#xff0c;分别为「自动换行、单行显示、单行显示&#xff08;调整字体&#xff09;、多行显…

AA实验是什么?

AA实验是什么&#xff1a;AA实验是在AB实验正式上线前做的分流均匀性检验&#xff0c;这个时候还没有正式上实验策略&#xff0c;只是为了检验两组的分流是否均匀先空跑一段时间。 AA实验的准备工作&#xff1a;这个时候要进行的工作是检查 两组分流是否均匀 、埋点是否正常 。…

精益求精:通付盾安卓应用加固升级,为移动安全保驾护航!

在如今竞争激烈的移动应用领域&#xff0c;保障应用资源的安全性成为刻不容缓的任务。最近&#xff0c;通付盾针对资源加密方案进行了全面升级&#xff0c;大幅增强了其兼容性&#xff0c;实现了更全面的资源文件类型保护。这次升级为移动应用的安全性和稳定性迈出了坚实的一步…

driver‘s license exam 2

机动车科目二内容 driver‘s license exam 1_spencer_tseng的博客-CSDN博客 driver‘s license exam 2_spencer_tseng的博客-CSDN博客 driver‘s license exam 3_spencer_tseng的博客-CSDN博客 driver‘s license exam 4_spencer_tseng的博客-CSDN博客 car indicator light…

跨平台图表:ChartDirector for .NET 7.1 Crack

什么是新的 ChartDirector for .NET 7.0 支持跨平台使用&#xff0c;但仅限于 .NET 6。这是因为在 .NET 7 中&#xff0c;Microsoft 停止了用于非 Windows 使用的 .NET 图形库 System.Drawing.Common。由于 ChartDirector for .NET 7.0 依赖于该库&#xff0c;因此它不再支持 .…

物通博联嵌入式数据采集网关采集传感器的数据上传到云端

在当今的物联网&#xff08;IoT&#xff09;时代&#xff0c;各种传感器广泛应用于各种工业领域。传感器数据采集是实现自动化生产的基础&#xff0c;可以为企业决策提供科学的数据支持&#xff0c;通过各类智能传感器采集传输终端&#xff0c;将采集的传感器数据实时传输到设备…

2048. 下一个更大的数值平衡数;1292. 元素和小于等于阈值的正方形的最大边长;2707. 字符串中的额外字符

2048. 下一个更大的数值平衡数 核心思想&#xff1a;枚举直接从n1开始枚举它是不是平衡数即可。 1292. 元素和小于等于阈值的正方形的最大边长 核心思想:枚举正方形的左上角优化。优化部分有两部分&#xff0c;第一部分是计算面积的优化&#xff0c;预先处理好g&#xff0c;让…

LVS集群 (NET模式搭建)

目录 一、集群概述 一、负载均衡技术类型 二、负载均衡实现方式 二、LVS集群结构 一、三层结构 二、架构对象 三、LVS工作模式 四、LVS负载均衡算法 一、静态负载均衡 二、动态负载均衡 五、ipvsadm命令详解 六、搭建实验流程 一、首先打开三台虚拟机 二、…

SpreadsheetGear 2017 2023 for .NET Crack

SpreadsheetGear 2017 & 2023 for .NET Crack Spreadsheet Gear for.NET被描述为允许用户和开发人员使用iOS、Android、Mac OS、Linux&#xff0c;最后是UWP&#xff0c;以利用可扩展的Excel报告以及与图表API兼容的全面Excel&#xff0c;以及为用户和开发人员提供的最快、…

Rancher证书更新

一、环境 主机名 IP地址 操作系统 rancher版本 K8s-Master 192.168.10.236 Centos 7 2.5.9 二、更新证书 1、查看当前证书到期时间 2、进行证书轮换 [rootK8s-Master ~]# docker ps |grep rancher/rancher d581da2b7c4e rancher/rancher:v2.5.9 …

appium2.0+ 单点触控和多点触控新的解决方案

在 appium2.0 之前&#xff0c;在移动端设备上的触屏操作&#xff0c;单手指触屏和多手指触屏分别是由 TouchAction 类&#xff0c;Multiaction 类实现的。 在 appium2.0 之后&#xff0c;这 2 个方法将会被舍弃。 "[Deprecated] TouchAction action is deprecated. Ple…

【25考研】- 整体规划及高数一起步

【25考研】- 整体规划及高数一起步 一、整体规划二、专业课870计算机应用基础参考网上考研学长学姐&#xff1a; 三、高数一典型题目、易错点及常用结论&#xff08;一&#xff09;典型题目&#xff08;二&#xff09;易错点&#xff08;三&#xff09;常用结论1.arcsinxarccos…

Python爬虫实战案例——第二例

某某美剧剧集下载(从搜索片名开始) 本篇文章主要是为大家提供某些电影网站的较常规的下载电影的分析思路与代码思路(通过爬虫下载电影)&#xff0c;我们会从搜索某部影片的关键字开始直到成功下载某一部电影。 地址&#xff1a;aHR0cHM6Ly93d3cuOTltZWlqdXR0LmNvbS9pbmRleC5od…

为什么选择网络安全?为什么说网络安全是IT行业最后的红利?

一、为什么选择网络安全&#xff1f; 这几年随着我国《国家网络空间安全战略》《网络安全法》《网络安全等级保护2.0》等一系列政策/法规/标准的持续落地&#xff0c;网络安全行业地位、薪资随之水涨船高。 未来3-5年&#xff0c;是安全行业的黄金发展期&#xff0c;提前踏入…