消息存储与同步策略设计

news2025/1/13 7:25:35

消息存储与同步策略

https://github.com/robinfoxnan/BirdTalkServer

思路:

  • 私聊写扩散,以用户为中心,存储2次;
  • 群聊读扩散,以群组为中心,存储一次;
  • scylladb易于扩展,适合并发,但是并不适合搜索;如果需要针对聊天记录在服务端搜索的功能,可能还需要加上ES,以会话为中心存储一份;

存储的三级结构如下:

在这里插入图片描述

私聊

优点:以用户为中心比以会话为中心(tinode)的好处就是消息管理更加容易;每个用户的数据相对集中,可以快速的找到并一次性同步给客户;

缺点:数据需要存储2份;

群聊

优点:群聊使用读扩散,存储数据量少;

缺点:读扩散,如果用户反复离线与上线,需要读取离线数据,对scylladb压力比较大;

所有类型的IM系统都有一个共同的难点:如何同步数据,不丢消息?

同步机制

私聊和群聊在正常情况下如果所有用户在线,服务器也不重启,那么很容易保证实时转发不丢消息。

之所以会同步起来比较复杂就在于:

1)用户离线不定长时间后,上线时需要同步消息,而消息可能会非常的多(大群);

2)支持多终端登录,某个终端长久未使用,上线后也需要同步消息;

其实WX在多终端登录同步数据这一点上做的挺差的;多终端在线,某个终端时而离线,就会无法同步到所有的数据;而其他的一些系统就好多了,后加入群聊的也能看到之前的对话;多终端同步也好多了。

用户离线后重新登录,需要与服务器同步消息,需要保证尽量不丢包;这里需要有一个合适的同步机制。

基本策略:

登录后,根据本地保存的消息记录,对比时间差;如果离线时间不久,优先使用正向加载(私聊手机端);如果离线时间久,或者是群聊,优先使用倒序加载;(私聊的电脑端长期未登录也需要倒序加载)

同步流程:

用户登录就绪后,分为3类情况:

1)私聊:客户端比对最后接收消息时间,如果小于1天,则可以尝试正向加载消息,向服务器提供该条msgId,直到同步到消息列表末尾(一般情况下一天的私聊数据也不会超过1000条);如果时间较久,应该向服务申请反向加载数据到msgId为止;如果是老用户的新终端,也应该反向加载数据,并在用户界面提示用户按需加载;

2)群聊:新用户加入群聊后,以及离线后再次登录,都需要倒序加载数据;(这是因为群聊数据量可能非常庞大,而且用户也不需要从最开始的消息开始阅读,可以根据需要适当加载)

3)服务器假死:集群情况下,服务器由于负载大,没有即时上报心跳状态,造成其他服务器没有即时发送转发的消息;服务器恢复后发现此状态,应该按比例断开部分客户端链接;未断开的用户也应该要求客户端重新同步离线数据;

详见第2节部分。

1. ScyllaDb存储

这里使用了一个snow雪花算法生成唯一的消息ID,使用高42比特来保存毫秒时间戳,12比特作为流水号,所以每个毫秒最多支持4096个流水号;

那么这个ID就可以代表时间了,所以我们可以用它来排序,或者得到时间;

1.1 传输结构

// 聊天存储的基本信息
message MsgChat {
  int64 msgId = 1;                // 消息的全网唯一标识,服务端使用雪花算法生成,因为客户端生成的不可靠
  int64 userId = 2;               // 用于存储的clusterKey,因为一份消息要存储2次,要转发,需要有这个字段

  int64 fromId = 3;              // 发送消息的用户 ID
  int64 toId = 4;                // 接收消息的用户 ID(对方的用户 ID)

  int64 tm = 5;                   // 消息的时间戳

  string devId = 6;               // 多设备登录时的设备 ID
  string sendId = 7;              // 用于确认消息的发送 ID

  ChatMsgType msgType = 8;        // 消息类型,建议使用枚举
  bytes data = 9;                 // 消息的内容,可以使用 bytes 存储二进制数据或文本数据

  MsgPriority priority = 10;      // 消息的优先级,建议使用枚举
  int64 refMessageId = 11;        // 引用的消息 ID,如果有的话

  ChatMsgStatus status = 12;      // 消息状态,建议使用枚举
  int64 sendReply = 13;           // 发送消息的回执状态
  int64 recvReply = 14;           // 接收消息的回执状态
  int64 readReply = 15;           // 已读状态的回执

  EncryptType encType = 16;       // 加密类型
  ChatType chatType = 17;         // p2p, group, system
  int32 subMsgType = 18;          // 传递给插件区分代码,插件都注册为整数类型,
  int64 keyPrint = 19;            // 秘钥指纹
}

