提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
- 1、用timerfd加epoll封装定时器的优点
- 2、代码实现
1、用timerfd加epoll封装定时器的优点
定时器为什么需要timerfd
在设计定时器时,我们首先想到的就是设置一个定时任务触发的时间,然后不断判断(死循环)当前时间是否大于等于定时任务触发的时间,如果是,那么就处理定时任务。这就是最为简单的设计,在我之前的博客中[定时器的简单实现],就是这么实现的,但是这样设计会存在诸多问题
- CPU资源浪费
使用死循环来检查时间意味着CPU必须不断地执行这段代码,即使大部分时间都是在做无用的比较。这会导致CPU资源的浪费,尤其是在高性能的服务器或多任务环境中。 - 响应性下降
由于CPU忙于执行定时器的检查,它可能无法及时响应其他重要的事件或任务,导致系统响应性下降。 - 不准确
依赖于系统的时钟分辨率和调度器延迟,使用死循环检查时间的方法可能无法实现精确的定时。例如,如果系统时钟的分辨率是毫秒级,而你尝试实现微秒级的定时,那么这种方法就无法满足需求。 - 不适合长时间等待
如果定时任务触发的时间间隔很长(例如几小时或几天),那么使用死循环来等待这段时间是非常低效的。
为解决上述问题,就产生了timerfd,当使用timerfd_create创建timerfd时,并设置了定时任务,当定时任务时间到达时,那么timerfd就变成了可读,经常与 select/poll/epoll搭配使用
这里我们不需要轮询这个timerfd,判断timerfd是否有数据(是否可读),因为这样做也会带来上述问题,因此我们需要将timerfd加入到select/poll/epoll中,让它们轮询,一般来说使用epoll更高效
- 统一的事件处理:epoll是Linux下多路复用IO接口select/poll的增强版本,它可以高效地处理大量的文件描述符和I/O事件。通过将timerfd的文件描述符加入epoll的监控集合中,可以将定时器超时事件与其他I/O事件进行统一处理,简化了事件驱动编程的复杂性。
- 提高并发性能:在高并发的网络服务器中,使用epoll可以监控多个套接字的I/O事件,而使用timerfd可以实现定时任务(如心跳检测、超时处理等)。这种结合使用的方式可以提高系统的并发性能和吞吐量。
- 减少系统调用开销:由于epoll采用I/O多路复用机制,并且只在有事件发生时才进行通知,因此可以减少不必要的系统调用开销。同时,由于timerfd的精度较高,可以减少因轮询而产生的额外开销。
2、代码实现
定时任务
//TimerEvent.h
#pragma once
#include <cstdint>
#include <functional>
#include <sys/time.h>
#include <memory>
class TimerEvent
{
public:
using s_ptr = std::shared_ptr<TimerEvent>;
template<typename F, typename... Args>
TimerEvent(int interval, bool is_repeated, F&& f, Args&&... args):interval_(interval), is_repeated_(is_repeated)
{
auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
task_ = task;
}
int64_t getArriveTime() const
{
return arrive_time_;
}
void setCancler(bool flag)
{
is_cancled_ = flag;
}
bool isCancle()
{
return is_cancled_;
}
bool isRepeated()
{
return is_repeated_;
}
std::function<void()> getCallBack()
{
return task_;
}
//重新设定任务到达时间
void resetArriveTime();
//获取当前时间
static int64_t getNowMs();
private:
int64_t arrive_time_;//ms 执行任务时毫秒级时间戳,达到对应的时间戳就执行对应的任务
int64_t interval_;//ms 隔多少ms后执行
bool is_repeated_{false};//是否为周期性的定时任务
bool is_cancled_{false};//是否取消
std::function<void()> task_;
};
//TimerEvent.cpp
#include"TimerEvent.h"
int64_t TimerEvent::getNowMs()
{
timeval val;
gettimeofday(&val, NULL);
return val.tv_sec*1000 + val.tv_usec/1000;
}
void TimerEvent::resetArriveTime()
{
arrive_time_ = getNowMs() + interval_;
}
对timerfd的封装
//Timer.h
#pragma once
#include <map>
#include <vector>
#include <iostream>
#include "TimerEvent.h"
class Timer
{
public:
Timer();
~Timer();
int getFd()
{
return fd_;
}
void addTimerEvent(TimerEvent::s_ptr event);
void deleteTimerEvent(TimerEvent::s_ptr event);
//时间到达就触发
void onTimer();
std::vector<std::function<void()>> &getCallbacks()
{
return callbacks_;
}
//重新设置任务的到达时间
void resetArriveTime();
private:
int fd_;
std::multimap<int64_t, TimerEvent::s_ptr> pending_events_;
std::vector<std::function<void()>> callbacks_;
};
//Timer.cpp
#include <sys/timerfd.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include "Timer.h"
Timer::Timer() : fd_(timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK))
{
}
Timer::~Timer()
{
}
void Timer::resetArriveTime()
{
if (pending_events_.empty())
{
return;
}
int64_t now = TimerEvent::getNowMs();
auto it = pending_events_.begin();
int64_t inteval = 0;
// 第一个任务的定时时间比当前时间大,则重新设置
if (it->second->getArriveTime() > now)
{
inteval = it->second->getArriveTime() - now;
}
else
{
// 第一个任务的定时时间比当前时间小或相等,说明第一个任务已经超时了,应该立马执行该任务
inteval = 100; // ms
}
timespec ts;
memset(&ts, 0, sizeof(ts));
ts.tv_sec = inteval / 1000;//秒
ts.tv_nsec = (inteval % 1000) * 1000000;//纳秒
itimerspec value;
memset(&value, 0, sizeof(value));
value.it_value = ts;
int result = timerfd_settime(fd_, 0, &value, NULL);
if (result != 0)
{
printf("timerfd_settime error, errno=%d, error=%s", errno, strerror(errno));
}
}
void Timer::addTimerEvent(TimerEvent::s_ptr event)
{
bool is_reset_timerfd = false;
if (pending_events_.empty())
{
is_reset_timerfd = true;
}
else
{
auto it = pending_events_.begin();
// 当前需要插入的定时任务时间比已经存在的定时任务时间要早,那么就需要重新设定超时时间,防止任务延时
if (it->first > event->getArriveTime())
{
is_reset_timerfd = true;
}
}
pending_events_.emplace(event->getArriveTime(), event);
if (is_reset_timerfd)
{
resetArriveTime();
}
}
void Timer::deleteTimerEvent(TimerEvent::s_ptr event)
{
event->setCancler(true);
//pending_events_是multimap,key是时间,可能存在多个相同时间的event
//将对应的event从pending_events_中删除
auto begin = pending_events_.lower_bound(event->getArriveTime());
auto end = pending_events_.upper_bound(event->getArriveTime());
auto it = begin;
for(;it != end; ++it)
{
if(it->second == event)
{
break;
}
}
if(it != end)
{
pending_events_.erase(it);
}
}
void Timer::onTimer()
{
char buf[8];
for(;;)
{
if((read(fd_, buf, 8) == -1) && errno == EAGAIN)
{
break;
}
}
int64_t now = TimerEvent::getNowMs();
std::vector<TimerEvent::s_ptr> tmps;
std::vector<std::function<void()>>& callbacks_ = getCallbacks();
auto it = pending_events_.begin();
for(; it != pending_events_.end(); ++it)
{
// 任务已经到时或者超时,并且没有被取消,就需要执行
if((it->first <= now) && !it->second->isCancle())
{
tmps.push_back(it->second);
callbacks_.push_back(it->second->getCallBack());
}
else
{
break;// 因为定时任务是升序排的,只要第一个任务没到时,后面的都没到时
}
}
//因为把任务已经保存好了,因此需要把m_pending_events中对应的定时任务删除,防止下次又执行了
pending_events_.erase(pending_events_.begin(), it);
// 需要把重复的TimerEvent再次添加进去
for(auto i = tmps.begin(); i != tmps.end(); ++i)
{
if(!(*i)->isCancle())
{
//std::cout<<"重新添加"<<std::endl;
(*i)->resetArriveTime();
addTimerEvent(*i);
}
}
resetArriveTime();
}
对epoll的封装
//TimerPollPoller.h
#pragma once
#include <sys/epoll.h>
#include <cstring>
#include <unistd.h>
#include <fcntl.h>
#include <atomic>
#include <iostream>
#include "ThreadPool.h"
#include "Timer.h"
class TimerPollPoller
{
public:
TimerPollPoller(unsigned int num = std::thread::hardware_concurrency())
:epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
thread_pool_(ThreadPool::instance()),
stop_(true)
{
timer_ = std::make_shared<Timer>();
struct epoll_event event;
memset(&event, 0, sizeof(event));
event.data.ptr = reinterpret_cast<void*>(&timer_);
event.events = EPOLLIN;
::epoll_ctl(epollfd_, EPOLL_CTL_ADD, timer_->getFd(), &event);
start();
}
~TimerPollPoller()
{
::close(epollfd_);
stop();
if(t.joinable())
{
std::cout << "主线程 join thread " << t.get_id() << std::endl;
t.join();
}
}
void start();
void stop();
void addTimerEvent(TimerEvent::s_ptr event);
void cancelTimeEvent(TimerEvent::s_ptr event);
void handleTimerfdInEpoll();
private:
const int epollfd_;
std::shared_ptr<Timer> timer_;
std::thread t;//单独起一个线程,进行轮询epoll
ThreadPool& thread_pool_;
std::atomic<bool> stop_;
};
//TimerPollPoller.cpp
#include "TimerPollPoller.h"
void TimerPollPoller::start()
{
t = std::move(std::thread(&TimerPollPoller::handleTimerfdInEpoll, this));
}
void TimerPollPoller::stop()
{
stop_.store(true);
}
void TimerPollPoller::addTimerEvent(TimerEvent::s_ptr event)
{
timer_->addTimerEvent(event);
}
void TimerPollPoller::cancelTimeEvent(TimerEvent::s_ptr event)
{
timer_->deleteTimerEvent(event);
}
void TimerPollPoller::handleTimerfdInEpoll()
{
struct epoll_event event;
stop_.store(false);
while(!stop_.load())
{
int numEvents = ::epoll_wait(epollfd_, &event, 1, 0);
if(numEvents == 1)
{
std::shared_ptr<Timer> timer_ptr = *reinterpret_cast<std::shared_ptr<Timer>*>(event.data.ptr);
timer_ptr->onTimer();
std::vector<std::function<void()>> callbacks = std::move(timer_ptr->getCallbacks());
for(auto task:callbacks)
{
thread_pool_.commit(task);
}
}
}
}
处理任务的线程池
#pragma once
#include <atomic>
#include <condition_variable>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
#include <functional>
class ThreadPool {
public:
static ThreadPool& instance()
{
static ThreadPool ins;
return ins;
}
using Task = std::packaged_task<void()>;
~ThreadPool()
{
stop();
}
template <class F, class... Args>
auto commit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
using RetType = decltype(f(args...));
if (stop_.load())
return std::future<RetType>{};
auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<RetType> ret = task->get_future();
{
std::lock_guard<std::mutex> cv_mt(cv_mt_);
//将任务放进任务队列中
tasks_.emplace([task] { (*task)(); });
}
//唤醒一个线程
cv_lock_.notify_one();
return ret;
}
int idleThreadCount() {
return thread_num_;
}
private:
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool(unsigned int num = std::thread::hardware_concurrency())
: stop_(false) {
{
if (num < 1)
thread_num_ = 1;
else
thread_num_ = num;
}
start();
}
//启动所有线程
void start()
{
for (int i = 0; i < thread_num_; ++i) {
pool_.emplace_back([this]() {
while (!this->stop_.load()) {
Task task;
{
std::unique_lock<std::mutex> cv_mt(cv_mt_);
this->cv_lock_.wait(cv_mt, [this] {
//stop_为true或者tasks_不为空(return 返回true),则进行下一步,否则阻塞在条件变量上
return this->stop_.load() || !this->tasks_.empty();
});
if (this->tasks_.empty())
return;
task = std::move(this->tasks_.front());
this->tasks_.pop();
}
this->thread_num_--;
task();
this->thread_num_++;
}
});
}
}
void stop()
{
stop_.store(true);
cv_lock_.notify_all();
for (auto& td : pool_) {
if (td.joinable()) {
std::cout << "join thread " << td.get_id() << std::endl;
td.join();
}
}
}
private:
std::mutex cv_mt_;
std::condition_variable cv_lock_;
std::atomic_bool stop_;
std::atomic_int thread_num_;
std::queue<Task> tasks_;
std::vector<std::thread> pool_;
};
测试代码
#include "TimerPollPoller.h"
#include <iostream>
void print()
{
std::cout << "I love psy" << std::endl;
}
void print1()
{
std::cout << "I love fl" << std::endl;
}
int main()
{
TimerPollPoller timerPollPoller;
TimerEvent::s_ptr timer1 = std::make_shared<TimerEvent>(500, true, print);
TimerEvent::s_ptr timer2 = std::make_shared<TimerEvent>(1000, true, print1);
timerPollPoller.addTimerEvent(timer1);
timerPollPoller.addTimerEvent(timer2);
std::this_thread::sleep_for(std::chrono::seconds(2));
timerPollPoller.cancelTimeEvent(timer1);
std::this_thread::sleep_for(std::chrono::seconds(2));
return 0;
}
makefile
PATH_SRC := .
PATH_BIN = bin
PATH_OBJ = obj
CXX := g++
CXXFLAGS := -g -O0 -std=c++11 -lpthread -Wall -Wno-deprecated -Wno-unused-but-set-variable
CXXFLAGS += -I./
SRCS := $(wildcard $(PATH_SRC)/*.cpp)
OBJS := $(patsubst $(PATH_SRC)/%.cpp,$(PATH_OBJ)/%.o,$(SRCS))
TARGET := $(PATH_BIN)/main
# 默认目标:生成可执行文件
all : $(TARGET)
# 链接规则
$(TARGET): $(OBJS)
$(CXX) $(CXXFLAGS) $(OBJS) -o $@
$(PATH_OBJ)/%.o: $(PATH_SRC)/%.cpp
$(CXX) $(CXXFLAGS) -c $< -o $@
clean:
rm -rf $(PATH_OBJ)/*.o $(TARGET)
.PHONY : clean
使用之间,在当前目录下需要创建bin目录和obj目录,然后再进行make,就能在bin目录下生产可执行程序main