55、基于 WebFlux 开发 WebSocKet

news2025/1/15 16:37:03

★ 基于Web Flux开发WebSocket

两步:
(1)实现WebSocketHandler开发WebSocket处理类。

实现该接口时只需要实现Mono handle(WebSocketSession webSocketSession)方法即可。

(2)使用HandlerMapping和WebSocketHandlerAdapter注册WebSocket处理类。

★ 反应式的WebSocket处理类

反应式API模型下,WebSocketSession的receive()方法返回的只是Flux(消息发布者),
它并不会同步获取消息,也不会阻塞。

类似的,WebSocketSession的send()方法发送的也只是Flux(消息发布者)

因此WebSocket处理类receive()消息之后,程序依然使用map()等方法对Flux中的数据项进行处理。

★ 配置基于WebFlux的WebSocket

要配置两个Bean:

  1. HandlerMapping(通常使用SimpleUrlHandlerMapping实现类即可)Bean,它定义URL与WebSocketHandler Bean的映射关系。

  2. WebSocketHandlerAdapter:它负责管理对WebSocketHandler Bean进行适配。
    它会自动对容器中所有的WebSocketHandler Bean进行适配,
    因此,意味着无论容器中有多少个WebSocketHandler ,该WebSocketHandlerAdapter只要配置一个即可。

可直接使用www.websocket.org/echo.html页面来测试WebSocket

代码演示

1、创建项目
在这里插入图片描述

MyWebSocketHandler

实现 WebSocKet 处理类

package cn.ljh.webflux_websocket.websockethandler;


import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

//实现 WebSocKet 处理类

@Component
public class MyWebSocketHandler implements WebSocketHandler
{
    //实现这个接口,只需要实现一个方法,这个方法可通过 WebSocketSession 获取 Flux
    //这个方法并不需要处理具体的数据,它面向的是Flux编程
    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession)
    {
        //接收消息时,得到的并不是具体的消息,而是 Flux , 这个 Flux 负责消息通信,就是个消息通道
        Flux<WebSocketMessage> sourceFlux = webSocketSession.receive();
        //map() 方法里的参数,就是 function的apply()方法,通常写成 Lambda 表达式
        Flux<WebSocketMessage> resultFlux = sourceFlux.map(message ->
        {
            //textMessage() 方法负责将 String 转换成 WebSocketMessage
            //这个 message 是 WebSocketMessage 类型,WebSocketMessage.getPayloadAsText()负责将消息数据转成String
            WebSocketMessage webSocketMessage = webSocketSession.textMessage("回复:" + message.getPayloadAsText());
            return webSocketMessage;
        });

        //发送消息
        Mono<Void> sendMessage = webSocketSession.send(resultFlux);
        return sendMessage;
    }
}

WebSocketConfig

配置基于WebFlux的WebSocket

package cn.ljh.webflux_websocket.config;


import cn.ljh.webflux_websocket.websockethandler.MyWebSocketHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;

import java.util.HashMap;
import java.util.Map;

//配置基于WebFlux的WebSocket

@Configuration
public class WebSocketConfig
{
    //这个bean负责对容器中所有的 WebSocketHandler Bean 进行适配
    @Bean
    public WebSocketHandlerAdapter webSocketHandlerAdapter()
    {
        return new WebSocketHandlerAdapter();
    }

    //MyWebSocketHandler 会接受容器中的依赖注入
    @Bean
    public HandlerMapping handlerMapping(MyWebSocketHandler myWebSocketHandler)
    {
        //定义 URL 与 WebSocketHandler Bean 之间的映射关系
        //就是向这个 /myWebSocket 地址发送请求的时候,就将这个请求交给这个 myWebSocketHandler 处理类进行处理
        Map map = Map.of("/myWebSocket",myWebSocketHandler);
        //参数1:指定 URL 和 Handler 之间的映射关系  ,  参数2:就是优先级
        SimpleUrlHandlerMapping simpleUrlHandlerMapping = new SimpleUrlHandlerMapping(map,-1);
        return simpleUrlHandlerMapping;
    }

}

前端没有写,www.websocket.org/echo.html页面已经没法测试WebSocket了。
现在简单的websocket 就完成了

在这里插入图片描述

通过 webFlux 弄一个webSocket 的聊天室。

完整代码:

client.html

这个客户端页面,先放在static 静态路径下面,就可以直接访问

