lesson8-Linux多线程

news2024/11/15 9:14:44

Linux线程概念

  •  线程在进程内部执行,是OS调度的基本单位
  • OS是可以做到让进程进行资源的细粒度划分的

  •  物理内存是以4kb为单位
  • 我们的.exe可执行程序本来就是按照地址空间的方式进行编译的

 页表映射 - 详细图

 理解线程

  •  线程在进程的地址空间内运行,
  •  进程内部具有多个执行流的,而线程只有一个执行流,
  • 线程独有一组寄存器,栈等等

CPU的视角

  • Linux下,PCB <= 其他OS内的PCB的,所以Linux下的进程:统一称之为轻量级进程
  • CPU其实不关心执行流是进程还是线程,只关心PCB

Linux没有真正意义上的线性结构,Linux是用进程PCB模拟线程的,

Linux并不能直接给我们提供线程的接口,只能提供轻量级进程的接口!

  •   它也在用户层实现了一套用户层多线程方案,以库的方式提供给用户进行使用pthread线程库 -- 原生线程库

为什么线程切换的成本更低

  • CPU内部是有L1~L3cache 对应内存的代码和数据,根据局部性原理,预读CPU内部!!!
  •  线程是不用切换页表 && 改变地址空间,
  • 进程需要切换cache就会立即失效,新进程过来,只能重新缓存,

创建线程

pthread_create

 

  •  这段代码也可以证明CPU是用LWP来区分线程

线程ID及进程地址空间布局

  • pthread_ create函数会产生一个线程ID,存放在第一个参数指向的地址中。该线程ID和前面说的线程ID 不是一回事。
  • 前面讲的线程ID属于进程调度的范畴。因为线程是轻量级进程,是操作系统调度器的最小单位,所以需要一个数值来唯一表示该线程。
  • pthread_ create函数第 一个参数指向一个虚拟内存单元 ,该内存单元的地址即为新创建线程的线程ID,

 pthread_self

  •  可以获得线程自身的ID

 pthread_t类型的理解

  • 对于Linux目前实现的NPTL实现而言,pthread_t类型的线程ID
  • 本质就是一个进程地址空间上的一个地址。

 线程终止

如果需要只终止某个线程而不终止整个进程 , 可以有三种方法 :
  1. 从线程函数return。这种方法对主线程不适用,从main函数return相当于调用exit。
  2. 线程可以调用pthread_ exit终止自己
  3. 一个线程可以调用pthread_ cancel终止同一进程中的另一个线程。

 pthread_cancel

 线程等待

为什么需要线程等待?
  • 已经退出的线程,其空间没有被释放,仍然在进程的地址空间内。
  • 创建新的线程不会复用刚才退出线程的地址空间。

pthread_join 

分离线程

  • 默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄漏。
  • 如果不关心线程的返回值,join是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线程资源。

pthread_detach

 Linux线程互斥

  • 临界资源:多线程执行流共享的资源就叫做临界资源
  • 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
  • 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用 原子性
  • 不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
int ticket = 100;
void *route(void *arg)
{
    char *id = (char*)arg;
    while ( 1 ) {
        if ( ticket > 0 ) {
            usleep(1000);
            printf("%s sells ticket:%d\n", id, ticket);
            ticket--;
        } else {
            break;
        }
    }
}

int main( void )
{
    pthread_t t1, t2, t3, t4;
    pthread_create(&t1, NULL, route, (void*)"thread 1");
    pthread_create(&t2, NULL, route, (void*)"thread 2");
    pthread_create(&t3, NULL, route, (void*)"thread 3");
    pthread_create(&t4, NULL, route, (void*)"thread 4");
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);pthread_join(t3, NULL);
    pthread_join(t4, NULL);
}

  • 多个线程并发的操作共享变量,会带来一些问题。 

为什么可能无法获得争取结果?

  • if 语句判断条件为真以后,代码可以并发的切换到其他线程
  • usleep 这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段
  • --ticket 操作本身就不是一个原子操作

 要解决以上问题,需要做到三点:

  1. 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
  2. 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
  3. 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。

要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量 

pthread_mutex_lock && pthread_mutex_unlock

 

#include <iostream>
#include <thread>
#include <cerrno>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <cassert>
#include <cstdio>

using namespace std;

int tickets = 10000; // 在并发访问的时候,导致了我们数据不一致的问题!临界资源

#define THREAD_NUM 10// 线程数量

class ThreadData
{
public:
    ThreadData(const std::string& n, pthread_mutex_t* pm) 
        :tname(n), pmtx(pm)
    {}
public:
    std::string tname;
    pthread_mutex_t* pmtx;
};

void* getTickets(void* args)
{
    ThreadData* td = (ThreadData*)args;
    while (true)
    {
        // 抢票逻辑
        int n = pthread_mutex_lock(td->pmtx);// 加锁
        assert(n == 0);
        // 临界区
        if (tickets > 0) // 1. 判断的本质也是计算的一种
        {
            usleep(rand() % 1500);
            printf("%s: %d\n", td->tname.c_str(), tickets);
            tickets--; // 2. 也可能出现问题
            n = pthread_mutex_unlock(td->pmtx);// 解锁
            assert(n == 0);
        }
        else {
            n = pthread_mutex_unlock(td->pmtx);// 解锁
            assert(n == 0);
            break;
        }

        // 抢完票,其实还需要后续的动作
        usleep(rand() % 2000);
    }
    delete td;
    return nullptr;
}

