IM项目------消息存储子服务

news2024/9/24 11:32:53

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 前言
  • 消息存储表
  • 消息存储表操作句柄
  • ES消息操作句柄
    • 创建索引
    • 添加数据
    • 删除数据
    • 查询消息
  • Message服务类
  • rpc业务代码
    • 获取指定会话的最近N条消息
      • 获取用户信息
      • 获取文件内容
    • 获取指定会话的时间返回消息
    • 关键字消息搜索


前言

消息存储子服务通过MQ消费者客户端订阅队列消息,来消费消息转发子服务发来的消息。需要将消息存储进mysql数据库/ES搜索引擎,另外文件消息的文件需要通过文件子服务存储进行上传。
消息存储子服务主要负责消息的存储以及提供消息的各种操作。提供三个服务:获取指定会话的最近N条消息/获取指定会话时间范围内的消息/按关键字搜索消息。


消息存储表

包含的字段:消息ID,会话ID,发送者ID,消息类型,产生时间,消息内容,文件ID,文件名称,文件大小。
其中会话ID需要添加索引,后面查询消息都是按会话ID来查询的。
消息内容是只有文本消息才填写的。
文件ID是图片/语音/文件消息才有的,当收到这个类型的消息,需要上传到文件存贮子服务上去,同时返回文件ID
文件名称和文件大小都是文件消息才有的字段。

#pragma db id auto
unsigned long _id;
#pragma db type("varchar(64)") index unique
std::string _message_id;
#pragma db type("varchar(64)") index
std::string _session_id;                //所属会话ID
#pragma db type("varchar(64)")
std::string _user_id;                   //发送者用户ID
unsigned char _message_type;            //消息类型 0-文本;1-图片;2-文件;3-语音
#pragma db type("TIMESTAMP")
boost::posix_time::ptime _create_time;  //消息的产生时间

odb::nullable<std::string> _content;    //文本消息内容--非文本消息可以忽略
#pragma db type("varchar(64)")
odb::nullable<std::string> _file_id;    //文件消息的文件ID -- 文本消息忽略
#pragma db type("varchar(128)")
odb::nullable<std::string> _file_name;  //文件消息的文件名称 -- 只针对文件消息有效
odb::nullable<unsigned int> _file_size; //文件消息的文件大小 -- 只针对文件消息有效

消息存储表操作句柄

需要提供四个操作:
//• 新增消息 ---- 从MQ中消费一条消息时执行
//• 删除指定会话消息 ---- 删除好友/群聊时执行
//• 通过会话 ID,时间范围,获取指定时间段之内的消息,并按时间进行排序
//• 通过会话 ID,消息数量,获取最近的 N 条消息(逆序+limit 即可)

新增消息,外部直接传入一个Message对象进行插入。

bool insert(Message &msg) {
    try {
        odb::transaction trans(_db->begin());
        _db->persist(msg);
        trans.commit();
    }catch (std::exception &e) {
        LOG_ERROR("新增消息失败 {}:{}!", msg.message_id(),e.what());
        return false;
    }
    return true;
}

删除消息,通过会话ID进行删除。当删除好友或者删除群聊时会话被删除了,就需要删除指定会话的消息。

bool remove(const std::string &ssid) {
    try {
        odb::transaction trans(_db->begin());
        typedef odb::query<Message> query;
        typedef odb::result<Message> result;
        _db->erase_query<Message>(query::session_id == ssid);
        trans.commit();
    }catch (std::exception &e) {
        LOG_ERROR("删除会话所有消息失败 {}:{}!", ssid, e.what());
        return false;
    }
    return true;
}

获取指定会话的最近N条消息。根据消息产生时间降序,并且Limit指定个数,就可以获取最近N条消息。
返回一个Message数组,需要给这个数组逆序一下哎,方便前端进行渲染。

std::vector<Message> recent(const std::string &ssid, int count) {
   std::vector<Message> res;
   try {
       odb::transaction trans(_db->begin());
       typedef odb::query<Message> query;
       typedef odb::result<Message> result;
       //本次查询是以ssid作为过滤条件,然后进行以时间字段进行逆序,通过limit
       // session_id='xx' order by create_time desc  limit count;
       std::stringstream cond;
       cond << "session_id='" << ssid << "' ";
       cond << "order by create_time desc limit " << count;
       result r(_db->query<Message>(cond.str()));
       for (result::iterator i(r.begin()); i != r.end(); ++i) {
           res.push_back(*i);
       }
       std::reverse(res.begin(), res.end());
       trans.commit();
   }catch (std::exception &e) {
       LOG_ERROR("获取最近消息失败:{}-{}-{}!", ssid, count, e.what());
   }
   return res;
}

