线程互斥与同步--Linux

news2024/10/9 22:20:54

文章目录

  • 线程互斥的概念与意义
    • 互斥的原理--原子性
    • 关于售票模拟的互斥应用
    • 死锁问题
  • 线程同步的概念与意义
    • 条件变量实现同步
  • 生产者消费者模型--互斥与同步
    • 基于阻塞队列的生产者消费者模型
    • 基于环形队列的生产者消费者模型
      • POSIX信号量
  • 线程池
  • 线程安全下的单例模式
  • 总结

线程互斥的概念与意义

线程作为执行流和基本的调度单位,在一个进程中被调度和执行时,一个全局变量很可能会被多个线程同时访问到,一旦涉及写入操作,并且某个线程A在写入时又被切走了,线程B切入,线程B写入完成后又切回线程A,此时就很有可能会出现问题。具体可能出什么问题,我们以++操作来进行阐述:

image-20230129221411813

可以观察出,在++的过程中,多线程还是可能会出问题的,线程被切走的可能性还是有的,有可能时间片到了,也有可能是被信号暂停了……可能性虽然小,但是必须要预防,一旦出现线程安全问题,在涉及到金钱交易时,更是可能会导致经济损失。为了解决这种类似的问题,互斥的概念以及应用就应运而生。

首先,类似上面的全局变量gval被多个线程同时看到并被使用,被称为临界资源。每个线程内部,访问临界资源的代码,就叫做临界区。而任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用。因此线程互斥的意义在于保证多个线程能够安全地使用共享资源。

互斥的原理–原子性

实现互斥,就得保证临界区在同一时间只能有一个线程进入,一旦有线程进入,就给临界区上锁,使得其他线程一旦准备进入该区域,就因为没有进入权限而被挂起,阻塞的等待在临界区前。上面给临界区上锁的过程称为加锁。问题在于,既然所有的线程都可能竞争到锁,那么这个锁也可以被认为是临界资源,这就会造成概念上的死循环:使用临界资源来保护临界资源。打破这个循环的编程者在设计锁的时候,使得申请锁的过程为原子操作。原子操作的解决问题的根源在于:内存与寄存器的数据交换是不可分割的,要不是交换完成,要不就是交换失败,根本不可能存在中间态从而给其他线程制造切入的机会。原子性的特性就很好理解了:不可分割与拆分。最终使得申请锁的过程中,即使发生了线程切换,也不会在发生在数据交换的时候,这是汇编语言保证的。下面给出图解:

image-20230130203502738

关于售票模拟的互斥应用

接下来我们演示一下多线程非互斥与互斥的情景,具体应用一下。

抢票模拟:创建若干线程,使得所有的线程对一个全局变量进行–操作,以实现购票的操作。

​ 非互斥抢票:

#include <iostream>
#include <cstdio>
#include <pthread.h>
#include <unistd.h>
using namespace std;

int tickets=10000;//总票数
void* buyTickets(void* arg)
{
    char* name=static_cast<char*>(arg);
    while(true)
    {
        if(tickets>0)//票数大于0才进行抢票
        {
            usleep(1000);//制造被切走的机会
            cout<<name<<" 抢到了票: "<<tickets<<endl;
            tickets--;
        }
        else break;
    }
    return nullptr;
}
int main()
{
    pthread_t t1,t2,t3,t4;//创建若干个线程抢票
    pthread_create(&t1,nullptr,buyTickets,(void*)"thread 1");
    pthread_create(&t2,nullptr,buyTickets,(void*)"thread 2");
    pthread_create(&t3,nullptr,buyTickets,(void*)"thread 3");
    pthread_create(&t4,nullptr,buyTickets,(void*)"thread 4");

    //回收线程资源
    pthread_join(t1,nullptr);
    pthread_join(t2,nullptr);
    pthread_join(t3,nullptr);
    pthread_join(t4,nullptr);

    return 0;
}

image-20230130205747371

运行的结果中居然在票数为0和-1的时候也会抢票成功,证明多线程不加锁确实会出问题。

在模拟互斥抢票之前,我们得先认识锁的数据类型和几个函数来帮助我们进行加锁操作。

pthread_mutex_t : 锁的数据类型

image-20230130211501959

pthread_mutex_init:

作用:初始化锁

参数:

mutex:要初始化的锁的地址

attr:要设置的锁的属性,默认传nullptr

返回值:成功返回0,失败返回错误码

pthread_mutex_destroy:

作用:销毁锁,防止内存泄漏(与pthread_mutex_init搭配使用)

参数:

mutex:要销毁的锁的地址

返回值:成功返回0,失败返回错误码

pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER 语句可以直接设置一个可以使用的锁,且不需要进行手动销毁。

pthread_mutex_lock:

作用:加锁

参数:

mutex:使用哪个锁进行加锁

返回值:成功返回0,失败返回错误码

pthread_mutex_unlock:

作用:解锁

参数:

mutex:使用哪个锁进行解锁

返回值:成功返回0,失败返回错误码

​ 互斥抢票模拟:

#include <iostream>
#include <cstdio>
#include <pthread.h>
#include <unistd.h>
using namespace std;

