【Linux】多线程协同

news2024/11/23 12:31:23

目录

生产消费模型

BlockQueue阻塞队列模型

BlockQueue.hp

Task.hpp

mypc.cc

RingQueue循环队列模型

POSIX信号量

RingQueue.hpp

Task.hpp

main.cc


生产消费模型

生产者与生产者之间关系:互斥(竞争)

消费者与消费者之间关系:互斥(竞争)

生产者和消费者之间关系:互斥(不能同时访问同一个资源)&& 同步(生产与消费可同时进行)

BlockQueue阻塞队列模型

生产消费模型的任务存取由于加锁解锁过程是串行执行的,所以从阻塞队列中存入和取出任务并不高效,而高效之处体现在生产任务之前和消费任务之后的多线程并发执行

先加锁、再检测生产或消费条件是否满足、再操作、再解锁

当阻塞队列满的时候,生产者进行阻塞等待,当阻塞队列空的时候,消费者进行阻塞等待

BlockQueue.hp

#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>
#include <ctime>
#include <unistd.h>

const int g_maxCap = 5;

template<class T>
class BlockQueue {
    public:
    BlockQueue(const int& maxCap = g_maxCap) 
    : _maxCap(maxCap) {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_pcond, nullptr);
        pthread_cond_init(&_ccond, nullptr);
    }
    void push(const T& in) {
        pthread_mutex_lock(&_mutex);
        while (is_full()) {
            //pthread_cond_wait这个函数的第二个参数,必须是正在使用的互斥锁
            //pthread_cond_wait该函数调用的时候,以原子性的方式,将锁释放,并将自己挂起
            //pthread_cond_wait该函数被唤醒返回的时候,会自动重新获取你传入的锁
            pthread_cond_wait(&_pcond, &_mutex);
        }
        _q.push(in);
        //pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部
        pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_mutex);
    }
    void pop(T* out) {
        pthread_mutex_lock(&_mutex);
        while (is_empty()) {
            pthread_cond_wait(&_ccond, &_mutex);
        }
        *out = _q.front();
        _q.pop();
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue() {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }

    private:
    bool is_empty() {
        return _q.empty();
    }
    bool is_full() {
        return _q.size() == _maxCap;
    }


    private:
    std::queue<T> _q;
    int _maxCap;
    pthread_mutex_t _mutex;
    pthread_cond_t _pcond;      //生产者对应的条件变量
    pthread_cond_t _ccond;      //消费者对应的条件变量
};

Task.hpp

#pragma once

#include <iostream>
#include <functional>
#include <cstdio>
#include <string>

class CalTask {
    using func_t = std::function<int(int, int, char)>;

    public:
    CalTask() {}
    CalTask(int x, int y, char op, func_t func) 
    : _x(x), _y(y), _op(op), _callbask(func) {}

    std::string operator()() {
        int result = _callbask(_x, _y, _op);
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "%d %c %d = %d", _x, _op, _y, result);
        return buffer;
    }
    std::string toTaskString() {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "%d %c %d = ?", _x, _op, _y);
        return buffer;
    }

    private:
    int _x, _y;
    char _op;
    func_t _callbask;
};

const std::string oper = "+-*/%";

int myMath(int x, int y, int op) {
    if (y == 0 && (op == '/' || op == '%')) {
        std::cerr << "div zero error!" << std::endl;
        return -1;
    }
    switch (op) {
        case '+': return x + y;
        case '-': return x - y;
        case '*': return x * y;
        case '/': return x / y;
        case '%': return x % y;
        default:
            std::cerr << "oper erro!" << std::endl;
            return -1;
    }
}

class SaveTask {
    typedef std::function<void(const std::string&)> func_t;

    public:
    SaveTask() {}
    SaveTask(const std::string& message, func_t func)
    : _message(message), _func(func) {}

    void operator()() {
        _func(_message);
    }

    private:
    std::string _message;
    func_t _func;
};

void Save(const std::string& message) {
    FILE* pf = fopen("./log.txt", "a");
    if (!pf) {
        std::cerr << "fopen error" << std::endl;
        return;
    }
    fputs(message.c_str(), pf);
    fputs("\n", pf);
    fclose(pf);
}

