[Linux#43][线程] 死锁 | 同步 | 基于 BlockingQueue 的生产者消费者模型

news2025/1/19 22:19:09

目录

1. 死锁

解决死锁问题

2. 同步

2.1 条件变量函数 cond

2.2 条件变量的使用:

3.CP 问题--理论

4. 基于 BlockingQueue 的生产者消费者模型

1. 基本概念

2.BlockQueue.hpp

基本设置:

生产关系控制:

消费关系的控制

⭕思考点

test 函数:

进化执行 Task.hpp

3. 注意点


1. 死锁

• 死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所占用不会释放的资源,而处于的一种永久等待状态

死锁四个必要条件 (必须同时满足)

  • 互斥条件:一个资源每次只能被一个执行流使用--前提
  • 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放--原则
  • 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺--原则
  • 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系--重要条件

解决死锁问题

理念:破坏四个必要条件--只需要一个不满足就可以的

方法:

  • 加锁顺序一致

我们在申请锁的时候,A线程先申请A锁,在申请B锁,而B线程先申请B锁,在申请A锁,所以两个线程天然申请锁的顺序就是环状的。我们可以尽量不让线程出现这个环路情况,我们让两个线程申请锁的顺序保持一致,就可以破坏循环等待问题。两个线程都是先申请A锁在申请B锁。

  • 避免锁未释放的场景

接口:pthread_mutex_trylock,失败了就会返回退出,释放锁

  • 资源一次性分配

资源一次性分配,比如说你有一万行代码,有五处要申请锁,你可以最开始一次就给线程分配好,而不要把五处申请打散到代码各个区域里所导致加锁场景非常复杂

避免死锁算法

  • 死锁检测算法(了解)
  • 银行家算法(了解)

2. 同步

同步!同步问题是保证数据安全的情况下,让我们的线程访问资源具有一定的顺序性

保证线程安全同步了,为什么还要设置锁?

注意前言和后果。排队是结果。例如突然新来了一个线程,被锁挡在了门外,才开始到后面排队的。

分配均衡的可以使用纯互斥,同步是解决分配不均衡问题的

  1. 快速提出解决方案 条件变量
  • 锁和铃铛(条件变量--布尔类型)都是一个结构体,OS 先描述再组织
  • 条件变量必须依赖于锁的使用(条件就是被锁了,所以才加入等待队列)

条件变量

• 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它 什么也做不了。

• 例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个 节点添加到队列中。这种情况就需要用到条件变量。


2.1 条件变量函数 cond

条件变量就相当于是铃铛,和锁的设置非常的相似

  • 初始化 – pthread_cond_init()
    • 静态分配:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    • 动态分配:
      • 原型:
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
      • 参数:
        • cond: 需要初始化的条件变量
        • attr: 初始化条件变量的属性,一般设置为 nullptr
      • 返回值: 成功返回 0,失败返回错误码
  • 销毁 – pthread_cond_destroy()
    • 原型:
int pthread_cond_destroy(pthread_cond_t *cond);
    • 参数:
      • cond: 需要销毁的条件变量
    • 返回值: 成功返回 0,失败返回错误码
    • 注意: 使用 PTHREAD_COND_INITIALIZER 初始化的条件变量不需要销毁
  • 等待条件变量 – pthread_cond_wait()
    • 原型:
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
    • 参数:
      • cond: 需要等待的条件变量
      • mutex: 当前线程所处临界区对应的互斥锁
  • ⭕ (为什么要传这个锁变量呢?
  • 1. pthread_cond_wait让线程等待的时候,会自动释放锁,将其加入到等待队列中,不用管临界资源的状态情况
    • 返回值: 成功返回 0,失败返回错误码
    • 注意: wait 一定要在加锁和解锁之间进行!
  • 唤醒等待
    • 唤醒全部线程 – pthread_cond_broadcast()
      • 原型:
int pthread_cond_broadcast(pthread_cond_t *cond);
      • 功能: 唤醒等待队列中的全部线程
      • 参数:
        • cond: 需要等待的条件变量
      • 返回值: 成功返回 0,失败返回错误码
    • 唤醒首个线程pthread_cond_signal()
      • 原型:
int pthread_cond_signal(pthread_cond_t *cond);
      • 功能: 唤醒等待队列中的首个线程
      • 参数:
        • cond: 需要等待的条件变量
      • 返回值: 成功返回 0,失败返回错误码

测试:

错乱原因:多线程打印出现错乱,显示器是文件,看作一个共享资源

uint64 是什么?

一种跨平台的方式来表示至少 64 位的无符号整数

2.2 条件变量的使用:

原理图,以我们的单人自习室为例

对于线程的管理: 先所有都锁上,再依次唤醒,就实现了每个人进去执行一次,退出,下一个执行

 while(true)
    {
        pthread_mutex_lock(&mutex);
        pthread_cond_wait(&cond, &mutex);              
        std::cout << "pthread: " << number << " , cnt: " << cnt++ << std::endl;
        pthread_mutex_unlock(&mutex);
    }
}

int main()
{
    for(uint64_t i = 0; i < 5; i++)
    {
        pthread_t tid;
        pthread_create(&tid, nullptr, Count, (void*)i);
    }
    while(true) 
    {
        sleep(1);
        pthread_cond_broadcast(&cond);
        std::cout << "signal one thread..." << std::endl;
    }

    return 0;
}

我们怎么知道我们要让一个线程去等待了?

一定是临界资源(自习室里面有人)不就绪,没错,临界资源也是有状态的!!

直接走人叫互斥,去后面排队叫同步

你怎么知道临界资源是就绪还是不就绪的?即怎么知道自习室里面有没有人

你判断出来的!判断是访问临界资源(自习室) 吗?必须是的,也就是判断必须在加锁之后!!!

所以等待的过程,一定要在加锁和解锁之间pthread_cond_wait让线程等待的时候,会自动释放锁,将其加入到等待队列中

sum

  • 等待条件满足的时候往往是在临界区内等待的
    • 当该线程进入等待的时候,互斥锁会自动释放
    • 而当该线程被唤醒时,又会自动获得对应的互斥锁
  • 条件变量需要配合互斥锁使用,其中条件变量是用来完成同步的,而互斥锁是用来完成互斥的

3.CP 问题--理论

生产者消费者模型(consumer producter)

存在超市的原因

  • 效率高,中转站
  • 大号的缓存,解决了忙闲不均。生产者视角:有多少存储空间,消费者:有多少商品数
  • 让生产和消费的行为,进行一定程度的解耦

在计算机中,抽象出来

  • 生产者:线程承担
  • 超市:特定结构的内存空间->共享资源->存在并发问题
  • 消费者:线程承担

将商品理解为数据,执行流在做通信

如何高效的通信

互斥是一个保证安全的手段

研究超市的并发 三种关系:

  • 生产者 vs 生产者(竞争的互斥关系,只允许一个)
  • 消费者 vs 消费者(互斥)
  • 生产者 vs 消费者(互斥--安全,同步--一定的顺序性)

321 原则(便与记忆和给别人介绍)

  • 3 种关系
  • 2 种角色--生产和消费
  • 1 个交易场所--特点结构的内存空间

例如解耦 add 和 main ,实现高并发


4. 基于 BlockingQueue 的生产者消费者模型

1. 基本概念

在多线程编程中,阻塞队列(Blocking Queue)是一种常用的数据结构,用于实现生产者和消费者模型。与普通队列相比,阻塞队列具有以下特点:

  • 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中有新元素被放入。
  • 当队列满时,往队列里存放元素的操作也会被阻塞,直到队列中有元素被取出。
  • 其余时间就是边生产边消费,同时进行

2.BlockQueue.hpp

基本设置:
template <class T>
class BlockQueue
{
    static const int defalutnum = 20;
public:
    BlockQueue(int maxcap = defalutnum):maxcap_(maxcap)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&c_cond_, nullptr);
        pthread_cond_init(&p_cond_, nullptr);
        // low_water_ = maxcap_/3;
        // high_water_ = (maxcap_*2)/3;
    }
 ~BlockQueue()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&c_cond_);
        pthread_cond_destroy(&p_cond_);
    }

