生产者——消费者模型:
多线程场景中最常见的模型之一,异步写日志时负责产生日志消息的业务线程充当生产者,负责写日志的线程充当消费者,两种角色进行数据交互需要依靠一块缓冲区。
单缓冲区的缺点
传统的单缓冲区PC模型一般是基于循环队列的方式进行实现的,对于读写位置的指针管理较为复杂,并且由于PC模型中任意两者之间都处于竞争关系,只有一个缓冲区意味着更加激烈的锁竞争,写日志线程在写日志的时候众多业务线程无法将日志推入缓冲区造成业务线程阻塞,这显然是不合理的。
改进使用双缓冲区
因此考虑将单缓冲区升级为双缓冲区,为业务线程和写日志线程单独分配一块缓冲区,在满足一定的条件下交换二者的缓冲区,这样就能将生产者和消费者的锁竞争降低到只有交换缓冲区的那一次,并且由于日志系统中写日志线程只有1个,消费者之间不存在竞争关系,效率就得到很大程度上的提升。
异步缓冲区实现
class Buffer
{
#define DEFAULT_BUFFERSIZE 1024 * 1024 //默认缓冲区大小
#define THRES 8 * DEFAULT_BUFFERSIZE //缓冲区能够二倍扩容的阈值
#define LINEAR_INCREMENT 1024 * 1024 //缓冲区大小达到阈值后每一次线性增长的大小
/*极端情况下业务线程产生日志的速度可能非常快,在写日志线程还没有处理完对应缓冲区时,业务线程会由于写入空间不足而阻塞,当然有方法可以比避免这种情况,那就是扩容,但是扩容的缺点是会导致缓冲区不断增长导致崩溃*/
private:
std::vector<char> _buffer;
size_t _w = 0; // 写入位置
size_t _r = 0; // 读取位置
public:
Buffer(size_t buffersize = DEFAULT_BUFFERSIZE) : _buffer(buffersize) {}
bool empty(); // 判断缓冲区是否为空
void swap(Buffer &buf); // 交换2个缓冲区
bool push(const char *data, size_t len); // 向缓冲区写入数据
const char *readBegin(); // 获取可读数据起始地址
void moveReader(size_t len); // 移动读下标
size_t writeAbleSize(); // 获取可写长度
size_t readAbleSize(); // 获取可读长度
//writeAbleSize和readAbleSize保证缓冲区的读写不会越界
void reset(); // 归置读写位置
private:
void moveWriter(size_t len); // 移动写下标
void resize(size_t len); // 扩容
};
void Buffer::resize(size_t len)
{
//*对_buffer进行扩容,采用2倍扩容+线性扩容
if (_buffer.size() < THRES) _buffer.resize(_buffer.size() * 2 + len);
else _buffer.resize(_buffer.size() + LINEAR_INCREMENT + len);
}
void Buffer::reset()
{ _w = _r = 0;}
bool Buffer::empty()
{return _w == _r;}
void Buffer::swap(Buffer &buf)
{
_buffer.swap(buf._buffer);
std::swap(_w, buf._w);
std::swap(_r, buf._r);
}
size_t Buffer::readAbleSize()
{return _w - _r;}
size_t Buffer::writeAbleSize()
{return _buffer.size() - _w;}
void Buffer::moveReader(size_t len)
{
assert(_r + len <= _w);
_r += len;
}
void Buffer::moveWriter(size_t len)
{
assert(_w + len <= _buffer.size());
_w += len;
}
const char *Buffer::readBegin()
{return &_buffer[_r];}
bool Buffer::push(const char *data, size_t len)
{
//*判断len是否大于可写入的最大空间
if (len > writeAbleSize()) // 条件成立则2种处理方式:扩容与阻塞
{
#ifdef RESIZE
resize(len);
#else
return false;
#endif
}
std::copy(data, data + len, &_buffer[_w]);
moveWriter(len);
return true;
}
异步日志器中异步工作器的实现
异步日志器中只需要管理一个异步工作器对象
class AsynLooper
{
using Function = std::function<void(Buffer &)>;
private:
std::atomic<bool> _stop;
std::thread _thread; // 异步工作器工作线程
Buffer _pro_buf; //提供给业务线程的缓冲区
Buffer _con_buf; //提供给写日志线程的缓冲区
std::mutex _mtx;
std::condition_variable _con_cond;
std::condition_variable _pro_cond; //条件变量,避免无效的缓冲区交换
private:
void threadEntry(); //写日志线程入口函数
Function _callBack; // 对缓冲区数据进行处理的回调函数,具体如何读取缓冲区数据由上层决定
public:
AsynLooper(Function callback);
~AsynLooper();
void stop();
void push(const char *data, size_t len); // 将日志消息载入缓冲区
};
AsynLooper::AsynLooper(Function callback)
: _callBack(callback), _stop(false)
{
_thread = std::thread(&AsynLooper::threadEntry, this);
}
AsynLooper::~AsynLooper()
{
stop();
}
void AsynLooper::stop()
{
_stop = true;
_con_cond.notify_all();
_thread.join();
/*当异步工作器被叫停之前需要先处理完所有日志才能真正销毁,因此_stop置为真后需要马上唤醒写日志线程将残余日志都进行落地*/
}
void AsynLooper::push(const char *data, size_t len)
{
if (!_stop)
{
std::unique_lock<std::mutex> lock(_mtx);
if (!_pro_buf.push(data, len)) // 写入缓冲区不足
{
_pro_cond.wait(lock, [&](){ return _pro_buf.empty(); });
// 挂起等待至缓冲区可写入
_pro_buf.push(data, len);
}
_con_cond.notify_all(); //缓冲区有数据后可以告知写日志线程已经满足可交换条件之一(另一个条件是写日志现场的缓冲区为空)
}
}
void AsynLooper::threadEntry()
{
while (1)
{
{
std::unique_lock<std::mutex> lock(_mtx);
if (_stop && _pro_buf.empty()) break;
_con_cond.wait(lock, [&](){ return !_pro_buf.empty() || _stop; });
_con_buf.swap(_pro_buf);
_pro_cond.notify_all(); //告知阻塞的业务线程缓冲区已有足够的空间供写入
}
//!!!务必在回调函数_callBack之前把锁释放,写日志线程进行落地是不应该影响业务线程运行
_callBack(_con_buf);
_con_buf.reset();
}
}