基于Tars高并发IM系统的设计与实现-实战篇5

news2025/1/21 22:01:37

基于Tars高并发IM系统的设计与实现-实战篇5

群聊服务 GroupChatServer

群聊服务既可以接受来自BrokerServer的用户请求,也需要接收来自其他服务的RPC请求;所以本服务提供两套RPC接口:通用RPC接口和专用RPC接口。

通用RPC接口

通用RPC接口主要处理如下请求:

  • 创建群聊
  • 群聊加成员
  • 群聊减成员
  • 修改群资料
  • 发群消息
  • 换群主
  • 解散群聊
  • 同步用户群聊
  • 获取群成员
  • 解散群聊
  • 判断一个人是否为群成员

针对以上每个业务,根据用户请求的类型进行不同的业务逻辑处理,处理代码如下:

  switch(req.header.type){
        case otim::PT_MSG_GROUP_CHAT:
            this->sendMsg(clientContext, req, resp);
            break;
        case otim::PT_GROUPCHAT_SYNC:
            this->syncGroup(clientContext, req, resp);
            break;
        case otim::PT_GROUPCHAT_CREATE:
            this->createGroup(clientContext, req, resp);
            break;
        case otim::PT_GROUPCHAT_JION:
            this->joinGroup(clientContext, req, resp);
            break;
        case otim::PT_GROUPCHAT_QUIT:
            this->quitGroup(clientContext, req, resp);
            break;
        case otim::PT_GROUPCHAT_DISMISS:
            this->dismissGroup(clientContext, req, resp);
            break;
        case otim::PT_GROUPCHAT_UPDATE_CREATOR:
            this->updateGroupCreator(clientContext, req, resp);
            break;
        case otim::PT_GROUPCHAT_INFO_UPDATE:
            this->updateGroupInfo(clientContext, req, resp);
            break;
        case otim::PT_GROUPCHAT_MEMBERS_GET:
            this->getGroupMember(clientContext, req, resp);
            break;
        default:
            MLOG_DEBUG("the type is  invalid:"<<otim::etos((otim::PACK_TYPE)req.header.type));
            return otim::EC_PROTOCOL;
    }

群聊相关请求实现方法:

   int syncGroup(const otim::ClientContext & clientContext, const otim::OTIMPack & req,  otim::OTIMPack & resp);
    int createGroup(const otim::ClientContext & clientContext, const otim::OTIMPack & req,  otim::OTIMPack & resp);
    int dismissGroup(const otim::ClientContext & clientContext, const otim::OTIMPack & req,  otim::OTIMPack & resp);
    int updateGroupCreator(const otim::ClientContext & clientContext, const otim::OTIMPack & req,  otim::OTIMPack & resp);
    int updateGroupInfo(const otim::ClientContext & clientContext, const otim::OTIMPack & req,  otim::OTIMPack & resp);
    int joinGroup(const otim::ClientContext & clientContext, const otim::OTIMPack & req,  otim::OTIMPack & resp);
    int quitGroup(const otim::ClientContext & clientContext, const otim::OTIMPack & req,  otim::OTIMPack & resp);
    int getGroupMember(const otim::ClientContext & clientContext, const otim::OTIMPack & req,  otim::OTIMPack & resp);

专用RPC接口

主要提供两个接口:

interface GroupChatRPCServant
{
    int getGroupMember(string groupId, out vector<string> memberIds);
    
    bool isGroupMember(string groupId, string memberId);

};

历史消息服务 HistoryMsgServer

该服务主要处理用户历史消息即相关的业务:

  • 热会话同步
  • 历史消息存取
  • 高优先级消息存取

该服务提供通用RPC服务,主要服务对象为接入服务BrokerServer;
用户所有消息都通过该服务进行存取;为高效存取,历史消息主要存储在redis,存储量及时长可以根据需求进一步来做配置开发。

业务逻辑处理接口

该服务采用通用接口来处理客户端请求;

   tars::Int32 processHotsessionReq(const otim::ClientContext & clientContext,const otim::OTIMPack & req,otim::OTIMPack &resp);
    tars::Int32 processPullHistoryReq(const otim::ClientContext & clientContext,const otim::OTIMPack & reqPack,otim::OTIMPack &respPack);
    tars::Int32 processHighPriorMsgSyncReq(const otim::ClientContext & clientContext,const otim::OTIMPack & reqPack,otim::OTIMPack &respPack);

冷存储服务器 OlapServer

该服务主要将IM数据存储到mysql中永久保存;专用RPC服务;

