Netty--聊天业务

news2025/1/20 21:56:05

:::info

提醒 : 本文相对比较乱, 主要是关于 Netty websocket 之类的聊天功能相关, 大家了解即可;有兴趣的 可以选读;

1.聊天模块细分微服务:

  1. 用户服务:处理用户身份验证、授权和管理。包括用户注册、登录、个人信息管理等功能。
  2. 聊天服务:处理实时聊天功能,包括消息发送、接收和存储。可使用WebSocket等技术来实现实时通信。
  3. 好友服务:管理用户的好友关系,包括好友请求、好友列表和好友关系的维护。
  4. 群组服务:管理用户群组的创建、加入和退出操作,以及群组消息的发送和接收。
  5. 消息推送服务:负责将消息实时推送给在线用户。可以使用消息队列、推送通知或长连接等技术来实现。
  6. 历史记录服务:负责存储和检索聊天记录,用于用户查看历史消息。
  7. 图片/文件服务:处理聊天中的图片和文件上传、下载和存储。
  8. 搜索服务:提供全文搜索功能,用于用户搜索聊天记录、好友和群组等。
  9. 分析服务:分析聊天数据,提供统计和报表功能,用于监控系统性能和用户行为。
    :::
    image.png
    image.png
    image.png

Netty 是基于 Java NIO 的异步事件驱动的网络应用框架,使用 Netty 可以快速开发网络应用,Netty 提供了高层次的抽象来简化 TCP 和 UDP 服务器的编程,但是你仍然可以使用底层的 API。

Netty 的内部实现是很复杂的,但是 Netty 提供了简单易用的API从网络处理代码中解耦业务逻辑。Netty 是完全基于 NIO 实现的,所以整个 Netty 都是异步的。

Netty 是最流行的 NIO 框架,它已经得到成百上千的商业、商用项目验证,许多框架和开源组件的底层 rpc 都是使用的 Netty,如Dubbo、Elasticsearch 等等。

优点:

API使用简单,学习成本低。
高度可定制的线程模型——单线程、一个或多个线程池。
功能强大,内置了多种解码编码器,支持多种协议。
社区活跃,发现BUG会及时修复,迭代版本周期短,不断加入新的功能。
Dubbo、Elasticsearch都采用了Netty,质量得到验证。
更好的吞吐量,更低的等待延迟,更少的资源消耗

各部分介绍:
BossGroup 和 WorkerGroup:
bossGroup 和 workerGroup 是两个线程池, 它们默认线程数为 CPU 核心数乘以 2
bossGroup 用于接收客户端传过来的请求,接收到请求后将后续操作交由 workerGroup 处理

Selector(选择器):
检测多个通道上是否有事件的发生

TaskQueue(任务队列):
上面的任务都是在当前的 NioEventLoop ( 反应器 Reactor 线程 ) 中的任务队列中排队执行 , 在其它线程中也可以调度本线程的 Channel 通道与该线程对应的客户端进行数据读写

Channel:
Channel 是框架自己定义的一个通道接口,
Netty 实现的客户端 NIO 套接字通道是 NioSocketChannel
提供的服务器端 NIO 套接字通道是 NioServerSocketChannel
当服务端和客户端建立一个新的连接时, 一个新的 Channel 将被创建,同时它会被自动地分配到它专属的 ChannelPipeline

ChannelPipeline:
是一个拦截流经 Channel 的入站和出站事件的 ChannelHandler 实例链,并定义了用于在该链上传播入站和出站事件流的 API

ChannelHandler:
分为 ChannelInBoundHandler 和 ChannelOutboundHandler 两种
如果一个入站 IO 事件被触发,这个事件会从第一个开始依次通过 ChannelPipeline中的 ChannelInBoundHandler,先添加的先执行。
若是一个出站 I/O 事件,则会从最后一个开始依次通过 ChannelPipeline 中的 ChannelOutboundHandler,后添加的先执行,然后通过调用在 ChannelHandlerContext 中定义的事件传播方法传递给最近的 ChannelHandler。
在 ChannelPipeline 传播事件时,它会测试 ChannelPipeline 中的下一个 ChannelHandler 的类型是否和事件的运动方向相匹配。
如果某个ChannelHandler不能处理则会跳过,并将事件传递到下一个ChannelHandler,直到它找到和该事件所期望的方向相匹配的为止。