<!DOCTYPE html>
<html lang="en">
<head>
	<meta charset="UTF-8">
	<title> 基于WebSocket的多人聊天 </title>
	<script type="text/javascript">
		// 定义Web Socket对象
		var webSocket = null;
		let sendMsg = function()
		{
			if (webSocket == null || webSocket.readyState != 1)
			{
				document.getElementById('show').innerHTML
					+= "还未连接服务器,请先连接WebSocket服务器<br>";
				return;
			}
			let inputElement = document.getElementById('msg');
			// 发送消息
			webSocket.send(inputElement.value);
			// 清空单行文本框
			inputElement.value = "";
		}
		let connect = function()
		{
			let name = document.getElementById('name').value.trim();
			if (name == null || name == "")
			{
				document.getElementById('show').innerHTML
					+= "用户名不能为空<br>";
				return;
			}
			if (webSocket && webSocket.readyState == 1)
			{
				webSocket.close();
			}
			webSocket = new WebSocket("ws://127.0.0.1:8080/myWebSocket/" + name);
			webSocket.onopen = function()
			{
				document.getElementById('show').innerHTML
					+= "恭喜您,连接服务器成功!<br>";
				document.getElementById('name').value = "";
				// 为onmessage事件绑定监听器,接收消息
				webSocket.onmessage= function(event)
				{
					// 接收、并显示消息
					document.getElementById('show').innerHTML
						+= event.data + "<br>";
				}
			};
		}
	</script>
</head>
<body>
<input type="text" size="20" id="name" name="name"/>
<input type="button" value="连接" onclick="connect();"/>
<div style="width:600px;height:240px;
	overflow-y:auto;border:1px solid #333;" id="show"></div>
<input type="text" size="80" id="msg" name="msg"/>
<input type="button" value="发送" onclick="sendMsg();"/>
</body>
</html>

MyWebSocketHandler

实现 WebSocKet 处理类

package cn.ljh.webflux_websocket.websockethandler;


import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

//实现 WebSocKet 处理类

@Component
public class MyWebSocketHandler implements WebSocketHandler
{
    //创建一个线程安全的map来存聊天信息,  FluxSink 代表了发送消息的通道
    public static final Map<WebSocketSession, FluxSink<WebSocketMessage>> myClients = new ConcurrentHashMap<>();


    //实现这个接口,只需要实现一个方法,这个方法可通过 WebSocketSession 获取 Flux
    //这个方法并不需要处理具体的数据,它面向的是Flux编程
    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession)
    {

        //1、获取连接路径:   得到 WenSocket 的连接路径
        String path = webSocketSession.getHandshakeInfo().getUri().getPath();
        //2、获取用户名:     获取path路径字符串最后一个斜杠之后的内容,也就是获取聊天的用户名
        String name = path.substring(path.lastIndexOf("/") + 1);


        //3、接收消息:     接收消息时,得到的并不是具体的消息,而是 Flux , 这个 Flux 负责消息通信,就是个消息通道
        Flux<WebSocketMessage> sourceFlux = webSocketSession.receive();
        //map() 方法里的参数,就是 function的apply()方法,通常写成 Lambda 表达式
        Mono<Void> mono1 = sourceFlux.map(message ->
        {
            //这个 message 是 WebSocketMessage 类型,WebSocketMessage.getPayloadAsText()负责将消息数据转成String
            //获取用户发送的消息
            String payloadAsText = message.getPayloadAsText();
            //返回 用户名+消息
            String nameAndMessage = name + " : " + payloadAsText;
            return nameAndMessage;
        })
                //4、实现消息广播:   把消息发给每一个用户
                //此时的 message 已经是转换之后的 message 了,这时候是 String 类型
                .doOnNext(message ->
                {
                    //此处做消息广播,      keySet()用于遍历map中的所有key,存在一个set集合中
                    for (WebSocketSession session : myClients.keySet())
                    {
                        //通过session这个key , 获取消息通道FluxSink
                        FluxSink<WebSocketMessage> fluxSink = myClients.get(session);
                        //调用 fluxSink 的 next() 方法向 Flux 发送消息
                        //textMessage() 方法负责将 String 转换成 WebSocketMessage,把string类型的消息转回 WebSocketMessage
                        fluxSink.next(session.textMessage(message));
                    }
                    //.then() 方法 讲解的时候说是合并上面的消息操作,百度说是异步执行
                }).then();


        //创建要发送消息的 outFlux
        Flux<WebSocketMessage> outFlux = Flux.create(fluxSink ->
        {
            //Flux 真正发布消息用的是Flux 底层的 fluxSink
            myClients.put(webSocketSession, fluxSink);
        });

        //发送消息
        Mono<Void> mono2 = webSocketSession.send(outFlux);
        //把两个mono 的消息汇总起来 再返回
        Mono<Void> allMono = Mono.zip(mono1, mono2).then();
        return allMono;
    }
}

WebSocketConfig

配置基于WebFlux的WebSocket

package cn.ljh.webflux_websocket.config;


import cn.ljh.webflux_websocket.websockethandler.MyWebSocketHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;

