spring boot 实现直播聊天室

news2025/1/23 1:04:33

spring boot 实现直播聊天室

技术方案:

  • spring boot
  • websocket
  • rabbitmq

使用 rabbitmq 提高系统吞吐量

引入依赖

<dependencies>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.42</version>
    </dependency>
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.8.23</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
    </dependency>
</dependencies>

websocket 实现

MHttpSessionHandshakeInterceptor

参数拦截

/**
 * @Date: 2023/12/8 14:52
 * websocket 握手拦截
 * 1. 参数拦截(header或者 url 参数)
 * 2. token 校验
 */
@Slf4j
public class MHttpSessionHandshakeInterceptor extends HttpSessionHandshakeInterceptor {

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                   WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        if (request instanceof ServletServerHttpRequest servletRequest){
            //ws://127.0.0.1:8080/group/2?username=xxxx
            HttpServletRequest httpServletRequest = servletRequest.getServletRequest();
            String requestURI = httpServletRequest.getRequestURI();
            String groupId = requestURI.substring(requestURI.lastIndexOf("/") + 1);
            String username = httpServletRequest.getParameter("username");
            log.info(">>>>>>> beforeHandshake groupId: {} - username: {}", groupId, username);
            attributes.put("username", username);
            //解析占位符
            attributes.put("groupId", groupId);
        }
        return super.beforeHandshake(request, response, wsHandler, attributes);
    }


}

GroupWebSocketHandler

消息发送

@Slf4j
public class GroupWebSocketHandler implements WebSocketHandler {

    //Map<room,List<map<session,username>>>
    private ConcurrentHashMap<String, Queue<WebSocketSession>> sessionMap = new ConcurrentHashMap<>();

    @Autowired
    private MessageClient messagingClient;


    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        String username = (String) session.getAttributes().get("username");
        String groupId = (String) session.getAttributes().get("groupId");
        log.info("{} 用户上线房间 {}", username, groupId);
        TomcatWsSession wsSession = new TomcatWsSession(session.getId(),groupId, username, session);
        SessionRegistry.getInstance().addSession(wsSession);
    }

    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        String groupId = (String) session.getAttributes().get("groupId");
        String username = (String) session.getAttributes().get("username");
        if (message instanceof PingMessage){
            log.info("PING");
            return;
        }
        else if (message instanceof TextMessage textMessage) {
            MessageDto messageDto = new MessageDto();
            messageDto.setSessionId(session.getId());
            messageDto.setGroup(groupId);
            messageDto.setFromUser(username);
            messageDto.setContent(new String(textMessage.getPayload()));
            messagingClient.sendMessage(messageDto);
        }
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        String username = (String) session.getAttributes().get("username");
        String groupId = (String) session.getAttributes().get("groupId");
        log.info(">>> handleTransportError {} 用户上线房间 {}", username, groupId);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        String username = (String) session.getAttributes().get("username");
        String groupId = (String) session.getAttributes().get("groupId");
        log.info("{} 用户下线房间 {}", username, groupId);
        TomcatWsSession wsSession = new TomcatWsSession(session.getId(),groupId, username, session);
        SessionRegistry.getInstance().removeSession(wsSession);
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }


}
WebSocketConfig

websocket 配置

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {


    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myHandler(), "/group/{groupId}")
            .addInterceptors(new MHttpSessionHandshakeInterceptor()).setAllowedOrigins("*");
    }

    @Bean
    public GroupWebSocketHandler myHandler() {
        return new GroupWebSocketHandler();
    }


    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(8192);  //文本消息最大缓存
        container.setMaxBinaryMessageBufferSize(8192);  //二进制消息大战缓存
        container.setMaxSessionIdleTimeout(3L * 60 * 1000); // 最大闲置时间,3分钟没动自动关闭连接
        container.setAsyncSendTimeout(10L * 1000); //异步发送超时时间
        return container;
    }

}

session 管理

将 websocketSession进行抽像,websocketsession可以由不同容器实现

WsSession
public interface  WsSession {

    /**
     * session 组
     * @return
     */
    String group();

    /**
     * session Id
     * @return
     */
    String getId();

    /**
     * 用户名或其他唯一标识
     * @return
     */
    String identity();

    /**
     * 发送文本消息
     * @param messageDto
     */

    void sendTextMessage(MessageDto messageDto);
}

public abstract class AbstractWsSession implements WsSession {

    private String id;
    private String group;

    private String identity;

