C++无锁编程——无锁队列

news2025/2/2 23:58:21

C++无锁编程——无锁队列

贺志国
2023.7.11

上一篇博客给出了最简单的C++数据结构——堆栈的几种无锁实现方法。队列的挑战与栈的有些不同,因为Push()Pop()函数在队列中操作的不是同一个地方。因此同步的需求就不一样。需要保证对一端的修改是正确的,且对另一端是可见的。因此队列需要两个Node指针:head_tail_。这两个指针都是原子变量,从而可在加锁的情形下,可给多个线程同时访问。首先来分析单生产者/单消费者的情形。

一、单生产者-单消费者模型下的无锁队列

单生产者/单消费者模型就是指,在某一时刻,最多只存在一个线程调用Push()函数,最多只存在一个线程调用Pop()函数。该情形下的代码(文件命名为 lock_free_queue.h)如下:

#pragma once

#include <atomic>
#include <memory>

template <typename T>
class LockFreeQueue {
 public:
  LockFreeQueue() : head_(new Node), tail_(head_.load()) {}
  ~LockFreeQueue() {
    while (Node* old_head = head_.load()) {
      head_.store(old_head->next);
      delete old_head;
    }
  }

  LockFreeQueue(const LockFreeQueue& other) = delete;
  LockFreeQueue& operator=(const LockFreeQueue& other) = delete;

  bool IsEmpty() const { return head_.load() == tail_.load(); }

  void Push(const T& data) {
    auto new_data = std::make_shared<T>(data);
    Node* p = new Node;             // 3
    Node* old_tail = tail_.load();  // 4
    old_tail->data.swap(new_data);  // 5
    old_tail->next = p;             // 6
    tail_.store(p);                 // 7
  }

  std::shared_ptr<T> Pop() {
    Node* old_head = PopHead();
    if (old_head == nullptr) {
      return std::shared_ptr<T>();
    }

    const std::shared_ptr<T> res(old_head->data);  // 2
    delete old_head;
    return res;
  }

 private:
  // If the struct definition of Node is placed in the private data member
  // field where 'head_' is defined, the following compilation error will occur:
  //
  // error: 'Node' has not been declared ...
  //
  // It should be a bug of the compiler. The struct definition of Node is put in
  // front of the private member function `DeleteNodes` to eliminate this error.
  struct Node {
    // std::make_shared does not throw an exception.
    Node() : data(nullptr), next(nullptr) {}

    std::shared_ptr<T> data;
    Node* next;
  };

 private:
  Node* PopHead() {
    Node* old_head = head_.load();
    if (old_head == tail_.load()) {  // 1
      return nullptr;
    }
    head_.store(old_head->next);
    return old_head;
  }

 private:
  std::atomic<Node*> head_;
  std::atomic<Node*> tail_;
};

