TeamTalk消息服务器(未读计数)

news2025/1/12 21:37:27

信令和协议设计

enum MessageCmdID {
   // ...... 省略无关逻辑 
  CID_MSG_UNREAD_CNT_REQUEST = 775,
  CID_MSG_UNREAD_CNT_RESPONSE = 776,
   // ...... 省略无关逻辑 
};

message IMUnreadMsgCntReq{
	//cmd id:		0x0307
	required uint32 user_id = 1;
	optional bytes attach_data = 20;	
}

message IMUnreadMsgCntRsp{
	//cmd id:		0x0308
	required uint32 user_id = 1;
	required uint32 total_cnt = 2; // 多个人的未读消息
	repeated IM.BaseDefine.UnreadInfo unreadinfo_list = 3;
	optional bytes attach_data = 20;
}

message UnreadInfo{
	required uint32 session_id = 1; // 会话ID
	required SessionType session_type = 2; // 会话类型
	required uint32 unread_cnt = 3; // 未读消息数量
	required uint32 latest_msg_id = 4; // 最新的消息ID
	required bytes latest_msg_data = 5; // 最新的消息
    required MsgType latest_msg_type = 6;  // 消息类型
    required uint32 latest_msg_from_user_id = 7;  //发送的用户id
}

流程图:

请添加图片描述

代码分析

msg_server收到CID_MSG_UNREAD_CNT_REQUEST后调用 CMsgConn::_HandleClientUnreadMsgCntRequest 函数

void CMsgConn::HandlePdu(CImPdu* pPdu)
{
	// ...... 省略无关逻辑 
	switch (pPdu->GetCommandId()) {
         // ...... 省略无关逻辑          
        case CID_MSG_UNREAD_CNT_REQUEST:
            _HandleClientUnreadMsgCntRequest(pPdu );
		break;              
        // ...... 省略无关逻辑 
	}
}

void CMsgConn::_HandleClientUnreadMsgCntRequest(CImPdu* pPdu)
{
	log("HandleClientUnreadMsgCntReq, from_id=%u ", GetUserId());
    IM::Message::IMUnreadMsgCntReq msg;
    CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));
    
	CDBServConn* pDBConn = get_db_serv_conn_for_login();
	if (pDBConn) {
		CDbAttachData attach(ATTACH_TYPE_HANDLE, m_handle, 0);
        msg.set_user_id(GetUserId());
        msg.set_attach_data(attach.GetBuffer(), attach.GetLength());
        pPdu->SetPBMsg(&msg);
        pDBConn->SendPdu(pPdu);
	}
}

db_proxy_server收到CID_MSG_UNREAD_CNT_REQUEST后调用 DB_PROXY::getUnreadMsgCounter函数

值得注意的是,返回的未读消息里面包含每个会话的未读消息个数,消息类型,最后一条消息。

m_handler_map.insert(make_pair(uint32_t(CID_MSG_UNREAD_CNT_REQUEST), DB_PROXY::getUnreadMsgCounter));