    public AbstractWsSession(String id, String group, String identity) {
        this.id = id;
        this.group = group;
        this.identity = identity;
    }

    @Override
    public String group() {
        return this.group;
    }

    @Override
    public String getId() {
        return this.id;
    }

    @Override
    public String identity() {
        return this.identity;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        AbstractWsSession that = (AbstractWsSession) o;
        //简单比较 sessionId
        return Objects.equals(id, that.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, group, identity);
    }
}

TomcatWsSession

默认session实现

@Slf4j
public class TomcatWsSession extends AbstractWsSession {

    private WebSocketSession webSocketSession;

    public TomcatWsSession(String id, String group, String identity, WebSocketSession webSocketSession) {
        super(id, group, identity);
        this.webSocketSession = webSocketSession;
    }

    @Override
    public void sendTextMessage(MessageDto messageDto) {
        String content = messageDto.getFromUser() + " say: " + messageDto.getContent();
        try {
            webSocketSession.sendMessage(new TextMessage(content));
        } catch (IOException e) {
            log.error("TomcatWsSession sendTextMessage error: identity:{}-group:{}-msg: {}",
                    super.identity(), super.group(), JSON.toJSONString(messageDto));
        }

    }
}

SessionRegistry

websocket session管理

public class SessionRegistry {

    private static SessionRegistry instance;

    private SessionRegistry() {

    }

    public static SessionRegistry getInstance() {
        if (instance == null) {
            synchronized (SessionRegistry.class) {
                if (instance == null) {
                    instance = new SessionRegistry();
                }
            }
        }
        return instance;
    }


    //Map<group,List<Session>>
    private ConcurrentHashMap<String, Queue<WsSession>> sessionMap = new ConcurrentHashMap<>();


    /**
     * 添加 session
     * @param wsSession
     */
    public void addSession(WsSession wsSession) {
        sessionMap.computeIfAbsent(wsSession.group(),g -> new ConcurrentLinkedDeque<>()).add(wsSession);
    }

    /**
     * 移除 session
     * @param wsSession
     */
    public void removeSession(WsSession wsSession) {
        Queue<WsSession> wsSessions = sessionMap.get(wsSession.group());
        if (!CollectionUtils.isEmpty(wsSessions)){
            //重写 WsSession equals 和 hashCode 方法,不然会移除失败
            wsSessions.remove(wsSession);
            if (CollectionUtils.isEmpty(wsSessions)){
                sessionMap.remove(wsSession.group());
            }
        }
    }

    /**
     * 发送消息
     * @param messageDto
     */
    public void sendGroupTextMessage(MessageDto messageDto){
        Queue<WsSession> wsSessions = sessionMap.get(messageDto.getGroup());
        if (!CollectionUtils.isEmpty(wsSessions)){
            for (WsSession wsSession : wsSessions) {
                if (wsSession.getId().equals(messageDto.getSessionId())){
                    continue;
                }
                wsSession.sendTextMessage(messageDto);
            }
        }
    }


    /**
     * session 在线统计
     * @param groupId
     * @return
     */
    public Integer getSessionCount(String groupId) {
        if (StrUtil.isNotBlank(groupId)) {
            return sessionMap.get(groupId).size();
        }
        return sessionMap.values().stream().map(l -> l.size()).collect(Collectors.summingInt(a -> a));
    }
}

消息队列

这里使用 rabbitmq

MessageDto

消息体

@Data
public class MessageDto {

    /**
     * sessionId
     */
    private String sessionId;
    /**
     * 组
     */
    private String group;
    /**
     * 消息发送者
     */
    private String fromUser;
    /**
     * 发送内容
     */
    private String content;
}
MessageClient
@Component
@Slf4j
public class MessageClient {

    private String routeKey = "bws.key";
    private String exchange = "bws.exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void sendMessage(MessageDto messageDto) {
        try {
            rabbitTemplate.convertAndSend(exchange, routeKey, JSON.toJSONString(messageDto));
        } catch (AmqpException e) {
            log.error("MessageClient.sendMessage: {}", JSON.toJSONString(messageDto), e);
        }
    }
}
MessageListener
@Slf4j
@Component
public class MessageListener {

