Netty深入浅出Java网络编程学习笔记(三) 优化篇

news2025/1/15 20:07:22

目录

五、优化

1、拓展序列化算法

序列化接口

枚举实现类

修改原编解码器

2、参数调优

CONNECT_TIMEOUT_MILLIS

使用

源码分析

SO_BACKLOG

三次握手与连接队列

作用

默认值

TCP_NODELAY

SO_SNDBUF & SO_RCVBUF

ALLOCATOR

使用

ByteBufAllocator类型

RCVBUF_ALLOCATOR

3、RPC框架

准备工作

RpcRequestMessageHandler

RpcResponseMessageHandler

客户端发送消息

改进客户端

改进RpcResponseMessageHandler


五、优化

1、拓展序列化算法

序列化,反序列化主要用在消息正文的转换上

  • 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])

  • 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理

序列化接口

public interface Serializer {
    /**
     * 序列化
     * @param object 被序列化的对象
     * @param <T> 被序列化对象类型
     * @return 序列化后的字节数组
     */
    <T> byte[] serialize(T object);

    /**
     * 反序列化
     * @param clazz 反序列化的目标类的Class对象
     * @param bytes 被反序列化的字节数组
     * @param <T> 反序列化目标类
     * @return 反序列化后的对象
     */
    <T> T deserialize(Class<T> clazz, byte[] bytes);
}

枚举实现类

积累一下这种枚举的运用方式

public enum SerializerAlgorithm implements Serializer {
    // Java的序列化和反序列化
    Java {
        @Override
        public <T> byte[] serialize(T object) {
            // 序列化后的字节数组
            byte[] bytes = null;
            try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(object);
                bytes = bos.toByteArray();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return bytes;
        }

        @Override
        public <T> T deserialize(Class<T> clazz, byte[] bytes) {
            T target = null;
            System.out.println(Arrays.toString(bytes));
            try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
                 ObjectInputStream ois = new ObjectInputStream(bis)) {
                target = (T) ois.readObject();
            } catch (IOException | ClassNotFoundException e) {
                e.printStackTrace();
            }
            // 返回反序列化后的对象
            return target;
        }
    }
    
     // Json的序列化和反序列化
    Json {
        @Override
        public <T> byte[] serialize(T object) {
            String s = new Gson().toJson(object);
            System.out.println(s);
            // 指定字符集,获得字节数组
            return s.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public <T> T deserialize(Class<T> clazz, byte[] bytes) {
            String s = new String(bytes, StandardCharsets.UTF_8);
            System.out.println(s);
            // 此处的clazz为具体类型的Class对象,而不是父类Message的
            return new Gson().fromJson(s, clazz);
        }
    }
}

修改原编解码器

编码

// 获得序列化后的msg
/* 使用指定的序列化方式 SerializerAlgorithm.values();是获取一个枚举类中
的对象下标,如上的枚举中,Java对象的获取可以为SerializerAlgorithm.values()[0];
*/
SerializerAlgorithm[] values = SerializerAlgorithm.values();
// 获得序列化后的对象
byte[] bytes = values[out.getByte(5)-1].serialize(msg);

解码

// 获得反序列化方式
SerializerAlgorithm[] values = SerializerAlgorithm.values();
// 通过指定方式进行反序列化
// 需要通过Message的方法获得具体的消息类型
Message message = values[seqType-1].deserialize(Message.getMessageClass(messageType), bytes);

2、参数调优

CONNECT_TIMEOUT_MILLIS

  • 属于 SocketChannal 的参数
  • 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
  • 注意:Netty 中不要用成了SO_TIMEOUT 主要用在阻塞 IO,而 Netty 是非阻塞 IO
使用
public class TestParam {
    public static void main(String[] args) {
        // SocketChannel 5s内未建立连接就抛出异常
        new Bootstrap().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
        
        // ServerSocketChannel 5s内未建立连接就抛出异常
        new ServerBootstrap().option(ChannelOption.CONNECT_TIMEOUT_MILLIS,5000);
        // SocketChannel 5s内未建立连接就抛出异常
        new ServerBootstrap().childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
    }
}

