案例一
生产者Producer
#include <zmq.hpp>
#include <iostream>
#include <string>
#include<chrono>
#include<thread>
using namespace std;
using namespace zmq;
int main() {
context_t context(1);
// 创建 PUSH 套接字,用于发送消息到代理
socket_t push_socket(context, ZMQ_PUSH);
push_socket.connect("tcp://localhost:8888");
cout << "connect success" << endl;
int i = 0;
for (; ;) {
string message = "Message " + to_string(++i);
push_socket.send(buffer(message), send_flags::none);
cout << "Sending: " << message << endl;
this_thread::sleep_for(chrono::seconds(1)); // 休眠1秒
}
return 0;
}
队列queue()
#include <zmq.hpp>
#include <iostream>
#include <thread>
#include <chrono>
// 休眠指定的时间长度
using namespace std;
using namespace zmq;
int main() {
context_t context(2);
// 创建 PULL 套接字,用于接收生产者的消息
socket_t pull_socket(context, ZMQ_PULL);
pull_socket.bind("tcp://*:8888");
// 创建 PUSH 套接字,用于发送消息给消费者
socket_t push_socket(context, ZMQ_PUSH);
push_socket.bind("tcp://*:8889");
cout << "bind success" << endl;
while (true) {
message_t message;
pull_socket.recv(message, recv_flags::none);
cout << "broken recv :" << message.to_string() << endl;
// 睡眠一段时间模拟处理时间
this_thread::sleep_for(chrono::seconds(1)); // 休眠1秒
cout << "broken send :" << message.to_string() << endl;
push_socket.send(message, send_flags::none);
}
return 0;
}```
## 消费者(Consumer)
```cpp
// client.cpp
#include <zmq.hpp>
#include <iostream>
#include <string>
using namespace std;
using namespace zmq;
int main() {
context_t context(1);
// 创建 PULL 套接字,用于接收代理发送的消息
socket_t pull_socket(context, ZMQ_PULL);
pull_socket.connect("tcp://localhost:8889");
cout << "connect success" << endl;
for (; ; ) {
message_t message;
pull_socket.recv(message, recv_flags::none);
cout << "Received: " << message.to_string() << endl;
}
return 0;
}
编译
g++ -o server server.cpp `pkg-config --cflags --libs libzmq`
g++ -o client client.cpp `pkg-config --cflags --libs libzmq`
g++ -o queue queue.cpp `pkg-config --cflags --libs libzmq`
队列程序逻辑
队列作为管道需要接受来只上游生产者的消息,又需要发送消息给下游的消费者。
绑定端点
context_t context(1);
// 创建 PULL 套接字,用于接收生产者的消息
socket_t pull_socket(context, ZMQ_PULL);
pull_socket.bind("tcp://*:8888");
// 创建 PUSH 套接字,用于发送消息给消费者
socket_t push_socket(context, ZMQ_PUSH);
push_socket.bind("tcp://*:8889");
收发消息
message_t message;
pull_socket.recv(message, recv_flags::none);
cout << "broken recv :" << message.to_string() << endl;
// 睡眠一段时间模拟处理时间
this_thread::sleep_for(chrono::seconds(1)); // 休眠1秒
cout << "broken send :" << message.to_string() << endl;
push_socket.send(message, send_flags::none);
生产者程序逻辑
连接管道
context_t context(1);
// 创建 PUSH 套接字,用于发送消息到代理
socket_t push_socket(context, ZMQ_PUSH);
push_socket.connect("tcp://localhost:8888");
发送消息
string message = "Message " + to_string(++i);
push_socket.send(buffer(message), send_flags::none);
消费者程序逻辑
连接管道
context_t context(1);
// 创建 PULL 套接字,用于接收代理发送的消息
socket_t pull_socket(context, ZMQ_PULL);
pull_socket.connect("tcp://localhost:8889");
接受消息
message_t message;
pull_socket.recv(message, recv_flags::none);
总结
在管道两边,可以有n个生产者,m个消费者,生产者的消息会以先进先出的顺序经过管道到达消费者,管道发给m个消费者的消息是负载均衡的。