一、线程池
1.1 概念
线程池一种线程使用模式:
线程过多会带来调度开销,进而影响缓存局部性和整体性能。
而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务:(线程池的优点)
- 这避免了在处理短时间任务时创建与销毁线程的代价。
- 线程池不仅能够保证内核的充分利用,还能防止过分调度。
注意:可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景:
- 需要大量的线程来完成任务,且完成任务的时间比较短。比如WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。比如突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误
线程池使用:
- 创建固定线程数量的线程池,循环从任务队列中获取任务对象
- 获取到任务对象后,执行任务对象中的任务接口
1.2 实现
1.2.1 封装线程对象thread + RAII自动加锁解锁
#pragma once
#include <iostream>
#include <pthread.h>
#include <string>
#include <functional>
#include "logmessage.hpp"
namespace zty
{
typedef void *(*func_t)(void *);
//1.封装线程对象
class thread
{
pthread_t _tid;
func_t _callback = nullptr;
void *_args = nullptr;
public:
thread() {}
thread(func_t callback, void *args)
: _callback(callback),
_args(args)
{
pthread_create(&_tid, nullptr, _callback, _args);
}
thread(const thread &other) = delete;
thread &operator=(const thread &other) = delete;
void run(func_t callback, void *args)
{
_callback = callback;
_args = args;
pthread_create(&_tid, nullptr, _callback, _args);
// printf("[%d] run\n", _tid%10000);
LogMessage(DEBUG, "[%d] run", _tid%10000);
}
void join()
{
// printf("[%d] join\n", _tid%10000);
LogMessage(DEBUG, "[%d] join", _tid%10000);
pthread_join(_tid, nullptr);
}
pthread_t get_id()
{
return _tid;
}
};
//2. RAII自动加锁解锁
class lock_guard
{
pthread_mutex_t &_pmtx;
public:
lock_guard(pthread_mutex_t &pmtx)
: _pmtx(pmtx)
{
pthread_mutex_lock(&_pmtx);
}
~lock_guard()
{
pthread_mutex_unlock(&_pmtx);
}
};
}
1.2.2 线程池的实现
下面进行实现一个简单的线程池,线程池中提供了一个任务队列,以及若干个线程(多线程)
- 线程池中的多个线程负责从任务队列当中拿任务,并将拿到的任务进行处理,没有任务就阻塞等待。
- 线程池对外提供一个push接口,用于让外部线程能够将任务push到任务队列当中
实现代码如下:(在堆区创建的单例懒汉模式)
#pragma once
#include "lockguard.hpp"
#include "thread.hpp"
#include "task.hpp"
#include <vector>
#include <queue>
#include <pthread.h>
#include <unistd.h>
namespace zty
{
const int THREAD_NUM = 3;
template <class T>
class thread_pool
{
static thread_pool *s_ins;
int _thread_num;
std::vector<zty::thread> _threads;
std::queue<T> _tasks;
static pthread_mutex_t _mtx;
static pthread_cond_t _cond;
bool _terminate = false; // 结束标志
thread_pool(int thread_num = THREAD_NUM)
: _thread_num(thread_num),
_threads(_thread_num)
{}
~thread_pool()
{
_terminate = true; // 设置结束标志
pthread_cond_broadcast(&_cond); // 唤醒所有等待条件变量的线程
for (auto &e : _threads) // 回收所有子线程
{
e.join();
}
}
thread_pool(const thread_pool &other) = delete;
thread_pool &operator= (const thread_pool &other) = delete;
struct GC
{
~GC()
{
if (s_ins != nullptr)
{
delete s_ins;
s_ins = nullptr;
}
}
};
public:
static thread_pool &GetInstance(int num = THREAD_NUM)
{
if (s_ins == nullptr)
{
zty::lock_guard lock(_mtx);
if (s_ins == nullptr)
{
s_ins = new thread_pool(num);
}
}
return *s_ins;
}
void push(const T &task)
{
zty::lock_guard lock(_mtx);
_tasks.push(task);
pthread_cond_signal(&_cond);
}
bool pop(T &out)
{
zty::lock_guard lock(_mtx);
while (_tasks.empty())
{
pthread_cond_wait(&_cond, &_mtx);
if (_terminate)
return false;
}
out = _tasks.front();
_tasks.pop();
return true;
}
void run()
{
for (auto &e : _threads)
{
e.run(routine, this);
}
}
//pthread_creat函数要求的线程入口点函数的参数只有一个void*,不能有this指针。
static void *routine(void *args)
{
thread_pool *self = (thread_pool *)args; // self实际就是this指针
T task;
while (!self->_terminate)
{
bool ret = self->pop(task);
if (ret) // 判断self->pop是否获取到任务了
{
LogMessage(NORMAL, "[%d] %d%c%d=%d", pthread_self() % 10000, task._l, task._op, task._r, task());
sleep(1);
}
}
return (void *)0;
}
};
template <class T>
thread_pool<T> *thread_pool<T>::s_ins = nullptr;
template <class T>
pthread_mutex_t thread_pool<T>::_mtx = PTHREAD_MUTEX_INITIALIZER;
template <class T>
pthread_cond_t thread_pool<T>::_mtx = PTHREAD_CONDITION_INITIALIZER;
template <class T>
typename thread_pool<T>::GC thread_pool<T>::s_gc;
}
test.cc
#include "thread.hpp"
#include "lockguard.hpp"
#include "task.hpp"
#include "threadpool.hpp"
#include "logmessage.hpp"
#include <thread>
#include <unistd.h>
#include <ctime>
#include <cstdlib>
using namespace std;
int main()
{
srand((unsigned int)time(nullptr));
// zty::thread_pool<zty::Task> tp(5);
zty::thread_pool<zty::Task> &tp = zty::thread_pool<zty::Task>::GetInstance();
tp.run();
char ops[] = {'+', '-', '*', '/' ,'%'};
int cnt = 5;
while(cnt--)
{
int l = rand()%100;
int r = rand()%100;
char op = ops[rand()%5];
zty::Task task(l, r, op);
// printf("main_thread: %d%c%d=?\n", task._l, task._op, task._r);
LogMessage(NORMAL, "main_thread: %d%c%d=?", task._l, task._op, task._r);
tp.push(task);
sleep(1);
}
return 0;
}
logmessage.hpp
#pragma once
#include <cstdarg>
#include <cstdio>
#include <ctime>
#include <cstring>
enum LEVEL
{
DEBUG,
NORMAL,
WARNING,
ERROR,
FATAL
};
const char *lvtable[] = {"DEBUG", "NORMAL", "WARNING", "ERROR", "FATAL"};
const char *filepath = "./log.txt";
void LogMessage(LEVEL level, const char *format, ...)
{
#ifndef DEBUGSHOW
if (level == DEBUG)
return;
#endif
char buffer[1024];
time_t timestamp;
time(×tamp);
tm *timeinfo = localtime(×tamp);
snprintf(buffer, sizeof(buffer), "[%d/%d/%d]%s: ", timeinfo->tm_year + 1900, timeinfo->tm_mon, timeinfo->tm_mday, lvtable[level]);
va_list ap;
va_start(ap, format);
vsnprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer), format, ap);
va_end(ap);
printf("%s\n", buffer);
FILE *fp = fopen(filepath, "a");
fprintf(fp, "%s\n", buffer);
}
二、线程安全的单例模式
这里在C++篇章已经谈过,这里不再赘述,链接::【C++】特殊类设计 {不能被拷贝的类;只能在堆上创建的类;只能在栈上创建的类;不能被继承的类;单例模式:懒汉模式,饿汉模式}-CSDN博客
注意事项:
- 单例模式有两种实现模式:饿汉模式(启动时实例化对象)和懒汉模式(在任意程序模块第一次访问单例时实例化对象)。
- 单例对象可以在堆区创建,也可以在静态区创建
- 在堆区创建的懒汉单例,获取单例指针时需要双检查加锁:双重 if 判定, 避免不必要的锁竞争
- 在堆区创建单例时,包含一个静态的内部类对象,该对象析构时会顺便析构单例,自动释放。
三、STL容器、智能指针的线程安全
STL中的容器是否是线程安全的?不是
-
STL 的设计初衷是将性能挖掘到极致,而一旦涉及到加锁保证线程安全,会对性能造成巨大的影响。
-
而且对于不同的容器,加锁方式的不同,性能可能也不同(例如hash表的锁表和锁桶)
-
因此 STL 默认不是线程安全。如果需要在多线程环境下使用。往往需要调用者自行保证线程安全
智能指针是否是线程安全的? 是
-
对于unique_ptr,由于只是在当前代码块范围内生效,因此不涉及线程安全问题。
-
对于shared_ptr,多个对象需要共用一个引用计数变量,所以会存在线程安全问题。但是标准库实现的时候考虑到了这个问题,基于原子操作(CAS)的方式保证了 shared_ptr 能够高效、原子地进行引用计数。
四、其他常见的各种锁
- 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
- 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。(乐观锁需要被设计)
- CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。(与互斥锁原理一样)
4.1 pthread自旋锁
特性
- 自旋锁是一种基于忙等待的锁,用于保护共享资源的访问。当一个线程尝试获取自旋锁时,如果锁已经被其他线程占用,则该线程会一直循环等待,直到锁被释放。这种等待的过程称为自旋
- 不同于互斥锁,线程竞争自旋锁失败后不会被阻塞挂起,而是会进行自旋检测,直到获取锁。
- 无论是互斥锁的挂起等待还是自旋锁的轮询检测都是由pthread库完成的,我们不需要自行操作。因此自旋锁的使用方法和互斥锁相同。
使用场景
- 自旋锁不会引起线程的上下文切换,而互斥锁(如Mutex)可能会引起线程的上下文切换,因此在一些锁的持有时间很短的场景下,使用自旋锁可以减少上下文切换的开销。
- 自旋锁适用于等待时间短的情况,在等待时间短的情况下,自旋锁的效率较高。如果等待时间较长,自旋锁可能会消耗大量的CPU资源。
相关接口
#include <pthread.h>
pthread_spinlock_t spinlock; // 定义一个自旋锁
pthread_spin_init(&spinlock, 0); // 初始化自旋锁
pthread_spin_lock(&spinlock); // 获取自旋锁
pthread_spin_unlock(&spinlock); // 释放自旋锁
pthread_spin_destroy(&spinlock); // 销毁自旋锁
应用层实现
//可以使用while循环+pthread_mutex_trylock在应用层实现一个自旋锁
while(pthread_mutex_trylock(&lock) == EBUSY);
//访问临界资源
//......
pthread_mutex_unlock(&lock);
4.2 pthread读写锁
特性
读写锁(Read-Write Lock)是一种特殊的锁机制,它允许多个线程同时获得读访问权限,但同一时间只有一个线程可以获得写访问权限。读写锁适用于读操作远远超过写操作的场景,可以提高并发性能。
使用场景
读写锁适用于读操作频繁而写操作较少的场景。如果读操作和写操作的数量相差不大,或者写操作频繁,可能会导致读写锁的效率不如互斥锁。
读者写者模型的特点
321原则(便于记忆)
- 三种关系: 写者和写者(互斥关系)、读者和读者(没有关系)、写者和读者(互斥关系、同步关系)。
- 两种角色: 写者和读者(通常由线程承担)
- 一个交易场所: 通常指的是内存中的一段共享缓冲区(共享资源)
读者写者模型 VS 生产消费模型的本质区别:消费者会拿走数据,读者不会(读者只会对数据进行拷贝、访问…)
相关接口
#include <pthread.h>
//定义一个读写锁
pthread_rwlock_t xxx
//初始化读写锁
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
const pthread_rwlockattr_t*restrict attr);
//销毁读写锁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
//加锁和解锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);//读锁加锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);//写锁加锁
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);//解锁共用
示例代码
下面是一个使用pthread库中读写锁的简单示例:
#include <pthread.h>
#include <stdio.h>
// 定义一个全局读写锁
pthread_rwlock_t rwlock;
int shared_data = 0;
void* reader(void* arg) {
int tid = *((int*)arg);
while (1) {
pthread_rwlock_rdlock(&rwlock); // 获取读锁
printf("Reader %d: Shared data = %d\n", tid, shared_data);
pthread_rwlock_unlock(&rwlock); // 释放读锁
// 模拟读操作完成后的延迟
usleep(1000000);
}
pthread_exit(NULL);
}
void* writer(void* arg) {
int tid = *((int*)arg);
while (1) {
pthread_rwlock_wrlock(&rwlock); // 获取写锁
shared_data++; // 修改共享数据
printf("Writer %d: Modified shared data to %d\n", tid, shared_data);
pthread_rwlock_unlock(&rwlock); // 释放写锁
// 模拟写操作完成后的延迟
usleep(2000000);
}
pthread_exit(NULL);
}
int main() {
pthread_t reader1, reader2, writer1;
int tid1 = 1, tid2 = 2, tid3 = 3;
pthread_rwlock_init(&rwlock, NULL); // 初始化读写锁
// 创建两个读线程和一个写线程
pthread_create(&reader1, NULL, reader, &tid1);
pthread_create(&reader2, NULL, reader, &tid2);
pthread_create(&writer1, NULL, writer, &tid3);
// 主线程等待读写线程结束
pthread_join(reader1, NULL);
pthread_join(reader2, NULL);
pthread_join(writer1, NULL);
pthread_rwlock_destroy(&rwlock); // 销毁读写锁
return 0;
}
需要注意的是,读写锁采用共享/排他的锁控制策略。当有读者线程持有读锁时,其他读者线程可以继续获取读锁;但当有写者线程持有写锁时,其他任何读者线程或写者线程都无法获取读或写锁。
读写锁的实现原理: