利用升序定时器链表处理非活动连接

news2025/1/10 20:50:09

参考自游双《Linux高性能服务器编程》

背景

服务器同常需要定期处理非活动连接:给客户发一个重连请求,或关闭该连接,或者其他。我们可以通过使用升序定时器链表处理非活动连接,下面的代码利用alarm函数周期性的触发SIGALRM信号,该信号的处理函数利用管道通知主循环执行定时器链表上的定时任务—关闭非活动连接。

实现代码

升序定时器链表

定时器通常包含两个成员:超时时间和任务回调函数。

有时还会包含回调函数被执行时需要传入的参数。

下方代码实现了一个简单的升序定时器链表,按照超时时间做升序排列。

// lst_timer.h
// 升序定时器链表
#ifndef LST_TIMER
#define LST_TIMER

#include <time.h>
#define BUFFER_SIZE 64
class util_timer;

// 用户数据结构
struct client_data
{
    sockaddr_in address;   // 客户端socket地址
    int sockfd;            // socket 文件描述符
    char buf[BUFFER_SIZE]; // 读缓冲
    util_timer *timer;     // 链表
};

// 定时器类
class util_timer
{
public:
    util_timer() : prev(NULL), next(NULL) {}

public:
    time_t expire;                  // 任务的超时时间,绝对时间
    void (*cb_func)(client_data *); // 任务回调函数
    client_data *user_data;         // 回调函数处理的客户数据,由定时器执行者传递给回调函数
    util_timer *prev;
    util_timer *next;
};

// 定时器链表,升序,双向,有头尾节点
class sort_timer_lst
{
public:
    sort_timer_lst() : head(NULL), tail(NULL){};
    // 删除所有定时器
    ~sort_timer_lst()
    {
        util_timer *tmp = head;
        while (tmp)
        {
            head = tmp->next;
            delete tmp;
            tmp = head;
        }
    }

    // 将定时器timer添加到链表中
    void add_timer(util_timer *timer)
    {
        if (!timer)
        {
            return;
        }
        if (!head) // 空链表
        {
            head = tail = timer;
            return;
        }
        // 若目标定时器超时时间小于当前链表中所有定时器的超时时间
        // 则把该定时器插入到头部,作为链表头节点
        // 否则就要插入合适的位置以保证升序
        if (timer->expire < head->expire)
        {
            timer->next = head;
            head->prev = timer;
            head = timer;
            return;
        }
        add_timer(timer, head);
    }
    // 当某个定时任务发生变化时,调整对应的定时器的超时时间
    // 这个函数只考虑被调整的定时器的【超时时间的延长情况】,即该定时器要往链表尾部移动
    void adjust_timer(util_timer *timer)
    {
        if (!timer)
        {
            return;
        }
        util_timer *tmp = timer->next;
        // 被调整定时器在链表尾部,或该定时器超时时间仍小于下一个定时器的超时时间,则不用调整
        if (!tmp || (timer->expire < tmp->expire))
        {
            return;
        }
        // 若目标定时器时链表头节点,则将该定时器取出重新插入链表
        if (timer == head)
        {
            head = head->next;
            head->prev = NULL;
            timer->next = NULL;
            add_timer(timer, head);
        }
        // 若目标定时器不是链表头节点,则将该定时器从链表中取出,然后插入原来所在位置之后的部分链表中
        else
        {
            timer->prev->next = timer->next;
            timer->next->prev = timer->prev;
            add_timer(timer, timer->next);
        }
    }

    void del_timer(util_timer *timer)
    {
        if (!timer)
        {
            return;
        }
        // 链表只剩待删除定时器
        if ((timer == head) && (timer == tail))
        {
            delete timer;
            head = NULL;
            tail = NULL;
            return;
        }
        if (timer == head)
        {
            head = head->next;
            head->prev = NULL;
            delete timer;
            return;
        }
        if(timer == tail) 
        {
            tail = tail->prev;
            tail->next = NULL;
            delete timer;
            return;
        }

        // 目标定时器位于链表中间
        timer->prev->next = timer->next;
        timer->next->prev = timer->prev;
        delete timer;
    }