mypc.cc

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <sys/types.h>
#include <unistd.h>
#include <ctime>

//C:计算
//S:存储
template<class C, class S>
class BlockQueues {
    public:
    BlockQueue<C>* c_bq;
    BlockQueue<S>* s_bq;
};

void* productor(void* _bqs) {
    BlockQueue<CalTask>* bq = (static_cast<BlockQueues<CalTask, SaveTask>*>(_bqs))->c_bq;
    while (true) {
        // sleep(2);
        int x = rand() % 100 + 1;
        int y = rand() % 10;
        int operCode = rand() % oper.size();
        CalTask t(x, y, oper[operCode], myMath);
        bq->push(t);
        std::cout << "productor thread, 生产计算任务: " << t.toTaskString() << std::endl;
    }
    return nullptr;
}

void* consumer(void* _bqs) {
    BlockQueue<CalTask>* bq = (static_cast<BlockQueues<CalTask, SaveTask>*>(_bqs))->c_bq;
    BlockQueue<SaveTask>* save_bq = (static_cast<BlockQueues<CalTask, SaveTask>*>(_bqs))->s_bq;
    while (true) {
        CalTask t;
        bq->pop(&t);
        std::string result = t();
        std::cout << "cal thread, 完成计算任务: " << result << "...done" << std::endl;
        SaveTask save(result, Save);
        save_bq->push(save);
        std::cout << "cal thread, 推送存储任务完成..." << std::endl;
        sleep(1);
    }
    return nullptr;
}

void* Saver(void* _bqs) {
    BlockQueue<SaveTask>* save_bq = (static_cast<BlockQueues<CalTask, SaveTask>*>(_bqs))->s_bq;
    while (true) {
        SaveTask t;
        save_bq->pop(&t);
        t();
        std::cout << "save thread, 保存任务完成..." << std::endl;
    }
    return nullptr;
}

int main() {
    srand((unsigned long)time(nullptr) ^ getpid());
    BlockQueues<CalTask, SaveTask> bqs;
    bqs.c_bq = new BlockQueue<CalTask>();
    bqs.s_bq = new BlockQueue<SaveTask>();

    pthread_t p[3], c[2], s;
    pthread_create(p, nullptr, productor, &bqs);
    pthread_create(p + 1, nullptr, productor, &bqs);
    pthread_create(p + 2, nullptr, productor, &bqs);
    pthread_create(c, nullptr, consumer, &bqs);
    pthread_create(c + 1, nullptr, productor, &bqs);
    pthread_create(&s, nullptr, Saver, &bqs);


    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);
    pthread_join(p[2], nullptr);
    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(s, nullptr);

    delete bqs.c_bq;
    delete bqs.s_bq;
    return 0;
}

RingQueue循环队列模型

POSIX信号量

信号量本质是一个计数器:衡量临界资源中资源数量的计数器

一份公共资源,运行同时访问不同的区域

不同的线程可以并发访问公共资源的不同区域

只要拥有信号量,就在未来一定拥有临界资源的一部分

申请信号量的本质:对临界资源的特定小块资源的预定机制

通过信号量,在线程真正访问临界资源之前,就已经提前知道了临界资源的使用情况

RingQueue.hpp

#pragma once

#include <iostream>
#include <vector>
#include <cassert>
#include <semaphore.h>
#include <pthread.h>

static const int g_cap = 5;

