发布-订阅(Publish-Subscribe)C++实现

news2025/1/23 9:26:36

1、原理

发布-订阅(Publish-Subscribe)模式是一种消息传递模式,用于构建分布式系统中的通信机制。在这种模式下,消息的发送者(发布者)和消息的接收者(订阅者)之间通过一个称为“主题(Topic)”的中介进行通信。发布者将消息发布到特定的主题上,而订阅者可以选择订阅感兴趣的主题,并在消息发布到该主题时接收消息。

假设我们有一个名为"NewsHub"的新闻平台,它采用发布-订阅模式来传递新闻。在这个平台上,有多个新闻频道发布各种类型的新闻,比如"政治新闻"、"体育新闻"、"娱乐新闻"等。同时,用户可以选择订阅自己感兴趣的新闻频道。当某个新闻频道发布新闻时,所有订阅了该频道的用户将会收到新闻通知。这样,用户和新闻频道之间通过"主题"(新闻频道)进行了解耦,用户无需关心特定新闻频道的具体实现,只需要订阅自己感兴趣的主题即可。

2、实现框架

本文实现主要包括3个部分:Publisher、Subscriber和MessageCentrer

框架代码实现如下:

#ifndef __PUBLISHER_H__
#define __PUBLISHER_H__

#include <string>

class Publisher
{
public:
	virtual void Publish(std::string Topic, void* msgdata, unsigned int datasize) = 0;
};


#endif // __PUBLISHER_H__
#ifndef __SUBSCRIBER_H__
#define __SUBSCRIBER_H__

#include <string>

class Subscriber
{
public:
	virtual void Subscribe(std::string Topic) = 0;
	virtual void UnSubscribe(std::string Topic) = 0;
	virtual void HandeEnvent(std::string Topic,void * msgdata)=0;

};


#endif // __SUBSCRIBER_H__
#ifndef __MESSAGECENTRER_H__
#define __MESSAGECENTRER_H__

#include <thread>
#include <mutex>
#include <map>
#include <list>

/*
* 消息中心
*/
class MessageCentrer
{
public:
	static MessageCentrer* GetMC();
	void Run();
	void RegistPublish(std::string tpcKey, void* msgdata, unsigned int datasize);
	void RegistSubscribe(std::string tpcKey, class Subscriber* subscriber);
	void CancelSubscribe(std::string tpcKey, class Subscriber* subscriber);

private:
	MessageCentrer();
	virtual ~MessageCentrer();
	void CoreProcss();

private:
	std::map<std::string, std::list<void*>> mPublisher;	//topickey:PublishDatas,只关心数据,不关心是谁发布的
	std::map<std::string, std::list<class Subscriber*>> mSubscriber;	//topickey:Subscribers  只关心订阅者(其后续会处理订阅的消息)

	std::unique_ptr<std::thread> mCoreProcss;	//核心线程,维护发布数据队列 + 订阅触发处理
	std::mutex mPublishMutex;	//发布数据队列修改时的保护锁
	std::mutex mSubscribeMutex;		//订阅者注册/取消订阅时的保护锁

	static MessageCentrer* mSgMC;	//消息中心单例对象
	static std::mutex mMCMutex;	  //线程安全单例保护锁
};



#endif // __MESSAGECENTRER_H__
#include "MessageCentrer.h"
#include "Subscriber.h"
#include "Publisher.h"

#define MAX_PUBLISHES  10000

MessageCentrer* MessageCentrer::mSgMC = nullptr;
std::mutex MessageCentrer::mMCMutex;

MessageCentrer* MessageCentrer::GetMC()
{
	if (mSgMC == nullptr)
	{
		std::unique_lock<std::mutex> lock(mMCMutex); 
		if (mSgMC == nullptr)
		{
			volatile auto temp = new (std::nothrow) MessageCentrer();
			mSgMC = temp;
		}
	}
	return mSgMC;
}

void MessageCentrer::Run()
{
	mCoreProcss.reset(new std::thread(&MessageCentrer::CoreProcss,this));
}

