Design and Implementation of a High-Concurrency IM System Based on Tars - Practice Part 5
Group Chat Service: GroupChatServer
The group chat service accepts user requests forwarded by BrokerServer and also receives RPC requests from other services, so it exposes two sets of RPC interfaces: a generic RPC interface and a dedicated RPC interface.
Generic RPC Interface
The generic RPC interface mainly handles the following requests:
- Create a group
- Add members to a group
- Remove members from a group
- Update group info
- Send a group message
- Transfer group ownership
- Dismiss a group
- Sync a user's group list
- Get group members
- Check whether a user is a member of a group
For each of these operations, the handler dispatches on the packet type in the request header and runs the corresponding business logic:
switch (req.header.type){
case otim::PT_MSG_GROUP_CHAT:
    this->sendMsg(clientContext, req, resp);
    break;
case otim::PT_GROUPCHAT_SYNC:
    this->syncGroup(clientContext, req, resp);
    break;
case otim::PT_GROUPCHAT_CREATE:
    this->createGroup(clientContext, req, resp);
    break;
case otim::PT_GROUPCHAT_JION:
    this->joinGroup(clientContext, req, resp);
    break;
case otim::PT_GROUPCHAT_QUIT:
    this->quitGroup(clientContext, req, resp);
    break;
case otim::PT_GROUPCHAT_DISMISS:
    this->dismissGroup(clientContext, req, resp);
    break;
case otim::PT_GROUPCHAT_UPDATE_CREATOR:
    this->updateGroupCreator(clientContext, req, resp);
    break;
case otim::PT_GROUPCHAT_INFO_UPDATE:
    this->updateGroupInfo(clientContext, req, resp);
    break;
case otim::PT_GROUPCHAT_MEMBERS_GET:
    this->getGroupMember(clientContext, req, resp);
    break;
default:
    MLOG_DEBUG("the type is invalid:" << otim::etos((otim::PACK_TYPE)req.header.type));
    return otim::EC_PROTOCOL;
}
Declarations of the handler methods for these group chat requests (a sketch of the shared response skeleton follows the declarations):
int syncGroup(const otim::ClientContext & clientContext, const otim::OTIMPack & req, otim::OTIMPack & resp);
int createGroup(const otim::ClientContext & clientContext, const otim::OTIMPack & req, otim::OTIMPack & resp);
int dismissGroup(const otim::ClientContext & clientContext, const otim::OTIMPack & req, otim::OTIMPack & resp);
int updateGroupCreator(const otim::ClientContext & clientContext, const otim::OTIMPack & req, otim::OTIMPack & resp);
int updateGroupInfo(const otim::ClientContext & clientContext, const otim::OTIMPack & req, otim::OTIMPack & resp);
int joinGroup(const otim::ClientContext & clientContext, const otim::OTIMPack & req, otim::OTIMPack & resp);
int quitGroup(const otim::ClientContext & clientContext, const otim::OTIMPack & req, otim::OTIMPack & resp);
int getGroupMember(const otim::ClientContext & clientContext, const otim::OTIMPack & req, otim::OTIMPack & resp);
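Each of these handlers follows the same ack skeleton that also appears in the MsgOperatorServer implementation further below: copy the request packet into the response, set the PF_ISACK flag, and pack a CommonErrorCode into the payload. A minimal sketch under those assumptions (the class name GroupChatServantImp and the omitted business logic are illustrative only):

int GroupChatServantImp::createGroup(const otim::ClientContext &clientContext,
                                     const otim::OTIMPack &req, otim::OTIMPack &resp)
{
    // Echo the request header back as an ack.
    resp = req;
    resp.header.flags |= otim::PF_ISACK;

    otim::CommonErrorCode respData;
    respData.code = otim::EC_SUCCESS;

    // ... create the group record and its member list here ...

    otim::packTars<otim::CommonErrorCode>(respData, resp.payload);
    return respData.code;
}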
Dedicated RPC Interface
It mainly provides two interfaces (a caller sketch follows the definition):
interface GroupChatRPCServant
{
    int getGroupMember(string groupId, out vector<string> memberIds);
    bool isGroupMember(string groupId, string memberId);
};
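Other services reach this interface through a Tars proxy. The sketch below reuses the otim::getServantPrx helper and the PRXSTR_GROUP_CHAT_RPC proxy name that appear in the MsgOperatorServer code later in this article; the permission-check wrapper itself is a hypothetical example:

// Sketch only: a hypothetical membership check in a calling service.
bool checkGroupPermission(const std::string &groupId, const std::string &userId)
{
    otim::GroupChatRPCServantPrx groupChatPrx =
        otim::getServantPrx<otim::GroupChatRPCServantPrx>(PRXSTR_GROUP_CHAT_RPC);

    if (!groupChatPrx->isGroupMember(groupId, userId)){
        MLOG_WARN("user is not a member of the group, groupId:" << groupId << " userId:" << userId);
        return false;
    }
    return true;
}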
History Message Service: HistoryMsgServer
This service handles users' history messages and the related business logic:
- Hot session sync
- History message storage and retrieval
- High-priority message storage and retrieval
It exposes the generic RPC interface, and its main consumer is the access service BrokerServer.
All user messages are stored and retrieved through this service. For efficient access, history messages are kept mainly in Redis; the retention size and duration can be made configurable as requirements grow.
Business Logic Interfaces
The service handles client requests through the generic interface (a pull-history sketch follows the declarations):
tars::Int32 processHotsessionReq(const otim::ClientContext & clientContext,const otim::OTIMPack & req,otim::OTIMPack &resp);
tars::Int32 processPullHistoryReq(const otim::ClientContext & clientContext,const otim::OTIMPack & reqPack,otim::OTIMPack &respPack);
tars::Int32 processHighPriorMsgSyncReq(const otim::ClientContext & clientContext,const otim::OTIMPack & reqPack,otim::OTIMPack &respPack);
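As an illustration of the storage model, here is a minimal sketch of processPullHistoryReq. It reuses the Redis wrapper calls and the RKEY_MSG key prefix that appear in the MsgOperatorServer code below (messages live in a per-session ZSet scored by seqId); the HistoryPullReq struct with its startSeqId/count fields, and the servant class name, are assumptions:

tars::Int32 HistoryMsgServantImp::processPullHistoryReq(const otim::ClientContext &clientContext,
                                                        const otim::OTIMPack &reqPack, otim::OTIMPack &respPack)
{
    otim::HistoryPullReq req;   // hypothetical request struct: sessionId, startSeqId, count
    otim::unpackTars<otim::HistoryPullReq>(reqPack.payload, req);

    respPack = reqPack;
    respPack.header.flags |= otim::PF_ISACK;

    otim::RedisConnPtr redis(otim::RedisPool::instance());
    std::vector<std::string> msgs;
    // Session messages are stored in a ZSet keyed by RKEY_MSG + sessionId, scored by seqId.
    EMRStatus ret = redis->ZRangeByScoreAndLimit(otim::RKEY_MSG + req.sessionId,
                                                 req.startSeqId, req.count, msgs);
    if (EMRStatus::EM_KVDB_ERROR == ret){
        return otim::EC_DB_ERROR;
    }

    // ... pack the retrieved messages into respPack.payload ...
    return otim::EC_SUCCESS;
}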
Cold Storage Server: OlapServer
This service persists IM data into MySQL for long-term storage and exposes a dedicated RPC interface; an implementation sketch follows the interface definition.
Business Logic Interface
interface OlapServant
{
    int saveMsg(otim::ClientContext clientContext, OTIMPack pack, string sessionId, long seqId);
};
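A minimal sketch of how saveMsg could write the serialized pack into MySQL using TarsCpp's TC_Mysql helper. The servant class name, the _mysql member, and the t_im_msg table and column names are assumptions, not taken from the original project:

#include "util/tc_mysql.h"
#include "util/tc_common.h"

tars::Int32 OlapServantImp::saveMsg(const otim::ClientContext &clientContext,
                                    const otim::OTIMPack &pack,
                                    const std::string &sessionId, tars::Int64 seqId,
                                    tars::TarsCurrentPtr current)
{
    std::string packData;
    otim::packTars<otim::OTIMPack>(pack, packData);   // same serialization helper used elsewhere

    tars::TC_Mysql::RECORD_DATA record;
    record["session_id"] = std::make_pair(tars::TC_Mysql::DB_STR, sessionId);
    record["seq_id"]     = std::make_pair(tars::TC_Mysql::DB_INT, tars::TC_Common::tostr(seqId));
    record["pack_id"]    = std::make_pair(tars::TC_Mysql::DB_STR, pack.header.packId);
    record["pack_data"]  = std::make_pair(tars::TC_Mysql::DB_STR, packData); // binary payloads may need base64

    try{
        _mysql.insertRecord("t_im_msg", record);      // _mysql: a TC_Mysql member initialized at servant startup
    }
    catch (const tars::TC_Mysql_Exception &e){
        MLOG_ERROR("save msg to mysql fail:" << e.what());
        return otim::EC_DB_ERROR;
    }
    return otim::EC_SUCCESS;
}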
Message Operation Service: MsgOperatorServer
This service handles the following logic:
- Message control requests (revoke, delete, and override)
- Message read/unread handling
Business Logic Interfaces
tars::Int32 processMsgUnreadReq(const otim::ClientContext & clientContext,const otim::OTIMPack & reqPack,otim::OTIMPack &respPack);
tars::Int32 processMsgCTRLReq(const otim::ClientContext & clientContext,const otim::OTIMPack & reqPack,otim::OTIMPack &respPack);
Logic for message revoke, delete, and override
tars::Int32 MsgOperatorServantImp::processMsgCTRLReq(const otim::ClientContext &clientContext, const otim::OTIMPack &reqPack, otim::OTIMPack &respPack)
{
    SCOPELOGGER(scopelogger);
    scopelogger << "reqPack:" << reqPack.header.writeToJsonString();

    otim::MsgControl req;
    otim::unpackTars<otim::MsgControl>(reqPack.payload, req);
    MLOG_DEBUG("clientContext:" << clientContext.writeToJsonString() << " req:" << req.writeToJsonString());

    respPack = reqPack;
    respPack.header.flags |= otim::PF_ISACK;

    otim::CommonErrorCode respData;
    respData.code = otim::EC_SUCCESS;

    if (req.sessionId.empty() || req.seqId == 0 || req.packId.empty()){
        respData.code = otim::EC_PARAM;
        otim::packTars<otim::CommonErrorCode>(respData, respPack.payload);
        MLOG_DEBUG("sessionId, packId or seqId is empty req:" << req.writeToJsonString());
        return respData.code;
    }

    otim::RedisConnPtr redis(otim::RedisPool::instance());

    //get the original msg
    std::vector<std::string> msgs;
    std::vector<std::string> scores;
    EMRStatus ret = redis->ZRangeByScoreAndLimit(otim::RKEY_MSG + req.sessionId, req.seqId, 5, msgs);
    if (EMRStatus::EM_KVDB_ERROR == ret){
        MLOG_ERROR("get msg fail!, sessionId:" << req.sessionId << " msgId:" << req.seqId);
        respData.code = otim::EC_DB_ERROR;
        otim::packTars<otim::CommonErrorCode>(respData, respPack.payload);
        return otim::EC_DB_ERROR;
    }
    MLOG_DEBUG("get old msg size:" << msgs.size());

    otim::OTIMPack packOrg;
    for (const auto &item : msgs){
        std::vector<char> vctItem(item.begin(), item.end());
        otim::OTIMPack packItem;
        otim::unpackTars<otim::OTIMPack>(vctItem, packItem);
        MLOG_DEBUG("msgs :" << packItem.header.writeToJsonString());
        if (packItem.header.packId == req.packId){
            packOrg = packItem;
        }
    }

    if (packOrg.header.packId.empty())
    {
        MLOG_WARN("The org msg does not exist:" << req.sessionId << " packId:" << req.packId << " seqId:" << req.seqId);
        respData.code = otim::EC_MSG_NOT_EXIST;
        otim::packTars<otim::CommonErrorCode>(respData, respPack.payload);
        return respData.code;
    }
    MLOG_DEBUG("org msg:" << packOrg.header.writeToJsonString());

    std::string to;
    if (req.command == otim::MC_REVOKE){
        packOrg.header.flags |= otim::PF_REVOKE;
        MLOG_DEBUG("revoke msg:" << req.packId);
    }
    else if (req.command == otim::MC_OVERRIDE){
        packOrg.header.flags |= otim::PF_OVERRIDE;
        otim::MsgReq msgReq;
        otim::unpackTars<otim::MsgReq>(packOrg.payload, msgReq);
        msgReq.content = req.content;
        otim::packTars<otim::MsgReq>(msgReq, packOrg.payload);
        to = msgReq.to;
        MLOG_DEBUG("override msg:" << req.packId);
    }
    else if (req.command == otim::MC_DELETE){
        MLOG_DEBUG("delete msg:" << req.packId);
    }
    else{
        MLOG_WARN("The command is invalid:" << req.command << " packId:" << req.packId);
        respData.code = otim::EC_MSG_OP_CMD;
        otim::packTars<otim::CommonErrorCode>(respData, respPack.payload);
        return otim::EC_MSG_OP_CMD;
    }

    //remove the original message from the session ZSet
    ret = redis->ZSetRemoveByScore(otim::RKEY_MSG + req.sessionId, req.seqId, req.seqId);
    if (EMRStatus::EM_KVDB_SUCCESS != ret){
        MLOG_ERROR("delete original msg fail:" << (int)ret);
    }

    //write back the modified message (revoke/override only)
    if (req.command != otim::MC_DELETE){
        std::string msgSave;
        otim::packTars<otim::OTIMPack>(packOrg, msgSave);
        ret = redis->ZSetAdd(otim::RKEY_MSG + req.sessionId, req.seqId, msgSave);
        if (EMRStatus::EM_KVDB_SUCCESS != ret){
            MLOG_ERROR("save modified msg fail!");
        }
    }

    //notify the operator's other online devices
    otim::sendPackToMySelf(clientContext, reqPack);

    //send to the receiver(s)
    std::vector<std::string> vctUserId;
    if (packOrg.header.type == otim::PT_MSG_SINGLE_CHAT || packOrg.header.type == otim::PT_MSG_BIZ_NOTIFY){
        if (to.empty()){
            otim::MsgReq msgReq;
            otim::unpackTars<otim::MsgReq>(packOrg.payload, msgReq);
            to = msgReq.to;
        }
        vctUserId.push_back(to);
        MLOG_DEBUG("single or notify chat packId:" << packOrg.header.packId << " to:" << to);
    }
    else if (packOrg.header.type == otim::PT_MSG_GROUP_CHAT){
        //get group members
        otim::GroupChatRPCServantPrx groupChatRPCServantPrx = otim::getServantPrx<otim::GroupChatRPCServantPrx>(PRXSTR_GROUP_CHAT_RPC);
        groupChatRPCServantPrx->getGroupMember(req.sessionId, vctUserId);
        MLOG_DEBUG("group chat packId:" << packOrg.header.packId << " to:" << req.sessionId << " member Size:" << vctUserId.size());
    }

    int64_t seqId = otim::genSeqId();
    for (const auto &userId : vctUserId){
        otim::savePriorityMsg(redis.get(), reqPack, userId, seqId);
        otim::dispatchMsg(clientContext, reqPack, userId);
    }

    respData.code = otim::EC_SUCCESS;
    otim::packTars<otim::CommonErrorCode>(respData, respPack.payload);
    return otim::EC_SUCCESS;
}
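Note the asymmetry in the Redis update: for MC_DELETE the original message is removed from the session ZSet and nothing is written back, while for MC_REVOKE and MC_OVERRIDE the message is removed and then re-inserted with the updated flags (and, for override, the rewritten content), so that clients pulling history later see the modified pack. In every case the control pack itself is also fanned out to the operator's other devices and to the receivers, so online clients can update their local state immediately.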
HTTP Interface Service
This service exposes messaging capabilities to third parties and mainly provides the following endpoints (a handler sketch follows the function list):
- Send a message (simple text messages and rich messages)
- Add a friend
- Delete a friend
- Query friends
Handler Functions
std::string doSendSimpleMsgCmd(TC_HttpRequest &cRequest);
std::string doSendMsgCmd(TC_HttpRequest & cRequest);
std::string doAddFriend(TC_HttpRequest &request);
std::string doDelFriend(TC_HttpRequest &request);
std::string doGetFriends(TC_HttpRequest &request);
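A minimal sketch of what doSendSimpleMsgCmd might look like. Only TC_HttpRequest::getContent() comes from TarsCpp's HTTP utility; the class name, the request body format, and the JSON response convention are assumptions:

// Sketch only: handle a third-party "send simple message" HTTP request.
std::string HttpServantImp::doSendSimpleMsgCmd(TC_HttpRequest &cRequest)
{
    const std::string body = cRequest.getContent();   // raw POST body, e.g. {"from":...,"to":...,"content":...}

    // ... validate the caller, parse `body`, build an otim::OTIMPack of type
    // PT_MSG_SINGLE_CHAT, and hand it to the message pipeline (e.g. via a servant proxy) ...

    // Return a simple JSON result; the code/msg convention is illustrative.
    return "{\"code\":0,\"msg\":\"success\"}";
}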
Push Service
This service implements offline push for IM messages: when the APP client is not online, messages are delivered to the user's phone through an offline push channel to improve the message reach rate.
It mainly implements offline push through iOS APNs, Android FCM, and vendor channels such as Huawei, Xiaomi, OPPO, and vivo;
each integration must be developed against the APIs of the corresponding vendor's open platform.
Android vendor developer platform URLs:
- Huawei: https://developer.huawei.com/consumer/cn/hms/huawei-pushkit
- Xiaomi: https://dev.mi.com/console/appservice/push.html
- Meizu: http://open-wiki.flyme.cn/doc-wiki/index
- vivo: https://push.vivo.com.cn/#/
- OPPO: https://push.oppo.com/
RPC Interface
enum PushServiceType
{
    PS_TYPE_NONE = 0,    //no push service provider
    PS_TYPE_IOS = 1,     //iOS (APNs) push service provider
    PS_YPE_HUAWEI = 2,   //Huawei push service provider
    PS_TYPE_XIAOMI = 3,  //Xiaomi push service provider
    PS_TYPE_MEIZU = 4,   //Meizu push service provider
    PS_TYPE_VIVO = 5,    //vivo push service provider
    PS_TYPE_OPPO = 6,    //OPPO push service provider
    PS_TYPE_FCM = 7,     //FCM push service provider
};
struct RegInfo {
    0 require string packId = "";               //message id
    1 require PushServiceType serviceType = 0;  //push service provider
    2 require string packageName = "";          //app package name
    3 require string userId = "";               //user id
    4 optional string appVersion = "";          //app version
};
struct PushInfo {
    0 require string packId = "";       //message id
    1 require string userId = "";       //user id
    2 require int unReadCount = 0;      //unread message count
    3 require string title = "";        //push title
    4 require string content = "";      //push content
    5 optional string uri = "";         //landing uri
    6 optional string extraData = "";   //business-defined extra data
};
interface PushServant
{
    int register(RegInfo regInfo);
    int pushMessage(PushInfo pushInfo);
};
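For illustration, a caller-side sketch of how the message pipeline might push a notification to an offline receiver. The otim::getServantPrx pattern matches the GroupChatRPCServant call shown earlier; the PRXSTR_PUSH constant, the helper function, and the notification copy are assumptions:

// Sketch only: push an offline notification for an undelivered message.
void pushOfflineNotify(const otim::OTIMPack &pack, const std::string &userId, int unreadCount)
{
    otim::PushServantPrx pushPrx = otim::getServantPrx<otim::PushServantPrx>(PRXSTR_PUSH);

    otim::PushInfo pushInfo;
    pushInfo.packId      = pack.header.packId;
    pushInfo.userId      = userId;
    pushInfo.unReadCount = unreadCount;
    pushInfo.title       = "New message";                     // illustrative copy
    pushInfo.content     = "You have received a new message"; // illustrative copy
    pushPrx->pushMessage(pushInfo);
}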
Server-Side Deployment
Build and Packaging
After all services are implemented, build and package each of them with the following commands:
make release
make clean all
make tar
Package Deployment
Using the Tars framework environment and web management console deployed earlier, publish the packages one by one; the system after deployment is shown in the figure: