【分布式websocket】群聊中的各种难点以及解决推拉结合【第16期】

news2024/9/25 21:29:15

前言

群聊中未读消息如何设计,以及是推消息还是拉去消息如何选择是需要讨论的。推送消息是推送全量消息还是推送信号消息让客户端再去拉取。其中方案如何选型会比较纠结。
首先基本的推拉结合思路是在线用户推送消息。用户离线的话上线去拉取消息。这是简单的推拉结合。问题在于群聊消息的特点有发送消息比较频繁。假设群里面有一秒钟发送了100条消息。如果推送的话一个人需要推送一百次。但是拉取的话只需要一次就可以拉取所有消息。但是通过 信号消息这样的方案使得设计非常复杂。详情看这篇文章。
目前已经写的文章有。并且有对应视频版本。
git项目地址 【IM即时通信系统(企聊聊)】点击可跳转
sprinboot单体项目升级成springcloud项目 【第一期】
前端项目技术选型以及页面展示【第二期】
分布式权限 shiro + jwt + redis【第三期】
给为服务添加运维模块 统一管理【第四期】
微服务数据库模块【第五期】
netty与mq在项目中的使用(第六期(废弃))】
分布式websocket即时通信(IM)系统构建指南【第七期】
分布式websocket即时通信(IM)系统保证消息可靠性【第八期】
分布式websocket IM聊天系统相关问题问答【第九期】
什么?websocket也有权限!这个应该怎么做?【第十期】
分布式ID是什么,以美团Leaf为例改造融入自己项目【第十一期】
IM聊天系统为什么需要做消息幂等?如何使用Redis以及Lua脚本做消息幂等【第12期】
微信发送一条消息经历哪些过程。企业微信以及钉钉的IM架构对比【第13期】
微信群为什么上限是500人,IM设计系统中的群聊的设计难点【第14期】
【分布式websocket】RocketMQ发送消息保证消息最终一致性需要做哪些处理?【第15期】
B站上面关注我呐 B站和CSDN同名,B站1000粉丝后建群。然后B站关注我后可以私信CSDN来的,然后后面我建群的时候拉你!

群聊中消息已读未读如何设计

数据模型设计

在群聊系统中,管理未读消息的两种常见方法是:记录每个用户与每条消息之间的已读/未读状态,以及记录用户的最后一次阅读消息ID。每种方法都有其优缺点,适用于不同的场景。
结论
如果你的系统需要精确跟踪每条消息的阅读状态,或者需要支持复杂的消息阅读状态查询,可以选择记录每个用户与每条消息之间的已读/未读状态。
如果你的系统更注重性能和可扩展性,或者只需要基本的未读消息功能,记录用户的最后一次阅读消息ID是一个更高效的选择。
通常,考虑到性能和实现的复杂度,许多现代的群聊系统倾向于使用记录最后一次阅读消息ID的方法。这种方法能够满足大多数场景的需求,同时保持系统的高效和简洁。
我们采用 记录用户的最后一次阅读消息ID。

CREATE TABLE yan_im_read(
    user_id VARCHAR(255) NOT NULL,
    group_id VARCHAR(255) NOT NULL,
    last_read_message_id BIGINT,
	count int
    
);

具体未读这块会在下一篇离线消息设计中说明
设计思路
功能实现

  1. 发送消息:
    当用户发送消息时,将消息存入消息表,并为每个接收者在用户-会话关系表中的未读消息数加一。
    对于群聊,为群内每个成员(除了发送者)的未读消息数加一。
  2. 阅读消息:
    当用户打开一个聊天窗口时,系统将该会话的未读消息数重置为0,并更新最后阅读时间或最后阅读的消息ID。
    同时,前端展示未读消息数,并将其清零。
  3. 查询未读消息数:
    用户登录或在主界面时,系统查询用户-会话关系表,获取每个会话的未读消息数,以及总的未读消息数。
    这些信息用于在用户的聊天列表中显示每个聊天窗口的未读消息数,以及应用图标上显示的总未读数。

群聊中消息推送模型采用推送还是用户自己拉取

