【Linux】信号量和线程池

news2025/1/13 14:22:58

目录

一、POSIX信号量

二、基于环形队列和信号量的生产消费模型

三、线程池


一、POSIX信号量

POSIX信号量(POSIX Semaphores)是一种进程间或线程间同步机制,它允许进程或线程以协调的方式访问共享资源或进行其他形式的同步。与System V信号量相比,POSIX信号量提供了更简单的接口和更好的可移植性,并且可以用于线程间同步。下面是对POSIX信号量主要操作的详细解释:

信号量的本质是一把计数器,申请信号的本质就是预定资源。信号量有P操作和V操作,PV来源于荷兰语中的"Passeren"(等待/通过)和"Verhogen"(增加),但在英文文献中,它们通常被简单地称为P(或Wait)和V(或Signal)。

初始化信号量

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);

参数:

  • sem:指向sem_t类型变量的指针,该变量将被初始化为信号量。sem_t 代表一个信号量。
  • pshared:控制信号量的共享属性。如果pshared的值为0,则信号量用于同一进程的线程间的同步;如果pshared的值非0,则信号量用于不同进程间的同步。
  • value:信号量的初始值。通常,这个值是非负的,表示可用的资源数量或可以同时进入临界区的线程数。

销毁信号量

int sem_destroy(sem_t *sem);

参数:

  • sem:指向要销毁的信号量的指针。

这个函数会释放信号量所占用的资源。在调用此函数之前,所有使用该信号量的线程都应该已经退出或完成了对该信号量的等待。

等待信号量(等同于 P操作)

int sem_wait(sem_t *sem); 

参数:

  • sem:指向要等待的信号量的指针。

当信号量的值大于0时,sem_wait会原子地将信号量的值减1并立即返回;如果信号量的值为0,调用线程将被阻塞,直到信号量的值变为正数,此时将其值减1,然后sem_wait返回。通常用于保护临界区或等待资源变得可用。

发布信号量(等同于 V操作)

int sem_post(sem_t *sem);

参数:

  • sem:指向要发布的信号量的指针。

这个函数会原子地将信号量的值加1。如果这导致信号量的值大于0,且有一个或多个线程在sem_wait调用中阻塞,那么这些线程中的一个将被唤醒以继续执行。这通常用于表示资源已被释放或临界区已退出。

 注意:

  • 使用POSIX信号量时,必须确保在进程或线程结束前销毁所有已初始化的信号量,以避免资源泄漏。
  • 在使用信号量时,必须确保没有死锁或竞态条件的发生。
  • 在多线程程序中,如果信号量被多个线程共享,则需要确保对这些信号量的访问是原子的,但幸运的是,sem_init、sem_destroy、sem_wait和sem_post等函数本身就是线程安全的。
  • 当使用进程间共享的信号量时(即pshared非0),可能需要考虑额外的同步机制来确保进程能够正确访问和修改信号量的值。这通常涉及到使用进程间通信(IPC)机制,如UNIX域套接字、命名管道或共享内存等。然而,对于大多数应用而言,线程间同步是更常见的用例。

举例演示POSIX信号量的用法:

#include <iostream>
#include <semaphore.h>
#include <thread>
#include <unistd.h>
#include <chrono>

sem_t sem;

void Producer()
{
    for (int i = 0; i < 5; i++)
    {
        sem_wait(&sem); // 等待信号量
        std::cout << "Producer produced item : " << i << std::endl;
        sem_post(&sem);
        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟生产耗时
    }
}

void Consumer()
{
    for (int i = 0; i < 5; i++)
    {
        sem_wait(&sem);
        std::cout << "Consumer consumed item " << i << std::endl;
        sem_post(&sem);
        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟消费耗时
    }
}

int main()
{
    sem_init(&sem, 0, 1); // 初始化信号量为1,0表示信号量用于在同一进程内的同步

    std::thread p_td(Producer);
    std::thread c_td(Consumer);

    p_td.join();
    c_td.join();

    sem_destroy(&sem);

    return 0;
}

二、基于环形队列和信号量的生产消费模型

上一节生产者-消费者的例子是基于queue的,其空间可以动态分配,但是使用锁都是把公共资源当作一个整体使用。如果公共资源不当作整体,多线程不访问临界资源的同一个区域,来同时进入临界区。

