文章目录
- 前言
- 一、解决方案:
- 1、传统实时处理方案:
- 2、HTML5 标准引入的实时处理方案:
- 3、第三方推送:
- 二、SSE:
- 1、客户端:
- 2、服务端:
- 三、业务实践:
- 总结:
前言
服务端推送,也称为消息推送或通知推送,是一种允许应用服务器主动将信息发送到客户端的能力,为客户端提供了实时的信息更新和通知,增强了用户体验。
服务端推送的背景与需求主要基于以下几个诉求:
-
实时通知:在很多情况下,用户期望实时接收到应用的通知,如新消息提醒、商品活动提醒等。
-
节省资源:如果没有服务端推送,客户端需要通过轮询的方式来获取新信息,会造成客户端、服务端的资源损耗。通过服务端推送,客户端只需要在收到通知时做出响应,大大减少了资源的消耗。
-
增强用户体验:通过服务端推送,应用可以针对特定用户或用户群发送有针对性的内容,如优惠活动、个性化推荐等。这有助于提高用户对应用的满意度和黏性。
常见推送场景有:微信消息通知栏、新闻推送、外卖状态 等等,我们自身的推送场景有:下载、连线请求、直播提醒 …
一、解决方案:
1、传统实时处理方案:
轮询:这是一种较为传统的方式,客户端会定时地向服务端发送请求,询问是否有新数据。服务端只需要检查数据状态,然后将结果返回给客户端。轮询的优点是实现简单,兼容性好;缺点是可能产生较大的延迟,且对服务端资源消耗较高。
长轮询(Long Polling):轮询的改进版。客户端向服务器发送请求,服务器收到请求后,如果有新的数据,立即返回给客户端;如果没有新数据,服务器会等待一定时间(比如30秒超时时间),在这段时间内,如果有新数据,就返回给客户端,否则返回空数据。客户端处理完服务器返回的响应后,再次发起新的请求,如此反复。长轮询相较于传统的轮询方式减少了请求次数,但仍然存在一定的延迟。
2、HTML5 标准引入的实时处理方案:
WebSocket:一种双向通信协议,同时支持服务端和客户端之间的实时交互。WebSocket 是基于 TCP 的长连接,和HTTP 协议相比,它能实现轻量级的、低延迟的数据传输,非常适合实时通信场景,主要用于交互性强的双向通信。
SSE:SSE(Server-Sent Events)是一种基于 HTTP 协议的推送技术。服务端可以使用 SSE 来向客户端推送数据,但客户端不能通过SSE向服务端发送数据。相较于 WebSocket,SSE 更简单、更轻量级,但只能实现单向通信。
两者的主要区别:
3、第三方推送:
常见的有操作系统提供相应的推送服务,如苹果的APNs(Apple Push Notification service)、谷歌的FCM(Firebase Cloud Messaging)等。同时,也有一些跨平台的推送服务,如个推、极光推送、友盟推送等,帮助开发者在不同平台上实现统一的推送功能。
这种推送方式在生活中十分常见,一般你打开手机就能看到各种信息推送,基本就是利用第三方推送来实现。
二、SSE:
接下来我们重点讲讲 SSE 服务端推送,它基于 HTTP 协议,易于实现和部署,特别适合那些需要服务器主动推送信息、客户端只需接收数据的场景:
1、客户端:
Server-Sent Events(SSE)是 HTML5 的一部分,用于从服务器实时接收更新,目前大部分主流浏览器都提供了支持:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>SSE Client</title>
</head>
<body>
<h1>Receive: <span id="sse"></span></h1>
<script>
const numberElement = document.getElementById("sse");
const source = new EventSource('http://localhost:8080/sse');
source.onmessage = (event) => {
numberElement.innerText = event.data;
};
source.onerror = (error) => {
console.error("SSE error:", error);
};
</script>
</body>
</html>
自动重连:一旦连接断开,浏览器会自动尝试重新建立连接。当然,每个浏览器都有自己的重连策略和措施。因此,重连时间和尝试次数可能因浏览器而异。
也可以通过 CURL 调用:
➜ ~ curl -X GET -H "Accept:text/event-stream" -H "Content-Type:application/x-www-form-urlencoded" "http://localhost:8080/sse"
data:Data: 0
data:Data: 1
data:Data: 2
data:Data: 3
...
2、服务端:
我们目前服务端主要使用 Spring,其对 SSE 主要提供了两种支持:
- Spring WebMVC:传统的基于 Servlet 的同步阻塞编程模型,即 同步模型Web框架。
- Spring WebFlux:异步非阻塞的响应式编程模型,即 异步模型Web框架。
1)Spring WebFlux 中的 SSE 支持(支持版本Spring5.0):
Spring WebFlux 框架提供了一套基于响应式编程的非阻塞异步IO模型,能高效支持 SSE。在 Spring WebFlux 中,我们可以结合 Flux 和 MediaType.TEXT_EVENT_STREAM_VALUE 来实现 SSE。
以下示例展示如何在 Spring WebFlux 中创建 SSE 流:
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import java.time.Duration;
@RestController
public class SseController {
@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getSseStream() {
// 使用Flux生成每秒一个递增的数据流,用于模拟实时数据推送
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> "Data: " + sequence);
}
}
Spring WebFlux 底层依赖于两个非阻塞的异步框架: Reactor 和 Netty。其中,Reactor 库主要是提供响应式式编程的支持,Netty 是一个高性能的非阻塞网络框架,主要负责处理 HTTP 输入输出。
- Reactor:Reactor 是一个基于 Java 8 的响应式流库,它实现了 Reactive Streams 规范。Reactor 提供了两个核心的响应式类型 - Mono 和 Flux,相当于 findOne 和 findList 的区别。这两个类型提供了丰富的操作符,允许你以声明式和函数式的方式来处理你的业务逻辑。
- Netty:Netty 是一个高性能、异步的事件驱动的网络框架,虽然 Netty 重点在网络通信层,但仍然提供了 Web 服务器的能力。
Reactor 对 Netty 进行了集成,提供了子模块 Reactor Netty,在 Spring WebFlux 中,Reactor Netty 主要用作默认的服务器运行时环境,负责处理 HTTP 请求和响应。
这两个框架共同为 Spring WebFlux 提供了底层的支持,使得我们能够使用响应式编程编写高性能、可扩展的Web应用程序。另外,虽然 Spring WebFlux 在底层默认使用 Reactor 和 Netty,但它也有很好的灵活性和可替换性,我们可以根据需要更换其他非阻塞的异步框架。
2)Spring WebMVC 中的 SSE 支持(支持版本Spring4.2):
在 Spring WebMVC 中,可以通过SseEmitter对象来处理 SSE 请求,它允许将数据通过 SSE 连接发送给客户端。
下面是一个使用SseEmitter提供 SSE 的简单示例:
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@RestController
public class SseController {
private ExecutorService nonBlockingService = Executors.newCachedThreadPool();
@GetMapping("/sse")
public SseEmitter getSseStream() {
SseEmitter emitter = new SseEmitter();
nonBlockingService.execute(() -> {
// 这里模拟数据发送给客户端的逻辑
try {
for (int i = 0; i < 10; i++) {
emitter.send("Data: " + i);
Thread.sleep(1000);
}
emitter.complete();
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return emitter;
}
}
如果你的项目使用 SpringMVC 模型,不想再引入 Spring WebFlux,能否利用 Reactor 响应式库呢?
答案也是可以的,虽然 SpringMVC 主要提供的是同步阻塞能力,但也不妨碍它提供一定的异步支持,比如这里,我们可以直接引入 Reactor 库,也同样可以实现 SSE:
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import java.time.Duration;
@RestController
public class SseController {
@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getSseStream() {
// 使用Flux生成每秒一个递增的数据流,用于模拟实时数据推送
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> "Data: " + sequence);
}
}
基于 Spring WebMVC 或 Spring WebFlux,我们可以方便地在 Spring 框架中实现 SSE 的支持。这两种方法根据具体需求和场景,可以灵活选择。
三、业务实践:
我们以「文件下载」功能进行说明,一般情况下,大文件的下载,服务端压力比较大、处理时间也比较长,为了有更好的交互体验,我们可以使用异步处理,服务端处理完了之后主动通知 客户端,效果如下:
这个小弹窗就是服务端处理完了之后,通过 SSE连接主动推送到客户端。
我们来看看处理流程:
1)SSE 连接:
先建立 SSE 连接,确保服务端有主动推送消息的能力。
2)异步下载:
长耗时下载任务我们通过异步的方式处理,避免用户在下载页面长时间等待。
3)广播并推送:
下载完成后,我们需要将完成事件推送给客户端。需要注意的是,由于服务是集群部署、SSE 连接在节点本地 Map 维护,这就有可能导致当前客户端的 SSE连接所在节点 与 事件推送节点 是两个独立的节点。
因此,我们这里借助于 Redis 的发布/订阅能力,将消息广播出去,能匹配连接的节点负责将消息推送至客户端、其他节点直接丢弃即可。效果图如下:
能否做到精准投递?
答案也是可以的,我们可以这样来做:
- 借助 Redis 做中心存储,存储Map<用户,节点IP> 这样的映射关系。
- 在推送消息之前,先通过映射关系找到该用户的SSE连接所在节点
- 然后在通过 RPC调用 直接将消息投递到对应的服务节点,最后由该节点进行事件推送。
一般情况下,我们可以用「广播」这种简单粗暴的方式应对大部分场景,毕竟「精准投递」需要中心化的维护节点关系、应对节点变更等,处理起来稍显麻烦。具体视业务场景来做选择即可。
总结:
SSE 技术是一种轻量级的实时推送技术,具有支持跨域、使用简单、支持自动重连等特点,使得其在实时消息推送、股票交易等场景下广泛使用。
另外,SSE 相对于 WebSocket 更加轻量级,如果需求场景不需要交互式动作,那么 SSE 是一个不错的选择。