ZMQ/ZeroMQ的三种消息模式

news2024/11/27 16:42:45

一、 Reuqest-Reply(请求-应答模式)

        1、使用Request-Reply模式,需要遵循一定的规律。

        2、客户端必要先发送消息,在接收消息;服务端必须先进行接收客户端发送过来的消息,在发送应答给客户端,如此循环

        3、服务端和客户端谁先启动,效果都是一样的。

        4、服务端在收到消息之前,会一直阻塞,等待客户端连上来。

        创建一个客户端和服务端,客户端发送消息给服务端,服务端返回消息给客户端,客户端和服务器谁先启动都可以。

        server.cpp

#include <zmq.hpp>
#include <string>
#include <iostream>
#ifndef _WIN32
#include <unistd.h>
#else
#include <windows.h>

#define sleep(n)	Sleep(n)
#endif

int main () {
    //  Prepare our context and socket
    zmq::context_t context (2);
    zmq::socket_t socket (context, zmq::socket_type::rep);
    socket.bind ("tcp://*:5555");

    while (true) {
        zmq::message_t request;

        //  Wait for next request from client
        socket.recv (request, zmq::recv_flags::none);
        std::cout << "Received Hello" << std::endl;

        //  Do some 'work'
        sleep(1);

        //  Send reply back to client
        zmq::message_t reply (5);
        memcpy (reply.data (), "World", 5);
        socket.send (reply, zmq::send_flags::none);
    }
    return 0;
}

        client.cpp

#include <zmq.hpp>
#include <string>
#include <iostream>

int main ()
{
    //  Prepare our context and socket
    zmq::context_t context (1);
    zmq::socket_t socket (context, zmq::socket_type::req);

    std::cout << "Connecting to hello world server..." << std::endl;
    socket.connect ("tcp://localhost:5555");

    //  Do 10 requests, waiting each time for a response
    for (int request_nbr = 0; request_nbr != 10; request_nbr++) {
        zmq::message_t request (5);
        memcpy (request.data (), "Hello", 5);
        std::cout << "Sending Hello " << request_nbr << "..." << std::endl;
        socket.send (request, zmq::send_flags::none);

        //  Get the reply.
        zmq::message_t reply;
        socket.recv (reply, zmq::recv_flags::none);
        std::cout << "Received World " << request_nbr << std::endl;
    }
    return 0;
}

二、Publisher-Subscriber(发布-订阅模式)

        Publisher-Subscriber模式,消息是单向流动的,发布者只能发布消息,不能接受消息;订阅者只能接受消息,不能发送消息。

        服务端发布消息的过程中,如果有订阅者退出,不影响发布者继续发布消息,当订阅者再次连接上来,收到的消息是后来发布的消息

        比较晚加入的订阅者,或者中途离开的订阅者,必然会丢掉一部分信息

        如果发布者停止,所有的订阅者会阻塞,等发布者再次上线的时候回继续接受消息。

        "慢连接": 我们不知道订阅者是何时开始接受消息的,就算启动"订阅者",在启动"发布者", "订阅者"还是会缺失一部分的消息,因为建立连接是需要时间的,虽然时间很短,但不是零。ZMQ在后台是进行异步的IO传输,在建立TCP连接的短短的时间段内,ZMQ就可以发送很多消息了。

       

 

        publisher.cpp

#include <zmq.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#if (defined (WIN32))
#include <zhelpers.hpp>
#endif

#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))

int main () {

    //  Prepare our context and publisher
    zmq::context_t context (1);
    zmq::socket_t publisher (context, zmq::socket_type::pub);
    publisher.bind("tcp://*:5556");
    publisher.bind("ipc://weather.ipc");				// Not usable on Windows.

    //  Initialize random number generator
    srandom ((unsigned) time (NULL));
    while (1) {

        int zipcode, temperature, relhumidity;

        //  Get values that will fool the boss
        zipcode     = within (100000);
        temperature = within (215) - 80;
        relhumidity = within (50) + 10;

        //  Send message to all subscribers
        zmq::message_t message(20);
        snprintf ((char *) message.data(), 20 ,
        	"%05d %d %d", zipcode, temperature, relhumidity);
        publisher.send(message, zmq::send_flags::none);

    }
    return 0;
}

         subscriber.cpp

#include <zmq.hpp>
#include <iostream>
#include <sstream>