** 模式一 推送模式(Push)**
在推送模式下,当有新消息时,服务器主动将消息推送给客户端。这种模式可以实现实时通信,用户体验较好。
优点:
实时性:用户可以即时接收到消息,无需主动查询,提高了通信的实时性。
减轻客户端负担:客户端无需定时向服务器发送查询请求,减少了网络请求和资源消耗。
缺点:
服务器负担较重:需要服务器跟踪每个客户端的连接状态,并实时推送消息。
可能存在消息丢失:在网络不稳定或客户端离线时,推送的消息可能会丢失。
模式二 拉取模式(Pull)

  • 在拉取模式下,客户端定时向服务器发送请求,查询是否有新消息。如果有,客户端再拉取这些新消息。
  • 优点:
    简单可靠:客户端主动拉取,可以根据需要重试,减少了消息丢失的风险。
    服务器处理简单:服务器不需要跟踪客户端的连接状态,只需响应客户端的请求。
  • 缺点:
    延迟:用户接收到消息的速度取决于拉取的频率,可能无法做到实时通信。
    增加客户端负担:客户端需要定时发送请求,增加了网络请求和资源消耗。

模式三 推送通知后客户端拉取消息
在这种策略中,当群聊中有新消息时,服务器不直接发送消息内容,而是发送一个有新消息的通知给群成员,由客户端在收到通知后主动向服务器拉取新消息。
优点:
减轻了服务器推送的压力,尤其是在高并发场景下。
更灵活,可以根据客户端的实际情况(如网络状况、用户设置等)决定是否拉取新消息。
方便实现对离线消息的处理,客户端上线后可以主动拉取期间的所有新消息。
缺点:
实时性略差,用户收到消息有一定的延迟。
客户端逻辑更复杂,需要实现拉取新消息的逻辑。

结合使用
在实际应用中,为了兼顾实时性和系统资源的有效利用,往往会结合推送和拉取两种模式:

  • 推送+拉取:对于实时性要求高的消息,如即时聊天消息,采用推送模式,确保用户能够及时收到。对于实时性要求不高的信息,如离线消息或通知,可以在用户上线时通过拉取模式获取。
  • 状态同步:使用推送模式进行实时消息通信,同时,客户端在特定情况下(如启动、网络恢复等)主动拉取最新状态,以确保没有遗漏的消息。
  • 根据消息优先级去选择推送或者拉取:还可以根据消息的优先级和类型,选择不同的推送策略。对于重要或紧急的消息,可能采用直接推送的方式;而对于普通消息,则采用通知加拉取的方式
  • 注意事项
    推送策略:在推送模式下,需要合理设计推送策略,比如使用消息队列管理待推送消息,以应对高并发场景。
    拉取策略:在拉取模式下,需要考虑合理的拉取频率,避免过于频繁导致的资源浪费,或过于稀疏导致的实时性不足。
    用户体验:在设计消息推送模型时,应考虑到用户体验,提供稳定、可靠、及时的消息服务。

实时推送的优化策略

1.消息队列优化
利用消息队列来管理消息的发送。当有新消息时,先将消息发送到消息队列中,然后使用消费者服务批量从队列中取出消息进行处理和推送。这种方式可以有效地平衡负载,提高消息处理的效率。
2.分批推送
对于大群聊,一次性向所有成员推送可能会导致服务器压力过大。可以将群成员分批,每批包含一定数量的用户,然后逐批推送。这样既可以减轻服务器压力,又可以避免网络拥塞。
3.WebSocket连接池
对于基于WebSocket或长连接的推送方式,使用连接池来管理和复用连接。这样可以减少频繁建立和断开连接的开销,提高推送效率。
4. 监控和调优
持续监控推送系统的性能指标,如推送延迟、失败率等。根据监控结果调整批处理大小、分批策略和资源分配,以达到最优的推送效率。

MQ批量消费逻辑

先说明一下消费什么。批量消费可以解决什么问题。在之前的消息链路消息有落库的环节。
在这里插入图片描述

落库的话需要监听mq然后消费这条消息进行落库。这里采用的策略是批量落库。避免发送一条消息保存一条消息这样的频繁访问数据库。

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>
@Service
@RocketMQMessageListener(topic = "yourTopic", consumerGroup = "yourConsumerGroup", consumeMode = ConsumeMode.CONCURRENTLY, consumeThreadMax = 10, messageModel = MessageModel.CLUSTERING)
public class MyBatchConsumerService implements RocketMQListener<List<MessageExt>> {

