使用原线程池时,如果 push 和 pop 的对象过大,每次入队、出队都会发生拷贝,导致时延过高,因此需要优化线程池。
采用 std::move() + std::unique_ptr 的方法,能极大地减少时延,
其本质是避免多次拷贝:队列中只转移指针的所有权,而不复制对象本身。
代码实现
ThreadPool
#ifndef _THREAD_TOOL_H_
#define _THREAD_TOOL_H_
#include<pthread.h>
#include<cstdlib>
#include<ctime>
#include<deque>
#include<iostream>
#include<memory>
#include<queue>
#include<utility>
#include<vector>
#include<Tools/Misc.h>
#define NUM 5
using namespace std;

/**
 * @brief Fixed-size pool of POSIX worker threads that run queued tasks.
 *
 * Tasks are handed over as std::unique_ptr<T>, so only the pointer moves
 * through the queue and the task object itself is never copied. T must
 * provide a `run()` member function.
 *
 * Lifetime: workers are joinable and are joined by the destructor. The
 * destructor lets the workers drain every task that is already queued,
 * then waits for them to exit before destroying the mutex and the
 * condition variable.
 *
 * @tparam T Task type; `t->run()` is invoked on a worker thread.
 */
template<class T>
class ThreadPool
{
public:
    /**
     * @brief Initialises the synchronisation primitives; no thread is
     *        started until Thread_pool_init() is called.
     * @param cap Number of worker threads. Values <= 0 fall back to NUM,
     *            because a pool without workers would never run a task.
     */
    ThreadPool(int cap = NUM) : thread_num(cap > 0 ? cap : NUM)
    {
        pthread_mutex_init(&_lock, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    // The pool owns a mutex, a condition variable and threads that hold a
    // pointer to it, so it must never be copied.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    /** @brief Acquires the queue mutex. */
    void LockQueue()
    {
        pthread_mutex_lock(&_lock);
    }

    /** @brief Releases the queue mutex. */
    void UnlockQueue()
    {
        pthread_mutex_unlock(&_lock);
    }

    /** @brief Blocks on the condition variable; the caller must hold the queue mutex. */
    void Wait()
    {
        pthread_cond_wait(&_cond, &_lock);
    }

    /** @brief Wakes one thread waiting on the condition variable. */
    void Wake()
    {
        pthread_cond_signal(&_cond);
    }

    /** @brief Returns true when no task is queued. Not synchronised by itself. */
    bool isQueueEmpty() const
    {
        return _task_queue.empty();
    }

    /**
     * @brief Worker thread entry point.
     *
     * Waits for a task, takes it out of the queue under the lock and runs
     * it with the lock released. Returns once shutdown was requested and
     * the queue is empty.
     *
     * @param arg Pointer to the owning ThreadPool.
     * @return Always nullptr.
     */
    static void* Rountine(void *arg)
    {
        ThreadPool* self = static_cast<ThreadPool*>(arg);
        while (true)
        {
            std::unique_ptr<T> t;
            self->LockQueue();
            // Re-check the predicate in a loop to survive spurious wake-ups.
            while (self->isQueueEmpty() && !self->_stopping)
            {
                self->Wait();
            }
            if (self->isQueueEmpty())
            {
                // Shutdown requested and nothing left to drain.
                self->UnlockQueue();
                break;
            }
            self->Pop(t);
            self->UnlockQueue();
            // Run the task outside the lock; a null task is silently skipped.
            if (t)
            {
                t->run();
            }
        }
        return nullptr;
    }

    /**
     * @brief Starts thread_num worker threads.
     *
     * Threads that fail to start are skipped. Must not be called
     * concurrently with itself or with the destructor.
     */
    void Thread_pool_init()
    {
        _workers.reserve(_workers.size() + static_cast<std::size_t>(thread_num));
        for (int i = 0; i < thread_num; i++)
        {
            pthread_t tid;
            if (pthread_create(&tid, nullptr, Rountine, this) == 0)
            {
                _workers.push_back(tid);
            }
        }
    }

    /**
     * @brief Enqueues a task and wakes one worker.
     * @param in Task to run; ownership is moved into the queue.
     */
    void push(std::unique_ptr<T>&& in)
    {
        LockQueue();
        _task_queue.push_back(std::move(in)); // move the task pointer into the queue
        Wake();
        UnlockQueue();
    }

    /**
     * @brief Moves the front task out of the queue.
     *
     * The caller must hold the queue mutex.
     *
     * @param out Receives the task; reset to null when the queue is empty.
     */
    void Pop(std::unique_ptr<T>& out)
    {
        if (_task_queue.empty())
        {
            out.reset();
            return;
        }
        out = std::move(_task_queue.front());
        _task_queue.pop_front();
    }

    /**
     * @brief Requests shutdown, waits for every worker to finish, then
     *        releases the synchronisation primitives.
     */
    ~ThreadPool()
    {
        LockQueue();
        _stopping = true;
        pthread_cond_broadcast(&_cond); // wake every idle worker so it can exit
        UnlockQueue();
        for (pthread_t tid : _workers)
        {
            pthread_join(tid, nullptr);
        }
        // Safe only now: no worker can still touch the mutex or the condvar.
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_cond);
    }

private:
    pthread_mutex_t _lock;
    pthread_cond_t _cond;
    std::deque<std::unique_ptr<T>> _task_queue; // pending tasks, owned by the queue
    int thread_num;                             // workers started per Thread_pool_init() call
    std::vector<pthread_t> _workers;            // joinable worker threads
    bool _stopping = false;                     // guarded by _lock
};
#endif
Task 任务类(task.h)
#ifndef __TASK__H_
#define __TASK__H_
#include <iostream>
#include <functional>
#include <thread>
#include <chrono>
#include <iostream>
#include <utility>
/**
 * @brief Pairs a plain function pointer with the payload it is called with.
 *
 * All special member functions are defaulted, so a Task is both copyable
 * and movable. Moving a Task moves its payload instead of copying it.
 *
 * @tparam T Payload type; the callback receives it by value.
 */
template<class T>
class Task
{
public:
    /** @brief Creates an empty task: no callback and a value-initialised payload. */
    Task() = default;

