Spring boot 集成netty实现websocket通信

news2024/11/20 10:30:45

一、netty介绍

Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性

二、工程搭建

实验目标:实现推送消息给指定的用户

pom.xml

 
 
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>


    <artifactId>netty</artifactId>


    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.87.Final</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.6.1</version>
        </dependency>
    </dependencies>
</project>

属性文件

 
 
server:
  port: 8088

netty server

 
 
package com.et.netty.server;


import com.et.netty.config.ProjectInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;


import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;


/**
 * @author dongliang7
 * @projectName websocket-parent
 * @ClassName NettyServer.java
 * @description: TODO
 * @createTime 2023年02月06日 16:41:00
 */
@Component
public class NettyServer {
    static final Logger log = LoggerFactory.getLogger(NettyServer.class);


    /**
     * 端口号
     */
    @Value("${webSocket.netty.port:8889}")
    int port;


    EventLoopGroup bossGroup;
    EventLoopGroup workGroup;


    @Autowired
    ProjectInitializer nettyInitializer;


    @PostConstruct
    public void start() throws InterruptedException {
        new Thread(() -> {
            bossGroup = new NioEventLoopGroup();
            workGroup = new NioEventLoopGroup();
            ServerBootstrap bootstrap = new ServerBootstrap();
            // bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
            bootstrap.group(bossGroup, workGroup);
            // 设置NIO类型的channel
            bootstrap.channel(NioServerSocketChannel.class);
            // 设置监听端口
            bootstrap.localAddress(new InetSocketAddress(port));
            // 设置管道
            bootstrap.childHandler(nettyInitializer);


            // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
            ChannelFuture channelFuture = null;
            try {
                channelFuture = bootstrap.bind().sync();
                log.info("Server started and listen on:{}", channelFuture.channel().localAddress());
                // 对关闭通道进行监听
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }


    /**
     * 释放资源
     */
    @PreDestroy
    public void destroy() throws InterruptedException {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().sync();
        }
        if (workGroup != null) {
            workGroup.shutdownGracefully().sync();
        }
    }
}

ProjectInitializer

初始化,设置websocket handler

 
 
package com.et.netty.config;


import com.et.netty.handler.WebSocketHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


/**
 * @author dongliang7
 * @projectName websocket-parent
 * @ClassName ProjectInitializer.java
 * @description: 管道配置
 * @createTime 2023年02月06日 16:43:00
 */
@Component
public class ProjectInitializer extends ChannelInitializer<SocketChannel> {


    /**
     * webSocket协议名
     */
    static final String WEBSOCKET_PROTOCOL = "WebSocket";


    /**
     * webSocket路径
     */
    @Value("${webSocket.netty.path:/webSocket}")
    String webSocketPath;
    @Autowired
    WebSocketHandler webSocketHandler;


    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 设置管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 流水线管理通道中的处理程序(Handler),用来处理业务
        // webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ObjectEncoder());
        // 以块的方式来写的处理器
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpObjectAggregator(8192));
        pipeline.addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
        // 自定义的handler,处理业务逻辑
        pipeline.addLast(webSocketHandler);
    }
}

WebSocketHandler

 
 
package com.et.netty.handler;


import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.et.netty.config.NettyConfig;
import com.et.netty.server.NettyServer;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;


/**
 * @author dongliang7
 * @projectName websocket-parent
 * @ClassName WebSocketHandler.java
 * @description: TODO
 * @createTime 2023年02月06日 16:44:00
 */
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private static final Logger log = LoggerFactory.getLogger(NettyServer.class);


    /**
     * 一旦连接,第一个被执行
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("有新的客户端链接:[{}]", ctx.channel().id().asLongText());
        // 添加到channelGroup 通道组
        NettyConfig.getChannelGroup().add(ctx.channel());
    }


    /**
     * 读取数据
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info("服务器收到消息:{}", msg.text());


        // 获取用户ID,关联channel
        JSONObject jsonObject = JSONUtil.parseObj(msg.text());
        String uid = jsonObject.getStr("uid");
        NettyConfig.getChannelMap().put(uid, ctx.channel());


        // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        ctx.channel().attr(key).setIfAbsent(uid);


        // 回复消息
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息啦"));
    }


    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("用户下线了:{}", ctx.channel().id().asLongText());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("异常:{}", cause.getMessage());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }


    /**
     * 删除用户与channel的对应关系
     */
    private void removeUserId(ChannelHandlerContext ctx) {
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(key).get();
        NettyConfig.getChannelMap().remove(userId);
    }
}

