继之前的一篇文章 业务需求是这样 程序中配置了很多个网络设备 这些设备作为server端
每隔1分钟要通过socket去和设备通信 以此来实现 设备是否在线
默认最传统的方法 一个线程中 遍历这些设备 假设有30个设备 每个设备超时时间5秒 那么 遍历一遍需要30*5 = 150秒 如果 有100个设备 就需要500秒
明显不是很好
优化方法是是通过线程 遍历的时候每次去创建一个新线程 在新线程中执行 socket连接测试任务
这个方法 当然没有问题 设备少的话 问题也不大 但是如果 有100个设备
每次去创建100个线程 然后销毁100个线程 显然不是很好
然后很自然的就想到 用线程池的方法
这个 其实 能应付大多数的需求了 假如有100个设备 最坏的情况 每个设备都不通 超时时间为5秒
线程池大小设为1 的话 需要500秒
线程池大小设为10的话则需要50秒
线程池大小设为20 的话 则需要 25秒 如果 每次检查设备在线时间设为1分钟 这种情况下是满足需求的
然后又想了用EPOLL的方法 来将每个客户端的socket fd加入到epoll中
然后在循环中 检查读写事件 但是实际测试下来 效果并不是很好 感觉还是单线程 在跑的感觉
后来 想到 协程
linux下的协程库 state thread
经过测试完美的解决了这个问题 设置的超时时间为5秒 6个 设备都不通 5秒后所有轮训结束
再看一个在实际项目中测试的情况 总共有24个设备 有 21个有数据返回 一共只花了 5004毫秒
框架代码如下 TimerTask 是程序启动之后开启的 定时器任务 目前是每个20秒检查一次
Report函数如下:
void QMCY_APP::Report()
{
std::unique_lock<std::mutex> lock(m_table_mutex);
jsonxx::json response ;
struct timeval time_before{};
struct timeval time_after{};
#if 1
gettimeofday(&time_before, nullptr);
double msecs_time1 = (double)(time_before.tv_sec * 1000) + (double)(time_before.tv_usec / 1000.0);
#endif
std::cout<<"Begin Get state from led server"<<std::endl;
std::vector<st_thread_t> st_threads;
int index = 0;
for(auto it=m_led_table.begin(); it!=m_led_table.end();it++)
{
auto led = it->second;
if(m_run_flag.load() == false || led == nullptr)
{
return;
}
LED_MSG led_msg;
led_msg.msg_id = MSG_GET_LED_STATE;
led_msg.playlist.bmsid = it->first;
if(m_update_flag.load()|| m_run_flag.load() == false)
{
std::cout<<"Updating led table ....................Get led state will not be executed"<<std::endl;
}
else
{
auto led = m_led_table[led_msg.playlist.bmsid];
if(led)
{
st_thread_t led_thread = st_thread_create(GetStateThread, (void *) led.get(), 1,0);
//led->NVR_GetLEDState();
if(led_thread)
{
st_threads.push_back(led_thread);
}
}
}
}
for(auto &item:st_threads)
{
st_thread_join(item,NULL);
}
#if 1
gettimeofday(&time_after, nullptr);
double msecs_time2 =(double) (time_after.tv_sec * 1000) + (double)(time_after.tv_usec / 1000.0);
auto elapse = msecs_time2-msecs_time1;
printf("Getstate with ST thread takes time:%f \n",elapse);
#endif
#ifdef EPOLL
HandleEPOLL();
DelFromEPOLL();
#endif
int online_count = 0;
for(auto &it :m_led_table)
{
jsonxx::json item;
auto result = it.second ->NVR_GetStatus();
item["bmsid"]= it.first;
item["status"]= result.first;
if(result.first == 0)
{
online_count++;
}
item["msg"]= result.second;
response.push_back(std::move(item));
}
std::cout<<"Report online status [total:"<<m_led_table.size()<<" online:"<<online_count<<"]"<<std::endl;
auto output = response.dump();
if(output.size()>5)
{
m_led_status = output;
if(auto res = m_http_client->Post("/qmcy",output,"application/json"))
{
if (res->status == 200)
{
}
else
{
auto err = res.error();
std::cout << "HTTP error: " << httplib::to_string(err) << std::endl;
}
}
else
{
std::cout<<"Report status to server failed!"<<std::endl;
//zlog_error(g_zlog,"Report status to server[%s:%d] failed!",pHandle->server_ip,pHandle->server_port);
}
}
else
{
std::cout<<"output is invalid size :"<<output.size()<<std::endl;
}
}
实际执行的线程函数
具体执行的函数 中 主要是需要将传统的 网络的io函数都改为 st的io函数 才有用
具体代码如下 CreateSocket 是用传统的socket函数 从connect开始的所有io函数都得换成st库的io函数
if(m_get_socket->CreateSocket(3))
{
#ifdef QMCY_STLIB
#define ST_TIMEOUT (1000*10000)
auto fd = m_get_socket->GetRawFD();
st_netfd_t st_fd = nullptr;
st_fd = st_netfd_open_socket(fd);
if(st_fd)
{
const char *server_ip = m_ip.c_str();
struct sockaddr_in server_addr;
bzero(&server_addr,sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(m_port);
inet_pton(AF_INET,server_ip,&server_addr.sin_addr);
if(st_connect(st_fd, (struct sockaddr*)&server_addr,sizeof(server_addr), ST_TIMEOUT)== -1)
{
st_netfd_close(st_fd);
}
char buf[1024] = {0};
int nw, nr;
nw = st_write(st_fd, packet.c_str(), packet.size(), ST_TIMEOUT);
if(nw != packet.size())
{
std::cout<<"st_write:"<<nw<<std::endl;
}
nr = (int) st_read(st_fd, buf, 1024, ST_TIMEOUT);
if (nr >0)
{
std::cout<<"st_read >0:"<<nr<<std::endl;
//auto ret = CheckResponse(buf,nr);
if((unsigned char)buf[0] == PROTOCOL_HEAD_BYTE && (unsigned char)buf[nr-3] == PROTOCOL_TAIL_BYTE)
{
m_status = 0;
m_fault_msg= LED_ONLINE;
}
else
{
std::cout<<"Not whole packet"<<std::endl;
m_status = 1;
m_fault_msg= LED_NOT_MATCH;
}
}
else
{
std::cout<<"st_read <=0:"<<nr<<std::endl;
m_status = 1;
m_fault_msg= LED_NO_DATA;
}
st_netfd_close(st_fd);
return 0;
}