基于WebSocket实现简易即时通讯功能

news2024/10/10 22:43:43

代码实现

pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.15.0</version>
</dependency>

配置信息

部分内容非必须,按自身需求处理即可

  • WebSocketConfig
package com.example.im.config;

import com.example.im.infra.handle.ImRejectExecutionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

import javax.annotation.Resource;

/**
 * @author PC
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig {
    @Resource
    private WebSocketProperties webSocketProperties;

    @Bean
    public ServerEndpointExporter serverEndpoint() {
        return new ServerEndpointExporter();
    }

    /***
     * 配置线程池
     * @return 线程池
     */
    @Bean
    public TaskExecutor taskExecutor() {
        WebSocketProperties.ExecutorProperties executorProperties = webSocketProperties.getExecutorProperties();
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 设置核心线程数
        executor.setCorePoolSize(executorProperties.getCorePoolSize());
        // 设置最大线程数
        executor.setMaxPoolSize(executorProperties.getMaxPoolSize());
        // 设置队列容量
        executor.setQueueCapacity(executorProperties.getQueueCapacity());
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(executorProperties.getKeepAliveSeconds());
        // 设置默认线程名称
        executor.setThreadNamePrefix("im-");
        // 设置拒绝策略
        executor.setRejectedExecutionHandler(new ImRejectExecutionHandler());
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}
  • WebSocketProperties
package com.example.im.config;

import com.example.im.infra.constant.ImConstants;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * @author PC
 */
@Configuration
@ConfigurationProperties(prefix = "cus.ws")
public class WebSocketProperties {

    /**
     * 接收人是否排除自身,默认排除
     */
    private Boolean receiverExcludesHimselfFlag = true;

    /**
     * 消息是否排除接收人信息,默认不排除
     */
    private Boolean excludeReceiverInfoFlag = false;

    /**
     * 线程池信息
     */
    private ExecutorProperties executorProperties = new ExecutorProperties();

    /**
     * 发送消息给指定人的分隔符,默认为@
     */
    private String receiverSeparator = ImConstants.Symbol.AT;

    public Boolean getReceiverExcludesHimselfFlag() {
        return receiverExcludesHimselfFlag;
    }

    public void setReceiverExcludesHimselfFlag(Boolean receiverExcludesHimselfFlag) {
        this.receiverExcludesHimselfFlag = receiverExcludesHimselfFlag;
    }

    public Boolean getExcludeReceiverInfoFlag() {
        return excludeReceiverInfoFlag;
    }

    public void setExcludeReceiverInfoFlag(Boolean excludeReceiverInfoFlag) {
        this.excludeReceiverInfoFlag = excludeReceiverInfoFlag;
    }

    public String getReceiverSeparator() {
        return receiverSeparator;
    }

    public void setReceiverSeparator(String receiverSeparator) {
        this.receiverSeparator = receiverSeparator;
    }

    public ExecutorProperties getExecutorProperties() {
        return executorProperties;
    }

    public void setExecutorProperties(ExecutorProperties executorProperties) {
        this.executorProperties = executorProperties;
    }

    /**
     * 线程池信息
     */
    public static class ExecutorProperties {
        /**
         * 核心线程数
         */
        private int corePoolSize = 10;
        /**
         * 最大线程数
         */
        private int maxPoolSize = 20;
        /**
         * 队列容量
         */
        private int queueCapacity = 50;
        /**
         * 线程活跃时间(秒)
         */
        private int keepAliveSeconds = 60;

        public int getCorePoolSize() {
            return corePoolSize;
        }

        public void setCorePoolSize(int corePoolSize) {
            this.corePoolSize = corePoolSize;
        }

        public int getMaxPoolSize() {
            return maxPoolSize;
        }

        public void setMaxPoolSize(int maxPoolSize) {
            this.maxPoolSize = maxPoolSize;
        }

        public int getQueueCapacity() {
            return queueCapacity;
        }

        public void setQueueCapacity(int queueCapacity) {
            this.queueCapacity = queueCapacity;
        }

        public int getKeepAliveSeconds() {
            return keepAliveSeconds;
        }

        public void setKeepAliveSeconds(int keepAliveSeconds) {
            this.keepAliveSeconds = keepAliveSeconds;
        }
    }
}

application.yml

server:
  port: 18080
cus:
  ws:
    exclude-receiver-info-flag: true
    receiver-excludes-himself-flag: true

ws端口

  • WebSocketEndpoint

注意:若按常规注入方式(非static修饰),在项目启动时setWebSocketMessageService是有值的,但是发送消息时WebSocketMessageService会变为null,需要用static修饰。