int main (int argc, char *argv[])
{
    zmq::context_t context (1);

    //  Socket to talk to server
    std::cout << "Collecting updates from weather server...\n" << std::endl;
    zmq::socket_t subscriber (context, zmq::socket_type::sub);
    subscriber.connect("tcp://localhost:5556");

    //  Subscribe to zipcode, default is NYC, 10001
	const char *filter = (argc > 1)? argv [1]: "10001 ";
    subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));

    //  Process 100 updates
    int update_nbr;
    long total_temp = 0;
    for (update_nbr = 0; update_nbr < 100; update_nbr++) {

        zmq::message_t update;
        int zipcode, temperature, relhumidity;

        subscriber.recv(update, zmq::recv_flags::none);

        std::istringstream iss(static_cast<char*>(update.data()));
		iss >> zipcode >> temperature >> relhumidity ;

		total_temp += temperature;
    }
    std::cout 	<< "Average temperature for zipcode '"<< filter
    			<<"' was "<<(int) (total_temp / update_nbr) <<"F"
    			<< std::endl;
    return 0;
}

 三、Push-Pull(平行管道模式/分布式处理)

        1、Ventilator:任务发布器会生成大量可以并行运算的任务。

        2、Worker:有一组worker会处理这些任务。

        3、Sink:结果接收器会在末端接收所有的Worker的处理结果,进行汇总。

        4、Worker上游和"任务发布器"相连,下游和"结果接收器"相连。

        5、"任务发布器" 和 "结果接收器"是这个网路结构中比较稳定的部分,由他们绑定至端点。

        6、Worker只是连接两个端点。

        7、需要等Worker全部启动后,在进行任务分发。Socket的连接会消耗一定时间(慢连接), 如果不尽兴同步的话,第一个Worker启动。

        8、会一下子接收很多任务。

        9、"任务分发器" 会向Worker均匀的分发任务(负载均衡机制)。

        10、"结果接收器" 会均匀地从Worker处收集消息(公平队列机制)。

         taskvent.cpp

#include <zmq.hpp>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <iostream>

#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))

int main (int argc, char *argv[])
{
    zmq::context_t context (1);

    //  Socket to send messages on
    zmq::socket_t  sender(context, ZMQ_PUSH);
    sender.bind("tcp://*:5557");

    std::cout << "Press Enter when the workers are ready: " << std::endl;
    getchar ();
    std::cout << "Sending tasks to workers...\n" << std::endl;

    //  The first message is "0" and signals start of batch
    zmq::socket_t sink(context, ZMQ_PUSH);
    sink.connect("tcp://localhost:5558");
    zmq::message_t message(2);
    memcpy(message.data(), "0", 1);
    sink.send(message);

    //  Initialize random number generator
    srandom ((unsigned) time (NULL));

    //  Send 100 tasks
    int task_nbr;
    int total_msec = 0;     //  Total expected cost in msecs
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {
        int workload;
        //  Random workload from 1 to 100msecs
        workload = within (100) + 1;
        total_msec += workload;

        message.rebuild(10);
        memset(message.data(), '\0', 10);
        sprintf ((char *) message.data(), "%d", workload);
        sender.send(message);
    }
    std::cout << "Total expected cost: " << total_msec << " msec" << std::endl;
    sleep (1);              //  Give 0MQ time to deliver

    return 0;
}

         taskwork.cpp

#include "zhelpers.hpp"
#include <string>

int main (int argc, char *argv[])
{
    zmq::context_t context(1);

    //  Socket to receive messages on
    zmq::socket_t receiver(context, ZMQ_PULL);
    receiver.connect("tcp://localhost:5557");

    //  Socket to send messages to
    zmq::socket_t sender(context, ZMQ_PUSH);
    sender.connect("tcp://localhost:5558");

    //  Process tasks forever
    while (1) {

        zmq::message_t message;
        int workload;           //  Workload in msecs

        receiver.recv(&message);
        std::string smessage(static_cast<char*>(message.data()), message.size());

        std::istringstream iss(smessage);
        iss >> workload;

        //  Do the work
        s_sleep(workload);

        //  Send results to sink
        message.rebuild();
        sender.send(message);

        //  Simple progress indicator for the viewer
        std::cout << "." << std::flush;
    }
    return 0;
}

        tasksink.cpp

#include <zmq.hpp>
#include <time.h>
#include <sys/time.h>
#include <iostream>