获取指定会话的时间范围内的消息。就是where >=stime <=etime就行。
也是返回一个MEssage数组。

std::vector<Message> range(const std::string &ssid, 
    boost::posix_time::ptime &stime, 
    boost::posix_time::ptime &etime) {
    std::vector<Message> res;
    try {
        odb::transaction trans(_db->begin());
        typedef odb::query<Message> query;
        typedef odb::result<Message> result;
        //获取指定会话指定时间段的信息
        result r(_db->query<Message>(query::session_id == ssid && 
            query::create_time >= stime &&
            query::create_time <= etime));
        for (result::iterator i(r.begin()); i != r.end(); ++i) {
            res.push_back(*i);
        }
        trans.commit();
    }catch (std::exception &e) {
        //将ptime类型转换为string类型进行日志打印
        LOG_ERROR("获取区间消息失败:{}-{}:{}-{}!", ssid, 
            boost::posix_time::to_simple_string(stime), 
            boost::posix_time::to_simple_string(etime), e.what());
    }
    return res;
}

ES消息操作句柄

需要把文本消息存储进ES搜索引擎中,方便前端进行关键字的消息查询,放进ES中是由于通过MYSQL的模糊查询效率太低了。
前面我们以及封装了一个ES操作类了,我们需要对这个类进一步封装,让他更加贴合与我们消息存储子服务。

创建索引

创建索引,索引字段需要有用户ID,消息ID,产生时间,会话ID,消息内容。
正文的是需要分词的,且需要参与索引。会话ID也是需要参与索引的。前端是查询指定会话的指定关键字的消息。
这里存入用户ID/消息ID/产生时间/是为了方便我们查询到消息后直接构建完整的消息类型。通过用户ID向用户子服务获取用户消息就可以构建完整的消息类型了。

bool createIndex() {
    bool ret = ESIndex(_es_client, "message")
        .append("user_id", "keyword", "standard", false)
        .append("message_id", "keyword", "standard", false)
        .append("create_time", "long", "standard", false)
        .append("chat_session_id", "keyword", "standard", true)
        .append("content")
        .create();
    if (ret == false) {
        LOG_INFO("消息信息索引创建失败!");
        return false;
    }
    LOG_INFO("消息信息索引创建成功!");
    return true;
}

添加数据

从消息队列获取到消息后,如果是文本消息则插入进ES中。这里是通过消息ID作为文档ID进行插入的。

bool appendData(const std::string &user_id,
    const std::string &message_id,
    const long create_time,
    const std::string &chat_session_id,
    const std::string &content) {
    bool ret = ESInsert(_es_client, "message")
        .append("message_id", message_id)
        .append("create_time", create_time)
        .append("user_id", user_id)
        .append("chat_session_id", chat_session_id)
        .append("content", content)
        .insert(message_id);
    if (ret == false) {
        LOG_ERROR("消息数据插入/更新失败!");
        return false;
    }
    LOG_INFO("消息数据新增/更新成功!");
    return true;
}

删除数据

通过文档ID也就是消息ID进行删除。

 bool remove(const std::string &mid) {
    bool ret = ESRemove(_es_client, "message").remove(mid);
    if (ret == false) {
        LOG_ERROR("消息数据删除失败!");
        return false;
    }
    LOG_INFO("消息数据删除成功!");
    return true;
}

查询消息

通过前端输入的关键字和会话ID进行查询,因为在ES中存储的消息产生时间是一个时间戳,在我们odb映射类型中是ptime,需要进行一个转换。返回值是一个Message数组。这里ES查询数据和我们前面查询用户有些不同。
查询用户需要设置一个must_not和should参数。
这里查询消息,是通过must_term和must_match字段进行查询的。
must_term是精确匹配,一般用于keyword字段,我们这是是会话ID字段必须精确匹配。
must_match是全文搜索,一般用于text字段。也就是content字段必须匹配用户的关键字。

std::vector<lkm_im::Message> search(const std::string &key, const std::string &ssid) {
   std::vector<lkm_im::Message> res;
   Json::Value json_user = ESSearch(_es_client, "message")
       .append_must_term("chat_session_id.keyword", ssid)
       .append_must_match("content", key)
       .search();
   if (json_user.isArray() == false) {
       LOG_ERROR("用户搜索结果为空,或者结果不是数组类型");
       return res;
   }
   int sz = json_user.size();
   LOG_DEBUG("检索结果条目数量:{}", sz);
   for (int i = 0; i < sz; i++) {
       lkm_im::Message message;
       message.user_id(json_user[i]["_source"]["user_id"].asString());
       message.message_id(json_user[i]["_source"]["message_id"].asString());
       boost::posix_time::ptime ctime(boost::posix_time::from_time_t(
           json_user[i]["_source"]["create_time"].asInt64()));
       message.create_time(ctime);
       message.session_id(json_user[i]["_source"]["chat_session_id"].asString());
       message.content(json_user[i]["_source"]["content"].asString());
       res.push_back(message);
   }
   return res;
}

Message服务类

消息存贮子服务不光要搭建一个rpc服务器。还有一个MQ消费者客户端。
这个客户端订阅了消息转发子服务创建的队列,当消息转发子服务收到一个新消息后,就会把该消息生产进消息队列中。
所以我们的服务类中,需要订阅这个队列,同时提供一个回调函数,当消息来临进行消费。

根据消息首地址和消息长度提取出完整的报文,进行反序列化,提取出完整的消息类型。
根据不同的消息类型进行不同的处理,
如果是文本消息,则需要存储进ES中。同时需要存储进MYsql消息表中。这里文本消息所需要的字段都有,直接存进mysql就行。
如果是其他类型的消息,在Mysql中是需要存一个文件ID的,我们需要把文件内容上传到文件存储子服务中,获取返回的文件ID,在Mysql进行一个存储。另外如果是文件类型的消息,还需要存入文件名和文件大小。
这里提一下上传消息的req中有一个FileUploadData类型,这个类型就是需要提供文件名称,文件大小,文件内容。而我们的图片类型消息和语音类型消息是没有文件名称和文件大小的。所以这里我们在上传文件时填入空。

proto中定义的消息类型中消息创建时间是一个时间戳int64类型,在mysql映射类时ptime因此需要进行转换。

void onMessage(const char *body, size_t sz) {
	LOG_DEBUG("收到新消息,进行存储处理!");
	//1. 取出序列化的消息内容,进行反序列化
	lkm_im::MessageInfo message;
	bool ret = message.ParseFromArray(body, sz);
	if (ret == false) {
	    LOG_ERROR("对消费到的消息进行反序列化失败!");
	    return;
	}
	//2. 根据不同的消息类型进行不同的处理
	std::string file_id, file_name, content;
	int64_t file_size;
	switch(message.message().message_type()) {
	    //  1. 如果是一个文本类型消息,取元信息存储到ES中
	    case MessageType::STRING:
	        content = message.message().string_message().content();
	        ret = _es_message->appendData(
	            message.sender().user_id(),
	            message.message_id(),
	            message.timestamp(),
	            message.chat_session_id(),
	            content);
	        if (ret == false) {
	            LOG_ERROR("文本消息向存储引擎进行存储失败!");
	            return;
	        }
	        break;
	    //  2. 如果是一个图片/语音/文件消息,则取出数据存储到文件子服务中,并获取文件ID
	    case MessageType::IMAGE:
	        {
	            const auto &msg = message.message().image_message();
	            ret = _PutFile("", msg.image_content(), msg.image_content().size(), file_id);
	            if (ret == false) {
	                LOG_ERROR("上传图片到文件子服务失败!");
	                return ;
	            }
	        }
	        break;
	    case MessageType::FILE:
	        {
	            const auto &msg = message.message().file_message();
	            file_name = msg.file_name();
	            file_size = msg.file_size();
	            ret = _PutFile(file_name, msg.file_contents(), file_size, file_id);
	            if (ret == false) {
	                LOG_ERROR("上传文件到文件子服务失败!");
	                return ;
	            }
	        }
	        break;
	    case MessageType::SPEECH:
	        {
	            const auto &msg = message.message().speech_message();
	            ret = _PutFile("", msg.file_contents(), msg.file_contents().size(), file_id);
	            if (ret == false) {
	                LOG_ERROR("上传语音到文件子服务失败!");
	                return ;
	            }
	        }
	        break;
	    default:
	        LOG_ERROR("消息类型错误!");
	        return;
	}
	//3. 提取消息的元信息,存储到mysql数据库中
	lkm_im::Message msg(message.message_id(), 
	    message.chat_session_id(),
	    message.sender().user_id(),
	    message.message().message_type(),
	    boost::posix_time::from_time_t(message.timestamp()));
	msg.content(content);
	msg.file_id(file_id);
	msg.file_name(file_name);
	msg.file_size(file_size);
	ret = _mysql_message->insert(msg);
	if (ret == false) {
	    LOG_ERROR("向数据库插入新消息失败!");
	    return;
	}
	}