其原因为Spring的bean管理是单例的,但是WebSocket是多对象的,当新用户进入系统时,会创建一个新的WebSocketEndpoint对象,但是不会再注入WebSocketMessageService,这样就会导致其为null。若想解决该问题,可以使用static修饰WebSocketMessageService,static修饰的对象属于类,而非实例,其在类加载时即可进行初始化。

package com.example.im.endpoint;

import com.example.im.app.service.WebSocketMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author PC
 */
@Component
@ServerEndpoint("/ws")
public class WebSocketEndpoint {

    private final static Logger logger = LoggerFactory.getLogger(WebSocketEndpoint.class);

    public static final ConcurrentHashMap<String, WebSocketEndpoint> WEB_SOCKET_ENDPOINT_MAP = new ConcurrentHashMap<>();

    private Session session;
    private static WebSocketMessageService webSocketMessageService;

    @Autowired
    public void setWebSocketMessageService(WebSocketMessageService webSocketMessageService) {
        WebSocketEndpoint.webSocketMessageService = webSocketMessageService;
    }

    /**
     * 打开ws连接
     *
     * @param session 会话
     */
    @OnOpen
    public void onOpen(Session session) {
        //连接成功
        logger.info("The connection is successful:" + getUserName(session));
        this.session = session;
        WEB_SOCKET_ENDPOINT_MAP.put(getUserName(session), this);
    }

    /**
     * 断开ws连接
     *
     * @param session 会话
     */
    @OnClose
    public void onClose(Session session) {
        WEB_SOCKET_ENDPOINT_MAP.remove(getUserName(session));
        //断开连接
        logger.info("Disconnect:" + getUserName(session));
    }

    /**
     * 接收到的消息
     *
     * @param message 消息内容
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        //接收消息
        String sendUserName = getUserName(session);
        logger.info(sendUserName + " send message: " + message);
        webSocketMessageService.sendMessage(sendUserName, message);
    }

    private String getUserName(Session session) {
        return Optional.ofNullable(session.getRequestParameterMap().get("userName")).orElse(new ArrayList<>())
                .stream().findFirst().orElse("anonymous_users");
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }
}

实现类

WebSocketMessageServiceImpl

package com.example.im.app.service.impl;

import com.example.im.app.service.WebSocketMessageService;
import com.example.im.config.WebSocketProperties;
import com.example.im.endpoint.WebSocketEndpoint;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * @author PC
 */
@Service
public class WebSocketMessageServiceImpl implements WebSocketMessageService {

    private final static Logger logger = LoggerFactory.getLogger(WebSocketMessageServiceImpl.class);

    private WebSocketProperties webSocketProperties;

    @Autowired
    public void setWebSocketProperties(WebSocketProperties webSocketProperties) {
        this.webSocketProperties = webSocketProperties;
    }

    private TaskExecutor taskExecutor;

    @Autowired
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    @Override
    public void sendMessage(String sendUserName, String message) {
        //包含@发给指定人,否则发给全部人
        if (StringUtils.contains(message, webSocketProperties.getReceiverSeparator())) {
            this.sendToUser(sendUserName, message);
        } else {
            this.sendToAll(sendUserName, message);
        }
    }

    private void sendToUser(String sendUserName, String message) {
        getReceiverName(sendUserName, message).forEach(receiverName -> taskExecutor.execute(() -> {
                            try {
                                if (WebSocketEndpoint.WEB_SOCKET_ENDPOINT_MAP.containsKey(receiverName)) {
                                    WebSocketEndpoint.WEB_SOCKET_ENDPOINT_MAP.get(receiverName).getSession().getBasicRemote()
                                            .sendText(generatorMessage(message));
                                }
                            } catch (IOException ioException) {
                                logger.error("send error:" + ioException);
                            }
                        }
                )
        );
    }

    private void sendToAll(String sendUserName, String message) {
        for (Map.Entry<String, WebSocketEndpoint> webSocketEndpointEntry : WebSocketEndpoint.WEB_SOCKET_ENDPOINT_MAP.entrySet()) {
            taskExecutor.execute(() -> {
                        if (webSocketProperties.getReceiverExcludesHimselfFlag() && StringUtils.equals(sendUserName, webSocketEndpointEntry.getKey())) {
                            return;
                        }
                        try {
                            webSocketEndpointEntry.getValue().getSession().getBasicRemote()
                                    .sendText(generatorMessage(message));
                        } catch (IOException ioException) {
                            logger.error("send error:" + ioException);
                        }
                    }
            );
        }
    }

