发布-订阅(ZeroMQ) C++实现

news2024/10/6 14:33:07

1、目的

自从发了《发布-订阅(Publish-Subscribe)C++实现》博文,收到不少反馈:主要的问题就是无法跨主机使用。

本次实现主要解决:

  • 简化ZeroMQ的开发过程;
  • 尽可能简化发布订阅的API调用;
  • 订阅者消息处理采用守护线程模式;
  • 支撑跨主机的发布订阅(By ZeroMQ)

下载地址:【免费】PublishSubscribe-ZMQ发布订阅C++实现资源-CSDN文库

2、ZeroMQ库选择

一般,在可以选择的情况下,我比较偏爱C接口的库,主要原因:一致性好,至少比C++接口的库兼容性强。

鉴于这个理由,我就选择了libzmq,库版本4.3.5。

推荐优先使用MD模式的库:libzmq-v142-mt-4_3_5.lib。

3、实现思路

 TOPIC是满足:
        1)可compare的,组合数据类型至少重载 == 操作;
        2)可直接网络传输的,常见的比如整型、结构体。
主题对应的数据一般多见结构体,也可以是字符串、二进制等任何适用于网络传输的数据类型。

4、主要代码

#ifndef __PUBLISHER_HPP__
#define __PUBLISHER_HPP__
#include "TOPIC_DEFS.h"
#include "zmq.h"
#include <string>


template<typename _TOPIC_>
class Publisher
{
public:
	//addr :"tcp://*:port"
	Publisher(const char * addr)
	{
		// 初始化 ZeroMQ 上下文
		mContext = zmq_ctx_new();
		// 创建 PUB 套接字
		mPublisher = zmq_socket(mContext, ZMQ_PUB);
		int linger = 0;
		zmq_setsockopt(mPublisher, ZMQ_LINGER, &linger, sizeof(int64_t));
		// 绑定地址
		zmq_bind(mPublisher, addr);
	}

	virtual ~Publisher()
	{
		// 关闭套接字和 ZeroMQ 上下文
		if (mPublisher) zmq_close(mPublisher);
		if(mContext) zmq_ctx_destroy(mContext);
	}

	void Publish(_TOPIC_ topic, void* msg, int msg_size)
	{
		zmq_send(mPublisher, &topic, sizeof(_TOPIC_), ZMQ_SNDMORE);
		zmq_send(mPublisher, msg, msg_size, 0);
	}


private:
	void* mContext;		// ZeroMQ 上下文
	void* mPublisher;	// PUB 套接字
};



#endif // !__PUBLISHER_HPP__
#ifndef __SUBCORE_HPP__
#define __SUBCORE_HPP__

#include "zmq.h"
#include <thread>
#include <mutex>
#include <map>
#include <list>
#define _MAX_SUBSCRIBE_ 1024


template<typename _TOPIC_>
class Subcore
{
public:
	Subcore()
	{
		// 初始化 ZeroMQ 上下文
		mContext = zmq_ctx_new();
		mSubscribeSize = 0;
		memset(mSubscribe, 0, sizeof(zmq_pollitem_t) * _MAX_SUBSCRIBE_);
		mStop = false;
		mSubTopic.clear();
		mSubPollitem.clear();
		mRecvThreadPtr.reset(new std::thread(&Subcore::EventProcess, this));
	}

	virtual ~Subcore()
	{
		// 关闭监听事件
		for (int i = 0; i < mSubscribeSize; ++i)
		{
			if (mSubscribe[i].socket != nullptr) 
				mSubscribe[i].events = 0;
		}

		// 等待事件处理线程退出
		mStop = true;
		if (mRecvThreadPtr->joinable()) {
			mRecvThreadPtr->join();
		}

		// 关闭套接字
		for (int i = 0; i < mSubscribeSize; ++i)
		{
			if (mSubscribe[i].socket != nullptr) {
				zmq_close(mSubscribe[i].socket);
				mSubscribe[i].socket = nullptr;
			}
		}

		mSubscribeSize = 0;

		// 销毁 ZeroMQ 上下文
		if (mContext) {
			zmq_ctx_destroy(mContext);
			mContext = nullptr;
		}
	}

