文章目录
- 一、项目介绍
- 1. 基本原理
- 2. 涉及到的技术栈
- 3. 最终实现的效果
- 二、 第三方库的介绍与使用
- 1. JsonCpp库
- Json的数据格式
- JsonCpp介绍
- 封装Json工具类
- 2. muduo库
- muduo库是什么
- Muduo库常见接口介绍
- 3. C++11异步操作
- std::future
- 三、框架设计
- 1. 服务端模块划分
- Network
- Protocol
- Dispatcher
- RpcRouter
- Publish-Subscribe
- Server
- 2.客户端模块设计
- Network
- Protocol
- Dispatcher
- Requestor
- RpcCaller
- Publish-Subscribe
- Registry-Discovery
- Client
- 3. 抽象层
- 4. 具象层
- 5. 业务层
- 6. 整体框架设计
- 四、 项目实现
- 1. 简单日志宏实现
- 2. Json序列化/反序列化
- 3. UUID生成
- 4. 项目消息类型字段信息定义
- 请求字段宏定义
- 消息类型定义
- 响应码类型定义
- RPC请求类型定义
- 主题操作类型定义
- 服务操作类型定义
- 5. 通信抽象实现
- 6. 消息抽象实现
- 7. 通信-Muduo封装实现
- 8. 消息-不同消息封装实现
- 9. Dispatcher实现
- 10. 服务端-RpcRouter实现
- 11. 服务端-Registry&Discovery实现
- 12. 服务端-整合封装Server
- 13. 客户端-Requestor实现
- 14. 客户端-RpcCaller实现
- 15. 客户端-Publish&Subscribe实现
- 16. 客户端-Registry&Discovery实现
- 17. 客户端-整合封装Client
一、项目介绍
项目gitee地址:https://gitee.com/qi-haozhe/rpc
点击跳转:跳转
1. 基本原理
RPC(Remote Procedure Call)远程过程调用,是⼀种通过网络从远程计算机上请求服务,而不需要了解底层网络通信细节。RPC可以使用多种网络协议进行通信, 如HTTP、TCP、UDP等, 并且在TCP/IP网络四层模型中跨越了传输层和应用层。简言之RPC就是像调用本地方法一样调用远程方法。过程可以理解为业务处理、计算任务,更直白的说,就是程序/方法/函数等,就是像调用本地方法一样调用远程方法。
具体原理不多做解释,简单来说就是客户端A需要运行函数func,得到结果,但是这个函数不在客户端这里,而是在另一台主机B上。所以A就把运行函数func的参数进行序列化,通过网络传输到B主机上,B把参数进行反序列化后,把参数传入func函数中,调用func函数,最总得到结果answer,然后把这个answer再通过序列化,通过网络发回主机A,主机A把收到的answer反序列化就得到了最终的结果。
因为A并没有在本地执行这个函数,而是请求其它主机进行处理,并把处理结果返回给A,这个就叫远程调用。
2. 涉及到的技术栈
本项目涉及到的技术栈并不多,主要包括C++面向对象知识、部分STL容器的使用、future,promise的使用、Json的序列化反序列化和muduo库的简要使用。
3. 最终实现的效果
⼀个完整RPC通信框架,大概包含以下内容:
- 序列化协议
- 通信协议
- 连接复用
- 服务注册
- 服务发现
- 服务订阅和通知
- 负载均衡
- 服务监控
- 同步调用
- 异步调用
该项目的核心是一个分布式RPC框架,它实现了同步调用、异步callback调用、异步futrue调用、服务注册/发现,服务上线/下线以及发布订阅等功能设计。通过引入注册中心,实现动态服务发现和负载均衡,服务发布方可以向注册中心注册服务,服务调用方可以向注册中心发现可提供服务的主机地址。底层通过对muduo库的进一步封装实现了请求和响应的高效异步通信,通过实现dispatcher模块,根据不同的类型,调用不同的业务处理函数进行消息处理。利用Json进行序列化/反序列化,通信协议使用了LV格式的通信协议解决粘包问题保证数据完整性。
二、 第三方库的介绍与使用
1. JsonCpp库
Json的数据格式
Json是一种数据交换格式,它采用完全独立于编程语言的文本格式来存储和表示数据。例如:我们想表示一个同学的学生信息
代码表示:
char *name = "xx";
int age = 18;
float score[3] = {88.5, 99, 58};
Json表示:
{
"姓名" : "xx",
"年龄" : 18,
"成绩" : [88.5, 99, 58],
"爱好" :{
"书籍" : "西游记",
"运动" : "打篮球"
}
}
Json的数据类型包括对象,数组,字符串,数字等。
- 对象:使用花括号括起来的表示一个对象
- 数组:使用中括号[括起来的表示一个数组
- 字符串:使用常规双引号"”括起来的表示一个字符串
- 数字:包括整形和浮点型,直接使用
JsonCpp介绍
Jsoncpp库主要是用于实现Json格式数据的序列化和反序列化,它实现了将多个数据对象组织成为json格式字符串,以及将Json格式字符串解析得到多个数据对象的功能。
先看一下Json数据对象类的表示:
class Json::Value{
Value &operator=(const Value &other); //Value重载了[]和=,因此所有的赋值和获取数据都可以通过
Value& operator[](const std::string& key);//简单的⽅式完成 val["name"] = "xx";
Value& operator[](const char* key);
Value removeMember(const char* key);//移除元素
const Value& operator[](ArrayIndex index) const; //val["score"][0]
Value& append(const Value& value);//添加数组元素val["score"].append(88);
ArrayIndex size() const;//获取数组元素个数 val["score"].size();
std::string asString() const;//转string string name = val["name"].asString();
const char* asCString() const;//转char* char *name = val["name"].asCString();
Int asInt() const;//转int int age = val["age"].asInt();
float asFloat() const;//转float float weight = val["weight"].asFloat();
bool asBool() const;//转 bool bool ok = val["ok"].asBool();
};
Jsoncpp 库主要借助三个类以及其对应的少量成员函数完成序列化及反序列化
- 序列化接口
class JSON_API StreamWriter {
virtual int write(Value const& root, std::ostream* sout) = 0;
}
class JSON_API StreamWriterBuilder : public StreamWriter::Factory {
virtual StreamWriter* newStreamWriter() const;
}
- 反序列化接口
class JSON_API CharReader {
virtual bool parse(char const* beginDoc, char const* endDoc,
Value* root, std::string* errs) = 0;
}
class JSON_API CharReaderBuilder : public CharReader::Factory {
virtual CharReader* newCharReader() const;
}
封装Json工具类
class JSON
{
public:
//序列化,提供JSON::Value对象,返回string字符串
static bool serialize(const Json::Value &val, std::string &body)
{
std::stringstream ss;
Json::StreamWriterBuilder swb;
std::unique_ptr<Json::StreamWriter> sw(swb.newStreamWriter());
bool ret = sw->write(val, &ss);
if (ret != 0){
ELOG("json serialize failed!");
return false;
}
body = ss.str();
return true;
}
//反序列化,提供string字符串,返回Json::Value对象
static bool unserialize(const std::string &body, Json::Value &val)
{
Json::CharReaderBuilder crb;
std::string errs;
std::unique_ptr<Json::CharReader> cr(crb.newCharReader());
bool ret = cr->parse(body.c_str(), body.c_str() + body.size(), &val, &errs);
if (ret == false){
ELOG("json unserialize failed : %s", errs.c_str());
return false;
}
return true;
}
};
2. muduo库
muduo库是什么
Muduo由陈硕大佬开发,是一个基于非阻塞IO和事件驱动的C++高并发TCP网络编程库。它是一款基于主从Reactor模型的网络库,其使用的线程模型是one loop per thread,所谓one loop per thread指的是:
- 一个线程只能有一个事件循环(EventLoop),用于响应计时器和IO事件
- 一个文件描述符只能由一个线程进行读写,换句话说就是一个TCP连接必须归属于某个EventLoop
管理
Muduo库常见接口介绍
TcpServer类基础介绍
typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
typedef std::function<void (const TcpConnectionPtr&)> ConnectionCallback;
typedef std::function<void (const TcpConnectionPtr&,
Buffer*,
Timestamp)> MessageCallback;
class InetAddress : public muduo::copyable
{
public:
InetAddress(StringArg ip, uint16_t port, bool ipv6 = false);
};
class TcpServer : noncopyable
{
public:
enum Option
{
kNoReusePort,
kReusePort,
};
TcpServer(EventLoop* loop, const InetAddress& listenAddr, const string& nameArg, Option option = kNoReusePort);
void setThreadNum(int numThreads);
void start();
/// 当⼀个新连接建⽴成功的时候被调⽤
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }
/// 消息的业务处理回调函数---这是收到新连接消息的时候被调⽤的函数
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }
};
EventLoop类基础介绍
class EventLoop : noncopyable
{
public:
/// Loops forever.
/// Must be called in the same thread as creation of the object.
void loop();
/// Quits loop.
/// This is not 100% thread safe, if you call through a raw pointer,
/// better to call through shared_ptr<EventLoop> for 100% safety.
void quit();
TimerId runAt(Timestamp time, TimerCallback cb);
/// Runs callback after @c delay seconds.
/// Safe to call from other threads.
TimerId runAfter(double delay, TimerCallback cb);
/// Runs callback every @c interval seconds.
/// Safe to call from other threads.
TimerId runEvery(double interval, TimerCallback cb);
/// Cancels the timer.
/// Safe to call from other threads.
void cancel(TimerId timerId);
private:
std::atomic<bool> quit_;
std::unique_ptr<Poller> poller_;
mutable MutexLock mutex_;
std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);
};
TcpConnection类基础介绍
class TcpConnection : noncopyable,public std::enable_shared_from_this<TcpConnection>
{
public:
/// Constructs a TcpConnection with a connected sockfd
///
/// User should not create this object.
TcpConnection(EventLoop* loop,
const string& name,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr);
bool connected() const { return state_ == kConnected; }
bool disconnected() const { return state_ == kDisconnected; }
void send(string&& message); // C++11
void send(const void* message, int len);
void send(const StringPiece& message);
// void send(Buffer&& message); // C++11
void send(Buffer* message); // this one will swap data
void shutdown(); // NOT thread safe, no simultaneous calling
void setContext(const boost::any& context)
{ context_ = context; }
const boost::any& getContext() const
{ return context_; }
boost::any* getMutableContext()
{ return &context_; }
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }
private:
enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
EventLoop* loop_;
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
WriteCompleteCallback writeCompleteCallback_;
boost::any context_;
};
TcpClient类基础介绍
class TcpClient : noncopyable
{
public:
// TcpClient(EventLoop* loop);
// TcpClient(EventLoop* loop, const string& host, uint16_t port);
TcpClient(EventLoop* loop,
const InetAddress& serverAddr,
const string& nameArg);
~TcpClient(); // force out-line dtor, for std::unique_ptr members.
void connect();//连接服务器
void disconnect();//关闭连接
void stop();
//获取客⼾端对应的通信连接Connection对象的接⼝,发起connect后,有可能还没有连接建⽴成功
TcpConnectionPtr connection() const
{
MutexLockGuard lock(mutex_);
return connection_;
}
/// 连接服务器成功时的回调函数
void setConnectionCallback(ConnectionCallback cb)
{ connectionCallback_ = std::move(cb); }
/// 收到服务器发送的消息时的回调函数
void setMessageCallback(MessageCallback cb)
{ messageCallback_ = std::move(cb); }
private:
EventLoop* loop_;
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
WriteCompleteCallback writeCompleteCallback_;
TcpConnectionPtr connection_ GUARDED_BY(mutex_);
};
/*
需要注意的是,因为muduo库不管是服务端还是客⼾端都是异步操作,
对于客⼾端来说如果我们在连接还没有完全建⽴成功的时候发送数据,这是不被允许的。
因此我们可以使⽤内置的CountDownLatch类进⾏同步控制
*/
class CountDownLatch : noncopyable
{
public:
explicit CountDownLatch(int count);
void wait(){
MutexLockGuard lock(mutex_);
while (count_ > 0)
{
condition_.wait();
}
}
void countDown(){
MutexLockGuard lock(mutex_);
--count_;
if (count_ == 0)
{
condition_.notifyAll();
}
}
int getCount() const;
private:
mutable MutexLock mutex_;
Condition condition_ GUARDED_BY(mutex_);
int count_ GUARDED_BY(mutex_);
};
Buffer类基础介绍
class Buffer : public muduo::copyable
{
public:
static const size_t kCheapPrepend = 8;
static const size_t kInitialSize = 1024;
explicit Buffer(size_t initialSize = kInitialSize)
: buffer_(kCheapPrepend + initialSize),
readerIndex_(kCheapPrepend),
writerIndex_(kCheapPrepend);
void swap(Buffer& rhs)
size_t readableBytes() const
size_t writableBytes() const
const char* peek() const
const char* findEOL() const
const char* findEOL(const char* start) const
void retrieve(size_t len)
void retrieveInt64()
void retrieveInt32()
void retrieveInt16()
void retrieveInt8()
string retrieveAllAsString()
string retrieveAsString(size_t len)
void append(const StringPiece& str)
void append(const char* /*restrict*/ data, size_t len)
void append(const void* /*restrict*/ data, size_t len)
char* beginWrite()
const char* beginWrite() const
void hasWritten(size_t len)
void appendInt64(int64_t x)
void appendInt32(int32_t x)
void appendInt16(int16_t x)
void appendInt8(int8_t x)
int64_t readInt64()
int32_t readInt32()
int16_t readInt16()
int8_t readInt8()
int64_t peekInt64() const
int32_t peekInt32() const
int16_t peekInt16() const
int8_t peekInt8() const
void prependInt64(int64_t x)
void prependInt32(int32_t x)
void prependInt16(int16_t x)
void prependInt8(int8_t x)
void prepend(const void* /*restrict*/ data, size_t len)
private:
std::vector<char> buffer_;
size_t readerIndex_;
size_t writerIndex_;
static const char kCRLF[];
};
3. C++11异步操作
std::future
介绍
std:future是C++11标准库中的一个模板类,它表示一个异步操作的结果。当我们在多线程编程中使用异步任务时,std::future可以帮助我们在需要的时候获取任务的执行结果。std:future的一个重要特性是能够阻塞当前线程,直到异步操作完成,从而确保我们在获取结果时不会遇到未完成的操作。
应用场景
-
异步任务:当我们需要在后台执行一些耗时操作时,如网络请求或计算密集型任务等,
std:future
可以用来表示这些异步任务的结果。通过将任务与主线程分离,我们可以实现任务的并行处理,从而提高程序的执行效率 -
并发控制:在多线程编程中,我们可能需要等待某些任务完成后才能继续执行其他操作。通过使用
std:future
,我们可以实现线程之间的同步,确保任务完成后再获取结果并继续执行后续操作 -
结果获取:std:future提供了一种安全的方式来获取异步任务的结果。我们可以使用
std:future::get()
函数来获取任务的结果,此函数会阻塞当前线程,直到异步操作完成。这样,在调用get()
函数时,我们可以确保已经获取到了所需的结果。
用法示例
- 使用std::async关联异步任务
std:async是一种将任务与std:future关联的简单方法。它创建并运行一个异步任务,并返回一个与该任务结果关联的std:future对象。默认情况下,std::async是否启动一个新线程,或者在等待future时,任务是否同步运行都取决于你给的参数。这个参数为std:launch类型:
std:launch:deferred
表明该函数会被延迟调用,直到在future上调用get()或者wait()才会开始执行任务std:launch:async
表明函数会在自己创建的线程上运行std:launch::deferred|std::launch:async
内部通过系统等条件自动选择策略
#include <iostream>
#include <future>
#include <chrono>
int aysnc_task() {
std::this_thread::sleep_for(std::chrono::seconds(3));
return 2;
}
int main() {
// 关联异步任务aysnc_task 和 futrue
std::future<int> result_future = std::async(std::launch::async,
aysnc_task);
// 此处可执⾏其他操作, ⽆需等待
std::cout << "hello bit!" << std::endl;
// 获取异步任务结果
int result = result_future.get();
std::cout << "Result: " << result << std::endl;
return 0;
}
- 使用std::packaged_task和std::future配合
std::packaged_task
就是将任务和 std::feature
绑定在一起的模板,是一种对任务的封装。我们可以通过std::packaged_task
对象获取任务相关联的std::feature
对象,通过调用get_future()
方法获得。std::packaged_task
的模板参数是函数签名。可以把std::future
和std::async
看成是分开的, 而std::packaged_task
则是一个整体。
#include <iostream>
#include <future>
#include <chrono>
int add(int num1, int num2)
{
return num1 + num2;
}
int main() {
// 封装任务
std::packaged_task<int(int, int)> task(add);
// 此处可执⾏其他操作, ⽆需等待
std::cout << "hello bit!" << std::endl;
std::future<int> result_future = task.get_future();
// 这⾥必须要让任务执⾏, 否则在get()获取future的值时会⼀直阻塞
task(1, 2);
// 获取异步任务结果
int result = result_future.get();
std::cout << "Result: " << result << std::endl;
return 0;
}
异步执行std::packaged_task任务
#include <iostream>
#include <future>
#include <chrono>
#include <memory>
int add(int num1, int num2)
{
std::this_thread::sleep_for(std::chrono::seconds(3));
return num1 + num2;
}
int main() {
// 封装任务
// std::packaged_task<int(int, int)> task(add);
// 此处可执⾏其他操作, ⽆需等待
// std::cout << "hello bit!" << std::endl;
// std::future<int> result_future = task.get_future();
//需要注意的是,task虽然重载了()运算符,但task并不是⼀个函数,
//std::async(std::launch::async, task, 1, 2); //--错误⽤法
//所以导致它作为线程的⼊⼝函数时,语法上看没有问题,但是实际编译的时候会报错
//std::thread(task, 1, 2); //---错误⽤法
//⽽packaged_task禁⽌了拷⻉构造,
//且因为每个packaged_task所封装的函数签名都有可能不同,因此也⽆法当作参数⼀样传递
//传引⽤不可取,毕竟任务在多线程下执⾏存在局部变量声明周期的问题,因此不能传引⽤
//因此想要将⼀个packaged_task进⾏异步调⽤,
//简单⽅法就只能是new packaged_task,封装函数传地址进⾏解引⽤调⽤了
//⽽类型不同的问题,在使⽤的时候可以使⽤类型推导来解决
auto task = std::make_shared<std::packaged_task<int(int, int)>>(add);
std::future<int> result_future = task->get_future();
std::thread thr([task]() { (*task)(1, 2); });
thr.detach();
// 获取异步任务结果
int result = result_future.get();
std::cout << "Result: " << result << std::endl;
return 0;
}
- 使用
std::promise
和std::future
配合
std::promise
提供了一种设置值的方式,它可以在设置之后通过相关联的std::future
对象进行读取。换种说法就是之前说过std::future
可以读取一个异步函数的返回值了, 但是要等待就绪, 而std::promise就提供一种方式手动让 std::future
就绪
#include <iostream>
#include <future>
#include <chrono>
void task(std::promise<int> result_promise)
{
int result = 2;
std::cout << "task result:" << result << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(3));
result_promise.set_value(result);
}
int main() {
// 创建promise
std::promise<int> result_promise;
std::future<int> result_future = result_promise.get_future();
// 创建⼀个新线程, 执⾏⻓时间运⾏的任务
std::thread task_thread(task, std::move(result_promise));
// 此处可执⾏其他操作, ⽆需等待
std::cout << "hello bit!" << std::endl;
// 获取异步任务结果
int result = result_future.get();
std::cout << "Result: " << result << std::endl;
task_thread.join();
return 0;
}
三、框架设计
1. 服务端模块划分
服务端的功能需求:
- 基于网络通信接收客户端的请求,提供rpc服务
- 基于网络通信接收客户端的请求,提供服务注册与发现,上线&下线通知
- 基于网络通信接收客户端的请求,提供主题操作(创建/删除/订阅/取消),消息发布
在服务端的模块划分中,基于以上理解的功能,可以划分出这么几个模块
- Network:网络通信模块
- Protocol:应用层通信协议模块
- Dispatcher:消息分发处理模块
- RpcRouter:远端调用路由功能模块
- Publish-Subscribe:发布订阅功能模块
- Registry-Discovery:服务注册/发现/上线/下线功能模块
- Server:基于以上模块整合而出的服务端模块
Network
该模块为网络通信模块,实现底层的网络通信功能,这个模块本质上也是一个比较复杂庞大的模块,因此鉴于项目的庞大,该模块我们将使用陈硕大佬的Muduo库来进行搭建。
Protocol
应用层通信协议模块的存在意义:解析数据,解决通信中有可能存在的粘包问题,能够获取到一条完整的消息。在前边的muduo库基本使用中,我们能够知道想要让一个服务端/客户端对消息处理,就要设置一个onMessage的回调函数,在这个函数中对收到的数据进行应用层协议处理。而Protocol模块就是是网络通信协议模块的设计,也就是在网络通信中,我们必须设计一个应用层的网络通信协议出来,以解决网络通信中可能存在的粘包问题,而解决粘包有三种方式:特殊字符间隔,定长,LV格式。而我们项目中将使用LV格式来定义应用层的通信协议格式。
Length:该字段固定4字节长度,用于表示后续的本条消息数据长度。
MType:该字段为Value中的固定字段,固定4字节长度,用于表示该条消息的类型。
- Rpc调用请求/响应类型消息
- 发布/订阅/取消订阅/消息推送类型消息
- 主题创建/删除类型消息
- 服务注册/发现/上线/下线类型消息
IDLength:为消息中的固定字段,该字段固定4字节长度,用于描述后续ID字段的实际长度。
MID:在每条消息中都会有一个固定字段为ID字段,用于唯一标识消息,ID字段长度不固定。
Body:消息主题正文数据字段,为请求或响应的实际内容字段。
Dispatcher
模块存在的意义:区分消息类型,根据不同的类型,调用不同的业务处理函数进行消息处理。当muduo库底层通信收到数据后,在onMessage回调函数中对数据进行应用层协议解析,得到一条实际消息载荷后,我们就该决定这条消息代表这客户端的什么请求,以及应该如何处理。因此,我们设计出了Dispatcher模块,作为一个分发模块,这个模块内部会保存有一个hash_map<消息类型,回调函数>,以此由使用者来决定哪条消息用哪个业务函数进行处理,当收到消息后,在该模块找到其对应的处理回调函数进行调用即可。
消息类型:
- rpc请求&响应
- 服务注册/发现/上线/下线请求&响应
- 主题创建/删除/订阅/取消订阅请求&响应,消息发布的请求&响应
RpcRouter
RpcRouter模块存在的意义:提供rpc请求的处理回调函数,内部所要实现的功能,分辨出客户端请求的服务进行处理得到结果进行响应。
rpc请求中,最关键的两个点:
- 请求方法名称
- 请求对应要处理的参数信息
在Rpc远端调用中,首先将客户端到服务端的通信链路打通,然后将自己所需要调用的服务名称,以及参数信息传递给服务端,由服务端进行接收处理,并返回结果。不管是客户端要传递给服务端的服务名称以及参数信息,或者服务端返回的结果,都是在上边Protocol中定义的Body字段中,因此Body字段中就存在了另一层的正文序列化/反序列化过程。序列化方式有很多种,鉴于当前我们是json-rpc,因此这个序列化过程我们就初步使用json序列化来进行,所定义格式如下:
//RPC-request
{
"method" : "Add",
"parameters" : {
"num1" : 11,
"num2" : 22
}
}
//RPC-response
{
"rcode" : OK,
"result": 33
}
{
"rcode" : ERROR_INVALID_PARAMETERS
}
需要注意的是,在服务端,当接收到这么一条消息后,Dispatcher模块会找到该Rpc请求类型的回调处理函数进行业务处理,但是在进行业务处理的时候,也是只会将parameters参数字段传入回调函数中进行处理。然而,对服务端来说,应该从传入的Json:Value对象中,有什么样的参数,以及参数信息是否符合自己所提供的服务的要求,都应该有一个检测,是否符合要求,符合要求了再取出指定字段的数据进行处理。因此,对服务端来说,在进行服务注册的时候,必须有一个服务描述,以代码段中的Add请求为例,该服务描述中就应该描述:
- 服务名称:Add,
- 参数名称:num1,是一个整形
- 参数名称:num2,是一个整形,
- 返回值类型:整形
有了这个描述,在回调函数中就可以先对传入的参数进行校验,没问题了则取出指定字段数据进行处理并返回结果基于以上理解,在实现该模块时,该有以下设计:
- 该模块必须具备一个Rpc路由管理,其中包含对于每个服务的参数校验功能
- 该模块必须具备一个方法名称和方法业务回调的映射
- 该模块必须向外提供Rpc请求的业务处理函数。
Publish-Subscribe
Publish-Subscribe模块存在的意义:针对发布订阅请求进行处理,提供一个回调函数设置给Dispatcher模块。
发布订阅所包含的请求操作:
- 主题的创建
- 主题的删除
- 主题的订阅
- 主题的取消订阅
- 主题消息的发布
在当前的项目中,我们也实现一个简单的发布订阅功能,该功能是围绕多个客户端与一个服务端来展开的。
即任意一个客户端在发布或订阅之前先创建一个主题,比如在新闻发布中我们创建一个音乐新闻主题,哪些客户端希望能够收到音乐新闻相关的消息,则就订阅这个主题,服务端会建立起该主题与客户端之间的联系。
当某个客户端向服务端发布消息,且发布消息的目标主题是音乐新闻主题,则服务端会找出订阅了该主题的客户端,将消息推送给这些客户端。
既然涉及到网络通信,那就先将通信消息的正文格式定义出来:
//Topic-request
{
"key" : "music", //主题名称
// 主题操作类型
"optype" :
TOPIC_CRAETE/TOPIC_REMOVE/TOPIC_SUBSCRIBE/TOPIC_CANCEL/TOPIC_PUBLISH,
//TOPIC_PUBLISH请求才会包含有message字段
"message" : "Hello World"
}
//Topic-response
{
"rcode" : OK,
}
{
"rcode" : ERROR_INVALID_PARAMETERS,
}
功能思想并不复杂,因此我们需要把更多的精力放到其实现设计上:
- 该模块必须具备一个主题管理,且主题中需要保存订阅了该主题的客户端连接
- 主题收到一条消息,需要将这条消息推送给订阅了该主题的所有客户端
- 该模块必须具备一个订阅者管理,且每个订阅者描述中都必须保存自己所订阅的主题名称
- 目的是为了当一个订阅客户端断开连接时,能够找到订阅信息的关联关系,进行删除
- 该模块必须向外提供主题创建/销毁,主题订阅/取消订阅,消息发布处理的业务处理函数
6.2.1.6 Registry-Discovery
Registry-Discovery模块存在的意义:就是针对服务注册与发现请求的处理。
- 服务注册/发现类型请求中的详细划分
- 服务注册:服务provider告诉中转中心,自己能提供哪些服务
- 服务发现:服务caller询问中转中心,谁能提供指定服务
- 服务上线:在一个provider上线了指定服务后,通知发现过该服务的客户端有个provider可以提供该服务
- 服务下线:在一个provider断开连接,通知发现过该服务的caller,谁下线了哪个服务
服务注册模块,该模块主要是为了实现分布式架构而存在,让每一个rpc客户端能够从不同的节点主机上获取自己所需的服务,让业务更具扩展性,系统更具健壮性。
而为了能够让rpc-caller知道有哪些rpc-provider能提供自己所需服务,那么就需要有一个注册中心让这些rpc-provider去注册登记自己的服务,让rpc-caller来发现这些服务。
因此,在我们的服务端功能中,还需实现服务的注册/发现,以及服务的上线/下线功能。
//RD--request
{
//SERVICE_REGISTRY-Rpc-provider进⾏服务注册
//SERVICE_DISCOVERY - Rpc-caller进⾏服务发现
//SERVICE_ONLINE/SERVICE_OFFLINE 在provider下线后对caller进⾏服务上下线通知
"optype" :
SERVICE_REGISTRY / SERVICE_DISCOVERY / SERVICE_ONLINE / SERVICE_OFFLINE,
"method" : "Add",
//服务注册/上线/下线有host字段,发现则⽆host字段
"host" : {
"ip" : "127.0.0.1",
"port" : 9090
}
}
//Registry/Online/Offline-response
{
"rcode" : OK,
}
//error-response
{
"rcode" : ERROR_INVALID_PARAMETERS,
}
//Discovery-response
{
"method" : "Add",
"host" : [
{"ip" : "127.0.0.1", "port" : 9090},
{ "ip" : "127.0.0.2", "port" : 8080 }
]
}
该模块的设计如下:
- 必须具备一个服务发现者的管理:
- 方法与发现者:当一个客户端进行服务发现的时候,进行记录谁发现过该服务,当有一个新的提供者上线的时候,可以通知该发现者
- 连接与发现者:当一个发现者断开连接了,删除关联关系,往后就不需要通知了
- 必须具备一个服务提供者的管理:
- 连接与提供者:当一个提供者断开连接的时候,能够通知该提供者提供的服务对应的发现者,该主机的该服务下线了
- 方法与提供者:能够知道谁的哪些方法下线了,然后通知发现过该方法的客户端
- 必须向Dispatcher模块提供一个服务注册/发现的业务处理回调函数这样,当一个rpc-provider登记了服务,则将其管理起来,当rpc-caller进行服务发现时,则将保存的对应服务所对应的主机信息,响应给rpc-caller。而,当中途一个rpc-provider上线登记服务时,则可以给进行了对应服务发现的rpc-caller进行服务上线通知,通知rpc-caller当前多了一个对应服务的rpc-provider。
同时,当一个rpc-provider下线时,则可以找到进行了该服务发现的rpc-caller进行服务的下线通知。
Server
当以上的所有功能模块都完成后,我们就可以将所有功能整合到一起来实现服务端程序了。
- RpcServer:rpc功能模块与网络通信部分结合。
- RegistryServer:服务发现注册功能模块与网络通信部分结合
- TopicServer:发布订阅功能模块与网络通信部分结合。
2.客户端模块设计
在客户端的模块划分中,基于以上理解的功能,可以划分出这么几个模块
- Protocol:应用层通信协议模块
- Network:网络通信模块
- Dispatcher:消息分发处理模块
- Requestor:请求管理模块
- RpcCaller:远端调用功能模块
- Publish-Subscribe:发布订阅功能模块
- Registry-Discovery:服务注册/发现/上线/下线功能模块
- Client:基于以上模块整合而出的客户端模块
Network
网络通信基于muduo库实现网络通信客户端
Protocol
应用层通信协议处理,与服务端保持一致。
Dispatcher
IO数据分发处理,逻辑与服务端一致
Requestor
Requestor模块存在的意义:针对客户端的每一条请求进行管理,以便于对请求对应的响应做出合适的操作。
首先,对于客户端来说,不同的地方在于,更多时候客户端是请求方,是主动发起请求服务的一方,而在多线程的网络通信中,多线程下,针对多个请求进行响应可能会存在时序的问题,这种情况下,则我们无法保证一个线程发送一个请求后,接下来接收到的响应就是针对自己这条请求的响应,这种情况是非常危险的一种情况。
其次,类似于Muduo库这种异步IO网络通信库,通常IO操作都是异步操作,即发送数据就是把数据放入发送缓冲区,但是什么时候会发送由底层的网络库来进行协调,并且也并不会提供recv接口,而是在连接触发可读事件后,1O读取数据完成后调用处理回调进行数据处理,因此也无法直接在发送请求后去等待该条请求的响应。
针对以上问题,我们则创建出当前的请求管理模块来解决,它的思想也非常简单,就是给每一个请求都设定一个请求ID,服务端进行响应的时候标识响应针对的是哪个请求(也就是响应信息中会包含请求ID),因此客户端这边我们不管收到哪条请求的响应,将数据存储入一则hash_map中,以请求ID作为映射,并向外提供获取指定请求ID响应的阻塞接口,这样只要在发送请求的时候知道自己的请求ID,那么就能获取到自己想要的响应,而不会出现异常。
针对这个思想,我们再进一步,可以将每个请求进一步封装描述,添加入异步的future控制,或者设置回调函数的方式,在不仅可以阻塞获取响应,也可以实现异步获取响应以及回调处理响应。
RpcCaller
RpcCaller模块存在的意义:向用户提供进行rpc调用的模块。
Rpc服务调用模块,这个模块相对简单,只需要向外提供几个rpc调用的接口,内部实现向服务端发送请求,等待获取结果即可,稍微麻烦一些的是Rpc调用我们需要提供多种不同方式的调用:
- 同步调用:发起调用后,等收到响应结果后返回
- 异步调用:发起调用后立即返回,在想获取结果的时候进行获取
- 回调调用:发起调用的同时设置结果的处理回调,收到响应后自动对结果进行回调处理
Publish-Subscribe
Publish-Subscribe模块存在意义:向用户提供发布订阅所需的接口,针对推送过来的消息进行处理。
发布订阅稍微能复杂一丢丢,因为在发布订阅中有两种角色,一个客户端可能是消息的发布者,也可能是消息的订阅者。
而且不管是哪个角色都是对主题进行操作,因此其中也包含了主题的相关操作,比如,要发布一条消息需要先创建主题。
且一个订阅者可能会订阅多个主题,每个主题的消息可能都会有不同的处理方式,因此需要有订阅者主题回调的管理。
Registry-Discovery
服务注册和发现模块需要实现的功能会稍微复杂一些,因为分为两个角色来完成其功能
- 注册者:作为Rpc服务的提供者,需要向注册中心注册服务,因此需要实现向服务器注册服务的功能
- 发现者:作为Rpc服务的调用者,需要先进行服务发现,也就是向服务器发送请求获取能够提供指定服务的主机地址,获取地址后需要管理起来留用,且作为发现者,需要关注注册中心发送过来的服务上线/下线消息,以及时对已经下线的服务和主机进行管理。
Client
将以上模块进行整合就可以实现各个功能的客户端了。
- RegistryClient:服务注册功能模块与网络通信客户端结合
- DiscoveryClient:服务发现功能模块与网络通信客户端结合
- RpcClient:DiscoveryClient&RPC功能模块与网络通信客户端结合
- TopicClient:发布订阅功能模块与网络通信客户端结合
3. 抽象层
框架设计
在当前项目的实现中,我们将整个项目的实现划分为三层来进行实现
- 抽象层:将底层的网络通信以及应用层通信协议以及请求响应进行抽象,使项目更具扩展性和灵活性。
- 具象层:针对抽象的功能进行具体的实现。
- 业务层:基于抽象的框架在上层实现项目所需功能。
抽象层
在咱们的项目实现中,网络通信部分采用了第三方库Muduo库,以及通信协议使用了LV格式的通信协议解决粘包问题,数据正文中采用了JsOn格式进行序列化和反序列化,而这几方面我们都可能会存在继续优化的可能,甚至在序列化方面不一定非要采用JsOn,因此在设计项目框架的时候,我们对于底层通信部分相关功能先进行抽象,形成一层抽象层,而上层业务部分根据抽象层来完成功能,这样的好处是在具体的底层功能实现部分,我们可以实现插拔式的模块化替换,以此来提高项目的灵活性和扩展性。
4. 具象层
具象层就是针对抽象的具体实现。
而具体的实现也比较简单,从抽象类派生出具体功能的派生类,然后在内部实现各个接口功能即可。
- 基于Muduo库实现网络通信部分抽象
- 基于LV通信协议实现Protocol部分抽象
不过这一层中比较特殊的是,我们需要针对不同的请求,从BaseMessage中派生出不同的请求和响应类型,以便于在针对指定消息处理时,能够更加轻松的获取或设置请求及响应中的各项数据元素。
5. 业务层
业务层就是基于底层的通信框架,针对项目中具体的业务功能的实现了,比如Rpc请求的处理,发布订阅请求的处理以及服务注册与发现的处理等等。
Rpc:
发布订阅:
服务注册&发现:
6. 整体框架设计
四、 项目实现
1. 简单日志宏实现
意义:快速定位程序运行逻辑出错的位置。项目在运行中可能会出现各种问题,出问题不可怕,关键的是要能找到问题,并解决问题。
解决问题的方式:
- gdb调试:逐步调试过于繁琐,缓慢。主要用于程序崩溃后的定位。
- 系统运行日志分析:在任何程序运行有可能逻辑错误的位置进行输出提示,快速定位逻辑问题的位置。
#define LDBG 0
#define LINF 1
#define LERR 2
#define LDEFAULT LINF
#define LOG(level, format, ...) \
{ \
if (level >= LDEFAULT) \
{ \
time_t t = time(NULL); \
struct tm *lt = localtime(&t); \
char time_tmp[32] = {0}; \
strftime(time_tmp, 31, "%m-%d %T", lt); \
fprintf(stdout, "[%s][%s:%d] " format "\n", time_tmp, __FILE__, __LINE__, ##__VA_ARGS__); \
} \
}
#define DLOG(format, ...) LOG(LDBG, format, ##__VA_ARGS__);
#define ILOG(format, ...) LOG(LINF, format, ##__VA_ARGS__);
#define ELOG(format, ...) LOG(LERR, format, ##__VA_ARGS__);
2. Json序列化/反序列化
class JSON
{
public:
//序列化,提供JSON::Value对象,返回string字符串
static bool serialize(const Json::Value &val, std::string &body)
{
std::stringstream ss;
Json::StreamWriterBuilder swb;
std::unique_ptr<Json::StreamWriter> sw(swb.newStreamWriter());
bool ret = sw->write(val, &ss);
if (ret != 0){
ELOG("json serialize failed!");
return false;
}
body = ss.str();
return true;
}
//反序列化,提供string字符串,返回Json::Value对象
static bool unserialize(const std::string &body, Json::Value &val)
{
Json::CharReaderBuilder crb;
std::string errs;
std::unique_ptr<Json::CharReader> cr(crb.newCharReader());
bool ret = cr->parse(body.c_str(), body.c_str() + body.size(), &val, &errs);
if (ret == false){
ELOG("json unserialize failed : %s", errs.c_str());
return false;
}
return true;
}
};
3. UUID生成
UUID(Universally Unique Identifier), 也叫通用唯一识别码,通常由32位16进制数字字符组成。UUID的标准型式包含32个16进制数字字符,以连字号分为五段,形式为8-4-4-4-12的32个字符,
如:550e8400-e29b-41d4-a716-446655440000。
在这里,uuid生成,我们采用生成8个随机数字,加上8字节序号,共16字节数组成成32位16进制字符的组合形式来确保全局唯一的同时能够根据序号来分辨数(随机数肉眼分辨起来真是太难了…)。
class UUID
{
public:
static std::string uuid()
{
std::stringstream ss;
// 1. 构造一个机器随机数对象
std::random_device rd;
// 2. 以机器随机数为种子构造伪随机数对象
std::mt19937 generator(rd());
// 3. 构造限定数据范围的对象
std::uniform_int_distribution<int> distribution(0, 255);
// 4. 生成8个随机数,按照特定格式组织成为16进制数字字符的字符串
for (int i = 0; i < 8; i++){
if (i == 4 || i == 6)
ss << "-";
ss << std::setw(2) << std::setfill('0') << std::hex << distribution(generator);
}
ss << "-";
// 5. 定义一个8字节序号,逐字节组织成为16进制数字字符的字符串
static std::atomic<size_t> seq(1); // 00 00 00 00 00 00 00 01
size_t cur = seq.fetch_add(1);
for (int i = 7; i >= 0; i--){
if (i == 5)
ss << "-";
ss << std::setw(2) << std::setfill('0') << std::hex << ((cur >> (i * 8)) & 0xFF);
}
return ss.str();
}
};
4. 项目消息类型字段信息定义
请求字段宏定义
- 消息ID:
- 消息类型:
- 消息正文
- Rpc请求
- 方法名称
- 方法参数
- 发布订阅相关请求
- 主题名称
- 操作类型
- 主题消息
- 服务操作相关请求
- 方法名称
- 操作类型
- 主机信息
- IP地址
- PORT端口
- 响应码
- Rpc响应
- 调用结果
#define KEY_METHOD "method"
#define KEY_PARAMS "parameters"
#define KEY_TOPIC_KEY "topic_key"
#define KEY_TOPIC_MSG "topic_msg"
#define KEY_OPTYPE "optype"
#define KEY_HOST "host"
#define KEY_HOST_IP "ip"
#define KEY_HOST_PORT "port"
#define KEY_RCODE "rcode"
#define KEY_RESULT "result"
消息类型定义
- Rpc请求&响应
- 主题操作请求&响应:
- 消息发布请求&响应
- 服务操作请求&响应:
enum class MType {
REQ_RPC = 0,
RSP_RPC,
REQ_TOPIC,
RSP_TOPIC,
REQ_SERVICE,
RSP_SERVICE
};
响应码类型定义
- 成功处理
- 解析失败
- 消息中字段缺失或错误导致无效消息
- 连接断开
- 无效的Rpc调用参数
- Rpc服务不存在
- 无效的Topic操作类型
- 主题不存在
- 无效的服务操作类型
enum class RCode {
RCODE_OK = 0,
RCODE_PARSE_FAILED,
RCODE_ERROR_MSGTYPE,
RCODE_INVALID_MSG,
RCODE_DISCONNECTED,
RCODE_INVALID_PARAMS,
RCODE_NOT_FOUND_SERVICE,
RCODE_INVALID_OPTYPE,
RCODE_NOT_FOUND_TOPIC,
RCODE_INTERNAL_ERROR
};
static std::string errReason(RCode code) {
static std::unordered_map<RCode, std::string> err_map = {
{RCode::RCODE_OK, "成功处理!"},
{RCode::RCODE_PARSE_FAILED, "消息解析失败!"},
{RCode::RCODE_ERROR_MSGTYPE, "消息类型错误!"},
{RCode::RCODE_INVALID_MSG, "⽆效消息"},
{RCode::RCODE_DISCONNECTED, "连接已断开!"},
{RCode::RCODE_INVALID_PARAMS, "⽆效的Rpc参数!"},
{RCode::RCODE_NOT_FOUND_SERVICE, "没有找到对应的服务!"},
{RCode::RCODE_INVALID_OPTYPE, "⽆效的操作类型"},
{RCode::RCODE_NOT_FOUND_TOPIC, "没有找到对应的主题!"},
{RCode::RCODE_INTERNAL_ERROR, "内部错误!"}
};
auto it = err_map.find(code);
if (it == err_map.end()) {
return "未知错误!";
}
return it->second;
}
RPC请求类型定义
- 同步请求:等待收到响应后返回
- 异步请求:返回异步对象,在需要的时候通过异步对象获取响应结果(还未收到结果会阻塞)
- 回调请求:设置回调函数,通过回调函数对响应进行处理
enum class RType {
REQ_ASYNC = 0,
REQ_CALLBACK
};
主题操作类型定义
- 主题创建
- 主题删除
- 主题订阅
- 主题取消订阅
- 主题消息发布
enum class TopicOptype {
TOPIC_CREATE = 0,
TOPIC_REMOVE,
TOPIC_SUBSCRIBE,
TOPIC_CANCEL,
TOPIC_PUBLISH
};
服务操作类型定义
- 服务注册
- 服务发现
- 服务上线
- 服务下线
enum class ServiceOptype
{
SERVICE_REGISTRY = 0,
SERVICE_DISCOVERY,
SERVICE_ONLINE,
SERVICE_OFFLINE,
SERVICE_UNKNOW
};
5. 通信抽象实现
- BaseMessage
- BaseBuffer
- BaseProtocol
- BaseConnection
- BaseServer
- BaseClient
class BaseMessage
{
public:
using ptr = std::shared_ptr<BaseMessage>;
virtual ~BaseMessage() {}
virtual std::string rid(){
return _rid;
}
virtual void setRid(const std::string rid){
_rid = rid;
}
virtual MType mtype(){
return _mtype;
}
virtual void setMType(MType mtype){
_mtype = mtype;
}
virtual bool check() = 0;
virtual std::string serialize() = 0;
virtual bool unserialize(const std::string &msg) = 0;
protected:
MType _mtype;
std::string _rid;
};
//缓冲区,主要就是读取数据,后面用muduo库中的buffer进行进一步封装
class BaseBuffer
{
public:
using ptr = std::shared_ptr<BaseBuffer>;
virtual size_t readableSize() = 0;
virtual int32_t peekInt32() = 0;
virtual void retrieveInt32() = 0;
virtual int32_t readInt32() = 0;
virtual std::string retrieveAsString(size_t len) = 0;
};
//协议抽象
class BaseProtocol
{
public:
using ptr = std::shared_ptr<BaseProtocol>;
virtual bool canProcessed(const BaseBuffer::ptr &buf) = 0;
virtual bool onMessage(const BaseBuffer::ptr &buf, BaseMessage::ptr &msg) = 0;
virtual std::string serialize(const BaseMessage::ptr &msg) = 0;
};
class BaseConnection
{
public:
using ptr = std::shared_ptr<BaseConnection>;
virtual void send(const BaseMessage::ptr &msg) = 0;
virtual void shutdown() = 0;
virtual bool connected() = 0;
};
using ConnectionCallback = std::function<void(const BaseConnection::ptr &)>;
using CloseCallback = std::function<void(const BaseConnection::ptr &)>;
using MessageCallback = std::function<void(const BaseConnection::ptr &, BaseMessage::ptr &)>;
class BaseServer
{
public:
using ptr = std::shared_ptr<BaseServer>;
virtual void start() = 0;
virtual void setConnectionCallback(const ConnectionCallback &cb){
_cb_connection = cb;
}
virtual void setCloseCallback(const CloseCallback &cb){
_cb_close = cb;
}
virtual void setMessageCallback(const MessageCallback &cb){
_cb_message = cb;
}
protected:
ConnectionCallback _cb_connection;
CloseCallback _cb_close;
MessageCallback _cb_message;
};
class BaseClient
{
public:
using ptr = std::shared_ptr<BaseClient>;
virtual void setConnectionCallback(const ConnectionCallback &cb){
_cb_connection = cb;
}
virtual void setCloseCallback(const CloseCallback &cb){
_cb_close = cb;
}
virtual void setMessageCallback(const MessageCallback &cb){
_cb_message = cb;
}
virtual void connect() = 0;
virtual void shutdown() = 0;
virtual bool connected() = 0;
virtual bool send(const BaseMessage::ptr &) = 0;
virtual BaseConnection::ptr connection() = 0;
protected:
ConnectionCallback _cb_connection;
CloseCallback _cb_close;
MessageCallback _cb_message;
};
6. 消息抽象实现
- JsonMessage
- JsonRequest & JsonResponse
- RpcRequest & RpcResponse
- TopicRequest & TopicResponse
- ServiceRequest & ServiceResponse
using Address = std::pair<std::string, int>;
//JsonMessage,和BaseMessage相比多了一个 Json::Value _body字段
class JsonMessage : public BaseMessage
{
public:
using ptr = std::shared_ptr<JsonMessage>;
//序列化,将成员Json::Value _body序列化成string返回
virtual std::string serialize() override
{
std::string body;
bool ret = JSON::serialize(_body, body);
if (ret == false){
return std::string();
}
return body;
}
//反序列化 传入stirng,将字符串中的内容反序列化,保存到成员变量 _body中
virtual bool unserialize(const std::string &msg) override
{
return JSON::unserialize(msg, _body);
}
protected:
Json::Value _body;
};
class JsonRequest : public JsonMessage
{
public:
using ptr = std::shared_ptr<JsonRequest>;
};
//response中还得包含rcode的设置和检验
class JsonResponse : public JsonMessage
{
public:
using ptr = std::shared_ptr<JsonMessage>;
virtual bool check()
{
if (_body[KEY_RCODE].isNull() == true){
ELOG("响应中没有响应状态码!");
return false;
}
if (_body[KEY_RCODE].isIntegral() == false){
ELOG("响应中的响应状态码类型不对!");
return false;
}
return true;
}
virtual RCode rcode()
{
return (RCode)_body[KEY_RCODE].asInt();
}
virtual void setRCode(RCode rcode)
{
_body[KEY_RCODE] = (int)rcode;
}
protected:
};
7. 通信-Muduo封装实现
- MuduoBuffer
- MuduoProtocol
- MuduoConnection
- MuduoServer
- MuduoClient
#pragma once
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/Buffer.h>
#include <muduo/base/CountDownLatch.h>
#include <muduo/net/EventLoopThread.h>
#include <muduo/net/TcpClient.h>
#include "detail.hpp"
#include "fields.hpp"
#include "abstract.hpp"
#include "message.hpp"
#include <mutex>
#include <unordered_map>
namespace qi_rpc
{
//继承BaseBuffer封装的MuduoBuffer,利用muduo::net::buffer* 做了一个封装
class MuduoBuffer : public BaseBuffer
{
public:
using ptr = std::shared_ptr<MuduoBuffer>;
MuduoBuffer(muduo::net::Buffer *buf) : _buf(buf) {}
virtual size_t readableSize() override
{
return _buf->readableBytes();
}
virtual int32_t peekInt32() override
{
return _buf->peekInt32();
}
virtual void retrieveInt32() override
{
return _buf->retrieveInt32();
}
virtual int32_t readInt32() override
{
return _buf->readInt32();
}
virtual std::string retrieveAsString(size_t len) override
{
DLOG("进入了net里的retrieveAsString,len:%d",len);
return _buf->retrieveAsString(len);
}
private:
muduo::net::Buffer *_buf;
};
class BufferFactory
{
public:
template <class... Args>
static BaseBuffer::ptr create(Args &&...args)
{
//需要传个muduo::net::Buffer *buf作为参数,进行初始化
return std::make_shared<MuduoBuffer>(std::forward<Args>(args)...);
}
};
//Length + Value 定长报头+可变字段
class LVProtocol : public BaseProtocol
{
public:
using ptr = std::shared_ptr<LVProtocol>;
//判断是否可以进行处理,包括缓冲区中的数据是否足够处理
virtual bool canProcessed(const BaseBuffer::ptr &buf) override
{
DLOG("实际readablesize:%d,lenFieldsLength:%d",buf->readableSize(),lenFieldsLength);
if (buf->readableSize() < lenFieldsLength){
return false;
}
int32_t total_len = buf->peekInt32();
DLOG("实际total_le:%d,(total_len + lenFieldsLength):%d",buf->readableSize(),total_len + lenFieldsLength);
if (buf->readableSize() < (total_len + lenFieldsLength)){
return false;
}
return true;
}
//序列化 此处的序列化包括两部分 自定义协议LV的序列化,和msg中Json::Value中的序列化
virtual std::string serialize(const BaseMessage::ptr &msg) override
{
// 包括两部分,一部分是LV格式的序列化,一部分是Json格式(body)的序列化
// 其中Json格式的序列化已经在JsonMessage中封装好了,这里直接调用就好了。
std::string body = msg->serialize();
// |--Len--|--mtype--|--idlen--|--id--|--body--|
std::string id = msg->rid();
auto mtype = htonl((int32_t)msg->mtype());
int32_t idlen = htonl(id.size());
int32_t h_total_len = mtypeFieldsLength + idlenFieldsLength + id.size() + body.size();
int32_t n_total_len = htonl(h_total_len);
DLOG("h_total_len:%d", h_total_len);
std::string result;
result.reserve(h_total_len);
result.append((char *)&n_total_len, lenFieldsLength);
result.append((char *)&mtype, mtypeFieldsLength);
result.append((char *)&idlen, idlenFieldsLength);
result.append(id);
result.append(body);
return result;
}
//别看叫onMessage其实就是反序列化
//从缓冲区把数据按照LV格式读取出来,将正文部分通过Message类的反序列化函数,将内容反序列化到类内
virtual bool onMessage(const BaseBuffer::ptr &buf, BaseMessage::ptr &msg) override
{
// 当调用onMessage的时候,默认认为缓冲区中的数据足够一条完整的消息
int32_t total_len = buf->readInt32();
MType mtype = (MType)buf->readInt32();
int32_t idlen = buf->readInt32();
//int32_t body_len = total_len - lenFieldsLength - mtypeFieldsLength - idlenFieldsLength;
int32_t body_len = total_len - idlen - idlenFieldsLength - mtypeFieldsLength;
DLOG("retrieveAsString(id_len)");
std::string id = buf->retrieveAsString(idlen);
DLOG("retrieveAsString(body_len)");
std::string body = buf->retrieveAsString(body_len);
DLOG("retrieveAsString(body_len)完成");
msg = MessageFactory::create(mtype);
if (msg.get() == nullptr){
ELOG("消息类型错误,构造消息对象失败!");
return false;
}
// 反序列化
bool ret = msg->unserialize(body);
if (ret == false){
ELOG("消息正文反序列化失败!");
return false;
}
msg->setRid(id);
msg->setMType(mtype);
DLOG("反序列化成功");
return true;
}
private:
const size_t lenFieldsLength = 4;
const size_t mtypeFieldsLength = 4;
const size_t idlenFieldsLength = 4;
};
class ProtocolFactory
{
public:
template <class... Args>
static BaseProtocol::ptr create(Args &&...args)
{
return std::make_shared<LVProtocol>(std::forward<Args>(args)...);
}
};
//利用muduo::net::TcpConnectionPtr继承BaseMessage进行了一次封装
class MuduoConnection : public BaseConnection
{
public:
using ptr = std::shared_ptr<MuduoConnection>;
MuduoConnection(const muduo::net::TcpConnectionPtr &conn,
const BaseProtocol::ptr &protocol) : _protocol(protocol), _conn(conn) {}
virtual void send(const BaseMessage::ptr &msg) override
{
//包含了两次序列化,LV的和Json的
DLOG("MuduoConnection正在进行序列化");
std::string body = _protocol->serialize(msg);
DLOG("MuduoConnection正在进行send");
_conn->send(body);
DLOG("MuduoConnection进行send成功");
}
//关闭连接
virtual void shutdown() override
{
_conn->shutdown();
}
//判断是否连接
virtual bool connected() override
{
return _conn->connected();
}
private:
BaseProtocol::ptr _protocol;
muduo::net::TcpConnectionPtr _conn;
};
class ConnectionFactory
{
public:
template <class... Args>
static BaseConnection::ptr create(Args &&...args)
{
//两参数初始化,一个_protocol一个_conn;
MuduoConnection::ptr p = std::make_shared<MuduoConnection>(std::forward<Args>(args)...);
if(p.get()==nullptr) DLOG("factory一开始create connection就是空的");
DLOG("factory一开始create connectionb不是空的");
return p;
}
};
class MuduoServer : public BaseServer
{
public:
using ptr = std::shared_ptr<MuduoServer>;
MuduoServer(int port) : _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port),
"MuduoServer", muduo::net::TcpServer::kReusePort),
_protocol(ProtocolFactory::create()) {}
virtual void start()
{
_server.setConnectionCallback(std::bind(&MuduoServer::onConnection, this, std::placeholders::_1));
_server.setMessageCallback(std::bind(&MuduoServer::onMessage, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
shared_from_this--从当前对象自身获取自身的shared_ptr管理对象 return _message_callback(shared_from_this(), &_in_buffer);
_server.start(); // 先开始监听
_baseloop.loop(); // 开始死循环事件监控
}
private:
//设置muduo库内部的连接成功的回调函数
//一旦连接成功就得将TcpConnectionPtr和BaseConnection利用哈希表建立好映射
void onConnection(const muduo::net::TcpConnectionPtr &conn)
{
if (conn->connected())
{
std::cout << "连接建立\n";
auto muduo_conn = ConnectionFactory::create(conn, _protocol);//俩参数 conn和protocol
{
std::unique_lock<std::mutex> lock(_mutex);
_conns.insert(std::make_pair(conn, muduo_conn));
}
if (_cb_connection) //执行自己设置的回调函数
_cb_connection(muduo_conn);
}
else
{
std::cout << "连接断开!\n";
BaseConnection::ptr muduo_conn;
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _conns.find(conn);
if (it == _conns.end()){
return;
}
muduo_conn = it->second;
_conns.erase(conn);
}
if (_cb_close)//执行自己设置的回调函数
_cb_close(muduo_conn);
}
}
//设置muduo库内部的有消息到来的回调函数
void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp)
{
DLOG("连接有数据到来,开始处理!");
auto base_buf = BufferFactory::create(buf);
while (1)
{ //判断数据是否能够处理
DLOG("进入while");
if (_protocol->canProcessed(base_buf) == false){
DLOG("进入第一个if");
if (base_buf->readableSize() > maxDataSize){
conn->shutdown();
ELOG("缓冲区中数据过大!");
return;
}
DLOG("数据量不足!");
break;
}
DLOG("缓冲区中数据可处理!");
BaseMessage::ptr msg;
//把消息从缓冲区中拿出来,经过两次反序列化后得到消息msg
bool ret = _protocol->onMessage(base_buf, msg);
if (ret == false){
conn->shutdown();
ELOG("缓冲区中数据错误!");
return;
}
DLOG("消息反序列化成功!")
BaseConnection::ptr base_conn;//连接已成功就把映射关系加到哈希表里去了
//去哈希表中找到BaseConnection,并作为参数传递给自定义的回调函数,进行消息的处理
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _conns.find(conn);
if (it == _conns.end()){
conn->shutdown();
return;
}
base_conn = it->second;
}
DLOG("调用回调函数进行消息处理!");
if (_cb_message)//调用的是dispatcher,dispatcher再根据消息类型用不同的回调函数处理不同的请求
_cb_message(base_conn, msg);
}
}
private:
size_t maxDataSize = (1 << 16);
muduo::net::EventLoop _baseloop;
muduo::net::TcpServer _server;
BaseProtocol::ptr _protocol;
std::mutex _mutex;
//每一个连接到来,都会产生一个TcpConnectionPtr,并且还得对应一个BaseConnection::ptr
std::unordered_map<muduo::net::TcpConnectionPtr, BaseConnection::ptr> _conns;
};
class ServerFactory
{
public:
template <class... Args>
static BaseServer::ptr create(Args &&...args)
{
return std::make_shared<MuduoServer>(std::forward<Args>(args)...);
}
};
class MuduoClient : public BaseClient
{
public:
using ptr = std::shared_ptr<MuduoClient>;
MuduoClient(const std::string &sip, int sport) : _protocol(ProtocolFactory::create()),
_baseloop(_loopthread.startLoop()),
_downlatch(1),
_client(_baseloop, muduo::net::InetAddress(sip, sport), "MuduoClient") {}
virtual void connect() override
{
DLOG("设置回调函数,连接服务器");
_client.setConnectionCallback(std::bind(&MuduoClient::onConnection, this, std::placeholders::_1));
// 设置连接消息的回调
_client.setMessageCallback(std::bind(&MuduoClient::onMessage, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
// 连接服务器
_client.connect();
//sleep(100);
_downlatch.wait();
if(_conn.get()==nullptr) DLOG("_conn.get()=nullptr");
DLOG("连接服务器成功!");
}
virtual void shutdown() override
{
return _client.disconnect();
}
virtual bool send(const BaseMessage::ptr &msg) override
{
if (connected() == false){
ELOG("连接已断开!");
return false;
}
_conn->send(msg);
return true;
}
virtual BaseConnection::ptr connection() override
{
return _conn;
}
virtual bool connected() override
{
return (_conn && _conn->connected());
}
private:
void onConnection(const muduo::net::TcpConnectionPtr &conn)
{
DLOG("这个函数到底tm执行没啊");
if (conn->connected()){
std::cout << "连接建立!\n";
//_downlatch.countDown(); // 计数--,为0时唤醒阻塞
DLOG("这里tm到底执行过来没?");
_conn = ConnectionFactory::create(conn, _protocol);
_downlatch.countDown(); // 计数--,为0时唤醒阻塞
if(_conn.get()==nullptr) DLOG("_conn.get()=nullptr");
}
else{
std::cout << "连接断开!\n";
_conn.reset();
}
}
void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp)
{
DLOG("连接有数据到来,开始处理!");
auto base_buf = BufferFactory::create(buf);
while (1)
{
if (_protocol->canProcessed(base_buf) == false){
if (base_buf->readableSize() > maxDataSize){
ELOG("缓冲区中数据过大!");
return;
}
break;
}
BaseMessage::ptr msg;
bool ret = _protocol->onMessage(base_buf, msg);
if (ret == false){
conn->shutdown();
ELOG("缓冲区中数据错误!");
return;
}
DLOG("处理用户自定义的on_message函数");
if (_cb_message)
_cb_message(_conn, msg);
}
}
private:
const size_t maxDataSize = (1 << 16);
BaseProtocol::ptr _protocol;
BaseConnection::ptr _conn;
muduo::CountDownLatch _downlatch;
muduo::net::EventLoopThread _loopthread;
muduo::net::EventLoop *_baseloop;
muduo::net::TcpClient _client;
};
class ClientFactory
{
public:
template <class... Args>
static BaseClient::ptr create(Args &&...args)
{
BaseClient::ptr p = std::make_shared<MuduoClient>(std::forward<Args>(args)...);
if(p.get()==nullptr) DLOG("factory一开始create client就是空的");
return p;
}
};
}
8. 消息-不同消息封装实现
- JsonRequest
- RpcRequest
- TopicRequest
- ServiceRequest
- JsonResponse
- RpcResponse
- TopicResponse
- ServiceResponse
class RpcRequest : public JsonRequest
{
public:
using ptr = std::shared_ptr<RpcRequest>;
virtual bool check() override
{
if (_body[KEY_METHOD].isNull() == true || _body[KEY_METHOD].isString() == false){
ELOG("RPC请求中请求方法为空或请求方法类型错误!");
return false;
}
if (_body[KEY_PARAMS].isNull() == true || _body[KEY_PARAMS].isObject() == false){
ELOG("RPC请求中没有参数信息或参数信息类型错误!");
return false;
}
return true;
}
//设置方法名和返回方法名
std::string method()
{
return _body[KEY_METHOD].asString();
}
void setMethod(const std::string &method)
{
_body[KEY_METHOD] = method;
}
//设置参数和返回参数
Json::Value params()
{
return _body[KEY_PARAMS];
}
void setParams(const Json::Value params)
{
_body[KEY_PARAMS] = params;
}
};
// //Topic-request
// {
// "key" : "music", //主题名称
// // 主题操作类型
// "optype" : TOPIC_CRAETE/TOPIC_REMOVE/TOPIC_SUBSCRIBE/TOPIC_CANCEL/TOPIC_PUBLISH,
// //TOPIC_PUBLISH请求才会包含有message字段
// "message" : "Hello World"
// }
class TopicRequest : public JsonRequest
{
public:
using ptr = std::shared_ptr<TopicRequest>;
virtual bool check() override
{
if (_body[KEY_TOPIC_KEY].isNull() == true || _body[KEY_TOPIC_KEY].isString() == false){
ELOG("主题请求中没有主题名称或主题名称类型错误!");
return false;
}
if (_body[KEY_OPTYPE].isNull() == true || _body[KEY_OPTYPE].isIntegral() == false){
ELOG("主题请求中没有操作类型或操作类型的类型错误!");
return false;
}
if (_body[KEY_OPTYPE].asInt() == (int)TopicOptype::TOPIC_PUBLISH &&
(_body[KEY_TOPIC_MSG].isNull() == true ||
_body[KEY_TOPIC_MSG].isString() == false))
{
ELOG("主题消息发布请求中没有消息内容字段或消息内容类型错误!");
return false;
}
return true;
}
//返回和设置主题消息 TOPIC_PUBLISH请求才会包含有message字段
std::string topicMsg()
{
return _body[KEY_TOPIC_MSG].asString();
}
void setTopicMsg(const std::string &msg)
{
_body[KEY_TOPIC_MSG] = msg;
}
//返回和设置主题名称
std::string topicKey()
{
return _body[KEY_TOPIC_KEY].asString();
}
void setTopicKey(const std::string &topickey)
{
_body[KEY_TOPIC_KEY] = topickey;
}
//返回和设置主题操作类型 创建、删除、订阅、取消订阅
TopicOptype optype()
{
return (TopicOptype)_body[KEY_OPTYPE].asInt();
}
void setOptype(TopicOptype optype)
{
_body[KEY_OPTYPE] = (int)optype;
}
};
//RD--request
// {
// //SERVICE_REGISTRY-Rpc-provider进⾏服务注册
// //SERVICE_DISCOVERY - Rpc-caller进⾏服务发现
// //SERVICE_ONLINE/SERVICE_OFFLINE 在provider下线后对caller进⾏服务上下线通知
// "optype" : SERVICE_REGISTRY/SERVICE_DISCOVERY/SERVICE_ONLINE/SERVICE_OFFLINE,
// "method" : "Add",
// //服务注册/上线/下线有host字段,发现则⽆host字段
// "host" : {
// "ip" : "127.0.0.1",
// "port" : 9090
// }
// }
class ServiceRequest : public JsonRequest
{
public:
using ptr = std::shared_ptr<ServiceRequest>;
virtual bool check() override
{
if (_body[KEY_METHOD].isNull() == true ||
_body[KEY_METHOD].isString() == false)
{
ELOG("服务请求中没有方法名称或方法名称类型错误!");
return false;
}
if (_body[KEY_OPTYPE].isNull() == true ||
_body[KEY_OPTYPE].isIntegral() == false)
{
ELOG("服务请求中没有操作类型或操作类型的类型错误!");
return false;
}
if (_body[KEY_OPTYPE].asInt() != (int)(ServiceOptype::SERVICE_DISCOVERY) &&
(_body[KEY_HOST].isNull() == true ||
_body[KEY_HOST].isObject() == false ||
_body[KEY_HOST][KEY_HOST_IP].isNull() == true ||
_body[KEY_HOST][KEY_HOST_IP].isString() == false ||
_body[KEY_HOST][KEY_HOST_PORT].isNull() == true ||
_body[KEY_HOST][KEY_HOST_PORT].isIntegral() == false))
{
ELOG("服务请求中主机地址信息错误!");
return false;
}
return true;
}
//设置方法名称
std::string method()
{
return _body[KEY_METHOD].asString();
}
void setMethod(const std::string &method)
{
_body[KEY_METHOD] = method;
}
//返回和设置操作类型: 服务注册、发现、上线、下线
ServiceOptype optype()
{
return (ServiceOptype)_body[KEY_OPTYPE].asInt();
}
void setOptype(ServiceOptype optype)
{
_body[KEY_OPTYPE] = (int)optype;
}
//返回和设置ip+port
Address host()
{
Address host;
host.first = _body[KEY_HOST][KEY_HOST_IP].asString();
host.second = _body[KEY_HOST][KEY_HOST_PORT].asInt();
return host;
}
void setHost(const Address &host)
{
Json::Value val;
val[KEY_HOST_IP] = host.first;
val[KEY_HOST_PORT] = host.second;
_body[KEY_HOST] = val;
}
};
//RPC-response
// {
// "rcode" : OK,
// "result": 33
// }
// {
// "rcode" : ERROR_INVALID_PARAMETERS
// }
class RpcResponse : public JsonResponse
{
public:
using ptr = std::shared_ptr<RpcResponse>;
virtual bool check() override
{
if (_body[KEY_RCODE].isNull() == true || _body[KEY_RCODE].isIntegral() == false){
ELOG("响应中没有响应状态码,或状态码类型错误!");
return false;
}
if (_body[KEY_RESULT].isNull() == true){
ELOG("响应中没有Rpc调用结果,或结果类型错误!");
return false;
}
return true;
}
Json::Value result()
{
return _body[KEY_RESULT];
}
void setResult(const Json::Value &result)
{
_body[KEY_RESULT] = result;
}
};
//Topic-response
// {
// "rcode" : OK,
// }
// {
// "rcode" : ERROR_INVALID_PARAMETERS,
// }
//没啥可设置的字段,JsonResponse中以及够用了,返回值的设置和返回
class TopicResponse : public JsonResponse
{
public:
using ptr = std::shared_ptr<TopicResponse>;
};
// //Registry/Online/Offline-response
// {
// "rcode" : OK,
// }
// //error-response
// {
// "rcode" : ERROR_INVALID_PARAMETERS,
// }
// //Discovery-response
// {
// "method" : "Add",
// "host" : [
// {"ip" : "127.0.0.1","port" : 9090},
// {"ip" : "127.0.0.2","port" : 8080}
// ]
// }
class ServiceResponse : public JsonResponse
{
public:
using ptr = std::shared_ptr<ServiceResponse>;
virtual bool check() override
{
if (_body[KEY_RCODE].isNull() == true ||
_body[KEY_RCODE].isIntegral() == false)
{
ELOG("响应中没有响应状态码,或状态码类型错误!");
return false;
}
if (_body[KEY_OPTYPE].isNull() == true ||
_body[KEY_OPTYPE].isIntegral() == false)
{
ELOG("响应中没有操作类型,或操作类型的类型错误!");
return false;
}
if (_body[KEY_OPTYPE].asInt() == (int)(ServiceOptype::SERVICE_DISCOVERY) &&
(_body[KEY_METHOD].isNull() == true ||
_body[KEY_METHOD].isString() == false ||
_body[KEY_HOST].isNull() == true ||
_body[KEY_HOST].isArray() == false))
{
ELOG("服务发现响应中响应信息字段错误!");
return false;
}
return true;
}
//返回和设置操作类型
ServiceOptype optype()
{
return (ServiceOptype)_body[KEY_OPTYPE].asInt();
}
void setOptype(ServiceOptype optype)
{
_body[KEY_OPTYPE] = (int)optype;
}
//返回和设置方法名称
std::string method()
{
return _body[KEY_METHOD].asString();
}
void setMethod(const std::string &method)
{
_body[KEY_METHOD] = method;
}
//设置ip+port,此处不止一次ip+port,是个数组
void setHost(std::vector<Address> addrs)
{
for (auto &addr : addrs)
{
Json::Value val;
val[KEY_HOST_IP] = addr.first;
val[KEY_HOST_PORT] = addr.second;
_body[KEY_HOST].append(val);
}
}
std::vector<Address> hosts()
{
std::vector<Address> addrs;
int sz = _body[KEY_HOST].size();
for (int i = 0; i < sz; i++)
{
Address addr;
addr.first = _body[KEY_HOST][i][KEY_HOST_IP].asString();
addr.second = _body[KEY_HOST][i][KEY_HOST_PORT].asInt();
addrs.push_back(addr);
}
return addrs;
}
};
class MessageFactory
{
public:
//返回基类消息BaseMessage,会产生多态
static BaseMessage::ptr create(MType mtype)
{
switch (mtype)
{
case MType::REQ_RPC:
return std::make_shared<RpcRequest>();
case MType::RSP_RPC:
return std::make_shared<RpcResponse>();
case MType::REQ_TOPIC:
return std::make_shared<TopicRequest>();
case MType::RSP_TOPIC:
return std::make_shared<TopicResponse>();
case MType::REQ_SERVICE:
return std::make_shared<ServiceRequest>();
case MType::RSP_SERVICE:
return std::make_shared<ServiceResponse>();
}
return BaseMessage::ptr();
}
//返回指定类型的消息类型
template <class T, class... Args>
static std::shared_ptr<T> create(Args... args)
{
return std::make_shared<T>(std::forward(args)...);
}
};
9. Dispatcher实现
- 注册消息类型-回调函数映射关系
- 提供消息处理接口
#pragma once
#include "net.hpp"
#include "message.hpp"
namespace qi_rpc
{
class Callback
{
public:
using ptr = std::shared_ptr<Callback>;
virtual void onMessage(const BaseConnection::ptr &, BaseMessage::ptr &) = 0;
};
template <class T>
class CallbackT : public Callback
{
public:
using ptr = std::shared_ptr<CallbackT<T>>;
using MessageCallBack = std::function<void(const BaseConnection::ptr &, std::shared_ptr<T> &)>;
CallbackT(const MessageCallBack &handler) : _handler(handler) {}
virtual void onMessage(const BaseConnection::ptr &conn, BaseMessage::ptr &msg) override
{
auto type_msg = std::static_pointer_cast<T>(msg);
_handler(conn, type_msg);
}
private:
MessageCallBack _handler;
};
class Dispatcher
{
public:
using ptr = std::shared_ptr<Dispatcher>;
template <class T>//第二个参数的const得研究一下-------------------------------------------------------
void registerHandler(MType mtype, const typename CallbackT<T>::MessageCallBack &handler)
{
std::unique_lock<std::mutex> lock(_mutex);
auto cb = std::make_shared<CallbackT<T>>(handler);
_handlers.insert(std::make_pair(mtype, cb));
}
//例如:add服务通过rpcrouter注册进去,将rpcrouter的onRpcRequest设置到dispatcher中,把disp中的onmessage设置到muduo回调中去
void onMessage(const BaseConnection::ptr &conn, BaseMessage::ptr &msg)
{
std::unique_lock<std::mutex> lock(_mutex);
//根据消息类型,查找对应的回调函数进行处理
auto it = _handlers.find(msg->mtype());
if (it != _handlers.end()){
return it->second->onMessage(conn, msg);
}
ELOG("收到未知类型的消息: %d!", (int)msg->mtype());
conn->shutdown();
}
private:
std::mutex _mutex;
std::unordered_map<MType, Callback::ptr> _handlers;
};
}
10. 服务端-RpcRouter实现
- 提供Rpc请求处理回调函数
- 内部的服务管理
- 方法名称
- 参数信息
- 对外提供参数校验接口
#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
namespace qi_rpc
{
namespace server
{
enum class VType
{
BOOL = 0,
INTEGRAL,
NUMERIC,
STRING,
ARRAY,
OBJECT
};
// 服务描述,其实就是一个函数的函数名、参数、返回值等的描述
// 一个服务描述,相当于一个完成的服务
class ServiceDescribe
{
public:
using ptr = std::shared_ptr<ServiceDescribe>;
using ParamsDescribe = std::pair<std::string, VType>; // 函数参数和对应参数类型
// 此处的ServiceCallback其实就是一个具体的函数而已,例如:
// void add(const Json::Value &req,Json::Value &rsp){
// int num1=req["num1"].asInt();
// int num2=req["num2"].asInt();
// rsp=num1 + num2;
// }
using ServiceCallback = std::function<void(const Json::Value &, Json::Value &)>;
ServiceDescribe(std::string &&method_name
, std::vector<ParamsDescribe> &¶ms_desc
, VType return_type
, ServiceCallback &&callback)
: _method_name(std::move(method_name)), _params_desc(std::move(params_desc)), _callback(std::move(callback)), _return_type(return_type)
{
}
const std::string &methodname()
{
return _method_name;
}
// 客户端请求调用的,要和我服务器本来设置的要对应,所以要进行检验
bool paramCheck(const Json::Value ¶ms)
{
for (auto &desc : _params_desc)
{
if (params.isMember(desc.first) == false)
{
ELOG("参数字段完整性校验失败!%s 字段缺失!", desc.first.c_str());
return false;
}
if (check(desc.second, params[desc.first]) == false)
{
ELOG("%s 参数类型校验失败!", desc.first.c_str());
return false;
}
}
return true;
}
bool call(const Json::Value ¶ms, Json::Value &result)
{
//result输出型参数
_callback(params, result);
// 例如此处的_callback可以是一个:
// void add(const Json::Value &req,Json::Value &rsp){
// int num1=req["num1"].asInt();
// int num2=req["num2"].asInt();
// rsp=num1 + num2;
// }
if (rtypeCheck(result) == false)
{
ELOG("回调处理函数中的响应信息校验失败!");
return false;
}
return true;
}
private:
bool rtypeCheck(const Json::Value &val)
{
return check(_return_type, val);
}
bool check(VType type, const Json::Value &val)
{
switch (type)
{
case VType::BOOL:
return val.isBool();
case VType::ARRAY:
return val.isArray();
case VType::INTEGRAL:
return val.isIntegral();
case VType::NUMERIC:
return val.isNumeric();
case VType::OBJECT:
return val.isObject();
case VType::STRING:
return val.isString();
}
return false;
}
private:
std::string _method_name; // 方法名称
std::vector<ParamsDescribe> _params_desc; // 函数参数和对应参数类型
ServiceCallback _callback; // 具体的执行函数
VType _return_type; // 返回值类型
};
class SDescribeFactory
{
public:
// 设置方法名称
void setMethodName(const std::string &method_name)
{
_method_name = method_name;
}
// 设置返回值的类型
void setReturnType(VType return_type)
{
_return_type = return_type;
}
// 设置参数 参数名称 参数类型
void setParamsDesc(const std::string &pname, VType type)
{
_params_desc.push_back(std::make_pair(pname, type));
}
// 设置真正的服务函数
void setCallback(const ServiceDescribe::ServiceCallback &cb)
{
_callback = cb;
}
ServiceDescribe::ptr build()
{
return std::make_shared<ServiceDescribe>(std::move(_method_name), std::move(_params_desc), _return_type, std::move(_callback));
}
private:
// ServiceDescribe类的成员函数,利用建造者模式来进行初始化
std::string _method_name;
std::vector<ServiceDescribe::ParamsDescribe> _params_desc;
ServiceDescribe::ServiceCallback _callback;
VType _return_type;
};
// 服务管理,ServiceDescribe只是管理一个服务,用ServiceManager来管理多个ServiceDescribe
class ServiceManager
{
public:
using ptr = std::shared_ptr<ServiceManager>;
// 服务添加
void insert(const ServiceDescribe::ptr &desc)
{
std::unique_lock<std::mutex> lock(_mutex);
_services.insert(std::make_pair(desc->methodname(), desc));
}
// 服务查找
ServiceDescribe::ptr select(const std::string method_name)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _services.find(method_name);
if (it == _services.end())
{
return ServiceDescribe::ptr();
}
return it->second;
}
// 服务删除
void remove(const std::string &method_name)
{
std::unique_lock<std::mutex> lock(_mutex);
_services.erase(method_name);
}
private:
std::mutex _mutex;
// 一个方法名称对应一个方法管理,利用哈希表把所有的服务组织起来
std::unordered_map<std::string, ServiceDescribe::ptr> _services;
};
// 一个RpcRputer管理一个ServiceManager,进而管理多个服务
class RpcRouter
{
public:
using ptr = std::shared_ptr<RpcRouter>;
RpcRouter() : _service_manager(std::make_shared<ServiceManager>()) {}
// onRpcRequest函数要绑定到dispatcher中去,发现消息类型是RPC_REQ类型,就去调用该函数进行处理
void onRpcRequest(const BaseConnection::ptr &conn, RpcRequest::ptr &request)
{ // 先去查查要请求的服务,存不存在.
// 1. 查询客户端请求的方法描述--判断当前服务端能否提供对应的服务
DLOG("%s 正在查找服务!", request->method().c_str());
auto service = _service_manager->select(request->method());
if (service.get() == nullptr)
{
ELOG("%s 服务未找到!", request->method().c_str());
return response(conn, request, Json::Value(), RCode::RCODE_NOT_FOUND_SERVICE);
}
DLOG("服务已找到,开始进行参数校验");
// 找到该服务描述,判断你不能处理
// 2. 进行参数校验,确定能否提供服务
if (service->paramCheck(request->params()) == false)
{
ELOG("%s 服务参数校验失败!", request->method().c_str());
return response(conn, request, Json::Value(), RCode::RCODE_INVALID_PARAMS);
}
// 能处理,则调用服务描述中设置好的函数(服务注册)进行处理,结果保存在Json::Value result中去
// 3. 调用业务回调接口进行业务处理
DLOG("参数校验通过,开始进行回调处理");
Json::Value result;
bool ret = service->call(request->params(), result);
if (ret == false)
{
ELOG("%s 服务参数校验失败!", request->method().c_str());
return response(conn, request, Json::Value(), RCode::RCODE_INTERNAL_ERROR);
}
// 处理完,组织好响应发送回去,消息类型为RSP_RPC
// 4. 处理完毕得到结果,组织响应,向客户端发送
DLOG("回调处理完毕,开始组织相应并发送");
return response(conn, request, result, RCode::RCODE_OK);
}
void registerMethod(const ServiceDescribe::ptr &service)
{
return _service_manager->insert(service);
}
private:
void response(const BaseConnection::ptr &conn,
const RpcRequest::ptr &req,
const Json::Value &res, RCode rcode)
{
auto msg = MessageFactory::create<RpcResponse>();
msg->setRid(req->rid()); // id要和请求id相同,客户端会连续发多个请求,要根据返回消息的uid来确定这是那一个请求的回复
msg->setMType(qi_rpc::MType::RSP_RPC);
msg->setRCode(rcode); // 设置处理结果状态码
msg->setResult(res); // 设置处理结果
conn->send(msg);
}
private:
ServiceManager::ptr _service_manager;
};
}
}
11. 服务端-Registry&Discovery实现
- 对外提供服务操作(注册/发现)消息处理回调函数
- 内部进行服务发现者的管理
- 内部进行服务提供者的管理
#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <set>
namespace qi_rpc
{
namespace server
{
class ProviderManager
{
public:
using ptr = std::shared_ptr<ProviderManager>;
struct Provider
{
using ptr = std::shared_ptr<Provider>;
std::mutex _mutex;
BaseConnection::ptr _conn;
Address _host;
// 一个主机可以对应多个服务,一个连接注册过的所有服务
std::vector<std::string> _methods;
Provider(const BaseConnection::ptr &c, const Address &host) : _conn(c), _host(host) {}
void appendMethod(const std::string &method)
{
std::unique_lock<std::mutex> lock(_mutex);
_methods.emplace_back(method);
}
};
// 当一个新的服务提供者进行服务注册的时候调用
void addProvider(const BaseConnection::ptr &c, const Address &h, const std::string &method)
{
Provider::ptr provider;
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _conns.find(c);
if (it != _conns.end())
{
provider = it->second;
}
else
{
provider = std::make_shared<Provider>(c, h);
_conns.insert(std::make_pair(c, provider));
}
// 引用是重点,[]找到了则返回,没找到则创建一个新的返回
auto &providers = _providers[method];
providers.insert(provider);
}
// 向服务对象中新增一个所能提供的服务名称
provider->appendMethod(method);
}
// 当一个服务提供者断开连接的时候,获取他的信息--用于进行服务下线通知
Provider::ptr getProvider(const BaseConnection::ptr &c)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _conns.find(c);
if (it != _conns.end())
{
return it->second;
}
return Provider::ptr();
}
// 当一个服务提供者断开连接的时候,删除它的关联信息
void delProvider(const BaseConnection::ptr &c)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _conns.find(c);
if (it == _conns.end())
{
// 当前断开连接的不是一个服务提供者
return;
}
// 如果是提供者,看看提供了什么服务,从服务者提供信息中删除当前服务提供者
for (auto &method : it->second->_methods) // for(string &method:vector<string>)
{
// 从map中找到管理provider的set
auto &providers = _providers[method];
// 删除set中的指定元素
providers.erase(it->second); // 这里的second是provider::ptr类型
}
// 删除连接与服务提供者的关联关系
_conns.erase(it);
}
// 通过方法名称,获取可以提供该方法的所有主机地址
std::vector<Address> methodHosts(const std::string &method)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _providers.find(method);
if (it == _providers.end())
{
return std::vector<Address>();
}
std::vector<Address> result;
for (auto &provider : it->second)
{
result.push_back(provider->_host);
}
return result;
}
private:
std::mutex _mutex;
// 提供者要知道某一个任务有哪些主机可以提供,所以一个methond对应多个Provider
std::unordered_map<std::string, std::set<Provider::ptr>> _providers;
// 服务提供者的连接和服务提供者描述的映射关系
// 一个连接对应一个provider,里面包含该主机已经注册过的所有服务
std::unordered_map<BaseConnection::ptr, Provider::ptr> _conns;
};
class DiscovererManager
{
public:
using ptr = std::shared_ptr<DiscovererManager>;
struct Discoverer
{
using ptr = std::shared_ptr<Discoverer>;
std::mutex _mutex;
BaseConnection::ptr conn; // 发现者关联的客户端连接
std::vector<std::string> methods; // 发现过的所有服务名称
Discoverer(const BaseConnection::ptr &c) : conn(c) {}
void appendMethod(const std::string &method)
{
std::unique_lock<std::mutex> lock(_mutex);
methods.push_back(method);
}
};
// 当每次客户端进行服务发现的时候新增发现者,新增服务名称
Discoverer::ptr addDiscoverer(const BaseConnection::ptr &c, const std::string &method)
{
Discoverer::ptr discover;
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _conns.find(c);
if (it != _conns.end())
{
discover = it->second;
}
else
{
discover = std::make_shared<Discoverer>(c);
_conns.insert(std::make_pair(c, discover));
}
auto &discovers = _discoverers[method];
discovers.insert(discover);
}
// 通过连接可以找到一个discover,里面包含了所有发现过的服务
discover->appendMethod(method);
return discover;
}
// 发现者客户端断开连接时,找到发现者信息,删除关联数据
void delDiscoverer(const BaseConnection::ptr &c)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _conns.find(c);
if (it == _conns.end())
{
// 没有找到连接对应的发现者信息,代表客户端不是一个服务发现者
return;
}
for (auto &method : it->second->methods)
{
auto discovers = _discoverers[method];
// 可能会出现,不同服务method对应同一个discover,同时discover里的method包含着这个discover发现过的所有服务
discovers.erase(it->second);
}
_conns.erase(it);
}
// 当有一个新的服务提供者上线,则进行上线通知
void onlineNotify(const std::string &method, const Address &host)
{
return notify(method, host, ServiceOptype::SERVICE_ONLINE);
}
// 当有一个服务提供者断开连接,则进行下线通知
void offlineNotify(const std::string &method, const Address &host)
{
return notify(method, host, ServiceOptype::SERVICE_OFFLINE);
}
private:
void notify(const std::string &method, const Address &host, ServiceOptype optype)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _discoverers.find(method);
if (it == _discoverers.end())
{
// 这代表这个服务当前没有发现者
return;
}
auto msg_req = MessageFactory::create<ServiceRequest>();
msg_req->setRid(UUID::uuid());
msg_req->setMType(MType::REQ_SERVICE);
msg_req->setMethod(method);
msg_req->setHost(host);
msg_req->setOptype(optype);
//给所有发现过这个服务的连接发送消息
for (auto &discoverer : it->second)
{
discoverer->conn->send(msg_req);
}
}
private:
std::mutex _mutex;
// 方法名称和发现该方法的discoverer描述
std::unordered_map<std::string, std::set<Discoverer::ptr>> _discoverers;
// 一个连接和与之对应的发现描述,描述里有该连接发现的所有服务
std::unordered_map<BaseConnection::ptr, Discoverer::ptr> _conns;
};
class PDManager
{
public:
using ptr = std::shared_ptr<PDManager>;
PDManager() : _providers(std::make_shared<ProviderManager>()),
_discoverers(std::make_shared<DiscovererManager>())
{}
//感觉这个函数也是要bind到dispatcher里,暂且是猜的,以后在验证
void onServiceRequest(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg)
{
//服务操作请求:服务注册/服务发现
ServiceOptype optype = msg->optype();
if(optype==ServiceOptype::SERVICE_REGISTRY){
//服务注册
//1. 新增服务提供者; 2. 进行服务上线通知
ILOG("%s:%d 注册服务 %s", msg->host().first.c_str(), msg->host().second, msg->method().c_str());
_providers->addProvider(conn,msg->host(),msg->method());
_discoverers->onlineNotify(msg->method(),msg->host());
return registryResponse(conn,msg);
}else if(optype==ServiceOptype::SERVICE_DISCOVERY){
//服务发现
// 1. 新增服务发现者;
ILOG("客户端要进行 %s 服务发现!", msg->method().c_str());
_discoverers->addDiscoverer(conn,msg->method());
return discoveryResponse(conn,msg);
}else{
ELOG("收到服务操作请求,但是操作类型错误!");
return errorResponse(conn,msg);
}
}
//连接断开的回调函数
void onConnShutdown(const BaseConnection::ptr &conn)
{
auto provider = _providers->getProvider(conn);
//应该是!=不是== -----------------------------
if(provider.get()!=nullptr){
ILOG("%s:%d 服务下线", provider->_host.first.c_str(), provider->_host.second);
for(auto &method:provider->_methods){
_discoverers->offlineNotify(method,provider->_host);
}
_providers->delProvider(conn);
}
_discoverers->delDiscoverer(conn);
}
private:
void errorResponse(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg)
{
auto msg_rsp = MessageFactory::create<ServiceResponse>();
msg_rsp->setRid(msg->rid());
msg_rsp->setMType(MType::RSP_SERVICE);
msg_rsp->setRCode(RCode::RCODE_INVALID_OPTYPE);
msg_rsp->setOptype(ServiceOptype::SERVICE_UNKNOW);
conn->send(msg_rsp);
}
void registryResponse(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg)
{
auto msg_rsp = MessageFactory::create<ServiceResponse>();
msg_rsp->setRid(msg->rid());
msg_rsp->setMType(MType::RSP_SERVICE);
msg_rsp->setRCode(RCode::RCODE_OK);
msg_rsp->setOptype(ServiceOptype::SERVICE_REGISTRY);
conn->send(msg_rsp);
}
void discoveryResponse(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg)
{
auto msg_rsp = MessageFactory::create<ServiceResponse>();
msg_rsp->setRid(msg->rid());
msg_rsp->setMType(MType::RSP_SERVICE);
msg_rsp->setOptype(ServiceOptype::SERVICE_DISCOVERY);
std::vector<Address> hosts = _providers->methodHosts(msg->method());
if (hosts.empty()) {
msg_rsp->setRCode(RCode::RCODE_NOT_FOUND_SERVICE);
return conn->send(msg_rsp);
}
msg_rsp->setRCode(RCode::RCODE_OK);
msg_rsp->setMethod(msg->method());
msg_rsp->setHost(hosts);
return conn->send(msg_rsp);
}
private:
ProviderManager::ptr _providers;
DiscovererManager::ptr _discoverers;
};
}
}
12. 服务端-整合封装Server
#pragma once
#include "rpc_registry.hpp"
#include "rpc_router.hpp"
#include "rpc_topic.hpp"
#include "../common/dispatcher.hpp"
#include "../client/rpc_client.hpp"
namespace qi_rpc{
namespace server{
//注册中心服务端:只需要针对服务注册与发现请求进行处理即可
class RegistryServer
{
public:
using ptr = std::shared_ptr<RegistryServer>;
RegistryServer(int port)
:_pd_manager(std::make_shared<PDManager>())
,_dispatcher(std::make_shared<Dispatcher>())
{
auto service_cb = std::bind(&PDManager::onServiceRequest,_pd_manager.get(),std::placeholders::_1,std::placeholders::_2);
_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE,service_cb);
_server = ServerFactory::create(port);
auto message_cb = std::bind(&Dispatcher::onMessage,_dispatcher.get(),std::placeholders::_1,std::placeholders::_2);
_server->setMessageCallback(message_cb);
auto close_cb = std::bind(&RegistryServer::onConnShutdown,this,std::placeholders::_1);
_server->setCloseCallback(close_cb);
}
void start(){
_server->start();
}
private:
void onConnShutdown(const BaseConnection::ptr &conn){
_pd_manager->onConnShutdown(conn);
}
private:
PDManager::ptr _pd_manager;
Dispatcher::ptr _dispatcher;
BaseServer::ptr _server;
};
class RpcServer
{
public:
using ptr = std::shared_ptr<RpcServer>;
//rpc——server端有两套地址信息:
// 1. rpc服务提供端地址信息--必须是rpc服务器对外访问地址(云服务器---监听地址和访问地址不同)
// 2. 注册中心服务端地址信息 -- 启用服务注册后,连接注册中心进行服务注册用的
RpcServer(const Address &access_address,bool enableRegistry=false,const Address ®istry_server_address=Address())
:_access_addr(access_address)
,_enableRegistry(enableRegistry)
,_router(std::make_shared<RpcRouter>())
,_dispatcher(std::make_shared<Dispatcher>())
{
if(_enableRegistry==true){
_reg_client = std::make_shared<client::RegistryClient>(registry_server_address.first,registry_server_address.second);
}
//当前成员server是一个rpcserver,用于提供rpc服务的
auto rpc_cb = std::bind(&RpcRouter::onRpcRequest,_router.get(),std::placeholders::_1,std::placeholders::_2);
_dispatcher->registerHandler<RpcRequest>(MType::REQ_RPC,rpc_cb);
_server = ServerFactory::create(access_address.second);
auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
_server->setMessageCallback(message_cb);
}
void registerMethod(const ServiceDescribe::ptr &service) {
if (_enableRegistry) {
_reg_client->registryMethod(service->methodname(), _access_addr);
}
_router->registerMethod(service);
}
void start()
{
_server->start();
}
private:
bool _enableRegistry;
Address _access_addr;
client::RegistryClient::ptr _reg_client;
RpcRouter::ptr _router;
Dispatcher::ptr _dispatcher;
BaseServer::ptr _server;
};
class TopicServer
{
public:
using ptr = std::shared_ptr<TopicServer>;
TopicServer(int port)
:_topic_manager(std::make_shared<server::TopicManager>())
,_dispatcher(std::make_shared<Dispatcher>())
{
auto topic_cb = std::bind(&TopicManager::onTopicRequest,_topic_manager.get(),std::placeholders::_1,std::placeholders::_2);
_dispatcher->registerHandler<TopicRequest>(MType::REQ_TOPIC,topic_cb);
_server = qi_rpc::ServerFactory::create(port);
auto message_cb = std::bind(&Dispatcher::onMessage,_dispatcher.get(),std::placeholders::_1,std::placeholders::_2);
_server->setMessageCallback(message_cb);
auto close_cb = std::bind(&TopicServer::onConnShutdown,this,std::placeholders::_1);
_server->setCloseCallback(close_cb);
}
void start(){
_server->start();
}
private:
void onConnShutdown(const BaseConnection::ptr &conn) {
_topic_manager->onShutdown(conn);
}
private:
server::TopicManager::ptr _topic_manager;
Dispatcher::ptr _dispatcher;
BaseServer::ptr _server;
};
}
}
13. 客户端-Requestor实现
- 提供发送请求的接口
- 内部进行请求&响应的管理
#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <future>
#include <functional>
namespace qi_rpc
{
namespace client
{
class Requestor
{
public:
using ptr = std::shared_ptr<Requestor>;
using RequestCallback = std::function<void(BaseMessage::ptr &)>;
using AsyncResponse = std::future<BaseMessage::ptr>;
struct RequestDescribe
{
using ptr = std::shared_ptr<RequestDescribe>;
BaseMessage::ptr request;
RType rtype; // 请求类型,异步还是回调
std::promise<BaseMessage::ptr> response;
RequestCallback callback;
};
// 把这个函数设置到dispatcher中去,服务器处理完请求,发送response回来
// dispatcher根据消息类型,找到该请求描述,根据异步还是回调进行处理
void onResponse(const BaseConnection::ptr &conn, BaseMessage::ptr &msg)
{
std::string rid = msg->rid();
RequestDescribe::ptr rdp = getDescribe(rid);
if (rdp->rtype == RType::REQ_ASYNC)
{
rdp->response.set_value(msg);
}
else if (rdp->rtype == RType::REQ_CALLBACK)
{
if (rdp->callback)
rdp->callback(msg);
}
else
{
ELOG("请求类型未知!!");
}
// 处理完毕,删除对应描述
delDescribe(rid);
}
//异步
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, AsyncResponse &async_rsp)
{
RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_ASYNC);
if (rdp.get() == nullptr)
{
ELOG("构造请求描述对象失败!");
return false;
}
conn->send(req);
async_rsp = rdp->response.get_future();
return true;
}
//同步
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, BaseMessage::ptr &rsp)
{
AsyncResponse rsp_future;
bool ret = send(conn, req, rsp_future);
if (ret == false)
{
return false;
}
rsp = rsp_future.get();
return true;
}
//回调
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, const RequestCallback &cb)
{
DLOG("进入了requestor send函数");
RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_CALLBACK, cb);
if (rdp.get() == nullptr)
{
ELOG("构造请求描述对象失败!");
return false;
}
DLOG("requestor正在发送回调请求");
if(conn.get()==nullptr) DLOG("conn为空");
conn->send(req);
DLOG("requestor正在发送回调请求成功");
return true;
}
// 回忆RType类型
// enum class RType
// {
// REQ_ASYNC = 0,
// REQ_CALLBACK
// };
private:
// 创建一个新的请求request描述,并插入到哈希表中去,返回一个请求描述的shared_ptr
RequestDescribe::ptr newDescribe(const BaseMessage::ptr &req, RType rtype, const RequestCallback &cb = RequestCallback())
{
std::unique_lock<std::mutex> lock(_mutex);
RequestDescribe::ptr rd = std::make_shared<RequestDescribe>();
rd->request = req;
rd->rtype = rtype;
if (rtype == RType::REQ_CALLBACK && cb)
{
rd->callback = cb;
}
_request_desc.insert(std::make_pair(req->rid(), rd));
return rd;
}
// 根据rid,获取对应请求request的描述信息
RequestDescribe::ptr getDescribe(const std::string &rid)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _request_desc.find(rid);
if (it == _request_desc.end())
{
return RequestDescribe::ptr();
}
return it->second;
}
void delDescribe(const std::string &rid)
{
std::unique_lock<std::mutex> lock(_mutex);
_request_desc.erase(rid);
}
private:
std::mutex _mutex;
// rid充当key,把多个请求存储起来,服务端处理完请求req之后就可以根据rid找到原请求,返回处理结果等信息
std::unordered_map<std::string, RequestDescribe::ptr> _request_desc;
};
}
}
14. 客户端-RpcCaller实现
- 提供Rpc请求接口
#pragma once
#include "requestor.hpp"
namespace qi_rpc
{
namespace client
{
class RpcCaller
{
public:
using ptr = std::shared_ptr<RpcCaller>;
using JsonAsyncResponse = std::future<Json::Value>;
using JsonResponseCallback = std::function<void(const Json::Value &)>;
RpcCaller(const Requestor::ptr &requestor): _requestor(requestor){}
// 同步请求
bool call(const BaseConnection::ptr &conn,
const std::string &method,
const Json::Value ¶ms,
Json::Value &result)
{
DLOG("开始同步rpc调用...");
// 1.组织请求
RpcRequest::ptr req_msg = MessageFactory::create<RpcRequest>();
req_msg->setRid(UUID::uuid());
req_msg->setMethod(method);
req_msg->setParams(params);
req_msg->setMType(MType::REQ_RPC);
BaseMessage::ptr rsp_msg;
bool ret = _requestor->send(conn, req_msg, rsp_msg);
if (ret == false)
{
ELOG("同步Rpc请求失败!");
return false;
}
DLOG("收到响应,进行解析,获取结果!");
// 3. 等待响应
auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(rsp_msg);
if (!rpc_rsp_msg)
{
ELOG("rpc响应,向下类型转换失败!");
return false;
}
if (rpc_rsp_msg->rcode() != RCode::RCODE_OK)
{
ELOG("rpc请求出错:%s", errReason(rpc_rsp_msg->rcode()));
return false;
}
result = rpc_rsp_msg->result();
DLOG("结果设置完毕!");
return true;
}
// 异步请求
bool call(const BaseConnection::ptr &conn,
const std::string &method,
const Json::Value ¶ms,
JsonAsyncResponse &result)
{
// 向服务器发送异步回调请求,设置回调函数,回调函数中会传入一个promise对象,在回调函数中去堆promise设置数据
auto req_msg = MessageFactory::create<RpcRequest>();
req_msg->setRid(UUID::uuid());
req_msg->setMType(MType::REQ_RPC);
req_msg->setMethod(method);
req_msg->setParams(params);
auto json_promise = std::make_shared<std::promise<Json::Value>>();
result = json_promise->get_future();
Requestor::RequestCallback cb = std::bind(&RpcCaller::Callback,
this, json_promise, std::placeholders::_1);
bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), cb);
if (ret == false)
{
ELOG("异步Rpc请求失败!");
return false;
}
return true;
}
// 回调请求
bool call(const BaseConnection::ptr &conn,
const std::string &method,
const Json::Value ¶ms,
const JsonResponseCallback &cb)
{
DLOG("开始回调rpc调用...");
// 1.组织请求
RpcRequest::ptr req_msg = MessageFactory::create<RpcRequest>();
req_msg->setRid(UUID::uuid());
req_msg->setMethod(method);
req_msg->setParams(params);
req_msg->setMType(MType::REQ_RPC);
Requestor::RequestCallback req_cb = std::bind(&RpcCaller::Callback1, this, cb, std::placeholders::_1);
DLOG("即将开始使用requestor发送回调请求");
if(_requestor.get()==nullptr) DLOG("requestor 智能指针为空");
DLOG("requestor 智能指针不为空");
if(conn.get()==nullptr) DLOG("conn为空");
bool ret = _requestor->send(conn, req_msg, req_cb);
DLOG("使用requestor发送回调请求");
if (ret == false)
{
ELOG("回调Rpc请求失败!");
return false;
}
DLOG("使用requestor发送回调请求成功");
return true;
}
private:
void Callback1(const JsonResponseCallback &cb, const BaseMessage::ptr &msg)
{
auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);
if (!rpc_rsp_msg)
{
ELOG("rpc响应,向下类型转换失败!");
return;
}
if (rpc_rsp_msg->rcode() != RCode::RCODE_OK)
{
ELOG("rpc回调请求出错:%s", errReason(rpc_rsp_msg->rcode()));
return;
}
// 将结果传入回调函数进行处理
cb(rpc_rsp_msg->result());
}
void Callback(std::shared_ptr<std::promise<Json::Value>> result, const BaseMessage::ptr &msg)
{
auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);
if (!rpc_rsp_msg)
{
ELOG("rpc响应,向下类型转换失败!");
return;
}
if (rpc_rsp_msg->rcode() != RCode::RCODE_OK)
{
ELOG("rpc异步请求出错:%s", errReason(rpc_rsp_msg->rcode()));
return;
}
result->set_value(rpc_rsp_msg->result());
}
private:
Requestor::ptr _requestor;
};
}
}
15. 客户端-Publish&Subscribe实现
- 提供消息发布接口
- 提供主题操作接口
- 内部进行主题及订阅者的管理
#pragma once
#include "requestor.hpp"
#include <unordered_set>
namespace qi_rpc{
namespace client{
class TopicManager
{
public:
using ptr=std::shared_ptr<TopicManager>;
//订阅的主题有消息发来该如何处理的回调函数 //主题名称 主题消息
using SubCallback = std::function<void(const std::string &key,const std::string &msg)>;
TopicManager(const Requestor::ptr &requestor):_requestor(requestor)
{}
bool create(const BaseConnection::ptr &conn,const std::string &key){
return commonRequest(conn,key,TopicOptype::TOPIC_CREATE);
}
bool remove(const BaseConnection::ptr &conn,const std::string &key){
return commonRequest(conn,key,TopicOptype::TOPIC_REMOVE);
}
bool subscribe(const BaseConnection::ptr &conn,const std::string &key,const SubCallback &cb){
addSubcribe(key,cb);
bool ret = commonRequest(conn, key, TopicOptype::TOPIC_SUBSCRIBE);
if(ret==false){
delSubcribe(key);
return false;
}
return true;
}
bool cancel(const BaseConnection::ptr &conn,const std::string &key){
return commonRequest(conn,key,TopicOptype::TOPIC_CANCEL);
}
bool publish(const BaseConnection::ptr &conn,const std::string &key,const std::string &msg){
return commonRequest(conn,key,TopicOptype::TOPIC_PUBLISH,msg);
}
//bind到dispatcher里,收到了订阅主题的消息应该如何处理
void onPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){
//1. 从消息中取出操作类型进行判断,是否是消息请求
auto msg_type=msg->optype();
if(msg_type!=TopicOptype::TOPIC_PUBLISH){
ELOG("收到了错误类型的主题操作!");
return;
}
//2. 取出消息主题名称,以及消息内容
auto topic_key=msg->topicKey();
auto topic_msg=msg->topicMsg();
//3. 通过主题名称,查找对应主题的回调处理函数,有在处理,无在报错
auto callback=getSubcribe(topic_key);
if(!callback){
ELOG("收到了 %s 主题消息,但是该消息无主题处理回调!", topic_key.c_str());
return;
}
callback(topic_key,topic_msg);
}
private:
void addSubcribe(const std::string &key,const SubCallback& cb){
std::unique_lock<std::mutex> lock(_mutex);
_topic_callbacks.insert(std::make_pair(key,cb));
}
void delSubcribe(const std::string &key){
std::unique_lock<std::mutex> lock(_mutex);
_topic_callbacks.erase(key);
}
const SubCallback getSubcribe(const std::string &key){
std::unique_lock<std::mutex> lock(_mutex);
auto it = _topic_callbacks.find(key);
if(it==_topic_callbacks.end()){
return SubCallback();
}
return it->second;
}
bool commonRequest(const BaseConnection::ptr &conn,const std::string &key,TopicOptype type,const std::string &msg=""){
//1. 构造请求对象,并填充数据
auto msg_req = MessageFactory::create<TopicRequest>();
msg_req->setRid(UUID::uuid());
msg_req->setMType(MType::REQ_TOPIC);
msg_req->setOptype(type);
msg_req->setTopicKey(key);
if(type==TopicOptype::TOPIC_PUBLISH){
msg_req->setTopicMsg(msg);
}
//2. 向服务端发送请求,等待响应
BaseMessage::ptr msg_rsp;
bool ret = _requestor->send(conn,msg_req,msg_rsp);
if(ret==false){
ELOG("主题操作请求失败!");
return false;
}
auto topic_rsp_msg = std::dynamic_pointer_cast<TopicResponse>(msg_rsp);
if(topic_rsp_msg==nullptr){
ELOG("主题操作响应,向下类型转换失败!");
return false;
}
if(topic_rsp_msg->rcode()!=RCode::RCODE_OK){
ELOG("主题操作请求出错:%s", errReason(topic_rsp_msg->rcode()));
return false;
}
return true;
}
private:
std::mutex _mutex;
Requestor::ptr _requestor;
std::unordered_map<std::string,SubCallback> _topic_callbacks;
};
}
}
16. 客户端-Registry&Discovery实现
- 提供服务发现接口
- 提供服务注册接口
- 提供服务操作(上线/下线)通知处理回调函数
- 内部进行发现的服务与主机信息管理
#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include "requestor.hpp"
namespace qi_rpc
{
namespace client
{
// 就提供一个注册接口,客户端提供者能注册自己的方法就行了啥也不需要保存
class Provider
{
public:
using ptr = std::shared_ptr<Provider>;
Provider(const Requestor::ptr &requestor) : _requestor(requestor) {}
bool registryMethod(const BaseConnection::ptr &conn, const std::string &method, const Address &host)
{
auto msg_req = MessageFactory::create<ServiceRequest>();
msg_req->setRid(UUID::uuid());
msg_req->setHost(host);
msg_req->setMethod(method);
msg_req->setMType(MType::REQ_SERVICE);
msg_req->setOptype(ServiceOptype::SERVICE_REGISTRY);
BaseMessage::ptr msg_rsp;
// 同步方式发送,处理之后的回复保存在msg_rsp中
bool ret = _requestor->send(conn, msg_req, msg_rsp);
if (ret == false)
{
ELOG("%s 服务注册失败!", method.c_str());
return false;
}
auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);
if (service_rsp.get() == nullptr)
{
ELOG("响应类型向下转换失败!");
return false;
}
if (service_rsp->rcode() != RCode::RCODE_OK)
{
ELOG("服务注册失败,原因:%s", errReason(service_rsp->rcode()).c_str());
return false;
}
return true;
}
private:
Requestor::ptr _requestor;
};
// 类是大驼峰,函数名是小驼峰,变量是蛇形,宏定义之类是全大写 + _
// 客户端要保存某一个服务哪几个主机能提供服务,MethodHost类就是用来保存某一个服务的所有主机的
class MethodHost
{
public:
using ptr = std::shared_ptr<MethodHost>;
//------------------------------------------------------------------------------------
MethodHost(const std::vector<Address> &host=std::vector<Address>()) : _hosts(host.begin(), host.end()), _id(0) {}
// 中途收到了服务上线请求后被调用
void appendHost(const Address &host)
{
std::unique_lock<std::mutex> lock(_mutex);
_hosts.push_back(host);
}
// 中途收到了服务下线请求后被调用
void removeHost(const Address &host)
{
std::unique_lock<std::mutex> lock(_mutex);
for (auto it = _hosts.begin(); it != _hosts.end(); it++)
{
if (*it == host)
{
_hosts.erase(it);
break;
}
}
}
Address chooseHost()
{
std::unique_lock<std::mutex> lock(_mutex);
size_t pos = _id++ % _hosts.size();
return _hosts[pos];
}
bool empty()
{
std::unique_lock<std::mutex> lock(_mutex);
return _hosts.empty();
}
private:
std::mutex _mutex;
size_t _id;
std::vector<Address> _hosts;
};
// 客户端需要去发现哪些主机可以提供服务
class Discoverer
{
public:
using ptr = std::shared_ptr<Discoverer>;
using OfflineCallback = std::function<void(const Address &)>;
Discoverer(const Requestor::ptr &requestor, const OfflineCallback &cb) : _requestor(requestor), _offline_callback(cb) {}
// 客户端需要去发现哪些主机可以提供服务,serviceDiscovery就是干这个事的
// host是个输出型参数
bool serviceDiscovery(const BaseConnection::ptr &conn, const std::string &method, Address &host)
{
// 当前所保管的提供者信息存在,则直接返回地址
auto it = _method_hosts.find(method);
if (it != _method_hosts.end())
{
if (it->second->empty() == false)
{
host = it->second->chooseHost();
return true;
}
}
// 当前服务的提供者为空
auto msg_req = MessageFactory::create<ServiceRequest>();
msg_req->setRid(UUID::uuid());
// msg_req->setHost() 服务发现不需要这个字段
msg_req->setMethod(method);
msg_req->setMType(MType::REQ_SERVICE);
msg_req->setOptype(ServiceOptype::SERVICE_DISCOVERY);
BaseMessage::ptr msg_rsp;
bool ret = _requestor->send(conn, msg_req, msg_rsp);
if (ret == false)
{
ELOG("服务发现失败!");
return false;
}
auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);
if (service_rsp.get() == nullptr)
{
ELOG("服务发现失败!响应类型转换失败!");
return false;
}
if (service_rsp->rcode() != RCode::RCODE_OK)
{
ELOG("服务发现失败!%s", errReason(service_rsp->rcode()).c_str());
return false;
}
// 服务器里有个哈希表里保存每个method对应的所有主机信息,以ServiceResponse的形式返回
// 能走到这里,代表当前是没有对应的服务提供主机的
std::unique_lock<std::mutex> lock(_mutex);
// service_rsp->hosts()类型是一个vector,里面有多个host
auto method_host = std::make_shared<MethodHost>(service_rsp->hosts());
if (method_host->empty())
{
ELOG("%s 服务发现失败!没有能够提供服务的主机!", method.c_str());
return false;
}
host = method_host->chooseHost();
_method_hosts[method] = method_host;
return true;
}
// 这个接口是提供给Dispatcher模块进行服务上线下线请求处理的回调函数
//服务上线下线的主机信息等等都要管理起来
void onServiceRequest(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg)
{
// 1. 判断是上线还是下线请求,如果都不是那就不用处理了
auto optype = msg->optype();
std::string method = msg->method();
std::unique_lock<std::mutex> lock(_mutex);
if (optype == ServiceOptype::SERVICE_ONLINE)
{
// 2. 上线请求:找到MethodHost,向其中新增一个主机地址
auto it = _method_hosts.find(method);
if (it == _method_hosts.end())
{
auto method_host = std::make_shared<MethodHost>();
method_host->appendHost(msg->host());
_method_hosts[method] = method_host;
}
else
{
it->second->appendHost(msg->host());
}
}
else if (optype == ServiceOptype::SERVICE_OFFLINE)
{
// 3. 下线请求:找到MethodHost,从其中删除一个主机地址
auto it = _method_hosts.find(method);
if (it == _method_hosts.end())
{
return;
}
it->second->removeHost(msg->host());
_offline_callback(msg->host());
}
}
private:
OfflineCallback _offline_callback;
std::mutex _mutex;
//一个服务哪些主机可以提供
std::unordered_map<std::string, MethodHost::ptr> _method_hosts;
Requestor::ptr _requestor;
};
}
}
17. 客户端-整合封装Client
#pragma once
#include "../common/dispatcher.hpp"
#include "requestor.hpp"
#include "rpc_caller.hpp"
#include "rpc_registry.hpp"
#include "rpc_topic.hpp"
namespace qi_rpc{
namespace client{
class RegistryClient
{
public:
using ptr = std::shared_ptr<RegistryClient>;
RegistryClient(const std::string &ip,int port)
:_requestor(std::make_shared<Requestor>())
,_provider(std::make_shared<Provider>(_requestor))
,_dispatcher(std::make_shared<Dispatcher>())
{
auto msg_rsp = std::bind(&client::Requestor::onResponse,_requestor.get(),std::placeholders::_1,std::placeholders::_2);
_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE,msg_rsp);
auto message_cb = std::bind(&Dispatcher::onMessage,_dispatcher.get(),std::placeholders::_1,std::placeholders::_2);
_client = ClientFactory::create(ip,port);
_client->setMessageCallback(message_cb);
_client->connect();
}
bool registryMethod(const std::string &method,const Address &host){
return _provider->registryMethod(_client->connection(),method,host);
}
private:
Requestor::ptr _requestor;
Provider::ptr _provider;
Dispatcher::ptr _dispatcher;
BaseClient::ptr _client;
};
class DiscoveryClient
{
public:
using ptr = std::shared_ptr<DiscoveryClient>;
DiscoveryClient(const std::string &ip,int port,const Discoverer::OfflineCallback &cb)
:_requestor(std::make_shared<Requestor>())
,_discoverer(std::make_shared<Discoverer>(_requestor,cb))
,_dispatcher(std::make_shared<Dispatcher>())
{
auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(),
std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE, rsp_cb);
auto req_cb = std::bind(&client::Discoverer::onServiceRequest, _discoverer.get(),std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, req_cb);
auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
_client = ClientFactory::create(ip, port);
_client->setMessageCallback(message_cb);
_client->connect();
}
bool serviceDiscovery(const std::string &method, Address &host) {
//host输出型参数,通过一定的调度策略,每次返回一个host提供服务
return _discoverer->serviceDiscovery(_client->connection(), method, host);
}
private:
Requestor::ptr _requestor;
client::Discoverer::ptr _discoverer;
Dispatcher::ptr _dispatcher;
BaseClient::ptr _client;
};
class RpcClient
{
public:
using ptr = std::shared_ptr<RpcClient>;
RpcClient(bool enableDiscovery,const std::string &ip,int port)
:_enableDiscovery(enableDiscovery)
,_requestor(std::make_shared<Requestor>())
,_dispatcher(std::make_shared<Dispatcher>())
,_caller(std::make_shared<RpcCaller>(_requestor))
{
//针对rpc请求后的响应进行的回调处理
auto rsp_cb = std::bind(&Requestor::onResponse,_requestor.get(),std::placeholders::_1,std::placeholders::_2);
_dispatcher->registerHandler<BaseMessage>(MType::RSP_RPC, rsp_cb);
//如果启用了服务发现,地址信息是注册中心的地址,是服务发现客户端需要连接的地址,则通过地址信息实例化discovery_client
//如果没有启用服务发现,则地址信息是服务提供者的地址,则直接实例化好rpc_client
if(_enableDiscovery==true){
auto offline_cb = std::bind(&RpcClient::delClient,this,std::placeholders::_1);
//开启的话host是服务注册中心的host
_discovery_client = std::make_shared<DiscoveryClient>(ip, port, offline_cb);
}else {
auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
//没开启的话,那ip,port就是服务提供者的ip、port
_rpc_client = ClientFactory::create(ip, port);
_rpc_client->setMessageCallback(message_cb);
_rpc_client->connect();
if(_rpc_client->connection().get()==nullptr) DLOG("_rpc_client->connection().get()=nullptr");
}
}
bool call(const std::string &method, const Json::Value ¶ms, Json::Value &result){
//获取服务提供者:1. 服务发现; 2. 固定服务提供者
BaseClient::ptr client = getClient(method);
if(client.get()==nullptr){
return false;
}
//3. 通过客户端连接,发送rpc请求
DLOG("开始进行同步调用")
return _caller->call(client->connection(), method, params, result);
}
bool call(const std::string &method, const Json::Value ¶ms, RpcCaller::JsonAsyncResponse &result){
//获取服务提供者:1. 服务发现; 2. 固定服务提供者
BaseClient::ptr client = getClient(method);
if(client.get()==nullptr){
return false;
}
//3. 通过客户端连接,发送rpc请求
DLOG("开始进行异步调用")
return _caller->call(client->connection(), method, params, result);
}
bool call(const std::string &method, const Json::Value ¶ms, const RpcCaller::JsonResponseCallback &cb){
//获取服务提供者:1. 服务发现; 2. 固定服务提供者
BaseClient::ptr client = getClient(method);
if(client.get()==nullptr){
return false;
}
//3. 通过客户端连接,发送rpc请求
DLOG("开始进行回调调用")
if(client->connection().get()==nullptr) DLOG("client->connection().get()=nullptr")
return _caller->call(client->connection(), method, params, cb);
}
private:
BaseClient::ptr newClient(const Address &host){
auto message_cb = std::bind(&Dispatcher::onMessage,_dispatcher.get(),std::placeholders::_1,std::placeholders::_2);
auto client = ClientFactory::create(host.first,host.second);
client->setMessageCallback(message_cb);
client->connect();
putClient(host,client);
return client;
}
BaseClient::ptr getClient(const Address &host){
std::unique_lock<std::mutex> lock(_mutex);
auto it = _rpc_clients.find(host);
if (it == _rpc_clients.end()) {
return BaseClient::ptr();
}
return it->second;
}
BaseClient::ptr getClient(const std::string &method){
BaseClient::ptr client;
if (_enableDiscovery) {
//通过服务发现获取连接特定主机的client
Address host;
//host输出型参数
bool ret = _discovery_client->serviceDiscovery(method,host);
if(ret==false){
ELOG("当前 %s 服务,没有找到服务提供者!", method.c_str());
return BaseClient::ptr();
}
//2. 查看服务提供者是否已有实例化客户端,有则直接使用,没有则创建
client=getClient(host);
if(client.get()==nullptr){
//没有找打已实例化的客户端,则创建,新创建好的将会放到哈希表里保存
client = newClient(host);
}
}else{
client=_rpc_client;
DLOG("client,是rpc_client");
}
return client;
}
void putClient(const Address &host,BaseClient::ptr &client){
std::unique_lock<std::mutex> lock(_mutex);
_rpc_clients.insert({host,client});
}
void delClient(const Address &host) {
std::unique_lock<std::mutex> lock(_mutex);
_rpc_clients.erase(host);
}
struct AddressHash {
size_t operator()(const Address &host) const{
std::string addr = host.first + std::to_string(host.second);
return std::hash<std::string>{}(addr);
}
};
private:
bool _enableDiscovery;
DiscoveryClient::ptr _discovery_client;
Requestor::ptr _requestor;
RpcCaller::ptr _caller;
Dispatcher::ptr _dispatcher;
BaseClient::ptr _rpc_client;//用于未启用服务发现
std::mutex _mutex;
//<"127.0.0.1:8080", client1>
std::unordered_map<Address, BaseClient::ptr, AddressHash> _rpc_clients;//用于服务发现的客户端连接池
};
class TopicClient
{
public:
using ptr = std::shared_ptr<TopicClient>;
TopicClient(const std::string &ip,int port)
:_requestor(std::make_shared<Requestor>())
,_dispatcher(std::make_shared<Dispatcher>())
,_topic_manager(std::make_shared<TopicManager>(_requestor))
{
auto rsp_cb = std::bind(&Requestor::onResponse,_requestor.get(),std::placeholders::_1,std::placeholders::_2);
_dispatcher->registerHandler<BaseMessage>(MType::RSP_TOPIC,rsp_cb);
auto msg_cb = std::bind(&TopicManager::onPublish, _topic_manager.get(),std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<TopicRequest>(MType::REQ_TOPIC, msg_cb);
auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
_rpc_client = ClientFactory::create(ip, port);
_rpc_client->setMessageCallback(message_cb);
_rpc_client->connect();
}
bool create(const std::string &key){
return _topic_manager->create(_rpc_client->connection(),key);
}
bool remove(const std::string &key){
return _topic_manager->remove(_rpc_client->connection(),key);
}
bool subscribe(const std::string &key,const TopicManager::SubCallback &cb){
return _topic_manager->subscribe(_rpc_client->connection(),key,cb);
}
bool cancel(const std::string &key){
return _topic_manager->cancel(_rpc_client->connection(),key);
}
bool publish(const std::string &key,const std::string &msg){
return _topic_manager->publish(_rpc_client->connection(),key,msg);
}
void shutdown(){
_rpc_client->shutdown();
}
private:
Requestor::ptr _requestor;
Dispatcher::ptr _dispatcher;
client::TopicManager::ptr _topic_manager;
BaseClient::ptr _rpc_client;//用于未启用服务发现
};
}
}