    private List<String> getReceiverName(String sendUserName, String message) {
        if (!StringUtils.contains(message, webSocketProperties.getReceiverSeparator())) {
            return new ArrayList<>();
        }
        String[] names = StringUtils.split(message, webSocketProperties.getReceiverSeparator());
        return Stream.of(names).skip(1).filter(receiver ->
                        !(webSocketProperties.getReceiverExcludesHimselfFlag() && StringUtils.equals(sendUserName, receiver)))
                .collect(Collectors.toList());
    }

    /**
     * 根据配置处理发送的信息
     *
     * @param message 原消息
     * @return 被处理后的消息
     */
    private String generatorMessage(String message) {
        return BooleanUtils.isTrue(webSocketProperties.getExcludeReceiverInfoFlag()) ?
                StringUtils.substringBefore(message, webSocketProperties.getReceiverSeparator()) : message;
    }
}

测试

Postman访问WebSocket

点击new,新建WebSocket连接

创建ws连接

连接格式:ws://ip:port/endpoint

例如,本次实例demo的ws连接如下,userName为自定义参数,测试使用,非必须,根据自身需求调整即可

ws://127.0.0.1:18080/ws?userName=test1

点击Connect进行连接

为了方便测试,再创建三个ws连接,也进行Connect

ws://127.0.0.1:18080/ws?userName=test2

ws://127.0.0.1:18080/ws?userName=test3

ws://127.0.0.1:18080/ws?userName=test4

测试

连接后,在test1所在页面发送消息

  • 首先测试@用户的情况

test2、test3可接收消息,test4无消息