注意SocketChannel  和 ServerSocketChannel 的区别

  • 客户端通过 Bootstrap.option 函数来配置参数,配置参数作用于 SocketChannel
  • 服务器通过 ServerBootstrap来配置参数,但是对于不同的 Channel 需要选择不同的方法
    • 通过 option 来配置 ServerSocketChannel 上的参数
    • 通过 childOption 来配置 SocketChannel 上的参数

源码分析

客户端中连接服务器的线程是 NIO 线程,抛出异常的是主线程。这是如何做到超时判断以及线程通信的呢

AbstractNioChannel.AbstractNioUnsafe.connect方法中

public final void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    
    ...
        
    // Schedule connect timeout.
    // 设置超时时间,通过option方法传入的CONNECT_TIMEOUT_MILLIS参数进行设置
    int connectTimeoutMillis = config().getConnectTimeoutMillis();
    // 如果超时时间大于0
    if (connectTimeoutMillis > 0) {
        // 创建一个定时任务,延时connectTimeoutMillis(设置的超时时间时间)后执行
        // schedule(Runnable command, long delay, TimeUnit unit)
        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                // 判断是否建立连接,Promise进行NIO线程与主线程之间的通信
                // 如果超时,则通过tryFailure方法将异常放入Promise中
                // 在主线程中抛出
                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress);
                if (connectPromise != null && connectPromise.tryFailure(cause)) {
                    close(voidPromise());
                }
            }
        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
    }
    
   	...
        
}

超时的判断主要是通过 Eventloop 的 schedule 方法 + Promise 共同实现的

  • schedule 设置了一个定时任务,延迟connectTimeoutMillis秒后执行该方法
  • 如果指定时间内没有建立连接,则会执行其中的任务
    • 任务负责创建 ConnectTimeoutException 异常,并将异常通过 Pormise 传给主线程并抛

SO_BACKLOG

该参数是 ServerSocketChannel 的参数

三次握手与连接队列

第一次握手时,因为客户端与服务器之间的连接还未完全建立,连接会被放入半连接队列

当完成三次握手以后,连接会被放入全连接队列中

服务器处理Accept事件是在TCP三次握手,也就是建立连接之后。服务器会从全连接队列中获取连接并进行处理

在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 linux 2.2 之后,分别用下面两个参数来控制

  • 半连接队列 - sync queue
    • 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
  • 全连接队列 - accept queue
    • 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
    • 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client
作用

在Netty中,SO_BACKLOG主要用于设置全连接队列的大小。当处理Accept的速率小于连接建立的速率时,全连接队列中堆积的连接数大于SO_BACKLOG设置的值时,便会抛出异常

设置方式如下

// 设置全连接队列,大小为2
new ServerBootstrap().option(ChannelOption.SO_BACKLOG, 2);
默认值

backlog参数在NioSocketChannel.doBind方法被使用

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

其中backlog被保存在了DefaultServerSocketChannelConfig配置类中

private volatile int backlog = NetUtil.SOMAXCONN;

具体的赋值操作如下

SOMAXCONN = AccessController.doPrivileged(new PrivilegedAction<Integer>() {
    @Override
    public Integer run() {
        // Determine the default somaxconn (server socket backlog) value of the platform.
        // The known defaults:
        // - Windows NT Server 4.0+: 200
        // - Linux and Mac OS X: 128
        int somaxconn = PlatformDependent.isWindows() ? 200 : 128;
        File file = new File("/proc/sys/net/core/somaxconn");
        BufferedReader in = null;
        try {
            // file.exists() may throw a SecurityException if a SecurityManager is used, so execute it in the
            // try / catch block.
            // See https://github.com/netty/netty/issues/4936
            if (file.exists()) {
                in = new BufferedReader(new FileReader(file));
                // 将somaxconn设置为Linux配置文件中设置的值
                somaxconn = Integer.parseInt(in.readLine());
                if (logger.isDebugEnabled()) {
                    logger.debug("{}: {}", file, somaxconn);
                }
            } else {
                ...
            }
            ...
        }  
        // 返回backlog的值
        return somaxconn;
    }
}
  • backlog的值会根据操作系统的不同,来选择不同的默认值
    • Windows 200
    • Linux/Mac OS 128
  • 如果配置文件/proc/sys/net/core/somaxconn存在,会读取配置文件中的值,并将backlog的值设置为配置文件中指定的

TCP_NODELAY

  • 属于 SocketChannal 参数
  • 因为 Nagle 算法,数据包会堆积到一定的数量后一起发送,这就可能导致数据的发送存在一定的延时
  • 该参数默认为false,如果不希望的发送被延时,则需要将该值设置为true

SO_SNDBUF & SO_RCVBUF

  • SO_SNDBUF 属于 SocketChannal 参数
  • SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)
  • 该参数用于指定接收方与发送方的滑动窗口大小