int main()
{
    time_t start = time(nullptr);

    pthread_mutex_t mtx;// 局部变量 + pthread_mutex_init初始化
    pthread_mutex_init(&mtx, nullptr);

    srand((unsigned long)time(nullptr) ^ getpid() ^ 0x147);// 让随机数更随机
    pthread_t t[THREAD_NUM];
    // 多线程抢票的逻辑
    for (int i = 0; i < THREAD_NUM; i++)
    {
        std::string name = "thread ";
        name += std::to_string(i + 1);
        ThreadData* td = new ThreadData(name, &mtx);
        pthread_create(t + i, nullptr, getTickets, (void*)td);
    }

    for (int i = 0; i < THREAD_NUM; i++)
    {
        pthread_join(t[i], nullptr);
    }

    pthread_mutex_destroy(&mtx);

    time_t end = time(nullptr);

    cout << "cast: " << (int)(end - start) << "S" << endl;
}

  • pthread_mutex_t 就是原生线程库提供的一个数据类型 
  •  如果多线程访问同一个全局变量,并对它进行数据计算,多线程不会互相影响
    • 可以定义一个全局变量 pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
  • 加锁保护:加锁的时候,一定要保证加锁的力度,越小越好!!
  • 加锁之后就是串行执行的了,线程在临界区,切换也不会有什么问题,
    • 锁只有一把,线程切换走了,持有锁也会被带走,而其他抢票的线程要执行临界区的代码
    • 又要申请锁,这时锁是无法申请成功的,也就不会让其他线程进入临界区,保证了临界区中数据的一致性

对锁的理解 

  •  在上那段代码中,锁只有一把,拿1的汇编只有1条
  • 一个线程被切换走,它会将自己的数据对应执行到的汇编 

补充一点: 可重入函数就是安全的

常见锁概念 

死锁

  •  一把锁也可能发生死锁问题

死锁四个必要条件

  1. 互斥条件:一个资源每次只能被一个执行流使用
  2. 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
  3. 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
  4. 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系

避免死锁

  • 破坏死锁的四个必要条件
  • 加锁顺序一致
  • 避免锁未释放的场景
  • 资源一次性分配

Linux线程同步

条件变量

  •  当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。

  • 例如一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量

同步概念与竞态条件 

  • 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
  • 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解

 引入同步: 主要是为了解决访问临界资源合理性问题的,使线程按照一定顺序,进行临界资源的访问,线程同步

方案一: 条件变量

  • 当我们申请临界资源前,先要做临界资源是否存在的检测,而检测的本质: 也是访问临界资源
  • 所以对临界资源的检测,也一定需要在加锁和解锁之间的

 

#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>

#define TNUM 4
typedef void (*func_t)(const std::string &name,pthread_mutex_t *pmtx, pthread_cond_t *pcond);
volatile bool quit = false;

class ThreadData
{
public:
    ThreadData(const std::string &name, func_t func, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
    :name_(name), func_(func), pmtx_(pmtx), pcond_(pcond)
    {}
public:
    std::string name_;
    func_t func_;
    pthread_mutex_t *pmtx_;
    pthread_cond_t *pcond_;
};

void func1(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
    while(!quit)
    {
        // wait一定要在加锁和解锁之间进行wait!
        // v2: 
        pthread_mutex_lock(pmtx);
        // if(临界资源是否就绪-- 否) pthread_cond_wait
        pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
        std::cout << name << " running -- 播放" << std::endl;
        pthread_mutex_unlock(pmtx);
    }
}

void func2(const std::string &name,pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
    while(!quit)
    {
        pthread_mutex_lock(pmtx);
        pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
        if(!quit) std::cout << name << " running  -- 下载" << std::endl;
        pthread_mutex_unlock(pmtx);

    }
}
void func3(const std::string &name,pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
    while(!quit)
    {
        pthread_mutex_lock(pmtx);
        pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
        std::cout << name << " running  -- 刷新" << std::endl;
        pthread_mutex_unlock(pmtx);

    }
}
void func4(const std::string &name,pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
    while(!quit)
    {
        pthread_mutex_lock(pmtx);
        pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
        std::cout << name << " running  -- 扫码用户信息" << std::endl;
        pthread_mutex_unlock(pmtx);
    }
}

void *Entry(void *args)
{
    ThreadData *td = (ThreadData*)args; //td在每一个线程自己私有的栈空间中保存
    td->func_(td->name_, td->pmtx_, td->pcond_); // 它是一个函数,调用完成就要返回!
    delete td;
    return nullptr;
}

int main()
{
    pthread_mutex_t mtx;
    pthread_cond_t cond;// 条件变量
    pthread_mutex_init(&mtx, nullptr);
    pthread_cond_init(&cond, nullptr);// 初始化条件变量

    pthread_t tids[TNUM];
    func_t funcs[TNUM] = {func1, func2, func3, func4};// 函数指针数组
    for (int i = 0; i < TNUM; i++)
    {
        std::string name = "Thread ";
        name += std::to_string(i + 1);
        ThreadData *td = new ThreadData(name, funcs[i], &mtx, &cond);
        pthread_create(tids + i, nullptr, Entry, (void*)td);
    }

    sleep(5);

    // ctrl new thread
    int cnt = 10;
    while(cnt)
    {
        std::cout << "resume thread run code ...." << cnt-- << std::endl;
        pthread_cond_signal(&cond);// 唤醒等待
        sleep(1);
    }

    std::cout << "ctrl done" << std::endl;
    quit = true;
    pthread_cond_broadcast(&cond);// 唤醒等待

    for(int i = 0; i < TNUM; i++)
    {
        pthread_join(tids[i], nullptr);
        std::cout << "thread: " << tids[i] << "quit" << std::endl;
    }

    pthread_mutex_destroy(&mtx);
    pthread_cond_destroy(&cond);

    return 0;
}

生产者消费者模型 

 

