SpringBoot集成netty实现websocket通信

news2024/11/6 3:17:04

实现推送消息给指定的用户

一、依赖


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>netty</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.87.Final</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.6.1</version>
        </dependency>
    </dependencies>
</project>

二、属性文件和启动类

server:
  port: 8088

三、Controller接口

@RestController
@RequestMapping("/push")
public class TestController{
	
	@Autowired
	PushMsgService pushMsgService
}

四、PushMsgService

public interface PushMsgService{
	
	//推送给指定用户
	void pushMsgToOne(String userId,String msg);

	//推送给所有用户
	void pushMsgToAll(String msg);
}
@Service
public class PushMsgServiceImpl implements PushMsgService{
	
	@Override
	public void pushMsgToOne(String userId, String msg){
		Channel channel = NettyConfig.getChannel(userId);
        if (Objects.isNull(channel)) {
            throw new RuntimeException("未连接socket服务器");
        }

        channel.writeAndFlush(new TextWebSocketFrame(msg));
	}

	@Override
	public void pushMsgToAll(String msg){
		NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
	}
}

五、NettyConfig

public class NettyConfig{
	
	//定义全局channel,管理所有的channel
	private static volatile ChannelGroup channelGroup = null;

	//存放请求ID与channel的对应关系
	private static volatile ConcurrentHashMap<String, Channel> channelMap = null;

	//定义两把锁
	private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();

	public static ChannelGroup getChannelGroup() {
        if (null == channelGroup) {
            synchronized (lock1) {
                if (null == channelGroup) {
                    channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
            }
        }
        return channelGroup;
    }

    public static ConcurrentHashMap<String, Channel> getChannelMap() {
        if (null == channelMap) {
            synchronized (lock2) {
                if (null == channelMap) {
                    channelMap = new ConcurrentHashMap<>();
                }
            }
        }
        return channelMap;
    }

    public static Channel getChannel(String userId) {
        if (null == channelMap) {
            return getChannelMap().get(userId);
        }
        return channelMap.get(userId);
    }
}

六、netty server

@Component
public class NettyServer {
    static final Logger log = LoggerFactory.getLogger(NettyServer.class);

    /**
     * 端口号
     */
    @Value("${webSocket.netty.port:8889}")
    int port;

    EventLoopGroup bossGroup;
    EventLoopGroup workGroup;

    @Autowired
    ProjectInitializer nettyInitializer;

    @PostConstruct
    public void start() throws InterruptedException {
        new Thread(() -> {
            bossGroup = new NioEventLoopGroup();
            workGroup = new NioEventLoopGroup();
            ServerBootstrap bootstrap = new ServerBootstrap();
            // bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
            bootstrap.group(bossGroup, workGroup);
            // 设置NIO类型的channel
            bootstrap.channel(NioServerSocketChannel.class);
            // 设置监听端口
            bootstrap.localAddress(new InetSocketAddress(port));
            // 设置管道
            bootstrap.childHandler(nettyInitializer);

            // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
            ChannelFuture channelFuture = null;
            try {
                channelFuture = bootstrap.bind().sync();
                log.info("Server started and listen on:{}", channelFuture.channel().localAddress());
                // 对关闭通道进行监听
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

    /**
     * 释放资源
     */
    @PreDestroy
    public void destroy() throws InterruptedException {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().sync();
        }
        if (workGroup != null) {
            workGroup.shutdownGracefully().sync();
        }
    }
}

七、ProjectInitializer初始化,设置websocket handler

@Component
public class ProjectInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * webSocket协议名
     */
    static final String WEBSOCKET_PROTOCOL = "WebSocket";

    /**
     * webSocket路径
     */
    @Value("${webSocket.netty.path:/webSocket}")
    String webSocketPath;
    @Autowired
    WebSocketHandler webSocketHandler;

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 设置管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 流水线管理通道中的处理程序(Handler),用来处理业务
        // webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ObjectEncoder());
        // 以块的方式来写的处理器
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpObjectAggregator(8192));
        pipeline.addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
        // 自定义的handler,处理业务逻辑
        pipeline.addLast(webSocketHandler);
    }
}

八、WebSocketHandler

@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private static final Logger log = LoggerFactory.getLogger(NettyServer.class);

