【Linux进程篇】进程终章:POSIX信号量线程池线程安全的单例模式自旋锁读者写者问题

news2024/11/18 9:22:40

W...Y的主页  😊

代码仓库分享 💕 

前言:在之前的进程间通信时我们就讲到过信号量,他的本质就是一个计数器,用来描述临界资源的一个计数器。我们当时使用电影院的例子来说明信号量。电影院的座位被我们称为临界资源,只有买到票才能有座位去看电影,申请信号量就是预定买票,申请成功才可以继续往下走下去。而我们看完电影就会释放临界资源,这些资源就可以被别人申请了。

目录

POSIX信号量 

基于环形队列的生产消费模型 

线程池

STL,智能指针和线程安全

线程安全的单例模式

 什么是单例模式

什么是设计模式 

单例模式的特点 

饿汉实现方式和懒汉实现方式 

饿汉方式实现单例模式 

懒汉方式实现单例模式 

其他常见的各种锁

自旋锁

读者写者问题

读写锁接口


POSIX信号量 

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值

销毁信号量 

int sem_destroy(sem_t *sem);

等待信号量

 功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()

发布信号量

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V() 

上述就是我信号量的基本操作,现在我们需要实现一个基于环形队列一个生产消费者模型,我们为什么不用上篇博客中的阻塞队列呢。因为阻塞队列我们将其看作一个整体只能进行首尾操作,不能访问队列中间内容。而环形队列我们使用的是vecotor数组进行模拟,可以使用下标进行访问中间内容。

基于环形队列的生产消费模型 

环形队列采用数组模拟,用模运算来模拟环状特性

当hend与tail指向同一块位置时,数组可能为满也可能为空,其余的时候可以进行并发访问数组。 

环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。

但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程

ringqueue.hpp 

#pragma once
#include<iostream>
#include<vector>
#include<semaphore.h>
#include<pthread.h>
using namespace std;
template<typename T>
class RingQueueu
{
private:
    void P(sem_t& sem)
    {
        sem_wait(&sem);
    }
    void V(sem_t& sem)
    {
        sem_post(&sem);
    }
    void Lock(pthread_mutex_t& mutex)
    {
        pthread_mutex_lock(&mutex);
    }
    void Unlock(pthread_mutex_t& mutex)
    {
        pthread_mutex_unlock(&mutex);
    }
public:
    RingQueueu(int cap)
        :_cap(cap),_ring_queue(cap)
    {
        sem_init(&_room_sem,0,_cap);
        sem_init(&_data_sem,0,0);
        pthread_mutex_init(&_productor_mutex,nullptr);
        pthread_mutex_init(&_consumer_mutex,nullptr);
    }
    void Enqueue(const T& in)
    {
        Lock(_productor_mutex);
        P(_room_sem);
        _ring_queue[_productor_step++] = in;
        _productor_step %= _cap;
        V(_data_sem);
        Unlock(_productor_mutex);
    }
    void Pop(T* out)
    {
        Lock(_consumer_mutex);
        P(_data_sem);
        *out = _ring_queue[_consumer_step++];
        _consumer_step %= _cap;
        V(_room_sem);
        Unlock(_consumer_mutex);
    }
    ~RingQueueu()
    {
        sem_destroy(&_room_sem);
        sem_destroy(&_data_sem);
        pthread_mutex_destroy(&_productor_mutex);
        pthread_mutex_destroy(&_consumer_mutex);
    }
private:
    vector<T> _ring_queue;
    int _cap;
    int _productor_step = 0;
    int _consumer_step = 0;
    sem_t _room_sem;
    sem_t _data_sem;
    //加锁,维护多生产多消费
    pthread_mutex_t _productor_mutex;
    pthread_mutex_t _consumer_mutex;
};

单生产单消费时可以不用加锁,而多生产多消费时就需要加锁防止同时访问临界资源。

而加锁解锁应该放在申请信号量的后面进行才是比较好的,为什么呢?因为申请信号量是预定机制是原子的,不会出现线程安全问题, 这样可以先预定再等锁,锁来了之后直接运行后面代码,可以提高效率。(等也是等,不如再等的时候申请信号量)

所以这样写更好:

