目录
Netty开发【NIO核心组件】
1.NIO基础概念
2.NIO核心组件
2.1.Channel&&Buffer简介
2.2.Selector
服务器的多线程版本
服务器的线程池版本
服务器的selector版本
2.3.Buffer
0.ByteBuffer的正确使用流程
1.ByteBuffer类型简介
2.ByteBuffer核心属性说明
3.ByteBuffer核心方法
4.代码示例说明
0.maven配置
1.缓冲区初始化
2.position、limit、flip()、clear()、mark()和reset()、compact()、hasRemaining() 方法详解
5.数据的粘包与包
5.1.什么是粘包和半包?
5.2.粘包和半包如何解决?
小结
我是山茶君, 想了解更多内容,微信搜索【nlefer】,关注我,你懂得越多,就明白不懂的越多
1.NIO基础概念
NIO(Non-Blocking IO 也称为New IO),JDK提供的新API。从JDK1.4开始,Java提供了一系列改进的输入/输出的新特性,被统称为NIO(Non-Blocking IO 也称为New IO),是同步非阻塞的
2.NIO核心组件
2.1.Channel&&Buffer简介
Channel是一种管道,类似于stream,可以用来数据的传输。但Channel是基于Buffer缓冲区的异步的双向的数据读写通道,即可以从Buffer中读取数据,也可以向Buffer中写入数据。
常见的Channel有
● FileChannel
● DatagramChannel
● SocketChannel
● ServerSocketChannel
其中SocketChannel和ServerSocketChannel可以被使用在服务端与客户端的通信。
Buffer被用作为缓冲区存储数据,读写数据发生的位置,常见的有:
● ByteBuffer
○ MappedByteBuffer
○ DirectByteBuffer
○ HeapByteBuffer
● ShortBuffer
● IntBuffer
● LongBuffer
● FloatBuffer
● DoubleBuffer
● CharBuffer
2.2.Selector
- Selector 为IO多路复用选择器,轮询检测注册在selector上的服务端状态,如果有事件发生,则获取对应的事件,针对事件的不同进行分类别的处理。
- 也就是说可以使用一个线程管理多个Channel,也就是管理多个请求与连接。
- Selector 维护了三个set集合,主要展现为整体步骤中的连接存储、获取连接通道信息、断开连接响应(也就是断开监控)
- key set :通道注册时,使用register()方法将对应的cahnnel信息添加到SelectionKey中;
- selected key set :在做轮询检测时,通过遍历Set<SelectionKey> selectionKeys = selector.selectedKeys(); 获取相应的key也就是存储的channel信息
- canneled key set:客户端主动断开连接或以外断开连接时,该事件已经不存在了,但在服务端的key集合中依旧存在,下次检测的时候会因此而出现问题,所以,需要通过判断客户端发送事件的返回值来确定是否将对应的key的SelectionKey从key set中移除, 将关联channel丢弃掉,不再进行监听;
- 通过服务器的设计演变进化,可以进一步了解Selector的功能以及作用
服务器的多线程版本
多线程操作,既使用了一个连接,那就开一个线程去处理这个连接的所有事物,当连接没有事件发生的时候,对应的线程就等待着,一直等到这个连接有事情发生或者是连接断开。可以类比为餐厅服务员与顾客的关系,一个服务员专门服务于一个顾客。
缺点与不足:
- 当连接多了的时候,开的线程数量较多,会占用较大的内存
- 线程的上下文切换成本比较高(一个线程被暂停,另一个线程被操作系统选择选中开始执行的过程就叫做上下文切换)
- 场景使用仅适用于连接较少的情况
服务器的线程池版本
针对于上述多线程版本的缺点,做了进一步的改进。较多线程版本资源使用有所降低,但是随之而来的是性能的问题。(就好比一个餐厅服务员耳听八方,眼观四路一样,服务于多个顾客,但是当顾客同事发起请求的时候,服务生就只能处理一个请求事件,就会造成阻塞的情况发生。因此才会继续诞生selector模式,线程池中设定固定的线程数)。
缺点与不足:
- 阻塞模式下,线程仅能处理一个socket连接(处理scoket1的时候就不能处理scoket3,只有等代1处理完断开连接后才能处理3)
- 仅适合短连接(避免线程处理一个socket,造成阻塞,导致下一个socket无法被处理)
服务器的selector版本
如图,依旧以餐厅为例,露天餐厅经历了前两次的失败,进化了成了一个小馆子。channel就是我们的多个顾客,selecor就好比一个监控视频头,能够通知thread服务员。当有channel发生事件时,seletcor会将对应事件通知到服务员(thread)去进行处理。处理完该顾客后,继续等待事件的请求,不会阻塞。与线程池版不同的是,channel是工作在非阻塞环境下的,因此selector的效率有较大提高。
缺点与不足:
- 如果当前请求和事务的量级较大,线程就会一直处理该事务,其他的channel请求也会排队等待处理。
2.3.Buffer
ByteBuffer为NIO中的字节缓冲区,相对于BIO的Stream流只支持写入或者读取单向操作,ByteBuffer是双向的,支持读和写。
0.ByteBuffer的正确使用流程
- 向buffer中写入数据
-
- channel的read()方法
- buffer自身的put方法
- 调用filp()方法切换为读取数据模式
- 从buffer中读取数据
-
- channel的write()方法
- buffer的get()方法
- 调用clear()或compact()切换回写模式继续写入数据
- 重复以上步骤
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
// 写入数据到buffer;两种方式均可以
int readByte = channel.read(byteBuffer);
byteBuffer.put((byte) 0x61);
// 切换为读数据模式
byteBuffer.filp();
// 写入数据到buffer;两种方式均可以
int writeByte = channel.write(byteBuffer);
byte b = byteBuffer.get();
1.ByteBuffer类型简介
类型 | 简介 | 方法 |
DirectByteBuffer | 使用的是操作系统级别的内存,分配比较慢,但是数据的读写比较快,因为少了一次从系统内存到JVM内存的复制过程 | 初始化方法ByteBuffer.allocateDirect(1024 * 4); |
HeapByteBuffer | 使用的是JVM的堆内存,对于JVM来说,分配比较快,但是读写比较慢,因为需要将操作系统内存里的数据复制到JVM内存,且java的gc会对其有影响,因为gc会对内存模块进行整理 | 初始化方法 ByteBuffer.allocate(1024 * 4); |
2.ByteBuffer核心属性说明
属性名称 | 属性说明 |
capacity | ByteBuffer的容量,这个值在ByteBuffer初始化的时候就确定下来了。不论是在读还是在写模式下,这个值都不变 |
position | 写模式:该值表示当前写到了ByteBuffer的哪个位置,ByteBuffer初始化时,这个值为0。position的最大值为capacity-1 读模式:当从写模式切换到读模式,会将position重置为0,即从ByteBuffer的起始位置开始读取数据 |
limit | 写模式:limit为最大可写入的数据量,即ByteBuffer的最大容量,值为capacity 读模式:当从写模式切换从读模式,limit将会被设置为读模式下的position值,即可读取的最大数据量 |
3.ByteBuffer核心方法
方法 | 详情 |
flip() | 将写模式切换为读模式,会触发的对核心属性的操作:
|
clear() | 在逻辑上清空ByteBuffer里的数据,实际上不清空数据会触发的动作:
|
mark() | 标记当前position位置【结合reset()使用】 |
reset() | 将position指向上一次mark()所指向的位置,可以从这个位置重复向下读取数据 |
compact() | 如果并未读取完ByteBuffer中的数据,调用compact()会将position~limit之间的数据拷贝到ByteBuffer的起始处,并且position为剩余数据量的大小,下次再往ByteBuffer中写入数据时,将在position位置继续往下写,不会覆盖历史数据。 |
hasRemaining() | 判断缓冲区中是否还有未读数据 |
写入ByteBuffer | byteBuffer.put(x) channel.read(byteBuffer)
|
读取ByteBuffer | byteBuffer.get() channel.write(bytebuffer)
|
4.代码示例说明
0.maven配置
测试时需要创建一个maven项目,增加配置信息
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.11.3</version>
</dependency>
</dependencies>
1.缓冲区初始化
public static void main(String[] args) {
// 1.創建ByteBuffer缓冲区,使用allocate()初始化操作,设定大小为10
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
// 2.向缓冲区写入一个数字a,占一个字节
byteBuffer.put((byte) 0x61);
// 3.打印结果
debugAll(byteBuffer);
}
只进行了写入的操作,目前position的位置是处于写入数据的当前位置1,limit的位置是出于初始化容量大小的位置
2.position、limit、flip()、clear()、mark()和reset()、compact()、hasRemaining() 方法详解
在Bytebuffer流程中有一个读取数据,使用get()方法会使得position位置后移,limit位置不会发生变化
public static void main(String[] args) {
try {
// 1.創建ByteBuffer缓冲区,使用allocate()初始化操作,设定大小为10
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
// 2.向缓冲区写入一个数字a,占一个字节
byteBuffer.put((byte) 0x61);
byteBuffer.put((byte) 0x62);
byteBuffer.put((byte) 0x63);
byteBuffer.put((byte) 0x64);
// 3.打印结果
debugAll(byteBuffer);
// 4.切换为读取操作
byteBuffer.flip();
// 5.读取数据
while(byteBuffer.hasRemaining()){ // 6. 判断是否还有剩余未读取的数据
byte b = byteBuffer.get();
System.out.println("对应数据是:"+(char)b);
}
debugAll(byteBuffer);
// 7. 数据切换为写模式,clear()方式
byteBuffer.clear();
debugAll(byteBuffer);
// 8.写入数据到缓冲区,使用channel方法,然后重复切换操作
FileChannel channel = new FileInputStream("data.txt").getChannel();
channel.read(byteBuffer);
byteBuffer.flip();
while(byteBuffer.hasRemaining()){ // 9. 判断是否还有剩余未读取的数据
byte b1 = byteBuffer.get();
System.out.println("新的对应数据是:"+(char)b1);
debugAll(byteBuffer);
}
debugAll(byteBuffer);
} catch (Exception e) {
e.printStackTrace();
}
}
从图中可知,在向缓冲区塞入数据且未开始切换为读模式时,从第一步到第七步,position和limit的位置分别从开始未读取的4和10变化为读取后的4和4,再次切换为写入数据,直接将position和limit的位置切换为0和10,既覆盖模式,从零开始,将上一次的数据覆盖掉。
为了避免出现未读完数据被覆盖的情况,使用compact()方法可以直接继续上一次的数据后面继续写入,如图所示,未读完的数据bcd被移至前面,然后继续写入数据
public static void main(String[] args) {
try {
// 1.創建ByteBuffer缓冲区,使用allocate()初始化操作,设定大小为10
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
// 2.向缓冲区写入一个数字a,占一个字节
byteBuffer.put((byte) 0x61);
byteBuffer.put((byte) 0x62);
byteBuffer.put((byte) 0x63);
byteBuffer.put((byte) 0x64);
// 3.打印结果
debugAll(byteBuffer);
// 4.切换为读取操作
byteBuffer.flip();
// 5.0 单次读取数据
System.out.println("单次读取数据:"+byteBuffer.get());
debugAll(byteBuffer);
// 6.避免数据出现覆盖的情况,使用compact()方法操作
byteBuffer.compact();
byteBuffer.put((byte) 0x66);
byteBuffer.put((byte) 0x67);
byteBuffer.put((byte) 0x68);
debugAll(byteBuffer);
} catch (Exception e) {
e.printStackTrace();
}
}
mark和reset()方法联合使用的方法示例
public static void main(String[] args) {
try {
// 1.創建ByteBuffer缓冲区,使用allocate()初始化操作,设定大小为10
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
// 2.向缓冲区写入一个数字a,占一个字节
byteBuffer.put((byte) 0x61);
byteBuffer.put((byte) 0x62);
byteBuffer.put((byte) 0x63);
byteBuffer.put((byte) 0x64);
// 3.打印结果
debugAll(byteBuffer);
// 4.切换为读取操作
byteBuffer.flip();
// 5.0 单次读取数据
System.out.println("第一次读取数据:"+byteBuffer.get());
byteBuffer.mark();
debugAll(byteBuffer);
System.out.println("第二次读取数据:"+byteBuffer.get());
debugAll(byteBuffer);
byteBuffer.reset();
debugAll(byteBuffer);
} catch (Exception e) {
e.printStackTrace();
}
}
在字符a的做了标记,当读完b的时候,选择返回到a的位置,从新读取数据,实现了反复读取数据的需求与操作,具体看下图position和limit的位置
5.数据的粘包与包
5.1.什么是粘包和半包?
粘包:在缓冲区有足够大的空间时,且存在多条数据同时存储到缓冲区内,且无按照一定的分割符进行分割,那么在读取数据时,多条数据就会粘连到一起,这种现象就较多粘包。比如分别传入“9000000”、“世界你好”,可能读取的数据就会变为9000000世界你好,
半包:在缓冲区数据被读取时,无法解析成一句完整的数据会出现乱码等现象,这就是半包。例如,缓冲区是4个字节,我存储“中国”,在读取数据的时候读到的是“中*”,“中国”是6个字节,缓冲区较小存不下,国子无法被正常解析。
网络上有多条数据发送给服务端,数据之间使用 \n 进行分隔
但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为
Hello,world\n
I'm zhangsan\n
How are you?\n
变成了下面的两个 byteBuffer (粘包,半包)
Hello,world\nI'm zhangsan\nHo
w are you?\n
5.2.粘包和半包如何解决?
通过对"1.0粘包、半包示例"解决来学习如何解决对应的粘包以及半包,用以下代码来解析内容
public static void main(String[] args) {
// 1.创建对应的数据
ByteBuffer byteBuffer = ByteBuffer.allocate(32);
byteBuffer.put("Hello,world\nI'm zhangsan\nHo".getBytes());
splitbyteBuffer(byteBuffer);
byteBuffer.put("w are you?\n".getBytes());
splitbyteBuffer(byteBuffer);
}
public static void splitbyteBuffer(ByteBuffer source){
// 1.先写模式切换,既读模式和写模式
source.flip();
// 2.遍历数据
for (int i = 0;i < source.limit();i++){
// 3。找到完整的一条数据
if (source.get(i) == '\n') {
// 4. 根据数据长度开辟缓冲区空间大小
int length = i + 1 - source.position();
ByteBuffer byteBufferCatch = ByteBuffer.allocate(length);
// 5.遍历获取的数据,写入到新的缓冲区中
for (int j = 0;j < length;j++){
byteBufferCatch.put(source.get());
}
// 6.展示完整数据,既就是通过读物的数据
debugAll(byteBufferCatch);
}
}
source.compact();
}
分散集中写的方式就不继续介绍,可以在网络上 查询相应的一些资料。
小结
ByteBuffer在编写过程中不仅仅只有以上的单个方法的使用,可能会涉及到多个场景的应用,及多个数据场景结合使用,因此在使用的时候,需要根据具体业务具体操作。