SpringBoot集成WebSocket讲解

news2024/11/16 13:48:30

文章目录

  • 1 WebSocket
    • 1.1 简介
    • 1.2 WebSocket作用和调用
      • 1.2.1 作用
      • 1.2.2 js端调用
    • 1.3 Javax
      • 1.3.1 服务端
        • 1.3.1.1 服务端接收
        • 1.3.1.2 服务端集成
        • 1.3.1.3 ping和pong消息
      • 1.3.2 客户端
        • 1.3.2.1 客户端接收
        • 1.3.2.2 客户端发送
    • 1.4 WebMVC
      • 1.4.1 服务端
        • 1.1.4.1 服务端接收
        • 1.1.4.2 服务端集成
        • 1.1.4.3 服务器握手拦截
        • 1.1.4.4 服务器地址问题
      • 1.4.2 客户端
        • 1.4.2.1 客户端接收
        • 1.4.2.2 客服端发送
    • 1.5 WebFlux
      • 1.5.1 服务端
        • 1.5.1.1 服务端发送接收
        • 1.5.1.2 服务端集成
      • 1.5.2 客户端
        • 1.5.2.1 客户端发送接收
        • 1.5.2.2 客户端发送

1 WebSocket

1.1 简介

WebSocket 协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端,建立客户端和服务器之间的通信渠道。浏览器和服务器仅需一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
在这里插入图片描述

1.2 WebSocket作用和调用

1.2.1 作用

HTTP 是基于请求响应式的,即通信只能由客户端发起,服务端做出响应,无状态,无连接:

  • 无状态:每次连接只处理一个请求,请求结束后断开连接。
  • 无连接:对于事务处理没有记忆能力,服务器不知道客户端是什么状态。

通过HTTP实现即时通讯,只能是页面轮询向服务器发出请求,服务器返回查询结果。轮询的效率低,非常浪费资源,因为必须不停连接,或者 HTTP 连接始终打开。
WebSocket的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话。

WebSocket 特点:

  • 建立在 TCP 协议之上,服务器端的实现比较容易。
  • HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。
  • 数据格式比较轻量,性能开销小,通信高效。
  • 可以发送文本,也可以发送二进制数据。
  • 没有同源限制,客户端可以与任意服务器通信。
  • 协议标识符是 ws(如果加密,则为wss),服务器网址就是 URL

1.2.2 js端调用

<script>
    var ws = new WebSocket('ws://localhost:8080/webSocket/10086');
    // 获取连接状态
    console.log('ws连接状态:' + ws.readyState);
    //监听是否连接成功
    ws.onopen = function () {
        console.log('ws连接状态:' + ws.readyState);
        //连接成功则发送一个数据
        ws.send('test1');
    }
    // 接听服务器发回的信息并处理展示
    ws.onmessage = function (data) {
        console.log('接收到来自服务器的消息:');
        console.log(data);
        //完成通信后关闭WebSocket连接
        ws.close();
    }
    // 监听连接关闭事件
    ws.onclose = function () {
        // 监听整个过程中websocket的状态
        console.log('ws连接状态:' + ws.readyState);
    }
    // 监听并处理error事件
    ws.onerror = function (error) {
        console.log(error);
    }
    function sendMessage() {
        var content = $("#message").val();
        $.ajax({
            url: '/socket/publish?userId=10086&message=' + content,
            type: 'GET',
            data: { "id": "7777", "content": content },
            success: function (data) {
                console.log(data)
            }
        })
    }
</script>

下面主要介绍三种方式:Javax,WebMVC,WebFlux,在Spring Boot中的服务端和客户端配置

1.3 Javax

java的扩展包javax.websocket中就定义了一套WebSocket的接口规范

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-websocket</artifactId>
 </dependency>

1.3.1 服务端

1.3.1.1 服务端接收

一般使用注解的方式来进行配置

/**
 * html页面与之关联的接口
 * var reqUrl = "http://localhost:8081/websocket/" + cid;
 * socket = new WebSocket(reqUrl.replace("http", "ws"));
 */
