TeamTalk数据库代理服务器

news2024/11/23 23:42:32

文章目录

  • main函数主流程
    • 关键步骤
    • 线程池
    • redis缓存
      • 未读消息计数
        • 未读消息计数-单聊
        • 未读消息计数-群聊
      • 群成员管理

main函数主流程

关键步骤

  1. 初始化epoll + 线程池
  2. 数据入口 reactor CProxyConn::HandlePduBuf
  3. 异步task任务封装,把任务放入线程池;线程池里的线程异步执行任务,然后放入回复列表 CProxyConn::AddResponsePdu()
  4. epoll 主线程读取回复列表的数据发送给请求端 CProxyConn::SendResponsePduList()

db_proxy_server 处理逻辑与其他server不同的是 epoll(主线程)解析 pdu,然后封装成task任务,再给到线程池。

void CProxyConn::HandlePduBuf(uchar_t* pdu_buf, uint32_t pdu_len)
{
    CImPdu* pPdu = NULL;
    pPdu = CImPdu::ReadPdu(pdu_buf, pdu_len);
    if (pPdu->GetCommandId() == IM::BaseDefine::CID_OTHER_HEARTBEAT) {
        return;
    }
    
    pdu_handler_t handler = s_handler_map->GetHandler(pPdu->GetCommandId());
    
    if (handler) {
        CTask* pTask = new CProxyTask(m_uuid, handler, pPdu);
        g_thread_pool.AddTask(pTask);
    } else {
        log("no handler for packet type: %d", pPdu->GetCommandId());
    }
}

// 工作线程调用
void CProxyConn::AddResponsePdu(uint32_t conn_uuid, CImPdu* pPdu)
{
	ResponsePdu_t* pResp = new ResponsePdu_t;
	pResp->conn_uuid = conn_uuid;
	pResp->pPdu = pPdu;

	s_list_lock.lock();
	s_response_pdu_list.push_back(pResp);
	s_list_lock.unlock();
}

void proxy_loop_callback(void* callback_data, uint8_t msg, uint32_t handle, void* pParam)
{
	CProxyConn::SendResponsePduList(); // epoll 主线程调用
}

// 主线程调用
void CProxyConn::SendResponsePduList()
{
	s_list_lock.lock();
	while (!s_response_pdu_list.empty()) {
		ResponsePdu_t* pResp = s_response_pdu_list.front();
		s_response_pdu_list.pop_front();
		s_list_lock.unlock();

		CProxyConn* pConn = get_proxy_conn_by_uuid(pResp->conn_uuid);
		if (pConn) {
			if (pResp->pPdu) {
				pConn->SendPdu(pResp->pPdu);
			} else {
				log("close connection uuid=%d by parse pdu error\b", pResp->conn_uuid);
				pConn->Close();
			}
		}

		if (pResp->pPdu)
			delete pResp->pPdu;
		delete pResp;

		s_list_lock.lock();
	}

	s_list_lock.unlock();
}

m_uuid 作用和 socketfd 一致,但是不用 socketfd 是避免在线重连导致 socketfd 复用。
或者说由于处理请求和发送回复在两个线程,socket的handle可能重用(怎么理解???),所以需要用一个一直增加的uuid来表示一个连接

线程池


int init_proxy_conn(uint32_t thread_num)
{
	s_handler_map = CHandlerMap::getInstance(); // 根据CmdID找到处理函数
	g_thread_pool.Init(thread_num);

	netlib_add_loop(proxy_loop_callback, NULL); // 回发数据

	signal(SIGTERM, sig_handler);

	return netlib_register_timer(proxy_timer_callback, NULL, 1000);
}

void proxy_loop_callback(void* callback_data, uint8_t msg, uint32_t handle, void* pParam)
{
	CProxyConn::SendResponsePduList(); // epoll 主线程调用
}