 321原则

  • 3种关系:
    • 生产者和生产者(竞争,互斥)
    • 消费者和消费者(竞争,互斥)
    • 生产者和消费者(互斥,同步)
  • 2种角色:
    • 生产者和消费者
  • 1个交易场所
    • 超市

BlockQueue.hpp

#pragma once

#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>
#include "lockGuard.hpp"

const int gDefaultCap = 5;// 默认容量

template <class T>
class BlockQueue
{
private:
    bool isQueueEmpty()
    {
        return bq_.size() == 0;
    }
    bool isQueueFull()
    {
        return bq_.size() == capacity_;
    }
public:
    // 构造
    BlockQueue(int capacity = gDefaultCap) 
        : capacity_(capacity)
    {
        pthread_mutex_init(&mtx_, nullptr);
        pthread_cond_init(&Empty_, nullptr);
        pthread_cond_init(&Full_, nullptr);
    }
    void push(const T &in) // 生产者
    {
        // 1. 先检测当前的临界资源是否能够满足访问条件
        lockGuard lockgrard(&mtx_); // 自动调用构造函数
        while (isQueueFull()){
            pthread_cond_wait(&Full_, &mtx_);
        }
        
        // 2. 访问临界资源,100%确定,资源是就绪的!
        bq_.push(in);
        pthread_cond_signal(&Empty_);
        // 局部变量 出了作用域会自动调用lockgrard 析构函数
    } 
    void pop(T *out)
    {
        lockGuard lockguard(&mtx_);
        while (isQueueEmpty())
            pthread_cond_wait(&Empty_, &mtx_);
        
        *out = bq_.front();
        bq_.pop();
        pthread_cond_signal(&Full_);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&mtx_);
        pthread_cond_destroy(&Empty_);
        pthread_cond_destroy(&Full_);
    }
private:
    std::queue<T> bq_;     // 阻塞队列
    int capacity_;         // 容量上限
    pthread_mutex_t mtx_;  // 通过互斥锁保证队列安全
    pthread_cond_t Empty_; // 用它来表示bq 是否空的条件
    pthread_cond_t Full_;  //  用它来表示bq 是否满的条件
};

ConProd.cc

#include "BlockQueue.hpp"
#include "Task.hpp"

#include <pthread.h>
#include <unistd.h>
#include <ctime>

int myAdd(int x, int y)
{
    return x + y;
}

void* consumer(void *args)
{
    BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;
    while(true)
    {
        // 获取任务
        Task t;
        bqueue->pop(&t);
        // 完成任务
        std::cout << pthread_self() <<" consumer: "<< t.x_ << "+" << t.y_ << "=" << t() << std::endl;
        // sleep(1);
    }

    return nullptr;
}

// 生产者
void* productor(void *args)
{
    BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;
    while(true)
    {
        // 制作任务
        int x = rand()%10 + 1;
        usleep(rand()%1000);
        int y = rand()%5 + 1;
        Task t(x, y, myAdd);
        // 生产任务
        bqueue->push(t);
        // 输出消息
        std::cout <<pthread_self() <<" productor: "<< t.x_ << "+" << t.y_ << "=?" << std::endl;
        sleep(1);
    }
    return nullptr;
}

int main()
{
    srand((uint64_t)time(nullptr) ^ getpid() ^ 0x32457);
    BlockQueue<Task> *bqueue = new BlockQueue<Task>();

    pthread_t c[2],p[2];
    pthread_create(c, nullptr, consumer, bqueue);
    pthread_create(c + 1, nullptr, consumer, bqueue);
    pthread_create(p, nullptr, productor, bqueue);
    pthread_create(p + 1, nullptr, productor, bqueue);

    pthread_join(c[0], nullptr);// 等待
    pthread_join(c[1], nullptr);
    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);

    delete bqueue;

    return 0;
}

Task.hpp

#pragma once

#include <iostream>
#include <functional>

typedef std::function<int(int, int)> func_t;

class Task
{

public:
    Task(){}
    Task(int x, int y, func_t func)
        :x_(x), y_(y), func_(func)
    {}
    int operator ()()
    {
        return func_(x_, y_);
    }
public:
    int x_;
    int y_;
    func_t func_;
};

lockGuard.hpp

#pragma once

#include <iostream>
#include <pthread.h>
// 定义一把锁
class Mutex
{
public:
    Mutex(pthread_mutex_t *mtx)
        :pmtx_(mtx)
    {}
    void lock() 
    {
        std::cout << "要进行加锁" << std::endl;
        pthread_mutex_lock(pmtx_);
    }
    void unlock()
    {
        std::cout << "要进行解锁" << std::endl;
        pthread_mutex_unlock(pmtx_);
    }
    ~Mutex()
    {}
private:
    pthread_mutex_t *pmtx_;
};