    @Override
    public void onMessage(List<MessageExt> messages) {
        for (MessageExt message : messages) {
            // 处理每条消息
            System.out.println(new String(message.getBody()));
        }
        // 实现批量处理逻辑
    }
}

配置消费者属性
在application.properties或application.yml中配置消费者的属性,特别是consumeMessageBatchMaxSize,这个属性决定了消费者每次批量拉取处理的消息最大数量。

rocketmq:
  consumer:
    consumeMessageBatchMaxSize: 10

如果需要更细致地控制消费者的配置,可以通过编程方式自定义消费者。这通常涉及到创建DefaultMQPushConsumer的实例,并设置相关属性。

@Configuration
public class RocketMQConsumerConfig {

    @Value("${rocketmq.name-server}")
    private String nameServer;

    @Value("${rocketmq.consumer.group}")
    private String consumerGroup;

    @Bean
    public DefaultMQPushConsumer batchConsumer() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(nameServer);
        consumer.setConsumeMessageBatchMaxSize(10); // 设置批量消费的大小
        consumer.subscribe("YourTopic", "*"); // 订阅主题
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            // 实现批量消息处理逻辑
            msgs.forEach(msg -> {
                System.out.println(new String(msg.getBody()));
            });
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        return consumer;
    }
}

. 注意事项

  • 消息大小限制:确保批量消费的总消息大小不超过RocketMQ的限制(默认不超过4MB)。
  • 异常处理:在批量处理消息时,确保有适当的异常处理机制。如果批量中的某个消息处理失败,需要决定是重试整个批次还是仅重试失败的消息。
  • 性能测试:在实际部署前,进行充分的性能测试,以确定最优的consumeMessageBatchMaxSize值。过大或过小的批量大小都可能影响消费效率。
    通过上述配置和建议,可以在Spring Boot应用中有效地实现RocketMQ的批量消费,提高消息处理的效率和应用性能。

介绍群聊模式三

当群聊中有新消息时,服务器不直接发送消息内容,而是发送一个有新消息的通知给群成员,由客户端在收到通知后主动向服务器拉取新消息。这种模式实现起来暂时有点复杂。先按照简单的推拉结合处理。但是可以保留着思路,然后后面解决。
方案概述

  1. 信号消息推送:当有新消息发送到群聊时,服务器不会直接将完整的消息内容推送给所有群成员。相反,它只发送一个信号消息,通知客户端有新消息可用。
  2. 客户端拉取消息:收到信号消息后,客户端会主动向服务器发起请求,拉取自上次更新以来的所有新消息。
    方案优点
    减少带宽消耗:由于不是所有的消息内容都通过推送发送,这种方案可以显著减少网络带宽的消耗。
    提高效率:客户端可以根据实际需要批量拉取消息,减少网络请求的次数,提高数据同步的效率。
    保证消息完整性:通过拉取机制,即使在网络不稳定的情况下,客户端也能确保最终获取到所有的消息,避免消息丢失。
    支持离线消息:当用户离线时,信号消息可以被服务器暂存,用户上线后再拉取所有未读消息,保证消息的完整同步。
    方案实现要点

信号消息设计:信号消息应包含足够的信息,以便客户端知道从哪里开始拉取新消息。例如,可以包含最新消息的ID或时间戳。

消息存储:服务器需要有效地存储和管理消息,以支持高效的消息拉取操作。通常需要对消息进行索引,以便快速查询到新消息。
拉取策略:客户端可以实现智能的拉取策略,例如,在用户查看群聊时主动拉取新消息,或者在收到多个信号消息时合并请求,减少服务器的负载。
错误处理和重试机制:为了保证消息的完整性,客户端在拉取消息时应该实现错误处理和重试机制,确保在网络不稳定时也能成功获取消息。
适用场景
这种结合推送和拉取的方案非常适合消息量大、用户基数广的群聊系统,尤其是在需要优化网络资源消耗、保证消息可靠传递的场景下。同时,这种方案也适用于需要支持离线消息同步的应用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1517434.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Nginx、LVS、HAProxy工作原理和负载均衡架构

