Netty系列-6 Netty消息处理流程

news2024/12/28 2:28:21

背景

前文介绍了Netty服务端的启动流程,服务端启动后可以处理客户端发送的请求,包括连接请求和普通消息。

1.处理连接

当客户端有连接请求到达时,服务器会创建通道并将通道注册到选择器上,处理逻辑与NIO中实现完全一致。
详细流程如下所示:
在这里插入图片描述
本章节将分小节对上图进行详细介绍。

1.1 Server阻塞监听

在Netty系列-5 Netty启动流程中介绍过,当Netty服务端启动时,将NioServerSocketChannel作为attachment:

public abstract class AbstractNioChannel extends AbstractChannel {
    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
				//... 异常处理逻辑
            }
        }
    }
}

将this(NioServerSocketChannel对象)作为attachment传递给register方法,当选择器selector被事件唤醒时,可以通过selectionKey.attachment获取NioServerSocketChannel对象。
注意:1.2 将用到这部分内容。
将ServerSocketChannel注册到选择器后,关联的NioEventLoop将陷入阻塞等待状态(调用选择器的选择方法阻塞监听连接请求)。

1.2 server接受连接

当连接请求到达Netty服务器时,NioEventLoop线程从select阻塞中唤醒, 并执行processSelectedKeys方法处理已就绪的事件:

private void processSelectedKeys() {
	// ...
   processSelectedKeysOptimized();
}
	
private void processSelectedKeysOptimized() {
	for (int i = 0; i < selectedKeys.size; ++i) {
		final SelectionKey k = selectedKeys.keys[i];
		selectedKeys.keys[i] = null;

		final Object a = k.attachment();
		processSelectedKey(k, (AbstractNioChannel) a);
		//...
	}
}

依次遍历就绪的SelectionKey,从SelectionKey中取出attachment,即取出ServerSocketChannel对象(1.1中介绍过),然后调用processSelectedKey方法实际处理每个SelectedKey:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
	final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
	try {
		int readyOps = k.readyOps();
		if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
			int ops = k.interestOps();
			ops &= ~SelectionKey.OP_CONNECT;
			k.interestOps(ops);
			unsafe.finishConnect();
		}
		if ((readyOps & SelectionKey.OP_WRITE) != 0) {
			ch.unsafe().forceFlush();
		}
		// 处理可读、连接事件
		if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
			unsafe.read();
		}
	} catch (CancelledKeyException ignored) {
		unsafe.close(unsafe.voidPromise());
	}
}

此时readyOps是Accept事件,因此表达式
(readyOps & (SelectionKey.OP_READ |SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0为true
进入unsafe.read()方法:

private final List<Object> readBuf = new ArrayList<Object>();
 
public void read() {
	
	do {
		// ... 
		
		int localRead = doReadMessages(readBuf);
		if (localRead == 0) {
			break;
		}
		if (localRead < 0) {
			closed = true;
			break;
		}
		allocHandle.incMessagesRead(localRead);
	} while (allocHandle.continueReading());


	int size = readBuf.size();
	for (int i = 0; i < size; i ++) {
		readPending = false;
		pipeline.fireChannelRead(readBuf.get(i));
	}
	readBuf.clear();
	allocHandle.readComplete();
	pipeline.fireChannelReadComplete();
	//...省略try-catch异常处理分支
}

逻辑较为清晰:持续调用doReadMessages读取数据并添加到readBuf列表中,直到数据完全读完;然后遍历readBuf列表,将每个消息(元素)以channelRead事件触发到pipeline,消息全部处理完成后,向pipeline提交ChannelReadComplete事件。
这里有个细节需要关注一下,doReadMessages内部将NIO的通道封装为netty的通道:

protected int doReadMessages(List<Object> buf) throws Exception {
    // 接收客户端连接,得到SocketChannel对象:serverSocketChannel.accept()得到SocketChannel
	SocketChannel ch = SocketUtils.accept(javaChannel());

	try {
		if (ch != null) {
            // 封装SocketChannel对象为NioSocketChannel类型
			buf.add(new NioSocketChannel(this, ch));
			return 1;
		}
	} catch (Throwable t) {
		logger.warn("Failed to create a new channel from an accepted socket.", t);

		try {
			ch.close();
		} catch (Throwable t2) {
			logger.warn("Failed to close a socket.", t2);
		}
	}
	return 0;
}

这里的buf集合存储的是NioSocketChannel类型的元素(消息)。
因此,进入Pipeline的channelRead事件的对象是NioSocketChannel对象。消息在pipeline中传递的顺序由左往右,如下所示:
在这里插入图片描述
当消息经过ServerBootstrapAcceptor时被处理:取出消息对象NioSocketChannel,配置NioSocketChannel对象,将NioSocketChannel对象注册到选择器上。

1.3 ServerBootstrapAcceptor作用

ServerBootstrapAcceptor是ChannelInboundHandlerAdapter的子类,属于入站事件处理器。ServerBootstrapAcceptor重写了channelRead和exceptionCaught方法,核心逻辑在channelRead中:

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
	// ...
	public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 步骤1:从消息中提取通道对象,为NioSocketChannel类型
		final Channel child = (Channel) msg;
        // 步骤2:向NioSocketChannel通道设置handler、配置options和attrs
		child.pipeline().addLast(childHandler);
		setChannelOptions(child, childOptions, logger);
		setAttributes(child, childAttrs);
		try {
            // 步骤3: 注册到childGroup(NioEventLoopGroup)上
			childGroup.register(child).addListener(new ChannelFutureListener() {
				@Override
				public void operationComplete(ChannelFuture future) throws Exception {
					if (!future.isSuccess()) {
						forceClose(child, future.cause());
					}
				}
			});
		} catch (Throwable t) {
			forceClose(child, t);
		}
	}
}