现在基于固定大小的环形队列重写这个程序(POSIX信号量)

  • 环形队列采用数组模拟,用模运算来模拟环状特性。
  • 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。
  • 但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程

RingQueue.hpp如下:

#pragma once

#include <iostream>
#include <semaphore.h>
#include <thread>
#include <unistd.h>
#include <chrono>
#include <mutex>
#include <vector>

const int defaultSize = 3;

template <class T>
class RingQueue
{
public:
    RingQueue(int size = defaultSize)
        : _rq(size), _size(size), _p_step(0), _c_step(0)
    {
        sem_init(&_space_sem, 0, size); // 生产者一开始的资源个数为size
        sem_init(&_data_sem, 0, 0);
    }

    void Push(const T &in)
    {
        // 生产,先申请信号量再加锁
        sem_wait(&_space_sem);
        {
            std::lock_guard<std::mutex> lock(_p_mutex);
            _rq[_p_step] = in;
            _p_step++;
            _p_step %= _size;
        }
        sem_post(&_data_sem); // 通知消费者
    }

    void Pop(T *out)
    {
        // 生产,先申请信号量再加锁
        sem_wait(&_data_sem);
        {
            std::lock_guard<std::mutex> lock(_c_mutex);
            *out = _rq[_c_step];
            _c_step++;
            _c_step %= _size;
        }
        sem_post(&_space_sem);
    }

    ~RingQueue()
    {
        sem_destroy(&_space_sem);
        sem_destroy(&_data_sem);
    }

private:
    std::vector<T> _rq;
    int _size;
    int _p_step; // 生产者的生产位置
    int _c_step; // 消费者的消费位置

    sem_t _space_sem; // 生产者需要的资源是空间
    sem_t _data_sem;  // 消费者需要的资源是数据

    std::mutex _p_mutex;
    std::mutex _c_mutex;
};

Task.hpp如下:

#include <iostream>
#include <string>
#include <unistd.h>

const int defaultRet = 0;
const std::string opers = "+-*/%";

enum
{
    ok = 0,
    div_zero,
    mod_zero,
    unknow
};

class Task
{
public:
    Task()
    {}
    Task(int x, int y, char op)
        : _x(x), _y(y), _op(op), _flag(ok), _ret(defaultRet)
    {
    }

    void Run()
    {
        switch (_op)
        {
        case '+':
            _ret = _x + _y;
            break;
        case '-':
            _ret = _x - _y;
            break;
        case '*':
            _ret = _x * _y;
            break;
        case '/':
        {
            if (_y == 0)
                _flag = div_zero;
            else
                _ret = _x / _y;
        }
        break;
        case '%':
        {
            if (_y == 0)
                _flag = mod_zero;
            else
                _ret = _x % _y;
        }
        break;
        default:
            _flag = unknow;
            break;
        }
    }
    void operator()()
    {
        Run();
    }

    std::string PrintTask()
    {
        std::string s;
        s = std::to_string(_x);
        s += _op;
        s += std::to_string(_y);
        s += "=?";

        return s;
    }
    std::string PrintResult()
    {
        std::string s;
        s = std::to_string(_x);
        s += _op;
        s += std::to_string(_y);
        s += "=";
        s += std::to_string(_ret);
        s += " [";
        s += std::to_string(_flag);
        s += "]";

        return s;
    }
    ~Task()
    {}

private:
    int _x;
    int _y;
    char _op;
    int _ret;
    int _flag;//为0可信,其余不可信
};

 Main.cc如下:

#include "RingQueue.hpp"
#include "Task.hpp"

void *Productor(void *args)
{
    RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
    while (true)
    {
        int data1 = rand() % 10;
        usleep(rand() % 200); // 防止速度过快数据相同
        int data2 = rand() % 10;
        usleep(rand() % 200);
        char op = opers[rand() % (opers.size())];
        Task t(data1, data2, op);
        std::cout << "productor task: " << t.PrintTask() << std::endl;
        rq->Push(t);
    }
}