// RAII风格的加锁方式
class lockGuard
{
public:
    lockGuard(pthread_mutex_t *mtx)
        :mtx_(mtx)
    {
        mtx_.lock();
    }
    ~lockGuard()
    {
        mtx_.unlock();
    }
private:
    Mutex mtx_;
};

Makefile

cp:ConProd.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f cp
  • pthread_cond_wait第二个参数是一个锁,当成功调用wait之后,传入的锁,会被自动释放!
  • 从哪里阻塞挂起,就从哪里唤醒, 被唤醒的时候,我们还是在临界区被唤醒的啊
  • RAII风格的加锁(封装了一个锁的类)

POSIX信号量

信号量的概念

 

  • 共享资源->任何时候都只有一个执行流在进行访问,
  • 如果一个共享资源,不当做一个整体,而让不同的执行流访问不同的局域的话,那就可以并发
  • 我们用信号量来表示这段共享资源中还剩多少个资源,

 信号量本质:

  • 是一个计数器,访问临界资源的时候,必须先申请信号量资源(sem--,预订资源,P),
  • 使用完毕信号量资源(sem++,释放资源,V)

 环形队列实现生产者消费者模型 

  • 生产者不能将消费者套圈,消费者不能超过生产者,
  • 为空: 一定要让生产者先运行
  • 为满: 一定要让消费者先运行

 生产者: 最关心的是空间资源->spaceSem->N

  •  P(spaceSem),将会在特定位置生产,然后V(dataSem)

消费者: 最关心的是数据资源->dataSem->0

  • P(dataSem),消费特定的数据,然后V(spaceSem)

Makefile

ring_queue:testMain.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f ring_queue

 ringQueue.hpp

#ifndef _Ring_QUEUE_HPP_
#define _Ring_QUEUE_HPP_

#include <iostream>
#include <vector>
#include <pthread.h>
#include "sem.hpp"

const int g_default_num = 5;

// 多线程
template<class T>
class RingQueue
{
public:
    RingQueue(int default_num = g_default_num)
    : ring_queue_(default_num), 
      num_(default_num),
      c_step(0),
      p_step(0),
      space_sem_(default_num),
      data_sem_(0)
    {
        // 初始化锁
        pthread_mutex_init(&clock, nullptr);
        pthread_mutex_init(&plock, nullptr);
    }
    ~RingQueue()
    {
        // 销毁锁
        pthread_mutex_destroy(&clock);
        pthread_mutex_destroy(&plock);
    }
    // 生产者: 空间资源, 生产者们的临界资源是什么?下标
    void push(const T &in)
    {
        // 先申请信号量(0)
        space_sem_.p();// --
        pthread_mutex_lock(&plock); // ?
        
        // 一定是竞争成功的生产者线程 -- 就一个!
        ring_queue_[p_step++] = in;
        p_step %= num_;
        
        pthread_mutex_unlock(&plock);
        data_sem_.v();// ++ 
    }
    // 消费者: 数据资源, 消费者们的临界资源是什么?下标
    void pop(T *out)
    {
        data_sem_.p();
        pthread_mutex_lock(&clock);
        
        // 一定是竞争成功的消费者线程 -- 就一个!
        *out = ring_queue_[c_step++];
        c_step %= num_;
        
        pthread_mutex_unlock(&clock);
        space_sem_.v();
    }
    // void debug()
    // {
    //     std::cerr << "size: " << ring_queue_.size() << " num: " << num << std::endl;
    // } 
private:
    std::vector<T> ring_queue_;
    int num_;
    int c_step; // 消费下标
    int p_step; // 生产下标
    Sem space_sem_;// 记录空间的信号量
    Sem data_sem_;// 记录空间数据的信号量
    pthread_mutex_t clock;// 消费者的锁
    pthread_mutex_t plock;// 生产者的锁
};

#endif

sem.hpp

#ifndef _SEM_HPP_
#define _SEM_HPP_

#include <iostream>
#include <semaphore.h>

class Sem
{
public:
    Sem(int value)
    {
        // 信号量初始化
        sem_init(&sem_, 0, value);
    }
    void p()
    {
        // 等待信号量,会将信号量的值减1
        sem_wait(&sem_);
    }
    void v()
    {
        // 发布信号量,会将信号量的值加1
        sem_post(&sem_);
    }
    ~Sem()
    {
        // 销毁相互量
        sem_destroy(&sem_);
    }
private:
    sem_t sem_;
};

#endif

 testMain.cc

#include "ringQueue.hpp"
#include <cstdlib>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>

// 消费者
void *consumer(void *args)
{
    // 生产者和消费者看到的是同一个rq
    RingQueue<int> *rq = (RingQueue<int> *)args;
    while(true)
    {
        sleep(1);
        int x;
        // 1. 从环形队列中获取任务或者数据
        rq->pop(&x);
        // 2. 进行一定的处理 -- 不要忽略它的时间消耗问题
        std::cout << "生产: " << x << " [" << pthread_self() << "]" << std::endl;
    } 
}