template<class T>
class RingQueue {
private:
    void P(sem_t& sem) {
        int n = sem_wait(&sem);
        assert(n == 0);
    }
    void V(sem_t& sem) {
        int n = sem_post(&sem);
        assert(n == 0);
    }

public:
    RingQueue(const int& cap = g_cap)
    : _queue(cap), _cap(cap) {
        int n = sem_init(&_spaceSem, 0, _cap);
        assert(n == 0);
        n = sem_init(&_dataSem, 0, 0);
        assert(n == 0);
        _productorStep = _consumerStep = 0;
        pthread_mutex_init(&_pmutex, nullptr);
        pthread_mutex_init(&_cmutex, nullptr);
    }
    //生产者
    void Push(const T& in) {
        P(_spaceSem);   //申请到了空间信号量,表示对空间进行预定
        pthread_mutex_lock(&_pmutex);
        _queue[_productorStep++] = in;
        _productorStep %= _cap;
        pthread_mutex_unlock(&_pmutex);
        V(_dataSem);
    }
    //消费者
    void Pop(T* out) {
        P(_dataSem);
        pthread_mutex_lock(&_cmutex);
        *out = _queue[_consumerStep++];
        _consumerStep %= _cap;
        pthread_mutex_unlock(&_cmutex);
        V(_spaceSem);
    }
    ~RingQueue() {
        sem_destroy(&_spaceSem);
        sem_destroy(&_dataSem);
        pthread_mutex_destroy(&_pmutex);
        pthread_mutex_destroy(&_cmutex);
    }

private:
    std::vector<T> _queue;
    int _cap;
    sem_t _spaceSem;    //生产者:根据空间资源生产
    sem_t _dataSem;     //消费者:根据数据资源消费
    int _productorStep;
    int _consumerStep;
    pthread_mutex_t _pmutex;
    pthread_mutex_t _cmutex;
};

Task.hpp

#pragma

#include <iostream>
#include <string>
#include <cstdio>
#include <functional>

class Task {
    using func_t = std::function<int(int, int, char)>;
    // typedef std::function<int(int, int, char)> func_t;

public:
    Task() {}
    Task(int x, int y, char op, func_t func)
    : _x(x), _y(y), _op(op), _callback(func) {}

    std::string operator()() {
        int result = _callback(_x, _y, _op);
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "%d %c %d = %d", _x, _op, _y, result);
        return buffer;
    }
    std::string toTaskString() {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "%d %c %d = ?", _x, _op, _y);
        return buffer;
    }

private:
    int _x, _y;
    char _op;
    func_t _callback;
};

const std::string oper = "+-*/%";

int myMath(int x, int y, char op) {
    if (y == 0 && (op == '/' || op == '%')) {
        std::cerr << "div zero error!" << std::endl;
        return -1;
    }
    switch (op) {
        case '+': return x + y;
        case '-': return x - y;
        case '*': return x * y;
        case '/': return x / y;
        case '%': return x % y;
        default:
            std::cerr << "op is wrong!" << std::endl;
            return -1;
    }
}

main.cc

#include "RingQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>

std::string SelfName() {
    char name[128];
    snprintf(name, sizeof(name), "thread[0x%x]", pthread_self());
    return name;
}

void* ProductorRoutine(void* rq) {
    RingQueue<Task>* ringqueue = static_cast<RingQueue<Task>*>(rq);
    while (true) {
        int x = rand() % 10;
        int y = rand() % 5;
        char op = oper[rand() % oper.size()];
        Task t(x, y, op, myMath);
        //生产任务
        ringqueue->Push(t);
        std::cout << SelfName() << ", 生产者派发了一个任务: " << t.toTaskString() << std::endl;
        // sleep(1);
    }
}

void* ConsumerRoutine(void* rq) {
    RingQueue<Task>* ringqueue = static_cast<RingQueue<Task>*>(rq);
    while (true) {
        Task t;
        //消费任务
        ringqueue->Pop(&t);
        std::string result = t();
        std::cout << SelfName() << ", 消费者消费了一个任务: " << result << std::endl;
        // sleep(1);
    }
}