void Enqueue(const T &in)
    {
        P(_room_sem);
        Lock(_productor_mutex);
        _ring_queue[_productor_step++] = in;
        _productor_step %= _cap;
        Unlock(_productor_mutex);
        V(_data_sem);
    }
    void Pop(T *out)
    {
        P(_data_sem);
        Lock(_consumer_mutex);
        *out = _ring_queue[_consumer_step++];
        _consumer_step %= _cap;
        Unlock(_consumer_mutex);
        V(_room_sem);
    }

main.cc

#include "RingQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>
#include <ctime>
int a = 10;
using namespace ThreadModule;
using namespace std;
using ringqueue_t = RingQueueu<Task>;

void PrintHello()
{
    cout << "hello" << endl;
}
void Consumer(ringqueue_t &rq)
{
    while (true)
    {
        Task t;
        rq.Pop(&t);
        t();
    }
}
void Productor(ringqueue_t &rq)
{
    
    while (true)
    {
        sleep(1);
        rq.Enqueue(Download);
        
    }
}
void InitComm(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq, func_t<ringqueue_t> func)
{
    for (int i = 0; i < num; i++)
    {
        std::string name = "thread-" + std::to_string(i + 1);
        threads->emplace_back(func, rq, name);
        //(*threads)[threads->size()-1].Start();
        //threads->back().Start();
    }
}
void InitConsumer(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq)
{
    InitComm(threads, num, rq, Consumer);
}

void InitProductor(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq)
{
    InitComm(threads, num, rq, Productor);
}
void WaitAllThread(std::vector<Thread<ringqueue_t>> threads)
{
    for(auto thread : threads)
    {
        thread.Join();
    }
}
void StartAll(vector<Thread<ringqueue_t>>& threads)
{
    for(auto& thread : threads)
    {
        thread.Start();
    }
}
int main()
{
    ringqueue_t *bq = new ringqueue_t(10);
    std::vector<Thread<ringqueue_t>> threads;
    InitConsumer(&threads, 1, *bq);
    InitProductor(&threads, 1, *bq);
    StartAll(threads);
    WaitAllThread(threads);
    return 0;
}

这里使用的也不是原生线程库,而是我们自己封装的thread。上述代码都是完整无误的,但是这里我们要说一个非常隐蔽的问题。我们先将Thread对象放入vector中,再使用StartAll()函数遍历了vector创建了线程,但是为什么我们要分开写,不直接使用threads->back().Start();这个写法呢?

因为线程的创建是跳转到Start函数中去,但是主线程还会继续循环进行,可能下一个Thread即将放入vector中导致vector最后一个内容发生变化,最终某个线程创建失败。这就是一个非常隐讳的并发问题。

线程池

线程池:
 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景:
1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.

线程池:

#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include "Thread.hpp"
#include <vector>
#include"Log.hpp"
using namespace std;
using namespace ThreadModule;

const static int defaultthreadnum = 3;
template <typename T>
class ThreadPool
{
private:
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    void ThreadSleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }
    void ThreadWakeup()
    {
        pthread_cond_signal(&_cond);
    }
    void ThreadWakeAll()
    {
        pthread_cond_broadcast(&_cond);
    }
public:
    ThreadPool(int threadnum = defaultthreadnum)
        : _threadnum(threadnum)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
        LOG(INFO, "ThreadPool Construct()");
    }
    void InitThreadPool()
    {
        for (int num = 0; num < _threadnum; num++)
        {
            string name = "thread-" + to_string(num + 1);
            // _threads.emplace_back(Print, name);
            _threads.emplace_back(bind(&ThreadPool::HandlerTask, this, placeholders::_1), name);
            LOG(INFO, "init thread %s done", name.c_str());
        }
         _isrunning = true;
    }
    void HandlerTask(string name)
    {
        LOG(INFO, " %s is running ...", name.c_str());
        while (true)
        {
            LockQueue();
            while (_task_queue.empty() && _isrunning)
            {
                waitnum++;
                ThreadSleep();
                waitnum--;
            }
            if(_task_queue.empty() && !_isrunning)
            {
                UnlockQueue();
                break;
            }
            T t = _task_queue.front();
            _task_queue.pop();
            UnlockQueue();
            LOG(DEBUG, "%s get a task", name.c_str());
            t();
            LOG(DEBUG, "%s hander a task result: %s", name.c_str(), t.ResultToString().c_str());
        }
    }
    bool Enqueue(const T &t)
    {
        bool ret = false;
        LockQueue();
        if (_isrunning)
        {
            _task_queue.push(t);
            if (waitnum > 0)
            {
                ThreadWakeup();
            }
            LOG(DEBUG, "enqueue task success");
            ret = true;
        }
        UnlockQueue();
        return ret;
    }
    void Stop()
    {
        LockQueue();
        _isrunning = false;
        ThreadWakeAll();
        UnlockQueue();

    }
    void Start()
    {
        for (auto &thread : _threads)
        {
            thread.Start();
        }
    }
    void Wait()
    {
        for (auto &thread : _threads)
        {
            thread.Join();
            LOG(INFO, "%s is quit", thread.name().c_str());
        }
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

private:
    int _threadnum;
    vector<Thread> _threads;
    queue<T> _task_queue;
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;

    int waitnum = 0;
    bool _isrunning = false;
};