pthread_mutex_t mutex; // 互斥锁
int tickets = 10000;   // 总票数
void *buyTickets(void *arg)
{
    char *name = static_cast<char *>(arg);
    while (true)
    {
        pthread_mutex_lock(&mutex); // 加锁

        if (tickets > 0) // 票数大于0才进行抢票
        {
            usleep(1000); // 制造被切走的机会
            cout << name << " 抢到了票: " << tickets << endl;
            tickets--;
            pthread_mutex_unlock(&mutex); // 解锁
            usleep(123);                  // 抢完票暂时沉寂一下,给其他线程抢的机会
        }
        else 
        {
            pthread_mutex_unlock(&mutex); // 解锁,在这里多放一个解锁语句,防止条件不满足时有一个线程无法完成解锁
            usleep(123);                  // 抢完票暂时沉寂一下,给其他线程抢的机会
            break;
        }
    }
    return nullptr;
}
int main()
{
    pthread_mutex_init(&mutex, nullptr); // 初始化锁
    pthread_t t1, t2, t3, t4;            // 创建若干个线程抢票
    pthread_create(&t1, nullptr, buyTickets, (void *)"thread 1");
    pthread_create(&t2, nullptr, buyTickets, (void *)"thread 2");
    pthread_create(&t3, nullptr, buyTickets, (void *)"thread 3");
    pthread_create(&t4, nullptr, buyTickets, (void *)"thread 4");

    // 回收线程资源
    pthread_join(t1, nullptr);
    pthread_join(t2, nullptr);
    pthread_join(t3, nullptr);
    pthread_join(t4, nullptr);
    pthread_mutex_destroy(&mutex); // 销毁锁
    cout<<"抢票完成"<<endl;
    return 0;
}

image-20230130214505483

加完锁之后,多个线程之间明显没有冲突了。有兴趣的老铁可以试一下代码,明显加完锁的代码运行时间更长。

死锁问题

在使用锁的过程中,很可能会因为使用不当造成死锁现象,导致整个进程都被阻塞。

死锁四个必要条件

互斥条件:一个资源每次只能被一个执行流使用

请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放

不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺

循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系

上面前三种条件是使用锁时的默认条件,最后一个条件一般是程序员的代码造成的。

要想避免死锁,就得规范使用锁,首先要加锁顺序一致,避免未释放锁,资源一次性分配等等。

下面我们模拟一个由于加锁顺序不一致而造成的死锁的场景:

#include <iostream>
#include <cstdio>
#include <pthread.h>
#include <unistd.h>
using namespace std;

pthread_mutex_t mutexA,mutexB;//两个互斥锁
void* startRoutineA(void* arg)
{
    pthread_mutex_lock(&mutexA);
    sleep(1);
    pthread_mutex_lock(&mutexB);
    while(true)
    {
        cout<<pthread_self()<<" is running……"<<endl;
        sleep(1);
    }
    pthread_mutex_unlock(&mutexA);
    pthread_mutex_unlock(&mutexB);
}
void* startRoutineB(void* arg)
{
    pthread_mutex_lock(&mutexB);
    sleep(1);
    pthread_mutex_lock(&mutexA);
    while(true)
    {
        cout<<pthread_self()<<" is running……"<<endl;
        sleep(1);

    }
    pthread_mutex_unlock(&mutexB);
    pthread_mutex_unlock(&mutexA);
}
int main()
{
    pthread_mutex_init(&mutexA, nullptr); 
    pthread_mutex_init(&mutexB, nullptr);
    pthread_t t1, t2;            
    pthread_create(&t1, nullptr, startRoutineA, nullptr);
    pthread_create(&t2, nullptr, startRoutineB, nullptr);

    pthread_join(t1, nullptr);
    pthread_join(t2, nullptr);
    pthread_mutex_destroy(&mutexA);
    pthread_mutex_destroy(&mutexB);
    return 0;
}

运行的结果就是线程都阻塞住,主要原因就是两个线程形成一种头尾相接的循环等待资源的关系,都各自拿了一个锁,去申请对方手中的锁。

线程同步的概念与意义

线程同步是在线程互斥的基础之上进行优化的产物,乍一看互斥与同步在概念上应该是冲突的,但实际上这里的同步不是指所有的线程同时访问临界区,而是有序的访问临界区。由于加锁与解锁的操作相比于唤醒一个线程的成本是要低很多的,因此很可能会出现一个线程释放锁之后,在其他线程醒来之前马上就抢到锁了,这会导致其他线程被闲置,浪费资源。并且,如果因某些资源暂时不能满足某些线程的运行条件,完全可以设定一些运行顺序使得该线程等待,避免饥饿问题。因此线程同步在有些场景下是的非常有必要的。

条件变量实现同步

条件变量的使用和锁有点类似,都是使用一个变量来控制所有依赖该变量的线程,进行挂起或唤醒某些线程的操作。

image-20230131193255876

pthread_cond_t:条件变量的数据类型

pthread_cond_init:

作用:初始化条件变量

参数:

cond:要初始化的条件变量

attr:条件变量的属性,默认传空指针

返回值:成功返回0,失败返回错误码

pthread_cond_destroy:

作用:销毁条件变量,一般与pthread_cond_init搭配使用

参数:

cond:要销毁的条件变量

返回值:成功返回0,失败返回错误码

pthread_cond_t cond=PTHREAD_COND_INITIALIZER 语句可以直接设置一个可以使用的条件变量,且不需要进行手动销毁。

