spdlog生产者消费者模式
spdlog提供了异步模式,显示的创建async_logger, 配合环形队列实现的消息队列和线程池实现了异步模式。异步logger提交日志信息和自身指针, 任务线程从消息队列中取出消息后执行对应的sink和flush动作。
1. 环形队列
1.1 环形队列基础
环形队列是一种首尾相连的队列,符合先进先出的逻辑。相比于普通队列而言,能够复用内存。通常被使用在任务调度、消费队列等场景中。spdlog中的环形队列用于异步模式下存储日志,线程池中的消费者线程不断的读取队列消息进行落日志。
环形队列使用两个头尾指针表示实际数据的起点和终点。
- 出队列
出队列时,head指针向前移动即可,并不会对head位置的元素进行删除操作, - 队列满
当tail + 1 == head时,认为是满队列。 - 入队列
tail指针向前移动,当队列满时新数据会覆盖之前的老数据,这时head指针也要向前移动。 - 队列长度
实际队列长度size = tail - head, 也存在一种情况,head位置大于tail位置(已经出现过满队列),此时size= max_element - (head - tail)
1.2 spdlog的环形队列实现
spdlog 在details/circular_q.h 中实现了环形队列模板类。
- 使用了数据来模拟队列,提供按照元素下标访问的能力
- 实现了移动拷贝
- circular_q 多了一个属性 overrun_counter_记录因满队列丢弃的元素数
- max_items_ 比实际长度大1,是为了便于判断队列满状态
template <typename T>
class circular_q {
size_t max_items_ = 0;
typename std::vector<T>::size_type head_ = 0;
typename std::vector<T>::size_type tail_ = 0;
size_t overrun_counter_ = 0;
std::vector<T> v_;
public:
using value_type = T;
// empty ctor - create a disabled queue with no elements allocated at all
circular_q() = default;
explicit circular_q(size_t max_items)
: max_items_(max_items + 1) // one item is reserved as marker for full q
,
v_(max_items_) {}
circular_q(const circular_q &) = default;
circular_q &operator=(const circular_q &) = default;
// move cannot be default,
// since we need to reset head_, tail_, etc to zero in the moved object
circular_q(circular_q &&other) SPDLOG_NOEXCEPT { copy_moveable(std::move(other)); }
circular_q &operator=(circular_q &&other) SPDLOG_NOEXCEPT {
copy_moveable(std::move(other));
return *this;
}
// push back, overrun (oldest) item if no room left
void push_back(T &&item) {
if (max_items_ > 0) {
v_[tail_] = std::move(item);
tail_ = (tail_ + 1) % max_items_;
if (tail_ == head_) // overrun last item if full
{
head_ = (head_ + 1) % max_items_;
++overrun_counter_;
}
}
}
// Return reference to the front item.
// If there are no elements in the container, the behavior is undefined.
const T &front() const { return v_[head_]; }
T &front() { return v_[head_]; }
// Return number of elements actually stored
size_t size() const {
if (tail_ >= head_) {
return tail_ - head_;
} else {
return max_items_ - (head_ - tail_);
}
}
// Return const reference to item by index.
// If index is out of range 0…size()-1, the behavior is undefined.
const T &at(size_t i) const {
assert(i < size());
return v_[(head_ + i) % max_items_];
}
// Pop item from front.
// If there are no elements in the container, the behavior is undefined.
void pop_front() { head_ = (head_ + 1) % max_items_; }
bool empty() const { return tail_ == head_; }
bool full() const {
// head is ahead of the tail by 1
if (max_items_ > 0) {
return ((tail_ + 1) % max_items_) == head_;
}
return false;
}
size_t overrun_counter() const { return overrun_counter_; }
void reset_overrun_counter() { overrun_counter_ = 0; }
private:
// copy from other&& and reset it to disabled state
void copy_moveable(circular_q &&other) SPDLOG_NOEXCEPT {
max_items_ = other.max_items_;
head_ = other.head_;
tail_ = other.tail_;
overrun_counter_ = other.overrun_counter_;
v_ = std::move(other.v_);
// put &&other in disabled, but valid state
other.max_items_ = 0;
other.head_ = other.tail_ = 0;
other.overrun_counter_ = 0;
}
};
2. 生产者消费者模式
生产者负责往环形队列中写入, 消费者负责从队列中取出数据,进行消费。可以看到,存在一个共享数据,也就是队列,所以需要一个锁来控制并发的读写;同时由于队列是有大小限制的,存在两个临界状态,也即队列空和队列满,所以需要两个条件变量,入队列的时候需要等待队列非满, 出队列的时候需要等待队列非空。
2.1 消息队列
spdlog在 details/mpmc_blocking_q.h中实现了消息队列。
入队列是提供了两种模式,阻塞和非阻塞方式,非阻塞情况下,会直接往环形队列中写入数据,在队列满时会导致数据被覆盖。
同样出队列,也提供了两种阻塞和非阻塞两种模式。
2.2 生产者消费者线程池
在异步模式下,存在一个全局的线程池。
// set global thread pool.
inline void init_thread_pool(size_t q_size,
size_t thread_count,
std::function<void()> on_thread_start,
std::function<void()> on_thread_stop) {
auto tp = std::make_shared<details::thread_pool>(q_size, thread_count, on_thread_start,
on_thread_stop);
details::registry::instance().set_tp(std::move(tp));
}
线程池中的线程会一直从环形队列中阻塞模式取出数据,执行对应的sink动作、flush动作、终止。
而async_logger 则是通过sink_it_往队列中写入数据。