本篇介绍Netty调优,在上篇聊天室的案例中进行改造,手写一个简单的RPC实现。
1、超时时间参数
CONNECT_TIMEOUT_MILLIS 是Netty的超时时间参数,属于客户端SocketChannel的参数,客户端连接时如果一定时间没有连接上,就会抛出 timeout 异常
如何在代码中添加参数?在new Bootstrap()时使用
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300)
启动客户端,不启动服务器,发现连接超时
打上断点(选择多线程模式):
这一行获取到的值是创建BootStrap时添加CONNECT_TIMEOUT_MILLIS 的值(300)
int connectTimeoutMillis = config().getConnectTimeoutMillis();
满足条件,进入If块:
这是一个定时任务,延迟CONNECT_TIMEOUT_MILLIS 的值(300)后触发,执行Runnable中的逻辑,抛出超时异常。
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
主线程和NIO线程也是通过connectPromise进行异步通信的,两个线程持有的是同一个connectPromise对象。
2、SO_BACKLOG
SO_BACKLOG是一个与服务器套接字相关的参数,主要用于配置服务器套接字的接受队列大小,属于ServerSocketChannel的参数
什么是套接字?
套接字是计算机网络中的一种通信端点,用于在两个节点之间建立连接并进行数据传输,它包含了IP地址和端口号,通过这两个标识符,网络上的设备可以相互定位和通信。
套接字在客户端和服务器的工作顺序:
服务器端:
- 创建套接字。
- 绑定到指定的IP地址和端口号。
- 监听连接请求。
- 接受连接,创建一个新的套接字用于与客户端通信。
- 读取或写入数据。
客户端:
- 创建套接字。
- 连接到服务器的IP地址和端口号。
- 读取或写入数据。
而SO_BACKLOG参数决定了服务器套接字在操作系统内核中维护的一个挂起连接队列的最大长度。
当一个服务器应用程序启动并监听某个端口时,它会创建一个服务器套接字,用于等待客户端的连接请求。
当客户端尝试连接服务器时,连接请求会首先进入服务器端的一个等待队列,称为挂起连接队列。这个队列中的连接请求还没有被服务器应用程序正式接受处理。
在大多数操作系统中,套接字操作是由操作系统内核负责管理的。内核会为每个监听中的服务器套接字维护一个挂起连接队列。
如果参数设置的如果队列已满,新的连接请求将被拒绝或被操作系统忽略。
假设SO_BACKLOG设置为50,这意味着挂起连接队列的最大长度是50。当第51个连接请求到达时,如果前面的请求还没有被处理,新的请求将被拒绝。
3、ulimit -n
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
ulimit -n控制了操作系统中一个进程可以打开的最大文件描述符(file descriptor)数量。
什么是文件描述符?
文件描述符是操作系统内核用于管理打开文件的一个抽象概念,包括普通文件、套接字、管道等。每个打开的文件、网络连接都会占用一个文件描述符。
为什么要设置最大文件描述符
Netty 是一个高性能的网络框架,设计用于处理大量并发连接。如果文件描述符的限制太低,当连接数超过此限制时,服务器将无法接受新的连接,这将导致连接失败。
4、TCP_NODELAY
TCP_NODELAY 是 TCP 协议中的一个选项,用于控制 Nagle 算法的启用或禁用。
Nagle 算法 在前篇中有所提及,简单的说,当发送方有小数据包要发送时,如果前一个数据包的确认(ACK)尚未收到,Nagle 算法会将这些小数据包暂时存储起来,直到收到前一个数据包的确认或足够多的数据可以组成一个较大的数据包。
可以通过以下的代码设置是否开启Nagle 算法 ,同样地,这个参数属于ServerSocketChannel
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
什么场景下应该禁用Nagle 算法?
- 实时应用:在需要低延迟的实时应用中(例如在线游戏、实时通信应用)。
- 小数据包频繁发送:如果应用程序频繁发送小数据包,并且对每个数据包的发送延迟敏感。
5、SO_SNDBUF & SO_RCVBUF
SO_SNDBUF & SO_RCVBUF 和SO_BACKLOG类似,也是与网络套接字相关的两个重要参数,用于配置发送和接收缓冲区的大小。
发送缓冲区用于临时存储应用程序要发送到网络的数据。
接收缓冲区用于临时存储从网络接收到的数据,直到应用程序读取它们。
缓冲区过大可能增加延迟,因为数据在缓冲区中停留的时间更长;缓冲区过小可能导致频繁的缓冲区溢出和数据包丢失。
可以通过以下的代码进行设置:
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.SO_SNDBUF, 32 * 1024); // 发送缓冲区大小32KB
bootstrap.childOption(ChannelOption.SO_RCVBUF, 32 * 1024); // 接收缓冲区大小32KB
如何选择合适的缓冲区大小
- 根据网络带宽和延迟:在高带宽和高延迟的网络环境中,需要更大的缓冲区来充分利用带宽。例如,宽带网络和跨国连接可能需要更大的缓冲区。
- 根据应用需求:不同的应用有不同的需求。实时应用(如视频流和在线游戏)通常需要较小的缓冲区以减少延迟,而大数据传输(如文件下载和大数据处理)可能需要较大的缓冲区以提高吞吐量。
- 测试和调优:最佳的缓冲区大小通常需要通过测试和调优来确定。可以通过逐步调整缓冲区大小并监测网络性能来找到最佳配置。
6、ALLOCATOR
ALLOCATOR 参数用于配置 ByteBuf 分配器,ByteBuf的相关概念在前篇中也提到过,大致可以分为池化和非池化:
- PooledByteBufAllocator:池化分配器,重复使用内存以减少分配和释放内存的开销,适用于高并发和性能敏感的应用。
- UnpooledByteBufAllocator:非池化分配器,每次都进行新的内存分配,适用于内存使用模式不可预测的应用。
可以通过以下代码进行设置:
ServerBootstrap bootstrap = new ServerBootstrap();
// 使用池化分配器
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
// 使用非池化分配器
// bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
7、RCVBUF_ALLOCATOR
RCVBUF_ALLOCATOR 是一个用于管理接收缓冲区大小的机制,用于确定和管理网络连接上每次读取操作时分配的字节缓冲区的大小。
它是一个接口,常用的实现类有:
- FixedRecvByteBufAllocator:每次读操作分配固定大小的缓冲区。
- DefaultMaxBytesRecvByteBufAllocator:一个可以限制每次读取消息数量的实现。
- AdaptiveRecvByteBufAllocator:根据流量动态调整缓冲区大小,这是最常用的实现之一。
8、RPC简单实现
接下来会通过一个案例实现简单的RPC框架。
什么是RPC框架?
RPC(Remote Procedure Call,远程过程调用)框架是一种使程序能够通过网络调用远程服务器上的函数或方法的技术。
在表面上这种调用方式对用户是透明的,就像调用本地函数一样简单,但实际上底层会通过网络协议进行通信。
8.1、1.0版
首先需要新增RPC的请求和响应消息:
@Data
public abstract class Message implements Serializable {
// 省略旧的代码
public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
static {
// ...
messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
}
}
然后定义一个RPC请求消息类,在请求消息类中,包括了调用接口及接口中方法的信息:
@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {
/**
* 调用的接口全限定名,服务端根据它找到实现
*/
private String interfaceName;
/**
* 调用接口中的方法名
*/
private String methodName;
/**
* 方法返回类型
*/
private Class<?> returnType;
/**
* 方法参数类型数组
*/
private Class[] parameterTypes;
/**
* 方法参数值数组
*/
private Object[] parameterValue;
public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
super.setSequenceId(sequenceId);
this.interfaceName = interfaceName;
this.methodName = methodName;
this.returnType = returnType;
this.parameterTypes = parameterTypes;
this.parameterValue = parameterValue;
}
@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_REQUEST;
}
}
再定义一个响应消息类,包括正常返回的值以及发生异常时的返回值。
@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {
/**
* 返回值
*/
private Object returnValue;
/**
* 异常值
*/
private Exception exceptionValue;
@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_RESPONSE;
}
}
定义一个获取配置文件中接口实现类的工厂类:
public class ServicesFactory {
static Properties properties;
static Map<Class<?>, Object> map = new ConcurrentHashMap<>();
static {
try (InputStream in = SerializedConfig.class.getResourceAsStream("/application.properties")) {
properties = new Properties();
properties.load(in);
Set<String> names = properties.stringPropertyNames();
for (String name : names) {
if (name.endsWith("Service")) {
Class<?> interfaceClass = Class.forName(name);
Class<?> instanceClass = Class.forName(properties.getProperty(name));
map.put(interfaceClass, instanceClass.newInstance());
}
}
} catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new ExceptionInInitializerError(e);
}
}
public static <T> T getService(Class<T> interfaceClass) {
return (T) map.get(interfaceClass);
}
}
application.properties
cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl
HelloService接口及实现类:
public interface HelloService {
String sayHello(String name);
}
public class HelloServiceImpl implements HelloService {
@Override
public String sayHello(String msg) {
// int i = 1 / 0;
return "你好, " + msg;
}
}
准备RPC服务器端和客户端的代码,和聊天室案例类似,但是加上了对应的RPC请求消息和响应消息的处理器:
服务器端:
/**
*
* RPC服务器端
**/
@Slf4j
public class RpcServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
客户端:
/**
*
*RPC 客户端
**/
public class RpcClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
// rpc 响应消息处理器,待实现
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
channel.closeFuture().sync();
} catch (Exception e) {
log.error("client error", e);
} finally {
group.shutdownGracefully();
}
}
}
先编写服务器端的自定义RPC消息处理器RpcRequestMessageHandler :
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {
RpcResponseMessage responseMessage = new RpcResponseMessage();
int sequenceId = message.getSequenceId();
responseMessage.setSequenceId(sequenceId);
try {
//获取RPC消息对象中将要调用的接口的实现类 写在配置文件中
HelloService service = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName()));
//获取实现类中的方法
Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
//通过反射调用方法
Object result = method.invoke(service, message.getParameterValue());
responseMessage.setReturnValue(result);
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
responseMessage.setExceptionValue(e);
}
//触发出站事件
ctx.writeAndFlush(responseMessage);
}
}
通过main方法测试一下:
/**
* 测试代码
* @param args
* @throws ClassNotFoundException
* @throws NoSuchMethodException
* @throws InvocationTargetException
* @throws IllegalAccessException
*/
public static void main(String[] args) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
//封装RPC消息对象
RpcRequestMessage message = new RpcRequestMessage(
1,
"cn.itcast.server.service.HelloService",
"sayHello",
String.class,
new Class[]{String.class},
new Object[]{"张三"});
//获取RPC消息对象中将要调用的接口的实现类
HelloService service = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName()));
//获取实现类中的方法
Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
//调用方法
Object result = method.invoke(service, message.getParameterValue());
System.out.println(result);
}
编写客户端的代码以及自定义RPC消息返回处理器RpcResponseMessageHandler 暂时只将接收到的消息返回出去:
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
log.debug("{}", msg);
}
}
改造客户端的代码,发送调用方法请求:
ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(
1,
"cn.itcast.server.service.HelloService",
"sayHello",
String.class,
new Class[]{String.class},
new Object[]{"张三"}
)).addListener(promise -> {
if (!promise.isSuccess()) {
Throwable cause = promise.cause();
log.error("error", cause);
}
});
它的执行顺序是:
客户端发送消息,触发所有出站处理器:
然后到服务器:
在RpcRequestMessageHandler 中无论消息处理是否报错,都会触发出站处理器将返回值传递给客户端:
最后再回到客户端:
注意:LOGGING_HANDLER和MESSAGE_CODEC是双向处理,既可以是入站,也可以是出站!
这样一个简单的RPC通信案例就已经实现了。
8.2、2.0版
但是在第一版中,用户在客户端发送调用请求时,需要自己封装RpcRequestMessage 请求对象,参数复杂,换做是我是绝对不愿意这样做的。那么我们对其进行优化。
改造客户端,首先定义一个成员变量channel:
private static volatile Channel channel = null;
然后将原有客户端的代码抽取成一个初始化channel的方法:
private static void initChannel() {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
//双向事件
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
//入站事件
ch.pipeline().addLast(RPC_HANDLER);
}
});
try {
channel = bootstrap.connect("localhost", 8080).sync().channel();
channel.closeFuture().addListener(future -> {
group.shutdownGracefully();
});
} catch (Exception e) {
log.error("client error", e);
}
}
这个channel只应该存在一个实例,采用双检锁单例的方式获取:
/**
* 初始化单例channel
* @return
*/
private static Channel getChannel(){
if (channel != null){
return channel;
}
synchronized (LOCK){
if (channel!=null){
return channel;
}
initChannel();
return channel;
}
}
复习一下,为什么要使用双检锁模式?
(成员位置的channel可以不用volatile关键字?此时的channel对象不是走构造方法new出来的)
然后创建一个代理对象,代理对象负责将请求参数打包并发送给远程服务器:
public static <T> T getProxyService(Class<T> serviceClass){
ClassLoader classLoader = serviceClass.getClassLoader();
Class<?>[] interfaces = new Class[]{serviceClass};
Object o = Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
//将方法调用转换成消息对象
int sequenceId = SequenceIdGenerator.nextId();
RpcRequestMessage message = new RpcRequestMessage(
sequenceId,
serviceClass.getName(),
method.getName(),
method.getReturnType(),
method.getParameterTypes(),
args);
//发送消息
Channel channel = getChannel();
channel.writeAndFlush(message);
//异步通信获取结果
DefaultPromise<Object> objectDefaultPromise = new DefaultPromise<>(channel.eventLoop());
//向PROMISE中注册ID和DefaultPromise
RpcResponseMessageHandler.PROMISES.put(sequenceId,objectDefaultPromise);
//等待结果
objectDefaultPromise.await();
if (objectDefaultPromise.isSuccess()){
return objectDefaultPromise.getNow();
}else {
throw new RuntimeException(objectDefaultPromise.cause());
}
});
return (T) o;
}
重点在于向客户端接收服务器响应的RpcResponseMessageHandler 中注册自己的消息ID和promise对象。
//向PROMISE中注册ID和DefaultPromise
RpcResponseMessageHandler.PROMISES.put(sequenceId,objectDefaultPromise);
这样用户只需要调用代理对象的方法就可以了:
public static void main(String[] args) {
HelloService helloService = getProxyService(HelloService.class);
System.out.println(helloService.sayHello("张三"));
}
同时需要修改客户端接受服务器响应的RpcResponseMessageHandler ,去找到对应消息ID的promise对象,并且移除,然后根据服务器返回的结果写入成功或异常情况,这时客户端的
//等待结果
objectDefaultPromise.await();
获取到了结果,进行最后的处理。
/**
* 接受服务器的响应
*/
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
/**
* k:消息id
* v:消息ID对应的promise对象
*/
public static final ConcurrentHashMap<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
int sequenceId = msg.getSequenceId();
Promise<Object> promise = PROMISES.remove(sequenceId);
Exception exceptionValue = msg.getExceptionValue();
Object returnValue = msg.getReturnValue();
if (exceptionValue == null) {
promise.setSuccess(returnValue);
} else {
promise.setFailure(exceptionValue);
}
}
}