ZMQ发布订阅模式二次封装

news2024/11/24 2:20:38

ZeroMQ

参考ZMQ从入门到掌握一
ZeroMQ是一种基于消息队列的多线程网络库,其对套接字类型、连接处理、帧、甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字。ZeroMQ 并不是一个对 socket 的封装,不能用它去实现已有的网络协议。它有自己的模式,不同于更底层的点对点通讯模式。它有比 tcp 协议更高一级的协议。(当然 ZeroMQ 不一定基于 TCP 协议,它也可以用于进程间和进程内通讯)它改变了通讯都基于一对一的连接这个假设。ZeroMQ 把通讯的需求看成四类。其中一类是一对一结对通讯,用来支持传统的 TCP socket 模型,但并不推荐使用。常用的通讯模式只有三类:

  • 请求回应模型:由请求端发起请求,并且等待回应端回应请求。从请求端来看,一定是一对对收发配对的;反之,在回应端一定是发收对。请求端和回应端都可以是 1:N 的模型。通常把 1 认为是 server ,N 认为是 Client 。ZeroMQ 可以很好的支持路由功能(实现路由功能的组件叫作 Device),把 1:N 扩展为 N:M (只需要加入若干路由节点)。从这个模型看,更底层的端点地址是对上层隐藏的。每个请求都隐含有回应地址,而应用则不关心它。
  • 发布订阅:这个模型里,发布端是单向只发送数据的,且不关心是否把全部的信息都发送给订阅端。如果发布端开始发布信息的时候,订阅端尚未连接上来,这些信息直接丢弃。不过一旦订阅端连接上来,中间会保证没有信息丢失。同样,订阅端则只负责接收,而不能反馈。如果发布端和订阅端需要交互(比如要确认订阅者是否已经连接上),则使用额外的 socket 采用请求回应模型满足这个需求。
  • 管道模式:单向的

发布者

  • ZmqSendUtil.h
#ifndef ZMQSENDUTIL_H
#define ZMQSENDUTIL_H
 
#include <string>
#include <cpp/zmq.hpp>
class ZmqSendUtil{
private:
    void *context;
    void *publisher;
    std::string url;
 
public:
    ZmqSendUtil();                              //默认构造方法
    ZmqSendUtil(void *context, void *publisher, const std::string &url);    //初始化构造方法
    virtual ~ZmqSendUtil();                     //析构函数
    int sendMsg(const std::string& info);       //发送消息
};
 
#endif
  • ZmqSendUtil.cpp

#include "ZmqSendUtil.h"
ZmqSendUtil::ZmqSendUtil() {}
ZmqSendUtil::ZmqSendUtil(void *context, void *publisher, const std::string &url) 
                        :context(context),publisher(publisher), url(url){
    //初始化上下文
    if (this->context == nullptr){
        this->context = zmq_ctx_new();
        assert(this->context != nullptr);
    }
 
    //获取socket对象
    if (this->publisher == nullptr){
        this->publisher = zmq_socket(this->context, ZMQ_PUB);
        assert(this->publisher != nullptr);
    }
 
    //socket绑定通信地址
    int result = zmq_bind(this->publisher, this->url.c_str());
    assert(result == 0);
}

ZmqSendUtil::~ZmqSendUtil() {
    zmq_close(this->publisher);
    zmq_ctx_destroy(this->context);
}

int ZmqSendUtil::sendMsg(const std::string& info) {
    int result=-1;
    if (this->context != nullptr && this->publisher != nullptr){
        result = zmq_send(this->publisher, info.c_str(), info.length(), 0);
    return result > 0;
    }
}

订阅者

  • ZmqRecvUtil.h
#ifndef USV_ZMQRECVUTIL_H
#define USV_ZMQRECVUTIL_H
 
#include <string>
#include <cpp/zmq.hpp>
class ZmqRecvUtil{
private:
    void *context;
    void *subscriber;
    std::string url;
 
public:
    ZmqRecvUtil();              //默认构造方法
    ZmqRecvUtil(void *context, void *subscriber, const std::string &url);   //初始化构造方法
    virtual ~ZmqRecvUtil();     //析构函数
    std::string recvMsg();      //接收消息
};
 