一眼望去,这个实现没什么毛病,当只有一个线程调用Push()Pop()时,这种情况下队列一点毛病没有。Push()Pop()之间的先行(happens-before )关系非常重要,直接关系到能否安全地获取到队列中的数据。对尾部节点tail_的存储⑦(对应于上述代码片段中的注释// 7,下同)同步(synchronizes with)于对tail_的加载①,存储之前节点的data指针⑤先行(happens-before )于存储tail_。并且,加载tail_先行于加载data指针②,所以对data的存储要先行于加载,一切都没问题。因此,这是一个完美的单生产者/单消费者(SPSC, single-producer, single-consume)队列。
问题在于当多线程对Push()Pop()并发调用。先看一下Push():如果有两个线程并发调用Push(),会新分配两个节点作为虚拟节点③,也会读取到相同的tail_值④,因此也会同时修改同一个节点,同时设置datanext指针⑤⑥,存在明显的数据竞争!
PopHead()函数也有类似的问题。当有两个线程并发的调用这个函数时,这两个线程就会读取到同一个head_,并且会通过next指针去修改旧值。两个线程都能索引到同一个节点——真是一场灾难!不仅要保证只有一个Pop()线程可以访问给定项,还要保证其他线程在读取head_时,可以安全的访问节点中的next,这就是和无锁栈中Pop()一样的问题了。
Pop()的问题假设已解决,那么Push()呢?问题在于为了获取Push()Pop()间的先行关系,就需要在为虚拟节点设置数据项前,更新tail_指针。并发访问Push()时,因为每个线程所读取到的是同一个tail_,所以线程会进行竞争。

说明
先行(happens-before )与同步(synchronizes with)是原子变量在线程间同步的两个重要关系。
Happens-before(先行)
Regardless of threads, evaluation A happens-before evaluation B if any of the following is true: 1) A is sequenced-before B; 2) A inter-thread happens before B. The implementation is required to ensure that the happens-before relation is acyclic, by introducing additional synchronization if necessary (it can only be necessary if a consume operation is involved). If one evaluation modifies a memory location, and the other reads or modifies the same memory location, and if at least one of the evaluations is not an atomic operation, the behavior of the program is undefined (the program has a data race) unless there exists a happens-before relationship between these two evaluations.
(无关乎线程,若下列任一为真,则求值 A 先行于求值 B :1) A 先序于 B;2) A 线程间先发生于 B。要求实现确保先发生于关系是非循环的,若有必要则引入额外的同步(若引入消费操作,它才可能为必要)。若一次求值修改一个内存位置,而其他求值读或修改同一内存位置,且至少一个求值不是原子操作,则程序的行为未定义(程序有数据竞争),除非这两个求值之间存在先行关系。)
Synchronizes with(同步)
If an atomic store in thread A is a release operation, an atomic load in thread B from the same variable is an acquire operation, and the load in thread B reads a value written by the store in thread A, then the store in thread A synchronizes-with the load in thread B. Also, some library calls may be defined to synchronize-with other library calls on other threads.
(如果在线程A上的一个原子存储是释放操作,在线程B上的对相同变量的一个原子加载是获得操作,且线程B上的加载读取由线程A上的存储写入的值,则线程A上的存储同步于线程B上的加载。此外,某些库调用也可能定义为同步于其它线程上的其它库调用。)

二、多生产者-多消费者模型下的无锁队列

下述代码存在bug,需要进一步调试(文件命名为 lock_free_queue.h):

#pragma once

#include <atomic>
#include <memory>

template <typename T>
class LockFreeQueue {
 public:
  LockFreeQueue() : head_(CountedNodePtr(new Node)), tail_(head_.load()) {}

  ~LockFreeQueue() {
    while (Pop()) {
      // Do nothing
    }
  }

  LockFreeQueue(const LockFreeQueue& other) = delete;
  LockFreeQueue& operator=(const LockFreeQueue& other) = delete;

  bool IsEmpty() const { return head_.load().ptr == tail_.load().ptr; }

  void Push(const T& data) {
    auto new_data = std::make_unique<T>(data);
    CountedNodePtr new_next(new Node);
    new_next.external_count = 1;
    CountedNodePtr old_tail = tail_.load();

    while (true) {
      IncreaseExternalCount(&tail_, &old_tail);

      T* old_data = nullptr;
      if (old_tail.ptr->data.compare_exchange_strong(old_data,
                                                     new_data.get())) {
        CountedNodePtr old_next = old_tail.ptr->next.load();
        if (!old_tail.ptr->next.compare_exchange_strong(old_next, new_next)) {
          delete new_next.ptr;
          new_next = old_next;
        }
        SetNewTail(new_next, &old_tail);

        // Release the ownership of the managed object so that the data will not
        // be deleted beyond the scope the unique_ptr<T>.
        new_data.release();
        break;
      } else {
        CountedNodePtr old_next = old_tail.ptr->next.load();
        if (old_tail.ptr->next.compare_exchange_strong(old_next, new_next)) {
          old_next = new_next;
          new_next.ptr = new Node;
        }
        SetNewTail(old_next, &old_tail);
      }
    }
  }

  std::unique_ptr<T> Pop() {
    CountedNodePtr old_head = head_.load();
    while (true) {
      IncreaseExternalCount(&head_, &old_head);
      Node* ptr = old_head.ptr;
      if (ptr == tail_.load().ptr) {
        ptr->ReleaseRef();
        return std::unique_ptr<T>();
      }

      CountedNodePtr next = ptr->next.load();
      if (head_.compare_exchange_strong(old_head, next)) {
        T* res = ptr->data.exchange(nullptr);
        FreeExternalCounter(&old_head);
        return std::unique_ptr<T>(res);
      }

      ptr->ReleaseRef();
    }
  }