// 生产者
void *productor(void *args)
{
    // 生产者和消费者看到的是同一个rq
    RingQueue<int> *rq = (RingQueue<int> *)args;
    while(true)
    {
        // sleep(1);

        // 1. 构建数据或者任务对象 -- 一般是可以从外部来 -- 不要忽略它的时间消耗问题
        int x = rand() % 100 + 1;
        std::cout << "消费:" << x << " [" << pthread_self() << "]" << std::endl;

        // 2. 推送到环形队列中
        rq->push(x); // 完成生产的过程
    }
}

int main()
{
    srand((uint64_t)time(nullptr) ^ getpid());// 产生随机数
    RingQueue<int> *rq = new RingQueue<int>();

    // 多线程模式
    pthread_t c[3],p[2];
    pthread_create(c, nullptr, consumer, (void*)rq);
    pthread_create(c+1, nullptr, consumer, (void*)rq);
    pthread_create(c+2, nullptr, consumer, (void*)rq);

    pthread_create(p, nullptr, productor, (void*)rq);
    pthread_create(p+1, nullptr, productor, (void*)rq);

    for(int i = 0; i < 3; i++) pthread_join(c[i], nullptr);
    for(int i = 0; i < 2; i++) pthread_join(p[i], nullptr);
    return 0;
}

 多生产多消费的意义

  • 将数据或者任务生产前和拿到之后处理,才是最耗费时间的。

  • 生产的本质:私有的任务-> 公共空间中

  • 消费的本质:公共空间中的任务-> 私有化

信号量的意义

  • 信号量本质是一把计数器, 可以不用进入临界区,就可以得知资源情况,甚至可以减少临界区内部的判断!
  • 信号量可以提前预设资源的情况,而且在pv变化过程中,我们可以在外部就能知晓临界资源的情况

线程池

Makefile

thread_pool:testMain.cc
	g++ -o $@ $^ -std=c++11 -lpthread #-DDEBUG_SHOW
.PHONY:clean
clean:
	rm -f thread_pool
  • -DDEBUG_SHOW命令中定义了一个宏,多和条件变量配合使用

 Task.hpp

#pragma once

#include <iostream>
#include <string>
#include <functional>
#include "log.hpp"

typedef std::function<int(int, int)> func_t;

class Task
{
public:
    Task(){}
    Task(int x, int y, func_t func):x_(x), y_(y), func_(func)
    {}
    void operator ()(const std::string &name)
    {
        // std::cout << "线程 " << name << " 处理完成, 结果是: " << x_ << "+" << y_ << "=" << func_(x_, y_) << std::endl;
        logMessage(WARNING, "%s处理完成: %d+%d=%d | %s | %d",
            name.c_str(), x_, y_, func_(x_, y_), __FILE__, __LINE__);
    }
public:
    int x_;
    int y_;
    // int type;
    func_t func_;
};

lockGuard.hpp

#pragma once

#include <iostream>
#include <pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t *mtx):pmtx_(mtx)
    {}
    void lock() 
    {
        // std::cout << "要进行加锁" << std::endl;
        pthread_mutex_lock(pmtx_);
    }
    void unlock()
    {
        // std::cout << "要进行解锁" << std::endl;
        pthread_mutex_unlock(pmtx_);
    }
    ~Mutex()
    {}
private:
    pthread_mutex_t *pmtx_;
};

// RAII风格的加锁方式
class lockGuard
{
public:
    lockGuard(pthread_mutex_t *mtx):mtx_(mtx)
    {
        mtx_.lock();
    }
    ~lockGuard()
    {
        mtx_.unlock();
    }
private:
    Mutex mtx_;
};

log.hpp

#pragma once

#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <string>

// 日志是有日志级别的
#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4

const char *gLevelMap[] = {
    "DEBUG",
    "NORMAL",
    "WARNING",
    "ERROR",
    "FATAL"
};

#define LOGFILE "./threadpool.log"

// 完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名)
void logMessage(int level, const char *format, ...)
{
#ifndef DEBUG_SHOW
    if(level== DEBUG) return;
#endif
    // va_list ap;
    // va_start(ap, format);
    // while()
    // int x = va_arg(ap, int);
    // va_end(ap); //ap=nullptr
    char stdBuffer[1024]; //标准部分
    time_t timestamp = time(nullptr);
    // struct tm *localtime = localtime(&timestamp);
    snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);

    char logBuffer[1024]; //自定义部分
    va_list args;
    va_start(args, format);
    // vprintf(format, args);
    vsnprintf(logBuffer, sizeof logBuffer, format, args);
    va_end(args);

    FILE *fp = fopen(LOGFILE, "a");
    // printf("%s%s\n", stdBuffer, logBuffer);
    fprintf(fp, "%s%s\n", stdBuffer, logBuffer);
    fclose(fp);
}

 

thread.hpp 

#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <cstdio>

// typedef std::function<void* (void*)> fun_t;
typedef void* (*fun_t)(void*);

class ThreadData
{
public:
    void* args_;
    std::string name_;
};

class Thread
{
public:
    Thread(int num, fun_t callback, void* args) : func_(callback)
    {
        char nameBuffer[64];
        snprintf(nameBuffer, sizeof nameBuffer, "Thread-%d", num);
        name_ = nameBuffer;

        tdata_.args_ = args;
        tdata_.name_ = name_;
    }
    void start()
    {
        pthread_create(&tid_, nullptr, func_, (void*)&tdata_);
    }
    void join()
    {
        pthread_join(tid_, nullptr);
    }
    std::string name()
    {
        return name_;
    }
    ~Thread()
    {
    }

private:
    std::string name_;
    fun_t func_;
    ThreadData tdata_;
    pthread_t tid_;
};

