目录
- 前言
- 重构动机
- 模块介绍
- FileUtil,LogFile,LogStream,Logging
- AsyncLogging(重要)
- 压测
- 源码
前言
上次参考TinyWebserver
的实现思路是:实现一个blockQueue, 然后实现一个日志接口类,这个接口类承担了打开,写入,关闭日志文件,以及创建一个专门线程用于循环取出队列内的数据。
基于这个思想,上版本实现了这样一个日志库:
#ifndef LOG_H
#define LOG_H
#include <stdio.h>
#include <iostream>
#include <string>
#include <stdarg.h>
#include <pthread.h>
#include "block_queue.h"
using namespace std;
class Log
{
public:
//C++11以后,使用局部变量懒汉不用加锁
static Log *get_instance()
{
static Log instance;
return &instance;
}
static void *flush_log_thread(void *args)
{
Log::get_instance()->async_write_log();
}
//可选择的参数有日志文件、日志缓冲区大小、最大行数以及最长日志条队列
bool init(const char *file_name, int close_log, int log_buf_size = 8192, int split_lines = 5000000, int max_queue_size = 0);
void write_log(int level, const char *format, ...);
void flush(void);
private:
Log();
virtual ~Log();
void *async_write_log()
{
string single_log;
//从阻塞队列中取出一个日志string,写入文件
while (m_log_queue->pop(single_log))
{
m_mutex.lock();
fputs(single_log.c_str(), m_fp);
m_mutex.unlock();
}
}
private:
char dir_name[128]; //路径名
char log_name[128]; //log文件名
int m_split_lines; //日志最大行数
int m_log_buf_size; //日志缓冲区大小
long long m_count; //日志行数记录
int m_today; //因为按天分类,记录当前时间是那一天
FILE *m_fp; //打开log的文件指针
char *m_buf;
block_queue<string> *m_log_queue; //阻塞队列
bool m_is_async; //是否同步标志位
locker m_mutex;
int m_close_log; //关闭日志
};
#define LOG_DEBUG(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(0, format, ##__VA_ARGS__); Log::get_instance()->flush();}
#define LOG_INFO(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(1, format, ##__VA_ARGS__); Log::get_instance()->flush();}
#define LOG_WARN(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(2, format, ##__VA_ARGS__); Log::get_instance()->flush();}
#define LOG_ERROR(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(3, format, ##__VA_ARGS__); Log::get_instance()->flush();}
输出效果:
2023-01-26 18:16:42.990669 [erro]: MySQL Error
2023-01-26 18:20:25.579885 [erro]: MySQL Error
现在来看,这个日志库存在如下缺点:
1 异步线程直接将阻塞队列的string写入日志,每写一次就flush一次,频繁访问磁盘,而访问磁盘的耗时是极大的;
2 阻塞队列自身存在生产者与消费者的锁竞争,因为生产者和消费者都需要先把队列锁住才能继续对应的操作,如果生产者在正要写队列时消费者锁住了队列,那生产者就只能等待了,这将会影响日志系统整体的吞吐效率。
3 日志采用C stdio格式, 只适合需要格式化输出的数据, 对于那些定义了operator<<的自定义对象并不友好。
4 类设计上,首先太过耦合,Log类包揽文件管理,日志前后缀定义,提供写日志接口,并发互斥等任务,可以拆成几个单元来实现(这也是TinyWebServer所有模块的通病),其次并不需要异步日志开闭操作,实战下肯定都是专门线程异步写日志,哪能让日志阻塞工作线程
重构动机
综上问题,决定重写这个模块。新的模块仿照Muduo, 将包括如下要素:
1 专门的文件工具类以及日志文件类(FileUtil, LogFile)将日志读写从整个过程中解耦;
2 采用双缓冲队列并将日志系统分前后端,前后端各两块缓冲,交替读写,以避免锁竞争,加快效率;
3 采用C++的stream形式,这样就能自动对接定义了operator<<的class, 更灵活;
4 新的日志输出格式,以符合多种需求的需要
这次重构日志的另一个动机是,将其实现为一个更为通用的日志库,这样不仅是WebServer, 以后做的其他程序也能应用进去,这样一旦我以后有哪类信息,比如调试类信息不想和普通信息一起输出到终端,我就能通过日志把它们输出到文件中去,实现消息分流。
模块介绍
FileUtil,LogFile,LogStream,Logging
模块主要分为5个部分,FileUtil,LogFile,LogStream,Logging 和 AsyncLogging, 其中前三个类是基础,第四个和第五个分别是接口和实现。
FileUtil实现了一个scope生命期的File class,一次append写入一行; LogFile作为日志专用类,在FileUtil基础上加入了一个每写入几行就将内容flush入硬盘的机制,减少访问磁盘的次数,LogStream是自己实现了一个stream类,Logging作为接口类,通过构造和构析的方式分别为LogStream缓冲中放入的内容添加前后缀(也就是前面的日期 类型 线程号 以及后面的文件名及具体行)并最终输出(还挺有意思,不知道为什么不用单例,根据陈硕的说法是压测效率并不低),这些都比较简单,不详细叙述。
AsyncLogging(重要)
AsyncLogging是实现的核心,之前说的双缓冲队列实现高效写的思路也就在这里,实际上这个类初始共配置了4个缓冲,前端和后端各两块,前端为currBuf_
, nextBuf_
, 后端为newBuffer1和newBuffer2。为了避免无谓拷贝和不必要的引用,这些Buffer都以堆的形式创建并使用unique_ptr管理。
#include"LogFile.h"
#include"LogStream.h"
#include"LoggingImpl.h"
#include<vector>
#include<memory>
#include<mutex>
#include<condition_variable>
#include<semaphore.h>
#include<thread>
class AsyncLogging : public LoggingImpl{
public:
typedef FixedBuffer<kLargeBuffer> Buffer; //4mb = 1000 logs
typedef std::unique_ptr<Buffer> BufferPtr;
typedef std::vector<BufferPtr> BufferVector;
typedef std::mutex MutexLock;
typedef std::unique_lock<std::mutex> MutexLockGuard;
typedef std::condition_variable Condition;
AsyncLogging(const char* filename, int flushInterval = 2);
~AsyncLogging();
void start();
void append(const char* msg, int len);
void stop(){
running_ = false;
cond_.notify_all();
thread_->join();
}
private:
void threadFunc();
private:
LogFile file_;
int flushInterval_;
bool running_;
BufferPtr currBuf_;
BufferPtr nextBuf_;
BufferVector readyBufs_; //queue
MutexLock mtx_;
Condition cond_;
std::unique_ptr<std::thread> thread_;
};
写入的日志首先被写入前端,填充的是currBuf_
,currBuf_
一旦被填满之后就会被加入预备队列readyBufs_
并通过条件变量唤醒写线程,同时将nextBuf_
移动赋给currBuf_
完成生产缓冲的“接棒”。
void AsyncLogging::append(const char* msg, int len){
assert(running_);
MutexLockGuard guard(mtx_);
if(currBuf_->avail() > len){
currBuf_->append(msg, len);
}
else{
readyBufs_.push_back(std::move(currBuf_));
//reload currBuf_
if(nextBuf_){
currBuf_ = std::move(nextBuf_);
//nextBuf_ will be reload by thread;
}
else{
currBuf_ .reset(new Buffer());
}
currBuf_->append(msg,len);
cond_.notify_all();
}
这里有一个特殊情况,就是可能由于生产者生产太快,前端的两个Buffer已经都拿到后端去写了,前端实际无Buffer可用,这时候前端只能给自己“扩容”,new一个Buffer出来作为备用。
后端线程的处理是最复杂的部分,首先线程会给自己初始化newBuffer1和newBuffer2, 这两个buffer将会用于currBuf_
和nextBuf_
的更新,并准备一块写队列bufferForWrite
, 线程被调用时将会尝试将待写队列readyBufs_
swap到bufferForWrite
开始将缓存内容逐个写入到底层LogFile的buffer中。注意这里对readyBufs_
的等待条件是waitForSeconds, 目的是实现日志的间隔刷新,以避免日志写入频率极慢的情况下currBuf_
写不完无法输出的情况。因此currBuf_
被加入readyBufs_
的情况有两个,一是被写满,二是写不完超时。
void AsyncLogging::threadFunc(){
pthread_setname_np(pthread_self(),"Logger");
BufferPtr newBuffer1(new Buffer);
BufferPtr newBuffer2(new Buffer);
newBuffer1->bzero();
newBuffer2->bzero();
BufferVector bufferForWrite;
bufferForWrite.reserve(16);
while(running_){
assert(newBuffer1 && newBuffer1->length() == 0);
assert(newBuffer2 && newBuffer2->length() == 0);
{
MutexLockGuard guard(mtx_);
if(readyBufs_.empty()){
cond_.wait_for(guard, sec(flushInterval_));
}
readyBufs_.push_back(std::move(currBuf_));
currBuf_ = std::move(newBuffer1);
bufferForWrite.swap(readyBufs_);
if(!nextBuf_)
nextBuf_ = std::move(newBuffer2);
}
assert(!bufferForWrite.empty());
//设置buffer超过25即丢弃
if(bufferForWrite.size() > 25){
char buf[256] = {0};
snprintf(buf,sizeof(buf),"too much buffers, dropped %ld buffers\n", bufferForWrite.size() - 2);
file_.append(buf, strlen(buf));
bufferForWrite.erase(bufferForWrite.begin() + 2, bufferForWrite.end());
}
//允许输出[3,25]个缓冲
for(auto &buf : bufferForWrite){
file_.append(buf->data(),buf->length());
}
//最终确保后台buffer数保持2个
if(bufferForWrite.size() > 2){
bufferForWrite.resize(2);
}
//overload newbuffer1, newbuffer2;
if(!newBuffer1){
assert(!bufferForWrite.empty());
newBuffer1 = std::move(bufferForWrite.back());
bufferForWrite.pop_back();
newBuffer1->reset();
}
if(!newBuffer2){
assert(!bufferForWrite.empty());
newBuffer2 = std::move(bufferForWrite.back());
bufferForWrite.pop_back();
newBuffer2->reset();
}
file_.flush();
bufferForWrite.clear();
}
file_.flush();
}
如果日志的生产速度极快,超过了日志系统后端的处理能力(如文件读写速度)就会造成日志堆积,表现为前端不断new Buffer给currBuf_
来应对频繁的日志请求,如果后端没法及时的处理在buffersToWrite中的Buffer的话,readyBufs_
中的待写缓冲将会不断堆积,最终导致程序内存不足发生崩溃。
muduo日志系统对该情况的处理方法是直接丢弃堆积的日志,强制将buffersToWrite内的buffer缩减至2个,虽然做法简单粗暴,但一来保证程序的稳定运行,二来可以扩展一个报警程序,简化问题定位,为后续的优化做准备。
buffersToWrite的缓冲都写完之后,内部缓冲将会栈的顺序重新赋给newBuffer1和newBuffer2,为下次的currentBuffer和nextBuffer留好预备。
总结下来,四个缓冲的循环被写的顺序是:currBuf_
, nextBuf_
,newBuffer1
, newBuffer2
。
----------------------------------------currBuf_ <----move------ nextBuf_
\ ^ ^
\ | |
| newBuffer1 newBuffer2
\ ^ ^
\ | |
\-- swap--> bufferForWrite --> pop_back----|--------------------|
自此整个日志系统的架构就完成了,我还写了个阻塞队列的版本来和双缓冲比较,下面只展示阻塞队列版实现代码,具体结果见压测部分。
BlockQueue.h
#pragma once
#include<deque>
#include<limits.h>
#include<cassert>
#include<unistd.h>
#include<mutex>
#include<condition_variable>
#include<thread>
typedef std::mutex MutexLock;
typedef std::unique_lock<std::mutex> MutexLockGuard;
typedef std::condition_variable Condition;
//BlockQueue make sure only one thread can access this queue per time
template <typename T>
class BlockQueue{
public:
BlockQueue(size_t maxSize = 5000) : maxSize_(maxSize), isClose_(false){};
~BlockQueue(){ close(); };
void push(const T& obj);
bool pop();
bool pop(T& item);
T front();
T back();
bool empty();
bool size();
void close();
private:
size_t maxSize_;
bool isClose_;
MutexLock mtx_;
Condition condProducer_;
Condition condConsumer_;
std::deque<T> deq_;
};
template <typename T>
void BlockQueue<T>::push(const T& obj){
MutexLockGuard guard(mtx_);
while(deq_.size() >= maxSize_){
condProducer_.wait(guard);
}
deq_.push_back(obj);
condConsumer_.notify_one();
};
template <typename T>
bool BlockQueue<T>::pop(){
MutexLockGuard guard(mtx_);
while(deq_.empty() && !isClose_){
condConsumer_.wait(guard);
}
if(isClose_)
return false;
assert(!deq_.empty());
deq_.pop_front();
condProducer_.notify_one();
return true;
};
template <typename T>
bool BlockQueue<T>::pop(T& item){
MutexLockGuard guard(mtx_);
while(deq_.empty() && !isClose_){
condConsumer_.wait(guard);
}
if(isClose_)
return false;
assert(!deq_.empty());
item = deq_.front();
deq_.pop_front();
condProducer_.notify_one();
return true;
};
template <typename T>
T BlockQueue<T>::front(){
MutexLockGuard guard(mtx_);
assert(!deq_.empty());
return deq_.front();
}
template <typename T>
T BlockQueue<T>::back(){
MutexLockGuard guard(mtx_);
assert(!deq_.empty());
return deq_.back();
}
template <typename T>
bool BlockQueue<T>::empty(){
MutexLockGuard guard(mtx_);
return deq_.empty();
}
template <typename T>
bool BlockQueue<T>::size(){
MutexLockGuard guard(mtx_);
return deq_.size();
}
template <typename T>
void BlockQueue<T>::close(){
{
MutexLockGuard guard(mtx_);
deq_.clear();
isClose_ = true;
}
condProducer_.notify_all();
condConsumer_.notify_all();
};
//除了currBuf_, nextBuf_, readyBufs_这些被换成BlockQueue外其他均不变
#include"AsyncLoggingBlockQueue.h"
#include<cassert>
#include<chrono>
#include<functional>
typedef std::chrono::seconds sec;
using std::string;
AsyncLoggingBlockQueue::AsyncLoggingBlockQueue(const char* filename, int flushInterval)
:file_(filename), flushInterval_(flushInterval), running_(false){
sem_init(&latch_,0,0);
}
void AsyncLoggingBlockQueue::start(){
assert(!running_);
running_ = true;
thread_.reset(new std::thread(std::bind(&AsyncLoggingBlockQueue::threadFunc, this)));
sem_post(&latch_);
}
void AsyncLoggingBlockQueue::append(const char* msg, int len){
MutexLockGuard guard(mtx_);
queue_.push(string(msg,len));
}
void AsyncLoggingBlockQueue::threadFunc(){
pthread_setname_np(pthread_self(),"Logger");
sem_wait(&latch_); //确保生产者先于消费者
string log;
while(queue_.pop(log)){
MutexLockGuard guard(mtx_);
file_.append(log.c_str(), log.size());
}
}
AsyncLoggingBlockQueue::~AsyncLoggingBlockQueue(){
if(running_) stop();
}
压测
压测方法:分长日志字符和短日志字符,共30轮,每轮输出1000次,共30000次,计算输出这30000次日志的时间(单位为毫秒)。
压测目的:验证和比较双缓冲和阻塞队列两种实现下的写日志效率
双缓冲版本代码:
#include"../base/Logging.h"
#include<iostream>
#include<time.h>
#include<unistd.h>
#include<string>
using std::string;
off_t kRollSize = 500*1000*1000;
void myLogBench(bool longLog, int round, int kBatch){
Logger::setLogFileName("../../logFile/myLogAsync");
int cnt = 0;
string empty = " ";
string longStr(3000,'X');
clock_t start = clock();
for(int t = 0; t < round; ++t){
for(int i = 0; i < kBatch; ++i){
LOG_INFO << "Hello 123456789" << " abcdefghijklmnopqrstuvwxyz " << (longLog ? longStr : empty) << cnt;
++cnt;
}
}
clock_t end = clock();
printf("%lf\n", (float)(end-start)*1000 / CLOCKS_PER_SEC);
}
int main(){
printf("program started, pid = %d\n", getpid()),
myLogBench(false, 30, 1000);
myLogBench(true, 30, 1000);
}
阻塞队列版本代码,为了能够同时调试两个版本的代码,特意对Logger类做了扩展,使其能够设置具体的日志实现。
#include"../base/Logging.h"
#include"../base/LoggingImpl.h"
#include"../base/AsyncLoggingBlockQueue.h"
#include<iostream>
#include<time.h>
#include<unistd.h>
#include<string>
using std::string;
off_t kRollSize = 500*1000*1000;
void myLogBench(bool longLog, int round, int kBatch){
Logger::setLogFileName("../../logFile/myLogAsyncBlockQueue");
Logger::setLogImpl(new AsyncLoggingBlockQueue(Logger::getLogFileName().c_str()));
int cnt = 0;
string empty = " ";
string longStr(3000,'X');
clock_t start = clock();
for(int t = 0; t < round; ++t){
for(int i = 0; i < kBatch; ++i){
LOG_INFO << "Hello 123456789" << " abcdefghijklmnopqrstuvwxyz " << (longLog ? longStr : empty) << cnt;
++cnt;
}
}
clock_t end = clock();
printf("%lf\n", (float)(end-start)*1000 / CLOCKS_PER_SEC);
}
int main(){
printf("program started, pid = %d\n", getpid());
myLogBench(false, 30, 1000);
myLogBench(true, 30, 1000);
}
测试和比较
阻塞队列和我实现的四个缓冲对比
root@iZbp13zqzr3c74v3o1ry3mZ:/home/LinuxC++/Project/re_webserver/test/build# ./AsyncLoggingBlockQueue_test
program started, pid = 1130994
584.836975
791.396973
root@iZbp13zqzr3c74v3o1ry3mZ:/home/LinuxC++/Project/re_webserver/test/build# ./AsyncLogging_test
program started, pid = 1131046
123.179001
292.510010
可以看到同样输出3万条日志,短日志情况下,双缓冲要比阻塞队列快5倍,长日志情况下双缓冲也比阻塞队列快约2.6倍,3万条短日志花了约123毫秒,长日志为292毫秒,也就是每秒双缓冲能写30万条段日志和9万条长日志,而阻塞队列只能写6万条短日志和2万条长日志,综上,吞吐率显著提升。
不过相比原版muduo的每秒100-200万日志,我的山寨版本显然性能很低,和Linyacool的实现在同一个数量级(不同的是linyacool用的是Linux系统pthread库自带的锁,我则全部换成了C++11版本),这说明我还有很多可以优化的细节,但这就超出我做这个server的目的了。
我的实现
107.847000
270.713989
linyacool
85.678001
242.023010
muduo
20.646000
151.054001
源码
https://gitee.com/tracker647/my-muduo-log