int main (int argc, char *argv[])
{
    //  Prepare our context and socket
    zmq::context_t context(1);
    zmq::socket_t receiver(context,ZMQ_PULL);
    receiver.bind("tcp://*:5558");

    //  Wait for start of batch
    zmq::message_t message;
    receiver.recv(&message);

    //  Start our clock now
    struct timeval tstart;
    gettimeofday (&tstart, NULL);

    //  Process 100 confirmations
    int task_nbr;
    int total_msec = 0;     //  Total calculated cost in msecs
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {

        receiver.recv(&message);
        if (task_nbr % 10 == 0)
            std::cout << ":" << std::flush;
        else
            std::cout << "." << std::flush;
    }
    //  Calculate and report duration of batch
    struct timeval tend, tdiff;
    gettimeofday (&tend, NULL);

    if (tend.tv_usec < tstart.tv_usec) {
        tdiff.tv_sec = tend.tv_sec - tstart.tv_sec - 1;
        tdiff.tv_usec = 1000000 + tend.tv_usec - tstart.tv_usec;
    }
    else {
        tdiff.tv_sec = tend.tv_sec - tstart.tv_sec;
        tdiff.tv_usec = tend.tv_usec - tstart.tv_usec;
    }
    total_msec = tdiff.tv_sec * 1000 + tdiff.tv_usec / 1000;
    std::cout << "\nTotal elapsed time: " << total_msec << " msec\n" << std::endl;
    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/42161.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RabbitMQ(基于AMQP的开源消息代理软件)

一、AMQP高级消息队列协议 &#xff08;1&#xff09;介绍 AMQP&#xff0c;即Advanced Message Queuing Protocol&#xff0c;一个提供统一消息服务的应用层标准高级消息队列协议&#xff0c;是应用层协议的一个开放标准&#xff0c;为面向消息的中间件设计。基于此协议的客…

图神经网络关系抽取论文阅读笔记(四)

1 GDPNet:用于关系提取的潜在多视图图的精炼(GDPNet: Refining Latent Multi-View Graph for Relation Extraction) 论文&#xff1a;GDPNet: Refining Latent Multi-View Graph for Relation Extraction&#xff0c;2021 1.1 引言 由于基于 BERT 等序列模型与基于图模型算法是…

模块首页UX交互升级,接口测试支持禁用本地执行,MeterSphere开源持续测试平台v2.4.0发布

2022年11月28日&#xff0c;MeterSphere一站式开源持续测试平台正式发布v2.4.0版本。 在这一版本中&#xff0c;MeterSphere在测试跟踪和接口测试模块中对首页进行了UX交互升级&#xff0c;将部分指标进行了饼图、柱状图的展示优化&#xff0c;同时根据社区用户的实际使用反馈…

计算机毕业设计【HTML+CSS+JavaScript服装购物商城】毕业论文源码

常见网页设计作业题材有 个人、 美食、 公司、 学校、 旅游、 电商、 宠物、 电器、 茶叶、 家居、 酒店、 舞蹈、 动漫、 服装、 体育、 化妆品、 物流、 环保、 书籍、 婚纱、 游戏、 节日、 戒烟、 电影、 摄影、 文化、 家乡、 鲜花、 礼品、 汽车、 其他等网页设计题目, A…

JS进阶第二篇:函数参数按值传递

文章目录函数参数按值传递按值传递引用传递&#xff1f;应用函数参数按值传递 按值传递 在 JavaScript 中&#xff0c;我们有函数以及传递给这些函数的参数。但是 JavaScript 对如何处理你传递的内容并不总是很清楚。当你开始进入面向对象开发的时候&#xff0c;你可能会发现…

【Hack The Box】Linux练习-- Shibboleth

HTB 学习笔记 【Hack The Box】Linux练习-- Shibboleth &#x1f525;系列专栏&#xff1a;Hack The Box &#x1f389;欢迎关注&#x1f50e;点赞&#x1f44d;收藏⭐️留言&#x1f4dd; &#x1f4c6;首发时间&#xff1a;&#x1f334;2022年11月27日&#x1f334; &#…

3.8 如何在小红书上蹭热点,这里有8个方法【玩赚小红书】

在小红书究竟能不能蹭到热点?有哪些热点可以蹭?怎么蹭?是很多小红书运营者关心的问题。在我看下&#xff0c;小红书热点分为官方热点、事件热点和账号热点三类&#xff0c;用好这8个方法&#xff0c;让笔记获得更多流量。 ​ ​ 一、官方热点 官方热点是小红书推出&#x…

RocketMQ 消费端如何监听消息?

前言 RocketMQ消息消费者是如何启动的&#xff0c;有一个步骤是非常重要的&#xff0c;就是启动消息的监听&#xff0c;通过不断的拉取消息&#xff0c;来实现消息的监听&#xff0c; 那具体怎么做&#xff0c;让我们我们跟着源码来学习一下~ 流程地图 源码跟踪 这一块的代…

ipv6地址概述——配置ipv6

个人简介&#xff1a;云计算网络运维专业人员&#xff0c;了解运维知识&#xff0c;掌握TCP/IP协议&#xff0c;每天分享网络运维知识与技能。个人爱好: 编程&#xff0c;打篮球&#xff0c;计算机知识个人名言&#xff1a;海不辞水&#xff0c;故能成其大&#xff1b;山不辞石…

加拿大留学一定要善用这八个服务系统

加拿大留学之所以如此受大家欢迎&#xff0c;主要也是由于其优质的教育、易移民等优势&#xff0c;而对于学生本人来说&#xff0c;满意的学习体验&#xff0c;也是留学生涯中不可缺少的重要一环。 加拿大大学都拥有成熟完善的学生服务系统&#xff0c;帮助学生们更好的学习、…

Code learning tools

这里写目录标题1. Code learning tools1.1. Chrome Sourcegraph plugin1.2. Print statements never go out of style1.3. When in doubt, PANIC1.4. Visit the past with GitHub blame1. Code learning tools I know what you are thinking. Brad, you are new to Kube and G…

有哪些电容笔值得推荐?值得买的电容笔测评

和苹果原装的电容笔不同&#xff0c;普通电容笔没有苹果电容笔的独特重力压感&#xff0c;只是给人一种倾斜的压感。不过&#xff0c;如果你对画画没有什么特别的需求&#xff0c;也不必买昂贵的Apple Pencil&#xff0c;平替电容笔就足够我们使用了。接下来&#xff0c;我会给…

全球133种语言自动翻译mishop大米外贸商城系统

提示&#xff1a;133种语言自动翻译&#xff0c;开源无加密。 文章目录介绍安装方法部分代码展示学习资料下载地址成品效果图片展示介绍 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 大米外贸商城系统 简称mishop 完全开源版&#xff0c;只需做一种语言一…

JXcore 打包在企业级项目里的合理运用和模块系统以及网络的配置详解【node.js】

文章目录 JXcore 打包模块系统netJXcore 打包 node.Js是面向服务器端和网络应用程序的开源跨平台运行环境。 JXcore是一个支持多线程的节点。对于js发行版,您可以在多个线程中安全运行,而无需对现有代码进行任何更改。 安装命令如下: $ curl https://raw.githubusercontent.…

Java语言学习全笔记保姆级学习

目录 文章目录一、Java基础篇1.接口和抽象类的区别2.重载和重写的区别3.和equals的区别4.异常处理机制5.HashMap原理6.想要线程安全的HashMap怎么办&#xff1f;7.ConcurrentHashMap原如何保证的线程安全&#xff1f;8.HashTable与HashMap的区别9.ArrayList和LinkedList的区别1…

Tauri 入门教程

Tauri入门教程1 简介2 创建Tauri项目(页面基于Vue)2.1 环境准备2.2 创建工程3 Tauri 工程目录介绍4 页面调用rust方法5 事件系统6 HTTP请求7 文件系统8 对话框9 窗口配置10 打包1 简介 Tauri:构建跨平台的快速、安全、前端隔离应用。Tauri 是一个相对较新的框架&#xff0c;允…

Apollo 应用与源码分析:Monitor监控-硬件监控-CAN监控

目录 基本概念 CAN Card CAN - 原始套接字 ESD CAN 监控分析 Socket Can监控分析 基本概念 CAN Card 首先需要直到CAN的一些基本的概念。 CAN 是Controller Area Network 的缩写&#xff08;以下称为CAN&#xff09;&#xff0c;是ISO国际标准化的串行通信协议。在汽车产…

Elastic Stack 环境配置与框架简介

目录 简介 什么是Elastic Stack Elasticasearch Logstash Kibana Beats 框架图 下载 配置 一、安装java环境 启动 Elasticsearch Kibana FileBeat Logstash 测验 简介 什么是Elastic Stack Elastic Stack缩写为elk&#xff0c;它由三个软件组成&#xff1a;E…

唯品会:高利润,慢增长?

配图来自Canva可画 近日&#xff0c;阿里、京东等互联网大厂纷纷发布了新一季度的财报&#xff0c;从其财报不难看出&#xff0c;国内头部电商平台已经告别了一路狂奔的时代&#xff0c;开始愈发稳健起来。唯品会虽然在体量和规模上都还不能和这两家巨头相比&#xff0c;但自其…

36、Java——吃货联盟订餐系统(JDBC+MySQL+Apache DBUtils)

✅作者简介&#xff1a;热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏&#xff1a;Java案例分…