    @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "bws.exchange", type = "topic"), value =
    @Queue(value = "bws.queue", durable = "true"), key = "bws.key"))
    public void onMessage(Message message) {
        String messageStr = "";
        try {
            messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("<<<<<<<<< MessageListener.onMessage:{}", messageStr);
            MessageDto messageDto = JSON.parseObject(messageStr, MessageDto.class);
            if (!Objects.isNull(messageDto)) {
                SessionRegistry.getInstance().sendGroupTextMessage(messageDto);
            } else {
                log.info("<<<<<<<<< MessageListener.onMessage is null:{}", messageStr);
            }
        } catch (Exception e) {
            log.error("######### MessageListener.onMessage: {}-{}", messageStr, e);
        }
    }

}

application.properties配置


spring.rabbitmq.host=192.168.x.x
spring.rabbitmq.password=guest
spring.rabbitmq.port=27067
spring.rabbitmq.username=guest
spring.rabbitmq.virtual-host=my-cluster

测试

websoket链接: ws://127.0.0.1:8080/group/2?username=xxx, websocket客户端测试地址

在这里插入图片描述

good luck!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1315025.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《人工智能导论》知识思维导图梳理【第6章节】

文章目录 第六章 知识图谱1 知识图谱概述2 知识图谱相关概念3 知识图谱的逻辑结构4 知识图谱的数据存储5 知识图谱的构建过程6 例题 markdown内容的分享 第六章 知识图谱 1 知识图谱概述 2 知识图谱相关概念 3 知识图谱的逻辑结构 4 知识图谱的数据存储 5 知识图谱的构建过程 6…

fl studio20中文内测版下载2024最新完美实现汉化

fl studio20是一款众所周知的水果编曲软件&#xff0c;能够剪辑、混音、录音&#xff0c;它的矢量界面能更好用在4K、5K甚至8K显示器上&#xff0c;还可以可以编曲、剪辑、录音、混音&#xff0c;让你的计算机成为全功能录音室&#xff0c;不论是在功能上面还是用户界面上都是数…

小程序使用Nodejs作为服务端,Nodejs与与MYSQL数据库相连

小程序使用Nodejs作为服务端,Nodejs与MYSQL数据库相连 一、搭建环境二、配置Nodejs三、与小程序交互四、跨域处理/报错处理五、nodejs连接mysql数据库六、微信小程序连接nodejs报错七、小程序成功与服务端相连,且能操作数据库一、搭建环境 新建空文件夹:Win + R进入cmd命令界…

Composer 安装与使用

Composer 是 PHP 的一个依赖管理工具。我们可以在项目中声明所依赖的外部工具库&#xff0c;Composer 会帮你安装这些依赖的库文件&#xff0c;有了它&#xff0c;我们就可以很轻松的使用一个命令将其他人的优秀代码引用到我们的项目中来。 Composer 默认情况下不是全局安装&a…

为uniDBGrid设置文字操作栏

为uniDBGrid设置文字操作栏&#xff0c;如下图的效果&#xff0c;用户点击审核&#xff0c;执行审核代码&#xff0c;点退回&#xff0c;执行退回代码&#xff1a; 对于Web应用界面&#xff0c;这是最常见的方式&#xff0c;那对于我等Delphi开发者来说&#xff0c;基于uniGUI该…

RT-DETR优化:轻量化卷积设计 | DualConv双卷积魔改RT-DETR结构

🚀🚀🚀本文改进: DualConv双卷积魔改v8结构,达到轻量化的同时并能够实现小幅涨点 🚀🚀🚀RT-DETR改进创新专栏:http://t.csdnimg.cn/vuQTz 学姐带你学习YOLOv8,从入门到创新,轻轻松松搞定科研; RT-DETR模型创新优化,涨点技巧分享,科研小助手; 1.DualC…

SpringBoot 源码解析1:环境搭建

SpringBoot 源码解析1&#xff1a;环境搭建 1.项目结构2.pom.xml3.MyApplication 1.项目结构 最简单的spring-web项目 2.pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns…

3. 内容模块管理 - 异常处理与校验

文章目录 内容模块管理一、自定义异常1.1 全局异常处理器1.2 自定义异常1.3 异常统一响应类1.4 封装通用异常信息 二、JSR303校验2.1 Maven坐标2.2 校验规则2.3 代码示例2.4 捕捉校验异常2.5 分组校验2.6 备注 三、全局异常处理23.1 全局异常处理器3.2 结果集3.3 常用注解3.3.1…

喝葡萄酒时观酒闻香尝味究竟有什么用?