rpc业务代码

消息存储自服务提供了三个服务:

  • 获取指定会话的最近N条消息
  • 获取指定会话的时间范围的消息
  • 通过关键字查询指定会话的消息

获取指定会话的最近N条消息

先从req中提取出会话ID和获取消息个数。
去Mysql消息表查询,获取到一个Message数组。
因为我们要组织完整的消息结构,所以需要获取到发送者信息。另外如果时文件消息,还需要从文件存储子服务获取文件。

获取用户信息

这里需要提供请求ID,和一个用户ID列表。请求ID就是客户端发来的请求Id,用户列表就是mysql查出的Message数组里的用户ID。
我们调用获取一组用户信息的服务,响应中会返回一个 goole::protobuf::map<string, UserInfo> 数组。我们遍历这个数组,把他插入到我们自己的std::unordered_map中,其中first就是用户ID,second就是用户信息。

bool _GetUser(const std::string &rid,
    const std::unordered_set<std::string> &user_id_lists,
    std::unordered_map<std::string, UserInfo> &user_lists) {
    auto channel = _mm_channels->choose(_user_service_name);
    if (!channel) {
        LOG_ERROR("{} 没有可供访问的用户子服务节点!",  _user_service_name);
        return false;
    }
    UserService_Stub stub(channel.get());
    GetMultiUserInfoReq req;
    GetMultiUserInfoRsp rsp;
    req.set_request_id(rid);
    for (const auto &id : user_id_lists) {
        req.add_users_id(id);
    }
    brpc::Controller cntl;
    stub.GetMultiUserInfo(&cntl, &req, &rsp, nullptr);
    if (cntl.Failed() == true || rsp.success() == false) {
        LOG_ERROR("用户子服务调用失败:{}!", cntl.ErrorText());
        return false;
    }
    const auto &umap = rsp.users_info();
    for (auto it = umap.begin(); it != umap.end(); ++it) {
        user_lists.insert(std::make_pair(it->first, it->second));
    }
    return true;
}

获取文件内容

获取文件和获取用户一样的流程。也是提供一组用户ID,这里rsp返回的是map<string, FileDownloadData>,我们遍历这个map提取出FileDownloadData中的文件内容。插入到我们的std::unordered_map中,其中first是文件ID,second是content文件内容。

bool _GetFile(const std::string &rid,
    const std::unordered_set<std::string> &file_id_lists,
    std::unordered_map<std::string, std::string> &file_data_lists) {
    auto channel = _mm_channels->choose(_file_service_name);
    if (!channel) {
        LOG_ERROR("{} 没有可供访问的文件子服务节点!",  _file_service_name);
        return false;
    }
    FileService_Stub stub(channel.get());
    GetMultiFileReq req;
    GetMultiFileRsp rsp;
    req.set_request_id(rid);
    for (const auto &id : file_id_lists) {
        req.add_file_id_list(id);
    }
    brpc::Controller cntl;
    stub.GetMultiFile(&cntl, &req, &rsp, nullptr);
    if (cntl.Failed() == true || rsp.success() == false) {
        LOG_ERROR("文件子服务调用失败:{}!", cntl.ErrorText());
        return false;
    }
    const auto &fmap = rsp.file_data();
    for (auto it = fmap.begin(); it != fmap.end(); ++it) {
        file_data_lists.insert(std::make_pair(it->first, it->second.file_content()));
    }
    return true;
}

在获取完用户信息Map和文件内容Map后我们就可以组织完整的消息结构数组进行返回了。