pthread_cond_wait:

作用:使一个依赖某个条件变量的线程挂起等待

参数:

cond:线程所依赖的条件变量

mutex:互斥锁。🔺一般线程在使用条件变量时,可能在访问临界区,因此可能会在加锁的情况下使用的cond_wait,而等待会使得整个线程被挂起,但是不能拿着锁被挂起,否则会使得其他线程申请不到锁而被阻塞,因此在挂起该线程之前要释放锁,因此传入锁来实现解锁挂起,唤醒之后自动加锁的操作。之所以在一个函数中完成解锁和挂起等待的操作,是为了保证整个过程是原子的。一旦设计成两个独立的操作,就会导致在解锁与挂起等待之间线程被切走,一旦切走就可能会导致释放的锁再也拿不回来了,存在死锁的隐患。

返回值:成功返回0,失败返回错误码

pthread_cond_signal:

作用:给依赖某个条件变量的线程发信号,唤醒该线程(按序循环唤醒)

参数:

cond:线程所依赖的条件变量

返回值:成功返回0,失败返回错误码

pthread_cond_broadcast:

作用:使得所有依赖于某个条件变量的线程被唤醒

参数:

cond:线程所依赖的条件变量

返回值:成功返回0,失败返回错误码

生产者消费者模型–互斥与同步

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

image-20230131205722378

优点:解耦、支持并发、支持忙闲不均

基于阻塞队列的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进行操作时会被阻塞)

image-20230131225554293

#include <iostream>
#include <cstdio>
#include <pthread.h>
#include <unistd.h>
#include <queue>
#include<string>
using namespace std;
#define NUM 5

template <class T>
class BlockQueue
{
public:
    BlockQueue(const int cap = NUM) // 在创建阻塞队列时,完成对锁和条件变量的初始化
        : _cap(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_proCond, nullptr);
        pthread_cond_init(&_comCond, nullptr);
    }
    ~BlockQueue() // 退出时完成资源的释放
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_proCond);
        pthread_cond_destroy(&_comCond);
    }

public:
    bool isFull()
    {
        return _q.size() == _cap;
    }
    bool isEmpty()
    {
        return _q.size() == 0;
    }
    void lockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    void unlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    void productorWait()
    {
        pthread_cond_wait(&_proCond, &_mutex);
    }
    void consumerWait()
    {
        pthread_cond_wait(&_comCond, &_mutex);
    }
    void productorWake()
    {
        pthread_cond_signal(&_proCond);
    }
    void consumerWake()
    {
        pthread_cond_signal(&_comCond);
    }
    void put(const T &data)
    {
        lockQueue();
        while (isFull())
        {
            consumerWake();
            productorWait();
        }
        _q.push(data);
        consumerWake();
        unlockQueue();
    }
    T take()
    {
        lockQueue();
        while (isEmpty())
        {
            productorWake();
            consumerWait();
        }
        productorWake();
        T tmp = _q.front();
        _q.pop();
        unlockQueue();
        return tmp;
    }
private:
    queue<T> _q;             // 队列
    int _cap;                // 队列的大小
    pthread_mutex_t _mutex;  // 生产者与消费者共用一个互斥锁
    pthread_cond_t _proCond; // 生产者所使用的条件变量
    pthread_cond_t _comCond; // 消费者所使用的条件变量
};

static const string str="+-*/%";
class Task//任务类
{
public:
    Task() : elemOne_(0), elemTwo_(0), operator_('0')
    {}
    Task(int one, int two, char op) : elemOne_(one), elemTwo_(two), operator_(op)
    {}
    int operator() ()
    {
        return run();
    }
    int run()
    {
        int result = 0;
        switch (operator_)
        {
        case '+':
            result = elemOne_ + elemTwo_;
            break;
        case '-':
            result = elemOne_ - elemTwo_;
            break;
        case '*':
            result = elemOne_ * elemTwo_;
            break;
        case '/':
        {
            if (elemTwo_ == 0)
            {
                cout << "div zero, abort" << endl;
                result = -1;
            }
            else
            {
                result = elemOne_ / elemTwo_;
            }
        }
        break;
        case '%':
        {
            if (elemTwo_ == 0)
            {
                cout << "mod zero, abort" << endl;
                result = -1;
            }
            else
            {
                result = elemOne_ % elemTwo_;
            }
        }
        break;
        default:
            cout << "非法操作: " << operator_ << endl;
            break;
        }
        return result;
    }
    int get(int *e1, int *e2, char *op)
    {
        *e1 = elemOne_;
        *e2 = elemTwo_;
        *op = operator_;
    }
private:
    int elemOne_;
    int elemTwo_;
    char operator_;
};


void *productor(void *arg)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(arg);
    while(true)
    {
        int one=rand()%100;
        int two=rand()%100;
        char op=str[rand()%(str.size())];
        cout<<pthread_self()<< "于 "<<(unsigned int)time(nullptr)<<" 生产任务: "<<one<<op<<two<<"=?"<<endl;
        Task t(one,two,op);
        bq->put(t);
        sleep(1);
    }
    return nullptr;
}

