生产环境使用boost::fiber

news2025/1/23 7:27:20

简介

boost::fiber是一类用户级线程,也就是纤程。其提供的例子与实际生产环境相距较远,本文将对其进行一定的改造,将其能够投入到生产环境。
同时由于纤程是具有传染性的,使用纤程的代码里也全部要用纤程封装,本文将对一些组件进行简单封装。

fiber封装

boost::fiber支持设置pthread和fiber的比例是1:n还是m:n,同时也支持设置调度方式是随机调度还是抢占调度。
本文中选择使用抢占式调度,并且是m:n的比例,这种选择适用面更加广。
既然pthread和fiber比例是m:n,那么这个m一般等于逻辑核数量,也就是需要设置fiber调度的线程控制在大小为固定的线程池中。fiber中抢占式调度方式也要求固定的线程池数量,外部前程加入时,可能会影响抢占式调度,即不能在外部线程中调用fiber,不然这个线程就加入到了fiber调度的pthread中了。

这时,需要一个设置一个队列,外部线程往这个队列中添加任务;内部线程池从队列中取任务,同时触发fiber,在fiber中可以继续触发fiber。触发队列、内部队列、工作线程、外部线程的关系如下图所示:

图片

运行逻辑被装箱到一个任务中,然后被添加到任务队列,这一步利用模板和上转型实现,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class IFiberTask {
 public:
  IFiberTask() = default;
  virtual ~IFiberTask() = default;

  IFiberTask(const IFiberTask& rhs) = delete;
  IFiberTask& operator=(const IFiberTask& rhs) = delete;
  IFiberTask(IFiberTask&& other) = default;
  IFiberTask& operator=(IFiberTask&& other) = default;

  virtual void execute() = 0;
 public:
  inline static std::atomic_size_t fibers_size {0};
};

template <typename Func>
class FiberTask: public IFiberTask {
 public:
  explicit FiberTask(Func&& func) :func_{std::move(func)} { }

  ~FiberTask() override = default;
  FiberTask(const FiberTask& rhs) = delete;
  FiberTask& operator=(const FiberTask& rhs) = delete;
  FiberTask(FiberTask&& other)  noexcept = default;
  FiberTask& operator=(FiberTask&& other)  noexcept = default;

  void execute() override {
    fibers_size.fetch_add(1);
    func_();
    fibers_size.fetch_sub(1);
  }

 private:
  Func func_;
};

IFiberTask是任务基类,不可拷贝;FiberTask是模板类,成员变量func_存储算子。使用IFiberTask类指针指向特化后的FiberTask对象,这时就实现的装箱操作,调用execute时,实际调用了子类的execute,触发封装的func_对象。

外部队列基于boost::fibers::buffered_channel实现,这是一个支持并发的队列,队列的元素类型为std::tuple<boost::fibers::launch, std::unique_ptr>,其中tuple第一元素存储任务的触发形式,进入队列还是立即触发。

接着是任务装箱,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
template<typename Func, typename... Args>
auto Submit(boost::fibers::launch launch_policy, Func&& func, Args&&... args) {
  // 捕获lambda极其参数
  auto capture = [func = std::forward<Func>(func),
      args = std::make_tuple(std::forward<Args>(args)...)]() mutable {
    return std::apply(std::move(func), std::move(args));
  };

  // 任务的返回值类型
  using task_result_t = std::invoke_result_t<decltype(capture)>;
  // 该任务packaged_task的
  using packaged_task_t = boost::fibers::packaged_task<task_result_t()>;
  // 创建任务对象
  packaged_task_t task {std::move(capture)};
  // 装箱到FiberTask中
  using task_t = fiber::FiberTask<packaged_task_t>;
  // 获取packaged_task的future
  auto result_future = task.get_future();
  // 添加到buffered_channel中
  auto status = work_queue_.push(
      std::make_tuple(launch_policy, std::make_unique<task_t>(std::move(task))));

  if (status != boost::fibers::channel_op_status::success) {
    return std::optional<std::decay_t<decltype(result_future)>> {};
  }

  return std::make_optional(std::move(result_future));
}

