Linux线程概念
- 线程在进程内部执行,是OS调度的基本单位
- OS是可以做到让进程进行资源的细粒度划分的
- 物理内存是以4kb为单位的
- 我们的.exe可执行程序本来就是按照地址空间的方式进行编译的
页表映射 - 详细图
理解线程
- 线程在进程的地址空间内运行,
- 进程内部具有多个执行流的,而线程只有一个执行流,
- 线程独有的一组寄存器,栈等等
CPU的视角
- Linux下,PCB <= 其他OS内的PCB的,所以Linux下的进程:统一称之为轻量级进程
- CPU其实不关心执行流是进程还是线程,只关心PCB
Linux没有真正意义上的线性结构,Linux是用进程PCB模拟线程的,
Linux并不能直接给我们提供线程的接口,只能提供轻量级进程的接口!
- 它也在用户层实现了一套用户层多线程方案,以库的方式提供给用户进行使用pthread线程库 -- 原生线程库
为什么线程切换的成本更低
- CPU内部是有L1~L3cache 对应内存的代码和数据,根据局部性原理,预读CPU内部!!!
- 线程是不用切换页表 && 改变地址空间,
- 进程需要切换cache就会立即失效,新进程过来,只能重新缓存,
创建线程
pthread_create
- 这段代码也可以证明CPU是用LWP来区分线程
线程ID及进程地址空间布局
-
pthread_ create函数会产生一个线程ID,存放在第一个参数指向的地址中。该线程ID和前面说的线程ID 不是一回事。
-
前面讲的线程ID属于进程调度的范畴。因为线程是轻量级进程,是操作系统调度器的最小单位,所以需要一个数值来唯一表示该线程。
-
pthread_ create函数第 一个参数指向一个虚拟内存单元 ,该内存单元的地址即为新创建线程的线程ID,
pthread_self
- 可以获得线程自身的ID
pthread_t类型的理解
- 对于Linux目前实现的NPTL实现而言,pthread_t类型的线程ID,
- 本质就是一个进程地址空间上的一个地址。
线程终止
如果需要只终止某个线程而不终止整个进程
,
可以有三种方法
:
- 从线程函数return。这种方法对主线程不适用,从main函数return相当于调用exit。
- 线程可以调用pthread_ exit终止自己。
- 一个线程可以调用pthread_ cancel终止同一进程中的另一个线程。
pthread_cancel
线程等待
为什么需要线程等待?
-
已经退出的线程,其空间没有被释放,仍然在进程的地址空间内。
-
创建新的线程不会复用刚才退出线程的地址空间。
pthread_join
分离线程
- 默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄漏。
- 如果不关心线程的返回值,join是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线程资源。
pthread_detach
Linux线程互斥
-
临界资源:多线程执行流共享的资源就叫做临界资源
-
临界区:每个线程内部,访问临界资源的代码,就叫做临界区
-
互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用 原子性
-
不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
int ticket = 100;
void *route(void *arg)
{
char *id = (char*)arg;
while ( 1 ) {
if ( ticket > 0 ) {
usleep(1000);
printf("%s sells ticket:%d\n", id, ticket);
ticket--;
} else {
break;
}
}
}
int main( void )
{
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, route, (void*)"thread 1");
pthread_create(&t2, NULL, route, (void*)"thread 2");
pthread_create(&t3, NULL, route, (void*)"thread 3");
pthread_create(&t4, NULL, route, (void*)"thread 4");
pthread_join(t1, NULL);
pthread_join(t2, NULL);pthread_join(t3, NULL);
pthread_join(t4, NULL);
}
- 多个线程并发的操作共享变量,会带来一些问题。
为什么可能无法获得争取结果?
- if 语句判断条件为真以后,代码可以并发的切换到其他线程
- usleep 这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段
- --ticket 操作本身就不是一个原子操作
要解决以上问题,需要做到三点:
- 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
- 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
- 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量。
pthread_mutex_lock && pthread_mutex_unlock
#include <iostream>
#include <thread>
#include <cerrno>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <cassert>
#include <cstdio>
using namespace std;
int tickets = 10000; // 在并发访问的时候,导致了我们数据不一致的问题!临界资源
#define THREAD_NUM 10// 线程数量
class ThreadData
{
public:
ThreadData(const std::string& n, pthread_mutex_t* pm)
:tname(n), pmtx(pm)
{}
public:
std::string tname;
pthread_mutex_t* pmtx;
};
void* getTickets(void* args)
{
ThreadData* td = (ThreadData*)args;
while (true)
{
// 抢票逻辑
int n = pthread_mutex_lock(td->pmtx);// 加锁
assert(n == 0);
// 临界区
if (tickets > 0) // 1. 判断的本质也是计算的一种
{
usleep(rand() % 1500);
printf("%s: %d\n", td->tname.c_str(), tickets);
tickets--; // 2. 也可能出现问题
n = pthread_mutex_unlock(td->pmtx);// 解锁
assert(n == 0);
}
else {
n = pthread_mutex_unlock(td->pmtx);// 解锁
assert(n == 0);
break;
}
// 抢完票,其实还需要后续的动作
usleep(rand() % 2000);
}
delete td;
return nullptr;
}
int main()
{
time_t start = time(nullptr);
pthread_mutex_t mtx;// 局部变量 + pthread_mutex_init初始化
pthread_mutex_init(&mtx, nullptr);
srand((unsigned long)time(nullptr) ^ getpid() ^ 0x147);// 让随机数更随机
pthread_t t[THREAD_NUM];
// 多线程抢票的逻辑
for (int i = 0; i < THREAD_NUM; i++)
{
std::string name = "thread ";
name += std::to_string(i + 1);
ThreadData* td = new ThreadData(name, &mtx);
pthread_create(t + i, nullptr, getTickets, (void*)td);
}
for (int i = 0; i < THREAD_NUM; i++)
{
pthread_join(t[i], nullptr);
}
pthread_mutex_destroy(&mtx);
time_t end = time(nullptr);
cout << "cast: " << (int)(end - start) << "S" << endl;
}
- pthread_mutex_t 就是原生线程库提供的一个数据类型
- 如果多线程访问同一个全局变量,并对它进行数据计算,多线程不会互相影响
- 可以定义一个全局变量 pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
- 加锁保护:加锁的时候,一定要保证加锁的力度,越小越好!!
- 加锁之后就是串行执行的了,线程在临界区,切换也不会有什么问题,
- 锁只有一把,线程切换走了,持有锁也会被带走,而其他抢票的线程要执行临界区的代码
- 又要申请锁,这时锁是无法申请成功的,也就不会让其他线程进入临界区,保证了临界区中数据的一致性
对锁的理解
- 在上那段代码中,锁只有一把,拿1的汇编只有1条
- 一个线程被切换走,它会将自己的数据和对应执行到的汇编
补充一点: 可重入函数就是安全的
常见锁概念
死锁
- 一把锁也可能发生死锁问题
死锁四个必要条件
- 互斥条件:一个资源每次只能被一个执行流使用
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
避免死锁
- 破坏死锁的四个必要条件
- 加锁顺序一致
- 避免锁未释放的场景
- 资源一次性分配
Linux线程同步
条件变量
-
当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
-
例如一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
同步概念与竞态条件
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
- 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解
引入同步: 主要是为了解决访问临界资源合理性问题的,使线程按照一定顺序,进行临界资源的访问,线程同步
方案一: 条件变量
- 当我们申请临界资源前,先要做临界资源是否存在的检测,而检测的本质: 也是访问临界资源
- 所以对临界资源的检测,也一定需要在加锁和解锁之间的
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
#define TNUM 4
typedef void (*func_t)(const std::string &name,pthread_mutex_t *pmtx, pthread_cond_t *pcond);
volatile bool quit = false;
class ThreadData
{
public:
ThreadData(const std::string &name, func_t func, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
:name_(name), func_(func), pmtx_(pmtx), pcond_(pcond)
{}
public:
std::string name_;
func_t func_;
pthread_mutex_t *pmtx_;
pthread_cond_t *pcond_;
};
void func1(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
while(!quit)
{
// wait一定要在加锁和解锁之间进行wait!
// v2:
pthread_mutex_lock(pmtx);
// if(临界资源是否就绪-- 否) pthread_cond_wait
pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
std::cout << name << " running -- 播放" << std::endl;
pthread_mutex_unlock(pmtx);
}
}
void func2(const std::string &name,pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
while(!quit)
{
pthread_mutex_lock(pmtx);
pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
if(!quit) std::cout << name << " running -- 下载" << std::endl;
pthread_mutex_unlock(pmtx);
}
}
void func3(const std::string &name,pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
while(!quit)
{
pthread_mutex_lock(pmtx);
pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
std::cout << name << " running -- 刷新" << std::endl;
pthread_mutex_unlock(pmtx);
}
}
void func4(const std::string &name,pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
while(!quit)
{
pthread_mutex_lock(pmtx);
pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
std::cout << name << " running -- 扫码用户信息" << std::endl;
pthread_mutex_unlock(pmtx);
}
}
void *Entry(void *args)
{
ThreadData *td = (ThreadData*)args; //td在每一个线程自己私有的栈空间中保存
td->func_(td->name_, td->pmtx_, td->pcond_); // 它是一个函数,调用完成就要返回!
delete td;
return nullptr;
}
int main()
{
pthread_mutex_t mtx;
pthread_cond_t cond;// 条件变量
pthread_mutex_init(&mtx, nullptr);
pthread_cond_init(&cond, nullptr);// 初始化条件变量
pthread_t tids[TNUM];
func_t funcs[TNUM] = {func1, func2, func3, func4};// 函数指针数组
for (int i = 0; i < TNUM; i++)
{
std::string name = "Thread ";
name += std::to_string(i + 1);
ThreadData *td = new ThreadData(name, funcs[i], &mtx, &cond);
pthread_create(tids + i, nullptr, Entry, (void*)td);
}
sleep(5);
// ctrl new thread
int cnt = 10;
while(cnt)
{
std::cout << "resume thread run code ...." << cnt-- << std::endl;
pthread_cond_signal(&cond);// 唤醒等待
sleep(1);
}
std::cout << "ctrl done" << std::endl;
quit = true;
pthread_cond_broadcast(&cond);// 唤醒等待
for(int i = 0; i < TNUM; i++)
{
pthread_join(tids[i], nullptr);
std::cout << "thread: " << tids[i] << "quit" << std::endl;
}
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(&cond);
return 0;
}
生产者消费者模型
321原则
- 3种关系:
- 生产者和生产者(竞争,互斥)
- 消费者和消费者(竞争,互斥)
- 生产者和消费者(互斥,同步)
- 2种角色:
- 生产者和消费者
- 1个交易场所
- 超市
BlockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>
#include "lockGuard.hpp"
const int gDefaultCap = 5;// 默认容量
template <class T>
class BlockQueue
{
private:
bool isQueueEmpty()
{
return bq_.size() == 0;
}
bool isQueueFull()
{
return bq_.size() == capacity_;
}
public:
// 构造
BlockQueue(int capacity = gDefaultCap)
: capacity_(capacity)
{
pthread_mutex_init(&mtx_, nullptr);
pthread_cond_init(&Empty_, nullptr);
pthread_cond_init(&Full_, nullptr);
}
void push(const T &in) // 生产者
{
// 1. 先检测当前的临界资源是否能够满足访问条件
lockGuard lockgrard(&mtx_); // 自动调用构造函数
while (isQueueFull()){
pthread_cond_wait(&Full_, &mtx_);
}
// 2. 访问临界资源,100%确定,资源是就绪的!
bq_.push(in);
pthread_cond_signal(&Empty_);
// 局部变量 出了作用域会自动调用lockgrard 析构函数
}
void pop(T *out)
{
lockGuard lockguard(&mtx_);
while (isQueueEmpty())
pthread_cond_wait(&Empty_, &mtx_);
*out = bq_.front();
bq_.pop();
pthread_cond_signal(&Full_);
}
~BlockQueue()
{
pthread_mutex_destroy(&mtx_);
pthread_cond_destroy(&Empty_);
pthread_cond_destroy(&Full_);
}
private:
std::queue<T> bq_; // 阻塞队列
int capacity_; // 容量上限
pthread_mutex_t mtx_; // 通过互斥锁保证队列安全
pthread_cond_t Empty_; // 用它来表示bq 是否空的条件
pthread_cond_t Full_; // 用它来表示bq 是否满的条件
};
ConProd.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>
int myAdd(int x, int y)
{
return x + y;
}
void* consumer(void *args)
{
BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;
while(true)
{
// 获取任务
Task t;
bqueue->pop(&t);
// 完成任务
std::cout << pthread_self() <<" consumer: "<< t.x_ << "+" << t.y_ << "=" << t() << std::endl;
// sleep(1);
}
return nullptr;
}
// 生产者
void* productor(void *args)
{
BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;
while(true)
{
// 制作任务
int x = rand()%10 + 1;
usleep(rand()%1000);
int y = rand()%5 + 1;
Task t(x, y, myAdd);
// 生产任务
bqueue->push(t);
// 输出消息
std::cout <<pthread_self() <<" productor: "<< t.x_ << "+" << t.y_ << "=?" << std::endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand((uint64_t)time(nullptr) ^ getpid() ^ 0x32457);
BlockQueue<Task> *bqueue = new BlockQueue<Task>();
pthread_t c[2],p[2];
pthread_create(c, nullptr, consumer, bqueue);
pthread_create(c + 1, nullptr, consumer, bqueue);
pthread_create(p, nullptr, productor, bqueue);
pthread_create(p + 1, nullptr, productor, bqueue);
pthread_join(c[0], nullptr);// 等待
pthread_join(c[1], nullptr);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
delete bqueue;
return 0;
}
Task.hpp
#pragma once
#include <iostream>
#include <functional>
typedef std::function<int(int, int)> func_t;
class Task
{
public:
Task(){}
Task(int x, int y, func_t func)
:x_(x), y_(y), func_(func)
{}
int operator ()()
{
return func_(x_, y_);
}
public:
int x_;
int y_;
func_t func_;
};
lockGuard.hpp
#pragma once
#include <iostream>
#include <pthread.h>
// 定义一把锁
class Mutex
{
public:
Mutex(pthread_mutex_t *mtx)
:pmtx_(mtx)
{}
void lock()
{
std::cout << "要进行加锁" << std::endl;
pthread_mutex_lock(pmtx_);
}
void unlock()
{
std::cout << "要进行解锁" << std::endl;
pthread_mutex_unlock(pmtx_);
}
~Mutex()
{}
private:
pthread_mutex_t *pmtx_;
};
// RAII风格的加锁方式
class lockGuard
{
public:
lockGuard(pthread_mutex_t *mtx)
:mtx_(mtx)
{
mtx_.lock();
}
~lockGuard()
{
mtx_.unlock();
}
private:
Mutex mtx_;
};
Makefile
cp:ConProd.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f cp
- pthread_cond_wait第二个参数是一个锁,当成功调用wait之后,传入的锁,会被自动释放!
- 从哪里阻塞挂起,就从哪里唤醒, 被唤醒的时候,我们还是在临界区被唤醒的啊
- RAII风格的加锁(封装了一个锁的类)
POSIX信号量
信号量的概念
- 共享资源->任何时候都只有一个执行流在进行访问,
- 如果一个共享资源,不当做一个整体,而让不同的执行流访问不同的局域的话,那就可以并发
- 我们用信号量来表示这段共享资源中还剩多少个资源,
信号量本质:
- 是一个计数器,访问临界资源的时候,必须先申请信号量资源(sem--,预订资源,P),
- 使用完毕信号量资源(sem++,释放资源,V)
环形队列实现生产者消费者模型
- 生产者不能将消费者套圈,消费者不能超过生产者,
- 为空: 一定要让生产者先运行
- 为满: 一定要让消费者先运行
生产者: 最关心的是空间资源->spaceSem->N
- P(spaceSem),将会在特定位置生产,然后V(dataSem)
消费者: 最关心的是数据资源->dataSem->0
- P(dataSem),消费特定的数据,然后V(spaceSem)
Makefile
ring_queue:testMain.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f ring_queue
ringQueue.hpp
#ifndef _Ring_QUEUE_HPP_
#define _Ring_QUEUE_HPP_
#include <iostream>
#include <vector>
#include <pthread.h>
#include "sem.hpp"
const int g_default_num = 5;
// 多线程
template<class T>
class RingQueue
{
public:
RingQueue(int default_num = g_default_num)
: ring_queue_(default_num),
num_(default_num),
c_step(0),
p_step(0),
space_sem_(default_num),
data_sem_(0)
{
// 初始化锁
pthread_mutex_init(&clock, nullptr);
pthread_mutex_init(&plock, nullptr);
}
~RingQueue()
{
// 销毁锁
pthread_mutex_destroy(&clock);
pthread_mutex_destroy(&plock);
}
// 生产者: 空间资源, 生产者们的临界资源是什么?下标
void push(const T &in)
{
// 先申请信号量(0)
space_sem_.p();// --
pthread_mutex_lock(&plock); // ?
// 一定是竞争成功的生产者线程 -- 就一个!
ring_queue_[p_step++] = in;
p_step %= num_;
pthread_mutex_unlock(&plock);
data_sem_.v();// ++
}
// 消费者: 数据资源, 消费者们的临界资源是什么?下标
void pop(T *out)
{
data_sem_.p();
pthread_mutex_lock(&clock);
// 一定是竞争成功的消费者线程 -- 就一个!
*out = ring_queue_[c_step++];
c_step %= num_;
pthread_mutex_unlock(&clock);
space_sem_.v();
}
// void debug()
// {
// std::cerr << "size: " << ring_queue_.size() << " num: " << num << std::endl;
// }
private:
std::vector<T> ring_queue_;
int num_;
int c_step; // 消费下标
int p_step; // 生产下标
Sem space_sem_;// 记录空间的信号量
Sem data_sem_;// 记录空间数据的信号量
pthread_mutex_t clock;// 消费者的锁
pthread_mutex_t plock;// 生产者的锁
};
#endif
sem.hpp
#ifndef _SEM_HPP_
#define _SEM_HPP_
#include <iostream>
#include <semaphore.h>
class Sem
{
public:
Sem(int value)
{
// 信号量初始化
sem_init(&sem_, 0, value);
}
void p()
{
// 等待信号量,会将信号量的值减1
sem_wait(&sem_);
}
void v()
{
// 发布信号量,会将信号量的值加1
sem_post(&sem_);
}
~Sem()
{
// 销毁相互量
sem_destroy(&sem_);
}
private:
sem_t sem_;
};
#endif
testMain.cc
#include "ringQueue.hpp"
#include <cstdlib>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
// 消费者
void *consumer(void *args)
{
// 生产者和消费者看到的是同一个rq
RingQueue<int> *rq = (RingQueue<int> *)args;
while(true)
{
sleep(1);
int x;
// 1. 从环形队列中获取任务或者数据
rq->pop(&x);
// 2. 进行一定的处理 -- 不要忽略它的时间消耗问题
std::cout << "生产: " << x << " [" << pthread_self() << "]" << std::endl;
}
}
// 生产者
void *productor(void *args)
{
// 生产者和消费者看到的是同一个rq
RingQueue<int> *rq = (RingQueue<int> *)args;
while(true)
{
// sleep(1);
// 1. 构建数据或者任务对象 -- 一般是可以从外部来 -- 不要忽略它的时间消耗问题
int x = rand() % 100 + 1;
std::cout << "消费:" << x << " [" << pthread_self() << "]" << std::endl;
// 2. 推送到环形队列中
rq->push(x); // 完成生产的过程
}
}
int main()
{
srand((uint64_t)time(nullptr) ^ getpid());// 产生随机数
RingQueue<int> *rq = new RingQueue<int>();
// 多线程模式
pthread_t c[3],p[2];
pthread_create(c, nullptr, consumer, (void*)rq);
pthread_create(c+1, nullptr, consumer, (void*)rq);
pthread_create(c+2, nullptr, consumer, (void*)rq);
pthread_create(p, nullptr, productor, (void*)rq);
pthread_create(p+1, nullptr, productor, (void*)rq);
for(int i = 0; i < 3; i++) pthread_join(c[i], nullptr);
for(int i = 0; i < 2; i++) pthread_join(p[i], nullptr);
return 0;
}
多生产多消费的意义
-
将数据或者任务生产前和拿到之后处理,才是最耗费时间的。
-
生产的本质:私有的任务-> 公共空间中
-
消费的本质:公共空间中的任务-> 私有化
信号量的意义
- 信号量本质是一把计数器, 可以不用进入临界区,就可以得知资源情况,甚至可以减少临界区内部的判断!
- 信号量可以提前预设资源的情况,而且在pv变化过程中,我们可以在外部就能知晓临界资源的情况
线程池
Makefile
thread_pool:testMain.cc
g++ -o $@ $^ -std=c++11 -lpthread #-DDEBUG_SHOW
.PHONY:clean
clean:
rm -f thread_pool
- -DDEBUG_SHOW在命令中定义了一个宏,多和条件变量配合使用
Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include "log.hpp"
typedef std::function<int(int, int)> func_t;
class Task
{
public:
Task(){}
Task(int x, int y, func_t func):x_(x), y_(y), func_(func)
{}
void operator ()(const std::string &name)
{
// std::cout << "线程 " << name << " 处理完成, 结果是: " << x_ << "+" << y_ << "=" << func_(x_, y_) << std::endl;
logMessage(WARNING, "%s处理完成: %d+%d=%d | %s | %d",
name.c_str(), x_, y_, func_(x_, y_), __FILE__, __LINE__);
}
public:
int x_;
int y_;
// int type;
func_t func_;
};
lockGuard.hpp
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t *mtx):pmtx_(mtx)
{}
void lock()
{
// std::cout << "要进行加锁" << std::endl;
pthread_mutex_lock(pmtx_);
}
void unlock()
{
// std::cout << "要进行解锁" << std::endl;
pthread_mutex_unlock(pmtx_);
}
~Mutex()
{}
private:
pthread_mutex_t *pmtx_;
};
// RAII风格的加锁方式
class lockGuard
{
public:
lockGuard(pthread_mutex_t *mtx):mtx_(mtx)
{
mtx_.lock();
}
~lockGuard()
{
mtx_.unlock();
}
private:
Mutex mtx_;
};
log.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <string>
// 日志是有日志级别的
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
const char *gLevelMap[] = {
"DEBUG",
"NORMAL",
"WARNING",
"ERROR",
"FATAL"
};
#define LOGFILE "./threadpool.log"
// 完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名)
void logMessage(int level, const char *format, ...)
{
#ifndef DEBUG_SHOW
if(level== DEBUG) return;
#endif
// va_list ap;
// va_start(ap, format);
// while()
// int x = va_arg(ap, int);
// va_end(ap); //ap=nullptr
char stdBuffer[1024]; //标准部分
time_t timestamp = time(nullptr);
// struct tm *localtime = localtime(×tamp);
snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);
char logBuffer[1024]; //自定义部分
va_list args;
va_start(args, format);
// vprintf(format, args);
vsnprintf(logBuffer, sizeof logBuffer, format, args);
va_end(args);
FILE *fp = fopen(LOGFILE, "a");
// printf("%s%s\n", stdBuffer, logBuffer);
fprintf(fp, "%s%s\n", stdBuffer, logBuffer);
fclose(fp);
}
thread.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <cstdio>
// typedef std::function<void* (void*)> fun_t;
typedef void* (*fun_t)(void*);
class ThreadData
{
public:
void* args_;
std::string name_;
};
class Thread
{
public:
Thread(int num, fun_t callback, void* args) : func_(callback)
{
char nameBuffer[64];
snprintf(nameBuffer, sizeof nameBuffer, "Thread-%d", num);
name_ = nameBuffer;
tdata_.args_ = args;
tdata_.name_ = name_;
}
void start()
{
pthread_create(&tid_, nullptr, func_, (void*)&tdata_);
}
void join()
{
pthread_join(tid_, nullptr);
}
std::string name()
{
return name_;
}
~Thread()
{
}
private:
std::string name_;
fun_t func_;
ThreadData tdata_;
pthread_t tid_;
};
threadPool.hpp
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <unistd.h>
#include "thread.hpp"
#include "lockGuard.hpp"
#include "log.hpp"
const int g_thread_num = 3;
// 本质是: 生产消费模型
template <class T>
class ThreadPool
{
public:
pthread_mutex_t* getMutex()
{
return &lock;
}
bool isEmpty()
{
return task_queue_.empty();
}
void waitCond()
{
pthread_cond_wait(&cond, &lock);
}
T getTask()
{
T t = task_queue_.front();
task_queue_.pop();
return t;
}
private:
ThreadPool(int thread_num = g_thread_num) : num_(thread_num)
{
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&cond, nullptr);
for (int i = 1; i <= num_; i++)
{
threads_.push_back(new Thread(i, routine, this));
}
}
ThreadPool(const ThreadPool<T>& other) = delete;
const ThreadPool<T>& operator=(const ThreadPool<T>& other) = delete;
public:
// 考虑一下多线程使用单例的过程
static ThreadPool<T>* getThreadPool(int num = g_thread_num)
{
// 可以有效减少未来必定要进行加锁检测的问题
// 拦截大量的在已经创建好单例的时候,剩余线程请求单例的而直接访问锁的行为
if (nullptr == thread_ptr)
{
lockGuard lockguard(&mutex);
// 但是,未来任何一个线程想获取单例,都必须调用getThreadPool接口
// 但是,一定会存在大量的申请和释放锁的行为,这个是无用且浪费资源的
// pthread_mutex_lock(&mutex);
if (nullptr == thread_ptr)
{
thread_ptr = new ThreadPool<T>(num);
}
// pthread_mutex_unlock(&mutex);
}
return thread_ptr;
}
// 1. run()
void run()
{
for (auto& iter : threads_)
{
iter->start();
// std::cout << iter->name() << " 启动成功" << std::endl;
logMessage(NORMAL, "%s %s", iter->name().c_str(), "启动成功");
}
}
// 线程池本质也是一个生产消费模型
// void *routine(void *args)
// 消费过程
static void* routine(void* args)
{
ThreadData* td = (ThreadData*)args;
ThreadPool<T>* tp = (ThreadPool<T> *)td->args_;
while (true)
{
T task;
{
lockGuard lockguard(tp->getMutex());
while (tp->isEmpty())
tp->waitCond();
// 读取任务
task = tp->getTask(); // 任务队列是共享的-> 将任务从共享,拿到自己的私有空间
}
task(td->name_);
// lock
// while(task_queue_.empty()) wait();
// 获取任务
// unlock
// 处理任务
}
}
// 2. pushTask()
void pushTask(const T& task)
{
lockGuard lockguard(&lock);
task_queue_.push(task);
pthread_cond_signal(&cond);
}
// test func
// void joins()
// {
// for (auto &iter : threads_)
// {
// iter->join();
// }
// }
~ThreadPool()
{
for (auto& iter : threads_)
{
iter->join();
delete iter;
}
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
private:
std::vector<Thread*> threads_;
int num_;
std::queue<T> task_queue_;
static ThreadPool<T>* thread_ptr;
static pthread_mutex_t mutex;
// 方案2:
// queue1,queue2
// std::queue<T> *p_queue, *c_queue
// p_queue->queue1
// c_queue->queue2
// p_queue -> 生产一批任务之后,swap(p_queue,c_queue),唤醒所有线程/一个线程
// 当消费者处理完毕的时候,你也可以进行swap(p_queue,c_queue)
// 因为我们生产和消费用的是不同的队列,未来我们要进行资源的处理的时候,仅仅是指针
pthread_mutex_t lock;
pthread_cond_t cond;
};
template <typename T>
ThreadPool<T>* ThreadPool<T>::thread_ptr = nullptr;
template <typename T>
pthread_mutex_t ThreadPool<T>::mutex = PTHREAD_MUTEX_INITIALIZER;
- 进入{}创建lockGuard对象,并调用构造函数
- 出{}调用lockGuard对象的析构函数
- 这里使用双重判断,主要为了拦截大量的在已经创建好单例的时候,
- 剩余线程请求单例的而直接访问锁的行为
testMain.cc
#include "threadPool.hpp"
#include "Task.hpp"
#include <ctime>
#include <cstdlib>
#include <iostream>
#include <unistd.h>
// void *run(void *args)
// {
// while(true)
// {
// ThreadPool<Task>::getThreadPool();
// }
// }
int main()
{
// logMessage(NORMAL, "%s %d %c %f \n", "这是一条日志信息", 1234, 'c', 3.14);
srand((unsigned long)time(nullptr) ^ getpid());
// ThreadPool<Task> *tp = new ThreadPool<Task>();
// ThreadPool<Task> *tp = ThreadPool<Task>::getThreadPool();
// 那么,如果单例本身也在被多线程申请使用呢??
ThreadPool<Task>::getThreadPool()->run();
//thread1,2,3,4
while(true)
{
//生产的过程,制作任务的时候,要花时间
int x = rand()%100 + 1;
usleep(7721);
int y = rand()%30 + 1;
Task t(x, y, [](int x, int y)->int{
return x + y;
});
// std::cout << "制作任务完成: " << x << "+" << y << "=?" << std::endl;
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
// 推送任务到线程池中
ThreadPool<Task>::getThreadPool()->pushTask(t);
sleep(1);
}
return 0;
}
threadpool.log
[NORMAL] [1675916038] Thread-1 启动成功
[NORMAL] [1675916038] Thread-2 启动成功
[NORMAL] [1675916038] Thread-3 启动成功
[WARNING] [1675916038] Thread-1处理完成: 21+20=41 | Task.hpp | 20
[WARNING] [1675916039] Thread-2处理完成: 69+30=99 | Task.hpp | 20
[WARNING] [1675916040] Thread-3处理完成: 97+21=118 | Task.hpp | 20
[WARNING] [1675916041] Thread-1处理完成: 74+21=95 | Task.hpp | 20
[WARNING] [1675916042] Thread-2处理完成: 30+24=54 | Task.hpp | 20
[WARNING] [1675916043] Thread-3处理完成: 95+17=112 | Task.hpp | 20
[WARNING] [1675916044] Thread-1处理完成: 76+27=103 | Task.hpp | 20
[WARNING] [1675916045] Thread-2处理完成: 67+7=74 | Task.hpp | 20
[WARNING] [1675916046] Thread-3处理完成: 38+4=42 | Task.hpp | 20
[WARNING] [1675916047] Thread-1处理完成: 62+19=81 | Task.hpp | 20
[WARNING] [1675916048] Thread-2处理完成: 93+21=114 | Task.hpp | 20
[WARNING] [1675916049] Thread-3处理完成: 64+13=77 | Task.hpp | 20
[WARNING] [1675916050] Thread-1处理完成: 81+3=84 | Task.hpp | 20
[WARNING] [1675916051] Thread-2处理完成: 86+26=112 | Task.hpp | 20
[WARNING] [1675916052] Thread-3处理完成: 24+21=45 | Task.hpp | 20
[WARNING] [1675916053] Thread-1处理完成: 69+6=75 | Task.hpp | 20