SpringCloud源码探析(十)-Web消息推送

news2024/11/16 15:36:58

1.概述

消息推送在日常使用中的场景比较多,比如有人点赞了我的博客或者关注了我,这时我就会收到一条推送消息,以此来吸引我点击或者打开应用。消息推送的方式主要分为两种:web消息推送和移动端消息推送。它将所要发送的信息,发送至用户当前访问的网页或者移动设备。本文主要分析在web端进行消息推送的几种方式,实现用户在web端接收推送消息。

2.消息推送几种方式

web消息推送的方式主要分为两种:一种是主动向服务端请求(简单理解为客户端pull消息)、一种是服务端推送(简单理解为服务端push消息)。两种方式各有利弊:主动向服务端请求会按照一定周期不断去请求服务器,如果客户端数量庞大会对服务端造成极大压力,并且数据具有一定延时性;服务端推送实时性较好,但服务端需要存储客户端会话信息,如果客户端数量较多,服务端查询对应会话压力较大。

2.1 Pull消息

Pull消息主要是客户端发起的操作,定时向服务端进行轮询获取消息,轮询可分为短轮询和长轮询。

短轮询:指定时间间隔,由应用浏览器发送http请求,服务器实时返回消息至客户端,浏览器进行展示;短轮询在前端一般通过JS定时器定时发送请求来实现;
长轮询:是对短轮询的一种优化,客户端发起请求,服务器不会立即返回请求结果,而是将请求挂起等待一段时间,如果此时间段内数据变更,立即响应客户端请求,若是一直无变化则等到指定的超时时间响应请求,客户端重新发起长连接;长轮询在nacos、Kafka、RocketMQ队列中使用较多。

2.2 Push消息

服务端向客户端推送,在一定程度上能节约一部分资源,常用的方式有WebSocket、SSE等,还有一些通过中间件RabbitMQ来实现等。本文主要介绍利用SSE方式和WebSocket方式来推送消息,具体如下:

2.2.1 SSE

SSE(Server-sent events)是一种用于实现服务器向客户端实时推送数据的Web技术。与传统的短轮询和长轮询相比,SSE提供了更高效和实时的数据推送机制。SSE基于HTTP协议,允许服务器将数据以事件流(Event Stream)的形式发送给客户端。客户端通过建立持久的HTTP连接,并监听事件流,可以实时接收服务器推送的数据。SSE的主要特点包括:

简单易用:SSE使用基于文本的数据格式,如纯文本、JSON等,使得数据的发送和解析都相对简单;
单向通信:SSE支持服务器向客户端的单向通信,服务器可以主动推送数据给客户端,而客户端只能接收数据;
实时性:SSE建立长时间的连接,使得服务器可以实时地将数据推送给客户端,而无需客户端频繁地发起请求。

SSE的整体实现思路如下,它的原理其实类似于在线视频播放,视频流会连续不断的推送到浏览器,图如下:
在这里插入图片描述
其实可以简单地理解为它是一种单向实时通信技术,一旦客户端与服务端建立连接,只能接收服务端信息,不能向服务端发送信息,且拥有自动重连机制,客户端与服务端断开会进行自动重连,websocket断开不能自动重连,这是SSE优于websocket的地方。
Springboot使用SSE功能发送信息代码如下,由于springboot内嵌sse模块,因此不需要引入额外包:

package com.eckey.lab.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @Author: Marin
 * @CreateTime: 2023-10-08  14:29
 * @Description: TODO
 * @Version: 1.0
 */
@Slf4j
@RestController
@CrossOrigin //此注解是为了解决测试过程中的跨域问题
@RequestMapping("/sse")
public class SSEController {