 private:
  // Forward class declaration
  struct Node;
  struct CountedNodePtr {
    CountedNodePtr() : ptr(nullptr), external_count(0) {}
    explicit CountedNodePtr(Node* input_ptr)
        : ptr(input_ptr), external_count(0) {}

    Node* ptr;
    int external_count;
  };

  struct NodeCounter {
    NodeCounter() : internal_count(0), external_counters(0) {}
    NodeCounter(const unsigned input_internal_count,
                const unsigned input_external_counters)
        : internal_count(input_internal_count),
          external_counters(input_external_counters) {}

    // external_counters occupies only 2 bits, where the maximum value stored
    // is 3. Note that you need only 2 bits for the external_counters because
    // there are at most two such counters. By using a bit field for this and
    // specifying internal_count as a 30-bit value, you keep the total counter
    // size to 32 bits. This gives you plenty of scope for large internal count
    // values while ensuring that the whole structure fits inside a machine word
    // on 32-bit and 64-bit machines. It’s important to update these counts
    // together as a single entity in order to avoid race conditions. Keeping
    // the structure within a machine word makes it more likely that the atomic
    // operations can be lock-free on many platforms.
    unsigned internal_count : 30;
    unsigned external_counters : 2;
  };

  struct Node {
    // There are only two counters in Node (counter and next), so the initial
    // value of external_counters is 2.
    Node()
        : data(nullptr), counter(NodeCounter(0, 2)), next(CountedNodePtr()) {}

    void ReleaseRef() {
      NodeCounter old_node = counter.load();
      NodeCounter new_counter;

      do {
        new_counter = old_node;
        --new_counter.internal_count;
      } while (!counter.compare_exchange_strong(old_node, new_counter));

      if (!new_counter.internal_count && !new_counter.external_counters) {
        delete this;
      }
    }

    std::atomic<T*> data;
    std::atomic<NodeCounter> counter;
    std::atomic<CountedNodePtr> next;
  };

 private:
  static void IncreaseExternalCount(std::atomic<CountedNodePtr>* atomic_node,
                                    CountedNodePtr* old_node) {
    CountedNodePtr new_node;

    // If `*old_node` is equal to `*atomic_node`, it means that no other thread
    // changes the `*atomic_node`, update `*atomic_node` to `new_node`. In fact
    // the `*atomic_node` is still the original node, only the `external_count`
    // of it is increased by 1. If `*old_node` is not equal to `*atomic_node`,
    // it means that another thread has changed `*atomic_node`, update
    // `*old_node` to `*atomic_node`, and keep looping until there are no
    // threads changing `*atomic_node`.
    do {
      new_node = *old_node;
      ++new_node.external_count;
    } while (!atomic_node->compare_exchange_strong(*old_node, new_node));

    old_node->external_count = new_node.external_count;
  }

  static void FreeExternalCounter(CountedNodePtr* old_node) {
    Node* ptr = old_node->ptr;
    const int increased_count = old_node->external_count - 2;
    NodeCounter old_counter = ptr->counter.load();
    NodeCounter new_counter;
    do {
      new_counter = old_counter;
      --new_counter.external_counters;
      new_counter.internal_count += increased_count;
    } while (!ptr->counter.compare_exchange_strong(old_counter, new_counter));

    if (!new_counter.internal_count && !new_counter.external_counters) {
      delete ptr;
    }
  }

  void SetNewTail(const CountedNodePtr& new_tail, CountedNodePtr* old_tail) {
    Node* current_tail_ptr = old_tail->ptr;
    while (!tail_.compare_exchange_weak(*old_tail, new_tail) &&
           old_tail->ptr == current_tail_ptr) {
      // Do nothing
    }

    if (old_tail->ptr == current_tail_ptr) {
      FreeExternalCounter(old_tail);
    } else {
      current_tail_ptr->ReleaseRef();
    }
  }

 private:
  std::atomic<CountedNodePtr> head_;
  std::atomic<CountedNodePtr> tail_;
};

三、测试代码

下面给出测试无锁栈工作是否正常的简单测试代码(文件命名为:lock_free_queue.cpp):