int CThreadPool::Init(uint32_t worker_size)
{
    m_worker_size = worker_size;
	m_worker_list = new CWorkerThread [m_worker_size];
	if (!m_worker_list) {
		return 1;
	}

	for (uint32_t i = 0; i < m_worker_size; i++) {
		m_worker_list[i].SetThreadIdx(i);
		m_worker_list[i].Start();
	}

	return 0;
}

redis缓存

  1. 连接数量如何设置?同步连接池,应该测试为准,经验redis cpu核数2;mysql核数4(不理解???)
  2. 为什么分开不同的db?便于水平扩展。
  3. pool_name的意义?做抽象,不必关注redis是否分机。
class CacheManager {
public:
	virtual ~CacheManager();

	static CacheManager* getInstance();

	int Init();
	CacheConn* GetCacheConn(const char* pool_name);
	void RelCacheConn(CacheConn* pCacheConn);
private:
	CacheManager();

private:
	static CacheManager* 	s_cache_manager;
	map<string, CachePool*>	m_cache_pool_map; // 缓存连接池
};

未读消息计数

存储在unread连接池所在的redis数据库

  1. 单聊的消息ID设计:
    key设计为"msg_id_" + nRelateId
    函数:nMsgId = pMsgModel->getMsgId(nRelateId);
    nRelateId 来自于mysql的自增id,取决于两个人之间的映射
  2. 群聊的消息ID设计:
    key设计为 “group_msg_id_” + nGroupId
    函数:nMsgId = pGroupMsgModel->getMsgId(nToId);
未读消息计数-单聊

当发送消息时调用
pMsgModel->sendMessage(nRelateId, nFromId, nToId, nMsgType, nCreateTime, nMsgId, (string&)msg.msg_data());

