ZeroMQ
参考ZMQ从入门到掌握一
ZeroMQ是一种基于消息队列的多线程网络库,其对套接字类型、连接处理、帧、甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字。ZeroMQ 并不是一个对 socket 的封装,不能用它去实现已有的网络协议。它有自己的模式,不同于更底层的点对点通讯模式。它有比 tcp 协议更高一级的协议。(当然 ZeroMQ 不一定基于 TCP 协议,它也可以用于进程间和进程内通讯)它改变了通讯都基于一对一的连接这个假设。ZeroMQ 把通讯的需求看成四类。其中一类是一对一结对通讯,用来支持传统的 TCP socket 模型,但并不推荐使用。常用的通讯模式只有三类:
- 请求回应模型:由请求端发起请求,并且等待回应端回应请求。从请求端来看,一定是一对对收发配对的;反之,在回应端一定是发收对。请求端和回应端都可以是 1:N 的模型。通常把 1 认为是 server ,N 认为是 Client 。ZeroMQ 可以很好的支持路由功能(实现路由功能的组件叫作 Device),把 1:N 扩展为 N:M (只需要加入若干路由节点)。从这个模型看,更底层的端点地址是对上层隐藏的。每个请求都隐含有回应地址,而应用则不关心它。
- 发布订阅:这个模型里,发布端是单向只发送数据的,且不关心是否把全部的信息都发送给订阅端。如果发布端开始发布信息的时候,订阅端尚未连接上来,这些信息直接丢弃。不过一旦订阅端连接上来,中间会保证没有信息丢失。同样,订阅端则只负责接收,而不能反馈。如果发布端和订阅端需要交互(比如要确认订阅者是否已经连接上),则使用额外的 socket 采用请求回应模型满足这个需求。
- 管道模式:单向的
发布者
- ZmqSendUtil.h
#ifndef ZMQSENDUTIL_H
#define ZMQSENDUTIL_H
#include <string>
#include <cpp/zmq.hpp>
class ZmqSendUtil{
private:
void *context;
void *publisher;
std::string url;
public:
ZmqSendUtil(); //默认构造方法
ZmqSendUtil(void *context, void *publisher, const std::string &url); //初始化构造方法
virtual ~ZmqSendUtil(); //析构函数
int sendMsg(const std::string& info); //发送消息
};
#endif
- ZmqSendUtil.cpp
#include "ZmqSendUtil.h"
ZmqSendUtil::ZmqSendUtil() {}
ZmqSendUtil::ZmqSendUtil(void *context, void *publisher, const std::string &url)
:context(context),publisher(publisher), url(url){
//初始化上下文
if (this->context == nullptr){
this->context = zmq_ctx_new();
assert(this->context != nullptr);
}
//获取socket对象
if (this->publisher == nullptr){
this->publisher = zmq_socket(this->context, ZMQ_PUB);
assert(this->publisher != nullptr);
}
//socket绑定通信地址
int result = zmq_bind(this->publisher, this->url.c_str());
assert(result == 0);
}
ZmqSendUtil::~ZmqSendUtil() {
zmq_close(this->publisher);
zmq_ctx_destroy(this->context);
}
int ZmqSendUtil::sendMsg(const std::string& info) {
int result=-1;
if (this->context != nullptr && this->publisher != nullptr){
result = zmq_send(this->publisher, info.c_str(), info.length(), 0);
return result > 0;
}
}
订阅者
- ZmqRecvUtil.h
#ifndef USV_ZMQRECVUTIL_H
#define USV_ZMQRECVUTIL_H
#include <string>
#include <cpp/zmq.hpp>
class ZmqRecvUtil{
private:
void *context;
void *subscriber;
std::string url;
public:
ZmqRecvUtil(); //默认构造方法
ZmqRecvUtil(void *context, void *subscriber, const std::string &url); //初始化构造方法
virtual ~ZmqRecvUtil(); //析构函数
std::string recvMsg(); //接收消息
};
#endif
- ZmqRecvUtil.cpp
#include "ZmqRecvUtil.h"
#define MAX_INFO_SIZE 1024
ZmqRecvUtil::ZmqRecvUtil() {}
ZmqRecvUtil::ZmqRecvUtil(void *context, void *subscriber, const std::string &url)
:context(context),subscriber(subscriber),url(url){
//初始化上下文context
if (this->context == nullptr){
this->context = zmq_ctx_new(); //如果上下文是空的,那么就初始化上下文
assert(this->context != nullptr);
}
//获取socket对象
if (this->subscriber == nullptr){ //模式为订阅发布模式 发布端单向发布数据也不管订阅端是否能接收...
this->subscriber = zmq_socket(this->context, ZMQ_SUB); //如果订阅者是空的句柄那就创建一个不透明的套接字...
assert(this->subscriber != nullptr);
}
//socket绑定通信地址 绑定特定的ip和端口号获取数据....
int result = zmq_connect(this->subscriber, this->url.c_str());
assert(result == 0);
result = zmq_setsockopt(this->subscriber, ZMQ_SUBSCRIBE, "", 0);
assert(result == 0);
}
ZmqRecvUtil::~ZmqRecvUtil() {
zmq_close(this->subscriber);
zmq_ctx_destroy(this->context);
}
std::string ZmqRecvUtil::recvMsg() {
//读取消息
char infoBuffer[MAX_INFO_SIZE] = {0};
int ret = zmq_recv(this->subscriber, infoBuffer, MAX_INFO_SIZE , 0);
assert(ret != -1);
std::string tmp=""; //返回值不能使用infoBuffer强化类型转换,要重新构建字符串
for (int i = 0; i < ret; ++i) {
tmp+=infoBuffer[i];
}
return tmp;
}
测试
- main.cpp
#include"ZmqSendUtil.h"
#include"ZmqRecvUtil.h"
#include <iostream>
#include <thread>
using namespace std;
ZmqSendUtil* publisher = new ZmqSendUtil(nullptr,nullptr,"tcp://127.0.0.1:6666");
ZmqRecvUtil* subscriber = new ZmqRecvUtil(nullptr,nullptr,"tcp://127.0.0.1:6666");
// zmq的发送线程函数
void zmq_send_thread() {
int cnt = 0;
while(true){
cnt++;
string msg = "hello world " + to_string(cnt);
publisher->sendMsg(msg);
cout << "send msg: " << msg << endl;
Sleep(1000);
}
}
// zmq的接收线程函数
void zmq_recv_thread() {
while (true) {
string msg = subscriber->recvMsg();
cout << "recv msg: " << msg << endl;
}
}
int main(int argc, char const *argv[])
{
cout<<"Hello world"<<endl;
thread zmq_send(zmq_send_thread);
thread zmq_recv(zmq_recv_thread);
zmq_send.join();
zmq_recv.join();
return 0;
}
cmakelist.txt编写
# 版本和项目名称
cmake_minimum_required(VERSION 3.20)
project(example1)
# 设定编译标准为C++11
# 设定头文件路径和动态库路径
set(CXX_STANDARD 11)
set(ZEROMQ_INCLUDE_DIRS ${CMAKE_SOURCE_DIR}/zeromq425/include)
set(ZEROMQ_LIBRARY_DIRS ${CMAKE_SOURCE_DIR}/zeromq425/bin/Debug/v140/dynamic)
set(ZEROMQ_LIB libzmq.lib)
include_directories(${ZEROMQ_INCLUDE_DIRS})
link_directories(${ZEROMQ_LIBRARY_DIRS})
# 生成可执行文件并连接到库文件
aux_source_directory(. SRC_DIR)
add_executable(zmqtest ${SRC_DIR})
target_link_libraries(zmqtest libzmq.lib)