void *Consumer(void *args)
{
    RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
    while (true)
    {
        sleep(1);
        Task t;
        rq->Pop(&t);
        t();
        std::cout << "consumer done, data is : " << t.PrintResult() << std::endl;
    }
}

int main()
{
    srand((uint64_t)time(nullptr) ^ pthread_self());
    pthread_t c[3], p[2];
    RingQueue<Task> *rq = new RingQueue<Task>();

    pthread_create(&p[0], nullptr, Productor, rq);
    pthread_create(&p[1], nullptr, Productor, rq);
    pthread_create(&c[0], nullptr, Consumer, rq);
    pthread_create(&c[1], nullptr, Consumer, rq);
    pthread_create(&c[2], nullptr, Consumer, rq);

    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);
    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(c[2], nullptr);
    return 0;
}

三、线程池

线程池是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

线程池的应用场景:

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
  2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。

 ThreadPool.hpp如下:

#pragma once

#include <iostream>
#include <string>
#include <queue>
#include <vector>
#include <pthread.h>
#include "Log.hpp"
#include "Task.hpp"
#include "Thread.hpp"
#include "LockGuard.hpp"

const int defaultnum = 5; // 线程池默认线程数量

class ThreadData
{
public:
    ThreadData(const std::string &threadname)
        : _threadname(threadname)
    {
    }
    ~ThreadData()
    {
    }

public:
    std::string _threadname;
};

template <class T>
class ThreadPool
{
public:
    ThreadPool(int thread_num = defaultnum)
        : _thread_num(thread_num)
    {
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_cond, nullptr);
        // 构建指定个数的线程
        for (int i = 0; i < _thread_num; i++)
        {
            std::string threadname = "thread-";
            threadname += std::to_string(i + 1);
            ThreadData td(threadname);

            // _threads.emplace_back(threadname,
            //                       std::bind(&ThreadPool<T>::ThreadRun, this,
            //                                 std::placeholders::_1),
            //                       td);
            Thread<ThreadData> t(threadname,
                                 std::bind(&ThreadPool<T>::ThreadRun, this, std::placeholders::_1), td);
            _threads.push_back(t);
            // ThreadRun是成员函数,第一个参数为this,bind的作用是将this绑定到函数的第一个变量
            lg.LogMessage(Info, "%s is created...\n", threadname.c_str());
        }
    }
    ThreadPool(const ThreadPool<T> &tp) = delete;
    const ThreadPool<T> &operator=(const ThreadPool<T>) = delete;

    void ThreadWait(const ThreadData &td)
    {
        lg.LogMessage(Debug, "no task, %s is sleeping...\n", td._threadname.c_str());
        pthread_cond_wait(&_cond, &_mtx);
    }
    void ThreadWakeup()
    {
        pthread_cond_signal(&_cond);
    }

    void ThreadRun(ThreadData &td)
    {
        while (true)
        {
            T t; // 取任务
            {
                LockGuard lock(&_mtx);
                while (_q.empty())
                {
                    ThreadWait(td);
                    lg.LogMessage(Debug, "thread %s is wakeup\n", td._threadname.c_str());
                }
                t = _q.front();
                _q.pop();
            }
            // 处理任务
            t();
            lg.LogMessage(Debug, "%s handler task %s done, result is : %s\n",
                          td._threadname.c_str(), t.PrintTask().c_str(), t.PrintResult().c_str());
        }
    }
    bool Start()
    {
        for (auto &thread : _threads)
        {
            thread.Start();
            lg.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());
        }
        return true;
    }

    void Push(T &in)
    {
        lg.LogMessage(Debug, "other thread push a task, task is : %s\n", in.PrintTask().c_str());
        LockGuard lock(&_mtx);
        _q.push(in);
        ThreadWakeup();
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_cond);
        for (auto &thread : _threads)
        {
            thread.Join();
        }
    }

private:
    std::queue<T> _q;                         // 任务队列
    std::vector<Thread<ThreadData>> _threads; // 线程集合
    int _thread_num;                          // 线程池中线程的数量
    pthread_mutex_t _mtx;                     // 互斥锁,同步对任务队列的访问
    pthread_cond_t _cond;                     // 条件变量,和互斥锁一起使用
};

main.cc如下:

#include "ThreadPool.hpp"
#include <unistd.h>