threadPool.hpp

#pragma once

#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <unistd.h>
#include "thread.hpp"
#include "lockGuard.hpp"
#include "log.hpp"

const int g_thread_num = 3;
// 本质是: 生产消费模型
template <class T>
class ThreadPool
{
public:
    pthread_mutex_t* getMutex()
    {
        return &lock;
    }
    bool isEmpty()
    {
        return task_queue_.empty();
    }
    void waitCond()
    {
        pthread_cond_wait(&cond, &lock);
    }
    T getTask()
    {
        T t = task_queue_.front();
        task_queue_.pop();
        return t;
    }

private:
    ThreadPool(int thread_num = g_thread_num) : num_(thread_num)
    {
        pthread_mutex_init(&lock, nullptr);
        pthread_cond_init(&cond, nullptr);
        for (int i = 1; i <= num_; i++)
        {
            threads_.push_back(new Thread(i, routine, this));
        }
    }
    ThreadPool(const ThreadPool<T>& other) = delete;
    const ThreadPool<T>& operator=(const ThreadPool<T>& other) = delete;

public:
    // 考虑一下多线程使用单例的过程
    static ThreadPool<T>* getThreadPool(int num = g_thread_num)
    {
        // 可以有效减少未来必定要进行加锁检测的问题
        // 拦截大量的在已经创建好单例的时候,剩余线程请求单例的而直接访问锁的行为
        if (nullptr == thread_ptr)
        {
            lockGuard lockguard(&mutex);
            // 但是,未来任何一个线程想获取单例,都必须调用getThreadPool接口
            // 但是,一定会存在大量的申请和释放锁的行为,这个是无用且浪费资源的
            // pthread_mutex_lock(&mutex);
            if (nullptr == thread_ptr)
            {
                thread_ptr = new ThreadPool<T>(num);
            }
            // pthread_mutex_unlock(&mutex);
        }
        return thread_ptr;
    }
    // 1. run()
    void run()
    {
        for (auto& iter : threads_)
        {
            iter->start();
            // std::cout << iter->name() << " 启动成功" << std::endl;
            logMessage(NORMAL, "%s %s", iter->name().c_str(), "启动成功");
        }
    }
    // 线程池本质也是一个生产消费模型
    // void *routine(void *args)
    // 消费过程
    static void* routine(void* args)
    {
        ThreadData* td = (ThreadData*)args;
        ThreadPool<T>* tp = (ThreadPool<T> *)td->args_;
        while (true)
        {
            T task;
            {
                lockGuard lockguard(tp->getMutex());
                while (tp->isEmpty())
                    tp->waitCond();
                // 读取任务
                task = tp->getTask(); // 任务队列是共享的-> 将任务从共享,拿到自己的私有空间
            }
            task(td->name_);
            // lock
            // while(task_queue_.empty()) wait();
            // 获取任务
            // unlock

            // 处理任务
        }
    }
    // 2. pushTask()
    void pushTask(const T& task)
    {
        lockGuard lockguard(&lock);
        task_queue_.push(task);
        pthread_cond_signal(&cond);
    }
    // test func
    // void joins()
    // {
    //     for (auto &iter : threads_)
    //     {
    //         iter->join();
    //     }
    // }
    ~ThreadPool()
    {
        for (auto& iter : threads_)
        {
            iter->join();
            delete iter;
        }
        pthread_mutex_destroy(&lock);
        pthread_cond_destroy(&cond);
    }

private:
    std::vector<Thread*> threads_;
    int num_;
    std::queue<T> task_queue_;

    static ThreadPool<T>* thread_ptr;
    static pthread_mutex_t mutex;

    // 方案2:
    //  queue1,queue2
    //  std::queue<T> *p_queue, *c_queue
    //  p_queue->queue1
    //  c_queue->queue2
    //  p_queue -> 生产一批任务之后,swap(p_queue,c_queue),唤醒所有线程/一个线程
    //  当消费者处理完毕的时候,你也可以进行swap(p_queue,c_queue)
    //  因为我们生产和消费用的是不同的队列,未来我们要进行资源的处理的时候,仅仅是指针

    pthread_mutex_t lock;
    pthread_cond_t cond;
};
template <typename T>
ThreadPool<T>* ThreadPool<T>::thread_ptr = nullptr;

template <typename T>
pthread_mutex_t ThreadPool<T>::mutex = PTHREAD_MUTEX_INITIALIZER;

  • 进入{}创建lockGuard对象,并调用构造函数
  • 出{}调用lockGuard对象的析构函数

 

  •  这里使用双重判断,主要为了拦截大量的在已经创建好单例的时候,
  • 剩余线程请求单例的而直接访问锁的行为

testMain.cc

#include "threadPool.hpp"
#include "Task.hpp"
#include <ctime>
#include <cstdlib>
#include <iostream>
#include <unistd.h>

// void *run(void *args)
// {
//     while(true)
//     {
//         ThreadPool<Task>::getThreadPool();
//     }
// }