业务逻辑处理接口
interface OlapServant
{
    int saveMsg(otim::ClientContext clientContext, OTIMPack pack, string sessionId, long seqId);
};

消息操作服务 MsgOperatorServer

该服务主要有如下功能逻辑处理:

  • 消息控制请求(包含撤回,删除,覆写)
  • 消息已读处理
业务逻辑处理接口
    tars::Int32 processMsgUnreadReq(const otim::ClientContext & clientContext,const otim::OTIMPack & reqPack,otim::OTIMPack &respPack);
    tars::Int32 processMsgCTRLReq(const otim::ClientContext & clientContext,const otim::OTIMPack & reqPack,otim::OTIMPack &respPack);

消息撤回,删除,覆写逻辑处理

tars::Int32 MsgOperatorServantImp::processMsgCTRLReq(const otim::ClientContext & clientContext,const otim::OTIMPack & reqPack,otim::OTIMPack &respPack)
{
    SCOPELOGGER(scopelogger);
    scopelogger<<"reqPack:"<<reqPack.header.writeToJsonString();
  
    otim::MsgControl req;
    otim::unpackTars<otim::MsgControl>(reqPack.payload, req);
    MLOG_DEBUG("clientContext:"<<clientContext.writeToJsonString()<<" req:"<<req.writeToJsonString());

    respPack = reqPack;
    respPack.header.flags |= otim::PF_ISACK;


    otim::CommonErrorCode respData;
    respData.code = otim::EC_SUCCESS;
    if (req.sessionId.empty() || req.seqId == 0 || req.packId.empty()){
        respData.code = otim::EC_PARAM;
        otim::packTars<otim::CommonErrorCode>(respData, respPack.payload);
        MLOG_DEBUG("sessionId,packId or seqId is is empty req:"<<req.writeToJsonString());
        return  respData.code;
    }
       
       
    otim::RedisConnPtr redis(otim::RedisPool::instance());
    //get old msg
    std::vector<std::string> msgs;
    std::vector<std::string> scores;
    EMRStatus ret = redis->ZRangeByScoreAndLimit(otim::RKEY_MSG + req.sessionId, req.seqId, 5, msgs);
    if (EMRStatus::EM_KVDB_ERROR == ret){
        MLOG_ERROR("get msg fail!, sessionId:" << req.sessionId<<" msgId:"<<req.seqId);
        respData.code = otim::EC_DB_ERROR;
        otim::packTars<otim::CommonErrorCode>(respData, respPack.payload);
        return otim::EC_DB_ERROR;
    }
    
   
    MLOG_DEBUG("get old msg size:"<<msgs.size());
    otim::OTIMPack packOrg;
    for (auto item : msgs){
        
        std::vector<char> vctItem(item.begin(), item.end());
        otim::OTIMPack packItem;
        otim::unpackTars<otim::OTIMPack>(vctItem, packItem);
        MLOG_DEBUG("msgs :"<<packItem.header.writeToJsonString());

        if (packItem.header.packId == req.packId){
            packOrg = packItem;
        }
    }
    if (packOrg.header.packId.empty())
    {
        MLOG_WARN("The org msg is not exist:"<<req.sessionId<<" packId:"<<req.packId <<" seqId:"<<req.seqId);
        respData.code = otim::EC_MSG_NOT_EXIST;
        otim::packTars<otim::CommonErrorCode>(respData, respPack.payload);
        return respData.code;
    }

    MLOG_DEBUG("org msg:"<<packOrg.header.writeToJsonString());
   
    std::string to;
    if (req.command == otim::MC_REVOKE){
        packOrg.header.flags |= otim::PF_REVOKE;
        MLOG_DEBUG("revoke msg:"<<req.packId);

    }
    else if (req.command == otim::MC_OVERRIDE){
        packOrg.header.flags |= otim::PF_OVERRIDE;
        
        otim::MsgReq msgReq;
        otim::unpackTars<otim::MsgReq>(packOrg.payload, msgReq);
        msgReq.content = req.content;
        
        otim::packTars<otim::MsgReq>(msgReq, packOrg.payload);
        
        to = msgReq.to;
        MLOG_DEBUG("override msg:"<<req.packId);

    }
    else if (req.command == otim::MC_DELETE){
//        packOrg.header.flags |= otim::PF_REVOKE;
        MLOG_DEBUG("delete msg:"<<req.packId);
    }
    else{
        MLOG_WARN("The command is error:"<<req.command<<" packId:"<<req.packId);
        respData.code = otim::EC_MSG_OP_CMD;
        otim::packTars<otim::CommonErrorCode>(respData, respPack.payload);
        return otim::EC_MSG_OP_CMD;
    }
  

    ret = redis->ZSetRemoveByScore(otim::RKEY_MSG + req.sessionId, req.seqId, req.seqId);
    if (EMRStatus::EM_KVDB_SUCCESS != ret ){
        MLOG_ERROR("delete original msg fail:"<<(int)ret);
    }
    
    //增加新的消息
    if (req.command != otim::MC_DELETE){
        std::string msgSave;
        otim::packTars<otim::OTIMPack>(packOrg, msgSave);
        ret = redis->ZSetAdd(otim::RKEY_MSG + req.sessionId, req.seqId, msgSave);
        if ( EMRStatus::EM_KVDB_SUCCESS != ret ){
            MLOG_ERROR("save cancel msg fail!");
        }
    }
 
    //通知在线接收者其他端
    otim::sendPackToMySelf(clientContext, reqPack);

//  send to user
    std::vector<std::string> vctUserId;
    if (packOrg.header.type == otim::PT_MSG_SINGLE_CHAT || packOrg.header.type == otim::PT_MSG_BIZ_NOTIFY){
        if (to.empty()){
            otim::MsgReq msgReq;
            otim::unpackTars<otim::MsgReq>(packOrg.payload, msgReq);
            to = msgReq.to;
        }
       
        vctUserId.push_back(to);
        MLOG_DEBUG("single or notify chat  packId:"<<packOrg.header.packId<<" to:"<<to);
    }
    else if (packOrg.header.type == otim::PT_MSG_GROUP_CHAT){
        //get groupMember
        otim::GroupChatRPCServantPrx groupChatRPCServantPrx = otim::getServantPrx<otim::GroupChatRPCServantPrx>(PRXSTR_GROUP_CHAT_RPC);
        groupChatRPCServantPrx->getGroupMember(req.sessionId, vctUserId);
        MLOG_DEBUG("group chat  packId:"<<packOrg.header.packId<<" to:"<<req.sessionId<<" member Size:"<<vctUserId.size());
    }
    
    int64_t seqId = otim::genSeqId();
    for (auto userId : vctUserId){
        
        otim::savePriorityMsg(redis.get(), reqPack, userId, seqId);

        otim::dispatchMsg(clientContext, reqPack, userId);
    }
    
    respData.code = otim::EC_SUCCESS;
    otim::packTars<otim::CommonErrorCode>(respData, respPack.payload);

    return otim::EC_SUCCESS;
}