    // SIGALARM信号每次触发就在其信号处理函数中执行一次tick函数
    // 来处理链表上到期的任务。
    void tick()
    {
        if(!head)
        {
            return ;
        }
        printf("timer tick\n");
        time_t cur = time(NULL);
        util_timer *tmp = head;
        // 从头开始依次处理每个定时器,直到遇到一个尚未到期的定时器
        while(tmp)
        {
            // 未来的时间比现在的时间大
            if(cur < tmp->expire)
            {
                break;
            }
            tmp->cb_func(tmp->user_data);
            head = tmp->next;
            if(head)
            {
                head->prev = NULL;
            }
            delete tmp;
            tmp = head;
        }
    }
private:
    // 重载的辅助函数
    // 被add_timer和adjust_timer调用
    // 功能:将目标定时器timer添加到lst_head之后的部分链表中
    void add_timer(util_timer *timer, util_timer *lst_head)
    {
        util_timer *prev = lst_head;
        util_timer *tmp = prev->next; // 可能插入的位置
        while(tmp) 
        {
            if(timer->expire < tmp->expire)
            {
                prev->next = timer;
                timer->next = tmp;
                tmp->prev = timer;
                timer->prev = prev;
                break;
            }
            prev = tmp;
            tmp = tmp->next;
        }
        if(!tmp)
        {
            prev->next = timer;
            timer->prev = prev;
            timer->next = NULL;
            tail = timer;
        }
    }
private:
    util_timer *head;
    util_timer *tail;
};
#endif

处理非活动连接

// 11_3_closeUnactiveConnections.cpp
// 利用alarm函数周期性触发 SIGALRM信号
// 该信号的信号处理函数利用管道通知主循环执行定时器链表上的定时任务即关闭非活动链接
// 一个用户对应一个连接fd、一个定时器检测是否活跃
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <signal.h>
#include <unistd.h>
#include <netinet/in.h>
#include <errno.h>
#include <stdlib.h>
#include "lst_timer.h"

#define FD_LIMIT 65535  
#define MAX_EVENT_NUMBER 1024
#define TIMESLOT 5

static int pipefd[2]; // 管道传输信号
// 利用升序链表管理定时器
static sort_timer_lst timer_lst;
static int epollfd = 0;

int setnonblocking(int fd)
{
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

void addfd(int epollfd, int fd)
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET; // 注册可读事件
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

void sig_handler(int sig)
{
    int save_errno = errno;
    int msg = sig;
    send(pipefd[1], (char *)&msg, 1, 0);
    errno = save_errno;
}

void addsig(int sig)
{
    struct sigaction sa;
    memset(&sa, '\0', sizeof(sa));
    sa.sa_handler = sig_handler;
    sa.sa_flags |= SA_RESTART;
    sigfillset(&sa.sa_mask); // 设置所有信号
    // 为信号注册处理函数
    assert(sigaction(sig, &sa, NULL) != -1);
}

void timer_handler()
{
    // 定时处理任务,检查有没有到时的定时器,执行其对应任务
    timer_lst.tick();
    // 重新定时
    alarm(TIMESLOT); // 到时会发出SIGALARM信号
}

// 定时器回调函数,删除非活动连接socket上的注册事件,并关闭之
void cb_func(client_data *user_data)
{
    epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0);
    assert(user_data);
    close(user_data->sockfd);
    printf("close fd %d\n", user_data->sockfd);
}

