TeamTalk消息服务器学习

news2024/9/21 11:13:02

msg_server发送消息

信令

//service id  0x0003
message IMMsgData{
	//cmd id:		0x0301
	required uint32 from_user_id = 1;				//消息发送方
	required uint32 to_session_id = 2;				//消息接受方
	required uint32 msg_id = 3; // 非常重要:由谁产生?答:redis具体见下文
	required uint32 create_time = 4; 
	required IM.BaseDefine.MsgType msg_type = 5; // 单聊或者群聊
	required bytes msg_data = 6;
	optional bytes attach_data = 20;
}

message IMMsgDataAck{
	//cmd id:		0x0302
	required uint32 user_id = 1;			//发送此信令的用户id
	required uint32 session_id = 2;				
	required uint32 msg_id = 3;
	required IM.BaseDefine.SessionType session_type = 4;
}

流程图:
请添加图片描述

客户端A发送消息给客户端B,信令为CID_MSG_DATA,msg_server收到后调用 _HandleClientMsgData 函数

void CMsgConn::HandlePdu(CImPdu* pPdu)
{
	// request authorization check
	if (pPdu->GetCommandId() != CID_LOGIN_REQ_USERLOGIN && !IsOpen() && IsKickOff()) {
        log("HandlePdu, wrong msg. ");
        throw CPduException(pPdu->GetServiceId(), pPdu->GetCommandId(), ERROR_CODE_WRONG_SERVICE_ID, "HandlePdu error, user not login. ");
		return;
    }
	switch (pPdu->GetCommandId()) {
        // ...... 省略无关逻辑 
        case CID_MSG_DATA:
            _HandleClientMsgData(pPdu);
            break;
        case CID_MSG_DATA_ACK:
            _HandleClientMsgDataAck(pPdu);
            break;
        // ...... 省略无关逻辑 
        default:
            log("wrong msg, cmd id=%d, user id=%u. ", pPdu->GetCommandId(), GetUserId());
            break;
	}
}
void CMsgConn::_HandleClientMsgData(CImPdu* pPdu)
{
    IM::Message::IMMsgData msg;
    CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));
	if (msg.msg_data().length() == 0) {
		log("discard an empty message, uid=%u ", GetUserId());
		return;
	}

	if (m_msg_cnt_per_sec >= MAX_MSG_CNT_PER_SECOND) {
		log("!!!too much msg cnt in one second, uid=%u ", GetUserId());
		return;
	}
    
    if (msg.from_user_id() == msg.to_session_id() && CHECK_MSG_TYPE_SINGLE(msg.msg_type()))
    {
        log("!!!from_user_id == to_user_id. ");
        return;
    }

	m_msg_cnt_per_sec++;

	uint32_t to_session_id = msg.to_session_id();
    uint32_t msg_id = msg.msg_id();
	uint8_t msg_type = msg.msg_type();
    string msg_data = msg.msg_data();

	if (g_log_msg_toggle) {
		log("HandleClientMsgData, %d->%d, msg_type=%u, msg_id=%u. ", GetUserId(), to_session_id, msg_type, msg_id);
	}

	uint32_t cur_time = time(NULL);
    CDbAttachData attach_data(ATTACH_TYPE_HANDLE, m_handle, 0);
    msg.set_from_user_id(GetUserId());
    msg.set_create_time(cur_time);
    msg.set_attach_data(attach_data.GetBuffer(), attach_data.GetLength());
    pPdu->SetPBMsg(&msg);
	// send to DB storage server
	CDBServConn* pDbConn = get_db_serv_conn();
	if (pDbConn) {
		pDbConn->SendPdu(pPdu);
	}
}

该函数直接将数据包转发给db_proxy_server,db_proxy_server有一个map来映射信令所对应的处理函数。


// message content
m_handler_map.insert(make_pair(uint32_t(CID_MSG_DATA), DB_PROXY::sendMessage));
    