代码中,先捕获lambda表达式及其参数,获取返回值类型并添加到packaged_task中,然后装箱到FiberTask中,使用packaged_task获取future并返回,FiberTask对象添加到队列中,使用IFiberTask的指针指向这个对象,实现装箱操作。

接着是内部任务触发的逻辑,首先创建一个线程池,每个线程注册调度器,接着从队列中获取任务,触发fiber。
工作线程的执行函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 注册调度算法为抢占式调度
boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(threads_size_, true);
// 创建task类型
auto task_tuple = typename decltype(work_queue_)::value_type {};

// 从队列中获取任务
while(boost::fibers::channel_op_status::success == work_queue_.pop(task_tuple)) {
  // 解包
  auto& [launch_policy, task_to_run] = task_tuple;
  // 触发 fiber并detach
  boost::fibers::fiber(launch_policy, [task = std::move(task_to_run)]() {
    task->execute();
  }).detach();
}

抢占式调度在注册时需要指定线程池大小,这时不能在外部线程中调用fiber,因为调用fiber的时候会把该线程添加到fiber调度的线程中,也就调整了fiber的worker线程数量。

以上代码实现了fiber触发器、任务队列、工作线程池等逻辑。
理论上可以创建多个fiber调度组件对象,每个组件根据自己的需要设置资源情况。
但实际应用中,还是建议使用一个全局调度组件,因为当A调度器中的任务依赖B调度器的任务的同时,就会出现阻塞工作线程,影响实际性能。

下面封装一个全局调度器,提供递交任务的接口和结束调度的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class DefaultPool {
 private:
  static auto* Pool() {
    const static size_t size = std::thread::hardware_concurrency();
    static fiber::FiberPool pool(size, size*8);
    return &pool;
  }

 public:
  template<typename Func, typename... Args>
  static auto SubmitJob(boost::fibers::launch launch_policy, Func &&func, Args &&... args) {
    return Pool()->Submit(launch_policy, std::forward<Func>(func), std::forward<Args>(args)...);
  }

  template<typename Func, typename... Args>
  static auto SubmitJob(Func &&func, Args &&... args) {
    return Pool()->Submit(std::forward<Func>(func), std::forward<Args>(args)...);
  }

  static void Close() {
    Pool()->CloseQueue();
  }

 private:
  DefaultPool() = default;
};

其他组件封装

上面对boost::fiber进行封装,得到一个能投入生产环境的调度器。
但是仅仅是这些是不够的,毕竟对于生产环境中的服务而言,外部服务、中间件的依赖是不能少的。
纤程是具有传染性的,对于外部组件提供的sdk,发送请求并进行同步等待会阻塞纤程对应的工作线程,影响整套机制。
为此,需要对现有的组件进行封装,对于同步接口,需要使用线程池配合fiber::promise;对于异步接口,可以改造成fiber::promise、future机制。下面介绍几种常用组件的fiber封装。

redis客户端封装

同步接口加线程池的方式将同步接口改造成异步接口的方案,存在较大的安全隐患。
线程池的容量不可控,当流量突然增加时,需要大量线程去等待,从而耗尽线程池资源,造成任务大量积压,服务崩溃。
而扩大线程池数量,又消耗了大量的资源。

综上,对于fiber化封装,还是建议采用异步接口。hiredis库支持异步接口,redis_plus_plus库对hiredis进行了c++封装,同时也提供了异步接口,本节将面向这个接口进行改造。

redis提供了挺多的接口,这里只对del、get、set三个接口做个示范:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
template<typename Type>
using Promise = boost::fibers::promise<Type>;

template<typename Type>
using Future = boost::fibers::future<Type>;

Future<long long > Del(const StringView &key) {
  auto promise = std::make_unique<Promise<long long >>();
  auto future = promise->get_future();
  // 在回调函数中对promise赋值
  redis_.del(key, [promise =promise.release()](sw::redis::Future<long long > &&fut) mutable {
    try {
      promise->set_value(fut.get());
    } catch (...) {
      promise->set_exception(std::current_exception());
    }
    delete promise;
  });
  return future;
}