对许多人来说&#xff0c;在品尝葡萄酒时能发现大多数人闻不到的香气和尝不到的味道似乎是一种神奇的能力。其他人则认为这是学究式葡萄酒爱好者过于活跃的想象&#xff0c;或者是保持葡萄酒鉴赏精英声誉的一种方式&#xff0c;但两者都不是。 部分是艺术&#xff0c;部分是科…

【异步绘制】UIView刷新原理 与 异步绘制

快捷目录 壹、 iOS界面刷新机制贰、浅谈UIView的刷新与绘制概述一.UIView 与 CALayer1. UIView 与 CALayer的关系2. CALayer的一些常用属性contents属性contentGravity属性contentsScale属性maskToBounds属性contentsRect属性 二.View的布局与显示1.图像显示原理2.布局layoutSu…

关于Ubuntu22.04恢复误删文件的记录

挂载在Ubuntu22.04下的固态盘有文件被误删了&#xff0c;该固态盘是ntfs格式的。 在网上找了很多教程&#xff0c;最后决定用TestDisk工具进行恢复。 现记录如下&#xff1a; Ubuntu安装testdisk sudo apt-get install testdisk运行testdisk sudo testdisk得到 我选择的是…

Vue3使用了Vite和UnoCSS导致前端项目启动报错:Error:EMFILE:too many open files

一个 Vue3 的项目&#xff0c;用的是 Vite 打包&#xff0c;通过 npm run dev 运行时&#xff0c;遇到了以下错误&#xff08;尤其是引入了 Element-Plus 后&#xff09;&#xff1a; Error: EMFILE: too many open files&#xff0c;后面是具体的文件路径。。甚至到了 node_mo…

面试官:这些大学生都会

大家好&#xff0c;我是 JavaPub。 最近有些同学在后台问我&#xff0c;面试总是会遇到被问 Linux 命令的问题&#xff0c;自己就面试个后端开发岗位&#xff0c;怎么这么难呢&#xff1f; 其实 Linux 命令&#xff0c;对于一个后端开发来说&#xff0c;并不是很难&#xff0c…

【DataSophon】大数据管理平台DataSophon-1.2.1基本使用

&#x1f984; 个人主页——&#x1f390;开着拖拉机回家_Linux,大数据运维-CSDN博客 &#x1f390;✨&#x1f341; &#x1fa81;&#x1f341;&#x1fa81;&#x1f341;&#x1fa81;&#x1f341;&#x1fa81;&#x1f341; &#x1fa81;&#x1f341;&#x1fa81;&am…

git 常见错误总结(会不断更新中。。)

常见错误 1. 配置部署key后git clone还是拉不下代码 执行以下命令 先添加 SSH 密钥到 SSH 代理&#xff1a; 如果你使用 SSH 代理&#xff08;例如 ssh-agent&#xff09;&#xff0c;将生成的私钥添加到代理中。 ssh-add ~/.ssh/gstplatrontend/id_rsa如果报错以下错误信息…

wps左上角有绿色小三角的数字如何求和

1.这个状态是求和不了的&#xff0c;使用求和公式求出来的也是0 2.进行如下操作 3.转换好后 则可以求和成功了

Orange Comet利用Sui Kiosk进行游戏道具和知识产权保护

Orange Comet与AMC合作开发基于《行尸走肉》系列的NFT和游戏&#xff0c;首要关注的问题就是保护AMC的知识产权。利用Sui的Kiosk原语不仅让Orange Comet向AMC保证其资产安全&#xff0c;而且为即将推出的《行尸走肉大陆》游戏打开了无限的可能性。 Kiosk是Sui上的一个原语&…

文物数字化建模纹理贴图

在线工具推荐&#xff1a; 3D数字孪生场景编辑器 - GLTF/GLB材质纹理编辑器 - 3D模型在线转换 - Three.js AI自动纹理开发包 - YOLO 虚幻合成数据生成器 - 三维模型预览图生成器 - 3D模型语义搜索引擎 1、文物3D数字化建模的特点 文物埋在地下历经千年&#xff0c;由于时…

拷贝的艺术:深拷贝与浅拷贝的区别与应用(下)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

Apple Find My「查找」认证芯片找哪家,认准伦茨科技ST17H6x芯片

深圳市伦茨科技有限公司&#xff08;以下简称“伦茨科技”&#xff09;发布ST17H6x Soc平台。成为继Nordic之后全球第二家取得Apple Find My「查找」认证的芯片厂家&#xff0c;该平台提供可通过Apple Find My认证的Apple查找&#xff08;Find My&#xff09;功能集成解决方案。…