一、需求一丢,谁累成狗
最近由于某些需要,计划手撸一个高性能的日志系统。需求很简单:
1、 不允许丢一条日志信息(很重要很重要)
2、支持多线程,必须线程安全
3、性能要越优越好,尽量百万可秒级
二、手拿把掐?
掐指一算,也只是掐指,这会我老婆拎着我耳朵你敢信。诶,有了。
我们看好这几个关键词:1、高性能 2、不丢数据
高性能的保障是什么,我们先不要想具体的方案,来一场头脑风暴。
对,多线程、内存预分配,减少I/O次数,零拷贝…
保证数据有序,不丢数据该怎么搞? 按序处理,对对,可以一条条的写,哦哦哦,好像不行,生产日志同时进行I/O太慢了,特别是是当前实时性要求很高的业务中,日志只能异步处理;对对对,可以用队列…
(一万年以后~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~)
灵光闪现,于是乎一个基于多线程的生产者与消费者的经典方案出现在脑中。
好像还不错哦,思路有理有据。队列本来就是先进先出的,在日志push
进队列和在pop
出队列时,加上锁,不就okk啦,拿捏~ 那么,按照这个思路,我们浅写一个日志框架,并且来测试下按照这个思路能达到的性能。
三、诶诶诶,我要从后面进入啦~
// tinylog.hpp (hpp是有故事的, 好像hit pp)
#ifndef TINYLOG_HPP
#define TINYLOG_HPP
#include <iostream>
#include <fstream>
#include <string>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <vector>
#include <atomic>
#include <sstream>
#include <ctime>
#include <iomanip>
#ifdef _WIN32 || _WIN64
#include <Windows.h>
#include <time.h>
#endif
class LogQueue {
public:
LogQueue(){}
LogQueue(size_t max_size) : max_size_(max_size) {}
void push(const std::string& log) {
std::unique_lock<std::mutex> lock(mutex_);
cond_var_.wait(lock, [this] { return log_queue_.size() < max_size_; });
log_queue_.push(log);
cond_var_.notify_one();
}
std::string pop() {
std::unique_lock<std::mutex> lock(mutex_);
cond_var_.wait(lock, [this] { return !log_queue_.empty(); });
std::string log = log_queue_.front();
log_queue_.pop();
cond_var_.notify_one();
return log;
}
bool empty(){ return log_queue_.empty();}
void setMax_size(const size_t &max_size)
{
max_size_ = max_size;
}
private:
size_t max_size_;
std::queue<std::string> log_queue_;
std::mutex mutex_;
std::condition_variable cond_var_;
};
class TinyLog {
public:
using LogCallback = std::function<void(std::ofstream&,const std::string&)>;
TinyLog(size_t queue_size = 1024,size_t thread_count = 1)
: queue_size_(queue_size),stop_(false), log_callback_(defaultLogCallback)
{
log_file.open("log.txt",std::ios::app);
log_queue_.setMax_size(queue_size_);
for (size_t i = 0; i < thread_count; ++i) {
thread_pool_.emplace_back(&TinyLog::processLogs, this);
}
}
~TinyLog() { stop();}
void log(const std::string& message) {
log_queue_.push(message);
cond_var_.notify_one();
}
void setLogCallback(LogCallback callback) {log_callback_ = callback;}
bool isStopped(){return stop_.load();}
void setStopped(){stop_.store(true);}
private:
static std::string getcurrentAscTime(){
auto now = std::chrono::system_clock::now();
auto in_time_t = std::chrono::system_clock::to_time_t(now);
auto microseconds = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()) % 1000000;
auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()) % 1000;
std::ostringstream oss;
oss << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d %H:%M:%S")
<< " " << std::setw(3) << std::setfill('0') << milliseconds.count()
<< " " << std::setw(6) << std::setfill('0') << microseconds.count();
return oss.str();
}
static void defaultLogCallback(std::ofstream& stream, const std::string& message) {
#ifdef TEST_DEMO
static long times = 0;
static long long epleased = 0;
auto start = std::chrono::high_resolution_clock::now(); // Start time
#endif
{
stream << message << std::endl;
}
#ifdef TEST_DEMO
auto end = std::chrono::high_resolution_clock::now(); // End time
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
epleased += duration;
if(++times >= 999900){
std::cout << "[thread id] : " << std::this_thread::get_id() << " times :" << times
<< " total write time :" << (epleased/1000.00) << " milliseconds" << std::endl;
}
#endif
}
void stop() {
if(log_file.is_open()){
log_file.close();
}
{
std::unique_lock<std::mutex> lock(mutex_);
stop_ = true;
}
cond_var_.notify_all();
for (auto& thread : thread_pool_) {
if (thread.joinable()) {
thread.join();
}
}
}
void processLogs() {
while (true) {
std::unique_lock<std::mutex> lock(mutex_);
cond_var_.wait(lock, [this] { return !log_queue_.empty() || stop_; });
if (stop_ && log_queue_.empty()) {
std::cout << "process thread was stopped ,and logs all has been dealed." << std::endl;
return;
}
if(log_queue_.empty()){
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "wait for comming logs ..." << std::endl;
continue;
}
std::string& message = log_queue_.pop();
log_callback_(log_file,message);
cond_var_.notify_one();
}
}
private:
size_t queue_size_;
LogQueue log_queue_;
std::vector<std::thread> thread_pool_;
std::mutex mutex_;
std::condition_variable cond_var_;
std::atomic<bool> stop_;
LogCallback log_callback_;
std::ofstream log_file;
};
#endif // TINYLOG_HPP
#endif // TINYLOG_HPP
解读一下,笔者实现了一个日志队列,日志队列中出入站都加锁,哦哦,保证有序,暂时没问题。但是咱们追求的是高性能啊,都说锁会降低性能的,不行,带上“贞操锁”只会影响我进出的速度啊。这里咱们先忍一下。
然后,实现在TinyLog
中实现日志 输出, 多线程日志处理接口,这里特别设置成了回调函数,因为,谁TMD的能保证就是直接在本机存盘啊,没准需要已改,变成要做成可分布式部署的,那没准回调就变成了 基于SOCKET的各种通讯协议的消息分发了。这个必须要给自己的机智点赞!
接下来,我们得实测一下了,没有数据也不敢高声讲话啊。
四、请上我的专属测试:右手哥
#include <iostream>
#include <fstream>
#include <string>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <vector>
#include <atomic>
#include <sstream>
#include <ctime>
#include <iomanip>
#include "tinylog.hpp"
#ifdef _WIN32 || _WIN64
#include <Windows.h>
#include <time.h>
#endif
#define TEST_DEMO
std::unique_ptr<TinyLog> _log = nullptr;
BOOL WINAPI ConsoleCtrlHandler(DWORD ctrlType) {
if (ctrlType == CTRL_C_EVENT) {
std::cout << "CTRL+C received. Exiting..." << std::endl;
if(_log){
_log->setStopped();
}
return TRUE;
}
return FALSE;
}
int main(int argc,char* argv[])
{
_log = std::make_unique<TinyLog>(102400,2);
if (!SetConsoleCtrlHandler(ConsoleCtrlHandler, TRUE)) {
std::cerr << "Error setting control handler" << std::endl;
return 1;
}
std::thread thrd([](){
#ifdef TEST_DEMO
auto start = std::chrono::high_resolution_clock::now(); // Start time
#endif
char message[255] = {};
for(size_t time = 0; time < 1000000; ++time){
memset(message,0,255);
snprintf(message,255,"Log message %d",time);
_log->log(message);
}
#ifdef TEST_DEMO
auto end = std::chrono::high_resolution_clock::now(); // End time
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
double secs = duration / 1000000.00 ;
std::cout << "produce time: " << secs << " seconds.\n" << std::endl;
#endif
});
if(thrd.joinable()){
thrd.join();
}
while(!_log->isStopped()){
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
return 0;
}
测试不说硬件环境好像你们也会喷,别CHAO喷。
笔者:11th Gen Intel(R) Core(TM) i5-1155G7 @ 2.50GHz 2.50 GHz HDD
(我甚至没有SSD,哭晕)
笔者测试过了,百万条数据综合落盘写入日志文件,约5s;太酷了吧(有点拉胯)。能用么,绝大多数的场景基本都够用了,但是本文的标题是什么,“百万级”“高性能”,这显然达不到要求,那么,该怎么优化呢?
五、如何C出残影
思路肯定是一大把,毕竟C和被C了这么多年,一个优质的PIAO客还是有很多技巧的。首先我们定位耗时影响性能的点,那肯定不用想,必抓 消息队列的出入速度了,不够快,怎会有激情。
大致说说改善的点,锁这笔玩意,坚决不能要,我们要自己撸一个无锁的高性能环形队列;其次,队列的内存布局要优化下,必须要能预分配,有需要可以上个内存池;再次有必要我们可以上一个双缓冲;再再次…
下篇继续,下篇继续。“它山之石哦,可以攻玉兮”~