知识点: epoll模型(使用成员ptr携带信息), udp(#pragma pack结构体对齐), socketcan(帧过滤), Linux多路uart232tousb列表获取, 正则匹配, ASCII乱码检测, C++线程(lambda), 非阻塞读。
一、代码
#include <iostream>
#include <stdlib.h>
#include <string>
#include <vector>
#include <map>
#include <dirent.h>
#include <regex>
#include <termios.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/ioctl.h>
#include <linux/can.h>
#include <linux/can/raw.h>
#include <linux/can/error.h>
#include <net/if.h>
#include <thread>
#define MAX_EVENTS 1024
enum class DataType {
IMU=1, // uart232toUsb1
Ultrasonic, // uart232toUsb2
GNSS, // udp
MWaveRadar // socketCan
};
struct MyEpoll {
int fd;
void *arg;
double timeStamp=0.0; // 记录每个传感器距离上次处理的时间
void (*call_back)(int fd, void *arg);
char buf[512]; // 数据存储
int len; // 数据大小
};
struct MyEpoll privateData[MAX_EVENTS];
/*****************函数声明*******************/
void getUsbList(std::vector<std::string>& UsbList);
bool isValidata(const std::string& check_data);
int uart_open(const char* ttysn);
int uart_conf_set(int fd,int nBaud,int nBits,int nStop,char nEvent) {}
void tryOpenUsbList(const std::vector<std::string>& UsbList, std::vector<int>& fdSet, std::map<DataType, int>& fdMap);
int udp_client_init(const int client_port);
int socket_can_init(const char * can_dev);
void usbDealCB(int fd, void* arg);
void usb1dataDeal(int fd, void* arg);
void usb2dataDeal(int fd, void* arg);
void udpdataDeal(int fd, void* arg);
void cansocketDataDeal(int fd, void* arg);
int main() {
// 获取usb列表
std::vector<std::string> UsbList;
getUsbList(UsbList);
if (UsbList.empty()) {
std::cout << "UsbList is Empty!" << std::endl;
return -1;
}
// 尝试波特率正确打开, 非阻塞读, 乱码检测, 绑定fd和设备
std::vector<int> fdSet;
std::map<DataType, int> fdMap;
tryOpenUsbList(UsbList, fdSet, fdMap);
if (fdMap.empty()) {
std::cout << "fdMap is Zero!" << std::endl;
return -1;
}
// 获取udp列表
int upd_client_fd = -1;
{ // udp: socket\bind(), 客户端ip+port
upd_client_fd = udp_client_init(3001);
}
// 获取socketcan列表
int can_client_fd = -1;
{
can_client_fd = socket_can_init("can1");
}
// epfd指向一个红黑树树根
int epfd = epoll_create(MAX_EVENTS);
// usb挂载
int cur_eventposi = -1; // 已占用的资源位置
struct epoll_event epv[MAX_EVENTS];
for (int i = 0; i<fdSet.size(); ++i) {
privateData[i].fd = fdSet[i];
privateData[i].call_back = usbDealCB; // 处理函数
privateData[i].arg = &privateData[i]; // 指向自己
privateData[i].len = 0;
memset(privateData[i].buf, 0, sizeof(privateData[i].buf));
// epv[i].data.fd = fdSet[i]; // 注意: epoll_event类型的data成员是union,使用时(*ptr\fd)只能存在一个
epv[i].data.ptr = &privateData[i]; // core: 以便于后面根据fd调用具体的协议解析函数
epv[i].events = EPOLLIN | EPOLLET; // epoll边沿触发 + 描述符的读事件
epoll_ctl(epfd, EPOLL_CTL_ADD, fdSet[i], &epv[i]); // add添加节点
cur_eventposi = i;
}
// udp挂载
privateData[cur_eventposi].fd = upd_client_fd;
privateData[cur_eventposi].call_back = udpdataDeal; // 处理函数
privateData[cur_eventposi].arg = &privateData[cur_eventposi]; // 指向自己
privateData[cur_eventposi].len = 0;
memset(privateData[cur_eventposi].buf, 0, sizeof(privateData[cur_eventposi].buf));
epv[cur_eventposi].data.ptr = &privateData[cur_eventposi];
epv[cur_eventposi].events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_ADD, upd_client_fd, &epv[cur_eventposi]);
cur_eventposi++;
// socketCan挂载
privateData[cur_eventposi].fd = can_client_fd;
privateData[cur_eventposi].call_back = cansocketDataDeal; // 处理函数
privateData[cur_eventposi].arg = &privateData[cur_eventposi]; // 指向自己
privateData[cur_eventposi].len = 0;
memset(privateData[cur_eventposi].buf, 0, sizeof(privateData[cur_eventposi].buf));
epv[cur_eventposi].data.ptr = &privateData[cur_eventposi];
epv[cur_eventposi].events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_ADD, upd_client_fd, &epv[cur_eventposi]);
cur_eventposi++;
struct epoll_event events[MAX_EVENTS];
while (true) {
int readyNums = epoll_wait(epfd, events, sizeof(epv), 1000); // 1000ms没数据报警
if (readyNums==0) {
std::cout << "don't received data since 1000 ms." << std::endl;
continue;
} else if (readyNums<0) {
std::cout << "epoll error." << std::endl;
continue;
} else {
// 开线程去处理
for (int i=0; i<readyNums; ++i) {
struct MyEpoll* nh = (struct MyEpoll *)events[i].data.ptr; // 通过ptr取出该fd的参数
if (events[i].events & EPOLLIN) { // 读就绪事件, 调用回调函数处理
std::thread([&nh]() -> void {
nh->call_back(nh->fd, nh->arg);
});
}
}
}
}
return 0;
}
void getUsbList(std::vector<std::string>& UsbList) {
DIR* dir;
struct dirent* entry;
std::regex regex_usb("^ttyUSB[0-9]+$"); // 正则匹配
dir = opendir("/dev");
if (dir == NULL) {
std::cerr << "Failed to open directory." << std::endl;
return;
}
while ((entry = readdir(dir)) != NULL) {
std::string filename = entry->d_name;
if (std::regex_match(filename, regex_usb)) {
UsbList.emplace_back("/dev/" + filename);
std::cout << UsbList[UsbList.size()-1] << std::endl;
}
}
closedir(dir);
return;
}
bool isValidata(const std::string& check_data) {
if (check_data.length() < 2) {
std:: cout << "please input large data for check!" << std::endl;
return true;
}
/*
[AscII]
0-31、127del是一些控制字符,属于非打印字符;32空格、34-126是可打印字符;扩展集合128-255
*/
for (char c : check_data) {
if (c < 32 || c > 126) {
return true; // 非法
}
}
return false; // 合法
}
int uart_open(const char* ttysn) {
int fd = open(ttysn, O_RDWR|O_NOCTTY);
if(fd == -1){
perror("Open UART failed!");
return -1;
}
return fd;
}
void tryOpenUsbList(const std::vector<std::string>& UsbList,
std::vector<int>& fdSet,
std::map<DataType, int>& fdMap) {
std::vector<int> BaudRate{9600, 115200, 230400};
char msgBuff_forCheck[32];
for (const auto& usbx : UsbList) { // 尝试第一个usb设备
int fd = -1;
fd = uart_open(usbx.c_str());
for ( int i = 0; i<BaudRate.size(); ++i) { // 尝试波特率
memset(msgBuff_forCheck, 0, sizeof(msgBuff_forCheck));
int fctl = fcntl(fd, F_GETFL); // 设置非阻塞读
fcntl(fd, F_SETFL, fctl | O_NONBLOCK);
uart_conf_set(fd, BaudRate[i], 8, 1, 'N');
for (int num=0, ret=0; num<sizeof(msgBuff_forCheck); ) { // 尝试读取
ret = read(fd, msgBuff_forCheck+num, sizeof(msgBuff_forCheck)-num);
num += ret;
}
std::string recv2str = msgBuff_forCheck; // 检验是否乱码
if(isValidata(recv2str)) {
continue;
} else { // 无乱码
std::cout << "success open usb: " << usbx << ":" << BaudRate[i] << std::endl;
// 绑定fd和对应的设备
// ... ... (利用帧头\帧尾)
fdMap[DataType::IMU] = fd;
fdSet.emplace_back(fd);
break;
}
}
}
}
void usbDealCB(int fd, void* arg) {
switch (fd) {
case (int)DataType::IMU: usb1dataDeal(fd, arg); break;
case (int)DataType::Ultrasonic: usb2dataDeal(fd, arg); break;
default: break;
}
}
void usb1dataDeal(int fd, void* arg) {
struct MyEpoll* pdata = (struct MyEpoll *)arg;
// 接收数据
int len = read(pdata->fd, pdata->buf, sizeof(pdata->buf));
if(len <= 0) {
std::cout << "fd[" << fd << "] " << "read failed!" << std::endl;
close(pdata->fd);
return;
}
pdata->len = len;
// 解析数据
// ... ...
return;
}
void usb2dataDeal(int fd, void* arg) { return; }
void udpdataDeal(int fd, void* arg) {
struct MyEpoll* pdata = (struct MyEpoll *)arg;
struct sockaddr_in server_addr;
ushort temo_Message_ID; // 帧id
// 接收
recvfrom(pdata->fd, pdata->buf, sizeof(pdata->buf),0,
(struct sockaddr*)&server_addr, (socklen_t *)sizeof(server_addr));
// 解析
memcpy(&temo_Message_ID, pdata->buf + 4, sizeof(temo_Message_ID));
switch (temo_Message_ID) {
// case 42: memcpy();break; // 调用对应的结构体填充就行, 注意结构体 #pragma pack(1)
default: break;
}
}
void cansocketDataDeal(int fd, void* arg) {
struct MyEpoll* pdata = (struct MyEpoll *)arg;
struct can_frame msg_frame;
// 接收
ssize_t nbs = read(pdata->fd, &msg_frame, sizeof(msg_frame));
if (nbs>0) {
switch(msg_frame.can_id) { // 帧id
// case 0x101: break;
default: break;
}
}
}
int udp_client_init(const int client_port)
{
// 申请一个fd
int client_sock_fd = socket(AF_INET,SOCK_DGRAM,0);
// bind服务器地址
struct sockaddr_in serveraddr;
if(bind(client_sock_fd,(struct sockaddr *)&serveraddr,sizeof(serveraddr)) == -1){
perror("udp client socket)fd bind failed");
exit(-1);
}
char recvbuf[1024] = {0};
bzero(recvbuf,sizeof(recvbuf));
// 测试连接, 尝试向服务器发送数据
if (sendto(client_sock_fd,recvbuf,sizeof(recvbuf),0,
(struct sockaddr*) &serveraddr,sizeof(serveraddr))==-1)
return -1;
return client_sock_fd;
}
int socket_can_init(const char * can_dev) {
// 创建 SocketCAN 套接字
int32_t fd = 0;
if ((fd = socket(PF_CAN,SOCK_RAW,CAN_RAW)) < 0){
perror("socketcan open failed");
return -1;
}
// 指定 can 设备
struct ifreq sockcan_conf;
strcpy(sockcan_conf.ifr_name, can_dev);
// 获取网络接口索引
ioctl(fd, SIOCGIFINDEX, &sockcan_conf);
// 将套接字地址与 can 绑定
struct sockaddr_can addr;
bzero(&addr,sizeof(addr));
addr.can_family = AF_CAN;
addr.can_ifindex = sockcan_conf.ifr_ifindex;
if (bind(fd, (struct sockaddr *) &addr, sizeof(addr))){
perror("sockaddr_can bind failed");
}
// 设置过滤规则(自己配置)
struct can_filter rfilter[1];
rfilter[0].can_id = 0x123; // 过滤 ID 为 0x123 的帧
rfilter[0].can_mask = CAN_SFF_MASK; // 使用标准帧过滤掩码
setsockopt(fd, SOL_CAN_RAW, CAN_RAW_FILTER, &rfilter, sizeof(rfilter));
return fd;
}
二、Unix下IO模型
UNIX5大IO模型:阻塞io、非阻塞io、io复用、信号驱动式io、异步io。
总的来说,IO操作分2阶段:
1)数据准备阶段;
2)内核空间复制回用户进程缓冲区阶段;
-前4种io:仍然属于同步io,只是由内核监听请求,收到请求后数据的拷贝过程还是要应用程序去等待执行,这段时间应用程序是阻塞的;
-异步io:你应用程序不用管了,你只需要告诉我(内核)你要做什么,我做完了直接把结果给你(数据从内核到用户态也不需要你干预);
-io模型——io复用(select、poll、epoll):
1)select:
int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
参数:监听的最大描述符+1、监听的读描述符集合rset、写描述符集合wset、异常描述符集合illset(传入传出参数),返回的请求数(对少个fd有动作),只能监听集合是否有动作,不能分清是什么动作、具体哪几个动作了,需要自己遍历描述符集合,如判断读描述符集合里的fd1是否动作了:FD_ISSET(fd1, &rset);
【缺点】:
1)有监听有上限数;
2)每次select函数调用都要将监听的描述符拷贝到内核;
3)内核中进行轮询一遍所有描述符变化;(在timeout时间之内不断轮询,fd对应的socketbuff是否有数据到达);
2)poll:相较于select的提升在于监听描述符上限优化了。
3)epoll:提升了select的1)无上限,2)只需要拷贝一次描述符集合到内核,对于3)==> 使用红黑树管理监听的fd,在查找、插入、删除速度快。内部维护一个就绪链表,某个fd就绪时,调用回调函数机制把就绪fd放入就绪链表中,调用epoll_wait只需要不断轮询就绪链表是否为空就ok,节省了CPU大量遍历的时间。
总结:
参考链接:IO多路复用——深入浅出理解select、poll、epoll的实现 - 知乎