操作系统采用 <客户端IP : 客户端端口> : <服务端IP : 服务端端口> 四元组来标识一条TCP连接。
所以要想实现百万连接:
第一种是服务器端只开启一个进程,然后使用很多个客户端进程绑定不同的客户端 ip 来连接,假设 20个ip * 5w(端口范围是最大65535,这里保守算5w)
第二种是服务器开启多个进程,这样客户端就可以只使用一个 ip 即可,原理类似。
实验说明
第一个实验
只是简单的创建一个连接,并使用epoll监听它的读事件
第二的实验
1 使用任意多个线程,
2 每个线程创建任意数量的客户端
3 每个线程都有一个独立的epoll
4 各个线程管理各自的epoll
5 每个epoll管理各自所在线程创建的客户端
6 每个线程绑定一个独立的IP地址,这个IP地址是通过下面的代码创建的,这里也是使用一个进程能够创建百万连接的关键。
snprintf(pd->ip_str, sizeof(pd->ip_str),"192.168.0.%d",host_index); DEBUG_INFO("pd->ip_str = %s",pd->ip_str); snprintf(cmd_str, sizeof(cmd_str),"sudo ip addr add %s/24 dev ens33 1>/dev/null 2>&1",pd->ip_str); DEBUG_INFO("cmd_str = %s",cmd_str); system(cmd_str);
第一个实验:单客户端测试
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#define _DEBUG_INFO
#ifdef _DEBUG_INFO
#define DEBUG_INFO(format, ...) printf("%s:%s:%d -- "format"\n" \
,__FILE__,__func__,__LINE__ \
, ##__VA_ARGS__)
#else
#define DEBUG_INFO(format, ...)
#endif
#define _DEBUG_PRINT
#ifdef _DEBUG_PRINT
#define DEBUG_PRINT(format,...) printf(format,##__VA_ARGS__)
#else
#define DEBUG_PRINT(format,...)
#endif
struct epoll_event evlist[10];
static char buf[1024 + 1];
int main(int argc, char **argv)
{
int s;
s = socket(PF_INET, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0);
int connect(int sockfd, const struct sockaddr *addr,
socklen_t addrlen);
struct sockaddr_in server_addr;
server_addr.sin_port = htons(6600);
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr("192.168.0.5");
connect(s,(const struct sockaddr *)&server_addr,sizeof(server_addr));
int epfd = epoll_create(1);
struct epoll_event client_ev;
client_ev.data.fd = s;
client_ev.events = EPOLLIN;
epoll_ctl(epfd, EPOLL_CTL_ADD,s,&client_ev);
while(1){
int ready = epoll_wait(epfd,evlist,sizeof(evlist)/sizeof(evlist[0]),-1);
if(ready == -1){
if(errno == EINTR){
continue;
}
perror("epoll_wait");
exit(1);
}
for(int i=0;i<ready;i++){
int fd = evlist[i].data.fd;
DEBUG_INFO("fd = %d:events:%s%s%s",fd,
(evlist[i].events & EPOLLIN)?"EPOLLIN":"",
(evlist[i].events & EPOLLHUP)?"EPOLLHUP":"",
(evlist[i].events & EPOLLERR)?"EPOLLERR":"");
int read_len = read(fd,buf,sizeof(buf) - 1);
if(read_len == -1){
if(errno == EINTR){
continue;
}else{
perror("Read");
close(fd);
}
}
DEBUG_INFO("read_len: %d",read_len);
if(read_len == 0){
DEBUG_INFO("client disconnected");
close(fd);
}
buf[read_len] = '\0';
DEBUG_INFO("buf = %s",buf);
}
}
DEBUG_INFO("bye bye");
return 0;
}
使用调试助手创建服务器
使用调试助手发送数据,然后关闭服务器,测试结果:
第二个实验:客户端集群
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#define _DEBUG_INFO
#ifdef _DEBUG_INFO
#define DEBUG_INFO(format, ...) printf("%s:%s:%d -- "format"\n" \
,__FILE__,__func__,__LINE__ \
, ##__VA_ARGS__)
#else
#define DEBUG_INFO(format, ...)
#endif
#define _DEBUG_PRINT
#ifdef _DEBUG_PRINT
#define DEBUG_PRINT(format,...) printf(format,##__VA_ARGS__)
#else
#define DEBUG_PRINT(format,...)
#endif
// ip addr add 192.168.0.12/24 dev ens33
static char buf[1024 + 1];
#define DEFAULT_CLIENT_COUNT 20
#define HOST_NUM 40
struct private_data{
char ip_str[20];
char server_ip[20];
pthread_t thread_id;
int client_count;
struct epoll_event evlist[10000];
uint16_t port;
};
void * client_thread(void * arg){
struct epoll_event client_ev;
int epfd;
int client_socket;
struct private_data *pd = (struct private_data *)arg;
int res = 0;
epfd = epoll_create(1);
if(epfd < 0){
perror("epoll_create");
return NULL;
}
struct sockaddr_in server_addr;
server_addr.sin_port = htons(pd->port);
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr(pd->server_ip);//"192.168.0.5");
struct sockaddr_in client_addr;
client_addr.sin_port = 0;
client_addr.sin_family = AF_INET;
client_addr.sin_addr.s_addr = inet_addr(pd->ip_str);
for(int i = 0; i < pd->client_count; i++){
client_socket = socket(PF_INET, SOCK_STREAM | SOCK_CLOEXEC,0);// | SOCK_NONBLOCK, 0);
if(client_socket < 0){
perror("socket");
exit(1);
}
res = bind(client_socket,(const struct sockaddr *)&client_addr,sizeof(client_addr));
if(res < 0){
perror("bind");
exit(1);
}
res = connect(client_socket,(const struct sockaddr *)&server_addr,sizeof(server_addr));
if(res < 0){
perror("connect");
exit(1);
}
client_ev.data.fd = client_socket;
client_ev.events = EPOLLIN;
res = epoll_ctl(epfd, EPOLL_CTL_ADD,client_socket,&client_ev);
if(res < 0){
perror("connect");
exit(1);
}
}
DEBUG_INFO("%s Connect %s:%d count=%d",
pd->ip_str,
pd->server_ip,
pd->port,
pd->client_count);
while(1){
int ready = epoll_wait(epfd,pd->evlist,sizeof(pd->evlist)/sizeof(pd->evlist[0]),-1);
if(ready == -1){
if(errno == EINTR){
continue;
}
perror("epoll_wait");
exit(1);
}
DEBUG_INFO("ready %d",ready);
for(int i=0;i<ready;i++){
int fd = pd->evlist[i].data.fd;
DEBUG_INFO("fd = %d:events:%s %s %s",fd,
(pd->evlist[i].events & EPOLLIN)?"EPOLLIN":"",
(pd->evlist[i].events & EPOLLHUP)?"EPOLLHUP":"",
(pd->evlist[i].events & EPOLLERR)?"EPOLLERR":"");
int read_len = read(fd,buf,sizeof(buf) - 1);
if(read_len == -1){
if(errno == EINTR){
continue;
}else{
perror("Read");
epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);
close(fd);
continue;
}
}
DEBUG_INFO("read_len: %d",read_len);
if(read_len == 0){
DEBUG_INFO("client disconnected");
close(fd);
goto OUT;
}
buf[read_len] = '\0';
DEBUG_INFO("buf = %s",buf);
}
}
OUT:
DEBUG_INFO("bye bye");
close(epfd);
free(pd);
return NULL;
}
int main(int argc, char **argv)
{
int s;
int res = 0;
int client_count = 10;
char cmd_str[1024];
int host_min_num = 12;
int max_host_num = 2;//HOST_NUM
int opt;
char server_ip[20] = "192.168.0.5";
static int thread_count = 1;
int port = 6600;
while((opt = getopt(argc,argv,"c:r:t:p:")) != -1){
switch (opt)
{
case 'r':
DEBUG_INFO("option:%c,server ip = %s",opt,optarg);
memcpy(server_ip,optarg,strlen(optarg));
server_ip[strlen(optarg)] = '\0';
break;
case 'c':
DEBUG_INFO("option:%c client_count = %s",opt,optarg);
client_count = atoi(optarg);
break;
case 'p':
DEBUG_INFO("option:%c client_count = %s",opt,optarg);
port = atoi(optarg);
break;
case 't':
DEBUG_INFO("option:%c thread_count = %s",opt,optarg);
thread_count = atoi(optarg);
break;
case ':':
DEBUG_INFO("option:%c needs a value",opt);
exit(0);
break;
case '?':
DEBUG_INFO("unkown option:%c needs a value",optopt);
break;
default:
break;
}
}
DEBUG_INFO("client_count = %d",client_count);
DEBUG_INFO("server_ip = %s",server_ip);
DEBUG_INFO("thread_count = %d",thread_count);
//创建40个ip地址192.168.0.12 - 192.168.0.51
for(int host_index = host_min_num; host_index < host_min_num + thread_count;host_index++){
struct private_data *pd = (struct private_data*)malloc(sizeof(struct private_data));
if(pd == NULL){
perror("malloc");
exit(0);
}
snprintf(pd->ip_str, sizeof(pd->ip_str),"192.168.0.%d",host_index);
DEBUG_INFO("pd->ip_str = %s",pd->ip_str);
snprintf(cmd_str, sizeof(cmd_str),"sudo ip addr add %s/24 dev ens33 1>/dev/null 2>&1",pd->ip_str);
DEBUG_INFO("cmd_str = %s",cmd_str);
system(cmd_str);
pd->client_count = client_count;
pd->port = port;
memcpy(pd->server_ip, server_ip, sizeof(server_ip));
res = pthread_create(&pd->thread_id, NULL,client_thread,pd);
if(res != 0){
perror("pthread_create");
exit(1);
}
}
while(1){
sleep(1);
}
return 0;
}
编译并运行:
sudo ./_build_/client -c 4 -r 192.168.0.5 -t 3
参数说明:
-c:每个线程创建的客户端数量
-r:远程服务器IP地址
-t:创建的线程数量
服务器调试工具:
在服务器上看到了12个客户端
3个线程,没个线程4个客户端。
下次,写一个能够接收很多客户端的服务器
例如
sudo ./_build_/client -c 25000 -r 192.168.0.5 -t 40
40个线程,每个线程25000个客户端。