void *consumer(void *arg)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(arg);
    while(true)
    {
        Task t=bq->take();
        int one,two;
        char op;
        t.get(&one,&two,&op);
        cout<<pthread_self()<<"于 "<<(unsigned int)time(nullptr)<<" 处理任务: "<<one<<op<<two<<"="<<t.run()<<endl;
        sleep(1);
    }
    return nullptr;
}
int main()
{
    pthread_t c1, c2, c3, p1, p2, p3;
    BlockQueue<Task> bq;
    srand((unsigned int)time(nullptr) ^ getpid() ^ pthread_self()); // 随机生成数字
    pthread_create(&p1, nullptr, productor, (void *)&bq);
    pthread_create(&p2, nullptr, productor, (void *)&bq);
    pthread_create(&p3, nullptr, productor, (void *)&bq);
    pthread_create(&c1, nullptr, consumer, (void *)&bq);
    pthread_create(&c2, nullptr, consumer, (void *)&bq);
    pthread_create(&c3, nullptr, consumer, (void *)&bq);

    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);
    pthread_join(p3, nullptr);
    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(c3, nullptr);

    return 0;
}

image-20230131225736616

基于环形队列的生产者消费者模型

关于环形队列,这里我就不多赘述了,环形队列的优势在于,如果队列不为空或者不为满,那么不会对同一个数据进行写入数据和拿走数据的操作,这种情况下生产者与消费者不会出现互斥的现象,也就可以实现并发,即生产者和消费者同时工作。

POSIX信号量

环形队列的难点在于队列为空或者为满时的状态是一样的,不好区分。但是如果我们使用两个计数器来分别记录剩余空间和生成数据的个数,就很容易实现线程间的并发了。并且比较特殊的是,信号量的++和–操作都是原子性的,因此在使用的时候并不用担心会出现线程安全问题。

image-20230201084525881

sem_t:信号量的数据类型

sem_init:

作用:初始化信号量

参数:

sem:要初始化的信号量

pshared :0表示线程间共享,非零表示进程间共享

value:信号量初始值

返回值:成功返回0;失败返回-1,并设置errno

sem_destroy

作用:销毁信号量

参数:

sem:要销毁的信号量

返回值:成功返回0;失败返回-1,并设置errno

sem_wait:

作用:使得信号量在大于0时自减1(原子性),等于0时使得该线程被挂起,直到信号量大于0才会被唤醒。

参数:

sem:要–的信号量

返回值:成功返回0;失败返回-1,并设置errno

sem_post:

作用:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。(原子性)

参数:

sem:要++的信号量

返回值:成功返回0;失败返回-1,并设置errno

#include <iostream>
#include <cstdio>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <string>
#include <vector>
using namespace std;
#define NUM 5

static const string str="+-*/%";
class Task//任务类
{
public:
    Task() : elemOne_(0), elemTwo_(0), operator_('0')
    {
    }
    Task(int one, int two, char op) : elemOne_(one), elemTwo_(two), operator_(op)
    {
    }
    int operator() ()
    {
        return run();
    }
    int run()
    {
        int result = 0;
        switch (operator_)
        {
        case '+':
            result = elemOne_ + elemTwo_;
            break;
        case '-':
            result = elemOne_ - elemTwo_;
            break;
        case '*':
            result = elemOne_ * elemTwo_;
            break;
        case '/':
        {
            if (elemTwo_ == 0)
            {
                cout << "div zero, abort" << endl;
                result = -1;
            }
            else
            {
                result = elemOne_ / elemTwo_;
            }
        }
        break;
        case '%':
        {
            if (elemTwo_ == 0)
            {
                cout << "mod zero, abort" << endl;
                result = -1;
            }
            else
            {
                result = elemOne_ % elemTwo_;
            }
        }
        break;
        default:
            cout << "非法操作: " << operator_ << endl;
            break;
        }
        return result;
    }
    int get(int *e1, int *e2, char *op)
    {
        *e1 = elemOne_;
        *e2 = elemTwo_;
        *op = operator_;
    }
private:
    int elemOne_;
    int elemTwo_;
    char operator_;
};

template<class T>
class RingQueue
{
public:
    RingQueue(const int cap=NUM):ringqueue_(cap),pIndex_(0), cIndex_(0)
    {
        pthread_mutex_init(&pmutex_,nullptr);
        pthread_mutex_init(&cmutex_,nullptr);
        sem_init(&roomSem_,0,cap);
        sem_init(&dataSem_,0,0);
    }
    ~RingQueue()
    {
        pthread_mutex_destroy(&pmutex_);
        pthread_mutex_destroy(&cmutex_);
        sem_destroy(&roomSem_);
        sem_destroy(&dataSem_);
    }
    void put(const T& data)
    {
        sem_wait(&roomSem_);
        pthread_mutex_lock(&pmutex_);
        ringqueue_[pIndex_]=data;
        pIndex_++;
        pIndex_%=ringqueue_.size();
        pthread_mutex_unlock(&pmutex_);
        sem_post(&dataSem_);
    }
    T take()
    {
        sem_wait(&dataSem_);
        pthread_mutex_lock(&cmutex_);
        T tmp=ringqueue_[cIndex_];
        cIndex_++;
        cIndex_%=ringqueue_.size();
        pthread_mutex_unlock(&cmutex_);
        sem_post(&roomSem_);
        return tmp;
    }
private:
    vector<T> ringqueue_; // 数组模拟环形队列
    sem_t roomSem_;       // 衡量空间计数器,productor
    sem_t dataSem_;       // 衡量数据计数器,consumer
    uint32_t pIndex_;     // 当前生产者写入的位置, 如果是多线程,pIndex_也是临界资源
    uint32_t cIndex_;     // 当前消费者读取的位置,如果是多线程,cIndex_也是临界资源

