接口 V2 完善:分布式环境下的 WebSocket 实现与 Token 校验

news2025/1/30 10:47:38

🎯 本文档详细介绍了如何使用WebSocket协议优化客户端与服务端之间的通信,特别是在处理异步订单创建通知的场景中。通过引入WebSocket代替传统的HTTP请求-响应模式,实现了服务器主动向客户端推送数据的功能,极大地提高了实时性和效率。文中首先概述了WebSocket的优势,随后深入探讨了其在分布式系统中的具体实现,包括依赖管理、网关配置、WebSocket服务类的设计以及消息队列的使用等关键环节。特别地,针对分布式架构下WebSocket连接状态同步问题,提出了一种基于消息队列广播机制的解决方案,确保了系统的可扩展性和稳定性。同时,还强调了心跳检测机制的重要性,以维护连接的有效性。
🏠️ HelloDam/场快订(场馆预定 SaaS 平台)

文章目录

  • 前言
  • WebSocket 介绍
  • 流程图
  • 具体实现
    • 依赖
    • 网关配置
    • WebSocket配置类
    • WebSocket服务类
    • MQ消费者
    • 启动类
    • 配置文件
  • 注意事项
    • 登录验证
      • WebSocket 配置类
      • token校验
    • 分布式 WebSocket
    • 心跳检测

前言

在时间段预定接口 V2 中,用户预定之后,会发送一个消息,让消息队列异步创建订单。此时客户端是无法知道服务端什么时候完成订单创建的,因此需要服务端告知客户端。但是以往都是客户端给服务端发 http 请求,但是服务端如何主动告知客户端呢?

这个时候就需要请出我们今天的主角 WebSocket 了

WebSocket 介绍

WebSocket是一种在单个TCP连接上进行全双工通信的协议。它使得客户端和服务器之间的数据交换变得更加简单,允许服务器直接向客户端推送数据而不必由客户端发起请求。这种特性让实时性要求较高的应用,如即时通讯工具、在线游戏以及实时交易系统等,能够更加高效地进行数据交互。通过WebSocket,开发者可以构建响应更快、性能更高的网络应用,同时减少不必要的网络开销和延迟。相比传统的HTTP请求-响应模式,WebSocket提供了更低的延迟和更高的效率,特别是在需要频繁更新数据的应用场景中表现出色。

因此使用了 WebSocket ,一旦客户端和服务端建立了连接,当订单创建成功之后,服务端直接别订单数据推送给客户端即可。

流程图

user1、user2 和 user3 分别发起 WebSocket 连接,首先经过网关,连接请求被分发到不同的服务中。WebSocket 服务接收到连接请求之后,对其进行登录校验,如果校验成功,将其 Session 信息存储在服务器的内存中,如果校验失败,直接关闭 Session 。其中 user1、user2 的Session信息被存储在 WebSocket 服务1 中,user3 的Session信息被存储在 WebSocket 服务2 中。

当用户预定时间段,生成订单之后,场馆服务向消息队列中发生订单数据。接着消息队列将订单数据广播到 WebSocket 服务1 和 WebSocket 服务2中。WebSocket 服务2 发现自己的内存中存有 user3 的Session,因此将订单数据通过该 Session 发送给 user3 。

暂时无法在飞书文档外展示此内容

具体实现

为了解耦 WebSocket 和其他服务,单独创建一个 WebSocket 服务。

在这里插入图片描述

依赖

<dependencies>
    <dependency>
        <groupId>com.vrs</groupId>
        <artifactId>vrs-web</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.dam</groupId>
        <artifactId>vrs-rocketmq</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>com.vrs</groupId>
        <artifactId>vrs-common</artifactId>
    </dependency>
    <dependency>
        <groupId>com.vrs</groupId>
        <artifactId>vrs-idempotent</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    <!-- websocket -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
</dependencies>

网关配置