我们创建一个日志Log.hpp

#pragma once

#include <iostream>
#include <fstream>
#include <cstdio>
#include <string>
#include <ctime>
#include <cstdarg>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include "LockGuard.hpp"

bool gIsSave = false;
const std::string logname = "log.txt";

// 1. 日志是由等级的
enum Level
{
    DEBUG = 0,
    INFO,
    WARNING,
    ERROR,
    FATAL
};

void SaveFile(const std::string &filename, const std::string &message)
{
    std::ofstream out(filename, std::ios::app);
    if (!out.is_open())
    {
        return;
    }
    out << message;
    out.close();
}

std::string LevelToString(int level)
{
    switch (level)
    {
    case DEBUG:
        return "Debug";
    case INFO:
        return "Info";
    case WARNING:
        return "Warning";
    case ERROR:
        return "Error";
    case FATAL:
        return "Fatal";
    default:
        return "Unknown";
    }
}

std::string GetTimeString()
{
    time_t curr_time = time(nullptr);
    struct tm *format_time = localtime(&curr_time);
    if (format_time == nullptr)
        return "None";
    char time_buffer[1024];
    snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d",
             format_time->tm_year + 1900,
             format_time->tm_mon + 1,
             format_time->tm_mday,
             format_time->tm_hour,
             format_time->tm_min,
             format_time->tm_sec);
    return time_buffer;
}

pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// 2. 日志是由格式的
// 日志等级 时间 代码所在的文件名/行数 日志的内容
void LogMessage(std::string filename, int line, bool issave, int level, const char *format, ...)
{

    std::string levelstr = LevelToString(level);
    std::string timestr = GetTimeString();
    pid_t selfid = getpid();

    char buffer[1024];
    va_list arg;
    va_start(arg, format);
    vsnprintf(buffer, sizeof(buffer), format, arg);
    va_end(arg);

    std::string message = "[" + timestr + "]" + "[" + levelstr + "]" +
                          "[" + std::to_string(selfid) + "]" +
                          "[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";
    LockGuard lockguard(&lock);
    // pthread_mutex_lock(&lock);

    if (!issave)
    {
        std::cout << message;
    }
    else
    {
        SaveFile(logname, message);
    }
    // pthread_mutex_lock(&lock); // bug??

    // std::cout << levelstr << " : " << timestr << " : " << filename << " : " << line << ":" << buffer << std::endl;
}

// C99新特性__VA_ARGS__
#define LOG(level, format, ...)                                                \
    do                                                                         \
    {                                                                          \
        LogMessage(__FILE__, __LINE__, gIsSave, level, format, ##__VA_ARGS__); \
    } while (0)

#define EnableFile()    \
    do                  \
    {                   \
        gIsSave = true; \
    } while (0)
#define EnableScreen()   \
    do                   \
    {                    \
        gIsSave = false; \
    } while (0)

其中日志中的锁是我们使用RALL思想管理的锁。

#ifndef __LOCK_GUARD_HPP__
#define __LOCK_GUARD_HPP__

#include <iostream>
#include <pthread.h>

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
    {
        pthread_mutex_lock(_mutex); // 构造加锁
    }
    ~LockGuard()
    {
        pthread_mutex_unlock(_mutex);
    }
private:
    pthread_mutex_t *_mutex;
};

#endif

同样我们使用的Thread是分装后的接口。

Main.cc 

#include "ThreadPool.hpp"
#include <iostream>
#include <vector>
#include "Task.hpp"
#include <string>
#include "Log.hpp"
#include <unistd.h>
#include <memory>
#include <ctime>
using namespace std;