void MessageCentrer::RegistPublish(std::string tpcKey, void* msgdata, unsigned int datasize)
{
	if (this->mPublisher[tpcKey].size() > MAX_PUBLISHES) return;
	mPublishMutex.lock();
	void* tmpdata = new char[datasize];
	memcpy(tmpdata, msgdata, datasize);
	this->mPublisher[tpcKey].push_back(tmpdata);
	mPublishMutex.unlock();
}

void MessageCentrer::RegistSubscribe(std::string tpcKey, Subscriber* subscriber)
{
	mSubscribeMutex.lock();
	this->mSubscriber[tpcKey].remove(subscriber);
	this->mSubscriber[tpcKey].push_back(subscriber);
	mSubscribeMutex.unlock();
}

void MessageCentrer::CancelSubscribe(std::string tpcKey, Subscriber* subscriber)
{
	if (this->mSubscriber.find(tpcKey) != this->mSubscriber.end())
		this->mSubscriber.find(tpcKey)->second.remove(subscriber);
}

MessageCentrer::MessageCentrer()
{
	this->mPublisher.clear();
	this->mSubscriber.clear();
}

MessageCentrer::~MessageCentrer()
{
}

void MessageCentrer::CoreProcss()
{
	while (true)
	{
		auto it = this->mSubscriber.begin();
		while (it != this->mSubscriber.end())
		{
			if (this->mPublisher.find(it->first) != this->mPublisher.end())
			{
				auto itt = it->second.begin();
				while (itt != it->second.end())
				{
					auto mpitr = this->mPublisher.find(it->first)->second.begin();
					auto mpitrend = this->mPublisher.find(it->first)->second.end();
					while (mpitr != mpitrend)
					{
						(*itt)->HandeEnvent(it->first,*mpitr);
						++mpitr;
					}
					++itt;
				}
				mPublishMutex.lock();
				auto mpitr = this->mPublisher.find(it->first)->second.begin();
				auto mpitrend = this->mPublisher.find(it->first)->second.end();
				while (mpitr != mpitrend)
				{
					delete[](*mpitr);
					++mpitr;
				}
				this->mPublisher.find(it->first)->second.clear();
				this->mPublisher.erase(it->first);
				mPublishMutex.unlock();
			}
			++it;
		}
	}
}

3、应用测试

定义topic数据

#ifndef __TOPICS_H__
#define __TOPICS_H__
#include <string>


struct Person
{
	std::string name;
	int age;
};



#endif // __TOPICS_H__

实现发布者/订阅者接口

#pragma once
#include "Publisher.h"
#include "MessageCentrer.h"


class AppPublisher : public Publisher
{
public:
	void Publish(std::string Topic, void* msgdata, unsigned int datasize) override
	{
		MessageCentrer::GetMC()->RegistPublish(Topic, msgdata, datasize);
	}
};
#pragma once
#pragma warning(disable:4996)
#include "Topics.h"
#include "Subscriber.h"
#include <map>
#include <string>
#include <iostream>
#include <chrono>
#include <ctime>


class AppSubscriber : public Subscriber
{
	typedef void (*HandlerFun)(void*);

public:
	AppSubscriber()
	{
		HandlerMap.clear();
		HandlerMap["Person"] = HandeEnvent_Person;
		HandlerMap["Other"] = HandeEnvent_Other;
		
	}

public:
	void Subscribe(std::string Topic) override
	{
		MessageCentrer::GetMC()->RegistSubscribe(Topic, this);
	}

	void UnSubscribe(std::string Topic) override
	{
		MessageCentrer::GetMC()->CancelSubscribe(Topic, this);
	} 

	void HandeEnvent(std::string Topic, void* msgdata) override
	{
		if (HandlerMap.find(Topic) != HandlerMap.end()) HandlerMap[Topic](msgdata);
	}

private:
	static void HandeEnvent_Person(void* msgdata)
	{
		struct Person* dt = (struct Person*)msgdata;
		// 获取当前系统时间
		auto now = std::chrono::system_clock::now();
		// 转换为 time_t
		std::time_t now_time = std::chrono::system_clock::to_time_t(now);
		std::cout << dt->name << dt->age << std::ctime(&now_time) << std::endl;
	}