void sendMessage(CImPdu* pPdu, uint32_t conn_uuid)
    {
        IM::Message::IMMsgData msg;
        if(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()))
        {
            uint32_t nFromId = msg.from_user_id();
            uint32_t nToId = msg.to_session_id();
            uint32_t nCreateTime = msg.create_time();
            IM::BaseDefine::MsgType nMsgType = msg.msg_type();
            uint32_t nMsgLen = msg.msg_data().length();
            
            uint32_t nNow = (uint32_t)time(NULL);
            if (IM::BaseDefine::MsgType_IsValid(nMsgType))
            {
                if(nMsgLen != 0)
                {
                    CImPdu* pPduResp = new CImPdu;

                    uint32_t nMsgId = INVALID_VALUE;
                    uint32_t nSessionId = INVALID_VALUE;
                    uint32_t nPeerSessionId = INVALID_VALUE;

                    CMessageModel* pMsgModel = CMessageModel::getInstance();
                    CGroupMessageModel* pGroupMsgModel = CGroupMessageModel::getInstance();
                    if(nMsgType == IM::BaseDefine::MSG_TYPE_GROUP_TEXT) {
                         // ...... 省略无关逻辑 
                    } else if (nMsgType == IM::BaseDefine::MSG_TYPE_GROUP_AUDIO) {
                         // ...... 省略无关逻辑 
                    } else if(nMsgType== IM::BaseDefine::MSG_TYPE_SINGLE_TEXT) {
                        if (nFromId != nToId) {
                            nSessionId = CSessionModel::getInstance()->getSessionId(nFromId, nToId, IM::BaseDefine::SESSION_TYPE_SINGLE, false);
                            if (INVALID_VALUE == nSessionId) {
                            	// 创建会话
                                nSessionId = CSessionModel::getInstance()->addSession(nFromId, nToId, IM::BaseDefine::SESSION_TYPE_SINGLE);
                            }
                            nPeerSessionId = CSessionModel::getInstance()->getSessionId(nToId, nFromId, IM::BaseDefine::SESSION_TYPE_SINGLE, false);
                            if(INVALID_VALUE ==  nPeerSessionId)
                            {
                            	// 创建会话关系ID
                                nSessionId = CSessionModel::getInstance()->addSession(nToId, nFromId, IM::BaseDefine::SESSION_TYPE_SINGLE);
                            }
                            uint32_t nRelateId = CRelationModel::getInstance()->getRelationId(nFromId, nToId, true);
                            if(nSessionId != INVALID_VALUE && nRelateId != INVALID_VALUE)
                            {
                                nMsgId = pMsgModel->getMsgId(nRelateId);
                                if(nMsgId != INVALID_VALUE)
                                {
                                	// 写入消息到数据库
                                    pMsgModel->sendMessage(nRelateId, nFromId, nToId, nMsgType, nCreateTime, nMsgId, (string&)msg.msg_data());
                                    CSessionModel::getInstance()->updateSession(nSessionId, nNow);
                                    CSessionModel::getInstance()->updateSession(nPeerSessionId, nNow);
                                }
                                else
                                {
                                    log("msgId is invalid. fromId=%u, toId=%u, nRelateId=%u, nSessionId=%u, nMsgType=%u", nFromId, nToId, nRelateId, nSessionId, nMsgType);
                                }
                            }
                            else{
                                log("sessionId or relateId is invalid. fromId=%u, toId=%u, nRelateId=%u, nSessionId=%u, nMsgType=%u", nFromId, nToId, nRelateId, nSessionId, nMsgType);
                            }
                        }
                        else
                        {
                            log("send msg to self. fromId=%u, toId=%u, msgType=%u", nFromId, nToId, nMsgType);
                        }
                        
                    } else if(nMsgType == IM::BaseDefine::MSG_TYPE_SINGLE_AUDIO) {
                        // ...... 省略无关逻辑 
                    }

                    log("fromId=%u, toId=%u, type=%u, msgId=%u, sessionId=%u", nFromId, nToId, nMsgType, nMsgId, nSessionId);

                    msg.set_msg_id(nMsgId);
                    pPduResp->SetPBMsg(&msg);
                    pPduResp->SetSeqNum(pPdu->GetSeqNum());
                    pPduResp->SetServiceId(IM::BaseDefine::SID_MSG);
                    pPduResp->SetCommandId(IM::BaseDefine::CID_MSG_DATA);
                    CProxyConn::AddResponsePdu(conn_uuid, pPduResp);
                }
                else
                {
                    log("msgLen error. fromId=%u, toId=%u, msgType=%u", nFromId, nToId, nMsgType);
                }
            }
            else
            {
                log("invalid msgType.fromId=%u, toId=%u, msgType=%u", nFromId, nToId, nMsgType);
            }
        }
        else
        {
            log("parse pb failed");
        }
    }