NettyConfig

 
 
package com.et.netty.config;


import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;


import java.util.concurrent.ConcurrentHashMap;


/**
 * @author dongliang7
 * @projectName websocket-parent
 * @ClassName NettyConfig.java
 * @description: 管理全局Channel以及用户对应的channel(推送消息)
 * @createTime 2023年02月06日 16:43:00
 */
public class NettyConfig {
    /**
     * 定义全局单利channel组 管理所有channel
     */
    private static volatile ChannelGroup channelGroup = null;


    /**
     * 存放请求ID与channel的对应关系
     */
    private static volatile ConcurrentHashMap<String, Channel> channelMap = null;


    /**
     * 定义两把锁
     */
    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();




    public static ChannelGroup getChannelGroup() {
        if (null == channelGroup) {
            synchronized (lock1) {
                if (null == channelGroup) {
                    channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
            }
        }
        return channelGroup;
    }


    public static ConcurrentHashMap<String, Channel> getChannelMap() {
        if (null == channelMap) {
            synchronized (lock2) {
                if (null == channelMap) {
                    channelMap = new ConcurrentHashMap<>();
                }
            }
        }
        return channelMap;
    }


    public static Channel getChannel(String userId) {
        if (null == channelMap) {
            return getChannelMap().get(userId);
        }
        return channelMap.get(userId);
    }
}

controller

 
 
package com.et.netty.controller;


import com.et.netty.service.PushMsgService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;




/**
 * @author dongliang7
 * @projectName
 * @ClassName TestController.java
 * @description: TODO
 * @createTime 2023年02月06日 17:48:00
 */
@RestController
@RequestMapping("/push")
public class TestController {
    @Autowired
    PushMsgService pushMsgService;


    /**
     * 推送消息到具体客户端
     * @param uid
     */
    @GetMapping("/{uid}")
    public void pushOne(@PathVariable String uid) {
        pushMsgService.pushMsgToOne(uid, "hello-------------------------");
    }


    /**
     * 推送消息到所有客户端
     */
    @GetMapping("/pushAll")
    public void pushAll() {
        pushMsgService.pushMsgToAll("hello all-------------------------");
    }
}

PushMsgService

package com.et.netty.service;


/**
 * @author dongliang7
 * @projectName websocket-parent
 * @ClassName PushMsgService.java
 * @description: 推送消息接口
 * @createTime 2023年02月06日 16:44:00
 */
public interface PushMsgService {
    /**
     * 推送给指定用户
     */
    void pushMsgToOne(String userId, String msg);


    /**
     * 推送给所有用户
     */
    void pushMsgToAll(String msg);
}

PushMsgServiceImpl

package com.et.netty.service;


import com.et.netty.config.NettyConfig;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.stereotype.Service;




import java.util.Objects;


/**
 * @author dongliang7
 * @projectName websocket-parent
 * @ClassName PushMsgServiceImpl.java
 * @description: 推送消息实现类
 * @createTime 2023年02月06日 16:45:00
 */
@Service
public class PushMsgServiceImpl implements PushMsgService {


    @Override
    public void pushMsgToOne(String userId, String msg) {
        Channel channel = NettyConfig.getChannel(userId);
        if (Objects.isNull(channel)) {
            throw new RuntimeException("未连接socket服务器");
        }


        channel.writeAndFlush(new TextWebSocketFrame(msg));
    }


    @Override
    public void pushMsgToAll(String msg) {
        NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
    }
}

文章值贴出部分关键代码,具体的详情代码参加代码仓库的netty模块

代码仓库

  • https://github.com/Harries/springboot-demo

三、测试

启动springboot工程

2024-03-08 11:21:32.975  INFO 10348 --- [       Thread-2] com.et.netty.server.NettyServer          : Server started and listen on:/0:0:0:0:0:0:0:0:8889

postman创建websocket连接 ws://127.0.0.1:8889/webSocket,并发送消息{'uid':'sss'}给服务端95ed8191ebb1c8c9f021df580ed604bf.png打开浏览器,给用户sss推送消息 http://127.0.0.1:8088/push/sssbb8db40adbcff28d239bc1495dff7c36.png 

四、引用

