ZMQ管道模型

news2024/9/22 23:33:16

案例一

生产者Producer

#include <zmq.hpp>
#include <iostream>
#include <string>
#include<chrono>
#include<thread>

using namespace std;
using namespace zmq;

int main() {
    context_t context(1);

    // 创建 PUSH 套接字,用于发送消息到代理
    socket_t push_socket(context, ZMQ_PUSH);
    push_socket.connect("tcp://localhost:8888");
    cout << "connect success" << endl;
    int i = 0;
    for (; ;) {
        string message = "Message " + to_string(++i);
        push_socket.send(buffer(message), send_flags::none);
      
        cout << "Sending: " << message << endl;
        this_thread::sleep_for(chrono::seconds(1)); // 休眠1秒
    }

    return 0;
}

队列queue()

#include <zmq.hpp>
#include <iostream>
#include <thread>
#include <chrono>


// 休眠指定的时间长度


using namespace std;
using namespace zmq;

int main() {
    context_t context(2);

    // 创建 PULL 套接字,用于接收生产者的消息
    socket_t pull_socket(context, ZMQ_PULL);
    pull_socket.bind("tcp://*:8888");

    // 创建 PUSH 套接字,用于发送消息给消费者
    socket_t push_socket(context, ZMQ_PUSH);
    push_socket.bind("tcp://*:8889");
    cout << "bind success" << endl;
    while (true) {
        message_t message;
        pull_socket.recv(message, recv_flags::none);
        cout << "broken recv :" << message.to_string() << endl;
        // 睡眠一段时间模拟处理时间
        
        this_thread::sleep_for(chrono::seconds(1)); // 休眠1秒
        cout << "broken send :" << message.to_string() << endl;
        push_socket.send(message, send_flags::none);
        
    }

    return 0;
}```

## 消费者(Consumer)

```cpp
// client.cpp
#include <zmq.hpp>
#include <iostream>
#include <string>

using namespace std;
using namespace zmq;

int main() {
    context_t context(1);

    // 创建 PULL 套接字,用于接收代理发送的消息
    socket_t pull_socket(context, ZMQ_PULL);
    pull_socket.connect("tcp://localhost:8889");
    cout << "connect success" << endl;
    for (; ; ) {
        message_t message;
        pull_socket.recv(message, recv_flags::none);
        cout << "Received: " << message.to_string() << endl;
    }

    return 0;
}

编译

g++ -o server server.cpp `pkg-config --cflags --libs libzmq`
g++ -o client client.cpp `pkg-config --cflags --libs libzmq`
g++ -o queue queue.cpp `pkg-config --cflags --libs libzmq`

在这里插入图片描述

队列程序逻辑

队列作为管道需要接受来只上游生产者的消息,又需要发送消息给下游的消费者。

绑定端点

context_t context(1);

// 创建 PULL 套接字,用于接收生产者的消息
socket_t pull_socket(context, ZMQ_PULL);
pull_socket.bind("tcp://*:8888");

// 创建 PUSH 套接字,用于发送消息给消费者
socket_t push_socket(context, ZMQ_PUSH);
push_socket.bind("tcp://*:8889");

收发消息

        message_t message;
        pull_socket.recv(message, recv_flags::none);
        cout << "broken recv :" << message.to_string() << endl;
        // 睡眠一段时间模拟处理时间
        this_thread::sleep_for(chrono::seconds(1)); // 休眠1秒
        cout << "broken send :" << message.to_string() << endl;
        push_socket.send(message, send_flags::none);

生产者程序逻辑

连接管道

    context_t context(1);
    // 创建 PUSH 套接字,用于发送消息到代理
    socket_t push_socket(context, ZMQ_PUSH);
    push_socket.connect("tcp://localhost:8888");

发送消息

		string message = "Message " + to_string(++i);
        push_socket.send(buffer(message), send_flags::none);

消费者程序逻辑

连接管道

    context_t context(1);
    // 创建 PULL 套接字,用于接收代理发送的消息
    socket_t pull_socket(context, ZMQ_PULL);
    pull_socket.connect("tcp://localhost:8889");

接受消息

        message_t message;
        pull_socket.recv(message, recv_flags::none);

总结

在管道两边,可以有n个生产者,m个消费者,生产者的消息会以先进先出的顺序经过管道到达消费者,管道发给m个消费者的消息是负载均衡的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2074725.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2000-2023年上市公司财务困境RLPM模型数据(含原始数据+计算结果)

2000-2023年上市公司财务困境RLPM模型数据&#xff08;含原始数据计算结果&#xff09; 1、时间&#xff1a;2000-2023年 2、来源&#xff1a;上市公司年报 3、指标&#xff1a;证券代码、证券简称、统计截止日期、是否剔除ST或*ST或PT股、是否剔除上市不满一年、已经退市或…

《深度学习》OpenCV 计算机视觉入门 (上篇)

目录 一、了解OpenCV 1、简介 2、导包 3、了解图片构成 二、函数运用 1、展示图片 2、展示图片属性 1&#xff09;img.shape 形状 2&#xff09;img.dtype 类型 像素值类型&#xff1a; 3&#xff09;img.size 尺寸 4&#xff09;演示&#xff0c;img为上图 …

【轨物方案】红外抄表装置在光伏电站项目中的应用

首先&#xff0c;红外抄表装置能够实现远程自动抄表&#xff0c;这对于光伏电站来说至关重要。由于光伏电站往往分布在广阔的区域&#xff0c;且电站设备可能位于偏远或难以到达的位置&#xff0c;使用红外抄表装置可以减少人工抄表的需要&#xff0c;提高数据采集的效率和准确…

79、ansible-----playbook2

1、作业 [roottest1 opt]# vim test2.yaml - name: this is muluhosts: 192.168.168.22gather_facts: falsevars: ##定义变量testtest:- /opt/test1 ##对变量进行赋值- /opt/test2- /opt/test3- /opt/test4tasks:- name: create mulufile:path: "{{item}}&q…

K8s之自动扩缩容

Kubernetes (K8s) 的动态扩缩容&#xff08;自动伸缩&#xff09;功能是集群管理中非常关键的一部分&#xff0c;能够根据工作负载的变化自动调整应用程序的副本数&#xff0c;以确保资源的高效利用和服务的稳定性。 K8s介绍文章 容器之k8s(Kubernetes)-CSDN博客 1. 动态扩缩容…

文件包含漏洞(1)

目录 PHP伪协议 php://input Example 1&#xff1a; 造成任意代码执行 Example 2&#xff1a; 文件内容绕过 php://filer zip:// PHP伪协议 php://input Example 1&#xff1a; 造成任意代码执行 搭建环境 <meta charset"utf8"> <?php error_repo…

代码随想录算法训练营第51天|卡码网99. 岛屿数量、100. 岛屿的最大面积

1.卡码网99. 岛屿数量 题目链接&#xff1a;https://kamacoder.com/problempage.php?pid1171 文章链接&#xff1a;https://www.programmercarl.com/kamacoder/0099.岛屿的数量深搜.html#_99-岛屿数量 本题思路: 遇到一个没有遍历过的节点陆地&#xff0c;计数器就加一&#x…

银河麒麟桌面操作系统V10:如何设置应用开机自启动?

银河麒麟桌面操作系统V10&#xff1a;如何设置应用开机自启动&#xff1f; 1、图形界面设置2、命令行设置3、注意 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 1、图形界面设置 打开“设置”->“系统”->“开机启动”。点击“添加…

秋招突击——笔试整理——8/25——拼多多笔试整理

文章目录 引言正文第一题——多多删树个人解答 第二题、多多的奇数序列个人解答 第三题&#xff1a;多多换礼物个人解答参考实现 第四题、多多的字符反转个人实现 总结 引言 今天的拼多多笔试挺难的&#xff0c;感觉自己在技巧性还差很多&#xff0c;很多东西需要看很久&#…

第3章-02-Python库Selenium安装与讲解

