文章目录
- 主要工作
- 流程图
- FiberCondition
- Buffer
- BufferManager
- LogEvent 序列化 & 反序列化
- Logger
- RotatingFileLogAppender
主要工作
- 实现, LogEvent 序列化和反序列化 (使用序列化是为了更标准,如果转成最终的日志格式再存储(确实会快点,但是对于缓冲区的利用就不够了)
- 优化,使用 LoggerBuild 建造者 实现和管理日志器;
- 双缓冲区设计,使用条件变量等同步技术实现日志异步处理器。支持定时检查缓冲区 和 生产者缓冲区 唤醒;
- 使用 WorkerManager 管理多个调度器;
- 增加 循环日志写入,按时间分片的 LoggerAppender
支持上传下载和展示功能,支持备份重要日志;(待实现网络库后实现)- 性能测试,2h4g 服务器下,异步日志器每秒输出 130MB 日志器。
流程图
FiberCondition
仿照 std::condition_variable
存在的问题,由于是 notify_one 是把 fiber 重新添加调度,并且调度策略是先来先服务。
所以,存在日志时序性错误问题。(解决方法,后期修改调度策略,增加优先级。)⭐
class FiberCondition{
public:
using MutexType = Spinlock;
void wait(MutexType::Lock& lock);
template <typename Predicate>
void wait(MutexType::Lock& lock, Predicate pred){
while(!pred()){
wait(lock);
}
}
void notify_one();
void notify_all();
private:
void printWaiters() const;
private:
MutexType m_mutex;
std::list<std::pair<Scheduler*, Fiber::ptr>> m_waiters;
};
void FiberCondition::wait(MutexType::Lock& lock){
SYLAR_ASSERT(Scheduler::GetThis());
{
MutexType::Lock lock(m_mutex);
m_waiters.push_back(std::make_pair(Scheduler::GetThis(), Fiber::GetThis()));
printWaiters();
}
lock.unlock();
Fiber::GetThis()->yield();
lock.lock();
}
void FiberCondition::notify_one(){
MutexType::Lock lock(m_mutex);
if (!m_waiters.empty()) {
auto next = m_waiters.front();
m_waiters.pop_front();
next.first->schedule(next.second);
}
}
void FiberCondition::notify_all() {
MutexType::Lock lock(m_mutex);
for (auto& waiter : m_waiters) {
waiter.first->schedule(waiter.second);
}
m_waiters.clear();
}
Buffer
class Buffer {
public:
using ptr = std::shared_ptr<Buffer>;
using MutexType = Spinlock;
Buffer(size_t buffer_size);
Buffer(size_t buffer_size, size_t threshold, size_t linear_growth);
void push(const char* data, size_t len);
void push(const std::string& str);
char* readBegin(int len);
bool isEmpty();
void swap(Buffer& buf);
size_t writeableSize();
size_t readableSize() const;
const char* Begin() const;
void moveWritePos(int len);
void moveReadPos(int len);
void Reset();
protected:
void ToBeEnough(size_t len);
private:
MutexType m_mutex;
size_t m_buffer_size;
size_t m_threshold;
size_t m_linear_growth;
std::vector<char> m_buffer;
size_t m_write_pos = 0;
size_t m_read_pos = 0;
};
BufferManager
重点操作:
BufferManager::BufferManager(const functor& cb, // 消费者缓冲区写入的回调函数 ⭐
AsyncType::Type asyncType,
size_t buffer_size,
size_t threshold,
size_t linear_growth,
size_t swap_time,
IOManager* iom)
: m_stop(false),
m_swap_status(false),
m_asyncType(asyncType),
m_buffer_productor(std::make_shared<Buffer>(buffer_size, threshold, linear_growth)),
m_buffer_consumer(std::make_shared<Buffer>(buffer_size, threshold, linear_growth)),
m_callback(cb),
m_swap_time(swap_time)
{
assert(iom != nullptr);
iom->schedule(std::bind(&BufferManager::ThreadEntry, this));
m_timer = iom->addTimer(m_swap_time, std::bind(&BufferManager::TimerThreadEntry, this), true);
}
// 写入线程,生产者
void BufferManager::push(const char* data, size_t len) {
MutexType::Lock lock(m_mutex);
if(m_asyncType == AsyncType::ASYNC_SAFE){
if (len > m_buffer_productor->writeableSize()) {
SYLAR_LOG_DEBUG(g_logger) << "notify consumer";
m_cond_consumer.notify_one();
}
m_cond_producer.wait(lock, [&](){
return (m_stop || (len <= m_buffer_productor->writeableSize()));
});
}
if(m_stop){
throw std::runtime_error("BufferManager is stopped");
}
m_buffer_productor->push(data, len);
}
// 使用Timer,按照频率访问缓冲区
// 如果生产者没有就退出
void BufferManager::TimerThreadEntry(){
{
MutexType::Lock lock(m_mutex);
if ((!m_buffer_productor->isEmpty() && m_buffer_consumer->isEmpty()) || m_stop) {
swap_buffers();
if(m_asyncType == AsyncType::ASYNC_SAFE){
m_cond_producer.notify_all();
}
}else{
return;
}
}
{
MutexType::Lock lock(m_swap_mutex);
m_callback(m_buffer_consumer);
m_buffer_consumer->Reset();
}
}
void BufferManager::ThreadEntry() {
while(true){
{
MutexType::Lock lock(m_mutex);
SYLAR_LOG_DEBUG(g_logger) << "ThreadEntry started.";
m_cond_consumer.wait(lock, [&](){
return m_stop || (!m_buffer_productor->isEmpty() && m_buffer_consumer->isEmpty());
});
swap_buffers();
if(m_asyncType == AsyncType::ASYNC_SAFE){
m_cond_consumer.notify_all();
}
}
{
MutexType::Lock lock(m_swap_mutex);
m_callback(m_buffer_consumer);
m_buffer_consumer->Reset();
if(m_stop && m_buffer_productor->isEmpty()) return;
}
}
}
LogEvent 序列化 & 反序列化
#pragma pack(push, 1)
struct LogMeta {
uint64_t timestamp; // 时间戳
uint32_t threadId; // 线程ID
uint32_t fiberId; // 协程ID
int32_t line; // 行号
uint32_t elapse;
LogLevel::Level level; // 日志级别
uint16_t fileLen; // 文件名长度
uint32_t threadNameLen;// 线程名长度
uint32_t msgLen; // 消息内容长度
};
#pragma pack(pop)
Buffer::ptr LogEvent::serialize() const {
LogMeta meta{ // ⭐
.timestamp = m_time,
.threadId = m_threadId,
.fiberId = m_fiberId,
.line = m_line,
.elapse = m_elapse,
.level = m_level,
.fileLen = static_cast<uint16_t>(m_file.size()),
.threadNameLen = static_cast<uint16_t>(m_threadName.size()),
.msgLen = static_cast<uint32_t>(m_ss.str().size())
};
const size_t total_need = sizeof(LogMeta) + meta.fileLen + meta.threadNameLen + meta.msgLen;
auto buffer = std::make_shared<Buffer>(total_need); // 使用 shared_ptr 管理内存
// 序列化元数据
buffer->push(reinterpret_cast<const char*>(&meta), sizeof(meta));
// 序列化变长数据(包含终止符)
buffer->push(m_file.c_str(), meta.fileLen);
buffer->push(m_threadName.c_str(), meta.threadNameLen);
buffer->push(m_ss.str().c_str(), meta.msgLen);
return buffer; // 返回 shared_ptr
}
// 每次调用,解析一个LogEvent
static LogEvent::ptr LogEvent::deserialize(Buffer& buffer) {
if(buffer.readableSize() < sizeof(LogMeta)) {
return nullptr;
}
LogMeta meta;
memcpy(&meta, buffer.Begin(), sizeof(LogMeta));
const size_t total_need = sizeof(LogMeta) + meta.fileLen + meta.threadNameLen + meta.msgLen;
if(buffer.readableSize() < total_need){
return nullptr;
}
// 4. 提取各字段数据(使用临时指针操作)
const char* data_ptr = buffer.Begin() + sizeof(LogMeta);
// 文件名处理
std::string file(data_ptr, meta.fileLen);
data_ptr += meta.fileLen;
// 线程名
std::string thread_name(data_ptr, meta.threadNameLen);
data_ptr += meta.threadNameLen;
// 消息内容处理
std::string message(data_ptr, meta.msgLen);
// 5. 统一移动读指针(原子操作保证数据一致性)
buffer.moveReadPos(total_need);
// 6. 构建日志事件对象
auto event = std::make_shared<LogEvent>(
std::move(file),
meta.line,
meta.elapse,
meta.threadId,
std::move(thread_name),
meta.fiberId,
meta.timestamp,
meta.level
);
event->getSS() << message;
return event;
}
Logger
重构时,出现的问题:Logger 对 BufferManager 的依赖,并且 BufferManger 也依赖 IOMgr调度器。
简单说,就说 全局静态变量的初始化顺序问题
解决方法:Logger默认构造的时候,不提供BufferParams,就使用同步方式创建。
导入 yaml 配置后,重置 logger,再创建 异步日志器
Logger(
const std::string name,
LogLevel::Level level,
std::vector<LogAppender::ptr>& appenders,
const BufferParams& bufParams
) :m_name(name),
m_level(level),
m_appenders(appenders.begin(), appenders.end())
{
if(bufParams.isValid()){
m_bufMgr = std::make_shared<BufferManager>(
std::bind(&Logger::realLog, this, std::placeholders::_1), bufParams);
}else{
m_bufMgr = nullptr;
}
}
// 由 iom_log 写入真正的文件。
void realLog(Buffer::ptr buffer) {
MutexType::Lock lock(m_log_mutex); // 强制 只能 一个线程写入。
if (!buffer) {
std::cerr << "realLog: invalid buffer pointer" << std::endl;
return;
}
std::vector<LogEvent::ptr> events;
while (true) { // 解析 buffer
LogEvent::ptr event = LogEvent::deserialize(*buffer);
// 理论上 buffer 里是多个Event的数据,不存在处理失败。
if (event) {
events.push_back(event);
} else {
if (buffer->readableSize() == 0) { // 读完了
break;
} else {
// 处理失败但数据未读完(说明发生严重错误)
std::cout << "Log deserialization error, remaining data: " << buffer->readableSize() << std::endl;
break;
}
}
}
auto self = shared_from_this();
for (auto& appender : m_appenders) {
appender->log(self, events);
}
}
// 写入缓冲区
// 多个线程的 写日志,写入缓存区
void log(LogEvent::ptr event){
if(event->getLevel() >= m_level){
if(m_bufMgr != nullptr){
// MutexType::Lock lock(m_mutex); 当协程阻塞,这个锁就一直没释放。搞半天,给我整的怀疑人生了。
Buffer::ptr buf = event->serialize();
m_bufMgr->push(buf);
}else{
// 如果没有配置iom,直接同步输出日志
auto self = shared_from_this();
for(auto& appender : m_appenders) {
appender->log(self, event);
}
}
}
}
RotatingFileLogAppender
FileLogAppeneder 改为使用 FILE 库函数
支持功能:
- max_size,限制单个日志文件大小(按照时间片创建文件名)
- m_maxFile = 0,无限增加日志
- m_maxFile > 0,限制日志文件的个数。当超过,从第一个文件循环写入。
class RotatingFileLogAppender : public LogAppender{
public:
typedef std::shared_ptr<RotatingFileLogAppender> ptr;
RotatingFileLogAppender(const std::string& filename,
LogLevel::Level level,
LogFormatter::ptr formatter,
size_t max_size,
size_t max_file = 0, // 默认是无限增加
FlushRule::Rule flush_rule = FlushRule::Rule::FFLUSH // 默认是普通日志
);
~RotatingFileLogAppender(){
if(m_curFile){
fclose(m_curFile);
m_curFile = NULL;
}
}
std::string toYamlString();
void log(std::shared_ptr<Logger> logger, LogEvent::ptr event) override;
void log(std::shared_ptr<Logger> logger, std::vector<LogEvent::ptr> events) override;
private:
void initLogFile(size_t len = 0);
/**
* 判断是否写的下,如果写的下就 ss<<str,缓存
* 如果写不写了,就把 ss 缓存一次性写入。重置ss
*/
bool checkLogFile(const std::string& str);
std::string createFilename();
private:
std::string m_filename;
FILE* m_curFile = NULL;
std::vector<std::string> m_fileNames;
size_t m_maxSize;
size_t m_maxFile;
FlushRule::Rule m_flushRule;
size_t m_curFilePos = 0;
size_t m_curFileIndex = 0;
Buffer m_buffer;
};
void RotatingFileLogAppender::initLogFile(size_t len){
if(m_curFile == NULL || (m_curFilePos + len) > m_maxSize){
// 写不下了,保证日志的完整性,直接新建文件。
if(m_curFile != NULL){
fflush(m_curFile);
fclose(m_curFile);
if(m_maxFile == 0){
// 无限增加日志文件
m_curFileIndex++;
}else{
m_curFileIndex = (m_curFileIndex + 1) % m_maxFile;
if(!m_fileNames[m_curFileIndex].empty()){ // 说明 循环到 已有的文件了。
std::string newfilename = createFilename();
if(rename(m_fileNames[m_curFileIndex].c_str(), newfilename.c_str()) != 0){ // 文件 改新名字
perror("rename failed");
}
m_fileNames[m_curFileIndex] = newfilename;
m_curFile = fopen(newfilename.c_str(), "r+b");
fseek(m_curFile, 0, SEEK_SET); // 从头 覆盖,不考虑 日志文件名了。默认最后一个文件可能会存在过往的日志信息。
m_curFilePos = 0;
return;
}
}
}
std::string filename = createFilename();
m_fileNames[m_curFileIndex] = filename;
m_curFile = fopen(filename.c_str(), "ab");
if(m_curFile==NULL){
std::cout <<__FILE__<<__LINE__<<"open file failed"<< std::endl;
perror(NULL);
}
m_curFilePos = 0;
return;
}
}
std::string RotatingFileLogAppender::createFilename() {
time_t now = time(nullptr);
struct tm tm;
localtime_r(&now, &tm);
char time_buf[64];
strftime(time_buf, sizeof(time_buf), "%Y%m%d_%H%M%S", &tm);
return m_filename + "_" + time_buf + "_" + std::to_string(m_curFileIndex) + ".log";
}
void RotatingFileLogAppender::log(std::shared_ptr<Logger> logger, LogEvent::ptr event){
MutexType::Lock lock(m_mutex);
if(event->getLevel() >= m_level){
std::string data = m_formatter->format(logger , event);
initLogFile(data.size());
fwrite(data.c_str(), 1, data.size() , m_curFile);
if(ferror(m_curFile)){
std::cout <<__FILE__<<__LINE__<<"write log file failed"<< std::endl;
perror(NULL);
}
m_curFilePos += data.size();
if(m_flushRule == FlushRule::Rule::FFLUSH){
if(fflush(m_curFile)==EOF){ // 刚好最后一个日志把文件写满了。
std::cout <<__FILE__<<__LINE__<<"fflush file failed"<< std::endl;
perror(NULL);
}
}else if(m_flushRule == FlushRule::Rule::FSYNC){
fflush(m_curFile);
fsync(fileno(m_curFile));
}
}
}
bool RotatingFileLogAppender::checkLogFile(const std::string& data){
if(m_curFile == NULL || (m_curFilePos + data.size()) > m_maxSize){
// 写不下了,保证日志的完整性,直接新建文件。
if(m_curFile != NULL){
// 把 ss 缓存一次性写入
fwrite(m_buffer.Begin(), 1, m_buffer.readableSize(), m_curFile);
m_buffer.Reset();
// 判断错误信息
if(ferror(m_curFile)){
std::cout <<__FILE__<<__LINE__<<"write log file failed"<< std::endl;
perror(NULL);
}
if(m_flushRule == FlushRule::Rule::FFLUSH){
if(fflush(m_curFile)==EOF){ // 刚好最后一个日志把文件写满了。
std::cout <<__FILE__<<__LINE__<<"fflush file failed"<< std::endl;
perror(NULL);
}
}else if(m_flushRule == FlushRule::Rule::FSYNC){
fflush(m_curFile);
fsync(fileno(m_curFile));
}
fclose(m_curFile);
if(m_maxFile == 0){
// 无限增加日志文件
m_curFileIndex++;
}else{
m_curFileIndex = (m_curFileIndex + 1) % m_maxFile;
if(!m_fileNames[m_curFileIndex].empty()){ // 说明 循环到 已有的文件了。
std::string newfilename = createFilename();
if(rename(m_fileNames[m_curFileIndex].c_str(), newfilename.c_str()) != 0){ // 文件 改新名字
std::cout << "rename failed" << std::endl;
perror(NULL);
}
m_fileNames[m_curFileIndex] = newfilename;
m_curFile = fopen(newfilename.c_str(), "r+b");
if (m_curFile == NULL) {
std::cout << __FILE__ << __LINE__ << "open file failed" << std::endl;
perror(NULL);
}
// 从头 覆盖,不考虑 日志文件名了。默认最后一个文件可能会存在过往的日志信息。
fseek(m_curFile, 0, SEEK_SET);
m_curFilePos = 0;
// 模拟写入文件,实际上写入缓存。
m_buffer.push(data);
m_curFilePos += data.size();
return true;
}
}
}
// m_curFile为空,创建新文件
std::string filename = createFilename();
if(m_maxFile > 0){
m_fileNames[m_curFileIndex] = filename; // 只有限制最大文件数,记录文件名
}
m_curFile = fopen(filename.c_str(), "ab");
if(m_curFile==NULL){
std::cout <<__FILE__<<__LINE__<<"open file failed"<< std::endl;
perror(NULL);
}
m_curFilePos = 0;
}
// 模拟写入文件,实际上写入缓存。
m_buffer.push(data);
m_curFilePos += data.size();
return false;
}
void RotatingFileLogAppender::log(std::shared_ptr<Logger> logger, std::vector<LogEvent::ptr> events){
// 这个时候,m_curFilePos 转变成对 m_buffer 写入数据的 pos 长度。 ⭐
MutexType::Lock lock(m_mutex);
for(auto& event : events){
if(event->getLevel() >= m_level){
std::string data = m_formatter->format(logger , event);
checkLogFile(data);
}
}
// 最后再次,把缓存里的写入
if(m_buffer.readableSize() > 0 && m_curFile != NULL){
fwrite(m_buffer.Begin(), 1, m_buffer.readableSize(), m_curFile);
m_buffer.Reset();
// 判断错误信息
if(ferror(m_curFile)){
std::cout <<__FILE__<<__LINE__<<"write log file failed"<< std::endl;
perror(NULL);
}
if(m_flushRule == FlushRule::Rule::FFLUSH){
if(fflush(m_curFile)==EOF){ // 刚好最后一个日志把文件写满了。
std::cout <<__FILE__<<__LINE__<<"fflush file failed"<< std::endl;
perror(NULL);
}
}else if(m_flushRule == FlushRule::Rule::FSYNC){
fflush(m_curFile);
fsync(fileno(m_curFile));
}
}
}
剩下的就是:
works,多个调度器的的管理~
对Config监听函数的修改,补充BufferParams
分离works.yml和log.yml,分别导入。提前导入works.yml 保证 调度器创建完成。
详见 代码 https://github.com/star-cs/webserver