在传输过程中,私聊和群聊的消息是共用的;

服务为了保存到数据库需要进行格式转化:

1.2 私聊

私聊是写扩散,所以需要在表中对每个人都写一次,区别在于uid1和uid2交换一次,pk肯定也是需要交换的

type PChatDataStore struct {
	Pk   int16 `db:"pk"`
	Uid1 int64 `db:"uid1"`
	Uid2 int64 `db:"uid2"`
	Id   int64 `db:"id"`
	Usid int64 `db:"usid"`
	Tm   int64 `db:"tm"`
	Tm1  int64 `db:"tm1"`
	Tm2  int64 `db:"tm2"`

	Io    int8   `db:"io"`  // 0=out, 1=in
	St    int8   `db:"st"`  // 0=normal, 1=送达,2阅读,
	Ct    int8   `db:"ct"`  // 0=p2p_plain, 1=system, 2=p2_encrypted,
	Mt    int8   `db:"mt"`  // 0=text, 1=pic, 2=
	Print int64  `db:"pr"`  // 秘钥哈希的低8字节作为指纹
	Ref   int64  `db:"ref"` // 引用
	Draf  []byte `db:"draf"`
}

对应的建表语句:

const cqlCreateTablePChat = `CREATE TABLE IF NOT EXISTS  chatdata.pchat (
			pk smallint,
			uid1 bigint, 
			uid2 bigint,
			id bigint,
			usid bigint,
			tm bigint,
			tm1 bigint,
			tm2 bigint,
			io tinyint,
			st tinyint,
			ct tinyint,
			mt tinyint,
			draf blob,
			pr  varint,
			ref varint,
			PRIMARY KEY (pk, uid1, id)
		)`

这里提供了如下几个函数:

// 写2次,首先是发方A,然后是收方B
func (me *Scylla) SavePChatData(msg *model.PChatDataStore, pk2 int) error

// 对发送方设置回执,收方不需要设置,这里提供了收方的参数,是为了兼容,以后也许也保存
func (me *Scylla) SetPChatRecvReply(pk1, pk2, uid1, uid2, msgId, tm1 int64) error
func (me *Scylla) SetPChatReadReply(pk1, pk2, uid1, uid2, msgId, tm2 int64)
func (me *Scylla) SetPChatRecvReadReply(pk1, pk2, uid1, uid2, msgId, tm1, tm2 int64) error

// 设置删除,不可逆
func (me *Scylla) SetPChatMsgDeleted(pk1, pk2, uid1, uid2, msgId int64) error

// 正向查找,如果从头开始查找,那么设置为littleId = 0
func (me *Scylla) FindPChatMsg(pk, uid, littleId int64, pageSize uint) ([]model.PChatDataStore, error) 

// 正序查找,设置边界范围
func (me *Scylla) FindPChatMsgForwardBetween(pk, uid, littleId, bigId int64, pageSize uint) ([]model.PChatDataStore, error)

// 从最新的数据向前倒序查若干条
func (me *Scylla) FindPChatMsgBackward(pk, uid, pageSize uint) ([]model.PChatDataStore, error)

// 从某一点开始向之前的历史数据反向查找,即 所有小于bigId 的
func (me *Scylla) FindPChatMsgBackwardFrom(pk, uid, bigId int64, pageSize uint) ([]model.PChatDataStore, error)

// 从当前最新开始向之前的历史数据反向查找,即 所有大于littlId 的
func (me *Scylla) FindPChatMsgBackwardTo(pk, uid, littleId int64, pageSize uint) ([]model.PChatDataStore, error)

// 向之前的历史数据反向查找
func (me *Scylla) FindPChatMsgBackwardBetween(pk, uid, littleId, bigId int64, pageSize uint) ([]model.PChatDataStore, error)

1.3 群聊

type GChatDataStore struct {
	Pk   int16 `db:"pk"`
	Gid  int64 `db:"gid"`
	Uid  int64 `db:"uid"`
	Id   int64 `db:"id"`
	Usid int64 `db:"usid"`
	Tm   int64 `db:"tm"`
	Res  int8  `db:"res"` // 保留
	St   int8  `db:"st"`  // 0=normal, 1=送达,2阅读,
	Ct   int8  `db:"ct"`  // 0=普通,1=广播
	Mt   int8  `db:"mt"`  // 0=text, 1=pic, 2=

	Print int64  `db:"pr"`  // 秘钥哈希的低8字节作为指纹
	Ref   int64  `db:"ref"` // 引用
	Draf  []byte `db:"draf"`
}