void getUnreadMsgCounter(CImPdu* pPdu, uint32_t conn_uuid)
{
        IM::Message::IMUnreadMsgCntReq msg;
        IM::Message::IMUnreadMsgCntRsp msgResp;
        if(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()))
        {
            CImPdu* pPduResp = new CImPdu;

            uint32_t nUserId = msg.user_id();

            list<IM::BaseDefine::UnreadInfo> lsUnreadCount;
            uint32_t nTotalCnt = 0;
            // 从redis获取未读消息数量 和 从mysql获取最后一条未读消息
            CMessageModel::getInstance()->getUnreadMsgCount(nUserId, nTotalCnt, lsUnreadCount);
            CGroupMessageModel::getInstance()->getUnreadMsgCount(nUserId, nTotalCnt, lsUnreadCount);
            msgResp.set_user_id(nUserId);
            msgResp.set_total_cnt(nTotalCnt);
            for(auto it= lsUnreadCount.begin(); it!=lsUnreadCount.end(); ++it)
            {
                IM::BaseDefine::UnreadInfo* pInfo = msgResp.add_unreadinfo_list();
                pInfo->set_session_id(it->session_id());
                pInfo->set_session_type(it->session_type());
                pInfo->set_unread_cnt(it->unread_cnt());
                pInfo->set_latest_msg_id(it->latest_msg_id());
                pInfo->set_latest_msg_data(it->latest_msg_data());
                pInfo->set_latest_msg_type(it->latest_msg_type());
                pInfo->set_latest_msg_from_user_id(it->latest_msg_from_user_id());
            }
                        
            log("userId=%d, unreadCnt=%u, totalCount=%u", nUserId, msgResp.unreadinfo_list_size(), nTotalCnt);
            msgResp.set_attach_data(msg.attach_data());
            pPduResp->SetPBMsg(&msgResp);
            pPduResp->SetSeqNum(pPdu->GetSeqNum());
            pPduResp->SetServiceId(IM::BaseDefine::SID_MSG);
            pPduResp->SetCommandId(IM::BaseDefine::CID_MSG_UNREAD_CNT_RESPONSE);
            CProxyConn::AddResponsePdu(conn_uuid, pPduResp);
        }
        else
        {
            log("parse pb failed");
        }
}
void CMessageModel::getUnreadMsgCount(uint32_t nUserId, uint32_t &nTotalCnt, list<IM::BaseDefine::UnreadInfo>& lsUnreadCount)
{
	// redis
    CacheManager* pCacheManager = CacheManager::getInstance();
    CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");
    if (pCacheConn)
    {
        map<string, string> mapUnread;
        string strKey = "unread_" + int2string(nUserId);
        bool bRet = pCacheConn->hgetAll(strKey, mapUnread);
        pCacheManager->RelCacheConn(pCacheConn);
        if(bRet)
        {
            IM::BaseDefine::UnreadInfo cUnreadInfo;
            for (auto it = mapUnread.begin(); it != mapUnread.end(); it++) {
                cUnreadInfo.set_session_id(atoi(it->first.c_str()));
                cUnreadInfo.set_unread_cnt(atoi(it->second.c_str()));
                cUnreadInfo.set_session_type(IM::BaseDefine::SESSION_TYPE_SINGLE);
                uint32_t nMsgId = 0;
                string strMsgData;
                IM::BaseDefine::MsgType nMsgType;
                // 从mysql获取最后一条未读消息 mysql
                getLastMsg(cUnreadInfo.session_id(), nUserId, nMsgId, strMsgData, nMsgType); 
                if(IM::BaseDefine::MsgType_IsValid(nMsgType))
                {
                    cUnreadInfo.set_latest_msg_id(nMsgId);
                    cUnreadInfo.set_latest_msg_data(strMsgData);
                    cUnreadInfo.set_latest_msg_type(nMsgType);
                    cUnreadInfo.set_latest_msg_from_user_id(cUnreadInfo.session_id());
                    lsUnreadCount.push_back(cUnreadInfo);
                    nTotalCnt += cUnreadInfo.unread_cnt();
                }
                else
                {
                    log("invalid msgType. userId=%u, peerId=%u, msgType=%u", nUserId, cUnreadInfo.session_id(), nMsgType);
                }
            }
        }
        else
        {
            log("hgetall %s failed!", strKey.c_str());
        }
    }
    else
    {
        log("no cache connection for unread");
    }
}
void CMessageModel::getLastMsg(uint32_t nFromId, uint32_t nToId, uint32_t& nMsgId, string& strMsgData, IM::BaseDefine::MsgType& nMsgType, uint32_t nStatus)
{
    uint32_t nRelateId = CRelationModel::getInstance()->getRelationId(nFromId, nToId, false);
    
    if (nRelateId != INVALID_VALUE)
    {
        CDBManager* pDBManager = CDBManager::getInstance();
        // 读从库
        CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_slave");
        if (pDBConn)
        {
            string strTableName = "IMMessage_" + int2string(nRelateId % 8);
            string strSql = "select msgId,type,content from " + strTableName + " force index (idx_relateId_status_created) where relateId= " + int2string(nRelateId) + " and status = 0 order by created desc, id desc limit 1";
            CResultSet* pResultSet = pDBConn->ExecuteQuery(strSql.c_str());
            if (pResultSet)
            {
                while (pResultSet->Next())
                {
                    nMsgId = pResultSet->GetInt("msgId");

                    nMsgType = IM::BaseDefine::MsgType(pResultSet->GetInt("type"));
                    if (nMsgType == IM::BaseDefine::MSG_TYPE_SINGLE_AUDIO)
                    {
                        // "[语音]"加密后的字符串
                        strMsgData = strAudioEnc;
                    }
                    else
                    {
                        strMsgData = pResultSet->GetString("content");
                    }
                }
                delete pResultSet;
            }
            else
            {
                log("no result set: %s", strSql.c_str());
            }
            pDBManager->RelDBConn(pDBConn);
        }
        else
        {
            log("no db connection_slave");
        }
    }
    else
    {
        log("no relation between %lu and %lu", nFromId, nToId);
    }
}