#include "lock_free_queue.h"

#include <iostream>
#include <thread>
#include <vector>

namespace {
constexpr size_t kElementNum = 10;
constexpr size_t kThreadNum = 200;
}  // namespace

int main() {
  LockFreeQueue<int> queue;
  for (size_t i = 0; i < kElementNum; ++i) {
    std::cout << "The data " << i << " is pushed in the queue.\n";
    queue.Push(i);
  }

  std::cout << "queue.IsEmpty() == " << std::boolalpha << queue.IsEmpty()
            << std::endl;

  while (auto data = queue.Pop()) {
    std::cout << "Current data is : " << *data << '\n';
  }

  std::vector<std::thread> producers1;
  std::vector<std::thread> producers2;
  std::vector<std::thread> consumers1;
  std::vector<std::thread> consumers2;

  for (size_t i = 0; i < kThreadNum; ++i) {
    producers1.emplace_back(&LockFreeQueue<int>::Push, &queue, i * 10);
    producers2.emplace_back(&LockFreeQueue<int>::Push, &queue, i * 20);
    consumers1.emplace_back(&LockFreeQueue<int>::Pop, &queue);
    consumers2.emplace_back(&LockFreeQueue<int>::Pop, &queue);
  }

  for (size_t i = 0; i < kThreadNum; ++i) {
    producers1[i].join();
    consumers1[i].join();
    producers2[i].join();
    consumers2[i].join();
  }

  return 0;
}

CMake的编译配置文件CMakeLists.txt

cmake_minimum_required(VERSION 3.0.0)
project(lock_free_queue VERSION 0.1.0)
set(CMAKE_CXX_STANDARD 17)

# If the debug option is not given, the program will not have debugging information.
SET(CMAKE_BUILD_TYPE "Debug")

add_executable(${PROJECT_NAME} ${PROJECT_NAME}.cpp)

find_package(Threads REQUIRED)
# libatomic should be linked to the program.
# Otherwise, the following link errors occured:
# /usr/include/c++/9/atomic:254: undefined reference to `__atomic_load_16'
# /usr/include/c++/9/atomic:292: undefined reference to `__atomic_compare_exchange_16'
target_link_libraries(${PROJECT_NAME} ${CMAKE_THREAD_LIBS_INIT} atomic)

include(CTest)
enable_testing()
set(CPACK_PROJECT_NAME ${PROJECT_NAME})
set(CPACK_PROJECT_VERSION ${PROJECT_VERSION})
include(CPack)

上述配置中添加了对原子库atomic的链接。因为引用计数的结构体CountedNodePtr包含两个数据成员:int external_count; Node* ptr;,这两个变量占用16字节,而16字节的数据结构需要额外链接原子库atomic,否则会出现链接错误:

/usr/include/c++/9/atomic:254: undefined reference to `__atomic_load_16'
/usr/include/c++/9/atomic:292: undefined reference to `__atomic_compare_exchange_16'

VSCode调试启动配置文件.vscode/launch.json

{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "cpp_gdb_launch",
            "type": "cppdbg",
            "request": "launch",
            "program": "${workspaceFolder}/build/${workspaceFolderBasename}",
            "args": [],
            "stopAtEntry": false,
            "cwd": "${fileDirname}",
            "environment": [],
            "externalConsole": false,
            "MIMode": "gdb",
            "setupCommands": [
                {
                    "description": "Enable neat printing for gdb",
                    "text": "-enable-pretty-printing",
                    "ignoreFailures": true
                }
            ],
            // "preLaunchTask": "cpp_build_task",
            "miDebuggerPath": "/usr/bin/gdb"
        }
    ]
}

使用CMake的编译命令:

cd lock_free_queue
# 只执行一次
mkdir build
cd build
cmake .. && make

运行结果如下:

./lock_free_queue 
The data 0 is pushed in the queue.
The data 1 is pushed in the queue.
The data 2 is pushed in the queue.
The data 3 is pushed in the queue.
The data 4 is pushed in the queue.
The data 5 is pushed in the queue.
The data 6 is pushed in the queue.
The data 7 is pushed in the queue.
The data 8 is pushed in the queue.
The data 9 is pushed in the queue.
queue.IsEmpty() == false
Current data is : 0
Current data is : 1
Current data is : 2
Current data is : 3
Current data is : 4
Current data is : 5
Current data is : 6
Current data is : 7
Current data is : 8
Current data is : 9