@Component
@ServerEndpoint("/websocket/{type}")
public class JavaxWebSocketServerEndpoint {

    @OnOpen
    public void onOpen(Session session, EndpointConfig config,
                       @PathParam(value = "type") String type) {
        //连接建立
    }

    @OnClose
    public void onClose(Session session, CloseReason reason) {
        //连接关闭
    }

    @OnMessage
    public void onMessage(Session session, String message) {
        //接收文本信息
    }

    @OnMessage
    public void onMessage(Session session, PongMessage message) {
        //接收pong信息
    }

    @OnMessage
    public void onMessage(Session session, ByteBuffer message) {
        //接收二进制信息,也可以用byte[]接收
    }

    @OnError
    public void onError(Session session, Throwable e) {
        //异常处理
    }
}

我们在类上添加 @ServerEndpoint注解来表示这是一个服务端点,同时可以在注解中配置路径,这个路径可以配置成动态的,使用{}包起来就可以了

  • @OnOpen:用来标记对应的方法作为客户端连接上来之后的回调,Session就相当于和客户端的连接了,我们可以把它缓存起来用于发送消息;通过@PathParam注解就可以获得动态路径中对应值了
  • @OnClose:用来标记对应的方法作为客户端断开连接之后的回调,我们可以在这个方法中移除对应Session的缓存,同时可以接受一个CloseReason的参数用于获取关闭原因
  • @OnMessage:用来标记对应的方法作为接收到消息之后的回调,我们可以接受文本消息,二进制消息和pong消息
  • @OnError:用来标记对应的方法作为抛出异常之后的回调,可以获得对应的Session和异常对象
1.3.1.2 服务端集成
@Configuration(proxyBeanMethods = false)
public class JavaxWebSocketConfiguration {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

依赖SpringWebSocket模块,手动注入ServerEndpointExporter就可以了
需要注意ServerEndpointExporterSpring中的类,算是Spring为了支持javax.websocket的原生用法所提供的支持类

javax.websocket 库中定义了PongMessage而没有PingMessage

通过测试发现基本上所有的WebSocket包括前端js自带的,都实现了自动回复;也就是说当接收到一个ping消息之后,是会自动回应一个pong消息,所以没有必要再自己接受ping消息来处理了,即我们不会接受到ping消息;
当然我上面讲的ping和pong都是需要使用框架提供的api,如果是我们自己通过Message来自定义心跳数据的话是没有任何的处理的,下面是对应的api

//发送ping
session.getAsyncRemote().sendPing(ByteBuffer buffer);

//发送pong
session.getAsyncRemote().sendPong(ByteBuffer buffer);
1.3.1.3 ping和pong消息

ping 消息pong 消息都是 WebSocket 协议中的特殊消息类型,用于进行心跳保活和检测 WebSocket 连接的健康状态。

  • ping 消息:由服务器端(或客户端)发送给对端的消息。它用于发起一个心跳检测请求,要求对端回复一个 pong 消息作为响应。ping 消息通常用于检测对端的连接是否仍然处于活动状态,以及测量网络延迟。
  • pong 消息:由对端(即客户端或服务器端)作为对 ping 消息的响应发送回来。它用于确认接收到 ping 消息,并表明连接仍然活跃。

当一方发送一个 ping 消息时,对端应该立即发送一个 pong 消息作为响应。通过交换 ping 和 pong 消息,可以检测连接是否仍然有效,以及测量网络的延迟时间。

ping 和 pong 消息通常由 WebSocket 底层协议处理,开发人员可以通过设置相应的参数来启用或禁用这些消息的交换。一般情况下,WebSocket 客户端和服务器都会自动处理 ping 和 pong 消息,无需开发人员显式地处理。ping 和 pong 消息是属于底层协议层

1.3.2 客户端

1.3.2.1 客户端接收

客户端也是使用注解配置

@ClientEndpoint
public class JavaxWebSocketClientEndpoint {

    @OnOpen
    public void onOpen(Session session) {
        //连接建立
    }