当访问 /websocket/** 路径时,将请求转化到 WebSocket 服务,注意,转发的时候添加了前缀ws:

- id: vrs-websocket
  uri: lb:ws://vrs-websocket
  predicates:
    - Path=/websocket/**
  filters:
    - name: TokenValidate
      args:
        whitePathList:
          - /websocket/**

【去除默认过滤器】

如果像这样全局配置了默认过滤器,DedupeResponseHeader过滤器的作用是对指定的响应头(在这个例子中为VaryAccess-Control-Allow-OriginAccess-Control-Allow-Credentials)进行去重。当有多个相同名称的响应头时,它会按照给定的策略保留其中的一个。这里的策略是RETAIN_FIRST,意味着它将保留这些头部中第一次出现的那个,而删除后续出现的重复头部。

spring:
  cloud:
    gateway:
      default-filters:
        - DedupeResponseHeader=Vary Access-Control-Allow-Origin Access-Control-Allow-Credentials, RETAIN_FIRST

发起 WebSocket 连接的时候,会报如下错误,这是因为修改了只读的请求头

java.lang.UnsupportedOperationException: null
        at org.springframework.http.ReadOnlyHttpHeaders.set(ReadOnlyHttpHeaders.java:108) ~[spring-web-6.0.9.jar:6.0.9]
        Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
        *__checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
        *__checkpoint ⇢ HTTP GET "/websocket/admin?token=eyJhbGciOiJIUzUxMiIsInppcCI6IkdaSVAifQ.H4sIAAAAAAAA_6tWKi5NUrJScgwN8dANDXYNUtJRSq0oULIyNDe2NDMyNrYw0lEqLU4t8kwBilmYmZgZm5sbG5mbGViYGpgYQyX9EnNTgYYkpuRm5ilBhEIqC4BCRrUAvgeVqmEAAAA.e7wanr0gKu4FD-Y_afO2MEIECxZ6oMKGlf8zarZp-GOmzqL5n354gasKr7GKKs4H3Pq0CYJQECO_Rv9ixGsvZQ" [ExceptionHandlingWebHandler]
Original Stack Trace:
                at org.springframework.http.ReadOnlyHttpHeaders.set(ReadOnlyHttpHeaders.java:108) ~[spring-web-6.0.9.jar:6.0.9]

因此需要将上述配置删除,如果还需要这些默认配置,可以到具体的路由下面设置,就像下面一样

spring:
  cloud:
    gateway:
      routes:
        - id: vrs-admin
          uri: lb://vrs-admin
          predicates:
            - Path=/admin/**
          filters:
            - DedupeResponseHeader=Vary Access-Control-Allow-Origin Access-Control-Allow-Credentials, RETAIN_FIRST
            - name: TokenValidate
              args:
                whitePathList:
                  - /admin/user/v1/login
                  - /admin/user/v1/wechatLogin
                  - ...

WebSocket配置类

配置类 WebSocketConfig 主要用于配置和初始化 WebSocket 服务器端点,并处理与 WebSocket 连接相关的操作,具体功能如下:

  1. Spring Bean 注册:通过 @Configuration 注解标明这是一个 Spring 配置类。在该类中定义了一个 @Bean 方法 serverEndpointExporter(),它返回一个 ServerEndpointExporter 实例。这个实例的作用是自动注册使用了 @ServerEndpoint 注解声明的 WebSocket 端点对象到 Spring 容器中。
  2. 握手请求修改:modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) 方法重写了父类中的同名方法,用于在建立 WebSocket 连接前对握手请求进行自定义修改。在这个例子中,方法尝试从握手请求参数中获取名为 “token” 的参数,并将其存储在 ServerEndpointConfig 对象的用户属性中(即 sec.getUserProperties().put("token", token);)。这使得后续逻辑可以通过访问端点配置对象来获取令牌信息。
  3. 端点实例化:getEndpointInstance(Class<T> clazz) 方法重写了父类的方法,用于提供自定义逻辑来实例化被 @ServerEndpoint 标注的 WebSocket 端点类。在这个实现中,它直接调用了父类的实现 super.getEndpointInstance(clazz) 来创建端点实例。通常情况下,除非需要特别的实例化逻辑,否则可以直接使用父类的默认实现。
package com.vrs.config;

import jakarta.websocket.HandshakeResponse;
import jakarta.websocket.server.HandshakeRequest;
import jakarta.websocket.server.ServerEndpointConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

import java.util.List;
import java.util.Map;

/**
 * @Author dam
 * @create 2025/1/24 15:25
 */
