epoll监听多路IO(多路传感器数据高效处理)

news2024/12/29 8:32:56

        知识点: epoll模型(使用成员ptr携带信息), udp(#pragma pack结构体对齐), socketcan(帧过滤), Linux多路uart232tousb列表获取, 正则匹配, ASCII乱码检测, C++线程(lambda), 非阻塞读。 

 一、代码

#include <iostream>
#include <stdlib.h>
#include <string>
#include <vector>
#include <map>
#include <dirent.h>     
#include <regex>
#include <termios.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/ioctl.h>
#include <linux/can.h>
#include <linux/can/raw.h>
#include <linux/can/error.h>
#include <net/if.h>
#include <thread>

#define MAX_EVENTS  1024

enum class DataType { 

    IMU=1,          // uart232toUsb1
    Ultrasonic,     // uart232toUsb2
    GNSS,           // udp
    MWaveRadar      // socketCan
};

struct MyEpoll { 

    int fd;
	void *arg;
    double timeStamp=0.0;       // 记录每个传感器距离上次处理的时间

	void (*call_back)(int fd, void *arg);
	
	char buf[512];	    // 数据存储
    int len;			// 数据大小
};
struct MyEpoll privateData[MAX_EVENTS];  

/*****************函数声明*******************/
void getUsbList(std::vector<std::string>& UsbList);
bool isValidata(const std::string& check_data);
int uart_open(const char* ttysn);
int uart_conf_set(int fd,int nBaud,int nBits,int nStop,char nEvent) {}
void tryOpenUsbList(const std::vector<std::string>& UsbList, std::vector<int>& fdSet, std::map<DataType, int>& fdMap);
int udp_client_init(const int client_port);
int socket_can_init(const char * can_dev);
void usbDealCB(int fd, void* arg);
void usb1dataDeal(int fd, void* arg);
void usb2dataDeal(int fd, void* arg);
void udpdataDeal(int fd, void* arg);
void cansocketDataDeal(int fd, void* arg);

int main() { 

    // 获取usb列表
    std::vector<std::string> UsbList;
    getUsbList(UsbList);
    if (UsbList.empty()) {
        std::cout << "UsbList is Empty!" << std::endl;
        return -1;        
    }

    // 尝试波特率正确打开, 非阻塞读, 乱码检测, 绑定fd和设备
    std::vector<int> fdSet;
    std::map<DataType, int> fdMap;
    tryOpenUsbList(UsbList, fdSet, fdMap);
    if (fdMap.empty()) {
        std::cout << "fdMap is Zero!" << std::endl;
        return -1;
    }

    // 获取udp列表
    int upd_client_fd = -1;
    {   // udp: socket\bind(), 客户端ip+port
        upd_client_fd = udp_client_init(3001);
    }

    // 获取socketcan列表
    int can_client_fd = -1;
    {   
        can_client_fd = socket_can_init("can1");
    }

    // epfd指向一个红黑树树根
    int epfd = epoll_create(MAX_EVENTS);  

    // usb挂载
    int cur_eventposi = -1;     // 已占用的资源位置
    struct epoll_event epv[MAX_EVENTS];       
    for (int i = 0; i<fdSet.size(); ++i) { 

        privateData[i].fd = fdSet[i];
        privateData[i].call_back = usbDealCB;       // 处理函数
        privateData[i].arg = &privateData[i];       // 指向自己
        privateData[i].len = 0;
	    memset(privateData[i].buf, 0, sizeof(privateData[i].buf));

        // epv[i].data.fd = fdSet[i];           // 注意: epoll_event类型的data成员是union,使用时(*ptr\fd)只能存在一个
        epv[i].data.ptr = &privateData[i];      // core: 以便于后面根据fd调用具体的协议解析函数
        epv[i].events = EPOLLIN | EPOLLET;      // epoll边沿触发 + 描述符的读事件
        epoll_ctl(epfd, EPOLL_CTL_ADD, fdSet[i], &epv[i]);      // add添加节点
        cur_eventposi = i;
    }

    // udp挂载
    privateData[cur_eventposi].fd = upd_client_fd;
    privateData[cur_eventposi].call_back = udpdataDeal;                 // 处理函数
    privateData[cur_eventposi].arg = &privateData[cur_eventposi];       // 指向自己
    privateData[cur_eventposi].len = 0;
    memset(privateData[cur_eventposi].buf, 0, sizeof(privateData[cur_eventposi].buf));
    epv[cur_eventposi].data.ptr = &privateData[cur_eventposi];
    epv[cur_eventposi].events = EPOLLIN | EPOLLET;
    epoll_ctl(epfd, EPOLL_CTL_ADD, upd_client_fd, &epv[cur_eventposi]);
    cur_eventposi++;

    // socketCan挂载
    privateData[cur_eventposi].fd = can_client_fd;
    privateData[cur_eventposi].call_back = cansocketDataDeal;           // 处理函数
    privateData[cur_eventposi].arg = &privateData[cur_eventposi];       // 指向自己
    privateData[cur_eventposi].len = 0;
    memset(privateData[cur_eventposi].buf, 0, sizeof(privateData[cur_eventposi].buf));
    epv[cur_eventposi].data.ptr = &privateData[cur_eventposi];
    epv[cur_eventposi].events = EPOLLIN | EPOLLET;
    epoll_ctl(epfd, EPOLL_CTL_ADD, upd_client_fd, &epv[cur_eventposi]);
    cur_eventposi++;

    struct epoll_event events[MAX_EVENTS];
    while (true) { 

        int readyNums = epoll_wait(epfd, events, sizeof(epv), 1000);     // 1000ms没数据报警
        if (readyNums==0) {
            std::cout << "don't received data since 1000 ms." << std::endl;
            continue;
        } else if (readyNums<0) {
            std::cout << "epoll error." << std::endl;
            continue;            
        } else {
            
            // 开线程去处理
            for (int i=0; i<readyNums; ++i) { 
                
                struct MyEpoll* nh = (struct MyEpoll *)events[i].data.ptr;    // 通过ptr取出该fd的参数
                if (events[i].events & EPOLLIN) {                             // 读就绪事件, 调用回调函数处理

                    std::thread([&nh]() -> void {
                        nh->call_back(nh->fd, nh->arg);
                    });
                }
            }
        }
    }

    return 0;
}


void getUsbList(std::vector<std::string>& UsbList) { 

    DIR* dir;
    struct dirent* entry;
    std::regex regex_usb("^ttyUSB[0-9]+$");     // 正则匹配

    dir = opendir("/dev");
    if (dir == NULL) {
        std::cerr << "Failed to open directory." << std::endl;
        return;
    }

    while ((entry = readdir(dir)) != NULL) {
        std::string filename = entry->d_name;
        if (std::regex_match(filename, regex_usb)) {
            UsbList.emplace_back("/dev/" + filename);
            std::cout << UsbList[UsbList.size()-1] << std::endl;
        }
    }

    closedir(dir);
    return;
}
bool isValidata(const std::string& check_data) {
    if (check_data.length() < 2) {
        std:: cout << "please input large data for check!" << std::endl;
        return true;
    }
    /*
    [AscII]
        0-31、127del是一些控制字符,属于非打印字符;32空格、34-126是可打印字符;扩展集合128-255
    */
    for (char c : check_data) {
        if (c < 32 || c > 126) {
            return true;          // 非法
        }
    }
    return false;   // 合法
}
int uart_open(const char* ttysn) {
    int fd = open(ttysn, O_RDWR|O_NOCTTY); 
    if(fd == -1){
        perror("Open UART failed!");
        return -1;
    }
    return fd;
}
void tryOpenUsbList(const std::vector<std::string>& UsbList, 
                    std::vector<int>& fdSet, 
                    std::map<DataType, int>& fdMap) {
    std::vector<int> BaudRate{9600, 115200, 230400};
    char msgBuff_forCheck[32];
    for (const auto& usbx : UsbList) {      // 尝试第一个usb设备
        int fd = -1;
        fd = uart_open(usbx.c_str());

        for ( int i = 0; i<BaudRate.size(); ++i) {      // 尝试波特率
            memset(msgBuff_forCheck, 0, sizeof(msgBuff_forCheck));

            int fctl = fcntl(fd, F_GETFL);  // 设置非阻塞读
            fcntl(fd, F_SETFL, fctl | O_NONBLOCK); 
            uart_conf_set(fd, BaudRate[i], 8, 1, 'N');

            for (int num=0, ret=0; num<sizeof(msgBuff_forCheck); ) {        // 尝试读取
                ret = read(fd, msgBuff_forCheck+num, sizeof(msgBuff_forCheck)-num);
                num += ret;
            }
            std::string recv2str = msgBuff_forCheck;        // 检验是否乱码
            if(isValidata(recv2str)) { 
                continue;
            } else {    // 无乱码 
                std::cout << "success open usb: " << usbx << ":" << BaudRate[i] << std::endl;

                // 绑定fd和对应的设备
                // ... ... (利用帧头\帧尾)
                fdMap[DataType::IMU] = fd;
                fdSet.emplace_back(fd);
                break;
            }
        }
    }
}

void usbDealCB(int fd, void* arg) {

    switch (fd) { 
        case (int)DataType::IMU:        usb1dataDeal(fd, arg); break;
        case (int)DataType::Ultrasonic: usb2dataDeal(fd, arg); break;
        default: break;
    }
}

void usb1dataDeal(int fd, void* arg) { 
	struct MyEpoll* pdata = (struct MyEpoll *)arg;
	// 接收数据
	int len = read(pdata->fd, pdata->buf, sizeof(pdata->buf));	
	if(len <= 0) {
		std::cout << "fd[" << fd << "] " << "read failed!" << std::endl;
        close(pdata->fd);
		return;
	}
    pdata->len = len;
    // 解析数据
    // ... ...

    return;
}

void usb2dataDeal(int fd, void* arg) { return; }

void udpdataDeal(int fd, void* arg) { 
    struct MyEpoll* pdata = (struct MyEpoll *)arg;
    struct sockaddr_in server_addr;
    ushort temo_Message_ID;		// 帧id
    // 接收
    recvfrom(pdata->fd, pdata->buf, sizeof(pdata->buf),0,
                           (struct sockaddr*)&server_addr, (socklen_t *)sizeof(server_addr));
    // 解析
    memcpy(&temo_Message_ID, pdata->buf + 4, sizeof(temo_Message_ID));
    switch (temo_Message_ID) {

        // case 42: memcpy();break; // 调用对应的结构体填充就行, 注意结构体 #pragma pack(1) 
        default: break;
    }
}

void cansocketDataDeal(int fd, void* arg) { 
    struct MyEpoll* pdata = (struct MyEpoll *)arg;
    struct can_frame msg_frame;
    // 接收
    ssize_t nbs = read(pdata->fd, &msg_frame, sizeof(msg_frame));
    if (nbs>0) {

        switch(msg_frame.can_id) {  // 帧id
            // case 0x101: break;
            default: break;
        }
    }
}

int udp_client_init(const int client_port)
{
    // 申请一个fd 
    int client_sock_fd = socket(AF_INET,SOCK_DGRAM,0);  

    // bind服务器地址
    struct sockaddr_in serveraddr;
    if(bind(client_sock_fd,(struct sockaddr *)&serveraddr,sizeof(serveraddr)) == -1){
        perror("udp client socket)fd bind failed");
        exit(-1);
    } 

    char recvbuf[1024] = {0};
    bzero(recvbuf,sizeof(recvbuf));
	// 测试连接, 尝试向服务器发送数据
    if (sendto(client_sock_fd,recvbuf,sizeof(recvbuf),0,
           (struct sockaddr*) &serveraddr,sizeof(serveraddr))==-1)
        return -1;

    return client_sock_fd;
}

int socket_can_init(const char * can_dev) {
    // 创建 SocketCAN 套接字
    int32_t fd = 0;
    if ((fd = socket(PF_CAN,SOCK_RAW,CAN_RAW)) < 0){
        perror("socketcan open failed");
        return -1;
    }
    // 指定 can 设备
    struct ifreq sockcan_conf;
    strcpy(sockcan_conf.ifr_name, can_dev);
    // 获取网络接口索引
    ioctl(fd, SIOCGIFINDEX, &sockcan_conf);

    // 将套接字地址与 can 绑定
    struct sockaddr_can addr;
    bzero(&addr,sizeof(addr));
    addr.can_family = AF_CAN;
    addr.can_ifindex = sockcan_conf.ifr_ifindex;
    if (bind(fd, (struct sockaddr *) &addr, sizeof(addr))){
       perror("sockaddr_can bind failed");
    }
    // 设置过滤规则(自己配置)
    struct can_filter rfilter[1];
    rfilter[0].can_id = 0x123;          // 过滤 ID 为 0x123 的帧
    rfilter[0].can_mask = CAN_SFF_MASK; // 使用标准帧过滤掩码
    setsockopt(fd, SOL_CAN_RAW, CAN_RAW_FILTER, &rfilter, sizeof(rfilter));

    return fd;
}

二、Unix下IO模型

       UNIX5大IO模型:阻塞io、非阻塞io、io复用、信号驱动式io、异步io。

        总的来说,IO操作分2阶段:
        1)数据准备阶段;
        2)内核空间复制回用户进程缓冲区阶段;

