1. logging
日志(logging) 有两个意思:
- 诊断日志: 即是我们日常debug 使用的文本文件记录trace。常用的log 有log4j, logback, log4cpp, ezlogger等常用的日志库。
- 交易日志: 即是数据库的write-ahead log, 文件系统的journaling 等, 用于记录状态的变更,通过回访日志可以逐步恢复每一次修改之后的状态。
这篇文章我们主要介绍下muduo的log的实现,我觉得挺有意思的。muduo的日志库是C++ stream 风格的, 不需要保持格式字符串和参数类型的一致性, 可以随用随写,而且是类型安全的。我们需要的日志功能主要有下面几个需求:
- 日志消息有多种级别(level), 如TRACE, DEBUG, INFO, WARN, FATAL 等。
- 调整日志级别不需要重新编译,也不需要重启进程, 只要调用setloglevel() 就能即时6生效。
- 往本地存储写文件时,考虑日志文件的滚动。文件大小(例如每写满1GB就换下一个文件)和时间(例如每天零点新建一个日志文件,不论前一个文件有没有写满)
注意: 往文件中写日志的一个常见的问题时,万一程序崩溃,那么最后若干条日志就丢失了,因为日志库不能每条消息都flush硬盘,更不能每条日志都open/close文件,这样性能开销太大。 muduo采用两个办法来应对这一点:
- 定期(默认3s)将缓冲区内的日志消息flush到硬盘;
- 每条内存的日志消息都带有cookie, 其值为某个函数的地址,这样通过core dump 文件中查找cookie就能找到尚未来得及写入磁盘的消息。
日志消息的格式有几个要点:
- 尽量每条日志占一行。
- 时间戳精确到微秒。 muduo的每条消息时通过gettimeofday(2) 获得当前时间, 这个函数不是系统调用, 不会有什么性能损失。
- 打印线程的id。
- 打印日志的级别
- 打印源文件名和行号。
推荐一个零声学院免费教程,个人觉得老师讲得不错,分享给大家:[Linux,Nginx,ZeroMQ,MySQL,Redis,
fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,
TCP/IP,协程,DPDK等技术内容,点击立即学习:链接
2. 性能需求
编写linux 服务端程序的时候,我们需要一个高效的日志库。
- 每秒写几千上万条日志的时候没有明显的i性能损失
- 能应对一个进程产生大量的日志数据的场景, 例如 1GM/min
- 不阻塞正常的执行流程
- 在多线程中,不造成race condition.
为了满足这样的性能指标, muduo日志库的实现有几个优化措施。
- 时间戳字符串中的日期和时间两部分时缓存的, 一秒之内的多条日志只需重新格式化微秒部分。
- 日志消息的前4个字段时定长的,避免在运行期求字符串的长度。
- 线程id时格式化为字符串。
- 每行日志消息的源文件名部分采用了编译器计算来获得basename, 避免运行期strchr开销。
3. muduo的日志库分析
muduo的日志库采用的是双缓冲(double buffering)的技术. 准备两块buffer: A和B, 前端负责往buffer A填充数据, 后端负责将buffer B的数据写入文件。
当buffer A写满之后,交换A和B, 让后端将buffer A的数据写入文件,而前端则往buffer B填入新的日志消息,如此往复。用两个buffer的好处是在新建日志消息的时候不必等待磁盘文件操作, 也避免每条新日志消息都触发后端日志线程。换言之,前端将多条日志消息拼成一个大的buffer传送给后端,相当于批处理,减少了线程唤醒的频度,降低开销。muduo异步日志的性能开销大约是前端每写一条日志消息耗时1.0us ~ 1.6us.
在介绍AsyncLogging
文件之前,我们先看下LogFile
和 Logger
, LogStream
,FixBuffer
的实现。
LogFile
logfile
比较简单,主要是rollfile的实现比较有意思。 我们先看下头文件。
class LogFile : noncopyable // 不可被复制
{
public:
LogFile(const string& basename, // basename 文件名
off_t rollSize, // rollSize 设置roll的大小的阈值,大于这个size的大小就开始roll.
bool threadSafe = true, // 是否为线程安全, 为mutex 服务
int flushInterval = 3, // 几秒一次flush
int checkEveryN = 1024); // 检查roll 或者 flush的 次数
~LogFile();
void append(const char* logline, int len); // 给外部调用的只有这三个函数
void flush();
bool rollFile();
private:
void append_unlocked(const char* logline, int len);
static string getLogFileName(const string& basename, time_t* now);
const string basename_;
const off_t rollSize_;
const int flushInterval_;
const int checkEveryN_;
int count_;
std::unique_ptr<MutexLock> mutex_;
time_t startOfPeriod_;
time_t lastRoll_;
time_t lastFlush_;
std::unique_ptr<FileUtil::AppendFile> file_; // 文件操作
const static int kRollPerSeconds_ = 60*60*24;
};
cpp文件, 构造函数比较简单。
LogFile::LogFile(const string& basename,
off_t rollSize,
bool threadSafe,
int flushInterval,
int checkEveryN)
: basename_(basename),
rollSize_(rollSize),
flushInterval_(flushInterval),
checkEveryN_(checkEveryN),
count_(0),
mutex_(threadSafe ? new MutexLock : NULL),
startOfPeriod_(0), // 记录每一次roll的时间, roll 完会更新
lastRoll_(0), // 记录上一次roll的时间
lastFlush_(0) // 记录上一次的flush/roll的时间
{
assert(basename.find('/') == string::npos);
rollFile();
}
我们看下append的实现,append 需要输入append的内容和长度, 如果是线程的安全的,加锁去append。
void LogFile::append(const char* logline, int len)
{
if (mutex_)
{
MutexLockGuard lock(*mutex_);
append_unlocked(logline, len);
}
else
{
append_unlocked(logline, len);
}
}
这里调用了append_unlocked
, 我们看下这个函数的实现。我们调用文件操作append写入len字节的logline, writtenBytes
会返回append
的大小,如果 writtenBytes
大于设置的rollSize_
, 我们就开始’roolsize
的操作, 否则就判断count的次数, 我们默认的是1024次。如果上一次lastFlush_
超过了3s, 我们就写到文件中。
void LogFile::append_unlocked(const char* logline, int len)
{
file_->append(logline, len);
if (file_->writtenBytes() > rollSize_)
{
rollFile();
}
else
{
++count_;
if (count_ >= checkEveryN_)
{
count_ = 0;
time_t now = ::time(NULL);
time_t thisPeriod_ = now / kRollPerSeconds_ * kRollPerSeconds_;
if (thisPeriod_ != startOfPeriod_)
{
rollFile();
}
else if (now - lastFlush_ > flushInterval_)
{
lastFlush_ = now;
file_->flush();
}
}
}
}
我们 看下真正rollsize
的实现, 我们创建文件名+时间+主机名+ pid.log的文件名, 返回文件名和时间,如果此刻大于我们的上一次的roll的时间, 我们就新建一个文件。
bool LogFile::rollFile()
{
time_t now = 0;
string filename = getLogFileName(basename_, &now);
time_t start = now / kRollPerSeconds_ * kRollPerSeconds_;
if (now > lastRoll_)
{
lastRoll_ = now;
lastFlush_ = now;
startOfPeriod_ = start;
file_.reset(new FileUtil::AppendFile(filename));
return true;
}
return false;
}
string LogFile::getLogFileName(const string& basename, time_t* now)
{
string filename;
filename.reserve(basename.size() + 64);
filename = basename;
char timebuf[32];
struct tm tm;
*now = time(NULL);
gmtime_r(now, &tm); // FIXME: localtime_r ?
strftime(timebuf, sizeof timebuf, ".%Y%m%d-%H%M%S.", &tm);
filename += timebuf;
filename += ProcessInfo::hostname();
char pidbuf[32];
snprintf(pidbuf, sizeof pidbuf, ".%d", ProcessInfo::pid());
filename += pidbuf;
filename += ".log";
return filename;
}
Logger
我们看到的loglevel 有六个级别,用枚举类型来定义的, setLogLevel
是static
类型的, 所以在多线程的写多个文件, 也会全局生效。
class Logger
{
public:
enum LogLevel
{
TRACE,
DEBUG,
INFO,
WARN,
ERROR,
FATAL,
NUM_LOG_LEVELS,
};
// compile time calculation of basename of source file
class SourceFile
{
public:
Logger(SourceFile file, int line);
Logger(SourceFile file, int line, LogLevel level);
Logger(SourceFile file, int line, LogLevel level, const char* func);
Logger(SourceFile file, int line, bool toAbort);
~Logger();
LogStream& stream() { return impl_.stream_; }
static LogLevel logLevel();
static void setLogLevel(LogLevel level);
typedef void (*OutputFunc)(const char* msg, int len);
typedef void (*FlushFunc)();
static void setOutput(OutputFunc);
static void setFlush(FlushFunc);
static void setTimeZone(const TimeZone& tz);
private:
class Impl
{
public:
typedef Logger::LogLevel LogLevel;
Impl(LogLevel level, int old_errno, const SourceFile& file, int line);
void formatTime();
void finish();
Timestamp time_;
LogStream stream_;
LogLevel level_;
int line_;
SourceFile basename_;
};
Impl impl_;
};
extern Logger::LogLevel g_logLevel;
inline Logger::LogLevel Logger::logLevel()
{
return g_logLevel;
}
muduo使用预定义的方式定义了八种log 的级别, 这样我们调用LOG_TRACE
就会调用到logstream
流里面。
#define LOG_TRACE if (muduo::Logger::logLevel() <= muduo::Logger::TRACE) \
muduo::Logger(__FILE__, __LINE__, muduo::Logger::TRACE, __func__).stream()
#define LOG_DEBUG if (muduo::Logger::logLevel() <= muduo::Logger::DEBUG) \
muduo::Logger(__FILE__, __LINE__, muduo::Logger::DEBUG, __func__).stream()
#define LOG_INFO if (muduo::Logger::logLevel() <= muduo::Logger::INFO) \
muduo::Logger(__FILE__, __LINE__).stream()
#define LOG_WARN muduo::Logger(__FILE__, __LINE__, muduo::Logger::WARN).stream()
#define LOG_ERROR muduo::Logger(__FILE__, __LINE__, muduo::Logger::ERROR).stream()
#define LOG_FATAL muduo::Logger(__FILE__, __LINE__, muduo::Logger::FATAL).stream()
#define LOG_SYSERR muduo::Logger(__FILE__, __LINE__, false).stream()
#define LOG_SYSFATAL muduo::Logger(__FILE__, __LINE__, true).stream()
我们再看下setLogLevel()
, 就是把level 设置给全局的loglevel, 比较简单。
void Logger::setLogLevel(Logger::LogLevel level)
{
g_logLevel = level;
}
LogStream
在logstream 类里面定义了两个类, 一个是FixedBuffer, 设置我们在stream 定义的缓存的大小,定义了小缓存为kSmallBuffer = 4k, 最大的缓存为4MB.
FixedBuffer
的定义了一个data_[size]
的数组和一个指向当前写入的字节的指针, 作用很简单, 就是预先开辟出一块内存,然后往里面写入(append) len 个字节, cur 会指向当前已写入的字节的多少。
const int kSmallBuffer = 4000;
const int kLargeBuffer = 4000*1000;
template<int SIZE>
class FixedBuffer : noncopyable
{
public:
FixedBuffer()
: cur_(data_)
{
setCookie(cookieStart);
}
~FixedBuffer()
{
setCookie(cookieEnd);
}
void append(const char* /*restrict*/ buf, size_t len)
{
// FIXME: append partially
if (implicit_cast<size_t>(avail()) > len)
{
memcpy(cur_, buf, len);
cur_ += len;
}
}
const char* data() const { return data_; }
int length() const { return static_cast<int>(cur_ - data_); }
// write to data_ directly
char* current() { return cur_; }
int avail() const { return static_cast<int>(end() - cur_); }
void add(size_t len) { cur_ += len; }
void reset() { cur_ = data_; }
void bzero() { memZero(data_, sizeof data_); }
// for used by GDB
const char* debugString();
void setCookie(void (*cookie)()) { cookie_ = cookie; }
// for used by unit test
string toString() const { return string(data_, length()); }
StringPiece toStringPiece() const { return StringPiece(data_, length()); }
private:
const char* end() const { return data_ + sizeof data_; }
// Must be outline function for cookies.
static void cookieStart();
static void cookieEnd();
void (*cookie_)();
char data_[SIZE];
char* cur_;
};
这里最重要的是logstream的类。定义了Buffer的大小为4k, 并且重载了若干个operator<<
, 这样我们写log文件的时候可以使用LOGTRACE<<
来写入, 这里也很简单, 就是把字符串,数字等写入缓存buffer内。
class LogStream : noncopyable
{
typedef LogStream self;
public:
typedef detail::FixedBuffer<detail::kSmallBuffer> Buffer;
self& operator<<(bool v)
{
buffer_.append(v ? "1" : "0", 1);
return *this;
}
self& operator<<(short);
self& operator<<(unsigned short);
self& operator<<(int);
self& operator<<(unsigned int);
self& operator<<(long);
self& operator<<(unsigned long);
self& operator<<(long long);
self& operator<<(unsigned long long);
self& operator<<(const void*);
self& operator<<(float v)
{
*this << static_cast<double>(v);
return *this;
}
self& operator<<(double);
// self& operator<<(long double);
self& operator<<(char v)
{
buffer_.append(&v, 1);
return *this;
}
self& operator<<(const char* str)
{
if (str)
{
buffer_.append(str, strlen(str));
}
else
{
buffer_.append("(null)", 6);
}
return *this;
}
self& operator<<(const unsigned char* str)
{
return operator<<(reinterpret_cast<const char*>(str));
}
self& operator<<(const string& v)
{
buffer_.append(v.c_str(), v.size());
return *this;
}
self& operator<<(const StringPiece& v)
{
buffer_.append(v.data(), v.size());
return *this;
}
self& operator<<(const Buffer& v)
{
*this << v.toStringPiece();
return *this;
}
void append(const char* data, int len) { buffer_.append(data, len); }
const Buffer& buffer() const { return buffer_; }
void resetBuffer() { buffer_.reset(); }
private:
void staticCheck();
template<typename T>
void formatInteger(T);
Buffer buffer_;
static const int kMaxNumericSize = 48;
};
接下来就是我们最重要的异步log的实现。
AsyncLogging
在AsyncLogging 中, 采用了四个缓冲区,这样可以进一步减少或者避免等待。
class AsyncLogging : noncopyable
{
public:
AsyncLogging(const string& basename,
off_t rollSize,
int flushInterval = 3);
~AsyncLogging()
{
if (running_)
{
stop();
}
}
void append(const char* logline, int len);
void start()
{
running_ = true;
thread_.start();
latch_.wait();
}
void stop() NO_THREAD_SAFETY_ANALYSIS
{
running_ = false;
cond_.notify();
thread_.join();
}
private:
void threadFunc();
typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer; // 4MB的缓冲
typedef std::vector<std::unique_ptr<Buffer>> BufferVector; // 缓冲数组
typedef BufferVector::value_type BufferPtr; // 缓冲的指针
const int flushInterval_;
std::atomic<bool> running_;
const string basename_;
const off_t rollSize_;
muduo::Thread thread_;
muduo::CountDownLatch latch_;
muduo::MutexLock mutex_; // 线程安全
muduo::Condition cond_ GUARDED_BY(mutex_);
BufferPtr currentBuffer_ GUARDED_BY(mutex_); // 当前缓冲
BufferPtr nextBuffer_ GUARDED_BY(mutex_); // 预备缓冲
BufferVector buffers_ GUARDED_BY(mutex_); //待写入文件的已填满的缓冲
};
先来看发送方的代码。
AsyncLogging::AsyncLogging(const string& basename,
off_t rollSize,
int flushInterval)
: flushInterval_(flushInterval),
running_(false),
basename_(basename),
rollSize_(rollSize),
thread_(std::bind(&AsyncLogging::threadFunc, this), "Logging"),
latch_(1),
mutex_(),
cond_(mutex_),
currentBuffer_(new Buffer),
nextBuffer_(new Buffer),
buffers_()
{
currentBuffer_->bzero();
nextBuffer_->bzero();
buffers_.reserve(16);
}
void AsyncLogging::append(const char* logline, int len)
{
muduo::MutexLockGuard lock(mutex_);
if (currentBuffer_->avail() > len) // 缓存没满, 写入缓冲中
{
currentBuffer_->append(logline, len);
}
else // 缓存满了
{
buffers_.push_back(std::move(currentBuffer_)); // 放入待写入的缓存中
if (nextBuffer_) // 准备好另一块缓冲
{
currentBuffer_ = std::move(nextBuffer_); // 把当前缓存的指针指向空buffer中
}
else
{
currentBuffer_.reset(new Buffer); // Rarely happens
}
currentBuffer_->append(logline, len); // 当前缓冲继续append, 并通知
cond_.notify();
}
}
前端生成一条日志消息的时候会调用append()。 在这个函数中,如果当前缓冲(currentBuffer_)剩余空间足够大,则会直接把消息追加在到缓存中。否则说明当前缓冲区已满/不够,就把它送入buffers, 并把另一块备用的buffer设置为当前的缓冲区,然后追加消息并通知后端写入日志数据。如果写入速度太快,一下子把两块缓存都用完了,那么只好分配新的缓存作为当前缓存。
再看下接收方实现。
void AsyncLogging::threadFunc()
{
assert(running_ == true);
latch_.countDown();
LogFile output(basename_, rollSize_, false); // LogfIle是真正写入的
BufferPtr newBuffer1(new Buffer); // 准备好第一块buffer
BufferPtr newBuffer2(new Buffer); // 准备好第二块buffer
newBuffer1->bzero();
newBuffer2->bzero();
BufferVector buffersToWrite;
buffersToWrite.reserve(16);
while (running_)
{
assert(newBuffer1 && newBuffer1->length() == 0);
assert(newBuffer2 && newBuffer2->length() == 0);
assert(buffersToWrite.empty());
{
muduo::MutexLockGuard lock(mutex_);
if (buffers_.empty()) // unusual usage!
{
cond_.waitForSeconds(flushInterval_); // 等待条件触发
}
buffers_.push_back(std::move(currentBuffer_)); // 条件满足时, 先将当前缓冲区移入buffers
currentBuffer_ = std::move(newBuffer1); // 并立刻将空闲的newBuffer1 作为新的缓冲
buffersToWrite.swap(buffers_); // 将空的buffersToWrite 填充
if (!nextBuffer_)
{
nextBuffer_ = std::move(newBuffer2); // 如果nextBuffer为空, 将buffer2 作为新的nextBuffer, 这样前端始终有一个buffer可供调配
}
}
assert(!buffersToWrite.empty());
if (buffersToWrite.size() > 25) // 如果长度超过25, 丢弃一部分
{
char buf[256];
snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
Timestamp::now().toFormattedString().c_str(),
buffersToWrite.size()-2);
fputs(buf, stderr);
output.append(buf, static_cast<int>(strlen(buf)));
buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
}
for (const auto& buffer : buffersToWrite) // 遍历数组,真正写入内存中
{
// FIXME: use unbuffered stdio FILE ? or use ::writev ?
output.append(buffer->data(), buffer->length());
}
if (buffersToWrite.size() > 2)
{
// drop non-bzero-ed buffers, avoid trashing
buffersToWrite.resize(2); // buffersToWrite的大小设置为2
}
if (!newBuffer1) // 将buffersToWrite内的buffer重新填充为newBuffer1
{
assert(!buffersToWrite.empty());
newBuffer1 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer1->reset();
}
if (!newBuffer2) // 将buffersToWrite内的buffer重新填充为newBuffer2
{
assert(!buffersToWrite.empty());
newBuffer2 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer2->reset();
}
buffersToWrite.clear();
output.flush();
}
output.flush();
}
我们看下运行的图示:
一开始我们先分配好四个缓存, A, B ,C ,D. 初始时都是空的。
- **缓冲A, B都没满, 但是超时。**写入频度不高, 后端3s 超市后将" 当前缓冲currentBuffer" 写入文件。在2.9s的时候, currBuffer使用了80%, 在第三秒的时候后端线程先把curr送到buffers, 再把newBuffer1设置为curr。 随后3s+, 交换buffers 和buffersToWrite, 后端开始将buffer A写入文件,写完再把new1再填上,供下次cond返回。
- **缓冲A满了,B未满。**在3s超时之前已经写满了当前缓冲,于是唤醒后端线程开始写入文件。在第1.5s的时候, currBuffer使用了80%, 第1.8s, curr写满,将当前缓冲送入buffers_, 并将nextbuffer_ 设置为当前缓冲,然后开始写入。当后端线程唤醒之后(1.8s+), 先将curr送入buffers_, 再将new1, new2设置为当前缓冲。
- **缓冲AB都满。**前端在短时间内写入大量的消息,用完了两个缓冲,并重新分配了一块新的缓冲。在第1.8s的时候, A已经写满, B也接近写满。并且已经notify() 后端线程,但是由于种种原因, 后端线程并没有立刻开始工作, 直到1.9s的之后, B也写满了, 前端线程重新分配了缓冲E, 到了1.8s+, 后端线程开始写入,将CD 两块缓冲交给前端,并开始将ABE写入文件。完成之后, 用AB重新填充那两块缓冲,释放缓冲E。
- 文件写入速度慢,导致前端耗尽了两个缓冲。 前1.8s+ 和前面的第二种相同, 前端写满了一个缓冲,唤醒后后端线程开始写入文件。之后,后端花了较长时间才将数据写完。这期间前端又用完了两个缓冲(CD),并分配了新的缓冲(E), 这期间前端的notify已经丢失。当后端写完之后,发现buffers_ 不为空, 立刻进入下一循环。替换前端的两个缓冲,并开始一次将CDE写入。如果buffersToWrite的大小超过了两个, 将重新把buffersToWrite的大小设置为2个, 再将new1, new2填充缓冲。