    pthread_mutex_t pmutex_;// 生产者使用的互斥锁
    pthread_mutex_t cmutex_;// 消费者使用的互斥锁
};

void *productor(void *arg)
{
    RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(arg);
    while(true)
    {
        int one=rand()%100;
        int two=rand()%100;
        char op=str[rand()%(str.size())];
        cout<<pthread_self()<< "于 "<<(unsigned int)time(nullptr)<<" 生产任务: "<<one<<op<<two<<"=?"<<endl;
        Task t(one,two,op);
        rq->put(t);
        sleep(1);
    }
    return nullptr;
}

void *consumer(void *arg)
{
    RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(arg);
    while(true)
    {
        Task t=rq->take();
        int one,two;
        char op;
        t.get(&one,&two,&op);
        cout<<pthread_self()<<"于 "<<(unsigned int)time(nullptr)<<" 处理任务: "<<one<<op<<two<<"="<<t.run()<<endl;
        sleep(1);
    }
    return nullptr;
}
int main()
{
    pthread_t c1, c2, c3, p1, p2, p3;
    RingQueue<Task> rq;
    srand((unsigned int)time(nullptr) ^ getpid() ^ pthread_self()); // 随机生成数字
    pthread_create(&p1, nullptr, productor, (void *)&rq);
    pthread_create(&p2, nullptr, productor, (void *)&rq);
    pthread_create(&p3, nullptr, productor, (void *)&rq);
    pthread_create(&c1, nullptr, consumer, (void *)&rq);
    pthread_create(&c2, nullptr, consumer, (void *)&rq);
    pthread_create(&c3, nullptr, consumer, (void *)&rq);

    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);
    pthread_join(p3, nullptr);
    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(c3, nullptr);

    return 0;
}

image-20230201093519687

线程池

线程池: 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

线程池的应用场景:

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。

  2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。

  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.

线程池示例:

  1. 创建固定数量线程池,循环从任务队列中获取任务对象,

  2. 获取到任务对象后,执行任务对象中的任务接口

#include <iostream>
#include <cstdio>
#include <cassert>
#include <pthread.h>
#include <unistd.h>
#include <queue>
#include <string>
using namespace std;
#define NUM 5
static const string str = "+-*/%";
class Task // 任务类
{
public:
    Task() : elemOne_(0), elemTwo_(0), operator_('0')
    {
    }
    Task(int one, int two, char op) : elemOne_(one), elemTwo_(two), operator_(op)
    {
    }
    int operator()()
    {
        return run();
    }
    int run()
    {
        int result = 0;
        switch (operator_)
        {
        case '+':
            result = elemOne_ + elemTwo_;
            break;
        case '-':
            result = elemOne_ - elemTwo_;
            break;
        case '*':
            result = elemOne_ * elemTwo_;
            break;
        case '/':
        {
            if (elemTwo_ == 0)
            {
                cout << "div zero, abort" << endl;
                result = -1;
            }
            else
            {
                result = elemOne_ / elemTwo_;
            }
        }
        break;
        case '%':
        {
            if (elemTwo_ == 0)
            {
                cout << "mod zero, abort" << endl;
                result = -1;
            }
            else
            {
                result = elemOne_ % elemTwo_;
            }
        }
        break;
        default:
            cout << "非法操作: " << operator_ << endl;
            break;
        }
        return result;
    }
    int get(int *e1, int *e2, char *op)
    {
        *e1 = elemOne_;
        *e2 = elemTwo_;
        *op = operator_;
    }

private:
    int elemOne_;
    int elemTwo_;
    char operator_;
};

template <class T>
class ThreadPool
{
public:
    ThreadPool(const int& threadNum=NUM):threadNum_(threadNum),isStart_(false)
    {
        assert(threadNum_>0);
        pthread_mutex_init(&mutex_,nullptr);
        pthread_cond_init(&cond_,nullptr);
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&cond_);
    }
    static void* threadpool(void* arg)
    {
        pthread_detach(pthread_self());
        ThreadPool<T>* tp=static_cast<ThreadPool<T>*>(arg);
        while(true)
        {
            tp->lockQueue();
            while(!tp->haveTask())
            {
                tp->waitForTask();
            }
            T tmp=tp->pop();
            tp->unlockQueue();


            int one,two;
            char op;
            tmp.get(&one,&two,&op);
            cout<<pthread_self()<<"于 "<<(unsigned int)time(nullptr)<<" 处理任务: "<<one<<op<<two<<"="<<tmp.run()<<endl;
        }
        return nullptr;
    }
    void start()
    {
        assert(!isStart_);
        for(int i=0;i<threadNum_;++i)
        {
            pthread_t tmp;
            pthread_create(&tmp,nullptr,threadpool,this);
        }
    }
    void put(const T &data)
    {
        lockQueue();
        taskQueue_.push(data);
        unlockQueue();
        choiceThreadForHandler();
    }