-前4种io:仍然属于同步io,只是由内核监听请求,收到请求后数据的拷贝过程还是要应用程序去等待执行,这段时间应用程序是阻塞的;
-异步io:你应用程序不用管了,你只需要告诉我(内核)你要做什么,我做完了直接把结果给你(数据从内核到用户态也不需要你干预);

-io模型——io复用(select、poll、epoll):

1)select:

int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

        参数:监听的最大描述符+1、监听的读描述符集合rset、写描述符集合wset、异常描述符集合illset(传入传出参数),返回的请求数(对少个fd有动作),只能监听集合是否有动作,不能分清是什么动作、具体哪几个动作了,需要自己遍历描述符集合,如判断读描述符集合里的fd1是否动作了:FD_ISSET(fd1, &rset);

        【缺点】:

        1)有监听有上限数;
        2)每次select函数调用都要将监听的描述符拷贝到内核;
        3)内核中进行轮询一遍所有描述符变化;(在timeout时间之内不断轮询,fd对应的socketbuff是否有数据到达);

2)poll:相较于select的提升在于监听描述符上限优化了。

3)epoll:提升了select的1)无上限,2)只需要拷贝一次描述符集合到内核,对于3)==> 使用红黑树管理监听的fd,在查找、插入、删除速度快。内部维护一个就绪链表,某个fd就绪时,调用回调函数机制把就绪fd放入就绪链表中,调用epoll_wait只需要不断轮询就绪链表是否为空就ok,节省了CPU大量遍历的时间。

