【Linux】多线程(二)

news2024/12/22 21:49:01

文章目录

  • 生产者消费者模型
      • 为何要使用生产者消费者模型
      • 生产者消费者模型优点
      • 基于BlockingQueue的生产者消费者模型
      • 条件变量
      • 条件变量代码
  • POSIX信号量
      • 基于环形队列的生产消费模型

生产者消费者模型

为何要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者模型优点

  • 解耦
  • 支持并发
  • 支持忙闲不均

在这里插入图片描述

基于BlockingQueue的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
在这里插入图片描述

条件变量



#include <iostream>
#include <string>
#include <cstdio>
#include <pthread.h>
#include <unistd.h>

using namespace std;
const int num = 5;

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void *active(void *args)
{
    string name = static_cast<const char *>(args);
    while (true)
    {
        pthread_mutex_lock(&mutex);
        pthread_cond_wait(&cond, &mutex); // pthread_cond_wait,调用的时候,会自动释放锁, TODO
        cout << name << " 活动" << endl;
        pthread_mutex_unlock(&mutex);
    }
}

int main()
{
    pthread_t tids[num];
    for (int i = 0; i < num; i++)
    {
        char *name = new char[32];
        snprintf(name, 32, "thread-%d", i + 1);
        pthread_create(tids + i, nullptr, active, name);
    }

    sleep(3);

    while (true)
    {
        cout << "main thread wakeup thread..." << endl;
        pthread_cond_signal(&cond);//唤醒单个线程
        //pthread_cond_broadcast(&cond);//唤醒全部线程

        sleep(1);
    }

    for (int i = 0; i < num; i++)
    {
        pthread_join(tids[i], nullptr);
    }
}

条件变量代码

main.cc

#include<iostream>
#include<stdlib.h>
#include <unistd.h>
#include"blockQueue.hpp"
#include"task.hpp"

using namespace std;

void* consumer(void* args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    while (true)
    {
        Task t;
        // 1. 将数据从blockqueue中获取 -- 获取到了数据
        bq->pop(&t);
        t();
        // 2. 结合某种业务逻辑,处理数据! -- TODO
        std::cout << pthread_self() << " | consumer data: " << t.formatArg() << t.formatRes() << std::endl;
    }
}

void* productor(void* args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    std::string opers = "+-*/%";
    while (true)
    {
        sleep(1);
        // 1. 先通过某种渠道获取数据
        int x = rand() % 20 + 1;
        int y = rand() % 10 + 1;
        char op = opers[rand() % opers.size()];
        // 2. 将数据推送到blockqueue -- 完成生产过程
        Task t(x, y, op);
        bq->push(t);
        std::cout << pthread_self() << " | productor Task: " <<  t.formatArg() << "?" << std::endl;
    }
}



int main()
{
    BlockQueue<Task> *bq=new BlockQueue<Task>();
     pthread_t c[2], p[3];
    pthread_create(&c[0], nullptr, consumer, bq);
    pthread_create(&c[1], nullptr, consumer, bq);
    pthread_create(&p[0], nullptr, productor, bq);
    pthread_create(&p[1], nullptr, productor, bq);
    pthread_create(&p[2], nullptr, productor, bq);

    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);
    pthread_join(p[2], nullptr);
    delete bq;
    return 0;
}

blockQueue.hpp

#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>

const int gcap = 5;