  • 而后测试发送给所有人的情况

test2、test3、test4均接收到消息

参考资料

[1].即时通讯demo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2203378.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++(vector的实现)

1. vector介绍 vector本质其实就是和我们数据结构中的顺序表类似&#xff0c;都是对一个数组进行增删查改等操作&#xff0c;但是这里vector的实现和顺序表的实现有所不同&#xff0c;vector的底层源码的实现是通过三个迭代器实现的&#xff0c;一个是指向开始的位置_start&…

优化小企业财务,使用记账软件的好处解析

财务记账软件优化企业财务管理&#xff0c;支持开票、在线支付、费用分类、银行对账、工时项目管理、库存管理及税务合规&#xff0c;自动生成报表助企业决策&#xff0c;克服传统电子表格局限&#xff0c;支持企业持续健康发展。 使用财务记账软件的好处和优势 1、开票和计费…

10.pwn 中级ROP

ret2csu 什么是ret2csu&#xff1f; 这个其实就是在程序中一般都会有一段万能的控制参数的gadgets&#xff0c;里面可以控制rbx,rbp,r12,r13,r14,r15以及rdx,rsi,edi的值&#xff0c;并且还可以call我们指定的地址。然后劫持程序执行流的时候&#xff0c;劫持到这个__libc_cs…

seL4 Faults(八)

Faults 学习什么是线程错误理解线程错误和处理器硬件错误是不同的理解什么是错误处理器理解内核对于一个有错误的线程做了什么了解如何设置内核将在其上传递故障消息的端点&#xff08;master与 MCS&#xff09;。在错误故障后学习如何恢复线程。 Background: What is a faul…

(21)Nakagami-m分布及其参数的意义

文章目录 前言一、Nakagami衰落的定义二、Nakagami衰落的形状参数m三、Nakagami衰落的尺度参数ω四、Nakagami随机变量的生成 前言 在无线信道中&#xff0c;由于电波的多径传播效应&#xff0c;接收到的信号强度会因为多条传播路径的相长或相消而发生起伏变化。这种现象被称为…

mysql迁移到达梦数据库报错:参数不兼容

1: 这个错误可能是某个字段‘定义超长’&#xff0c;尝试&#xff1a; 2: 如果还报错&#xff0c;指定和mysql同版本驱动

树状数组——学习心得

可以解决大部分区间上面的修改以及查询的问题&#xff0c;例如1.单点修改&#xff0c;单点查询&#xff0c;2.区间修改&#xff0c;单点查询&#xff0c;3.区间查询&#xff0c;区间修改&#xff0c;换言之&#xff0c;线段树能解决的问题&#xff0c;树状数组大部分也可以&…

全栈开发笔记

1.后端没问题 前端不显示返回数据&#xff1f; 返回数据被&#xff0c;axios拦截器拦截了 2.路径跳转显示空白&#xff1f; 没有配置router 3.后端部署到服务器上 无法通过外网访问接口&#xff1f; 检查服务器防火墙设置 即使服务监听正确&#xff0c;服务器本身的防火墙也可能…

【工作流引擎集成】springboot+Vue+activiti+mysql带工作流集成系统,直接用于业务开发,流程设计,工作流审批,会签

前言 activiti工作流引擎项目&#xff0c;企业erp、oa、hr、crm等企事业办公系统轻松落地&#xff0c;一套完整并且实际运用在多套项目中的案例&#xff0c;满足日常业务流程审批需求。 一、项目形式 springbootvueactiviti集成了activiti在线编辑器&#xff0c;流行的前后端…

前端_002_CSS扫盲

文章目录 概念选择器常用属性背景边框高度和宽度颜色文本字体链接表格里对齐显示相关溢出&#xff0c;滚动条属性 伪类和伪元素 概念 1.书写格式&#xff1a; 选择器{ 属性名:属性值 ; 属性名:属性值 ; } 2.文件后缀.css 选择器 元素选择器 [tag] id选择器 #[id_name] c…

西门子S7-SMART运动控制向导

打开“运动控制”向导&#xff0c;“工具”->“向导”->“运动控制” 图 1.打开“运动控制”向导 选择需要配置的轴 图 2.选择需要配置的轴 为所选择的轴命名 图 3.为所选择的轴命名 输入系统的测量系统&#xff08;“工程量”或者“脉冲数/转”&#xff…

开机启动项在哪里关闭?五个全面指南,教你关闭开机启动项!(新)

您是否发现您的电脑运行性能正在受一些无关紧要的应用程序所影响呢&#xff1f;也许您没有意识到&#xff0c;每当您登录电脑时&#xff0c;许多程序会在不知情的情况下自动启动。这些自动启动的程序不仅会拖慢系统的运行速度&#xff0c;还会占用大量的内存和cpu资源。为了改善…

QT:绘制事件和定时器

1.绘制时针 xx.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QTimer> #include<QPainter> #include <QTime>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass Widget : public QWidget {Q_OBJECTpubl…

YOLOv11进行图像与视频的目标检测

一、AI应用系统实战项目 项目名称项目名称1.人脸识别与管理系统2.车牌识别与管理系统3.手势识别系统4.人脸面部活体检测系统5.YOLOv8自动标注6.人脸表情识别系统7.行人跌倒检测系统8.PCB板缺陷检测系统9.安全帽检测系统10.生活垃圾分类检测11.火焰烟雾检测系统12.路面坑洞检测…

使用Qt Creator创建项目

个人主页&#xff1a;C忠实粉丝 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 C忠实粉丝 原创 使用Qt Creator创建项目 收录于专栏【Qt开发】 本专栏旨在分享学习Qt的一点学习笔记&#xff0c;欢迎大家在评论区交流讨论&#x1f48c; 目录 温馨提示: 1. 新…

Fastjson反序列化

Fastjson反序列化一共有三条利用链 TempLatesImpl&#xff1a;实战中不适用JdbcRowSetImpl&#xff1a;实际运用中较为广泛BasicDataSource&#xff08;BCEL&#xff09; 反序列化核心 反序列化是通过字符串或字节流&#xff0c;利用Java的反射机制重构一个对象。主要有两种…

Spring Boot 进阶-详解Spring Boot与其他框架整合

通过前面的文章,我们了解到了Spring、Spring Boot框架都是为Java企业级开发提供了一个基础框架,我们可以通过这个基础框架去整合其他的框架来实现我们具体的业务功能。 在网站上搜索一下,Spring Boot整合某某框架就会出现大量的教程,但是总会有一天你会遇到一个你没有教程的…

jenkins中的allure和email问题梳理

一、allure相关 1、我安装了jenkins之后需要再安装allure吗&#xff1f;在jenkins插件中心直接安装allure 1.Allure Jenkins Plugin 只是一个集成插件&#xff0c;它要求你在 Jenkins 服务器上安装 Allure 命令行工具&#xff08;Allure Commandline&#xff09;来实际生成报…

真的有被Transformer多头注意力惊艳到…

在这篇文章中&#xff0c;我们将深入探讨 Transformer 的核心部分-多头注意力&#xff08;Multi-head Attention&#xff09;。 这个机制能让 Transformer 同时从多个角度理解数据&#xff0c;提高处理信息的能力和效率。 01、Transformer 中如何使用注意力机制 Transformer…

[数据结构]带头双向循环链表的实现与应用

文章目录 一、引言二、链表的基本概念1、链表是什么2、链表与顺序表的区别3、带头双向循环链表 三、带头双向循环链表的实现1、结构体定义2、初始化3、销毁4、显示5、增删查改 四、分析带头双向循环链表1、存储方式2、优点3、缺点 五、总结1、练习题2、源代码 一、引言 链表作…