int main()
{
    lg.ChangeStyle(ClassFile);
    ThreadPool<Task> tp;
    tp.Start();
    srand((uint64_t)time(nullptr) ^ getpid());

    while (true)
    {
        int x = rand() % 100 + 1;
        usleep(1234);
        int y = rand() % 200;
        usleep(1234);
        char oper = opers[rand() % opers.size()];
        Task t(x,y,oper);
        tp.Push(t);
        sleep(1);
    }

    return 0;
}

具体代码:模拟实现线程池

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1921052.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

用MATLAB绘制三向应力圆

% 定义主应力值 sigma1 100; % MPa sigma2 50; % MPa sigma3 -33; % MPa sigma_m1(sigma1 sigma3)/2; sigma_m2(sigma1 sigma2)/2; sigma_m3(sigma2 sigma3)/2; % 计算半径 r1 (sigma1 - sigma3) / 2; r2 (sigma1 - sigma2) / 2; r3 (sigma2 - sigma3…

2024年16个适合现代应用程序的最佳API网关

什么是API&#xff1f; API是一个软件解决方案&#xff0c;作为中介&#xff0c;使两个应用程序能够相互交互。以下一些特征让API变得更加有用和有价值&#xff1a; 遵守REST和HTTP等易于访问、广泛理解和开发人员友好的标准。API不仅仅是几行代码&#xff1b;这些是为移动开…

生成式AI (Generative artificial intelligence, GenAI or GAI)

安利一个新加坡南洋理工大学的论文总结The Age of Generative AI 一、什么是生成式AI Generative AI, sometimes called gen AI, is artificial intelligence (AI) that can create original content—such as text, images, video, audio or software code—in response to a …

AIGC降痕指南:如何让AI写作不留痕迹

随着AI技术的飞速发展&#xff0c;AI论文工具正逐渐成为学术界的新宠。它们以高效、便捷的优势&#xff0c;吸引了众多学者的目光。然而&#xff0c;随之而来的学术诚信与原创性问题&#xff0c;也成为人们关注的焦点。 如何在享受AI带来的便利的同时&#xff0c;确保论文的原…

凯泽斯劳滕理工大学通过TS-AWG全新DDS固件选件加速量子计算机开发

凯泽斯劳滕理工大学&#xff08;Technische Universitt Kaiserslautern&#xff09;&#xff0c;位于德国莱茵兰-普法尔茨州&#xff0c;是一所国立理工科大学。该大学成立于1970年7月13日&#xff0c;最初是特里尔/凯泽斯劳滕兄弟大学的一部分。1975年&#xff0c;凯泽斯劳滕理…

2025~《数据结构》试题~考研

作者主页: 知孤云出岫 目录 数据结构模拟卷一、选择题&#xff08;每题2分&#xff0c;共20分&#xff09;二、填空题&#xff08;每题3分&#xff0c;共15分&#xff09;三、简答题&#xff08;每题10分&#xff0c;共30分&#xff09;四、编程题&#xff08;每题15分&#x…

Flutter跨平台开发技术

仅分享文字&#xff0c;见谅 Flutter Flutter 介绍 功能跨平台性架构流行度Flutter vs React Native 配置 Windows Flutter App 环境配置 Tizen Flutter App 环境用 Dart 语言开发 Flutter AppFlutter-Tizen 的限制 Flutter 介绍 Flutter 是由 Google 推出的开源移动应用开发…

zabbix web页面添加对nginx监控

1.nginx安装zabbix-agent2,并修改配置文件中server地址为zabbix-server的地址 ]# egrep ^Server|^Hostname /etc/zabbix/zabbix_agent2.conf Server172.16.1.162 ServerActive172.16.1.162 Hostnameweb01 2.zabbix web页面上进行添加客户端 3.默认的nginx监控模板中的状态模块…

C++心决之stl中那些你不知道的秘密(string篇)

目录 1. 为什么学习string类&#xff1f; 1.1 C语言中的字符串 2. 标准库中的string类 2.1 string类 2.2 string类的常用接口说明 1. string类对象的常见构造 2. string类对象的操作 3.vs和g下string结构的说明 3. string类的模拟实现 3.2 浅拷贝 3.3 深拷贝 3.4 写…