private:
    std::queue<T> q_; 
    //int mincap_;
    int maxcap_;      // 极值
    pthread_mutex_t mutex_;
    pthread_cond_t c_cond_;
    pthread_cond_t p_cond_;
    // int low_water_;
    // int high_water_;
};
  1. 队列 q_ 共享资源, q被当做整体使用的,q只有一份,加锁
生产关系控制:
void push(const T &in)
    {
        pthread_mutex_lock(&mutex_);
        while(q_.size() == maxcap_){ 
            pthread_cond_wait(&p_cond_, &mutex_);
        }
        q_.push(in);                   
        // if(q_.size() > high_water_) pthread_cond_signal(&c_cond_);
        pthread_cond_signal(&c_cond_);
        pthread_mutex_unlock(&mutex_);
    }

细节点:

  1. 你想生产,就直接能生产吗?不一定。你得先确保生产条件满足

pthread_mutex_lock(&mutex_);

while(q_.size() == maxcap_)

  1. 判断也是在访问临界资源,在内部进行判断的,所以锁要放在外面保护
  2. 货物满了,就伪唤醒后加入等待队列
消费关系的控制
T pop()
    {
        pthread_mutex_lock(&mutex_);
        while(q_.size() == 0) 
        {
            pthread_cond_wait(&c_cond_, &mutex_); 
        }
        
        T out = q_.front(); 
        q_.pop();

        // if(q_.size()<low_water_) pthread_cond_signal(&p_cond_);
        pthread_cond_signal(&p_cond_);
        pthread_mutex_unlock(&mutex_);

        return out;
    }