int main()
{
    srand(time(nullptr) ^ getpid() ^ pthread_self());
    EnableScreen();
    // EnableFile();
    unique_ptr<ThreadPool<Task>> tp = make_unique<ThreadPool<Task>>(5);
    tp->InitThreadPool();
    tp->Start();
    int tasknum = 10;
    while (tasknum)
    {
        int a = rand() % 10 + 1; 
        usleep(1234);
        int b = rand() % 5 + 1;
        Task t(a, b);
        LOG(INFO, "main thread push task: %s", t.DebugToString().c_str());
        tp->Enqueue(t);
        sleep(1);
        tasknum--;
    }
    tp->Stop();
    tp->Wait();
    // LogMessage(DEBUG, "helloworld");

    return 0;
}

以上是线程池的部分代码,线程池的思想与生产消费者模型相似,需要用vector来存放线程,使用queue来存放任务。使用智能指针来控制线程池。

STL,智能指针和线程安全

STL中的容器是否是线程安全的?

不是.
原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响.
而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶).
因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全. 

智能指针是否是线程安全的? 

对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题.
对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数. 

线程安全的单例模式

 什么是单例模式

单例模式是一种 "经典的, 常用的, 常考的" 设计模式.

什么是设计模式 

 IT行业这么火, 涌入的人很多. 俗话说林子大了啥鸟都有. 大佬和菜鸡们两极分化的越来越严重. 为了让菜鸡们不太拖大佬的后腿, 于是大佬们针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是 设计模式

单例模式的特点 

某些类, 只应该具有一个对象(实例), 就称之为单例.
例如一个男人只能有一个媳妇.
在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这些数据.

饿汉实现方式和懒汉实现方式 

[洗完的例子] 

吃完饭, 立刻洗碗, 这种就是饿汉方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭.
吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式. 

懒汉方式最核心的思想是 "延时加载". 从而能够优化服务器的启动速度. 

饿汉方式实现单例模式 

template <typename T>
class Singleton {
static T data;
public:
static T* GetInstance() {
return &data;
}
};

只要通过 Singleton 这个包装类来使用 T 对象, 则一个进程中只有一个 T 对象的实例.

懒汉方式实现单例模式 

template <typename T>
class Singleton {
static T* inst;
public:
static T* GetInstance() {
if (inst == NULL) {
inst = new T();
}
return inst;
}
};

存在一个严重的问题, 线程不安全.
第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象的实例.
但是后续再次调用, 就没有问题了. 

 这两种方法在结果是相同的,但是在最开始使用时,饿汉模式是空间换时间,懒汉模式是时间换空间。

我们对上述的线程池代码进行修改让其也可以实现懒汉模式下的单例模式。

#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include "Thread.hpp"
#include <vector>
#include "Log.hpp"
using namespace std;
using namespace ThreadModule;

const static int defaultthreadnum = 3;
template <typename T>
class ThreadPool
{
private:
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    void ThreadSleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }
    void ThreadWakeup()
    {
        pthread_cond_signal(&_cond);
    }
    void ThreadWakeAll()
    {
        pthread_cond_broadcast(&_cond);
    }
    ThreadPool(int threadnum = defaultthreadnum) : _threadnum(threadnum), _waitnum(0), _isrunning(false)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
        LOG(INFO, "ThreadPool Construct()");
    }
    void InitThreadPool()
    {
        // 指向构建出所有的线程,并不启动
        for (int num = 0; num < _threadnum; num++)
        {
            std::string name = "thread-" + std::to_string(num + 1);
            _threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), name);
            LOG(INFO, "init thread %s done", name.c_str());
        }
        _isrunning = true;
    }
    void Start()
    {
        for (auto &thread : _threads)
        {
            thread.Start();
        }
    }
    void HandlerTask(std::string name) // 类的成员方法,也可以成为另一个类的回调方法,方便我们继续类级别的互相调用!
    {
        LOG(INFO, "%s is running...", name.c_str());
        while (true)
        {
            // 1. 保证队列安全
            LockQueue();
            // 2. 队列中不一定有数据
            while (_task_queue.empty() && _isrunning)
            {
                _waitnum++;
                ThreadSleep();
                _waitnum--;
            }
            // 2.1 如果线程池已经退出了 && 任务队列是空的
            if (_task_queue.empty() && !_isrunning)
            {
                UnlockQueue();
                break;
            }
            // 2.2 如果线程池不退出 && 任务队列不是空的
            // 2.3 如果线程池已经退出 && 任务队列不是空的 --- 处理完所有的任务,然后在退出
            // 3. 一定有任务, 处理任务
            T t = _task_queue.front();
            _task_queue.pop();
            UnlockQueue();
            LOG(DEBUG, "%s get a task", name.c_str());
            // 4. 处理任务,这个任务属于线程独占的任务
            t();
            LOG(DEBUG, "%s handler a task, result is: %s", name.c_str(), t.ResultToString().c_str());
        }
    }
    // 复制拷贝禁用
    ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
    ThreadPool(const ThreadPool<T> &) = delete;