2.搭建服务器netty服务器

A.创建WebSocketNettyServer 类编写服务器启动代码
服务端实现步骤:

  1. 创建bossGroup线程组: 处理网络事件–连接事件
  2. 创建workerGroup线程组: 处理网络事件–读写事件
  3. 创建服务端启动助手
  4. 设置bossGroup线程组和workerGroup线程组
  5. 设置服务端通道实现为NIO
  6. 参数设置
  7. 创建一个通道初始化对象
  8. 向pipeline中添加自定义业务处理handler
  9. 启动服务端并绑定端口,同时将异步改为同步
  10. 关闭通道和关闭连接池
package com.my.server;

import com.my.handler.WebSocketNettyHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.Data;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * netty服务端
 * 实现DisposableBean 在容器销毁前会调用destroy 方法进行线程组的关闭
 */
@Data
@Component
public class WebSocketNettyServer implements DisposableBean {

    /**
     * 自定义入站规则
     */
    @Autowired
    private WebSocketNettyHandler webSocketNettyHandler;

    /**
     * 通道初始化对象
     */
    @Autowired
    private WebSocketChannelInit webSocketChannelInit;

    /**
     * boos线程组
     */
    private EventLoopGroup boos;

    /**
     * work线程组
     */
    private EventLoopGroup work;


    /**
     * 自定义启动方法
     * @param port
     */
    public void start(int port) {
        // 设置boos线程组
        boos = new NioEventLoopGroup(1);
        // 设置work线程组
        EventLoopGroup work = new NioEventLoopGroup();
        // 创建启动助手
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boos,work)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler())
                .childHandler(webSocketChannelInit);
        // 绑定ip和端口启动服务端
        ChannelFuture sync = null;
        try {
            // 绑定netty的启动端口
            sync = serverBootstrap.bind(port).sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            close();
        }
        System.out.println("netty服务器启动成功"+"--端口:"+port);
        sync.channel().closeFuture();
    }

    /**
     * 容器销毁前关闭线程组
     * @throws Exception
     */
    @Override
    public void destroy() throws Exception {
        close();
    }

    /**
     * 关闭方法
     */
    public void close() {
        if (boos!=null) {
            boos.shutdownGracefully();
        }
        if (work!=null) {
            work.shutdownGracefully();
        }
    }
}

B.**创建一个通道初始化对象 **WebSocketChannelInit
这里面主要添加了编解码器和自定义入站规则和对WebSocket的支持

package com.my.server;

import com.my.config.NettyConfig;
import com.my.handler.WebSocketNettyHandler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 通道初始化对象
 */
@Component
public class WebSocketChannelInit extends ChannelInitializer {

    @Autowired
    NettyConfig nettyConfig;

    @Autowired
    WebSocketNettyHandler webSocketNettyHandler;

    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        //对http协议的支持.
        pipeline.addLast(new HttpServerCodec());
        // 对大数据流的支持
        pipeline.addLast(new ChunkedWriteHandler());
        //post请求分三部分. request line / request header / message body
        // HttpObjectAggregator将多个信息转化成单一的request或者response对象
        pipeline.addLast(new HttpObjectAggregator(8000));
        // 将http协议升级为ws协议. websocket的支持
        pipeline.addLast(new WebSocketServerProtocolHandler(nettyConfig.getPath()));
        // 自定义处理handler
        pipeline.addLast(webSocketNettyHandler);
    }
}

C.定义消息类来进行客户端和服务端的通讯

package com.my.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * 消息体
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {

    /**
     * 发送者
     */
    private String send;

    /**
     * 接收者
     */
    private String receive;

    /**
     * 消息id
     */
    private String id;

    /**
     * 消息值
     */
    private String info;


    /**
     * 类型 1 上线 2发消息
     */
    private int type;
}