去掉了uid2和tm2, tm3 群聊的消息不保存回执,多次读,每个用户都自己去读;

const cqlCreateTableGChat = `CREATE TABLE IF NOT EXISTS  chatdata.gchat (
			pk smallint,
			gid bigint,
			uid bigint, 
			id bigint,
			usid bigint,
			tm bigint,
			res tinyint,
			st tinyint,
			ct tinyint,
			mt tinyint,
			draf blob,
			pr  varint,
			ref varint,
			PRIMARY KEY (pk, gid, id)
		)`

相关函数如下:

// 保存
func (me *Scylla) SaveGChatData(msg *model.GChatDataStore) error


// 设置删除,不可逆
func (me *Scylla) SetGChatMsgDeleted(pk, gid, msgId int64) error

// 倒序,反向历史数据方向查找,从最新的数据开始向前加载
func (me *Scylla) FindGChatMsgBackwardTo(pk, gid, littleId int64, pageSize uint) ([]model.GChatDataStore, error)

// 倒序,从bigId 向littleId方向去查找,限定一定的个数,如果无法覆盖边界,再来一次
func (me *Scylla) FindGChatMsgBackwardBetween(pk, gid, littleId, bigId int64, pageSize uint) ([]model.GChatDataStore, error)

消息的所有者,以及管理员可以设置删除消息,这里的删除等同于微信的撤回,而不是本地删除;

2. Redis缓存

2.1 群聊消息缓存

每个群组有一个list用于存储,左侧插入,默认1000条缓存,如果超过就会删除;

键名字类似:bsgmsg_1001

func (cli *RedisClient) GetGroupLatestMsg(gid, count int64) ([]string, error)
func (cli *RedisClient) GetGroupLatestMsgPage(gid, offset, count int64) ([]string, error)
func (cli *RedisClient) GetGroupLatestMsgCount(gid, count int64) (int64, error) 
func (cli *RedisClient) PushGroupMsg(gid int64, msg string)

群聊用户离线后,重新上线后,先发所收到的最后一条消息的msgId,如果每个用户上线都搜索数据库,那么会非常耗费数据库资源,所以先从redis将最近的100条数据返回给用户;

这样就有了一个新的问题,用户如何知道中间缺失了部分消息?那么需要有一个节省流量与资源的同步方式:

**原则:**用户每次登录后主动请求加载离线数据,收到数据后回执,如果不请求数据,则不保证数据的完整性,在线时仅仅推送

1)用户登录准备好收发消息后,服务端首先设置状态;

2)用户需要同步群消息时,先发一个群消息同步请求,里面携带收到的最后的群消息msgId;

2)服务器加载最近的所有的消息(redis群缓存里的), 推送之后,需要推送一条待加载数据,通知前边还有数据需要同步;

用户端的群消息存储sqlite如下:

序号msg_id状态
100001
100003
100005
1000017待加载
1000018
1000020
1000025
1000075
1000086

比如,此次登录后,服务器推送了[1000086, 1000075, 1000025, 1000020, 1000018]数据后,尾号17的条目就是服务器发送的通知,这个编号完全可以从前一个msd_id = 1000018 减一得到,意思是从这里向前加载;

客户端需要插入这样的一条数据,下次从本地加载时,发现有这样一条数据,证明需要从这个位置向前加载,

界面上显示 ”待加载“的提示按钮,用户可以选择继续向前查看,客户端发送新的查询请求,

收到新加载的数据后,如果msg_id的范围越过了这条标记,那么这条标记就可以删除了。

这里存在一种异步竞争的情况,可能丢失消息:

登录后同步协程发消息协程
1)检测到用户离线,不推送最新消息m
1) 用户结构建立后,标记在线
2)加载离线数据,推送离线数据
2) redis中插入m

需要将流程改变一下:使用锁或者原子操作atomic来设置和读取用户的状态

登录后同步协程发消息协程
1)保存数据库,并在redis插入最新的消息m
2.1) 发现A不在线,未推送m
1) 用户结构建立后,标记在线
2)加载离线数据,推送离线数据2.2) 发现A在线,直接推送m

这里就会有2种可能性,

2.2) 转发消息的协程发现用户在线,直接转发消息,此时会造成重复推送;

2.1) 转发协程虽然没有转发给用户,但是同步协程会加载离线数据;

这里队列中加载所有数据都需要收到用户确认回执后再删除;