#endif
  • ZmqRecvUtil.cpp
#include "ZmqRecvUtil.h"
#define MAX_INFO_SIZE 1024
 
ZmqRecvUtil::ZmqRecvUtil() {}
 
ZmqRecvUtil::ZmqRecvUtil(void *context, void *subscriber, const std::string &url) 
                        :context(context),subscriber(subscriber),url(url){
    //初始化上下文context
    if (this->context == nullptr){
        this->context = zmq_ctx_new();  //如果上下文是空的,那么就初始化上下文
        assert(this->context != nullptr);
    }
    //获取socket对象
    if (this->subscriber == nullptr){   //模式为订阅发布模式 发布端单向发布数据也不管订阅端是否能接收...
        this->subscriber = zmq_socket(this->context, ZMQ_SUB); //如果订阅者是空的句柄那就创建一个不透明的套接字...
        assert(this->subscriber != nullptr);
    }
    //socket绑定通信地址  绑定特定的ip和端口号获取数据....
    int result = zmq_connect(this->subscriber, this->url.c_str());
    assert(result == 0);
    result = zmq_setsockopt(this->subscriber, ZMQ_SUBSCRIBE, "", 0);
    assert(result == 0);
}
 
ZmqRecvUtil::~ZmqRecvUtil() {
    zmq_close(this->subscriber);
    zmq_ctx_destroy(this->context);
}
 
std::string ZmqRecvUtil::recvMsg() {
    //读取消息
    char infoBuffer[MAX_INFO_SIZE] = {0};
    int ret = zmq_recv(this->subscriber, infoBuffer, MAX_INFO_SIZE , 0);
    assert(ret != -1);
    std::string tmp="";                     //返回值不能使用infoBuffer强化类型转换,要重新构建字符串
    for (int i = 0; i < ret; ++i) {
        tmp+=infoBuffer[i];
    }
    return tmp;
}

测试

  • main.cpp
#include"ZmqSendUtil.h"
#include"ZmqRecvUtil.h"
#include <iostream>
#include <thread>
using namespace std;

ZmqSendUtil* publisher = new ZmqSendUtil(nullptr,nullptr,"tcp://127.0.0.1:6666");
ZmqRecvUtil* subscriber = new ZmqRecvUtil(nullptr,nullptr,"tcp://127.0.0.1:6666");

// zmq的发送线程函数
void zmq_send_thread() {
    int cnt = 0;
    while(true){
        cnt++;
        string msg = "hello world " + to_string(cnt);
        publisher->sendMsg(msg);
        cout << "send msg: " << msg << endl;
        Sleep(1000); 
    }
}
// zmq的接收线程函数
void zmq_recv_thread() {
    while (true) {
        string msg = subscriber->recvMsg();
        cout << "recv msg: " << msg << endl;
    }
}

int main(int argc, char const *argv[])
{
    cout<<"Hello world"<<endl;
    thread zmq_send(zmq_send_thread);
    thread zmq_recv(zmq_recv_thread);
    zmq_send.join();
    zmq_recv.join();
    return 0;
}

cmakelist.txt编写

# 版本和项目名称
cmake_minimum_required(VERSION 3.20)
project(example1)
# 设定编译标准为C++11
# 设定头文件路径和动态库路径
set(CXX_STANDARD 11)
set(ZEROMQ_INCLUDE_DIRS ${CMAKE_SOURCE_DIR}/zeromq425/include)
set(ZEROMQ_LIBRARY_DIRS ${CMAKE_SOURCE_DIR}/zeromq425/bin/Debug/v140/dynamic)
set(ZEROMQ_LIB libzmq.lib)
include_directories(${ZEROMQ_INCLUDE_DIRS})
link_directories(${ZEROMQ_LIBRARY_DIRS})
# 生成可执行文件并连接到库文件
aux_source_directory(. SRC_DIR)
add_executable(zmqtest ${SRC_DIR})
target_link_libraries(zmqtest libzmq.lib)

