Netty实战(十三)

news2025/2/13 12:46:10

WebSocket协议(一)

  • 一、什么是WebSocket 协议
  • 二、简单的 WebSocket 程序示例
    • 2.1 程序逻辑
    • 2.2 添加 WebSocket 支持
    • 2.3 处理 HTTP 请求
    • 2.4 处理 WebSocket 帧

一、什么是WebSocket 协议

WebSocket 协议是完全重新设计的协议,旨在为 Web 上的双向数据传输问题提供一个切实可行的解决方案,使得客户端和服务器之间可以在任意时刻传输消息,因此,这也就要求它们异步地处理消息回执。

二、简单的 WebSocket 程序示例

2.1 程序逻辑

我们先设计一个基于浏览器的聊天程序来更好的理解WebSocket ,它的逻辑如下:

(1)客户端发送一个消息;
(2)该消息将被广播到所有其他连接的客户端。

像这样:
在这里插入图片描述

2.2 添加 WebSocket 支持

在从标准的HTTP或者HTTPS协议切换到WebSocket时,将会使用一种称为升级握手的机制。因此,使用WebSocket的应用程序将始终以HTTP/S作为开始,然后再执行升级。这个升级动作发生的确切时刻特定于应用程序;它可能会发生在启动时,也可能会发生在请求了某个特定的URL之后。

我们先这样设定:如果被请求的 URL 以/ws 结尾,那么我们把该协议升级为 WebSocket;否则,服务器将使用基本的 HTTP/S。在连接已经升级完成之后,所有数据都将会使用 WebSocket 进行传输。

在这里插入图片描述

2.3 处理 HTTP 请求

上一节我们说使用WebSocket的应用程序将始终以HTTP/S作为开始,然后再执行升级。所以我们应该先实现处理 HTTP 请求的组件。让它提供用于访问聊天室并显示由连接的客户端发送的消息的网页。

import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedNioFile;

import java.io.File;
import java.io.RandomAccessFile;
import java.net.URISyntaxException;
import java.net.URL;