import java.util.HashMap;
import java.util.Map;

//配置基于WebFlux的WebSocket

@Configuration
public class WebSocketConfig
{
    //这个bean负责对容器中所有的 WebSocketHandler Bean 进行适配
    @Bean
    public WebSocketHandlerAdapter webSocketHandlerAdapter()
    {
        return new WebSocketHandlerAdapter();
    }

    //MyWebSocketHandler 会接受容器中的依赖注入
    @Bean
    public HandlerMapping handlerMapping(MyWebSocketHandler myWebSocketHandler)
    {
        //定义 URL 与 WebSocketHandler Bean 之间的映射关系
        //就是向这个 /myWebSocket 地址发送请求的时候,就将这个请求交给这个 myWebSocketHandler 处理类进行处理
        Map map = Map.of("/myWebSocket/{name}",myWebSocketHandler);
        //参数1:指定 URL 和 Handler 之间的映射关系  ,  参数2:就是优先级
        SimpleUrlHandlerMapping simpleUrlHandlerMapping = new SimpleUrlHandlerMapping(map,-1);
        return simpleUrlHandlerMapping;
    }

}

测试结果

成功

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/998544.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TypeScript:赋予JavaScript数据类型新的力量,提升编程效率!

&#x1f3ac; 岸边的风&#xff1a;个人主页 &#x1f525; 个人专栏 :《 VUE 》 《 javaScript 》 ⛺️ 生活的理想&#xff0c;就是为了理想的生活 ! ​ &#x1f4da; 前言 TypeScript&#xff1a;扩展JavaScript数据类型&#xff0c;赋予编程更强大的表达能力&#xff01…

bs4库爬取小说工具

学习了爬取天气预报&#xff0c;今天尝试做个爬取小说工具&#xff0c;有时候网上看看小说休闲下&#xff0c;打算保存txt文本文件&#xff0c;方便离线阅读。 第一步&#xff1a;先确定目标网址 网上随便找了本小说&#xff0c;先找到小说目录页面。 网址首页&#xff1a;h…

c语言练习48:总结字符函数和字符串函数

总结字符函数和字符串函数 字符函数 1. 字符分类函数 C语⾔中有⼀系列的函数是专⻔做字符分类的&#xff0c;也就是⼀个字符是属于什么类型的字符的。 这些函数的使⽤都需要包含⼀个头⽂件是 ctype.h 2. 字符转换函数 字符串函数 . strlen的使⽤ •字符串以 \0 作为结束标…

Linux操作系统基础知识

目录 一、什么是Linux 二、如何有一个Linux环境 三、基本的 Linux 命令 1. pwd - 显示当前工作目录 2. ls - 列出文件和目录 3. cd - 切换目录 4. mkdir - 创建目录 5. rm - 删除文件或目录 6. cp - 复制文件或目录 7. mv - 移动文件或目录 8. touch - 创建空文件 9…

智能座舱概述

文章目录 智能座舱智能驾驶一、汽车座舱历经机械化、电子化&#xff0c;向智能化不断演进二、智能座舱的定义&#xff1a;车内升级车外互联1.从车内看2.从车外看 三、电子座舱、智能助理、人机共驾、第三生活空间 智能座舱智能驾驶 智能汽车以“座舱”“底盘”上下两大智能化系…

短视频去水印

一、使用方法 打开短视频APP&#xff0c; 选择要下载的视频&#xff0c;点击右下角分享按钮&#xff0c;在分享弹框中点击“复制链接” 将刚才复制的链接粘贴到下面的输入框&#xff08;中文可以不用去掉&#xff09; 二、短视频解析王源码 public function analysis($video…

音视频技术开发周刊 | 310

每周一期&#xff0c;纵览音视频技术领域的干货。 新闻投稿&#xff1a;contributelivevideostack.com。 学术头条 | 基于网络科学的人工智能揭示基因信息如何利用单细胞塑造形体 近日&#xff0c;由清华大学脑与智能实验室复杂网络智能中心&#xff08;CCNI&#xff09;主任Ca…

监听对象中属性变化(一个或多个属性、全部属性)

一、数据监听器 什么是数据监听器 数据监听器用于监听和响应任何属性和数据自动的变化&#xff0c;从而执行特定的操作。它的作用类似于vue中的watch侦听器。在小程序中&#xff0c;基本语法格式如下&#xff1a; Component({observers: {字段A&#xff0c;字段B: function(字…

计算机丢失msvcp140.dll是什么意思?msvcp140.dll丢失的解决方法