// 不要认为,阻塞队列只能放整数字符串之类的
// 也可以放对象
template <class T>
class BlockQueue
{
public:
    BlockQueue(const int cap = gcap):_cap(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_consumerCond, nullptr);
        pthread_cond_init(&_productorCond, nullptr);
    }
    bool isFull(){ return _q.size() == _cap; }
    bool isEmpty() { return _q.empty(); }
    void push(const T &in)
    {
        pthread_mutex_lock(&_mutex);
        // 细节1:一定要保证,在任何时候,都是符合条件,才进行生产
        while(isFull()) // 1. 我们只能在临界区内部,判断临界资源是否就绪!注定了我们在当前一定是持有锁的!
        {
            // 2. 要让线程进行休眠等待,不能持有锁等待!
            // 3. 注定了,pthread_cond_wait要有锁的释放的能力!
            pthread_cond_wait(&_productorCond, &_mutex); // 我休眠(切换)了,我醒来的时候,在哪里往后执行呢?
            // 4. 当线程醒来的时候,注定了继续从临界区内部继续运行!因为我是在临界区被切走的!
            // 5. 注定了当线程被唤醒的时候,继续在pthread_cond_wait函数出向后运行,又要重新申请锁,申请成功才会彻底返回
        }
        // 没有满的,就要让他进行生产
        _q.push(in);
        // 加策略
        // if(_q.size() >= _cap/2) 
        pthread_cond_signal(&_consumerCond);
        pthread_mutex_unlock(&_mutex);
        // pthread_cond_signal(&_consumerCond);
    }
    void pop(T *out)
    {
        pthread_mutex_lock(&_mutex);
        while(isEmpty()) 
        {
            pthread_cond_wait(&_consumerCond, &_mutex);
        }
        *out = _q.front();
        _q.pop();
        // 加策略
        pthread_cond_signal(&_productorCond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_consumerCond);
        pthread_cond_destroy(&_productorCond);
    }
private:
    std::queue<T> _q;
    int _cap;
    // 为什么我们的这份代码,只用一把锁呢,根本原因在于,
    // 我们生产和消费访问的是同一个queue&&queue被当做整体使用!
    pthread_mutex_t _mutex; 
    pthread_cond_t _consumerCond;   // 消费者对应的条件变量,空,wait
    pthread_cond_t _productorCond;  // 生产者对应的条件变量,满,wait
};

task.hpp

#pragma once
#include <iostream>
#include <string>

class Task
{
public:
    Task()
    {
    }
    Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0)
    {
    }
    void operator()()
    {
        switch (_op)
        {
        case '+':
            _result = _x + _y;
            break;
        case '-':
            _result = _x - _y;
            break;
        case '*':
            _result = _x * _y;
            break;
        case '/':
        {
            if (_y == 0)
                _exitCode = -1;
            else
                _result = _x / _y;
        }
        break;
        case '%':
        {
            if (_y == 0)
                _exitCode = -2;
            else
                _result = _x % _y;
        }
        break;
        default:
            break;
        }
    }
    std::string formatArg()
    {
        return std::to_string(_x) + _op + std::to_string(_y) + "=";
    }
    std::string formatRes()
    {
        return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
    }
    ~Task()
    {
    }

private:
    int _x;
    int _y;
    char _op;

    int _result;
    int _exitCode;
};

POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值

销毁信号量

int sem_destroy(sem_t *sem);

等待信号量

功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()

发布信号量

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1int sem_post(sem_t *sem);//V()

上一节生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序
(POSIX信号量)

基于环形队列的生产消费模型

环形队列采用数组模拟,用模运算来模拟环状特性
在这里插入图片描述
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来
判断满或者空。另外也可以预留一个空的位置,作为满的状态
在这里插入图片描述
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程

RingQueue.hpp

#pragma once

#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>

static const int N = 5;

template <class T>
class RingQueue
{
private:
    void P(sem_t &s)
    {
        sem_wait(&s);
    }
    void V(sem_t &s)
    {
        sem_post(&s);
    }
    void Lock(pthread_mutex_t &m)
    {
        pthread_mutex_lock(&m);
    }
    void Unlock(pthread_mutex_t &m)
    {
        pthread_mutex_unlock(&m);
    }

public:
    RingQueue(int num = N) : _ring(num), _cap(num)
    {
        sem_init(&_data_sem, 0, 0);
        sem_init(&_space_sem, 0, num);
        _c_step = _p_step = 0;

        pthread_mutex_init(&_c_mutex, nullptr);
        pthread_mutex_init(&_p_mutex, nullptr);
    }
    // 生产
    void push(const T &in)
    {
        // 1. 可以不用在临界区内部做判断,就可以知道临界资源的使用情况
        // 2. 什么时候用锁,什么时候用sem?你对应的临界资源,是否被整体使用!
        //
        P(_space_sem);  // P() 
        Lock(_p_mutex); //?    1
        // 一定有对应的空间资源给我!不用做判断,是哪一个呢?
        _ring[_p_step++] = in;
        _p_step %= _cap;
        Unlock(_p_mutex);
        V(_data_sem);
    }
    // 消费
    void pop(T *out)
    {
        P(_data_sem);
        Lock(_c_mutex); //?
        *out = _ring[_c_step++];
        _c_step %= _cap;
        Unlock(_c_mutex);
        V(_space_sem);
    }
    ~RingQueue()
    {
        sem_destroy(&_data_sem);
        sem_destroy(&_space_sem);

        pthread_mutex_destroy(&_c_mutex);
        pthread_mutex_destroy(&_p_mutex);
    }

private:
    std::vector<T> _ring;
    int _cap;         // 环形队列容器大小
    sem_t _data_sem;  // 只有消费者关心
    sem_t _space_sem; // 只有生产者关心
    int _c_step;      // 消费位置
    int _p_step;      // 生产位置

