案例一
生产者Producer
#include <zmq.hpp>
#include <iostream>
#include <string>
#include<chrono>
#include<thread>
using namespace std;
using namespace zmq;
int main() {
    context_t context(1);
    // 创建 PUSH 套接字,用于发送消息到代理
    socket_t push_socket(context, ZMQ_PUSH);
    push_socket.connect("tcp://localhost:8888");
    cout << "connect success" << endl;
    int i = 0;
    for (; ;) {
        string message = "Message " + to_string(++i);
        push_socket.send(buffer(message), send_flags::none);
      
        cout << "Sending: " << message << endl;
        this_thread::sleep_for(chrono::seconds(1)); // 休眠1秒
    }
    return 0;
}
队列queue()
#include <zmq.hpp>
#include <iostream>
#include <thread>
#include <chrono>
// 休眠指定的时间长度
using namespace std;
using namespace zmq;
int main() {
    context_t context(2);
    // 创建 PULL 套接字,用于接收生产者的消息
    socket_t pull_socket(context, ZMQ_PULL);
    pull_socket.bind("tcp://*:8888");
    // 创建 PUSH 套接字,用于发送消息给消费者
    socket_t push_socket(context, ZMQ_PUSH);
    push_socket.bind("tcp://*:8889");
    cout << "bind success" << endl;
    while (true) {
        message_t message;
        pull_socket.recv(message, recv_flags::none);
        cout << "broken recv :" << message.to_string() << endl;
        // 睡眠一段时间模拟处理时间
        
        this_thread::sleep_for(chrono::seconds(1)); // 休眠1秒
        cout << "broken send :" << message.to_string() << endl;
        push_socket.send(message, send_flags::none);
        
    }
    return 0;
}```
## 消费者(Consumer)
```cpp
// client.cpp
#include <zmq.hpp>
#include <iostream>
#include <string>
using namespace std;
using namespace zmq;
int main() {
    context_t context(1);
    // 创建 PULL 套接字,用于接收代理发送的消息
    socket_t pull_socket(context, ZMQ_PULL);
    pull_socket.connect("tcp://localhost:8889");
    cout << "connect success" << endl;
    for (; ; ) {
        message_t message;
        pull_socket.recv(message, recv_flags::none);
        cout << "Received: " << message.to_string() << endl;
    }
    return 0;
}
编译
g++ -o server server.cpp `pkg-config --cflags --libs libzmq`
g++ -o client client.cpp `pkg-config --cflags --libs libzmq`
g++ -o queue queue.cpp `pkg-config --cflags --libs libzmq`

队列程序逻辑
队列作为管道需要接受来只上游生产者的消息,又需要发送消息给下游的消费者。
绑定端点
context_t context(1);
// 创建 PULL 套接字,用于接收生产者的消息
socket_t pull_socket(context, ZMQ_PULL);
pull_socket.bind("tcp://*:8888");
// 创建 PUSH 套接字,用于发送消息给消费者
socket_t push_socket(context, ZMQ_PUSH);
push_socket.bind("tcp://*:8889");
收发消息
        message_t message;
        pull_socket.recv(message, recv_flags::none);
        cout << "broken recv :" << message.to_string() << endl;
        // 睡眠一段时间模拟处理时间
        this_thread::sleep_for(chrono::seconds(1)); // 休眠1秒
        cout << "broken send :" << message.to_string() << endl;
        push_socket.send(message, send_flags::none);
生产者程序逻辑
连接管道
    context_t context(1);
    // 创建 PUSH 套接字,用于发送消息到代理
    socket_t push_socket(context, ZMQ_PUSH);
    push_socket.connect("tcp://localhost:8888");
发送消息
		string message = "Message " + to_string(++i);
        push_socket.send(buffer(message), send_flags::none);
消费者程序逻辑
连接管道
    context_t context(1);
    // 创建 PULL 套接字,用于接收代理发送的消息
    socket_t pull_socket(context, ZMQ_PULL);
    pull_socket.connect("tcp://localhost:8889");
接受消息
        message_t message;
        pull_socket.recv(message, recv_flags::none);
总结
在管道两边,可以有n个生产者,m个消费者,生产者的消息会以先进先出的顺序经过管道到达消费者,管道发给m个消费者的消息是负载均衡的。











![CPP中lamada表达式作用一览[more cpp-6]](https://i-blog.csdnimg.cn/direct/46cb19f128b84d6aa190ed60846a963b.png)