else if(nMsgType== IM::BaseDefine::MSG_TYPE_SINGLE_TEXT) {
                        if (nFromId != nToId) {
                            nSessionId = CSessionModel::getInstance()->getSessionId(nFromId, nToId, IM::BaseDefine::SESSION_TYPE_SINGLE, false);
                            if (INVALID_VALUE == nSessionId) {
                                nSessionId = CSessionModel::getInstance()->addSession(nFromId, nToId, IM::BaseDefine::SESSION_TYPE_SINGLE);
                            }
                            nPeerSessionId = CSessionModel::getInstance()->getSessionId(nToId, nFromId, IM::BaseDefine::SESSION_TYPE_SINGLE, false);
                            if(INVALID_VALUE ==  nPeerSessionId)
                            {
                                nSessionId = CSessionModel::getInstance()->addSession(nToId, nFromId, IM::BaseDefine::SESSION_TYPE_SINGLE);
                            }
                            uint32_t nRelateId = CRelationModel::getInstance()->getRelationId(nFromId, nToId, true);
                            if(nSessionId != INVALID_VALUE && nRelateId != INVALID_VALUE)
                            {
                                nMsgId = pMsgModel->getMsgId(nRelateId);
                                if(nMsgId != INVALID_VALUE)
                                {
                                    pMsgModel->sendMessage(nRelateId, nFromId, nToId, nMsgType, nCreateTime, nMsgId, (string&)msg.msg_data());
                                    CSessionModel::getInstance()->updateSession(nSessionId, nNow);
                                    CSessionModel::getInstance()->updateSession(nPeerSessionId, nNow);
                                }
                                else
                                {
                                    log("msgId is invalid. fromId=%u, toId=%u, nRelateId=%u, nSessionId=%u, nMsgType=%u", nFromId, nToId, nRelateId, nSessionId, nMsgType);
                                }
                            }
                            else{
                                log("sessionId or relateId is invalid. fromId=%u, toId=%u, nRelateId=%u, nSessionId=%u, nMsgType=%u", nFromId, nToId, nRelateId, nSessionId, nMsgType);
                            }
                        }

bool CMessageModel::sendMessage(uint32_t nRelateId, uint32_t nFromId, uint32_t nToId, IM::BaseDefine::MsgType nMsgType, uint32_t nCreateTime, uint32_t nMsgId, string& strMsgContent)
{
    bool bRet =false;
    if (nFromId == 0 || nToId == 0) {
        log("invalied userId.%u->%u", nFromId, nToId);
        return bRet;
    }

	CDBManager* pDBManager = CDBManager::getInstance();
	CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_master");
	if (pDBConn)
    {
        string strTableName = "IMMessage_" + int2string(nRelateId % 8);
        string strSql = "insert into " + strTableName + " (`relateId`, `fromId`, `toId`, `msgId`, `content`, `status`, `type`, `created`, `updated`) values(?, ?, ?, ?, ?, ?, ?, ?, ?)";
        // 必须在释放连接前delete CPrepareStatement对象,否则有可能多个线程操作mysql对象,会crash
        CPrepareStatement* pStmt = new CPrepareStatement();
        if (pStmt->Init(pDBConn->GetMysql(), strSql))
        {
            uint32_t nStatus = 0;
            uint32_t nType = nMsgType;
            uint32_t index = 0;
            pStmt->SetParam(index++, nRelateId);
            pStmt->SetParam(index++, nFromId);
            pStmt->SetParam(index++, nToId);
            pStmt->SetParam(index++, nMsgId);
            pStmt->SetParam(index++, strMsgContent);
            pStmt->SetParam(index++, nStatus);
            pStmt->SetParam(index++, nType);
            pStmt->SetParam(index++, nCreateTime);
            pStmt->SetParam(index++, nCreateTime);
            bRet = pStmt->ExecuteUpdate();
        }
        delete pStmt;
        pDBManager->RelDBConn(pDBConn);
        if (bRet)
        {
            uint32_t nNow = (uint32_t) time(NULL);
            incMsgCount(nFromId, nToId);
        }
        else
        {
            log("insert message failed: %s", strSql.c_str());
        }
	}
    else
    {
        log("no db connection for teamtalk_master");
    }
	return bRet;
}

void CMessageModel::incMsgCount(uint32_t nFromId, uint32_t nToId)
{
	CacheManager* pCacheManager = CacheManager::getInstance();
	// increase message count
	CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");
	if (pCacheConn) {
		pCacheConn->hincrBy("unread_" + int2string(nToId), int2string(nFromId), 1);
		pCacheManager->RelCacheConn(pCacheConn);
	} else {
		log("no cache connection to increase unread count: %d->%d", nFromId, nToId);
	}
}

long CacheConn::hincrBy(string key, string field, long value)
{
	if (Init()) {
		return -1;
	}

	redisReply* reply = (redisReply *)redisCommand(m_pContext, "HINCRBY %s %s %ld", key.c_str(), field.c_str(), value);
	if (!reply) {
		log("redisCommand failed:%s", m_pContext->errstr);
		redisFree(m_pContext);
		m_pContext = NULL;
		return -1;
	}

	long ret_value = reply->integer;
	freeReplyObject(reply);
	return ret_value;
}

未读消息计数的key设计:“unread_” + int2string(nToId)
使用一个hash存储同一个user_id对应不同聊天的未读消息数量
请添加图片描述

未读消息计数-群聊

思考:如果群聊和单聊设计类似(群收到消息后,对每个群成员聊天数量+1),会有什么问题?群人多的话,效率低。

bool bRet = pStmt->ExecuteUpdate();
if (bRet)
{
	CGroupModel::getInstance()->updateGroupChat(nGroupId);
	incMessageCount(nFromId, nGroupId);
	clearMessageCount(nFromId, nGroupId);
} else {
	log("insert message failed: %s", strSql.c_str());
}

#define     GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX    "_im_group_msg"
#define     GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX     "_im_user_group"
#define     GROUP_COUNTER_SUBKEY_COUNTER_FIELD          "count"

bool CGroupMessageModel::incMessageCount(uint32_t nUserId, uint32_t nGroupId)
{
    bool bRet = false;
    CacheManager* pCacheManager = CacheManager::getInstance();
    CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");
    if (pCacheConn)
    {
        string strGroupKey = int2string(nGroupId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;
        pCacheConn->hincrBy(strGroupKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD, 1);
        map<string, string> mapGroupCount;
        bool bRet = pCacheConn->hgetAll(strGroupKey, mapGroupCount);
        if(bRet)
        {
            string strUserKey = int2string(nUserId) + "_" + int2string(nGroupId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;
            string strReply = pCacheConn->hmset(strUserKey, mapGroupCount);
            if(!strReply.empty())
            {
                bRet = true;
            }
            else
            {
                log("hmset %s failed !", strUserKey.c_str());
            }
        }
        else
        {
            log("hgetAll %s failed!", strGroupKey.c_str());
        }
        pCacheManager->RelCacheConn(pCacheConn);
    }
    else
    {
        log("no cache connection for unread");
    }
    return bRet;
}
void CGroupMessageModel::getUnreadMsgCount(uint32_t nUserId, uint32_t &nTotalCnt, list<IM::BaseDefine::UnreadInfo>& lsUnreadCount)
{
    list<uint32_t> lsGroupId;
    CGroupModel::getInstance()->getUserGroupIds(nUserId, lsGroupId, 0);
    uint32_t nCount = 0;
    
    CacheManager* pCacheManager = CacheManager::getInstance();
    CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");
    if (pCacheConn)
    {
        for(auto it=lsGroupId.begin(); it!=lsGroupId.end(); ++it)
        {
            uint32_t nGroupId = *it;
            string strGroupKey = int2string(nGroupId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;
            string strGroupCnt = pCacheConn->hget(strGroupKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD);
            if(strGroupCnt.empty())
            {
//                log("hget %s : count failed !", strGroupKey.c_str());
                continue;
            }
            uint32_t nGroupCnt = (uint32_t)(atoi(strGroupCnt.c_str()));
            
            string strUserKey = int2string(nUserId) + "_" + int2string(nGroupId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;
            string strUserCnt = pCacheConn->hget(strUserKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD);
            
            uint32_t nUserCnt = ( strUserCnt.empty() ? 0 : ((uint32_t)atoi(strUserCnt.c_str())) );
            if(nGroupCnt >= nUserCnt) {
                nCount = nGroupCnt - nUserCnt;
            }
            if(nCount > 0)
            {
                IM::BaseDefine::UnreadInfo cUnreadInfo;
                cUnreadInfo.set_session_id(nGroupId);
                cUnreadInfo.set_session_type(IM::BaseDefine::SESSION_TYPE_GROUP);
                cUnreadInfo.set_unread_cnt(nCount);
                nTotalCnt += nCount;
                string strMsgData;
                uint32_t nMsgId;
                IM::BaseDefine::MsgType nType;
                uint32_t nFromId;
                getLastMsg(nGroupId, nMsgId, strMsgData, nType, nFromId);
                if(IM::BaseDefine::MsgType_IsValid(nType))
                {
                    cUnreadInfo.set_latest_msg_id(nMsgId);
                    cUnreadInfo.set_latest_msg_data(strMsgData);
                    cUnreadInfo.set_latest_msg_type(nType);
                    cUnreadInfo.set_latest_msg_from_user_id(nFromId);
                    lsUnreadCount.push_back(cUnreadInfo);
                }
                else
                {
                    log("invalid msgType. userId=%u, groupId=%u, msgType=%u, msgId=%u", nUserId, nGroupId, nType, nMsgId);
                }
            }
        }
        pCacheManager->RelCacheConn(pCacheConn);
    }
    else
    {
        log("no cache connection for unread");
    }
}

群未读消息计数:
1)一个群 Group_id 对应多个user_id
2) 同一个群 Group_id 对应多个user_id,不同的user_id对应的未读消息数量是不一样的
3)每次发消息时,群消息数量+1,发消息的个人计数也+1
4)未读消息数量 = 群消息数量 - 个人已读消息数量

群成员管理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2123128.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【AI学习】AI科普:专有名词介绍

这里是阿川的博客&#xff0c;祝您变得更强 ✨ 个人主页&#xff1a;在线OJ的阿川 &#x1f496;文章专栏&#xff1a;AI入门到进阶 &#x1f30f;代码仓库&#xff1a; 写在开头 现在您看到的是我的结论或想法&#xff0c;但在这背后凝结了大量的思考、经验和讨论 目录 1.AI序…

TCP通信三次握手、四次挥手

前言 前面我说到了&#xff0c;UDP通信的实现&#xff0c;但我们经常说UDP通信不可靠&#xff0c;是因为他只会接收和发送&#xff0c;并不会去验证对方收到没有&#xff0c;那么我们说TCP通信可靠&#xff0c;就是因为他会进行验证接收端是否能够接收和发送&#xff0c;并且只…

给大家推荐好用的AI网站

地址&#xff1a;https://ai.ashuiai.com/auth/register?inviteCode8E8DIC1QCR 个人觉得挺好用的&#xff0c;可以免费&#xff0c;免费有限制次数&#xff0c;也可以会员升级200永久免费&#xff0c;我用的200永久免费。 可以在国内使用以下ai模型 回答问题更智能&#xff…

计算机毕业设计 校内跑腿业务系统 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试

&#x1f34a;作者&#xff1a;计算机编程-吉哥 &#x1f34a;简介&#xff1a;专业从事JavaWeb程序开发&#xff0c;微信小程序开发&#xff0c;定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事&#xff0c;生活就是快乐的。 &#x1f34a;心愿&#xff1a;点…

分享6个我喜欢的常用网站,来看看有没有你感兴趣的!

分享6个我自己很喜欢的常用网站&#xff0c;平时工作生活中经常会打开&#xff0c;来看看有没有你感兴趣的&#xff01; 1.Crazygames crazygames.com/ 一个超赞的在线小游戏平台&#xff0c;上面有超过7000种游戏任你选。不管你喜欢冒险、解谜、闯关&#xff0c;还是装扮、赛…

概要设计例题

答案&#xff1a;A 知识点&#xff1a; 概要设计 设计软件系统的总体结构&#xff1a;采用某种方法&#xff0c;将一个复杂的系统按照功能划分成模块&#xff1b;确定每个模块的功能&#xff1b;确定模块之间的调用关系&#xff1b;确定模块之间的接口&#xff0c;即模块之间…

返工(Rework)与返修(Repair)有何不一样

IATF 16949 汽车业质量管理体系,以客户需求为基础,组织透过相关单位了解客户需求后,向内部流程展开,目的是确保从研发到出货每个环节都能满足客户需求,同时管控制造过程的效率及良率,使产线能够稳定及准时交货给客户。 IATF 16949 条文中,针对「返工(Rework)」与「返修(…

linux工具的使用

1.yum和apt的概念与使用 yum 和 apt 是两种不同的包管理工具&#xff0c;用于在 Linux 系统上管理软件包。 yum (Yellowdog Updater, Modified) 发行版: 用于基于 RPM 的发行版&#xff0c;如 CentOS、RHEL 和 Fedora。基本命令: 更新包列表: sudo yum update安装包: sudo y…

Sky Takeaway

软件开发整体介绍 软件开发流程 角色分工 软件环境 苍穹外卖 项目介绍 定位&#xff1a;专门为餐饮企业定制的一款软件产品 技术选型 前端环境搭建 阅读readme文档 nginx.exe放入无中文目录运行并启动 后端环境搭建 项目结构 Nginx反向代理 优点 配置 Nginx反向代理 负…

QXlsx编译静态库-配置为Qt模块

Qt读写Excel–QXlsx编译为静态库-配置为Qt模块&#x1f346; 文章目录 Qt读写Excel--QXlsx编译为静态库-配置为Qt模块&#x1f346;[toc]1、概述&#x1f954;2、准备工作&#x1f955;3、配置环境&#x1f33d;4、加载QXlsx静态库&#x1f952; &#x1f449;QXlsx使用&#x…

《深度学习》OpenCV 高阶 图像金字塔 用法解析及案例实现

目录 一、图像金字塔 1、什么是图像金字塔 2、图像金字塔作用 1&#xff09;金字塔尺度间的图像信息补充 2&#xff09;目标检测与识别 3&#xff09;图像融合与拼接 4&#xff09;图像增强与去噪 5&#xff09;图像压缩与编码 二、用法解析 1、向下采样 1&#xff09;概念…

【C++11 ——— 可变参数模板】

C11 ——— 可变参数模板 可变参数模板的概念可变参数模板的定义方式参数包的展开递归式展开参数包逗号表达式展开参数包 emplaceemplace 的使用emplace 的优势 可变参数模板的概念 在C11之前,函数模板和类模板中的模板参数数量是固定的。可变参数模板打破了这个限制,提供了一…

Visual Studio汇编代码高亮与自动补全

高亮插件&#xff1a;AsmDude (可以按照我的颜色进行设置&#xff0c;或者你自己改) 代码自动补全&#xff1a;CodeGeex (功能很多&#xff0c;支持的语言很多)&#xff0c;按Tab补全

Gitea Action注册runner

我的是gitea也可以和github 兼容&#xff0c;只是没有github 那么靓而已 安装一个gitea仓库 docker run -d --name gitea \-p3000:3000 -p2222:22 \-v /git/data:/data \ -v /etc/timezone:/etc/timezone:ro \-v /etc/localtime:/etc/localtime:ro \gitea/gitea:1.21.1setti…

嵌入式实时操作系统(RTOS):原理、应用与发展

摘要&#xff1a;本文围绕嵌入式实时操作系统&#xff08;RTOS&#xff09;展开。首先介绍嵌入式系统与实时操作系统的概念&#xff0c;阐述嵌入式 RTOS 的体系结构。接着分析其关键特性&#xff0c;包含任务管理&#xff08;如任务的创建与删除、调度、同步与通信&#xff09;…

基于SSM架构的农产品朔源系统

项目描述 这是一款基于SSM架构的农产品朔源系统 模块描述 农产品溯源系统 1、农产品管理 农产品列表 新增农产品 2、二维码管理 二维码列表 3、溯源管理 溯源列表 溯源图表 4、 企业管理 设置 添加企业 截图

ts复合流讲解

一、什么是复合流 复合流指的是一条音视频数据流中同时包含了音频ES和视频ES数据&#xff08;ES指的是从编码器出来的音视频裸流比如H264&#xff0c;AAC&#xff09;。在音视频开发中最常见的复合流一般是TS、MP4、flv等。TS和flv一般用于网络传输&#xff0c;MP4一般用于本地…

【区块链 + 人才服务】教育区域初中综合素质评价系统 | FISCO BCOS应用案例

根据国家及相关省份制定的高中阶段学校考试招生制度改革实施意见&#xff0c;全国部分地市将开展初中学生综合素质评 价工作。评价将从思想品德、学业水平、身心健康、艺术素养和社会实践五个维度来记录学生的发展过程。例如&#xff0c; 学生的党团社团活动参与情况、公益活动…

windows手工杀毒-关闭恶意弹窗

上篇回顾&#xff1a;windows手工杀毒-寻找可疑进程之网络连接-CSDN博客 上篇主要介绍了如何通过网络连接发现可疑进程。滥用公认端口的软件可能是可疑软件&#xff0c;因为占用公认端口&#xff0c;可能导致正常服务不能正常使用。可以查询ip或域名的相关情报信息&…

一篇文章了解Pytest单元测试框架

文章目录 1.Pytest是什么2.Pytest的安装3.Pytest快速入门4.Pytest文件规范5.常用的断言类型 1.Pytest是什么 pytest 是一个功能强大且灵活的 Python 测试框架,主要优点包括简洁易用、自动测试发现、丰富的插件生态系统、参数化测试、详细的断言错误信息、以及强大的 fixtures …