    pthread_mutex_t _c_mutex;
    pthread_mutex_t _p_mutex;
};

Task.hpp

#pragma once
#include <iostream>
#include <string>
#include <unistd.h>

class Task
{
public:
    Task()
    {
    }
    Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0)
    {
    }
    void operator()()
    {
        switch (_op)
        {
        case '+':
            _result = _x + _y;
            break;
        case '-':
            _result = _x - _y;
            break;
        case '*':
            _result = _x * _y;
            break;
        case '/':
        {
            if (_y == 0)
                _exitCode = -1;
            else
                _result = _x / _y;
        }
        break;
        case '%':
        {
            if (_y == 0)
                _exitCode = -2;
            else
                _result = _x % _y;
        }
        break;
        default:
            break;
        }

        usleep(100000);
    }
    std::string formatArg()
    {
        return std::to_string(_x) + _op + std::to_string(_y) + "= ?";
    }
    std::string formatRes()
    {
        return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
    }
    ~Task()
    {
    }

private:
    int _x;
    int _y;
    char _op;

    int _result;
    int _exitCode;
};

Main.cc

#include "RingQueue.hpp"
#include "Task.hpp"
#include <ctime>
#include <pthread.h>
#include <memory>
#include <sys/types.h>
#include <unistd.h>
#include <cstring>

using namespace std;

const char *ops = "+-*/%";

void *consumerRoutine(void *args)
{
    RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
    while (true)
    {
        Task t;
        rq->pop(&t);
        t();
        cout << "consumer done, 处理完成的任务是: " << t.formatRes() << endl;
    }
}

void *productorRoutine(void *args)
{
    RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
    while (true)
    {
        // sleep(1);
        int x = rand() % 100;
        int y = rand() % 100;
        char op = ops[(x + y) % strlen(ops)];
        Task t(x, y, op);
        rq->push(t);
        cout << "productor done, 生产的任务是: " << t.formatArg() << endl;
    }
}

int main()
{
    srand(time(nullptr) ^ getpid());
    RingQueue<Task> *rq = new RingQueue<Task>();
    // 单生产单消费
    // pthread_t c, p;
    // pthread_create(&c, nullptr, consumerRoutine, rq);
    // pthread_create(&p, nullptr, productorRoutine, rq);

    // pthread_join(c, nullptr);
    // pthread_join(p, nullptr);

    // 多生产,多消费,该如何更改代码呢?done
    // 意义在哪里呢?意义绝对不在从缓冲区冲放入和拿去,意义在于,放前并发构建Task,获取后多线程可以并发处理task,因为这些操作没有加锁!
    pthread_t c[3], p[2];
    for (int i = 0; i < 3; i++)
        pthread_create(c + i, nullptr, consumerRoutine, rq);
    for (int i = 0; i < 2; i++)
        pthread_create(p + i, nullptr, productorRoutine, rq);

    for (int i = 0; i < 3; i++)

        pthread_join(c[i], nullptr);
    for (int i = 0; i < 2; i++)

        pthread_join(p[i], nullptr);

    delete rq;
    return 0;
}

什么时候用锁,什么时候用sem?你对应的临界资源,是否被整体使用!
在上面写过阻塞队列的代码可以发现他是一个整体,而环型队列是将他自己打散了,通过下标去访问。
在阻塞队列当中可以发现消费者和生产者是不能并发的,而环形队列就是可以的,因为他将每一个下标当作一个整体也。所以消费者和生产者就是可以并发的互不影响。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/767394.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

