写在前面
项目参考:https://github.com/qinguoyi/TinyWebServer
写作框架/图参考:https://blog.csdn.net/qq_52313711/article/details/136356042?spm=1001.2014.3001.5502
原本计划是,先将项目代码大概看一遍,然后再着手实现一下。但后面准备不搞C++了,项目看一半又很难受,于是想着把代码看完,项目懂个六七成就够了,剩下部分不是看看就能理解的,如果后面有时间(应该没有了),就着手实现一次,并且画一个项目的框架流程图。
看项目的时间差不多一个月吧,但是实际上是断断续续的,中间有各种事情。总之有C++网络编程基础的话,整这个项目应该花不了太多的时间。
试着在终端运行
先用git clone将项目拉到本地。
想起来玩虚拟机的时候是大二大三的时候,好久没碰过了。。,要是我能直接在本地(windows)系统上跑有多好啊,于是本着试一试的原则,放到vscode和clion上,都显示出识别不到sys/socket.
库的存在。
原来,这个头文件是用于用于网络编程的标准POSIX头文件,通常只在类Unix系统(如Linux、macOS)上存在。而Windows系统默认并不提供POSIX的网络编程接口(如sys/socket.h
)。
好,这个方法直接pass,还是得安装虚拟机,在Linux环境下使用。于是乎,安装了VM和Ubuntu22.04.4。详细可参考:VMware虚拟机安装Ubuntu教程(超详细),真的很详细按着安装就好了。
一切准备就绪后,开始配置项目所需的MySQL。(项目中MySQL是5版本的,而我是8的。这里有个小坑,但是也不难解决)
首先利用apt-get update
更新,再apt-get install mysql-server
下载mysql。
apt-get update
和apt-get upgrade
区别:前者是获取最新的软件包信息,后者会根据apt-get update
更新的包索引,安装软件包的最新版本。apt-get upgrade
只会升级那些可以安全升级的软件包,即它不会移除任何软件包或安装新的依赖项。
下载完毕后,使用sudo systemctl status mysql
可以看到正在运行中,这样就算成功了。顺便把mysql设置成开机自启动:systemctl enable mysql
。
这个项目需要创建一个mysql数据库,于是执行下面指令:
// 建立yourdb库
create database mydb;
// 创建user表
USE mydb;
CREATE TABLE user(
username char(50) NULL,
passwd char(50) NULL
)ENGINE=InnoDB;
// 添加数据
INSERT INTO user(username, passwd) VALUES('pzy', '123');
ENGINE=InnoDB是什么?存储引擎,即指定该表使用 InnoDB 存储引擎。MySQL 的不同存储引擎在数据存储和管理方面有不同的逻辑结构和实现细节。每种存储引擎设计的目标不同,比如有的方便查找,有的方便修改…
利用Xftp传输项目到Ubuntu上后,我们修改main.cpp
中的信息。注意跟上面添加的数据不用一样。上面是网页上的用户密码,下面这个是mysql访问的用户和密码。
//需要修改的数据库信息,登录名,密码,库名
string user = "root";
string passwd = "root";
string databasename = "mydb";
如果你的mysql版本太高,编译运行后会有个错误,具体可以去项目github地址中的issue了解。
真的是一坑接着一坑,前面还在思考,为什么我运行sh总会有: not found 2: 的提示。 于是我重新将build.sh重写了一遍,发现还是不行,甚至比前面多提示了”。 停止。没有规则可制作目标“server
。排除了半天,发现是Linux和Windows的换行字符是不同的。
Linux 下换行符是 “\n”。
windows 下换行符是 “\r\n”。“\r” 在vim中被解释为 “^M” 。
利用指令cat -v makefile
(-v表示显示不可打印字符)可以看到,在每一行后面都有^M的存在。
要解决这个问题,我们需要利用dos2unix
将换行字符转化为Unix格式。处理完毕后,项目可以正常运行了,下面将对项目中的代码和细节进行研究。
几个扩展问题:
1.如果使用cat -v 查看代码文件,发现也会有上述情况,为什么代码还可以正常运行?
答:因为编译器会处理这种情况。2.上面讲了,我重新把build.sh写了一遍,于是多了一个错误:
”。 停止。没有规则可制作目标“server
,这是为什么?
答:我们重新将文件传输到Ubuntu上,发现什么都没有问题。使用cat -v查看build.sh,如下图所示:
如果我们编辑了一下,继续查看:末尾被自动添加了一个换行符,WTF?所以导致server识别不到。
3.如果在liunx下直接git clone文件,会有这种编码问题吗?
答:这个没有实践,因为我的虚拟机上不了github,但是经过查阅,git是有这个功能的,可以自动转化编码。
试着用vscode运行
参考链接:VSCode 、Cmake、C++调试教程
首先,在Ubuntu上下载vscode,然后使用code指令(需要非root用户)打开vscode。
回顾一下之前的,我们在windows上运行简单的C文件。在Ubuntu上也是一样。也就是需要task.json
和launch.json
两个文件(c_cpp_properties.json
非必要),即可进行简单的运行和编译。
但是如果项目规模变大,这种方式显然不适合,我们可以利用CMake编译运行项目。
我们新建一个文件CMakeLists.txt
,然后在下面写入
# CMake 最低版本号要求
cmake_minimum_required(VERSION 3.10)
# first_cmake是项目名称,VERSION是版本号,DESCRIPTION是项目描述,LANGUAGES是项目语言
project(first_cmake
VERSION 1.0.0
DESCRIPTION "项目描述"
LANGUAGES CXX)
# 添加一个可执行程序,first_cmake是可执行程序名称,main.cpp是源文件
add_executable(first_cmake hello.cpp)
这里也分两种方式:命令行运行和vscode插件运行,这里给出第一种方式,更多请看参考链接。
# 第一步:配置,-S 指定源码目录,-B 指定构建目录
cmake -S . -B build
# 第二步:生成,--build 指定构建目录
cmake --build build
# 运行
./build/first_cmake
关于Make和CMake的内容,推荐B站up主:于仕琪的内容,通过几个例子循序渐进短时间入门Make和CMake。
项目运行起来了,可以开始代码分析了:
线程同步机制:Lock类的封装
RAII(Resource Acquisition is Initialization)
- RAII全称是“Resource Acquisition is Initialization”,直译过来是“资源获取即初始化”.
- 在构造函数中申请分配资源,在析构函数中释放资源。因为C++的语言机制保证了,当一个对象创建的时候,自动调用构造函数,当对象超出作用域的时候会自动调用析构函数。所以,在RAII的指导下,我们应该使用类来管理资源,将资源和对象的生命周期绑定
- RAII的核心思想是将资源或者状态与对象的生命周期绑定,通过C++的语言机制,实现资源和状态的安全管理,智能指针是RAII最好的例子。
线程池的实现需要依靠锁机制以及信号量机制来实现线程同步,保证操作的原子性。
简简单单,就一个头文件,下面对代码进行分析。
首尾是预处理指令,用于文件包含保护。
接着引入三个头文件:
#include <pthread.h>
:提供 POSIX 线程(pthread)编程的支持,用于在 Unix/Linux 系统上进行多线程编程。
#include <exception>
: 提供了与异常处理相关的类和函数。
#include <semaphore.h>
:提供信号量(semaphore)操作的函数,用于线程间同步和控制对共享资源的访问。
最后定义了三个类:
信号量、互斥锁和条件变量
信号量和条件变量的异同:信号量是基于计数器的同步机制,而条件变量是基于条件的同步机制。
关于这三者详细的关系:(GPT)
信号量(Semaphore)和条件变量(Condition Variable)都是多线程编程中用来同步线程的机制,但它们的工作原理和适用场景有所不同。
信号量(Semaphore)
- 定义: 信号量是一个计数器,用于控制对共享资源的访问。它可以用来限制同时访问某一共享资源的线程数量。
- 类型:
- 计数信号量(Counting Semaphore): 允许多个线程访问共享资源,计数器表示当前可以访问资源的线程数。
- 二进制信号量(Binary Semaphore): 类似于互斥锁(mutex),只有两个状态(0或1),控制一个线程对共享资源的独占访问。
- 工作原理:
- 等待(wait / P操作): 如果计数器大于0,线程可以继续执行并将计数器减1;否则,线程会被阻塞直到计数器大于0。
- 信号(signal / V操作): 线程执行完共享资源的操作后,计数器加1,如果有线程被阻塞,则唤醒其中一个。
- 适用场景: 控制对有限资源的访问,或者实现简单的线程同步。
条件变量(Condition Variable)
- 定义: 条件变量是与互斥锁结合使用的同步原语,用于线程间的通信,通常用来等待特定的条件变为真。
- 工作原理:
- 等待(wait): 线程在获取互斥锁后,进入条件变量的等待状态,并自动释放互斥锁,直到被通知(或超时)后重新获取互斥锁继续执行。
- 通知(notify / signal): 通知一个等待该条件变量的线程继续执行。
- 通知所有(notify_all / broadcast): 通知所有等待该条件变量的线程继续执行。
- 适用场景: 用于等待某个条件成立(如队列非空、资源可用等),以及实现更复杂的线程同步逻辑。
异同点
- 相同点:
- 都用于线程间的同步。
- 都可以阻塞线程,并在某个条件满足后唤醒线程。
- 不同点:
- 信号量是基于计数器的同步机制,而条件变量是基于条件的同步机制。
- 信号量可以独立使用,而条件变量通常需要与互斥锁(mutex)一起使用。
- 信号量用于控制对共享资源的访问数量,而条件变量主要用于等待某个条件的发生。
总结来说,信号量适合控制资源的并发访问,而条件变量更适合协调线程间的条件同步。
信号量
#include <pthread.h>
#include <exception>
#include <semaphore.h>
class sem
{
public:
sem()
{
// 初始化信号量:int sem_init(sem_t *sem, int pshared, unsigned int value);
// sem: 指向信号量对象的指针。
// pshared: 指定信号量的类型。如果为 0,则信号量在线程间共享;如果为非零值,则信号量在进程间共享。
// value: 信号量的初始值
if (sem_init(&m_sem, 0, 0) != 0) // 返回值:是否成功
{
throw std::exception();
}
}
sem(int num)
{
if (sem_init(&m_sem, 0, num) != 0)
{
throw std::exception();
}
}
~sem()
{
sem_destroy(&m_sem);
}
bool wait()
{
return sem_wait(&m_sem) == 0; // 等待信号量,P操作
}
bool post()
{
return sem_post(&m_sem) == 0; // 释放信号量,V操作
}
private:
sem_t m_sem; // 是由 <semaphore.h> 头文件定义的一个数据类型,用于表示一个信号量。
};
互斥锁
class locker
{
public:
locker()
{
// int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
// mutex: 指向 pthread_mutex_t 类型的指针,用于存储互斥锁对象
// attr: 互斥锁属性对象,传 NULL 表示使用默认属性。
if (pthread_mutex_init(&m_mutex, NULL) != 0)
{
throw std::exception();
}
}
~locker()
{
pthread_mutex_destroy(&m_mutex);
}
bool lock()
{
return pthread_mutex_lock(&m_mutex) == 0;
}
bool unlock()
{
return pthread_mutex_unlock(&m_mutex) == 0;
}
// 获得锁,一般用在调用条件变量类中的wait和timewait
pthread_mutex_t *get()
{
return &m_mutex;
}
private:
pthread_mutex_t m_mutex;
};
条件变量(与互斥锁结合使用)
class cond
{
public:
cond()
{
if (pthread_cond_init(&m_cond, NULL) != 0)
{
throw std::exception();
}
}
~cond()
{
pthread_cond_destroy(&m_cond);
}
// 可以看到这里用到互斥锁m_mutex
bool wait(pthread_mutex_t *m_mutex)
{
int ret = 0;
// int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
//函数用于等待目标条件变量.该函数调用时需要传入 mutex参数(加锁的互斥锁) ,函数执行时,先把调用线程放入条件变量的请求队列,然后将互斥锁mutex解锁,当函数成功返回为0时,互斥锁会再次被锁上. 也就是说函数内部会有一次解锁和加锁操作.
ret = pthread_cond_wait(&m_cond, m_mutex);
return ret == 0;
}
bool timewait(pthread_mutex_t *m_mutex, struct timespec t)
{
int ret = 0;
// 该方法与 wait 类似,但增加了一个超时时间 t。
// 如果在指定的时间内没有收到信号,pthread_cond_timedwait 将返回超时错误。
ret = pthread_cond_timedwait(&m_cond, m_mutex, &t);
return ret == 0;
}
bool signal()
{
return pthread_cond_signal(&m_cond) == 0;
}
bool broadcast()
{
return pthread_cond_broadcast(&m_cond) == 0;
}
private:
//static pthread_mutex_t m_mutex;
pthread_cond_t m_cond;
};
问题
1.如何解决虚假唤醒问题。还有,什么是唤醒丢失?
可以看一下这个博客:C++并发编程之:条件变量(信号丢失、虚假唤醒)
注意:上面只是为了讲明白虚假唤醒和唤醒丢失。在实际代码中,if和whiled对productflag判断时,都是要独立操作的,也就是需要上锁的。这也就是为什么条件变量和互斥锁要一起搭配了。条件变量提供等待和唤醒等操作,可以通过一个变量/条件来判断并执行等待or唤醒,而互斥锁用来保障这个变量/条件在判断时候具有原子性。
虚假唤醒是指线程在没有接收到真正的条件通知时被唤醒。这种唤醒可能是由于操作系统或线程库内部的某些机制引起的,而不是因为条件变量的实际条件发生了变化。
唤醒丢失是指条件变量的通知(唤醒)信号被丢失,即某些线程可能没有被唤醒,因为条件变量的通知在它们等待之前已经发出。
2.能否用信号量代替条件变量?也就是说条件变量和信号量的区别是什么?
还是上面的参考博客,加上GPT的回答:
3.条件变量在使用时,代码作者注释掉了(issue上说是为了简化),需要在外面加锁,这是为什么?(会出现线程安全问题和竞态条件,需要解决,如何解决?)
竞态条件(Race Condition)是一种并发编程中的错误情况,发生在多个线程或进程在没有正确同步的情况下同时访问和操作共享资源时。由于这些线程或进程的执行顺序和时间是不确定的,可能会导致程序产生不可预测的行为或错误结果。
指的是这块代码
bool wait(pthread_mutex_t *m_mutex)
{
int ret = 0;
//pthread_mutex_lock(&m_mutex);
ret = pthread_cond_wait(&m_cond, m_mutex);
//pthread_mutex_unlock(&m_mutex);
return ret == 0;
}
bool timewait(pthread_mutex_t *m_mutex, struct timespec t)
{
int ret = 0;
//pthread_mutex_lock(&m_mutex);
ret = pthread_cond_timedwait(&m_cond, m_mutex, &t);
//pthread_mutex_unlock(&m_mutex);
return ret == 0;
}
issue:嗯,我记得之前看到哪块代码调用了wait,并且那块逻辑需要加锁,为了避免在调用wait前加一次锁进入wait又加一次锁的双重锁死锁问题,所以作者把里面的锁给注释了。主要是这个普通的mutex不支持重复加锁,这一块的逻辑不是很严谨,但是应该作者为了代码易懂,所以没用高级锁
同步/异步日志系统:LOG的封装
日志的实现有两种,一种是同步日志,一种是异步日志;
同步日志:日志写入函数与工作线程串行执行,由于涉及I/O操作,同步日志会阻塞整个处理流程,服务器所能处理的并发能力将有所下降,尤其是在访问峰值时,写日志可能会成为系统的瓶颈
异步日志:将工作线程所写的日志内容先存入阻塞队列,写线程从阻塞队列中取出内容,写入日志。
在异步日志中,每个工作线程当有日志需要处理时,将所需写的内容所在内存加入一个阻塞队列,然后就不管了。而日志系统会单独分配一个写线程,不断地从阻塞队列中获得任务并写入日志文件中。
从上面地日志工作流程描述中我们可以发现,这是一个典型的生产者-消费者模型。其中工作线程时生产,写线程是消费者。
对于生产者消费者模型,临界区(缓冲区)就是一个队列,这个项目采用的是循环队列来实现。
阻塞队列
阻塞队列类中封装了生产者-消费者模型,其中push成员是生产者,pop成员是消费者。
我们来分析一下log/block_queue.h的代码。
/*************************************************************
*循环数组实现的阻塞队列,m_back = (m_back + 1) % m_max_size;
*线程安全,每个操作前都要先加互斥锁,操作完后,再解锁
**************************************************************/
#ifndef BLOCK_QUEUE_H
#define BLOCK_QUEUE_H
#include <iostream>
#include <stdlib.h>
#include <pthread.h>
#include <sys/time.h>
#include "../lock/locker.h"
using namespace std;
// 这里用到模板类,忘记的可以出查一下C++模板的使用。
template <class T>
class block_queue
{
public:
// 初始化阻塞队列
block_queue(int max_size = 1000)
{
if (max_size <= 0)
{
exit(-1);
}
m_max_size = max_size;
// 这似乎违背了通常的头文件规则:“不要放置分配存储空间的任何东西”,但是这是模板类。
m_array = new T[max_size];
m_size = 0;
// 初始时,头尾指向同一个地址。满队列的时候,也是同一个地址,但是判断队满队空的时候使用m_size来判断
m_front = -1;
m_back = -1;
}
// 清空队列
void clear()
{
m_mutex.lock();
m_size = 0;
m_front = -1;
m_back = -1;
m_mutex.unlock();
}
//队列析构
~block_queue()
{
m_mutex.lock();
if (m_array != NULL)
delete [] m_array;
m_mutex.unlock();
}
//判断队列是否满了
bool full()
{
m_mutex.lock();
if (m_size >= m_max_size)
{
m_mutex.unlock();
return true;
}
m_mutex.unlock();
return false;
}
//判断队列是否为空
bool empty()
{
m_mutex.lock();
if (0 == m_size)
{
m_mutex.unlock();
return true;
}
m_mutex.unlock();
return false;
}
//返回队首元素
bool front(T &value)
{
m_mutex.lock();
if (0 == m_size)
{
m_mutex.unlock();
return false;
}
value = m_array[m_front];
m_mutex.unlock();
return true;
}
//返回队尾元素
bool back(T &value)
{
m_mutex.lock();
if (0 == m_size)
{
m_mutex.unlock();
return false;
}
value = m_array[m_back];
m_mutex.unlock();
return true;
}
int size()
{
int tmp = 0;
m_mutex.lock();
tmp = m_size;
m_mutex.unlock();
return tmp;
}
int max_size()
{
int tmp = 0;
// 这里个人感觉没必要加锁,直接返回最大值即可
m_mutex.lock();
tmp = m_max_size;
m_mutex.unlock();
return tmp;
}
//往队列添加元素,需要将所有使用队列的线程先唤醒
//当有元素push进队列,相当于生产者生产了一个元素
//若当前没有线程等待条件变量,则唤醒无意义
bool push(const T &item)
{
m_mutex.lock();
//若队满,广播唤醒,退出
if (m_size >= m_max_size)
{
m_cond.broadcast();
m_mutex.unlock();
return false;
}
m_back = (m_back + 1) % m_max_size;
m_array[m_back] = item;
m_size++;
m_cond.broadcast();
m_mutex.unlock();
return true;
}
//pop时,如果当前队列没有元素,将会等待条件变量
bool pop(T &item)
{
m_mutex.lock();
//若队空,等待资源
while (m_size <= 0)
{
if (!m_cond.wait(m_mutex.get()))
{
m_mutex.unlock();
return false;
}
}
m_front = (m_front + 1) % m_max_size;
item = m_array[m_front];
m_size--;
m_mutex.unlock();
return true;
}
//增加了超时处理
bool pop(T &item, int ms_timeout) // ms_timeout:单位:毫秒
{
//struct timeval {
// time_t tv_sec; /* 秒 */
// suseconds_t tv_usec; /* 微秒 */
//};
struct timespec t = {0, 0}; //timespec比timeval更精确,提供了纳秒 tv_nssec
struct timeval now = {0, 0};
gettimeofday(&now, NULL); // 获取当前时间,并将其存储在 timeval 结构中,NULL表示不关心时区信息
m_mutex.lock();
if (m_size <= 0)
{
t.tv_sec = now.tv_sec + ms_timeout / 1000; // 毫秒转化为秒
t.tv_nsec = (ms_timeout % 1000) * 1000; // 毫秒转化为纳秒
if (!m_cond.timewait(m_mutex.get(), t))
{
m_mutex.unlock();
return false;
}
}
// 这一步应该是多余的,m_size在锁内已经判断过了。
/*
if (m_size <= 0)
{
m_mutex.unlock();
return false;
}
*/
m_front = (m_front + 1) % m_max_size;
item = m_array[m_front];
m_size--;
m_mutex.unlock();
return true;
}
private:
locker m_mutex;
cond m_cond; // 条件变量与互斥锁配合使用
T *m_array;
int m_size;
int m_max_size;
int m_front;
int m_back;
};
#endif
单例模式
在本项目中,日志系统的创建采用的是单例模式。下面介绍单例模式的概念以及代码。
单例模式作为最常用的设计模式之一,保证一个类仅有一个实例,并提供一个访问它的全局访问点,该实例被所有程序模块共享。
实现思路:私有化它的构造函数,以防止外界创建单例类的对象;使用类的私有静态指针变量指向类的唯一实例,并用一个公有的静态方法获取该实例。
单例模式有两种实现方法,分别是懒汉和饿汉模式。顾名思义,懒汉模式,即非常懒,不用的时候不去初始化,所以在第一次被使用时才进行初始化;饿汉模式,即迫不及待,在程序运行时立即初始化。
经典的线程安全懒汉模式
单例模式(懒汉)的代码如下:
// .h头文件如下所示
class single{
private:
//私有静态指针变量指向唯一实例
static single *p;
//静态锁,是由于静态函数只能访问静态成员
static pthread_mutex_t lock;
//私有化构造函数
single(){
pthread_mutex_init(&lock, NULL);
}
~single(){}
public:
//公有静态方法获取实例
static single* getinstance();
};
// .cpp文件如下所示
pthread_mutex_t single::lock; //类内声明完静态变量后,一定要在类外初始化(至少要定义)。因为所有对象共享静态数据成员,则静态数据成员必须要在类外分配到一块内存(相当于全局变量了,存于全局区),而类只是数据类型,内部是无法占有内存的。在编译阶段 就需要为它们分配内存空间,以便在整个程序生命周期内都可以使用。
single* single::p = NULL;
single* single::getinstance(){
if(NULL == p) {
pthread_mutex_lock(&lock);
if (NULL == p) {
p = new single;
}
pthread_mutex_unlock(&lock);
}
return p;
}
好,问题来了。为什么要用双检测,只检测一次不行吗?
双检测锁
如果只检测一次,如下:
single* single::getinstance(){ if(NULL == p) { pthread_mutex_lock(&lock); p = new single; pthread_mutex_unlock(&lock); } return p; }
当线程1经过判断,在获取实例之前,有线程2也通过了判断,这个时候会有多次获取实例的可能。
或者:
single* single::getinstance(){ pthread_mutex_lock(&lock); if (NULL == p) { p = new single; } pthread_mutex_unlock(&lock); return p; }
这样在每次调用获取实例的方法时,都需要加锁,这将严重影响程序性能。
总结:双层检测可以有效避免这种情况,仅在第一次创建单例的时候加锁,其他时候都不再符合NULL == p的情况,直接返回已创建好的实例。它可以避免不必要的锁定开销,同时确保线程安全。
局部静态变量之线程安全懒汉模式
当然,还有更加优雅的版本。(适用于C++11之后)
class single{
private:
single(){}
~single(){}
public:
static single* getinstance();
};
single* single::getinstance() {
static single obj; //注意这个局部静态变量,和上面的静态变量的区别是“不是在编译时分配空间,而是在运行时第一次执行到定义它的函数时分配空间并初始化。”
return &obj;
}
问题:该方法不加锁会不会造成线程安全问题?
其实,C++0X以后,要求编译器保证内部静态变量的线程安全性,故C++0x之后该实现是线程安全的,C++0x之前仍需加锁,其中C++0x是C++11标准成为正式标准之前的草案临时名字。
所以,如果使用C++11之前的标准,还是需要加锁,这里同样给出加锁的版本。
不过本人觉得这样写反而更不优雅了,而且每次调用获得实例还得占用锁资源。
class single{
private:
static pthread_mutex_t lock;
single() {
pthread_mutex_init(&lock, NULL);
}
~single(){}
public:
static single* getinstance();
};
pthread_mutex_t single::lock;
single* single::getinstance(){
pthread_mutex_lock(&lock);
static single obj;
pthread_mutex_unlock(&lock);
return &obj;
}
}
饿汉模式
饿汉模式不需要用锁,就可以实现线程安全。原因在于,在程序运行时就定义了对象,并对其初始化。之后,不管哪个线程调用成员函数getinstance(),都只不过是返回一个对象的指针而已。所以是线程安全的,不需要在获取实例的成员函数中加锁。
class single{
private:
static single* p;
single(){}
~single(){}
public:
static single* getinstance();
};
single* single::p = new single(); //迫不及待,在程序运行时立即初始化
single* single::getinstance(){
return p;
}
饿汉模式虽好,但其存在隐藏的问题,在于非静态对象(函数外的static对象)在不同编译单元中的初始化顺序是未定义的。如果在初始化完成之前调用 getInstance() 方法会返回一个未定义的实例。
例子:
/* A.cpp */
class GlobalObjectA {
public:
GlobalObjectA() {
std::cout << "GlobalObjectA is initialized." << std::endl;
useSingleton();
}
};
GlobalObjectA globalA; // 非局部静态对象
void useSingleton() {
Singleton::getInstance().doSomething();
}
/* B.cpp */
class GlobalObjectB {
public:
GlobalObjectB() {
std::cout << "GlobalObjectB is initialized." << std::endl;
}
};
GlobalObjectB globalB; // 非局部静态对象
Singleton& Singleton::getInstance() {
static Singleton instance; // 饿汉模式
return instance;
}
void Singleton::doSomething() {
std::cout << "Singleton is used." << std::endl;
}
/*Singleton.h*/
class Singleton {
public:
static Singleton& getInstance();
void doSomething();
private:
Singleton() = default; // 私有构造函数
};
如果先调用A.cpp,再调用B.cpp,会导致执行期间发生错误。初始化globalA,调用了构造函数,调用单例模式类。但单例类的初始化在B.cpp中。也就是说:如果 Singleton
实例依赖于其他尚未初始化的全局静态对象(如 globalB
),可能会导致未定义行为。
题外话:看到这段代码的时候,我想,如果两个终端运行类似上面这个代码,变量会不会时共享的?
于是写了个test代码:
class A { static int a; public: static void print_a(){ a++; cout<<a<<endl; } }; int A::a = 10;
int main() { A::print_a(); getchar(); return 0; }
我希望看到的结果时,两个终端运行后,a值是不同的(因为a++了),但是实际运行都是一样的。经过查阅后,这是每个程序运行实例之间的内存隔离导致的。在多个终端中运行同一个程序实际上是启动了多个独立的程序实例,这些实例的内存相互独立,因此静态变量
a
在每个实例中是独立的,导致它们的值是相同的,而不是共享的。
继续提问:如果像让这些实例互相通信,怎么做?
这个实际上就是进程间通信(IPC, Inter-Process Communication)的内容了。
LOG类的封装
经过上面单例模式的讲解,我们可以对项目代码log.h和log.cpp进行分析。
log.h
#ifndef LOG_H
#define LOG_H
#include <stdio.h> // FILE操作
#include <iostream>
#include <string>
#include <stdarg.h> // 可变参数的使用
#include <pthread.h>
#include "block_queue.h"
using namespace std;
class Log
{
public:
//C++11以后,使用局部变量懒汉不用加锁
static Log *get_instance()
{
static Log instance;
return &instance;
}
// 上面是放回指针的形式,当然也可以放回实例,但是要注意不要触发到对象的拷贝构造,这样就不止一个实例了,违背单例模式原则。
/*
比如,下面这种就不行:
static Log get_instance()
{
static Log instance;
return instance;
}
这样就可以:
static Log& get_instance() {
static Log instance;
return instance;
}
*/
// 问题:为什么flush_log_thread这个函数需要static修饰。
// 答:它作为一个线程入口函数使用,而线程入口函数的签名要求是:void* (*)(void*)。大多数线程库(如POSIX线程库)要求线程入口函数必须是一个全局函数或静态成员函数。因为普通的成员函数隐含着一个this指针作为参数,而this指针指向类的实例,无法匹配线程库所需的函数签名。
static void *flush_log_thread(void *args)
{
Log::get_instance()->async_write_log();
}
// 问题:那为什么下面这些成员函数不用static进行修饰?
// 答:因为它会用到类中的非静态成员,而静态成员函数不能访问类中的非静态变量。
// 追问:那如果把类中的非静态变量全都变成静态的呢?
// 答:如果这样设置,这个类就变成了静态工具类。 相比之下,单例模式可以控制初始化时机,而最主要的区别是单例模式可继承,能实现多态。
// 可选择的参数有日志文件、日志缓冲区大小、最大行数以及最长日志条队列
bool init(const char *file_name, int close_log, int log_buf_size = 8192, int split_lines = 5000000, int max_queue_size = 0);
void write_log(int level, const char *format, ...);
void flush(void);
private:
Log();
virtual ~Log(); // 确保当你通过基类指针或引用删除一个派生类对象时,能够正确调用派生类的析构函数。这是一种良好的编程习惯,尤其是在涉及继承的情况下。
void *async_write_log()
{
string single_log;
//从阻塞队列中取出一个日志string,写入文件
while (m_log_queue->pop(single_log))
{
m_mutex.lock();
fputs(single_log.c_str(), m_fp); //将single_log.c_str()写入m_fp中。
// 在stdio.h中,int fputs(const char *str, FILE *stream);
// str: 要写入文件的字符串。它是一个以 null 结尾的字符数组(char*)。
// stream: 指定要写入的文件流(FILE*)。这个文件流通常是通过 fopen 打开的文件指针。
m_mutex.unlock();
}
}
private:
char dir_name[128]; //路径名
char log_name[128]; //log文件名
int m_split_lines; //日志最大行数
int m_log_buf_size; //日志缓冲区大小
long long m_count; //日志行数记录
int m_today; //因为按天分类,记录当前时间是那一天
FILE *m_fp; //打开log的文件指针,FILE在stdio.h中。
char *m_buf;
block_queue<string> *m_log_queue; //阻塞队列
bool m_is_async; //是否同步标志位
locker m_mutex;
int m_close_log; //关闭日志
};
/*
定义了四个宏,简化调用。其中__VA_ARGS__表示variadic arguments,是 C/C++ 预处理器提供的一种方式,允许你在宏中传递可变数量的参数。
## 的作用是如果没有传递可变参数,它会去掉前面的逗号,以避免语法错误。
例子:
LOG_DEBUG("This is a debug message: %d", 42);
会展开为:
if (0 == m_close_log) {
Log::get_instance()->write_log(0, "This is a debug message: %d", 42);
Log::get_instance()->flush();
}
*/
#define LOG_DEBUG(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(0, format, ##__VA_ARGS__); Log::get_instance()->flush();}
#define LOG_INFO(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(1, format, ##__VA_ARGS__); Log::get_instance()->flush();}
#define LOG_WARN(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(2, format, ##__VA_ARGS__); Log::get_instance()->flush();}
#define LOG_ERROR(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(3, format, ##__VA_ARGS__); Log::get_instance()->flush();}
#endif
如果看完代码,应该有个问题:**静态类和单例模式的区别是什么?**上面代码没有用到继承,按理说直接把变量方法啥的全都变成静态不久可以了?
这里分享一下知乎老哥的话:好吧,如果你说你的单例完全不会出现继承的情况,是不是就不需要写成Meyers’ Singleton? 我只想说,如果你一定要强加这么多限定的话,那么这种设计模式的讨论本身就没有意义。就很像是在说:我自己能够保证每个new出来的指针我都能delete掉它,所以我不需要RAII……。
virtual ~Log(); 析构函数声明为虚函数意义是什么?
确保正确的资源释放。比如下面的代码:
class Base {
public:
virtual ~Base() { std::cout << "Base Destructor\n"; }
};
class Derived : public Base {
public:
~Derived() { std::cout << "Derived Destructor\n"; }
};
Base* obj = new Derived();
delete obj; // 正确调用 Derived 和 Base 的析构函数
如果 Base
类的析构函数没有声明为 virtual
,当通过 Base*
指针删除 Derived
对象时,只会调用 Base
的析构函数,而不会调用 Derived
的析构函数,可能导致资源泄漏或其他未定义行为。
总之,在日志系统中,Log
类的析构函数被声明为 virtual
,是为了在将来可能扩展该类时,确保基类和派生类的析构函数能够正确调用,避免潜在的资源管理问题。这是一种防御性编程的习惯,即使当前类没有被继承的需求,这样的声明也是一种良好的编程实践。
async_write_log放在类内实现的原因是什么?
在 C++ 中,类内部定义的成员函数在编译时通常会被视为内联函数,因为它们的实现直接嵌入到类定义中。即使不显式使用 inline
关键字,编译器也可能将这些成员函数内联。内联成员函数可以提高性能,减少函数调用开销,但也可能导致代码膨胀和增加编译时间。
log.cpp
#include <string.h>
#include <time.h>
#include <sys/time.h>
#include <stdarg.h>
#include "log.h"
#include <pthread.h>
using namespace std;
Log::Log()
{
m_count = 0; // 日志行数记录为0
m_is_async = false; // 初始为同步
}
Log::~Log()
{
if (m_fp != NULL)
{
fclose(m_fp);
}
}
//异步需要设置阻塞队列的长度,同步不需要设置
bool Log::init(const char *file_name, int close_log, int log_buf_size, int split_lines, int max_queue_size)
{
//如果设置了max_queue_size,则设置为异步
if (max_queue_size >= 1)
{
m_is_async = true;
m_log_queue = new block_queue<string>(max_queue_size);
pthread_t tid;
//flush_log_thread为回调函数,这里表示创建线程异步写日志。PS:回调函数是通过函数指针实现的。回调函数允许你在运行时动态指定要调用的函数。
pthread_create(&tid, NULL, flush_log_thread, NULL);
// 上面功能:如果是异步,则创建一个线程负责异步写文件。
}
m_close_log = close_log;
m_log_buf_size = log_buf_size;
m_buf = new char[m_log_buf_size];
memset(m_buf, '\0', m_log_buf_size);
m_split_lines = split_lines;
time_t t = time(NULL); // 表示从 1970 年 1 月 1 日午夜(UTC)到当前时间的秒数。
struct tm *sys_tm = localtime(&t); // struct tm: 是一个结构体,用于表示时间和日期的各个组成部分,包括年份、月份、日期、小时、分钟和秒等。
struct tm my_tm = *sys_tm; // 拷贝一个独立的值出来,赋在my_tm上。
/*日志命名*/
// char *strrchr(const char *str, int c); 这里第二个参数是int的原因:在 C 语言中,字符通常被处理为 int 类型的值,这是为了兼容不同的字符集和编码方式。在 C 标准库的早期版本中,char 类型通常用于字符操作,但由于 char 类型的限制(例如仅 8 位),int 类型的使用可以提供更大的灵活性和兼容性。这种设计选择也反映在其他标准库函数中,例如 fgetc 和 fputc,它们也使用 int 类型来处理字符。
// 返回一个指向字符串str中最后一个出现的字符 c 的指针
const char *p = strrchr(file_name, '/');
char log_full_name[256] = {0};
if (p == NULL)
{
// snprintf用于将格式化的数据写入字符串中。
// int snprintf(char *str, size_t size, const char *format, ...);
// 目标字符数组的指针str, size_t(无符号整数类型),size表示写入最大字符数,如果超过会被截断。format表示格式化形式,...表示格式化的内容。 看下面操作就知道了,类似printf
snprintf(log_full_name, 255, "%d_%02d_%02d_%s", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, file_name);
}
else
{
// strcpy 和 strncpy都是字符串复制函数。但是也有区别:
// strcpy不会检查目标缓冲区大小,可能导致溢出。strcpy 会将源字符串的所有字符(包括终止的空字符 '\0')复制到目标数组中.
// strncpy 允许你指定要复制的最大字符数 n,从而防止缓冲区溢出。它会复制最多 n 个字符,如果源字符串的长度少于 n,strncpy 会在目标数组中填充空字符 '\0' 直到达到 n。 如果 src 的长度等于或超过 n,strncpy 不会在 dest 中添加终止的空字符。这可能导致 dest 不是以空字符结尾的,因此在使用时需要小心。
strcpy(log_name, p + 1); // char *strcpy(char *dest, const char *src);
strncpy(dir_name, file_name, p - file_name + 1); // char *strncpy(char *dest, const char *src, size_t n);
// 这里应该写漏了一个,因为strncpy不会自动补全\0,所以进行操作dir[p - file_name + 1] = '\0';
snprintf(log_full_name, 255, "%s%d_%02d_%02d_%s", dir_name, my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, log_name);
}
m_today = my_tm.tm_mday;
m_fp = fopen(log_full_name, "a");
if (m_fp == NULL)
{
return false;
}
return true;
}
void Log::write_log(int level, const char *format, ...)
{
struct timeval now = {0, 0};
gettimeofday(&now, NULL);
time_t t = now.tv_sec;
struct tm *sys_tm = localtime(&t);
struct tm my_tm = *sys_tm;
char s[16] = {0};
switch (level)
{
case 0:
strcpy(s, "[debug]:");
break;
case 1:
strcpy(s, "[info]:");
break;
case 2:
strcpy(s, "[warn]:");
break;
case 3:
strcpy(s, "[erro]:");
break;
default:
strcpy(s, "[info]:");
break;
}
//写入一个log,对m_count++, m_split_lines最大行数
m_mutex.lock();
m_count++;
// 如果是新的一天了,或者日志行数到上限了,创建新日志
if (m_today != my_tm.tm_mday || m_count % m_split_lines == 0) //everyday log
{
char new_log[256] = {0}; // 新日志的文件名
// fflush 函数会将缓冲区中的数据立即写入到文件中,确保所有先前写入的数据都被提交到磁盘或输出设备。
// 在使用 fwrite, fprintf, fputs 等函数写入文件时,数据通常先写入文件流的缓冲区,而不是立即写入磁盘。fflush 确保这些数据被立即写入。
fflush(m_fp);
fclose(m_fp); // 关闭一个已打开的文件
char tail[16] = {0};
snprintf(tail, 16, "%d_%02d_%02d_", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday);
if (m_today != my_tm.tm_mday)
{
snprintf(new_log, 255, "%s%s%s", dir_name, tail, log_name);
m_today = my_tm.tm_mday;
m_count = 0;
}
else
{
snprintf(new_log, 255, "%s%s%s.%lld", dir_name, tail, log_name, m_count / m_split_lines);
}
m_fp = fopen(new_log, "a");
}
m_mutex.unlock();
va_list valst;
va_start(valst, format); // 初始化 valst,使其指向第一个可变参数
string log_str;
m_mutex.lock();
// 写入的具体时间内容格式
// 使用 snprintf 格式化时间和日志标签,并将日志内容存储在 m_buf 中。
int n = snprintf(m_buf, 48, "%d-%02d-%02d %02d:%02d:%02d.%06ld %s ", // 48 是缓冲区m_buf的最大(包括'\0')
my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, // n是实际写入的字符数(不包括'\0'),即最多写47个字符
my_tm.tm_hour, my_tm.tm_min, my_tm.tm_sec, now.tv_usec, s);
// vsnprintf 使用可变参数生成格式化的日志内容,追加到 m_buf 中,并将其转换为字符串 log_str
// m_buf + n: 表示 m_buf 缓冲区中从 n 开始的位置,也就是在之前 snprintf 写入的位置后继续写入。
// m_log_buf_size - n - 1: 指定缓冲区剩余的大小。m_log_buf_size 是总缓冲区大小,减去 n(已经使用的部分)和 1(留给 \0)。
int m = vsnprintf(m_buf + n, m_log_buf_size - n - 1, format, valst);
m_buf[n + m] = '\n';
m_buf[n + m + 1] = '\0';
log_str = m_buf;
m_mutex.unlock();
// 如果是异步,且阻塞队列还有空位
if (m_is_async && !m_log_queue->full())
{
m_log_queue->push(log_str);
}
else // 如果是同步,或者阻塞队列已满,直接按照同步的方式写入。
{
m_mutex.lock();
fputs(log_str.c_str(), m_fp);
m_mutex.unlock();
}
va_end(valst);
}
void Log::flush(void)
{
m_mutex.lock();
//强制刷新写入流缓冲区
fflush(m_fp);
m_mutex.unlock();
}
这里面涉及到回调函数,在c++中,回调函数是通过函数指针实现的,至于它相比于正常函数的优越性,看下面例子。
假设你正在编写一个图形用户界面 (GUI) 应用程序,你需要在用户点击按钮时执行特定的操作。这个操作可能会因不同的按钮或用户操作而有所不同。如果没有回调函数,你需要为每个按钮写不同的事件处理逻辑,这会导致大量重复代码和低代码复用性。
使用回调函数,你可以将具体的操作逻辑作为回调函数传递给按钮的事件处理程序,这样你可以动态决定按钮点击时要执行的操作。
#include <iostream>
// 回调函数类型定义
typedef void (*ButtonCallback)();
// 事件处理函数
void onButtonClick(ButtonCallback callback) {
// 这里模拟按钮点击事件
std::cout << "Button clicked!" << std::endl;
// 调用回调函数
callback();
}
// 实际的回调函数实现
void handlePrintMessage() {
std::cout << "Handling button click: Printing message." << std::endl;
}
void handleSaveData() {
std::cout << "Handling button click: Saving data." << std::endl;
}
int main() {
// 模拟两个不同的按钮点击事件
std::cout << "Simulating button click for print message action:" << std::endl;
onButtonClick(handlePrintMessage);
std::cout << "\nSimulating button click for save data action:" << std::endl;
onButtonClick(handleSaveData);
return 0;
}
snprintf和vsnprintf的区别:
snprintf
和 vsnprintf
是 C 标准库中的两个函数,用于格式化输出字符串。这两个函数非常相似,主要的区别在于 vsnprintf
接受一个 va_list
类型的参数列表,而 snprintf
接受可变数量的普通参数。
封装数据库连接池
数据库连接池三问
1.什么是数据库连接池?
池是一组资源的集合,这组资源在服务器启动之初就被完全创建好并初始化。通俗来说,池是资源的容器,本质上是对资源的复用。
顾名思义,连接池中的资源为一组数据库连接,由程序动态地对池中的连接进行使用,释放。
当系统开始处理客户请求的时候,如果它需要相关的资源,可以直接从池中获取,无需动态分配;当服务器处理完一个客户连接后,可以把相关的资源放回池中,无需执行系统调用释放资源。
2.数据库访问的一般流程是什么?
当系统需要访问数据库时,先系统创建数据库连接,完成数据库操作,然后系统断开数据库连接。
3.为什么要创建连接池?
从一般流程中可以看出,若系统需要频繁访问数据库,则需要频繁创建和断开数据库连接,而创建数据库连接是一个很耗时的操作,也容易对数据库造成安全隐患。在程序初始化的时候,集中创建多个数据库连接,并把他们集中管理,供程序使用,可以保证较快的数据库读写速度,更加安全可靠。
池可以看做资源的容器,所以多种实现方法,比如数组、链表、队列等。这里,使用单例模式和链表创建数据库连接池,实现对数据库连接资源的复用。
连接池代码实现
#ifndef _CONNECTION_POOL_
#define _CONNECTION_POOL_
#include <stdio.h>
#include <list> // 双向链表
#include <mysql/mysql.h>
#include <error.h>
#include <string.h>
#include <iostream>
#include <string>
#include "../lock/locker.h"
#include "../log/log.h"
using namespace std;
class connection_pool
{
public:
MYSQL *GetConnection(); //获取数据库连接
bool ReleaseConnection(MYSQL *conn); //释放连接
int GetFreeConn(); //获取连接
void DestroyPool(); //销毁所有连接
//单例模式
static connection_pool *GetInstance();
void init(string url, string User, string PassWord, string DataBaseName, int Port, int MaxConn, int close_log);
private:
connection_pool();
~connection_pool();
int m_MaxConn; //最大连接数
int m_CurConn; //当前已使用的连接数
int m_FreeConn; //当前空闲的连接数
locker lock;
list<MYSQL *> connList; //连接池
sem reserve;
public:
string m_url; //主机地址
string m_Port; //数据库端口号
string m_User; //登陆数据库用户名
string m_PassWord; //登陆数据库密码
string m_DatabaseName; //使用数据库名
int m_close_log; //日志开关
};
对应实现如下:
#include <mysql/mysql.h>
#include <stdio.h>
#include <string>
#include <string.h>
#include <stdlib.h>
#include <list>
#include <pthread.h>
#include <iostream>
#include "sql_connection_pool.h"
using namespace std;
connection_pool::connection_pool()
{
m_CurConn = 0;
m_FreeConn = 0;
}
// 懒汉版本的单例模式
connection_pool *connection_pool::GetInstance()
{
static connection_pool connPool;
return &connPool;
}
//构造初始化
void connection_pool::init(string url, string User, string PassWord, string DBName, int Port, int MaxConn, int close_log)
{
m_url = url;
m_Port = Port;
m_User = User;
m_PassWord = PassWord;
m_DatabaseName = DBName;
m_close_log = close_log;
for (int i = 0; i < MaxConn; i++)
{
MYSQL *con = NULL;
con = mysql_init(con);
if (con == NULL)
{
LOG_ERROR("mysql_init() failed");
exit(1);
}
con = mysql_real_connect(con, url.c_str(), User.c_str(), PassWord.c_str(), DBName.c_str(), Port, NULL, 0);
if (con == NULL)
{
LOG_ERROR("mysql_real_connect() failed");
exit(1);
}
connList.push_back(con);
++m_FreeConn; // 空闲连接数++
}
reserve = sem(m_FreeConn); //信号量记录共享资源总量
m_MaxConn = m_FreeConn;
}
//当有请求时,从数据库连接池中返回一个可用连接,更新使用和空闲连接数
MYSQL *connection_pool::GetConnection()
{
MYSQL *con = NULL;
if (0 == connList.size())
return NULL;
reserve.wait();
lock.lock();
con = connList.front();
connList.pop_front();
--m_FreeConn;
++m_CurConn;
lock.unlock();
return con;
}
//释放当前使用的连接
bool connection_pool::ReleaseConnection(MYSQL *con)
{
if (NULL == con)
return false;
lock.lock();
connList.push_back(con);
++m_FreeConn;
--m_CurConn;
lock.unlock();
reserve.post();
return true;
}
//销毁数据库连接池
void connection_pool::DestroyPool()
{
lock.lock();
if (connList.size() > 0)
{
list<MYSQL *>::iterator it;
for (it = connList.begin(); it != connList.end(); ++it)
{
MYSQL *con = *it;
mysql_close(con);
}
m_CurConn = 0;
m_FreeConn = 0;
connList.clear();
}
lock.unlock();
}
//当前空闲的连接数
int connection_pool::GetFreeConn()
{
return this->m_FreeConn;
}
connection_pool::~connection_pool()
{
DestroyPool();
}
RAII机制释放数据库连接
我们单独再创建一个RAII类,这个类的唯一作用就是与数据库连接池的资源进行绑定;可以看到这个类中只有构造函数和析构函数。这样当类创建实例时就会调用构造函数,构造函数内就会调用数据库连接函数;当类的生命周期结束时就会调用析构函数,析构函数会调用销毁数据库函数;从而实现了资源的获取与释放与类的实例的生命周期绑定。
class connectionRAII{
public:
// 这里需要注意的是,在获取连接时,通过有参构造对传入的参数进行修改。其中数据库连接本身是指针类型,所以参数需要通过双指针才能对其进行修改。
connectionRAII(MYSQL **con, connection_pool *connPool);
~connectionRAII();
private:
MYSQL *conRAII;
connection_pool *poolRAII;
};
实现部分如下:
不直接调用获取和释放连接的接口,将其封装起来,通过RAII机制进行获取和释放。
connectionRAII::connectionRAII(MYSQL **SQL, connection_pool *connPool){
*SQL = connPool->GetConnection();
conRAII = *SQL;
poolRAII = connPool;
}
connectionRAII::~connectionRAII(){
poolRAII->ReleaseConnection(conRAII);
}
下面举个例子说明一下RAII的用处。
假设你有一个函数需要从数据库中查询数据并返回结果,通常你会这样写:
void queryDatabase(connection_pool *connPool) {
MYSQL *conn = connPool->GetConnection();
if (conn == nullptr) {
// 处理连接获取失败
return;
}
// 执行数据库查询操作
// ...
// 手动释放连接
connPool->ReleaseConnection(conn);
}
在上述代码中,如果中途发生异常或忘记调用 ReleaseConnection(conn)
,就会导致连接没有被归还到连接池,造成资源泄漏。
使用 connectionRAII
类,可以这样写:
void queryDatabase(connection_pool *connPool) {
MYSQL *conn = nullptr;
connectionRAII conRAII(&conn, connPool); // 自动获取连接并管理释放
if (conn == nullptr) {
// 处理连接获取失败
return;
}
// 执行数据库查询操作
// ...
// 无需手动释放连接,`conRAII` 析构时会自动归还连接
}
定时器类处理非活动连接
基本介绍
由于非活跃连接占用了连接资源,严重影响服务器的性能,通过实现一个服务器定时器,处理这种非活跃连接,释放连接资源。
利用alarm函数周期性地触发SIGALRM信号,该信号的信号处理函数利用管道通知主循环执行定时器链表上的定时任务。
这里借用一下大佬画的图片。这章代码部分主要实现右边部分。
基础知识
非活跃
,是指客户端(这里是浏览器)与服务器端建立连接后,长时间不交换数据,一直占用服务器端的文件描述符,导致连接资源的浪费。定时事件
,是指固定一段时间之后触发某段代码,由该段代码处理一个事件,如从内核事件表删除事件,并关闭文件描述符,释放连接资源。定时器
,是指利用结构体或其他形式,将多种定时事件进行封装起来。具体的,这里只涉及一种定时事件,即定期检测非活跃连接,这里将该定时事件与连接资源封装为一个结构体定时器。定时器容器
,是指使用某种容器类数据结构,将上述多个定时器组合起来,便于对定时事件统一管理。具体的,项目中使用升序链表将所有定时器串联组织起来。
基础API
sigaction结构体以及函数
sigaction
是一种用于设置信号处理程序的函数,它提供了比 signal
更加灵活和强大的信号处理机制。sigaction
允许我们在信号发生时执行特定的操作,例如处理、忽略或者恢复默认的处理方式。
sigaction
的结构
#include <signal.h>
struct sigaction {
void (*sa_handler)(int); // 信号处理函数指针,或者 SIG_IGN、SIG_DFL
void (*sa_sigaction)(int, siginfo_t *, void *); // 信号处理函数,使用 siginfo_t 结构提供额外信息
sigset_t sa_mask; // 在处理该信号时需要被阻塞的信号集
int sa_flags; // 控制信号行为的标志,如 SA_RESTART、SA_SIGINFO 等
void (*sa_restorer)(void); // 已废弃
};
常见的 sa_flags
标志
SA_RESTART
: 使被信号中断的系统调用自动重新启动。SA_SIGINFO
: 使sa_sigaction
生效,代替sa_handler
,可以提供更多的信号信息。SA_NOCLDSTOP
: 对于SIGCHLD
信号,忽略子进程的停止或继续信号。
sigaction
结构体以及函数
- 定义一个
sigaction
结构体。 - 指定信号处理函数和其他选项。
- 调用
sigaction
函数来设置信号处理程序。
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <unistd.h>
// 自定义信号处理函数
void handle_sigint(int sig) {
printf("Caught signal %d (SIGINT), but I won't terminate!\n", sig);
}
int main() {
// 定义 sigaction 结构体
struct sigaction sa;
// 设置信号处理函数
sa.sa_handler = handle_sigint;
// 初始化 sa_mask 阻塞信号集(在信号处理期间不阻塞其他信号)
sigemptyset(&sa.sa_mask);
// 设置 SA_RESTART :如果被信号中断的系统调用可以自动重启
sa.sa_flags = SA_RESTART;
// 使用 sigaction 来替换默认的 SIGINT 处理程序
// SIGINT是中断信号,当你按Ctrl+C时会触发。它的默认行为是终止进程。
if (sigaction(SIGINT, &sa, NULL) == -1) {
perror("sigaction");
exit(EXIT_FAILURE);
}
// 模拟一个长时间运行的进程
while (1) {
printf("Running... Press Ctrl+C to send SIGINT\n");
sleep(2);
}
return 0;
}
// 补充
/*
信号的工作流程如下:
键盘输入 Ctrl+C
↓
终端捕获并生成 SIGINT 信号
↓
内核将 SIGINT 发送给前台进程组
↓
目标进程接收 SIGINT 信号
↓
根据进程设置的行为(终止、忽略或自定义处理)
*/
// 而上面我们用sigaction,实际上是重写了这个SIGINT信号的处理方式。
其中,int sigaction(int signum, const struct sigaction *act, struct sigaction *oldact);
参数解释:
signum
:为要捕获的信号编号,例如 SIGINT
(对应于 Ctrl+C)。
act
: 指向一个 sigaction
结构体,指定了如何处理信号。
oldact
: 指向一个 sigaction
结构体(可以是 NULL
),如果不为 NULL
,那么 sigaction
会在这个指针指向的结构体中保存之前的信号处理设置。
运行结果:
sigfillset函数
#include <signal.h>
int sigfillset(sigset_t *set);
它用于初始化一个信号集合,使其包含所有有效的信号。这个函数常用于信号掩码的操作,确保特定的信号集合包含所有可能的信号。
socketpair函数
socketpair
函数用于创建一对无名的、双向的套接字连接。这种套接字对通常用于在同一进程中的不同线程或进程之间进行通信。它提供了一种轻量级的进程间通信(IPC)机制。
项目中使用管道通信。
#include <sys/types.h>
#include <sys/socket.h>
int socketpair(int domain, int type, int protocol, int sv[2]);
domain
: 套接字的协议域,通常设置为 AF_UNIX
(或 AF_LOCAL
)。这是用于本地进程间通信的协议域。
type
: 套接字的类型,通常设置为 SOCK_STREAM
(流式套接字)或 SOCK_DGRAM
(数据报套接字)。SOCK_STREAM
提供一个可靠的、双向的字节流,而 SOCK_DGRAM
提供无连接的数据报服务。
protocol
: 套接字的协议,一般设置为 0,表示使用默认协议。
sv[2]
: 一个长度为 2 的整型数组,用于存储创建的两个套接字的文件描述符。sv[0]
和 sv[1]
将分别指向这对套接字的两端。
例子
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
#include <string.h>
#include <sys/wait.h>
int main() {
int sv[2]; // 存储两个套接字的文件描述符
char buf[100];
// 创建一对无名的流式套接字
if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
perror("socketpair");
exit(EXIT_FAILURE);
}
// 创建一个子进程
pid_t pid = fork();
if (pid == -1) {
perror("fork");
exit(EXIT_FAILURE);
}
if (pid == 0) {
// 子进程
close(sv[0]); // 关闭不需要的文件描述符
// close是一个系统调用函数,用于关闭一个已经打开的文件描述符。这包括文件、套接字、管道等。关闭文件描述符后,它就不再指向任何文件或资源,系统会回收相关资源,确保不再有资源泄漏。
// 向父进程发送数据
const char *msg = "Hello from child!";
write(sv[1], msg, strlen(msg) + 1);
close(sv[1]); // 关闭写端
exit(EXIT_SUCCESS);
} else {
// 父进程
close(sv[1]); // 关闭不需要的文件描述符
// 从子进程接收数据
ssize_t n = read(sv[0], buf, sizeof(buf));
if (n > 0) {
printf("Received message: %s\n", buf);
} else {
perror("read");
}
close(sv[0]); // 关闭读端
wait(NULL); // 等待子进程结束
}
return 0;
}
send函数
#include <sys/types.h>
#include <sys/socket.h>
ssize_t send(int sockfd, const void *buf, size_t len, int flags);
参数说明
sockfd
:- 套接字的文件描述符。它指定了用于发送数据的套接字。
- 在调用
socket()
创建套接字时返回的值。
buf
:- 指向要发送的数据缓冲区的指针,即需要发送的数据内容。
len
:- 需要发送的数据的长度,即
buf
中数据的字节数。
- 需要发送的数据的长度,即
flags
:- 该参数用于设置发送操作的标志位。常用的标志有:
0
: 默认标志,没有特殊行为。MSG_DONTWAIT
: 非阻塞发送。MSG_OOB
: 发送带外数据(紧急数据)。MSG_NOSIGNAL
: 防止在发送过程中发生SIGPIPE
信号(常用于避免程序因远端断开而崩溃)。
- 该参数用于设置发送操作的标志位。常用的标志有:
SIGALRM、SIGTERM信号
SIGALRM (Signal Alarm):SIGALRM
是由计时器到期时向进程发送的信号。常见的用法是通过 alarm()
函数设置一个定时器,当时间到时,系统会发送 SIGALRM
信号给进程。
典型用法:
- 定时器: 可以用
SIGALRM
处理函数来实现定时操作,比如定时检测任务、定时执行某些操作等。 - 实现超时控制: 可以在等待某些操作(如网络或I/O操作)时,使用
SIGALRM
实现超时控制。
例子:
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
void handle_alarm(int sig, siginfo_t *info, void *context) {
printf("Alarm triggered! Signal number: %d\n", sig);
}
int main() {
struct sigaction sa;
sa.sa_sigaction = handle_alarm;
sa.sa_flags = SA_RESTART;
sigemptyset(&sa.sa_mask); // 初始化信号掩码
sigaction(SIGALRM, &sa, NULL); // 设置 SIGALRM 信号处理程序
/*如果采用的是signal函数,则signal(SIGALRM, handle_alarm)即可。
signal 是早期的信号接口,简单易用但功能较少。
sigaction 提供了更全面和可靠的信号处理功能,允许更精细的控制。
*/
// 设置 5 秒的定时器
alarm(5);
// 等待信号触发
printf("Waiting for alarm...\n");
pause(); // 等待信号
printf("Program exiting.\n");
return 0;
}
解释:
alarm(5)
设置一个 5 秒定时器。- 当定时器到期时,进程会收到
SIGALRM
信号,触发handle_alarm
函数。
返回14的原因:
#define SIGALRM 14 //由alarm系统调用产生timer时钟信号
#define SIGTERM 15 //终端发送的终止信号
SIGTERM (Signal Terminate):是请求进程正常终止的信号。它是一个比较常见的信号,用于有序地关闭进程,使进程有机会清理资源。
典型用法:
- 优雅终止进程: 当你想要终止一个进程时,通常发送
SIGTERM
信号,这样进程有机会保存工作、关闭文件描述符、释放内存等。 - 进程管理: 例如,通过命令
kill <pid>
发送SIGTERM
信号给进程,要求它终止。
区别于其他终止信号:
SIGKILL
: 是一种强制终止信号,进程不能捕获、忽略或阻塞这个信号。与SIGKILL
不同,SIGTERM
允许进程捕获信号并执行清理操作。SIGINT
: 通常用于用户中断(如按Ctrl+C
),而SIGTERM
更常用于系统或管理员请求终止进程。
alarm函数
#include <unistd.h>;
unsigned int alarm(unsigned int seconds);
设置信号传送闹钟,即用来设置信号SIGALRM在经过参数seconds秒数后发送给目前的进程。如果未设置信号SIGALRM的处理函数,那么alarm()默认处理终止进程.
socketpair函数
在linux下,使用socketpair函数能够创建一对套接字进行通信,项目中使用管道通信。
#include <sys/types.h>
#include <sys/socket.h>
int socketpair(int domain, int type, int protocol, int sv[2]);
- domain表示协议族,PF_UNIX或者AF_UNIX
- type表示协议,可以是SOCK_STREAM或者SOCK_DGRAM,SOCK_STREAM基于TCP,SOCK_DGRAM基于UDP
- protocol表示类型,只能为0
- sv[2]表示套节字柄对,该两个句柄作用相同,均能进行读写双向操作
- 返回结果, 0为创建成功,-1为创建失败
信号通知流程
Linux下的信号采用的异步处理机制,信号处理函数和当前进程是两条不同的执行路线。具体的,当进程收到信号时,操作系统会中断进程当前的正常流程,转而进入信号处理函数执行操作,完成后再返回中断的地方继续执行。
为避免信号竞态现象发生,信号处理期间系统不会再次触发它。所以,为确保该信号不被屏蔽太久,信号处理函数需要尽可能快地执行完毕。
一般的信号处理函数需要处理该信号对应的逻辑,当该逻辑比较复杂时,信号处理函数执行时间过长,会导致信号屏蔽太久。
这里的解决方案是,信号处理函数仅仅发送信号通知程序主循环,将信号对应的处理逻辑放在程序主循环中,由主循环执行信号对应的逻辑代码。
统一事件源
统一事件源,是指将信号事件与其他事件一样被处理。
具体的,信号处理函数使用管道将信号传递给主循环,信号处理函数往管道的写端写入信号值,主循环则从管道的读端读出信号值,使用I/O复用系统调用来监听管道读端的可读事件,这样信号事件与其他文件描述符都可以通过epoll来监测,从而实现统一处理。
信号处理机制
每个进程之中,都有存着一个表,里面存着每种信号所代表的含义,内核通过设置表项中每一个位来标识对应的信号类型。
-
信号的接收
-
- 接收信号的任务是由内核代理的,当内核接收到信号后,会将其放到对应进程的信号队列中,同时向进程发送一个中断,使其陷入内核态。注意,此时信号还只是在队列中,对进程来说暂时是不知道有信号到来的。
-
信号的检测
- 进程陷入内核态后,有两种场景会对信号进行检测:
- 进程从内核态返回到用户态前进行信号检测(确保用户态代码在恢复执行之前处理所有待处理的信号。)
- 进程在内核态中,从睡眠状态被唤醒的时候进行信号检测(在进程从睡眠状态中恢复时处理信号,防止进程继续执行而忽略信号。)
- 当发现有新信号时,便会进入下一步,信号的处理。
-
信号的处理
-
- ( 内核 )信号处理函数是运行在用户态的,调用处理函数前,内核会将当前内核栈的内容备份拷贝到用户栈上,并且修改指令寄存器(eip)将其指向信号处理函数。
- ( 用户 )接下来进程返回到用户态中,执行相应的信号处理函数。
- ( 内核 )信号处理函数执行完成后,还需要返回内核态,检查是否还有其它信号未处理。
- ( 用户 )如果所有信号都处理完成,就会将内核栈恢复(从用户栈的备份拷贝回来),同时恢复指令寄存器(eip)将其指向中断前的运行位置,最后回到用户态继续执行进程。
至此,一个完整的信号处理流程便结束了,如果同时有多个信号到达,上面的处理流程会在第2步和第3步骤间重复进行。
定时器类
#include <unistd.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <stdarg.h>
#include <errno.h> //提供errno(全局变量) 是记录系统的最后一次错误代码
#include <sys/wait.h>
#include <sys/uio.h>
#include <time.h>
#include "../log/log.h"
class util_timer;
/*用户数据结构体(连接资源)*/
struct client_data
{
sockaddr_in address; // 客户端的socket地址
int sockfd; // socket文件描述符
util_timer *timer; // 连接资源对应的定时器
};
/*定时器类*/
class util_timer
{
public:
util_timer() : prev(NULL), next(NULL) {}
public:
time_t expire; //超时时间
/*回调函数声明:声明一个返回值为空的函数指针cb_func,传入clent_data指针作为函数参数*/
void (* cb_func)(client_data *); //回调函数指针
client_data *user_data; //连接资源
util_timer *prev; //前向定时器
util_timer *next; //后继定时器
};
基于升序链表的定时器
// sort_timer_lst类是一个管理定时器的双向链表,定时器按照超时时间排序。
class sort_timer_lst
{
public:
sort_timer_lst();
~sort_timer_lst();
void add_timer(util_timer *timer); //添加定时器
void adjust_timer(util_timer *timer); //调整定时器
void del_timer(util_timer *timer); //删除定时器
void tick(); //定时任务处理函数
private:
void add_timer(util_timer *timer, util_timer *lst_head);
util_timer *head;
util_timer *tail;
};
具体函数实现如下:
除了tick()函数以外就是一个非常简单的数据结构的双向链表的设计了
#include "lst_timer.h"
#include "../http/http_conn.h"
sort_timer_lst::sort_timer_lst()
{
head = NULL;
tail = NULL;
}
sort_timer_lst::~sort_timer_lst()
{
util_timer *tmp = head;
while (tmp)
{
head = tmp->next;
delete tmp;
tmp = head;
}
}
void sort_timer_lst::add_timer(util_timer *timer)
{
if (!timer)
{
return;
}
if (!head)
{
head = tail = timer;
return;
}
// 如果新增的计时器更先超时,则插到head上
if (timer->expire < head->expire)
{
timer->next = head;
head->prev = timer;
head = timer;
return;
}
// 否则,寻找适合的位置插入
add_timer(timer, head);
}
// 函数用于调整定时器的位置,以保持链表中定时器的排序顺序。具体来说,当定时器的超时时间(expire)发生变化时,如果新的超时时间导致定时器的位置不再正确,函数会将定时器移到正确的位置。
void sort_timer_lst::adjust_timer(util_timer *timer)
{
if (!timer)
{
return;
}
util_timer *tmp = timer->next;
if (!tmp || (timer->expire < tmp->expire))
{
return;
}
if (timer == head) //如果timer是head,则需要更新head
{
head = head->next;
head->prev = NULL;
timer->next = NULL;
add_timer(timer, head);
}
else
{
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
add_timer(timer, timer->next);
}
}
void sort_timer_lst::del_timer(util_timer *timer)
{
if (!timer)
{
return;
}
if ((timer == head) && (timer == tail))
{
delete timer;
head = NULL;
tail = NULL;
return;
}
if (timer == head)
{
head = head->next;
head->prev = NULL;
delete timer;
return;
}
if (timer == tail)
{
tail = tail->prev;
tail->next = NULL;
delete timer;
return;
}
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
delete timer;
}
// 心搏函数,处理链表中的定时器,检查是否有超时的定时器,并调用其回调函数。
void sort_timer_lst::tick()
{
if (!head)
{
return;
}
time_t cur = time(NULL); // 获取当前时间
util_timer *tmp = head;
while (tmp)
{
if (cur < tmp->expire)
{
break;
}
tmp->cb_func(tmp->user_data); // 调用定时器的回调函数处理超时事件
head = tmp->next;
if (head)
{
head->prev = NULL;
}
delete tmp;
tmp = head;
}
}
void sort_timer_lst::add_timer(util_timer *timer, util_timer *lst_head)
{
util_timer *prev = lst_head;
util_timer *tmp = prev->next;
while (tmp)
{
if (timer->expire < tmp->expire)
{
prev->next = timer;
timer->next = tmp;
tmp->prev = timer;
timer->prev = prev;
break;
}
prev = tmp;
tmp = tmp->next;
}
// 如果到最后,记得维护tail指针
if (!tmp)
{
prev->next = timer;
timer->prev = prev;
timer->next = NULL;
tail = timer;
}
}
Utils类
class Utils
{
public:
Utils() {}
~Utils() {}
//初始化定时器的时间间隔
void init(int timeslot);
//对文件描述符设置非阻塞
int setnonblocking(int fd);
//将内核事件表注册读事件,ET模式,选择开启EPOLLONESHOT
void addfd(int epollfd, int fd, bool one_shot, int TRIGMode);
//信号处理函数
static void sig_handler(int sig);
//设置信号函数
void addsig(int sig, void(handler)(int), bool restart = true);
//定时处理任务,重新定时以不断触发SIGALRM信号
void timer_handler();
void show_error(int connfd, const char *info);
public:
static int *u_pipefd; // 管道,指向一个包含两个元素的数组(读端和写端文件描述符),声明为静态,在所有实例之间共享。
sort_timer_lst m_timer_lst; // 排序的定时器链表
static int u_epollfd; // 存储epoll实例的文件描述符。u_epollfd 是静态的,因此在类的所有实例之间共享,表示所有操作都基于同一个 epoll 实例。
int m_TIMESLOT; // 定时器处理任务的时间间隔
};
void cb_func(client_data *user_data);
具体实现如下:
void Utils::init(int timeslot)
{
m_TIMESLOT = timeslot;
}
//对文件描述符设置非阻塞(在非阻塞模式下,如果操作无法立即完成(如读取数据时没有数据可读),操作会立即返回,而不是挂起程序。)
int Utils::setnonblocking(int fd)
{
// fcntl 可以对打开的文件、socket 等文件描述符进行操作
int old_option = fcntl(fd, F_GETFL); //取当前文件描述符的标志位(F_GETFL)
int new_option = old_option | O_NONBLOCK; //将 O_NONBLOCK 标志加到这些标志位上
fcntl(fd, F_SETFL, new_option); //设置新的标志位(F_SETFL)
return old_option;
}
// 该函数的主要作用是将指定的文件描述符 fd 添加到 epoll 实例 epollfd 中,注册要监控的事件(如可读、挂断等),并根据传入的模式(TRIGMode 和 one_shot)选择触发方式。最后,还会将文件描述符设置为非阻塞模式,以便在 I/O 操作时不会阻塞程序的执行。
void Utils::addfd(int epollfd, int fd, bool one_shot, int TRIGMode)
{
// events: 用于指定要监控的事件类型。
/*
struct epoll_event {
__uint32_t events; // 事件类型
epoll_data_t data; // 用户数据
};
events的值可以是:
EPOLLIN: 文件描述符可读。
EPPOLLOUT: 文件描述符可写。
其中,epoll_data_t结构体如下
typedef union epoll_data {
void *ptr;
int fd; // 文件描述符,一般只用这个。是 epoll 事件的核心使用方式。
uint32_t u32;
uint64_t u64;
} epoll_data_t;
*/
epoll_event event;
event.data.fd = fd;
if (1 == TRIGMode) // 判断是否使用边缘触发
event.events = EPOLLIN | EPOLLET | EPOLLRDHUP; // 监控可读事件|设置为边缘触发模式| 监控挂断事件
else // 水平触发
event.events = EPOLLIN | EPOLLRDHUP;
if (one_shot)
// 如果启用 EPOLLONESHOT,意味着当文件描述符上的一个事件被触发后,该文件描述符会被自动从 epoll 事件列表中移除。这样做的好处是可以防止多线程环境下同一文件描述符被多个线程同时处理。
event.events |= EPOLLONESHOT;
// epoll_ctl 函数允许你添加、修改或删除要监控的文件描述符。这里是添加一个新的文件描述符到 epoll 实例中.
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd); // 将文件描述符 fd 设置为非阻塞模式
}
//信号处理函数(静态函数),当信号到达时,这个函数会被调用。信号处理函数中仅仅通过管道发送信号值,不处理信号对应的逻辑,缩短异步执行时间,减少对主程序的影响。 同一事件源体现
void Utils::sig_handler(int sig)
{
//为保证函数的可重入性,保留原来的errno
//可重入性表示中断后再次进入该函数,环境变量与之前相同,不会丢失数据
int save_errno = errno;
int msg = sig;
//将信号值从管道写端写入,传输字符类型,而非整型
send(u_pipefd[1], (char *)&msg, 1, 0); // 发送长度为1字节(msg时整数,这里只发送低字节部分,为啥(待填坑))
errno = save_errno;
}
//设置信号函数
void Utils::addsig(int sig, void(handler)(int), bool restart)
{
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = handler;
if (restart)
sa.sa_flags |= SA_RESTART;
sigfillset(&sa.sa_mask); // sa.sa_mask 定义了在处理信号期间要阻塞哪些其他信号。通过 sigfillset 把所有信号都添加到 sa_mask,意味着在当前信号处理函数执行时,其他信号都会被阻塞,直到处理函数执行完毕。
assert(sigaction(sig, &sa, NULL) != -1);
}
//定时处理任务,重新定时以不断触发SIGALRM信号
void Utils::timer_handler()
{
m_timer_lst.tick();
alarm(m_TIMESLOT);
}
void Utils::show_error(int connfd, const char *info)
{
send(connfd, info, strlen(info), 0);
close(connfd);
}
int *Utils::u_pipefd = 0; // 在现代C++中通常用nullptr
int Utils::u_epollfd = 0;
定时器回调函数
class Utils;
void cb_func(client_data *user_data)
{
/*删除非活动连接在socket上的注册时间*/
epoll_ctl(Utils::u_epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0);
assert(user_data);
/*关闭文件描述符*/
close(user_data->sockfd);
http_conn::m_user_count--;
}
问题
errno变量?
errno
是一个全局变量,用于表示最近一次系统调用或库函数的错误码。当某个系统调用或库函数发生错误时,它会设置 errno
的值,以指示具体的错误类型。许多系统调用和库函数在失败时都会修改 errno
,从而使程序能够检查并处理这些错误。
为什么要保护errno?
信号处理函数在程序运行的任何时刻都可能被触发。如果信号处理函数修改了 errno
而不进行恢复,可能会导致程序其他部分对 errno
的检查失效,造成难以调试的错误。例如,程序在信号处理函数执行前进行了一次系统调用或库函数调用,而后希望根据 errno
的值来处理错误。如果信号处理函数改变了 errno
,程序可能会误判系统调用的结果。
换句话说,信号处理函数不会改变 errno
的值,程序的其他部分依然可以安全地依赖 errno
来判断系统调用或库函数的成功或失败。
通过save_errno,这样即使信号处理函数中的操作(例如 send()
)修改了 errno
,原来的 errno
值仍然保存在 save_errno
中。
信号和中断?
信号和中断的概念有点类似,当某个事件发生时,打断当前执行的程序流,去处理信号。这种行为在某种程度上与硬件中断相似,所以有人把信号称为“软件中断”。
中断通常是指硬件中断,由硬件设备触发并由CPU处理。而信号是进程级别的机制,由内核或进程触发,用来通知或管理某个进程的状态。
虽然信号有类似“中断”的行为(打断程序执行,异步处理事件),但它并不直接与硬件打交道,更多是用于进程间通信和进程状态管理,属于操作系统和软件层面的机制。
从概念上说,信号可以被看作是一种“软件中断”,因为它打断了进程的正常执行流程去处理某个事件。但是,在更严格的技术定义中,信号和硬件中断是不同的:信号是操作系统提供的一种进程通信机制,而中断是由硬件触发的CPU级别的事件处理。
所以,虽然这句话有一定道理,但为了避免误解,通常最好把信号描述为异步通知机制或进程控制机制,而不是“软件中断”。
两者关系:
- 中断处理可能导致信号的产生:当内核处理中断(特别是内中断)时,可能会向相关进程发送信号,通知异常或特定事件。
- 信号是中断处理的一种结果,但信号本身不是中断:信号是在中断处理完成后,由内核决定是否发送给进程的通知机制。
回忆:中断是硬件或CPU层面的机制。
中断可以分为外中断(外部硬件设备触发)和内中断(异常)外中断:异步发生,独立于当前CPU执行的指令流。
内中断(异常):同步发生,与指令流直接相关,在指令执行时触发。
文件描述符的标志位?
文件描述符的标志位是操作系统为每个打开的文件或资源(如套接字、管道)维护的属性或状态,用来控制文件描述符的行为。标志位通常存储在内核中,可以通过系统调用如 fcntl
、open
等来获取或设置。
这些标志位的作用是决定文件描述符如何进行 I/O 操作,例如是否使用非阻塞模式、是否以追加方式写入文件,或者是否在执行 exec()
系列函数时关闭文件描述符等。
文件描述符标志位的作用:
- 控制文件 I/O 的行为:通过设置不同的标志位,可以控制文件描述符的读取、写入、同步、阻塞等行为,灵活应对不同的应用场景。
- 提升性能:通过设置非阻塞模式 (
O_NONBLOCK
) 或异步写入模式,可以避免进程被阻塞,提高程序的响应性,尤其是在网络编程中。 - 安全性:通过
FD_CLOEXEC
标志,确保文件描述符不会被子进程继承,避免敏感资源泄漏。 - 数据可靠性:通过
O_SYNC
、O_DSYNC
等标志,确保数据被正确写入硬盘,防止因系统崩溃或电源故障导致数据丢失
open
/read
和 fopen
/fread
?
open
/read
:属于系统调用接口,提供低级的文件操作功能。操作的是文件描述符直接与操作系统内核进行交互。
fopen
/fread
:属于标准库接口(C 标准库),提供高级的文件操作功能。提供了更高层次的抽象,通常比 open
/read
更易于使用。
epfd
(Epoll 文件描述符)和fd
(文件描述符)的关系?
容器和实体的关系。一对多的关系。
epfd
(Epoll 文件描述符)
- 定义:
epfd
是一个由epoll_create
或epoll_create1
系统调用创建的文件描述符。它代表了一个 epoll 实例。 - 作用:
epfd
用于管理和监控多个文件描述符。通过它,你可以将多个文件描述符添加到 epoll 实例中,并通过epoll_wait
等待这些文件描述符上的 I/O 事件发生。 - 使用: 在调用
epoll_ctl
时,epfd
是用来指定要操作的 epoll 实例的文件描述符。
fd
(文件描述符)
- 定义:
fd
是一个普通的文件描述符,可以是任何打开的文件、套接字、管道等。它是由系统调用如open
、socket
、pipe
等创建的。 - 作用:
fd
是对系统资源的抽象,允许应用程序读写或管理这些资源。在epoll
的上下文中,它是需要被监控的具体资源。 - 使用: 在调用
epoll_ctl
时,fd
是用来指定要监控或操作的具体文件描述符。
关系
- 管理关系:
epfd
是一个 epoll 实例的标识符,管理着所有通过epoll_ctl
添加到该实例中的文件描述符(fd
)。fd
是你要监控的具体资源,而epfd
是容纳这些监控资源的容器。 - 操作关系: 你使用
epoll_ctl
和epoll_wait
来操作和监控fd
。具体来说,你可以通过epoll_ctl
将fd
添加到epfd
中,设置其监控的事件类型,并通过epoll_wait
等待这些事件的发生。
为什么要将管道写端设置为非阻塞?
send是将信息发送给套接字缓冲区,如果缓冲区满了,则会阻塞,这时候会进一步增加信号处理函数的执行时间,为此,将其修改为非阻塞。
没有对非阻塞返回值处理,如果阻塞是不是意味着这一次定时事件失效了?
是的,但定时事件是非必须立即处理的事件,可以允许这样的情况发生。
半同步半反应堆线程池
服务器编程基本框架
I/O处理单元:接受客户连接,接受客户数据,将处理好的数据还给客户端。
逻辑单元:通常为一个进程或线程,分析客户数据后进行处理,将处理结果交给I/O单元。
五种I/O模型
- 阻塞IO:调用者调用了某个函数,等待这个函数返回,期间什么也不做,不停的去检查这个函数有没有返回,必须等这个函数返回才能进行下一步动作
- 非阻塞IO:非阻塞等待,每隔一段时间就去检测IO事件是否就绪。没有就绪就可以做其他事。非阻塞I/O执行系统调用总是立即返回,不管时间是否已经发生,若时间没有发生,则返回-1,此时可以根据errno区分这两种情况,对于accept,recv和send,事件未发生时,errno通常被设置成eagain
- 信号驱动IO:linux用套接口进行信号驱动IO,安装一个信号处理函数,进程继续运行并不阻塞,当IO时间就绪,进程收到SIGIO信号。然后处理IO事件。
- IO复用:linux用select/poll函数实现IO复用模型,这两个函数也会使进程阻塞,但是和阻塞IO所不同的是这两个函数可以同时阻塞多个IO操作。而且可以同时对多个读操作、写操作的IO函数进行检测。知道有数据可读或可写时,才真正调用IO操作函数
- 异步IO:linux中,可以调用aio_read函数告诉内核描述字缓冲区指针和缓冲区的大小、文件偏移及通知的方式,然后立即返回,当内核将数据拷贝到缓冲区后,再通知应用程序。
阻塞I/O,非阻塞I/O,信号驱动I/O和I/O复用都属于同步IO
同步I/O和异步I/O之间的区别:同步I/O指内核向应用程序通知的是就绪事件,比如只通知有客户端连接,要求用户代码自行执行I/O操作,异步I/O是指内核向应用程序通知的是完成事件,比如读取客户端的数据后才通知应用程序,由内核完成I/O操作。
同步:同步操作意味着任务按顺序执行,当前操作必须等待上一个操作完成才能继续。例如,阻塞的I/O操作就是同步的,程序必须等待I/O完成后才能继续执行。
异步:异步操作意味着任务可以并行或交替执行,不必等待当前操作完成。程序可以继续执行其他操作,当异步任务完成时,系统会通过回调或信号通知。
这里有个重要的面试考点:I/O复用技术,即使用select,poll,epoll等系统调用,让主线程能同时监听多个文件描述符,提高服务器运行的效率。
参考b站链接:【并发】IO多路复用select/poll/epoll介绍,腾讯面试:请描述 select、poll、epoll 这三种IO多路复用技术的执行原理
I/O复用(select, poll, epoll)
select:创建3个文件描述符集并拷贝到内核中,分别监听读、写、异常动作。这里受到单个进程可以打开的fd数量限制,默认是1024。需要对文件描述符表进行遍历,**O(n)**的轮询时间复杂度。
poll:将传入的struct pollfd结构体数组拷贝到内核中进行监听。select和poll的动作基本一致,只是poll采用链表来进行文件描述符的存储,而select采用fd标注位来存放,所以**select会受到最大连接数的限制,而poll不会。**需要对文件描述符表进行遍历,O(n)的轮询时间复杂度。
结构体:
epoll:执行epoll_create会在内核的高速cache区中建立一颗红黑树以及就绪链表(该链表存储已经就绪的文件描述符)。接着用户执行的epoll_ctl函数添加文件描述符会在红黑树上增加相应的结点。在epoll_ctl()函数中,为每个文件描述符都指定了回调函数,基于回调函数把就绪时间放到就绪队列中,因此,把时间复杂度从O(n)降到了O(1)。
注意:
select、poll、epoll虽然都会返回就绪的文件描述符数量。但是select和poll并不会明确指出是哪些文件描述符就绪,而epoll会。造成的区别就是,系统调用返回后,调用select和poll的程序需要遍历监听的整个文件描述符找到是谁处于就绪,而epoll则直接处理即可。
select、poll采用轮询的方式来检查文件描述符是否处于就绪态,而epoll采用回调机制。造成的结果就是,随着fd的增加,select和poll的效率会线性降低,而epoll不会受到太大影响,除非活跃的socket很多。
select、poll都需要将有关文件描述符的数据结构拷贝进内核,最后再拷贝出来。而epoll创建的有关文件描述符的数据结构本身就存于内核态中,系统调用返回时利用mmap()文件映射内存加速与内核空间的消息传递:即epoll使用mmap减少复制开销。(好像是错误的,epoll并没有使用mmap的零拷贝技术)
epoll对文件描述符的操作又两种模式,LT模式和ET模式
LT模式(水平触发):默认模式,当epoll_wait检测到文件描述符上有事件发生并通知应用程序后,应用程序可以不立即处理事件,当下一次调用epoll_wait时,epoll_wait还会再次向应用程序通告此事件,直到这个事件被处理。
ET模式(边沿触发):当epoll_wait检测到文件描述符上有事件发生并通知应用程序后,应用程序必须立即处理事件,后续epoll_wait不再通知
事件处理模式
将上面的服务器基本框架和I/O模型结合起来,我们就可以得到两种高效的事件处理模式。
- reactor模式中,主线程(I/O处理单元)只负责监听文件描述符上是否有事件发生,有的话立即通知工作线程(逻辑单元),读写数据、接受新连接及处理客户请求均在工作线程中完成。通常由同步I/O实现。
- proactor模式中,主线程和内核负责处理读写数据、接受新连接等I/O操作,工作线程仅负责业务逻辑,如处理客户请求。通常由异步I/O实现。
由于异步I/O并不成熟,实际中使用较少,这里将使用同步I/O模拟实现proactor模式。
同步I/O模型的工作流程如下(epoll_wait为例):
- 主线程往epoll内核事件表注册socket上的读就绪事件。
- 主线程调用epoll_wait等待socket上有数据可读
- 当socket上有数据可读,epoll_wait通知主线程,主线程从socket循环读取数据,直到没有更多数据可读,然后将读取到的数据封装成一个请求对象并插入请求队列。
- 睡眠在请求队列上某个工作线程被唤醒,它获得请求对象并处理客户请求,然后往epoll内核事件表中注册该socket上的写就绪事件
- 主线程调用epoll_wait等待socket可写。
- 当socket上有数据可写,epoll_wait通知主线程。主线程往socket上写入服务器处理客户请求的结果。
基础API
pthread_create
pthread_create
是用于创建新线程的函数,属于 POSIX 线程库(Pthreads)的一部分。
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg);
参数解释
pthread_t \*thread
:- 线程的标识符。
pthread_create
会创建一个新的线程,并将线程 ID 写入thread
中,方便后续对线程的管理。
- 线程的标识符。
const pthread_attr_t \*attr
:- 线程的属性。如果传入
NULL
,则使用默认属性(如创建普通线程)。可以通过pthread_attr_init
初始化属性,修改线程的栈大小、优先级等。
- 线程的属性。如果传入
void \*(\*start_routine)(void \*)
:- 线程的入口函数,也就是新线程创建后要执行的函数。这个函数必须接受一个
void*
参数并返回void*
。新线程创建后,从这个函数开始执行。
- 线程的入口函数,也就是新线程创建后要执行的函数。这个函数必须接受一个
void \*arg
:- 传递给线程入口函数的参数。这是一个通用的指针,允许向新线程传递任意类型的数据。可以是
NULL
,如果入口函数不需要参数。
- 传递给线程入口函数的参数。这是一个通用的指针,允许向新线程传递任意类型的数据。可以是
示例
void* my_thread_function(void* arg) {
int* num = (int*) arg;
printf("Thread is running. Argument: %d\n", *num);
return NULL;
}
int main() {
pthread_t thread;
int arg = 10;
// 创建一个线程,执行 my_thread_function 函数
if (pthread_create(&thread, NULL, my_thread_function, &arg) != 0) {
perror("Failed to create thread");
}
// 等待线程完成
pthread_join(thread, NULL);
return 0;
}
pthread_detach
pthread_detach
是用于将线程设置为 分离状态 的函数,线程在分离状态下,终止后会自动释放其占用的系统资源,无需调用 pthread_join
。
int pthread_detach(pthread_t thread);
参数解释
pthread_t thread
:要分离的线程 ID。该线程在终止后,其资源会被系统自动回收,不再需要其他线程使用pthread_join
来等待它结束。
使用场景
- 当你不关心一个线程的退出状态,也不需要与它同步时,可以使用
pthread_detach
将其分离。 - 分离的线程会在结束后自动回收资源,避免资源泄露。
注意
- 一旦一个线程被分离,无法通过
pthread_join
来等待该线程结束。如果尝试对已分离的线程调用pthread_join
,会返回错误。
示例
void* my_thread_function(void* arg) {
printf("Detached thread is running.\n");
return NULL;
}
int main() {
pthread_t thread;
// 创建一个线程
if (pthread_create(&thread, NULL, my_thread_function, NULL) != 0) {
perror("Failed to create thread");
}
// 将该线程分离
if (pthread_detach(thread) != 0) {
perror("Failed to detach thread");
}
// 主线程继续执行,不需要等待子线程
printf("Main thread continues...\n");
return 0;
}
threadpool
搞这么多概念,还不如直接看代码来得痛快。这里放一下大佬画的半同步半反应堆的大体框架图。
下面是threadpool.h代码
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
#include "../lock/locker.h"
#include "../CGImysql/sql_connection_pool.h"
template <typename T>
class threadpool
{
public:
/*thread_number是线程池中线程的数量,max_requests是请求队列中最多允许的、等待处理的请求的数量*/
threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000);
~threadpool();
bool append(T *request, int state);
bool append_p(T *request);
private:
/*工作线程运行的函数,它不断从工作队列中取出任务并执行之*/
static void *worker(void *arg);
void run();
private:
int m_thread_number; //线程池中的线程数
int m_max_requests; //请求队列中允许的最大请求数
pthread_t *m_threads; //描述线程池的数组,其大小为m_thread_number
std::list<T *> m_workqueue; //请求队列
locker m_queuelocker; //保护请求队列的互斥锁
sem m_queuestat; //是否有任务需要处理
connection_pool *m_connPool; //数据库
int m_actor_model; //模型切换
};
template <typename T>
threadpool<T>::threadpool( int actor_model, connection_pool *connPool, int thread_number, int max_requests) : m_actor_model(actor_model),m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL),m_connPool(connPool)
{
if (thread_number <= 0 || max_requests <= 0)
throw std::exception();
m_threads = new pthread_t[m_thread_number];
if (!m_threads)
throw std::exception();
for (int i = 0; i < thread_number; ++i)
{
if (pthread_create(m_threads + i, NULL, worker, this) != 0)
{
delete[] m_threads;
throw std::exception();
}
if (pthread_detach(m_threads[i]))
{
delete[] m_threads;
throw std::exception();
}
}
}
template <typename T>
threadpool<T>::~threadpool()
{
delete[] m_threads;
}
template <typename T>
bool threadpool<T>::append(T *request, int state)
{
m_queuelocker.lock();
if (m_workqueue.size() >= m_max_requests)
{
m_queuelocker.unlock();
return false;
}
request->m_state = state;
m_workqueue.push_back(request);
m_queuelocker.unlock();
m_queuestat.post(); // V操作
return true;
}
template <typename T>
bool threadpool<T>::append_p(T *request)
{
m_queuelocker.lock();
if (m_workqueue.size() >= m_max_requests)
{
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back(request);
m_queuelocker.unlock();
m_queuestat.post(); // V操作
return true;
}
template <typename T>
void *threadpool<T>::worker(void *arg)
{
threadpool *pool = (threadpool *)arg;
pool->run();
return pool;
}
// 这个run等后面看到http的时候回来看。(待填坑)
template <typename T>
void threadpool<T>::run()
{
while (true)
{
m_queuestat.wait();
m_queuelocker.lock();
if (m_workqueue.empty()) // 这里应该是要保险,按道理说前面m_queuestat已经保证队列有线程资源可用了。
{
m_queuelocker.unlock();
continue;
}
T *request = m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock();
if (!request)
continue;
if (1 == m_actor_model) // 是否异步
{
if (0 == request->m_state) // 读操作还是写操作
{ // 读操作
if (request->read_once()) // 调用 read_once() 尝试读取数据
{
request->improv = 1;
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process();
}
else
{
request->improv = 1;
request->timer_flag = 1;
}
}
else
{ // 写操作
if (request->write())
{
request->improv = 1;
}
else
{
request->improv = 1;
request->timer_flag = 1;
}
}
}
else // 同步操作
{
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process();
}
}
}
#endif
http连接处理类
基础概念
http协议格式
参考:https://www.cnblogs.com/suizhikuo/p/8493362.html
下图为http请求的报文结构
示例请求报文:
POST /login HTTP/1.1
Host: www.example.com
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8
Accept-Language: en-US,en;q=0.5
Accept-Encoding: gzip, deflate, br
Content-Type: application/x-www-form-urlencoded
Content-Length: 28
Connection: keep-alive
Cookie: sessionId=abc123; userId=789
username=john&password=doe123
请求行:POST /login HTTP/1.1
请求登录页面,使用POST方法,协议为HTTP/1.1。
请求头部:包含主机、用户代理、内容类型、内容长度等。
空行:分隔请求头部和请求体。
请求数据:username=john&password=doe123
,表示提交了一个登录表单。
下图为http响应报文结构
何为状态机?
状态机(State Machine),又称为有限状态机(Finite State Machine, FSM),是一个数学模型,表示一个系统在任何时刻都处于某个具体的状态,并且根据外部输入或内部条件,可以从一种状态转移到另一种状态。
**“有限”指的是状态的数量是有限的。**换句话说,一个有限状态机只能处于一组预定义的有限状态中的一个,并且可以在这些状态之间进行转移。这个有限的状态集合是其“有限”特性的关键所在。
下面用gpt写一个简单的状态机,方便我们认识状态机。
class VendingMachine:
def __init__(self):
# 定义初始状态
self.state = "waiting"
def select_item(self):
if self.state == "waiting":
print("选择商品...")
self.state = "item_selected"
else:
print("当前状态不能选择商品。")
def insert_money(self):
if self.state == "item_selected":
print("正在处理支付...")
self.state = "processing_payment"
else:
print("必须先选择商品。")
def cancel(self):
if self.state in ["item_selected", "processing_payment"]:
print("取消操作,返回等待状态...")
self.state = "waiting"
else:
print("没有可以取消的操作。")
def dispense_item(self):
if self.state == "processing_payment":
print("支付成功,商品已出货。")
self.state = "waiting"
else:
print("必须先完成支付。")
def get_state(self):
print(f"当前状态: {self.state}")
# 创建状态机对象
machine = VendingMachine() # 销售机
# 流程测试
machine.get_state() # 当前状态: waiting
machine.select_item() # 选择商品...
machine.get_state() # 当前状态: item_selected
machine.insert_money() # 正在处理支付...
machine.get_state() # 当前状态: processing_payment
machine.dispense_item() # 支付成功,商品已出货。
machine.get_state() # 当前状态: waiting
# 测试取消操作
machine.select_item() # 选择商品...
machine.get_state() # 当前状态: item_selected
machine.cancel() # 取消操作,返回等待状态...
machine.get_state() # 当前状态: waiting
VendingMachine
类是一个状态机,它有三个状态:waiting
, item_selected
, 和 processing_payment
。这就是一个简单是状态机,是不是忽然感觉状态机也不是什么高大上的模式了。思考一下,如果将这个简单的状态机改为主从状态机
的模式,如何改呢?
首先,对于从状态机,它负责具体状态的处理逻辑。我们可以为每个状态定义一个类,每个类都实现一个共同的接口。
class State:
def select_item(self, machine):
raise NotImplementedError
def insert_money(self, machine):
raise NotImplementedError
def cancel(self, machine):
raise NotImplementedError
def dispense_item(self, machine):
raise NotImplementedError
def get_state(self):
raise NotImplementedError
class WaitingState(State):
def select_item(self, machine):
print("选择商品...")
machine.set_state(ItemSelectedState())
def insert_money(self, machine):
print("当前状态不能选择商品。")
def cancel(self, machine):
print("没有可以取消的操作。")
def dispense_item(self, machine):
print("必须先选择商品。")
def get_state(self):
return "waiting"
class ItemSelectedState(State):
def select_item(self, machine):
print("已经选择了商品,请插入钱币。")
def insert_money(self, machine):
print("正在处理支付...")
machine.set_state(ProcessingPaymentState())
def cancel(self, machine):
print("取消操作,返回等待状态...")
machine.set_state(WaitingState())
def dispense_item(self, machine):
print("必须先完成支付。")
def get_state(self):
return "item_selected"
class ProcessingPaymentState(State):
def select_item(self, machine):
print("当前状态不能选择商品。")
def insert_money(self, machine):
print("已经在处理支付。")
def cancel(self, machine):
print("取消操作,返回等待状态...")
machine.set_state(WaitingState())
def dispense_item(self, machine):
print("支付成功,商品已出货。")
machine.set_state(WaitingState())
def get_state(self):
return "processing_payment"
而对于主状态机,主状态机负责管理状态的切换和处理。它的代码比较简单,如下:
class VendingMachine:
def __init__(self):
self.state = WaitingState()
def set_state(self, state):
self.state = state
def select_item(self):
self.state.select_item(self)
def insert_money(self):
self.state.insert_money(self)
def cancel(self):
self.state.cancel(self)
def dispense_item(self):
self.state.dispense_item(self)
def get_state(self):
print(f"当前状态: {self.state.get_state()}")
# 创建状态机对象
machine = VendingMachine()
# 流程测试
machine.get_state() # 当前状态: waiting
machine.select_item() # 选择商品...
machine.get_state() # 当前状态: item_selected
machine.insert_money() # 正在处理支付...
machine.get_state() # 当前状态: processing_payment
machine.dispense_item() # 支付成功,商品已出货。
machine.get_state() # 当前状态: waiting
# 测试取消操作
machine.select_item() # 选择商品...
machine.get_state() # 当前状态: item_selected
machine.cancel() # 取消操作,返回等待状态...
machine.get_state() # 当前状态: waiting
基础API
ssize_t recv(int sockfd, void *buf, size_t len, int flags);
recv
是用于从套接字接收数据的系统调用。它从指定的套接字 m_sockfd
中读取数据并将其存储在缓冲区中。
sockfd
:表示用于通信的套接字。
buf
:指向用于存储接收数据的缓冲区。
len
:指定缓冲区的大小,表示可以接收的最大数据长度。
flags
:用于指定接收数据的行为,0
表示使用默认接收行为。
char *strpbrk(const char *s, const char *accept);
strpbrk
是一个在 C 语言标准库中定义的函数,它用于搜索字符串中任意字符的首次出现。这个函数的名称来源于 “string position break”,即 “字符串位置断点”。
-
s
:指向需要搜索的字符串的指针。 -
accept
:指向包含允许匹配的字符集合的字符串的指针。 -
如果找到了这样的字符,函数返回一个指针,指向在
s
中第一次出现的一个字符,该字符也出现在accept
中。 -
如果没有找到匹配的字符,则返回
NULL
。
int strcasecmp(const char *s1, const char *s2);
strcasecmp
是一个在 C 语言标准库中定义的函数,用于比较两个字符串,不区分大小写。这个函数在处理字符串时非常有用,尤其是当你需要忽略字母大小写差异进行比较时。如果发现s1分别小于、匹配或大于s2,则返回一个小于、等于或大于0的整数。
unsigned char strncasecmp(const char *s1, const char *s2, int n)
相比之下则多了个参数n,表示比较参数s1和s2字符串前n个字符。
size_t strspn(const char *s, const char *accept);
strspn
是一个在 C 语言标准库中定义的函数,用于计算字符串中第一个不允许的字符的位置。即该函数返回 str1 中第一个不在字符串 str2 中出现的字符下标。
例子:
const char str1[] = "ABCDEFG019874";
const char str2[] = "ABCD";
len = strspn(str1, str2); // 4
#include <sys/uio.h>
ssize_t writev(int fd, const struct iovec *iov, int iovcnt);
fd
:文件描述符,指明了要写入数据的目标。iov
:指向iovec
结构数组的指针,每个iovec
结构包含一个指向数据缓冲区的指针和该缓冲区的长度。iovcnt
:iovec
数组中的元素数量,即要写入的内存区域的数量。
writev
函数将 iov
数组中的所有缓冲区连续写入文件描述符 fd
。这个函数特别适合于需要将多个分散的数据块一次性写入的情况,可以减少系统调用的次数,提高 I/O 效率。
http头文件
根据状态转移,通过主从状态机封装了http连接类。其中,主状态机在内部调用从状态机,从状态机将处理状态和数据传给主状态机
- 客户端发出http连接请求
- 从状态机读取数据,更新自身状态和接收数据,传给主状态机
- 主状态机根据从状态机状态,更新自身状态,决定响应请求还是继续读取
#ifndef HTTPCONNECTION_H
#define HTTPCONNECTION_H
#include <unistd.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <stdarg.h>
#include <errno.h>
#include <sys/wait.h>
#include <sys/uio.h>
#include <map>
#include "../lock/locker.h"
#include "../CGImysql/sql_connection_pool.h"
#include "../timer/lst_timer.h"
#include "../log/log.h"
class http_conn
{
public:
// 设置读取文件的名称m_rea_file大小
static const int FILENAME_LEN = 200;
// 设置读缓冲区m_read_buf大小
static const int READ_BUFFER_SIZE = 2048;
// 设置写缓冲区m_write_buf大小
static const int WRITE_BUFFER_SIZE = 1024;
// 报文的请求方法,本项目只用到GET和Post
enum METHOD
{
GET = 0,
POST,
HEAD,
PUT,
DELETE,
TRACE,
OPTIONS,
CONNECT,
PATH
};
// 主状态机的状态
enum CHECK_STATE
{
CHECK_STATE_REQUESTLINE = 0, // 检查请求行(request line)。请求行是 HTTP 请求的第一行
CHECK_STATE_HEADER, // 检查请求头(headers)
CHECK_STATE_CONTENT // 检查请求体(body)
};
// 报文解析结果(响应码)
enum HTTP_CODE
{
NO_REQUEST, // 当前没有完整的请求可以处理
GET_REQUEST, // HTTP 请求成功解析并完成,且是一个有效的 GET 请求。
BAD_REQUEST, // 请求格式错误,无法解析为有效的 HTTP 请求。
NO_RESOURCE, // 请求的资源不存在。
FORBIDDEN_REQUEST, // 客户端的请求被禁止访问。
FILE_REQUEST, // 请求的文件可以成功提供给客户端。
INTERNAL_ERROR, // 服务器内部错误。
CLOSED_CONNECTION // 客户端已经关闭连接。
};
// 从状态机的状态
enum LINE_STATUS
{
LINE_OK = 0, // 行解析成功
LINE_BAD, // 行解析失败
LINE_OPEN // 行尚未完全读取
};
// 主状态机(CHECK_STATE) 负责 HTTP 请求的整体处理流程,分为请求行、请求头和请求体三个阶段。
// 从状态机(LINE_STATUS) 负责解析每一行的数据,并提供行解析的状态信息。
public:
http_conn() {}
~http_conn() {}
public:
/*初始化套接字地址,函数内部调用私有方法init()*/
void init(int sockfd, const sockaddr_in &addr, char *, int, int, string user, string passwd, string sqlname);
/*关闭http连接*/
void close_conn(bool real_close = true);
/*读取浏览器端发来的全部数据*/
void process();
/*响应报文写入函数*/
bool read_once();
bool write();
sockaddr_in *get_address()
{
return &m_address;
}
/*同步线程初始化数据库读取表*/
void initmysql_result(connection_pool *connPool);
int timer_flag;
int improv;
private:
void init();
/*从m_read_buf读取,并处理请求报文*/
HTTP_CODE process_read();
/*向m_write_buf写入响应报文*/
bool process_write(HTTP_CODE ret);
/*主状态机解析报文中的请求行数据*/
HTTP_CODE parse_request_line(char *text);
/*主状态机解析请求报文中的请求头数据*/
HTTP_CODE parse_headers(char *text);
/*主状态机解析报文中的请求内容*/
HTTP_CODE parse_content(char *text);
/*生成响应报文*/
HTTP_CODE do_request();
/*m_start_line是已解析的字符*/
/*get_line用于将指针往后偏移,指向未处理的字符*/
char *get_line() { return m_read_buf + m_start_line; };
/*从状态机读取一行,然后返回该行的解析状态。*/
LINE_STATUS parse_line();
void unmap();
/*根据响应报文的格式,生成对应的8个部分,以下函数均由do_request调用*/
bool add_response(const char *format, ...);
bool add_content(const char *content);
bool add_status_line(int status, const char *title);
bool add_headers(int content_length);
bool add_content_type();
bool add_content_length(int content_length);
bool add_linger();
bool add_blank_line();
public:
static int m_epollfd;
static int m_user_count;
MYSQL *mysql;
int m_state; //读为0, 写为1
private:
int m_sockfd;
sockaddr_in m_address;
/*存储读取的请求报文*/
char m_read_buf[READ_BUFFER_SIZE];
/*缓冲区中m_read_buf中数据的最后一个字节的下一个位置*/
long m_read_idx;
/*m_read_buf读取的位置m_checked_idx*/
long m_checked_idx;
/*m_read_buf中已经解析的字符个数*/
int m_start_line;
/*存储发送的响应报文数据*/
char m_write_buf[WRITE_BUFFER_SIZE];
/*指示buffer中的长度*/
int m_write_idx;
/*主状态机的状态*/
CHECK_STATE m_check_state;
/*请求方法*/
METHOD m_method;
/*以下为解析请求报文中对应的6个变量*/
char m_real_file[FILENAME_LEN]; // 通常是基于m_url生成的,表示服务器上的资源路径。
char *m_url; // 存储HTTP请求中的请求URL,即客户端要访问的资源路径。
char *m_version; // 存储HTTP请求中的协议版本(如HTTP/1.1)
char *m_host; // 存储HTTP请求头中的主机名(Host)字段,用于指明请求的目标服务器的主机地址或域名。
long m_content_length; // 存储HTTP请求头中的内容长度(Content-Length),表示请求体的大小
bool m_linger; // 用于标记HTTP请求的连接是否保持(即是否使用keep-alive模式)(Connection字段)。
char *m_file_address; //读取服务器上的文件地址
struct stat m_file_stat; // 文件的状态信息结构。stat 是系统调用,用于获取文件的元数据,如文件大小、修改时间、权限等。
struct iovec m_iv[2]; // iovec 结构体数组,用于定义多块数据的内存区域。iovec 是一个用于 writev() 系统调用的数据结构,允许服务器一次发送多块非连续的内存数据。这里 m_iv 数组通常有两个元素,第一个用于存储 HTTP 头部,第二个用于存储文件数据,从而实现头部和文件的合并发送。
int m_iv_count; // 记录 iovec 中有效内存块的数量。
int cgi; //是否启用的POST
char *m_string; //存储请求头数据
int bytes_to_send; //剩余发送的字节
int bytes_have_send; //已发送的字节
char *doc_root; //网站根目录
map<string, string> m_users;
int m_TRIGMode; // m_TRIGMode == 1时epoll为ET触发模式
int m_close_log;
char sql_user[100];
char sql_passwd[100];
char sql_name[100];
};
#endif
http实现文件
#include "http_conn.h"
#include <mysql/mysql.h>
#include <fstream>
//定义http响应的一些状态信息
const char *ok_200_title = "OK";
const char *error_400_title = "Bad Request";
const char *error_400_form = "Your request has bad syntax or is inherently impossible to staisfy.\n";
const char *error_403_title = "Forbidden";
const char *error_403_form = "You do not have permission to get file form this server.\n";
const char *error_404_title = "Not Found";
const char *error_404_form = "The requested file was not found on this server.\n";
const char *error_500_title = "Internal Error";
const char *error_500_form = "There was an unusual problem serving the request file.\n";
locker m_lock;
map<string, string> users;
void http_conn::initmysql_result(connection_pool *connPool)
{
//先从连接池中取一个连接
MYSQL *mysql = NULL;
connectionRAII mysqlcon(&mysql, connPool);
//在user表中检索username,passwd数据,浏览器端输入
if (mysql_query(mysql, "SELECT username,passwd FROM user"))
{
LOG_ERROR("SELECT error:%s\n", mysql_error(mysql));
}
//从表中检索完整的结果集
MYSQL_RES *result = mysql_store_result(mysql);
//返回结果集中的列数
int num_fields = mysql_num_fields(result);
//返回所有字段结构的数组
MYSQL_FIELD *fields = mysql_fetch_fields(result);
//从结果集中获取下一行,将对应的用户名和密码,存入map中
while (MYSQL_ROW row = mysql_fetch_row(result))
{
string temp1(row[0]);
string temp2(row[1]);
users[temp1] = temp2;
}
}
//对文件描述符设置非阻塞
int setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
//将内核事件表注册读事件,ET模式,选择开启EPOLLONESHOT (这里为什么跟utils类中的addfd一样?待填坑)
void addfd(int epollfd, int fd, bool one_shot, int TRIGMode)
{
epoll_event event;
event.data.fd = fd;
if (1 == TRIGMode)
event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
else
event.events = EPOLLIN | EPOLLRDHUP;
if (one_shot)
event.events |= EPOLLONESHOT;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
//从内核时间表删除描述符
void removefd(int epollfd, int fd)
{
epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, 0);
close(fd);
}
//将事件重置为EPOLLONESHOT(oneshot:当文件描述符上的一个事件被触发后,该文件描述符会被自动从 epoll 事件列表中移除。)
void modfd(int epollfd, int fd, int ev, int TRIGMode)
{
epoll_event event;
event.data.fd = fd;
if (1 == TRIGMode)
event.events = ev | EPOLLET | EPOLLONESHOT | EPOLLRDHUP;
else
event.events = ev | EPOLLONESHOT | EPOLLRDHUP;
epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}
int http_conn::m_user_count = 0;
int http_conn::m_epollfd = -1;
//关闭连接,关闭一个连接,客户总量减一
void http_conn::close_conn(bool real_close)
{
if (real_close && (m_sockfd != -1))
{
printf("close %d\n", m_sockfd);
removefd(m_epollfd, m_sockfd);
m_sockfd = -1;
m_user_count--;
}
}
//初始化连接,外部调用初始化套接字地址
void http_conn::init(int sockfd, const sockaddr_in &addr, char *root, int TRIGMode,
int close_log, string user, string passwd, string sqlname)
{
m_sockfd = sockfd;
m_address = addr;
// 将 sockfd 注册到 epoll 实例 m_epollfd 中,并监听该文件描述符的事件。
addfd(m_epollfd, sockfd, true, m_TRIGMode);
m_user_count++; // 当前连接的客户端数量+1
//当浏览器出现连接重置时,可能是网站根目录出错或http响应格式出错或者访问的文件中内容完全为空
doc_root = root;
m_TRIGMode = TRIGMode;
m_close_log = close_log;
strcpy(sql_user, user.c_str());
strcpy(sql_passwd, passwd.c_str());
strcpy(sql_name, sqlname.c_str());
init();
}
//初始化新接受的连接
//check_state默认为分析请求行状态
void http_conn::init()
{
mysql = NULL;
bytes_to_send = 0;
bytes_have_send = 0;
m_check_state = CHECK_STATE_REQUESTLINE;
m_linger = false;
m_method = GET;
m_url = 0;
m_version = 0;
m_content_length = 0;
m_host = 0;
m_start_line = 0;
m_checked_idx = 0;
m_read_idx = 0;
m_write_idx = 0;
cgi = 0;
m_state = 0;
timer_flag = 0;
improv = 0;
memset(m_read_buf, '\0', READ_BUFFER_SIZE);
memset(m_write_buf, '\0', WRITE_BUFFER_SIZE);
memset(m_real_file, '\0', FILENAME_LEN);
}
//从状态机,用于分析出一行内容
//返回值为行的读取状态,有LINE_OK,LINE_BAD,LINE_OPEN
http_conn::LINE_STATUS http_conn::parse_line()
{
char temp;
for (; m_checked_idx < m_read_idx; ++m_checked_idx)
{
temp = m_read_buf[m_checked_idx];
if (temp == '\r')
{
if ((m_checked_idx + 1) == m_read_idx)
return LINE_OPEN; // 尚未读取完毕
else if (m_read_buf[m_checked_idx + 1] == '\n') // 以\r\n结尾,读取完毕
{
m_read_buf[m_checked_idx++] = '\0';
m_read_buf[m_checked_idx++] = '\0';
return LINE_OK;
}
return LINE_BAD;
}
else if (temp == '\n')
{
if (m_checked_idx > 1 && m_read_buf[m_checked_idx - 1] == '\r') //和上面一样,检测\r\n
{
m_read_buf[m_checked_idx - 1] = '\0';
m_read_buf[m_checked_idx++] = '\0';
return LINE_OK;
}
return LINE_BAD;
}
}
return LINE_OPEN;
}
//循环读取客户数据,直到无数据可读或对方关闭连接
//非阻塞ET工作模式下,需要一次性将数据读完
bool http_conn::read_once()
{
if (m_read_idx >= READ_BUFFER_SIZE)
{
return false;
}
int bytes_read = 0;
//LT读取数据,在LT模式下,epoll_wait会无数次地通知应用程序读事件的发生,直到应用程序(即read_once()代码)去取。
if (0 == m_TRIGMode)
{
// m_read_idx 是当前已读取数据的位置偏移量
bytes_read = recv(m_sockfd, m_read_buf + m_read_idx, READ_BUFFER_SIZE - m_read_idx, 0); //从m_sockfd接受数据的系统调用
m_read_idx += bytes_read;
if (bytes_read <= 0)
{
return false;
}
return true;
}
//ET读数据,在ET模式下,epoll_wait只会通知应用程序一次,应用程序被要求在这一次就把sockfd中全部的数据取出,即read_once
else
{
while (true)
{
bytes_read = recv(m_sockfd, m_read_buf + m_read_idx, READ_BUFFER_SIZE - m_read_idx, 0);
if (bytes_read == -1)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
return false;
}
else if (bytes_read == 0)
{
return false;
}
m_read_idx += bytes_read;
}
return true;
}
}
// 主状态机,用于分析出一行的内容
//解析http请求行,获得请求方法,目标url及http版本号
http_conn::HTTP_CODE http_conn::parse_request_line(char *text)
{
m_url = strpbrk(text, " \t"); //将找到的第一个空格或者水平制表符返回
if (!m_url)
{
return BAD_REQUEST;
}
*m_url++ = '\0'; // 截断方法字段,并指向URL字段
char *method = text;
if (strcasecmp(method, "GET") == 0)
m_method = GET;
else if (strcasecmp(method, "POST") == 0)
{
m_method = POST;
cgi = 1; // 是否启动cgi的标志位
}
else
return BAD_REQUEST;
m_url += strspn(m_url, " \t"); // 保险起见,跳过中间的空格部分,然后指向url的第一个字符
m_version = strpbrk(m_url, " \t"); // 指向URL的下一个空格字段
if (!m_version)
return BAD_REQUEST;
*m_version++ = '\0'; // 截断url部分
m_version += strspn(m_version, " \t"); // 保险起见,跳过中间的空格部分,指向版本号第一个字符
if (strcasecmp(m_version, "HTTP/1.1") != 0)
return BAD_REQUEST;
if (strncasecmp(m_url, "http://", 7) == 0)
{
m_url += 7;
m_url = strchr(m_url, '/'); // m_url转到第一个指向'/'的位置
}
if (strncasecmp(m_url, "https://", 8) == 0)
{
m_url += 8;
m_url = strchr(m_url, '/');
}
if (!m_url || m_url[0] != '/')
return BAD_REQUEST;
//当url为/时,显示判断界面
if (strlen(m_url) == 1)
strcat(m_url, "judge.html");
m_check_state = CHECK_STATE_HEADER;
return NO_REQUEST;
}
// 主状态机,解析http请求的一个头部信息
http_conn::HTTP_CODE http_conn::parse_headers(char *text)
{
if (text[0] == '\0') // 判断是空行还是请求头(请求头和请求体之间隔着\r\n,我们上面处理是转化为\0了)
{
if (m_content_length != 0) // 请求中有消息体。
{
m_check_state = CHECK_STATE_CONTENT;
return NO_REQUEST;
}
return GET_REQUEST; // GET请求一般不包含消息体。在GET请求中,数据是通过URL中的查询字符串(即?key=value的形式)传递的,而不是在请求体中传递的。
}
else if (strncasecmp(text, "Connection:", 11) == 0)
{
text += 11;
text += strspn(text, " \t");
if (strcasecmp(text, "keep-alive") == 0)
{
m_linger = true;
}
}
else if (strncasecmp(text, "Content-length:", 15) == 0)
{
text += 15;
text += strspn(text, " \t");
m_content_length = atol(text); // 将字符串 str 转换为一个 long int 类型的数值
}
else if (strncasecmp(text, "Host:", 5) == 0)
{
text += 5;
text += strspn(text, " \t");
m_host = text;
}
else
{
LOG_INFO("oop!unknow header: %s", text);
}
return NO_REQUEST;
}
// 主状态机,判断http请求是否被完整读入
http_conn::HTTP_CODE http_conn::parse_content(char *text)
{
if (m_read_idx >= (m_content_length + m_checked_idx)) // 检查是否已经读取了足够的数据来完成请求体的处理。
{
text[m_content_length] = '\0';
//POST请求中最后为输入的用户名和密码,实际Web中parse_content的内容要多得多得多。
m_string = text;
return GET_REQUEST;
}
return NO_REQUEST;
}
// 主状态机,整合上面的parse_request_line,parse_headers和parse_content
// 按照不同的状态逐步解析请求,并根据解析结果决定如何处理请求。
http_conn::HTTP_CODE http_conn::process_read()
{
// 初始化主从状态机状态。
LINE_STATUS line_status = LINE_OK;
HTTP_CODE ret = NO_REQUEST;
char *text = 0;
//while两个判断:1.如果在处理请求体(有多行数据)2.尝试解析下一行,可行就继续
while ((m_check_state == CHECK_STATE_CONTENT && line_status == LINE_OK) || ((line_status = parse_line()) == LINE_OK))
{
text = get_line(); // 指向未处理的字符
m_start_line = m_checked_idx;
LOG_INFO("%s", text);
/*主状态机三种状态转移逻辑转移*/
switch (m_check_state)
{
case CHECK_STATE_REQUESTLINE:
{
ret = parse_request_line(text);
if (ret == BAD_REQUEST)
return BAD_REQUEST;
break;
}
case CHECK_STATE_HEADER:
{
ret = parse_headers(text);
if (ret == BAD_REQUEST)
return BAD_REQUEST;
else if (ret == GET_REQUEST)
{
return do_request(); // 完整解析GET请求后,跳转到报文响应函数
}
break;
}
case CHECK_STATE_CONTENT:
{
ret = parse_content(text);
if (ret == GET_REQUEST)
return do_request(); // 完整解析POST请求后,跳转到报文响应函数
line_status = LINE_OPEN;
break;
}
default:
return INTERNAL_ERROR;
}
}
return NO_REQUEST;
}
// 在do_request部分,我们实现对POST请求的响应连接。同时我们在这里实现一个简易的注册登录功能。
http_conn::HTTP_CODE http_conn::do_request()
{
strcpy(m_real_file, doc_root);
int len = strlen(doc_root);
//printf("m_url:%s\n", m_url);
const char *p = strrchr(m_url, '/'); // 找到m_url中/的位置
//处理cgi
if (cgi == 1 && (*(p + 1) == '2' || *(p + 1) == '3'))
{
//根据标志判断是登录检测还是注册检测
char flag = m_url[1]; // 例如:m_url = /2XXXX.html
char *m_url_real = (char *)malloc(sizeof(char) * 200);
strcpy(m_url_real, "/");
strcat(m_url_real, m_url + 2); // 跳过前面的/2, 即复制XXXX.html
strncpy(m_real_file + len, m_url_real, FILENAME_LEN - len - 1);
free(m_url_real);
//将用户名和密码提取出来
//user=123&password=123
char name[100], password[100];
int i;
for (i = 5; m_string[i] != '&'; ++i) // user=占五个字符
name[i - 5] = m_string[i];
name[i - 5] = '\0';
int j = 0;
for (i = i + 10; m_string[i] != '\0'; ++i, ++j) // &password=占十个字符
password[j] = m_string[i];
password[j] = '\0';
if (*(p + 1) == '3')
{
//如果是注册,先检测数据库中是否有重名的
//没有重名的,进行增加数据
char *sql_insert = (char *)malloc(sizeof(char) * 200);
strcpy(sql_insert, "INSERT INTO user(username, passwd) VALUES(");
strcat(sql_insert, "'");
strcat(sql_insert, name);
strcat(sql_insert, "', '");
strcat(sql_insert, password);
strcat(sql_insert, "')");
if (users.find(name) == users.end())
{
m_lock.lock();
// SQL查询
int res = mysql_query(mysql, sql_insert);
users.insert(pair<string, string>(name, password));
m_lock.unlock();
if (!res)
strcpy(m_url, "/log.html");
else
strcpy(m_url, "/registerError.html");
}
else
strcpy(m_url, "/registerError.html");
}
//如果是登录,直接判断
//若浏览器端输入的用户名和密码在表中可以查找到,返回1,否则返回0
else if (*(p + 1) == '2')
{
if (users.find(name) != users.end() && users[name] == password)
strcpy(m_url, "/welcome.html");
else
strcpy(m_url, "/logError.html");
}
}
if (*(p + 1) == '0')
{
char *m_url_real = (char *)malloc(sizeof(char) * 200);
strcpy(m_url_real, "/register.html");
strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
free(m_url_real);
}
else if (*(p + 1) == '1')
{
char *m_url_real = (char *)malloc(sizeof(char) * 200);
strcpy(m_url_real, "/log.html");
strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
free(m_url_real);
}
else if (*(p + 1) == '5')
{
char *m_url_real = (char *)malloc(sizeof(char) * 200);
strcpy(m_url_real, "/picture.html");
strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
free(m_url_real);
}
else if (*(p + 1) == '6')
{
char *m_url_real = (char *)malloc(sizeof(char) * 200);
strcpy(m_url_real, "/video.html");
strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
free(m_url_real);
}
else if (*(p + 1) == '7')
{
char *m_url_real = (char *)malloc(sizeof(char) * 200);
strcpy(m_url_real, "/fans.html");
strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
free(m_url_real);
}
else
strncpy(m_real_file + len, m_url, FILENAME_LEN - len - 1);
/*下面条件确保请求的文件存在,并且对其他人可读,同时确保请求的不是一个目录*/
if (stat(m_real_file, &m_file_stat) < 0) // stat函数是 Unix/Linux 系统调用,用于获取文件或目录的状态信息。
return NO_RESOURCE;
if (!(m_file_stat.st_mode & S_IROTH)) // S_IROTH 是一个宏,用于检查其他人(others)是否具有读取文件的权限。
return FORBIDDEN_REQUEST;
if (S_ISDIR(m_file_stat.st_mode)) // S_ISDIR(m_file_stat.st_mode) 用于检查给定的文件模式是否表示一个目录。
return BAD_REQUEST;
/*以只读方式获取文件描述符,通过mmap将该文件描述符映射到内存*/
int fd = open(m_real_file, O_RDONLY);
// mmap() 函数用于将文件或其他对象映射到内存中。这允许程序以访问内存的方式直接访问文件内容,这通常比传统的读写方法(如 read() 和 write())更高效。
// 0 表示映射区域的起始地址由系统选择。
// m_file_stat.st_size 是之前通过 stat() 函数获取的文件大小,表示映射区域的长度。
// PROT_READ 表示映射区域只允许读取操作。
// MAP_PRIVATE 表示创建一个写时复制的私有映射。对映射区域的修改不会影响到原文件,且其他进程看不到这些修改。
// fd 是文件描述符,表示要映射的文件。
// m_file_address 是一个字符指针,用于存储映射区域的地址。如果 mmap() 调用成功,它将指向文件内容在内存中的地址;如果失败,则为 MAP_FAILED。
m_file_address = (char *)mmap(0, m_file_stat.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
close(fd);
return FILE_REQUEST;
}
// 解除映射
void http_conn::unmap()
{
if (m_file_address)
{
munmap(m_file_address, m_file_stat.st_size);
m_file_address = 0;
}
}
/*
服务器通过 epoll 监听客户端连接的状态。通常会在两个状态之间切换:
可读状态(EPOLLIN):客户端发送请求,服务器读取数据。
可写状态(EPOLLOUT):服务器生成响应,发送数据给客户端。
*/
// 在写缓冲区写满要发送的数据后,我们最后调用write将其发送给浏览器. 每次调用 writev 时可能无法发送所有数据,代码通过这段逻辑来更新缓冲区的起始位置和剩余长度,确保在下次发送时继续从未发送的数据部分开始,避免重复发送已经传输过的部分。
bool http_conn::write()
{
int temp = 0;
// 无数据发送
if (bytes_to_send == 0)
{
modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode); // 服务器切换回监听客户端的 可读事件(EPOLLIN),以便接收客户端的下一次请求。
init();
return true;
}
while (1)
{
// 将响应报文状态行,消息头,空行和响应正文发给浏览器端
temp = writev(m_sockfd, m_iv, m_iv_count);
if (temp < 0) //写失败,错误详情可以通过 errno 获得
{
if (errno == EAGAIN) // 缓冲区写满了
{
modfd(m_epollfd, m_sockfd, EPOLLOUT, m_TRIGMode); // EAGAIN 错误表示此时发送缓冲区已满,不能继续发送,需要等待缓冲区有空间可用。因此服务器暂时切换到 可写模式(EPOLLOUT),等待缓冲区有空间时继续发送数据。
return true;
}
unmap();
return false;
}
bytes_have_send += temp; // 更新已经发送的数据量。
bytes_to_send -= temp; // 更新还剩余需要发送的数据量。
// 如果已发送的数据量超过了响应头部的长度,说明头部已经完全发送完毕
if (bytes_have_send >= m_iv[0].iov_len)
{
m_iv[0].iov_len = 0;
m_iv[1].iov_base = m_file_address + (bytes_have_send - m_write_idx);
m_iv[1].iov_len = bytes_to_send;
}
else
{
m_iv[0].iov_base = m_write_buf + bytes_have_send;
m_iv[0].iov_len = m_iv[0].iov_len - bytes_have_send;
}
if (bytes_to_send <= 0)
{
unmap();
modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode); // 重新监听客户端的 可读事件,为下一个请求做准备。
if (m_linger) // 保持长连接
{
init();
return true;
}
else
{
return false;
}
}
}
}
bool http_conn::add_response(const char *format, ...)
{
if (m_write_idx >= WRITE_BUFFER_SIZE)
return false;
va_list arg_list;
va_start(arg_list, format);
/*将数据format从可变参数列表写入缓冲区,返回写入数据长度*/
int len = vsnprintf(m_write_buf + m_write_idx, WRITE_BUFFER_SIZE - 1 - m_write_idx, format, arg_list);
// 若写入的数据长度比缓冲区剩下的空间大,则报错
if (len >= (WRITE_BUFFER_SIZE - 1 - m_write_idx))
{
va_end(arg_list);
return false;
}
m_write_idx += len; // 更新m_write_idx位置
va_end(arg_list);
LOG_INFO("request:%s", m_write_buf);
return true;
}
// 下面函数都是调用add_response的、
bool http_conn::add_status_line(int status, const char *title)
{
return add_response("%s %d %s\r\n", "HTTP/1.1", status, title);
}
bool http_conn::add_headers(int content_len)
{
return add_content_length(content_len) && add_linger() &&
add_blank_line();
}
bool http_conn::add_content_length(int content_len)
{
return add_response("Content-Length:%d\r\n", content_len);
}
bool http_conn::add_content_type()
{
return add_response("Content-Type:%s\r\n", "text/html");
}
bool http_conn::add_linger()
{
return add_response("Connection:%s\r\n", (m_linger == true) ? "keep-alive" : "close");
}
bool http_conn::add_blank_line()
{
return add_response("%s", "\r\n");
}
bool http_conn::add_content(const char *content)
{
return add_response("%s", content);
}
// 根据不同状态给浏览器返回数据
bool http_conn::process_write(HTTP_CODE ret)
{
switch (ret)
{
case INTERNAL_ERROR:
{
add_status_line(500, error_500_title);
add_headers(strlen(error_500_form));
if (!add_content(error_500_form))
return false;
break;
}
case BAD_REQUEST:
{
add_status_line(404, error_404_title);
add_headers(strlen(error_404_form));
if (!add_content(error_404_form))
return false;
break;
}
case FORBIDDEN_REQUEST:
{
add_status_line(403, error_403_title);
add_headers(strlen(error_403_form));
if (!add_content(error_403_form))
return false;
break;
}
case FILE_REQUEST: // 成功
{
add_status_line(200, ok_200_title);
if (m_file_stat.st_size != 0)
{
add_headers(m_file_stat.st_size);
m_iv[0].iov_base = m_write_buf; // http头部
m_iv[0].iov_len = m_write_idx;
m_iv[1].iov_base = m_file_address; // 文件内容
m_iv[1].iov_len = m_file_stat.st_size;
m_iv_count = 2;
bytes_to_send = m_write_idx + m_file_stat.st_size;
return true;
}
else
{
const char *ok_string = "<html><body></body></html>";
add_headers(strlen(ok_string));
if (!add_content(ok_string))
return false;
}
}
default:
return false;
}
m_iv[0].iov_base = m_write_buf;
m_iv[0].iov_len = m_write_idx;
m_iv_count = 1;
bytes_to_send = m_write_idx;
return true;
}
// http最为核心的函数
void http_conn::process()
{
HTTP_CODE read_ret = process_read();
if (read_ret == NO_REQUEST)
{
modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode); // 监听客户端可读事件
return;
}
bool write_ret = process_write(read_ret);
if (!write_ret)
{
close_conn();
}
modfd(m_epollfd, m_sockfd, EPOLLOUT, m_TRIGMode); // 监听可写事件
}
问题
在parse_line
中,为什么\r\n要检测两次?
在处理网络数据时,数据不一定是一次性到达的。可能读取到 \r
后,下一次数据到达时才会读取到 \n
。因此代码在多次调用 parse_line
时,仍然需要重新检查是否完整读取了 \r\n
。
get和post请求?
GET
主要用于请求资源,数据在 URL 中传递,适合简单的查询操作。
POST
用于提交数据,数据在请求体中传递,适合传输大量信息或修改服务器数据的操作。
webserver类
基础API
int socket(int domain, int type, int protocol);
参数
domain
:指定通信协议的域。最常见的是AF_INET
(IPv4 Internet 协议)和AF_INET6
(IPv6 Internet 协议)。type
:指定套接字的类型。常见的类型包括SOCK_STREAM
(提供序列化的、可靠的、双向连接的字节流,基于 TCP)和SOCK_DGRAM
(提供数据报文服务,基于 UDP)。protocol
:指定使用的特定协议。通常可以设置为 0,让系统选择一个默认协议。对于SOCK_STREAM
,通常使用IPPROTO_TCP
;对于SOCK_DGRAM
,通常使用IPPROTO_UDP
。
返回值
- 成功时,
socket
函数返回一个非负整数,即套接字描述符(socket descriptor),用于后续的网络操作。 - 失败时,返回
-1
,并设置全局变量errno
以指示具体错误。
struct linger {
int l_onoff; // 启用或禁用 linger 选项
int l_linger; // 等待时间(秒)
};
在 C 或 C++ 语言中,struct linger
是一个用于设置套接字选项的结构体,它定义了当套接字关闭时的行为。这个结构体与 setsockopt
函数一起使用,可以控制套接字在关闭时是否立即返回,或者等待所有数据被发送完毕。struct linger
结构体通常在 <sys/socket.h>
头文件中定义。
- l_onoff:这个字段是一个标志位,用于启用或禁用 linger 选项。
- 当设置为
0
时,禁用 linger 选项,套接字在关闭时会立即返回。 - 当设置为
1
时,启用 linger 选项,套接字在关闭时会等待一定时间,以确保所有数据被发送完毕。
- 当设置为
- l_linger:这个字段指定了当启用 linger 选项时,套接字在关闭前等待未发送数据发送完毕的最长时间(秒)。如果
l_onoff
为0
,则这个字段被忽略。
举例:
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
int main() {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
perror("Failed to create socket");
return 1;
}
struct linger ling = {1, 5}; // 启用 linger,等待5秒
if (setsockopt(sockfd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)) < 0) {
perror("Failed to set linger option");
close(sockfd);
return 1;
}
// 使用套接字进行通信...
close(sockfd);
return 0;
}
在这个示例中,我们创建了一个 TCP 套接字,然后使用 setsockopt
函数设置 linger 选项,使得套接字在关闭时等待5秒,以确保所有数据被发送完毕。这对于确保数据完整性和可靠性非常重要,特别是在网络通信中。
int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);
setsockopt
是一个在 socket 编程中用来设置套接字选项的函数。这个函数可以配置各种套接字的行为特性,比如设置超时时间、启用或禁用某些功能等。
sockfd
:套接字文件描述符,是一个非负整数,用于标识一个打开的套接字。level
:指定选项的协议级别。SOL_SOCKET
表示选项是在套接字级别上。optname
:要设置的选项名称。SO_LINGER
是一个特定的选项,用于控制套接字在关闭时的行为。optval
:指向包含选项值的变量的指针。在这个例子中,它指向一个struct linger
结构。optlen
:optval
指向的变量的大小。
头文件
#ifndef WEBSERVER_H
#define WEBSERVER_H
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <cassert>
#include <sys/epoll.h>
#include "./threadpool/threadpool.h"
#include "./http/http_conn.h"
const int MAX_FD = 65536; //最大文件描述符
const int MAX_EVENT_NUMBER = 10000; //最大事件数
const int TIMESLOT = 5; //最小超时单位
class WebServer
{
public:
WebServer();
~WebServer();
void init(int port , string user, string passWord, string databaseName,
int log_write , int opt_linger, int trigmode, int sql_num,
int thread_num, int close_log, int actor_model);
void thread_pool();
void sql_pool();
void log_write();
void trig_mode();
void eventListen();
void eventLoop();
void timer(int connfd, struct sockaddr_in client_address);
void adjust_timer(util_timer *timer);
void deal_timer(util_timer *timer, int sockfd);
bool dealclientdata();
bool dealwithsignal(bool& timeout, bool& stop_server);
void dealwithread(int sockfd);
void dealwithwrite(int sockfd);
public:
//基础
int m_port;
char *m_root;
int m_log_write;
int m_close_log;
int m_actormodel;
int m_pipefd[2];
int m_epollfd;
http_conn *users;
//数据库相关
connection_pool *m_connPool;
string m_user; //登陆数据库用户名
string m_passWord; //登陆数据库密码
string m_databaseName; //使用数据库名
int m_sql_num;
//线程池相关
threadpool<http_conn> *m_pool;
int m_thread_num;
//epoll_event相关
epoll_event events[MAX_EVENT_NUMBER];
int m_listenfd;
int m_OPT_LINGER;
int m_TRIGMode;
int m_LISTENTrigmode;
int m_CONNTrigmode;
//定时器相关
client_data *users_timer;
Utils utils;
};
#endif
实现
#include "webserver.h"
WebServer::WebServer()
{
//http_conn类对象
users = new http_conn[MAX_FD];
//root文件夹路径
char server_path[200];
getcwd(server_path, 200); //获得当前工作目录,200是存放这个目录的缓冲区的大小
char root[6] = "/root"; // 注意:数组长度要把结束符'\0'算在其内
m_root = (char *)malloc(strlen(server_path) + strlen(root) + 1);
strcpy(m_root, server_path);
strcat(m_root, root);
//定时器
users_timer = new client_data[MAX_FD];
}
WebServer::~WebServer()
{
close(m_epollfd);
close(m_listenfd);
close(m_pipefd[1]);
close(m_pipefd[0]);
delete[] users;
delete[] users_timer;
delete m_pool;
}
void WebServer::init(int port, string user, string passWord, string databaseName, int log_write,
int opt_linger, int trigmode, int sql_num, int thread_num, int close_log, int actor_model)
{
m_port = port;
m_user = user;
m_passWord = passWord;
m_databaseName = databaseName;
m_sql_num = sql_num;
m_thread_num = thread_num;
m_log_write = log_write;
m_OPT_LINGER = opt_linger;
m_TRIGMode = trigmode;
m_close_log = close_log;
m_actormodel = actor_model;
}
void WebServer::trig_mode()
{
//LT + LT
if (0 == m_TRIGMode)
{
m_LISTENTrigmode = 0;
m_CONNTrigmode = 0;
}
//LT + ET
else if (1 == m_TRIGMode)
{
m_LISTENTrigmode = 0;
m_CONNTrigmode = 1;
}
//ET + LT
else if (2 == m_TRIGMode)
{
m_LISTENTrigmode = 1;
m_CONNTrigmode = 0;
}
//ET + ET
else if (3 == m_TRIGMode)
{
m_LISTENTrigmode = 1;
m_CONNTrigmode = 1;
}
}
// 写日志
void WebServer::log_write()
{
if (0 == m_close_log)
{
//初始化日志
if (1 == m_log_write) // 异步写,800表示阻塞队列长度
Log::get_instance()->init("./ServerLog", m_close_log, 2000, 800000, 800);
else // 同步写
Log::get_instance()->init("./ServerLog", m_close_log, 2000, 800000, 0);
}
}
void WebServer::sql_pool()
{
//初始化数据库连接池
m_connPool = connection_pool::GetInstance();
m_connPool->init("localhost", m_user, m_passWord, m_databaseName, 3306, m_sql_num, m_close_log);
//初始化数据库读取表
users->initmysql_result(m_connPool); // 等同:users[0]->initmysql_result(m_connPool);
}
void WebServer::thread_pool()
{
//线程池
m_pool = new threadpool<http_conn>(m_actormodel, m_connPool, m_thread_num);
}
// 相比于正常的创建listenfd、绑定、监听流程,我们还需要添加对信号事件的监听,将其一起放入epoll内核事件注册表中,这种思想称之为统一事件源
void WebServer::eventListen()
{
//网络编程基础步骤
// 通过 socket() 函数创建了一个基于 IPv4 协议(PF_INET)和 TCP 协议(SOCK_STREAM)的监听套接字 m_listenfd,用于接受客户端的连接。
m_listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(m_listenfd >= 0);
//优雅关闭连接(通过 setsockopt() 配置 SO_LINGER 选项,决定是否在关闭套接字时立刻终止还是等待剩余数据发送完)
if (0 == m_OPT_LINGER)
{ // 使用 {0, 1} 表示关闭连接时,套接字将直接关闭,不会等到发送缓冲区的数据发送完毕。
struct linger tmp = {0, 1};
setsockopt(m_listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof(tmp));
}
else if (1 == m_OPT_LINGER)
{ // 使用 {1, 1},表示在关闭时会等待1秒,确保缓冲区数据全部发送完。
struct linger tmp = {1, 1};
setsockopt(m_listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof(tmp));
}
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address)); // 将 address 结构体清零,然后设置协议族(IPv4 AF_INET)、监听地址(任意地址 INADDR_ANY),以及端口号(m_port), 现在推荐使用memset
address.sin_family = AF_INET;
address.sin_addr.s_addr = htonl(INADDR_ANY);
address.sin_port = htons(m_port);
// htonl(INADDR_ANY):将 INADDR_ANY 常量转换为网络字节顺序。INADDR_ANY 用于指定服务器绑定时监听所有可用接口的 IP 地址(即,0.0.0.0)。htonl 确保这个地址以正确的字节顺序发送到网络上。
// htons(m_port):将端口号 m_port 转换为网络字节顺序。端口号用于区分同一台主机上运行的不同服务。
// 这样做:可以让服务器监听所有客户端发送到m_port的信息
int flag = 1;
setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)); // SO_REUSEADDR是一个很有用的选项,一般服务器的监听socket都应该打开它。它的大意是允许服务器bind一个地址,即使这个地址当前已经存在已建立的连接
ret = bind(m_listenfd, (struct sockaddr *)&address, sizeof(address)); // 将创建的套接字绑定到指定的IP地址和端口
assert(ret >= 0);
ret = listen(m_listenfd, 5); // listen():启动监听模式,backlog 参数设置为5,表示允许最多5个待处理的连接。
assert(ret >= 0);
utils.init(TIMESLOT); // 初始化定时器,定时触发事件,TIMESLOT 是定时的时间间隔。
//epoll创建内核事件表
// epoll_event events[MAX_EVENT_NUMBER]; 这里应该是作者写错了,本来类中就有events了
m_epollfd = epoll_create(5); // 使用 epoll_create() 创建一个 epoll 文件描述符,用于管理所有的事件。其中5用来建议内核预分配足够的资源以处理大约 size 个文件描述符。这是一个性能优化的提示,而不是严格的限制。
assert(m_epollfd != -1);
utils.addfd(m_epollfd, m_listenfd, false, m_LISTENTrigmode); // 将监听套接字 m_listenfd 添加到 epoll实例m_epollfd 中,通过 addfd() 函数注册为读事件,m_LISTENTrigmode 决定监听套接字的触发模式
http_conn::m_epollfd = m_epollfd;
// 创建一个双向管道,m_pipefd[0] 和 m_pipefd[1] 是管道的两个端,用于传递信号。
ret = socketpair(PF_UNIX, SOCK_STREAM, 0, m_pipefd);
assert(ret != -1);
// 将管道的写端 m_pipefd[1] 设置为非阻塞模式,并将读端 m_pipefd[0] 注册到 epoll 事件表中,用于处理来自信号的通知。
utils.setnonblocking(m_pipefd[1]);
utils.addfd(m_epollfd, m_pipefd[0], false, 0);
utils.addsig(SIGPIPE, SIG_IGN); // SIGPIPE 是一个信号,通常在进程尝试写入一个已被关闭的管道时发送。 SIG_IGN 是一个宏,指示信号应该被忽略。
utils.addsig(SIGALRM, utils.sig_handler, false); // 是一个信号,通常用于定时器功能
utils.addsig(SIGTERM, utils.sig_handler, false); // 是一个信号,通常用于请求程序自己终止
alarm(TIMESLOT);
//工具类,信号和描述符基础操作
Utils::u_pipefd = m_pipefd;
Utils::u_epollfd = m_epollfd;
}
// 注意:只调用 listen() 而没有使用其他事件检测手段,程序是被动等待状态,无法得知什么时候有新的连接到达。后面要接accept()或者epoll这种真正“监听”这些连接请求并进行处理。
void WebServer::timer(int connfd, struct sockaddr_in client_address)
{
users[connfd].init(connfd, client_address, m_root, m_CONNTrigmode, m_close_log, m_user, m_passWord, m_databaseName);
//初始化client_data数据
//创建定时器,设置回调函数和超时时间,绑定用户数据,将定时器添加到链表中
users_timer[connfd].address = client_address;
users_timer[connfd].sockfd = connfd;
util_timer *timer = new util_timer;
timer->user_data = &users_timer[connfd];
timer->cb_func = cb_func;
time_t cur = time(NULL);
timer->expire = cur + 3 * TIMESLOT;
users_timer[connfd].timer = timer;
utils.m_timer_lst.add_timer(timer);
}
//若有数据传输,则将定时器往后延迟3个单位
//并对新的定时器在链表上的位置进行调整
void WebServer::adjust_timer(util_timer *timer)
{
time_t cur = time(NULL);
timer->expire = cur + 3 * TIMESLOT;
utils.m_timer_lst.adjust_timer(timer);
LOG_INFO("%s", "adjust timer once");
}
void WebServer::deal_timer(util_timer *timer, int sockfd)
{
timer->cb_func(&users_timer[sockfd]);
if (timer)
{
utils.m_timer_lst.del_timer(timer);
}
LOG_INFO("close fd %d", users_timer[sockfd].sockfd);
}
// 处理新连接
bool WebServer::dealclientdata()
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
if (0 == m_LISTENTrigmode) // 水平触发模式:此模式下,服务器只需处理一次 accept() 调用,处理一个连接。
{
int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addrlength);
if (connfd < 0)
{
LOG_ERROR("%s:errno is:%d", "accept error", errno);
return false;
}
if (http_conn::m_user_count >= MAX_FD)
{
utils.show_error(connfd, "Internal server busy");
LOG_ERROR("%s", "Internal server busy");
return false;
}
timer(connfd, client_address);
}
else
{ // 边缘触发(ET)模式下,必须循环调用 accept() 以确保处理所有的连接请求,因为信号只触发一次,后续的连接必须在一次处理。也就是说,这个模式下只会在事件发生时通知你一次,之后就不再通知,直到有新的事件产生。
while (1)
{
int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addrlength);
if (connfd < 0)
{
LOG_ERROR("%s:errno is:%d", "accept error", errno);
break;
}
if (http_conn::m_user_count >= MAX_FD)
{
utils.show_error(connfd, "Internal server busy");
LOG_ERROR("%s", "Internal server busy");
break;
}
timer(connfd, client_address);
}
return false;
}
return true;
}
bool WebServer::dealwithsignal(bool &timeout, bool &stop_server)
{
int ret = 0;
int sig;
char signals[1024];
ret = recv(m_pipefd[0], signals, sizeof(signals), 0);
if (ret == -1)
{
return false;
}
else if (ret == 0)
{
return false;
}
else
{
for (int i = 0; i < ret; ++i)
{
switch (signals[i])
{
case SIGALRM:
{
timeout = true;
break;
}
case SIGTERM:
{
stop_server = true;
break;
}
}
}
}
return true;
}
void WebServer::dealwithread(int sockfd)
{
util_timer *timer = users_timer[sockfd].timer; // 获取与客户相关联的定时器
//reactor:要求主线程(I/O处理单元)只负责监听文件描述符上是否有事件发生(可读、可写),若有,则立即通知工作线程(逻辑单元),将socket可读可写事件放入请求队列,交给工作线程处理。
if (1 == m_actormodel)
{
if (timer)
{
adjust_timer(timer); // 和客户端进行数据传输,定时器的过期时间调久一点。
}
//若监测到读事件,将该事件放入请求队列(线程池)
m_pool->append(users + sockfd, 0); //读为0, 写为1
while (true) // 主线程交给工作线程后,必须等工作线程弄完才可以跳出循环。同步I/O的体现
{
if (1 == users[sockfd].improv) // 读完后
{
if (1 == users[sockfd].timer_flag)
{
deal_timer(timer, sockfd); // 删除定时器机器
users[sockfd].timer_flag = 0;
}
users[sockfd].improv = 0;
break;
}
}
}
else
{
//proactor
if (users[sockfd].read_once()) // 由主线程读
{
LOG_INFO("deal with the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr));
//若监测到读事件,将该事件放入请求队列
m_pool->append_p(users + sockfd);
if (timer)
{
adjust_timer(timer);
}
}
else
{
deal_timer(timer, sockfd);
}
}
}
void WebServer::dealwithwrite(int sockfd)
{
util_timer *timer = users_timer[sockfd].timer;
//reactor
if (1 == m_actormodel)
{
if (timer)
{
adjust_timer(timer);
}
m_pool->append(users + sockfd, 1);
while (true)
{
if (1 == users[sockfd].improv)
{
if (1 == users[sockfd].timer_flag)
{
deal_timer(timer, sockfd);
users[sockfd].timer_flag = 0;
}
users[sockfd].improv = 0;
break;
}
}
}
else
{
//proactor
if (users[sockfd].write())
{
LOG_INFO("send data to the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr));
if (timer)
{
adjust_timer(timer);
}
}
else
{
deal_timer(timer, sockfd);
}
}
}
// 创建好epoll内核事件注册表,并将读写+信号事件注册到事件表后,我们开始正式的处理。
/*
1.我们调用epoll_wait将内核中的就绪事件取出放入events数组中
2.根据evens数组中的事件类型,我们采用不同的处理方式一个一个处理(循环)
3.当遇到事件是处理非活动连接时,我们将其记录下来,等event[i]数组中的I/O事件全部处理完毕,再删除非活动连接
继续1~3步骤
*/
void WebServer::eventLoop()
{
bool timeout = false; // 是否有超时事件
bool stop_server = false; // 是否停止服务器
while (!stop_server)
{
int number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1); // -1 表示如果没有事件发生,epoll_wait() 将一直阻塞。number 包含准备好的文件描述符的数量.
if (number < 0 && errno != EINTR)
{
LOG_ERROR("%s", "epoll failure");
break;
}
for (int i = 0; i < number; i++)
{
int sockfd = events[i].data.fd; // 触发事件描述符的值
//处理新到的客户连接
if (sockfd == m_listenfd) // 监听套接字本身不进行数据传输,它的唯一职责就是监听新的连接请求。因此,sockfd == m_listenfd 意味着是新的客户端连接。
{
bool flag = dealclientdata(); // 根据不同触发方式处理连接
if (false == flag)
continue;
}
else if (events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) // 事件类型是客户端关闭连接、挂起或发生错误
{
//服务器端关闭连接,移除对应的定时器
util_timer *timer = users_timer[sockfd].timer;
deal_timer(timer, sockfd); // 删除定时器,关闭用户sockfd
}
//处理信号
else if ((sockfd == m_pipefd[0]) && (events[i].events & EPOLLIN))
{
bool flag = dealwithsignal(timeout, stop_server); // 处理信号,如果是SIGALRM:timeout=true, 如果是SIGTERM: stop_server=true
if (false == flag)
LOG_ERROR("%s", "dealclientdata failure");
}
//处理客户连接上接收到的数据
else if (events[i].events & EPOLLIN)
{
dealwithread(sockfd);
}
// 客户端的文件描述符上发生了 EPOLLOUT 事件,意味着可以向客户端发送数据。
else if (events[i].events & EPOLLOUT)
{
dealwithwrite(sockfd);
}
}
if (timeout)
{
utils.timer_handler();
LOG_INFO("%s", "timer tick");
timeout = false;
}
}
}
问题
注意套接字的概念:套接字可以理解为通信端点。它抽象了底层的网络协议细节,提供给开发者一个易于使用的接口来进行数据传输。
注意,服务器和客户端连接与通信,使用的不是一个套接字哦!
对于服务端来说:
- 创建一个监听套接字并绑定到特定的 IP 和端口。
- 进入监听状态,等待客户端连接。
- 当有客户端连接请求到来时,调用
accept()
接收连接,这时创建一个新的套接字。 - 使用这个新的套接字与客户端进行数据传输。
具体例子如下:
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <arpa/inet.h>
#define PORT 8080
int main() {
// 创建监听套接字
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(PORT);
// 绑定
bind(listenfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
// 监听
listen(listenfd, 5);
std::cout << "Server listening on port " << PORT << std::endl;
// 接受连接
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
int connfd = accept(listenfd, (struct sockaddr*)&client_addr, &client_len); // 这是一个新套接字
std::cout << "Client connected." << std::endl;
// 数据传输
const char* msg = "Hello, Client!";
send(connfd, msg, strlen(msg), 0);
// 关闭套接字
close(connfd); // 关闭连接套接字
close(listenfd); // 关闭监听套接字
return 0;
}
config类
这个类是为了运行服务器时,可以解析可选选项。如-h, -l 等等。
头文件
#ifndef CONFIG_H
#define CONFIG_H
#include "webserver.h"
using namespace std;
class Config
{
public:
Config();
~Config(){};
void parse_arg(int argc, char*argv[]);
//端口号
int PORT;
//日志写入方式
int LOGWrite;
//触发组合模式
int TRIGMode;
//listenfd触发模式
int LISTENTrigmode;
//connfd触发模式
int CONNTrigmode;
//优雅关闭链接
int OPT_LINGER;
//数据库连接池数量
int sql_num;
//线程池内的线程数量
int thread_num;
//是否关闭日志
int close_log;
//并发模型选择
int actor_model;
};
#endif
实现文件
#include "config.h"
Config::Config(){
//端口号,默认9006
PORT = 9006;
//日志写入方式,默认同步
LOGWrite = 0;
//触发组合模式,默认listenfd LT + connfd LT
TRIGMode = 0;
//listenfd触发模式,默认LT
LISTENTrigmode = 0;
//connfd触发模式,默认LT
CONNTrigmode = 0;
//优雅关闭链接,默认不使用
OPT_LINGER = 0;
//数据库连接池数量,默认8
sql_num = 8;
//线程池内的线程数量,默认8
thread_num = 8;
//关闭日志,默认不关闭
close_log = 0;
//并发模型,默认是proactor
actor_model = 0;
}
void Config::parse_arg(int argc, char*argv[]){
int opt;
const char *str = "p:l:m:o:s:t:c:a:"; // 冒号表示参数后面需要带一个参数
while ((opt = getopt(argc, argv, str)) != -1)
{
switch (opt)
{
case 'p':
{
PORT = atoi(optarg); // optarg 是一个全局变量,它在 C 语言的 getopt 函数中使用,用于存储命令行选项的参数值。 类型 char*
break;
}
case 'l':
{
LOGWrite = atoi(optarg);
break;
}
case 'm':
{
TRIGMode = atoi(optarg);
break;
}
case 'o':
{
OPT_LINGER = atoi(optarg);
break;
}
case 's':
{
sql_num = atoi(optarg);
break;
}
case 't':
{
thread_num = atoi(optarg);
break;
}
case 'c':
{
close_log = atoi(optarg);
break;
}
case 'a':
{
actor_model = atoi(optarg);
break;
}
default:
break;
}
}
}
main
#include "config.h"
int main(int argc, char *argv[])
{
//需要修改的数据库信息,登录名,密码,库名
string user = "root";
string passwd = "root";
string databasename = "mydb";
//命令行解析
Config config;
config.parse_arg(argc, argv);
WebServer server;
// 根据config进行初始化
server.init(config.PORT, user, passwd, databasename, config.LOGWrite,
config.OPT_LINGER, config.TRIGMode, config.sql_num, config.thread_num,
config.close_log, config.actor_model);
// 初始化日志模块(单例模式)。 根据close_log判断是否关闭日志,根据LOGWrite判断是否异步写日志。
// 如果是异步模式,需要设置阻塞队列长度。
server.log_write();
// 初始化数据库连接池(单例模式)。用双向链表list管理连接池中的连接。
server.sql_pool();
// 初始化线程池。使用一个工作队列完全解除了主线程和工作线程的耦合关系:主线程往工作队列中插入任务,工作线程通过竞争来取得任务并执行它。
server.thread_pool();
// 设置触发模式。
server.trig_mode();
// 监听。主要监听两个地方。
// 1. 用listen将套接字变为监听状态,然后加入到epoll实例中监听。 即监听客户端的请求。
// 2. 监听管道读端(信号处理函数会将信号写在里面),管道加入到epoll实例监听。
server.eventListen();
//运行
server.eventLoop();
return 0;
}