	//addr :"tcp://ip:port"
	bool Subscribe(_TOPIC_ topic, const char* addr)
	{
		auto it = mSubTopic.find(addr);
		if (it != mSubTopic.end())
		{
			for (auto itt = it->second.begin(); itt != it->second.end(); ++itt)
			{
				if (*itt == topic)
				{
					return false;
				}
			}
					
			// 订阅主题
			auto sock = mSubPollitem.find(addr);
			if (sock != mSubPollitem.end())
			{
				zmq_setsockopt(sock->second.socket, ZMQ_SUBSCRIBE, &topic, sizeof(_TOPIC_));
				mSubscribeMutex.lock();
				mSubTopic[addr].push_back(topic);
				mSubscribeMutex.unlock();
				return true;
			}
		}

		// 创建 SUB 套接字
		void* subscriber_temp = zmq_socket(mContext, ZMQ_SUB);
		if(!subscriber_temp) return false;
			
		//连接套接字
		if(zmq_connect(subscriber_temp, addr)==-1) 
			return false;

		// 订阅主题
		zmq_setsockopt(subscriber_temp, ZMQ_SUBSCRIBE, &topic, sizeof(_TOPIC_));
		int linger = 0;
		zmq_setsockopt(subscriber_temp, ZMQ_LINGER, &linger, sizeof(int64_t));

		//准备IO复用
		zmq_pollitem_t tmp{ subscriber_temp, 0, ZMQ_POLLIN, 0 };
		mSubscribeMutex.lock();
		mSubscribe[mSubscribeSize] = tmp;
		mSubscribeSize++;
		//更新记录
		mSubTopic[addr].push_back(topic);
		mSubPollitem[addr] = tmp;
		mSubscribeMutex.unlock();
		
		return true;
	}

	void UnSubscribe(_TOPIC_ topic)
	{	
		for (auto it = mSubTopic.begin(); it != mSubTopic.end(); it++)
		{
			for (auto itt = it->second.begin(); itt != it->second.end(); ++itt)
			{
				if (*itt == topic)
				{
					auto sock = mSubPollitem.find(it->first);
					if (sock != mSubPollitem.end())
					{
						zmq_setsockopt(sock->second.socket, ZMQ_UNSUBSCRIBE, &topic, sizeof(_TOPIC_));
					}	
				}
			}			
		}
	}

	virtual void EnventHandler(_TOPIC_, void*, int) = 0;

private:
	void EventProcess()
	{
		while (!mStop)
		{
			if (mSubscribeSize > 0)
			{
				int size = zmq_poll(mSubscribe, mSubscribeSize, 10);
				if (size == -1)
					continue;
				if (size > 0)
				{
					for (size_t i = 0; i < _MAX_SUBSCRIBE_; i++)
					{
						if (mSubscribe[i].revents & ZMQ_POLLIN)
						{
							zmq_msg_t msg;
							zmq_msg_init(&msg);
							zmq_msg_recv(&msg, mSubscribe[i].socket, 0);

							// 第一部分消息(主题,整数类型)
							_TOPIC_ topic;
							memcpy(&topic, zmq_msg_data(&msg), sizeof(_TOPIC_));

							// 第二部分消息(数据)
							int more;
							size_t more_size = sizeof(more);
							zmq_getsockopt(mSubscribe[i].socket, ZMQ_RCVMORE, &more, &more_size);
							if (more)
							{
								zmq_msg_init(&msg);
								zmq_msg_recv(&msg, mSubscribe[i].socket, 0);
								size_t datasize = zmq_msg_size(&msg);
								char* data = static_cast<char*>(zmq_msg_data(&msg));
								EnventHandler(topic, data, datasize);
							}
							zmq_msg_close(&msg);
						}
					}
				}
			}
		}
	}

private:
	void* mContext;		// ZeroMQ 上下文