int main(int argc, char *argv[])
{
    if (argc <= 2)
    {
        printf("usage: %s ip_address port_num\n", basename(argv[0]));
        return 1;
    }

    const char *ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in addr;
    bzero(&addr, sizeof(addr));
    addr.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &addr.sin_addr);
    addr.sin_port = htons(port);

    // 创建TCP socket,并将其绑定到端口port上
    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);
    // 设置端口复用
    int opt = 1;
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    ret = bind(listenfd, (struct sockaddr *)&addr, sizeof(addr));
    assert(ret != -1);

    ret = listen(listenfd, 5);
    assert(ret != -1);

    epoll_event events[MAX_EVENT_NUMBER];
    int epollfd = epoll_create(5);
    assert(epollfd != -1);
    addfd(epollfd, listenfd);

    // 管道
    ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd);
    assert(ret != -1);
    setnonblocking(pipefd[1]); // 设置写端非阻塞
    addfd(epollfd, pipefd[0]); // 将读端加入epoll树中进行监视

    // 设置信号处理函数
    addsig(SIGALRM); // SIGALRM 到来往管道写端发送信号的数值
    addsig(SIGTERM);
    bool stop_server = false;

    client_data *users = new client_data[FD_LIMIT]; // 客户端数组
    bool timeout = false;
    alarm(TIMESLOT);

    while(!stop_server) 
    {
        int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);

        if((number < 0) && (errno != EINTR))
        {
            printf("epoll failure\n");
            break;
        }

        for(int i = 0; i < number; ++i)
        {
            int sockfd = events[i].data.fd;
            if(sockfd == listenfd)
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);
                int connfd = accept(listenfd, (sockaddr*)&client_address, &client_addrlength);
                addfd(epollfd, connfd); // 
                users[connfd].address = client_address;
                users[connfd].sockfd = connfd;
                // 创建定时器,设置其回调函数与超时时间,然后绑定定时器与用户数据
                // 最后将定时器添加到链表 timer_lst中
                util_timer *timer = new util_timer;
                timer->user_data = &users[connfd];
                timer->cb_func = cb_func;
                time_t cur = time(NULL);
                // 设置过期时间,当前时间超过该时间就要回收该定时器绑定的connfd
                timer->expire = cur + 3 * TIMESLOT;
                users[connfd].timer = timer;
                timer_lst.add_timer(timer);
            }
            // 处理信号
            else if((sockfd == pipefd[0]) && (events[i].events & EPOLLIN))
            {
                int sig;
                char signals[1024];
                // 管道读端接受数据
                // send是在SIGARLRM和SIGTERM信号被触发时,通过sig_handler函数来调用的
                ret = recv(pipefd[0], signals, sizeof(signals), 0);
                if(ret == -1)
                {
                    continue; // 处理下一个到来的事件
                }
                else if(ret == 0)
                {
                    continue;
                }
                else
                {
                    for(int i = 0; i < ret; ++i)
                    {
                        switch(signals[i])
                        {
                            case SIGALRM:
                            {
                                // timeout标志有定时任务要处理
                                // 但不立即处理,因为通常定时任务优先级不高
                                timeout = true;
                                break;
                            }
                            case SIGTERM:
                            {
                                stop_server = true;
                            }
                        }
                    }
                }
            }
            // 处理客户连接上收到的数据
            else if(events[i].events & EPOLLIN)
            {
                memset(users[sockfd].buf, BUFFER_SIZE - 1, 0);
                ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE - 1, 0);
                printf("get %d bytes of client data %s from %d \n", ret, users[sockfd].buf, sockfd);

                util_timer *timer = users[sockfd].timer;
                if(ret < 0)
                {
                    if(errno != EAGAIN)
                    {
                        cb_func(&users[sockfd]); // 回收connfd
                        if(timer)
                        {
                            timer_lst.del_timer(timer);
                        }
                    }
                }
                else if(ret == 0)
                {
                    // 若对方关闭连接,则我们也关闭连接并删除定时器
                    cb_func(&users[sockfd]);
                    if(timer)
                    {
                        timer_lst.del_timer(timer);
                    }
                }
                else
                {
                    // 若某个客户的连接上有数据可读
                    // 则要调整对应的定时器的过期时间(通过users数组找到定时器)
                    if(timer)
                    {
                        time_t cur = time(NULL);
                        timer->expire = cur + 3 * TIMESLOT;
                        printf("adjust timer once\n");
                        timer_lst.adjust_timer(timer);
                    }
                    else
                    {
                        // other
                    }
                }
            }
        }
        // 最后处理定时事件,因为通常IO事件有更高的优先级
        // 但这样导致定时任务不能精确的执行
        if(timeout)
        {
            timer_handler(); // 检查是否有到时(太久没有使用)的定时器(对应一个用户的connfd),有就回收fd删除定时器
            timeout = false;
        }
    }
    close(listenfd);
    close(pipefd[1]);
    close(pipefd[2]);
    delete []users;

    return 0;
}

测试

目录结构

.
├── 11_3_closeUnactiveConnections.cpp
├── build
├── CMakeLists.txt
└── lst_timer.h

输入编译指令

g++ -o closeConnection 11_3_closeUnactiveConnections.cpp -I ./

也可以使用CMake

cmake_minimum_required (VERSION 2.8)
PROJECT(closeConnection)
# 手动加入文件
SET(SRC_LIST 11_3_closeUnactiveConnections.cpp)

#INCLUDE_DIRECTORIES("${CMAKE_CURRENT_SOURCE_DIR}/dir1")
# 相对路径的方式
INCLUDE_DIRECTORIES(.)


# 用SRC_LIST所存的名字的源文件来生成可执行文件 darren
ADD_EXECUTABLE(closeConnection ${SRC_LIST} )

执行程序