    /**
     * 一旦连接,第一个被执行
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("有新的客户端链接:[{}]", ctx.channel().id().asLongText());
        // 添加到channelGroup 通道组
        NettyConfig.getChannelGroup().add(ctx.channel());
    }

    /**
     * 读取数据
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info("服务器收到消息:{}", msg.text());

        // 获取用户ID,关联channel
        JSONObject jsonObject = JSONUtil.parseObj(msg.text());
        String uid = jsonObject.getStr("uid");
        NettyConfig.getChannelMap().put(uid, ctx.channel());

        // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        ctx.channel().attr(key).setIfAbsent(uid);

        // 回复消息
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息啦"));
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("用户下线了:{}", ctx.channel().id().asLongText());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("异常:{}", cause.getMessage());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }

    /**
     * 删除用户与channel的对应关系
     */
    private void removeUserId(ChannelHandlerContext ctx) {
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(key).get();
        NettyConfig.getChannelMap().remove(userId);
    }
}

测试:

postman创建websocket连接 ws://127.0.0.1:8889/webSocket,并发送消息{‘uid’:‘sss’}给服务端
在这里插入图片描述
打开浏览器,给用户sss推送消息 http://127.0.0.1:8088/push/sss
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1505658.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

redis-集群 原生部署和工具自动部署

什么redis集群&#xff1f; redis集群是一个提供在多个redis节点之间共享数据的程序集。它并不像redis主从复制模式那样仅提供一个master节点来提供写服务&#xff0c;而是会提供多个master节点来提供写服务&#xff0c;每个master节点中存储的数据都不一样&#xff0c;这些数据…

HarmonyOS系统开发基础环境搭建

目录 一 鸿蒙介绍&#xff1a; 1.1 HarmonyOS系统 1.2 HarmonyOS软件编程语言 二 HarmonyOS编程环境搭建 1.1 官网下载地址 1.2搭建开发流程 1.3 创建安装目录 1.4 下载DevEco Studio​编辑 1.5 下载后点击安装 1.6 自动添加桌面快捷和bin路径 ​编辑1.7 安装好运行 …

[Angular 基础] - 表单:模板驱动表单

[Angular 基础] - 表单&#xff1a;模板驱动表单 之前的笔记&#xff1a; [Angular 基础] - routing 路由(上) [Angular 基础] - routing 路由(下) [Angular 基础] - Observable Angular 内置两种表单的支持&#xff0c;这篇写的就是第一种&#xff0c;即模板驱动表单 (Tem…

wps由于找不到krpt.dll,无法继续执行代码的解决方法

遇到由于找不到krpt.dll,无法继续执行代码的问题时&#xff0c;理解如何修复这个问题变得至关重要。本文会教大家krpt.dll的恢复流程&#xff0c;并介绍该DLL文件的相关属性。我们将一步步指导你如何处理缺失文件的情况&#xff0c;让你能够解决阻碍代码正常运行的障碍&#xf…

C语言初学10:typedef

一、作用 为用户定义的数据类型取一个新名字 二、对结构体使用typedef定义新的数据类型名字 #include <stdio.h> #include <string.h>typedef struct Books //使用 typedef 来定义一个新的数据类型名字 {char title[50];} book;int main( ) {//book是typedef定…

背包问题算法

背包问题算法 0-1背包问题二维数组一维数组 完全背包问题二维数组一维数组 多重背包问题一维数组 0-1背包问题 问题&#xff1a;背包的容量为9&#xff0c;有重量分别为[2, 4, 6, 9]的四个物品&#xff0c;价值分别为[3, 4, 5, 6]&#xff0c;求背包能装的物品的最大价值是多少…

LiveNVR监控流媒体Onvif/RTSP功能-支持云端录像监控视频集中存储录像回看录像计划配置NVR硬件设备录像回看

LiveNVR支持云端录像监控视频集中存储录像回看录像计划配置NVR硬件设备录像回看 1、流媒体服务软件2、录像回看3、查看录像3.1、时间轴视图3.2、列表视图 4、如何分享时间轴录像回看&#xff1f;5、iframe集成示例7、录像计划7、相关问题7.1、录像存储位置如何配置&#xff1f;…

【电路】工作于直流4.5V电压的声控小灯

这个声控小灯用于控制4.5V直流供电的小灯泡&#xff0c;可用作学生实验也可用作声控夜光小灯。电路主要由5G555时基集成电路和一些分立元件组成&#xff0c;如下图所示&#xff1a; 工作原理 压电陶瓷片B与晶体三极管VT1&#xff0c;电阻R1&#xff0c;和电阻R2等组成了声控脉…

数据结构之单链表及其实现!

目录 ​编辑 1. 顺序表的问题及思考 2.链表的概念结构和分类 2.1 概念及结构 2.2 分类 3. 单链表的实现 3.1 新节点的创建 3.2 打印单链表 3.3 头插 3.4 头删 3.5 尾插 3.6 尾删 3.7 查找元素X 3.8 在pos位置修改 3.9 在任意位置之前插入 3.10 在任意位置删除…