private:
    void lockQueue() { pthread_mutex_lock(&mutex_); }
    void unlockQueue() { pthread_mutex_unlock(&mutex_); }
    bool haveTask() { return !taskQueue_.empty(); }
    void waitForTask() { pthread_cond_wait(&cond_, &mutex_); }
    void choiceThreadForHandler() { pthread_cond_signal(&cond_); }
    T pop()
    {
        T temp = taskQueue_.front();
        taskQueue_.pop();
        return temp;
    }

private:
    bool isStart_;
    int threadNum_;
    queue<T> taskQueue_;
    pthread_mutex_t mutex_;
    pthread_cond_t cond_;
};
int main()//主线程给线程池发布任务
{
    ThreadPool<Task> tp;
    tp.start();
    srand((unsigned int)time(nullptr) ^ getpid() ^ pthread_self()); // 随机生成数字
    while(true)
    {
        int one,two;
        char op;
        one=rand()%100,two=rand()%100;
        op=str[rand()%str.size()];
        Task t(one,two,op);
        tp.put(t);
        cout<<pthread_self()<< "于 "<<(unsigned int)time(nullptr)<<" 生产任务: "<<one<<op<<two<<"=?"<<endl;
        sleep(1);
    }
    return 0;
}

image-20230201135313445

线程安全下的单例模式

当时在学习特殊类[传送门]的时候,有一种类是只会有一个实体,只能构造一个对象,具体的代码片段如下:

//懒汉模式
class Singleton
{
public:
	static Singleton* CallObj()
	{
		if (_slt == nullptr)   //为空的话再决定开辟空间
		{
			_slt = new Singleton;
		}
		return _slt;
	}
	Singleton(const Singleton&) = delete;
	Singleton& operator=(const Singleton&) = delete;
	void Write()
	{
		cin >> _str;
	}
	void Read()
	{
		cout << _str << endl;
	}
private:
	Singleton()
	{}
	static Singleton* _slt;
	string _str;
};
Singleton* Singleton::_slt = nullptr;  //只初始化,并不开辟空间

在单线程之下这样跑是没问题的,但是多线程之下会在判断空指针那里出现问题:

image-20230201143101409

如此就会造成开辟两次(多次)空间来构造对象,这与单例预期相违背,因此必须得在开辟空间那里加锁。

接下来我们尝试线程池进行单例模式的实现:

#include <iostream>
#include <cstdio>
#include <cassert>
#include <pthread.h>
#include <unistd.h>
#include <queue>
#include <string>
using namespace std;
#define NUM 5
static const string str = "+-*/%";
class Task // 任务类
{
public:
    Task() : elemOne_(0), elemTwo_(0), operator_('0')
    {
    }
    Task(int one, int two, char op) : elemOne_(one), elemTwo_(two), operator_(op)
    {
    }
    int operator()()
    {
        return run();
    }
    int run()
    {
        int result = 0;
        switch (operator_)
        {
        case '+':
            result = elemOne_ + elemTwo_;
            break;
        case '-':
            result = elemOne_ - elemTwo_;
            break;
        case '*':
            result = elemOne_ * elemTwo_;
            break;
        case '/':
        {
            if (elemTwo_ == 0)
            {
                cout << "div zero, abort" << endl;
                result = -1;
            }
            else
            {
                result = elemOne_ / elemTwo_;
            }
        }
        break;
        case '%':
        {
            if (elemTwo_ == 0)
            {
                cout << "mod zero, abort" << endl;
                result = -1;
            }
            else
            {
                result = elemOne_ % elemTwo_;
            }
        }
        break;
        default:
            cout << "非法操作: " << operator_ << endl;
            break;
        }
        return result;
    }
    int get(int *e1, int *e2, char *op)
    {
        *e1 = elemOne_;
        *e2 = elemTwo_;
        *op = operator_;
    }

private:
    int elemOne_;
    int elemTwo_;
    char operator_;
};

template <class T>
class ThreadPool
{
private:
    ThreadPool(const int &threadNum = NUM) : threadNum_(threadNum), isStart_(false)
    {
        assert(threadNum_ > 0);
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&cond_, nullptr);
    }
    ThreadPool(const ThreadPool<T> &) = delete;     // 删除拷贝构造
    void operator=(const ThreadPool<T> &) = delete; // 删除赋值构造
public:
    ~ThreadPool()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&cond_);
    }
    static ThreadPool<T> *getInstance() // 申请类的对象
    {
        pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
        if (instance == nullptr)
        {
            pthread_mutex_lock(&mutex);
            if (instance == nullptr)
            {
                instance = new ThreadPool<T>();
            }
            pthread_mutex_unlock(&mutex);
        }
        return instance;
    }
    static void *threadpool(void *arg)
    {
        pthread_detach(pthread_self());
        ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(arg);
        while (true)
        {
            tp->lockQueue();
            while (!tp->haveTask())
            {
                tp->waitForTask();
            }
            T tmp = tp->pop();
            tp->unlockQueue();

            int one, two;
            char op;
            tmp.get(&one, &two, &op);
            cout << pthread_self() << "于 " << (unsigned int)time(nullptr) << " 处理任务: " << one << op << two << "=" << tmp.run() << endl;
        }
        return nullptr;
    }
    void start()
    {
        assert(!isStart_);
        for (int i = 0; i < threadNum_; ++i)
        {
            pthread_t tmp;
            pthread_create(&tmp, nullptr, threadpool, this);
        }
    }
    void put(const T &data)
    {
        lockQueue();
        taskQueue_.push(data);
        unlockQueue();
        choiceThreadForHandler();
    }