  • https://www.cnblogs.com/dongl961230/p/17099057.html

  • http://www.liuhaihua.cn/archives/710299.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1514210.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【研发日记】,Matlab/Simulink开箱报告(十)——Requirements Toolbox

前言 见《开箱报告&#xff0c;Simulink Toolbox库模块使用指南&#xff08;五&#xff09;——S-Fuction模块(C MEX S-Function)》 见《开箱报告&#xff0c;Simulink Toolbox库模块使用指南&#xff08;六&#xff09;——S-Fuction模块&#xff08;TLC&#xff09;》 见《开…

狂揽Github—start19.7k☆开源OCR—Umi-OCR

文章目录 背景Umi-OCR—源码下载Umi-OCR—可执行程序下载页面介绍截图OCR识别批量OCR识别批量文档二维码全局设置 总结&#xff1a; 背景 大家都知道我是一个Python办公自动化的小小程序员&#xff0c;经常收集一些免费开源的OCR供大家使用&#xff0c;目前我已经写出来多家OCR…

集智书童 | 炸裂 !轻量化YOLO | ShuffleNetv2与Transformer结合,重塑YOLOv7成就超轻超快YOLO

本文来源公众号“集智书童”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;炸裂 &#xff01;轻量化YOLO | ShuffleNetv2与Transformer结合&#xff0c;重塑YOLOv7成就超轻超快YOLO 随着移动计算技术的迅速发展&#xff0c;在移动…

flutter入门

本文真对 Flutter 的技术特性&#xff0c;做了一些略全面的入门级的介绍&#xff0c;如果你听说过Flutter&#xff0c;想去了解它&#xff0c;但是又不想去翻厚厚的API&#xff0c;那么本文就是为你准备的。 随着纯客户端到Hybrid技术&#xff0c;到RN&Weex&#xff0c;再…

【OpenGL手册13】 光照贴图

目录 一、说明二、漫反射贴图三、镜面光贴图四、采样镜面光贴图练习 一、说明 在上一节中&#xff0c;我们讨论了让每个物体都拥有自己独特的材质从而对光照做出不同的反应的方法。这样子能够很容易在一个光照的场景中给每个物体一个独特的外观&#xff0c;但是这仍不能对一个…

C#重新认识笔记_ FixUpdate + Update

C#重新认识笔记_ FixUpdate Update Update: 刷新频率不一致,非物理对象的移动&#xff0c;简单的刷新可用&#xff0c; FixedUpdate: 刷新频率一致,按照固定频率刷新&#xff0c;一般调用FixedUpdate之后&#xff0c;会立即进入必要的物理计算中,因此&#xff0c;任何影响刚…

层次式架构设计

本博客地址&#xff1a;https://security.blog.csdn.net/article/details/136660435 一. 概述 1、软件体系结构为软件系统提供了结构、行为和属性的高级抽象&#xff0c;由构成系统的元素描述这些元素的相互作用、指导元素集成的模式以及这些模式的约束组成。层次式体系结构设…

王庆:当下股市过于悲观,A股、港股基本完成补跌和普跌过程,逆向布局时机已到

核心观点&#xff1a; 1、房地产对中国经济增长拖累最严重的时期正在过去...密切关注真正拐点的出现。 2、当前资本市场从价格表现上来讲&#xff0c;表现的远远超过了基本面所决定的悲观程度。 由于当前资本市场过于悲观&#xff0c;那么反过来就是孕育着机会。 3、我们判…

leetcode刷题(javaScript)——分治思想(二分查找、快速排序)相关场景题总结

分治思想是一种将问题分解成更小的子问题&#xff0c;然后解决子问题并将结果合并的算法设计策略。二分查找、快速排序和折半查找都属于分治思想的经典算法。在leetcode里&#xff0c;分治思想一般结合其他场景出现&#xff0c;构成复合型题目。但是在看题时一定要了解能否用分…

Docker入门二(应用部署、迁移与备份)

文章目录 一、应用部署1.MySQL部署2.Redis部署3.Nginx部署 二、迁移与备份1.容器做成镜像2.把镜像被分成压缩包 一、应用部署 1.MySQL部署 在dokcer中部署mysql&#xff0c;以后不需要在宿主机上装mysql1.做端口映射docker run -id --namemysql5.7 -p 3306:3306 -e MYSQL_ROOT…

九数分三组

枚举三位数时&#xff0c;不用写三个循环&#xff0c;写出最小和最大数循环就行。在这题里要求三个数中不能有重复的数字&#xff0c;先转换为字符串&#xff0c;再转换为字符数组进行排序&#xff0c;最后比较字符串就可以得出结果。这题把结果和原因调换了一下

每日OJ题_牛客HJ37 统计每个月兔子的总数(IO型OJ)

目录 牛客HJ37 统计每个月兔子的总数 解析代码 牛客HJ37 统计每个月兔子的总数 统计每个月兔子的总数_牛客题霸_牛客网 解析代码 #include <iostream> #include <vector>using namespace std; int main() {int n 0;cin >> n;vector<int> arr(n 1…

牛客Highway

题目大意 在ICPCCamp中&#xff0c;有n个方便编号的城镇&#xff0c;编号为1,2,...,n&#xff0c;它们之间通过&#xff08;n-1&#xff09;条道路相连。连接第i个城镇a_i和b_i的道路的长度为c_i。保证任意两座城市之间只能通过道路到达。 Bobo希望修建&#xff08;n-1&#…

PostGIS 中的 K-Means 聚类操作及应用

K-Means算法&#xff1a; K-means 是数据科学和商业的基本算法。让我们深入了解一下。 1. K-means是一种流行的用于聚类的无监督机器学习算法。它是用于客户细分、库存分类、市场细分甚至异常检测的核心算法。 2. 无监督&#xff1a;K-means 是一种无监督算法&#xff0c;用于…

记录一下C++的学习之旅吧--C++基础

文章目录 前言using namespace std; 使用标准命名空间一、helloworld-输出表示1.1代码1.2 运行结果 二、变量2.1.1 普通变量代码2.1.2 运行结果2.2.1 常量和变量代码2.2.2 运行结果 三、sizeof---统计数据类型所占的内存大小3.1 代码3.2 运行结果 四、小数表示4.2 运行结果 五、…

C语言分析基础排序算法——计数排序

目录 计数排序 计数排序基本思路 计数排序改进思路 计数排序 计数排序又称为鸽巢原理&#xff0c;是对哈希直接定址法的变形应用。具体思路为&#xff1a; 统计相同元素出现次数根据统计的结果将序列回收到原来的序列中 计数排序基本思路 基本思路分析&#xff1a; //以…

2024春秋蓝桥杯reverse——crackme01

尝试了下输入没有任何反应 查看——32位——IDA打开 我之前没怎么写过win32&#xff0c;所以我开始在string里面找flag,wrong,right什么的字符&#xff0c;都不行 然后我又在函数里面找main&#xff0c;也什么收获的没有,OK废话完了 在win32里面 关于弹窗的函数&#xff1a;…

EPSON X2A0003510033 XV7081BB介绍

X2A0003510033 XV7081BB是一款具有SPI接口&#xff0c;这款传感器主要用于商业类应用&#xff0c;具体可以应用于抗震和姿态控制领域&#xff0c;以及人机接口的运动检测中&#xff0c;这表明它在工业应用、机器人技术、自动驾驶车辆以及任何需要精确角度测量和姿态控制的场合都…

Codeforces Round 933 (Div. 3) A~D

比赛链接 : codeforces.com/contest/1941 A . Rudolf and the Ticket 直接暴力即可 ; #include<bits/stdc.h> #define IOS ios::sync_with_stdio(0);cin.tie(0);cout.tie(0); #define endl \n #define lowbit(x) (x&(-x)) #define sz(a) (int)a.size() #define p…

手搭手RocketMQ发送消息

消息中间件的对比 消息中间件 ActiveMQ RabbitMQ RocketMQ kafka 开发语言 java erlang java scala 单击吞吐量 万级 万级 10万级 10万级 时效性 ms us ms ms 可用性 高(主从架构) 高(主从架构) 非常高(主从架构) 非常高(主从架构) 消息中间件: acti…