@Configuration
public class WebSocketConfig extends ServerEndpointConfig.Configurator {

    /**
     * 这个bean会自动注册使用了@ServerEndpoint注解声明的对象
     *
     * @return
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    /**
     * 建立握手时,连接前的操作
     */
    @Override
    public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
        // 获取请求参数
        Map<String, List<String>> parameterMap = request.getParameterMap();
        List<String> tokenList = parameterMap.get("token");
        if (tokenList != null && !tokenList.isEmpty()) {
            String token = tokenList.get(0);
            sec.getUserProperties().put("token", token);
        }
    }

    /**
     * 初始化端点对象,也就是被@ServerEndpoint所标注的对象
     */
    @Override
    public <T> T getEndpointInstance(Class<T> clazz) throws InstantiationException {
        return super.getEndpointInstance(clazz);
    }
}

WebSocket服务类

WebSocketServer 类是为实现实时通信而设计的,能够有效地管理多个客户端之间的双向通信以及保持这些通信的稳定性和可靠性。它通过 Spring 的 @Component 和 Jakarta WebSocket 的 @ServerEndpoint 注解被注册为一个 Spring Bean,并监听路径为 /websocket/{username} 的 WebSocket 请求。该类利用一个静态的 ConcurrentHashMap 来存储每个用户的会话 (Session) 和最后一次活动时间,以跟踪在线用户和他们的活跃状态。它实现了以下关键功能:

  • 连接管理:处理用户的连接建立 (onOpen) 和关闭 (onClose) 事件,包括校验用户提供的 token 是否有效。
  • 消息处理:接收来自客户端的消息 (onMessage) 并据此更新用户的最后活动时间,支持发送 PING/PONG 心跳消息来维持连接。
  • 心跳检测:通过定时任务每30秒检查一次用户的心跳,若某用户超过60秒未活动,则自动断开其连接,确保资源的有效利用。
  • 消息发送:提供了一个方法用于向特定用户发送消息。
package com.vrs.controller;

import com.vrs.config.WebSocketConfig;
import com.vrs.constant.RedisCacheConstant;
import com.vrs.utils.JwtUtil;
import jakarta.websocket.*;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @Author dam
 * @create 2024/1/24 14:32
 */
// 将WebSocketServer注册为spring的一个bean
@ServerEndpoint(value = "/websocket/{username}", configurator = WebSocketConfig.class)
@Component
@Slf4j(topic = "WebSocketServer")
public class WebSocketServer {

    /**
     * 心跳检查间隔时间(单位:秒)
     */
    private static final int HEARTBEAT_INTERVAL = 30;

    /**
     * 心跳超时时间(单位:秒)
     */
    private static final int HEARTBEAT_TIMEOUT = 60;

    /**
     * 记录当前在线连接的客户端的session
     */
    private static final Map<String, Session> usernameAndSessionMap = new ConcurrentHashMap<>();

    /**
     * 记录用户最后一次活动时间
     */
    private static final Map<String, Long> lastActivityTimeMap = new ConcurrentHashMap<>();

    /**
     * 直接通过 Autowired 注入的话,redisTemplate为null,因此使用这种引入方式
     */
    private static StringRedisTemplate redisTemplate;
    @Autowired
    public void setRabbitTemplate(StringRedisTemplate redisTemplate) {
        WebSocketServer.redisTemplate = redisTemplate;
    }

    /**
     * 定时任务线程池,用于心跳检查
     */
    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    // 初始化心跳检查任务
    static {
        scheduler.scheduleAtFixedRate(WebSocketServer::checkHeartbeat, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
    }

    /**
     * 浏览器和服务端连接建立成功之后会调用这个方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("username") String username, EndpointConfig config) {
        // 校验 token 是否有效
        String token = (String) config.getUserProperties().get("token");
        boolean validToken = validToken(token);
        if (!validToken) {
            try {
                session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, "无效的token,请先登录"));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        // 如果用户已存在,关闭旧连接
        if (usernameAndSessionMap.containsKey(username)) {
            Session oldSession = usernameAndSessionMap.get(username);
            if (oldSession != null && oldSession.isOpen()) {
                try {
                    oldSession.close();
                } catch (IOException e) {
                    log.error("关闭旧连接时发生错误", e);
                }
            }
        }

        // 记录新连接
        usernameAndSessionMap.put(username, session);
        // 记录用户活动时间
        lastActivityTimeMap.put(username, System.currentTimeMillis());
        log.info("有新用户加入,username={}, 当前在线人数为:{}", username, usernameAndSessionMap.size());
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(Session session, @PathParam("username") String username) throws IOException {
        try {
            if (session != null && session.isOpen()) {
                session.close();
            }
        } catch (IOException e) {
            log.error("关闭连接时发生错误", e);
        } finally {
            usernameAndSessionMap.remove(username);
            lastActivityTimeMap.remove(username);
            log.info("有一连接关闭,移除username={}的用户session, 当前在线人数为:{}", username, usernameAndSessionMap.size());
        }
    }

    /**
     * 发生错误的时候会调用这个方法
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误,原因:" + error.getMessage());
        error.printStackTrace();
    }

    /**
     * 收到客户端消息时调用
     */
    @OnMessage
    public void onMessage(String message, Session session, @PathParam("username") String username) {
        // 更新用户最后一次活动时间
        lastActivityTimeMap.put(username, System.currentTimeMillis());

        if ("PING".equals(message)) {
            log.debug("收到来自 {} 的心跳检测请求", username);
        } else {
            log.info("收到来自 {} 的消息: {}", username, message);
        }
    }

    /**
     * 服务端发送消息给客户端
     */
    public void sendMessage(String toUsername, String message) {
        try {
            Session toSession = usernameAndSessionMap.get(toUsername);
            if (toSession != null && toSession.isOpen()) {
                toSession.getBasicRemote().sendText(message);
            } else {
                log.warn("用户 {} 的会话已关闭或不存在", toUsername);
            }
        } catch (Exception e) {
            log.error("服务端发送消息给客户端失败", e);
        }
    }


    /**
     * 关闭心跳检测超时的 session
     */
    private static void checkHeartbeat() {
        long currentTime = System.currentTimeMillis();
        for (Map.Entry<String, Long> entry : lastActivityTimeMap.entrySet()) {
            String username = entry.getKey();
            long lastActivityTime = entry.getValue();
            if (currentTime - lastActivityTime > HEARTBEAT_TIMEOUT * 1000) {
                log.info("用户 {} 心跳超时,关闭连接", username);
                Session session = usernameAndSessionMap.get(username);
                if (session != null) {
                    try {
                        session.close();
                    } catch (IOException e) {
                        log.error("关闭连接时发生错误", e);
                    }
                }
                usernameAndSessionMap.remove(username);
                lastActivityTimeMap.remove(username);
            }
        }
    }

    /**
     * 校验 token 有效
     *
     * @param token
     * @return
     */
    private boolean validToken(String token) {
        String userName = "";
        try {
            // 如果从 token 中解析用户名错误,说明 token 是捏造的,或者已经失效
            userName = JwtUtil.getUsername(token);
        } catch (Exception e) {
            return false;
        }
        if (StringUtils.hasText(userName) && StringUtils.hasText(token) &&
                (redisTemplate.opsForHash().get(RedisCacheConstant.USER_LOGIN_KEY + userName, token)) != null) {
            // --if-- 如果可以通过 token 从 Redis 中获取到用户的登录信息,说明通过校验
            return true;
        }
        return false;
    }

}

MQ消费者

package com.vrs.rocketMq.listener;

import com.vrs.constant.RocketMqConstant;
import com.vrs.controller.WebSocketServer;
import com.vrs.domain.dto.mq.WebsocketMqDTO;
import com.vrs.templateMethod.MessageWrapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * 执行预订流程 消费者
 *
 * @Author dam
 * @create 2024/9/20 21:30
 */
@Slf4j(topic = RocketMqConstant.VENUE_TOPIC)
@Component
@RocketMQMessageListener(topic = RocketMqConstant.VENUE_TOPIC,
        consumerGroup = RocketMqConstant.VENUE_CONSUMER_GROUP + "-" + RocketMqConstant.WEBSOCKET_SEND_MESSAGE_TAG,
        // 需要使用广播模式
        messageModel = MessageModel.BROADCASTING,
        // 监听tag
        selectorType = SelectorType.TAG,
        selectorExpression = RocketMqConstant.WEBSOCKET_SEND_MESSAGE_TAG
)
@RequiredArgsConstructor
public class WebSocketSendMessageListener implements RocketMQListener<MessageWrapper<WebsocketMqDTO>> {

