目录
无锁队列
无锁队列是什么
为什么需要无锁队列
队列的类型
无锁队列的分类
ringbuffer(SPSC)
ret_ring(MPMC)
无锁队列
无锁队列是什么
- 无锁队列通过原子操作来实现线程安全的队列,属于非阻塞队列
- 有锁队列通过互斥锁或其他同步机制保证线程安全的队列,属于阻塞队列
为什么需要无锁队列
锁的局限:
- 线程阻塞带来的切换
- 死锁风险
- 性能瓶颈,高并发下锁竞争激烈,吞吐量下降
队列的类型
无锁队列
- 无锁(lock-free):保证了至少有一个线程在正常运行,其他可能重试,依赖CAS 等原子操作。
- 无等待(wait-free):所有线程必成功,无重试,依赖exchange等原子操作。
有锁队列
- 阻塞队列(blocking-queue):当队列为空时,从队列中获取元素的操作会被阻塞,直到队列中有新的元素加入;当队列已满时,向队列中添加元素的操作会被阻塞,直到队列中有元素被移除,依赖锁实现。
无锁队列的分类
SPSC(单生产者,单消费者)
含义:单生产者单消费者队列,即队列只有一个生产者线程和一个消费者线程。
特点:不存在多个生产者或消费者之间的竞争,所以可以使用比较简单高效的算法来实现。
MPSC(多生产者,单消费者)
含义:多生产者单消费者队列,多个线程可以同时向队列中添加元素,但只有一个线程可以从队列中取出元素。
特点:实现时需要重点处理多个生产者之间的并发问题,确保多个生产者能够安全地向队列中添加元素。而对于消费者线程,由于是唯一的,处理相对简单。
SPMC(单生产者,多消费者)
含义:单生产者多消费者队列,只有一个线程可以向队列中添加元素,但是有多个线程可以同时从队列中取出元素。
特点:需要处理多个消费者之间的并发问题,保证多个消费者能够安全地从队列中取出元素。同时,要确保生产者在添加元素时不会受到消费者的干扰。
MPMC(多生产者,多消费者)
含义:多生产者多消费者队列,意味着有多个线程可以同时向队列中添加元素(生产者),也有多个线程可以同时从队列中取出元素(消费者)。
特点:由于多个生产者和消费者可能会同时访问队列,因此需要更复杂的并发控制机制来保证线程安全。通常会使用 CAS 等原子操作来处理多个线程对队列的并发访问,避免数据竞争和不一致的问题。
ringbuffer(SPSC)
#pragma once
// SPSC
#include <atomic>
#include <cstddef>
#include <type_traits>
template<typename T, std::size_t Capacity>
class RingBuffer
{
public:
static_assert(Capacity && !(Capacity & (Capacity - 1)), "Capacity must be power of 2");
RingBuffer() : read_(0), write_(0) {}
~RingBuffer() {
std::size_t r = read_.load(std::memory_order_relaxed);
std::size_t w = write_.load(std::memory_order_relaxed);
while (r != w) {
reinterpret_cast<T *>(&buffer_[r])->~T();
r = (r + 1) & (Capacity - 1);
}
}
// 这里使用万能引用和完美转发,支持左值和右值
template<typename U>
bool Push(U && value) {
const std::size_t w = write_.load(std::memory_order_relaxed);
const std::size_t next_w = (w + 1) & (Capacity - 1);
// 检查缓冲区是否满
if (next_w == read_.load(std::memory_order_acquire)) {
return false;
}
new (&buffer_[w]) T(std::forward<U>(value));
write_.store(next_w, std::memory_order_release);
return true;
}
bool Pop(T & value) {
const std::size_t r = read_.load(std::memory_order_relaxed);
// 检查缓冲区是否空
if (r == write_.load(std::memory_order_acquire)) {
return false;
}
// 取出元素并析构
value = std::move(*reinterpret_cast<T *>(&buffer_[r]));
reinterpret_cast<T *>(&buffer_[r])->~T();
read_.store((r + 1) & (Capacity - 1), std::memory_order_release);
return true;
}
std::size_t Size() const {
const std::size_t r = read_.load(std::memory_order_acquire);
const std::size_t w = write_.load(std::memory_order_acquire);
return (w >= r) ? (w - r) : (Capacity - r + w);
}
private:
//cache line 64B
alignas(64) std::atomic<std::size_t> read_;
alignas(64) std::atomic<std::size_t> write_;
alignas(64) std::aligned_storage_t<sizeof(T), alignof(T)> buffer_[Capacity]; // 支持 pod 和 非 pod 类型
};
ret_ring(MPMC)
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include "rte_ring.h"
#define RING_SIZE 16<<20
typedef struct cc_queue_node {
int data;
} cc_queue_node_t;
static struct rte_ring *r;
typedef unsigned long long ticks;
static __inline__ ticks getticks(void)
{
u_int32_t a, d;
asm volatile("rdtsc" : "=a" (a), "=d" (d));
return (((ticks)a) | (((ticks)d) << 32));
}
void *enqueue_fun(void *data)
{
int n = (int)data;
int i = 0;
int ret;
cc_queue_node_t *p;
for (; i < n; i++) {
p = (cc_queue_node_t *)malloc(sizeof(cc_queue_node_t));
p->data = i;
ret = rte_ring_mp_enqueue(r, p);
if (ret != 0) {
printf("enqueue failed: %d\n", i);
}
}
return NULL;
}
void *dequeue_func(void *data)
{
int ret;
int i = 0;
int sum = 0;
int n = (int)data;
cc_queue_node_t *p;
ticks t1, t2, diff;
//return;
t1 = getticks();
while (1) {
p = NULL;
ret = rte_ring_sc_dequeue(r, (void **)&p);
if (ret != 0) {
//do something
}
if (p != NULL) {
i++;
sum += p->data;
free(p);
if (i == n) {
break;
}
}
}
t2 = getticks();
diff = t2 - t1;
printf("time diff: %llu\n", diff);
printf("dequeue total: %d, sum: %d\n", i, sum);
return NULL;
}
int main(int argc, char *argv[])
{
int ret = 0;
pthread_t pid1, pid2, pid3, pid4, pid5, pid6;
pthread_attr_t pthread_attr;
r = rte_ring_create("test", RING_SIZE, 0);
if (r == NULL) {
return -1;
}
printf("start enqueue, 5 producer threads, echo thread enqueue 1000 numbers.\n");
pthread_attr_init(&pthread_attr);
if ((ret = pthread_create(&pid1, &pthread_attr, enqueue_fun, (void *)1000)) == 0) {
pthread_detach(pid1);
}
if ((ret = pthread_create(&pid2, &pthread_attr, enqueue_fun, (void *)1000)) == 0) {
pthread_detach(pid2);
}
if ((ret = pthread_create(&pid3, &pthread_attr, enqueue_fun, (void *)1000)) == 0) {
pthread_detach(pid3);
}
if ((ret = pthread_create(&pid4, &pthread_attr, enqueue_fun, (void *)1000)) == 0) {
pthread_detach(pid4);
}
if ((ret = pthread_create(&pid5, &pthread_attr, enqueue_fun, (void *)1000)) == 0) {
pthread_detach(pid5);
}
printf("start dequeue, 1 consumer thread.\n");
if ((ret = pthread_create(&pid6, &pthread_attr, dequeue_func, (void *)5000)) == 0) {
//pthread_detach(pid6);
}
pthread_join(pid6, NULL);
rte_ring_free(r);
return 0;
}