一.Client模块介绍
二.Client具体实现
1.消费者/订阅者模块
2.信道管理模块
3.异步线程模块
4.连接管理模块
该模块同样是对 muduo 库客户端连接的二次封装:向用户提供创建信道(Channel)的接口;信道创建成功后,用户即可通过该信道使用所需的各项服务。
三.全部代码
consumer.hpp
#pragma once
#include "../common_mq/helper.hpp"
#include "../common_mq/logger.hpp"
#include "../common_mq/msg.pb.h"
#include <unordered_map>
#include <mutex>
#include <memory>
#include <cassert>
#include <cstring>
#include <vector>
#include <functional>
namespace mq
{
// Callback invoked for each delivered message.
// Arguments, in order: consumer tag, message attributes, message body.
using ConsumerCallBack = std::function<void(const std::string &, const msg::BasicAttributes *, const std::string &)>;

// Client-side record of a subscription: which queue is consumed, under which
// tag, and which callback receives the messages.
struct Consumer
{
    using ptr = std::shared_ptr<Consumer>;

    std::string _tag;       // consumer identifier
    std::string _qname;     // name of the subscribed queue
    bool _auto_ack = false; // auto-acknowledge flag; initialized so a default-constructed
                            // Consumer never exposes an indeterminate value
    ConsumerCallBack _cb;   // callback run for every delivered message

    // Builds a fully described consumer.
    //   ctag     - consumer identifier
    //   qname    - queue to subscribe to
    //   auto_ack - auto-acknowledge flag
    //   cb       - message callback
    Consumer(const std::string &ctag, const std::string &qname, bool auto_ack, const ConsumerCallBack &cb)
        : _tag(ctag), _qname(qname), _auto_ack(auto_ack), _cb(cb)
    {
        // NOTE(review): assumes DLOG is printf-style; %p requires a void* argument.
        DLOG("consumer:%s created %p", ctag.c_str(), static_cast<void *>(this));
    }

    // Builds an empty consumer (empty tag/queue, auto-ack off, no callback).
    Consumer()
    {
        DLOG("consumer created %p", static_cast<void *>(this));
    }

