前言:在程序设计过程中,难免涉及操作系统、多进程、多线程、数据库等领域。在这些领域内,确保对共享资源的异步操作不出差错,是每个程序开发者必须要考虑的问题。面对并发问题,除了需要谨慎和小心外,引入并发控制策略是解决问题的主要手段,常见的技术有锁、互斥量、信号量、原子操作等。
一、锁(lock)
1.1 概念
锁是一个抽象概念,是一种保护共享资源的同步化技术。锁有两个操作:acquire 和 release。它一次最多只允许一个线程获取,一旦所有权被某一线程获取,其他线程只能等待该线程释放锁才能访问被保护的资源。
1.2 原理
锁的底层实现由硬件的原子指令支持,例如 TAS (Test And Set), CAS (Compare And Swap)。
TAS
TAS 操作向给定的内存地址写入1,并将原始的值返回。如果返回值为0,则更新状态成功成功(可理解为获取锁成功);如果返回值为1,则更新状态失败,说明该共享状态已被其他线程更新。如果使用 TAS 来实现锁,将会是这样:
void Lock(int* lock) {
while (test_and_set(lock) == 1);
}
这是典型的自旋锁(spinlock)实现方式。
CAS
CAS 相比 TAS 步骤较多,但仍是一个原子操作。首先将给定内存地址的当前值与期望值比较,如果二者相等,则将给定的期望值写到内存地址中。
示例代码:
bool compare_and_swap(int* memory, int expect, int new) {
if (*memory == expect) {
*memory = new;
return true;
}
return false;
}
CAS 可以实现互斥锁以及更复杂的自旋锁。
CAS 如何阻止其他线程获取锁?
假设共享状态地址初始值为 0,期望值为 0,new 为当前线程 ID,则当前线程执行 CAS 时,如果状态已被其他线程更新,则 *memory 将为另一线程的 ID,获取锁失败。
事实上,CAS 锁的实现比这复杂得多,其强大之处在于可以根据使用场景自定义上述代码中的 expect 和 new。
内核资料直通车:最新Linux内核源码资料文档+视频资料https://link.zhihu.com/?target=https%3A//docs.qq.com/doc/DTmFTc29xUGdNSnZ2
学习直通车:Linux内核源码/内存调优/文件系统/进程管理/设备驱动/网络协议栈
1.3 锁的类型
根据线程调度方式(底层实现),可以将锁分为互斥锁和自旋锁。
互斥锁 (Mutex Lock)
一个 CPU 上面的多个线程抢占一把锁,抢占失败的失败的线程会被调度器置位 WAIT (睡眠)状态,直到下一次的 CPU 时间片到来时再次抢占。互斥锁适用于加锁时间较长的场景,此时线程上下文切换带来的开销可以被忽略。
自旋锁 (Spin Lock)
线程获取自旋锁失败后,不会放弃 CPU,会一直处于忙等待状态并持续检查锁是否被释放。适用于加锁时间小于线程切换开销的场景。
数据库中常见的两种锁是悲观锁和乐观锁。
悲观锁 (Pessimistic Lock)
悲观锁的工作方式是在修改数据之前,先锁定再修改,完全防止并发写入。该锁对并发操作持悲观态度,总是做最坏的打算,杜绝并发。
乐观锁 (Optimistic Lock)
乐观锁相对悲观锁没有那么严格。乐观锁假设数据一般情况不会造成冲突,即使当多用户同时修改数据后,只有一条被记录。当数据进行提交更新的时候,如果检测到数据不匹配,此时上报错误信息。乐观锁适用于读多写少的场景,这样可以提高程序的吞吐量。
1.4 锁的粒度
决定锁粒度需要考虑以下几个方面:
- 锁的开销:使用锁需要额外的资源,如锁的存储空间、初始化和销毁时间、获取或释放锁的时间。程序使用的锁定越多,与使用相关的开销越多。
- 锁的抢占:每当一个进程或线程尝试获取由另一个进程或线程拥有的锁时,就会发生等待或重新调度的开销。锁的粒度越细,抢占导致的开销越低。数据库中对某条记录加锁要好于对整个表加锁。
- 死锁:锁的粒度越细,程序中需要使用的锁则越多,逻辑也越复杂,导致死锁的风险也越大。
2 和 3 相互制约,决定锁的粒度是一个权衡过程,应视应用场景而定。
1.5 死锁(deadlock)
当一个线程尝试连续两次获取互斥锁,或多个线程中因为多个互斥锁相互依赖而相互等待时,将产生死锁。
死锁轶事:
新冠疫情期间,某城市健康码服务器宕机,急需程序员前去维修。程序员到达园区门口后,由于无法出示健康码而不能进入园区。死锁发生!
死锁发生时的相互等待(如 A 等待 B,B 等待 C,C 等待 A)有个标志性的特性:锁的循环依赖。高并发程序达到一定规模后,检查是否有死锁的可能性发生尤为重要。其中一个手段就是通过有向图检查是否存在锁的循环依赖。
二、互斥量(Mutex)
Mutex 是词组 MUTual EXclusion 的缩写。与锁不同的是,互斥量更接近底层实现(上述原子操作 CAS, TAS);而锁更加抽象,一般是互斥量的封装或管理器。
除 shared mutex 和 recursive mutex 外,一般互斥量特点是一次只能被一个线程获取所有权,锁定后的状态也只能被该线程释放。互斥量一旦被锁定,任何线程尝试获取该互斥量都讲失败或阻塞(取决于 lock 的实现),包括该线程本身。
递归互斥量(recursive_mutex )允许当前线程获取锁后,在释放之前进一步获取锁,但其他线程无法获取该锁。C++ STL 中对应的实现为 std::recursive_mutex。
三、信号量 (Semaphores)
信号量的关键实现是提供一个机制使多个进程或线程访问和修改的某一资源,同时一个进程(线程)在使用资源后可以通知其他所有等待进程(线程)数据可用。信号量一般是实现了两个原子操作(wait() 和 signal())的整型变量。常见的两种类型是计数信号量(Counting Semaphore) 和 二元信号量(Binary Semaphore)。
计数信号量
将信号量值初始化为系统中需要保护的资源数量,当一个进程(线程)申请获取资源时,执行 wait() 操作使信号量值减少 1;释放资源时,执行任务 signal() 使信号量值加 1。当信号量值为 0 时,表示受保护的系统资源已用尽,此时任何进程(线程)尝试获取资源都会阻塞直到信号量值恢复到大于 0 的状态。
二元信号量
二元信号量值只能为 0 和 1,类似互斥锁具有排他性。信号量值只有为 0 时才能被获取。当某一进程(线程)wait() 信号量并进入临界区后,将信号量值设置为 1,离开临界去后将值恢复为 0 并执行 signal()。
四、互斥量(Mutex)与信号量(Semaphore)的区别
都是访问共享资源的同步化技术,Mutex 是锁机制,Semaphore 是信号机制。
Mutex 更能满足原子化要求。多个线程同时竞争锁时,Mutex 无法保证执行的先后顺序,而 Semaphore 可以通过条件变量对执行顺序进行控制。
Mutex 只能被获取到的线程释放,而 Semaphore 可以被其他线程触发信号(signal() 操作)。
当线程尝试释放未被锁定的 Mutex 锁时,将产生未定义的行为,而 Semaphore 则不会。
五、死锁检测代码
// deadock.c
#define _GNU_SOURCE
#include <dlfcn.h>
#include <unistd.h>
#include <stdint.h>
#include "graph.h"
#define THREAD_NUM 10
typedef unsigned long int uint64;
typedef int (*pthread_mutex_lock_t)(pthread_mutex_t *mutex);
pthread_mutex_lock_t pthread_mutex_lock_f;
typedef int (*pthread_mutex_unlock_t)(pthread_mutex_t *mutex);
pthread_mutex_unlock_t pthread_mutex_unlock_f;
int search_for_cycle(int idx)
{
struct vertex *ver = &tg->list[idx];
visited[idx] = 1;
k = 0;
path[k++] = idx;
while (ver->next != NULL) {
int i = 0;
for (i = 0; i < tg->num; i++) {
if (i == idx)
continue;
visited[i] = 0;
}
for (i = 1; i <= MAX; i++) {
path[i] = -1;
}
k = 1;
DFS(search_vertex(ver->next->s));
ver = ver->next;
}
return 0;
}
void check_dead_lock(void)
{
int i = 0;
deadlock = 0;
for (i = 0; i < tg->num; i++) {
if (deadlock == 1)
break;
search_for_cycle(i);
}
if (deadlock == 0) {
printf("no deadlock\n");
}
}
static void *thread_routine(void *args)
{
while (1) {
sleep(5);
check_dead_lock();
}
}
void start_check(void)
{
tg = (struct task_graph *)malloc(sizeof(struct task_graph));
tg->num = 0;
tg->lockidx = 0;
pthread_t tid;
pthread_create(&tid, NULL, thread_routine, NULL);
}
int search_lock(uint64 lock)
{
int i = 0;
for (i = 0; i < tg->lockidx; i++) {
if (tg->locklist[i].lock_id == lock) {
return i;
}
}
return -1;
}
int search_empty_lock(uint64 lock)
{
int i = 0;
for (i = 0; i < tg->lockidx; i++) {
if (tg->locklist[i].lock_id == 0) {
return i;
}
}
return tg->lockidx;
}
int inc(int *value, int add)
{
// 实现原子操作,fatch_add
int old;
__asm__ volatile(
"lock;xaddl %2, %1;"
: "=a"(old)
: "m"(*value), "a"(add)
: "cc", "memory");
return old;
}
void print_locklist(void)
{
int i = 0;
printf("print_locklist: \n");
printf("---------------------\n");
for (i = 0; i < tg->lockidx; i++) {
printf("threadid : %ld, lockid: %ld\n", tg->locklist[i].id, tg->locklist[i].lock_id);
}
printf("---------------------\n\n\n");
}
void lock_before(uint64 thread_id, uint64 lockaddr)
{
int idx = 0;
// list<threadid, toThreadid>
for (idx = 0; idx < tg->lockidx; idx++) {
if ((tg->locklist[idx].lock_id == lockaddr)) {
struct source_type from;
from.id = thread_id;
from.type = PROCESS;
add_vertex(from);
struct source_type to;
to.id = tg->locklist[idx].id;
tg->locklist[idx].degress++;
to.type = PROCESS;
add_vertex(to);
if (!verify_edge(from, to)) {
add_edge(from, to); //
}
}
}
}
void lock_after(uint64 thread_id, uint64 lockaddr)
{
int idx = 0;
if (-1 == (idx = search_lock(lockaddr))) { // lock list opera
int eidx = search_empty_lock(lockaddr);
tg->locklist[eidx].id = thread_id;
tg->locklist[eidx].lock_id = lockaddr;
inc(&tg->lockidx, 1);
}
else {
struct source_type from;
from.id = thread_id;
from.type = PROCESS;
struct source_type to;
to.id = tg->locklist[idx].id;
tg->locklist[idx].degress--;
to.type = PROCESS;
if (verify_edge(from, to))
remove_edge(from, to);
tg->locklist[idx].id = thread_id;
}
}
void unlock_after(uint64 thread_id, uint64 lockaddr)
{
int idx = search_lock(lockaddr);
if (tg->locklist[idx].degress == 0) {
tg->locklist[idx].id = 0;
tg->locklist[idx].lock_id = 0;
}
}
int pthread_mutex_lock(pthread_mutex_t *mutex)
{
pthread_t selfid = pthread_self(); //
lock_before(selfid, (uint64)mutex);
pthread_mutex_lock_f(mutex);
lock_after(selfid, (uint64)mutex);
return 0;
}
int pthread_mutex_unlock(pthread_mutex_t *mutex)
{
pthread_t selfid = pthread_self();
pthread_mutex_unlock_f(mutex);
unlock_after(selfid, (uint64)mutex);
return 0;
}
static int init_hook()
{
// 使用钩子劫持库函数 pthread_mutex_(un)lock
// 在获取和释放锁时,获取锁 ID 和线程 ID
pthread_mutex_lock_f = dlsym(RTLD_NEXT, "pthread_mutex_lock");
pthread_mutex_unlock_f = dlsym(RTLD_NEXT, "pthread_mutex_unlock");
return 0;
}
int main()
{
// create oriented graph
tg = (struct task_graph *)malloc(sizeof(struct task_graph));
tg->num = 0;
struct source_type v1;
v1.id = 1;
v1.type = PROCESS;
add_vertex(v1);
struct source_type v2;
v2.id = 2;
v2.type = PROCESS;
add_vertex(v2);
struct source_type v3;
v3.id = 3;
v3.type = PROCESS;
add_vertex(v3);
struct source_type v4;
v4.id = 4;
v4.type = PROCESS;
add_vertex(v4);
struct source_type v5;
v5.id = 5;
v5.type = PROCESS;
add_vertex(v5);
add_edge(v1, v2);
add_edge(v2, v3);
add_edge(v3, v4);
add_edge(v4, v5);
add_edge(v3, v1);
search_for_cycle(search_vertex(v1));
}
// output: deadlock : 1 --> 2 --> 3 --> 1