Http接口服务

该服务针对第三方提供消息能力,主要提供如下接口:

  • 发送消息(简单文本消息,复杂消息)
  • 添加好友
  • 删除好友
  • 查看好友

功能实现函数

    std::string  doSendSimpleMsgCmd(TC_HttpRequest &cRequest);
    std::string  doSendMsgCmd(TC_HttpRequest & cRequest);
    std::string  doAddFriend(TC_HttpRequest &request);
    std::string  doDelFriend(TC_HttpRequest &request);
    std::string  doGetFriends(TC_HttpRequest &request);

Push推送服务

该服务主要实现IM消息的离线推送能力, APP客户端不在线的场景下,将消息通过离线通道push用户的手机上,以提高消息的触达率。

主要实现iOS APNS,Android FCM,华为,小米,oppo,vivo等厂商的离线消息推送功能;
需要根据各个厂商开放平台提供的API进行开发集成。

Android 厂商开发平台地址:

  • 华为:
    https://developer.huawei.com/consumer/cn/hms/huawei-pushkit
  • 小米:
    https://dev.mi.com/console/appservice/push.html
  • 魅族:
    http://open-wiki.flyme.cn/doc-wiki/index
  • vivo:
    https://push.vivo.com.cn/#/
  • oppo:
    https://push.oppo.com/

RPC接口

enum PushServiceType
{
    PS_TYPE_NONE = 0, //无 Push服务提供商
    PS_TYPE_IOS = 1,  //IOS Push服务提供商
    PS_YPE_HUAWEI = 2,   //华为 Push服务提供商
    PS_TYPE_XIAOMI = 3,   //小米 Push服务提供商
    PS_TYPE_MEIZU = 4,   //魅族 Push服务提供商
    PS_TYPE_VIVO = 5,  //vivi服务
    PS_TYPE_OPPO = 6, //oppo服务
    PS_TYPE_FCM = 7, //FCM服务
};