ALLOCATOR

  • 属于 SocketChannal 参数
  • 用来配置 ByteBuf 是池化还是非池化,是直接内存还是堆内存
使用
// 选择ALLOCATOR参数,设置SocketChannel中分配的ByteBuf类型
// 第二个参数需要传入一个ByteBufAllocator,用于指定生成的 ByteBuf 的类型
new ServerBootstrap().childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator());
ByteBufAllocator类型
  • 池化并使用直接内存

    // true表示使用直接内存
    new PooledByteBufAllocator(true);
  • 池化并使用堆内存

    // false表示使用堆内存
    new PooledByteBufAllocator(false);
  • 非池化并使用直接内存

    // ture表示使用直接内存
    new UnpooledByteBufAllocator(true);
  • 非池化并使用堆内存

    // false表示使用堆内存
    new UnpooledByteBufAllocator(false);

RCVBUF_ALLOCATOR

  • 属于 SocketChannal 参数
  • 控制 Netty 接收缓冲区大小
  • 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定
在 Netty 中,RCVBUF_ALLOCATOR 参数的作用是设置接收缓冲区分配
器。该参数决定了网络通道接收数据时的缓冲区分配策略。

Netty 为了优化网络通信性能,使用了可扩展的缓冲区分配策略来处理
接收到的数据。而 RCVBUF_ALLOCATOR 参数就是用于配置这种缓冲区分配策略的。

3、RPC框架

准备工作

在聊天室代码的基础上进行一定的改进

Message中添加如下代码

public abstract class Message implements Serializable {

    ...

    // 添加RPC消息类型
    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    public static final int  RPC_MESSAGE_TYPE_RESPONSE = 102;

    static {
        // 将消息类型放入消息类对象Map中
        messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
        messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }

}

RPC请求消息

@Data
@AllControductor
public class RpcRequestMessage extends Message {
    /**
     * 调用的接口全限定名,服务端根据它找到实现
     */
    private String interfaceName;
    
    /**
     * 调用接口中的方法名
     */
    private String methodName;
    
    /**
     * 方法返回类型
     */
    private Class<?> returnType;
    
    /**
     * 方法参数类型数组
     */
    private Class[] parameterTypes;
    
    /**
     * 方法参数值数组
     */
    private Object[] parameterValue;
}

想要远程调用一个方法,必须知道以下五个信息

  • 方法所在的全限定类名
  • 方法名
  • 方法返回值类型
  • 方法参数类型
  • 方法参数值

RPC响应消息

@Data
public class RpcResponseMessage extends Message {
    /**
     * 返回值
     */
    private Object returnValue;
    /**
     * 异常值
     */
    private Exception exceptionValue;
}

响应消息中只需要获取返回结果和异常值

服务器

public class RPCServer {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
        MessageSharableCodec messageSharableCodec = new MessageSharableCodec();

        // PRC 请求消息处理器
        RpcRequestMessageHandler rpcRequestMessageHandler = new RpcRequestMessageHandler();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // 自定义协议粘半包处理器
                    ch.pipeline().addLast(new ProtocolFrameDecoder());
                    // 日志
                    ch.pipeline().addLast(loggingHandler);
                    // 协议编解码器
                    ch.pipeline().addLast(messageSharableCodec);
                    // rpc处理工人
                    ch.pipeline().addLast(rpcRequestMessageHandler);
                }
            });
            Channel channel = serverBootstrap.bind(8080).sync().channel();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

服务器中添加了处理RPCRequest消息的handler

客户端