    @OnClose
    public void onClose(Session session, CloseReason reason) {
        //连接关闭
    }

    @OnMessage
    public void onMessage(Session session, String message) {
        //接收文本消息
    }

    @OnMessage
    public void onMessage(Session session, PongMessage message) {
        //接收pong消息
    }

    @OnMessage
    public void onMessage(Session session, ByteBuffer message) {
        //接收二进制消息
    }

    @OnError
    public void onError(Session session, Throwable e) {
        //异常处理
    }
}

客户端使用@ClientEndpoint来标记,其他的@OnOpen,@OnClose,@OnMessage,@OnError和服务端一模一样

1.3.2.2 客户端发送
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
Session session = container.connectToServer(JavaxWebSocketClientEndpoint.class, uri);

我们可以通过ContainerProvider来获得一个WebSocketContainer,然后调用connectToServer方法将我们的客户端类和连接的uri传入就行了

通过ContainerProvider#getWebSocketContainer获得WebSocketContainer其实是基于SPI实现的
Spring的环境中更推荐大家使用ServletContextAware来获得,代码如下

@Component
public class JavaxWebSocketContainer implements ServletContextAware {

    private volatile WebSocketContainer container;

    public WebSocketContainer getContainer() {
        if (container == null) {
            synchronized (this) {
                if (container == null) {
                    container = ContainerProvider.getWebSocketContainer();
                }
            }
        }
        return container;
    }

    @Override
    public void setServletContext(@NonNull ServletContext servletContext) {
        if (container == null) {
            container = (WebSocketContainer) servletContext
                .getAttribute("javax.websocket.server.ServerContainer");
        }
    }
}

发消息

Session session = container.connectToServer(JavaxWebSocketClientEndpoint.class, uri);

//发送文本消息
session.getAsyncRemote().sendText(String message);

//发送二进制消息
session.getAsyncRemote().sendBinary(ByteBuffer message);

//发送对象消息,会尝试使用Encoder编码
session.getAsyncRemote().sendObject(Object message);

//发送ping
session.getAsyncRemote().sendPing(ByteBuffer buffer);

//发送pong
session.getAsyncRemote().sendPong(ByteBuffer buffer);

1.4 WebMVC

pom依赖

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-websocket</artifactId>
 </dependency>

1.4.1 服务端

1.1.4.1 服务端接收

我们实现一个WebSocketHandler来处理WebSocket的连接,关闭,消息和异常

import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;

public class ServletWebSocketServerHandler implements WebSocketHandler {

    @Override
    public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
        //连接建立
    }

    @Override
    public void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage<?> message) throws Exception {
        //接收消息
    }

    @Override
    public void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) throws Exception {
        //异常处理
    }

    @Override
    public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus closeStatus) throws Exception {
        //连接关闭
    }

    @Override
    public boolean supportsPartialMessages() {
        //是否支持接收不完整的消息
        return false;
    }
}
1.1.4.2 服务端集成

首先需要添加@EnableWebSocket来启用WebSocket
然后实现WebSocketConfigurer来注册WebSocket路径以及对应的WebSocketHandler

@Configuration
@EnableWebSocket
public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
        registry
            //添加处理器到对应的路径
            .addHandler(new ServletWebSocketServerHandler(), "/websocket")//注册Handler
            .setAllowedOrigins("*");
    }
}
1.1.4.3 服务器握手拦截

提供了HandshakeInterceptor来拦截握手