	int mSubscribeSize;
	zmq_pollitem_t mSubscribe[_MAX_SUBSCRIBE_];	// IO多路复用
	
	std::map<std::string, std::list<_TOPIC_>> mSubTopic;	// 记录订阅的主题  key is addr
	std::map<std::string, zmq_pollitem_t> mSubPollitem;	// 记录订阅的远程主机  key is addr

	std::mutex mSubscribeMutex;				// 订阅或者取消订阅时保护写数据

	std::unique_ptr<std::thread> mRecvThreadPtr;
	bool mStop;
};





#endif // !__SUBCORE_HPP__

测试代码

#define _CRT_SECURE_NO_WARNINGS
#include "Publisher.hpp"
#include "Subscriber.hpp"


#define ADDRESS_S5555 "tcp://*:5555"               //发布者地址1
#define ADDRESS_S5557 "tcp://*:5557"              //发布者地址2
#define ADDRESS_C5555 "tcp://localhost:5555"       
#define ADDRESS_C5557 "tcp://localhost:5557"

void TESTTHREAD()
{
	Hello tpc1{ "sma",20 };
	Publisher<TOPIC_TYPE> pub1(ADDRESS_S5557);
	while (true)
	{
		zmq_sleep(1);
		pub1.Publish(TOPIC_WORLD, &tpc1, sizeof tpc1);
	}
}