当前大多数的互联网系统都使用了服务器集群技术&#xff0c;集群是将相同服务部署在多台服务器上构成一个集群整体对外提供服务&#xff0c;这些集群可以是 Web 应用服务器集群&#xff0c;也可以是数据库服务器集群&#xff0c;还可以是分布式缓存服务器集群等等。 在实际应用…

Camtasia Studio 编辑好的视频如何生成MP4

Camtasia Studio 是一款功能强大的屏幕录像和视频编辑软件&#xff0c;由 TechSmith 公司开发。它可以帮助用户轻松地录制屏幕和音频、添加注释和特效、剪辑和编辑视频&#xff0c;并导出高质量的视频文件。作为一款非常流行的视频制作工具&#xff0c;Camtasia Studio 在教育、…

论文阅读-federated unlearning via class-discriminative pruning

论文阅读-federated unlearning via class-discriminative pruning FUCP 通过类别区分性剪枝进行联邦遗忘 综述中描述&#xff1a;属于面向全局模型中的局部参数调整 利用卷积层的结构特定进行联邦忘却学习&#xff0c;wang等人提出了针对图像分类任务的联邦忘却学习算法FUCP&…

Oracle with as用法

一、简介 with…as关键字&#xff0c;是以‘with’关键字开头的sql语句&#xff0c;在实际工作中&#xff0c;我们经常会遇到同一个查询sql会同时查询多个相同的结果集&#xff0c;即sql一模一样&#xff0c;这时候我们可以将这些相同的sql抽取出来&#xff0c;使用with…as定…

ETH共识升级之路

简介 根据我们之前的介绍&#xff0c;了解到ETH网络的共识方式&#xff0c;已经从 PoW 切换到了 PoS&#xff0c;今天我们就回顾下升级之路&#xff0c;以及升级带来的影响 最早的共识机制 PoW 以太坊创建之初采用了类似比特币的工作量证明机制&#xff0c;即矿工通过计算哈希函…

【排序算法】-- 深入理解桶排序算法

概述 在计算机科学中&#xff0c;排序算法是一种对数据进行有序排列的重要技术。桶排序&#xff08;Bucket Sort&#xff09;是一种常见的排序算法&#xff0c;它通过将数据分到有限数量的桶中&#xff0c;并对每个桶中的数据分别排序&#xff0c;最后按照顺序将所有桶中的数据…

PBKDF2算法:保障密码安全的利器

title: PBKDF2算法&#xff1a;保障密码安全的利器 date: 2024/3/14 16:40:05 updated: 2024/3/14 16:40:05 tags: PBKDF2算法密码安全性迭代盐值密钥 PBKDF2算法起源&#xff1a; PBKDF2&#xff08;Password-Based Key Derivation Function 2&#xff09;算法是一种基于密码…

远程办公、企业内网服务器的Code-Server上如何配置使用CodeGeeX插件

很多小伙伴都会在工作中使用code-server&#xff0c;比如说远程办公&#xff0c;当你需要在家访问你的工作环境&#xff0c;亦或者是你们公司的Docker是放入服务器中。code-server 无疑是最好的选择&#xff0c;它可以让你通过互联网安全地连接到远程服务器上的开发环境并且使用…

import gdal 报错

1.下载gdal https://www.lfd.uci.edu/~gohlke/pythonlibs/#gdal 2.安装正确版本 &#xff08;1&#xff09;查看python版本 python -v我的版本Python 3.7.9 建议下载 GDAL-3.4.2-cp37-cp37m-win_amd64.whl &#xff08;2&#xff09;放到Scripts文件夹下 执行 pip install GD…

【mask】根据bbox提示同一张图片生成多个矩形框掩码

前提&#xff1a;使用labelimg得到bbox 1.代码 import cv2 import numpy as np# 读取图片 image cv2.imread("D:\Desktop\mult_test\images\SL03509990_1694761223500.jpg")# 假设我们有多个目标的ROI&#xff08;感兴趣区域&#xff09; rois [(565,635,1006,85…

