msg_server发送消息
信令
//service id 0x0003
// Chat message payload. In this file it is parsed by msg_server (from the
// client and from the DB server connection) and by db_proxy_server.
message IMMsgData{
//cmd id: 0x0301
required uint32 from_user_id = 1; // message sender; msg_server overwrites it with the connection's user id
required uint32 to_session_id = 2; // message recipient; read as the peer user id for single chat
required uint32 msg_id = 3; // very important: who generates it? Answer: Redis (db_proxy_server sets it before replying)
required uint32 create_time = 4; // msg_server overwrites it with its own current time
required IM.BaseDefine.MsgType msg_type = 5; // single chat or group chat
required bytes msg_data = 6; // message body; empty bodies are discarded by the handlers
optional bytes attach_data = 20; // msg_server stores a CDbAttachData carrying the connection handle here
}
// Acknowledgement for an IMMsgData message. msg_server builds one for the
// sender's connection once the message comes back with a non-zero msg_id.
message IMMsgDataAck{
//cmd id: 0x0302
required uint32 user_id = 1; // id of the user sending this signaling message
required uint32 session_id = 2; // msg_server fills in the recipient id of the acknowledged message
required uint32 msg_id = 3; // id of the acknowledged message
required IM.BaseDefine.SessionType session_type = 4; // SESSION_TYPE_SINGLE in the single-chat path
}
流程图:
客户端A发送消息给客户端B,信令为CID_MSG_DATA,msg_server收到后调用 _HandleClientMsgData 函数
/**
 * Dispatches a PDU received on a client connection to the matching handler.
 *
 * Throws CPduException when the command is not a login request while the
 * connection is both not open and kicked off. Commands without a matching
 * case are logged and otherwise ignored.
 *
 * @param pPdu the received PDU; dereferenced unconditionally.
 */
void CMsgConn::HandlePdu(CImPdu* pPdu)
{
    // request authorization check
    // NOTE(review): the guard only fires when the connection is BOTH not open
    // AND kicked off, so a connection that never logged in passes it. Confirm
    // whether `&&` (rather than `||`) is really intended.
    if (pPdu->GetCommandId() != CID_LOGIN_REQ_USERLOGIN && !IsOpen() && IsKickOff()) {
        log("HandlePdu, wrong msg. ");
        // The throw leaves the function; nothing after it would ever run.
        throw CPduException(pPdu->GetServiceId(), pPdu->GetCommandId(), ERROR_CODE_WRONG_SERVICE_ID, "HandlePdu error, user not login. ");
    }

    switch (pPdu->GetCommandId()) {
        // ...... unrelated logic omitted
        case CID_MSG_DATA:
            _HandleClientMsgData(pPdu);
            break;
        case CID_MSG_DATA_ACK:
            _HandleClientMsgDataAck(pPdu);
            break;
        // ...... unrelated logic omitted
        default:
            log("wrong msg, cmd id=%d, user id=%u. ", pPdu->GetCommandId(), GetUserId());
            break;
    }
}
/**
 * Handles a CID_MSG_DATA PDU sent by the client on this connection.
 *
 * The message is dropped (with a log line) when its body is empty, when the
 * per-second counter has reached MAX_MSG_CNT_PER_SECOND, or when a single-chat
 * message is addressed to its own sender. Otherwise the sender id is replaced
 * with GetUserId(), the creation time with the current time, the connection
 * handle is stored in attach_data, and the PDU is sent on the DB server
 * connection when one is available.
 *
 * @param pPdu the received PDU; its body is re-serialized in place.
 */
void CMsgConn::_HandleClientMsgData(CImPdu* pPdu)
{
    IM::Message::IMMsgData msg;
    // NOTE(review): macro defined elsewhere; presumably returns on parse failure.
    CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));

    if (msg.msg_data().length() == 0) {
        log("discard an empty message, uid=%u ", GetUserId());
        return;
    }

    if (m_msg_cnt_per_sec >= MAX_MSG_CNT_PER_SECOND) {
        log("!!!too much msg cnt in one second, uid=%u ", GetUserId());
        return;
    }

    if (msg.from_user_id() == msg.to_session_id() && CHECK_MSG_TYPE_SINGLE(msg.msg_type()))
    {
        log("!!!from_user_id == to_user_id. ");
        return;
    }

    // Only messages that pass every check count against the rate limit.
    m_msg_cnt_per_sec++;

    uint32_t to_session_id = msg.to_session_id();
    uint32_t msg_id = msg.msg_id();
    uint8_t msg_type = msg.msg_type();

    if (g_log_msg_toggle) {
        // Both ids are unsigned, so they are printed with %u like the other log calls.
        log("HandleClientMsgData, %u->%u, msg_type=%u, msg_id=%u. ", GetUserId(), to_session_id, msg_type, msg_id);
    }

    uint32_t cur_time = time(NULL);
    // The attach data carries this connection's handle; the DB response handler
    // reads it back to find the connection that should receive the ack.
    CDbAttachData attach_data(ATTACH_TYPE_HANDLE, m_handle, 0);

    // Do not trust the client-supplied sender id and timestamp.
    msg.set_from_user_id(GetUserId());
    msg.set_create_time(cur_time);
    msg.set_attach_data(attach_data.GetBuffer(), attach_data.GetLength());
    pPdu->SetPBMsg(&msg);

    // send to DB storage server
    CDBServConn* pDbConn = get_db_serv_conn();
    if (pDbConn) {
        pDbConn->SendPdu(pPdu);
    }
}
该函数先做校验(空消息、每秒消息数上限、单聊不能发给自己),再用服务端的值重写from_user_id和create_time,并把当前连接句柄写入attach_data,然后将数据包转发给db_proxy_server;db_proxy_server有一个map来映射信令所对应的处理函数。
// message content
// Registers DB_PROXY::sendMessage as the handler looked up for CID_MSG_DATA.
m_handler_map.insert(make_pair(uint32_t(CID_MSG_DATA), DB_PROXY::sendMessage));
/**
 * db_proxy_server handler for CID_MSG_DATA.
 *
 * Parses the IMMsgData body, validates the message type and body length, and
 * for single-chat text messages makes sure both session rows exist, obtains a
 * message id for the relation, stores the message and refreshes both sessions.
 * The message, carrying the assigned msg_id (INVALID_VALUE when nothing was
 * stored), is queued back to the requesting connection as a CID_MSG_DATA
 * response.
 *
 * Parse failures, invalid message types and empty bodies are logged and no
 * response is queued.
 *
 * @param pPdu      request PDU whose body is a serialized IMMsgData.
 * @param conn_uuid identifier of the connection the response is queued for.
 */
void sendMessage(CImPdu* pPdu, uint32_t conn_uuid)
{
    IM::Message::IMMsgData msg;
    if (!msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()))
    {
        log("parse pb failed");
        return;
    }

    uint32_t nFromId = msg.from_user_id();
    uint32_t nToId = msg.to_session_id();
    uint32_t nCreateTime = msg.create_time();
    IM::BaseDefine::MsgType nMsgType = msg.msg_type();
    uint32_t nMsgLen = msg.msg_data().length();
    uint32_t nNow = (uint32_t)time(NULL);

    if (!IM::BaseDefine::MsgType_IsValid(nMsgType))
    {
        log("invalid msgType.fromId=%u, toId=%u, msgType=%u", nFromId, nToId, nMsgType);
        return;
    }

    if (nMsgLen == 0)
    {
        log("msgLen error. fromId=%u, toId=%u, msgType=%u", nFromId, nToId, nMsgType);
        return;
    }

    // Handed to CProxyConn::AddResponsePdu at the end of the function.
    CImPdu* pPduResp = new CImPdu;

    uint32_t nMsgId = INVALID_VALUE;
    uint32_t nSessionId = INVALID_VALUE;
    uint32_t nPeerSessionId = INVALID_VALUE;

    CMessageModel* pMsgModel = CMessageModel::getInstance();
    CGroupMessageModel* pGroupMsgModel = CGroupMessageModel::getInstance();

    if (nMsgType == IM::BaseDefine::MSG_TYPE_GROUP_TEXT) {
        // ...... unrelated logic omitted
    } else if (nMsgType == IM::BaseDefine::MSG_TYPE_GROUP_AUDIO) {
        // ...... unrelated logic omitted
    } else if (nMsgType == IM::BaseDefine::MSG_TYPE_SINGLE_TEXT) {
        if (nFromId != nToId) {
            // Sender-side session (from -> to); create it on first contact.
            nSessionId = CSessionModel::getInstance()->getSessionId(nFromId, nToId, IM::BaseDefine::SESSION_TYPE_SINGLE, false);
            if (INVALID_VALUE == nSessionId) {
                nSessionId = CSessionModel::getInstance()->addSession(nFromId, nToId, IM::BaseDefine::SESSION_TYPE_SINGLE);
            }

            // Peer-side session (to -> from). The new id must be stored in
            // nPeerSessionId: writing it into nSessionId would overwrite the
            // sender's session id and leave nPeerSessionId invalid for the
            // updateSession() call below.
            nPeerSessionId = CSessionModel::getInstance()->getSessionId(nToId, nFromId, IM::BaseDefine::SESSION_TYPE_SINGLE, false);
            if (INVALID_VALUE == nPeerSessionId)
            {
                nPeerSessionId = CSessionModel::getInstance()->addSession(nToId, nFromId, IM::BaseDefine::SESSION_TYPE_SINGLE);
            }

            uint32_t nRelateId = CRelationModel::getInstance()->getRelationId(nFromId, nToId, true);
            if (nSessionId != INVALID_VALUE && nRelateId != INVALID_VALUE)
            {
                nMsgId = pMsgModel->getMsgId(nRelateId);
                if (nMsgId != INVALID_VALUE)
                {
                    // Store the message, then refresh both sessions.
                    pMsgModel->sendMessage(nRelateId, nFromId, nToId, nMsgType, nCreateTime, nMsgId, (string&)msg.msg_data());
                    CSessionModel::getInstance()->updateSession(nSessionId, nNow);
                    CSessionModel::getInstance()->updateSession(nPeerSessionId, nNow);
                }
                else
                {
                    log("msgId is invalid. fromId=%u, toId=%u, nRelateId=%u, nSessionId=%u, nMsgType=%u", nFromId, nToId, nRelateId, nSessionId, nMsgType);
                }
            }
            else
            {
                log("sessionId or relateId is invalid. fromId=%u, toId=%u, nRelateId=%u, nSessionId=%u, nMsgType=%u", nFromId, nToId, nRelateId, nSessionId, nMsgType);
            }
        }
        else
        {
            log("send msg to self. fromId=%u, toId=%u, msgType=%u", nFromId, nToId, nMsgType);
        }
    } else if (nMsgType == IM::BaseDefine::MSG_TYPE_SINGLE_AUDIO) {
        // ...... unrelated logic omitted
    }

    log("fromId=%u, toId=%u, type=%u, msgId=%u, sessionId=%u", nFromId, nToId, nMsgType, nMsgId, nSessionId);

    // Reply with the same message, now carrying the assigned msg_id; the
    // sequence number is copied from the request.
    msg.set_msg_id(nMsgId);
    pPduResp->SetPBMsg(&msg);
    pPduResp->SetSeqNum(pPdu->GetSeqNum());
    pPduResp->SetServiceId(IM::BaseDefine::SID_MSG);
    pPduResp->SetCommandId(IM::BaseDefine::CID_MSG_DATA);
    CProxyConn::AddResponsePdu(conn_uuid, pPduResp);
}
db_proxy_server处理完成后仍以CID_MSG_DATA信令应答(此时msg_id已填好),msg_server的CDBServConn收到该应答后调用 _HandleMsgData 函数
// Dispatches a PDU received on the DB server connection by its command id.
// Commands without a matching case are only logged.
void CDBServConn::HandlePdu(CImPdu* pPdu)
{
switch (pPdu->GetCommandId()) {
// ...... unrelated logic omitted
case CID_MSG_DATA:
// Message PDU coming back on this connection; handled by _HandleMsgData.
_HandleMsgData(pPdu);
break;
// ...... unrelated logic omitted
default:
log("db server, wrong cmd id=%d ", pPdu->GetCommandId());
}
}
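CDBServConn::_HandleMsgData 处理单聊消息主要有以下几步(群聊消息直接交给 s_group_chat->HandleGroupMessage 处理;msg_id 为 0 表示写库失败,直接返回)
- 首先 ack 客户端(向发送方所在连接回复 CID_MSG_DATA_ACK)
- 然后,发送到route_server(为啥这么做?不太确定;推测:接收方可能连接在其他msg_server上,需要由route_server转发,待确认)
- 广播给其他客户端(为啥这么做?从代码看,是分别对发送方用户和接收方用户在本msg_server上的连接调用 BroadcastClientMsgData;对发送方传入了原发送连接 pMsgConn,推测用于排除该连接)
- 最后通过当前连接发送 CID_OTHER_GET_DEVICE_TOKEN_REQ,请求接收方的设备token(推测用于推送,待确认)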
/**
 * Handles the CID_MSG_DATA PDU that comes back on the DB server connection.
 *
 * Group messages are delegated to s_group_chat. For other messages a msg_id
 * of 0 is logged as a failed DB write and nothing else happens. Otherwise:
 *   1. an IMMsgDataAck is sent to the connection identified by the handle
 *      stored in attach_data, if that connection is still found;
 *   2. the PDU is passed to the route server connection, if available;
 *   3. attach_data is cleared, the sequence number reset to 0, and the PDU is
 *      broadcast through the sender's and the recipient's CImUser objects
 *      when they are found;
 *   4. a device-token request for the recipient, carrying the message body as
 *      attach data, is sent on this connection.
 *
 * @param pPdu the received PDU; its body and sequence number are modified.
 */
void CDBServConn::_HandleMsgData(CImPdu *pPdu)
{
    IM::Message::IMMsgData msg;
    // NOTE(review): macro defined elsewhere; presumably returns on parse failure.
    CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));

    if (CHECK_MSG_TYPE_GROUP(msg.msg_type())) {
        s_group_chat->HandleGroupMessage(pPdu);
        return;
    }

    uint32_t from_user_id = msg.from_user_id();
    uint32_t to_user_id = msg.to_session_id();
    uint32_t msg_id = msg.msg_id();
    if (msg_id == 0) {
        log("HandleMsgData, write db failed, %u->%u.", from_user_id, to_user_id);
        return;
    }

    // Read back the connection handle that was stored in attach_data.
    CDbAttachData attach_data((uchar_t*)msg.attach_data().c_str(), msg.attach_data().length());
    uint32_t handle = attach_data.GetHandle();
    log("HandleMsgData, from_user_id=%u, to_user_id=%u, msg_id=%u.", from_user_id, to_user_id, msg_id);

    // Step 1: acknowledge the message on the originating connection.
    CMsgConn* pMsgConn = CImUserManager::GetInstance()->GetMsgConnByHandle(from_user_id, handle);
    if (pMsgConn)
    {
        IM::Message::IMMsgDataAck msg2;
        msg2.set_user_id(from_user_id);
        msg2.set_msg_id(msg_id);
        msg2.set_session_id(to_user_id);
        msg2.set_session_type(::IM::BaseDefine::SESSION_TYPE_SINGLE);

        CImPdu pdu;
        pdu.SetPBMsg(&msg2);
        pdu.SetServiceId(SID_MSG);
        pdu.SetCommandId(CID_MSG_DATA_ACK);
        pdu.SetSeqNum(pPdu->GetSeqNum());
        pMsgConn->SendPdu(&pdu);
    }

    // Step 2: pass the PDU to the route server. This happens before
    // attach_data is cleared, so the route server receives it as well.
    CRouteServConn* pRouteConn = get_route_serv_conn();
    if (pRouteConn) {
        pRouteConn->SendPdu(pPdu);
    }

    // Step 3: strip attach_data and broadcast through both users.
    msg.clear_attach_data();
    pPdu->SetPBMsg(&msg);
    CImUser* pFromImUser = CImUserManager::GetInstance()->GetImUserById(from_user_id);
    CImUser* pToImUser = CImUserManager::GetInstance()->GetImUserById(to_user_id);
    pPdu->SetSeqNum(0);
    if (pFromImUser) {
        // NOTE(review): pMsgConn is presumably the connection to skip — confirm.
        pFromImUser->BroadcastClientMsgData(pPdu, msg_id, pMsgConn, from_user_id);
    }
    if (pToImUser) {
        pToImUser->BroadcastClientMsgData(pPdu, msg_id, NULL, from_user_id);
    }

    // Step 4: request the recipient's device token; the message body travels
    // along as attach data.
    IM::Server::IMGetDeviceTokenReq msg3;
    msg3.add_user_id(to_user_id);
    msg3.set_attach_data(pPdu->GetBodyData(), pPdu->GetBodyLength());
    CImPdu pdu2;
    pdu2.SetPBMsg(&msg3);
    pdu2.SetServiceId(SID_OTHER);
    pdu2.SetCommandId(CID_OTHER_GET_DEVICE_TOKEN_REQ);
    SendPdu(&pdu2);
}
msg_id:两个人之间的映射关系(relate id)下递增的序号。如果按照时间排列,两个客户端之间的时间可能不一样,所以按照序号生成消息id。
4. 每条消息id唯一(在同一个relate id内唯一,因为计数器的key是 "msg_id_" + relate id)
5. 使用redis生成消息id
uint32_t CMessageModel::getMsgId(uint32_t nRelateId)
{
uint32_t nMsgId = 0;
CacheManager* pCacheManager = CacheManager::getInstance();
CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");
if(pCacheConn)
{
string strKey = "msg_id_" + int2string(nRelateId);
nMsgId = pCacheConn->incrBy(strKey, 1);
pCacheManager->RelCacheConn(pCacheConn);
}
return nMsgId;
}