锁
锁就是一个变量
为什么需要锁?::需要原子性的执行一系列的操作指令,程序员在源代码中加锁,放在临界区周围,保证临界区能够像单条原子指令一样执行。
举例说明
锁(通常是一个互斥量mutex)的基本行为。锁是一种同步机制,用于控制多个线程对共享资源的访问。在多线程编程中,临界区是指一段代码,其中的操作必须由单个线程完成,以避免数据竞争或不一致。
解释
-
Lock(): 当一个线程调用
lock()
方法时,它试图获取锁。-
如果锁是可用的(即没有其他线程持有锁),那么这个线程将获得锁,并可以进入临界区执行。
-
如果锁已经被另一个线程持有,那么这个线程将会等待,直到锁被释放。
-
-
Unlock(): 当持有锁的线程完成了它的任务后,它需要调用
unlock()
来释放锁,以便其他等待的线程可以获取它。 -
临界区: 指的是受保护的一段代码或数据区域,只有在拥有锁的情况下才能访问。
例子
假设我们有两个线程A和B,以及一个共享变量counter
,我们希望安全地增加这个计数器,即确保每次只有一个线程可以修改它。
#include <iostream> #include <thread> #include <mutex> std::mutex mtx; // 创建一个互斥锁 int counter = 0; void incrementCounter() { for (int i = 0; i < 1000000; ++i) { mtx.lock(); // 尝试获取锁 ++counter; // 修改共享数据 mtx.unlock(); // 释放锁 } } int main() { std::thread t1(incrementCounter); std::thread t2(incrementCounter); t1.join(); t2.join(); std::cout << "Final counter value: " << counter << std::endl; return 0; }
在这个例子中:
-
mtx
是一个互斥锁对象。 -
incrementCounter
函数会在修改counter
之前锁定mtx
,并在修改之后解锁。 -
在
main
函数中,我们创建了两个线程t1
和t2
,它们都会运行incrementCounter
函数。 -
当其中一个线程(例如线程A)调用
mtx.lock()
时,如果锁是自由的,则线程A获得了锁并开始修改counter
。 -
如果此时线程B也尝试调用
mtx.lock()
,由于锁被线程A持有,线程B将阻塞并等待,直到线程A调用mtx.unlock()
释放锁。 -
一旦锁被释放,另一个线程(如线程B)可以获取锁并进行修改。
通过这种方式,即使有多个线程同时尝试修改counter
,也不会出现数据竞争问题,因为同一时刻只有一个线程能够修改counter
。
POISX库将锁称之为互斥量(mutex)
-
被用来提供线程之间的互斥
评价一个锁的优劣指标
评估一个锁的优劣可以从多个方面来进行,主要包括性能、可伸缩性、易用性、公平性和鲁棒性等方面。以下是评估锁的一些关键指标:
1. 性能
-
吞吐量:锁应该允许尽可能多的工作并发进行,从而提高系统的整体吞吐量。
-
延迟:获取和释放锁所需的时间应该尽可能短,以减少等待时间。
-
争用开销:当多个线程试图获取同一个锁时,锁应尽量减少争用带来的额外开销。
-
上下文切换:锁的设计应该尽量减少因线程挂起和恢复而导致的上下文切换次数。
2. 可伸缩性
-
并发度:锁应该能够在高并发环境下表现良好,尤其是在多核或多处理器系统中。
-
可扩展性:锁应该能够随着系统负载的增加而平滑扩展,而不是成为瓶颈。
-
锁粒度:细粒度锁可以减少争用,但可能会增加锁的管理开销;粗粒度锁则相反。
3. 易用性
-
API 设计:锁的 API 应该简单易用,易于理解和实现。
-
异常处理:锁应该能够处理异常情况,例如死锁检测和避免。
-
自动资源管理:例如,使用 RAII(Resource Acquisition Is Initialization)技术来自动管理锁的生命周期。
4. 公平性
-
公平性:锁可以是公平的或非公平的。公平的锁保证按照请求顺序授予锁,而非公平的锁则可能让某些线程优先获取锁。
-
饥饿:公平的锁可以防止某些线程因持续的竞争而永远无法获取锁,从而避免饥饿现象。
5. 鲁棒性
-
死锁避免:锁应该具有一定的机制来避免死锁的发生,例如通过锁顺序或锁超时。
-
重入性:支持重入的锁允许一个已经持有锁的线程再次获取同一个锁,而不发生死锁。
-
适应性:锁应该能够适应不同的使用场景和需求变化。
6. 其他考虑因素
-
移植性:锁应该能够在不同的操作系统和硬件平台上良好工作。
-
安全性:锁应该能够防止数据竞争和其他并发错误。
-
调试支持:锁应该提供调试支持,例如记录锁的状态和操作历史。
常见的锁的类型
为了更好地理解这些指标,我们可以看看几种常见的锁类型及其特点:
1. 互斥锁(Mutex)
-
性能:一般性能较好,但如果线程频繁争用锁,则可能导致上下文切换。
-
可伸缩性:在高并发环境下可能会成为瓶颈。
-
易用性:API 简单,但容易出错,如忘记解锁或重复解锁。
-
公平性:可以是公平的也可以是非公平的。
-
鲁棒性:可以通过适当的使用来避免死锁,支持重入。
2. 读写锁(Read-Write Lock)
-
性能:读取操作可以并发进行,写入操作独占锁,适合读多写少的场景。
-
可伸缩性:在读多写少的场景下表现更好。
-
易用性:比互斥锁稍微复杂一些。
-
公平性:可以根据具体实现支持公平或非公平。
-
鲁棒性:支持读写分离,有助于避免死锁。
3. 自旋锁(Spin Lock)
自旋锁是一种简单的同步原语,常用于多线程编程中,特别是当锁持有的时间非常短时。自旋锁的主要特点是当一个线程试图获取一个已经被其他线程持有的锁时,它不会进入睡眠状态,而是继续自旋(循环检查锁的状态),直到锁变得可用为止。
自旋锁的特点
-
原子性:自旋锁的操作是原子的,即获取锁和释放锁的操作不可中断。
-
自旋等待:当一个线程未能获取锁时,它会不断地尝试获取锁,直到成功为止。
-
无上下文切换:自旋锁不会导致线程睡眠,因此不会引发上下文切换,这在锁持有的时间非常短的情况下可以提高性能。
-
CPU 占用:如果锁持有的时间较长,自旋锁会导致大量的 CPU 开销,因为它会占用 CPU 时间片不断自旋。
自旋锁的实现
在现代编程语言和库中,自旋锁通常通过原子操作来实现。下面是一个使用 C++11 标准库中的原子操作来实现自旋锁的具体案例。
示例代码
假设我们需要一个自旋锁来保护对一个共享变量的访问,下面是具体的实现:
#include <atomic> #include <thread> #include <iostream> std::atomic_flag spinLock = ATOMIC_FLAG_INIT; void spin_lock() { while (spinLock.test_and_set(std::memory_order_acquire)) { // 自旋等待 } } void spin_unlock() { spinLock.clear(std::memory_order_release); } int shared_counter = 0; void increment_counter() { spin_lock(); shared_counter++; spin_unlock(); } int main() { const int num_threads = 10; std::thread threads[num_threads]; // 创建多个线程来并发地增加共享变量 for (int i = 0; i < num_threads; ++i) { threads[i] = std::thread(increment_counter); } // 等待所有线程完成 for (auto& t : threads) { t.join(); } std::cout << "Final counter value: " << shared_counter << std::endl; return 0; }
代码解释
-
自旋锁的实现:
-
std::atomic_flag
是 C++11 中引入的一个原子标志位,用于实现自旋锁。 -
spin_lock
函数尝试获取锁。如果锁已经被其他线程持有,它会不断自旋等待,直到锁变得可用。 -
spin_unlock
函数释放锁。
-
-
共享变量的保护:
-
shared_counter
是一个共享变量,需要在多线程环境中安全地访问。 -
increment_counter
函数在修改shared_counter
之前获取锁,并在修改之后释放锁。
-
-
多线程测试:
-
在
main
函数中,创建了多个线程来并发地增加shared_counter
。 -
使用
std::thread
创建线程,并在所有线程完成后打印最终的shared_counter
值。
-
性能注意事项
-
自旋锁适合短时间持有:如果锁持有的时间非常短,自旋锁可以避免上下文切换,提高性能。
-
避免长时间持有锁:如果锁持有时间较长,自旋锁会导致大量的 CPU 开销,因为线程会不断自旋,占用 CPU 时间片。
-
自旋锁的适用场景:自旋锁最适合用于多核处理器环境,因为在多核处理器中,不同核心上的线程可以并发执行,减少自旋的开销。
总结
自旋锁是一种简单的同步机制,特别适用于锁持有时间非常短的情况。通过原子操作实现自旋锁可以有效地保护共享资源,并确保多线程环境下的数据一致性。在实际应用中,应根据锁的使用频率和持有时间来决定是否使用自旋锁。
-
性能:在锁持有时间很短的情况下性能很好,因为避免了上下文切换。
-
可伸缩性:在高并发环境下可能消耗大量 CPU 资源。
-
易用性:使用简单,但需要注意避免长时间持有锁。
-
公平性:通常是非公平的。
-
鲁棒性:不适合长时间持有的锁,否则会导致 CPU 资源浪费。
通过这些指标,你可以根据实际应用场景的需求来选择合适的锁机制,并评估其优劣。
比较并交换(Compare and Swap,简称 CAS)是一种原子操作,广泛应用于并发编程中,用于实现无锁或轻量级锁算法。CAS 操作通常用于构建更高级别的同步原语,如自旋锁、无锁队列等。下面我们详细探讨 CAS 的概念、用途以及实现方式,并通过一个具体的例子来说明其用法。
1. 比较并交换(CAS)的定义
比较并交换(CAS)是一种原子操作,它在一个单一的步骤中完成以下三个操作:
-
读取内存位置的当前值。
-
如果该值与预期值相同,则用新值替换当前值。
-
如果该值与预期值不同,则不做任何操作。
这个操作通常表示为一个函数 CAS(&location, expected_value, new_value)
,其中:
-
location
是要操作的内存位置。 -
expected_value
是期望在location
中找到的值。 -
new_value
是要替换expected_value
的新值。
CAS 操作返回一个布尔值,指示操作是否成功。如果操作成功(即 location
中的值与 expected_value
相同),则返回 true
;否则返回 false
。
2. CAS 的用途
CAS 操作主要用于实现无锁算法和轻量级锁机制,例如:
-
无锁数据结构:如无锁队列、栈等。
-
自旋锁:用于实现简单的同步机制,避免线程睡眠。
-
原子更新:在多线程环境中安全地更新共享变量。
3. CAS 的实现方式
在不同的编程语言和硬件架构中,CAS 操作有不同的实现方式。以下是一些常见的实现方式:
3.1 使用 C++11 标准库
在 C++11 中,可以使用 <atomic>
头文件中的 std::atomic
类型来实现 CAS 操作。
#include <atomic> #include <iostream> std::atomic<int> counter(0); bool cas_example() { int expected = 0; int desired = 1; // 尝试将 counter 的值从 0 更新为 1 bool success = counter.compare_exchange_strong(expected, desired); if (!success) { std::cout << "CAS failed, current value: " << counter.load() << std::endl; } else { std::cout << "CAS succeeded" << std::endl; } return success; } int main() { bool result = cas_example(); std::cout << "Result of CAS operation: " << result << std::endl; return 0; }
在这个例子中:
-
std::atomic<int> counter(0)
定义了一个原子整型变量counter
,初始值为 0。 -
compare_exchange_strong
方法尝试将counter
的值从expected
更新为desired
。 -
如果
counter
的当前值与expected
相同,则更新为desired
并返回true
;否则返回false
并保留当前值。
3.2 使用汇编语言
在低级编程中,可以使用特定架构的汇编指令来实现 CAS 操作。例如,在 x86 架构中,可以使用 CMPXCHG
指令。
; 假设 location 是一个内存位置 ; expected 是期望的值 ; new_value 是新的值 cas: mov eax, [expected] ; 把期望值放入 eax 寄存器 mov edx, [new_value] ; 把新值放入 edx 寄存器 lock cmpxchg [location], edx ; 原子地比较并交换 sete al ; 如果相等,al 设置为 1;否则设置为 0 ret
4. CAS 在无锁队列中的应用
下面是一个使用 CAS 操作实现的无锁队列示例。这个队列支持线程安全地插入和删除元素。
#include <atomic> #include <iostream> #include <thread> struct Node { int data; std::atomic<Node*> next; }; class LockFreeQueue { public: LockFreeQueue() : head(nullptr), tail(nullptr) {} void enqueue(int value) { Node* newNode = new Node{value, nullptr}; Node* expectedTail = tail.load(std::memory_order_relaxed); while (!tail.compare_exchange_weak(expectedTail, newNode, std::memory_order_release, std::memory_order_relaxed)) { expectedTail = tail.load(std::memory_order_relaxed); } if (expectedTail != nullptr) { expectedTail->next.store(newNode, std::memory_order_release); } else { head.store(newNode, std::memory_order_release); } } int dequeue() { Node* oldHead = head.load(std::memory_order_acquire); if (oldHead == nullptr) { return -1; // 队列为空 } Node* newHead = oldHead->next.load(std::memory_order_acquire); head.compare_exchange_strong(oldHead, newHead, std::memory_order_release, std::memory_order_acquire); int value = oldHead->data; delete oldHead; return value; } private: std::atomic<Node*> head; std::atomic<Node*> tail; }; int main() { LockFreeQueue queue; // 创建多个线程来并发地插入和删除元素 std::thread t1([&queue]() { queue.enqueue(1); }); std::thread t2([&queue]() { queue.enqueue(2); }); std::thread t3([&queue]() { queue.enqueue(3); }); t1.join(); t2.join(); t3.join(); // 从队列中取出元素 std::cout << "Dequeued: " << queue.dequeue() << std::endl; std::cout << "Dequeued: " << queue.dequeue() << std::endl; std::cout << "Dequeued: " << queue.dequeue() << std::endl; return 0; }
在这个例子中:
-
LockFreeQueue
类实现了基于 CAS 的无锁队列。 -
enqueue
方法使用compare_exchange_weak
尝试更新尾指针tail
。 -
dequeue
方法使用compare_exchange_strong
尝试更新头指针head
。 -
多个线程并发地插入和删除元素,验证队列的线程安全性。
通过这个例子,我们可以看到 CAS 操作在实现无锁数据结构中的重要作用,它可以有效地避免传统锁机制中的上下文切换和线程阻塞问题。
劳尔定律
使用链接的加载(Load Linked)和条件式存储(Conditional Store)指令来实现一个锁是一种经典的无锁同步方法。这种方法利用了硬件提供的原子操作来实现线程之间的同步。在许多处理器架构中,如 x86,提供了
lock cmpxchg
指令,可以用来实现这种锁。
原理
-
Load Linked (LL):此指令读取一个内存位置的值,并标记该位置为“链接”状态。
-
Conditional Store (CS):此指令尝试将一个新值写入已标记为“链接”状态的内存位置。如果在此期间内存位置的值未被其他线程修改,则写入成功并返回成功标志;否则返回失败标志。
使用 LL 和 CS 实现自旋锁
下面是一个使用 C++ 和 std::atomic
来模拟 Load Linked 和 Conditional Store 操作的自旋锁实现。虽然 std::atomic
提供了更高级别的抽象,但这个例子可以帮助理解 LL 和 CS 操作的基本思想。
代码实现
#include <atomic> #include <thread> #include <iostream> std::atomic<int> locked(0); class LLCSLock { public: void lock() { // 尝试获取锁 while (true) { // Load Linked 操作:获取锁的状态 int old_value = locked.load(std::memory_order_relaxed); // 如果锁未被持有,则尝试获取锁 if (old_value == 0) { // Conditional Store 操作:尝试将锁设置为1 if (locked.compare_exchange_weak(old_value, 1, std::memory_order_acquire)) { return; // 成功获取锁 } } // 自旋等待 std::this_thread::yield(); } } void unlock() { // 释放锁 locked.store(0, std::memory_order_release); } }; int shared_counter = 0; void increment_counter(LLCSLock& lock) { lock.lock(); shared_counter++; lock.unlock(); } int main() { LLCSLock lock; const int num_threads = 10; std::thread threads[num_threads]; // 创建多个线程来并发地增加共享变量 for (int i = 0; i < num_threads; ++i) { threads[i] = std::thread(increment_counter, std::ref(lock)); } // 等待所有线程完成 for (auto& t : threads) { t.join(); } std::cout << "Final counter value: " << shared_counter << std::endl; return 0; }
代码解释
-
LLCSLock 类:
-
lock
方法实现自旋锁:-
使用
load
获取锁的状态。 -
如果锁未被持有(值为0),则尝试使用
compare_exchange_weak
设置锁为1。 -
如果
compare_exchange_weak
成功,则获取锁。 -
如果失败,则继续自旋。
-
-
unlock
方法释放锁:-
使用
store
将锁状态设置为0。
-
-
-
increment_counter 函数:
-
使用
lock
和unlock
方法保护对shared_counter
的访问。
-
-
多线程测试:
-
在
main
函数中,创建多个线程并发地增加shared_counter
。 -
使用
std::ref(lock)
传递锁引用,确保锁在每个线程中都能正确获取和释放。
-
使用 x86 汇编实现 LL 和 CS
在 x86 汇编中,可以使用 lock cmpxchg
指令来实现 LL 和 CS 操作。下面是一个简单的示例:
section .data lock_var resb 4 ; 4-byte variable for the lock section .text global _start _start: ; Initialize lock variable to 0 mov eax, 0 mov [lock_var], eax lock: ; Load Linked (LL): Read the lock variable mov eax, [lock_var] ; If the lock is not held (0), try to acquire it cmp eax, 0 jne spin ; Jump to spin if the lock is already held ; Conditional Store (CS): Try to set the lock to 1 mov edx, 1 lock cmpxchg [lock_var], edx jne spin ; Jump to spin if the lock was not acquired ; Lock acquired, do some work here ; ... unlock: ; Release the lock by setting it back to 0 mov edx, 0 mov [lock_var], edx spin: ; Spin until the lock is available jmp lock ; Exit the program mov eax, 1 xor ebx, ebx int 0x80
在这个汇编示例中:
-
lock_var
是一个用于锁的变量。 -
lock
标签下的代码尝试获取锁。 -
cmpxchg
指令用于条件存储,尝试将lock_var
设置为1。 -
如果锁未被获取,则跳转到
spin
标签继续自旋。
总结
使用 Load Linked 和 Conditional Store 指令实现的锁是一种高效的无锁同步方法。它避免了传统锁机制中的上下文切换和线程阻塞问题,尤其适用于锁持有时间较短的场景。通过上述 C++ 示例和 x86 汇编示例,你可以更好地理解 LL 和 CS 操作的基本原理和实现方法。
如何实现高效的锁
实现高效的锁是并发编程中的一个重要课题,特别是在高并发场景下,锁的性能直接影响到整个系统的性能。
1. 选择合适的锁类型
不同的锁类型适用于不同的场景,选择合适的锁类型可以显著提升性能。
-
互斥锁(Mutex):适用于简单的同步场景,但可能会导致上下文切换和线程阻塞。
-
自旋锁(Spin Lock):适用于锁持有时间非常短的情况,避免上下文切换,但可能会消耗大量CPU资源。
-
读写锁(Read-Write Lock):允许多个读操作并发进行,但写操作独占锁,适合读多写少的场景。
-
偏向锁(Biased Locking):Java中的优化技术,适用于单线程访问的情况,可以完全避免锁开销。
-
轻量级锁(Lightweight Locking):同样用于Java,适用于多线程竞争不激烈的情况,通过CAS操作实现。
2. 使用原子操作
原子操作可以在不使用锁的情况下实现线程安全的数据修改,适用于简单的同步场景。
-
Fetch and Add:用于原子地增加或减少共享变量的值。
-
Compare and Swap (CAS):用于原子地比较并交换内存位置的值。
3. 减少锁的范围
-
细粒度锁:将大锁拆分成多个小锁,减少锁的争用。
-
锁分段:将数据结构分割成多个部分,每个部分有自己的锁,减少锁的争用。
-
锁剥离:在高并发场景下,将数据结构的访问分散到多个锁上,减少锁的竞争。
4. 避免热点锁
-
锁顺序:确保在多个锁的场景下按照固定的顺序获取锁,避免死锁。
-
锁分层:对于复杂的层次结构,可以采用层次化的锁策略,减少锁的竞争。
-
锁分区:将数据分区,每个分区有自己的锁,减少锁的竞争。
5. 使用无锁算法
-
无锁数据结构:使用无锁队列、栈等数据结构,避免使用锁。
-
乐观锁:使用版本号或时间戳来实现乐观锁,减少锁的竞争。
-
ABA 问题解决:使用带有版本号的 CAS 操作,解决 ABA 问题。
6. 适配硬件特性
-
缓存一致性:利用缓存一致性,减少跨处理器的通信开销。
-
内存屏障:使用内存屏障(Memory Barrier)确保内存访问的顺序性。
7. 软件层面的优化
-
自适应自旋锁:根据锁的竞争程度动态调整自旋次数。
-
优先级继承:在多线程环境中,避免优先级反转问题。
-
锁退避:在尝试获取锁失败时,增加一些延迟,减少锁的竞争。
示例:高效的自旋锁实现
下面是一个使用原子操作实现的高效自旋锁示例:
#include <atomic> #include <thread> #include <iostream> std::atomic_flag spinLock = ATOMIC_FLAG_INIT; void spin_lock() { while (spinLock.test_and_set(std::memory_order_acquire)) { // 自旋等待 } } void spin_unlock() { spinLock.clear(std::memory_order_release); } int shared_counter = 0; void increment_counter() { spin_lock(); shared_counter++; spin_unlock(); } int main() { const int num_threads = 10; std::thread threads[num_threads]; // 创建多个线程来并发地增加共享变量 for (int i = 0; i < num_threads; ++i) { threads[i] = std::thread(increment_counter); } // 等待所有线程完成 for (auto& t : threads) { t.join(); } std::cout << "Final counter value: " << shared_counter << std::endl; return 0; }
示例:高效的读写锁实现
下面是一个使用原子操作实现的高效的读写锁示例:
#include <atomic> #include <thread> #include <iostream> struct ReaderWriterLock { std::atomic<int> readers = 0; std::atomic_flag writers = ATOMIC_FLAG_INIT; void read_lock() { while (writers.test_and_set(std::memory_order_acquire)) { // 自旋等待 } readers.fetch_add(1, std::memory_order_acq_rel); writers.clear(std::memory_order_release); } void read_unlock() { readers.fetch_sub(1, std::memory_order_acq_rel); } void write_lock() { while (readers.load(std::memory_order_acquire) != 0 || writers.test_and_set(std::memory_order_acquire)) { // 自旋等待 } } void write_unlock() { writers.clear(std::memory_order_release); } }; int shared_counter = 0; ReaderWriterLock rwlock; void increment_counter() { rwlock.write_lock(); shared_counter++; rwlock.write_unlock(); } void read_counter() { rwlock.read_lock(); int value = shared_counter; rwlock.read_unlock(); std::cout << "Current counter value: " << value << std::endl; } int main() { const int num_writer_threads = 5; const int num_reader_threads = 10; std::thread writer_threads[num_writer_threads]; std::thread reader_threads[num_reader_threads]; // 创建写线程 for (int i = 0; i < num_writer_threads; ++i) { writer_threads[i] = std::thread(increment_counter); } // 创建读线程 for (int i = 0; i < num_reader_threads; ++i) { reader_threads[i] = std::thread(read_counter); } // 等待所有线程完成 for (auto& t : writer_threads) { t.join(); } for (auto& t : reader_threads) { t.join(); } std::cout << "Final counter value: " << shared_counter << std::endl; return 0; }
总结
实现高效的锁需要综合考虑多种因素,包括锁的选择、原子操作的使用、锁的范围、无锁算法的应用以及硬件特性的利用。通过以上示例和策略,可以有效地提升锁的性能,从而提高整个系统的并发性能。在实际应用中,还需要根据具体的场景和需求进行适当的调整和优化。