private:
    void lockQueue() { pthread_mutex_lock(&mutex_); }
    void unlockQueue() { pthread_mutex_unlock(&mutex_); }
    bool haveTask() { return !taskQueue_.empty(); }
    void waitForTask() { pthread_cond_wait(&cond_, &mutex_); }
    void choiceThreadForHandler() { pthread_cond_signal(&cond_); }
    T pop()
    {
        T temp = taskQueue_.front();
        taskQueue_.pop();
        return temp;
    }

private:
    bool isStart_;
    int threadNum_;
    queue<T> taskQueue_;
    pthread_mutex_t mutex_;
    pthread_cond_t cond_;
    static ThreadPool<T> *instance;
};
template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr; // 懒汉模式

int main()
{
    ThreadPool<Task>* tp=ThreadPool<Task>::getInstance();//申请对象
    tp->start();
    srand((unsigned int)time(nullptr) ^ getpid() ^ pthread_self()); // 随机生成数字
    while (true)
    {
        int one, two;
        char op;
        one = rand() % 100, two = rand() % 100;
        op = str[rand() % str.size()];
        Task t(one, two, op);
        tp->put(t);
        cout << pthread_self() << "于 " << (unsigned int)time(nullptr) << " 生产任务: " << one << op << two << "=?" << endl;
        sleep(1);
    }
    free(tp);//释放空间
    return 0;
}

image-20230201145141549

总结

线程的互斥保证线程安全,线程同步则可以有效避免多线程的饥饿问题,这两者在生产者与消费者模型中体现的非常明显,线程的互斥与同步对线程的应用至关重要,是使用多线程的基本功。对于生产者消费者模型,个人认为理解不同角色之间的关系是最重要的,其中关于并发的理解也必须认识到位:生产任务和消费任务是可以同时进行的!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/190905.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OMS标准 第二卷 主要通讯

版本4.1.2/2016-12-16 发布 1 引言 1.1 概述 本部分描述了从设备&#xff08;仪表或执行器或断路器&#xff09;和&#xff08;固定的&#xff0c;通常由市电供电的&#xff09;主设备&#xff08;网关或其他通信单元&#xff09;之间有线和无线通信的最低开放式计量系统要…

1. Mybatis 入门

文章目录1. Mybatis 简介2. Mybatis 快速入门3. 使用 idea 写 SQL4. Mapper 代理开发5. MyBatis 核心配置文件1. Mybatis 简介 MyBatis 是一款优秀的持久层框架&#xff0c;用于简化 JDBC 开发。 官方文档&#xff1a;https://mybatis.org/mybatis-3/zh/index.html 持久层&am…

【自学Docker】Docker cp命令

Docker cp命令 大纲 docker cp命令教程 docker cp 命令用于在本地文件系统与 Dokcer容器 之间复制文件或者文件夹。该命令后面的 CONTAINER 可以是容器Id&#xff0c;或者是容器名。 docker cp命令语法 从容器复制到宿主机 haicoder(www.haicoder.net)# docker cp [OPTION…

第57篇-某日头条signature参数分析【2023-02-01】

声明:该专栏涉及的所有案例均为学习使用,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关!如有侵权,请私信联系本人删帖! 文章目录 一、前言二、网站分析三、signature参数四、完整代码一、前言 今天来看一下新闻网站,分析一下参数 二、网站分析 网…

WebDAV之葫芦儿·派盘+一叶日记

一叶日记 支持WebDAV方式连接葫芦儿派盘。 推荐一款操作方便、界面简洁,记录生活点滴与心情,具有诗情画意的日记软件。 一叶日记是一款记录日记的手机软件,在这款软件中它里面有着各种不同的工具,可以方便用户去随时随地的记录日记,同时里面还有着各种不同的主题背景&…

补充:论Unity_InputSystemPacakage如何使用

图1补充一下默认特殊值如何设定&#xff0c;点击ProjectingSettings——InputSystemPacakage——Create Settings Asset 即可设置默认特殊值&#xff0c;或者点击图1中的Open input settings也可以打开此界面。 创建后会在Project窗口出现一个配置文件&#xff0c;不需要时删除…

你说反射有点难追,我觉得应该知难而退。

文章目录问题源码解析溯源问题解决方案第一种&#xff1a;第二种&#xff1a;第三种&#xff1a;问题 今天小伙伴遇到一个问题&#xff0c;有关于反射的&#xff0c;写个demo&#xff0c;大家看一下。 如上&#xff0c;运行之后会报错&#xff1a;出现了非法参数。 Exception…

深度学习论文: YOLOv6 v3.0: A Full-Scale Reloading及其PyTorch实现

深度学习论文: YOLOv6 v3.0: A Full-Scale Reloading及其PyTorch实现 YOLOv6 v3.0: A Full-Scale Reloading PDF: https://arxiv.org/pdf/2301.05586.pdf PyTorch代码: https://github.com/shanglianlm0525/CvPytorch PyTorch代码: https://github.com/shanglianlm0525/PyTorch…

