文章目录
- main函数主流程
- 关键步骤
- 线程池
- redis缓存
- 未读消息计数
- 未读消息计数-单聊
- 未读消息计数-群聊
- 群成员管理
main函数主流程
关键步骤
- 初始化epoll + 线程池
- 数据入口 reactor CProxyConn::HandlePduBuf
- 异步task任务封装,把任务放入线程池;线程池里的线程异步执行任务,然后放入回复列表 CProxyConn::AddResponsePdu()
- epoll 主线程读取回复列表的数据发送给请求端 CProxyConn::SendResponsePduList()
db_proxy_server 处理逻辑与其他server不同的是 epoll(主线程)解析 pdu,然后封装成task任务,再给到线程池。
void CProxyConn::HandlePduBuf(uchar_t* pdu_buf, uint32_t pdu_len)
{
CImPdu* pPdu = NULL;
pPdu = CImPdu::ReadPdu(pdu_buf, pdu_len);
if (pPdu->GetCommandId() == IM::BaseDefine::CID_OTHER_HEARTBEAT) {
return;
}
pdu_handler_t handler = s_handler_map->GetHandler(pPdu->GetCommandId());
if (handler) {
CTask* pTask = new CProxyTask(m_uuid, handler, pPdu);
g_thread_pool.AddTask(pTask);
} else {
log("no handler for packet type: %d", pPdu->GetCommandId());
}
}
// 工作线程调用
void CProxyConn::AddResponsePdu(uint32_t conn_uuid, CImPdu* pPdu)
{
ResponsePdu_t* pResp = new ResponsePdu_t;
pResp->conn_uuid = conn_uuid;
pResp->pPdu = pPdu;
s_list_lock.lock();
s_response_pdu_list.push_back(pResp);
s_list_lock.unlock();
}
void proxy_loop_callback(void* callback_data, uint8_t msg, uint32_t handle, void* pParam)
{
CProxyConn::SendResponsePduList(); // epoll 主线程调用
}
// 主线程调用
void CProxyConn::SendResponsePduList()
{
s_list_lock.lock();
while (!s_response_pdu_list.empty()) {
ResponsePdu_t* pResp = s_response_pdu_list.front();
s_response_pdu_list.pop_front();
s_list_lock.unlock();
CProxyConn* pConn = get_proxy_conn_by_uuid(pResp->conn_uuid);
if (pConn) {
if (pResp->pPdu) {
pConn->SendPdu(pResp->pPdu);
} else {
log("close connection uuid=%d by parse pdu error\b", pResp->conn_uuid);
pConn->Close();
}
}
if (pResp->pPdu)
delete pResp->pPdu;
delete pResp;
s_list_lock.lock();
}
s_list_lock.unlock();
}
m_uuid 作用和 socketfd 一致,但是不用 socketfd 是避免在线重连导致 socketfd 复用。
或者说由于处理请求和发送回复在两个线程,socket的handle可能重用(怎么理解???),所以需要用一个一直增加的uuid来表示一个连接
线程池
int init_proxy_conn(uint32_t thread_num)
{
s_handler_map = CHandlerMap::getInstance(); // 根据CmdID找到处理函数
g_thread_pool.Init(thread_num);
netlib_add_loop(proxy_loop_callback, NULL); // 回发数据
signal(SIGTERM, sig_handler);
return netlib_register_timer(proxy_timer_callback, NULL, 1000);
}
void proxy_loop_callback(void* callback_data, uint8_t msg, uint32_t handle, void* pParam)
{
CProxyConn::SendResponsePduList(); // epoll 主线程调用
}
int CThreadPool::Init(uint32_t worker_size)
{
m_worker_size = worker_size;
m_worker_list = new CWorkerThread [m_worker_size];
if (!m_worker_list) {
return 1;
}
for (uint32_t i = 0; i < m_worker_size; i++) {
m_worker_list[i].SetThreadIdx(i);
m_worker_list[i].Start();
}
return 0;
}
redis缓存
- 连接数量如何设置?同步连接池,应该测试为准,经验redis cpu核数2;mysql核数4(不理解???)
- 为什么分开不同的db?便于水平扩展。
- pool_name的意义?做抽象,不必关注redis是否分机。
class CacheManager {
public:
virtual ~CacheManager();
static CacheManager* getInstance();
int Init();
CacheConn* GetCacheConn(const char* pool_name);
void RelCacheConn(CacheConn* pCacheConn);
private:
CacheManager();
private:
static CacheManager* s_cache_manager;
map<string, CachePool*> m_cache_pool_map; // 缓存连接池
};
未读消息计数
存储在unread连接池所在的redis数据库
- 单聊的消息ID设计:
key设计为"msg_id_" + nRelateId
函数:nMsgId = pMsgModel->getMsgId(nRelateId);
nRelateId 来自于mysql的自增id,取决于两个人之间的映射 - 群聊的消息ID设计:
key设计为 “group_msg_id_” + nGroupId
函数:nMsgId = pGroupMsgModel->getMsgId(nToId);
未读消息计数-单聊
当发送消息时调用
pMsgModel->sendMessage(nRelateId, nFromId, nToId, nMsgType, nCreateTime, nMsgId, (string&)msg.msg_data());
else if(nMsgType== IM::BaseDefine::MSG_TYPE_SINGLE_TEXT) {
if (nFromId != nToId) {
nSessionId = CSessionModel::getInstance()->getSessionId(nFromId, nToId, IM::BaseDefine::SESSION_TYPE_SINGLE, false);
if (INVALID_VALUE == nSessionId) {
nSessionId = CSessionModel::getInstance()->addSession(nFromId, nToId, IM::BaseDefine::SESSION_TYPE_SINGLE);
}
nPeerSessionId = CSessionModel::getInstance()->getSessionId(nToId, nFromId, IM::BaseDefine::SESSION_TYPE_SINGLE, false);
if(INVALID_VALUE == nPeerSessionId)
{
nSessionId = CSessionModel::getInstance()->addSession(nToId, nFromId, IM::BaseDefine::SESSION_TYPE_SINGLE);
}
uint32_t nRelateId = CRelationModel::getInstance()->getRelationId(nFromId, nToId, true);
if(nSessionId != INVALID_VALUE && nRelateId != INVALID_VALUE)
{
nMsgId = pMsgModel->getMsgId(nRelateId);
if(nMsgId != INVALID_VALUE)
{
pMsgModel->sendMessage(nRelateId, nFromId, nToId, nMsgType, nCreateTime, nMsgId, (string&)msg.msg_data());
CSessionModel::getInstance()->updateSession(nSessionId, nNow);
CSessionModel::getInstance()->updateSession(nPeerSessionId, nNow);
}
else
{
log("msgId is invalid. fromId=%u, toId=%u, nRelateId=%u, nSessionId=%u, nMsgType=%u", nFromId, nToId, nRelateId, nSessionId, nMsgType);
}
}
else{
log("sessionId or relateId is invalid. fromId=%u, toId=%u, nRelateId=%u, nSessionId=%u, nMsgType=%u", nFromId, nToId, nRelateId, nSessionId, nMsgType);
}
}
bool CMessageModel::sendMessage(uint32_t nRelateId, uint32_t nFromId, uint32_t nToId, IM::BaseDefine::MsgType nMsgType, uint32_t nCreateTime, uint32_t nMsgId, string& strMsgContent)
{
bool bRet =false;
if (nFromId == 0 || nToId == 0) {
log("invalied userId.%u->%u", nFromId, nToId);
return bRet;
}
CDBManager* pDBManager = CDBManager::getInstance();
CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_master");
if (pDBConn)
{
string strTableName = "IMMessage_" + int2string(nRelateId % 8);
string strSql = "insert into " + strTableName + " (`relateId`, `fromId`, `toId`, `msgId`, `content`, `status`, `type`, `created`, `updated`) values(?, ?, ?, ?, ?, ?, ?, ?, ?)";
// 必须在释放连接前delete CPrepareStatement对象,否则有可能多个线程操作mysql对象,会crash
CPrepareStatement* pStmt = new CPrepareStatement();
if (pStmt->Init(pDBConn->GetMysql(), strSql))
{
uint32_t nStatus = 0;
uint32_t nType = nMsgType;
uint32_t index = 0;
pStmt->SetParam(index++, nRelateId);
pStmt->SetParam(index++, nFromId);
pStmt->SetParam(index++, nToId);
pStmt->SetParam(index++, nMsgId);
pStmt->SetParam(index++, strMsgContent);
pStmt->SetParam(index++, nStatus);
pStmt->SetParam(index++, nType);
pStmt->SetParam(index++, nCreateTime);
pStmt->SetParam(index++, nCreateTime);
bRet = pStmt->ExecuteUpdate();
}
delete pStmt;
pDBManager->RelDBConn(pDBConn);
if (bRet)
{
uint32_t nNow = (uint32_t) time(NULL);
incMsgCount(nFromId, nToId);
}
else
{
log("insert message failed: %s", strSql.c_str());
}
}
else
{
log("no db connection for teamtalk_master");
}
return bRet;
}
void CMessageModel::incMsgCount(uint32_t nFromId, uint32_t nToId)
{
CacheManager* pCacheManager = CacheManager::getInstance();
// increase message count
CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");
if (pCacheConn) {
pCacheConn->hincrBy("unread_" + int2string(nToId), int2string(nFromId), 1);
pCacheManager->RelCacheConn(pCacheConn);
} else {
log("no cache connection to increase unread count: %d->%d", nFromId, nToId);
}
}
long CacheConn::hincrBy(string key, string field, long value)
{
if (Init()) {
return -1;
}
redisReply* reply = (redisReply *)redisCommand(m_pContext, "HINCRBY %s %s %ld", key.c_str(), field.c_str(), value);
if (!reply) {
log("redisCommand failed:%s", m_pContext->errstr);
redisFree(m_pContext);
m_pContext = NULL;
return -1;
}
long ret_value = reply->integer;
freeReplyObject(reply);
return ret_value;
}
未读消息计数的key设计:“unread_” + int2string(nToId)
使用一个hash存储同一个user_id对应不同聊天的未读消息数量
未读消息计数-群聊
思考:如果群聊和单聊设计类似(群收到消息后,对每个群成员聊天数量+1),会有什么问题?群人多的话,效率低。
bool bRet = pStmt->ExecuteUpdate();
if (bRet)
{
CGroupModel::getInstance()->updateGroupChat(nGroupId);
incMessageCount(nFromId, nGroupId);
clearMessageCount(nFromId, nGroupId);
} else {
log("insert message failed: %s", strSql.c_str());
}
#define GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX "_im_group_msg"
#define GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX "_im_user_group"
#define GROUP_COUNTER_SUBKEY_COUNTER_FIELD "count"
bool CGroupMessageModel::incMessageCount(uint32_t nUserId, uint32_t nGroupId)
{
bool bRet = false;
CacheManager* pCacheManager = CacheManager::getInstance();
CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");
if (pCacheConn)
{
string strGroupKey = int2string(nGroupId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;
pCacheConn->hincrBy(strGroupKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD, 1);
map<string, string> mapGroupCount;
bool bRet = pCacheConn->hgetAll(strGroupKey, mapGroupCount);
if(bRet)
{
string strUserKey = int2string(nUserId) + "_" + int2string(nGroupId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;
string strReply = pCacheConn->hmset(strUserKey, mapGroupCount);
if(!strReply.empty())
{
bRet = true;
}
else
{
log("hmset %s failed !", strUserKey.c_str());
}
}
else
{
log("hgetAll %s failed!", strGroupKey.c_str());
}
pCacheManager->RelCacheConn(pCacheConn);
}
else
{
log("no cache connection for unread");
}
return bRet;
}
void CGroupMessageModel::getUnreadMsgCount(uint32_t nUserId, uint32_t &nTotalCnt, list<IM::BaseDefine::UnreadInfo>& lsUnreadCount)
{
list<uint32_t> lsGroupId;
CGroupModel::getInstance()->getUserGroupIds(nUserId, lsGroupId, 0);
uint32_t nCount = 0;
CacheManager* pCacheManager = CacheManager::getInstance();
CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");
if (pCacheConn)
{
for(auto it=lsGroupId.begin(); it!=lsGroupId.end(); ++it)
{
uint32_t nGroupId = *it;
string strGroupKey = int2string(nGroupId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;
string strGroupCnt = pCacheConn->hget(strGroupKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD);
if(strGroupCnt.empty())
{
// log("hget %s : count failed !", strGroupKey.c_str());
continue;
}
uint32_t nGroupCnt = (uint32_t)(atoi(strGroupCnt.c_str()));
string strUserKey = int2string(nUserId) + "_" + int2string(nGroupId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;
string strUserCnt = pCacheConn->hget(strUserKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD);
uint32_t nUserCnt = ( strUserCnt.empty() ? 0 : ((uint32_t)atoi(strUserCnt.c_str())) );
if(nGroupCnt >= nUserCnt) {
nCount = nGroupCnt - nUserCnt;
}
if(nCount > 0)
{
IM::BaseDefine::UnreadInfo cUnreadInfo;
cUnreadInfo.set_session_id(nGroupId);
cUnreadInfo.set_session_type(IM::BaseDefine::SESSION_TYPE_GROUP);
cUnreadInfo.set_unread_cnt(nCount);
nTotalCnt += nCount;
string strMsgData;
uint32_t nMsgId;
IM::BaseDefine::MsgType nType;
uint32_t nFromId;
getLastMsg(nGroupId, nMsgId, strMsgData, nType, nFromId);
if(IM::BaseDefine::MsgType_IsValid(nType))
{
cUnreadInfo.set_latest_msg_id(nMsgId);
cUnreadInfo.set_latest_msg_data(strMsgData);
cUnreadInfo.set_latest_msg_type(nType);
cUnreadInfo.set_latest_msg_from_user_id(nFromId);
lsUnreadCount.push_back(cUnreadInfo);
}
else
{
log("invalid msgType. userId=%u, groupId=%u, msgType=%u, msgId=%u", nUserId, nGroupId, nType, nMsgId);
}
}
}
pCacheManager->RelCacheConn(pCacheConn);
}
else
{
log("no cache connection for unread");
}
}
群未读消息计数:
1)一个群 Group_id 对应多个user_id
2) 同一个群 Group_id 对应多个user_id,不同的user_id对应的未读消息数量是不一样的
3)每次发消息时,群消息数量+1,发消息的个人计数也+1
4)未读消息数量 = 群消息数量 - 个人已读消息数量