    private final WebSocketServer webSocketServer;

    /**
     * 消费消息的方法
     * 方法报错就会拒收消息
     *
     * @param messageWrapper 消息内容,类型和上面的泛型一致。如果泛型指定了固定的类型,消息体就是我们的参数
     */
    @SneakyThrows
    @Override
    public void onMessage(MessageWrapper<WebsocketMqDTO> messageWrapper) {
        // 开头打印日志,平常可 Debug 看任务参数,线上可报平安(比如消息是否消费,重新投递时获取参数等)
        log.info("[消费者] websocket发生消息给{}", messageWrapper.getMessage().getToUsername());
        webSocketServer.sendMessage(messageWrapper.getMessage().getToUsername(), messageWrapper.getMessage().getMessage());
    }
}

启动类

package com.vrs;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

/**
 * @Author dam
 * @create 2025/01/24 16:34
 */
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class VrsWebSocketApplication {
    public static void main(String[] args) {
        SpringApplication.run(VrsWebSocketApplication.class, args);
    }
}

配置文件

server:
  port: 7054
spring:
  profiles:
    active: dam
  application:
    name: vrs-websocket
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
  data:
    redis:
      host: 127.0.0.1
      port: 6379
      password: 12345678
      database: 0
      timeout: 1800000
      jedis:
        pool:
          max-active: 20 #最大连接数
          max-wait: -1    #最大阻塞等待时间(负数表示没限制)
          max-idle: 5    #最大空闲
          min-idle: 0     #最小空闲
rocketmq:
  # rocketMq的nameServer地址
  name-server: 127.0.0.1:9876
  producer:
    # 生产者组别
    group: vrs-websocket-group
    # 消息发送的超时时间
    send-message-timeout: 10000
    # 异步消息发送失败重试次数
    retry-times-when-send-async-failed: 1
    # 发送消息的最大大小,单位字节,这里等于4M
    max-message-size: 999999999

注意事项

登录验证

为了防止被人恶意发生大量 WebSocket 连接,占用服务器资源,因此在建立连接的时候,需要进行登录验证,用户登录了才可以建立 WebSocket 连接。

由于建立 WebSocket 连接时,无法像之前的 http 请求一样在请求头携带 token 信息,因此之前网关实现的登录校验机制不生效,需要我们针对 WebSocket 连接额外实现一套登录验证方式。

假设前端发起 WebSocket 连接的代码如下:

new WebSocket("ws://localhost:7049/websocket/admin?token=dahidaho");

WebSocket 配置类

modifyHandshake中,将客户端发起连接请求时的 token 设置到属性中,这样后面就可以将 token 获取出来进行校验,如果说校验不通过,就关闭 WebSokcet 连接

token校验

代码位于WebSocketServer类中,当调用validToken校验失败之后,通过session.close来关闭连接

/**
 * 校验 token 有效
 *
 * @param token
 * @return
 */
private boolean validToken(String token) {
    String userName = "";
    try {
        // 如果从 token 中解析用户名错误,说明 token 是捏造的,或者已经失效
        userName = JwtUtil.getUsername(token);
    } catch (Exception e) {
        return false;
    }
    if (StringUtils.hasText(userName) && StringUtils.hasText(token) &&
            (redisTemplate.opsForHash().get(RedisCacheConstant.USER_LOGIN_KEY + userName, token)) != null) {
        // --if-- 如果可以通过 token 从 Redis 中获取到用户的登录信息,说明通过校验
        return true;
    }
    return false;
}

/**
 * 浏览器和服务端连接建立成功之后会调用这个方法
 */
@OnOpen
public void onOpen(Session session, @PathParam("username") String username, EndpointConfig config) {
    // 校验 token 是否有效
    String token = (String) config.getUserProperties().get("token");
    boolean validToken = validToken(token);
    if (!validToken) {
        try {
            session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, "无效的token,请先登录"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // 如果用户已存在,关闭旧连接
    if (usernameAndSessionMap.containsKey(username)) {
        Session oldSession = usernameAndSessionMap.get(username);
        if (oldSession != null && oldSession.isOpen()) {
            try {
                oldSession.close();
            } catch (IOException e) {
                log.error("关闭旧连接时发生错误", e);
            }
        }
    }

    // 记录新连接
    usernameAndSessionMap.put(username, session);
    // 记录用户活动时间
    lastActivityTimeMap.put(username, System.currentTimeMillis());
    log.info("有新用户加入,username={}, 当前在线人数为:{}", username, usernameAndSessionMap.size());
}

分布式 WebSocket

由于我们的项目是分布式架构的,如果vrs-websocket启动多个服务的话,需要处理如下问题:

WebSocketServer中的用户名及其对应的session信息usernameAndSessionMap是存储在本地的,假设发起连接的时候,session被存储在机器 1 上面。后续服务端要通知客户端时,怎么知道当前用户的信息是存储在机器1、机器 2 还是机器 3 呢?

由于 Session 无法直接序列化存储到 Redis 中,为了解决这个问题,本文通过借助消息队列来解决。

服务端要发送消息给客户端时,先将消息发送至消息队列中,消息设置为广播模式。后续多台部署了vrs-websocket的机器去消息队列中获取消息来消费,如果机器检查到了这条消息的接收者 session 就在机器上,则执行发送,否则直接 return 即可。

【消息生产者】

package com.vrs.rocketMq.producer;

import cn.hutool.core.util.StrUtil;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.WebsocketMqDTO;
import com.vrs.templateMethod.AbstractCommonSendProduceTemplate;
import com.vrs.templateMethod.BaseSendExtendDTO;
import com.vrs.templateMethod.MessageWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * websocket发送消息 生产者
 *
 * @Author dam
 * @create 2024/9/20 16:00
 */
@Slf4j
@Component
public class WebsocketSendMessageProducer extends AbstractCommonSendProduceTemplate<WebsocketMqDTO> {

    @Override
    protected BaseSendExtendDTO buildBaseSendExtendParam(WebsocketMqDTO messageSendEvent) {
        return BaseSendExtendDTO.builder()
                .eventName("执行时间段预定")
                .topic(RocketMqConstant.VENUE_TOPIC)
                .tag(RocketMqConstant.WEBSOCKET_SEND_MESSAGE_TAG)
                .sentTimeout(2000L)
                .build();
    }

    @Override
    protected Message<?> buildMessage(WebsocketMqDTO messageSendEvent, BaseSendExtendDTO requestParam) {
        String keys = StrUtil.isEmpty(requestParam.getKeys()) ? UUID.randomUUID().toString() : requestParam.getKeys();
        return MessageBuilder
                .withPayload(new MessageWrapper(keys, messageSendEvent))
                .setHeader(MessageConst.PROPERTY_KEYS, keys)
                .setHeader(MessageConst.PROPERTY_TAGS, requestParam.getTag())
                .build();
    }
}

【消息消费者】

消费者的代码就在具体实现中,这里不重复放

【使用】

// 通过 websocket 发送消息,通知前端
websocketSendMessageProducer.sendMessage(WebsocketMqDTO.builder()
        .toUsername(orderDO.getUserName())
        .message(JSON.toJSONString(orderDO))
        .build());

心跳检测

用户建立 WebSocket 连接之后的 session 数据是存储在服务器本地的,随着连接数量的增加,session会占用大量的内存,心跳检测是为了定期清理那些无效的连接。

WebSocketServer中,通过定时任务每30秒检查一次客户端的心跳状态,记录每个用户的最后活动时间。如果当前时间与某用户最后活动时间之差超过60秒,则认为该用户心跳超时,服务端将关闭其WebSocket连接并清理相关记录。客户端需定期向服务端发送"PING"消息以维持连接活跃,确保不会因超时而被服务端断开。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2284641.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2025年数学建模美赛:A题分析(1)Testing Time: The Constant Wear On Stairs

2025年数学建模美赛 A题分析&#xff08;1&#xff09;Testing Time: The Constant Wear On Stairs 2025年数学建模美赛 A题分析&#xff08;2&#xff09;楼梯磨损分析模型 2025年数学建模美赛 A题分析&#xff08;3&#xff09;楼梯使用方向偏好模型 2025年数学建模美赛 A题分…

使用Vue3实现可拖拽的九点导航面板

开篇 本文使用Vue3实现了一个可拖拽的九宫导航面板。这个面板在我这里的应用场景是我个人网站的首页的位置&#xff0c;九宫导航对应的是用户最后使用或者最多使用的九个功能&#xff0c;正常应该是由后端接口返回的&#xff0c;不过这里为了简化&#xff0c;写的是固定的数组数…

68-《贝壳花》

贝壳花 贝壳花&#xff08;学名&#xff1a;Moluccella laevis Linn.&#xff09;是属于唇形科&#xff0c;贝壳花是一、二年的草本。植株高5至60cm&#xff0c;茎四棱&#xff0c;不分枝。叶对生&#xff0c;心脏状圆形&#xff0c;边缘疏生齿牙&#xff1b;叶柄和叶近等长。花…

【自然语言处理(NLP)】深度循环神经网络(Deep Recurrent Neural Network,DRNN)原理和实现

文章目录 介绍深度循环神经网络&#xff08;DRNN&#xff09;原理和实现结构特点工作原理符号含义公式含义 应用领域优势与挑战DRNN 代码实现 个人主页&#xff1a;道友老李 欢迎加入社区&#xff1a;道友老李的学习社区 介绍 **自然语言处理&#xff08;Natural Language Pr…

2025数学建模美赛|F题成品论文

国家安全政策与网络安全 摘要 随着互联网技术的迅猛发展&#xff0c;网络犯罪问题已成为全球网络安全中的重要研究课题&#xff0c;且网络犯罪的形式和影响日益复杂和严重。本文针对网络犯罪中的问题&#xff0c;基于多元回归分析和差异中的差异&#xff08;DiD&#xff09;思…

自定义数据集 使用pytorch框架实现逻辑回归并保存模型,然后保存模型后再加载模型进行预测

代码&#xff1a; import torch import numpy as np import torch.nn as nn# 定义数据&#xff1a;x_data 是特征&#xff0c;y_data 是标签&#xff08;目标值&#xff09; data [[-0.5, 7.7],[1.8, 98.5],[0.9, 57.8],[0.4, 39.2],[-1.4, -15.7],[-1.4, -37.3],[-1.8, -49.…

关于使用PHP时WordPress排错——“这意味着您在wp-config.php文件中指定的用户名和密码信息不正确”的解决办法

本来是看到一位好友的自己建站&#xff0c;所以突发奇想&#xff0c;在本地装个WordPress玩玩吧&#xff0c;就尝试着装了一下&#xff0c;因为之前电脑上就有MySQL&#xff0c;所以在自己使用PHP建立MySQL时报错了。 最开始是我的php启动mysql时有问题&#xff0c;也就是启动过…

【蓝桥杯】43694.正则问题

题目描述 考虑一种简单的正则表达式&#xff1a; 只由 x ( ) | 组成的正则表达式。 小明想求出这个正则表达式能接受的最长字符串的长度。 例如 ((xx|xxx)x|(x|xx))xx 能接受的最长字符串是&#xff1a; xxxxxx&#xff0c;长度是 6。 输入描述 一个由 x()| 组成的正则表达式。…

服务器虚拟化技术详解与实战:架构、部署与优化

&#x1f4dd;个人主页&#x1f339;&#xff1a;一ge科研小菜鸡-CSDN博客 &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; 引言 在现代 IT 基础架构中&#xff0c;服务器虚拟化已成为提高资源利用率、降低运维成本、提升系统灵活性的重要手段。通过服务…

jvm--类的生命周期

学习类的生命周期之前&#xff0c;需要了解一下jvm的几个重要的内存区域&#xff1a; &#xff08;1&#xff09;方法区&#xff1a;存放已经加载的类信息、常量、静态变量以及方法代码的内存区域 &#xff08;2&#xff09;常量池&#xff1a;常量池是方法区的一部分&#x…

TensorFlow实现逻辑回归模型

逻辑回归是一种经典的分类算法&#xff0c;广泛应用于二分类问题。本文将介绍如何使用TensorFlow框架实现逻辑回归模型&#xff0c;并通过动态绘制决策边界和损失曲线来直观地观察模型的训练过程。 数据准备 首先&#xff0c;我们准备两类数据点&#xff0c;分别表示两个不同…

《十七》浏览器基础

浏览器&#xff1a;是安装在电脑里面的一个软件&#xff0c;能够将页面内容渲染出来呈现给用户查看&#xff0c;并让用户与网页进行交互。 常见的主流浏览器&#xff1a; 常见的主流浏览器有&#xff1a;Chrome、Safari、Firefox、Opera、Edge 等。 输入 URL&#xff0c;浏览…

网络安全 | F5-Attack Signatures-Set详解

关注&#xff1a;CodingTechWork 创建和分配攻击签名集 可以通过两种方式创建攻击签名集&#xff1a;使用过滤器或手动选择要包含的签名。  基于过滤器的签名集仅基于在签名过滤器中定义的标准。基于过滤器的签名集的优点在于&#xff0c;可以专注于定义用户感兴趣的攻击签名…

STranslate 中文绿色版即时翻译/ OCR 工具 v1.3.1.120

STranslate 是一款功能强大且用户友好的翻译工具&#xff0c;它支持多种语言的即时翻译&#xff0c;提供丰富的翻译功能和便捷的使用体验。STranslate 特别适合需要频繁进行多语言交流的个人用户、商务人士和翻译工作者。 软件功能 1. 即时翻译&#xff1a; 文本翻译&#xff…

基于微信小程序的助农扶贫系统设计与实现(LW+源码+讲解)

专注于大学生项目实战开发,讲解,毕业答疑辅导&#xff0c;欢迎高校老师/同行前辈交流合作✌。 技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;…

我谈区域偏心率

偏心率的数学定义 禹晶、肖创柏、廖庆敏《数字图像处理&#xff08;面向新工科的电工电子信息基础课程系列教材&#xff09;》P312 区域的拟合椭圆看这里。 Rafael Gonzalez的二阶中心矩的表达不说人话。 我认为半长轴和半短轴不等于特征值&#xff0c;而是特征值的根号。…

关于低代码技术架构的思考

我们经常会看到很多低代码系统的技术架构图&#xff0c;而且经常看不懂。是因为技术架构图没有画好&#xff0c;还是因为技术不够先进&#xff0c;有时候往往都不是。 比如下图&#xff1a; 一个开发者&#xff0c;看到的视角往往都是技术层面&#xff0c;你给用户讲React18、M…

若依路由配置教程

1. 路由配置文件 2. 配置内容介绍 { path: "/tool/gen-edit", component: Layout, //在路由下&#xff0c;引用组件的名称&#xff0c;在页面中包括这个组件的内容(页面框架内容) hidden: true, //此页面的内容&#xff0c;在左边的菜单中不用显示。 …

基于ESP8266的多功能环境监测与反馈系统开发指南

项目概述 本系统集成了物联网开发板、高精度时钟模块、环境传感器和可视化显示模块&#xff0c;构建了一个智能环境监测与反馈装置。通过ESP8266 NodeMCU作为核心控制器&#xff0c;结合DS3231实时时钟、DHT11温湿度传感器、光敏电阻和OLED显示屏&#xff0c;实现了环境参数的…