public:
    static ThreadPool<T> *GetInstance()
    {
        // 如果是多线程获取线程池对象下面的代码就有问题了!!
        // 只有第一次会创建对象,后续都是获取
        // 双判断的方式,可以有效减少获取单例的加锁成本,而且保证线程安全
        if (nullptr == _instance) // 保证第二次之后,所有线程,不用在加锁,直接返回_instance单例对象
        {
            LockGuard lockguard(&_lock);
            if (nullptr == _instance)
            {
                _instance = new ThreadPool<T>();
                _instance->InitThreadPool();
                _instance->Start();
                LOG(DEBUG, "创建线程池单例");
                return _instance;
            }
        }
        LOG(DEBUG, "获取线程池单例");
        return _instance;
    }
    bool Enqueue(const T &t)
    {
        bool ret = false;
        LockQueue();
        if (_isrunning)
        {
            _task_queue.push(t);
            if (_waitnum > 0)
            {
                ThreadWakeup();
            }
            LOG(DEBUG, "enqueue task success");
            ret = true;
        }
        UnlockQueue();
        return ret;
    }
    void Stop()
    {
        LockQueue();
        _isrunning = false;
        ThreadWakeAll();
        UnlockQueue();
    }
    void Wait()
    {
        for (auto &thread : _threads)
        {
            thread.Join();
            LOG(INFO, "%s is quit", thread.name().c_str());
        }
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

private:
    int _threadnum;
    vector<Thread> _threads;
    queue<T> _task_queue;
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;

    int _waitnum;
    bool _isrunning = false;
    static ThreadPool<T> *_instance;
    static pthread_mutex_t _lock;
};

template <typename T>
ThreadPool<T> *ThreadPool<T>::_instance = nullptr;

template <typename T>
pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;

 我们一定要注意懒汉模式的线程安全问题,因为单例是在不创建对象的前提通过调用函数来实现的,函数是不可重入的所以在多线程并发访问时可能出现执行多次函数导致创建多个单例而违背单例初衷。所以我们要在函数中加锁来保护。

其他常见的各种锁

悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
自旋锁:自旋锁(Spinlock)是一种用于多线程同步的锁机制,它与互斥锁(mutex)在某些方面相似,但有一个关键的区别:当一个线程尝试获取一个已经被其他线程持有的自旋锁时,该线程不会立即阻塞(即不会进入睡眠状态),而是在当前位置“自旋”,也就是循环等待,直到锁被释放。

自旋锁

我们从自旋锁的定义就可以看出自旋锁与互斥锁mutex唯一区别是线程是否需要阻塞等待,而这就取决于申请到锁的线程在临界区执行时长的问题。如果时间比较就我们就要使用mutex挂起等待,反之可以使用自旋锁spinlock一直去申请访问。

自旋锁的接口与互斥锁大相径庭,我们来学习一下:

 

 使用时我们只需要定义一个pthread_spinlock_t对象即可。

无论是自旋锁还是互斥锁都需要我们程序员去判断然后使用,正确使用可以提高效率。反之无条件使用自旋锁可能会将CPU打满死机!!!

读者写者问题

在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。 

什么是读者写者问题,这个与生产消费者模型有极大相似性。举个例子,我们写CSDN文章,出黑板报等待都是读者写者问题。写者将内容发送到一个临界区,读者只需要读即可。所以本质也是321原则。一个交易场所、两个角色:读者、写者,三种关系:读者VS读者、写者VS写者、读者VS写者。

其中读者与写者一定有互斥和同步的关系,写者与写者之间有互斥的关系。但是读者与读者之间却没有关系,这与生产消费者模型就有不同之处。因为消费者是需要将临界区的数据拿走,而读者只需要拷贝数据,不会讲数据拿走,这就导致读者之间没有关系。

这种关系自己使用加锁解锁逻辑是没问题的,下面有一段伪代码就是整体思路:

int reader_count = 0; //读者计数器
pthread_mutex_t wlock;    //写者锁
pthread_mutex_t rlock;    //读者锁


//读者
lock(&rlock);
if(reader_count == 0)
    lock(&wlock);
++reader_count;
unlock(&rlock);

//读者读操作


lock(&rlock);
--reader_count;
if(reader_count == 0)
    unlock(&wlock);
unlock(&rlock);


//写者

lock(&wlock);


//写者写操作


unlock(&wlock);

写者思路非常简单,只需要维护只有一个写者进入写即可。因为读者计数器也属于临界资源我们要加锁保护。当我们判断读者为0时证明是第一个读者进入,所以我们要申请写者锁防止写者进入,如果我们申请不到锁证明写者持有锁读者进入不了,这就做到了互斥与同步。后面当最后一个--后计数器为0证明是最后一个走的读者,里面没有读者了就可以释放写者锁了。

通过上述伪代码我们可以看出,其读者写者问题都是围绕读者优先实现的,所以当读者基数过大时肯定会导致写者饥饿问题。所以会出现读者优先(上述伪代码逻辑)与写者优先,写者优先的思路就是当写者到来时,我们会阻挡还未进入的读者,当已进入的读者全部出来时写者先进入!!!但是实现就有点复杂,这里我们不给予实现。 

为了不这么使用,pthread库提供了读写锁,而上述问题就会在库中得到解决!

读写锁接口

设置读写优先

int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
/*
pref 共有 3 种选择
PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况
PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先,目前有 BUG,导致表现行为和
PTHREAD_RWLOCK_PREFER_READER_NP 一致
PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁
*/

初始化 

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t
*restrict attr); 

销毁 

int pthread_rwlock_destroy(pthread_rwlock_t *rwlock); 

加锁和解锁 

 int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);//读加锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);//写加锁
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);//读写解锁都可以使用

下面代码是一段读写锁模型实例,可以参考怎么使用:

#include <vector>
#include <sstream>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
volatile int ticket = 1000;
pthread_rwlock_t rwlock;
void * reader(void * arg)
{
char *id = (char *)arg;
while (1) {
pthread_rwlock_rdlock(&rwlock);
if (ticket <= 0) {
pthread_rwlock_unlock(&rwlock);
break;
}
printf("%s: %d\n", id, ticket);
pthread_rwlock_unlock(&rwlock);
usleep(1);
}
return nullptr;
}
void * writer(void * arg)
{
char *id = (char *)arg;
while (1) {
pthread_rwlock_wrlock(&rwlock);
if (ticket <= 0) {
pthread_rwlock_unlock(&rwlock);
break;
}
printf("%s: %d\n", id, --ticket);
pthread_rwlock_unlock(&rwlock);
usleep(1);
}
return nullptr;
}
struct ThreadAttr
{
pthread_t tid;
std::string id;
};
std::string create_reader_id(std::size_t i)
{
// 利用 ostringstream 进行 string 拼接
std::ostringstream oss("thread reader ", std::ios_base::ate);
oss << i;
return oss.str();
}
std::string create_writer_id(std::size_t i)
{
// 利用 ostringstream 进行 string 拼接
std::ostringstream oss("thread writer ", std::ios_base::ate);
oss << i;
return oss.str();
}
void init_readers(std::vector<ThreadAttr>& vec)
{
for (std::size_t i = 0; i < vec.size(); ++i) {
vec[i].id = create_reader_id(i);
pthread_create(&vec[i].tid, nullptr, reader, (void *)vec[i].id.c_str());
}
}
void init_writers(std::vector<ThreadAttr>& vec)
{
for (std::size_t i = 0; i < vec.size(); ++i) {
vec[i].id = create_writer_id(i);
pthread_create(&vec[i].tid, nullptr, writer, (void *)vec[i].id.c_str());
}
}
void join_threads(std::vector<ThreadAttr> const& vec)
{
// 我们按创建的 逆序 来进行线程的回收
for (std::vector<ThreadAttr>::const_reverse_iterator it = vec.rbegin(); it !=
vec.rend(); ++it) {
pthread_t const& tid = it->tid;
pthread_join(tid, nullptr);
}
}
void init_rwlock()
{
#if 0 // 写优先
pthread_rwlockattr_t attr;
pthread_rwlockattr_init(&attr);
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
pthread_rwlock_init(&rwlock, &attr);
pthread_rwlockattr_destroy(&attr);
#else // 读优先,会造成写饥饿
pthread_rwlock_init(&rwlock, nullptr);
#endif
}
int main()
{
// 测试效果不明显的情况下,可以加大 reader_nr
// 但也不能太大,超过一定阈值后系统就调度不了主线程了
const std::size_t reader_nr = 1000;
const std::size_t writer_nr = 2;
std::vector<ThreadAttr> readers(reader_nr);
std::vector<ThreadAttr> writers(writer_nr);
init_rwlock();
init_readers(readers);
init_writers(writers);
join_threads(writers);
join_threads(readers);
pthread_rwlock_destroy(&rwlock);
}