总结: 

参考链接:IO多路复用——深入浅出理解select、poll、epoll的实现 - 知乎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/887760.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

jmeter errstr :“unsupported field type for multipart.FileHeader“

在使用jmeter测试接口的时候&#xff0c;提示errstr :"unsupported field type for multipart.FileHeader"如图所示 这是因为我们 在HTTP信息头管理加content-type参数有问题 直接在HTTP请求中&#xff0c;勾选&#xff1a; use multipart/form-data for POST【中文…

uniapp条形码实现

条形码在实际应用场景是经常可见的。 这里教大家如何集成uniapp条形码。条形码依赖类库JsBarcode. 下载JsBarcode源码&#xff0c;对CanvasRenderer进行了改进兼容uniapp。 import merge from "../help/merge.js"; import {calculateEncodingAttributes, getTotal…

POSTGRESQL 关于2023-08-14 数据库自动启动文章中使用KILL 来进行配置RELOAD的问题解释...

开头还是介绍一下群&#xff0c;如果感兴趣Polardb ,mongodb ,MySQL ,Postgresql ,redis &#xff0c;SQL SERVER ,ORACLE,Oceanbase 等有问题&#xff0c;有需求都可以加群群内有各大数据库行业大咖&#xff0c;CTO&#xff0c;可以解决你的问题。加群请加 liuaustin3微信号 &…

