文章目录
- 客⼾端模块实现
- 订阅者模块
- 信道管理模块
- 异步⼯作线程实现
- 连接管理模块
- 生产者客户端
- 消费者客户端
客⼾端模块实现
在RabbitMQ中,提供服务的是信道,因此在客⼾端的实现中,弱化了Client客⼾端的概念,也就是说在RabbitMQ中并不会向⽤⼾展⽰⽹络通信的概念出来,⽽是以⼀种提供服务的形式来体现。其实现思想类似于普通的功能接⼝封装,⼀个接⼝实现⼀个功能,接⼝内部完成向客⼾端请求的过程,但是对外并不需要体现出客⼾端与服务端通信的概念,⽤⼾需要什么服务就调⽤什么接⼝就⾏。
基于以上的思想,客⼾端的实现共分为四⼤模块:
基于以上模块,实现⼀个客⼾端的流程也就⽐较简单了
- 实例化异步线程对象
- 实例化连接对象
- 通过连接对象,创建信道
- 根据信道获取⾃⼰所需服务
- 关闭信道
- 关闭连接
订阅者模块
与服务端,并⽆太⼤差别,客⼾端这边虽然订阅者的存在感微弱了很多,但是还是有的,当进⾏队列消息订阅的时候,会伴随着⼀个订阅者对象的创建,⽽这个订阅者对象有以下⼏个作⽤:
- 描述当前信道订阅了哪个队列的消息。
- 描述了收到消息后该如何对这条消息进⾏处理
- 描述收到消息后是否需要进⾏确认回复
- 订阅者信息:
- 订阅者标识
- 订阅队列名
- 是否⾃动确认标志
- 回调处理函数(收到消息后该如何处理的回调函数对象)
using ConsumerCallback = std::function<void(const std::string, const BasicProperties *bp, const std::string)>;
struct Consumer {
using ptr = std::shared_ptr<Consumer>;
std::string tag; //消费者标识
std::string qname; //消费者订阅的队列名称
bool auto_ack; //自动确认标志
ConsumerCallback callback;
Consumer(){
DLOG("new Consumer: %p", this);
}
Consumer(const std::string &ctag, const std::string &queue_name, bool ack_flag, const ConsumerCallback &cb):
tag(ctag), qname(queue_name), auto_ack(ack_flag), callback(std::move(cb)) {
DLOG("new Consumer: %p", this);
}
~Consumer() {
DLOG("del Consumer: %p", this);
}
};
信道管理模块
同样的,客⼾端也有信道,其功能与服务端⼏乎⼀致,或者说不管是客⼾端的channel还是服务端的channel都是为了⽤⼾提供具体服务⽽存在的,只不过服务端是为客⼾端的对应请求提供服务,⽽客⼾端的接⼝服务是为了⽤⼾具体需要服务,也可以理解是⽤⼾通过客⼾端channel的接⼝调⽤来向服务端发送对应请求,获取请求的服务。
- 信道信息:
- a. 信道ID
- b. 信道关联的⽹络通信连接对象
- c. protobuf协议处理对象
- d. 信道关联的消费者
- e. 请求对应的响应信息队列(这⾥队列使⽤<请求ID,响应>hash表,以便于查找指定的响应)
- f. 互斥锁&条件变量(⼤部分的请求都是阻塞操作,发送请求后需要等到响应才能继续,但是muduo库的通信是异步的,因此需要我们⾃⼰在收到响应后,通过判断是否是等待的指定响应来进⾏同步)
- 信道操作:
- a. 提供创建信道操作
- b. 提供删除信道操作
- c. 提供声明交换机操作(强断⾔-有则OK,没有则创建)
- d. 提供删除交换机
- e. 提供创建队列操作(强断⾔-有则OK,没有则创建)
- f. 提供删除队列操作
- g. 提供交换机-队列绑定操作
- h. 提供交换机-队列解除绑定操作
- i . 提供添加订阅操作
- j. 提供取消订阅操作
- k. 提供发布消息操作
- l. 提供确认消息操作
typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;
using basicConsumeResponsePtr = std::shared_ptr<basicConsumeResponse>;
using basicCommonResponsePtr = std::shared_ptr<basicCommonResponse>;
class Channel {
public:
using ptr = std::shared_ptr<Channel>;
Channel(const muduo::net::TcpConnectionPtr& conn, const ProtobufCodecPtr& codec):
_cid(UUIDHelper::uuid()), _conn(conn), _codec(codec) {}
~Channel() { basicCancel(); }
std::string cid() { return _cid; }
bool openChannel() {
std::string rid = UUIDHelper::uuid();
openChannelRequest req;
req.set_rid(rid);
req.set_cid(_cid);
_codec->send(_conn, req);
basicCommonResponsePtr resp = waitResponse(rid);
return resp->ok();
}
void closeChannel() {
std::string rid = UUIDHelper::uuid();
closeChannelRequest req;
req.set_rid(rid);
req.set_cid(_cid);
_codec->send(_conn, req);
waitResponse(rid);
return ;
}
bool declareExchange(
const std::string &name,
ExchangeType type,
bool durable,
bool auto_delete,
google::protobuf::Map<std::string, std::string> &args) {
//构造一个声明虚拟机的请求对象,
std::string rid = UUIDHelper::uuid();
declareExchangeRequest req;
req.set_rid(rid);
req.set_cid(_cid);
req.set_exchange_name(name);
req.set_exchange_type(type);
req.set_durable(durable);
req.set_auto_delete(auto_delete);
req.mutable_args()->swap(args);
//然后向服务器发送请求
_codec->send(_conn, req);
//等待服务器的响应
basicCommonResponsePtr resp = waitResponse(rid);
//返回
return resp->ok();
}
void deleteExchange(const std::string &name) {
std::string rid = UUIDHelper::uuid();
deleteExchangeRequest req;
req.set_rid(rid);
req.set_cid(_cid);
req.set_exchange_name(name);
_codec->send(_conn, req);
waitResponse(rid);
return ;
}
bool declareQueue(
const std::string &qname,
bool qdurable,
bool qexclusive,
bool qauto_delete,
google::protobuf::Map<std::string, std::string> &qargs) {
std::string rid = UUIDHelper::uuid();
declareQueueRequest req;
req.set_rid(rid);
req.set_cid(_cid);
req.set_queue_name(qname);
req.set_durable(qdurable);
req.set_auto_delete(qauto_delete);
req.set_exclusive(qexclusive);
req.mutable_args()->swap(qargs);
_codec->send(_conn, req);
basicCommonResponsePtr resp = waitResponse(rid);
return resp->ok();
}
void deleteQueue(const std::string &qname) {
std::string rid = UUIDHelper::uuid();
deleteQueueRequest req;
req.set_rid(rid);
req.set_cid(_cid);
req.set_queue_name(qname);
_codec->send(_conn, req);
waitResponse(rid);
return ;
}
bool queueBind(
const std::string &ename,
const std::string &qname,
const std::string &key) {
std::string rid = UUIDHelper::uuid();
queueBindRequest req;
req.set_rid(rid);
req.set_cid(_cid);
req.set_exchange_name(ename);
req.set_queue_name(qname);
req.set_binding_key(key);
_codec->send(_conn, req);
basicCommonResponsePtr resp = waitResponse(rid);
return resp->ok();
}
void queueUnBind(const std::string &ename, const std::string &qname) {
std::string rid = UUIDHelper::uuid();
queueUnBindRequest req;
req.set_rid(rid);
req.set_cid(_cid);
req.set_exchange_name(ename);
req.set_queue_name(qname);
_codec->send(_conn, req);
waitResponse(rid);
return ;
}
void basicPublish(
const std::string &ename,
const BasicProperties *bp,
const std::string &body) {
std::string rid = UUIDHelper::uuid();
basicPublishRequest req;
req.set_rid(rid);
req.set_cid(_cid);
req.set_body(body);
req.set_exchange_name(ename);
if (bp != nullptr) {
req.mutable_properties()->set_id(bp->id());
req.mutable_properties()->set_delivery_mode(bp->delivery_mode());
req.mutable_properties()->set_routing_key(bp->routing_key());
}
_codec->send(_conn, req);
waitResponse(rid);
return ;
}
void basicAck(const std::string &msgid) {
if (_consumer.get() == nullptr) {
DLOG("消息确认时,找不到消费者信息!");
return ;
}
std::string rid = UUIDHelper::uuid();
basicAckRequest req;
req.set_rid(rid);
req.set_cid(_cid);
req.set_queue_name(_consumer->qname);
req.set_message_id(msgid);
_codec->send(_conn, req);
waitResponse(rid);
return;
}
void basicCancel() {
if (_consumer.get() == nullptr) {
return ;
}
std::string rid = UUIDHelper::uuid();
basicCancelRequest req;
req.set_rid(rid);
req.set_cid(_cid);
req.set_queue_name(_consumer->qname);
req.set_consumer_tag(_consumer->tag);
_codec->send(_conn, req);
waitResponse(rid);
_consumer.reset();
return;
}
bool basicConsume(
const std::string &consumer_tag,
const std::string &queue_name,
bool auto_ack,
const ConsumerCallback &cb) {
if (_consumer.get() != nullptr) {
DLOG("当前信道已订阅其他队列消息!");
return false;
}
std::string rid = UUIDHelper::uuid();
basicConsumeRequest req;
req.set_rid(rid);
req.set_cid(_cid);
req.set_queue_name(queue_name);
req.set_consumer_tag(consumer_tag);
req.set_auto_ack(auto_ack);
_codec->send(_conn, req);
basicCommonResponsePtr resp = waitResponse(rid);
if (resp->ok() == false) {
DLOG("添加订阅失败!");
return false;
}
_consumer = std::make_shared<Consumer>(consumer_tag, queue_name, auto_ack, cb);
return true;
}
public:
//连接收到基础响应后,向hash_map中添加响应
void putBasicResponse(const basicCommonResponsePtr& resp) {
std::unique_lock<std::mutex> lock(_mutex);
_basic_resp.insert(std::make_pair(resp->rid(), resp));
_cv.notify_all();
}
//连接收到消息推送后,需要通过信道找到对应的消费者对象,通过回调函数进行消息处理
void consume(const basicConsumeResponsePtr& resp) {
if (_consumer.get() == nullptr) {
DLOG("消息处理时,未找到订阅者信息!");
return;
}
if (_consumer->tag != resp->consumer_tag()) {
DLOG("收到的推送消息中的消费者标识,与当前信道消费者标识不一致!");
return ;
}
_consumer->callback(resp->consumer_tag(), resp->mutable_properties(), resp->body());
}
private:
basicCommonResponsePtr waitResponse(const std::string &rid) {
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait(lock, [&rid, this](){
return _basic_resp.find(rid) != _basic_resp.end();
});
//while(condition()) _cv.wait();
basicCommonResponsePtr basic_resp = _basic_resp[rid];
_basic_resp.erase(rid);
return basic_resp;
}
private:
std::string _cid;
muduo::net::TcpConnectionPtr _conn;
ProtobufCodecPtr _codec;
Consumer::ptr _consumer;
std::mutex _mutex;
std::condition_variable _cv;
std::unordered_map<std::string, basicCommonResponsePtr> _basic_resp;
};
- 信道管理:
- a. 创建信道
- b. 查询信道
- c. 删除信道
class ChannelManager {
public:
using ptr = std::shared_ptr<ChannelManager>;
ChannelManager(){}
Channel::ptr create(const muduo::net::TcpConnectionPtr &conn,
const ProtobufCodecPtr &codec) {
std::unique_lock<std::mutex> lock(_mutex);
auto channel = std::make_shared<Channel>(conn, codec);
_channels.insert(std::make_pair(channel->cid(), channel));
return channel;
}
void remove(const std::string &cid) {
std::unique_lock<std::mutex> lock(_mutex);
_channels.erase(cid);
}
Channel::ptr get(const std::string &cid) {
std::unique_lock<std::mutex> lock(_mutex);
auto it = _channels.find(cid);
if (it == _channels.end()) {
return Channel::ptr();
}
return it->second;
}
private:
std::mutex _mutex;
std::unordered_map<std::string, Channel::ptr> _channels;
};
异步⼯作线程实现
客⼾端这边存在两个异步⼯作线程,
- ⼀个是muduo库中客⼾端连接的异步循环线程EventLoopThread
- ⼀个是当收到消息后进⾏异步处理的⼯作线程池。
这两项都不是以连接为单元进⾏创建的,⽽是创建后,可以⽤以多个连接中,因此单独进⾏封装。
class AsyncWorker {
public:
using ptr = std::shared_ptr<AsyncWorker>;
muduo::net::EventLoopThread loopthread;
threadpool pool;
};
连接管理模块
在客⼾端这边,RabbitMQ弱化了客⼾端的概念,因为⽤⼾所需的服务都是通过信道来提供的,因此操作思想转换为先创建连接,通过连接创建信道,通过信道提供服务这⼀流程。
这个模块同样是针对muduo库客⼾端连接的⼆次封装,向⽤⼾提供创建channel信道的接⼝,创建信道后,可以通过信道来获取指定服务。
class Connection
{
public:
using ptr = std::shared_ptr<Connection>;
Connection(const std::string &sip, int sport, const AsyncWorker::ptr &worker)
: _latch(1), _client(worker->loopthread.startLoop(), muduo::net::InetAddress(sip, sport), "Client"),
_dispatcher(std::bind(&Connection::onUnknownMessage, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3)),
_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),
_worker(worker),
_channel_manager(std::make_shared<ChannelManager>())
{
_dispatcher.registerMessageCallback<basicCommonResponse>(std::bind(&Connection::basicResponse, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
_dispatcher.registerMessageCallback<basicConsumeResponse>(std::bind(&Connection::consumeResponse, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(),
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
_client.setConnectionCallback(std::bind(&Connection::onConnection, this, std::placeholders::_1));
_client.connect();
_latch.wait(); // 阻塞等待,直到连接建立成功
}
Channel::ptr openChannel()
{
Channel::ptr channel = _channel_manager->create(_conn, _codec);
bool ret = channel->openChannel();
if (ret == false)
{
DLOG("打开信道失败!");
return Channel::ptr();
}
return channel;
}
void closeChannel(const Channel::ptr &channel)
{
channel->closeChannel();
_channel_manager->remove(channel->cid());
}
private:
void basicResponse(const muduo::net::TcpConnectionPtr &conn, const basicCommonResponsePtr &message, muduo::Timestamp)
{
// 1. 找到信道
Channel::ptr channel = _channel_manager->get(message->cid());
if (channel.get() == nullptr)
{
DLOG("未找到信道信息!");
return;
}
// 2. 将得到的响应对象,添加到信道的基础响应hash_map中
channel->putBasicResponse(message);
}
void consumeResponse(const muduo::net::TcpConnectionPtr &conn, const basicConsumeResponsePtr &message, muduo::Timestamp)
{
// 1. 找到信道
Channel::ptr channel = _channel_manager->get(message->cid());
if (channel.get() == nullptr)
{
DLOG("未找到信道信息!");
return;
}
// 2. 封装异步任务(消息处理任务),抛入线程池
_worker->pool.push([channel, message]()
{ channel->consume(message); });
}
void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp)
{
LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
conn->shutdown();
}
void onConnection(const muduo::net::TcpConnectionPtr &conn)
{
if (conn->connected())
{
_latch.countDown(); // 唤醒主线程中的阻塞
_conn = conn;
}
else
{
// 连接关闭时的操作
_conn.reset();
}
}
private:
muduo::CountDownLatch _latch; // 实现同步的
muduo::net::TcpConnectionPtr _conn; // 客户端对应的连接
muduo::net::TcpClient _client; // 客户端
ProtobufDispatcher _dispatcher; // 请求分发器
ProtobufCodecPtr _codec; // 协议处理器
AsyncWorker::ptr _worker;
ChannelManager::ptr _channel_manager;
};
生产者客户端
publish_client.cc
- 实例化异步工作线程对象
- 实例化连接对象
- 通过连接创建信道
- 通过信道提供的服务完成所需
- 循环向交换机发布消息
- 关闭信道
#include "connection.hpp"
int main()
{
//1. 实例化异步工作线程对象
nzq::AsyncWorker::ptr awp = std::make_shared<nzq::AsyncWorker>();
//2. 实例化连接对象
nzq::Connection::ptr conn = std::make_shared<nzq::Connection>("127.0.0.1",8085,awp);
//3. 通过连接创建信道
nzq::Channel::ptr channel = conn->openChannel();
//4. 通过信道提供的服务完成所需
// 1. 声明一个交换机exchange1, 交换机类型为广播模式
google::protobuf::Map<std::string,std::string> tmp_map;
channel->declareExchange("exchange1", nzq::ExchangeType::TOPIC, true, false, tmp_map);
// 2. 声明一个队列queue1
channel->declareQueue("queue1", true, false, false, tmp_map);
// 3. 声明一个队列queue2
channel->declareQueue("queue2", true, false, false, tmp_map);
// 4. 绑定queue1-exchange1,且binding_key设置为queue1
channel->queueBind("exchange1", "queue1", "queue1");
// 5. 绑定queue2-exchange1,且binding_key设置为news.music.#
channel->queueBind("exchange1", "queue2", "news.music.#");
//5. 循环向交换机发布消息
for (int i = 0; i < 10; i++) {
nzq::BasicProperties bp;
bp.set_id(nzq::UUIDHelper::uuid());
bp.set_delivery_mode(nzq::DeliveryMode::DURABLE);
bp.set_routing_key("news.music.pop");
channel->basicPublish("exchange1", &bp, "Hello World-" + std::to_string(i));
}
nzq::BasicProperties bp;
bp.set_id(nzq::UUIDHelper::uuid());
bp.set_delivery_mode(nzq::DeliveryMode::DURABLE);
bp.set_routing_key("news.music.sport");
channel->basicPublish("exchange1", &bp, "Hello linux");
bp.set_routing_key("news.sport");
channel->basicPublish("exchange1", &bp, "Hello chileme?");
//6. 关闭信道
conn->closeChannel(channel);
return 0;
return 0;
}
消费者客户端
同生产者客户端,如何消费需要自己设置,下面只是其中一种
void cb(nzq::Channel::ptr &channel, const std::string consumer_tag,
const nzq::BasicProperties *bp, const std::string &body)
{
std::cout << consumer_tag << "消费了消息:" << body << std::endl;
channel->basicAck(bp->id());
}
int main(int argc, char *argv[])
{
if (argc != 2) {
std::cout << "usage: ./consume_client queue1\n";
return -1;
}
//1. 实例化异步工作线程对象
nzq::AsyncWorker::ptr awp = std::make_shared<nzq::AsyncWorker>();
//2. 实例化连接对象
nzq::Connection::ptr conn = std::make_shared<nzq::Connection>("127.0.0.1", 8085, awp);
//3. 通过连接创建信道
nzq::Channel::ptr channel = conn->openChannel();
//4. 通过信道提供的服务完成所需
// 1. 声明一个交换机exchange1, 交换机类型为广播模式
google::protobuf::Map<std::string, std::string> tmp_map;
channel->declareExchange("exchange1", nzq::ExchangeType::TOPIC, true, false, tmp_map);
// 2. 声明一个队列queue1
channel->declareQueue("queue1", true, false, false, tmp_map);
// 3. 声明一个队列queue2
channel->declareQueue("queue2", true, false, false, tmp_map);
// 4. 绑定queue1-exchange1,且binding_key设置为queue1
channel->queueBind("exchange1", "queue1", "queue1");
// 5. 绑定queue2-exchange1,且binding_key设置为news.music.#
channel->queueBind("exchange1", "queue2", "news.music.#");
auto functor = std::bind(cb, channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
channel->basicConsume("consumer1", argv[1], false, functor);
while(1) std::this_thread::sleep_for(std::chrono::seconds(3));
conn->closeChannel(channel);
return 0;
}