struct RegInfo {
    0  require string packId = "";                           //消息的id
    1  require PushServiceType serviceType = 0;              //push服务提供商
    2  require string packageName = "";                          //包名
    3  require string userId = "";                            //用户id
    4  optional string appVersion = "";                       //app version
};


struct PushInfo {
    0  require string packId = "";                           //消息的id
    1  require string userId = "";                            //用户id
    2  require int unReadCount = 0;                          //未读消息数
    3  require string title = "";                            /push标题
    4  require string content = "";                          //push内容
    5  optional string uri = "";                              //跳转uri
    6  optional string extraData="";                           //业务自定义字段
};

interface PushServant
{
    int register(RegInfo regInfo);

    int pushMessage(PushInfo pushInfo);
};

服务端部署

编译打包

所有服务开发完成后,执行如下命令进行编译,打包:

make release
make clean all
make tar

程序包部署

根据前期部署好的Tars框架环境、web管理系统,将程序包逐个发布,发布后的系统如图:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/856758.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Jenkins自动化打包脚本

一、背景 jenkins可以设置定时任务打包&#xff0c;也已手动点按钮打包&#xff0c;还可以通过执行http请求打包&#xff0c;今天我们就通过shell脚本&#xff0c;通过curl命令进行jenkins打包。 二、步骤 2.1 在jenkins上构建项目 设置触发器 2.2 通过shell脚本触发远程构…

【RabbitMQ上手——单实例安装5种简单模式实现通讯过程】

【RabbitMQ入门-单实例安装&5种简单模式实现通讯过程】 一、环境说明二、安装RabbitMQ三、用户权限及Virtual Host设置四、5种简单模式实现通讯过程的实现五、小结 一、环境说明 安装环境&#xff1a;虚拟机VMWare Centos7.6 Maven3.6.3 JDK1.8RabbitMQ版本&#xff1a;…

并发——线程的生命周期和状态

文章目录 Java 线程在运行的生命周期中的指定时刻只可能处于下面 6 种不同状态的其中一个状态&#xff08;图源《Java 并发编程艺术》4.1.4 节&#xff09;。 线程在生命周期中并不是固定处于某一个状态而是随着代码的执行在不同状态之间切换。Java 线程状态变迁如下图所示&am…

点对点协议PPP

点对点协议PPP(Point-to-Point Protocol)是目前使用最广泛的点对点数据链路层协议。PPP协议是因特网的正确标准。 基本格式&#xff1a; PPP协议是数据链路格式。格式如下&#xff1a; 标志(Flag)字段: PPP的定界符&#xff0c;取值为0x7E 地址(Address)字段: 取值为0xFF&…

多语言自动翻译海外跨境电商独立站源码开发

要搭建一个多语言自动翻译的海外跨境电商独立站&#xff0c;需要进行以下步骤&#xff1a; 1. 选择合适的开发语言和框架&#xff1a;根据自己的技术实力和需求&#xff0c;选择适合的开发语言和框架。 2. 设计数据库结构&#xff1a;根据电商的业务需求&#xff0c;设计数据…

【CHI】架构介绍

Learn the architecture - Introducing AMBA CHI AMBA CHI协议导论--言身寸 1. AMBA CHI简介 一致性集线器接口&#xff08;CHI&#xff09;是AXI一致性扩展&#xff08;ACE&#xff09;协议的演进。它是Arm提供的高级微控制器总线架构&#xff08;AMBA&#xff09;的一部分。…

电源控制--对数与db分贝

在控制理论中&#xff0c;"db"通常表示分贝&#xff08;decibel&#xff09;的缩写。分贝是一种用于度量信号强度、增益或衰减的单位。 在控制系统中&#xff0c;分贝常用于描述信号的增益或衰减。通常&#xff0c;增益以正数的分贝值表示&#xff0c;而衰减以负数的…

C语言——九九乘法表

//九九乘法表 //用程序做一个九九乘法表 #include<stdio.h> int main() {int i,j,result;printf("\n");for(i1;i<10;i){for(j1;j<i;j){resulti*j;printf(" %d*%d%-d",i,j,result);}printf(" \n");}}

成集云 | 畅捷通采购单同步至钉钉 | 解决方案

源系统成集云目标系统 介绍 畅捷通是一家专业的金融科技公司&#xff0c;致力于为投资者提供便捷、高效的金融服务。通过畅捷通T的交易方式&#xff0c;投资者可以更加灵活地进行买卖交易&#xff0c;并且在交易完成后即可获得结算款项&#xff0c;无需等待T1的结算周期。 钉…

利用multiprocessing实现多线程,并实现多个参数传递函数的多并行