🏆作者简介,黑夜开发者,CSDN领军人物,全栈领域优质创作者✌,CSDN博客专家,阿里云社区专家博主,2023年CSDN全站百大博主。 🏆数年电商行业从业经验,历任核心研发工程师,项目技术负责人。 🏆本文已收录于专栏:Web爬虫入门与实战精讲,后续完整更新内容如下。 文章…

windows javascript 打开、关闭摄像头

1. 效果 打开摄像头 关闭摄像头&#xff08;包括指示灯也关了的&#xff09; 2. 代码 open_close_camera.html // open_close_camera.html <!DOCTYPE html> <html><head><meta charset"UTF-8"><title>use camera</title>…

CPP中lamada表达式作用一览[more cpp-6]

一般语法 CPP中的lambda 表达式的本质就是匿名函数&#xff0c;它可以在代码中定义一个临时的、局部的函数.为什么需要lamada表达式&#xff1f; 因为命名是个大问题 想名字以及避免命名冲突是很劳神费力的事&#xff0c;这就是lamada表达式的优点(lamada优点表现为简洁性)总…

7.Linux_GCC与GDB

GCC 1、GCC编译过程 首先使用编辑器对.c文件进行编辑&#xff0c;即&#xff1a;敲代码。之后GCC编译器会对.c文件进行预处理、编译、汇编、链接&#xff0c;最终输出可执行文件。具体流程如下&#xff1a; 四个阶段的含义及指令 1、预处理 指令&#xff1a;gcc - E <.…

专利服务系统小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;专利申请管理&#xff0c;分类号管理&#xff0c;专利管理&#xff0c;增值服务管理&#xff0c;业务指导信息管理&#xff0c;系统管理 微信端账号功能包括&#xff1a;系统首页…

免费插件集-illustrator插件-Ai插件-路径节点分割路径

文章目录 1.介绍2.安装3.通过窗口>扩展>知了插件4.功能解释5.总结 1.介绍 本文介绍一款免费插件&#xff0c;加强illustrator使用人员工作效率&#xff0c;路径处理功能&#xff0c;功能是路径节点分割路径。首先从下载网址下载这款插件 https://download.csdn.net/down…

kubenetes--资源调度

前言&#xff1a;本博客仅作记录学习使用&#xff0c;部分图片出自网络&#xff0c;如有侵犯您的权益&#xff0c;请联系删除 出自B站博主教程笔记&#xff1a; 完整版Kubernetes&#xff08;K8S&#xff09;全套入门微服务实战项目&#xff0c;带你一站式深入掌握K8S核心能…

C语言 之 自定义类型:结构体

结构体类型的声明 结构体的声明 struct tag {member-list; //结构体中的成员&#xff0c;可以有多个 }variable-list; //这里是直接创建结构体的变量&#xff0c;但是不一定要在这里声明变量 //不能把后面这个 ; 省略了例如结构体用于描述一个学生&#xff1a; struct Stu…

48.给定一个 n × n 的二维矩阵表示一个图像,实现一个算法将图像原地顺时针旋转 90 度

48. Rotate Image 题目 你得到了一个 n x n 的二维矩阵,表示一张图像。 将图像顺时针旋转 90 度。 注意: 你必须 就地 旋转图像,这意味着你需要直接修改输入的二维矩阵。不能分配另一个二维矩阵来进行旋转。 示例 1: 输入: [ [1,2,3], [4,5,6], [7,8,9] ] 输出: [ [7,…

数据中台架构设计

由于当前项目需要对接多个不同的数据源&#xff0c;同时涉及到多端处理&#xff0c;而且需要考虑海量数据处理&#xff0c;还有总部与分部架构部署问题&#xff0c;因而整体技术栈倾向于大数据和分表分库式处理数据层接入问题。 简单讲&#xff0c;项目分为数据中台和业务中台…

介绍下线程池的七个参数

corePoolSize&#xff1a;核心线程数&#xff0c;池中要保留的线程数&#xff0c;即使它们处于空闲状态&#xff0c;除非设置了allowCoreThreadTimeOutmaximumPoolSize&#xff1a;最大线程数量keepAliveTime&#xff1a;线程存活时间unit&#xff1a;时间单位workQueue&#x…