D.服务端自定义处理入站消息 WebSocketNettyHandler
大致步骤:
1.重写通道连接事件,存放通道连接信息到自定义的list集合中。
2.当客户连接上服务端时会默认发送一条上线信息 type=1
3.服务端接收到上线消息后,将send字段的发送者信息和当前通道信息做一个映射,放入MAP 集合中,key 是用户信息 value是通道的信息,为后面的私聊创造可能。
4.服务端接收到上线消息后,还需要存放通道id和用户关联,方便在后续通道关闭事件和异常事件时打印是哪一个用户发生了下线,和异常。
5.服务端接收到非上线消息后,先判断消息的接收者是否为空,如果为空就群发消息,拿出map中存放的其他(排除当前通道=自己发的消息不需要接收)通道消息,进行消息发送。
6.服务端接收到非上线消息后消息的接收者不是空,代表需要私聊,在map中查找当前需要接收的通道信息,如果查不到,就给发送者回复,用户不在线了。
7.发生通道
关闭事件
,需要删除缓存中对应的map和list信息。并发送给所有在线用户,该用户离线了(可以不要这步)。
8.发生通道异常事件,需要删除缓存中对应的map和list信息。并发送给所有在线用户,该用户离线了(可以不要这步)。

E.创建对外提供的结构并转到聊天页面

package com.my.handler;

import com.alibaba.fastjson.JSON;
import com.my.pojo.Message;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.internal.StringUtil;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * 服务端自定义处理入站消息
 */
@ChannelHandler.Sharable
@Component
public class WebSocketNettyHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /**
     * 存储用户对应的通道
     */
    Map<String,ChannelHandlerContext> MAP = new ConcurrentHashMap<>(16);

    /**
     * 存放通道和用户关联
     */
    Map<String,String> CHANNEL_USER = new ConcurrentHashMap<>(16);

    /**
     * 存储当前连接上的通道
     */
    List<ChannelHandlerContext> LIST = new CopyOnWriteArrayList<>();

    /**
     * 通道连接事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LIST.add(ctx);
        System.out.println("有新的连接.>>当前连接数量:"+LIST.size());
    }

    /**
     * 通道消息事件
     * @param channelHandlerContext
     * @param textWebSocketFrame
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
        System.out.println("前端发来的消息:"+textWebSocketFrame.text());
        Message message = JSON.parseObject(textWebSocketFrame.text(), Message.class);
        if (message.getType()==1) {
            setMap(channelHandlerContext,message);
            // 给其他服务器发送上线消息
            for (ChannelHandlerContext handlerContext : MAP.values()) {
                if (handlerContext==channelHandlerContext) {
                    continue;
                }
                handlerContext.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(message)));
            }
            return;
        }
        // 获取到需要转发的客户端
        String receive = message.getReceive();
        // 没有指定接收者代表要群发
        if (StringUtil.isNullOrEmpty(receive)) {
            for (ChannelHandlerContext handlerContext : MAP.values()) {
                if (handlerContext==channelHandlerContext) {
                    continue;
                }
                handlerContext.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(message)));
            }
            return;
        }
        // 从缓存的存储用户对应的通道 map中获取
        if (!MAP.containsKey(receive)) {
            Message message1 = new Message("服务端",channelHandlerContext.name(), UUID.randomUUID().toString(),"用户未在线,你的消息不能及时送达。",2);
            channelHandlerContext.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(message1)));
            return;
        }
        // 服务端转发消息到指定的客户端
        ChannelHandlerContext channelHandlerContext1 = MAP.get(receive);
        channelHandlerContext1.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(message)));
    }

    /**
     * 设置连接映射
     * @param channelHandlerContext
     * @param message
     */
    private void setMap(ChannelHandlerContext channelHandlerContext, Message message) {
        MAP.put(message.getSend(),channelHandlerContext);
        CHANNEL_USER.put(channelHandlerContext.channel().id().toString(),message.getSend());
    }

    /**
     * 通达关闭事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        String s = CHANNEL_USER.get(ctx.channel().id().toString());
        MAP.remove(s);
        // 给其他在线用户发送该用户离线的信息
        for (ChannelHandlerContext handlerContext : MAP.values()) {
            Message message = new Message("服务端",null, UUID.randomUUID().toString(),"用户--"+s+"--已经离线了",2);
            handlerContext.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(message)));
        }
        LIST.remove(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        String s = CHANNEL_USER.get(ctx.channel().id().toString());
        MAP.remove(s);
        // 给其他在线用户发送该用户离线的信息
        for (ChannelHandlerContext handlerContext : MAP.values()) {
            Message message = new Message("服务端",null, UUID.randomUUID().toString(),"用户--"+s+"--连接发生问题,已被迫离线了",2);
            handlerContext.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(message)));
        }
        LIST.remove(ctx);
    }
}




E.创建对外提供的结构并转到聊天页面

package com.my.Controller;


import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

/**
 * 聊天接口
 */