    /**
     * 使用Map对象来存放userId和对应的会话
     */
    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    /**
     * @description: 浏览器端注册,将会话信息存入Map,这种方式会导致一个userId只能与服务器建立一个会话,生产环境慎用这种方式
     * @author: Marin
     * @date: 2023/10/9 16:51
     * @param: [userId]
     * @return: org.springframework.web.servlet.mvc.method.annotation.SseEmitter
     **/
    @GetMapping(path = "/subscribe/{userId}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public SseEmitter subscribe(@PathVariable("userId") String userId) throws IOException {
        // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
        SseEmitter sseEmitter = new SseEmitter(30_000L);
        // 设置前端的重试时间为15s,如果不加这个发送一下,前端就不会显示连接成功
        sseEmitter.send("连接成功");
        // 注册回调
        sseEmitter.onCompletion(() -> {
            log.info("连接结束:{}", userId);
        });

        sseEmitter.onError((Throwable throwable) -> {
            log.error("连接异常:{}", throwable.getMessage());
        });

        sseEmitter.onTimeout(() -> {
            log.warn("连接超时:{}", userId);
        });
        //以userId为key,如果一个用户多个设备连接,会不准确
        sseEmitterMap.put(userId, sseEmitter);
        log.info("创建新的sse连接,当前用户:{}", userId);
        return sseEmitter;
    }

    /**
     * @description: 向指定用户发送指定信息
     * @author: Marin
     * @date: 2023/10/9 16:53
     * @param: [userId, message]
     * @return: void
     **/
    @GetMapping(path = "/sendMessage")
    public void sendMessage(String userId, String message) {
        if (sseEmitterMap.containsKey(userId)) {
            try {
                log.info("向用户:{},发送消息:{}", userId, message);
                sseEmitterMap.get(userId).send(message, MediaType.APPLICATION_JSON);
            } catch (IOException e) {
                log.error("用户[{}]推送异常:{}", userId, e.getMessage());
                removeUser(userId);
            }
        }
    }

    /**
     * @description: 移除用户
     * @author: Marin
     * @date: 2023/10/9 16:53
     * @param: [userId]
     * @return: void
     **/
    private void removeUser(String userId) {
        sseEmitterMap.remove(userId);
        log.info("移除用户成功:{}", userId);
    }

    /**
     * @description: 删除与指定用户会话
     * @author: Marin
     * @date: 2023/10/9 16:54
     * @param: [userId]
     * @return: void
     **/
    @GetMapping("/close/{userId}")
    public void close(@PathVariable("userId") String userId) {
        removeUser(userId);
        log.info("关闭连接成功:{}", userId);
    }
}

前端测试代码如下:

<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <title>SseEmitter</title>
</head>

<body>
<button onclick="closeSse()">关闭连接</button>
<div id="message"></div>
</body>
<script>
    let source = null;

    // 用时间戳模拟登录用户
    const userId = new Date().getTime();

    if (window.EventSource) {

        // 建立连接
        source = new EventSource('http://127.0.0.1:9090/sse/subscribe/' + userId);
        setMessageInnerHTML("连接用户=" + userId);
      
        source.addEventListener('open', function(e) {
            setMessageInnerHTML("建立连接。。。");
        }, false);

        source.addEventListener('message', function(e) {
            setMessageInnerHTML(e.data);
        });

        source.addEventListener('error', function(e) {
            if (e.readyState === EventSource.CLOSED) {
                setMessageInnerHTML("连接关闭");
            } else if (e.target.readyState === EventSource.CONNECTING) { 
                console.log('Connecting...');
            }else {
                console.log(e);
            }
        }, false);

    } else {
        setMessageInnerHTML("你的浏览器不支持SSE");
    }

    // 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据
    window.onbeforeunload = function() {
        closeSse();
    };

    // 关闭Sse连接
    function closeSse() {
        source.close();
        const httpRequest = new XMLHttpRequest();
        httpRequest.open('GET', 'http://127.0.0.1:9090/sse/close/'+ userId, true);
        httpRequest.send();
        console.log("close");
    }

    // 将消息显示在网页上
    function setMessageInnerHTML(innerHTML) {
        document.getElementById('message').innerHTML += innerHTML + '<br/>';
    }
</script>

</html>

打开前端页面,会出现连接信息,如下所示:
在这里插入图片描述
调用信息发送接口,跟据用户id发送指定消息,如下:
在这里插入图片描述
发送成功后前端接收并显示在页面上,如下:
在这里插入图片描述

2.2.2 WebSocket

WebSocket是一种用于实现实时双向通信的Web技术,它使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。它与SSE在某些方面有所不同。下面是SSE和WebSocket之间的比较:

数据推送方向:SSE是服务器向客户端的单向通信,服务器可以主动推送数据给客户端。而WebSocket是双向通信,允许服务器和客户端之间进行实时的双向数据交换;
连接建立:SSE使用基于HTTP的长连接,通过普通的HTTP请求和响应来建立连接,从而实现数据的实时推送。WebSocket使用自定义的协议,通过建立WebSocket连接来实现双向通信;
兼容性:由于SSE基于HTTP协议,它可以在大多数现代浏览器中使用,并且不需要额外的协议升级。WebSocket在绝大多数现代浏览器中也得到了支持,但在某些特殊的网络环境下可能会遇到问题;
适用场景:SSE适用于服务器向客户端实时推送数据的场景,如股票价格更新、新闻实时推送等。WebSocket适用于需要实时双向通信的场景,如聊天应用、多人协同编辑等。

WebSocket原理图如下所示:
在这里插入图片描述
Springboot整合websocket如下:
1.引入pom

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

2.编写socket配置

package com.eckey.lab.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.servlet.ServletContextInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

import javax.servlet.ServletContext;
import javax.servlet.ServletException;

/**
 * @Author: Marin
 * @CreateTime: 2023-10-08  16:26
 * @Description: TODO
 * @Version: 1.0
 */
@Slf4j
@Configuration
public class WebSocketConfig implements ServletContextInitializer {

    /**
     * 这个bean的注册,用于扫描带有@ServerEndpoint的注解成为websocket,如果你使用外置的tomcat就不需要该配置文件
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    @Override
    public void onStartup(ServletContext servletContext) throws ServletException {
        String serverInfo = servletContext.getServerInfo();
        log.info("serverInfo:{}", serverInfo);
    }

}

3.编写SocketServer代码

package com.eckey.lab.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * @Author: Marin
 * @CreateTime: 2023-10-08  15:49
 * @Description: TODO
 * @Version: 1.0
 */
@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}")
public class WebSocketServer {

    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;

    private static final CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();

    // 用来存在线连接数
    private static final Map<String, Session> sessionPool = new HashMap<String, Session>();

    /**
     * 链接成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "userId") String userId) {
        try {
            this.session = session;
            webSockets.add(this);
            sessionPool.put(userId, session);
            log.info("websocket消息: 有新的连接,总数为:" + webSockets.size());
        } catch (Exception e) {
            log.error("");
        }
    }

    /**
     * 收到客户端消息后调用的方法
     */
    @OnMessage
    public void onMessage(String message) {
        log.info("websocket消息: 收到客户端消息:" + message);
    }