2023年IB考试该如何备考?

IB课程考试时间 考试时间&#xff1a;IBO官方近期公布了2023年的考试时间与计划&#xff0c;中国学生IB考试时间定在2023.5.1至5.19。 在世界各地&#xff0c;学生在IB体系中均按照相同的教学大纲进行&#xff0c;并且于毕业时参加全球统一考试。一年两次&#xff08;北半球于5…

拉伯证券|北向资金1月净买入超1400亿,啥信号?

2023年1月份&#xff0c;电视剧《狂飙》热播&#xff0c;被视为A股投资“风向标”的北向资金也敞开“狂飙”态势&#xff0c;月内五次净买入额超百亿&#xff0c;1月30日单日净买入额更是创2021年12月以来新高。 单月净买入超1400亿&#xff0c;刷新纪录&#xff01; 北向资金…

Ventoy安装教程

目录Ventoy五大优势Ventoy安装教程其他链接Ventoy是一款国人开发的新一代多ISO启动引导程序&#xff0c;用户只需要将所需的ISO镜像文件拷贝至优盘中即可在Ventoy界面中选择自己想要的ISO镜像文件。 Ventoy五大优势 广泛兼容&#xff1a;支持包括Windows 10、Windows 8.1、Wind…

【哈希表】leetcode15. 三数之和(C/C++/Java/Python/Js)--梦破碎的地方

leetcode15. 三数之和--梦破碎的地方1 题目2 思路2.1 哈希解法--含代码2.2 双指针2.3 去重逻辑的思考2.3.1 a的去重2.3.2 b与c的去重3 代码--双指针法3.1 C版本3.2 C版本3.3 Java版本3.4 Python3版本3.5 JavaScript版本4 总结用哈希表解决了两数之和 &#xff0c;那么三数之和呢…

[Lua实战]Skynet-2.如何启动(Win10-WSL环境Ubuntu18.04)[开箱可用]

Skynet-2.如何启动Win10-WSL环境Ubuntu18.04接上文,在linux运行skynet1.WIN10-WSL1.1 用Microsoft Store安装WSL(会遇到商店下载失败等问题...)1.1.1控制面板支持Linux配置1.1.2Microsoft Store 找到 Ubuntu18.041.1.3如果遇到安装问题如图请直接跳到1.21.2 使用PowerShell工具…

概论_第7章_参数估计_点估计之极大似然估计__性质

一 性质 极大似然估计 有一个简单有用的性质&#xff1a; 如果 θ^\hat\thetaθ^ 是 θ\thetaθ的极大似然估计&#xff0c; 则对任一 θ\thetaθ的函数g(θ)g(\theta)g(θ), 其极大似然估计为 g(θ^)g(\hat\theta)g(θ^) . 该性质称为极大似然估计的不变性&#xff0c;它使…

项目代码版本控制与维护

一、版本命名规则 1.1 需求开发分支命名规则 格式&#xff1a;dev_v版本号_需求名称 案例&#xff1a;dev_v01.31_TX202301141 dev_v01.31_数字产品平台订单查询优化 1.2 测试环境发布分支命名规则 格式&#xff1a;uat_deploy 1.3 预上环境分支命名规则 格式&#xff1a…

操作系统权限提升(六)之系统错误配置-不安全的服务提权

系列文章 操作系统权限提升(一)之操作系统权限介绍 操作系统权限提升(二)之常见提权的环境介绍 操作系统权限提升(三)之Windows系统内核溢出漏洞提权 操作系统权限提升(四)之系统错误配置-Tusted Service Paths提权 操作系统权限提升(五)之系统错误配置-PATH环境变量提权 注&…

九种查找算法-B树/B+树

B树/B树 在计算机科学中&#xff0c;B树&#xff08;B-tree&#xff09;是一种树状数据结构&#xff0c;它能够存储数据、对其进行排序并允许以O(log n)的时间复杂度运行进行查找、顺序读取、插入和删除的数据结构。B树&#xff0c;概括来说是一个节点可以拥有多于2个子节点的二…

数学建模与数据分析 || 2. 结构化与非结构化数据的读取方法

结构化与非结构化数据的读取方法 文章目录结构化与非结构化数据的读取方法1. 结构化数据的读取1.1 pandas 读取 excel 文件1.2 pandas 读取 csv 文件1.3 pandas 读取 txt 文件1.4 利用 scipy 读取 mat 格式文件数据1.5 利用 numpy 存储和读取 npz 格式文件2. python 读取图像的…

SpringBoot国际化

软件的国际化软件的国际化&#xff1a;软件开发时&#xff0c;要使它能同时应对世界不同地区和国家的访问&#xff0c;并针对不同地区和国家的访问&#xff0c;提供相应的、符合来访者阅读习惯的页面或数据。国际化internationalization&#xff0c;在i和n之间有 18 个字母&…

AXI 总线协议学习笔记(2)

引言 从本文开始&#xff0c;正式系统性学学习AXI总线。 如何获取官方协议标准&#xff1f; 第一步&#xff1a;登陆官网&#xff1a;armDeveloper 第二步&#xff1a;登录&#xff0c;无账号需要注册 第三步&#xff1a;点击文档 第四步&#xff1a; 第五步&#xff1a;浏…