int main()
{
	//测试一个Subscriber 订阅来自不同Publisher的主题------------------------------
	{
		Hello tpc{ "wxq",18 };
		Subscriber<TOPIC_TYPE> sub;
		Publisher<TOPIC_TYPE> pub(ADDRESS_S5555);
		new std::thread(TESTTHREAD);

		sub.Subscribe(TOPIC_HELLO, ADDRESS_C5555);
		//sub.Subscribe(TOPIC_WORLD, ADDRESS_C5555);
		sub.Subscribe(TOPIC_WORLD, ADDRESS_C5557);
		//sub.UnSubscribe(TOPIC_WORLD);

		while (1)
		{
			zmq_sleep(1);
			pub.Publish(TOPIC_HELLO,&tpc,sizeof tpc);
		}
	}

	//测试一个Subscriber 订阅来自同一个Publisher的不同主题------------------------------  
	//{
	//	Hello tpc{ "wxq",18 };
	//	Hello tpc1{ "sma",20 };
	//	Publisher<TOPIC_TYPE> pub(ADDRESS_S5555);
	//	Publisher<TOPIC_TYPE> pub1(ADDRESS_S5555);

	//	Subscriber<TOPIC_TYPE> sub;
	//	sub.Subscribe(TOPIC_HELLO, ADDRESS_C5555);
	//	sub.Subscribe(TOPIC_WORLD, ADDRESS_C5555);
	//	while (1)
	//	{
	//		zmq_sleep(1);
	//		pub.Publish(TOPIC_HELLO, &tpc, sizeof tpc);
	//		pub.Publish(TOPIC_WORLD, &tpc1, sizeof tpc1);
	//	}
	//}

	return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1718447.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【计算机毕设】基于SpringBoot的学生心理咨询评估系统设计与实现 - 源码免费(私信领取)

免费领取源码 &#xff5c; 项目完整可运行 &#xff5c; v&#xff1a;chengn7890 诚招源码校园代理&#xff01; 1. 研究目的 随着社会的快速发展和竞争压力的增加&#xff0c;学生心理健康问题日益突出。设计和实现一个基于SpringBoot的学生心理咨询评估系统&#xff0c;旨在…

在Ubuntu乌班图上安装Docker

最近在学习乌班图相关的内容&#xff0c;找了一些文档安装的都是报错的&#xff0c;于是记录一下学习过程&#xff0c;希望也能帮助有缘人&#xff0c;首先查看乌班图的系统版本&#xff0c;我的是如下的&#xff1a; cat /proc/version以下是在Ubuntu 20.04版本上安装Docker。…

LeetCode739:每日温度

题目描述 给定一个整数数组 temperatures &#xff0c;表示每天的温度&#xff0c;返回一个数组 answer &#xff0c;其中 answer[i] 是指对于第 i 天&#xff0c;下一个更高温度出现在几天后。如果气温在这之后都不会升高&#xff0c;请在该位置用 0 来代替。 解题思想 使用单…

Windows Server安全配置

Windows Server操作系统安全配置&#xff0c;加固windows server。 1、密码安全 设置密码最小长度为10 开始-管理工具-本地安全策略-安全设置-账户策略-密码策略&#xff0c;修改密码长度最小值为10。 2、密码使用期限 设置密码最长使用期限为30天 开始-管理工具-本地安全策…

list 的实现

目录 list 结点类 结点类的构造函数 list的尾插尾删 list的头插头删 迭代器 运算符重载 --运算符重载 和! 运算符重载 * 和 -> 运算符重载 list 的insert list的erase list list实际上是一个带头双向循环链表,要实现list,则首先需要实现一个结点类,而一个结点需要…

【Python】解决Python错误报错:IndexError: tuple index out of range

&#x1f9d1; 博主简介&#xff1a;阿里巴巴嵌入式技术专家&#xff0c;深耕嵌入式人工智能领域&#xff0c;具备多年的嵌入式硬件产品研发管理经验。 &#x1f4d2; 博客介绍&#xff1a;分享嵌入式开发领域的相关知识、经验、思考和感悟&#xff0c;欢迎关注。提供嵌入式方向…

宝塔 nginx 配置负载均衡 upstream

nginx 主配置文件加入 upstream myapp1 {server 192.168.124.101:5051;server 192.168.124.102:5052;server 192.168.124.111:5050;}站点配置文件中加入 location / {proxy_pass http://myapp1;}80端口映射到外网域名配置方法 加入红框中的代码 upstream myapp3 {server 192.16…

物联边缘网关有哪些功能?物联边缘网关在工业方向的应用-天拓四方

随着物联网技术的快速发展&#xff0c;越来越多的设备和系统正在接入到网络中&#xff0c;形成了一个庞大的智能生态系统。在这个系统中&#xff0c;物联边缘网关扮演着至关重要的角色&#xff0c;它不仅是连接设备和云端的桥梁&#xff0c;更是推动智能应用落地的关键。在当今…

强烈推荐十款数据防泄密软件,高人气的数据防泄密软件

100G的文件不见了&#xff1f;客户的电话信息被拷贝走了&#xff1f;源代码被竞争对手搞到手了&#xff1f;这些都是严重的数据泄密事件&#xff0c;为此&#xff0c;我们需要数据防泄密软件来全方位保护数据安全。根据当前市场上的热门推荐和综合评价&#xff0c;以下几款数据…

Arm发布Cortex X925、A725、A520,Armv9.2架构

随着半导体行业的不断发展&#xff0c;Arm 通过突破技术界限&#xff0c;为终端用户提供尖端解决方案&#xff0c;在核心和 IP 架构创新方面处于领先地位&#xff0c;尤其是在移动领域。2024 年&#xff0c;Arm 的年度战略进步重点是增强去年的 Armv9.2 架构&#xff0c;并带来…

Vue3-Vite-ts 前端生成拓扑图,复制即用

完整代码&#xff0c;复制即可用&#xff0c;样式自调 试过 jointjs dagre-d3 vis&#xff0c;好用一点 方法1&#xff1a;Vis.js npm install vis-network <template><div id"mynetwork" class"myChart" :style"{width: 100%, height: 9…

你是否正确地编写了 Git 提交信息?

介绍 在版本控制方面&#xff0c;Git 是一个非常有效的工具。然而&#xff0c;像任何其他工具一样&#xff0c;你必须正确使用它才能充分发挥其作用。你需要考虑不同的方面。本文着重介绍如何按照传统提交规范&#xff08;Conventional Commits specification&#xff09;编写…

在Unity中配置Android项目以允许HTTP流量,解决AVPro在Android平台中无法播放http视频

解决方法快速通道&#xff1a;拉到底&#xff0c;看倒数第二张图 好记性不如烂笔头 最近在使用AVpro插件播放http视频&#xff0c;在Editor中一切正常&#xff0c;然而打包在Android平台下就播放不了 AVPro在Unity中的警告&#xff1a; 感觉只是个警告&#xff0c;没引起注意…

3d渲染的常用概念和技术,渲染100邀请码1a12

之前我们介绍了3D渲染的基本原理和流程&#xff0c;这次说下几个常用概念和技术。 3D渲染中涉及到很多专业的概念和技术&#xff0c;它们决定了渲染质量和效果&#xff0c;常用的有以下几个。1、光线追踪 光线追踪是一些专业渲染器&#xff08;如V-Ray和Corona等&#xff09;…

EXSI虚拟机新增磁盘并将空间扩充到已有分区

这里写自定义目录标题 1、在EXSI虚拟机中新增一块磁盘配置大小2、确认新磁盘3、格式化新分区4、添加新分区到LVM5、将新增分区添加到已有分区里 1、在EXSI虚拟机中新增一块磁盘配置大小 注意事项&#xff1a; (1)需确保虚拟机已关闭活处于维护模式&#xff0c;避免数据丢失 (2…

多输入多输出非线性对象的模型预测控制—Matlab实现

本示例展示了如何在 Simulink 中设计多输入多输出对象的闭环模型预测控制。该对象有三个操纵变量和两个测量输出。 一、非线性对象的线性化 运行该示例需要同时安装 Simulink 和 Simulink Control Design。 % 检查是否同时安装了 Simulink 和 Simulink Control Design if ~m…

探索第三方美颜SDK:美颜插件的技术原理

本篇文章&#xff0c;我们将深入了解第三方美颜SDK&#xff0c;主要探讨关于美颜插件的工作机制与算法。 一、第三方美颜SDK的概述 第三方美颜SDK是由专业团队开发的一套用于实现美颜功能的软件开发工具包。它通常包括了各种美颜算法、滤镜效果、人脸识别等核心技术&#xff…

面试题vue+uniapp(个人理解-面试口头答述)未编辑完整....

1.vue2和vue3的区别&#xff08;vue3与vue2的区别&#xff08;你不知道细节全在这&#xff09;_vue2和vue3区别-CSDN博客&#xff09;参考 Vue3 在组合式&#xff08;Composition &#xff09;API&#xff0c;中使用生命周期钩子时需要先引入&#xff0c;而 Vue2 在选项API&am…

数字化转型推动生物技术企业增长—纷享销客与集萃药康共探新动力

上周&#xff0c;在南京锦创书城&#xff0c;一场主题为“生物技术企业增长新动力&#xff1a;以客户为中心的数字化转型与创新”的研讨会圆满落幕。此次活动由纷享销客江苏分公司联合江苏集萃药康生物科技股份有限公司共同举办&#xff0c;吸引了众多生物技术领域企业的负责人…

新零售收银解决方案:传统门店超市的数字化-亿发

在数字化浪潮的推动下&#xff0c;零售行业正经历着前所未有的变革。阿里巴巴提出的“新零售”概念&#xff0c;不仅仅是一个商业口号&#xff0c;它代表了一种全新的商业模式和运营理念。随着时代的进步和消费需求的不断升级&#xff0c;新零售的兴起已成为行业发展的必然趋势…