前言 利用多线程一般来说都是有 一定的大数据需求。 比如一个函数可能被不断的调用很多次 一般来说我们会使用for循环&#xff0c;但是为了节省时间&#xff0c;我们采用多线程的方式来解决这个问题 show you code 单参数输入 举了两个例子&#xff0c;一看便知 func为我们的函…

探索MongoDB的奥秘:基本命令使用入门指南

&#x1f60a; 作者&#xff1a; 一恍过去 &#x1f496; 主页&#xff1a; https://blog.csdn.net/zhuocailing3390 &#x1f38a; 社区&#xff1a; Java技术栈交流 &#x1f389; 主题&#xff1a; 探索MongoDB的奥秘&#xff1a;基本命令使用入门指南 ⏱️ 创作时间&a…

世界算力简史(下)

世界算力简史&#xff08;上&#xff09; 世界算力简史&#xff08;中&#xff09; 今天终于要完结了…… █ 1980-1990&#xff1a;PC时代 IBM-PC和“兼容机” 上一篇&#xff0c;我们说到&#xff0c;70年代微处理器崛起&#xff0c;使得个人电脑开始大量出现。 这种情况&…

山东布谷科技直播程序源码使用Redis进行服务器横向扩展

当今&#xff0c;直播程序源码平台作为新媒体时代主流&#xff0c;受到了世界各地人民的喜爱&#xff0c;这也使得直播程序源码平台用户数量的庞大&#xff0c;也难免会出现大量用户同时访问服务器&#xff0c;使服务器过载的情况&#xff0c;当服务器承受不住的时候&#xff0…

进程的创建

进程创建时发生了什么 回顾上节关于存储空间分配的图片&#xff1a; 当程序运行到 fork() 函数了之后&#xff1a; 在早期的Linux中&#xff0c;系统会将fork之前所有的数据段&#xff0c;代码段&#xff0c;堆&#xff0c;栈等对应的全部的存储空间拷贝一份&#xff0c;作为…

保姆级教程:从0到1搭建Stable Diffusion XL完整工作流进行AI绘画

Rocky Ding 公众号&#xff1a;WeThinkIn 写在前面 【人人都是算法专家】栏目专注于分享Rocky在AI行业中对业务/竞赛/研究/产品维度的思考与感悟。欢迎大家一起交流学习&#x1f4aa; 大家好&#xff0c;我是Rocky。 之前Rocky详细介绍了Stable Diffusion&#xff08;SD&#…

Scala(第一章Scala入门)

文章目录 1.1 概述 1.1.1 为什么学习Scala1.1.2 Scala发展历史1.1.3 Scala和Java关系1.1.4 Scala语言特点 1.2 Scala环境搭建1.3 Scala插件安装1.4 HelloWorld案例 1.4.1 创建IDEA项目工程1.4.2 class和object说明1.4.3 Scala程序反编译 1.5 关联Scala源码1.6官方编程指南 1.1…

Arch Linux 使用桥接模式上网

如果我们想要将虚拟机与物理主机同一网段&#xff0c;并且像物理机器一样被其他设备访问&#xff0c;则需要以桥接模式上网&#xff0c;这个时候&#xff0c;物理主机就必须配置为使用网桥上网了。 注意&#xff1a;这里我们使用了 NetworkManager 网络管理工具中的 nmcli 来进…

File 类和 InputStream, OutputStream 的用法总结

目录 一、File 类 1.File类属性 2.构造方法 3.普通方法 二、InputStream 1.方法 2.FileInputStream 三、OutputStream 1.方法 2.FileOutputStream 四、针对字符流对象进行读写操作 一、File 类 1.File类属性 修饰符及类型属性说明static StringpathSeparator依赖于系统的路…

【T3】金蝶kis凭证数据转换到畅捷通T3软件中。

【问题需求】 将金蝶软件中的账套转换到畅捷通T3软件中。 由于金蝶老版本使用的是非sql server数据库。 进而需要将其数据导入到sql中,在转换到T3。 【转换环境】 金蝶中数据:凭证;科目无项目核算。 1、金蝶的数据文件后缀为.AIS; 2、安装office2003全版软件; 3、安装sq…

SpringBoot3文件管理

标签&#xff1a;上传.下载.Excel.导入.导出&#xff1b; 一、简介 在项目中&#xff0c;文件管理是常见的复杂功能&#xff1b; 首先文件的类型比较多样&#xff0c;处理起来比较复杂&#xff0c;其次文件涉及大量的IO操作&#xff0c;容易引发内存溢出&#xff1b; 不同的…