msg_server收到CID_MSG_DATA信令后调用 _HandleMsgData 函数

void CDBServConn::HandlePdu(CImPdu* pPdu)
{
	switch (pPdu->GetCommandId()) {
        // ...... 省略无关逻辑 
        case CID_MSG_DATA:
            _HandleMsgData(pPdu);
            break;
        // ...... 省略无关逻辑 
        default:
            log("db server, wrong cmd id=%d ", pPdu->GetCommandId());
	}
}

CDBServConn::_HandleMsgData 处理消息有三点

  1. 首先 ack 客户端
  2. 然后,发送到route_server(为啥这么做?不理解
  3. 广播给其他客户端 (为啥这么做?不理解
void CDBServConn::_HandleMsgData(CImPdu *pPdu)
{
    IM::Message::IMMsgData msg;
    CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));
    if (CHECK_MSG_TYPE_GROUP(msg.msg_type())) {
        s_group_chat->HandleGroupMessage(pPdu);
        return;
    }
    
    uint32_t from_user_id = msg.from_user_id();
    uint32_t to_user_id = msg.to_session_id();
    uint32_t msg_id = msg.msg_id();
    if (msg_id == 0) {
        log("HandleMsgData, write db failed, %u->%u.", from_user_id, to_user_id);
        return;
    }
    
    uint8_t msg_type = msg.msg_type();
    CDbAttachData attach_data((uchar_t*)msg.attach_data().c_str(), msg.attach_data().length());
    uint32_t handle = attach_data.GetHandle();
    
    log("HandleMsgData, from_user_id=%u, to_user_id=%u, msg_id=%u.", from_user_id, to_user_id, msg_id);
    
    CMsgConn* pMsgConn = CImUserManager::GetInstance()->GetMsgConnByHandle(from_user_id, attach_data.GetHandle());
    if (pMsgConn)
    {
        IM::Message::IMMsgDataAck msg2;
        msg2.set_user_id(from_user_id);
        msg2.set_msg_id(msg_id);
        msg2.set_session_id(to_user_id);
        msg2.set_session_type(::IM::BaseDefine::SESSION_TYPE_SINGLE);
        CImPdu pdu;
        pdu.SetPBMsg(&msg2);
        pdu.SetServiceId(SID_MSG);
        pdu.SetCommandId(CID_MSG_DATA_ACK);
        pdu.SetSeqNum(pPdu->GetSeqNum());
        pMsgConn->SendPdu(&pdu);
    }
    
    CRouteServConn* pRouteConn = get_route_serv_conn();
    if (pRouteConn) {
        pRouteConn->SendPdu(pPdu);
    }
    
    msg.clear_attach_data();
    pPdu->SetPBMsg(&msg);
    CImUser* pFromImUser = CImUserManager::GetInstance()->GetImUserById(from_user_id);
    CImUser* pToImUser = CImUserManager::GetInstance()->GetImUserById(to_user_id);
    pPdu->SetSeqNum(0);
    if (pFromImUser) {
        pFromImUser->BroadcastClientMsgData(pPdu, msg_id, pMsgConn, from_user_id);
    }

    if (pToImUser) {
        pToImUser->BroadcastClientMsgData(pPdu, msg_id, NULL, from_user_id);
    }
    
    IM::Server::IMGetDeviceTokenReq msg3;
    msg3.add_user_id(to_user_id);
    msg3.set_attach_data(pPdu->GetBodyData(), pPdu->GetBodyLength());
    CImPdu pdu2;
    pdu2.SetPBMsg(&msg3);
    pdu2.SetServiceId(SID_OTHER);
    pdu2.SetCommandId(CID_OTHER_GET_DEVICE_TOKEN_REQ);
    SendPdu(&pdu2);
}