qt与opencv学习记录

qtopencv开发入门&#xff1a;4步搞定环境配置-1_哔哩哔哩_bilibili qtopencv开发入门&#xff1a;4步搞定opencv环境配置2_哔哩哔哩_bilibili 文章内容来自上面两个视频&#xff0c;感谢创作者。 ps&#xff1a;配置环境的过程中&#xff0c;遇到了很多问题&#xff0c;我…

UML 图

统一建模语言&#xff08;Unified Modeling Language&#xff0c;UML&#xff09;是用来设计软件的可视化建模语言。它的特点是简单、统一、图形化、能表达软件设计中的动态与静态信息。 UML 从目标系统的不同角度出发&#xff0c;定义了用例图、类图、对象图、状态图、活动图…

网络流量监控分析

网络管理员是维护健全网络基础设施的关键&#xff0c;这通常是一项艰巨的任务&#xff0c;因为管理员需要 24x7 全天候监控和管理网络和服务器。但是&#xff0c;即使进行全天候监控&#xff0c;每个网络也容易受到带宽占用的影响&#xff0c;如果导致关键业务应用程序变慢&…

CSS 实现 Turbo 官网 3D 网格线背景动画

转载请注明出处&#xff0c;点击此处 查看更多精彩内容 查看 Turbo 官网 时发现它的背景动画挺有意思&#xff0c;就自己动手实现了一下。下面对关键点进行解释说明&#xff0c;查看完整代码及预览效果请 点击这里。 简单说明原理&#xff1a;使用 mask-image 遮罩绘制网格&a…

二叉树--C语言实现数据结构

本期带大家一起用C语言实现二叉树&#x1f308;&#x1f308;&#x1f308; 1、二叉树的定义 二叉树是一种特殊的树状数据结构&#xff0c;它由节点组成&#xff0c;每个节点最多有两个子节点&#xff0c;分别称为左子节点和右子节点 二叉树的链式存储结构是指用 链表 来表示…

公司私服Maven踩坑,项目配置都OK但是包就是下载不下来【已解决】

我的问题是公司的私服Maven下载不下来&#xff0c;因为公司保密协议&#xff0c;这里用阿里云为例&#xff01; 具体的至少参考&#xff1a;(32条消息) 这篇博文只讲MirrorOf_Java软件工程师的博客-CSDN博客 1&#xff1a;Java的Maven爆红了就找到资源库&#xff0c;然后把对于…

2.10messagebox弹窗

2.10messagebox弹窗 messagebox部件 其实这里的messagebox就是我们平时看到的弹窗。 我们首先需要定义一个触发功能&#xff0c;来触发这个弹窗 这里我们就放上以前学过的button按钮 tk.Button(window, texthit me, commandhit_me).pack()通过触发功能&#xff0c;调用messa…

超高性能协议框架fury完爆protostuff(附性能测试对比)

简单介绍: 序列化框架是系统通信的基础组件&#xff0c;在大数据、AI 框架和云原生等分布式系统中广泛使用。当对象需要跨进程、跨语言、跨节点传输、持久化、状态读写、复制时&#xff0c;都需要进行序列化&#xff0c;其性能和易用性影响运行效率和开发效率。 Fury 是一个基于…

3.2.18 DIR函数的补充说明

【分享成果&#xff0c;随喜正能量】人与人之间都是相互的&#xff0c;你给人搭桥&#xff0c;别人为你铺路&#xff1b;你让人难堪&#xff0c;别人给你添堵。。 我给VBA的定义&#xff1a;VBA是个人小型自动化处理的有效工具。利用好了&#xff0c;可以大大提高自己的劳动效…

【小梦C嘎嘎——启航篇】C++ 基础中的精华(二)

【小梦C嘎嘎——启航篇】C 基础中的精华&#xff08;二&#xff09;&#x1f60e; 前言&#x1f64c;1、引用的使用场景1.1 做参数1.2 做返回值 2、const修饰 的引用2.1 权限上的探讨2.1.1权限放大2.1.2 权限平移2.1.3 全新缩小 4、函数重载的延伸条件编译&#xff1a; 条件编译…