virtual void GetRecentMsg(::google::protobuf::RpcController* controller,
   const ::lkm_im::GetRecentMsgReq* request,
   ::lkm_im::GetRecentMsgRsp* response,
   ::google::protobuf::Closure* done) {
   brpc::ClosureGuard rpc_guard(done);
   auto err_response = [this, response](const std::string &rid, 
       const std::string &errmsg) -> void {
       response->set_request_id(rid);
       response->set_success(false);
       response->set_errmsg(errmsg);
       return;
   };
   //1. 提取请求中的关键要素:请求ID,会话ID,要获取的消息数量
   std::string rid = request->request_id();
   std::string chat_ssid = request->chat_session_id();
   int msg_count = request->msg_count();
   //2. 从数据库,获取最近的消息元信息
   auto msg_lists = _mysql_message->recent(chat_ssid, msg_count);
   if (msg_lists.empty()) {
       response->set_request_id(rid);
       response->set_success(true);
       return ;
   }
   //3. 统计所有消息中文件类型消息的文件ID列表,从文件子服务下载文件
   std::unordered_set<std::string> file_id_lists;
   for (const auto &msg : msg_lists) {
       if (msg.file_id().empty()) continue;
       LOG_DEBUG("需要下载的文件ID: {}", msg.file_id());
       file_id_lists.insert(msg.file_id());
   }
   std::unordered_map<std::string, std::string> file_data_lists;
   bool ret = _GetFile(rid, file_id_lists, file_data_lists);
   if (ret == false) {
       LOG_ERROR("{} 批量文件数据下载失败!", rid);
       return err_response(rid, "批量文件数据下载失败!");
   }
   //4. 统计所有消息的发送者用户ID,从用户子服务进行批量用户信息获取
   std::unordered_set<std::string> user_id_lists;
   for (const auto &msg : msg_lists) {
       user_id_lists.insert(msg.user_id());
   }
   std::unordered_map<std::string, UserInfo> user_lists;
   ret = _GetUser(rid, user_id_lists, user_lists);
   if (ret == false) {
       LOG_ERROR("{} 批量用户数据获取失败!", rid);
       return err_response(rid, "批量用户数据获取失败!");
   }
   //5. 组织响应
   response->set_request_id(rid);
   response->set_success(true);
   for (const auto &msg : msg_lists) {
       auto message_info = response->add_msg_list();
       message_info->set_message_id(msg.message_id());
       message_info->set_chat_session_id(msg.session_id());
       message_info->set_timestamp(boost::posix_time::to_time_t(msg.create_time()));
       message_info->mutable_sender()->CopyFrom(user_lists[msg.user_id()]);
       switch(msg.message_type()) {
           case MessageType::STRING:
               message_info->mutable_message()->set_message_type(MessageType::STRING);
               message_info->mutable_message()->mutable_string_message()->set_content(msg.content());
               break;
           case MessageType::IMAGE:
               message_info->mutable_message()->set_message_type(MessageType::IMAGE);
               message_info->mutable_message()->mutable_image_message()->set_file_id(msg.file_id());
               message_info->mutable_message()->mutable_image_message()->set_image_content(file_data_lists[msg.file_id()]);
               break;
           case MessageType::FILE:
               message_info->mutable_message()->set_message_type(MessageType::FILE);
               message_info->mutable_message()->mutable_file_message()->set_file_id(msg.file_id());
               message_info->mutable_message()->mutable_file_message()->set_file_size(msg.file_size());
               message_info->mutable_message()->mutable_file_message()->set_file_name(msg.file_name());
               message_info->mutable_message()->mutable_file_message()->set_file_contents(file_data_lists[msg.file_id()]);
               break;
           case MessageType::SPEECH:
               message_info->mutable_message()->set_message_type(MessageType::SPEECH);
               message_info->mutable_message()->mutable_speech_message()->set_file_id(msg.file_id());
               message_info->mutable_message()->mutable_speech_message()->set_file_contents(file_data_lists[msg.file_id()]);
               break;
           default:
               LOG_ERROR("消息类型错误!!");
               return;
       }
   }
   return;
}

获取指定会话的时间返回消息

这里和获取指定会话的最近N条消息一致的思想。
也是获取到会话ID和时间返回。
通过MYsql查询到一个MEssage数组,
由于要组织完整的消息结构,需要从用户子服务获取到用户信息,从文件子服务获取到文件内容。
然后组织消息进行返回