服务器接收连接请求得到NIO的SocketChannel,netty框架封装为NioSocketChannel对象,传递给Pipeline. 当消息沿着Pipeline传递到ServerBootstrapAcceptor的channelRead方法时,将进行以下处理:
[1] 从消息中提取通道对象(即NioSocketChannel对象),消息本身就是通道对象;
[2] 向NioSocketChannel通道设置handler、配置options和attrs,这里的配置来源于ServerBootstrap启动时配置的childHandler, childOptions和childAttrs.
[3] 将NioSocketChannel通道注册到childGroup(NioEventLoopGroup)上,对应传递给ServerBootstrap的workerGroup线程池.
其中:步骤[3]中向NioEventLoopGroup注册NioSocketChannel,即从NioEventLoopGroup选择出一个NioEventLoop,使用NioEventLoop注册NioSocketChannel,这一部分已在Netty系列-2 NioServerSocketChannel和NioSocketChannel介绍中介绍过,不再赘述。有个细节需要注意:向选择器注册NioServerSocketChannel时,attachment是NioServerSocketChannel对象;而向选择器注册NioSocketChannel时,attachment是NioSocketChannel对象。

2.处理消息

当通道有消息可读时,NioEventLoop线程从阻塞中唤醒并处理SelectionKey, 流程与NIO相似。详细流程如下所示:
在这里插入图片描述

2.1 可读事件

有可读事件到达时,workerGroup中的某个NioEventLoop将(从select阻塞中)被唤醒,调用processSelectedKeys方法处理就绪的事件。

private void processSelectedKeys() {
	//...
	processSelectedKeysOptimized();
}

private void processSelectedKeysOptimized() {
	for (int i = 0; i < selectedKeys.size; ++i) {
		final SelectionKey k = selectedKeys.keys[i];
		selectedKeys.keys[i] = null;
		final Object a = k.attachment();
		//...
		processSelectedKey(k, (AbstractNioChannel) a);
		//...

	}
}

遍历已就绪的事件,调用processSelectedKey处理。
注意:这里从selectedKey取出的attachment对象是NioSocketChannel通道。
继续跟进processSelectedKey方法:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
	final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();

	try {
		int readyOps = k.readyOps();
        //... 省略其他分支:SelectionKey.OP_CONNECT和readyOps & SelectionKey.OP_WRITE

		if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
			unsafe.read();
		}
}

此时,readyOps为SelectionKey.OP_READ,调用unsafe.read()处理消息。

public final void read() {
	final ChannelConfig config = config();
	if (shouldBreakReadReady(config)) {
		clearReadPending();
		return;
	}
	final ChannelPipeline pipeline = pipeline();
	final ByteBufAllocator allocator = config.getAllocator();
	final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
	allocHandle.reset(config);

	ByteBuf byteBuf = null;
	boolean close = false;
	//...
	do {
		byteBuf = allocHandle.allocate(allocator);
		// 读取数据,存入ByteBuf对象
		allocHandle.lastBytesRead(doReadBytes(byteBuf));
		if (allocHandle.lastBytesRead() <= 0) {
			byteBuf.release();
			byteBuf = null;
			close = allocHandle.lastBytesRead() < 0;
			if (close) {
				readPending = false;
			}
			break;
		}

		allocHandle.incMessagesRead(1);
		readPending = false;
		// 读取后,向Pipeline触发ChannelRead事件
		pipeline.fireChannelRead(byteBuf);
		byteBuf = null;
	} while (allocHandle.continueReading());
	//...
	// 消息处理完,向pipeline触发ChannelReadComplete事件
	pipeline.fireChannelReadComplete();
}