自定义类型详解(C语言)

自定义类型 一. 结构体1.1 什么是结构体1.2 结构体的声明1.3 特殊的声明1.4 结构体的自引用1.5 结构体变量的定义和初始化1.5.1 结构体变量的定义1.5.2 结构体变量的初始化 1.6 结构体内存对齐1.6.1 为什么存在内存对齐 1.7 修改默认对齐数1.8 结构体传参 二. 位段2.1 什么是位…

PVE安装好后拔显卡后连接不了网络

目录 前因 原因 解决办法 前因 前几天装了个​Proxmox​ ve当做一个服务器7*24开机 但是由于转好系统后&#xff0c;显卡就不需要了 加上它耗电的原因&#xff08;我的gtx650平时空载有10w左右的功耗&#xff09; 我在想拔显卡拔了&#xff0c;我用xshell进行ssh连接不就…

MVCC:多版本并发控制

MVCC 1. MVCC是什么2. 快照读和当前读2.1 快照读2.2 当前读 3. Read View3.1 Read View中含有什么内容3.2 ReadView的规则 4. MVCC整体操作流程 1. MVCC是什么 MVCC&#xff08;Multi Version Concurrency Control&#xff09;&#xff0c;多版本并发控制&#xff1b;MVCC用于…

【数学建模】——相关系数

第一部分&#xff1a;皮尔逊相关系数的计算以及数据的描述性统计 本讲我们将介绍两种最为常见的相关系数&#xff1a;皮尔逊person相关系数和斯皮尔曼spearman等级相关系数。它们可以用来衡量两个变量之间的相关性的大小&#xff0c;根据数组满足的不同条件&#xff0c;我们要选…

linux图形界面总结——X、Xorg、WM、QT、GTK、KDE、GNOME的区别与联系

文章目录 一、 linux图形界面二、X协议三、Xfree86 Xorg四、WM(window manager:窗口管理器)五、X协议的Client端实现六、KDE、GNOME、QT和GTK直接关系七、参考&#xff1a; 一、 linux图形界面 linux本身没有图形界面&#xff0c;linux现在的图形界面的实现只是linux下的应用程…

网络类型及数据链路层协议

目录 网络类型的分类 数据链路层协议 MA网络以太网协议 P2P网络 HDLC ---高级数据链路控制协议 更改链路协议的方法 HDLC数据帧封装结构 PPP---点到点协议 PPP协议的优点 PPP数据帧封装结构 PPP会话的搭建 链路建立阶段---LCP建立 认证阶段 网络层协议协商阶段--- NCP协商 网络…

大型风电叶片研发项目管理体系建设实践︱中车时代新材PMO负责人姚运帅

中车株洲时代新材料科技股份有限公司风电运维事业部总经理、PMO负责人姚运帅先生受邀为由PMO评论主办的2023第十二届中国PMO大会演讲嘉宾&#xff0c;演讲议题&#xff1a;大型风电叶片研发项目管理体系建设实践。大会将于8月12-13日在北京举办&#xff0c;敬请关注&#xff01…

sqlserver 存储过程当中如何实现增删改查

--声明存储过程 新增编辑 ALTER procedure [dbo].[Eng_MyAddOrEdtADPro] My_Cocode int, Type int, -- --1 新增 2 编辑 My_KeyId uniqueidentifier, My_PCode int, My_SCode int, My_PName nvarchar(36), My_SName nvarchar(36), My_Orde…

IPUU的小工具拍了拍你(下)

IPUU是埃文科技旗下的综合性IP查询网站&#xff0c;提供多维度的IP数据信息。通过在线查询&#xff0c;用户可以获取目标IP地址的详尽信息&#xff0c;包括位置属性、网络属性、风险属性以及业务属性等&#xff0c;同时还可以查询域名信息。无论您是需要查看某个IP地址归属地&a…

1.13 通过aop日志监控service执行时间

步骤1&#xff1a;添加aop依赖包 <!-- aop切面 依赖--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId> </dependency>步骤2&#xff1a;创建AOP日记监控记录切面类 …