细节点:

  1. 生产和消费需要分别设置两个等待队列
  2. 你想消费,就直接能消费吗?不一定。你得先确保消费条件满足,while判断队列情况
⭕思考点

谁来唤醒呢?

例如:有生产,就可以解锁唤醒消费队列了

q_.pop();
pthread_mutex_unlock(&mutex_);

对策略的添加:

发现生产和消费的同步,通过水位线来进行范围管控,例如:

  1. if(q_.size()>high_water_) pthread_cond_signal(&c_cond_);//大于某一水位后,唤醒尽快消费
  2. if(q_.size()<low_water_) pthread_cond_signal(&p_cond_);

test 函数:
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <ctime>

void *Consumer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);

    while (true)
    {
        // 消费(存在管控的加入执行,调用等待队列的封装接口)
        Task t = bq->pop();
        
        std::cout << "处理任务: " << t.GetTask() << " 运算结果是: " << t.GetResult() << " thread id: " << pthread_self() << std::endl;
        t.run();//接收到的任务对象,调用接口跑起来了
        sleep(1);//Pop前已经检验测试一大堆了
    }
}

void *Productor(void *args)
{
    int len = opers.size();
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    int x = 10;
    int y = 20;
    while (true)
    {
        // 模拟生产者生产数据
        int data1 = rand() % 10 + 1; // [1,10]
        usleep(10);
        int data2 = rand() % 10;
        char op = opers[rand() % len];
        Task t(data1, data2, op);

        // 生产
        bq->push(t);
        std::cout << "生产了一个任务: " << t.GetTask() << " thread id: " << pthread_self() << std::endl;
        sleep(1);
    }
}

