1、原理
发布-订阅(Publish-Subscribe)模式是一种消息传递模式,用于构建分布式系统中的通信机制。在这种模式下,消息的发送者(发布者)和消息的接收者(订阅者)之间通过一个称为“主题(Topic)”的中介进行通信。发布者将消息发布到特定的主题上,而订阅者可以选择订阅感兴趣的主题,并在消息发布到该主题时接收消息。
假设我们有一个名为"NewsHub"的新闻平台,它采用发布-订阅模式来传递新闻。在这个平台上,有多个新闻频道发布各种类型的新闻,比如"政治新闻"、"体育新闻"、"娱乐新闻"等。同时,用户可以选择订阅自己感兴趣的新闻频道。当某个新闻频道发布新闻时,所有订阅了该频道的用户将会收到新闻通知。这样,用户和新闻频道之间通过"主题"(新闻频道)进行了解耦,用户无需关心特定新闻频道的具体实现,只需要订阅自己感兴趣的主题即可。
2、实现框架
本文实现主要包括3个部分:Publisher、Subscriber和MessageCentrer
框架代码实现如下:
#ifndef __PUBLISHER_H__
#define __PUBLISHER_H__
#include <string>
class Publisher
{
public:
virtual void Publish(std::string Topic, void* msgdata, unsigned int datasize) = 0;
};
#endif // __PUBLISHER_H__
#ifndef __SUBSCRIBER_H__
#define __SUBSCRIBER_H__
#include <string>
class Subscriber
{
public:
virtual void Subscribe(std::string Topic) = 0;
virtual void UnSubscribe(std::string Topic) = 0;
virtual void HandeEnvent(std::string Topic,void * msgdata)=0;
};
#endif // __SUBSCRIBER_H__
#ifndef __MESSAGECENTRER_H__
#define __MESSAGECENTRER_H__
#include <thread>
#include <mutex>
#include <map>
#include <list>
/*
* 消息中心
*/
class MessageCentrer
{
public:
static MessageCentrer* GetMC();
void Run();
void RegistPublish(std::string tpcKey, void* msgdata, unsigned int datasize);
void RegistSubscribe(std::string tpcKey, class Subscriber* subscriber);
void CancelSubscribe(std::string tpcKey, class Subscriber* subscriber);
private:
MessageCentrer();
virtual ~MessageCentrer();
void CoreProcss();
private:
std::map<std::string, std::list<void*>> mPublisher; //topickey:PublishDatas,只关心数据,不关心是谁发布的
std::map<std::string, std::list<class Subscriber*>> mSubscriber; //topickey:Subscribers 只关心订阅者(其后续会处理订阅的消息)
std::unique_ptr<std::thread> mCoreProcss; //核心线程,维护发布数据队列 + 订阅触发处理
std::mutex mPublishMutex; //发布数据队列修改时的保护锁
std::mutex mSubscribeMutex; //订阅者注册/取消订阅时的保护锁
static MessageCentrer* mSgMC; //消息中心单例对象
static std::mutex mMCMutex; //线程安全单例保护锁
};
#endif // __MESSAGECENTRER_H__
#include "MessageCentrer.h"
#include "Subscriber.h"
#include "Publisher.h"
#define MAX_PUBLISHES 10000
MessageCentrer* MessageCentrer::mSgMC = nullptr;
std::mutex MessageCentrer::mMCMutex;
MessageCentrer* MessageCentrer::GetMC()
{
if (mSgMC == nullptr)
{
std::unique_lock<std::mutex> lock(mMCMutex);
if (mSgMC == nullptr)
{
volatile auto temp = new (std::nothrow) MessageCentrer();
mSgMC = temp;
}
}
return mSgMC;
}
void MessageCentrer::Run()
{
mCoreProcss.reset(new std::thread(&MessageCentrer::CoreProcss,this));
}
void MessageCentrer::RegistPublish(std::string tpcKey, void* msgdata, unsigned int datasize)
{
if (this->mPublisher[tpcKey].size() > MAX_PUBLISHES) return;
mPublishMutex.lock();
void* tmpdata = new char[datasize];
memcpy(tmpdata, msgdata, datasize);
this->mPublisher[tpcKey].push_back(tmpdata);
mPublishMutex.unlock();
}
void MessageCentrer::RegistSubscribe(std::string tpcKey, Subscriber* subscriber)
{
mSubscribeMutex.lock();
this->mSubscriber[tpcKey].remove(subscriber);
this->mSubscriber[tpcKey].push_back(subscriber);
mSubscribeMutex.unlock();
}
void MessageCentrer::CancelSubscribe(std::string tpcKey, Subscriber* subscriber)
{
if (this->mSubscriber.find(tpcKey) != this->mSubscriber.end())
this->mSubscriber.find(tpcKey)->second.remove(subscriber);
}
MessageCentrer::MessageCentrer()
{
this->mPublisher.clear();
this->mSubscriber.clear();
}
MessageCentrer::~MessageCentrer()
{
}
void MessageCentrer::CoreProcss()
{
while (true)
{
auto it = this->mSubscriber.begin();
while (it != this->mSubscriber.end())
{
if (this->mPublisher.find(it->first) != this->mPublisher.end())
{
auto itt = it->second.begin();
while (itt != it->second.end())
{
auto mpitr = this->mPublisher.find(it->first)->second.begin();
auto mpitrend = this->mPublisher.find(it->first)->second.end();
while (mpitr != mpitrend)
{
(*itt)->HandeEnvent(it->first,*mpitr);
++mpitr;
}
++itt;
}
mPublishMutex.lock();
auto mpitr = this->mPublisher.find(it->first)->second.begin();
auto mpitrend = this->mPublisher.find(it->first)->second.end();
while (mpitr != mpitrend)
{
delete[](*mpitr);
++mpitr;
}
this->mPublisher.find(it->first)->second.clear();
this->mPublisher.erase(it->first);
mPublishMutex.unlock();
}
++it;
}
}
}
3、应用测试
定义topic数据
#ifndef __TOPICS_H__
#define __TOPICS_H__
#include <string>
struct Person
{
std::string name;
int age;
};
#endif // __TOPICS_H__
实现发布者/订阅者接口
#pragma once
#include "Publisher.h"
#include "MessageCentrer.h"
class AppPublisher : public Publisher
{
public:
void Publish(std::string Topic, void* msgdata, unsigned int datasize) override
{
MessageCentrer::GetMC()->RegistPublish(Topic, msgdata, datasize);
}
};
#pragma once
#pragma warning(disable:4996)
#include "Topics.h"
#include "Subscriber.h"
#include <map>
#include <string>
#include <iostream>
#include <chrono>
#include <ctime>
class AppSubscriber : public Subscriber
{
typedef void (*HandlerFun)(void*);
public:
AppSubscriber()
{
HandlerMap.clear();
HandlerMap["Person"] = HandeEnvent_Person;
HandlerMap["Other"] = HandeEnvent_Other;
}
public:
void Subscribe(std::string Topic) override
{
MessageCentrer::GetMC()->RegistSubscribe(Topic, this);
}
void UnSubscribe(std::string Topic) override
{
MessageCentrer::GetMC()->CancelSubscribe(Topic, this);
}
void HandeEnvent(std::string Topic, void* msgdata) override
{
if (HandlerMap.find(Topic) != HandlerMap.end()) HandlerMap[Topic](msgdata);
}
private:
static void HandeEnvent_Person(void* msgdata)
{
struct Person* dt = (struct Person*)msgdata;
// 获取当前系统时间
auto now = std::chrono::system_clock::now();
// 转换为 time_t
std::time_t now_time = std::chrono::system_clock::to_time_t(now);
std::cout << dt->name << dt->age << std::ctime(&now_time) << std::endl;
}
static void HandeEnvent_Other(void* msgdata)
{
struct Other* dt = (struct Other*)msgdata;
//do something
}
private:
//TopicKey:HandlerFun
std::map<std::string, HandlerFun> HandlerMap;
};
编写测试用例
#include "MessageCentrer.h"
#include "AppPublisher.h"
#include "AppSubscriber.h"
int main()
{
MessageCentrer::GetMC()->Run();
AppSubscriber appSub;
AppPublisher appPub;
appSub.Subscribe("Person");
appSub.Subscribe("Person");
//appSub.UnSubscribe("Person");
struct Person ps { "sma", 18 };
struct Person ps1 { "wxq", 17 };
appPub.Publish("Person", &ps, sizeof(ps));
while (true)
{
//appPub.Publish("Person", &ps, sizeof(ps));
appPub.Publish("Person", &ps1, sizeof(ps1));
}
return 0;
}