结果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/850261.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

加密、解密、编码

urlencode urlencode_百度百科 Base64是一种二进制到文本的编码方式&#xff0c;而且编码出的字符串只包含ASCII基础字符 下图是Base64码表&#xff0c;可以看到从0到63的每个数字都对应一个上面的一个字符。 文件-base64字符串互转 sun.misc包中的类 try (FileOutputStre…

Node.Js安装与配置教程

目录 1.下载官网 2.选择安装路径 3.添加环境变量 4.验证是否安装成功 5.修改模块下载位置 (1)查看npm默认存放位置 6.在node.js安装目录下&#xff0c;创建两个文件夹 7.修改默认文件夹 8.测试默认位置是否更改成功 9.安装报错解决办法 10.路径未更改成功解决办法 …

MIT 6.830数据库系统 -- lab six

MIT 6.830数据库系统 -- lab six 项目拉取引言steal/no-force策略redo log与undo log日志格式和检查点 开始回滚练习1&#xff1a;LogFile.rollback() 恢复练习2&#xff1a;LogFile.recover() 测试结果疑问点分析 项目拉取 原项目使用ant进行项目构建&#xff0c;我已经更改为…

【uniapp 小程序开发页面篇】代码编写规范 | 页面编写规范 | 小程序API

博主&#xff1a;_LJaXi Or 東方幻想郷 专栏&#xff1a; uni-app | 小程序开发 开发工具&#xff1a;HBuilderX 小程序开发页面篇 小程序组件规范小程序介绍小程序规范代码编写规范须遵循的开发规范 运行特性编译器选择编译规则工程目录结构static目录 使用注意static目录 条件…

Spring-2-透彻理解Spring 注解方式创建Bean--IOC

今日目标 学习使用XML配置第三方Bean 掌握纯注解开发定义Bean对象 掌握纯注解开发IOC模式 1. 第三方资源配置管理 说明&#xff1a;以管理DataSource连接池对象为例讲解第三方资源配置管理 1.1 XML管理Druid连接池(第三方Bean)对象【重点】 数据库准备 -- 创建数据库 create …

Easys Excel的表格导入(读)导出(写)-----java

一,EasyExcel官网: 可以学习一些新知识: EasyExcel官方文档 - 基于Java的Excel处理工具 | Easy Excel 二,为什么要使用easyexcle excel的一些优点和缺点 java解析excel的框架有很多 &#xff1a; poi jxl,存在问题&#xff1a;非常的消耗内存&#xff0c; easyexcel 我们…

使用TDOSCommand调用Powershell脚本对进程进行操作

列出当前运行的进程&#xff1a; varPowerShellPath, ScriptPath, CommandLine: string; beginMemo6.Clear;PowerShellPath : powershell.exe ; // 假设 PowerShell 可执行文件在系统环境变量中// 构造命令行参数CommandLine : Get-Process | Select-Object Name,Id;// 设置命…

【Linux】总结2-进程篇1

文章目录 冯诺伊曼结构操作系统什么是程序&#xff1f;什么是进程&#xff1f;操作系统是如何来管理进程的&#xff1f;PCB&#xff08;struct task_struct{...}&#xff09; 冯诺伊曼结构 冯诺依曼提出了计算机制造的三个基本原则&#xff0c;即采用二进制逻辑、程序存储执行…

Stable Diffusion - 常用的负向提示 Embeddings 解析与 坐姿 (Sitting) 提示词

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/132145248 负向 Embeddings 是用于提高 StableDiffusion 生成图像质量的技术&#xff0c;可以避免生成一些不符合预期的图像特征&#xff0c;比如…

day5gdb调试模式和makefile

一、gdb调试 1.1gdb调试的作用 gdb调试检查的是逻辑错误&#xff0c;而非语法错误 1.2gdb流程 1、gcc -g 1.c ---->加-g参数的作用&#xff0c;生成可以调试的gdb文件 2、gdb 可执行文件名/a.out ---->进入gdb工具进行调试 3、输入l&#xff0c;带行号打印文件信息…

