SpringBoot+Netty+Websocket实现消息推送

news2024/9/17 7:12:13

这样一个需求:把设备异常的状态每10秒推送到页面并且以弹窗弹出来,这个时候用Websocket最为合适,今天主要是后端代码展示。

添加依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.36.Final</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

定义netty端口号

websocket:
  netty:
    port: 8888
  path: /websocket

netty服务器

@Slf4j
@Component
public class NettyServer {
    /**
     * netty服务端口号
     */
    @Value("${websocket.netty.port}")
    private int port;
    /**
     * netty事件辅助组
     */
    private EventLoopGroup bossGroup;
    /**
     * netty事件工作组
     */
    private EventLoopGroup workGroup;

    /**
     * 管道配置
     */
    private final CustomChannelInitializer channelInitializer;

    public NettyServer(CustomChannelInitializer channelInitializer) {
        this.channelInitializer = channelInitializer;
    }


    /**
     * netty服务初始化
     */
    @PostConstruct
    public void start() {
        new Thread(() -> {

            bossGroup = new NioEventLoopGroup();

            workGroup = new NioEventLoopGroup();

            ServerBootstrap bootstrap = new ServerBootstrap();
            //bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
            bootstrap.group(bossGroup, workGroup);
            //设置NIO类型的channel
            bootstrap.channel(NioServerSocketChannel.class);
            //设置监听端口
            bootstrap.localAddress(new InetSocketAddress(port));
            //设置管道
            bootstrap.childHandler(channelInitializer);
            try {
                ChannelFuture channelFuture = bootstrap.bind().sync();
                log.info("Netty服务启动成功,开启监听:{}", channelFuture.channel().localAddress());
                //对关闭通道进行监听
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                log.error("Netty服务启动失败!", e);
                throw new RuntimeException(e);
            }

        }).start();
    }

}

Netty配置

管理全局Channel以及用户对应的channel(推送消息)

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.concurrent.ConcurrentHashMap;

/**
 * @version 1.0.0
 * @description 业务类
 */
public class NettyConfig {
    /**
     * 定义全局单利channel组 管理所有channel
     */
    private static volatile ChannelGroup channelGroup = null;

    /**
     * 存放请求ID与channel的对应关系
     */
    private static volatile ConcurrentHashMap<String, Channel> channelMap = null;

    /**
     * 定义两把锁
     */
    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();


    public static ChannelGroup getChannelGroup() {
        if (null == channelGroup) {
            synchronized (lock1) {
                if (null == channelGroup) {
                    channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
            }
        }
        return channelGroup;
    }

    public static ConcurrentHashMap<String, Channel> getChannelMap() {
        if (null == channelMap) {
            synchronized (lock2) {
                if (null == channelMap) {
                    channelMap = new ConcurrentHashMap<>();
                }
            }
        }
        return channelMap;
    }

    public static Channel getChannel(String userId) {
        if (null == channelMap) {
            return getChannelMap().get(userId);
        }
        return channelMap.get(userId);
    }
}

管道配置

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


/**
 * @version 1.0.0
 * @description Netty管道配置类
 */
@Component
public class CustomChannelInitializer extends ChannelInitializer<SocketChannel> {
    /**
     * webSocket协议名
     */
    private static final String WEBSOCKET_PROTOCOL = "WebSocket";
    /**
     * websocket服务地址
     */
    @Value("${websocket.path:/websocket}")
    private String websocketPath;
    private final CustomChannelHandler channelHandler;

    public CustomChannelInitializer(CustomChannelHandler channelHandler) {
        this.channelHandler = channelHandler;
    }

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 设置管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 流水线管理通道中的处理程序(Handler),用来处理业务
        // webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ObjectEncoder());
        // 以块的方式来写的处理器
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpObjectAggregator(8192));
        pipeline.addLast(new WebSocketServerProtocolHandler(websocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
        // 自定义的handler,处理业务逻辑
        pipeline.addLast(channelHandler);
    }
}

自定义CustomChannelHandler

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.ruoyi.common.utils.StringUtils;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @version 1.0.0
 * @description Netty管道handler类
 */
@Slf4j
@Component
@ChannelHandler.Sharable
public class CustomChannelHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    /**
     * 一旦连接,第一个被执行
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("有新的客户端链接:[{}]", ctx.channel().id().asLongText());
        // 添加到channelGroup 通道组
        NettyConfig.getChannelGroup().add(ctx.channel());
    }

    /**
     * 读取数据
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info("服务器收到消息:{}", msg.text());

        // 获取用户ID,关联channel
        JSONObject jsonObject = JSONUtil.parseObj(msg.text());
        String uid = jsonObject.getStr("uid");
        if(StringUtils.isNotEmpty(uid)){
            NettyConfig.getChannelMap().put(uid, ctx.channel());
            // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
            AttributeKey<String> key = AttributeKey.valueOf("userId");
            ctx.channel().attr(key).setIfAbsent(uid);
            // 回复消息
            ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息啦"));
        }
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("用户下线了:{}", ctx.channel().id().asLongText());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("异常:{}", cause.getMessage());
        super.exceptionCaught(ctx,cause);
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }

    /**
     * 删除用户与channel的对应关系
     */
    private void removeUserId(ChannelHandlerContext ctx) {
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(key).get();
        if(StringUtils.isNotEmpty(userId)){
            NettyConfig.getChannelMap().remove(userId);
        }
    }
}

