文章目录
- 异步缓冲区模块
- 模块设计
- 缓冲区设计
- 单个缓冲区
- 实现
- 测试
异步缓冲区模块
模块设计
异步日志器的思想是为了避免业务线程因为写日志的过程时间较长而长时间阻塞
异步日志器的工作就是把业务输出的日志内容放入内存缓冲区中,使用专门的线程进行日志写入
这个模块的主要内容是
- 实现一个线程安全缓冲区
- 创建一个异步工作线程,专门负责缓冲区日志消息的落地操作
缓冲区设计
- 使用队列缓存日志消息,进行逐条处理,要求不涉及到空间的频繁申请和释放,否则会降低效率
- 使用环形队列,提前将空间申请号,然后对空间循环利用
- 缓冲区会涉及多线程,需要保证线程安全
- 写日志操作只需要一个线程,涉及到的锁冲突是生产者与生产者的互斥,生产者和消费者的互斥,冲突较为严重
- 双缓冲区思想,第一个为任务写入的缓冲区,第二个是任务处理缓冲区
双缓冲区的好处是,降低了生产者和消费者之间的冲突,只有在交换的适合需要冲突一次
单个缓冲区
单个缓冲区如果让进程使用LogMsg类来回访问,会有很多构造的过程,这里我们直接用格式化后的日志消息字符串
减少LogMsg频繁构造的消耗,并且可以针对缓冲区中的日志消息,一次进行全部的IO操作,减少IO次数,提高整体消息
缓冲区类的设计:
- 使用vector进行空间管理,用作存放字符串中的日志消息
- 一个当前写入位置的指针和一个读取位置的指针,分别指向可写和可读的第一个位置
- 当两个指针相遇时,表示数据取完了
实现
/*
实现异步日志缓冲区
*/
#pragma once
#include "util.hpp"
#include <vector>
#include <cassert>
namespace Xulog
{
// 设置默认缓冲区大小10MB,阈值缓冲区大小100MB,增量大小10MB
#define DEFAULT_BUFFER_SIZE (10 * 1024 * 1024)
#define THRESHOLD_BUFFER_SIZE (100 * 1024 * 1024)
#define INCREMENT_BUFFER_SIZE (10 * 1024 * 1024)
class Buffer
{
public:
Buffer()
: _buffer(DEFAULT_BUFFER_SIZE), _writer_idx(0), _reader_idx(0)
{
}
// 向缓冲区写入数据
void push(const char *data, size_t len)
{
// 缓冲区空间不够扩容或者阻塞
// 实际场景:固定大小,则直接返回阻塞
// if (len > writeAbleSize())
// return;
// 极限测试:动态空间,则扩容写入
ensureEnoughSize(len);
// 数据写入缓冲区
std::copy(data, data + len, &_buffer[_writer_idx]);
// 写指针偏移
moveWriter(len);
}
// 返回可读数据的起始地址
const char *begin()
{
return &_buffer[_reader_idx];
}
// 返回可读数据的长度
size_t readAbleSize()
{
// 单方向写入读出的缓冲区
return (_writer_idx - _reader_idx);
}
// 返回可写的数据长度
size_t writeAbleSize()
{
// 扩容测试,总可写
return (_buffer.size() - _writer_idx);
}
// 读指针的偏移操作
void moveReader(size_t len)
{
assert(len <= readAbleSize());
_reader_idx += len;
}
// 重写读写位置,初始化
void reset()
{
_reader_idx = 0;
_writer_idx = 0;
}
// 交换
void swap(Buffer &buffer)
{
_buffer.swap(buffer._buffer);
std::swap(_reader_idx, buffer._reader_idx);
std::swap(_writer_idx, buffer._writer_idx);
}
// 判空
bool empty()
{
return _reader_idx == _writer_idx;
}
private:
// 写指针的偏移操作
void moveWriter(size_t len)
{
assert(len + _writer_idx <= _buffer.size());
_writer_idx += len;
}
// 扩容操作
void ensureEnoughSize(size_t len)
{
if (len <= writeAbleSize())
return;
size_t new_size = 0;
if (_buffer.size() < THRESHOLD_BUFFER_SIZE)
new_size = _buffer.size() * 2 + len; // 小于阈值则翻倍
else
new_size = _buffer.size() + INCREMENT_BUFFER_SIZE + len; // 大于阈值则线性增长
_buffer.resize(new_size);
}
private:
std::vector<char> _buffer;
size_t _reader_idx; // 可读数据指针
size_t _writer_idx; // 可写数据指针
};
}
测试
// 测试异步缓冲区
// 读取文件数据,逐步写入缓冲区,最终将缓冲区写入文件,判断新文件是否与原文件是否一致
std::ifstream ifs("./log/test.log", std::ios::binary);
if (ifs.is_open() == false)
return -1;
ifs.seekg(0, std::ios::end); // 跳转至文件末尾
size_t fsize = ifs.tellg(); // 获取文件长度
ifs.seekg(0, std::ios::beg); // 跳转至文件开头
std::string body;
body.resize(fsize);
ifs.read(&body[0], fsize);
if (ifs.good() == false)
{
std::cout << "read error\n";
return -1;
}
ifs.close();
Xulog::Buffer buffer;
for (int i = 0; i < body.size(); i++)
{
buffer.push(&body[i], 1);
}
std::ofstream ofs("./log/tmp.log", std::ios::binary);
// 一次写入
// ofs.write(buffer.begin(), fsize);
// 逐字节写入
size_t read_size = buffer.readAbleSize();
for (int i = 0; i < read_size; i++)
{
ofs.write(buffer.begin(), 1);
buffer.moveReader(1);
}
ofs.close();