文章目录
- 线程的应用
- 生产消费者模型
- 自制锁
- 生产消费队列
- 成员参数
- 生产函数
- 消费函数
- 任务处理方式
- 主函数
- POSIX信号量
- sem_wait()
- sem_post()
- 线程池
- 应用场景
- 示例
- 单例模式
- 饿汉实现单例
- 吃完饭, 立刻洗碗, 这种就是饿汉方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭.
- 懒汉实现单例
- 吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式.
线程的应用
生产消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而
通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者
要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队
列就是用来给生产者和消费者解耦的。
其中生产则会与消费者相当于两个进程,进行生产与消费。
自制锁
通过封装线程锁,达到智能指针的效果,作用域结束,则自动释放锁,以防忘记释放锁,造成线程死锁!
#pragma once
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t *lock)
: _lock(lock)
{
}
void Lock()
{
pthread_mutex_lock(_lock);
}
void Unlock()
{
pthread_mutex_unlock(_lock);
}
~Mutex()
{
}
private:
pthread_mutex_t *_lock;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t *lock)
:_mutex(lock)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex _mutex;
};
生产消费队列
成员参数
std::queue<T> _q;
int _capacity;
pthread_mutex_t _mutex;
pthread_cond_t _p_cond; // 给生产者的
pthread_cond_t _c_cond; // 给消费者的
生产函数
判断队列是否是满的,如果满的那么则进行等待,
void Push(const T &in)
{
LockGuard lockguard(&_mutex);//加锁。
while (IsFull())//判断队列是否为满?
{
pthread_cond_wait(&_p_cond,&_mutex);
}
_q.push(in);
pthread_cond_signal(&_c_cond);
}
消费函数
判断队列是否为空,若为空,则进行等待
void Pop(T *out)
{
LockGuard lockguard(&_mutex);
while(IsEmpty())
{
pthread_cond_wait(&_c_cond,&_mutex);
}
*out = _q.front();
_q.pop();
pthread_cond_signal(&_p_cond);
}
整体:
#pragma once
#include <iostream>
#include <queue>
#include "LockGuard.hpp"
const int defaultcap = 5;
template <class T>
class BlockQueue
{
public:
BlockQueue(int cap = defaultcap)
: _capacity(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_p_cond, nullptr);
pthread_cond_init(&_c_cond, nullptr);
}
bool IsFull()
{
return _q.size() == _capacity;
}
bool IsEmpty()
{
return _q.size() == 0;
}
void Push(const T &in)
{
LockGuard lockguard(&_mutex);
while (IsFull())
{
pthread_cond_wait(&_p_cond,&_mutex);
}
_q.push(in);
pthread_cond_signal(&_c_cond);
}
void Pop(T *out)
{
LockGuard lockguard(&_mutex);
while(IsEmpty())
{
pthread_cond_wait(&_c_cond,&_mutex);
}
*out = _q.front();
_q.pop();
pthread_cond_signal(&_p_cond);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
private:
std::queue<T> _q;
int _capacity;
pthread_mutex_t _mutex;
pthread_cond_t _p_cond; // 给生产者的
pthread_cond_t _c_cond; // 给消费者的
};
任务处理方式
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
const int defaultvalue = 0;
const std::string opers = "+-*/%)(&";
enum
{
ok = 0,
div_zero,
mod_zero,
unknow
};
class Task
{
public:
Task() {}
Task(int x, int y, char op)
: data_x(x),
data_y(y),
oper(op),
result(defaultvalue),
code(ok)
{
}
void Run()
{
switch (oper)
{
case '+':
result = data_x + data_y;
break;
case '-':
result = data_x - data_y;
break;
case '*':
result = data_x * data_y;
break;
case '/':
{
if (data_y == 0)
code = div_zero;
else
result = data_x / data_y;
}
case '%':
{
if (data_y == 0)
code = mod_zero;
else
result = data_x % data_y;
}
break;
default:
code = unknow;
break;
}
}
void operator()()
{
Run();
sleep(2);
}
std::string PrintTask()
{
std::string s;
s = std::to_string(data_x);
s += oper;
s += std::to_string(data_y);
s += "=?";
return s;
}
std::string PrintResult()
{
std::string s;
s = std::to_string(data_x);
s += oper;
s += std::to_string(data_y);
s += std::to_string(result);
s += '[';
s += std::to_string(code);
s += ']';
return s;
}
~Task()
{
}
private:
int data_x;
int data_y;
char oper;
int result;
int code;
};
主函数
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
class ThreadData
{
public:
BlockQueue<Task> *bq;
std::string name;
};
void *consumer(void *args)
{
ThreadData *td = (ThreadData *)args;
while (true)
{
Task t;
td->bq->Pop(&t);
t();
std::cout << "consumer data: " << t.PrintResult() << ", " << td->name << std::endl;
}
return nullptr;
}
void *productor(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while (true)
{
int data1 = rand() % 10;
usleep(rand() % 123);
int data2 = rand() % 10;
usleep(rand() % 123);
char oper = opers[rand() % (opers.size())];
Task t(data1, data2, oper);
std::cout << "productor task: " << t.PrintTask() << std::endl;
bq->Push(t);
sleep(1);
}
return nullptr;
}
int main()
{
srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self());
BlockQueue<Task> *bq = new BlockQueue<Task>();
pthread_t c[3], p[2];
ThreadData *td = new ThreadData();
td->bq = bq;
td->name = "thread_1";
pthread_create(&c[0], nullptr, consumer, td);
ThreadData *td1 = new ThreadData();
td1->bq = bq;
td1->name = "thread_2";
pthread_create(&c[1], nullptr, consumer, td1);
ThreadData *td2 = new ThreadData();
td2->bq = bq;
td2->name = "thread_3";
pthread_create(&c[2], nullptr, consumer, td2);
pthread_create(&p[0],nullptr,productor,bq);
pthread_create(&p[1],nullptr,productor,bq);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
pthread_join(c[2], nullptr);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
return 0;
}
POSIX信号量
POSIX信号量(POSIX semaphores)是POSIX(Portable Operating System Interface)标准定义的一种同步原语,用于协调多线程或多进程之间的资源访问。它们提供了一种机制,允许一个或多个线程或进程等待某个条件成立,或者通知其他线程或进程某个条件已经成立。
sem_wait()
sem_wait 函数用于等待信号量 sem 变为非零值。具体来说,它执行以下操作:
检查信号量 sem 的当前值是否大于零。
如果大于零,则将其值减一,并立即返回。
如果等于零,则调用线程会被阻塞,直到信号量的值变为非零(通常是由另一个线程或进程通过 sem_post 操作增加信号量的值)。
sem_post()
sem_post 函数用于增加信号量 sem 的值。具体来说,它执行以下操作:
将信号量 sem 的值加一。
如果有线程在等待该信号量(即之前被 sem_wait 阻塞),则唤醒其中一个线程(通常是按照先进先出(FIFO)的顺序)。
拿上面的消费生产这模型举例:
将生产者的信号量设置为队列的大小,消费者的信号量设置为0.
当进行生产时,先进行sem_wait判断是否还有空位置进行生产,当生产完则将消费者的信号量进行增加,告诉消费者可以进行消费。
当进行消费时,则与之相反。
#pragma once
const int defaultsize = 5;
template <class T>
class RingQueue
{
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
public:
RingQueue(int size = defaultsize)
: _ringqueue(size), _size(size), _p_step(0), _c_step(0)
{
sem_init(&_space_sem, 0, size);
sem_init(&_data_sem, 0, 0);
pthread_mutex_init(&_p_mutex, nullptr);
pthread_mutex_init(&_c_mutex, nullptr);
}
void Push(const T &in)
{
// 生产
P(_space_sem);
{
LockGuard lockGuard(&_p_mutex);
_ringqueue[_p_step] = in;
_p_step++;
_p_step %= _size;
}
V(_data_sem);
}
void Pop(T *out)
{
// 消费
P(_data_sem);
{
LockGuard lockGuard(&_c_mutex);
*out = _ringqueue[_c_step];
_c_step++;
_c_step %= _size;
}
V(_space_sem);
}
~RingQueue()
{
sem_destroy(&_space_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_p_mutex);
pthread_mutex_destroy(&_c_mutex);
}
private:
std::vector<T> _ringqueue;
int _size;
int _p_step; // 生产者的生产位置
int _c_step; // 消费位置
sem_t _space_sem; // 生产者
sem_t _data_sem; // 消费者
pthread_mutex_t _p_mutex;
pthread_mutex_t _c_mutex;
};
线程池
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着
监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利
用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
应用场景
线程池的应用场景:
- 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个
Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。 - 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情
况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.
示例
- 创建固定数量线程池,循环从任务队列中获取任务对象
- 获取到任务对象后,执行任务对象中的任务接口
单例模式
某些类, 只应该具有一个对象(实例), 就称之为单例。
饿汉实现单例
吃完饭, 立刻洗碗, 这种就是饿汉方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭.
template <typename T>
class Singleton {
static T data;
public:
static T* GetInstance() {
return &data;
}
};
懒汉实现单例
吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式.
template <typename T>
class Singleton {
static T* inst;
public:
static T* GetInstance() {
if (inst == NULL) {
inst = new T();
}
return inst;
}
};