【Primsjs】vue+代码高亮

效果 括号变色括号鼠标移入高亮效果 代码效果 目录树 在这里插入图片描述 安装 cnpm i primsjs简介 文档&#xff08;点我进去&#xff09; 备用地址-https://prismjs.com/docs/index.html Primsjs使用 导入配置插件 注意&#xff1a;需要什么插件就导入什么插件 码 &l…

Linux centos6安装rz、sz命令

centos6传文件提示command not found # yum install lrzsz 提示错误 wget http://www.ohse.de/uwe/releases/lrzsz-0.12.20.tar.gz 下载离线包 https://www.ohse.de/uwe/software/lrzsz.html 下载最新版本 [rootnode1 ~]# tar -zxvf lrzsz-0.12.20.tar.gz …

PV与PVC知多少?解锁CKA认证考点攻略!

往期精彩文章 : 提升CKA考试胜算&#xff1a;一文带你全面了解RBAC权限控制&#xff01;揭秘高效运维&#xff1a;如何用kubectl top命令实时监控K8s资源使用情况&#xff1f;CKA认证必备&#xff1a;掌握k8s网络策略的关键要点提高CKA认证成功率&#xff0c;CKA真题中的节点维…

蓝桥省赛倒计时 35 天-双指针

双指针介绍 双指针算法是一种常用的算法技巧&#xff0c;它通常用于在数组或字符串中进行快速查找、匹配、排序或移动操作。 pointer 双指针并非真的用指针实现&#xff0c;一般用两个变量来表示下标&#xff08;在后面都用指针来表示&#xff09;。 双指针算法使用两个指针在数…

【Java JVM】Class 文件

Java 的口号 “一次编写, 到处运行 (Write Once, Run Anywhere)” 的基础: JVM 和 所有平台都统一支持的程序存储格式 – 字节码 (Byte Code)。 只要在对应的平台安装对应的 JVM, 将我们编写的源码编译为 Class 文件, 就能达到了一次编写, 导出运行的目标, 中间的所有细节由不同…

0103n阶行列式-行列式-线性代数

文章目录 一 n阶行列式二 三阶行列式三 特殊行列式结语 一 n阶行列式 ∣ a 11 a 12 ⋯ a 1 n a 21 a 22 ⋯ a 2 n ⋯ ⋯ ⋯ ⋯ a n 1 a n 2 ⋯ a n n ∣ \begin{vmatrix}a_{11}&a_{12}&\cdots&a_{1n}\\a_{21}&a_{22}&\cdots&a_{2n}\\\cdots&\cdots…

大型房企知识竞赛活动方案

&#xff08;一&#xff09; 线上挑战赛 1、加入团队 个人可以依据自身情况选择加入初始团队&#xff0c;也可创建团队。 2、题库来源 参考权威题库&#xff0c;适当加入公司帮扶贫困县的相关历史数据题目 3、小程序活动专题页 模块包括&#xff1a;党史知识线上挑战赛、活动宣…

Unity 采用自定义通道ShaderGraph实现FullScreen的窗户雨滴效果

效果如下 ShaderGraph实现 N21 随机化 DragLayer分层 将DragLayer分成四层&#xff0c;分别调整每层的缩放和大小 Shader实现的链接&#xff08;Unity 雨水滴到屏幕效果&#xff09; 我也是参考这个实现Shader Graph

Android7.1 ANR error 弹窗处理

Android7.1 ANR error 弹窗处理 问题描述解决方法 郑重声明:本人原创博文&#xff0c;都是实战&#xff0c;均经过实际项目验证出货的 转载请标明出处:攻城狮2015 Platform: Rockchip OS:Android 7.1.2 Kernel: 3.10 问题描述 有时会用到第三方apk&#xff0c;内置到系统中&…

通信-CAN-00 标准概述

总结了下CAN的基本知识&#xff0c;实际CAN的标准&#xff0c;内容&#xff0c;工具使用&#xff0c;上位机开发&#xff0c;下位机开发等&#xff0c;后续会找时间慢慢更新。本文主要介绍CAN标准&#xff0c;并对11898进行了进一步的介绍。 1 CAN概念 CAN-Controller Area N…

C++ 多状态dp

目录 按摩师 打家劫舍 打家劫舍2 删除并获得点数 粉刷房子 按摩师 面试题 17.16. 按摩师 最大值问题 f : 预约此次的最长时间 g &#xff1a;不预约此次的最长时间 出现的错误&#xff1a;return max(f[n - 1]), g[n - 1]); 注意&#xff1a;①题目没给nums的范围&…