int main()
{
    srand(time(nullptr));

    // 因为 321 原则
    BlockQueue<Task> *bq = new BlockQueue<Task>();
    pthread_t c[3], p[5];
    for (int i = 0; i < 3; i++)
    {
        pthread_create(c + i, nullptr, Consumer, bq);
    }

    for (int i = 0; i < 5; i++)
    {
        pthread_create(p + i, nullptr, Productor, bq);
    }

    for (int i = 0; i < 3; i++)
    {
        pthread_join(c[i], nullptr);
    }
    for (int i = 0; i < 5; i++)
    {
        pthread_join(p[i], nullptr);
    }
    delete bq;
    return 0;
}

BlockQueue 内部可不可以传递其他数据,比如对象?比如任务??

可以。进化为基于任务的阻塞队列

进化执行 Task.hpp

Task t = bq->pop();cout 执行直接变为t.run()

例如执行如下任务

#pragma once
#include <iostream>
#include <string>

std::string opers="+-*/%";

enum{
    DivZero=1,
    ModZero,
    Unknown
};

class Task
{
public:
    Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
    {
    }
    void run()
    {
        switch (oper_)
        {
        case '+':
            result_ = data1_ + data2_;
            break;
        case '-':
            result_ = data1_ - data2_;
            break;
        case '*':
            result_ = data1_ * data2_;
            break;
        case '/':
            {
                if(data2_ == 0) exitcode_ = DivZero;
                else result_ = data1_ / data2_;
            }
            break;
        case '%':
           {
                if(data2_ == 0) exitcode_ = ModZero;
                else result_ = data1_ % data2_;
            }            break;
        default:
            exitcode_ = Unknown;
            break;
        }
    }
    void operator ()()
    {
        run();
    }
    std::string GetResult()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=";
        r += std::to_string(result_);
        r += "[code: ";
        r += std::to_string(exitcode_);
        r += "]";

        return r;
    }
    std::string GetTask()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=?";
        return r;
    }
    ~Task()
    {
    }

private:
    int data1_;
    int data2_;
    char oper_;

    int result_;
    int exitcode_;
};

生产者消费者模型高效在哪里?

答案是生产者消费者模式并不高效在队列中拿放,而是在生产之前和消费之后,让线程并行执行!!
同样生产者消费者的意义也不再队列中,而是在放之前同时生产,拿之后同时消费


3. 注意点

  1. 判断生产消费条件
    • 这是因为线程可能被伪唤醒(即线程被唤醒但条件仍未满足),使用 while 可以确保线程在真正满足条件时才继续执行。
  1. pthread_cond_wait 函数
    • pthread_cond_wait 是让当前线程进入等待状态的函数。
    • 如果调用失败,线程将继续执行,可能导致逻辑错误(如尝试从空队列中取数据或向满队列中添加数据)。
  1. 多消费者情况下的唤醒
    • 使用 pthread_cond_broadcast 唤醒所有等待的消费者时,若只有一个数据可供消费,则会导致其他消费者被伪唤醒。
    • 为了避免这种情况,线程在被唤醒后应再次检查条件是否满足。
  1. 使用 while 判断的必要性
  • 在判断是否满足生产或消费条件时,应使用 while 循环而非 if 语句。
    • 为了防止伪唤醒导致的问题,必须使用 while ,确保线程在满足条件时才继续执行。

思路:上锁访问,检查达到某一条件,等待,未达到执行,检查完后解锁,所检查完后满足的线程,接收任务同时跑,线程再上锁访问,尝试获取下一个任务以此循环。

cp 模型的思路如上,简单实现了代码,下一章将结合信号量,优化完善代码并进行测试~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2059522.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

公开整理-全国各省AI算力数据集(2000-2024年)

数据来源&#xff1a;本数据来源于&#xff0c;根据显卡HS编码筛选统计后获得时间跨度&#xff1a;2000-2024年数据范围&#xff1a;省级层面数据指标&#xff1a; 由于未发布2015至2016年的数据&#xff0c;因此该年份数据存在缺失。下表仅展示了部分指标及数据 年份 省份…

Mac apache 配置