Future<OptionalString> Get(const StringView &key) {
  auto promise = std::make_unique<Promise<OptionalString>>();
  auto future = promise->get_future();
  // 在回调函数中对promise赋值
  redis_.get(key, [promise = promise.release()](sw::redis::Future<OptionalString> &&fut) mutable {
    try {
      promise->set_value(fut.get());
    } catch (...) {
      promise->set_exception(std::current_exception());
    }
    delete promise;
  });
  return future;
}

Future<bool> Set(const StringView &key, const StringView &val) {
  auto promise = std::make_unique<Promise<bool>>();
  auto future = promise->get_future();
  // 在回调函数中对promise赋值
  redis_.set(key, val, [promise = promise.release()](sw::redis::Future<bool> &&fut) mutable {
    try {
      promise->set_value(fut.get());
    } catch (...) {
      promise->set_exception(std::current_exception());
    }
    delete promise;
  });
  return future;
}

注意,redis_plus_plus对每个回调函数通过模板进行判断,因此无法使用mutable+移动捕获promise,只能使用指针赋值的方式实现。redis_plus_plus在1.3.6以后的版本才有回调函数机制,之前的版本不支持。
上面原理是,创建fiber的promise和future,然后让redis的回调函数中捕获promise,并在promise中对数据进行赋值。而外部使用fiber的future进行等待,并不会阻塞工作线程。

grpc客户端封装

跟上面的redis客户端类似,这里也建议对grpc的异步客户端进行改造,支持fiber的promise、future机制。
grpc的异步客户端需要牵扯到grpc::CompletionQueue,里面实现了一套poll engine,需要绑定一个线程去进行epoll_wait操作。首先定义一个GrpcClient类,包含四个成员变量、两个成员函数,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class GrpcClient {
 public:
  explicit GrpcClient(const ClientOption& option);
  ~GrpcClient();

  // 对外提供的接口
  Future<meta::HelloResponse> Call(const meta::HelloRequest& request);

 private:
  // worker线程执行的逻辑
  void Work();

 private:
  std::unique_ptr<grpc::CompletionQueue> completion_queue_;
  std::thread worker_;
  std::shared_ptr<grpc::Channel> channel_;
  gpr_timespec timespec_{};
};

异步客户端分为三个部分逻辑,第一个是请求发送(Call函数),第二个是io线程批量处理,第三个是外部等待Future。
为了能够让io线程里给Promise进行赋值,需要Call函数中将Promise及其相关上下文传递到io线程中,这里定义一个上下文结构体:

1
2
3
4
5
6
struct CallData {
  grpc::ClientContext context;          // grpc上下文
  Promise<meta::HelloResponse> promise; // Promise对象
  grpc::Status status;                  // grpc调用状态
  meta::HelloResponse response;         // 相应包
};

Call函数中的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
// 创建上下文对象
auto data = new CallData;
// 设置超时时间
data->context.set_deadline(timespec_);
// 创建桩
meta::HelloService::Stub stub(channel_);
auto future = data->promise.get_future();
// 异步调用,添加到完成队列中
auto rpc = stub.Asynchello(&data->context, request, completion_queue_.get());
// 绑定response、status,并将上下文对象作为tag传下去
rpc->Finish(&data->response, &data->status, reinterpret_cast<void*>(data));
return future;

data对象在该函数中创建,在Work函数中释放,不存在内存泄漏问题。
grpc的异步稍微有点麻烦,发送之后,还要绑定数据。
接着是Work线程中的逻辑了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CallData* data = nullptr;
bool ok = false;
// 获取状态完毕的数据
while (completion_queue_->Next((void**)&data, &ok)) {
  // 判断队列是否已经结束
  if (!ok) {
    break;
  }
  // 如果grpc状态ok,则赋值
  if (data->status.ok()) {
    data->promise.set_value(std::move(data->response));
  } else {
    // 否则设置异常
    data->promise.set_exception(std::make_exception_ptr(
        std::runtime_error(data->status.error_message())));
  }
  // 删除数据
  delete data;
  data = nullptr;
}

调用完成队列的Next函数会阻塞,如果队列中存在状态达到最终状态的数据,则返回一条。从完成对于中取到的数据的顺序与入队顺序不同。