    ~Consumer()
    {
        DLOG("consumer destroyed %p", static_cast<void *>(this));
    }
};
};
channel.hpp
#pragma once
#include "../common_mq/helper.hpp"
#include "../common_mq/logger.hpp"
#include "../common_mq/msg.pb.h"
#include "../common_mq/myproto.pb.h"
#include <string>
#include <unordered_map>
#include <mutex>
#include <memory>
#include <cassert>
#include <cstring>
#include "muduo/net/TcpConnection.h"
#include <condition_variable>
#include "consumer.hpp"
#include "proto/codec.h"
#include "../common_mq/myproto.pb.h"
namespace mq
{
using BasicConsumeRspPtr = std::shared_ptr<msg::BasicConsumeRsp>;
using BasicCommonResponsePtr = std::shared_ptr<msg::BasicCommonResponse>;
using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;
class Channel
{
private:
std::string _chid; // 信道ID
Consumer::ptr _consumer; // 消费者对象
muduo::net::TcpConnectionPtr _conn; // 连接对象
ProtobufCodecPtr _codec; // 编解码器
std::mutex _mutex; // 互斥锁
std::condition_variable _cv; // 条件变量
std::unordered_map<std::string, BasicCommonResponsePtr> _consume_rsp_map; // 存放常规响应的map
public:
using ptr = std::shared_ptr<Channel>;
Channel(const muduo::net::TcpConnectionPtr &conn,
const ProtobufCodecPtr &codec)
: _conn(conn), _codec(codec)
{
_chid = UUIDHelper::uuid();
DLOG("channel:%s created", _chid.c_str());
}
Channel()
{
}
~Channel()
{
basicCancel();
DLOG("channel:%s destroyed", _chid.c_str());
}
std::string chid() const
{
return _chid;
}
// 发送请求,在服务器端处理请求,创建相应的数据结构对象
// 1. 声明/删除/查找交换机
bool declareExchange(const std::string &name, msg::ExchangeType type,
bool durable,
bool auto_del,
google::protobuf::Map<std::string, std::string> &args)
{
// 构建请求
msg::DeclareExchangeReq req;
req.set_rid(UUIDHelper::uuid());
req.set_chid(_chid);
req.set_e_name(name);
req.set_e_type(type);
req.set_durable(durable);
req.set_auto_delete(auto_del);
req.mutable_args()->swap(args);
// 发送请求
_codec->send(_conn, req);
// 异步操作,需要等待响应
BasicCommonResponsePtr rsp = waitResponse(req.rid());
if (rsp->success() != true)
{
ELOG("declare exchange failed, name:%s", name.c_str());
return false;
}
return true;
}
bool removeExchange(const std::string &name)
{
// 构建请求
msg::RemoveExchangeReq req;
req.set_rid(UUIDHelper::uuid());
req.set_chid(_chid);
req.set_e_name(name);
// 发送请求
_codec->send(_conn, req);
// 异步操作,需要等待响应
BasicCommonResponsePtr rsp = waitResponse(req.rid());
if (rsp->success() != true)
{
ELOG("remove exchange failed, name:%s", name.c_str());
return false;
}
return true;
}
// 2. 声明/删除队列
bool declareQueue(const std::string &name, bool durable,
bool exclusive, bool auto_del,
google::protobuf::Map<std::string, std::string> &args)
{
// 构建请求
msg::DeclareQueueReq req;
req.set_rid(UUIDHelper::uuid());
req.set_chid(_chid);
req.set_q_name(name);
req.set_durable(durable);
req.set_exclusive(exclusive);
req.set_auto_delete(auto_del);
req.mutable_args()->swap(args);
// 发送请求
_codec->send(_conn, req);
// 异步操作,需要等待响应
BasicCommonResponsePtr rsp = waitResponse(req.rid());
if (rsp->success() != true)
{
ELOG("declare queue failed, name:%s", name.c_str());
return false;
}
return true;
}
bool removeQueue(const std::string &name)
{
// 构建请求
msg::RemoveQueueReq req;
req.set_rid(UUIDHelper::uuid());
req.set_chid(_chid);
req.set_q_name(name);
// 发送请求
_codec->send(_conn, req);
// 异步操作,需要等待响应
BasicCommonResponsePtr rsp = waitResponse(req.rid());
if (rsp->success() != true)
{
ELOG("remove queue failed, name:%s", name.c_str());
return false;
}
return true;
}
// 3. 绑定/解绑
bool bind(const std::string &ename, const std::string &qname, const std::string &key)
{
// 构建请求
msg::BindReq req;
req.set_rid(UUIDHelper::uuid());
req.set_chid(_chid);
req.set_e_name(ename);
req.set_q_name(qname);
req.set_bind_key(key);
// 发送请求
_codec->send(_conn, req);
// 异步操作,需要等待响应
BasicCommonResponsePtr rsp = waitResponse(req.rid());
if (rsp->success() != true)
{
ELOG("bind failed, ename:%s, qname:%s, key:%s", ename.c_str(), qname.c_str(), key.c_str());
return false;
}
return true;
}
bool unbind(const std::string &ename, const std::string &qname)
{
// 构建请求
msg::UnbindReq req;
req.set_rid(UUIDHelper::uuid());
req.set_chid(_chid);
req.set_e_name(ename);
req.set_q_name(qname);
// 发送请求
_codec->send(_conn, req);
// 异步操作,需要等待响应
BasicCommonResponsePtr rsp = waitResponse(req.rid());
if (rsp->success() != true)
{
ELOG("unbind failed, ename:%s, qname:%s", ename.c_str(), qname.c_str());
return false;
}
return true;
}
// 4. 订阅/取消订阅
bool basicSubscribe(const std::string &qname, const std::string &consumer_tag, bool auto_ack, const ConsumerCallBack &cb)
{
if (_consumer.get() != nullptr)
{
ELOG("channel has consumer, qname:%s, consumer_tag:%s", qname.c_str(), consumer_tag.c_str());
return false;
}
// 构建请求
msg::BasicSubscribeReq req;
req.set_rid(UUIDHelper::uuid());
req.set_chid(_chid);
req.set_q_name(qname);
req.set_consumer_tag(consumer_tag);
req.set_auto_ack(auto_ack);
// 发送请求
_codec->send(_conn, req);
// 异步操作,需要等待响应
BasicCommonResponsePtr rsp = waitResponse(req.rid());
if (rsp->success() != true)
{
ELOG("basic subscribe failed, qname:%s, consumer_tag:%s", qname.c_str(), consumer_tag.c_str());
return false;
}
// 注册消费者
_consumer = std::make_shared<Consumer>(consumer_tag, qname, auto_ack, cb);
return true;
}
void basicCancel()
{
if (_consumer.get() == nullptr)
{
// DLOG("不是消费者信道,不需要取消订阅");
return;
}
// 构建请求
msg::BasicCancelReq req;
req.set_rid(UUIDHelper::uuid());
req.set_chid(_chid);
req.set_consumer_tag(_consumer->_tag);
// 发送请求
_codec->send(_conn, req);
// 异步操作,需要等待响应
BasicCommonResponsePtr rsp = waitResponse(req.rid());
if (rsp->success() != true)
{
ELOG("basic cancel failed, consumer:%s", _consumer->_tag.c_str());
return;
}
// 注销消费者
_consumer.reset();
return;
}
// 5. 消息的发布,确认
bool basicPublish(const std::string &ename, msg::BasicAttributes *bp, const std::string &body)
{
// 构建请求
msg::BasicPublishReq req;
req.set_rid(UUIDHelper::uuid());
req.set_chid(_chid);
req.set_e_name(ename);
req.set_body(body);
if (bp != nullptr)
{
req.mutable_attr()->CopyFrom(*bp);
}
// 发送请求
_codec->send(_conn, req);
// 异步操作,需要等待响应
BasicCommonResponsePtr rsp = waitResponse(req.rid());
if (rsp->success() != true)
{
ELOG("basic publish failed, qname:%s", ename.c_str());
return false;
}
return true;
}
void basicAck(const std::string &id)
{
if (_consumer.get() == nullptr)
{
ELOG("channel has no consumer, qname:%s, id:%s", _consumer->_qname.c_str(), id.c_str());
return;
}
// 构建请求
msg::BasicAckReq req;
req.set_rid(UUIDHelper::uuid());
req.set_chid(_chid);
req.set_q_name(_consumer->_qname);
req.set_messagg_id(id);
// 发送请求
_codec->send(_conn, req);
// 异步操作,需要等待响应
BasicCommonResponsePtr rsp = waitResponse(req.rid());
if (rsp->success() != true)
{
ELOG("basic ack failed, qname:%s, id:%s", _consumer->_qname.c_str(), id.c_str());
return;
}
return;
}
// 6.打开/关闭信道
bool openChannel()
{
msg::OpenChannelReq req;
req.set_rid(UUIDHelper::uuid());
req.set_chid(_chid);
if(_codec.get() == nullptr)
{
ELOG("codec is null");
return false;
}
_codec->send(_conn, req);
BasicCommonResponsePtr rsp = waitResponse(req.rid());
return rsp->success() == true;
}
void closeChannel()
{
msg::CloseChannelReq req;
req.set_rid(UUIDHelper::uuid());
req.set_chid(_chid);
_codec->send(_conn, req);
BasicCommonResponsePtr rsp = waitResponse(req.rid());
return;
}
private:
// 等待服务端响应
BasicCommonResponsePtr waitResponse(const std::string &req_id)
{
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait(lock, [this, &req_id]()
{ return _consume_rsp_map.find(req_id) != _consume_rsp_map.end(); });
BasicCommonResponsePtr rsp = _consume_rsp_map[req_id];
_consume_rsp_map.erase(req_id);
return rsp;
}
public:
// 给连接对象提供的函数,收到来自服务器不同类型的消息,进行不同的处理
// 收到通用响应,向map中添加响应
void putBasicCommonResponse(const BasicCommonResponsePtr &rsp)
{
std::unique_lock<std::mutex> lock(_mutex);
_consume_rsp_map[rsp->rid()] = rsp;
_cv.notify_all();
}
// 收到来自服务器的消费响应,调用消费者的回调函数
void consume(const BasicConsumeRspPtr &rsp)
{
if (_consumer.get() == nullptr)
{
ELOG("channel has no consumer");
return;
}
if (_consumer->_tag != rsp->consumer_tag())
{
ELOG("consumer tag not match, consumer_tag:%s", _consumer->_tag.c_str());
return;
}
_consumer->_cb(rsp->consumer_tag(), rsp->mutable_attr(), rsp->body());
}
};
class ChannelManager
{
private:
std::unordered_map<std::string, mq::Channel::ptr> _channels; // 信道管理器
std::mutex _mutex; // 互斥锁
public:
using ptr = std::shared_ptr<ChannelManager>;
ChannelManager() {}
Channel::ptr create(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec)
{
std::unique_lock<std::mutex> lock(_mutex);
auto channel = std::make_shared<mq::Channel>(conn, codec);
_channels.insert(std::make_pair(channel->chid(), channel));
return channel;
}
void remove(const std::string &chid)
{
std::unique_lock<std::mutex> lock(_mutex);
_channels.erase(chid);
//DLOG("erase channel:%s", chid.c_str());
}
Channel::ptr get(const std::string &chid)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _channels.find(chid);
if (it == _channels.end())
{
return Channel::ptr();
}
return it->second;
}
};
};
async_worker.hpp
#pragma once
#include <memory>
#include "../common_mq/thread_pool.hpp"
#include "muduo/net/EventLoopThread.h"
#include "../common_mq/helper.hpp"
#include "../common_mq/logger.hpp"
namespace mq
{
// Bundles the two asynchronous facilities a client connection relies on:
// an event-loop thread and a thread pool. Both members are public and are
// accessed directly by their users.
class AsyncWorker
{
public:
    using ptr = std::shared_ptr<AsyncWorker>;