命令 sudo apachectl -v //查看apache 版本 sudo apachectl -k start //启动apache sudo apachectl -k stop //停止apache sudo apachectl -k restart //重启apache配置 apache 的配置在 /etc/apache2/httpd.conf 默认情况下httpd.conf 为锁定状态&#xff0c;无法编辑 使用…

SAP B1 三大基本表单标准功能介绍-业务伙伴主数据(三)

背景 在 SAP B1 中&#xff0c;科目表、业务伙伴主数据、物料主数据被称为三大基本表单&#xff0c;其中的标准功能是实施项目的基础。本系列文章将逐一介绍三大基本表单各个字段的含义、须填内容、功能等内容。 附上 SAP B1 10.0 的帮助文档&#xff1a;SAP Business One 10…

单片机外部中断+定时器实现红外遥控NEC协议解码

单片机外部中断定时器实现红外遥控NEC协议解码 概述解码过程参考代码 概述 红外(Infrared&#xff0c;IR)遥控&#xff0c;是一种通过调制红外光实现的无线遥控器&#xff0c;常用于家电设备&#xff1a;电视机、机顶盒等等。NEC协议采用PPM(Pulse Position Modulation&#x…

敏感词替换为星号

编写一个函数&#xff0c;接收一个字符串参数&#xff0c;将其中 的敏感词替换为星号&#xff0c;并返回替换后的结果。 def getReplace(s):wordList["阿里巴巴","苹果","亚马逊","京东","字节","脸书"]for word …

月圆之夜梦儿时 贡秋竹唱响游子心声

自今年年初贡秋竹的首支单曲《逐梦》发布以来&#xff0c;其人气和传唱度便一直屡创新高&#xff0c;口碑上佳表现良好&#xff0c;网友们纷纷隔空喊话贡秋竹再发新作。时至今日&#xff0c;久经打磨的贡秋竹全新力作《低头思故乡》在千呼万唤中终于震撼首发&#xff01; 贡秋竹…

500以内开放式耳机哪款好?五款高性价比开放式耳机推荐

现在很多人会利用休闲时间进行锻炼&#xff0c;增强体质&#xff0c;在锻炼之前很多人会先入手一些运动设备&#xff0c;像慢跑鞋&#xff0c;还有臂环&#xff0c;运动手表等~当然运动耳机肯定也不能少&#xff0c;边运动边听音乐真的是一大享受&#xff01;但是哪种耳机比较适…

从零到一,全面掌握Apache DolphinScheduler发版流程,实战派经验分享!

引言 Apache DolphinScheduler的发版流程对于确保软件质量和社区协作至关重要&#xff0c;社区Committer王兴杰为我们详细介绍了Apache DolphinScheduler的发版流程&#xff0c;包括环境准备、流程文档、基础工具准备、依赖包确认等关键步骤&#xff0c;并指出了发版流程中可能…

一机两用的“多面手”既防勒索病毒又能做到数据防泄密!

随着数字化转型的加速&#xff0c;企业对互联网的依赖日益加深&#xff0c;网络安全风险也随之增加。勒索病毒作为网络安全领域的一大威胁&#xff0c;不仅加密重要文件&#xff0c;还可能泄露敏感信息&#xff0c;给企业带来巨大损失。SPN沙盒产品&#xff0c;以其独特的隔离技…

【python报错解决】ImportError: DLL load failed while importing win32gui: 找不到指定的程序

在 Python 中安装 pywin32 库 pip install pywin32安装完成后找到自己的 Python 根目录&#xff0c;在该目录下打开命令行。 在命令行中输入&#xff1a; python.exe Scripts/pywin32_postinstall.py -install执行后显示以下信息&#xff0c;即问题解决。 Parsed argumen…

KP8530X系列KP85302SGA 650V耐压 集成自举二极管的半桥栅极驱动器 专用于驱动功率MOSFET或IGBT

KP8530X系列KP85302SGA是一款 650V 耐压&#xff0c;集成自举二极管的半桥栅极驱动器&#xff0c;具有 0.3A 拉电流和 0.6A 灌电流能力&#xff0c;专用于驱动功率 MOSFETs 或 IGBTs。采用高压器件工艺技术&#xff0c;具有良好的电流输出及出色的抗瞬态干扰能力。在输入逻辑引…

React+Vis.js(05):vis.js的节点的点击事件

文章目录 需求实现思路抽屉实现完整代码需求 双击节点,弹出右侧的“抽屉”,显示节点的详细信息 实现思路 vis.network提供了一个doubleClick事件,代码如下: network.on(doubleClick, function (properties) {// console.log(nodes);let id = properties

el-date-picker根据某个时间动态规定可选的的时间范围

el-date-picker组件根据某一个时间段来动态规定当前时间选择的日期时间范围 例如&#xff1a;开始时间为2024-8-19&#xff0c;规定可循范围为30天的话&#xff0c;可选范围是2024-8-19至2024-9-19号之间 html <el-date-picker class"date" type"date"…

【GIS开发学员故事】地信本科前后跨过六个行业,勇气是人生的第七件装备

“出过外业、送过外卖、搞过环境设计......” 今天&#xff0c;我们就来看看X同学的就业故事&#xff1a; 自我介绍 我毕业于21年&#xff0c;大学是地理信息科学专业&#xff0c;考过一次研&#xff0c;但是没有考上。去年来的新中地学习GIS开发&#xff0c;目前是在广东的…

人机环境系统智能中有三种神经网络相互作用

在人机环境生态系统智能中&#xff0c;人、机器和环境之间的相互作用确实涉及到三种神经网络的协作&#xff0c;分别是人的神经网络、机器的神经网络和环境的神经网络。 1. 人的神经网络 人的神经网络指的是人类大脑及其神经系统的复杂结构&#xff0c;通过神经元之间的连接来处…

SpringBoot MySQL BinLog 监听数据变化(多库多表)

开始 1&#xff1a;引入mysql-binlog-connector-java.jar <!-- binlog --><dependency><groupId>com.zendesk</groupId><artifactId>mysql-binlog-connector-java</artifactId><version>0.27.1</version></dependency>…

亦菲喊你来学习之机器学习(6)--逻辑回归算法

逻辑回归 逻辑回归&#xff08;Logistic Regression&#xff09;是一种广泛使用的统计方法&#xff0c;用于解决分类问题&#xff0c;尤其是二分类问题。尽管名字中有“回归”二字&#xff0c;但它实际上是一种分类算法&#xff0c;因为它试图通过线性回归的方式去预测一个事件…

【计算机组成原理】二、数据的表示和运算:3.算术逻辑单元ALU(逻辑运算、加法器)

4.运算器ALU 文章目录 4.运算器ALU4.1逻辑运算非&#xff08;NOT&#xff09;与&#xff08;AND&#xff09;或&#xff08;OR&#xff09;异或&#xff08;XOR&#xff09;同或&#xff08;XNOR&#xff09; 4.2加法器4.2.1一位全加器4.2.2串行加法器4.2.3并行加法器 4.3ALU功…

金九银十简历石沉大海?别投了,软件测试岗位饱和了....

各大互联网公司的接连裁员&#xff0c;政策限制的行业接连消失&#xff0c;让今年的求职雪上加霜&#xff0c;想躺平却没有资本&#xff0c;还有人说软件测试岗位饱和了&#xff0c;对此很多求职者深信不疑&#xff0c;因为投出去的简历回复的越来越少了。 另一面企业招人真的…

IDEA翻译插件-Translation

简介 Translation是一个为IntelliJ IDEA和其他基于JetBrains的IDE&#xff08;如 PyCharm、WebStorm 等&#xff09;设计的插件。这个插件的主要功能是帮助开发者在编写代码或文档时快速翻译文本。它集成了谷歌翻译、微软翻译、DeepL 翻译、OpenAI 翻译、有道翻译等众多翻译引…