在使用计算机的过程中&#xff0c;我们可能会遇到各种奇葩的问题。其中&#xff0c;一个常见的问题是计算机提示丢失msvcp140.dll。这个文件是Microsoft Visual C 2015 Redistributable的一部分&#xff0c;通常用于支持一些软件&#xff08;如游戏、办公软件等&#xff09;的运…

遗忘因子递推最小二乘参数估计(FFRLS)

基于遗忘因子的最小二乘法电池参数辨识 最小二乘法是系统辨识中最常用的一种估算方法。为了克服最小二乘法存在”数据饱和”的问题&#xff0c;我们通常采用含有遗忘因子的递推最小二乘法(Forgetting Factor Recursive Least Square,FFRLS)算法进行电池模型的参数辨识。 1、二…

图片码二次渲染绕过

目录 一、环境 1、代码 2、文件处理方式 3、图片码的制作 二、绕过图片重构 1、可行性分析 2、数据比对 3、完成绕过 一、环境 以upload-labs靶场第十七关为例 1、代码 源码为&#xff1a; <?php include ../config.php; include ../head.php; include ../menu.…

管理类联考——数学——汇总篇——知识点突破——应用题——分段计费

👊 分段计费是指不同的范围对应着不同的计费方式,在实际中应用很广泛,比如电费,水费、邮费、个税、话费、出租车费、销售提成等等。解题思路的关键点有两个,一个是先计算每个分界点的值,确定所给的数值落入哪个范围;另外,对应选取正确的计费表达式,按照所给的标准进…

腾讯云CVM S5服务器性能测评和租用价格1年和五年

腾讯云服务器CVM五年时长&#xff0c;2核2G服务器5年1728元、2核4G1M带宽五年3550、4核8G服务器6437元五年&#xff0c;CVM标准型S5实例可选2核2G、2核4G和4核8M&#xff0c;公网带宽可1M、3M和5M&#xff0c;系统盘为50G高性能云硬盘&#xff0c;S5云服务器CPU采用Intel Xeon …

模电课设:用Multisim简单了解二极管

1 课设内容 1&#xff09;测试二极管伏安特性电路&#xff1b; 2&#xff09;二极管的整流电路及负载对输出电压和纹波的影响&#xff1b; 2 模型搭建 电路一&#xff1a;测试二极管伏安特性的电路如下图所示&#xff0c;结构十分简单&#xff0c;直流电源串联上二极管组成一…

windows10搭建llama大模型

背景 随着人工时代的到来及日渐成熟&#xff0c;大模型已慢慢普及&#xff0c;可以为开发与生活提供一定的帮助及提升工作及生产效率。所以在新的时代对于开发者来说需要主动拥抱变化&#xff0c;主动成长。 LLAMA介绍 llama全称&#xff1a;Large Language Model Meta…

c#中字段和属性的区别,委托和事件的区别

IDE眼里的字段和属性 class Test {public int age1 12;public int Age2 { get; set; } 18;public void Show(){Console.WriteLine(age1);Console.WriteLine(Age2);} }很多新人发现在类中定义变量时&#xff0c;有些人会在后面写上get,set。 这种写法定义出来的变量&#xf…

数据结构与算法-二叉搜索树红黑树

一&#xff1a;二叉搜索树 大家来看以下几个结构&#xff1a;下图中的 二叉搜索树又叫二叉查找树&#xff0c;二叉排序树&#xff1b; 它具有以下特点&#xff1a; 1.如果它的左子树不为空&#xff0c;则左子树上结点的值都小于根结点。 2.如果它的右子树不为空&#xff0c;则右…

动手学深度学习——Windows下的环境安装流程(一步一步安装,图文并配)

目录 环境安装官网步骤图文版安装Miniconda下载包含本书全部代码的压缩包使用conda创建虚拟&#xff08;运行&#xff09;环境使用conda创建虚拟环境并安装本书需要的软件激活之前创建的环境打开Jupyter记事本 环境安装 文章参考来源&#xff1a;http://t.csdn.cn/tu8V8 官网…

编程初学者指南(2023版):零基础小白如何学习编程-两万字详述

文章目录 1.写在前面1.1 为什么有这份指南1.2 指南里有什么1.3 关于软件协会1.4 面对人生&#x1f340; 对工作&#xff1a;越努力越幸运&#x1f340; 对感情&#xff1a;爱得厚重开阔&#x1f340; 对他人&#xff1a;保持尊重、友好、真诚和谦逊&#x1f340; 对生活&#x…

【论文解读】元学习:MAML

一、简介 元学习的目标是在各种学习任务上训练模型&#xff0c;这样它就可以只使用少量的训练样本来解决新任务。 论文所提出的算法训练获取较优模型的参数&#xff0c;使其易于微调&#xff0c;从而实现快速自适应。该算法与任何用梯度下降训练的模型兼容&#xff0c;适用于…