在本机任意地址的6666端口监听,同一个机器上不同会话使用客户端程序连接服务器

情况1

当客户端连接上服务器后,若socket在三次tick时间里没有IO操作,第四次tick时就回收socket。

服务器

在这里插入图片描述

客户端

在这里插入图片描述

情况2

当客户端连接上服务器后,若socket在三次tick时间里有IO操作,就会续上3次tick的时间( 3 * TIMESLOT)。

如下在第二次tick后,客户端向服务器发送了一条数据 hello

服务器

在这里插入图片描述

客户端

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/339270.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

半人半妖时代来啦

未来是半人半妖时代&#xff01;&#xff01;&#xff01; 碳基生命与硅基生命结合 趣讲大白话&#xff1a;人和机器结合是大趋势 *********** 人工智能就是宗&#xff5e;教 科技宗&#xff5e;教的一支最强势的教派 日常使用智能机器的人就是信众 维护机器的人就是牧师 创造这…

【mock】手把手带你用mock写自定义接口+mock常用语法

mock自定义接口完整流程 官网语法规范:https://github.com/nuysoft/Mock/wiki/Syntax-Specification 首先: 要有一个项目,我这里是vue3项目,以下从vue3项目搭建开始,已搭建好的请直接看2 1.空目录下新建vue3项目 运行创建项目命令&#xff1a; 在bash中:(文件路径处输入cm…

【计组】内存和总线--《深入浅出计算机组成原理》(十)

课程链接&#xff1a;深入浅出计算机组成原理_组成原理_计算机基础-极客时间 一、虚拟内存和内存保护 日常使用的操作系统下&#xff0c;程序不能直接访问物理内存。内存需要被分成固定大小的页&#xff08;Page&#xff09;&#xff0c;再通过虚拟内存地址&#xff08;Virtu…

卡通形象人物2 写代码-睡觉 丝滑如德芙

目录 本次实现效果 目录结构 index static/css/style.css static/js/script.js 结语&#xff1a; 前期回顾 【 css动画 】—— 把你喜欢css动画嵌入到浏览器中_0.活在风浪里的博客-CSDN博客常用酷炫动画999合集&#xff0c;代码直接复制可用&#xff0c;总用你想找的…

【Java】 JAVA Notes

JAVA语言帮助笔记Java的安装与JDKJava命名规范JAVA的数据类型自动类型转换强制类型转换JAVA的运算符取余运算结果的符号逻辑运算的短路运算三元运算符运算符优先级JAVA的流程控制分支结构JAVA类Scanner类Math 类random方法获取随机数Java的安装与JDK JDK安装网站&#xff1a;h…

AXI 总线协议学习笔记(4)

引言 前面两篇博文从简单介绍的角度说明了 AXI协议规范。 AXI 总线协议学习笔记&#xff08;2&#xff09; AXI 总线协议学习笔记&#xff08;3&#xff09; 从本篇开始&#xff0c;详细翻译并学习AXI协议的官方发布规范。 文档中的时序图说明&#xff1a; AXI指&#xff1…

基础面试题:堆和栈的区别

面试题&#xff1a;堆和栈的区别&#xff08;往往讲的是内存zha&#xff09; 为什么说访问栈栈比访问堆快些&#xff1f; 目录 一、数据结构中的堆栈 1、数据结构中的堆 1&#xff09;堆的定义 2&#xff09;堆的效率 2、 数据结构中的栈 二、内存中的堆栈 1、内存堆的定义…

Stm32 for arduino STM32G071GBU6 I2C and SERIAL

文件目录: C:\Users\Administrator\AppData\Local\Arduino15\packages\STMicroelectronics\hardware\stm32\2.3.0\variants\STM32G0xx\G071G(6-8-B)U_G081GBU boards_entry.txt Generic G071GBUx GenG0.menu.pnum.GENERIC_G071GBUXGeneric G071GBUx GenG0.menu.pnum.GENERIC…

SpringMVC:统一异常处理(11)

统一异常处理1. 说明2. 问题描述3. 异常处理器使用3.1 创建异常处理器类3.2 让程序抛出异常3.3 测试4. 项目异常处理方案4.1 异常分类4.2 异常解决方案4.3 异常解决方案的具体实现4.4 测试5. 总结1. 说明 \quad本篇文章是在文章SpringMVC&#xff1a;SSM整合&#xff08;Spring…

【Vuex 源码学习】第六篇 - Vuex 的模块收集