	static void HandeEnvent_Other(void* msgdata)
	{
		struct Other* dt = (struct Other*)msgdata;
		//do something

	}

private:
	//TopicKey:HandlerFun
	std::map<std::string, HandlerFun> HandlerMap;
};

编写测试用例

#include "MessageCentrer.h"
#include "AppPublisher.h"
#include "AppSubscriber.h"


int main()
{
	MessageCentrer::GetMC()->Run();

	AppSubscriber appSub;
	AppPublisher appPub;

	appSub.Subscribe("Person");
	appSub.Subscribe("Person");
	//appSub.UnSubscribe("Person");

	struct Person ps { "sma", 18 };
	struct Person ps1 { "wxq", 17 };
	appPub.Publish("Person", &ps, sizeof(ps));
	while (true)
	{
		//appPub.Publish("Person", &ps, sizeof(ps));
		appPub.Publish("Person", &ps1, sizeof(ps1));
	}
	return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1657504.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

中国4月进口以美元计同比增长8.4%,出口同比增长1.5%

中国按美元计4月进出口同比增速均转负为正&#xff0c;双双超预期。 5月9日周四&#xff0c;海关总署公布数据显示&#xff0c;以美元计价&#xff0c;中国2024年4月进口同比增长8.4%至2201亿美元&#xff0c;前值同比下降1.9%&#xff0c;出口同比增长1.5%至2924.5亿美元&…

景源畅信:抖音小店有哪些人气品类?

抖音小店作为短视频平台中的一股清流&#xff0c;已经成为了众多商家和消费者关注的焦点。在这个平台上&#xff0c;有各种各样的商品琳琅满目&#xff0c;让人眼花缭乱。那么&#xff0c;抖音小店有哪些人气品类呢?下面就从四个不同的方面来详细阐述这个问题。 一、美妆护肤类…

华为eNSP Pro模拟器下载(普通账号可用)

好消息&#xff01;华为终于开放了普通账号使用权限&#xff01; 安装教程下载后见《指导手册-eNSP Pro V100R001C00.pdf》 华为eNSP Pro模拟器下载&#xff08;普通账号可用&#xff09; 下载地址 华为eNSP Pro模拟器下载&#xff08;普通账号可用&#xff09; - 下一朵云 …

基于springboot+jsp+Mysql的商务安全邮箱邮件收发

开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;…

Goland开发者软件激活使用教程

Goland开发者工具&#xff1a; Goland是由JetBrains公司推出的专门针对Go语言设计的集成开发环境&#xff08;IDE&#xff09;。这款工具具有智能的代码补全、强大的代码导航和重构功能&#xff0c;同时提供了丰富的调试工具&#xff0c;能够满足Golang开发者的各种需求。 Gol…

蓝桥杯省三爆改省二,省一到底做错了什么?

到底怎么个事 这届蓝桥杯选的软件测试赛道&#xff0c;都说选择大于努力,软件测试一不卷二不难。省赛结束&#xff0c;自己就感觉稳啦&#xff0c;全部都稳啦。没想到一出结果&#xff0c;省三&#xff0c;g了。说落差&#xff0c;是真的有一点&#xff0c;就感觉和自己预期的…

nacos下载安装和nacos启动报错

nacos简介: Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service的首字母简称&#xff0c;一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。 Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用的特性集&#xff0c;帮助您…

Mysql8.0.30一次表锁问题的解决

起因 给material_config_field_data表的字段建立全文索引的时&#xff0c;发现该表卡死&#xff0c;然后无法对该表进行任何操作。 查找问题 执行sql #这个命令会显示InnoDB存储引擎的详细状态信息&#xff0c;包括锁等待和锁争用的信息 SHOW ENGINE INNODB STATUS结果 复制S…

智慧营销的未来:中国AIGC技术的演进与应用 #未来是现在的趋势#

&#x1f4d1;前言 随着人工智能&#xff08;AI&#xff09;技术的蓬勃发展&#xff0c;尤其是在营销技术&#xff08;MarTech&#xff09;领域&#xff0c;AIGC&#xff08;AI Generated Content&#xff09;技术在中国市场的应用和影响日益显著。2023年&#xff0c;中国在AIG…

网安学习路线终极指南!一步步带你从入门到精通,详尽技能点全解析!

目录 零基础小白&#xff0c;到就业&#xff01;入门到入土的网安学习路线&#xff01; 建议的学习顺序&#xff1a; 一、夯实一下基础&#xff0c;梳理和复习 二、HTML与JAVASCRIPT&#xff08;了解一下语法即可&#xff0c;要求不高&#xff09; 三、PHP入门 四、MYSQL…

【强训笔记】day16

NO.1 代码实现&#xff1a; class StringFormat { public:string formatString(string A, int n, vector<char> arg, int m) {string ret;int j0;for(int i0;i<n;i){if(A[i]%){if(i1<n&&A[i1]s){retarg[j];i;}else {retA[i];}}else {retA[i];}}while(j&l…

【C++ 关键字】const 关键字详解

文章目录 1. const 概念2.常量指针 和 指针常量 的区别2.1 常量指针&#xff08;底层 const&#xff09;2.2 指针常量 (顶层 const) 3.const 关键字的作用4.const 和 define 的区别5.const 总结 1. const 概念 const 是一个关键字&#xff0c;被修饰的值不能改变&#xff0c;是…

端口占用解决方法

1、查询端口 打开cmd命令提示符窗口&#xff0c;输入以下指令查询所有端口 netstat -ano //查询所有端口 netstat -ano|findstr 8080 //查询指定端口 2、杀死进程 taskkill /t /f /im 进程号(PID)

度小满——征信报告图建模

目录 背景介绍 发展趋势 技术演进 图在金融风控领域中的演进 度小满图机器学习技术体系 案例 征信报告介绍 征信报告图建模

【每日刷题】Day35

【每日刷题】Day35 &#x1f955;个人主页&#xff1a;开敲&#x1f349; &#x1f525;所属专栏&#xff1a;每日刷题&#x1f34d; &#x1f33c;文章目录&#x1f33c; 1. 844. 比较含退格的字符串 - 力扣&#xff08;LeetCode&#xff09; 2. 2487. 从链表中移除节点 - 力…

HarmonyOS开发之ArkTS使用:用户登录页面应用

目录 目录 前言 关于HarmonyOS 环境准备 新建项目 设计用户登录页面 1. 布局设计 2. 编写ArkTS代码 运行和测试 结束语 前言 随着HarmonyOS&#xff08;鸿蒙操作系统&#xff09;的不断发展&#xff0c;越来越多的开发者开始投入到这个全新的生态系统中&#xff0c;而…

OceanBase开发者大会实录:SaaS 场景降本50%!石基零售应用 OB Cloud 实践

本文来自2024 OceanBase开发者大会&#xff0c;石基零售助理总裁 、 ROC 产品事业部负责人陈亮的演讲实录—《石基零售与 OB Cloud 零售行业应用实践》。完整视频回看&#xff0c;请点击这里&#xff1e;> 大家下午好&#xff01;我是石基零售的陈亮。今天和大家分享一下石基…

拼多多集体断流的原因是什么?拼多多无货源还能继续做吗?

最近很多粉丝反馈&#xff1a;“自从315后店铺就时不时的断流&#xff0c;有的甚至报活动也没流量&#xff0c;跟去年明显不一样&#xff0c;以前流量再少每个小时都有新访客&#xff0c;现在访客半天都不动。” 莫名其妙的就断流了&#xff0c;关键还什么提示都没有。你说断流…

美食推荐网站设计

**中文摘要&#xff1a;**在当今信息化、网络化的时代背景下&#xff0c;美食文化正逐渐融入人们的日常生活&#xff0c;而网络平台成为人们获取美食信息、分享美食体验的重要途径。为了满足广大美食爱好者对美食信息的探索和推荐需求&#xff0c;本文提出了一种创新的美食推荐…

通过Docker Compose部署GitLab和GitLab Runner(一)

GitLab 是一个用于版本控制、项目管理和持续集成的开源软件平台&#xff0c;它提供了一整套工具&#xff0c;能够帮助团队高效地协作开发。而 GitLab Runner 则是 GitLab CI/CD 的执行者&#xff0c;用于运行持续集成和持续交付任务。 在本文中&#xff0c;我们将使用 Docker …