以上就是本次全部内容,感谢大家观看。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1991495.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LVS集群实现四层负载均衡详解(以nat,dr模式为例)

目录 一、LVS集群的介绍 1、LVS 相关术语&#xff1a; 2、lvs工作原理 3、相关名词概念 4、lvs集群的类型 二、lvs的nat模式 1、介绍&#xff1a; 2、数据逻辑&#xff1a; 3、nat实验部署 环境搭建&#xff1a; 1、lvs中要去打开内核路由功能&#xff0c;实现网络互联…

关于区块链的公共医疗应用开发

区块链的养老保险平台应用开发 任务一:环境准备 1.编译区块链网络 目录:/root/xuperchain/ 在区块链网络目录下执行make命令,编译网络,编译成功后输出compile done! 启动区块链网络 2.创建钱包账户 创建普通钱包账户userTest,命令如下 bin/xchain-cli account newke…

6.2 Python 标准库简介:编程世界的百宝箱

欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;欢迎订阅相关专栏&#xff1a; 工&#x1f497;重&#x1f497;hao&#x1f497;&#xff1a;野老杂谈 ⭐️ 全网最全IT互联网公司面试宝典&#xff1a;收集整理全网各大IT互联网公司技术、项目、HR面试真题.…

QT界面设计开发(Visual Studio 2019)—学习记录一

一、控件升级 简要介绍&#xff1a; 简单来说&#xff0c;控件提升就是将一个基础控件&#xff08;Base Widget&#xff09;转换为一个更特定、更复杂的自定义控件&#xff08;Custom Widget&#xff09;。这样做的目的是为了在设计界面时能够使用更多高级功能&#xff0c;而不…

QT下载与安装

我们要下载开源的QT&#xff0c;方式下载方式&#xff1a; 官网 登录地址&#xff1a;http://www.qt.io.com 点击右上角的Download. Try.按钮&#xff1b;进入一下画面&#xff1a; 如果进入的是以下画面&#xff1a; 直接修改网址&#xff1a;www.qt.io/download-dev; 改为w…

pytorch的gpu环境安装

windows系统下pytorch的gpu环境安装 安装前置说明&#xff1a; 要成功安装pytorch的gpu环境&#xff0c;需要cuda版本&#xff0c;python版本和pytorch版本都要相匹配才能顺利使用&#xff0c;cuda版本建议不要安装最新的版本&#xff0c;以免找不到相匹配的pytorch版本pytho…

八问八答,深入浅出搞懂Transformer内部运作原理

导读 同学们在学习Transformer时是否觉得难以理解或者很难理清其内部运作原理呢。本文将通过八个关键问题帮助大家搞懂Transformer内部工作原理&#xff0c;希望对大家有所帮助。 七年前&#xff0c;论文《Attention is all you need》提出了 transformer 架构&#xff0c;颠…

KVM——虚拟机的复制与克隆

目录 一. 复制虚拟机 二. 虚拟机克隆 1. 使用 virt-clone 2. 使用 virt-manager&#xff08;图形界面&#xff09; 3. 使用 qemu-img &#xff08;磁盘镜像克隆&#xff09; 一. 复制虚拟机 配置文件路径&#xff1a;/etc/libvirt/qemu/*.xml 磁盘镜像文件路径&#…

【扒代码】regression_head.py

import torch from torch import nnclass UpsamplingLayer(nn.Module):# 初始化 UpsamplingLayer 类def __init__(self, in_channels, out_channels, leakyTrue):super(UpsamplingLayer, self).__init__() # 调用基类的初始化方法# 初始化一个序列模型&#xff0c;包含卷积层、…

LeetCode 7, 703, 287