virtual void GetHistoryMsg(::google::protobuf::RpcController* controller,
   const ::lkm_im::GetHistoryMsgReq* request,
   ::lkm_im::GetHistoryMsgRsp* response,
   ::google::protobuf::Closure* done) {
   brpc::ClosureGuard rpc_guard(done);
   auto err_response = [this, response](const std::string &rid, 
       const std::string &errmsg) -> void {
       response->set_request_id(rid);
       response->set_success(false);
       response->set_errmsg(errmsg);
       return;
   };
   //1. 提取关键要素:会话ID,起始时间,结束时间
   std::string rid = request->request_id();
   std::string chat_ssid = request->chat_session_id();
   boost::posix_time::ptime stime = boost::posix_time::from_time_t(request->start_time());
   boost::posix_time::ptime etime = boost::posix_time::from_time_t(request->over_time());
   //2. 从数据库中进行消息查询
   auto msg_lists = _mysql_message->range(chat_ssid, stime, etime);
   if (msg_lists.empty()) {
       response->set_request_id(rid);
       response->set_success(true);
       return ;
   }
   //3. 统计所有文件类型消息的文件ID,并从文件子服务进行批量文件下载
   std::unordered_set<std::string> file_id_lists;
   for (const auto &msg : msg_lists) {
       if (msg.file_id().empty()) continue;
       LOG_DEBUG("需要下载的文件ID: {}", msg.file_id());
       file_id_lists.insert(msg.file_id());
   }
   std::unordered_map<std::string, std::string> file_data_lists;
   bool ret = _GetFile(rid, file_id_lists, file_data_lists);
   if (ret == false) {
       LOG_ERROR("{} 批量文件数据下载失败!", rid);
       return err_response(rid, "批量文件数据下载失败!");
   }
   //4. 统计所有消息的发送者用户ID,从用户子服务进行批量用户信息获取
   std::unordered_set<std::string> user_id_lists; // {猪爸爸吧, 祝妈妈,猪爸爸吧,祝爸爸}
   for (const auto &msg : msg_lists) {
       user_id_lists.insert(msg.user_id());
   }
   std::unordered_map<std::string, UserInfo> user_lists;
   ret = _GetUser(rid, user_id_lists, user_lists);
   if (ret == false) {
       LOG_ERROR("{} 批量用户数据获取失败!", rid);
       return err_response(rid, "批量用户数据获取失败!");
   }
   //5. 组织响应
   response->set_request_id(rid);
   response->set_success(true);
   for (const auto &msg : msg_lists) {
       auto message_info = response->add_msg_list();
       message_info->set_message_id(msg.message_id());
       message_info->set_chat_session_id(msg.session_id());
       message_info->set_timestamp(boost::posix_time::to_time_t(msg.create_time()));
       message_info->mutable_sender()->CopyFrom(user_lists[msg.user_id()]);
       switch(msg.message_type()) {
           case MessageType::STRING:
               message_info->mutable_message()->set_message_type(MessageType::STRING);
               message_info->mutable_message()->mutable_string_message()->set_content(msg.content());
               break;
           case MessageType::IMAGE:
               message_info->mutable_message()->set_message_type(MessageType::IMAGE);
               message_info->mutable_message()->mutable_image_message()->set_file_id(msg.file_id());
               message_info->mutable_message()->mutable_image_message()->set_image_content(file_data_lists[msg.file_id()]);
               break;
           case MessageType::FILE:
               message_info->mutable_message()->set_message_type(MessageType::FILE);
               message_info->mutable_message()->mutable_file_message()->set_file_id(msg.file_id());
               message_info->mutable_message()->mutable_file_message()->set_file_size(msg.file_size());
               message_info->mutable_message()->mutable_file_message()->set_file_name(msg.file_name());
               message_info->mutable_message()->mutable_file_message()->set_file_contents(file_data_lists[msg.file_id()]);
               break;
           case MessageType::SPEECH:
               message_info->mutable_message()->set_message_type(MessageType::SPEECH);
               message_info->mutable_message()->mutable_speech_message()->set_file_id(msg.file_id());
               message_info->mutable_message()->mutable_speech_message()->set_file_contents(file_data_lists[msg.file_id()]);
               break;
           default:
               LOG_ERROR("消息类型错误!!");
               return;
       }
   }
   return;
}

关键字消息搜索

这个功能相比与前两个更加简单了,因为关键字消息搜索只能搜索文本消息,所以免去了向文件子服务获取问价内容的过程。
首先提取出会话ID,和关键字。
在ES中进行查询,会返回一个Message数组。
根据数组中的用户ID查询用户子服务获取用户信息,然后组织响应返回。