/**
 * Author: lhd
 * Data: 2023/6/12
 * Annotate: http请求处理
 */
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { //扩展 SimpleChannelInboundHandler 以处理 FullHttpRequest 消息

    private final String wsUri;
    private static final File INDEX;
    static {
        URL location = HttpRequestHandler.class
                .getProtectionDomain()
                .getCodeSource().getLocation();
        try {
            String path = location.toURI() + "index.html";
            path = !path.contains("file:") ? path : path.substring(5);
            INDEX = new File(path);
        } catch (URISyntaxException e) {
            throw new IllegalStateException("Unable to locate index.html", e);
        }
    }
    public HttpRequestHandler(String wsUri) {
        this.wsUri = wsUri;
    }
    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        //如果请求了 WebSocket协议升级,则增加引用计数(调用 retain()方法),并将它传递给下一个ChannelInboundHandler
        if (wsUri.equalsIgnoreCase(request.uri())) {
            ctx.fireChannelRead(request.retain());
        } else {
            //处理 100 Continue   请求以符合 HTTP 1.1 规范
            if (HttpUtil.is100ContinueExpected(request)) {
                send100Continue(ctx);
            }
            //读取 index.html
            RandomAccessFile file = new RandomAccessFile(INDEX, "r");
            HttpResponse response = new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);
            response.headers().set(HttpHeaderNames .CONTENT_TYPE, "text/plain; charset=UTF-8");
            boolean keepAlive = HttpUtil.isKeepAlive(request);
            //如果请求了keep-alive,则添加所需要的 HTTP头信息
            if (keepAlive) {
                response.headers().set(HttpHeaderNames.CONTENT_LENGTH, file.length());
                response.headers().set(HttpHeaderNames .CONNECTION, HttpHeaderValues.KEEP_ALIVE);
            }
            //将 HttpResponse写到客户端
            ctx.write(response);
            if (ctx.pipeline().get(SslHandler.class) == null) {
                //将 index.html 写到客户端
                ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
            } else {
                ctx.write(new ChunkedNioFile(file.getChannel()));
            }
            //写 LastHttpContent 并冲刷至客户端
            ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
            if (!keepAlive) {
                //如果没有请求 keep-alive,则在写操作完成后关闭 Channel
                future.addListener(ChannelFutureListener.CLOSE);
            }
        }
    }
    private static void send100Continue(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
            ctx.writeAndFlush(response);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

我们简单的说一下上面代码的流程:

(1) 如果该 HTTP 请求指向了地址为/ws 的 URI,那么 HttpRequestHandler 将调用 FullHttpRequest 对象上的 retain()方法,并通过调用 fireChannelRead(msg)方法将它转发给下一个 ChannelInboundHandler 。之所以需要调用 retain()方法,是因为调用 channelRead()方法完成之后,它将调用 FullHttpRequest 对象上的 release()方法以释放它的资源。

(2)如果客户端发送了 HTTP 1.1 的 HTTP 头信息 Expect: 100-continue,那么 HttpRequestHandler 将会发送一个 100 Continue 响应。在该 HTTP 头信息被设置之后,HttpRequestHandler 将会写回一个 HttpResponse 给客户端。这不是一个 FullHttpResponse,因为它只是响应的第一个部分。此外,这里也不会调writeAndFlush()方法,在结束的时候才会调用。

(3)如果不需要加密和压缩,那么可以通过将 index.html 的内容存储到 DefaultFileRegion 中来达到最佳效率。这将会利用零拷贝特性来进行内容的传输。为此,可以检查一下,是否有 SslHandler 存在于在ChannelPipeline 中。否则,可以使用 ChunkedNioFile。

(4)HttpRequestHandler 将写一个 LastHttpContent 来标记响应的结束。如果没有请求 keep-alive ,那么HttpRequestHandler 将会添加一个 ChannelFutureListener到最后一次写出动作的 ChannelFuture,并关闭该连接。在这里,将调用 writeAndFlush()方法以冲刷所有之前写入的消息。

这部分代码纯粹的是 HTTP 请求和响应,下面我们来处理 实际传输聊天内容的 WebSocket 帧。

WEBSOCKET 帧: WebSocket 以帧的方式传输数据,每一帧代表消息的一部分。一个完整的消息可能会包含许多帧。

2.4 处理 WebSocket 帧

IETF 发布的 WebSocket RFC,定义了 6 种帧,Netty 为它们每种都提供了一个 POJO 实现。

类 型描 述
BinaryWebSocketFrame包含了二进制数据
TextWebSocketFrame包含了文本数据
ContinuationWebSocketFrame包含属于上一个BinaryWebSocketFrame或TextWebSocketFrame 的文本数据或者二进制数据
CloseWebSocketFrame表示一个 CLOSE 请求,包含一个关闭的状态码和关闭的原因
PingWebSocketFrame请求传输一个 PongWebSocketFrame
PongWebSocketFrame作为一个对于 PingWebSocketFrame 的响应被发送

我们的聊天应用程序将使用下面几种帧类型:

  • CloseWebSocketFrame;
  • PingWebSocketFrame;
  • PongWebSocketFrame;
  • TextWebSocketFrame。

TextWebSocketFrame 是我们唯一真正需要处理的帧类型。为了符合 WebSocket RFC,Netty 提供了 WebSocketServerProtocolHandler 来处理其他类型的帧。

下面我们来处理 TextWebSocketFrame 的 ChannelInboundHandler,再将它的 ChannelGroup 中跟踪所有活动的 WebSocket 连接:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

/**
 * Author: lhd
 * Data: 2023/6/12
 * Annotate: 处理文本帧
 */
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { //扩展 SimpleChannelInboundHandler, 并处理 TextWebSocketFrame 消息
    private final ChannelGroup group;
    public TextWebSocketFrameHandler(ChannelGroup group) {
        this.group = group;
    }

    //重写 userEventTriggered() 方法以处理自定义事件
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
            //如果该事件表示握手成功,则从该Channelipeline中移除 HttpRequestHandler,因为将不会接收到任何 HTTP 消息了
            ctx.pipeline().remove(HttpRequestHandler.class);
            //通知所有已经连接的WebSocket 客户端新的客户端已经连接上了
            group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));
            //将新的 WebSocket Channel添加到 ChannelGroup 中,以便它可以接收到所有的消息
            group.add(ctx.channel());
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
    @Override
    public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        //增加消息的引用计数,并将它写到 ChannelGroup 中所有已经连接的客户端
        group.writeAndFlush(msg.retain());
    }
}

简单的说一下:

(1)TextWebSocketFrameHandler 只有少量的责任。当和新客户端的 WebSocket握手成功完成之后 ,它将通过把通知消息写到 ChannelGroup 中的所有 Channel 来通知所有已经连接的客户端,然后它将把这个新 Channel 加入到该 ChannelGroup 中 。