public class RPCClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
        MessageSharableCodec messageSharableCodec = new MessageSharableCodec();

        // PRC 请求消息处理器
        RpcResponseMessageHandler rpcResponseMessageHandler = new RpcResponseMessageHandler();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProtocolFrameDecoder());
                    ch.pipeline().addLast(loggingHandler);
                    ch.pipeline().addLast(messageSharableCodec);
                    // rpc处理工人
                    ch.pipeline().addLast(rpcResponseMessageHandler);
                }
            });
            Channel channel = bootstrap.connect(new InetSocketAddress("localhost", 8080)).sync().channel();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

通过接口Class获取实例对象的Factory

public class ServicesFactory {
    static HashMap<Class<?>, Object> map = new HashMap<>(16);

    public static Object getInstance(Class<?> interfaceClass) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        // 根据Class创建实例
        try {
            Class<?> clazz = Class.forName("cn.nyimac.study.day8.server.service.HelloService");
            Object instance = Class.forName("cn.nyimac.study.day8.server.service.HelloServiceImpl").newInstance();
           
            // 放入 InterfaceClass -> InstanceObject 的映射
            map.put(clazz, instance);
        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
            e.printStackTrace();
        }  
        return map.get(interfaceClass);
    }
}

RpcRequestMessageHandler

@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage rpcMessage) {
        RpcResponseMessage rpcResponseMessage = new RpcResponseMessage();
        try {
            // 设置返回值的属性
            rpcResponseMessage.setSequenceId(rpcMessage.getSequenceId());
            // 返回一个实例
            HelloService service = (HelloService) ServicesFactory.getInstance(Class.forName(rpcMessage.getInterfaceName()));
            
            // 通过反射调用方法,并获取返回值
            Method method = service.getClass().getMethod(rpcMessage.getMethodName(), rpcMessage.getParameterTypes());
            // 获得返回值
            Object invoke = method.invoke(service, rpcMessage.getParameterValue());
            // 设置返回值
            rpcResponseMessage.setReturnValue(invoke);
        } catch (Exception e) {
            e.printStackTrace();
            // 设置异常
            rpcResponseMessage.setExceptionValue(e);
        }
    }
    // 向channel中写入Message
    ctx.writeAndFlush(rpcResponseMessage);
}

远程调用方法主要是通过反射实现的,大致步骤如下

  • 通过请求消息传入被调入方法的各个参数
  • 通过全限定接口名,在map中查询到对应的类并实例化对象
  • 通过反射获取Method,将请求消息的参数传入并调用其invoke方法,返回值放入响应消息中
  • 若有异常需要捕获,并放入响应消息中

RpcResponseMessageHandler

@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
    static final Logger log = LoggerFactory.getLogger(ChatServer.class);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        log.debug("{}", msg);
        System.out.println((String)msg.getReturnValue());
    }
}

客户端发送消息

public class RPCClient {
    public static void main(String[] args) {
		...
           
        // 创建请求并发送
		RpcRequestMessage message = new RpcRequestMessage(1,
               "cn.nyimac.study.day8.server.service.HelloService",
               "sayHello",
               String.class,
               new Class[]{String.class},
               new Object[]{"Nyima"});
		
        channel.writeAndFlush(message);   
            
        ...    
    }
}

运行结果

客户端

1606 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.server.ChatServer  - RpcResponseMessage{returnValue=你好,Nyima, exceptionValue=null}

改进客户端

public class RPCClientManager {
    /**
     * 产生SequenceId
     */
    private static AtomicInteger sequenceId = new AtomicInteger(0);
    private static volatile Channel channel = null;
    private static final Object lock = new Object();
    public static void main(String[] args) {
        // 创建代理对象
        HelloService service = (HelloService) getProxy(HelloService.class);
        // 通过代理对象执行方法
        System.out.println(service.sayHello("Nyima"));
        System.out.println(service.sayHello("Hulu"));
    }

    /**
     * 单例模式创建Channel
     */
    public static Channel getChannel() {
        if (channel == null) {
            synchronized (lock) {
                if (channel == null) {
                    init();
                }
            }
        }
        return channel;
    }