virtual void MsgSearch(::google::protobuf::RpcController* controller,
   const ::lkm_im::MsgSearchReq* request,
   ::lkm_im::MsgSearchRsp* response,
   ::google::protobuf::Closure* done) {
   brpc::ClosureGuard rpc_guard(done);
   auto err_response = [this, response](const std::string &rid, 
       const std::string &errmsg) -> void {
       response->set_request_id(rid);
       response->set_success(false);
       response->set_errmsg(errmsg);
       return;
   };
   //关键字的消息搜索--只针对文本消息
   //1. 从请求中提取关键要素:请求ID,会话ID, 关键字
   std::string rid = request->request_id();
   std::string chat_ssid = request->chat_session_id();
   std::string skey = request->search_key();
   //2. 从ES搜索引擎中进行关键字消息搜索,得到消息列表
   auto msg_lists = _es_message->search(skey, chat_ssid);
   if (msg_lists.empty()) {
       response->set_request_id(rid);
       response->set_success(true);
       return ;
   }
   //3. 组织所有消息的用户ID,从用户子服务获取用户信息
   std::unordered_set<std::string> user_id_lists;
   for (const auto &msg : msg_lists) {
       user_id_lists.insert(msg.user_id());
   }
   std::unordered_map<std::string, UserInfo> user_lists;
   bool ret = _GetUser(rid, user_id_lists, user_lists);
   if (ret == false) {
       LOG_ERROR("{} 批量用户数据获取失败!", rid);
       return err_response(rid, "批量用户数据获取失败!");
   }
   //4. 组织响应 
   response->set_request_id(rid);
   response->set_success(true);
   for (const auto &msg : msg_lists) {
       auto message_info = response->add_msg_list();
       message_info->set_message_id(msg.message_id());
       message_info->set_chat_session_id(msg.session_id());
       message_info->set_timestamp(boost::posix_time::to_time_t(msg.create_time()));
       message_info->mutable_sender()->CopyFrom(user_lists[msg.user_id()]);
       message_info->mutable_message()->set_message_type(MessageType::STRING);
       message_info->mutable_message()->mutable_string_message()->set_content(msg.content());
   }
   return;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2160235.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

netfilter和iptables--netfilter源码篇

netfilter和iptables–netfilter源码篇 防火墙是保护服务器和基础设施的重要工具&#xff0c;在Linux系统下&#xff0c;目前广泛使用的防火墙工具是iptables&#xff0c;但实际进行规则实施并产生实际作用的是Netfilter&#xff0c;iptables与内核中的netfilter框架中Hook协同…

❤Node11-登录人token信息接口

❤Node11-登录人token信息接口​ 上一章我们已经从登录部分拿到了用户的登录jwt返回的token信息&#xff0c;接下来我们就通过token来换取用户信息 这里我们可以将其理解为一种加密以及解密的思想来思考这个jwt和token的关系&#xff0c;token就是一个加密的字符串&#xff0c…

【JavaEE】——内存可见性问题

阿华代码&#xff0c;不是逆风&#xff0c;就是我疯&#xff0c;你们的点赞收藏是我前进最大的动力&#xff01;&#xff01;希望本文内容能够帮助到你&#xff01; 目录 一&#xff1a;内存可见性问题 1&#xff1a;代码解释 2&#xff1a;结果分析 &#xff08;1&#xf…

《现代畜牧兽医》是什么级别的期刊?是正规期刊吗?能评职称吗?

​问题解答 问&#xff1a;《现代畜牧兽医》是不是核心期刊&#xff1f; 答&#xff1a;不是&#xff0c;是知网收录的第一批认定 学术期刊。 问&#xff1a;《现代畜牧兽医》级别&#xff1f; 答&#xff1a;省级。主管单位&#xff1a;辽宁省科学技术协会 …

vue2实现提取字符串数字并修改数字样式(正则表达式)

如果你要在循环中提取 item.companyName 中的数字&#xff0c;并且希望为这些数字改变颜色和边距&#xff0c;可以对每个 item 进行处理。此处是一个实现示例&#xff1a; <template> <div> <div class"box" v-for"(item, index) in coldBase…

学校气膜体育馆:低成本、高效率的灵活运动空间—轻空间

在当前教育设施的升级中&#xff0c;传统体育馆的建设往往面临长时间、高成本、以及繁琐的审批流程等诸多挑战。然而&#xff0c;学校无需再为这些问题烦恼&#xff0c;只需选择气膜结构的体育馆&#xff0c;就能快速、高效地解决体育场地需求。气膜体育馆凭借其灵活的设计和高…

Java项目实战II基于SSM的国外摇滚乐队交流和周边售卖系统的设计与实现(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、文档参考 五、核心代码 六、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者 一、前言 随着互联网技术的飞速发展&#xff0c;信息传播的广度和深度不断拓展&#xff0c;为各行业的创新发展…

二分查找算法(4) _搜索插入位置

个人主页&#xff1a;C忠实粉丝 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 C忠实粉丝 原创 二分查找算法(4) _搜索插入位置 收录于专栏【经典算法练习】 本专栏旨在分享学习算法的一点学习笔记&#xff0c;欢迎大家在评论区交流讨论&#x1f48c; 目录 …

太爱这5本书了,建议所有大模型人去翻烂它❗

要说现在最热门的技术&#xff0c;可谓非大模型莫属&#xff01; 不少小伙伴都想要学习大模型技术&#xff0c;转战AI领域&#xff0c;以适应未来的大趋势&#xff0c;寻求更有前景的发展~~ 然而&#xff0c;在学习大模型技术这条道路上&#xff0c;却不知道如何进行系统的学…

无人机飞手培训校企合作特训技术详解

随着无人机技术的飞速发展&#xff0c;其在航拍、农业、测绘、救援等多个领域的应用日益广泛&#xff0c;市场对高素质无人机飞手的需求急剧增加。为满足这一需求&#xff0c;促进教育与产业深度融合&#xff0c;无人机飞手培训校企合作模式应运而生。本文将从确定合作目标、共…

可视化大屏看阿里,阿里出品,必属精品。

阿里云有自己的可视化平台——dataV&#xff0c;经常会出一些高颜值、强交互的大屏&#xff0c;本期为大家分享一波。

HTML、CSS

初识web前端 web标准 Web标准也称为网页标准&#xff0c;由一系列的标准组成&#xff0c;大部分由W3C (World Wide Web Consortium&#xff0c;万维网联盟) 负责制定。三个组成部分: HTML: 负责网页的结构(页面元素和内容)。CSS: 负责网页的表现(页面元素的外观、位置等页面样…

Tableau|一入门

一 什么是BI工具 BI 工具即商业智能&#xff08;Business Intelligence&#xff09;工具&#xff0c;是一种用于收集、整理、分析和展示企业数据的软件系统&#xff0c;其主要目的是帮助企业用户更好地理解和利用数据&#xff0c;以支持决策制定。 主要功能&#xff1a; 1.数据…

Vue3使用通信组件库mitt作为事件总线实现跨组件通信

mitt 介绍: Mitt 是一个在 Vue.js 应用程序中使用的小型事件总线库。该库允许组件进行通信&#xff0c;而不必过度依赖父级或子级组件之间的 props。 先看项目用例&#xff1a; 【 以下转载自&#xff1a;https://blog.csdn.net/yuanlong12178/article/details/139579299 】…

虚拟机安装xubuntu

新建一个新的虚拟机&#xff0c;选择自定义安装 默认下一步 选择稍后安装操作系统 选择所要创建的系统及版本 填写虚拟机的名称及创建的虚拟机保存的位置 选择处理器和内核的数量 处理器数量指的是&#xff1a;虚拟的CPU数量。 每个处理器的内核数量指的是&#xff1a;虚拟CPU…

Ubuntu 24.04.1 LTS 安装 node 16.20.2环境

目录 step1&#xff1a;确认版本 step2&#xff1a;选择方式 step3&#xff1a;二进制文件安装 step1&#xff1a;确认版本 不同的版本情况可能有稍许不同&#xff0c;尽可能环境安装前版本保持一致&#xff1b; lsb_release -a 或者 cat /etc/os-release 可以查看版本信…

【机器学习】---元强化学习

目录 1. 元学习简介1.1 什么是元学习&#xff1f;1.2 元学习的应用 2. 强化学习基础2.1 什么是强化学习&#xff1f;2.2 强化学习的基本框架2.3 深度强化学习 3. 元强化学习的概念与工作原理3.1 元强化学习是什么&#xff1f;3.2 元强化学习与普通强化学习的区别 4. 元强化学习…

Arthas ognl(执行ognl表达式)

文章目录 二、命令列表2.1 jvm相关命令2.1.12 ognl&#xff08;执行ognl表达式&#xff09;举例1&#xff1a;获取静态属性举例2&#xff1a;调用静态方法 二、命令列表 2.1 jvm相关命令 2.1.12 ognl&#xff08;执行ognl表达式&#xff09; 使用场景&#xff1a; Arthas 的 …

【Vue系列五】—Vue学习历程的知识分享!

前言 本篇文章讲述前端工程化从模块化到如今的脚手架的发展&#xff0c;以及Webpack、Vue脚手架的详解&#xff01; 一、模块化 模块化就是把单独的功能封装到模块&#xff08;文件&#xff09;中&#xff0c;模块之间相互隔离&#xff0c;但可以通过特定的接口公开内部成员…

黑马智数Day2

表单基础校验实现 基础双向绑定 v-model <el-input v-model"formData.username" /> <script> export default {name: Login,data() {return {formData: {username: ,password: ,remember: }}} } </script> 表单校验配置 按照业务要求编写校验规…