Linux多线程
- 1.Linux线程概念
- 1.1线程的优点
- 1.2线程的缺点
- 2.Linux线程VS进程
- 3.Linux线程控制
- 3.1创建线程
- 3.2线程tid及进程地址空间布局
- 3.3线程终止
- 3.4线程等待
- 4.分离线程
- 5.线程互斥
- 5.1互斥锁mutex
- 5.2互斥锁接口
- 5.3互斥锁实现原理
- 5.4可重入VS线程安全
- 6.线程同步
- 6.1条件变量
- 6.2pthread_cond_wait需要互斥锁的原因
- 7.生产者消费者模型
- 7.1信号量(计数器)
- 8.线程池
1.Linux线程概念
线程是操作系统能够进行运算调度的最小单位,是程序执行的基本单元。
1.1线程的优点
线程的优点主要包括:
资源共享:同一进程中的线程共享进程的内存和资源,减少了资源使用的开销。
提高效率:通过并发执行,多个线程可以同时处理任务,充分利用多核处理器的计算能力,提高程序的整体效率。
响应性:在用户界面应用中,使用线程可以保持界面响应,避免因为长时间的计算阻塞界面。
快速切换:线程之间的切换比进程之间的切换更快,减少了上下文切换的时间成本。
降低延迟:在需要频繁执行短小任务的场景中,线程可以减少延迟,提高响应速度。
1.2线程的缺点
线程的缺点包括:
复杂性:多线程程序的设计和调试相对复杂,容易出现死锁、竞态条件等问题。
上下文切换开销:尽管线程切换比进程切换快,但频繁的上下文切换仍然会消耗资源,影响性能。
共享数据的安全性:多个线程共享同一块内存区域,可能导致数据不一致,需要额外的同步机制(如锁)来确保数据安全。
资源竞争:多个线程同时访问共享资源可能导致性能下降,尤其是在锁的竞争情况下。
调试困难:多线程环境中的错误通常难以重现和调试,增加了开发和维护的难度。
内存使用:虽然线程共享内存,但每个线程都有自己的栈空间,过多线程会导致内存消耗增加。
平台依赖性:线程的实现和调度可能因操作系统而异,导致跨平台开发时的兼容性问题。
2.Linux线程VS进程
进程是资源分配的基本单位
线程是调度的基本单位
线程共享进程的数据,但也拥有自己的一部分数据:
如栈,寄存器内容,线程id,errno,信号屏蔽字,调度优先级
3.Linux线程控制
头文件#include<pthread.h>
链接线程库函数时要使用编译器命令"-lpthread"选项
3.1创建线程
int pthread_create(pthread_t* thread,const pthread_arr_t* arr,void*(*start_routine)(void*),void* arg)
//参数
//thread:返回线程ID
//attr:设置线程的属性,一般设置为nullptr
//start_routine:函数地址,线程启动后要执行的函数
//arg:传给线程启动函数的参数
//返回值:成功返回0,失败返回错误码
示例代码:
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
int gcnt = 100;
// 新线程
void *ThreadRoutine(void *arg)
{
const char *threadname = (const char *)arg;
while (true)
{
std::cout << "I am a new thread" << threadname << ",pid: " << getpid() << "gcnt: " << gcnt << "&gcnt" << std::endl;
gcnt--;
sleep(1);
}
}
int main()
{
// 已有进程了
pthread_t tid;
pthread_create(&tid, nullptr, ThreadRoutine, (void *)"thread 1");
// sleep(3);
pthread_t tid1;
pthread_create(&tid1, nullptr, ThreadRoutine, (void *)"thread 2");
// sleep(3);
pthread_t tid2;
pthread_create(&tid2, nullptr, ThreadRoutine, (void *)"thread 3");
// sleep(3);
pthread_t tid3;
pthread_create(&tid3, nullptr, ThreadRoutine, (void *)"thread 4");
// sleep(3);
// 主线程
while (true)
{
std::cout << "I am main thread" << ",pid: " << getpid() << "gcnt: " << "&gcnt: " << &gcnt << std::endl;
sleep(1);
}
return 0;
}
3.2线程tid及进程地址空间布局
pthread_create函数会产生一个线程id,存放在第一个参数指向的地址中
//pthread_t pthread_self(void);
//线程本身调用该函数会返回自身的线程地址,也就是LWP
3.3线程终止
终止某个线程而不是终止整个进程,三种方法:
1.在线程函数return,对主线程不适应,从main函数return相当于调用了exit
2.线程可以调用pthread_exit终止自己。
3.线程可以调用pthread_cancel终止同一个进程中的另一个线程
//void pthread_exit(void* retval);
//retval:这是一个指向任意类型的指针,表示线程的返回值。当线程结束时,这个值可以被其他线程通过 pthread_join 获取。
//pthread_exit或者return返回的指针所指向的内存单元必须是全局的或者是用malloc分配的,不能在线程函数的栈上分配,因为当其它线程得到这个返回指针时线程函数已经退出了
//int pthread_exit(pthread_t thread);
//thread:要取消线程ID
//返回值:成功返回0,失败返回错误码
3.4线程等待
已经退出的线程,其空间没有被释放,仍然在进程的地址空间内。
创建新的线程不会复用刚才退出线程的地址空间
//int pthread_join(pthread_t thread,void** value_ptr);
//thread:线程ID
//value_ptr:用于接收线程的返回值,线程的返回值是void* ,所以这里是void**
//返回值:成功返回0,失败返回错误码
//1. 如果thread线程通过return返回,value_ ptr所指向的单元里存放的是thread线程函数的返回值。
//2. 如果thread线程被别的线程调用pthread_ cancel异常终掉,value_ ptr所指向的单元里存放的是常数PTHREAD_ CANCELED。
//3. 如果thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传给pthread_exit的参数。
//4. 如果对thread线程的终止状态不感兴趣,可以传NULL给value_ ptr参数。
4.分离线程
当线程退出时,自动释放线程资源。
一个线程不能既是joinable又是分离的。
线程如果被分离,该线程可以被取消,但是不能被join
//int pthread_detach(pthread_t thread);
//pthread_detach(pthread_self());
5.线程互斥
临界资源:多线程执行流共享的资源就叫做临界资源
临界区:每个线程内部,访问临界资源的代码,就叫做临界区
互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成
5.1互斥锁mutex
互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥锁。
5.2互斥锁接口
初始化互斥锁
初始化互斥锁两种方法:
1.静态分配:
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
2.动态分配
int pthread_mutex_init(pthread_mutex_t* mutex,const pthread_mutexattr_t* attr);
//参数:
//mutex:要初始化的互斥锁
//attr:nullptr
销毁互斥锁
使用PTHREAD_ MUTEX_ INITIALIZER初始化的互斥量不需要销毁
不要销毁一个已经加锁的互斥量
已经销毁的互斥量,要确保后面不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t* mutex);
互斥锁加锁和解锁
int pthread_mutex_lock(pthread_mutex_t* mutex);
int pthread_mutex_unlock(pthread_mutex_t* mutex);
//返回值:成功返回0,失败返回错误码
5.3互斥锁实现原理
cnt++语句其实是由三条汇编指令完成的,所以不是原子的
数据在内存中,本质是被线程共享的。
数据被读取到寄存器中,本质变成了线程的上下文,属于线程私有数据!
线程加锁的本质:
大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换
,这里的交换是由一条汇编语句组成的,所以是原子的
每一个线程都有一份,属于自己的上下文
xchgb作用:讲一个共享的mutex资源,交换到自己上下文中,属于线程自己
5.4可重入VS线程安全
线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
可重入还是不可重入,描述的是函数的特点!
线程安全与否,描述的是线程的特征
产生死锁的四个必要条件:
互斥条件:一个资源每次只能被一个执行流使用
请求与保持条件:一个执行流因请求资源而阻塞,对以获得的资源保持不放
不剥夺条件:一个执行流已获得的资源,在未完成之前,不能强行剥夺
循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
解决或避免死锁只需要破坏其中一个条件或者多个条件
6.线程同步
同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题。
6.1条件变量
条件变量是线程同步中一种常用的机制,用于在线程之间进行协作,尤其是当一个线程需要等待某个条件成立时。它提供了一种方式来让线程在某个条件未满足时挂起,并且在条件满足时能够被通知从挂起状态恢复执行。
条件变量的初始化
int pthread_cond_init(pthread_cond_t* cond,const pthread_condattr_t* attr);
//cond:要初始化的条件变量
//attr:nullptr
条件变量的销毁
int pthread_cond_destroy(pthread_cond_t* cond);
条件变量的等待
int pthread_cond_wait(pthread_cond_t* cond,pthread_mutex_t* mutex);
//cond:要在这个条件变量上等待
//mutex:互斥锁,条件变量要配合互斥锁才能使用
唤醒等待
int pthread_cond_broadcast(pthread_cond_t* cond);//唤醒所有线程
int pthread_cond_signal(pthread_cond_t* cond);//唤醒队列中的第一个线程
条件变量的测试示例代码
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int tickets = 1000;
void *threadRoutine(void *arg)
{
std::string name = static_cast<const char *>(arg);
while (true)
{
// sleep(1);
// 单纯的互斥,能保证数据安全,不一定合理高效
pthread_mutex_lock(&mutex);
if (tickets > 0)
{
std::cout << name << ",get a ticket: " << tickets-- << std::endl; // 模拟抢票
usleep(1000);
}
else
{
std::cout << "没有票了," << name << std::endl; // 就是每一个线程在大量的申请锁和释放锁
// 1.让线程子啊进行等待的时候,会自动释放锁
// 2.线程被唤醒的时候,是在临界区内唤醒的,当线程被唤醒,线程在pthread_cond_wait返回的时候,要重新申请并持有锁
// 3.当线程被唤醒的时候,重新申请并持有锁本质也是要参与锁的竞争的
pthread_cond_wait(&cond, &mutex);
}
pthread_mutex_unlock(&mutex);
}
}
int main()
{
pthread_t t1, t2, t3;
pthread_create(&t2, nullptr, threadRoutine, (void *)"thread-2");
pthread_create(&t1, nullptr, threadRoutine, (void *)"thread-1");
pthread_create(&t3, nullptr, threadRoutine, (void *)"thread-3");
sleep(5); // for test,5s开始进行让cond成立,唤醒一个线程
while (true)
{
// pthread_cond_signal(&cond);
// pthread_cond_broadcast(&cond);
// 临时
sleep(6);
pthread_mutex_lock(&mutex);
tickets += 100;
pthread_mutex_unlock(&mutex);
// pthread_cond_broadcast(&cond);
pthread_cond_signal(&cond);
}
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
return 0;
}
6.2pthread_cond_wait需要互斥锁的原因
条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程。
条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据
7.生产者消费者模型
//LockGuard.hpp
#pragma once
#include <pthread.h>
// 不定义锁,默认认为外部会给我们传入对象
class Mutex
{
public:
Mutex(pthread_mutex_t *lock)
: _lock(lock)
{
}
void Lock()
{
pthread_mutex_lock(_lock);
}
void Unlock()
{
pthread_mutex_unlock(_lock);
}
~Mutex()
{
}
private:
pthread_mutex_t *_lock;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t *lock)
: _mutex(lock)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex _mutex;
};
//Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
const int defaultvalue = 0;
enum
{
ok = 0,
div_zero,
mod_zero,
unknow
};
const std::string opers = "+-*/%)(&)";
class Task
{
public:
Task()
{
}
Task(int x, int y, char op)
: data_x(x), data_y(y), oper(op), result(defaultvalue), code(ok)
{
}
void Run()
{
switch (oper)
{
case '+':
result = data_x + data_y;
break;
case '-':
result = data_x - data_y;
break;
case '*':
result = data_x * data_y;
break;
case '/':
if (data_y == 0)
{
code = div_zero;
}
else
{
result = data_x / data_y;
}
break;
case '%':
if (data_y == 0)
{
code = mod_zero;
}
else
{
result = data_x % data_y;
}
break;
default:
code = unknow;
break;
}
}
void operator()()
{
Run();
sleep(2);
}
std::string PrintTask()
{
std::string s;
s = std::to_string(data_x);
s += oper;
s += std::to_string(data_y);
s += "=?";
return s;
}
std::string PrintResult()
{
std::string s;
s = std::to_string(data_x);
s += oper;
s += std::to_string(data_y);
s += "=";
s += std::to_string(result);
s += "[";
s += std::to_string(code);
s += "]";
return s;
}
~Task()
{
}
private:
int data_x;
int data_y;
char oper; // + - * / %
int result;
int code; // 结果码,0:结果可信 !0:结果不可信,1,2,3,4
};
//Log.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstdarg>
#include <ctime>
enum
{
Debug = 0,
Info,
Warning,
Error,
Fatal
};
std::string LevelToString(int level)
{
switch (level)
{
case Debug:
return "Debug";
case Info:
return "Info";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatal:
return "Fatal";
default:
return "Unknown";
}
}
class Log
{
public:
Log() {}
void LogMessage(int level, const char *format, ...) // 类C的一个日志接口
{
char content[1024];
va_list args; // char*,void*
va_start(args, format);
// args 指向了可变参数部分
vsnprintf(content, sizeof(content), format, args);
va_end(args); // args=nullptr;
uint64_t currtime = time(nullptr);
printf("[%s][%s]%s\n", LevelToString(level).c_str(), std::to_string(currtime).c_str(), content);
}
~Log() {}
private:
};
生产者消费者模型(临界资源整体版)参考代码:
//BlockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include "LockGuard.hpp"
const int defaultcap = 5; // for test
template <class T>
class BlockQueue
{
public:
BlockQueue(int cap = defaultcap)
: _capacity(cap)
{
pthread_mutex_init(&mutex, nullptr);
pthread_cond_init(&_p_cond, nullptr);
pthread_cond_init(&_c_cond, nullptr);
}
bool IsFull()
{
return _q.size() == _capacity;
}
bool IsEmpty()
{
return _q.size() == 0;
}
void Push(const T &in) // 生产者
{
LockGuard lockguard(&_mutex);
// pthread_mutex_lock(&_mutex);//2.lockguard 3.重新理解生产消费者模型(代码+理论)4.代码整体改成多生产多消费
while (IsFull())
{
// 阻塞等待
pthread_cond_wait(&_p_cond, &mutex);
}
_q.push(in);
// if(_q.size()>_productor_water_line) pthread_cond_signal(&_c_cond);
pthread_cond_signal(&_c_cond);
// pthread_mutex_unlock(&_mutex);
}
void Pop(T *out)
{
LockGuard lockguard(&_mutex);
// pthread_mutex_lock(&_mutex);
while (IsEmpty())
{
pthread_cond_wait(&_c_cond, &_mutex);
}
*out = _q.front();
_q.pop();
// if(_q.size()<_consumer_water_line) pthread_cond_signal(&_p_cond);
pthread_cond_signal(&_p_cond);
// pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
private:
std::queue<T> _q;
int _capacity; //_q.size()==_capacity,满了,不能生产,_q.size()==0,空,不能消费了
pthread_mutex_t _mutex;
pthread_cond_t _p_cond; // 给生产者的
pthread_cond_t _c_cond; // 给消费者的
// int _consumer_water_line;//_comsumer_water_line=_capacity/3*2
// int _productor_water_line;//_productor_water_line=_capacity/3
};
//Main.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
class ThreadData
{
public:
BlockQueue<Task> *bq;
std::string name;
};
void *consumer(void *args)
{
ThreadData *td = static_cast<ThreadData *>(args);
// BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(args);
while (true)
{
// sleep(1);
Task t;
// 1.消费数据bq->pop(&data);
td->bq->Pop(&t);
// 2.进行处理
// t.Run();
t(); // 处理任务的时候会消耗时间
std::cout << "consumer data: " << t.PrintResult() << ", " << td->name << std::endl;
// 消费者没有sleep
}
return nullptr;
}
void *productor(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
// BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
// BlockQueue* bq=static_cast<BlockQueue *>(args);//生产者和消费者同时看到了一份公共资源
while (true)
{
// 1.有数据,从具体的场景中来,从网络中拿数据
// 生产前,任务从哪里来
int data1 = rand() % 10; //[0,9]
usleep(rand() % 123);
int data2 = rand() % 10;
usleep(rand() % 123);
char oper = opers[rand() % (opers.size())];
Task t(data1, data2, oper);
std::cout << "productor task: " << t.PrintTask() << std::endl;
// 2.进行生产
// bq->Push(data);
bq->Push(t);
sleep(1);
// for debug
}
return nullptr;
}
int main()
{
srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self()); // 只是为了形成更随机的数据
// 生产者给消费者分派任务
BlockQueue<Task> *bq = new BlockQueue<Task>();
pthread_t c[3], p[2]; // 消费者和生产者
ThreadData *td = new ThreadData();
td->bq = bq;
td->name = "thread-1";
pthread_create(&c[0], nullptr, consumer, td);
ThreadData *td1 = new ThreadData();
td1->bq = bq;
td1->name = "thread-2";
pthread_create(&c[1], nullptr, consumer, td1);
ThreadData *td2 = new ThreadData();
td2->bq = bq;
td2->name = "thread-3";
pthread_create(&c[2], nullptr, consumer, td2);
pthread_create(&p[0], nullptr, productor, bq);
pthread_create(&p[1], nullptr, productor, bq);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
pthread_join(c[2], nullptr);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
return 0;
}
7.1信号量(计数器)
初始化信号量
//include<semaphone.h>
int sem_init(sem_t* sem,int pshared,unsigned int value);
//pshared:0表示线程间共享,非0表示进程间共享
//value:信号量初始值多少
销毁信号量
int sem_destroy(sem_t* sem);
等待信号量
int sem_wait(sem_t* sem);//P()
//信号量的值减一
发布信号量
int sem_post(sem_t* sem);//V()
//信号量的值加一
生产者消费者模型(临界资源局部版)参考代码:
//RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include "LockGuard.hpp"
const int defaultsize = 5;
template <class T>
class RingQueue
{
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
public:
RingQueue(int size = defaultsize)
: _ringqueue(size), _size(size), _p_step(0), _c_step(0)
{
sem_init(&_space_sem, 0, size);
sem_init(&_data_sem, 0, 0);
pthread_mutex_init(&_p_mutex, nullptr);
pthread_mutex_init(&_c_mutex, nullptr);
}
void Push(const T &in)
{
// 生产
// 先加锁1,还是先申请信号量?2
P(_space_sem);
{
LockGuard lockguard(&_p_mutex);
_ringqueue[_p_step] = in;
_p_step++;
_p_step %= _size;
}
V(_data_sem);
}
void Pop(T *out)
{
// 消费
P(_data_sem);
{
LockGuard lockguard(&_c_mutex);
*out = _ringqueue[_c_step];
_c_step++;
_c_step %= _size;
}
V(_space_sem);
}
~RingQueue()
{
sem_destroy(&_space_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_p_mutex);
pthread_mutex_destroy(&_c_mutex);
}
private:
std::vector<T> _ringqueue;
int _size;
int _p_step; // 生产者的生产位置
int _c_step; // 消费位置
sem_t _space_sem; // 生产者
sem_t _data_sem; // 消费者
pthread_mutex_t _p_mutex;
pthread_mutex_t _c_mutex;
};
//Main.cc
#include "RingQueue.hpp"
#include "Task.hpp"
#include "Log.hpp"
#include <unistd.h>
#include <pthread.h>
#include <ctime>
void *Productor(void *args)
{
// sleep(5);
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
// 数据怎么来的?
// 1. 有数据,从具体场景中来,从网络中拿数据
// 生产前,你的任务从哪里来的呢???
int data1 = rand() % 10; // [1, 10] // 将来深刻理解生产消费,就要从这里入手,TODO
usleep(rand() % 123);
int data2 = rand() % 10; // [1, 10] // 将来深刻理解生产消费,就要从这里入手,TODO
usleep(rand() % 123);
char oper = opers[rand() % (opers.size())];
Task t(data1, data2, oper);
std::cout << "productor task: " << t.PrintTask() << std::endl;
// rq->push();
rq->Push(t);
// sleep(1);
}
}
void *Consumer(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
// sleep(1);
Task t;
rq->Pop(&t);
t();
std::cout << "consumer done,data is : " << t.PrintResult() << std::endl;
}
}
int main()
{
Log log;
log.LogMessage(Debug, "hello %d,%s,%f", 10, "LJH", 9.24);
log.LogMessage(Warning, "hello %d,%s,%f", 10, "LJH", 9.24);
log.LogMessage(Error, "hello %d,%s,%f", 10, "LJH", 9.24);
log.LogMessage(Info, "hello %d,%s,%f", 10, "LJH", 9.24);
// //单生产,单消费:321
// //如果是多生产多消费
// srand((uint64_t)time(nullptr)^pthread_self());
// pthread_t c[3],c[2];
// //唤醒队列中只能放置整形???
// //RingQueue<int>* rq=new RingQueue<int>();
// RingQueue<Task>* rq=new RingQueue<Task>();
// pthread_create(&p[0], nullptr, Productor, rq);
// pthread_create(&p[1], nullptr, Productor, rq);
// pthread_create(&c[0], nullptr, Consumer, rq);
// pthread_create(&c[1], nullptr, Consumer, rq);
// pthread_create(&c[2], nullptr, Consumer, rq);
// pthread_join(p[0], nullptr);
// pthread_join(p[1], nullptr);
// pthread_join(c[0], nullptr);
// pthread_join(c[1], nullptr);
// pthread_join(c[2], nullptr);
return 0;
}
8.线程池
//Thread.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
// 设计者的视角
// typedef std::function<void()> func_t
template <class T>
using func_t = std::function<void(T &)>;
template <class T>
class Thread
{
public:
Thread(const std::string &threadname, func_t<T> func, T &data)
: _tid(0), _threadname(threadname), _isrunning(false), _func(func), _data(data)
{
}
static void *ThreadRoutine(void *args)
{
//(void)args;//仅仅是为了防止编译器有告警
Thread *ts = static_cast<Thread *>(args);
ts->_func(ts->_data);
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, ThreadRoutine, this);
if (n == 0)
{
_isrunning = true;
return true;
}
else
{
return false;
}
}
bool Join()
{
if (_isrunning)
return true;
int n = pthread_join(_tid, nullptr);
if (n == 0)
{
_isrunning = false;
return true;
}
return false;
}
std::string ThreadName()
{
return _threadname;
}
bool IsRunning()
{
return _isrunning;
}
~Thread()
{
}
private:
pthread_t _tid;
std::string _threadname;
bool _isrunning;
func_t<T> _func;
T _data;
};
//ThreadBool.hpp
#pragma once
#include <iostream>
#include <queue>
#include <vector>
#include <pthread.h>
#include <functional>
#include "Log.hpp"
#include "Thread.hpp"
#include "LockGuard.hpp"
static const int defaultnum = 5;
class ThreadData
{
public:
ThreadData(const std::string &name)
: threadname(name)
{
}
~ThreadData()
{
}
public:
std::string threadname;
};
template <class T>
class ThreadPool
{
private:
ThreadPool(int thread_num = defaultnum)
: _thread_num(thread_num)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
// 构建指定个数的线程
for (int i = 0; i < _thread_num; i++)
{
// 待优化
std::string threadname = "thread-";
threadname += std::to_string(i + 1);
ThreadData td(threadname);
// Thread<ThreadData> t(threadname, std::bind(&ThreadPool<T>::ThreadRun, this, std::placeholders::_1), td);
//_threads.push_back(t);
_threads.emplace_back(threadname, std::bind(&ThreadPool<T>::ThreadRun, this, std::placeholders::_1), td);
lg.LogMessage(Info, "%s is created...\n", threadname.c_str());
}
}
ThreadPool(const ThreadPool<T> &tp) = delete;
const ThreadPool<T> &operator=(const ThreadPool<T>) = delete;
public:
// 线程安全问题
static ThreadPool<T> *GetInstance()
{
if (instance == nullptr)
{
LockGuard lockguard(&sig_lock);
if (instance == nullptr)
{
lg.LogMessage(Info, "创建单例成功...\n");
instance = new ThreadPool<T>();
}
}
return instance;
}
bool Start()
{
// 启动
for (auto &thread : _threads)
{
thread.Start();
lg.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());
}
return true;
}
void ThreadWait(const ThreadData &td)
{
lg.LogMessage(Debug, "no task,%s is sleeping...\n", td.threadname.c_str());
pthread_cond_wait(&_cond, &_mutex);
}
void ThreadWakeup()
{
pthread_cond_signal(&_cond);
}
void checkSelf()
{
// 1._task_num>_task_num_high_water && _thread_num<_thread_num_high_water
// 创建更多的线程,并且更新_thread_num
// 2._task_num==_task_num_low_water && _thread_num>=_thread_num_high_water
// 把自己退出了,并且更新_thread_num
}
void ThreadRun(ThreadData &td)
{
while (true)
{
checkSelf();
// 取任务
T t;
{
LockGuard lockguard(&_mutex);
while (_q.empty())
{
ThreadWait(td);
lg.LogMessage(Debug, "thread %s is wakeup\n", td.threadname.c_str());
}
t = _q.front();
_q.pop();
}
// 处理任务
t();
lg.LogMessage(Debug, "%s handler task %s done, result is : %s\n",
td.threadname, t.PrintTask().c_str(), t.PrintResult().c_str());
}
}
void Push(T &in)
{
lg.LogMessage(Debug, "other thread push a task, task is : %s\n", in.PrintTask().c_str());
LockGuard lockguard(&_mutex);
_q.push(in);
ThreadWakeup();
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
// for debug
void Wait()
{
for (auto &thread : _threads)
{
thread.Join();
}
}
private:
std::queue<T> _q;
std::vector<Thread<ThreadData>> _threads;
int _thread_num;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
static ThreadPool<T> *instance;
static pthread_mutex_t sig_lock;
// 扩展1:
// int _thread_num;
// int _task_num;
// int _thread_num_low_water; // 3
// int _thread_num_high_water; // 10
// int _task_num_low_water; // 0
// int _task_num_high_water; // 30
};
template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;
//main.cc
#include <iostream>
#include <memory>
#include <ctime>
#include "ThreadPool.hpp"
#include "Task.hpp"
// std::autmic<int> cnt;
int main()
{
// std::unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());
// tp->Start();
sleep(10);
ThreadPool<Task>::GetInstance()->Start();
srand((uint64_t)time(nullptr) ^ getpid());
while (true)
{
// 可以搞一个任务
int x = rand() % 100 + 1;
usleep(1234);
int y = rand() % 200;
usleep(1234);
char oper = opers[rand() % opers.size()];
Task t(x, y, oper);
// std::cout<<"make task: "<<t.PrintTask()<<std::endl;
ThreadPool<Task>::GetInstance()->Push(t);
sleep(1);
}
ThreadPool<Task>::GetInstance()->Wait();
// TODO
return 0;
}