    /**
     * 使用代理模式,帮助我们创建请求消息并发送
     */
    public static Object getProxy(Class<?> serviceClass) {
        Class<?>[] classes = new Class<?>[]{serviceClass};
        // 使用JDK代理,创建代理对象
        Object o = Proxy.newProxyInstance(serviceClass.getClassLoader(), classes, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 创建请求消息
                int id = sequenceId.getAndIncrement();
                RpcRequestMessage message = new RpcRequestMessage(id, serviceClass.getName(),
                        method.getName(), method.getReturnType(),
                        method.getParameterTypes(),
                        args);
                // 发送消息
                getChannel().writeAndFlush(message);

                // 创建Promise,用于获取NIO线程中的返回结果,获取的过程是异步的
                DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
                // 将Promise放入Map中
                RpcResponseMessageHandler.promiseMap.put(id, promise);
                // 等待被放入Promise中结果
                promise.await();
                if (promise.isSuccess()) {
                    // 调用方法成功,返回方法执行结果
                    return promise.getNow();
                } else {
                    // 调用方法失败,抛出异常
                    throw new RuntimeException(promise.cause());
                }
            }
        });
        return o;
    }

    private static void init() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
        MessageSharableCodec messageSharableCodec = new MessageSharableCodec();

        // PRC 请求消息处理器
        RpcResponseMessageHandler rpcResponseMessageHandler = new RpcResponseMessageHandler();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ProtocolFrameDecoder());
                ch.pipeline().addLast(loggingHandler);
                ch.pipeline().addLast(messageSharableCodec);
                ch.pipeline().addLast(rpcResponseMessageHandler);
            }
        });
        try {
            channel = bootstrap.connect(new InetSocketAddress("localhost", 8080)).sync().channel();
            // 异步关闭 group,避免Channel被阻塞
            channel.closeFuture().addListener(future -> {
                group.shutdownGracefully();
            });
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

获得Channel

  • 建立连接,获取Channel的操作被封装到了init方法中,当连接断开时,通过addListener法异步关闭group

  • 通过单例模式创建与获取Channel

远程调用方法

  • 为了让方法的调用变得简洁明了,将RpcRequestMessage创建与发送过程通过JDK的动态代理来完成
  • 通过返回的代理对象调用方法即可,方法参数为被调用方法接口的Class类

远程调用方法返回值获取

  • 调用方法的是主线程,处理返回结果的是NIO线程(RpcResponseMessageHandler)。要在不同线程中进行返回值的传递,需要用到Promise

  • RpcResponseMessageHandler中创建一个Map

    • Key为SequenceId
    • Value为对应的Promise
  • 主线程的代理类将RpcResponseMessage发送给服务器后,需要创建Promise对象,并将其放入到RpcResponseMessageHandler的Map中。需要使用await等待结果被放入Promise中。获取结果后,根据结果类型(判断是否成功)来返回结果或抛出异常

// 创建Promise,用于获取NIO线程中的返回结果,获取的过程是异步的
DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
// 将Promise放入Map中
RpcResponseMessageHandler.promiseMap.put(id, promise);
// 等待被放入Promise中结果
promise.await();
if (promise.isSuccess()) {
    // 调用方法成功,返回方法执行结果
    return promise.getNow();
} else {
    // 调用方法失败,抛出异常
    throw new RuntimeException(promise.cause());
}

NIO线程负责通过SequenceId获取并移除(remove)对应的Promise,然后根据RpcResponseMessage中的结果,向Promise中放入不同的值

  • 如果没有异常信息(ExceptionValue),就调用promise.setSuccess(returnValue)放入方法返回值
  • 如果有异常信息,就调用promise.setFailure(exception)放入异常信息
// 将返回结果放入对应的Promise中,并移除Map中的Promise
Promise<Object> promise = promiseMap.remove(msg.getSequenceId());
Object returnValue = msg.getReturnValue();
Exception exception = msg.getExceptionValue();
if (promise != null) {
    if (exception != null) {
        // 返回结果中有异常信息
        promise.setFailure(exception);
    } else {
        // 方法正常执行,没有异常
        promise.setSuccess(returnValue);
    }
}

改进RpcResponseMessageHandler

@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
    static final Logger log = LoggerFactory.getLogger(ChatServer.class);

    /**
     * 用于存放Promise的集合,Promise用于主线程与NIO线程之间传递返回值
     */
    public static Map<Integer, Promise<Object>> promiseMap = new ConcurrentHashMap<>(16);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        // 将返回结果放入对应的Promise中,并移除Map中的Promise
        Promise<Object> promise = promiseMap.remove(msg.getSequenceId());
        Object returnValue = msg.getReturnValue();
        Exception exception = msg.getExceptionValue();
        if (promise != null) {
            if (exception != null) {
                // 返回结果中有异常信息
                promise.setFailure(exception);
            } else {
                // 方法正常执行,没有异常
                promise.setSuccess(returnValue);
            }
        }
        // 拿到返回结果并打印
        log.debug("{}", msg);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1081579.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023.10.11

#include <iostream>using namespace std;class Sofa{ private:int price;int* size; public://无参构造Sofa(){}//有参构造Sofa(int p,int size):price(p),size(new int(size)){}//析构~Sofa(){delete size;}//拷贝构造Sofa(Sofa &other):price(other.price),size(n…

TensorFlow入门(二十、损失函数)

损失函数 损失函数用真实值与预测值的距离指导模型的收敛方向,是网络学习质量的关键。不管是什么样的网络结构,如果使用的损失函数不正确,最终训练出的模型一定是不正确的。常见的两类损失函数为:①均值平方差②交叉熵 均值平方差 均值平方差(Mean Squared Error,MSE),也称&qu…

[计算机网络基础]物理层详解

首先说明,基本的概述我还没写完,那部分虽然简单但是感觉要照顾到很多概念..... 以及本系列博客使用点模型并非iso模型,也并非tcp/IP模型,而是我们俗称的教学模型 也就是:物理层,数据链路层,网络层,传输层,应用层这五个,整个模型大多数是在教学中使用的,现实中基本不会这样子划…

spring容器ioc和di

spring ioc 容器的创建 BeanFactory 接口提供了一种高级配置机制&#xff0c;能够管理任何类型的对象&#xff0c;它是SpringIoC容器标准化超接口&#xff01; ApplicationContext 是 BeanFactory 的子接口。它扩展了以下功能&#xff1a; 更容易与 Spring 的 AOP 功能集成消…

K8S云计算系列-(3)

K8S Kubeadm案例实战 Kubeadm 是一个K8S部署工具&#xff0c;它提供了kubeadm init 以及 kubeadm join 这两个命令来快速创建kubernetes集群。 Kubeadm 通过执行必要的操作来启动和运行一个最小可用的集群。它故意被设计为只关心启动集群&#xff0c;而不是之前的节点准备工作…

echarts仪表盘vue

<div class"ybptx" ref"btryzb"></div>mounted() {this.getBtData();},getBtData() {var chart this.$echarts.init(this.$refs.btryzb);var data_czzf 88;var option {series: [{name: 内层数据刻度,type: gauge,radius: 80%,min: 0,max: 1…

Selenium+Pytest自动化测试框架

前言 selenium自动化 pytest测试框架 本章你需要 一定的python基础——至少明白类与对象&#xff0c;封装继承 一定的selenium基础——本篇不讲selenium&#xff0c;不会的可以自己去看selenium中文翻译网 测试框架简介 测试框架有什么优点呢&#xff1a; 代码复用率高&…

【安全】linux audit审计使用入门

文章目录 1 audit简介2 auditctl的使用2 audit配置和规则3 工作原理4 audit接口调用4.1 获取和修改配置4.2 获取和修改规则4.3 获取审计日志 5 audit存在的问题5.1 内核版本5.2 审计日志过多造成的缓存队列和磁盘问题5.2 容器环境下同一个命令的日志存在差异 6 参考文档 1 audi…

【gmail注册教程】手把手教你注册Google邮箱账号

手把手教你注册Google邮箱账号 写在前面&#xff1a; 要注意&#xff0c;注册Google邮箱必须要确保自己能够 科学上网&#xff0c;如果暂时做不到&#xff0c;请先进行相关学习。使用的手机号是大陆&#xff08;86&#xff09;的。 在保证自己能够科学上网后&#xff0c;在浏…

[硬件基础]-双稳态多谐振荡器配置

双稳态多谐振荡器配置 文章目录 双稳态多谐振荡器配置1、概述2、双稳态多谐振荡器的内部运行原理 在上一篇文章中&#xff0c;我们深入了解了555定时器在单稳态模式下的内部工作原理。 如果您已经理解了上一篇文章&#xff0c;那么本文对您来说将会非常简单。 我们将研究 555 定…

C++ - 智能指针 - auto_ptr - unique_ptr - std::shared_ptr - weak_ptr

前言 C当中的内存管理机制需要我们自己来进行控制&#xff0c;比如 在堆上 new 了一块空间&#xff0c;那么当这块空间不需要再使用的时候。我们需要手动 delete 掉这块空间&#xff0c;我们不可能每一次都会记得&#xff0c;而且在很大的项目程序当中&#xff0c;造成内存泄漏…

【合集】Java进阶——Java深入学习的笔记汇总 JVM底层、多线程、类加载 ...

前言 spring作为主流的 Java Web 开发的开源框架&#xff0c;是Java 世界最为成功的框架&#xff0c;持续不断深入认识spring框架是Java程序员不变的追求&#xff1b;而spring的底层其实就是Java&#xff0c;因此&#xff0c;深入学习Spring和深入学习Java是硬币的正反面&…

[代码随想录]二叉树篇

文章目录 1. 二叉树之层序遍历1.1 144-二叉树的前序遍历1.2 94-二叉树的中序遍历1.3 145-二叉树的后序遍历1.4 102-二叉树的层序遍历1.5 107-二叉树的层序遍历II1.6 199-二叉树的右视图1.7* 637-二叉树的层平均值1.8* 429-N叉树的层序遍历1.9 515-在每个树行中找最大值1.10* 11…

【算法挨揍日记】day14——724. 寻找数组的中心下标、238. 除自身以外数组的乘积

724. 寻找数组的中心下标 724. 寻找数组的中心下标 题目描述&#xff1a; 给你一个整数数组 nums &#xff0c;请计算数组的 中心下标 。 数组 中心下标 是数组的一个下标&#xff0c;其左侧所有元素相加的和等于右侧所有元素相加的和。 如果中心下标位于数组最左端&#…

客户成功体系如何构建?请看这7步

⭐简单说两句⭐ 作者&#xff1a;后端小知识 CSDN个人主页&#xff1a;后端小知识 &#x1f50e;GZH&#xff1a;后端小知识 &#x1f389;欢迎关注&#x1f50e;点赞&#x1f44d;收藏⭐️留言&#x1f4dd; 客户成功体系如何构建&#xff1f;请看这7步 在中国企业服务领域的…

js获取当前月第一天最后一天

【版权所有&#xff0c;文章允许转载&#xff0c;但须以链接方式注明源地址&#xff0c;否则追究法律责任】【创作不易&#xff0c;点个赞就是对我最大的支持】 前言 仅作为学习笔记&#xff0c;供大家参考 总结的不错的话&#xff0c;记得点赞收藏关注哦&#xff01; 目录 …

C++ DAY 5

#include <iostream>using namespace std;class Sofa { private:string sit; public:Sofa(string s "-") :sit(s){cout << "sofa 构造函数" << endl;}void show (){cout << sit << endl;} }; class Bed { private:string sl…

Spring框架是什么Spring框架的体系结构

Spring框架是什么 Spring是为企业Java最流行的应用程序开发框架。数以百万计的世界各地的开发人员使用Spring框架来创建高性能&#xff0c;易于测试的&#xff0c;可重用的代码。 Spring框架是一个开源的Java平台&#xff0c;它最初是由Rod Johnson编写并在2003年6月在Apache2…

Logo设计教程:从入门到精通的全程指导

如果你想制作一个专业的Logo标识&#xff0c;但是又缺乏设计技能&#xff0c;那么乔拓云可以帮助你轻松完成这个任务。以下是通过乔拓云制作Logo标识的简单步骤&#xff1a; 1. 注册并登录乔拓云账号 访问乔拓云官网&#xff0c;注册并登录你的账号。登录后&#xff0c;你将进…

麒麟系统加密/麒麟系统防泄密

​深信达网络科技有限公司自主研发的深信达主机加固系统软件V2.0、深信达沙盒防泄密系统软件V5.0&#xff0c;与麒麟软件完成兼容认证&#xff0c;并被纳入麒麟软件安全生态联盟成员之一。 麒麟软件主要面向通用和专用领域打造安全创新操作系统产品和相应解决方案&#xff0c;以…