@Controller
public class ChatController {

    @RequestMapping("/")
    public String chat() {
        return "chat";
    }
}

配置文件

#服务器端口
server:
  port: 8080
#  netty服务端口和webSocket path
netty:
  port: 8081
  path: /chat

#静态资源位置和视图解析器
resources:
  static-locations:
    - classpath:/static/
spring:
  thymeleaf:
    cache: false
    checktemplatelocation: true
    enabled: true
    encoding: UTF-8
    mode: HTML5
    prefix: classpath:/templates/
    suffix: .html

启动类

package com.my;

import com.my.config.NettyConfig;
import com.my.server.WebSocketNettyServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * netty服务端启动
 */
@SpringBootApplication
public class ServerSpringBootApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(ServerSpringBootApplication.class,args);
    }

    @Autowired
    private WebSocketNettyServer webSocketNettyServer;

    @Autowired
    private NettyConfig nettyConfig;

    @Override
    public void run(String... args) throws Exception {
        new Thread(new Runnable() {
            @Override
            public void run() {
                webSocketNettyServer.start(nettyConfig.getPort());
            }
        }).start();
    }
}

3.企信系统实现在线聊天和消息推送

以下是一些可能出现在Java企信项目面试中的常见问题和建议的回答:

1. 请简要介绍您对企信项目的理解。

企信项目是一种企业内部的通讯协作工具,主要可以分为在线聊天室、消息推送、在线会议、企业任务
    等多种功能模块组成。在实现企信项目时,需要使用常见的Java开发框架和工具、熟悉Web开发技术、
    具有丰富的数据库设计和优化经验,同时,需要注意系统的稳定性、可靠性、安全性。

2. 如何保证企信系统的高可用性?

高可用性是企信系统的一个重要关注点。可以通过以下方式来保证:

1)集群部署,多机房灾备,保证系统的容灾性和可靠性。

2)使用分布式缓存、分布式一致性算法等技术,确保数据的一致性。

3)构建监控和警报系统,以便及时检测和解决问题。

4)使用负载均衡技术来保证系统的可扩展性和稳定性。

3. 你如何实现在线聊天室功能?

实现在线聊天室的一个常见方式是使用WebSocket协议。WebSocket协议建立在TCP协议之上,可以提供双向
    通信功能,从而实现实时聊天的效果。在Java中,可以使用Spring框架提供的WebSocket支持或使
    用Netty框架来实现。

4. 如何实现消息推送功能?

实现消息推送的方式主要有两种:轮询和长连接。在轮询方式下,浏览器定时向服务器发送Ajax请求,从而
    获取最新的消息,但这种方式会增加服务器的负载;在长连接方式下,浏览器和服务器之间建立一
    个WebSocket连接,实时接收消息,这种方式可以有效地减少服务器的负载。在Java中,可以使
    用WebSocket或其他开源框架来实现消息推送功能。

5. 如何实现在线会议功能?

实现在线会议的方式可以使用WebRTC(Web Real-Time Communication)技术。WebRTC是一种基于Web的实时
    通信技术,可以在无需插件或安装软件的情况下,提供直接从浏览器间进行音频和视频通信的能力。
    在Java中,可以使用WebRTC的开源实现或使用类似Ant-Media-Server, Janus Gateway等开源音
    视频框架来实现。

总之,Java企信项目面试主要关注职位相关技能的掌握和运用,对职位相关经验和技能的具体分析和优化
    能力等方面也是面试官非常关注的。

4. Websocket + Netty 完成聊天功能