逻辑较为清晰:持续调用doReadBytes读取数据至ByteBuf对象中,并将每个读取的ByteBuf以channelRead事件提交到pipeline,全部消息处理完成后,向pipeline提交ChannelReadComplete事件。
这里有个细节需要关注一下,doReadBytes内部使用NIO中的通道将数据流写入到ByteBuf中:

public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    try {
        return in.read(internalNioBuffer(index, length));
    } catch (ClosedChannelException ignored) {
        return -1;
    }
}

2.2 pipeline处理数据

当数据沿着NioSocketChannel通道的Pipeline传输时,从左到右顺序如下:
在这里插入图片描述
Bytebuf类型的消息将沿着解码器Handler->业务Handler->编码器Handler->…->TailContext的顺序处理。
其中HeadContext和TailContext由框架携带,其他Handler由用户根据业务需要开发和引入。
因此,业务Handler中未定义加码器时,第一个处理可读消息的Handler的消息为Bytebuf类型。另外,消息外发时,也需要将业务对象转为Bytebuf类型,才可以正常发出(编码器)。

2.4 扩展

编解码器内容将在Netty系列-7 Netty编解码器中详细介绍。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2183131.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

虚拟机、ubantu不能连接网络,解决办法

虚拟机、ubantu不能连接网络&#xff0c;解决办法 物理机OS&#xff1a; [Windows10 专业版](https://so.csdn.net/so/search?qWindows10 专业版&spm1001.2101.3001.7020) 虚拟机平台&#xff1a; VMware Workstation 16 Pro 虚拟机OS&#xff1a; Ubuntu 18.04 自动配…

英语音标与重弱读

英语中&#xff0c;比较重要的是音标。但事实上&#xff0c;我们对音标的学习还是比较少的&#xff0c;对它的理解也是比较少的。 一、音标 2个半元音 [w][j] 5个长元音&#xff1a;[i:] [ə:] [ɔ:] [u:] [ɑ:] 7个短元音&#xff1a;[i] [ə] [ɔ] [u] [] [e] [ʌ] 8个双元音…

车辆重识别(2020NIPS去噪扩散概率模型)论文阅读2024/9/27

[2] Denoising Diffusion Probabilistic Models 作者&#xff1a;Jonathan Ho Ajay Jain Pieter Abbeel 单位&#xff1a;加州大学伯克利分校 摘要&#xff1a; 我们提出了高质量的图像合成结果使用扩散概率模型&#xff0c;一类潜变量模型从非平衡热力学的考虑启发。我们的最…

【mmengine】配置器(config)(入门)读取与使用

一、 介绍 MMEngine 实现了抽象的配置类&#xff08;Config&#xff09;&#xff0c;为用户提供统一的配置访问接口。 配置类能够支持不同格式的配置文件&#xff0c;包括 python&#xff0c;json&#xff0c;yaml&#xff0c;用户可以根据需求选择自己偏好的格式。 配置类提供…

leetcode力扣刷题系列——【座位预约管理系统】

题目 请你设计一个管理 n 个座位预约的系统&#xff0c;座位编号从 1 到 n 。 请你实现 SeatManager 类&#xff1a; SeatManager(int n) 初始化一个 SeatManager 对象&#xff0c;它管理从 1 到 n 编号的 n 个座位。所有座位初始都是可预约的。 int reserve() 返回可以预约座…

单调队列应用介绍

单调队列应用介绍 定义应用场景实现模板具体示例滑动窗口最大值问题描述问题分析代码实现带限制的子序列和问题描述问题分析代码实现跳跃游戏问题描述问题分析代码实现定义 队列(Queue)是另一种操作受限的线性表,只允许元素从队列的一端进,另一端出,具有先进先出(FIFO)的特…

系统信息规划-系统架构师(七十四)

1前驱图 解析&#xff1a; 当S1执行完&#xff0c;C1S2并行执行&#xff0c;C1和S2执行完&#xff0c;P1,C2,S3并行执行&#xff0c;同理&#xff0c;P2C3并行执行。 直接制约则表示C1和P1受S1制约。 间接则代表S2和S3受S1制约。 2系统移植也是系统构建的一种实现方…

学习记录:js算法(五十一):统计二叉树中好节点的数目

文章目录 统计二叉树中好节点的数目网上思路 总结 统计二叉树中好节点的数目 给你一棵根为 root 的二叉树&#xff0c;请你返回二叉树中好节点的数目。 「好节点」X 定义为&#xff1a;从根到该节点 X 所经过的节点中&#xff0c;没有任何节点的值大于 X 的值。 图一&#xff1…