db_proxy_server回复信令CID_MSG_UNREAD_CNT_RESPONSE给msg_server,调用CDBServConn::_HandleUnreadMsgCountResponse

void CDBServConn::_HandleUnreadMsgCountResponse(CImPdu* pPdu)
{
    IM::Message::IMUnreadMsgCntRsp msg;
    CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));

	uint32_t user_id = msg.user_id();
    uint32_t total_cnt = msg.total_cnt();
	uint32_t user_unread_cnt = msg.unreadinfo_list_size();
    CDbAttachData attach_data((uchar_t*)msg.attach_data().c_str(), msg.attach_data().length());
	uint32_t handle = attach_data.GetHandle();
	
	log("HandleUnreadMsgCntResp, userId=%u, total_cnt=%u, user_unread_cnt=%u.", user_id,
        total_cnt, user_unread_cnt);

    CMsgConn* pMsgConn = CImUserManager::GetInstance()->GetMsgConnByHandle(user_id, handle);

	if (pMsgConn && pMsgConn->IsOpen()) {
        msg.clear_attach_data();
        pPdu->SetPBMsg(&msg);
        pMsgConn->SendPdu(pPdu);
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2093797.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[Labview]图片叠加下的表格视图拖拽功能:挖坑粗糙版

没错&#xff0c;又是Labview表格T - T 由于项目中用到的表格上有一张用于画框的二维图片&#xff0c;感兴趣可看这篇 [Labview] 表格单元格外边框 二维图片叠加绘图 因此在滚动条与鼠标滚轮的基础上&#xff0c;想再增加一个拖拽移动的功能。 但 [二维图片] 并没有 拖拽开始…

windows C++ 并行编程-矩阵乘法

下面我们尝试分步演练演示如何使用 C AMP 加速矩阵乘法的执行。 提供了两种算法&#xff0c;一种不使用平铺&#xff0c;一种使用平铺&#xff0c;看看两者的差别。 在 Visual Studio 中创建项目 在菜单栏上&#xff0c;选择“文件”>“新建”>“项目”&#xff0c;打开…

[Algorithm][综合训练][过桥][最大差值][兑换零钱]详细讲解

目录 1.过桥1.题目链接2.算法原理详解 && 代码实现 2.最大差值1.题目链接2.算法原理详解 && 代码实现 3.兑换零钱1.题目链接2.算法原理详解 && 代码实现 1.过桥 1.题目链接 过桥 2.算法原理详解 && 代码实现 解法&#xff1a;贪心 BFS #in…

【Slurm集群在centos7上的搭建】

Slurm集群在centos7上的部署 集群基本情况1. 前期准备工作2.网络配置3.NTP时间同步配置4.NFS共享目录配置5.NIS用户管理配置6.Munge通信部署7.安装Mariadb数据库以及Slurm安装配置7.1安装配置Mariadb及SlurmID配置7.2Slurm安装配置 附录配置文件slurm.conf&#xff1a;slurmdbd…

Redis从入门再再到入门(下)

文章目录 1.Redis远程连接1.1 Redis远程连接配置1.2 通过桌面版图形化界面连接Redis1.3 通过IDEA中的插件连接Redis 2.Jedis的基本使用2.1 jedis概述2.2 jedis的基本操作2.3 jedis连接池 3.Spring整合Redis3.1 新建maven工程,引入相关依赖3.2 redis.properties3.3 spring-redis…

Python | Leetcode Python题解之第387题字符串中的第一个唯一字符

题目&#xff1a; 题解&#xff1a; class Solution:def firstUniqChar(self, s: str) -> int:position dict()q collections.deque()n len(s)for i, ch in enumerate(s):if ch not in position:position[ch] iq.append((s[i], i))else:position[ch] -1while q and po…

如何开发针对不平衡分类的成本敏感神经网络 python

如何开发针对不平衡分类的成本敏感神经网络 深度学习神经网络是一类灵活的机器学习算法&#xff0c;可以在各种问题上表现良好。 神经网络使用误差反向传播算法进行训练&#xff0c;该算法涉及计算模型在训练数据集上产生的误差&#xff0c;并根据这些误差的比例更新模型权重…

鸿蒙开发入门day16-拖拽事件和手势事件

(创作不易&#xff0c;感谢有你&#xff0c;你的支持&#xff0c;就是我前行的最大动力&#xff0c;如果看完对你有帮助&#xff0c;还请三连支持一波哇ヾ(&#xff20;^∇^&#xff20;)ノ&#xff09; 目录 拖拽事件 概述 拖拽流程 ​手势拖拽 ​鼠标拖拽 拖拽背板图 …

如何有效防止表单重复提交

如何有效防止表单重复提交 1. 使用重定向&#xff08;Redirect&#xff09;2. 点击后按钮失效3. Loading 遮罩4. 自定义重复提交过滤器 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 在Web开发中&#xff0c;表单重复提交是一个常见问题&…

计算物理精解【3】

文章目录 力学单位矢量基础定义 矢量加法矢量加法的几何方法矢量加法的代数方法示例注意事项 矢量间的关系矢量&#xff08;或向量&#xff09;的标量积&#xff08;也称为点积、内积或数量积&#xff09;性质计算两矢量之间的夹角例子步骤数值结果 计算两三维矢量之间夹角的例…

厨房老鼠检测算法解决方案老鼠检测算法源码样本详细介绍

厨房老鼠检测算法是一种创新的解决方案&#xff0c;它结合了机器学习和图像识别技术。通过使用高精度的传感器和智能摄像头&#xff0c;这些算法可以实时监控厨房环境&#xff0c;并检测到老鼠的活动痕迹。与传统的检测方法相比&#xff0c;这种算法具有更高的灵敏度和准确性&a…

Java对象的访问定位技术

Java虚拟机规范中规定reference类型是一个指向对象的引用&#xff0c;但规定并没有定义这个引用应该通过什么方式去定位、访问堆中的对象的具体位置&#xff0c;所以对象访问方式取决于具体的虚拟机实现。 目前主流的访问方式有两种&#xff1a;使用句柄和直接指针。 使用句柄…

Altium designer设计经验谈——常用规则的使用(二)

文章目录 前言三、规则设置介绍——走线规则1、Routing——>Width 线宽2、Routing——>Topology 拓扑 四、规则设置介绍——平面层规则1、Plane——>电源层连接样式 Power Plane Connect Style2、Plane——>电源层间距距离 Power Plane Clearance3、Plane——>多…

单片机编程魔法师-并行多任务程序

程序架构 程序代码 小结 数码分离&#xff0c;本质上就是将数据和代码逻辑进行分离&#xff0c;跟第一章使用数据驱动程序一样的道理。 不过这里不同之处在于。这里使用通过任务线程&#xff0c;但是却有2个任务在运行&#xff0c;两个任务都通过先初始化任务数据参数&#x…

C++ | Leetcode C++题解之第387题字符串中的第一个唯一字符

题目&#xff1a; 题解&#xff1a; class Solution { public:int firstUniqChar(string s) {unordered_map<char, int> position;queue<pair<char, int>> q;int n s.size();for (int i 0; i < n; i) {if (!position.count(s[i])) {position[s[i]] i;…

设备管理与文件系统

1、设备管理框架 对于不同类型的设备的操作&#xff0c;全部由一下函数指针来完成。即操作系统对设备进行操作&#xff0c;只需要调用统一的API接口&#xff0c;无需了解相关的细节。 比如如下的接口设计&#xff1a; int (*open) (device_t * dev) ; int (*read) (device_t …

直播行业的未来:南昌络喆科技有限公司的创新无人直播项目!

随着数字化时代的推进&#xff0c;直播行业迎来了前所未有的增长机遇。南昌络喆科技有限公司凭借其创新的无人直播技术&#xff0c;正引领着行业的新潮流&#xff0c;展现出直播领域的新面貌。 无人直播技术突破了传统直播的局限&#xff0c;实现了自动化的高效运营模式。它摒弃…

用Python解决预测问题_对数线性模型模板

对数线性模型&#xff08;Log-linear model&#xff09;是统计学中用于分析计数数据或频率数据的一类模型&#xff0c;特别是在多维列联表&#xff08;contingency tables&#xff09;分析中非常常见。这种模型通过取对数将乘法关系转换为加法关系&#xff0c;从而简化了数据分…

关于自己部署AI大模型踩的坑(三)—— 部署

最近一直在研究如何打算属于我自己的J.A.R.V.I.S.&#xff08;钢铁侠中的机器人管家&#xff09;。 上一篇写了我最近在部署自己的大模型&#xff0c;使用llama3.1&#xff0c; 和通义千问2。虽然最终结果也是成功了&#xff0c;过程却十分地坎坷。所以这一篇文章一是总结其中遇…

Nginx快速入门:编译及常用配置

Nginx 是一个高性能的 HTTP 服务器和反向代理服务器&#xff0c;也是一个 IMAP/POP3 邮件代理服务器。它以其高并发处理能力和低资源消耗而闻名&#xff0c;能够同时处理数千个连接。 Nginx 的主要功能包括&#xff1a; 静态资源服务器&#xff1a;Nginx 可以担任静态资源服务…