推送消息接口及实现类

public interface PushMsgService {
    /**
     * 推送给指定用户
     */
    void pushMsgToOne(String group, String msg);

    /**
     * 推送给所有用户
     */
    void pushMsgToAll(String msg);
}

实现接口

@Service
public class PushMsgServiceImpl implements PushMsgService {

    @Override
    public void pushMsgToOne(String group, String msg) {
        Channel channel = NettyConfig.getChannel(group);
        if (Objects.isNull(channel)) {
            throw new RuntimeException("未连接socket服务器");
        }

        channel.writeAndFlush(new TextWebSocketFrame(msg));
    }
    @Override
    public void pushMsgToAll(String msg) {
        NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
    }
}

具体的controller层接口

   /**
     * 获取弹框网关状态
     */
    @GetMapping("/upKnxNetworkLink/{uid}")
    public void upKnxNetworkLink(@PathVariable String uid){
        KnxNetworkLinkInfo knxNetworkLinkInfo =new KnxNetworkLinkInfo();
        knxNetworkLinkInfo.setStatus("0");
       List<KnxNetworkLinkInfo>knxNetworkLinkInfoList=knxNetworkLinkInfoService.queryList(knxNetworkLinkInfo);
        JSONArray array= JSONArray.parseArray(JSON.toJSONString(knxNetworkLinkInfoList));
        pushMsgService.pushMsgToOne(uid,array.toJSONString());
    }

使用postman测试Websocket推送
在这里插入图片描述
连接Websocket
在这里插入图片描述
在开一个窗口测试发送消息的接口
在这里插入图片描述
发送过后在回到连接Websocket窗口
在这里插入图片描述
前端需要做一个定时访问发送消息的接口,每发一次就会往前端推送一次数据。
参考:Springboot + netty +websocket 实现推送消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1305072.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springboot3.2.x支持虚拟线程

背景&#xff1a; 大家都知道jdk21已经发布一段时间了&#xff0c;springboot3.2开始正式支持虚拟线程了&#xff1b; 支持虚拟线程&#xff1a; 1、spring.threads.virtual.enabledtrue 开启虚拟线程 2、Servlet Web 服务器 当启用虚拟线程时&#xff0c;Tomcat和Jetty将使…

KubeSphere应用【二】Docker安装

一、Docker安装 1.下载Docker安装包 【地址】Index of linux/static/stable/x86_64/ 2.上传至服务器 # 解压文件 tar -xvf docker-20.10.10.tgz# 将docker 目录中的所有文件复制至/usr/bin/目录下 cp docker/* /usr/bin 3.配置docker.service文件 vim /usr/lib/systemd/sy…

Make pixels dance:high-dynamic video generation

1.Introduction 大多数视频生成主要关注文本到视频的生成&#xff0c;PixelDance在文本指令的基础上&#xff0c;将图像指令分别用于视频剪辑的第一帧和最后一帧&#xff0c;第一帧图像指令描绘了视频剪辑的主要场景&#xff0c;最后一帧图像是可选的&#xff0c;描述了剪辑的…

2024年AI云计算专题研究报告:智算带来的变化

今天分享的人工智能系列深度研究报告&#xff1a;《2024年AI云计算专题研究报告&#xff1a;智算带来的变化》。 &#xff08;报告出品方&#xff1a;华泰证券&#xff09; 报告共计&#xff1a;32页 Al 云计算 2024:关注智算带来的新变化 通过对海内外主要云厂商及其产业链…

HarmonyOS 的应用开发语言:ArkTS

本心、输入输出、结果 文章目录 HarmonyOS 的应用开发语言&#xff1a;ArkTS前言ArkTS 产生背景ArkTS 语言特点ArkTS 基本语法ArkTS 声明式 UIArkTS 状态管理ArkTS 渲染控制 ArkTS 轻量化并发机制ArkTS 相关文档花有重开日&#xff0c;人无再少年实践是检验真理的唯一标准 Harm…

mysql语句大全及用法

常用的MySQL语句和简要用法&#xff0c;以帮助你开始学习和使用MySQL。 连接数据库 mysql -u username -p在命令行中使用以上命令来连接到MySQL数据库服务器。username 是你的MySQL用户名&#xff0c;执行后会提示输入密码。 显示数据库 SHOW DATABASES;列出数据库服务器上…

Nginx首页修改及使用Nginx实现端口转发

按照我之前博客给的方法搭建好这样一个CTF靶场 但是呢它默认是在8000端口 如何直接访问IP地址或者域名就可以实现直接访问到靶场呢 我们需要将80端口的内容转发到8000&#xff0c;使用nginx实现端口转发功能 首先我们安装nginx&#xff1a; 安装工具和库 yum -y install gc…