上面两个函数组合实现了Future获取和Promise赋值的操作,使得grpc客户端能在fiber中使用。

参考

  • fiberpool代码
  • 生产环境使用fiber
  • grpc异步客户端
  • hiredis
  • 生产环境使用boost::fiber

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1150879.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

canvas绘制签名并保存

实现签名的三个关键方法&#xff1a; 1.mousedown&#xff1a;当鼠标按下时开始绘制签名。 2.mousemove&#xff1a;鼠标移动时持续绘制。 3.mouseup&#xff1a;鼠标抬起时结束绘制。 html&#xff1a; <div class"setSign"><canvasref"canvas&q…

驱动开发7 基于GPIO子系统编写LED驱动,编写应用程序进行测试设置定时器,5秒钟打印一次hello world

驱动代码 #include <linux/init.h> #include <linux/module.h> #include <linux/of.h> #include <linux/of_gpio.h> #include <linux/gpio.h> #include <linux/timer.h> #include <linux/of_irq.h> #include <linux/interrupt.h…

最新Ubuntu20.04安装教程(图文)

总的来说&#xff0c;安装Ubantu包含以下三个步骤&#xff1a; 一、安装虚拟机 二、Ubuntu镜像下载 三、虚拟机配置 一、安装虚拟机 选择安装VMware Workstation&#xff0c;登录其官网下载安装包&#xff0c;链接如下&#xff1a; 下载 VMware Workstation Pro | CN 下载…

从原理到实践,深入理解CPU缓存一致性

1 存储体系结构 速度快的存储硬件成本高、容量小&#xff0c;速度慢的成本低、容量大。为了权衡成本和速度&#xff0c;计算机存储分了很多层次&#xff0c;扬长避短&#xff0c;有寄存器、L1 cache、L2 cache、L3 cache、主存&#xff08;内存&#xff09;和硬盘等。图1 展示…

掌握组件缓存:解开Vue.js中<keep-alive>的奥秘

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

自己的开题ppt分享 做的很烂 请喷我 谢谢

研究背景与意义 给出自己要做的东西 钢铁异常检测方法 为需求 技术挑战性 回答以下这几个问题 所做任务在技术上的挑战性是什么&#xff1f; &#xff08;数学描述&#xff1a;数据层面、任务层面或者模型层面等&#xff09; 什么原因导致这个挑战&#xff1f; 它为什么是…

程序环境+预处理

&#x1f493;博客主页&#xff1a;江池俊的博客⏩收录专栏&#xff1a;C语言进阶之路&#x1f449;专栏推荐&#xff1a;✅C语言初阶之路 ✅数据结构探索✅C语言刷题专栏&#x1f4bb;代码仓库&#xff1a;江池俊的代码仓库&#x1f389;欢迎大家点赞&#x1f44d;评论&#x…

