windows C++-创建数据流代理(二)

news2025/1/6 14:27:06
 完整的数据流演示 

下图显示了 dataflow_agent 类的完整数据流网络:

由于 run 方法是在一个单独的线程上调用的,因此在完全连接网络之前,其他线程可以将消息发送到网络。 _source 数据成员是一个 unbounded_buffer 对象,用于缓冲从应用程序发送到代理的所有输入。 为了确保网络能够处理所有输入消息,代理会首先链接网络的内部节点,然后将该网络的起点 connector 链接到 _source 数据成员。 这可以保证在形成网络的过程中不会处理消息。

由于此示例中的网络是基于数据流,而不是基于控制流的,网络必须向代理传达它已经完成了对每个输入值的处理,并且 Sentinel 节点也已接收到它的值。 此示例使用 countdown_event 对象来指示所有输入值均已经过处理,使用 concurrency::event 对象指示 Sentinel 节点已接收到它的值。 countdown_event 类使用 event 对象指示计数器值达到零。 每当数据流网络的头收到一个值时,都会递增计数器值。 在处理输入值后,网络的每个终端节点都会递减计数器值。 代理形成数据流网络后,它会等待 Sentinel 节点设置 event 对象,还会等待 countdown_event 对象指示其计数器值已达到零。

下面的示例展示了 control_flow_agent、dataflow_agent 和 countdown_event 类。 wmain 函数创建了 control_flow_agent 和 dataflow_agent 对象,并使用 send_values 函数将一系列随机值发送到代理。

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }
     
   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }
   
   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }
 