VSCode调试界面如下:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/762346.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TCP/IP网络编程 第十二章:I/O复用

基于I/O复用的服务器端 多进程服务器端的缺点和解决方法 为了构建并发服务器&#xff0c;只要有客户端连接请求就会创建新进程。这的确是实际操作中采用的种方案&#xff0c;但并非十全十美&#xff0c;因为创建进程时需要付出极大代价。这需要大量的运算和内存空间&#xff…

Vue列表排序

开始前先回顾一下sort排序用法&#xff1a; 定义一串数组arr&#xff0c;使用sort排序&#xff0c;会收到前后两个数据项设置两个参数a&#xff0c;b。 注意&#xff1a;a-b 是升序 b-a 是降序 a-b升序&#xff1a; <script>let arr [12,11,2,5,76,33]arr.sort((a,b…

家居商城小程序:打造舒适家居生活的优选平台

随着人们对家居生活品质的追求&#xff0c;家居商城小程序成为提供便捷购物和个性化服务的不可或缺的工具。通过家居商城小程序&#xff0c;用户可以浏览并购买各类家居商品&#xff0c;如家具、装饰品、家纺等。同时&#xff0c;家居商城小程序能提供热销商品推荐、客户评价和…

浅析高校用电问题及智慧电力监管平台的构建 安科瑞 许敏

摘 要&#xff1a;介绍了当前高校用电存在的问题&#xff0c;进行了原因分析&#xff0c;由此提出建立高校用电智慧监管平台。对高校用电智慧监管平台的构架进行设计&#xff0c;运用物联网技术&#xff0c;实现各回路实时自主控制&#xff0c;并细化管理权限&#xff0c;实现…

Swift 周报 第三十三期

文章目录 前言新闻和社区App 内购买项目和订阅即将实行价格与税率调整为家庭提供安全的 App 体验 提案正在审查的提案 Swift论坛推荐博文话题讨论关于我们 前言 本期是 Swift 编辑组自主整理周报的第二十四期&#xff0c;每个模块已初步成型。各位读者如果有好的提议&#xff…

公网访问的Linux CentOS本地Web站点搭建指南

文章目录 前言1. 本地搭建web站点2. 测试局域网访问3. 公开本地web网站3.1 安装cpolar内网穿透3.2 创建http隧道&#xff0c;指向本地80端口3.3 配置后台服务 4. 配置固定二级子域名5. 测试使用固定二级子域名访问本地web站点 前言 在web项目中,部署的web站点需要被外部访问,则…

常规函数和箭头函数之间的主要区别

常规函数和箭头函数之间的主要区别 在 JavaScript 中&#xff0c;函数是设计用于执行特定任务的代码块。函数允许使用函数将大型程序分解为多个更小、更易于管理的组件。因此&#xff0c;我们就不再需要重复编写相同的代码。 JavaScript中有两种类型的函数 常规函数箭头函数…

jmeter如何进行web脚本录制

目录 录制web脚本 &#xff08;1&#xff09;jmeter中设置HTTP代理 &#xff08;2&#xff09;浏览器中设置代理 &#xff08;3&#xff09;页面操作 &#xff08;4&#xff09;查看录制的web脚本 &#xff08;5&#xff09;脚本内容过滤 &#xff08;6&#xff09;脚本优化…

练习 数列前n项和(递归函数)

C++自学精简教程 目录(必读) 数列的前n项和 S = 1 + 2 + 3 + ...... + n 之前我们用for循环求解数列前n项和,本文用递归函数求解。 代码如下 #include <iostream> using namespace std;int f(int a) {if (a == 1){return 1;}else{return a + f(a - 1);} }int main(…

APP外包开发中的H5框架

在开发APP的技术中&#xff0c;除了原生开发外也可以使用H5框架来开发。原生开发的特点是质量高&#xff0c;用户体验更好&#xff0c;但成本高&#xff0c;适用于对质量要求高的APP项目。H5框架的特点是通用性较强&#xff0c;对开发人员的要求相对较低&#xff0c;成本也低&a…