@Configuration
@EnableWebSocket
public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
        registry
            //添加处理器到对应的路径
            .addHandler(new ServletWebSocketServerHandler(), "/websocket")
            //添加握手拦截器
            .addInterceptors(new ServletWebSocketHandshakeInterceptor())
            .setAllowedOrigins("*");
    }
    
    public static class ServletWebSocketHandshakeInterceptor implements HandshakeInterceptor {

        @Override
        public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
            //握手之前
           if (request instanceof ServletServerHttpRequest) {
        	String path = request.getURI().getPath();
        	if(requestIsValid(path)){
        		String[] params = getParams(path);
        		attributes.put("WEBSOCKET_AUTH", params[0]);
        		attributes.put("WEBSOCKET_PID", params[1]);
        		attributes.put("WEBSOCKET_SN", params[2]);
        		attributes.put("WEBSOCKET_OPENID", params[3]);
        		attributes.put("WEBSOCKET_FIRSTONE","yes");
        	}
        }
        System.out.println("================Before Handshake================");
        return true;
        }

        @Override
        public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
            //握手之后
            System.out.println("================After Handshake================");
	    	if(e!=null) e.printStackTrace();
	    	System.out.println("================After Handshake================");
        }
     
     	private boolean requestIsValid(String url){
	        //在这里可以写上具体的鉴权逻辑
	    	boolean isvalid = false;
	    	if(StringUtils.isNotEmpty(url)
	    			&& url.startsWith("/netgate/")
	    			&& url.split("/").length==6){
	    		isvalid = true;
	    	}
    		return isvalid;
    	}
    
	    private String[] getParams(String url){
	    	url = url.replace("/netgate/","");
	    	return url.split("/");
	    }

    }
}
1.1.4.4 服务器地址问题

