🔥博客主页: 我要成为C++领域大神
🎥系列专栏:【C++核心编程】 【计算机网络】 【Linux编程】 【操作系统】
❤️感谢大家点赞👍收藏⭐评论✍️本博客致力于知识分享,与更多的人进行学习交流
负载均衡技术
大量的用户请求可能导致任务分发不均匀,导致资源浪费,不能很好的处理和响应
通过预先设定的分发策略,最大的尝试均匀分发业务,让每台处理机都有任务负载
代理服务器
代理服务器是一个以数据中转为主要职责的中间件,代理服务器可以将用户的请求中转给处理服务器机,也可以将结果反馈给用户,避免用户直接访问服务器主机,提高安全性(安全策略都可以部署在代理服务器中),还可以进行任务的控制与分发,例如负载均衡可以在代理服务器中完成
HA高可用性结构
某个处理机宕机,可用通过HA概念将数据任务转发给正常的处理机。察觉处理机异常,快速反应
线程池设计原则
epoll具有强大的socket监听能力,可以快速察觉所有套接字,线程池具有高并发处理能力,大量的用户请求可用快速处理。
1)提高线程重用性,线程不能与用户绑定,可重复为多个用户处理业务,避免频繁创建销毁线程,减少不必要的开销。
2)预创建,提前准备好部分线程待用,用户发送请求后直接选择线程处理,提高响应速度
3)线程管理策略,设定线程池阈值,通过阈值管理调度线程,线程扩容与缩减
4)为服务器提供并发处理能力,可以更快处理请求或业务
5)提高线程池的重用性,用户实现任务,线程池负责执行任务
6)线程池使用生产者消费者实现,任务传递模式
工作流程
线程池:
生产者将要处理的业务传递到任务队列中去,如果任务队列中有可获取的任务,消费者一直获取执行,生产者投递的任务不允许持续占用消费者。管理者会对消费者进行扫描,根据阈值检测是否需要扩容和缩减,对消费者进行创建或者杀死。
epoll模型:
生产者负责实现epoll模型,将事件转换成业务,投递到线程池中
线程池的扩容和缩减
使用线程池最小阈值min,作为扩容增减数量
扩容:当前任务量>=闲消费者数量 或者 忙线程数量占活线程数量的70%
缩减:当前线程数量+扩容量,小于最大线程阈值
消费者与管理者配合实现缩减
epoll的水平触发模式和边缘触发模式
epoll 的水平触发(Level Triggered, LT)和边缘触发(Edge Triggered, ET)是两种不同的事件通知机制,它们定义了 epoll 如何向应用程序报告文件描述符上的事件。
水平触发(LT)
在水平触发模式下,只要满足条件的事件仍然存在,epoll 就会重复通知这个事件。比如,如果一个文件描述符上有可读数据,那么只要没有读完,epoll_wait 就会不断报告该文件描述符是可读的。这种模式的特点是:
容错性较好,不易丢失事件。
更易于编程和理解。
可以用于多线程程序中,多个线程可以共享同一个 epoll 文件描述符。
缺点:开销大,往返在socket缓冲区和用户之间
在水平模式下,我们的epoll+线程池模型有问题,当第一轮事件监听未处理完毕,epoll_wait不会阻塞,当再次有客户端发送任务时epoll_wait立即返回,并且会对任务进行误添加。
可以以边缘触发模式监听socket的读事件来避免这种问题,node.events=EPOLL | EPOLLET
边缘触发(ET)
边缘触发模式下,事件只在状态变化时被通知一次,之后即使条件仍然满足,也不会再次通知,直到状态再次发生变化。例如,只有当新数据到达使得文件描述符从非可读变为可读时,epoll_wait 才会报告可读事件。边缘触发模式的特点是:
效率更高,因为它减少了事件的重复通知。
需要更加小心地处理每次通知,确保处理所有的数据,否则可能会丢失未处理完的数据。
更适合单线程或者每个线程使用独立 epoll 文件描述符的场景。
代码实现
文件结构:
makefile:
server.h
#ifndef __server_H__
#define __server_H__
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#define TIMEOUT 1
pthread_mutex_t lock;
typedef struct
{
void *(*bussiness)(void *);//任务函数指针
void *arg;
}bs_t;
typedef struct
{
int thread_shutdown;//线程池开关
int thread_max;//最大线程数
int thread_min;//最小线程数
int thread_alive;//有效线程数
int thread_busy;//忙线程数量
int kill_number;//缩减码
bs_t *queue;//任务队列
int front;
int rear;
int cur;
int max;
pthread_cond_t Not_Full;
pthread_cond_t Not_Empty;
pthread_t * ctids;//存储消费者ID
pthread_t mtid;//存储管理者ID
}pool_t;//线程池类型
pool_t * thread_pool_create(int Max,int Min,int Qmax);//线程池初始化
int Producer_add_task(pool_t * p,bs_t bs);//生产者添加任务模块,执行一次添加一次任务
void *Customer_thread(void *arg);//消费者线程,参数为线程池地址
void *Manager_thread(void *arg);//管理者线程,参数为线程池地址
int thread_pool_destroy(pool_t *p);//销毁线程池
void * user_bussiness(void *arg);//自定义线程任务
int is_thread_alive(pthread_t pid);
thread_pool_create.c
#include "../include/server.h"
pool_t * thread_pool_create(int Max,int Min,int Qmax)
{
pool_t *ptr=NULL;
if((ptr=(pool_t*)malloc(sizeof(pool_t)))==NULL)
{
perror("thread_pool_create malloc pool failed");
exit(0);
}
ptr->thread_shutdown=1;
ptr->thread_max=Max;
ptr->thread_min=Min;
ptr->thread_alive=0;
ptr->kill_number=0;
ptr->thread_busy=0;
if((ptr->queue=(bs_t*)malloc(sizeof(bs_t)*Qmax))==NULL)
{
perror("thread_pool_create malloc queue failed");
exit(0);
}
if((ptr->ctids=(pthread_t*)malloc(sizeof(pthread_t)*Max))==NULL)
{
perror("thread_pool_create malloc ctids failed");
exit(0);
}
ptr->front=0;
ptr->rear=0;
ptr->cur=0;
ptr->max=Qmax;
if(pthread_cond_init(&ptr->Not_Full,NULL)!=0 || pthread_cond_init(&ptr->Not_Empty,NULL)!=0|| pthread_mutex_init(&lock,NULL)!=0)
{
printf("thread pool create failed,init Cond or Lock Failed\n");
exit(0);
}
int err;
for(int i=0;i<Min;++i)
{
if((err=pthread_create(&ptr->ctids[i],NULL,Customer_thread,(void*)ptr))!=0)
{
printf("thread pool create failed,customer thread create failed:%s",
strerror(err));
exit(0);
}
++ptr->thread_alive;
}
if((err=pthread_create(&ptr->mtid,NULL,Manager_thread,(void*)ptr))!=0)
{
printf("thread pool create failed,manager thread create failed:%s",strerror(err));
exit(0);
}
pthread_create(&ptid,NULL,print_thread,(void*)ptr);
printf("Print Thread Create Success...\n");
return ptr;
}
thread_pool_destroy.c
#include "../include/server.h"
int thread_pool_destroy(pool_t *p)
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&p->Not_Full);
pthread_cond_destroy(&p->Not_Empty);
free(p->ctids);
free(p->queue);
free(p);
return 0;
}
Epoll_Listen.c
#include "../include/server.h"
int Epoll_Listen(int serverfd,pool_t * p)
{
struct epoll_event ready_array[EPOLLMAX];
int ready,flag;
bs_t tmp;
printf("Epoll_Thread Server,Epoll_Listen Running...\n");
while(p->thread_shutdown)
{
if((ready=epoll_wait(epfd,ready_array,EPOLLMAX,-1))==-1)
{
perror("Epoll_Listen call failed,epoll_wait call failed");
exit(0);
}
flag=0;
while(ready)
{
if(ready_array[flag].data.fd==serverfd)
{
tmp.business=Business_Accept;
tmp.arg=(void *)&serverfd;
Producer_add_task(p,tmp);
}
else
{
tmp.business=Business_Retime;
tmp.arg=((void*)&ready_array[flag].data.fd);
Producer_add_task(p,tmp);
}
++flag;
--ready;
}
}
close(serverfd);
return 0;
}
Manager_thread.c
#include "../include/server.h"
void * Manager_thread(void *arg)
{
pthread_detach(pthread_self());
pool_t *p=(pool_t *)arg;
int alive,cur,busy;
pthread_mutex_lock(&lock);
alive=p->thread_alive;
busy=p->thread_busy;
cur=p->cur;
pthread_mutex_unlock(&lock);
int add,flag;
//持续执行
while(p->thread_shutdown)
{
if((cur>=alive-busy||(double)busy/alive*100>=70)&&alive+p->thread_min<=p->thread_max)
{
for(flag=0,add=0;flag<p->thread_max&&add<p->thread_min;flag++)
{
if(p->ctids[flag]==0 || !is_thread_alive(p->ctids[flag]))
{
pthread_create(&p->ctids[flag],NULL,Customer_thread,(void*)p);
add++;
pthread_mutex_lock(&lock);
++(p->thread_alive);
pthread_mutex_unlock(&lock);
}
}
pthread_kill(ptid,SIGUSR1);
}
if(busy*2<=alive-busy && alive-p->thread_min >= p->thread_min)
{
printf("%d\n",p->thread_min);
pthread_mutex_lock(&lock);
p->kill_number=p->thread_min;
pthread_mutex_unlock(&lock);
for(int i=0;i<p->thread_min;++i)
{
pthread_cond_signal(&p->Not_Empty);
}
}
sleep(TIMEOUT);
}
printf("Thread shutdown 0,manager thread[0x%x]exiting...\n",
(unsigned int)pthread_self());
pthread_exit(NULL);
}
Business_Retime.c
#include "../include/server.h"
void * Business_Retime(void *arg)
{
int toupper_flag=0;
char recv_buf[1024];
bzero(recv_buf,sizeof(recv_buf));
char time_buf[100];
bzero(time_buf,sizeof(time_buf));
time_t tp;
int recvlen;
int sockfd=*(int*)arg;
printf("111\n");
while((recvlen=recv(sockfd,recv_buf,sizeof(recv_buf),MSG_DONTWAIT))==-1)
{
if(errno==EINTR)
break;
perror("Business_Retime recv call failed");
exit(0);
}
if(recvlen>0)
{
if (strcmp(recv_buf, "localtime") == 0) {
tp = time(NULL); // 获取时间种子
ctime_r(&tp, time_buf);
time_buf[strcspn(time_buf, "\n")] = '\0';
send(sockfd, time_buf, strlen(time_buf) + 1, MSG_NOSIGNAL);
bzero(time_buf, sizeof(time_buf));
}
else
{
toupper_flag = 0;
while (recvlen > toupper_flag)
{
recv_buf[toupper_flag] = toupper(recv_buf[toupper_flag]);
++toupper_flag;
}
send(sockfd, recv_buf, recvlen, MSG_NOSIGNAL);
bzero(recv_buf, sizeof(recv_buf));
}
}
else if(recvlen==0)
{
close(sockfd);
epoll_ctl(epfd,EPOLL_CTL_DEL,sockfd,NULL);
}
return NULL;
}
Epoll_Create.c
#include "../include/server.h"
int Epoll_Create(int serverfd)
{
int epfd;
if((epfd=epoll_create(EPOLLMAX))==-1)
{
perror("Epoll_Create call failed,epoll_create call failed");
exit(0);
}
struct epoll_event node;
node.data.fd=serverfd;
node.events=EPOLLIN|EPOLLET;
if((epoll_ctl(epfd,EPOLL_CTL_ADD,serverfd,&node))==-1)
{
perror("Epoll_Create call failed,epoll_ctl call failed");
exit(0);
}
printf("Epoll_Server Epoll Create success...\n");
return epfd;
}
Business_Accept.c
#include "../include/server.h"
void *Business_Accept(void *arg)
{
struct sockaddr_in addr;
socklen_t addrlen;
int sockfd=*(int *)arg;
int customerfd;
struct epoll_event node;
char response[1024];
char ip[16];
bzero(response,sizeof(response));
bzero(ip,sizeof(ip));
addrlen=sizeof(addr);
if((customerfd=accept(sockfd,(struct sockaddr*)&addr,&addrlen))==-1)
{
perror("Business_Accept accept call failed");
exit(0);
}
inet_ntop(AF_INET,&addr.sin_addr.s_addr,ip,16);
node.data.fd=customerfd;
node.events=EPOLLIN|EPOLLET;
if(epoll_ctl(epfd,EPOLL_CTL_ADD,customerfd,&node)==-1)
{
perror("Business_Accept epoll_ctl call failed");
exit(0);
}
sprintf(response,"hi Thread [%s] welcome to epoll demo",ip);
send(customerfd,response,strlen(response),MSG_NOSIGNAL);
return NULL;
}
print_thread.c
#include "../include/server.h"
void *print_thread(void*arg)
{
pthread_detach(pthread_self());
pool_t *ptr=(pool_t*)arg;
PTR=ptr;
struct sigaction act,oact;
act.sa_handler=sig_usr;
act.sa_flags=0;
sigemptyset(&act.sa_mask);
sigaction(SIGUSR1,&act,&oact);//设置捕捉
sigprocmask(SIG_SETMASK,&act.sa_mask,NULL);//解除屏蔽
while(ptr->thread_shutdown)
sleep(TIMEOUT);//等待信号
pthread_exit(NULL);
}
sig_usr.c
#include "../include/server.h"
void sig_usr(int n)
{
//显示一次阈值信息
printf("[Thread_Epoll_Server Info] alive[%d] busy[%d] Idel[%d] Cur[%d] Busy/Alive[%.2f%%] Alive/max[%.2f%%]\n",
PTR->thread_alive,PTR->thread_busy,PTR->thread_alive-PTR->thread_busy,PTR->cur,(double)PTR->thread_busy/PTR->thread_alive*100,(double)PTR->thread_alive/PTR->thread_max*100);
}
Producer_add_task.c
#include "../include/server.h"
int Producer_add_task(pool_t *p,bs_t bs)
{
if(p->thread_shutdown)
{
//上锁
pthread_mutex_lock(&lock);
while(p->cur==p->max)
{
pthread_cond_wait(&p->Not_Full,&lock);
if(!p->thread_shutdown)
{
pthread_mutex_unlock(&lock);
printf("thread shutdown 0,exiting...\n");
pthread_exit(NULL);
}
}
//添加一个业务
p->queue[p->front].business=bs.business;
p->queue[p->front].arg=bs.arg;
++p->cur;
p->front=(p->front+1)%p->max;
//解锁
pthread_mutex_unlock(&lock);
pthread_kill(ptid,SIGUSR1);
//唤醒一个消费者
pthread_cond_signal(&p->Not_Empty);
}
else
{
printf("thread shutdown 0,exiting...\n");
pthread_exit(NULL);
}
printf("Producer Thread [0x%x] Add Task Successfully,business_addr=%p\n",
(unsigned int)pthread_self(),bs.business);
return 0;
}
Net_init.c
#include "../include/server.h"
int Net_init(void)
{
int sockfd;
struct sockaddr_in sockAddr;
bzero(&sockAddr,sizeof(sockAddr));
sockAddr.sin_family=AF_INET;
sockAddr.sin_port=htons(8080);
sockAddr.sin_addr.s_addr=htonl(INADDR_ANY);
if((sockfd=socket(AF_INET,SOCK_STREAM,0))==-1)
{
perror("socket call failed");
exit(0);
}
if(bind(sockfd,(struct sockaddr*)&sockAddr,sizeof(sockAddr))==-1)
{
perror("bind call failed");
exit(0);
}
listen(sockfd,BACKLOG);
printf("Epoll_thread Server Net init Success...\n");
return sockfd;
}
is_thread_alive.c
#include "../include/server.h"
int is_thread_alive(pthread_t tid)
{
pthread_kill(tid,0);
if(errno==ESRCH)
return 0;
return 1;
}
main.c
#include "../include/server.h"
int main(void)
{
//主线程设置对SIGUSR1信号的屏蔽,继承给所有线程
sigset_t set,oset;
sigemptyset(&set);
sigaddset(&set,SIGUSR1);
sigprocmask(SIG_SETMASK,&set,&oset);
//启动接口start
int sockfd=Net_init();
epfd=Epoll_Create(sockfd);
pool_t *ptr=thread_pool_create(300,10,1000);
Epoll_Listen(sockfd,ptr);
if(!ptr->thread_shutdown)
thread_pool_destroy(ptr);
printf("Epoll Server Closing...\n");
return 0;
}
运行结果
在服务端刚启动后,有13个线程,一个生产者,十个消费者,一个管理者,一个输出线程