Websocket是一种基于HTTP协议的长连接协议,可以支持客户端和服务器之间双向的实时通信。而Netty是一个高性能的开源的网络基础设施框架,可以帮助开发者快速构建可扩展、高性能、高可靠的网络应用。结合使用Websocket和Netty可以很方便地实现聊天功能,主要步骤如下:

  1. 创建Netty Server:通过Netty框架创建业务Server,并在初始化业务Handler时添加Websocket协议的Decoder和Encoder。

  2. 前端页面:在前端页面中创建一个Websocket连接,将相关数据通过Websocket协议发送到服务器。

  3. 处理WebSocket协议消息:在业务Server的Handler中处理Websocket协议消息,根据消息类型来决定消息的处理方式。

  4. 广播消息:当有客户端发送消息后,在业务Server中通过广播的方式将消息发送给所有连接的客户端,同时需要做好消息缓存和消息去重,避免消息重复发送。

  5. 前端消息展示:在前端页面中将接收到的Websocket消息展示出来。

通过上述步骤,就可以很轻松地实现聊天功能。同时,Netty框架的高性能和可扩展性可以很好地支持大并发和高性能的聊天应用,为Web应用开发带来了很大的便利。

聊天模块的数据量可以根据不同的应用场景和设计需求进行估算和规划。以下是一些可能影响数据量的因素:

  1. 并发用户数量:聊天室或群聊天涉及的并发用户数量,越多数据量也就越大。

  2. 消息频率:聊天模块消息发送的频率,即每秒发送消息的数量。频繁发送消息的聊天室或群组,其数据量也
    会随之增多。

  3. 消息类型:聊天模块可涉及的消息类型多样化,可以是纯文本消息、图片、表情、文件或其他类型,不同类
    型的消息所产生的数据量也不同。

  4. 聊天记录保留时间:对聊天记录的保留时间进行设置会直接影响数据量大小。

根据以上因素的不同组合,聊天模块数据量可能会有很大差异。一些大型社交应用和在线游戏可能会每天产生
数百万到数千万条聊天记录,而一些小型企业内部使用的聊天工具则可能会更加精简。因此,在设计和
优化聊天模块时,需要适合应用场景的数据量规划和处理策略,避免对系统的性能产生过大影响。

WebSocket是一种基于TCP的协议,与HTTP一样都是应用层协议,但WebSocket可以在建立连接后实现双向通信。而Netty是一种NIO框架,可用于快速开发可维护的高性能服务器和客户端。

实现聊天功能,可以使用WebSocketNetty中的WebSocket协议实现。基本思路是建立一个WebSocket服务器,通过Netty处理来自客户端的WebSocket请求,然后在服务器和客户端之间进行双向通信。

使用Netty实现WebSocket聊天功能的具体步骤如下:

1.创建一个WebSocket服务器,并注册处理器,处理来自客户端的相关请求。