(2)如果接收到了 TextWebSocketFrame 消息 ,TextWebSocketFrameHandler 将调用TextWebSocketFrame 消息上的 retain()方法,并使用 writeAndFlush()方法来将它传输给 ChannelGroup,以便所有已经连接的 WebSocket Channel 都将接收到它。

(3)然后调用 retain()方法,因为当 channelRead0()方法返回时,TextWebSocketFrame 的引用计数将会被减少。由于所有的操作都是异步的,因此,writeAndFlush()方法可能会在 channelRead0()方法返回之后完成,而且它绝对不能访问一个已经失效的引用。

(4)因为 Netty 在内部处理了大部分剩下的功能,所以现在剩下唯一需要做的事情就是为每个新创建的 Channel 初始化其 ChannelPipeline。所以我们需要一个 ChannelInitializer。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/639470.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

读书笔记-《ON JAVA 中文版》-摘要16[第十六章 代码校验]

文章目录 第十六章 代码校验1. 测试1.1 单元测试1.2 JUnit1.3 测试覆盖率的幻觉 2. 前置条件2.1 断言&#xff08;Assertions&#xff09;2.2 Java 断言语法2.3 Guava 断言2.4 使用断言进行契约式设计2.4.1 检查指令2.4.2 前置条件2.4.3 后置条件2.4.4 不变性2.4.5 放松 DbC 检…

Frida技术—逆向开发的屠龙刀

简介 Frida是一种基于JavaScript的动态分析工具&#xff0c;可以用于逆向开发、应用程序的安全测试、反欺诈技术等领域。Frida主要用于在已安装的应用程序上运行自己的JavaScript代码&#xff0c;从而进行动态分析、调试、修改等操作&#xff0c;能够绕过应用程序的安全措施&a…

路径规划算法:基于人工电场优化的路径规划算法- 附代码

路径规划算法&#xff1a;基于人工电场优化的路径规划算法- 附代码 文章目录 路径规划算法&#xff1a;基于人工电场优化的路径规划算法- 附代码1.算法原理1.1 环境设定1.2 约束条件1.3 适应度函数 2.算法结果3.MATLAB代码4.参考文献 摘要&#xff1a;本文主要介绍利用智能优化…

【Leetcode60天带刷】day06哈希表——242.有效的字母异位词,349. 两个数组的交集,202题. 快乐数,1. 两数之和

题目&#xff1a;242.有效的字母异位词 Leetcode原题链接&#xff1a;242. 有效的字母异位词 思考历程与知识点&#xff1a; 如果一个字母一个字母的找&#xff0c;也就是暴力&#xff0c;用两个for的话时间复杂度是O(N^2)&#xff1b; 我们可以换个思路&#xff0c;a~z一共…

Telerik Report Server R2 2023

Telerik Report Server R2 2023 仪表报告项-使用仪表或类似表盘的显示提供数据的可视化表示。 报告项上的AccessibleRole属性-ARIA(可访问的富Internet应用程序)支持已显著改进。在Web上&#xff0c;当启用了辅助功能时&#xff0c;呈现的报表项包含预定义的辅助功能角色。这样…

(七)矢量数据的空间分析——叠置分析①

矢量数据的空间分析——叠置分析 叠置分析是将代表不同主题的各个数据层面进行叠置&#xff0c;产生一个新的数据层面&#xff0c;叠置结果综合了原来两个或多个层面要素所具有的属性。 叠置分析不仅生成了新的空间关系&#xff0c;而且还将输入的多个数据层的属性联系起来产…

随机的乐趣和游戏

1、猜数字游戏 #GuessingGame.py import random the_number random.randint(1, 10) print("计算机已经在1到10之间随机生成了一个数字&#xff0c;") guess int(input("请你猜猜是哪一个数字: ")) while guess ! the_number:if guess > the_number:p…

【MySQL】数据库基本知识小结

数据库的基本概念 数据库&#xff1a;DataBase 简称 DB&#xff0c;就是信息的集合或者说数据库是由数据库管理系统管理的数据的集合。数据库管理系统&#xff1a;DataBase Management System 简称 DBMS&#xff0c;是一种操纵和管理数据库的大型软件&#xff0c;通常用于建立…

数据结构 一绪论

第一章&#xff1a;绪论 1.1数据结构的基本概念 1.数据&#xff1a;数据是信息的载体&#xff0c;是描述客观事物属性的数、字符以及所有能输入到计算机中并被程序识别 和处理的符号的集合。 2.数据元素&#xff1a;数据元素是数据的基本单位&#xff0c;通常作为一个整体进行…

软件项目质量跟踪控制的3大方法