int main()
{
    // logMessage(NORMAL, "%s %d %c %f \n", "这是一条日志信息", 1234, 'c', 3.14);

    srand((unsigned long)time(nullptr) ^ getpid());
    // ThreadPool<Task> *tp = new ThreadPool<Task>();
    // ThreadPool<Task> *tp = ThreadPool<Task>::getThreadPool();
    // 那么,如果单例本身也在被多线程申请使用呢??
    ThreadPool<Task>::getThreadPool()->run();
    //thread1,2,3,4

    while(true)
    {
        //生产的过程,制作任务的时候,要花时间
        int x = rand()%100 + 1;
        usleep(7721);
        int y = rand()%30 + 1;
        Task t(x, y, [](int x, int y)->int{
            return x + y;
        });

        // std::cout << "制作任务完成: " << x << "+" << y << "=?" << std::endl;
        logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
        logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
        logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
        logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);

        // 推送任务到线程池中
        ThreadPool<Task>::getThreadPool()->pushTask(t);

        sleep(1);
    }
    return 0;
}

threadpool.log

[NORMAL] [1675916038] Thread-1 启动成功
[NORMAL] [1675916038] Thread-2 启动成功
[NORMAL] [1675916038] Thread-3 启动成功
[WARNING] [1675916038] Thread-1处理完成: 21+20=41 | Task.hpp | 20
[WARNING] [1675916039] Thread-2处理完成: 69+30=99 | Task.hpp | 20
[WARNING] [1675916040] Thread-3处理完成: 97+21=118 | Task.hpp | 20
[WARNING] [1675916041] Thread-1处理完成: 74+21=95 | Task.hpp | 20
[WARNING] [1675916042] Thread-2处理完成: 30+24=54 | Task.hpp | 20
[WARNING] [1675916043] Thread-3处理完成: 95+17=112 | Task.hpp | 20
[WARNING] [1675916044] Thread-1处理完成: 76+27=103 | Task.hpp | 20
[WARNING] [1675916045] Thread-2处理完成: 67+7=74 | Task.hpp | 20
[WARNING] [1675916046] Thread-3处理完成: 38+4=42 | Task.hpp | 20
[WARNING] [1675916047] Thread-1处理完成: 62+19=81 | Task.hpp | 20
[WARNING] [1675916048] Thread-2处理完成: 93+21=114 | Task.hpp | 20
[WARNING] [1675916049] Thread-3处理完成: 64+13=77 | Task.hpp | 20
[WARNING] [1675916050] Thread-1处理完成: 81+3=84 | Task.hpp | 20
[WARNING] [1675916051] Thread-2处理完成: 86+26=112 | Task.hpp | 20
[WARNING] [1675916052] Thread-3处理完成: 24+21=45 | Task.hpp | 20
[WARNING] [1675916053] Thread-1处理完成: 69+6=75 | Task.hpp | 20

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/399706.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【java基础】集合基础说明

文章目录基本介绍Collection接口Iterator和Iterable接口Map接口关于Iterator接口的一些说明框架中的接口具体集合总结基本介绍 集合就是存储用来存储一系列数据的一种数据结构。在这篇文章中会介绍集合的一些基本概念。 Collection接口 集合的基本接口是Collection接口&…

Metabase和Superset 对比分析

Metabse中文社区Metabase和Superset都是排名靠前的开源的数据可视化软件&#xff0c;在技术上有许多相似之处。他们的比较可以帮助用户选择更好的开源平台进行数据可视化。关于Superset 编辑切换为居中添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09;Super…

Golang反射源码分析

在go的源码包及一些开源组件中&#xff0c;经常可以看到reflect反射包的使用&#xff0c;本文就与大家一起探讨go反射机制的原理、学习其实现源码 首先&#xff0c;了解一下反射的定义&#xff1a; 反射是指计算机程序能够在运行时&#xff0c;能够描述其自身状态或行为、调整…

智慧赋能,聚力开源——第四届OpenI/O 启智开发者大会开源治理专场顺利举办!

为汇聚国内外知名开源组织共同探讨中国开源生态建设及开源治理相关议题&#xff0c;推进产学研用开源合作&#xff0c;2月24日下午&#xff0c;第四届OpenI/O启智开发者大会在深圳人才研修院智汇中心举办以“构建开源联合体&#xff0c;共建开源生态”为主题的开源治理专场分论…

C++基础了解-17-C++日期 时间

C日期 & 时间 一、C日期 & 时间 C 标准库没有提供所谓的日期类型。C 继承了 C 语言用于日期和时间操作的结构和函数。为了使用日期和时间相关的函数和结构&#xff0c;需要在 C 程序中引用 头文件。 有四个与时间相关的类型&#xff1a;clock_t、time_t、size_t 和 …

opencv识别车道线(霍夫线变换)

目录1、前言2、霍夫线变换2.1、霍夫线变换是什么&#xff1f;2.2、在opencv中的基本用法2.2.1、HoughLinesP函数定义2.2.2、用法3、识别车道3.1、优化3.1.1、降噪3.1.2、过滤方向3.1.3、截选区域3.1.4、测试其它图片图片1图片2图片31、前言 最近学习opencv学到了霍夫线变换&am…

ruoyi对接CAS统一身份认证

暂定逻辑如下&#xff1a;搭建CAS服务器端&#xff1a;项目地址&#xff1a;https://gitee.com/weigang_wu/cas-server-webapp.git项目里有二开的说明文档&#xff0c;如&#xff1a;按照自定义的数据库校验修改如下&#xff1a;首先&#xff1a;修改数据库连接以及查询数据这里…