当在集成的时候发现这种方式没办法动态匹配路径,它的路径就是固定的,没办法使用如/websocket/**这样的通配符

在研究了一下之后发现可以在UrlPathHelper上解决

@Configuration
@EnableWebSocket
public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
        if (registry instanceof ServletWebSocketHandlerRegistry) {
            //替换UrlPathHelper
            ((ServletWebSocketHandlerRegistry) registry)
                .setUrlPathHelper(new PrefixUrlPathHelper("/websocket"));
        }

        registry
            //添加处理器到对应的路径
            .addHandler(new ServletWebSocketServerHandler(), "/websocket/**")
            .setAllowedOrigins("*");
    }
    
    public class PrefixUrlPathHelper extends UrlPathHelper {

        private String prefix;
		public PrefixUrlPathHelper(String prefix){this.prefix=prefix;}
        @Override
        public @NonNull String resolveAndCacheLookupPath(@NonNull HttpServletRequest request) {
            //获得原本的Path
            String path = super.resolveAndCacheLookupPath(request);
            //如果是指定前缀就返回对应的通配路径
            if (path.startsWith(prefix)) {
                return prefix + "/**";
            }
            return path;
        }
    }
}

因为它内部实际上就是用一个Map<String, WebSocketHandler>来存的,所以没有办法用通配符

1.4.2 客户端

1.4.2.1 客户端接收

和服务端一样我们需要先实现一个WebSocketHandler来处理WebSocket的连接,关闭,消息和异常

public class ServletWebSocketClientHandler implements WebSocketHandler {

    @Override
    public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
        //连接建立
    }

    @Override
    public void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage<?> message) throws Exception {
        //接收消息
    }

    @Override
    public void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) throws Exception {
        //异常处理
    }

    @Override
    public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus closeStatus) throws Exception {
        //连接关闭
    }

    @Override
    public boolean supportsPartialMessages() {
        //是否支持接收不完整的消息
        return false;
    }
}
1.4.2.2 客服端发送
WebSocketClient client = new StandardWebSocketClient();
WebSocketHandler handler = new ServletWebSocketClientHandler();
WebSocketConnectionManager manager = new WebSocketConnectionManager(client, handler, uri);
manager.start();

首先我们需要先new一个StandardWebSocketClient,可以传入一个WebSocketContainer参数,获得该对象的方式上面已经介绍过了,这边就先略过

然后new一个WebSocketConnectionManager传入WebSocketClientWebSocketHandler还有路径uri
最后调用一下WebSocketConnectionManagerstart方法就可以了

这里如果大家去看WebSocketClient的实现类就会发现有StandardWebSocketClient还有JettyWebSocketClient等等,所以大家可以根据自身项目所使用的容器来选择不同的WebSocketClient实现类

这里给大家贴一小段Spring适配不同容器WebSocket的代码

public abstract class AbstractHandshakeHandler implements HandshakeHandler, Lifecycle {

    private static final boolean tomcatWsPresent;

    private static final boolean jettyWsPresent;

    private static final boolean jetty10WsPresent;

    private static final boolean undertowWsPresent;

    private static final boolean glassfishWsPresent;

    private static final boolean weblogicWsPresent;

    private static final boolean websphereWsPresent;

    static {
        ClassLoader classLoader = AbstractHandshakeHandler.class.getClassLoader();
        tomcatWsPresent = ClassUtils.isPresent(
            "org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", classLoader);
        jetty10WsPresent = ClassUtils.isPresent(
            "org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", classLoader);
        jettyWsPresent = ClassUtils.isPresent(
            "org.eclipse.jetty.websocket.server.WebSocketServerFactory", classLoader);
        undertowWsPresent = ClassUtils.isPresent(
            "io.undertow.websockets.jsr.ServerWebSocketContainer", classLoader);
        glassfishWsPresent = ClassUtils.isPresent(
            "org.glassfish.tyrus.servlet.TyrusHttpUpgradeHandler", classLoader);
        weblogicWsPresent = ClassUtils.isPresent(
            "weblogic.websocket.tyrus.TyrusServletWriter", classLoader);
        websphereWsPresent = ClassUtils.isPresent(
            "com.ibm.websphere.wsoc.WsWsocServerContainer", classLoader);
    }
}

发消息

import org.springframework.web.socket.*;

WebSocketSession session = ...

//发送文本消息
session.sendMessage(new TextMessage(CharSequence message);

//发送二进制消息
session.sendMessage(new BinaryMessage(ByteBuffer message));

//发送ping
session.sendMessage(new PingMessage(ByteBuffer message));

//发送pong
session.sendMessage(new PongMessage(ByteBuffer message));

1.5 WebFlux

WebFluxWebSocket不需要额外的依赖包

1.5.1 服务端

1.5.1.1 服务端发送接收
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class ReactiveWebSocketServerHandler implements WebSocketHandler {

    @NonNull
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        Mono<Void> send = session.send(Flux.create(sink -> {
            //可以持有sink对象在任意时候调用next发送消息
            sink.next(WebSocketMessage message);
        })).doOnError(it -> {
            //异常处理
        });

        Mono<Void> receive = session.receive()
                .doOnNext(it -> {
                    //接收消息
                })
                .doOnError(it -> {
                    //异常处理
                })
                .then();

        @SuppressWarnings("all")
        Disposable disposable = session.closeStatus()
                .doOnError(it -> {
                    //异常处理
                })
                .subscribe(it -> {
                    //连接关闭
                });

        return Mono.zip(send, receive).then();
    }
}

首先需要注意这里的WebSocketHandlerWebSocketSessionreactive包下的:

  • 通过WebSocketSession#send方法来持有一个FluxSink<WebSocketMessage>来用于发送消息
  • 通过WebSocketSession#receive来订阅消息
  • 通过WebSocketSession#closeStatus来订阅连接关闭事件
1.5.1.2 服务端集成

注入WebSocketHandlerAdapter

@Configuration(proxyBeanMethods = false)
public class ReactiveWebSocketConfiguration {

    @Bean
    public WebSocketHandlerAdapter webSocketHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

注册一个HandlerMapping同时配置路径和对应的WebSocketHandler

@Order(Ordered.HIGHEST_PRECEDENCE)
@Component
public class ReactiveWebSocketServerHandlerMapping extends SimpleUrlHandlerMapping {
    public ReactiveWebSocketServerHandlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/websocket/**", new ReactiveWebSocketServerHandler());
        setUrlMap(map);
        setOrder(100);
    }
}

注意:我们自定义的HandlerMapping需要设置order,如果不设置,默认为Ordered.LOWEST_PRECEDENCE,会导致这个HandlerMapping被放在最后,当有客户端连接上来时会被其他的HandlerMapping优先匹配上而连接失败

1.5.2 客户端

1.5.2.1 客户端发送接收

客户端WebSocketHandler的写法和服务端的一样

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class ReactiveWebSocketClientHandler implements WebSocketHandler {

    @NonNull
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        Mono<Void> send = session.send(Flux.create(sink -> {
            //可以持有sink对象在任意时候调用next发送消息
            sink.next(WebSocketMessage message);
        })).doOnError(it -> {
            //处理异常
        });

        Mono<Void> receive = session.receive()
                .doOnNext(it -> {
                    //接收消息
                })
                .doOnError(it -> {
                    //异常处理
                })
                .then();

        @SuppressWarnings("all")
        Disposable disposable = session.closeStatus()
                .doOnError(it -> {
                    //异常处理
                })
                .subscribe(it -> {
                    //连接关闭
                });

        return Mono.zip(send, receive).then();
    }
}
1.5.2.2 客户端发送
import org.springframework.web.reactive.socket.client.WebSocketClient;

WebSocketClient client = ReactorNettyWebSocketClient();
WebSocketHandler handler = new ReactiveWebSocketClientHandler();
client.execute(uri, handler).subscribe();

首先我们需要先new一个ReactorNettyWebSocketClient
然后调用一下WebSocketClientexecute方法传入路径uriWebSocketHandler并继续调用subscribe方法就可以了

注意WebFluxWebMVC 中的 WebSocketClient一样,Reactive包中的WebSocketClient也有很多实现类,比如ReactorNettyWebSocketClientJettyWebSocketClientUndertowWebSocketClientTomcatWebSocketClient 等等,也是需要大家基于自身项目的容器使用不同的实现类

这里也给大家贴一小段Reactive适配不同容器WebSocket的代码

public class HandshakeWebSocketService implements WebSocketService, Lifecycle {

    private static final boolean tomcatPresent;

    private static final boolean jettyPresent;

    private static final boolean jetty10Present;

    private static final boolean undertowPresent;

    private static final boolean reactorNettyPresent;

    static {
        ClassLoader loader = HandshakeWebSocketService.class.getClassLoader();
        tomcatPresent = ClassUtils.isPresent("org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", loader);
        jettyPresent = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.WebSocketServerFactory", loader);
        jetty10Present = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", loader);
        undertowPresent = ClassUtils.isPresent("io.undertow.websockets.WebSocketProtocolHandshakeHandler", loader);
        reactorNettyPresent = ClassUtils.isPresent("reactor.netty.http.server.HttpServerResponse", loader);
    }
}

发消息
我们需要使用在WebSocketHandler中获得的FluxSink<WebSocketMessage>来发送消息

import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;

public class ReactiveWebSocket {

    private final WebSocketSession session;

    private final FluxSink<WebSocketMessage> sender;

    public ReactiveWebSocket(WebSocketSession session, FluxSink<WebSocketMessage> sender) {
        this.session = session;
        this.sender = sender;
    }

    public String getId() {
        return session.getId();
    }

    public URI getUri() {
        return session.getHandshakeInfo().getUri();
    }

    public void send(Object message) {
        if (message instanceof WebSocketMessage) {
            sender.next((WebSocketMessage) message);
        } else if (message instanceof String) {
            //发送文本消息
            sender.next(session.textMessage((String) message));
        } else if (message instanceof DataBuffer) {
            //发送二进制消息
            sender.next(session.binaryMessage(factory -> (DataBuffer) message));
        } else if (message instanceof ByteBuffer) {
            //发送二进制消息
            sender.next(session.binaryMessage(factory -> factory.wrap((ByteBuffer) message)));
        } else if (message instanceof byte[]) {
             //发送二进制消息
            sender.next(session.binaryMessage(factory -> factory.wrap((byte[]) message)));
        } else {
            throw new IllegalArgumentException("Message type not match");
        }
    }

    public void ping() {
        //发送ping
        sender.next(session.pingMessage(factory -> factory.wrap(ByteBuffer.allocate(0))));
    }

    public void pong() {
        //发送pong
        sender.next(session.pongMessage(factory -> factory.wrap(ByteBuffer.allocate(0))));
    }

    public void close(CloseStatus reason) {
        sender.complete();
        session.close(reason).subscribe();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1073724.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Amber中的信息传递——章节1.1-第一部分

了解 AmberTools 从何处开始&#xff0c;这主要是管理软件包中信息传递的问题&#xff0c;请参见图 1.1。首先需要了解模拟程序&#xff08;sander、pmemd、mdgx 或 nab&#xff09;需要哪些信息。 您需要知道这些信息从何而来&#xff0c;又是如何以这些程序所需的形式出现的。…

好看的水滴登录页面

css 如何绘制水滴 可以通过box-shadow 来显示阴影可以通过border-radius 改变水滴的形状当然如果像要使其更加灵活&#xff0c;可以使用animationkeyframes关键帧border-radius&#xff0c;让水滴动起来 是不是很简单 来吧展示效果 html代码&#xff0c;就只有一个div,然后使…

在宝塔面板环境下安装nps服务端

在宝塔面板环境下安装nps服务端 一、所需环境二、开始安装三、打开nps控制台四、更改默认账号密码和连接秘钥五、反向代理挂载SSL证书 一、所需环境 阿里云轻应用服务器&#xff08;选择宝塔应用镜像&#xff09;域名&#xff08;最好也是阿里注册的域名&#xff09;对应的ssl…

RT-Thread 中断管理(学习三)

中断与轮询 当驱动外设工作时&#xff0c;其编程模式到底采用中断模式触发还是轮训模式触发往往是驱动开发人员首先需要考虑的问题&#xff0c;并且这个问题在实时操作系统与分时操作系统中差异非常大。 轮询模式本身采用顺序执行的方式&#xff1a;查询到相应的事件然后进行…

探索跑腿配送App的未来:技术和创新的前沿

跑腿配送App正经历着快速的技术演进&#xff0c;为提供更智能、高效和个性化的服务而不断创新。本文将探讨其中一个可能的创新方向&#xff1a;使用机器学习和实时数据分析来改进配送路线&#xff0c;提高效率&#xff0c;并为用户提供更好的体验。 技术背景 要实现这个创新…

Flink之Watermark源码解析

1. WaterMark源码分析 在Flink官网中介绍watermark和数据是异步处理的,通过分析源码得知这个说法不够准确或者说不够详细,这个异步处理要分为两种情况: watermark源头watermark下游 这两种情况的处理方式并不相同,在watermark的源头确实是异步处理的,但是在下游只是做的判断,这…

【Monorepo实战】pnpm+turbo+vitepress构建公共组件库文档系统

Monorepo架构可以把多个独立的系统放到一起联调&#xff0c;本文记录基于pnpm > workspace功能&#xff0c;如何构建将vitepress和组件库进行联调&#xff0c;并且使用turbo进行任务顺序编排。 技术栈清单&#xff1a; pnpm 、vitepress 、turbo 一、需求分析 1、最终目标…

Maven 自动化构建

自动化构建定义了这样一种场景: 在一个项目成功构建完成后&#xff0c;其相关的依赖工程即开始构建&#xff0c;这样可以保证其依赖项目的稳定。 比如一个团队正在开发一个项目 bus-core-api&#xff0c; 并且有其他两个项目 app-web-ui 和 app-desktop-ui 依赖于这个项目。 …

面试算法22:链表中环的入口节点(1)

题目 如果一个链表中包含环&#xff0c;那么应该如何找出环的入口节点&#xff1f;从链表的头节点开始顺着next指针方向进入环的第1个节点为环的入口节点。 例如&#xff0c;在如图4.3所示的链表中&#xff0c;环的入口节点是节点3。 分析 第1步&#xff1a;确认是否包含环…

产线运作中如何实现sop无纸化的作业?

在车间产线的运作中&#xff0c;及时准确地获取作业指导书是至关重要的。然而&#xff0c;传统的纸质作业指导书往往需要花费大量时间和精力来查找和更新。而有了SOP电子作业指导书系统&#xff0c;车间工人们只需要通过电子设备登录系统&#xff0c;就可以轻松地找到所需的作业…

导引服务机器人 通用技术条件

声明 本文是学习GB-T 42831-2023 导引服务机器人 通用技术条件. 而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 6 检验规则 6.1 检验项目 检验分为型式检验和出厂检验。检验项目见表2。 表 2 检验项目 序号 检验项目 技术要求 检验方法 出厂检验 型…

【C++】哈希与布隆过滤器

&#x1f307;个人主页&#xff1a;平凡的小苏 &#x1f4da;学习格言&#xff1a;命运给你一个低的起点&#xff0c;是想看你精彩的翻盘&#xff0c;而不是让你自甘堕落&#xff0c;脚下的路虽然难走&#xff0c;但我还能走&#xff0c;比起向阳而生&#xff0c;我更想尝试逆风…

二叉树题目:二叉树的所有路径

文章目录 题目标题和出处难度题目描述要求示例数据范围 解法思路和算法代码复杂度分析 题目 标题和出处 标题&#xff1a;二叉树的所有路径 出处&#xff1a;257. 二叉树的所有路径 难度 4 级 题目描述 要求 给你一个二叉树的根结点 root \texttt{root} root&#xff…

Go 并发编程

并发编程 1.1 并发与并⾏ 并⾏与并发是两个不同的概念&#xff0c;普通解释&#xff1a; 并发&#xff1a;交替做不同事情的能⼒并⾏&#xff1a;同时做不同事情的能⼒ 如果站在程序员的⻆度去解释是这样的&#xff1a; 并发&#xff1a;不同的代码块交替执⾏并⾏&#xf…

慢 SQL 的致胜法宝

大促备战&#xff0c;最大的隐患项之一就是慢SQL&#xff0c;对于服务平稳运行带来的破坏性最大&#xff0c;也是日常工作中经常带来整个应用抖动的最大隐患&#xff0c;在日常开发中如何避免出现慢SQL&#xff0c;出现了慢SQL应该按照什么思路去解决是我们必须要知道的。本文主…

C++对象模型(5)-- 数据语义学:继承的对象布局(不含虚函数)

1、单继承的对象布局 (1) 在普通继承&#xff08;没有虚函数、没有继承虚基类&#xff09;的情况下&#xff0c;按父对象、子对象的顺序布局 我们来看下面的例子&#xff1a; class Base { protected:int x;int y; };class Derive : public Base { private:int z; };int mai…

vue2项目中使用element ui组件库的table,制作表格,改表格的背景颜色为透明的

el-table背景颜色变成透明_el-table背景透明_讲礼貌的博客-CSDN博客 之前是白色的&#xff0c;现在变透明了&#xff0c;背景颜色是蓝色

短视频矩阵系统源码--源头技术独立自研框架开发

目录 一、批量剪辑&#xff08;采用php语言&#xff0c;数学建模&#xff09; 短视频合成批量剪辑的算法主要有以下几种&#xff1a; 1. 帧间插值算法&#xff1a;通过对多个视频的帧进行插帧处理&#xff0c;从而合成一段平滑的短视频。 2. 特征提取算法&#xff1a;提取多…

RedissonClient中Stream流的简单使用

1、pub端 //获取一个流 RStream rStream redissonClient.getStream("testStream"); //创建一个map&#xff0c;添加数据 Map<String, Object> rr new HashMap<>(); rr.put("xx", RandomUtil.randomString(5)); //添加到流 rStream.addAll(r…

TypeScript 笔记:String 字符串

1 对象属性 length 返回字符串的长度 2 对象方法 charAt() 返回在指定位置的字符 charCodeAt() 返回在指定的位置的字符的 Unicode 编码 concat 连接两个或更多的字符串 indexOf 返回某个指定的字符串值在字符串中首次出现的位置 lastIndexOf 从后向前搜索字符串&…