文章目录 7. 整数反转题目链接标签思路反转操作反转的数的范围 代码 703. 数据流中的第 K 大元素题目链接标签思路代码 287. 寻找重复数题目链接标签思路代码 7. 整数反转 题目链接 7. 整数反转 标签 数学 思路 反转操作 反转实际上很简单&#xff0c;假设要反转数字 n…

数据结构之Map与Set(上)

找往期文章包括但不限于本期文章中不懂的知识点&#xff1a; 个人主页&#xff1a;我要学编程(ಥ_ಥ)-CSDN博客 所属专栏&#xff1a;数据结构&#xff08;Java版&#xff09; 目录 二叉搜索树 Map和Set的介绍与使用 Map的常用方法及其示例 Set的常用方法及其示例 哈希表…

客户管理系统平台(CRM系统)是什么?它的核心主要解决哪些问题?

客户管理系统平台CRM是什么&#xff1f;客户关系管理系统CRM的核心主要解决哪些问题&#xff1f; CRM系统不仅仅是一套软件&#xff0c;更是一种策略&#xff0c;一种管理理念和一种企业发展方向。它通过整合客户数据、优化业务流程、提升客户体验&#xff0c;帮助企业在激烈的…

K8s第三节:k8s1.23.1升级为k8s1.30.0

上回书说到我们使用了kubeadm安装了k8s1.23.1,但是在k8s1.24之前还是使用docker作为容器运行时&#xff0c;所以这一节我打算将我安装的k8s集群升级为1.30.0版本&#xff1b; 1、修改containerd 配置 因为我们安装的docker自带containerd&#xff0c;所以我们不需要重新安装con…

蓝凌EKP二次开发资料大全 完整蓝凌二次开发资料 蓝凌 EKP开发实战教程 蓝凌OA二次开发资料大全 蓝凌OA java开发快速入门

蓝凌EKP二次开发资料大全 完整蓝凌二次开发资料 蓝凌 EKP开发实战教程 蓝凌OA二次开发资料大全 记得两年前花了非常贵的费用去现场学习的资料&#xff0c;把这些开发技术文档分享出来&#xff0c;希望通过这个资料&#xff0c; 为大家学习开发大大减少时间。期待大家能快速上…

《UE5_C++多人TPS完整教程》学习笔记32 ——《P33 动画蓝图(Animation Blueprint)》

本文为B站系列教学视频 《UE5_C多人TPS完整教程》 —— 《P33 动画蓝图&#xff08;Animation Blueprint&#xff09;》 的学习笔记&#xff0c;该系列教学视频为 Udemy 课程 《Unreal Engine 5 C Multiplayer Shooter》 的中文字幕翻译版&#xff0c;UP主&#xff08;也是译者…

Python实战:类

一、圆的面积、周长 class Circle:# 初始化一个类参数&#xff1a;rdef __init__(self,r):self.r r# 计算面积的方法def get_area(self):return 3.14*pow(self.r,2)# 计算周长的方法def get_perimeter(self):return 2*3.14*self.r#创建对象 r eval(input(请输入圆的半径&…

Vue 2 和 Vue 3 生命周期钩子

Vue 2 和 Vue 3 生命周期钩子 在 Vue.js 开发中&#xff0c;了解生命周期钩子对于编写有效的组件至关重要。Vue 2 和 Vue 3 在生命周期钩子上大致相同&#xff0c;但 Vue 3 的 Composition API 引入了一种新的方式来处理它们。这里我会概述两者的生命周期钩子&#xff0c;并指…

2024年8月7日(mysql主从 )

回顾 主服务器 [rootmaster_mysql ~]# yum -y install rsync [rootmaster_mysql ~]# tar -xf mysql-8.0.33-linux-glibc2.12-x86_64.tar [rootmaster_mysql ~]# tar -xf mysql-8.0.33-linux-glibc2.12-x86_64.tar.xz [rootmaster_mysql ~]# cp -r mysql-8.0.33-linux-glibc2.…

QT找不到编辑框

问题展示&#xff1a; 解决办法&#xff1a;ALT0 然后我的变成了这种&#xff1a; 解决办法&#xff1a;文件系统改变成项目&#xff1a;

DNTR——F

文章目录 AbstractIntroductionContribution Related WorkAdvancements in Feature Pyramid Networks (FPNs)Coarse-to-Fine Image Partitioning in Drone Imagery DetectionDevelopments in Loss Function Approaches for Tiny Object DetectionR-CNN for Small Object Detect…