    // Heap-allocated event-loop thread, created together with the worker.
    std::unique_ptr<muduo::net::EventLoopThread> _loopThread = std::make_unique<muduo::net::EventLoopThread>();

    // Pool that executes tasks pushed by the users of this worker.
    ThreadPool _pool;

    // Nothing to do beyond the member initializers above.
    AsyncWorker() = default;
};
}
connection.hpp
#pragma once
#include "proto/codec.h"
#include "proto/dispatcher.h"
#include "../include/muduo/base/Logging.h"
#include "../include/muduo/base/Mutex.h"
#include "../include/muduo/net/TcpClient.h"
#include "../include/muduo/net/EventLoopThread.h"
#include "../include/muduo/net/TcpConnection.h"
#include "../include/muduo/base/CountDownLatch.h"
// #include "include/muduo/base/Mutex.h"
// #include "include/muduo/net/TcpClient.h"
// #include "include/muduo/net/EventLoopThread.h"
// #include "include/muduo/net/TcpConnection.h"
// #include "include/muduo/base/CountDownLatch.h"
#include <iostream>
#include <functional>
#include <string>
#include <stdio.h>
#include <unistd.h>
#include "async_worker.hpp"
#include "channel.hpp"
namespace mq
{
using MessagePtr = std::shared_ptr<google::protobuf::Message>;

// Client connection to the message-queue server. Owns the muduo TcpClient,
// decodes incoming protobuf messages and dispatches them to the channel they
// belong to, and lets the user open/close channels.
class Connection
{
private:
    AsyncWorker::ptr _worker;            // event-loop thread + thread pool
    muduo::CountDownLatch _latch;        // released once the connection is established
    muduo::net::TcpClient _client;       // TCP client
    muduo::net::TcpConnectionPtr _conn;  // current TCP connection (empty when disconnected)
    ProtobufDispatcher _dispatcher;      // routes decoded messages by type
    ProtobufCodecPtr _codec;             // protobuf encoder/decoder
    ChannelManager::ptr _channelManager; // channels opened on this connection

public:
    using ptr = std::shared_ptr<Connection>;