int main() {
    srand((unsigned int)time(nullptr) ^ getpid() ^ pthread_self());
    RingQueue<Task>* rq = new RingQueue<Task>();

    pthread_t p[4], c[8];
    for (int i = 0; i < 4; ++i) {
        pthread_create(p + i, nullptr, ProductorRoutine, rq);
    }
    for (int i = 0; i < 8; ++i) {
        pthread_create(c + i, nullptr, ConsumerRoutine, rq);
    }
    for (int i = 0; i < 4; ++i) {
        pthread_join(p[i], nullptr);
    }
    for (int i = 0; i < 8; ++i) {
        pthread_join(c[i], nullptr);
    }
    delete rq;
    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/422390.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

偏向锁到轻量级锁的升级过程(耗资源)

目录 上原理&#xff1a; 细说原理&#xff1a; 什么是锁记录呢&#xff1f; 什么是Mark Word 呢&#xff1f; 上图解&#xff1a; 上原理&#xff1a; 偏向锁使⽤了⼀种等到竞争出现才释放锁的机制&#xff0c;所以当其他线程尝试竞争偏向锁时&#xff0c; 持有偏向锁的…

nssctf web 入门(3)

目录 [NISACTF 2022]easyssrf [SWPUCTF 2021 新生赛]ez_unserialize [SWPUCTF 2021 新生赛]no_wakeup 这里通过nssctf的题单web安全入门来写&#xff0c;会按照题单详细解释每题。题单在NSSCTF中。 想入门ctfweb的可以看这个系列&#xff0c;之后会一直出这个题单的解析&…

FLStudio21中文版本好不好用?值不值得下载

FLStudio中文21最新版本以其使用速度而闻名&#xff0c;是一个高度复杂的音乐制作环境。现代的DAW是一种非凡的野兽。首先&#xff0c;它在很大程度上把自己放在了(几乎)每个人记录过程的核心。其次&#xff0c;通过在价格适中的软件中模拟完整的工作室体验&#xff0c;它在音乐…

国内版的ChatGPT弯道超车的机会在哪里?

前言 从去年11月最后一天ChatGPT诞生&#xff0c;截至目前&#xff0c;ChatGPT的热度可谓是爆了。众所周知&#xff0c;ChatGPT是美国“开放人工智能研究中心”研发的聊天机器人程序&#xff0c;它是一个人工智能技术驱动的自然语言处理工具&#xff0c;它能够通过学习和理解人…

【数据分析】— 特征工程、特征设计、特征选择、特征评价、特征学习

【数据分析】— 特征工程特征工程是什么&#xff1f; (Feature Engineering)特征工程的意义特征工程的流程特征的设计从原始数据中如何设计特征&#xff1f;基本特征的提取创建新的特征函数变换特征独热特征表示 One-hot Representation数据的统计特征TF-IDF&#xff08;词频-逆…

「Cpolar」看我如何实现公网远程控制Mac OS【使用mac自带VNC】

&#x1f482;作者简介&#xff1a; THUNDER王&#xff0c;一名热爱财税和SAP ABAP编程以及热爱分享的博主。目前于江西师范大学本科在读&#xff0c;同时任汉硕云&#xff08;广东&#xff09;科技有限公司ABAP开发顾问。在学习工作中&#xff0c;我通常使用偏后端的开发语言A…

探寻人工智能前沿 迎接AIGC时代——CSIG企业行(附一些好玩的创新点)

上周我有幸参加了由中国图像图形学会和合合信息共同举办的CSIG企业行活动。这次活动邀请了多位来自图像描述与视觉问答、图文公式识别、自然语言处理、生成式视觉等领域的学者&#xff0c;他们分享了各自的研究成果和经验&#xff0c;并与现场观众进行了深入的交流和探讨。干货…

重感知还是重地图?其实无需选择

近来&#xff0c;关于自动驾驶应该重感知还是重地图是个热点话题&#xff0c;很多重量级车厂、自动驾驶供应商都开始提出重感知轻地图的方案&#xff0c;并承诺很快能发布出对应的产品。业界也出现了高精地图已“死”等类似的言论。 一时之间&#xff0c;似乎轻地图已经成为了…

三种实现模型可视化的方式(print, torchinfo, tensorboard)

记录一下自己使用的三种模型可视化的方式&#xff0c;从简单到难 Print 最简单的是print&#xff0c;就不用多说了。 Torchinfo from torchinfo import summary import torch model (...) summary(model, (1,3,128,128))即可按照像文档路径一样的方式输出结构&#xff0c;…

算法模板(2):数据结构(5)做题积累

数据结构&#xff08;3&#xff09; 一、并查集 238. 银河英雄传说 有 NNN 艘战舰&#xff0c;也依次编号为 1,2,...,N1,2,...,N1,2,...,N&#xff0c;其中第 iii 号战舰处于第 iii 列。有 TTT 条指令&#xff0c;每条指令格式为以下两种之一&#xff1a;M i j&#xff0c;表…

Linux lvm管理讲解及命令

♥️作者:小刘在C站 ♥️个人主页:小刘主页 ♥️每天分享云计算网络运维课堂笔记,努力不一定有收获,但一定会有收获加油!一起努力,共赴美好人生! ♥️夕阳下,是最美的绽放,树高千尺,落叶归根人生不易,人间真情 前言 目录 一、lvm管理 1.Logical Volume Manager,逻…

【运维笔记】VM centos 环境安装

镜像选择 阿里镜像源 注意在安装时&#xff0c;安装非图形化界面选择minimal版本安装。&#xff08;笔者在安装时选择了erverything和DVD&#xff0c;发现都是图形界面hhh&#xff0c;浪费了一早上时间&#xff09; 翻阅百度垃圾堆&#xff0c;版本号都推荐7.6-7.9&#xff…

图解国家网信办《生成式人工智能服务管理办法(征)》| 附下载

伴随ChatGPT兴起&#xff0c;生成式人工智能技术正作为一种创造性应用&#xff0c;牵引场景创新&#xff0c;推动新技术迭代升级和产业快速增长。由于生成式人工智能处于发展初期&#xff0c;技术成熟度、政策合规性等发展不足&#xff0c;导致其极易面临非法获取数据、个人隐私…

基于差分进化算法的含DG配电网无功优化模型

目录 1 主要内容 目标函数 算法流程 2 部分程序 3 程序结果 4 程序链接 1 主要内容 该程序参考《自适应多目标差分进化算法在计及电压稳定性的无功优化中的应用》&#xff0c;以网损和电压偏差为目标&#xff0c;考虑DG无功出力和电容器组&#xff0c;建立多目标无功优化…

OpenAI-ChatGPT最新官方接口《文本交互》全网最详细中英文实用指南和教程,助你零基础快速轻松掌握全新技术(一)(附源码)

Text completion 文本交互前言Introduction 导言Prompt design 提示设计Basics基础知识TroubleshootingClassificationImproving the classifiers efficiency 提高分类器的效率Generation 总结Conversation 对话Transformation 变化Translation 翻译Conversion 转化Summarizati…

联想服务器配置RAID

一、背景描述 目前有台联想服务器&#xff0c;配置如下&#xff1a; CPU&#xff1a;2颗处理器&#xff0c;40核 内存&#xff1a;512GB 磁盘&#xff1a;2*960GB SATA 4*2.4TB SAS 计划在联想物理机上安装 Vmware 的 ESXi 6.7 虚拟化管理软件&#xff0c;作为虚拟化服务器。…

Linux驱动开发——字符设备

目录 Linux设备分类 字符设备驱动基础 字符设备驱动框架 虚拟串口设备 Linux设备分类 Linux系统根据驱动程序实现的模型框架将设备驱动分为下面三种。 (1)字符设备驱动:设备对数据的处理是按照字节流的形式进行的&#xff0c;可以支持随机访问&#xff0c;也可以不支持随…

【解决】You May need an additional loader to handle the result of these loaders

说在前面 最近在新拉项目执行install以及run命令时&#xff0c;两个项目同时报了类似的错误&#xff0c;报错详情如下图所示。 因为之前同事运行代码没有问题&#xff0c;所以基本的问题可以定位在某个依赖版本问题&#xff0c;考虑到时间先后&#xff0c;大概率是新版本使用…

把中文翻译成英语的软件-chatgpt、谷歌、百度、批量翻译

用chatgpt怎么实现中文英文在线转换翻译 要实现中文英文在线转换翻译的代码&#xff0c;可以使用OpenAI的ChatGPT模型实现。以下是实现该功能的简要教程&#xff1a; 准备环境 安装Python 3.x&#xff0c;使用pip或conda安装transformers及相关依赖&#xff1a; pip install…

【算法训练 (day2)】积木画(dp问题)

目录 一.问题 题目描述 输入格式 输出格式 输出样例 二.解题思路 合法性判定&#xff08;状态压缩&#xff09;&#xff1a; 推导dp式&#xff1a; 代码实现&#xff1a; 一.问题 题目描述 小明最近迷上了积木画&#xff0c;有这么两种类型的积木&#xff0c;分别为 I…