网络套接字(TCP)
- 一.编写TCP服务器
- 二.编写Tcp客户端
- 三.多进程
- 四.多线程版本
- 五.线程池版+完整源代码
- 六.使用示例
一.编写TCP服务器
1.先搭一个架子
2.创建sockfd
domain参数依然是AF_INET(因为是IPV4)
type方式选择SOCK_STREAM(提供可靠的连接方式)
protocol协议类型默认缺省0
当然创建套接字是有可能失败的,所以这里加了一个日志(lg)文件,如果失败就打印日志(下面也会有该日志的完整源代码),注意:也可以直接打印在显示器上,但用日志更规范。
3创建并填充结构体
这里需要注意的是,填port时要使用htons函数转换成网络字节序(也就是大端),同时ip也要从字符串类型转换成int类型(有多种函数,这里选择inet_aton)。具体原因可以看上一篇博客(Udp)。
4.绑定内核
参数:1.绑定的套接字,2.绑定的结构体,3.结构体大小。
5.进行监听
TCP与UDP不同的是,它是面向连接的,服务器一般是比较“被动”的,服务器一直要处于等待连接的状态,我们一般称为监听。
6.对main函数进行构建,使用命令行传参方式传入端口号
7.接着编写Start部分代码
主要分为两步:1.获取新连接,2.根据新连接来通信。
这里的三个参数不必多说,关键在它的返回值也是一个套接字。它与最开始创建的套接字的功能划分不同,返回的这个套接字才是主要提供服务的。而最开始的套接字是帮助创建监听的,所以一般称它为listensock_。
8.编写服务代码
由于Tcp是面向字节流的,所以我们可以直接使用文件操作函数进行操作。
9.进行阶段性测试
由于还没有写客户端,所以我们可以使用telnet,它可以指定服务远程登陆,ip:127.0.0.1表示本地环回,端口号8888。如果没有该命令,可以yum -y install telnet。注意:下面我将Main.cc文件编译成了Tcpserver,启动该文件,在后面跟上端口号8888.
接着ctrl+6+],然后回车,就可以发消息了。
退出:ctrl+6+],输入quit。
二.编写Tcp客户端
1.创建套接字
2.绑定
客户端需要进行绑定吗?答案是需要的,但是不需要像服务器一样显示的绑定。因为无论是Udp还是Tcp,它都是socket进行通信,一定需要ip+port来进行区别,这样多个主机间才能进行区分。对于服务器来说,这个端口号要确定,因为其它客户端要对服务器发消息,就需要知道服务器的端口号。对于客户端来说,这个端口号是多少不重要,重要的是唯一,因为客户端对服务器发消息后,服务器就能通过结构体知道你的端口号和ip,所以客户端不需要显示的绑定。
Udp是在第一次发消息时绑定,而Tcp在发起connect时进行绑定。
在生活中,我们通过各种广告推广或者商店来获取服务器的ip和port,在这里我们修改一下main函数来收到传入服务器的ip和port。
3.测试
这样算是完成了一个框架,但有一个问题是目前是单进程,也就是说同时只能有一个客户端,很明显不符合预期,接下来改成多进程版。
三.多进程
首先为什么要close(listensockfd_)呢?因为我们只需要父进程一个提供监听功能就行了。为什么要close(sockfd)呢?因为我们将具体的服务功能交给了子进程,所以父进程就不需要这个fd了。当然这并没有完成多进程版,因为父进程会一直阻塞,有多种解决办法,这里选择再创建孙子进程,让子进程直接释放,孙子进程交给OS管理,这样父进程就不会阻塞了。(不了解的可以看看博客进程等待)
但是这种创建进程的成本实在太高了,一般连个三四十个就无了,所以为了高性能,我们可以搞一个多线程版本。
四.多线程版本
在创建线程后,我们使用pthread_detach,将主线程和其它线程分离,这样在其它线程提供服务时,主线程依然能自主运行。(不了解该函数的可以看博客线程(二))
对于线程创建不熟悉的可以看看博客线程(一),这里因为线程函数是在类内创建的,会自带一个this指针,不符合参数要求,所以改成静态的。接下来给线程函数进行传参。(由于线程函数是静态的,不能直接使用类内函数,所以我们可以将Tcpserver类一并当参数传入)
显然创建线程依然有代价,服务器在运行时一直有用户进入退出,所以会不断创建销毁,那么我们如何将线程多次利用呢?下面来线程池版。
五.线程池版+完整源代码
具体线程池代码如何写,可以看我的博客线程池,这里就直接使用了。简单来说就是向线程池里不断push任务,然后多个线程去竞争任务。 可以使用该命令检测线程个数ps -aL | head -1 && ps -aL | grep 程序名。
注意:为了保证服务器不被长时间连接,当前服务是只提供一次,使用客户端发消息时,每发一条就会重新建立一个连接。
makefile
.PHONY:all
all:Tcpserver Tcpclient
Tcpserver:Main.cc
g++ -o $@ $^ -std=c++11 -lpthread
Tcpclient:Tcpclient.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f Tcpclient Tcpserver
log.hpp
#pragma once
#include <iostream>
#include <time.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#define SIZE 1024
#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4
#define Screen 1
#define Onefile 2
#define Classfile 3
#define LogFile "log.txt"
class Log
{
public:
Log()
{
printMethod = Screen;
path = "./log/";
}
void Enable(int method)
{
printMethod = method;
}
std::string levelToString(int level)
{
switch (level)
{
case Info:
return "Info";
case Debug:
return "Debug";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatal:
return "Fatal";
default:
return "None";
}
}
void printLog(int level, const std::string &logtxt)
{
switch (printMethod)
{
case Screen:
std::cout << logtxt << std::endl;
break;
case Onefile:
printOneFile(LogFile, logtxt);
break;
case Classfile:
printClassFile(level, logtxt);
break;
default:
break;
}
}
void printOneFile(const std::string &logname, const std::string &logtxt)
{
std::string _logname = path + logname;
int fd = open(_logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666); // "log.txt"
if (fd < 0)
return;
write(fd, logtxt.c_str(), logtxt.size());
close(fd);
}
void printClassFile(int level, const std::string &logtxt)
{
std::string filename = LogFile;
filename += ".";
filename += levelToString(level); // "log.txt.Debug/Warning/Fatal"
printOneFile(filename, logtxt);
}
~Log()
{
}
void operator()(int level, const char *format, ...)
{
time_t t = time(nullptr);
struct tm *ctime = localtime(&t);
char leftbuffer[SIZE];
snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
ctime->tm_hour, ctime->tm_min, ctime->tm_sec);
va_list s;
va_start(s, format);
char rightbuffer[SIZE];
vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
va_end(s);
// 格式:默认部分+自定义部分
char logtxt[SIZE * 2];
snprintf(logtxt, sizeof(logtxt), "%s %s", leftbuffer, rightbuffer);
// printf("%s", logtxt); // 暂时打印
printLog(level, logtxt);
}
private:
int printMethod;
std::string path;
};
Log lg;
ThreadPool.hpp
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>
struct ThreadInfo
{
pthread_t tid;
std::string name;
};
static const int defalutnum = 5;
template <class T>
class ThreadPool
{
public:
void Lock()
{
pthread_mutex_lock(&mutex_);
}
void Unlock()
{
pthread_mutex_unlock(&mutex_);
}
void Wakeup()
{
pthread_cond_signal(&cond_);
}
void ThreadSleep()
{
pthread_cond_wait(&cond_, &mutex_);
}
bool IsQueueEmpty()
{
return tasks_.empty();
}
std::string GetThreadName(pthread_t tid)
{
for (const auto &ti : threads_)
{
if (ti.tid == tid)
return ti.name;
}
return "None";
}
public:
static void *HandlerTask(void *args)
{
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
std::string name = tp->GetThreadName(pthread_self());
while (true)
{
tp->Lock();
while (tp->IsQueueEmpty())
{
tp->ThreadSleep();
}
T t = tp->Pop();
tp->Unlock();
t();
}
}
void Start()
{
int num = threads_.size();
for (int i = 0; i < num; i++)
{
threads_[i].name = "thread-" + std::to_string(i + 1);
pthread_create(&(threads_[i].tid), nullptr, HandlerTask, this);
}
}
T Pop()
{
T t = tasks_.front();
tasks_.pop();
return t;
}
void Push(const T &t)
{
Lock();
tasks_.push(t);
Wakeup();
Unlock();
}
static ThreadPool<T> *GetInstance()
{
if (nullptr == tp_) // ???
{
pthread_mutex_lock(&lock_);
if (nullptr == tp_)
{
std::cout << "log: singleton create done first!" << std::endl;
tp_ = new ThreadPool<T>();
}
pthread_mutex_unlock(&lock_);
}
return tp_;
}
private:
ThreadPool(int num = defalutnum) : threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
ThreadPool(const ThreadPool<T> &) = delete;
const ThreadPool<T> &operator=(const ThreadPool<T> &) = delete; // a=b=c
private:
std::vector<ThreadInfo> threads_;
std::queue<T> tasks_;
pthread_mutex_t mutex_;
pthread_cond_t cond_;
static ThreadPool<T> *tp_;
static pthread_mutex_t lock_;
};
template <class T>
ThreadPool<T> *ThreadPool<T>::tp_ = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;
Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <string.h>
#include "log.hpp"
extern Log lg;
class Task
{
public:
Task(int sockfd, const std::string &clientip, const uint16_t &clientport)
: sockfd_(sockfd), clientip_(clientip), clientport_(clientport)
{
}
Task()
{
}
void run()
{
// 测试代码,收一个消息再发回去
char buffer[4096];
ssize_t n = read(sockfd_, buffer, sizeof(buffer));
if (n > 0)
{
buffer[n] = 0;
std::cout << "client say#" << buffer << std::endl;
std::string echo_string = "tcpserver echo#";
echo_string += buffer;
n=write(sockfd_, echo_string.c_str(), echo_string.size());
if(n<0)
{
lg(Warning, "write error,errorno:%d,errstring:%s", errno, strerror(errno));
}
}
else if (n == 0) // 结束服务
{
lg(Info, "quit,sever close sockfd:%d", sockfd_);
}
else // 出错
{
lg(Warning, "read error,sockfd:%d,client port:%d,client ip:%s", sockfd_, clientport_, clientip_.c_str());
}
close(sockfd_);
}
void operator()()
{
run();
}
~Task()
{
}
private:
int sockfd_;
std::string clientip_;
uint16_t clientport_;
};
Tcpserver.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <signal.h>
#include "Task.hpp"
#include "log.hpp"
#include "ThreadPool.hpp"
const int defaultfd = -1;
const std::string defaultip = "0.0.0.0";
const int backlog = 10; // 一般不要设置太大
// 枚举错误代码,方便查阅
enum
{
UseageError = 1,
SocketError,
BindError,
ListenError,
};
// //线程传参
// class ThreadData
// {
// public:
// ThreadData(int fd,const std::string &ip,uint16_t &p,TcpServer*tp): sockfd(fd),clientip(ip),clientport(p),tsvr(tp)
// {}
// public:
// int sockfd;
// std::string clientip;
// uint16_t clientport;
// TcpServer* tsvr;
// };
class TcpServer
{
public:
TcpServer(const uint16_t &port, const std::string &ip = defaultip) : listensockfd_(defaultfd), port_(port), ip_(ip)
{
}
void InitServer()
{
listensockfd_ = socket(AF_INET, SOCK_STREAM, 0);
if (listensockfd_ < 0) // 创建失败
{
lg(Fatal, "create listensockfd,errno:%d,errstring:%s", errno, strerror(errno));
exit(SocketError);
}
// 如果成功,打印成功信息
lg(Info, "create listensockfd sucess:%d", listensockfd_);
// 创建并填充结构体
struct sockaddr_in local; // 创建结构体
memset(&local, 0, sizeof(local)); // 将内部清零
local.sin_family = AF_INET; // 填充类型
local.sin_port = htons(port_); // 填充端口号
inet_aton(ip_.c_str(), &(local.sin_addr)); // 填充ip地址
// 绑定
if (bind(listensockfd_, (struct sockaddr *)&local, sizeof(local))) // 如果绑定失败
{
lg(Fatal, "bind listensockfd,errno:%d,errstring:%s", errno, strerror(errno));
exit(BindError);
}
// 监听
if (listen(listensockfd_, backlog) < 0) // 如果失败
{
lg(Fatal, "listen error,errno:%d,errstring:%s", errno, strerror(errno));
exit(ListenError);
}
lg(Info, "listen sucess,listensockfd:%d", listensockfd_);
}
// //线程函数
// static void* Routine(void*args)
// {
// pthread_detach(pthread_self());//分离线程
// ThreadData*td=static_cast<ThreadData*>(args);
// td->tsvr->Serverce(td->sockfd,td->clientip,td->clientport);//提供服务
// delete td;
// return nullptr;
// }
void Start()
{
signal(SIGPIPE,SIG_IGN);
ThreadPool<Task>::GetInstance()->Start();//启动线程池
lg(Info, "tcpServer if running.....");
while (true)
{
// 1.获取新连接
struct sockaddr_in client;
socklen_t len = sizeof(client);
int sockfd = accept(listensockfd_, (struct sockaddr *)&client, &len);
if (sockfd < 0)
{
lg(Warning, "accept error,errorno:%d,errstring:%s", errno, strerror(errno));
continue;
}
// 接收客户端的port和ip
uint16_t clientport = ntohs(client.sin_port);
char clientip[32];
inet_ntop(AF_INET, &(client.sin_addr), clientip, sizeof(clientip));
// 2.根据新连接进行通信
lg(Info, "get a new link,sockfd:%d,client port:%d,client ip:%s", sockfd, clientport, clientip);
// 3.服务
//3.1 单进程版
//Serverce(sockfd, clientip, clientport);
//close(sockfd);
//3.2 多进程版
// pid_t id=fork();
// if(id==0)//子进程
// {
// close(listensockfd_);
// if(fork()>0) exit(0);//创建孙子进程,释放子进程
// Serverce(sockfd, clientip, clientport);//孙子进程提供服务
// close(sockfd);//服务完成,关闭服务
// exit(0);//退出进程
// }
// //父进程
// close(sockfd);
// pid_t rid=waitpid(id,nullptr,0);
// (void)rid;
//3.3多线程版本
// ThreadData*td=new ThreadData(sockfd,clientip,clientport,this);
// pthread_t tid;
// pthread_create(&tid,nullptr,Routine,td);
//3.4线程池版本
Task t(sockfd, clientip, clientport);
ThreadPool<Task>::GetInstance()->Push(t);
}
}
// void Serverce(const int sockfd, const std::string &clientip, const uint16_t clientport)
// {
// // 测试代码,收一个消息再发回去
// while (true)
// {
// char buffer[4096];
// ssize_t n = read(sockfd, buffer, sizeof(buffer));
// if (n > 0)
// {
// buffer[n] = 0;
// std::cout << "buffer say#" << buffer << std::endl;
// std::string echo_string = "tcpserver echo#";
// echo_string += buffer;
// write(sockfd, echo_string.c_str(), echo_string.size());
// }
// else if(n==0)//结束服务
// {
// lg(Info,"quit,sever close sockfd:%d",sockfd);
// break;
// }
// else//出错
// {
// lg(Warning, "read error,sockfd:%d,client port:%d,client ip:%s", sockfd, clientport, clientip.c_str());
// break;
// }
// }
// }
~TcpServer()
{
}
private:
int listensockfd_; // 监听套接字
std::string ip_; // ip地址
uint16_t port_; // 端口号
};
Tcpclient.cc
#include <iostream>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
void Usage(char *proc)
{
std::cout << "\n\rUsage:" << proc << "severip serverport" << std::endl;
}
int main(int argc, char *argv[])
{
if (argc != 3)
{
Usage(argv[0]);
exit(1);
}
// 获取服务器的ip和port
std::string serverip = argv[1];
uint16_t serverport = std::stoi(argv[2]);
// 填充结构体
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(serverport);
inet_pton(AF_INET, serverip.c_str(), &(server.sin_addr));
while (true)
{
// 创建套接字
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
std::cerr << "socket error" << std::endl;
return 1;
}
// 进行连接
int n = connect(sockfd, (struct sockaddr *)&server, sizeof(server));
if (n < 0) // 如果连接失败
{
std::cerr << "connect error...." << std::endl;
return 2;
}
// 测试代码
std::string message;
std::cout << "Please Enter#";
getline(std::cin, message);
n = write(sockfd, message.c_str(), message.size());
if (n < 0)
{
std::cerr << "write error....." << std::endl;
continue;
}
char buffer[4096];
n = read(sockfd, buffer, sizeof(buffer));
if (n > 0)
{
buffer[n] = 0;
std::cout << buffer << std::endl;
}
close(sockfd);
}
return 0;
}
Main.cc
#include"Tcpserver.hpp"
#include<memory>
void Usage(std::string proc)
{
std::cout<<"\n\rUsage:"<<proc<<"port[1024+]\n"<<std::endl;
}
int main(int argc,char*argv[])
{
if(argc!=2)//如果传参数不对,提醒一下
{
Usage(argv[0]);
exit(UseageError);
}
uint16_t port=std::stoi(argv[1]);
std::unique_ptr<TcpServer> tcp_svr(new TcpServer(port));
tcp_svr->InitServer();
tcp_svr->Start();
return 0;
}