private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;
      
      //
      // Create the members of the dataflow network.
      //
     
      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) -> bool { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;
       
      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();
           
      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;
   
   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&cf_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&df_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

输出如下:

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
编译代码

复制示例代码,并将它粘贴到 Visual Studio 项目中,或粘贴到名为 dataflow-agent.cpp 的文件中,再在 Visual Studio 命令提示符窗口中运行以下命令。

cl.exe /EHsc dataflow-agent.cpp

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2196434.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

git clone 私有仓库时出现错误 Authentication failed for :xxxxxx

错误信息 remote: Support for password authentication was removed on August 13, 2021. remote: Please see https://docs.github.com/get-started/getting-started-with-git/about-remote-repositories#cloning-with-https-urls for information on currently recommended…

【算法】博弈论(C/C++)

个人主页&#xff1a;摆烂小白敲代码 创作领域&#xff1a;算法、C/C 持续更新算法领域的文章&#xff0c;让博主在您的算法之路上祝您一臂之力 欢迎各位大佬莅临我的博客&#xff0c;您的关注、点赞、收藏、评论是我持续创作最大的动力 目录 博弈论&#xff1a; 1. Grundy数…

手机商城系统小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;管理员管理&#xff0c;用户管理&#xff0c;订单管理&#xff0c;商品信息管理&#xff0c;基础数据管理&#xff0c;地址管理&#xff0c;轮播图管理 微信端账号功能包括&#xff1a;系统首页&#…

Java数据结构栈和队列(Stack和Queue详解)

前言&#xff1a; 栈和队列是数据结构中重要的存储方式&#xff0c;也是建立在Arrarylist和LinkedList之上的数据结构。 本猿在C语言阶段就已经详细剖析过栈和队列&#xff0c;不熟悉的小伙伴可以翻看之前的博客内容&#xff01; Java阶段一方面剖析Java中自带的Stack和Queue中…

宠物咖啡馆平台开发:SpringBoot框架的高效应用

2相关技术 2.1 MYSQL数据库 MySQL是一个真正的多用户、多线程SQL数据库服务器。 是基于SQL的客户/服务器模式的关系数据库管理系统&#xff0c;它的有点有有功能强大、使用简单、管理方便、安全可靠性高、运行速度快、多线程、跨平台性、完全网络化、稳定性等&#xff0c;非常…

【Spring】“请求“ 之传递 JSON 数据

文章目录 JSON 概念JSON 语法JSON 的语法JSON 的两种结构 JSON 字符串和 Java 对象互转JSON 优点传递 JSON 对象 JSON 概念 JSON&#xff1a;JavaScript Object Notation【JavaScript 对象表示法】 JSON 就是一种数据格式&#xff0c;有自己的格式和语法&#xff0c;使用文本…

什么是PLM系统?PLM系统对制造业起到哪些作用?三品PLM系统对汽车制造业意义

在当今竞争激烈的制造业环境中&#xff0c;企业面临着来自市场、技术、客户需求等多方面的挑战。为了应对这些挑战&#xff0c;许多制造企业纷纷引入产品生命周期管理PLM系统&#xff0c;以实现更高效、更灵活的产品全生命周期管理。PLM系统以其独特的优势&#xff0c;在优化产…

社区圈子系统 圈子社区系统 兴趣社区圈子论坛系统 圈子系统源码圈子系统的适用领域有哪些?如何打造自己的圈子圈子系统有哪些常见问题

社区圈子系统 圈子社区系统 兴趣社区圈子论坛系统 圈子系统源码圈子系统的适用领域有哪些&#xff1f;如何打造自己的圈子圈子系统有哪些常见问题 圈子系统的适用领域 圈子系统的适用领域广泛&#xff0c;涵盖了多个行业和场景&#xff0c;包括但不限于以下几个方面&#xff1…

计算机毕业设计 自习室座位预约系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…

火语言发布暨火车采集器VIP用户回馈活动

火车头软件系列产品从最初的火车采集器、火车浏览器到触控精灵一直致力于数据采集领域的深耕与拓展&#xff0c;不断优化用户体验&#xff0c;力求为用户提供更加高效、便捷的数据处理工具。 火语言也属于同创始团队的旗下产品&#xff0c;历经三年每周版本更新&#xff0c;迭…

电感七大关键参数

大家好,这里是大话硬件。 今天这篇文章介绍电感的七大关键参数。 1、电感值 电感值就是电感做好以后的固有特性,比如1uH, 10mH,1H,这样不同类型的感值。在学习电感值之前,我们先看一下电阻公式: 其中p是导体的电阻率(Ω*m),S是导体的横截面积(m2),l是导体的长度…

Linux驱动学习——Linux启动流程

什么是驱动 驱动&#xff0c;即设备驱动程序&#xff0c;是一种可以使计算机和设备通信的特殊程序。 从作用角度来看&#xff0c;驱动的主要功能是将硬件设备的功能与操作系统进行连接&#xff0c;让操作系统能够识别并正确使用硬件设备。例如&#xff0c;显卡驱动能让操作系…

Java SE-object类和里面的3个主要方法解读

文章目录 1.object类2.toString方法调用过程2.1具体案例2.2源代码查看2.3方法的重写2.4重写效果 3.equals方法调用过程3.1现象的描述3.2方法的重写3.3IDEA自动填充 4.hashcode方法 1.object类 java里面除了object类&#xff0c;所有的类都是存在继承关系的&#xff0c;object类…

【Docker】03-自制镜像

1. 自制镜像 2. Dockerfile # 基础镜像 FROM openjdk:11.0-jre-buster # 设定时区 ENV TZAsia/Shanghai RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone # 拷贝jar包 COPY docker-demo.jar /app.jar # 入口 ENTRYPOINT ["ja…

javaweb-请求和响应

1.http协议 1.1请求 1.2.响应 常见响应状态码&#xff1a; 状态码大全&#xff1a;https://cloud.tencent.com/developer/chapter/13553 常见的响应头&#xff1a; 2.请求和响应 2.1.请求 2.1.1postman 作用&#xff1a;常用于接口测试 使用&#xff1a;官网安装-->注册…

电脑手机下载小米xiaomi redmi刷机包太慢 解决办法

文章目录 修改前下载速度修改后下载速度修改方法&#xff08;修改host&#xff09; 修改前下载速度 一开始笔者以为是迅雷没开会员的问题&#xff0c;在淘宝上买了一个临时会员后下载速度依然最高才100KB/s 修改后下载速度 修改方法&#xff08;修改host&#xff09; host文…

【星汇极客】STM32 HAL库各种模块开发之DHT11模块

前言 本人是一名嵌入式学习者&#xff0c;在大学期间也参加了不少的竞赛并获奖&#xff0c;包括&#xff1a;江苏省电子设计竞赛省一、睿抗机器人国二、中国高校智能机器人国二、嵌入式设计竞赛国三、光电设计竞赛国三、节能减排竞赛国三等。 暑假的时候参加了太多的比赛&#…

nacos启动报错:Unable to start embedded Tomcat

报错截图&#xff1a; 解决方法&#xff1a;编辑启动脚本/nacos/bin/startup.sh&#xff0c;指定模式为standalone即可 修改后启动成功~

vscode配置golang

1.安装golang解释器 从网址https://go.dev/dl/下载对应的golang解释器 2.配置环境 Extensions中搜索安装go 2.配置settings.json {"go.autocompleteUnimportedPackages": true,"go.gocodeAutoBuild": false,"explorer.confirmPasteNative"…

若依权限设计与自定义新增用户

前言 若依 系统的权限设计是基于RBAC&#xff08;Role-Based Access Control&#xff09;&#xff0c;即基于角色的访问控制模型&#xff0c;允许通过角色来管理用户的权限。 每个用户可以分配一个或多个角色。用户的权限来自于其所分配的角色。用户与角色的对应关系保存在 sys…