    /**
     * 此为单点消息
     */
    public void sendOneMessage(String userId, String message) {
        Session session = sessionPool.get(userId);
        if (session != null && session.isOpen()) {
            try {
                log.info("websocket发送单点消息:" + message);
                session.getAsyncRemote().sendText(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }
}

4.编写controller代码

package com.eckey.lab.controller;

import com.alibaba.fastjson.JSON;
import com.eckey.lab.config.WebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.io.IOException;
import java.util.HashMap;

/**
 * @Author: Marin
 * @CreateTime: 2023-10-08  15:24
 * @Description: TODO
 * @Version: 1.0
 */
@Slf4j
@RestController
@CrossOrigin
@RequestMapping("/socket")
public class WebSocketController {

    @Autowired
    private WebSocketServer webSocketServer;

    @GetMapping(path = "/publish/{userId}")
    public String publish(@PathVariable("userId") String userId, String message) throws IOException {
        webSocketServer.sendOneMessage(userId, message);
        log.info("信息发送成功!userId:{},message:{}", userId, message);
        HashMap maps = new HashMap();
        maps.put("code", "0");
        maps.put("msg", "success");
        return JSON.toJSONString(maps);
    }

}

5.测试
客户端发送消息,服务接收到消息,具体如下:
在这里插入图片描述
在这里插入图片描述
调用服务端接口发送消息,客户端接收到消息,具体如下:
在这里插入图片描述
在这里插入图片描述

3.小结

1.SSE方式是一种基于TCP协议的单向数据传输方式,当连接建立完成,只能由服务端向客户端发送信息;
2.WebSocket是一种双向通信技术,能够实现客户端和服务端的全双工通信,它在建立连接时使用HTTP协议,其它时候都是直接基于TCP协议进行通信;
3.在选择SSE或者WebSocket时,需要跟据场景、性能损耗进行综合考虑,合理的技术选型能够有效增强服务的健壮性。

4.参考文献

1.https://zhuanlan.zhihu.com/p/634581294
2.https://juejin.cn/post/7122014462181113887
3.https://javaguide.cn/system-design/web-real-time-message-push.html

5.附录

https://gitee.com/Marinc/nacos.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1073685.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【物联网】Arduino+ESP8266物联网开发(一):开发环境搭建 安装Arduino和驱动

ESP8266物联网开发 1.开发环境安装 开发软件下载地址&#xff1a; 链接: https://pan.baidu.com/s/1BaOY7kWTvh4Obobj64OHyA?pwd3qv8 提取码: 3qv8 1.1 安装驱动 将ESP8266连接到电脑上&#xff0c;安装ESP8266驱动CP210x 安装成功后&#xff0c;打开设备管理器&#xff0c…

抖音seo源代码开源部署----基于开放平台SaaS服务

抖音SEO搜索是什么&#xff1f; 抖音SEO搜索是指在抖音平台上进行搜索引擎优化&#xff08;Search Engine Optimization&#xff09;的一种技术手段。 通过优化抖音账号、发布内容和关键词等&#xff0c;提高抖音视频在搜索结果中的排名&#xff0c;从而增加视频曝光量和用户点…

什么是全流程的UI设计?它与单页面的视觉设计有什么区别?

在软件产品研发流程中&#xff0c;产品交互设计一般是根据项目需求&#xff0c;做出设计方案&#xff0c;以求解决某个问题。而全流程的设计不再局限于短暂或者单个页面的视觉优化&#xff0c;而是追求持续性地参与&#xff0c;以全局性整体性地提升产品体验。 在软件内的信息传…

mybatis配置entity下不同文件夹同类型名称的多个类型时启动springboot项目出现TypeException源码分析

记录问题&#xff1a;当配置了 mybatis.type-aliases-packagecom.runjing.erp.entity 配置项时&#xff0c;如果entity文件夹下存在不同子文件夹下的同名类型时&#xff0c;mybatis初始化加载映射时会爆出org.apache.ibatis.type.TypeException&#xff1a; The alias TestDemo…

怎么将自己拍摄的视频静音?详细步骤教会你~

大部分人都会遇到的一个问题&#xff0c;我们在拍摄视频时容易将嘈杂的背景音或环境音录进去&#xff0c;怎样解决这个问题呢&#xff1f;今天就来教大家具体操作步骤&#xff0c;只需用到这个软件即可&#xff01; 第一步&#xff1a;打开我们的【音分轨】APP&#xff0c;进入…

山西电力市场日前价格预测【2023-10-10】

日前价格预测 预测说明&#xff1a; 如上图所示&#xff0c;预测明日&#xff08;2023-10-10&#xff09;山西电力市场全天平均日前电价为555.24元/MWh。其中&#xff0c;最高日前电价为1130.92元/MWh&#xff0c;预计出现在18: 30。最低日前电价为293.94元/MWh&#xff0c;预…

网络流量安全分析-工作组异常

在网络中&#xff0c;工作组异常分析具有重要意义。以下是网络中工作组异常分析的几个关键点&#xff1a; 检测网络攻击&#xff1a;网络中的工作组异常可能是由恶意活动引起的&#xff0c;如网络攻击、病毒感染、黑客入侵等。通过对工作组异常的监控和分析&#xff0c;可以快…

【SHUD】SHUD模型Windows下的编译过程

目录 〇、绪论一、SHUD水文模型说明二、SHUD编译1、SUNIALS库的编译1.1. 使用MSVC1.1.1 CMAKE 1.2. 使用MINGW1.2.1 CMAKE1.2.2 生成库文件 2、SHUD的编译过程2.1. LINUX和MAC环境2.2. WINDOWS环境2.2.1 shud2.2.2 shud_omp 〇、绪论 科学模型依据有限时空内的观测&#xff0c…

华为OD机试 - 数字反转打印(Java 2023 B卷 100分)

目录 专栏导读一、题目描述二、输入描述三、输出描述四、解题思路五、Java算法源码六、效果展示1、输入2、输出3、说明 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;A卷B卷&#…

3D全景虚拟样板间展销系统扩展用户市场范围

VR样板间&#xff0c;能够真实还原现场&#xff0c;定制需要的场景。让一切比真实更真实。用户可以720度看房&#xff0c;自由行走在空间里&#xff0c;直观感受各空间的大小&#xff0c;看到自己家中的“未来样子”&#xff0c;同时通过操控手柄&#xff0c;控制整个智能家居系…

Redis-04独立功能的实现

1、发布与订阅 介绍&#xff1a; Redis的发布与订阅功能由PUBLISH、SUBSCRIBE、PSUBSCRIBE等命令组成。通过SUBSCRIBE命令&#xff0c;客户端可以订阅一个或多个频道&#xff0c;成为这些频道的订阅者&#xff08;subscriber&#xff09;每当有其他客户端向被订阅的频道发送消…

API网关是什么?

API网关是什么&#xff1f; API网关很多人都知道它的实现原理&#xff0c;但是并不清楚它存在的意义和背景是什么&#xff0c;这里我给大家通俗易懂地讲解下&#xff01;举个例子&#xff0c;假设你正在开发一个电商网站&#xff0c;那么这里会涉及到很多后端的微服务&#xf…

多个方法多个出路!Microsoft Excel中合并单元格的8种方法

合并单元格是初学者电子表格用户使用的最流行的选项之一。但它们有许多缺点,使它们不是那么好的选择。 在这篇文章中,我将向你展示有关合并单元格的所有信息,包括8种合并单元格的方法。我还将告诉你为什么不应该使用它们,以及一个更好的替代方案,它将产生相同的视觉效果。…

前端预览、下载二进制文件流(png、pdf)

前端请求设置 responseType: “blob” 后台接口返回的文件流如下&#xff1a; 拿到后端返回的文件流后&#xff1a; 预览 <iframe :src"previewUrl" frameborder"0" style"width: 500px; height: 500px;"></iframe>1、预览 v…

【APP】上架指南:iOS App Store 首次上架被拒原因分析与解决方案

目录 一、前言 二、APP 审核备案新规 &#xff08;1&#xff09;iOS 上架审核申请被拒 &#xff08;2&#xff09;苹果应用商店重大调整 &#xff08;3&#xff09;首次备案流程 ① 阿里云备案 ② 华为云备案 ③ 腾讯云备案 三、iOS 首次上架拒审原因分析 &#…

Redis-03持久化

1、RDB持久化 Redis是一个键值对数据库服务器&#xff0c;服务器中通常包含着任意个非空数据库&#xff0c;而每个非空数据库中又可以包含任意个键值对&#xff0c;通常情况下将服务器中的非空数据库以及它们的键值对统称为数据库状态 Redis是内存数据库&#xff0c;它将自己…

【Ingress】

Ingress 一、作用二、使外部应用能够访问集群内服务方案1.NodePort2.LoadBalancer3.externalIPs4.Ingress 三、Ingress的组成1.ingress:nginx配置文件2.ingress-controller: 当作反向代理或者说是转发器 四、Ingress工作原理五、ingress暴露服务的方式方式一&#xff1a;Deploy…

WPF向Avalonia迁移(四、其他事项)

开发必备 1. Avalonia项目源代码&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;没有源代码&#xff0c;你连控件的背景色怎么改都找不着&#xff01;&#xff01; 2.下载你所使用的版本&#x…

Tomcat 线程模型性能调优

Linux I/O模型详解 I/O要解决什么问题 I/O&#xff1a;在计算机内存与外部设备之间拷贝数据的过程。 程序通过CPU向外部设备发出读指令&#xff0c;数据从外部设备拷贝至内存需要一段时间&#xff0c;这段时间CPU就没事情做了&#xff0c;程序就会两种选择&#xff1a; 让出…

投资 3DEXPERIENCE® WORKS 的 10 大理由

3DEXPERIENCE Works 通过利用基于云的 3DEXPERIENCE 平台提供一个统一的协作环境&#xff0c;扩展 SOLIDWORKS 的价值&#xff0c;使参与 产品开发的每个人都能为创新流程做出贡献。简而言之&#xff0c;如果您喜欢使用 SOLIDWORKS&#xff0c;那么您可以在继续使用的同时&…