1.绪论

目录 1.1 Web原理基础 1.1.1 Internet与万维网 1.1.2 Web架构 1.2 Web前端技术基础 1.2.1 HTML技术 1.2.2 CSS技术 1.2.3 JavaScript技术 1.3 Web前端新技术 1.3.1 HTML5技术 1.3.2 CSS3技术 1.3.3 jQuery技术 1.4 Web开发工具 1.1 Web原理基础 1.1.1 Internet与万…

浅谈路由器基本结构与工作原理

目录 一、结构 1.1 输入端口 1.2 交换结构 1.3 输出端口 1.4 路由选择处理器 二、输入端口处理和基于目的地转发 三、交换 四、输出端口处理 五、何时出现排队 5.1 输入排队 5.2 输出排队 一、结构 下图是一个通用路由器体系结构的总体试图视图&#xff0c;其主要由…

mysql中动态行转列

场景&#xff1a;不确定转换完有多少列且转换完以后要存入临时表以供其他查询使用。 原始数据如下&#xff1a; 一张生产卡号对应多种添加剂&#xff0c;有多少种添加剂就有多少行数据 转换后数据如下&#xff1a; 一张生产卡号对应多种添加剂&#xff0c;有多少种添加剂就有…

2023版IDEA永久破解教程带patch.exe破解程序

2023版IDEA永久破解教程带patch.exe破解程序 第零步&#xff1a;百度云盘获取程序第一步&#xff1a;关闭电脑的病毒和危险防护&#xff08;目的是避免电脑自动清除破解程序&#xff09;1.找到电脑的 病毒和威胁防护2.蓝色按钮表示防护处于开启状态3.关闭成功会展示“实时保护已…

聊聊vue的nextTick方法

前言 nextTick是面试常考的vue中的一个重要知识点&#xff0c;但是很多小伙伴常常无法真正的理解nextTick的执行机制&#xff0c;并且背后包含的许多vue的重要知识。本文会把nextTick聊的非常细&#xff0c;让大家彻底搞懂它。 正文 那么在nextTick正式登场之前呢&#xff0…

深度解析Elasticsearch索引数据量过大的优化与部署策略

✨✨谢谢大家捧场&#xff0c;祝屏幕前的小伙伴们每天都有好运相伴左右&#xff0c;一定要天天开心哦&#xff01;✨✨ &#x1f388;&#x1f388;作者主页&#xff1a; 喔的嘛呀&#x1f388;&#x1f388; 目录 引言 1. 分片和副本策略 1.1分片策略 1.1.1 数据量 1.1.…

Gitee 服务器

Git 服务器集成 1. 创建仓库 2. 远程仓库简易操作指令 # Git 全局设置&#xff0c;修改成自己的信息 git config --global user.name "Muko" git config --global user.email "txk0x7d2163.com" # 创建 git 仓库&#xff0c;基本操作指令和其他远程仓库一…

边缘计算与物联网的核心 —— 低功耗芯片

一、低功耗芯片 在边缘计算与物联网&#xff08;IoT&#xff09;中&#xff0c;低功耗芯片扮演了至关重要的角色&#xff0c;主要体现在以下几个方面&#xff1a; 延长设备寿命&#xff1a;物联网设备通常需要部署在难以更换电池或不方便进行频繁维护的环境中&#xff0c;比如…

「Paraverse平行云」受邀参与编写国内首个3D数字内容生产技术白皮书

1月26日&#xff0c;2024中关村论坛系列活动——数据&#xff08;数字&#xff09;资产高质量发展大会在银保国际会议中心成功举办。本届数据&#xff08;数字&#xff09;资产高质量发展大会由市科委中关村管委会、市经济和信息化局、石景山区人民政府主办&#xff0c;3D/XR产…

【算法与数据结构】深入解析二叉树(一)

文章目录 &#x1f4dd;数概念及结构&#x1f320; 树的概念&#x1f309;树的表示&#x1f320; 树在实际中的运用&#xff08;表示文件系统的目录树结构&#xff09; &#x1f309;二叉树概念及结构&#x1f320;概念&#x1f309;数据结构中的二叉树&#x1f320;特殊的二叉…