管理类联考——逻辑——论证逻辑——汇总篇——目录+提炼

文章目录 一、削弱方法关系的削弱必要方法的削弱因果推理的削弱果因推理的削弱概念跳跃的削弱数量比例的削弱比例因果的削弱 二、支持方法关系的支持必要方法的支持因果推理的支持果因推理的支持概念跳跃的支持数量比例的支持比例因果的支持 三、假设方法关系的假设必要方法的假…

不分股权不分管理,只分利润:共享模式的新零售布局

实体行业如何通过共享模式去整合那些有资源的人&#xff0c;来完成新零售的一个布局&#xff1f;比如对于餐饮行业而言&#xff0c;一样的资源&#xff0c;经常有用餐、聚餐需求的人是谁&#xff1f; 有商会组织者、公司的管理层、培训机构、社群群主等等。那么如何把这些人整…

记一次Linux启动Mysql异常解决

文章目录 第一步&#xff1a; netstat -ntlp 查看端口情况2、启动Mysql3、查看MySQL日志 tail -100f /var/log/mysqld.log4、查看磁盘占用情况&#xff1a;df -h5、思路小结 第一步&#xff1a; netstat -ntlp 查看端口情况 并没有发现3306数据库端口 2、启动Mysql service …

【Windows】Windows11系统用户自己添加开机启动项的方法

按win R快捷键&#xff0c;打开运行窗口&#xff0c;在输入框中输入shell:startup后点击运行&#xff0c;打开启动文件夹&#xff1a; 把想增加的开机启动软件的快捷方式图标拖入到该文件夹中&#xff0c;如下图所示&#xff1a; 按ctrl shift esc打开任务管理器&#xff0c…

UWB伪应用场景 - 别再被商家忽悠

近几年UWB技术在网上宣传得如火如荼&#xff0c;与高精度定位几乎或等号&#xff0c;笔者认为这是营销界上的一大成功案例。 UWB超宽带技术凭借着低功耗、高精度&#xff0c;确实在物联网行业混得风生水起&#xff0c;但在无数实际应用案例中&#xff0c;根据客户的反馈情况&a…

python小游戏代码200行左右,python小游戏代码1000行

大家好&#xff0c;小编为大家解答20行python代码的入门级小游戏的问题。很多人还不知道python小游戏代码200行左右&#xff0c;现在让我们一起来看看吧&#xff01; 大家小时候都玩过贪吃蛇吧&#xff1f;小编小时候可喜欢拿爸妈的手机玩了&#xff0c;厉害着呢&#xff01;今…

Spring-2-深入理解Spring 注解依赖注入(DI):简化Java应用程序开发

今日目标 掌握纯注解开发依赖注入(DI)模式 学习使用纯注解进行第三方Bean注入 1 注解开发依赖注入(DI)【重点】 问题导入 思考:如何使用注解方式将Bean对象注入到类中 1.1 使用Autowired注解开启自动装配模式&#xff08;按类型&#xff09; Service public class StudentS…

redis基础(三十六)

安装redis、配置redis 目录 一、 概述 &#xff08;一&#xff09;NoSQL 1、类型 2、应用场景 &#xff08;二&#xff09;Redis 二、安装 &#xff08;一&#xff09;编译安装 &#xff08;二&#xff09;RPM安装 三、目录结构 四、命令解析 五、redis登录更改 1、…

三层交换实验

前言 在实际的企业应用中&#xff0c;我们会先建立不同的vlan把用户先隔开来。然后再通过三次交换机技术打通vlan直接的网络。 这样的目的如下&#xff1a; 隔离&#xff1a; 隔离是广播域&#xff0c;也就是隔离的是故障连通&#xff1a; 连通的是正常的通信 比如校园网&am…

在魔塔社区搭建通义千问-7B(Qwen-7B)流程

复制以下语句 python3 -m venv myvenvsource myvenv/bin/activatepip install modelscope pip install transformers_stream_generator pip install transformers pip install tiktoken pip install accelerate pip install bitsandbytestouch run.py vi run.py复制下面代码粘…