1. 互斥锁
定义锁:
pthread_mutex_t
初始化锁函数:
int pthread_mutex_init(pthread_mutex_t* mutex,pthread_mutexattr_t* attr);
第一个参数是定义的互斥锁的地址,第二个参数是锁的属性,一般传NULL
互斥锁的属性在创建锁的时候指定,
有四个值可供选择:
* PTHREAD_MUTEX_TIMED_NP,这是缺省值,也就是普通锁。当一个线程加锁以后,其余请求锁的线程将形成一个等待队列,并在解锁后按优先级获得锁。这种锁策略保证了资源分配的公平性。
* PTHREAD_MUTEX_RECURSIVE_NP,嵌套锁,允许同一个线程对同一个锁成功获得多次,并通过多次unlock解锁。如果是不同线程请求,则在加锁线程解锁时重新竞争。
* PTHREAD_MUTEX_ERRORCHECK_NP,检错锁,如果同一个线程请求同一个锁,则返回EDEADLK,否则与PTHREAD_MUTEX_TIMED_NP类型动作相同。这样保证当不允许多次加锁时不出现最简单情况下的死锁
* PTHREAD_MUTEX_ADAPTIVE_NP,适应锁,动作最简单的锁类型,仅等待解锁后重新竞争
也可以使用pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
加锁函数:
int pthread_mutex_lock(pthread_mutex_t* mutex); //加锁失败就会阻塞或
int pthread_mutex_trylock(pthread_mutex_t* mutex); //尝试加锁,成功返回0,失败则会返回错误码,不阻塞
解锁函数:
int pthread_mutex_unlock(pthread_mutex_t* mutex);
销毁锁函数:
int pthread_mutex_destroy(pthread_mutex_t* mutex);
2. 数据库连接池
server
#include "/freecplus/_freecplus.h"
#include "/freecplus/db/oracle/_ooci.h"
/*将socket发来的数据存入数据库*/
pthread_mutex_t mutexs[100]; // 用于数据库连接池的锁。
connection conns[100]; // 数据库连接池。
bool initconns(); // 初始化数据库连接池。
connection* getconn(); // 从连接池中获取一个数据库连接。
void freeconn(connection* in_conn); // 释放数据库连接。
void freeconns(); // 释放数据库连接池。
void* pthmain(void* arg);
CTcpServer TcpServer; // 创建服务端对象。
std::vector<long> vpthid; // 存放线程id的容器。
void mainexit(int sig); // 信号2和15的处理函数。
// 线程清理函数。
void pthmainexit(void* arg);
CLogFile logfile;
int main(int argc, char* argv[])
{
signal(2, mainexit);
signal(15, mainexit); // 捕获信号2和15
logfile.Open("/tmp/serverdb.log", "a+");
if (TcpServer.InitServer(5858) == false) // 初始化TcpServer的通信端口。
{
logfile.Write("TcpServer.InitServer(5858) failed.\n"); return -1;
}
if (initconns() == false) // 初始化数据库连接池。
{
logfile.Write("initconns() failed.\n"); return -1;
}
while (true)
{
if (TcpServer.Accept() == false) // 等待客户端连接。
{
logfile.Write("TcpServer.Accept() failed.\n"); return -1;
}
logfile.Write("客户端(%s)已连接。\n", TcpServer.GetIP());
pthread_t pthid;
if (pthread_create(&pthid, NULL, pthmain, (void*)(long)TcpServer.m_connfd) != 0)
{
logfile.Write("pthread_create failed.\n"); return -1;
}
vpthid.push_back(pthid); // 把线程id保存到vpthid容器中。
}
return 0;
}
void* pthmain(void* arg)
{
pthread_cleanup_push(pthmainexit, arg); // 设置线程清理函数。
pthread_detach(pthread_self()); // 分离线程。
pthread_setcanceltype(PTHREAD_CANCEL_DISABLE, NULL); // 设置取消方式为立即取消。
int sockfd = (int)(long)arg; // 与客户端的socket连接。
int ibuflen = 0;
char strbuffer[1024]; // 存放数据的缓冲区。
while (true)
{
memset(strbuffer, 0, sizeof(strbuffer));
if (TcpRead(sockfd, strbuffer, &ibuflen, 300) == false) break; // 接收客户端发过来的请求报文。
logfile.Write("接收:%s\n", strbuffer);
connection* conn = getconn(); // 获取一个数据库连接。
// 处理业务
sleep(2);
freeconn(conn); // 释放一个数据库连接。
strcat(strbuffer, "ok"); // 在客户端的报文后加上"ok"。
logfile.Write("发送:%s\n", strbuffer);
if (TcpWrite(sockfd, strbuffer) == false) break; // 向客户端回应报文。
}
logfile.Write("客户端已断开。\n"); // 程序直接退出,析构函数会释放资源。
pthread_cleanup_pop(1);
pthread_exit(0);
}
// 信号2和15的处理函数。
void mainexit(int sig)
{
logfile.Write("mainexit begin.\n");
// 关闭监听的socket。
TcpServer.CloseListen();
// 取消全部的线程。
for (int ii = 0; ii < vpthid.size(); ii++)
{
logfile.Write("cancel %ld\n", vpthid[ii]);
pthread_cancel(vpthid[ii]);
}
// 释放数据库连接池。
freeconns();
logfile.Write("mainexit end.\n");
exit(0);
}
// 线程清理函数。
void pthmainexit(void* arg)
{
logfile.Write("pthmainexit begin.\n");
// 关闭与客户端的socket。
close((int)(long)arg);
// 从vpthid中删除本线程的id。
for (int ii = 0; ii < vpthid.size(); ii++)
{
if (vpthid[ii] == pthread_self())
{
vpthid.erase(vpthid.begin() + ii);
}
}
logfile.Write("pthmainexit end.\n");
}
// 初始化数据库连接池。
bool initconns()
{
for (int ii = 0; ii < 10; ii++)
{
if (conns[ii].connecttodb("scott/tiger", "Simplified Chinese_China.ZHS16GBK") != 0)
{
logfile.Write("connect database failed.\n%s\n", conns[ii].m_cda.message); return false;
}
}
for (int ii = 0; ii < 10; ii++) pthread_mutex_init(&mutexs[ii], NULL);
return true;
}
// 从连接池中获取一个数据库连接。
connection* getconn()
{
for (int ii = 0; ii < 10; ii++)
{
if (pthread_mutex_trylock(&mutexs[ii]) == 0)
{
logfile.Write("get a conn[%d] ok.\n", ii);
return &conns[ii];
}
}
/*没有获取到连接 新建连接*/
return NULL;
}
// 释放数据库连接。
void freeconn(connection* in_conn)
{
for (int ii = 0; ii < 10; ii++)
{
if (in_conn == &conns[ii]) pthread_mutex_unlock(&mutexs[ii]);
}
}
// 释放数据库连接池。
void freeconns()
{
for (int ii = 0; ii < 10; ii++)
{
conns[ii].disconnect(); pthread_mutex_destroy(&mutexs[ii]);
}
}
client.cpp
#include "_freecplus.h"
int main(int argc, char* argv[])
{
printf("pid=%d\n", getpid());
CTcpClient TcpClient; // 创建客户端的对象。
if (TcpClient.ConnectToServer("172.21.0.3", 5858) == false) // 向服务端发起连接请求。
{
printf("TcpClient.ConnectToServer(\"172.21.0.3\",5858) failed.\n"); return -1;
}
char strbuffer[1024]; // 存放数据的缓冲区。
for (int ii = 0; ii < 50; ii++) // 利用循环,与服务端进行5次交互。
{
memset(strbuffer, 0, sizeof(strbuffer));
snprintf(strbuffer, 50, "(%d)这是第%d个超级女生,编号%03d。", getpid(), ii + 1, ii + 1);
printf("发送:%s\n", strbuffer);
if (TcpClient.Write(strbuffer) == false) break; // 向服务端发送请求报文。
memset(strbuffer, 0, sizeof(strbuffer));
if (TcpClient.Read(strbuffer, 20) == false) break; // 接收服务端的回应报文。
printf("接收:%s\n", strbuffer);
sleep(5);
}
// 程序直接退出,析构函数会释放资源。
}
3. 条件变量
3.1 初始化条件变量
初始化条件变量的方式有两种,一种是直接将 PTHREAD_COND_INITIALIZER 赋值给条件变量,例如:
pthread_cond_t myCond = PTHREAD_COND_INITIALIZER;
借助 pthread_cond_init() 函数初始化条件变量,语法格式如下:
int pthread_cond_init(pthread_cond_t * cond, const pthread_condattr_t * attr);
3.2 阻塞当前线程,等待条件成立
int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex);
int pthread_cond_timedwait(pthread_cond_t* cond, pthread_mutex_t* mutex, const struct timespec* abstime);
3.3 解除线程的“阻塞”状态
int pthread_cond_signal(pthread_cond_t* cond);
int pthread_cond_broadcast(pthread_cond_t* cond);
3.4 销毁条件变量
int pthread_cond_destroy(pthread_cond_t *cond);
pthread_cond_wait发生了什么?
- 释放了互斥锁
- 等待条件
- 条件被触发
- 给互斥锁被加锁
#include <pthread.h>
#include <signal.h>
#include <unistd.h>
#include <stdio.h>
pthread_cond_t myCond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
void* th1(void* arg)
{
while(true)
{
pthread_mutex_lock(&mut);
pthread_cond_wait(&myCond, &mut);//等待条件
printf("线程一被唤醒\n");
pthread_mutex_unlock(&mut);
}
}
void* th2(void* arg)
{
while(true)
{
pthread_mutex_lock(&mut);
pthread_cond_wait(&myCond, &mut);
printf("线程二被唤醒\n");
pthread_mutex_unlock(&mut);
}
}
void func(int sig)
{
pthread_cond_signal(&myCond);
//pthread_cond_broadcast(&myCond);
}
int main()
{
signal(15, func);
pthread_t t1,t2;
pthread_create(&t1, NULL, th1, NULL);
pthread_create(&t2, NULL, th2, NULL);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_cond_destroy(&myCond);
pthread_mutex_destroy(&mut);
return 0;
}
4. 生产者消费者模型
// 本程序演示用互斥锁和条件变量实现高速缓存
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <vector>
using namespace std;
int mesgid = 1; //消息的计数器
// 缓存消息的结构体。
struct st_message
{
int mesgid;
char message[1024];
} stmesg;
std::vector<struct st_message> vcache; // 用vector容器做缓存
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 声名并初始化条件变量
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 声名并初始化互斥锁
// 消费者、出队线程主函数
void* outcache(void* arg)
{
struct st_message stmesg;
while (true)
{
pthread_mutex_lock(&mutex); // 加锁
// 如果缓存为空,等待
// 条件变量虚假唤醒 用if的话会有问题
while (vcache.size() == 0)
{
pthread_cond_wait(&cond, &mutex);
}
// 从缓存中获取第一条记录,然后删除该记录
memcpy(&stmesg, &vcache[0], sizeof(struct st_message)); // 内存拷贝
vcache.erase(vcache.begin());
pthread_mutex_unlock(&mutex); // 解锁
// 以下是处理业务的代码
printf("phid=%ld,mesgid=%d\n", pthread_self(), stmesg.mesgid);
usleep(100);
}
}
// 生产者、把生产的数据存入缓存
//void* incache(void* arg)
void incache(int arg)
{
struct st_message stmesg;
memset(&stmesg, 0, sizeof(struct st_message));
stmesg.mesgid = mesgid++;
pthread_mutex_lock(&mutex); // 加锁。
// 生产数据,放入缓存。
vcache.push_back(stmesg); // 内存拷贝。
pthread_mutex_unlock(&mutex); // 解锁
//pthread_cond_broadcast(&cond); // 触发条件,激活全部的线程。
pthread_cond_signal(&cond);
}
int main()
{
signal(15, incache); //接收15的信号,调用生产者函数。
pthread_t thid1, thid2, thid3, thid4;
pthread_create(&thid1, NULL, outcache, NULL);
pthread_create(&thid2, NULL, outcache, NULL);
pthread_create(&thid3, NULL, outcache, NULL);
pthread_join(thid1, NULL);
pthread_join(thid2, NULL);
pthread_join(thid3, NULL);
return 0;
}
注意
- 条件触发要写在解锁之后
pthread_mutex_unlock(&mutex); // 解锁
pthread_cond_broadcast(&cond); // 触发条件,激活全部的线程。
- 持有锁的时间越短越好
5. 信号量
信号量是一个整数计数器,其数值可以用于标识空闲临界资源的数量。
当有进程释放资源时,信号量增加,表示可用资源数增加;当有进程申请到资源时,信号量减少,表示可用资源数减少;
-
1、sem_init
函数声明:int sem_init(sem_t *sem, int pshared, unsigned int value);
创建信号量,初始化由sem指向的信号对象,设置它的共享选项,并给它一个初始的整数值;pshared控制信号量的类型,如果值为0,就表示这个信号量是当前进程的局部信号量,在当前进程的多个线程之间共享,否则信号量就可以在多个进程之间共享;value为sem的初始值返回值:调用成功时返回0,失败返回-1。
-
2、sem_wait
函数声明:int sem_wait(sem_t *sem);
用于以原子操作的方式将信号量的值减1。sem指向的对象是sem_init调用初始化的信号量返回值:调用成功返回0,失败返回-1
-
3、sem_post
函数声明:int sem_post(sem_t *sem);
用于以原子操作的方式将信号量的值加1。sem指向的对象是sem_init调用初始化的信号量。
返回值:调用成功返回0,失败返回-1 -
4、sem_destroy
函数声明:int sem_destroy(sem_t *sem);
用于对用完的信号量的清理。只有用sem_init初始化的信号量才能用sem_destroy销毁
返回值:调用成功返回0,失败返回-1。
信号量实现生产者消费者模型
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <vector>
#include <semaphore.h>
using namespace std;
int mesgid = 1; // 消息的记数器。
// 缓存消息的结构体。
struct st_message
{
int mesgid;
char message[1024];
} stmesg;
std::vector<struct st_message> vcache; // 用vector容器做缓存。
sem_t sem; // 声明信号量。
pthread_mutex_t mutex; // 声名并初始化互斥锁。
// 消费者、出队线程主函数。
void* outcache(void* arg)
{
struct st_message stmesg;
while (true)
{
while (vcache.size() == 0)
{
sem_wait(&sem); // 如果缓存中没有数据,等待信号。
printf("%ld wait ok.\n", pthread_self());
}
pthread_mutex_lock(&mutex); // 加锁。
if (vcache.size() == 0) // 判断缓存中是否有数据。
{
pthread_mutex_unlock(&mutex);
continue; // 解锁,continue
}
// 从缓存中获取第一条记录,然后删除该记录。
memcpy(&stmesg, &vcache[0], sizeof(struct st_message));
vcache.erase(vcache.begin());
pthread_mutex_unlock(&mutex); //解锁
// 以下是处理业务的代码。
printf("phid=%ld,mesgid=%d\n", pthread_self(), stmesg.mesgid);
usleep(100);
}
}
// 生产者、把生产的数据存入缓存。
void incache(int sig)
{
struct st_message stmesg;
memset(&stmesg, 0, sizeof(struct st_message));
pthread_mutex_lock(&mutex); // 加锁。
// 生产数据,放入缓存。
stmesg.mesgid = mesgid++; vcache.push_back(stmesg);
stmesg.mesgid = mesgid++; vcache.push_back(stmesg);
stmesg.mesgid = mesgid++; vcache.push_back(stmesg);
stmesg.mesgid = mesgid++; vcache.push_back(stmesg);
stmesg.mesgid = mesgid++; vcache.push_back(stmesg);
stmesg.mesgid = mesgid++; vcache.push_back(stmesg);
pthread_mutex_unlock(&mutex); // 解锁。
sem_post(&sem); // 信号加1。
sem_post(&sem); // 信号加1。
sem_post(&sem); // 信号加1。
sem_post(&sem); // 信号加1。
sem_post(&sem); // 信号加1。
sem_post(&sem); // 信号加1。
}
int main()
{
signal(15, incache); // 接收15的信号,调用生产者函数。
sem_init(&sem, 0, 0); // 初始化信号量。
pthread_mutex_init(&mutex, NULL); // 初始化互斥锁。
pthread_t thid1, thid2, thid3;
pthread_create(&thid1, NULL, outcache, NULL);
pthread_create(&thid2, NULL, outcache, NULL);
pthread_create(&thid3, NULL, outcache, NULL);
pthread_join(thid1, NULL);
pthread_join(thid2, NULL);
pthread_join(thid3, NULL);
return 0;
}
6. 读写锁
互斥锁只有加锁和不加锁两种状态,同一时间只能有一个线程加锁。
读写锁可以有三种状态:读模式下加锁,写模式下加锁和不加锁
读写锁的特点:
- 如果某线程申请了读锁,其他线程可以再申请读锁,但不能申请写锁
- 如果某线程申请了写锁,其他线程既不能申请读锁,也不能申请写锁
读写锁适合对数据的读比写次数多的情况。
7. 自旋锁
自旋锁是一种基于忙等待的锁,它在等待锁的过程中不会阻塞线程,而是使用循环不停地检查锁是否可用。如果锁被其他线程占用,当前线程就会一直循环等待,直到锁被释放为止。
自旋锁的优点是等待锁的线程不会被阻塞,因此在锁的占用时间很短的情况下,自旋锁的性能比较高。另外,自旋锁一般比互斥锁的实现更加简单,因为它不需要使用系统调用来挂起和恢复线程。
不过自旋锁也有一些缺点。首先,如果锁的占用时间很长,自旋锁会导致线程一直循环等待,浪费 CPU 资源。其次,自旋锁只适用于单核 CPU 或者多核 CPU 上锁的线程数比较少的情况,因为在多核 CPU 上,如果一个线程一直占用锁,其他线程就会一直循环等待,导致 CPU 资源的浪费和性能下降。