一&#xff0c;前言 上一篇&#xff0c;主要介绍了 Vuex 中 Mutations 和 Actions 的实现&#xff0c;主要涉及以下几个点&#xff1a; 将 options 选项中定义的 mutation 方法绑定到 store 实例的 mutations 对象&#xff1b;创建并实现 commit 方法&#xff08;同步&#x…

最近挺火的人工智能chatGPT注册

文章目录1.前提预备1.1 短信接收平台1.2 ip加速&#xff0c;不做说明2.注册chatGPT步骤2.1 进入chat.openai.com网址后&#xff0c;点击sign up2.2 可以使用qq邮箱注册2.3 填写好邮箱&#xff0c;然后点击Continue,然后再填写密码2.4 之后在qq邮箱进行验证注册(注意&#xff1a…

C++入门——内存管理

C入门——内存管理 C/C内存分布 分类是为了更好的管理 int globalVar 1; static int staticGlobalVar 1; void Test() {static int staticVar 1;int localVar 1;int num1[10] {1, 2, 3, 4};char char2[] "abcd";char* pChar3 "abcd";int* ptr1 (…

Java、JSP环境保护与宣传网站的设计与实现

技术&#xff1a;Java、JSP等摘要&#xff1a;本文对环境保护与宣传网站的设计和开发过程进行了详细地分析与叙述。按照系统开发的实际操作流程以及论文编写的规范&#xff0c;论文内容从系统概述、系统分析、系统设计和系统实现这四大模块对系统的开发过程分别进行了阐述。系统…

python3-API流量回放/锲约测试/自动化测试

PPL-Tester 简介 http工具集,通过代理获取到API的请求与响应信息,将这些请求信息进行流量回放/锲约测试或快速生成用例, 亦可通过人工进行修改参数化提取、变量引用、断言等形成API自动化测试用例等! 你以为只是流量回放吗?错~走去瞧瞧v2版本! 看官~请记得给个star呗? 项…

驱动 | Linux | NVMe - 1. 概述

本文主要参考2篇相关的解析 1’ 2 和 linux 源码 3。 此处推荐一个可以便捷查看 linux 源码的网站 bootlin 4。 更新&#xff1a;2022 / 02 / 11 驱动 | Linux | NVMe - 1. 概述与nvme_core_init函数解析NVMe 的前世今生NVMe CommandPCI 总线从架构角度看 NVMe 驱动NVMe 驱动的…

前端开发中如何处理接口数据过大的问题

题引&#xff1a; 当我们在公司做项目的时候&#xff0c;难免会遇到后端接口直接给你返回成千上万的数据进行渲染。如果我们直接一股脑遍历添加的话&#xff0c;就会导致空白页面的等待时间是很长且异常卡顿&#xff0c;那么对于数据过大的渲染就需要进行特殊的处理。这也是一…

PyQt5数据库开发1 4.1 SQL Server 2008 R2如何开启数据库的远程连接

文章目录 前言 步骤/方法 1 使用windows身份登录 2 启用混合登录模式 3 允许远程连接服务器 4 设置sa用户属性 5 配置服务器 6 重新登录 7 配置SSCM 8 确认防火墙设置 注意事项 前言 SQL Server 2008 R2如何开启数据库的远程连接 SQL Server 2008默认是不允许远程连…

ExecutorService、Callable、Future实现有返回结果的多线程原理解析

在并发多线程场景下&#xff0c;存在需要获取各线程的异步执行结果&#xff0c;这时&#xff0c;就可以通过ExecutorService线程池结合Callable、Future来实现。 我们先来写一个简单的例子—— public class ExecutorTest {public static void main(String[] args) throws Ex…

KMP 算法

1 应用场景-字符串匹配问题  字符串匹配问题&#xff1a;&#xff1a; 有一个字符串 str1 ““硅硅谷 尚硅谷你尚硅 尚硅谷你尚硅谷你尚硅你好””&#xff0c;和一个子串 str2“尚硅谷你尚硅 你” 2) 现在要判断 str1 是否含有 str2, 如果存在&#xff0c;就返回第一次出现…

数据与C(limits.h数据常数介绍)

本章简单的介绍一下limits.h的数据常量&#xff0c;这里简单了解一下就好了 目录 一.limits.h 二.float.h头文件 一.limits.h CHAR_BIT char类型的位数 CHARMAX char类型的最大值 CHAR_MIN char类型的最小值 SCHAR_MAX signed char类型的最大…