TCP简介
TCP(Transmission Control Protocol)是一种面向连接的、可靠的、基于字节流的传输层通信协议。它位于OSI模型的第四层,主要为应用层提供数据传输服务。TCP通过三次握手建立连接,确保数据在发送和接收过程中的准确性和顺序性。
TCP的主要特点
- 可靠性:TCP通过序列号、确认应答、超时重传等机制保证数据可靠传输。
- 面向连接:通信双方在传输数据前需要建立连接,通信结束后释放连接。
- 流量控制:通过滑动窗口机制,TCP可以控制数据的发送速度,避免接收方缓冲区溢出。
- 拥塞控制:TCP可以根据网络状况调整发送速率,减少网络拥塞。
- 全双工通信:TCP连接允许数据在两个方向上同时传输。
TCP与UDP的比较
与TCP不同,用户数据报协议(UDP)是无连接的、不可靠的传输层协议。UDP适用于对实时性要求高、但可以容忍少量数据丢失的应用,如视频通话、在线游戏等。而TCP则适用于对数据完整性要求高的应用,如文件传输、电子邮件等。
函数介绍
socket()
int socket(int domain, int type, int protocol);
- 功能:创建一个新的套接字。
- 参数说明:
domain
:指定协议族,如AF_INET
(IPv4)或AF_INET6
(IPv6)。type
:指定套接字类型,如SOCK_STREAM
(TCP)或SOCK_DGRAM
(UDP),由于使用TCP协议,所以使用SOCK_STREAM,表示面向流的传输协议。protocol
:指定协议类型,通常为 0,表示使用默认协议。
- 返回值:成功时返回一个新的套接字描述符,失败时返回 -1。该套接字描述符本质与文件描述符一样,应用程序可以像读写文件一样用 read/write 在网络上收发数据。
bind()
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
- 功能:将套接字绑定到指定的地址和端口。服务器程序所监听的网络地址和端口号通常是固定不变的,客户端程序得知服务器程序的地址和端口号后就可以向服务器发起连接; 服务器需要调用 bind 绑定一个固定的网络地址和端口号,客户端无需手动绑定。
- 参数说明:
sockfd
:由socket()
函数返回的套接字描述符。addr
:指向包含地址和端口信息的sockaddr
结构的指针。addrlen
:sockaddr
结构的长度。由于struct sockaddr *是一个通用指针类型,addr 参数实际上可以接受多种协议的 sockaddr 结构体,而它们的长度各不相同,所以需要第三个参数 addrlen指定结构体的长度。
我们的程序中对
addr
参数是这样初始化的
:
struct sockaddr_in socket;
bzero(&socket,sizeof(socket)); //将整个结构体清零
socket.sin_family=AF_INET; //设置地址类型为 AF_INET
socket.sin_port=htons(SERVER_PORT); //设置端口号,如8080
socket.sin_addr.s_addr=inet_addr(SERVER_IP); //设置IP地址,
//如"127.0.0.1",如果地址为 INADDR_ANY, 这个宏表示本地的任意 IP
//地址,因为服务器可能有多个网卡,每个网卡也可能绑定多个 IP 地址,
//这样设置可以在所有的 IP 地址上监听, 直到与某个客户端建立了连接
//时才确定下来到底用哪个 IP 地址
- 返回值:成功时返回 0,失败时返回 -1。
listen()
int listen(int sockfd, int backlog);
- 功能:服务器开始监听连接请求。
- 参数说明:
sockfd
:由socket()
函数返回的套接字描述符。backlog
:指定连接请求队列的最大长度,最多允许有 backlog 个客户端处于连接等待状态, 如果接收到更多的连接请求就忽略。
- 返回值:成功时返回 0,失败时返回 -1。
int _listensockfd = socket(AF_INET, SOCK_STREAM, 0);// 创建socket
struct sockaddr_in socket;
bzero(&socket,sizeof(socket)); //将整个结构体清零
socket.sin_family=AF_INET; //设置地址类型为 AF_INET
socket.sin_port=htons(SERVER_PORT); //设置端口号,如8080
socket.sin_addr.s_addr=inet_addr(SERVER_IP);
int ret = bind(_listensockfd, (struct sockaddr *)&socket, sizeof(socket));
ret = listen(_listensockfd, BACKLOG);// 将socket设置为监听状态
accept()
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
- 功能:接受客户端的连接请求。
- 参数说明:
sockfd
:由socket()
函数返回的套接字描述符。addr
:指向用于存储客户端地址信息的sockaddr
结构的指针,如果给 addr 参数传NULL,表示不关心客户端的地址。addrlen
:指向sockaddr
结构长度的指针。
- 返回值:成功时返回一个新的套接字描述符,代表与客户端的连接,之后服务器和客户端之间的交流就通过该返回值进行,失败时返回 -1。
struct sockaddr_in peer;
socklen_t peerlen = sizeof(peer);
int sockfd = accept(_listensockfd, (struct sockaddr *)&peer, &peerlen);
connect()
int connect(int sockfd, const struct sockaddr *serv_addr, socklen_t addrlen);
- 功能:客户端连接到指定的服务器,connect 和 bind 的参数形式一致, 区别在于 bind 的参数是自己的地址, 而connect 的参数是对方的地址。
- 参数说明:
sockfd
:由socket()
函数返回的套接字描述符。serv_addr
:指向包含服务器地址信息的sockaddr
结构的指针。addrlen
:sockaddr
结构的长度。
- 返回值:成功时返回 0,失败时返回 -1。
// 填写网络信息
struct sockaddr_in server_addr;
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(server_port);
server_addr.sin_addr.s_addr = inet_addr(server_ip.c_str());
// client 无需显示bind,connect连接时自动bind
int n = connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr));
send()
ssize_t send(int sockfd, const void *buf, size_t len, int flags);
- 功能:发送数据到已连接的套接字。
- 参数说明:
sockfd
:由socket()
函数返回的套接字描述符。buf
:指向要发送数据的缓冲区的指针。len
:要发送的数据的长度。flags
:通常为 0,表示默认的发送行为。
- 返回值:成功时返回发送的字节数,失败时返回 -1。
recv()
ssize_t recv(int sockfd, void *buf, size_t len, int flags);
- 功能:从已连接的套接字接收数据。
- 参数说明:
sockfd
:由socket()
函数返回的套接字描述符。buf
:指向用于接收数据的缓冲区的指针。len
:缓冲区的长度。flags
:通常为 0,表示默认的接收行为。
- 返回值:成功时返回接收的字节数,失败时返回 -1。
close()
int close(int sockfd);
- 功能:关闭套接字。
- 参数说明:
sockfd
:由socket()
函数返回的套接字描述符。
- 返回值:成功时返回 0,失败时返回 -1。
案例 多线程远程命令执行
makefile
all: server client
server:TcpServermain.cc
g++ -o $@ $^ -std=c++17 -lpthread
client:TcpClient.cc
g++ -o $@ $^ -std=c++17
.PHONY:clean
clean:
rm -f server client
Mutex.hpp
#pragma once
#include <iostream>
#include <pthread.h>
using namespace std;
class Mutex
{
public:
Mutex(const Mutex&)=delete;
const Mutex& operator=(const Mutex&)=delete;
Mutex()
{
pthread_mutex_init(&_lock,nullptr);
}
~Mutex()
{
pthread_mutex_destroy(&_lock);
}
void Lock()
{
pthread_mutex_lock(&_lock);
}
pthread_mutex_t * LockPtr()
{
return &_lock;
}
void Unlock()
{
pthread_mutex_unlock(&_lock);
}
private:
pthread_mutex_t _lock;
};
class LockGuard
{
public:
LockGuard(Mutex& m)
:_mutex(m)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex& _mutex;
};
Cond.hpp
#pragma once
#include"Mutex.hpp"
class Cond
{
public:
Cond()
{
pthread_cond_init(&_cond,nullptr);
}
~Cond()
{
pthread_cond_destroy(&_cond);
}
void Wait(Mutex& mutex)
{
pthread_cond_wait(&_cond,mutex.LockPtr());
}
void Notify()
{
pthread_cond_signal(&_cond);
}
void NotifyAll()
{
pthread_cond_broadcast(&_cond);
}
private:
pthread_cond_t _cond;
};
Thread.hpp
#pragma once
#include <pthread.h>
#include <iostream>
#include <functional>
#include <string>
#include <unistd.h>
using namespace std;
using func_t = function<void(string)>;
static int number = 1;
enum STATUS
{
NEW,
RUNNING,
STOP
};
class Thread
{
private:
static void *Routine(void *arg)
{
Thread *t = static_cast<Thread *>(arg);
t->_func(t->_name);
return nullptr;
}
public:
Thread(func_t func)
: _func(func), _status(NEW), _joinable(true)
{
_name = "Thread-" + to_string(number++);
_pid = getpid();
}
bool Start()
{
if (_status != RUNNING)
{
_status = RUNNING;
int n = pthread_create(&_tid, nullptr, Routine, this);
if (n != 0)
{
return false;
}
return true;
}
return false;
}
bool Stop()
{
if (_status == RUNNING)
{
_status = STOP;
int n = pthread_cancel(_tid);
if (n != 0)
{
return false;
}
return true;
}
return false;
}
bool Join()
{
if (_joinable)
{
_status = STOP;
int n = pthread_join(_tid, nullptr);
if (n != 0)
{
return false;
}
return true;
}
return false;
}
void Detach()
{
_joinable = false;
pthread_detach(_tid);
}
string Name()
{
return _name;
}
private:
string _name;
pthread_t _tid;
pid_t _pid;
STATUS _status;
bool _joinable;
func_t _func;
};
ThreadPool.hpp
#pragma once
#include <iostream>
#include <string>
#include <queue>
#include <vector>
#include <memory>
#include "Mutex.hpp"
#include "Cond.hpp"
#include "Thread.hpp"
using thread_t = shared_ptr<Thread>;
const static int defaultnum = 5;
template <class T>
class ThreadPool
{
private:
bool IsEmpty() { return _taskq.empty(); }
void HandlerTask(string name)
{
cout << "线程: " << name << ", 进入HandlerTask的逻辑" << endl;
while (true)
{
// 1. 拿任务
T t;
{
LockGuard lockguard(_lock);
while (IsEmpty() && _isrunning)
{
_wait_num++;
_cond.Wait(_lock);
_wait_num--;
}
// 2. 任务队列为空 && 线程池退出了
if (IsEmpty() && !_isrunning)
break;
t = _taskq.front();
_taskq.pop();
}
// 2. 处理任务
t(); // 规定,未来所有的任务处理,全部都是必须提供t()方法!
}
cout << "线程: " << name << " 退出";
}
ThreadPool(const ThreadPool<T> &) = delete;
ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
ThreadPool(int num = defaultnum) : _num(num), _wait_num(0), _isrunning(false)
{
for (int i = 0; i < _num; i++)
{
_threads.push_back(make_shared<Thread>(bind(&ThreadPool::HandlerTask, this, std::placeholders::_1)));
cout << "构建线程" << _threads.back()->Name() << "对象 ... 成功" << endl;
}
}
public:
static ThreadPool<T> *getInstance()
{
if (instance == NULL)
{
LockGuard lockguard(mutex);
if (instance == NULL)
{
cout << "单例首次被执行,需要加载对象..." << endl;
instance = new ThreadPool<T>();
instance->Start();
}
}
return instance;
}
void Equeue(T in)
{
LockGuard lockguard(_lock);
if (!_isrunning)
return;
_taskq.push(in);
if (_wait_num > 0)
_cond.Notify();
}
void Start()
{
if (_isrunning)
return;
_isrunning = true;
for (auto &thread_ptr : _threads)
{
cout << "启动线程" << thread_ptr->Name() << " ... 成功";
thread_ptr->Start();
}
}
void Wait()
{
for (auto &thread_ptr : _threads)
{
thread_ptr->Join();
cout << "回收线程" << thread_ptr->Name() << " ... 成功";
}
}
void Stop()
{
LockGuard lockguard(_lock);
if (_isrunning)
{
_isrunning = false; // 不工作
// 1. 让线程自己退出(要唤醒) && // 2. 历史的任务被处理完了
if (_wait_num > 0)
_cond.NotifyAll();
}
}
private:
vector<thread_t> _threads;
int _num;
int _wait_num;
std::queue<T> _taskq; // 临界资源
Mutex _lock;
Cond _cond;
bool _isrunning;
static ThreadPool<T> *instance;
static Mutex mutex; // 只用来保护单例
};
template <class T>
ThreadPool<T> *ThreadPool<T>::instance = NULL;
template <class T>
Mutex ThreadPool<T>::mutex; // 只用来保护单例
InetAddr.hpp
#pragma once
#include <string>
#include <iostream>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <cstring>
using namespace std;
class InetAddr
{
public:
InetAddr();
InetAddr(int port, string ip = "")
: _port(port), _ip(ip)
{
bzero(&_sockaddr, sizeof(_sockaddr));
_sockaddr.sin_family = AF_INET;
_sockaddr.sin_port = htons(_port);
if (_ip.empty())
_sockaddr.sin_addr.s_addr = INADDR_ANY;
else
_sockaddr.sin_addr.s_addr = inet_addr(_ip.c_str());
}
InetAddr(const struct sockaddr_in &sockaddr)
{
_port = ntohs(sockaddr.sin_port);
char buf[64];
_ip = inet_ntop(AF_INET, &sockaddr.sin_addr, buf, sizeof(buf));
}
bool operator==(const InetAddr &other)
{
return _ip == other._ip;
}
InetAddr operator=(const InetAddr &other)
{
_ip = other._ip;
_port = other._port;
_sockaddr = other._sockaddr;
return *this;
}
struct sockaddr *getSockaddr()
{
return (struct sockaddr *)&_sockaddr;
}
int getSockaddrLen()
{
return sizeof(_sockaddr);
}
const string &getIp()
{
return _ip;
}
int getPort()
{
return _port;
}
private:
string _ip;
int _port;
struct sockaddr_in _sockaddr;
};
Common.hpp
enum
{
SOCKET_ERROR=1,
BIND_ERROR,
LISTEN_ERROR,
ACCEPT_ERROR,
CONNECT_ERROR
};
TcpServer.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <functional>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <cstring>
#include <memory>
#include "Common.hpp"
#include "InetAddr.hpp"
#include "ThreadPool.hpp"
using namespace std;
#define BACKLOG 8
using handler_t = function<string(string)>;
static const uint16_t gport = 8080;
class TcpServer
{
using task_t = function<void()>;
struct ThreadData
{
int sockfd;
TcpServer *self;
};
public:
TcpServer(handler_t handler, int port = gport)
: _handler(handler), _port(port), _isrunning(false)
{
}
void InitServer()
{
// 创建socket
_listensockfd = socket(AF_INET, SOCK_STREAM, 0);
if (_listensockfd < 0)
{
cout << "socket error" << endl;
exit(SOCKET_ERROR);
}
cout << "socket create success,sockfd is: " << _listensockfd << endl;
// 填写IP端口
struct sockaddr_in local;
bzero(&local, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(_port);
local.sin_addr.s_addr = INADDR_ANY;
// bind
int ret = bind(_listensockfd, (struct sockaddr *)&local, sizeof(local));
if (ret < 0)
{
cout << "bind error" << endl;
exit(BIND_ERROR);
}
cout << "bind success" << endl;
// 将socket设置为监听状态
ret = listen(_listensockfd, BACKLOG);
if (ret < 0)
{
cout << "listen error" << endl;
exit(LISTEN_ERROR);
}
cout << "listen success" << endl;
}
void HandleRequest(int sockfd) // TCP为全双工通信
{
char buffer[1024];
while (true)
{
// int n = read(sockfd, buffer, sizeof(buffer) - 1);
int n = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
cout << buffer << endl;
string cmd_result = _handler(buffer);
// write(sockfd, cmd_result.c_str(), cmd_result.size());
send(sockfd, cmd_result.c_str(), sizeof(cmd_result), 0);
}
else if (n == 0)
{
// 如果读取的值为0,说明client退出
cout << "client quit" << endl;
break;
}
else
{
// 读取失败
break;
}
}
close(sockfd);
}
void Start()
{
_isrunning = true;
while (_isrunning)
{
// 获取新连接
struct sockaddr_in peer;
socklen_t peerlen = sizeof(peer);
int sockfd = accept(_listensockfd, (struct sockaddr *)&peer, &peerlen);
if (sockfd < 0)
{
cout << "accept error" << endl;
exit(ACCEPT_ERROR);
}
cout << "accept success,sockfd is: " << sockfd << endl;
InetAddr addr(peer);
cout << "client info: " << addr.getIp() << ":" << addr.getPort() << endl;
// 将任务交给线程池
ThreadPool<task_t>::getInstance()->Equeue(
[&]()
{
this->HandleRequest(sockfd);
});
}
}
void Stop()
{
_isrunning = false;
}
private:
int _listensockfd; // 监听socket
uint16_t _port;
bool _isrunning;
// 处理上层任务的入口
handler_t _handler;
};
TcpServermain.cc
#include "TcpServer.hpp"
#include "CommandExec.hpp"
int main()
{
Command cmd;
unique_ptr<TcpServer> server = make_unique<TcpServer>([&](string cmdstr)
{ return cmd.Execute(cmdstr); });
server->InitServer();
server->Start();
return 0;
}
TcpClient.cc
#include <iostream>
#include <string>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <cstring>
using namespace std;
#include "Common.hpp"
//./client server_ip server_port
int main(int argc, char *argv[])
{
if (argc != 3)
{
cout << "Usage:./client server_ip server_port" << endl;
return 0;
}
string server_ip = argv[1];
int server_port = stoi(argv[2]);
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
cout << "socket create error" << endl;
exit(SOCKET_ERROR);
}
// 填写网络信息
struct sockaddr_in server_addr;
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(server_port);
server_addr.sin_addr.s_addr = inet_addr(server_ip.c_str());
// client 无需显示bind,connect连接时自动bind
int n = connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr));
if (n < 0)
{
cout << "connect error" << endl;
exit(CONNECT_ERROR);
}
string message;
while (true)
{
char buffer[1024];
cout << "input message: ";
getline(cin, message);
n = send(sockfd, message.c_str(), message.size(), 0);
if (n > 0)
{
int m = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
if (m > 0)
{
buffer[m] = 0;
cout << buffer;
}
else
break;
}
else
break;
}
close(sockfd);
return 0;
}