设计模式:解释器模式(C#、JAVA、JavaScript、C++、Python、Go、PHP)

上一篇《责任链模式》 下一篇《设计模式学习顺序》 简介&#xff1a; 解释器模式&#xff0c;它是一种行为型模式&#xff0c;它给定一门语言&#xff0c;定义其文法的一种表示&#xff0c;并定义一个解释器&#…

KDL库在VS2022上配置博客

一、前提准备 1.下载kdl库和kdl所依赖的eigen库&#xff0c; &#xff08;在kdl库中找到FindEigen3可以查看里面所需要的Eigen3库所需要的最低版本&#xff09;下载完成后重命名压缩包为kdl和eigen3之后解压缩文件 2.安装好cmake工具 二、建立build目录 ..\kdl\orocos_kine…

SpringCloud(一) 服务架构的演变及注册RestTemplate实现服务的远程调用

目录 一, 服务架构的演变 1.1 单体架构 1.2 分布式架构 1.3 微服务 1.4 SpringCloud 二, 服务拆分和远程调用 2,1 服务拆分原则 2.2 服务拆分示例 2.3 创建相应数据库 2.4 实现远程调用示例 1, 更改需求 2, 注册RestTemplate实现远程调用 2.5 服务消费者和提供者 一…

Spring的条件注解,一篇文章盘得清清楚楚明明白白

前言 在Spring中&#xff0c;条件注解可根据特定的条件来决定是否创建或配置Bean&#xff0c;这些条件可以基于类、属性、环境等因素。通过使用条件注解&#xff0c;我们可以在Spring容器中更加灵活地管理和控制组件的创建和注入&#xff0c;帮助我们更加灵活地管理和控制Bean…

3.5每日一题(求齐次方程组的特解)

1、判断类型选择方法&#xff1a;看出为齐次方程&#xff08;次幂都一样&#xff09; 2、 化为变量可分离&#xff1b;按变量可分离的方法求出通解&#xff08;此题等式两边同时除以 x &#xff09; 3、把x1&#xff0c;y0带入通解&#xff0c;定常数C&#xff0c;求出特解 …

Android 优质的UI组件汇总

1、RuleView &#xff1a;Android自定义标尺控件(选择身高、体重等) 链接&#xff1a;https://github.com/cStor-cDeep/RuleView 2、DashboardView &#xff1a;Android自定义仪表盘View&#xff0c;仿新旧两版芝麻信用分、炫酷汽车速度仪表盘 链接&#xff1a;https://git…

业务连续性的重要性及关键因素

在今天的竞争激烈的商业环境中&#xff0c;保持业务连续性至关重要。业务连续性是指企业能够在面对各种不可预测的挑战和灾难情况下&#xff0c;保持运营&#xff0c;提供产品和服务&#xff0c;以确保客户满意度和可持续发展。本文将探讨业务连续性的重要性、关键因素和最佳实…

探营云栖大会:蚂蚁集团展出数字人全栈技术,三大AI“机器人”引关注

一年一度的科技盛会云栖大会将于10月31日正式开幕。30日&#xff0c;记者来到云栖大会展区探营&#xff0c;提前打卡今年上新的“黑科技”。 记者在蚂蚁集团展馆看到&#xff0c;超1亿人参与的亚运“数字火炬手”全栈技术首次公开展示&#xff0c;还可体验基于数字人技术的“数…

【工具使用】NPS内网穿透工具介绍

文章目录 前言一、内网穿透二、NPS概述三、NPS原理四、NPS服务器搭建(一)云服务器配置 五、NPS内网穿透演示(一)演示案例一(二)演示案例二 六、NPS内网穿透检测建议(一)流量监控(二)流量协议分析(三)网络行为异常检测 七、NPS内网穿透防范建议(一)阻止或隔离流量(二)更新和强化…

GitHub经常打不开或者访问解决办法

访问慢或无法访问的原因&#xff1a;DNS解析是最为基础的一个环节。由于Github的服务器在全球各地&#xff0c;域名解析所需的时间也会不同&#xff0c;这就导致了在特定地区可能会出现Github无法正常访问的情况。 解决&#xff1a;查询到github对应的IP&#xff0c;然后在host…

Java - JDK演变之路和JDK21新特性

Java - JDK演变之路和JDK21新特性 前言一. JDK演变之路JDK9 新特性&#xff08;2017年9月&#xff09;JDK10 新特性&#xff08;2018年3月&#xff09;JDK11 新特性&#xff08;2018年9月 - LTS版本&#xff09;☆JDK12 新特性&#xff08;2019年3月&#xff09;JDK13 新特性&a…

《低代码指南》——我想将维格云与别的系统打通,自动同步数据,怎么实现?

与其他系统打通的3种形式 ​ 人工复制粘贴:操作难度低,时效性差,适合于少量数据定时更新Excel导入:可追加导入,作难度低,时效性差,适和于定期更新数据API对接:可实现实时数据对接,有一定操作门槛API对接的3种方法​ 维格机器人:可以通过维格机器人直接调用对接系统的…

MES与AGV对接浅谈

昨天分享一些有关数字工厂与立体库的对接经验&#xff0c;随着智能物料技术的越来越成熟&#xff0c;硬件设施成本的下降&#xff0c;很多制造业在工厂规划时已经开始考虑在物料搬运、物料配送等使用无人化的智能搬运机器。今天聊聊有关在智能工厂实施中MES与AGV的对接方式一些…