Text-to-SQL小白入门(二)——Transformer学习

摘要 本文主要针对NLP任务中经典的Transformer模型的来源、用途、网络结构进行了详细描述&#xff0c;对后续NLP研究、注意力机制理解、大模型研究有一定帮助。 1. 引言 在上一篇《Text-to-SQL小白入门&#xff08;一&#xff09;》中&#xff0c;我们介绍了Text-to-SQL研究…

Java版电子招投标管理系统源码-电子招投标认证服务平台-权威认证 tbms

​ 功能描述 1、门户管理&#xff1a;所有用户可在门户页面查看所有的公告信息及相关的通知信息。主要板块包含&#xff1a;招标公告、非招标公告、系统通知、政策法规。 2、立项管理&#xff1a;企业用户可对需要采购的项目进行立项申请&#xff0c;并提交审批&#xff0c;…

【mysql】—— 表的增删改查

目录 序言 &#xff08;一&#xff09;Create操作 1、单行数据 全列插入 2、多行数据 指定列插入 3、插入否则更新 4、直接替换 &#xff08;二&#xff09;Retrieve操作 1、SELECT 列 1️⃣全列查询 2️⃣指定列查询 3️⃣查询字段为表达式 4️⃣为查询结果指定…

无涯教程-Perl - sprintf函数