```java
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new WebSocketServerInitializer());
            
    Channel ch = bootstrap.bind(PORT).sync().channel();
    ch.closeFuture().sync();
} finally {
    bossGroup.shutdownGracefully();
    workGroup.shutdownGracefully();
}

2.创建WebSocketServerInitializer处理器,配置WebSocket通道的WebSocketServerSocketChannel。

public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpObjectAggregator(8192));
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        pipeline.addLast(new TextWebSocketFrameHandler());
    }
}

3.创建TextWebSocketFrameHandler处理器,处理文本数据帧。

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private final Map<ChannelId, Channel> channelMap = new ConcurrentHashMap<>();

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
         channelMap.put(ctx.channel().id(), ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         channelMap.remove(ctx.channel().id());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
         String text = msg.text();
         for (Channel channel : channelMap.values()) {
              if (channel != ctx.channel()) {
                     channel.writeAndFlush(new TextWebSocketFrame(text));
              }
         }
    }
}

4.编写前端代码,通过WebSocket通道与服务器建立连接。

var socket = new WebSocket("ws://localhost:8080/ws");
socket.onmessage = function (event) {
    var messagesAreaEl = document.getElementById("messagesArea");
    messagesAreaEl.value = messagesAreaEl.value + event.data;
}

function sendMessage() {
    var messageEl = document.getElementById("message");
    socket.send(messageEl.value);
    messageEl.value = "";
}

以上就是使用Netty实现WebSocket聊天功能的基本流程。

对于小型企业内部使用的聊天工具,数据量相对于大型社交应用和在线游戏来说通常比较小。以下是一些可能的数据量估算,仅供参考:

  1. 并发用户数量:一般来说,小型企业内部使用的聊天工具并发用户数量不会太多,通常在100-1000之间。

  2. 消息频率:相对于大型社交应用,小型企业内部使用的聊天工具消息发送频率会相对较低。可能会出现一些重要的即时消息,但大部分情况下,消息发送频率不会很高。因此,消息发送速率相对较低,可能是每秒钟10-50条消息。

  3. 消息类型:大部分情况下,小型企业内部使用的聊天工具主要是文本消息和一些简单的表情、图片等,而不会有很复杂的消息类型。因此,单个消息的平均数据量较小,通常在几十KB或更少。

  4. 聊天记录保留时间:由于小型企业内部使用的聊天工具并不需要保存聊天记录不断地产生大量的数据,很多聊天工具不会保存聊天记录或者只保留最近几个月的聊天数据。

据此可见,小型企业内部使用的聊天工具的数据量相对较小,通常几乎不会与存储和性能构成问题。

5. 站内信问题难点 ⭐

消息推送中的已读消息和未读消息设计难题

偏移量的具体实现,已读未读的优化
答 : 消息推送中的已读消息和未读消息设计难题
    
“站内信”有两个基本功能:
 点到点的消息传送。用户给用户发送站内信,管理员给用户发送站内信。
 点到面的消息传送。管理员给用户(指定满足某一条件的用户群)群发消息
    
这两个功能实现起来也很简单{如图}。
只需要设计一个消息内容表和一个用户通知表,
当创建一条系统通知后,数据插入到消息内容表。消息内容包含了发送渠道,根据发送
渠道决定后续动作。
如果是站内渠道,在插入消息内容后异步的插入记录到用户通知表。
这个方案看起来没什么问题,但实际上,我们把所有用户通知的消息全部放在一个表里
面,如果有 10W 个用户,那么同样的消息需要存储 10W 条。
很明显,会带来两个问题:
问题一. 随着用户量的增加,发送一次消息需要插入到数据库中的数据量会越来越大,导致
耗时会越来越长
问题二. 用户通知表的数据量会非常大,对未读消息的查询效率会严重下降
所以上面这种方案很明显行不通,要解决这两个问题,我有两个参考解决思路。
    
第一个方式(如图),先取消用户通知表, 避免在发送平台消息的时候插入大量重复
数据问题。
其次增加一个“message_offset”站内消息进度表,每个用户维护一个消息消费的进
度 Offset。
每个用户去获取未读消息的时候,只需要查询大于当前维护的 msg_id_offset 的数据即可。
在这种设计方式中,即便我们发送给 10W 人,也只需要在消息内容表里面插入一条记
录即可。
在性能上和数据量上都有较大的提升。
    
第二种方式,和第一种方式类似,使用 Redis 中的 Set 集合来保存已经读取过的消息 id。
使用 userid_read_message 作为 key,这样就可以为每个用户保存已经读取过的所有
消息的 id。
当用户读取了未读消息后, 就直接在 redis 的已读消息 id 的 set 中新增一条记录。
这样,在已经得知到已读消息的数量和具体消息 id 的情况下,我们可以直接使用消息
id 来查询没有消费过的数据。
    
你们看,一个小小的方案设计的优化,就能带来性能上的巨大提升

在这里插入图片描述
在这里插入图片描述
在实现消息推送中的已读消息和未读消息时,有以下几个设计难题需要解决:

  1. 消息状态的维护:如何在推送消息过程中维护消息的状态,以便服务端能够知道哪些消息已被阅读,哪些消息未被阅读等。
  2. 消息推送的实现:如何确保服务端已成功推送消息到客户端并将其正确标记为已读或未读。
  3. 数据同步的保证:如何确保客户端和服务端之间的消息状态数据同步,以便客户端能够准确显示所有已读或未读的消息。
    针对以上难题,可以考虑使用以下解决方案:
  4. 在服务端和客户端之间传递消息时,建议在消息的属性中添加一个“状态”字段,用于标记该消息的已读或未读状态。服务端可以通过数据库等持久化存储记录消息状态的变化,客户端则可以基于该字段来显示已读和未读的消息。
  5. 可以考虑使用WebSocket等推送技术来实现消息的实时推送和标记。WebSocket可以建立一个持续连接,允许服务端主动向客户端推送消息,使得消息推送更加实时和高效。
  6. 可以使用定时同步等机制,例如每隔一段时间或每次推送消息时,客户端从服务端获取最新的消息状态。当状态有更新时,客户端需要及时响应并更新显示的状态。这将确保客户端和服务端之间的数据同步,从而避免由于数据不同步而导致的问题。

写在后面 : 站内信问题 可以值得 研究记录一下,这个改进改善的过程可以进行阐述复述;

码字不易, 抽空点赞关注奥, 加油加油 奥利给 !

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/718891.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在markdown中或者CSDN中如何展示双下滑线

最近在CSDN中写文章时&#xff0c;遇到了一个问题&#xff0c;当我输入__proto__ 时&#xff0c;在展示的时候&#xff0c;下滑想不显示emm… 于是乎我一通翻找&#xff0c;发现原来不止csdn&#xff0c;markdown里也有这样的问题&#xff0c;并最终找到了解决办法&#xff01…

生成模型一文认识图像生成

最近看了一些图像生成的论文和博客&#xff0c;觉得要总结一下。本文主要介绍图像生成技术&#xff0c;包括研究背景、研究意义、相关应用、以及所用到的技术。 目录 一、背景与意义 二、图像生成应用 2.1 图像到图像的转换 2.2 文本到图像的生成 2.3 图像超分辨率 2.4 风…

转转闲鱼交易猫源码搭建

后台一键生成链接&#xff0c;后台管理 教程&#xff1a;解压源码&#xff0c;修改数据库config/Congig 不会可以看源码里有教程 下载程序&#xff1a;https://pan.baidu.com/s/16lN3gvRIZm7pqhvVMYYecQ?pwd6zw3

深脑接口 | 清华大学李路明团队NSR综述

更多脑机接口前沿技术&#xff0c;关注公众号&#xff1a;脑机接口社区 如何让机器与人类的大脑深处实现交互&#xff1f;清华大学李路明教授研究团队在《国家科学评论》&#xff08;National Science Review, NSR&#xff09;发表综述文章&#xff0c;介绍深脑接口&#xff0…

百度地图搜索控件获取的点位不准

一. 问题讲解 我们在使用百度 2D 地图时&#xff0c;添加其搜索控件 <bm-control><bm-auto-complete v-model"workAddress" :sugStyle"{ zIndex: 999999 }" confirm"handleConfirm"><el-input v-model"workAddress" …

thinkphp6 基于redis 的消息队列 queue

1. 安装queue 组件 composer require topthink/think-queue2 . 配置队列 queue.php <?php // ---------------------------------------------------------------------- // | ThinkPHP [ WE CAN DO IT JUST THINK IT ] // --------------------------------------------…

力扣 131. 分割回文串

题目来源&#xff1a;https://leetcode.cn/problems/palindrome-partitioning/description/ C题解1&#xff1a; 直接回溯。 传入参数&#xff1a;字符串s和已切割的位置startind&#xff1b;终止条件&#xff1a;已切割的位置大于等于字符串的长度范围&#xff0c;保存已切割…

幂等性及解决方案

什么是幂等性 幂等性简单的说就是相同条件下&#xff0c;一次请求和多次重复的请求&#xff0c;接口的执行结果是相同的。 什么情况下会出现幂等性问题呢&#xff1f; 前端重复提交表单&#xff1a;如用户在提交表单的时候&#xff0c;由于网络波动没有及时给用户做出提交成…

OpenAI的新语言模型升级是否会改变人工智能领域的格局?

近年来&#xff0c;人工智能领域取得了巨大的进展&#xff0c;其中语言模型的发展尤为引人注目。而在这个领域的重要参与者之一&#xff0c;OpenAI近期宣布了其大型语言模型API的重大升级&#xff0c;引发了业界的广泛关注。随着GPT-4和gpt-3.5-turbo等新版本的推出&#xff0c…

开源数字名片生成器EnBizCard

什么是 EnBizCard &#xff1f; EnBizCard 可帮助您创建美观、响应灵敏的基于 HTML 的数字名片&#xff0c;并将其托管在您的网站上。 无需注册100% 免费和开源没有用户跟踪和数据收集离线工作 如果不想自己搭建&#xff0c;可以去试用官方的在线体验站点&#xff0c;地址&…

SpringBoot 项目模板:摆脱步步搭建

前言 在我的工作中&#xff0c;我从零开始搭建了不少软件项目&#xff0c;其中包含了基础代码框架和持续集成基础设施等&#xff0c;这些内容在敏捷开发中通常被称为“第0个迭代”要做的事情。但是&#xff0c;当项目运行了一段时间之后再来反观&#xff0c;我总会发现一些不足…

第六节 计算器 趣味问答

使用tkinter 制作计算器 1 Radiobutton组件 单选按钮. 需要使用的组件名称Radiobutton 如何使用单选按钮 、 单选按钮属于互斥的,只能选用一个。 Radiobutton按钮选项参数的说明&#xff1a; text 显示文字。variable : 绑定变量。value :指定每个按钮代表什么值。 2 计算…

Vulkan Tutorial 10 重采样

目录 30 多重采样 获得可用的样本数 设置一个渲染目标 添加新的附件 30 多重采样 我们的程序现在可以为纹理加载多层次的细节&#xff0c;这修复了在渲染离观众较远的物体时出现的假象。现在的图像平滑了许多&#xff0c;然而仔细观察&#xff0c;你会发现在绘制的几何图形…

ESP8266 RTOS SDK开发 windows开发

https://blog.csdn.net/qq_36347513/article/details/105066905 文件下载路径 https://docs.espressif.com/projects/esp8266-rtos-sdk/en/latest/get-started/windows-setup.html 下载编译环境MSYS2 下载完成后解压到根目录 双击mingw32.exe打开&#xff0c;ls看一下是在什么…

青岛大学_王卓老师【数据结构与算法】Week04_04_双向链表的插入_学习笔记

本文是个人学习笔记&#xff0c;素材来自青岛大学王卓老师的教学视频。 一方面用于学习记录与分享&#xff0c;另一方面是想让更多的人看到这么好的《数据结构与算法》的学习视频。 如有侵权&#xff0c;请留言作删文处理。 课程视频链接&#xff1a; 数据结构与算法基础–…

微信小程序实现抖音视频效果

当我们进行开发的时候可能会遇到需要实现抖音视频效果的需求&#xff0c;并且网上该效果的开源代码少&#xff0c;找到的开源代码代码量大&#xff0c;很难进行二次开发 对此我将自己的代码进行简化&#xff0c;仅留下可动性高的代码模块 以上是实现效果与此处demo的模板 wx…

c++的输入与输出

c中的各种流 文件输入流ifstream 读数据 #include <iostream> #include <iostream> #include <fstream> int main(int const argc, char const *const *argv) {std::ifstream is{"hello.txt"};if (is.good()){std::string s;while (is >> s)…

如何给没有坐标的栅格数据添加坐标信息

在进行NETCDF和HDF格式转换时&#xff0c;经常会出现数据转出来了。但没有任何坐标信息的情况。这如下图转出来的数据就完全不带坐标信息&#xff0c;就好像一副图片一样。但数据的值和像素信息保存完好。如下边一个NC数转出的TIFF栅格数据&#xff1a; 这是我用GIS数据转换器-…

【Matlab】神经网络遗传算法(BP-GA)函数极值寻优——非线性函数求极值

目前关于神经网络遗传算法函数极值寻优——非线性函数求极值的博客资源已经不少了&#xff0c;我看了下来源&#xff0c;最初的应该是来自于Matlab中文论坛&#xff0c;论坛出版的《MATLAB神经网络30个案例分析》第4章就是《神经网络遗传算法函数极值寻优——非线性函数极值寻优…

Ubuntu 添加新用户并配额

背景&#xff1a;在配置工作站或者服务器时&#xff0c;需要为多个用户提供服务&#xff0c;但是需要各个用户之间操作互不干扰&#xff0c;自己所安装的各种环境不会对其他人或root账号下的主系统环境有影响&#xff0c;并且各用户每个用户需要分配额定的内存空间。 安装 quo…