然而这里还有一个问题,存入redis队列中的消息,是使用protobuf定义的结构序列化,或者使用model.GChatDataStore结构序列化为JSON保存好;从效率上说,应该是protobuf的版本更好;

2.2 私聊消息缓存

私聊消息在redis中不设置缓存,在每个用户的内存结构中使用循环队列保存,如果离线,则内存也不保存离线消息,只在离线的数据库中保存。

单机模式下,用户A的数据的加载可能是因为对方给A发送数据,所以即便缓存数据,(因为服务器可能重启过)也未必是所有的离线数据;

集群模式下,用户A和聊天的对象不一定在同一台服务器上,即便某台服务器内存缓存了A的离线数据,下次登录页未必一定在这台服务器登录,所以内存缓存没有意义;

而redis缓存中的user信息的hash表中可以保存一个用户最后收到的消息的msgId,那么从这个ID开始搜索就加载所有未同步的离线数据了;

那么,当每次用户提交接收回执的时候,需要记录最后一条回执的ID,为了减少redis的开销,可以每30秒执行一次redis同步;

但是,其实也不需要保存最后的ID,还是让用户根据msgId反向加载即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1565638.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

idea2023.2.1 java项目-web项目创建-servlet类得创建

如何创建Java项目 1.1 方式1: 1.2 方式: 1.3 方式 如何创建web项目 方式 ----- 推荐 如何创建servlet类 复制6 中得代码 给servlet 配置一个路径 启动tomcat 成功了

AHKC系列霍尔电流传感器的选型及应用

安科瑞电气股份有限公司 祁洁 15000363176 一、霍尔传感器分类 01:开口式开环 02:闭口式开环 03:霍尔变送器 04:霍尔闭环 05:直流电压 06:直流漏电流 二、各类霍尔传感器 1、开口式开环霍尔 (1)精度1级 (2)成本低可…

物联网实战--入门篇之(四)嵌入式-UART驱动

目录 一、串口简介 二、串口驱动设计 三、串口发送 四、串口接收处理 五、PM2.5数据接收处理 六、printf重定义 七、总结 一、串口简介 串口在单片机的开发中属于非常常用的外设,最基本的都会预留一个调试串口用来输出调试信息,串口时序这里就不谈…

视频监控/云存储/磁盘阵列/AI智能分析平台EasyCVR集成时调用接口报跨域错误是什么原因?

EasyCVR视频融合平台基于云边端架构,可支持海量视频汇聚管理,能提供视频监控直播、云端录像、云存储、录像检索与回看、智能告警、平台级联、智能分析等视频服务。平台兼容性强,支持多协议、多类型设备接入,包括:国标G…

每日一题(leetcode2952):添加硬币最小数量 初识贪心算法