msg_id:两个人之间的映射关系。如果按照时间排列,两个客户端之间的时间可能不一样,所以按照序号生成消息id。
4. 每条消息id唯一
5. 使用redis生成消息id

uint32_t CMessageModel::getMsgId(uint32_t nRelateId)
{
    uint32_t nMsgId = 0;
    CacheManager* pCacheManager = CacheManager::getInstance();
    CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");
    if(pCacheConn)
    {
        string strKey = "msg_id_" + int2string(nRelateId);
        nMsgId = pCacheConn->incrBy(strKey, 1);
        pCacheManager->RelCacheConn(pCacheConn);
    }
    return nMsgId;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2086597.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ShenNiusModularity项目源码学习(2:登录页面验证码)

前端登录页面Login.cshtml位于ShenNius.Admin.Mvc项目的Areas\Sys\Views\User内,页面中使用的验证码是调用同项目内UserController的OnGetVCode函数获取验证码图片。   点击验证码图片,会调用wwwroot\js\login.js定义的changeSrcCode函数刷新验证码。…

Qt调用外部exe并嵌入到Qt界面中(验证成功的成功)

http://t.csdnimg.cn/CDsqQ 原作者在这里 本文章主要介绍如何用Qt调用其他应用的exe,并将窗口嵌入到Qt界面中。很多人查到的代码都能成功的将exe调用起来,但是嵌入不到窗口中。主要有两种原因,现在从头到尾的梳理一下。 1.主要代码 1.1启动exe //包含…

keil编译输出的信息program size含义,以及个人使用中的编译内存溢出问题

keil编译后输出的信息含义 data对应的是片内的RAM,xdata对应的是程序中片外扩展的存储器上需要占用的容量,code是编写的程序占用单片机片内的存储程序ROM上的容量。 编译中发现错误 上图中的data值占用了147字节,超过了128字节。 一般解决…

2700+存储过程的超复杂Oracle,国产化怎么办?

前不久部门同事参与了一个关键客户的国产数据库Poc工作。从群里反馈的一些信息来看,难度还是比较大。 我们知道,对于数据库国产改造,其实有几个比较大的难点,其中最让人头疼的无非就是存储过程之类的对象了。 从同时提供的迁移报告…

掌握MySQL就差这一个——超详细讲解Mysql集群技术(包含主从复制,半同步模式,组复制,MHA)

一 Mysql 在服务器中的部署方法 在企业中 90% 的服务器操作系统均为 Linux 在企业中对于 Mysql 的安装通常用源码编译的方式来进行 官网: http://www.mysql.com 1.1 在Linux下部署mysql 1 安装依赖性: [rootmysql_node1 ~]# yum install cmake gcc-c…

django(二):第一个项目

接上文,django(一):项目搭建开始开发第一个项目。 1. 新建app 创建一个app应用,取名为company。 python manage.py startapp company注册app到settings.py文件中。 2.实现Model层 settings.py文件中配置数据库&am…

软件测试学习笔记丨静态测试与代码审计 SonarQube

本文转自测试人社区,原文链接:https://ceshiren.com/t/topic/32049 一,SonarQube 平台搭建 1.1, 介绍 Sonar 是一个用于代码质量管理的开放平台。通过插件机制,Sonar 可以集成不同的测试工具、代码分析工具&#xff…

FATE Board 执行流程探索

背景介绍 FATE Board 是 FATE 提供的一个工程,用于给 FATE 提供可视化能力,方便在联邦学习训练中实时查看执行状态,更好地定位执行中遇到的问题。 查看 FATE 架构可以看到 FATE Board 是建立在 MySQL 和 FATE Flow Server 的基础上的&#…

Unity(2022.3.41LTS) - 着色器

目录 一、着色器的基本概念 二、表面着色器 一、着色器的基本概念 定义与作用: 着色器是一种在图形硬件上运行的程序,用于控制物体的颜色、纹理、光照、透明度等视觉属性。它通过对输入的几何数据(如顶点位置、法线、纹理坐标等&#xff09…

某地级市攻防演练突破边界实战

0x1前言 今年某地级市的攻防中,前期通过fofa、google语法、天眼查等待信息收集获取到了某集团的单位后台后台是这样的,当看到这个后台系统时候脸上 不直接也就微笑了起来。最近风声紧,所有厚码希望大家多多理解(我也怕死啊呜呜呜…

【MRI基础】k空间基本属性

K 空间本质上是一个矩阵,用于存储从 MRI 扫描仪获得的原始数据。这些数据是在扫描过程中收集的,K 空间中的每个点代表被扫描身体发出的 MRI 信号的不同频率和相位。存储在 K 空间中的数据不是图像形式,而是包含重建图像所需的所有空间频率信息…

灵办 AI:免费智能利器,开启高效办公与智能创作学习新时代

目录 引言一、初识灵办AI—开启智能办公二、灵办 AI 的强大功能解析1. 多语言翻译2. AI 对话功能3. AI 阅读功能4. AI 代码助手功能 三、灵办 AI 的其他优点四、结语 引言 在当今快节奏的时代,技术人员们都在努力寻觅能够提高工作效率、助力学习创作的工具。我同样…

YOLO | YOLO目标检测算法(YOLO-V1)

github:https://github.com/MichaelBeechan CSDN:https://blog.csdn.net/u011344545 YOLO目标检测算法 YOLO V1概述(2016) YOLO V1概述(2016) 经典的One-stage方法 YOLO:You Only Look Once 把…

4.sklearn-K近邻算法、模型选择与调优

文章目录 环境配置(必看)头文件引用1.sklearn转换器和估计器1.1 转换器 - 特征工程的父类1.2 估计器(sklearn机器学习算法的实现) 2.K-近邻算法2.1 简介:2.2 K-近邻算法API2.3 K-近邻算法代码2.4 运行结果2.5 K-近邻算法优缺点 3.模型选择与调…

uni-app插槽(默认插槽,命名插槽,作用域插槽)

目录 什么是插槽? 基本概念 默认插槽 命名插槽 作用域插槽 场景一:子插槽向父组件传递一个字符串 场景二:子插槽向父组件传递对象 什么是插槽? 在 UniApp 中,插槽(Slot)是一种允许父组件向子组件特定位置插入HTML内容的方式。这种方式使得组…

ROS 2 Jazzy和QT组合开发教程

ROS2 Jazzy和QT组合开发教程 ROS2 和 Qt 组合开发是一个强大且灵活的组合,可以用来构建具有 GUI 的机器人应用程序。这个组合让你能够将 Qt 的图形界面与 ROS 2 的分布式通信功能结合在一起,实现数据的展示、机器人控制、调试等功能。 环境如下&#x…

掌握人事管理,这张报表帮你轻松搞定

在人事管理中,数据分析的重要性不言而喻。无论是了解员工的整体情况,还是洞察人才流失的原因,数据都能为我们提供有力的支持。但是,面对海量的数据,很多HR都会感到不知所措。那么,如何通过一张报表解决所有…

数字化转型升级探索(三)

在数字化转型升级的探索过程中,我们将通过深入应用人工智能、物联网、大数据和云计算等技术,全面重塑业务流程和运营模式,推动信息流、业务流和价值流的高度融合,实现从传统手工操作到智能自动化的全方位升级,提升业务…

【Python报错已解决】`SyntaxError`:`invalid syntax`

🎬 鸽芷咕:个人主页 🔥 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想,就是为了理想的生活! 引言 在Python编程中,SyntaxError是最常见的错误类型之一,它表示代码中存在语法错误。本文将探讨一个具…

★ 算法OJ题 ★ 力扣202 - 快乐数

Ciallo&#xff5e;(∠・ω< )⌒☆ ~ 今天&#xff0c;我将和大家一起做一道双指针算法题--快乐数~ 目录 一 题目 二 算法解析 三 编写算法 一 题目 202. 快乐数 - 力扣&#xff08;LeetCode&#xff09; 二 算法解析 题⽬告诉我们&#xff0c;当我们不断重复操作…