描述 此函数使用FORMAT基于LIST中的值返回格式化的字符串。本质上与printf相同,但是返回格式化的字符串而不是将其打印。 语法 以下是此函数的简单语法- sprintf FORMAT, LIST返回值 此函数返回SCALAR(格式化的文本字符串)。 例 以下是显示其基本用法的示例代码- #!/us…

SQL Injection

SQL Injection 就是通过把恶意的sql命令插入web表单递交给服务器&#xff0c;或者输入域名或页面请求的查询字符串递交到服务器&#xff0c;达到欺骗服务器&#xff0c;让服务器执行这些恶意的sql命令&#xff0c;从而让攻击者&#xff0c;可以绕过一些机制&#xff0c;达到直…

【Android Studio】 win11 安装配置 jdk17 超详细

概述 一个好的安装教程能够帮助开发者完成更便捷、更快速的开发。书山有路勤为径&#xff0c;学海无涯苦作舟。我是秋知叶i、期望每一个阅读了我的文章的开发者都能够有所成长。 一、下载JDK JDK官网 这里下载 JDK17 windows x64 installer 二、安装JDK 双击打开下载的 j…

计算机毕设项目之基于django+mysql的疫情实时监控大屏系统(前后全分离)

系统阐述的是一款新冠肺炎疫情实时监控系统的设计与实现&#xff0c;对于Python、B/S结构、MySql进行了较为深入的学习与应用。主要针对系统的设计&#xff0c;描述&#xff0c;实现和分析与测试方面来表明开发的过程。开发中使用了 django框架和MySql数据库技术搭建系统的整体…

AutoSAR配置与实践(基础篇)3.1 BSW架构和主要功能概要