博客系统(前后端分离版)

博客系统的具体实现 文章目录博客系统的具体实现软件开发的基本流程具体实现的八大功能数据库设计创建数据库操作数据库引入依赖封装DataSource创建实体类将JDBC增删改查封装起来实现博客列表页web.xml的配置文件实现博客系统的展示功能登录功能强制要求用户登录显示用户信息退…

求职复盘:干了四年外包出来,面试5次全挂

我的情况 大概介绍一下个人情况&#xff0c;男&#xff0c;毕业于普通二本院校非计算机专业&#xff0c;18年跨专业入行测试&#xff0c;第一份工作在湖南某软件公司&#xff0c;做了接近4年的外包测试工程师&#xff0c;今年年初&#xff0c;感觉自己不能够再这样下去了&…

为什么做知识管理,就想选择Baklib呢?

随着科技的不断发展&#xff0c;知识管理已经成为现代企业不可或缺的一个重要组成部分。由于信息化快速发展&#xff0c;企业每天都会产生大量的数据和信息&#xff0c;如何高效地获取、整理和利用这些信息已经成为了企业成功的关键因素之一。为了更好地管理企业知识&#xff0…

利用Iptables构建虚拟路由器

利用Iptables构建虚拟路由器 &#xff08;1&#xff09;修改网络类型 在VMware Workstation软件中选择“编辑→虚拟网络编辑器”菜单命令&#xff0c;在虚拟网络列表中选中VMnet1&#xff0c;将其配置为“仅主机模式&#xff08;在专用网络内连接虚拟机&#xff09;”&#x…

模板进阶(仿函数,特化等介绍)

非类型模板参数 模板参数有类型形参和非类型形参&#xff1b; 类型形参&#xff1a;使用typename或者class修饰的参数类型名称 非类型形参&#xff1a;一个普通常量作为模板参数形参&#xff0c;不能为浮点数&#xff0c;字符类型以及类对象&#xff1b; #include<iostrea…

虹科新品| HK-TrueNAS企业存储

一、HK-TrueNAS概述HK-TrueNAS 是一种统一存储阵列&#xff0c;提供混合和全闪存配置&#xff0c;以前所未有的价格提供全面的功能集和高达 10.5PB 的容量。TrueNAS 全闪存存储阵列为以闪存为中心的数据中心提供了理想的统一数据存储。每个混合和全闪存 TrueNAS 系统都使用 Tru…

VSCode 开发配置,一文搞定(持续更新中...)

一、快速生成页面骨架 文件 > 首选项 > 配置用户代码片段 选择需要的代码片段或者创建一个新的&#xff0c;这里以 vue.json 举例&#xff1a; 下面为我配置的代码片段&#xff0c;仅供参考&#xff1a; {"Print to console": {"prefix": "…

Mac系统配置java、Android_sdk、gradle、maven、ndk、flutter、tomcat环境变量

搞了三天&#xff0c;终于搞定MAC系统下的各种环境变量了…… 旧版本10.13.6或者更老的MAC系统&#xff0c;只用在.bash_profile文件编辑就行了&#xff1b;新版本10.14.2、10.15.7或者更高的&#xff0c;还要去.zshrc文件加一句source ~/.bash_profile&#xff0c;才能使所有…

java明文数据加密、脱敏方法总结

前言 在一些安全性要求比较高的项目里&#xff0c;避免不了要对敏感信息进行加解密&#xff0c;比如配置文件中的敏感信息。 第一种方法&#xff08;自定义加解密&#xff09; 加解密工具类&#xff1a; public class SecurityTools {public static final String ALGORITHM…

最新!Windows 11 更新将整合 AI 技术

微软MVP实验室研究员张雅琪&#xff08;阿法兔&#xff09;微软最有价值专家&#xff08;MVP&#xff09;&#xff0c;毕业于外交学院和香港大学&#xff0c;IT 技术社区创始人&#xff0c;中关村互联网金融研究院兼职研究员&#xff0c;多次受邀在微软 Reactor 进行公开演讲&a…

JS的BroadcastChannel与MessageChannel

BroadcastChannel与MessageChannel BroadcastChannel BroadcastChannel以广播的形式进行通信 BroadcastChannel用于创建浏览器标签页之间的通信 使用BroadcastChannel的浏览器标签页面必须要遵循同源策略 页面1使用BroadcastChannel创建一个频道&#xff0c;页面2使用Broadc…

latex入门指南:插入图片、表格、公式方法一览

省事链接&#xff1a; 生成表格latex代码&#xff1a;www.tablesgenerator.com 生成公式latex代码&#xff1a;www.latexlive.com 目录1 插入图片1.1 移动标题位置1.2 双栏文章中图片横跨双栏2 插入表格2.1 常规表格2.2 设置单元格宽度2.3 合并单元格2.4 三线表2.5 移动标题位置…

脑机接口科普0018——前额叶切除手术

本文禁止转载&#xff01;&#xff01;&#xff01; 首先说明一下&#xff0c;前额叶切除手术&#xff0c;现在已经不允许做了。 其次&#xff0c;前额叶切除手术&#xff0c;发明这个手术的人居然还获得了诺贝尔奖。太过于讽刺。1949年的那次诺贝尔医学奖&#xff08;就是我…