    /**
     * @brief Creates a task that calls @p func with @p data.
     * @param func Callback to invoke from run(); may be null.
     * @param data Payload, moved into the task.
     */
    Task(void(*func)(T), T data)
        : executeFunc(func), _data(std::move(data)) {}

    Task(const Task<T>& other) = default;
    Task(Task<T>&& other) = default;
    Task<T>& operator=(const Task<T>& other) = default;
    Task<T>& operator=(Task<T>&& other) = default;
    ~Task() = default;

    /**
     * @brief Invokes the callback with the payload; does nothing when no
     *        callback is set.
     *
     * The payload is passed by value, so the task keeps its own copy and
     * run() can be called more than once.
     */
    void run()
    {
        if (executeFunc)
        {
            executeFunc(_data);
        }
    }

public:
    void (*executeFunc)(T) = nullptr; // callback, null for an empty task
    T _data{};                        // payload handed to the callback
};
#endif
main.cc
#include "task.h"
#include "ThreadPool.hpp"
#include <stdio.h>
#include <random>
#define TASK_NUM 10
// Prints every element of `data` on one line as "data = <value> ",
// followed by a newline.
static void DoJob(std::vector<int> data)
{
    for (const int value : data)
    {
        printf("data = %d ", value);
    }
    printf("\n");
}
int main()
{
uint16_t num_threads = std::thread::hardware_concurrency();
printf("thread_nums %d\n", num_threads);
std::unique_ptr<ThreadPool<Task<vector<int>>>> tp(new ThreadPool<Task<vector<int>>>(num_threads));
tp->Thread_pool_init();
std::vector<int> v1;
std::random_device rd;
std::mt19937 gen(rd());
// 定义范围
int min = 10;
int max = 50;
while(true)
{
// 定义均匀分布
std::uniform_int_distribution<> dis(min, max);
// 生成随机数
int random_number = dis(gen);
v1.push_back(random_number);
if(v1.size() == TASK_NUM)
{
auto task = std::make_unique<Task<std::vector<int>>>(DoJob, std::move(v1));
tp->push(std::move(task));
}
}
return 0;
}
此线程池可以极大地减少拷贝、降低时延,并根据当前硬件的核数开启对应数量的线程。