传送门 点击返回 ->AUTOSAR配置与实践总目录 AutoSAR配置与实践(基础篇)3.1 BSW架构和主要功能概要 一、什么叫BSW二、分层介绍BSW主要功能2.1 微处理器抽象层MCAL2.2 ECU抽象层2.3 服务层2.4 复杂驱动CDD三、总结BSW各层主要功能一、什么叫BSW BSW是Basic Software的缩…

LeetCode150道面试经典题-- 合并两个有序链表(简单)

1.题目 将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 2.示例 示例 1&#xff1a; 输入&#xff1a;l1 [1,2,4], l2 [1,3,4] 输出&#xff1a;[1,1,2,3,4,4] 示例 2&#xff1a; 输入&#xff1a;l1 [], l2 [] 输…

使用Kaptcha生成验证码

说明&#xff1a;验证码&#xff0c;是登录流程中必不可少的一环&#xff0c;一般企业级的系统&#xff0c;使用都是专门制作验证码、审核校验的第三方SDK&#xff08;如极验&#xff09;。本文介绍&#xff0c;使用谷歌提供的Kaptcha技术&#xff0c;制作一个简单的验证码。 …

关系代数运算中的集合运算符和关系运算符

关系代数是一种抽象的查询语言&#xff0c;是研究关系模型的数学工具。关系代数的运算对象是关系&#xff0c;运算结果也是关系。关系代数运算符主要分为集合运算符和关系运算符两大类。 集合运算符有笛卡尔积、并、交、差&#xff0c;关系运算符有除、选择、投影、连接&#x…

ABAP内表与XML格式互转

1需求说明 在系统交互中需要将SAP内表转换为XML文件&#xff0c;发送给其他系统&#xff0c;并且将其他系统返回的XML文件转换为SAP内表。 2创建转换编辑器 事务代码&#xff1a;STRANS 选择简单转换 以图形方式编辑 右键插入新行 选择参考的类型。此处的TYPE类型是SE11中创…

基于STM32的指纹门禁系统

基于STM32的指纹门禁系统 系统简介 指纹系统&#xff0c;可以存指纹&#xff1b;指纹对了之后开门&#xff1b; 材料 STM32F103C8T6芯片OLED液晶显示屏杜邦线AS608模块用keil5进行开发 开始制作 制作过程 AS608模块调试 模块原理 模块是通过串口通信,采用回复式通信。 这…

基于Python编写个语法解析器

这篇文章主要为大家详细介绍了如何基于Python编写个语法解析器&#xff0c;文中的示例代码讲解详细&#xff0c;具有一定的学习价值&#xff0c;感兴趣的小伙伴可以了解一下 − 目录 前言选型效果实现 字符指针错误类型语法解析交互 前言 目的纯粹&#xff0c;基于Python做…

26、springboot的自动配置03--核心功能--自定义条件注解及使用

开发自己的自动配置------开发自己的条件注解 ★ 自定义条件注解 好处有两个&#xff1a; 1. 真正掌握Spring boot条件注解的本质。 2. 项目遇到一些特殊的需求时&#xff0c;也可以开发自己的自定义条件注解来解决问题。自定义条件注解&#xff1a; ▲ 所有自定义注解其实都…

香港服务器三网直连内地线路什么意思?好用吗?

​  三网直连内地是指香港服务器可以直接连接中国内地的电信、联通和移动三大运营商网络&#xff0c;避免了中间网络干线的支持。这样可以实现直接、快速、稳定的网络访问&#xff0c;提高用户对网络访问的效率&#xff0c;减少网络访问问题和拥堵的现象。 香港服务器直连内地…

开源项目AJ-Captcha使用小结

在网上看到开源项目AJ-Captcha&#xff0c;想把它加入到自己的项目中&#xff0c;遇到了一些问题&#xff0c;记录一下。 AJ-Captcha: 行为验证码(滑动拼图、点选文字)&#xff0c;前后端(java)交互&#xff0c;包含vue/h5/Android/IOS/flutter/uni-app/react/php/go/微信小程…