1、质量度量法 质量度量法包括尺度度量和二元度量两种方法&#xff0c;而尺度度量是定量度量&#xff0c;适用可直接度量的特性。如缺陷率 而二元度量是定性度量&#xff0c;适用间接度量的质量特性。如使用性&#xff0c;灵活性。 软件项目质量跟踪控制的3大方法&#xff1a;质…

ReentrantLock实现原理-公平锁

在ReentrantLock实现原理(1)一节中&#xff0c;我们了解了ReentrantLock非公平锁的获取流程&#xff0c;在本节中我们来看下ReentrantLock公平锁的创建以及锁管理流程 创建ReentrantLock公平锁 创建公平锁代码如下&#xff1a; ReentrantLock reentrantLock new ReentrantL…

elementui 自定义loading动画加载层

elementui 自定义loading动画加载层。main.js中添加 import { Loading } from element-ui /* 自定义加载层 */ Vue.prototype.openLoading function(wer) {const loading Loading.service({lock: true, // 是否锁屏text: , // 加载动画的文字// spinner: inner-circles-loade…

数据库期末复习(8)并发控制

笔记 数据库DBMS并发控制(1)_旅僧的博客-CSDN博客 数据库 并发控制(2)死锁和意向锁_旅僧的博客-CSDN博客 冲突可串行化和锁 怎么判断是否可以进行冲突可串行化:简便的方法是优先图 只有不同对象和同一对象都是读才不能发生非串行化调度。我真傻 两个节点其实也可以算是一…

粤海街道后海村城市更新单元旧改项目,规划建面约42.6万平

项目名称&#xff1a;粤海街道后海村城市更新单元 项目地址&#xff1a;南山区粤海街道&#xff0c;西临天后西路&#xff0c;东至后海大道&#xff0c;南近东滨路&#xff0c;北近创业路。 开发商&#xff1a;远洋集团 申报主体&#xff1a;深圳市蛇口后海实业股份公司 拆…

SEMICON China 2023| 加速科技将携全系新品重磅亮相,欢迎打卡加速科技展台

2023年6月29日-7月1日&#xff0c;全球规模最大、规格最高的半导体行业盛会—SEMICON China 2023将在上海新国际博览中心盛大举行。作为业内领先的半导体测试设备供应商&#xff0c;杭州加速科技将携全系重磅新品及全系列测试解决方案受邀参展。 展位信息&#xff1a;E5馆 5643…

pytest+allure+jenkins持续集成及生成测试报告

目录 前言 一、jenkins安装 二、插件安装 三、构建项目 四、查看运行结果 总结&#xff1a; 前言 前面&#xff0c;讲了“Pycharmpytestallure打造高逼格的测试报告”&#xff0c;但实际工作中&#xff0c;往往需要通过jenkins进行自动化测试用例的持续集成并自动生成测试…

KW 喜报 | KaiwuDB 斩获 2023 数博会“优秀科技成果”奖

5月26日&#xff0c;大数据领域的国家级盛会——2023 中国国际大数据产业博览会&#xff08;以下简称“2023 数博会”&#xff09;在贵阳盛大开幕。作为大会最重磅的环节之一&#xff0c;“2023 领先科技成果发布会”于数博发布中心场地举办&#xff0c;向全行业发布 70 余项兼…

数据脱敏/换行/枚举的处理【EasyPoi实战系列】- 第474篇

历史文章&#xff08;文章累计460&#xff09; 《国内最全的Spring Boot系列之一》 《国内最全的Spring Boot系列之二》 《国内最全的Spring Boot系列之三》 《国内最全的Spring Boot系列之四》 《国内最全的Spring Boot系列之五》 《国内最全的Spring Boot系列之六》 【…

DDP分布式训练中遇到的一些问题

1&#xff1a;所有forward的输出必须参与到loss计算并回传 2&#xff1a;类似于layer_norm这样的操作是无需进行分布式通信的&#xff0c;也无法进行分布式通信&#xff0c;所以在DDP的时候必须把find_unused_parameters设置为True 3&#xff1a;当报错形式为如下时&#xff…

C语言之指针初阶(2)

目录 1. 指针是什么 2. 指针和指针类型 3. 野指针 4. 指针运算 5. 指针和数组 6. 二级指针 7. 指针数组 上次我们已经把前4个部分给大家讲完了&#xff0c;现在我们来讲一下后面三个部分 首先看数组和指针 指针和数组之间是什么关系呢&#xff1f; 指针变量就是指针变量&…