从小众到大热:海外网红营销的成功之道

在当今数字时代&#xff0c;社交媒体已经成为人们获取信息、沟通交流的主要渠道之一。而在这个社交媒体的浪潮中&#xff0c;海外网红营销逐渐从小众走向大热。它以其独特的营销模式和广泛的受众群体&#xff0c;成为许多品牌和企业的首选营销方式。本文Nox聚星将详细介绍海外网…

高等学校节能监控平台的具体应用 安科瑞 许敏

摘要&#xff1a;高校节能监控平台&#xff0c;主要是通过物联网技术实现对水、电、气等高耗能设备进行计量和控制&#xff0c;为高校能耗的分析&#xff0c;能源流向&#xff0c;节能目标提供有力的数据支撑。高效节能监控平台主要包括能耗监测系统、照明节能控制系统、空调节…

Gtest在ARM平台上的离线搭建(让Gtest编译安装成功之后的可执行文件.so变成ARM下的—ARM aarch64)(实用篇)

编译时自动调用Cunit或者Gtest的静态或者动态库文件说明拷贝Gtest安装包到新目录下根目录下创建build目录并且进行编译检查生成的库文件是否属于ARM架构下的将库文件拷贝到统一的ARM包下面编译时自动调用Cunit或者Gtest的静态或者动态库文件说明 这里之前在usr/local/lib下面安…

Springboot工程常见错误

1. mybatis的mapper.xml出现tag name expected错误 https://blog.csdn.net/watson2017/article/details/128902300 <符号在xml配置SQL 语句中是不能直接识别出来的&#xff0c;也就是说&#xff0c;我们在使用到 > 、< 等符号的时候&#xff0c;需要将其进行转义&…

LaTex 的基本使用方法

TeXstudio 设置新建和编辑文档编译和预览拼写检查宏和脚本 LaTex 分模块详解 LaTex 文件头 documentclass article&#xff1a;用于写短篇文章、报告report&#xff1a;用于写长篇报告、学位论文、技术报告等book&#xff1a;用于编写书籍&#xff0c;具有章节、子章节和节的…

leetcode 77. 组合

2023.7.17 今天正式开始回溯系列&#xff0c;这是一道经典回溯题。 先上一个经典回溯模板&#xff1a; void backtracking(参数) {if (终止条件) {存放结果;return;}for (选择&#xff1a;本层集合中元素&#xff08;树中节点孩子的数量就是集合的大小&#xff09;) {处理节点…

飞行动力学 - 第10节-空间机动性、稳定性与操纵性概述 之 基础点摘要

飞行动力学 - 第10节-空间机动性、稳定性与操纵性概述 之 基础点摘要 1. 协调盘旋性能计算流程2. 一般盘旋2.1 动力学方程2.2 角点速度2.3 典型战斗机盘旋曲线 3. 空间机动能力4. 飞行动力学&#xff1a;飞行性能稳定性与操纵性5. 稳定性定义6. 飞行品质6.1 品质等级6.2 品质评…

大数据测试之数据仓测试怎么做(上)

前面的文章我们为大家介绍了大数据测试平台和大数据系统的测试方法&#xff0c;接下来我们重点来讲一下数据仓库测试&#xff0c;首先看一下它的定义。 数据仓库(Data Warehouse)&#xff1a;一个面向主题的&#xff08;Subject Oriented&#xff09;、集成的 &#xff08;In…

给学弟妹们的 10 个秋招建议!

大家好&#xff0c;我是鱼皮。最近很多大公司的提前批陆陆续续开启了&#xff0c;说明秋招已经拉开了序幕&#xff0c;大家要准备起来了。 所以我也赶紧写了一篇文章&#xff0c;结合自己曾经大厂求职的经验&#xff0c;并且从招聘方的角度&#xff0c;给学弟妹们一些秋招找工…

机器人架构设计和中间件

一&#xff0e;引言 在无人驾驶与机器人领域&#xff0c;算法一直都是研究的核心。无论是导航技术、控制技术&#xff0c;还是识别技术都是构成其技术栈的重要组成部分。但是&#xff0c;随着技术的发展&#xff0c;开发者们逐渐认识到一个问题&#xff0c;即程序本身的组织架构…