    // Connects to ip:port using the event loop of `worker` and blocks until
    // the connection callback reports success. The latch is only released on
    // a successful connection, so this constructor does not return otherwise.
    Connection(const std::string &ip, uint16_t port, const AsyncWorker::ptr &worker)
        // Initializers are listed in declaration order, which is the order in
        // which the members are actually constructed.
        : _worker(worker),
          _latch(1),
          _client(_worker->_loopThread->startLoop(), muduo::net::InetAddress(ip, port), "Connection"),
          _dispatcher(std::bind(&Connection::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
          _codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),
          _channelManager(std::make_shared<ChannelManager>())
    {
        _client.setConnectionCallback(std::bind(&Connection::onConnection, this, std::placeholders::_1));
        _client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
        _dispatcher.registerMessageCallback<msg::BasicConsumeRsp>(std::bind(&Connection::onBasicConsumeRspCb, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
        _dispatcher.registerMessageCallback<msg::BasicCommonResponse>(std::bind(&Connection::onBasicCommonRspCb, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
        _client.connect();
        _latch.wait();
    }

    // Creates a channel locally and on the server.
    // Returns an empty pointer when there is no connection or the server
    // refuses to open the channel.
    Channel::ptr openChannel()
    {
        // NOTE(review): _conn is assigned/reset in onConnection() without
        // synchronization - confirm which thread runs that callback.
        if (_conn.get() == nullptr)
        {
            ELOG("open channel failed: not connected");
            return Channel::ptr();
        }
        // Client-side channel first, so the server's reply can be routed to it.
        auto newchannel = _channelManager->create(_conn, _codec);
        // Then ask the server to create its side.
        bool ret = newchannel->openChannel();
        if (!ret)
        {
            ELOG("open channel failed");
            // Do not leave the unusable channel registered in the manager.
            _channelManager->remove(newchannel->chid());
            return Channel::ptr();
        }
        return newchannel;
    }

    // Closes `channel` on the server and unregisters it locally.
    // An empty pointer is ignored.
    void closeChannel(const Channel::ptr &channel)
    {
        if (channel.get() == nullptr)
        {
            return;
        }
        channel->closeChannel(); // close the server-side channel
        std::string chid = channel->chid();
        _channelManager->remove(chid); // drop the client-side channel
    }

private:
    // Connection state callback: stores the connection and releases the latch
    // on connect, clears the stored connection on disconnect.
    void onConnection(const muduo::net::TcpConnectionPtr &conn)
    {
        if (conn->connected())
        {
            LOG_INFO << "Connected to " << conn->peerAddress().toIpPort();
            _conn = conn;
            _latch.countDown(); // let the constructor return
        }
        else
        {
            LOG_ERROR << "Disconnected from " << conn->peerAddress().toIpPort();
            _conn.reset();
        }
    }

    // Fallback for message types without a registered handler: log the type
    // and shut the connection down.
    void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn,
                          const MessagePtr &message,
                          muduo::Timestamp time)
    {
        LOG_ERROR << "Unknown message: " << message->GetTypeName();
        conn->shutdown();
    }

private:
    // Handles a pushed message: looks up its channel and hands the delivery
    // to the thread pool so the consumer callback does not run in this callback.
    void onBasicConsumeRspCb(const muduo::net::TcpConnectionPtr &conn, const BasicConsumeRspPtr &rsp, muduo::Timestamp time)
    {
        auto channel = _channelManager->get(rsp->chid());
        if (channel.get() == nullptr)
        {
            LOG_ERROR << "channel not found: " << rsp->chid();
            return;
        }
        // Both shared pointers are captured by value to keep them alive until
        // the task has run.
        _worker->_pool.push([channel, rsp]
                            { channel->consume(rsp); });
    }

    // Handles a generic response: stores it in its channel, which wakes the
    // request that is waiting for it.
    void onBasicCommonRspCb(const muduo::net::TcpConnectionPtr &conn, const BasicCommonResponsePtr &rsp, muduo::Timestamp time)
    {
        auto channel = _channelManager->get(rsp->chid());
        if (channel.get() == nullptr)
        {
            LOG_ERROR << "channel not found: " << rsp->chid();
            return;
        }
        channel->putBasicCommonResponse(rsp);
    }
};
};