长江存储致态TiPlus7100 4TB满盘读写测试:性能几乎没有下降

一、前言&#xff1a;看看满盘状态下致态TiPlus7100 4TB性能会如何&#xff01; 现在还有很多同学对于长江存储品牌的存储产品不太信任&#xff0c;在选择SSD时会优先考虑三星、西数这样的品牌。 有鉴于此&#xff0c;我们此次会将手上的长江存储致态TiPlus7100 4TB SSD进行更严…

【STM32单片机_(HAL库)】4-2-1【定时器TIM】定时器输出PWM实现呼吸灯实验

1.硬件 STM32单片机最小系统LED灯模块 2.软件 pwm驱动文件添加定时器HAL驱动层文件添加GPIO常用函数定时器输出PWM配置步骤main.c程序 #include "sys.h" #include "delay.h" #include "led.h" #include "pwm.h"int main(void) {HA…

音视频入门基础:FLV专题(10)——Script Tag实例分析

一、引言 在《音视频入门基础&#xff1a;FLV专题&#xff08;9&#xff09;——Script Tag简介》中对FLV文件的Script Tag进行了简介。下面用一个具体的例子来对Script Tag进行分析。 二、Script Tag的Tag header实例分析 用notepad打开《音视频入门基础&#xff1a;FLV专题…

鸿蒙跨端实践-JS虚拟机架构实现

作者&#xff1a;京东科技 杜强强 前言 在Roma跨端方案中&#xff0c;JS虚拟机是框架的核心&#xff0c;负责执行动态化的JS代码。在Android平台采用了基于V8的J2V8&#xff0c;iOS平台则使用了系统自带的JSCore&#xff0c;而在HarmonyOS中&#xff0c;由于业界无类似的框架&a…

C++11_左值引用与右值引用

在C11之前&#xff0c;是没有右值引用的概念的&#xff0c;在C11之后才新增了右值引用。其实无论是左值引用还是右值引用都是给对象取别名。 认识左值和右值 什么是左值&#xff1f; 左值是一个表示数据的表达式(如变量名或解引用的指针)&#xff0c;我们可以获取它的地址可…

YOLOv11改进策略【损失函数篇】| Shape-IoU:考虑边界框形状和尺度的更精确度量

一、本文介绍 本文记录的是改进YOLOv11的损失函数&#xff0c;将其替换成Shape-IoU。现有边界框回归方法通常考虑真实GT&#xff08;Ground Truth&#xff09;框与预测框之间的几何关系&#xff0c;通过边界框的相对位置和形状计算损失&#xff0c;但忽略了边界框本身的形状和…

PV大题--专题突破

写在前面&#xff1a; PV大题考查使用伪代码控制进程之间的同步互斥关系&#xff0c;它需要我们一定的代码分析能力&#xff0c;算法设计能力&#xff0c;有时候会给你一段伪代码让你补全使用信号量控制的操作&#xff0c;请一定不要相信某些人告诉你只要背一个什么模板&#…

Java线程入门

目录 一.线程相关概念 1.程序&#xff08;program&#xff09; 2.进程 3.线程 4.其他相关概念 二.线程的创建 1.继承Thread 2.Runnable接口 3.多线程机制&#xff08;重要&#xff09; 4.start() 三.线程终止--通知 四.线程&#xff08;Thread&#xff09;方法 1.常…

fastAPI教程:数据库操作

FastAPI 六、数据库操作 FastAPI支持操作各种数据库&#xff0c;但本身并没有内置关于任何数据库相关的模块。因此我们可以根据需求使用任何数据库&#xff0c;包括关系型&#xff08;SQL&#xff09;数据库&#xff0c;例如&#xff1a;PostgreSQL、MySQL、SQLite、Oracle、…

【AGC005D】~K Perm Counting(计数抽象成图)

容斥原理。 求出f(m) &#xff0c;f(m)指代至少有m个位置不合法的方案数。 怎么求&#xff1f; 注意到位置为id&#xff0c;权值为v ,不合法的情况&#xff0c;当且仅当 v idk或 v id-k 因此&#xff0c;我们把每一个位置和权值抽象成点 &#xff0c;不合法的情况之间连一…

【JVM】基础篇

1 初识JVM 1.1 什么是JVM JVM 全称是 Java Virtual Machine&#xff0c;中文译名 Java虚拟机。JVM 本质上是一个运行在计算机上的程序&#xff0c;他的职责是运行Java字节码文件。 Java源代码执行流程如下&#xff1a; 分为三个步骤&#xff1a; 1、编写Java源代码文件。 …