数据结构二维数组计算题,以行为主?以列为主?

1.假设以行序为主序存储二维数组Aarray[1..100,1..100]&#xff0c;设每个数据元素占2个存储单元&#xff0c;基地址为10&#xff0c;则LOC[5,5]&#xff08; &#xff09;。 A&#xff0e;808 B&#xff0e;818 C&#xff0e;1010 D&…

【数字信号处理】DFT

DFT 2023年11月18日 #elecEngeneer 文章目录 DFT1. 离散傅里叶变换-DFT2. 离散傅里叶反变换-IDFT3. DFT的误差下链 1. 离散傅里叶变换-DFT 离散傅里叶变换&#xff08;Discrete Fourier Transform&#xff0c;DFT&#xff09;&#xff0c;是当有 N {N} N 个信号采样点&#…

1,使用IDLE开启我们第一个Python程序

前面我们已经安装好了Python&#xff0c;安装了Python后&#xff0c;他会自动帮我们安装一个IDLE。IDLE是一个Python自带的非常简洁的集成开发环境&#xff08;IDE&#xff09;。他是一个Python Shell&#xff0c;我们可以利用Python Shell与Python交互。下面我们就利用IDLE开发…

10基于matlab的悬臂梁四节点/八节点四边形单元有限元编程(平面单元)

悬臂梁&#xff0c;有限元编程。基于matlab的悬臂梁四节点/八节点四边形单元有限元编程&#xff08;平面单元&#xff09;&#xff0c;程序有详细注解&#xff0c;可根据需要更改参数&#xff0c;包括长度、截面宽度和高度、密度、泊松比、均布力、集中力、单元数量等。需要就拍…

水の数列

这题目没有修改&#xff0c;所以可以考虑预处理 显然\(x\)从大到小或者从小到大&#xff0c;被选中的数字是单调的(尽管区间变化个数没有单调性) 所以我们可以考虑枚举\(x\) 我最开始想的是从大到小枚举\(x\)&#xff0c;但是维护有一点复杂&#xff0c;因为是删除 这个时候就要…

12.12 作业

1&#xff0c; 源代码&#xff1a; #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);speerornew QTextToSpeech(this);idstartTimer(1000);//每隔一秒&#xf…

Linux Ubuntu 手动搭建webDav

1、安装 因为需要跟 zotero 进行交互&#xff0c;因此需要在服务器搭建一个webDav 以下是搭建步骤&#xff1a; sudo apt-get update sudo apt-get install apache2 Ubuntu 安装apache2来实现 不同于Centos 安装好了之后&#xff0c;运行 a2enmod dav_fs a2enmod dav 激…

040.Python面向对象_设计原则

我 的 个 人 主 页&#xff1a;&#x1f449;&#x1f449; 失心疯的个人主页 &#x1f448;&#x1f448; 入 门 教 程 推 荐 &#xff1a;&#x1f449;&#x1f449; Python零基础入门教程合集 &#x1f448;&#x1f448; 虚 拟 环 境 搭 建 &#xff1a;&#x1f449;&…

Mysql的所有数据类型和它们的区别

一、数值类型 1. 普通整数数值类型 以下数据类型只能用以保存整数 整数数值类型类型存储大小&#xff08;字节&#xff09;有符号的取值范围&#xff08;允许存在负数&#xff09;无符号的取值范围TINYINT1-128 ~ 1270 ~ 255SMALLINT2- 327678 ~ 327670 ~ 65535MEDIUMINT3- 8…

性能优化 vue2/vue3 通过CDN 减少项目启动时间

其实更多可以通过压缩图片等文件大小 也会让项目运行快一些 以及尽量使用异步或者懒加载 使用CDN可以避免在项目中使用npm导入Vue的依赖项&#xff0c;从而减少项目启动时的加载时间 使用方法如下 <!-- Vue 2 --> <script src"https://cdn.jsdelivr.net/npm/vue…

Linux上使用一分钟搞定Kafka的安装

文章目录 一、前言二、安装三、验证是否安装成功 一、前言 一般我们要安装Kafka&#xff0c;还需要先安装JDK和Zookeeper&#xff0c;并进行相关配置。因为Kafka和Zookeeper都是运行在JVM之上的服务&#xff0c;所以需要先安装JDK。另外&#xff0c;Kafka依赖Zookeeper管理集群…

Python常用文件操作库详解与示例

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 文件操作是编程中常见的任务之一&#xff0c;而Python提供了丰富的文件操作库&#xff0c;使得文件的读取、写入、复制、移动等操作变得非常便捷。本文将深入介绍一些Python中常用的文件操作库&#xff0c;以及它…

CSS import 规则

导入 “navigation.css” 样式到当前的样式表&#xff1a; import “navigation.css”; /* 使用字符串 / 或者 import url(“navigation.css”); / 使用 url 地址 */ 属性定义及使用说明 CSS import 用于从其他样式表导入样式规则。 import 规则必须在 CSS 文档的头部&#xff…