本节我们看看Netty的传输(全是干货,自带水杯)
- 一、Java的NIO和OIO
- 1.1 OIO
- 1.2 NIO
- 二、Netty的NIO和OIO
- 2.1 OIO
- 2.2 NIO
- 三、传输API
- 四、内置的传输
- 4.1 NIO
- 4.2 Epoll—用于 Linux 的本地非阻塞传输
- 4.3 OIO
- 4.4 用于 JVM 内部通信的 Local 传输
- 4.5 Embedded 传输
- 五、传输的用例
一、Java的NIO和OIO
流经网络的数据总是具有相同的类型:字节。这些字节是如何流动的主要取决于我们所说的网络传输。
1.1 OIO
我们先来看一段Java的阻塞应用程序程序:
package com.example.javademo;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;
/**
* @author lhd
* @date 2023/05/17 13:19
* @notes java oio演示
*/
public class PlainOioServer {
public void serve(int port) throws IOException {
//绑定服务器到指定端口
final ServerSocket socket = new ServerSocket(port);
try {
for (;;) {
//接受连接
final Socket clientSocket = socket.accept();
System.out.println(
"Accepted connection from " + clientSocket);
//创建一个新的线程来处理该连接
new Thread(new Runnable() {
@Override
public void run() {
OutputStream out;
try {
out = clientSocket.getOutputStream();
//将消息写给新连接的客户端
out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8")));
out.flush();
//关闭连接
clientSocket.close();
}
catch (IOException e) {
e.printStackTrace();
}
finally {
try {
clientSocket.close();
}
catch (IOException ex) {
// 关闭时忽略
}
}
}
}).start(); //启动线程
}
}
catch (IOException e) {
e.printStackTrace();
}
}
}
这段代码完全可以处理中等数量的并发客户端。但是随着应用程序变得流行起来,你会发现它并不能很好地伸缩到支撑成千上万的并发连入连接。
1.2 NIO
同样,来看一段Java的非阻塞应用程序代码:
package com.example.javademo;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* @author lhd
* @date 2023/05/17 13:27
* @notes java NIO演示
*/
public class PlainNioServer {
public void serve(int port) throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket ssocket = serverChannel.socket();
//将服务绑定到端口
InetSocketAddress address = new InetSocketAddress(port);
ssocket.bind(address);
//打开选择器来处理Channel
Selector selector = Selector.open();
//将serverChannel注册到选择器用来接收连接
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
for (;;) {
try {
//等待需要处理的新事件;阻塞 将一直持续到下一个传入事件
selector.select();
} catch (IOException ex) {
ex.printStackTrace();
// handle exception
break;
}
//获取所有接收事件的SelectionKey 实例
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
//检查事件是否是一个新的已经就绪可以被接受的连接
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel)key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
//接受客户端,并将它注册到选择器
client.register(selector, SelectionKey.OP_WRITE |
SelectionKey.OP_READ, msg.duplicate());
System.out.println("Accepted connection from " + client);
}
//检查套接字是否已经准备好写数据
if (key.isWritable()) {
SocketChannel client = (SocketChannel)key.channel();
ByteBuffer buffer = (ByteBuffer)key.attachment();
while (buffer.hasRemaining()) {
//将数据写到已连接的客户端
if (client.write(buffer) == 0) {
break;
}
}
//关闭连接
client.close();
}
} catch (IOException ex) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
}
}
}
}
}
}
可以看出,两段Java的OIO和NIO都做了相同的事:连接客户端,并发送“Hi”。区别在于一个是阻塞的,另一个是非阻塞的,但两段代码却完全不同。如果为了用于非阻塞 I/O 而重新实现这个简单的应用程序,都需要一次完全的重写的话,那么不难想象,移植真正复杂的应用程序需要付出什么样的努力。
二、Netty的NIO和OIO
下来我们来看以下Netty 实现该应用程序将会是什么样子。
2.1 OIO
来看一段Netty的阻塞代码
package com.example.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.oio.OioServerSocketChannel;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
/**
* @author lhd
* @date 2023/05/17 13:47
* @notes Netty 阻塞程序演示
* ps:Netty已启用OIO传输,使用 NIO / EPOLL / KQUEUE 传输。
*/
public class NettyOioServer {
public void server(int port) throws InterruptedException {
final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", StandardCharsets.UTF_8));
EventLoopGroup group = new OioEventLoopGroup();
try {
//创建ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
b.group(group)
//使用 OioEventLoopGroup以允许阻塞模式
.channel(OioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
//指定 ChannelInitializer,对于每个已接受的连接都调用它
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch){
//添加一个 ChannelInboundHandlerAdapter 以拦截和处理事件
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
//将消息写到客户端,并添加 ChannelFutureListener,以便消息一被写完就关闭连接
ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
}
});
}
});
//绑定服务器以接受连接
ChannelFuture f = b.bind().sync();
//释放所有的资源
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
}
ps:这段代码和我们上一篇文章中的有所区别,不要搞混淆了
2.2 NIO
我们再来看看Netty的非阻塞代码:
package com.example.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
/**
* @author lhd
* @date 2023/05/17 14:05
* @notes Netty 非阻塞代码演示
*/
public class NettyNioServer {
public void server(int port) throws InterruptedException {
final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", StandardCharsets.UTF_8));
EventLoopGroup group = new NioEventLoopGroup(); // 1
try {
ServerBootstrap b = new ServerBootstrap();
//为非阻塞模式使用NioEventLoopGroup
b.group(group).channel(NioServerSocketChannel.class) // 2
.localAddress(new InetSocketAddress(port))
//指定 ChannelInitializer,对于每个已接受的连接都调用它
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
//添加 ChannelInboundHandlerAdapter 以接收和处理事件
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
//将消息写到客户端,并添加ChannelFutureListener,以便消息一被写完就关闭连接
ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
}
});
}
});
//绑定服务器以接受连接
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
} finally {
//释放所有的资源
group.shutdownGracefully().sync();
}
}
}
可以看到,Netty的阻塞代码和非阻塞代码只有两处不同(代码中1 2 处)。这使得Netty的阻塞和非阻塞切换非常容易,Netty 为每种传输的实现都暴露了相同的 API,所以无论选用哪一种传输的实现,你的代码都仍然几乎不受影响。在所有的情况下,传输的实现都依赖于 interface Channel、ChannelPipeline 和 ChannelHandler。
三、传输API
传输 API 的核心是 interface Channel,它被用于所有的 I/O 操作。下图是它的层次结构:
如图所示,每个 Channel 都将会被分配一个 ChannelPipeline和ChannelConfig。ChannelConfig 包含了该 Channel 的所有配置设置,并且支持热更新。由于特定的传输可能具有独特的设置,所以它可能会实现一个 ChannelConfig 的子类型。
由于 Channel 是独一无二的,所以为了保证顺序将 Channel 声明为 java.lang.Comparable 的一个子接口。因此,如果两个不同的 Channel 实例都返回了相同的散列码,那么 AbstractChannel 中的 compareTo()方法的实现将会抛出一个 Error。
ChannelPipeline 持有所有将应用于入站和出站数据以及事件的 ChannelHandler 实例,这些 ChannelHandler 实现了应用程序用于处理状态变化以及数据处理的逻辑。
ChannelHandler 的典型用途包括:
- 将数据从一种格式转换为另一种格式;
- 提供异常的通知;
- 提供 Channel 变为活动的或者非活动的通知;
- 提供当 Channel 注册到 EventLoop 或者从 EventLoop 注销时的通知;
- 提供有关用户自定义事件的通知。
ps:ChannelPipeline 实现了一种常见的设计模式—拦截过滤器(InterceptingFilter)。UNIX 管道是另外一个熟悉的例子:多个命令被链接在一起,其中一个命令的输出端将连接到命令行中下一个命令的输入端。
也可以根据需要通过添加或者移除ChannelHandler实例来修改ChannelPipeline。
通过利用Netty的这项能力可以构建出高度灵活的应用程序。例如,STARTTLS方法名协议被请求时,你可以简单地通过 向 ChannelPipeline 添 加 一个适当的 ChannelHandler(SslHandler)来按需地支持STARTTLS协议。除了访问所分配的 ChannelPipeline 和 ChannelConfig 之外,也可以利用 Channel的其他方法。
比较重要的Channel方法如下:
方法名 | 描述 |
---|---|
eventLoop | 返回分配给 Channel 的 EventLoop |
pipeline | 返回分配给 Channel 的 ChannelPipeline |
isActive | 如果 Channel 是活动的,则返回 true。活动的意义可能依赖于底层的传输。例如,一个 Socket 传输一旦连接到了远程节点便是活动的,而一个 Datagram 传输一旦被打开便是活动的 |
localAddress | 返回本地的 SokcetAddress |
remoteAddress | 返回远程的 SocketAddress |
write | 将数据写到远程节点。这个数据将被传递给 ChannelPipeline,并且排队直到它被冲刷 |
flush | 将之前已写的数据冲刷到底层传输,如一个 SocketwriteAndFlush 一个简便的方法,等同于调用 write()并接着调用 flush() |
Netty 所提供的广泛功能只依赖于少量的接口,所以我们可以对我们的应用程序逻辑进行重大的修改,而又无需大规模地重构你的代码库。
下面是一个写数据并将其冲刷到远程节点的例子。
Channel channel = ...
//创建要发送的数据
ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8);
//写数据并冲刷它
ChannelFuture cf = channel.writeAndFlush(buf);
//添加监听,以便完后接收通知
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
//如果成功,在控制台打印成功
if (future.isSuccess()) {
System.out.println("Write successful");
} else {
//错误则打印错误和堆栈跟踪
System.err.println("Write error");
future.cause().printStackTrace();
}
}
});
ps:Netty 的 Channel 实现是线程安全的,因此你可以存储一个到 Channel 的引用,并且每当你需要向远程节点写数据时,都可以使用它,即使当时许多线程都在使用它。
上一篇博文说过,一个Channel对应一个eventLoop,一个eventLoop持有一个线程却不一定只持有一个Channel。下面展示多个线程使用同一个Channel。
final Channel channel = ...
//创建数据
final ByteBuf buf = Unpooled.copiedBuffer("your data",
CharsetUtil.UTF_8).retain();
//创建写数据的Runable
Runnable writer = new Runnable() {
@Override
public void run() {
channel.writeAndFlush(buf.duplicate());
}
};
//获取线程池的引用
Executor executor = Executors.newCachedThreadPool();
//递交任务给线程池,以便某个线程调用
executor.execute(writer);
//递交另一个写任务以便在另一个线程中执行
executor.execute(writer);
...
这些消息将会保证按照顺序发送~
四、内置的传输
Netty 内置了一些可开箱即用的传输。因为并不是它们所有的传输都支持每一种协议,所以你必须选择一个和你的应用程序所使用的协议相容的传输。
下面是一些Netty提供的传输。
名 称 | 包 | 描 述 |
---|---|---|
NIO | io.netty.channel.socket.nio | 使用 java.nio.channels 包作为基础——基于选择器的方式 |
Epoll | io.netty.channel.epoll | 由 JNI 驱动的 epoll()和非阻塞 IO。这个传输支持只有在Linux上可用的多种特性,如SO_REUSEPORT,比 NIO 传输更快,而且是完全非阻塞的 |
OIO | io.netty.channel.socket.oio | 使用 java.net 包作为基础使用阻塞流 |
Local | io.netty.channel.local | 可以在 VM 内部通过管道进行通信的本地传输 |
Embedded | io.netty.channel.embedded | Embedded 传输,允许使用 ChannelHandler 而又不需要一个真正的基于网络的传输。这在测试你的ChannelHandler 实现时非常有用 |
之后我们回详细介绍这些传输。
ps:Epoll这个是 Netty 特有的实现,更加适配 Netty 现有的线程模型,具有更高的性能以及更低的垃圾回收压力
4.1 NIO
NIO 提供了一个所有 I/O 操作的全异步的实现。它利用了自 NIO 子系统被引入JDK 1.4 时便可用的基于选择器的 API。
选择器背后的基本概念是充当一个注册表,在那里你将可以请求在 Channel 的状态发生变化时得到通知。可能的状态变化有:
- 新的 Channel 已被接受并且就绪;
- Channel 连接已经完成;
- Channel 有已经就绪的可供读取的数据;
- Channel 可用于写数据。
选择器运行在一个检查状态变化并对其做出相应响应的线程上,在应用程序对状态的改变做出响应之后,选择器将会被重置,并将重复这个过程。
下表中的常量值代表了由class java.nio.channels.SelectionKey定义的位模式。
这些位模式可以组合起来定义一组应用程序正在请求通知的状态变化集。
名称 | 描述 |
---|---|
OP_ACCEPT | 请求在接受新连接并创建 Channel 时获得通知 |
OP_CONNECT | 请求在建立一个连接时获得通知 |
OP_READ | 请求当数据已经就绪,可以从 Channel 中读取时获得通知 |
OP_WRITE | 请求当可以向 Channel 中写更多的数据时获得通知。这处理了套接字缓冲区被完全填满时的情况,这种情况通常发生在数据的发送速度比远程节点可处理的速度更快的时候 |
内置传输的处理流程图:
ps:零拷贝(zero-copy)是一种目前只有在使用 NIO 和 Epoll 传输时才可使用的特性。它使你可以快速高效地将数据从文件系统移动到网络接口,而不需要将其从内核空间复制到用户空间,其在像 FTP 或者HTTP 这样的协议中可以显著地提升性能。但是,并不是所有的操作系统都支持这一特性。特别地,它对于实现了数据加密或者压缩的文件系统是不可用的——只能传输文件的原始内容。反过来说,传输已被加密的文件则不是问题
4.2 Epoll—用于 Linux 的本地非阻塞传输
Netty 的 NIO 传输基于 Java 提供的异步/非阻塞网络编程的通用抽象。虽然这保证了 Netty 的非阻塞 API 可以在任何平台上使用,但它也包含了相应的限制,因为 JDK为了在所有系统上提供相同的功能,必须做出妥协。
Netty为Linux提供了一组NIO API,其以一种和它本身的设计更加一致的方式使用epoll,并且以一种更加轻量的方式使用中断。如果你的应用程序旨在运行于Linux系统,那么请考虑利用这个版本的传输;你将发现在高负载下它的性能要优于JDK的NIO实现。
这个传输的语义和上一节内置传输的处理流程图 所示的完全相同,而且它的用法也是简单直接的。相关示例参照上面的Netty NIO代码。如果要在那个代码中使用 epoll 替代 NIO,只需要将 NioEventLoopGroup替换为EpollEventLoopGroup ,并且将 NioServerSocketChannel.class 替换为EpollServerSocketChannel.class 即可。
4.3 OIO
这个可以参照Netty的OIO代码即可,最新的Netty中抛弃了OIO的传输方式~
下图是OIO的传输处理逻辑:
4.4 用于 JVM 内部通信的 Local 传输
Netty 提供了一个 Local 传输,用于在同一个 JVM 中运行的客户端和服务器程序之间的异步通信。同样,这个传输也支持对于所有 Netty 传输实现都共同的 API。
在这个传输中,和服务器 Channel 相关联的 SocketAddress 并没有绑定物理网络地址;相反,只要服务器还在运行,它就会被存储在注册表里,并在 Channel 关闭时注销。因为这个传输并不接受真正的网络流量,所以它并不能够和其他传输实现进行互操作。因此,客户端希望连接到(在同一个 JVM 中)使用了这个传输的服务器端时也必须使用它。除了这个限制,它的使用方式和其他的传输一模一样。
4.5 Embedded 传输
Netty 提供了一种额外的传输,使得你可以将一组 ChannelHandler 作为帮助器类嵌入到其他的 ChannelHandler 内部。通过这种方式,你将可以扩展一个ChannelHandler 的功能,而又不需要修改其内部代码。
Embedded 传输的关键是一个被称为 EmbeddedChannel 的具体的 Channel
实现。我将在之后的文章中为大家演示这种方式。
五、传输的用例
Netty支持的传输和网络协议:
传 输 | TCP | UDP | SCTP | UDT |
---|---|---|---|---|
NIO | × | × | × | × |
Epoll(仅 Linux) | × | × | - | - |
OIO | × | × | × | × |
PS:UDT 协议实现了基于 UDP 协议的可靠传输;
在 Linux 上启用 SCTP
SCTP 需要内核的支持,并且需要安装用户库。
例如,对于 Ubuntu,可以使用下面的命令:# sudo apt-get install libsctp1
对于 Fedora,可以使用 yum:#sudo yum install kernel-modules-extra.x86_64 lksctp-tools.x86_64
虽然只有SCTP传输有这些特殊要求,但是其他传输可能也有它们自己的配置选项需要考虑。此外,如果只是为了支持更高的并发连接数,服务器平台可能需要配置得和客户端不一样。
这里是一些很可能会遇到的用例
- 非阻塞代码库——如果你的代码库中没有阻塞调用(或者你能够限制它们的范围),那么在 Linux 上使用 NIO 或者 epoll 始终是个好主意。虽然 NIO/epoll 旨在处理大量的并发连接,但是在处理较小数目的并发连接时,它也能很好地工作,尤其是考虑到它在连接之间共享线程的方式。
- 阻塞代码库——正如我们已经指出的,如果你的代码库严重地依赖于阻塞 I/O,而且你的应用程序也有一个相应的设计,那么在你尝试将其直接转换为 Netty 的 NIO 传输时,你将可能会遇到和阻塞操作相关的问题。不要为此而重写你的代码,可以考虑分阶段迁移:先从OIO 开始,等你的代码修改好之后,再迁移到 NIO(或者使用 epoll,如果你在使用 Linux)。
- 在同一个 JVM 内部的通信——在同一个 JVM 内部的通信,不需要通过网络暴露服务,是Local 传输的完美用例。这将消除所有真实网络操作的开销,同时仍然使用你的 Netty 代码库。如果随后需要通过网络暴露服务,那么你将只需要把传输改为 NIO 或者 OIO 即可。
- 测试你的 ChannelHandler 实现——如果你想要为自己的 ChannelHandler 实现编写单元测试,那么请考虑使用 Embedded 传输。这既便于测试你的代码,而又不需要创建大量的模拟(mock)对象。你的类将仍然符合常规的 API 事件流,保证该 ChannelHandler在和真实的传输一起使用时能够正确地工作。
应用程序的最佳传输建议:
应用程序的需求 | 推荐的传输 |
---|---|
非阻塞代码库或者一个常规的起点 | NIO(或者在 Linux 上使用 epoll) |
阻塞代码库 | OIO |
在同一个 JVM 内部的通信 | Local |
测试 ChannelHandler 的实现 | Embedded |