这道题如果整体去思考,情况会比较复杂。因此我们考虑使用贪心算法。 1 我们可以假定一个X,认为[1,X-1]区间的金额都可以取到,不断去扩张X直到大于target。(这里为什么要用[1,X-1]而不是[1,X],总的来说是方便,潜在思想…

uniapp开发app使用谷歌地图(ios跟安卓)

前提条件&#xff1a; 谷歌地图需要翻墙&#xff0c;否则无法加载 谷歌地图说明 文档地址&#xff1a;概览 | Maps JavaScript API | Google for Developers 设置地图语言 <script asyncsrc"https://maps.googleapis.com/maps/api/js?keyYOUR_API_KEY&lang…

MySQL安装卸载-Linux

目录 1.概述 2.安装 2.1.上传 2.2.解压 ​​​​​​​2.3.安装 ​​​​​​​2.4.启动服务 ​​​​​​​2.5.查询临时密码 ​​​​​​​2.6.修改临时密码 ​​​​​​​2.7.创建用户 ​​​​​​​2.8.分配权限 ​​​​​​​2.9.重新链接 3.卸载 3.1.停…

YOLOV5 改进:更换主干网络为Resnet

1、前言 之前实现了yolov5更换主干网络为MobileNet和vgg网络 本章将继续将yolov5代码进行更改,通过引用官方实现的resnet网络,替换原有的yolov5主干网络 替换的效果如下: 2、resnet 网络结构 测试的代码为官方的resnet34 通过summary 打印的resnet网络结构如下 =======…

在哪申请免费IP地址证书

IP证书&#xff0c;也被称为IP SSL证书&#xff0c;是一种特殊的SSL证书&#xff0c;不同于传统的域名验证&#xff08;DV&#xff09;证书&#xff0c;它是通过验证公网IP地址而不是域名来确保安全连接。这种证书是用于保护IP地址&#xff0c;并在安装后起到加密作用。 申请条…

振弦式应变计:简单操作,方便实用的应变监测工具

在现代工程领域中&#xff0c;对于结构物的应变监测是一项至关重要的任务。振弦式应变计作为一种高精度、高稳定性的应变监测工具&#xff0c;因其简单操作、方便实用的特点&#xff0c;受到了广大工程师和技术人员的青睐。 振弦式应变计的工作原理基于振弦的振动特性。它通过将…

学代码是理解就行,还是全部背?

在我没接触编程以前&#xff0c;看到程序&#xff0c;觉得这玩意到底怎么写出来的&#xff0c;写出这些代码的人&#xff0c;也太厉害了吧&#xff1f; 不会很多都要背下来吧&#xff1f; 我小学背课本都费劲&#xff0c;背不出来&#xff0c;中午不准回家吃饭&#xff0c;我就…

人类研究人员通过反复提问来削弱人工智能伦理

你如何让人工智能回答一个它不应该回答的问题&#xff1f;有很多这样的“越狱”技术&#xff0c;Anthropic的研究人员刚刚发现了一种新的技术&#xff0c;在这种技术中&#xff0c;如果你先用几十个危害较小的问题来启动它&#xff0c;就可以说服一个大型语言模型&#xff08;L…

RuntimeError: Error compiling objects for extension虚拟环境和系统环境——添加、删除、修改环境变量

前言&#xff1a;因为一个报错RuntimeError: Error compiling objects for extension 没有配置cl.exe环境变量&#xff0c;我的应用场景是需要搞定虚拟环境变量配置 RuntimeError: Error compiling objects for extension手把手带你解决&#xff08;超详细&#xff09;-CSDN博…

Redis数据库——性能管理

目录 一、Redis性能管理 1.Info Memory——查看Redis内存使用 2.内存碎片率 3.内存使用率 4.内存回收key 二、Redis缓存雪崩、穿透、击穿、预热 1.缓存雪崩 1.1什么是缓存雪崩 1.2产生原因 1.3实际应用场景 1.4解决方案 1.4.1方案一设置redis的某些key永不过期 1.…

基于Zabbix 5.0 实现windows服务器上应用程序和主机端口的状态监控

基于Zabbix 5.0 实现windows服务器上应用程序和主机端口的状态监控 背景 用python开发的应用程序在服务器上运行,有时候会出现程序自动退出却收不到告警的情况 环境 zabbix服务器:Centos7 64位 Windows服务器: Windows 10 64位 软件 zabbix_server:zabbix5.0 zabbix_…

02 - 全加器和加法器

---- 整理自B站UP主 踌躇月光 的视频 1. 全加器 用门电路实现两个二进制数相加并求出和的组合线路&#xff0c;称为一位全加器。一位全加器可以处理低位进位&#xff0c;并输出本位加法进位。全加器比半加器多了一位进位。 1.1 实验 1&#xff1a;通过两个半加器设计全加器 1.…

10.图像高斯滤波的原理与FPGA实现思路

1.概念 高斯分布 图像滤波之高斯滤波介绍 图像处理算法|高斯滤波   高斯滤波(Gaussian filter)包含很多种&#xff0c;包括低通、高通、带通等&#xff0c;在图像上说的高斯滤波通常是指的高斯模糊(Gaussian Blur)&#xff0c;是一种高斯低通滤波。通常这个算法也可以用来模…

基本电路理论-电流和电压的参考方向

&#x1f308;个人主页&#xff1a;会编程的果子君 &#x1f4ab;个人格言:“成为自己未来的主人~” 电流及参考方向 电流&#xff1a;带电粒子有规则的定向移动 电流强度&#xff1a;单位时间内通过导体横截面的电荷量&#xff0c;即&#xff1a;idq/dt 单位&#xff1a…

解决Toad for Oracle显示乱中文码问题

更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码&#xff1a; https://gitee.com/nbacheng/ruoyi-nbcio 演示地址&#xff1a;RuoYi-Nbcio后台管理系统 http://122.227.135.243:9666/ 更多nbcio-boot功能请看演示系统 gitee源代码地址 后端代码&#xff1a…

leet hot 100-13 最大子数组和

53. 最大子数组和 原题链接思路代码 原题链接 leet hot 100-10 53. 最大子数组和 思路 生成一个数字来记录last 表示前面数字全部之和与0取最大值 如果大于0 就加上如果不大于0 就不管 从当前位置从新开始遍历计算 时间复杂度O(n) 空间复杂度(1) 代码 class Solution {…