【算法】【二分法】二分法详解

先给y总打一个广告。&#xff08;我这种废物收不到钱&#xff09; 本科时候就在打蓝桥杯玩玩算法&#xff0c;当时听朋友的一个刷题且涵盖教程的网站&#xff0c;ACWING。 www.acwing.com 里面好处是大部分基础算法都有&#xff0c;Y总的视频&#xff01; y总我的神&#xff01…

设计模式——适配器设计模式

设计模式——适配器设计模式 适配器设计模式1.1 基本介绍1.2 工作原理1.3 类适配器模式1.3.1 基本介绍1.3.2 示例1.3.3 代码实现1.3.4 注意事项 1.4 对象适配器模式1.4.1 基本介绍1.4.2 示例1.4.3 代码实现1.4.4 注意事项 1.5 接口适配器模式1.5.1 基本介绍1.5.2 示例1.5.3 代码…

Web3 社交领域的开发技术

Web3 社交领域的开发技术主要包括以下几种&#xff0c;随着 Web3 技术的不断发展&#xff0c;Web3 社交领域将会出现更多新的技术和应用场景。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流合作。 1. 区块链技术 区块链技术是 Web3 社交的…

在Rstudio中点一点就出来了一个R包

新建一个Package Build一个Package 更多开发指南 https://r-pkgs.org/

vscode使用及调试方式和技巧

常用快捷键 ctrl ~ 显示隐藏终端面板 Ctrl\ 快速拆分文件编辑 Alt ↑↓ 移动当前代码行的位置 CtrlD 选中当前匹配项 CtrlB 切换侧边栏 alt 单机左键 或 长按鼠标滚轮鼠标左键下拉 添加多处光标 Ctrlp 快捷键设置 vscode调试 2022年了&#xff0c;该学会用VSC…

通用详情页的打造

背景介绍 大家都知道&#xff0c;详情页承载了站内的核心流量。它的量级到底有多大呢&#xff1f; 我们来看一下&#xff0c;日均播放次数数亿次&#xff0c;这么大的流量&#xff0c;其重要程度可想而知。 在这样一个页面&#xff0c;每一个功能都是大量业务的汇总点。 作为…

RayLink企业版正式上线!

哈咯大家~我是小R 经过RayLink团队的努力&#xff0c;大家期待的RayLink企业版正式上线了&#xff0c;相对于传统的远程控制软件&#xff0c;企业版本更能满足对于企业的安全性&#xff0c;扩展性&#xff0c;以来满足企业不断变化的业务需求。 RayLink企业版&#xff1a;一站…

Android C++系列:Linux网络(二)通信过程

上图对应两台计算机在同一网段中的情况,如果两台计算机在不同的网段中,那么数据从一台计算机到另一台计算机传输过程中要经过一个或多个路由器,如下图所示其实在链路层之下还有物理层,指的是电信号的传递方式,比如现在以太网通用的网线 (双绞线)、早期以太网采用的的同轴电…

逆向之在浏览器上对window等对象进行hook

一般情况下&#xff0c;在chrome浏览器上使用JS对window document等对象是无法hook的&#xff0c;除非魔改浏览器底层代码&#xff0c;原因是因为对象的configurable属性为false 这样如果需要对document对象使用JS进行hook,首先需要一个可配置的chrome浏览器&#xff0c;可以在…

亚信科技基于 Apache SeaTunnel 的二次开发应用实践

亚信科技在Apache SeaTunnel的实践分享 自我介绍 各位同学好&#xff0c;很荣幸通过Apache SeaTunnel社区和大家进行分享交流。我是来自亚信科技的潘志宏&#xff0c;主要负责公司内部数据中台产品的开发。 本次分享的主题是Apache SeaTunnel在亚信科技的集成实践&#xff0c…

简单客服聊天数据库设计

1、主要功能包含&#xff1a; 收发消息&#xff0c;聊天列表&#xff0c;未读消息&#xff0c;修改为已读消息&#xff0c;双方对话内容记录。2、表结构&#xff1a; bds_user_message&#xff08;聊天消息内容表&#xff09; 3、业务代码没有特殊处理&#xff0c;就只放几…