Netty基础

news2025/4/12 1:20:08

2.1Netty是什么

是一个基于异步的(多线程处理结果和接收)、事件驱动的网络应用框架,用于基于快速开发可维护、高性能的网络服务器和客户端

异步是指调用时的异步,他的IO还是多路复用的IO

许多中间件都依赖与Netty

  • zookper
  • hadoop
  • Spring 5.x - flux 抛弃了Tomact 使用netty作为服务器端

Netty优势

  • 解决很多网络传输中的问题,比如黏包和半包
  • 对API进行增强
  • 防止epoll空轮询导致CPU100%

2.2Hello World

目标:

开发一个简单的服务器端和客户端

  • 客户端向服务器发送hello world
  • 服务器仅接收,不返回
  • HelloServer
public class HelloServer {
    public static void main(String[] args) {
        //1. 启动器 负责组装netty组件 启动服务器
        new ServerBootstrap()
                //2.BossEventLoop WorkerEventLoop(selector,thread) 加入一个eventGroup组
            	//EventLoop用来处理数据的容器
                .group(new NioEventLoopGroup())
                //3. 选择服务器的实现
                .channel(NioServerSocketChannel.class)
                //4. boss 负责处理连接 worker(child)负责处理读写 决定了worker(child) 能执行哪些操作(handler)
                .childHandler(
                        //5. channel代表和客户端进行数据读写的通道 Initializer初始化器  负责添加别的handler
                        new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        //6.添加具体的handler
                        //用于解码 将传输过来的数据类型(byteBuf) 转换成字符串
                        //服务器收到接收的结果后,调用handler的初始化方法  然后一步一步执行自己写的handler即可
                        nioSocketChannel.pipeline().addLast(new StringDecoder());
                        //自定义的handler
                        nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            //处理读事件
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                //打印上一步转换好的字符串
                                System.out.println(msg);
                            }
                        });
                    }
                    //7. 绑定 监听端口
                }).bind(8080);
    }
}
  • HelloClient
public class HelloClient {
    public static void main(String[] args) throws InterruptedException {
        //1.创建启动器类 启动客户端
        new Bootstrap()
                //2.添加EventLoop  //EventLoop用来处理数据的容器
                .group(new NioEventLoopGroup())
                //3.选择客户端的channel实现
                .channel(NioSocketChannel.class)
                //4.添加处理器
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override  //连接建立后被调用
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        //编码器 将string -> byteBuf
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                //5.连接到服务器
                .connect(new InetSocketAddress("localhost",8080))
                .sync()     //阻塞方法 直到连接建立
                .channel()
                //  向客户端发送数据
                .writeAndFlush("hello,world");  //收发数据都要走handler  这里将str -> ByteBuf 然后发给服务器
    }
}

2.3组件

2.3.1EventLoop

本质是一个单线程执行器(同时维护了一个selectror),里面有run方法处理Channel上源源不断的io事件。

继承关系:

  • juc.ScheduledExecutorService,所以包含了线程池中的所有方法
  • 继承netty自己的OrderEventExecutor
    • 提供了boolean inEventLoop(Thread thread)方法判断一个线程是否属于此EventLoop
    • 提供了parent方法来看自己属于哪个EventLoop

事件循环组:

EventLoopGroup是一组EventGroup,Channel一般会调用EventLoopGroup

示例:

@Slf4j
public class TestEventLoop {
    public static void main(String[] args) {
        //1.NioEventLoopGroup功能比较全面:定时任务、Io、普通任务
        //不指定参数,默认线程数为cpu核心数*2
        EventLoopGroup group = new NioEventLoopGroup(2);
//        System.out.println(NettyRuntime.availableProcessors());

        //普通任务、定时任务
//        EventLoopGroup group2 = new DefaultEventLoopGroup();

        //2.获取下一个事件循环对象  每次调用得到下一个 然后回过头从第一个开始
        System.out.println(group.next()); //NioEventLoop@d70c109
        System.out.println(group.next()); //NioEventLoop@17ed40e0
        System.out.println(group.next()); //NioEventLoop@d70c109

        //3.执行普通任务  异步处理
//        group.next().submit(()->{
//            log.debug("new Runnable OK"); //[nioEventLoopGroup-2-1]
//        });

        //4.执行定时任务  初始延迟1s后 2s打印一次
        group.next().scheduleAtFixedRate(()->{
            log.debug("data ok");
        },1,2, TimeUnit.SECONDS);

        log.debug("Main OK");

    }
}

接收和发送案例

  • client
@Sl4j
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        //1.创建启动器类 启动客户端
        Channel channel = new Bootstrap()
                //2.添加EventLoop  EventLoop用来处理数据的容器
                .group(new NioEventLoopGroup())
                //3.选择客户端的channel实现
                .channel(NioSocketChannel.class)
                //4.添加处理器
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override  //连接建立后被调用
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        //编码器 将string -> byteBuf
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                //5.连接到服务器
                .connect(new InetSocketAddress("localhost", 8080))
                .sync()     //阻塞方法 直到连接建立
                .channel();
        //  向客户端发送数据
        System.out.println(channel);
        System.out.println("");  //此处断点  右键断点选择Thread模式
    }
}
  • server
@Slf4j
public class EventLoopServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override  //没有数据转化的处理器 此处msg是ByteBuf类型
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buffer = (ByteBuf) msg;
                                log.debug(buffer.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

断点处右键 发送不同的案例 跑多个client

10:29:30.078 [nioEventLoopGroup-2-3] DEBUG com.rpc.netty.test2.EventLoopServer - 1
10:30:00.447 [nioEventLoopGroup-2-3] DEBUG com.rpc.netty.test2.EventLoopServer - 2
10:30:37.059 [nioEventLoopGroup-2-3] DEBUG com.rpc.netty.test2.EventLoopServer - 2cccc
10:33:34.727 [nioEventLoopGroup-2-4] DEBUG com.rpc.netty.test2.EventLoopServer - 122222

不同的client绑定的nioEventLoopGroup不同,一个client只绑定一个,固定的绑定一个,所以从这可以看到,nioEventLoopGroup其实就像一个大的线程池,里面包含了不同的和client连接的线程,用来处理任务

流程图:

image-20230502103946050

对server的改进:

当多个线程来的时候,有的是accept,有的是读写操作,我们现在想将他们区分开。那么区分开后会不会有什么问题呢?因为我们使用的是NIO模型,所以当一个线程耗时比较长的时候,可能就会造成其他线程无法处理问题,这个时候怎么办呢?

我们的解决办法都是用不同的EventLoop去处理不同的问题:

@Slf4j
public class EventLoopServer {
    public static void main(String[] args) {
        //改进2: 防止一个线程等待时间过长影响其他线程 创一个独立的EventLoopGroup去处理
        DefaultEventLoopGroup group = new DefaultEventLoopGroup();
        new ServerBootstrap()
                //改进1,负责accept和read
                //boos 和 worker boos只负责处理accept事件 worker只负责读写
                .group(new NioEventLoopGroup(),new NioEventLoopGroup(2))//worker 只有两个 可以一个线程连接多个client
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        //todo 改进2:命名为handler1 和handler2进行对比
                        nioSocketChannel.pipeline().addLast("handler1",new ChannelInboundHandlerAdapter(){
                            @Override  //没有数据转化的处理器 此处msg是ByteBuf类型
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buffer = (ByteBuf) msg;
                                log.debug(buffer.toString(Charset.defaultCharset()));
                                log.debug("handler1被{}线程处理",Thread.currentThread().getName());
                                //todo 将消息传递给下一个handler
                                ctx.fireChannelRead(msg);
                            }
                          //todo 改进2:使用group里的线程进行处理,命名为handler2
                        }).addLast(group,"handler2",new ChannelInboundHandlerAdapter(){
                            @Override  //没有数据转化的处理器 此处msg是ByteBuf类型
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buffer = (ByteBuf) msg;
                                log.debug(buffer.toString(Charset.defaultCharset()));
                                log.debug("handler2被{}线程处理",Thread.currentThread().getName());
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

用三个不同的客户端发消息,结果:

14:35:15.206 [nioEventLoopGroup-4-1] DEBUG com.rpc.netty.test2.EventLoopServer - 1
14:35:15.206 [nioEventLoopGroup-4-1] DEBUG com.rpc.netty.test2.EventLoopServer - handler1被nioEventLoopGroup-4-1线程处理
14:35:15.207 [defaultEventLoopGroup-2-1] DEBUG com.rpc.netty.test2.EventLoopServer - 1
14:35:15.207 [defaultEventLoopGroup-2-1] DEBUG com.rpc.netty.test2.EventLoopServer - handler2被defaultEventLoopGroup-2-1线程处理

14:37:33.377 [nioEventLoopGroup-4-1] DEBUG com.rpc.netty.test2.EventLoopServer - 2
14:37:33.377 [nioEventLoopGroup-4-1] DEBUG com.rpc.netty.test2.EventLoopServer - handler1被nioEventLoopGroup-4-1线程处理
14:37:33.377 [defaultEventLoopGroup-2-3] DEBUG com.rpc.netty.test2.EventLoopServer - 2
14:37:33.377 [defaultEventLoopGroup-2-3] DEBUG com.rpc.netty.test2.EventLoopServer - handler2被defaultEventLoopGroup-2-3线程处理

14:37:42.990 [nioEventLoopGroup-4-2] DEBUG com.rpc.netty.test2.EventLoopServer - 3
14:37:42.990 [nioEventLoopGroup-4-2] DEBUG com.rpc.netty.test2.EventLoopServer - handler1被nioEventLoopGroup-4-2线程处理
14:37:42.990 [defaultEventLoopGroup-2-4] DEBUG com.rpc.netty.test2.EventLoopServer - 3
14:37:42.990 [defaultEventLoopGroup-2-4] DEBUG com.rpc.netty.test2.EventLoopServer - handler2被defaultEventLoopGroup-2-4线程处理

其实可以看到,读写操作给一个EventLoop去执行了,hander2用的也是另外一个eventLoop去执行的

流程图:

image-20230502144538276

handler执行过程中如何切换给不同EventLoop处理handler的呢?

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    	//下一个handler的时间循环是否与当前的事件循环是一个线程
        EventExecutor executor = next.executor();
    	//是 直接调用
        if (executor.inEventLoop()) { //是否是同一个线程
            next.invokeChannelRead(m);
        } else {
            //不是 将要执行的diamante作为任务提交给下一个事件循环处理
            executor.execute(new Runnable() {
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }
  • 如果两个handler绑定的是同一个线程,那么直接调用
  • 如果不是,那么将会作为一个runnable传递给下一个handler的线程来调用

2.3.2Channel

channle主要作用:

  • close可以用来关闭channel
  • closeFuture用来处理channel的关闭
    • sync方法作用是同步等待channel的关闭
    • 而addList方法是异步等待channel关闭
  • plpeline方法添加处理器
  • write方法将数据写入
  • writeAndFlush方法将数据写入

Channel常用方法

1.处理异步连接

我们思考一个问题,为什么要用sync方法?

我们先看这样一个初始化客户端的写法:

@Slf4j
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        //xxxFuture、Promise基本都是和异步方法配合使用,用来处理结果
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override  //连接建立后被调用
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                //1.连接到服务器
                //异步非阻塞   main 调用该方法  但是真正连接服务器的 是另一个线程 Nio线程
                .connect(new InetSocketAddress("localhost", 8080)); //可能1s后执行
}

返回了一个ChannelFuture对象,那么他有什么用呢?

他会异步非阻塞的去连接服务器,如果我们不写channel.sync方法,我们尝试去连接服务器发数据,我们的数据是没办法发送成功的

Channel channel = channelFuture.channel();
log.debug("{}",channel);
channel.writeAndFlush("hello world");

原因也很简单,因为他是异步的,所以当channelFuture.channel();先被执行时,因为没连接到,所以此时的channel是个空的,自然无法连接到服务器。

针对这个原因,我们提供了如下两个方法:

  //todo 1.使用sync同步处理结果,sync的作用就是等待connect连接成功,再继续执行下面的方法
//        channelFuture.sync();  //阻塞住 等待nio连接建立完成

//        Channel channel = channelFuture.channel();
//        log.debug("{}",channel);
//        channel.writeAndFlush("hello world");

        //todo 2: 使用addListener(回调对象) 方法异步处理结果
        channelFuture.addListener(new ChannelFutureListener(){
            @Override  //在nio线程连接建立完之后,会调用该方法
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                //调用下面的 发送的不是主线程 是nio线程
                Channel channel = channelFuture.channel();
                log.debug("{}",channel);
                channel.writeAndFlush("222");
            }
        });

方式2日志结果:

21:45:24.571 [nioEventLoopGroup-2-1] DEBUG com.rpc.netty.test3.EventLoopClient - [id: 0xc70019e8, L:/127.0.0.1:1752 - R:localhost/127.0.0.1:8080]

可见,是个nio的线程进行处理的

⎛⎝≥⏝⏝≤⎛⎝

2.处理异步关闭后的操作
我们现在有另一个需求,我们需要让客户端断开连接之后,继续执行一些操作,我们现在有这样的想法:

@Slf4j
public class CloseFutureClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); //日志级别 debug
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        Channel channel = channelFuture.sync().channel();
        log.debug("{}",channel);
        new Thread(()->{
            Scanner scanner = new Scanner(System.in);
            while (true){
                String next = scanner.next();
                if("q".equals(next)){
                    channel.close();
                    break;
                }
                channel.writeAndFlush(next);

            }
        },"input").start();

        log.debug("处理关闭之后的操作");
    }
}

当我们将这个客户端启动之后,我们就可以看见已经开始处理关闭在之后的操作了,显然,这不是我们需要的,原因就是因为客户端的连接以及在线程里进行收取信息,这些其实都是异步的,而主线程在启动之后就会打印日志内容了。

那么我们在channel.close();之后处理可以吗?

这也是不行的,因为close也是异步方法,所有后面的也不一定会被处理掉。

那该怎么办?

其实和1内连接类似,提供下面两种方法:

     	//todo 3.正确写法:
        //获取CloseFuture对象
        ChannelFuture closeFuture = channel.closeFuture();
        System.out.println("waiting close....");
        //3.1同步模式处理关闭
//        closeFuture.sync();
//        log.debug("处理关闭之后的操作"); //main线程打印

        //3.2 异步关闭 类似channel的
        closeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                log.debug("处理关闭之后的操作");  //[nioEventLoopGroup-2-1]  线程执行
            }
        });

这些都可以实现

但是新的问题又来了,我们close之后,客户端并不会自动的停止运行,这是怎么回事?

因为NioEventLoopGroupe内还有些线程在运行,所以我们需要在处理关闭之后的操作后单独的调用一下他的关闭方法

//优雅关闭Nio线程,拒绝接收新的任务
group.shutdownGracefully();

他将会拒绝接收新的任务,然后优美的结束

为什么netty需要异步?

异步效率高?

相比于只使用一个一个的线程去处理相同的事情,不如用相同的线程,每个线程只处理一个事件,就像 医院的挂号、看病、结账、拿药一样,分派给不同的人去做。

  • 单线程没有办法异步提高效率,必须配合多线程、多核CPU才能发挥异步的优势
  • 异步没有缩短响应时间,反而有所增加
  • 合理进行任务拆分,也是利用异步的关键

2.3.3Future & Promise

异步处理常用的两个接口

netty的Futrue继承自jdk的Futrue,而Promise继承netty的Future

  • jdk:Future只能同步等待任务结束(成功或失败),才能得到结果
  • netty:
    • Future可以同步等待任务结束结果,也可以异步获取结果,但是都要等待任务结束
    • Promise:不仅有netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器

jdk Future 、Netty Future & Promise

具体例子:

  • jdk Future
@Slf4j
public class TestJdkFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.jdk的future主要关联线程池使用
        ExecutorService pool = Executors.newFixedThreadPool(2);
        //2.提交任务
        Future<Integer> future = pool.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.debug("执行计算");
                Thread.sleep(1000);
                return 50;
            }
        });

        //3.main线程通过future获取结果
        log.debug("等待结果");
        Integer res = future.get();
        log.debug("结果是:{}",res);
    }
}
  • netty Future
@Slf4j
public class TestNettyFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        NioEventLoopGroup eventLoop = new NioEventLoopGroup();

        EventLoop next = eventLoop.next();
        Future<Integer> future = next.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.debug("处理结果");
                Thread.sleep(1000);
                return 100;
            }
        });

        log.debug("等待结果");
//        log.debug("结果{}",future.get());  //同步方式获取结果
        //异步获取结果
        future.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future) throws Exception {
                log.debug("接收结果:{}",future.getNow()); //立刻获取结果 非阻塞
            }
        });
    }
}
  • Netty Promise
@Slf4j
public class TestNettyPromise {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.准备eventLoop对象
        EventLoop eventLoop = new NioEventLoopGroup().next();

        //2.主动拿到promise对象  结果容器
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

        //3.线程执行计算,像promise对象填充结果
        new Thread(()->{
            log.debug("开始计算1");
            try {
                int i = 1/0;
                Thread.sleep(1000);
                //计算成功结果放入容器
                promise.setSuccess(80);
            } catch (InterruptedException e) {
                e.printStackTrace();
                //异常 将异常也放进容器
                promise.setFailure(e);
            }
            

        }).start();

        //4.接收结果的线程
        log.debug("等待结果");
        log.debug("结果:{}",promise.get());
    }
}

2.3.4Handler & Pipeline

业务处理主要在Handler工作

ChannelHandler用来处理Channel上的各种事件,分为入站和出站。所有ChannelHandler连成一起,就是Pipeline

看看他们之间的关系吧

@Slf4j
public class TestPipeline {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        //1.通过nio拿到pipeline(pipeline)
                        ChannelPipeline pipeline = nioSocketChannel.pipeline();

                        //2.添加处理器 handler链(双向链表):head ->  h1 -> h2 -> h3 -> tail
                        pipeline.addLast("handler1",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("1");
                                ByteBuf buf = (ByteBuf) msg;
                                String name = buf.toString(Charset.defaultCharset());

                                //传递给h2 用来往下走 此时用来传递处理的是name 也就是这里处理过的
                                super.channelRead(ctx, name);
                            }
                        });
                        pipeline.addLast("handler2",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("2");
                                //获取到name后 转换成student
                                Student student = new Student(msg.toString());
                                super.channelRead(ctx, student);
                                //作用和super.channelRead效果一样
//                                ctx.fireChannelRead(student);
                            }
                        });
                        pipeline.addLast("handler3",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object student) throws Exception {
                                log.debug("3,结果:{},类型:{}",student,student.getClass());
                                //后面没有入站处理器,所以写出操作
                                //为了触发下面的出站处理器4 5 6 否则下面的只有写入操作才会触发
                                //ctx是从当前处理器,向前去找出站处理器,不是往后找,所以下面的出站处理器不会被执行
                                ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("hello".getBytes()));
                                //下面的这个可以往后调用整条
//                                nioSocketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("server".getBytes()));
                            }
                        });

                        //出站处理器 出站处理器会从后往前走 所以流程是6-> 5 -> 4
                        pipeline.addLast("handler4",new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("4");
                                super.write(ctx, msg, promise);
                            }
                        });
                        pipeline.addLast("handler5",new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("5");
                                super.write(ctx, msg, promise);
                            }
                        });
                        pipeline.addLast("handler6",new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("6");
                                super.write(ctx, msg, promise);
                            }
                        });
                    }
                })
                .bind(8080);
    }

    @Data
    @AllArgsConstructor
    static class Student{
        private String name;
    }
}

客户端使用CloseFutureClient

整个调用结果:

10:06:11.458 [nioEventLoopGroup-2-2] DEBUG com.rpc.netty.test5.TestPipeline - 1
10:06:11.460 [nioEventLoopGroup-2-2] DEBUG com.rpc.netty.test5.TestPipeline - 2
10:06:11.461 [nioEventLoopGroup-2-2] DEBUG com.rpc.netty.test5.TestPipeline - 3,结果:TestPipeline.Student(name=jj),类型:class com.rpc.netty.test5.TestPipeline$Student

因为里面ctx的操作问题,具体看里面的注释

下面是整个调用链的结构:

img

2.3.5ByteBuf

对字节数据的封装

创建

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;

/**
* @author 我见青山多妩媚
* @date 2023/5/3 0003 11:03
* @Description TODO
*/
public class TestByteBuf {
   public static void main(String[] args) {
       //不知道容量为256
       ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
       log(buffer);         //read index:0 write index:0 capcacity:256
       StringBuilder builder = new StringBuilder();
       for (int i = 0; i < 32; i++) {
           builder.append("a");
       }

       buffer.writeBytes(builder.toString().getBytes());
       log(buffer);         //read index:0 write index:300 capcacity:512

   }

   private static void log(ByteBuf buffer){
       int len = buffer.readableBytes();
       int rows = len/16 + (len % 15 == 0 ? 0 : 1) + 4;
       StringBuilder buf = new StringBuilder(rows * 80 * 2)
               .append("read index:").append(buffer.readerIndex())
               .append(" write index:").append(buffer.writerIndex())
               .append(" capcacity:").append(buffer.capacity())
               .append(NEWLINE);
       appendPrettyHexDump(buf,buffer);
       System.out.println(buf.toString());
   }
}

直接内存 vs 堆内存

直接内存:

NIO的Buffer提供了一个可以不经过JVM内存直接访问系统物理内存的类——DirectBuffer。 DirectBuffer类继承自ByteBuffer,但和普通的ByteBuffer不同,普通的ByteBuffer仍在JVM堆上分配内存,其最大内存受到最大堆内存的限制;而DirectBuffer直接分配在物理内存中,并不占用堆空间,其可申请的最大内存受操作系统限制。

堆内存:

Java 虚拟机具有一个堆,堆是运行时数据区域,所有类实例和数组的内存均从此处分配。堆是在 Java 虚拟机启动时创建的。对象的堆内存由称为垃圾回收器的自动内存管理系统回收。

区别:

  • 直接内存的读写操作比普通Buffer快,但它的创建、销毁比普通Buffer慢。
  • 因此直接内存使用于需要大内存空间且频繁访问的场合,不适用于频繁申请释放内存的场合。

可以用下面的代码创建池化基于堆的ByteBuf

ByteBuf buffer  = ByteBufAllocator.DEFAULT.heapBuffer(10);

也可以使用下面的代码来创建池化基于直接内存的ByteBuff

ByteBuf buffer = ByteBufAllocatoe.DEFAULT.directBuffer(10);
  • 直接内存创建和销毁代价昂贵,但读写性能高(少一次内存复制,适合配合池化功能一起使用)
  • 直接内存对GC内存压力小,因为这部分内存不受JVM垃圾回收的管理,但也要注意及时主动释放

池化 vs 非池化

池化的最大意义在于可以重用ByteBuf,优点包括

  • 没有池化,则每次都得创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加GC压力
  • 有了池化,则可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率
  • 高并发时,池化功能更节约内存,减少内存溢出的可能

池化功能是否开启,可以通过下面的系统环境变量来设置

-Dio,netty.allocator.type={unpooled|pooled}
  • 4.1以后,非安卓平台默认开启池化实现,安卓采用非池化
  • 4.1之前,池化不成熟,都是非池化ww

组成

ByteBuf由四部分组成:

img

最开始读写指针都在0位置

写入

常用方法如下

方法签名含义备注
writeBoolean(boolean value)写入 boolean 值用一字节 01|00 代表 true|false
writeByte(int value)写入 byte 值
writeShort(int value)写入 short 值
writeInt(int value)写入 int 值Big Endian(大端写入),即 0x250,写入后 00 00 02 50
writeIntLE(int value)写入 int 值Little Endian(小端写入),即 0x250,写入后 50 02 00 00
writeLong(long value)写入 long 值
writeChar(int value)写入 char 值
writeFloat(float value)写入 float 值
writeDouble(double value)写入 double 值
writeBytes(ByteBuf src)写入 netty 的 ByteBuf
writeBytes(byte[] src)写入 byte[]
writeBytes(ByteBuffer src)写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset)写入字符串CharSequence为字符串类的父类,第二个参数为对应的字符集

注意

  • 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用来写入不同的数据
  • 网络传输中,默认习惯是 Big Endian,使用 writeInt(int value)

使用方法

public class ByteBufStudy {
    public static void main(String[] args) {
        // 创建ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
        ByteBufUtil.log(buffer);

        // 向buffer中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        ByteBufUtil.log(buffer);

        buffer.writeInt(5);
        ByteBufUtil.log(buffer);

        buffer.writeIntLE(6);
        ByteBufUtil.log(buffer);

        buffer.writeLong(7);
        ByteBufUtil.log(buffer);
    }
} 

运行结果

read index:0 write index:0 capacity:16

read index:0 write index:4 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04                                     |....            |
+--------+-------------------------------------------------+----------------+

read index:0 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05                         |........        |
+--------+-------------------------------------------------+----------------+

read index:0 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00             |............    |
+--------+-------------------------------------------------+----------------+

read index:0 write index:20 capacity:20
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
|00000010| 00 00 00 07                                     |....            |
+--------+-------------------------------------------------+----------------+ 

还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置

扩容

当ByteBuf中的容量无法容纳写入的数据时,会进行扩容操作

buffer.writeLong(7);
ByteBufUtil.log(buffer); 
// 扩容前
read index:0 write index:12 capacity:16
...

// 扩容后
read index:0 write index:20 capacity:20
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
|00000010| 00 00 00 07                                     |....            |
+--------+-------------------------------------------------+----------------+ 
扩容规则
  • 如何写入后数据大小未超过 512 字节,则选择下一个 16 的整数倍进行扩容

    • 例如写入后大小为 12 字节,则扩容后 capacity 是 16 字节
  • 如果写入后数据大小超过 512 字节,则选择下一个 2

    n

    • 例如写入后大小为 513 字节,则扩容后 capacity 是 210=1024 字节(29=512 已经不够了)
  • 扩容不能超过 maxCapacity,否则会抛出java.lang.IndexOutOfBoundsException异常

Exception in thread "main" java.lang.IndexOutOfBoundsException: writerIndex(20) + minWritableBytes(8) exceeds maxCapacity(20): PooledUnsafeDirectByteBuf(ridx: 0, widx: 20, cap: 20/20)
... 

读取

读取主要是通过一系列read方法进行读取,读取时会根据读取数据的字节数移动读指针

如果需要重复读取,需要调用buffer.markReaderIndex()对读指针进行标记,并通过buffer.resetReaderIndex()将读指针恢复到mark标记的位置

public class ByteBufStudy {
    public static void main(String[] args) {
        // 创建ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);

        // 向buffer中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        buffer.writeInt(5);

        // 读取4个字节
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        ByteBufUtil.log(buffer);

        // 通过mark与reset实现重复读取
        buffer.markReaderIndex();
        System.out.println(buffer.readInt());
        ByteBufUtil.log(buffer);

        // 恢复到mark标记处
        buffer.resetReaderIndex();
        ByteBufUtil.log(buffer);
    }
}

结果:

1
2
3
4
read index:4 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05                                     |....            |
+--------+-------------------------------------------------+----------------+
5
read index:8 write index:8 capacity:16

read index:4 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05                                     |....            |
+--------+-------------------------------------------------+----------------+ 

还有以 get 开头的一系列方法,这些方法不会改变读指针的位置

释放

由于 Netty 中有堆外内存(直接内存)的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。

  • UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
  • UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
  • PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存

Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口

  • 每个 ByteBuf 对象的初始计数为 1
  • 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
  • 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
释放规则

因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在每个 ChannelHandler 中都去调用 release ,就失去了传递性(如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)

基本规则是,谁是最后使用者,谁负责 release

  • 起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))

  • 入站 ByteBuf 处理原则

    • 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
    • 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
    • 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
    • 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
    • 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
  • 出站 ByteBuf 处理原则

    • 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
  • 异常处理原则

    • 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true

      while (!buffer.release()) {} 
      

当ByteBuf被传到了pipeline的head与tail时,ByteBuf会被其中的方法彻底释放,但前提是ByteBuf被传递到了head与tail中

TailConext中释放ByteBuf的源码

protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
    } finally {
        // 具体的释放方法
        ReferenceCountUtil.release(msg);
    }
} 

判断传过来的是否为ByteBuf,是的话才需要释放

public static boolean release(Object msg) {
	return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
} 

切片

ByteBuf切片是【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针

得到分片后的buffer后,要调用其retain方法,使其内部的引用计数加一。避免原ByteBuf释放,导致切片buffer无法使用

修改原ByteBuf中的值,也会影响切片后得到的ByteBuf

img

public class TestSlice {
    public static void main(String[] args) {
        // 创建ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);

        // 向buffer中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});

        // 将buffer分成两部分
        ByteBuf slice1 = buffer.slice(0, 5);
        ByteBuf slice2 = buffer.slice(5, 5);

        // 需要让分片的buffer引用计数加一
        // 避免原Buffer释放导致分片buffer无法使用
        slice1.retain();
        slice2.retain();
        
        ByteBufUtil.log(slice1);
        ByteBufUtil.log(slice2);

        // 更改原始buffer中的值
        System.out.println("===========修改原buffer中的值===========");
        buffer.setByte(0,5);

        System.out.println("===========打印slice1===========");
        ByteBufUtil.log(slice1);
    }
} 

运行结果

read index:0 write index:5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05                                  |.....           |
+--------+-------------------------------------------------+----------------+
read index:0 write index:5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 06 07 08 09 0a                                  |.....           |
+--------+-------------------------------------------------+----------------+
===========修改原buffer中的值===========
===========打印slice1===========
read index:0 write index:5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 05 02 03 04 05                                  |.....           |
+--------+-------------------------------------------------+----------------+ 

优势

  • 池化思想 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
  • 读写指针分离,不需要像 ByteBuffer 一样切换读写模式
  • 可以自动扩容
  • 支持链式调用,使用更流畅
  • 很多地方体现零拷贝,例如
    • slice、duplicate、CompositeByteBuf

关于零拷贝

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/497952.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

GUITAR PRO8吉他软件好不好用?值不值得下载

所谓“工欲善其事&#xff0c;必先利其器”&#xff0c;想成为一名专业甚至著名的音乐人&#xff0c;用到的工具软件非常多&#xff0c;在众多款软件工具中&#xff0c;Guitar Pro 8能满足乐谱创作者、学习者的所有需要。很多人在听到Guitar Pro这个名词时&#xff0c;本能反应…

spark的RDD算子计算

一、环境配置 import osfrom pyspark import SparkConf, SparkContextif __name__ __main__:os.environ[SPARK_HOME] /export/server/sparkos.environ["PYSPARK_PYTHON"] "/root/anaconda3/envs/pyspark_env/bin/python"os.environ["PYSPARK_DRIV…

JavaWeb07(MVC应用01[家居商城]连接数据库)

目录 一.什么是MVC设计模式&#xff1f; 1.2 MVC设计模式有什么优点&#xff1f; 二.MVC运用&#xff08;家居商城&#xff09; 2.1 实现登录 2.2 绑定轮播【随机三个商品】 2.2.1 效果预览 index.jsp 2.3 绑定最新上架&热门家居 2.3.1 效果预览 2.3.2 代码实现 数据…

linux进程基本知识

1.什么是程序&#xff0c;什么是进程&#xff1f; 程序是静态的概念&#xff0c;例如 gcc xx.c -o pro 磁盘中生成pro文件&#xff0c;叫做程序 进程是程序的一次运行活动&#xff0c;意思是程序跑起来了&#xff0c;系统中就多了一个进程 2.如何查看系统中有哪些进程&…

EMC VNX登录Unisphere错误 certificate has invalid date问题处理

经常有用户反应说&#xff0c;突然用浏览器登录EMC VNX或者Clarrion CX系统的时候出现“certificate has invalid date”的故障&#xff0c;然后无法正常登录图形界面。具体报错如下图所示&#xff1a; 导致这个问题的原因在于VNX系统中的certification认证过期&#xff0c;既然…

SpringBoot整合Echarts实现用户人数和性别展示

一、背景 在Web应用开发中&#xff0c;经常需要使用图表来展示数据&#xff0c;而Echarts是一个非常优秀的图表库。SpringBoot是一个非常流行的Java Web框架&#xff0c;它可以快速搭建Web应用。本文将介绍如何使用SpringBoot集成Echarts&#xff0c;实现展示用户人数和性别的…

百度百科如何创建?创建百度百科的秘诀你值得掌握(经验分享)

百度百科是中国最大的百科全书式的中文网站之一&#xff0c;是广大用户在互联网上获取知识的重要途径之一。任何人都可以在百度百科创建新的词条&#xff0c;为网站的发展作出贡献。 小媒同学将从如何创建百度百科词条和注意事项两个方面来详细介绍百度百科词条的创建流程和相关…

css3 flex弹性布局详解

css3 flex弹性布局详解 一、flexbox弹性盒子 2009年&#xff0c;W3C 提出了一种新的方案----Flex 布局&#xff0c;可以简便、完整、响应式地实现各种页面布局。目前&#xff0c;它已经得到了所有浏览器的支持&#xff0c;这意味着&#xff0c;现在就能很安全地使用这项功能。…

在选择数据库时需要考虑的因素

在文章的第一部分中&#xff0c;我们奠定了理解各种数据库类型及其用例的基础。随着我们继续探索数据库选择的艺术&#xff0c;我们现在将更深入地探讨影响这个决策过程的关键因素。通过更详细地检查每个因素&#xff0c;我们可以更好地装备自己做出符合项目要求的知情选择&…

ubuntu 卸载 软件包 libfdt-dev

编译环境 Win10 64位 ubuntu 20.04 虚拟机 VMware Workstation 16 Pro NUC980 bsp 自带的编译工具链&#xff1a; nuc980bsp.tar.gz&#xff0c;版本 gcc version 4.8.4 (GCC) NUC980 uboot : https://gitee.com/OpenNuvoton/NUC970_U-Boot_v2016.11 目标 手动编译 NUC9…

【Windows】Windows下载使用wget命令

文章目录 一、前言 & 介绍二、安装步骤2.1 下载 wget 压缩包2.2 解压到指定的位置2.3 检查是否安装成功2.4 是否可以正常下载 一、前言 & 介绍 wget 是 linux 一个下载文件的工具&#xff0c;可以下载一些软件或从远程服务器恢复备份到本地服务器。 wget 在 Linux 下…

PCA主成成分分析例题详解

主成分分析是一种降维算法&#xff0c;它能将多个指标转换为少数几个主成分&#xff0c;这些主成分是原始变量的线性组合&#xff0c;且彼此之间互不相关&#xff0c;其能反映出原始数据的大部分信息 需要了解具体细节可看此视频&#x1f449;&#xff1a;什么是主成成分分析PC…

webpack 5 实战(3)

四十一、代码拆分方式 通过Webpack实现前端项目整体模块化的优势很明显&#xff0c;但是它同样存在一些弊端&#xff0c;那就是项目当中所有的代码最终都会被打包到一起&#xff0c;试想一下&#xff0c;如果说应用非常复杂&#xff0c;模块非常多的话&#xff0c;那打包结果就…

C++ ---- 入门基础知识总结

思维导图 目录 命名空间 命名冲突 如何定义命名空间 命名空间定义语法 嵌套定义 同一工程下定义同名命名空间 命名空间的使用 命名空间名称和作用域限定符&#xff08;: :&#xff09; using将命名空间中某个成员“释放”出来 using namespace 命名空间名称 C标准库…

Go:值与指针

1. 计算机中的值 在百万年的演化历史中&#xff0c;人类对事物的属性进行了抽象&#xff0c;有了数量、精度、信息等概念的表示&#xff0c;对应的我们称之为整数、小数、文本文字等。计算机出现后&#xff0c;我们使用计算机对真实世界的问题进行建模&#xff0c;通过计算机的…

WIN10安装CUDA保姆级教程[2023.5.7更新]

本系列分步记录在win10上搭建CUDAcudnnpytorchYOLOv5tensorrt等深度学习架构部署及系统搭建&#xff0c;欢迎关注追更&#xff01; 目录 0.了解CUDA 1.注意事项 1.1显卡驱动 1.2确定关联性 1.2.1 显卡驱动与cuda的对应关系&#xff1a; 1.2.2 pytorch与cuda的对应关系 2.c…

NLP大模型微调原理

1. 背景 LLM (Large Language Model) 大型语言模型&#xff0c;旨在理解和生成人类语言&#xff0c;需要在大量的文本数据上进行训练。一般基于Transformer结构&#xff0c;拥有Billion以上级别的参数量。比如GPT-3(175B)&#xff0c;PaLM(560B)。 NLP界发生三件大事&#xff…

ChatGLM-6B模型微调实战(以 ADGEN (广告生成) 数据集为例,序列长度达 2048)

kingglory/ChatGLM-6B 项目地址 1 介绍 对于 ChatGLM-6B 模型基于 P-Tuning v2 的微调。P-Tuning v2 将需要微调的参数量减少到原来的 0.1%&#xff0c;再通过模型量化、Gradient Checkpoint 等方法&#xff0c;差不多需要 7GB或则8GB 显存即可运行。 2 环境 2.1 python …

【软考-中级】系统集成项目管理工程师-计算题

系统集成项目管理工程师 二、计算题题型一&#xff1a;EMV&#xff08;预期货币价值&#xff09;题型二&#xff1a;加权系统题型三&#xff1a;自制和外贸决策——采购管理题型四&#xff1a;沟通渠道——沟通管理题型五&#xff1a;投资回收期、回收率题型六&#xff1a;进度…

metinfo_6.0.0 任意文件读取漏洞复现

一.漏洞简介 MetInfo是一套使用PHP和Mysql开发的内容管理系统。 MetInfo 6.0.0~6.1.0版本中的 old_thumb.class.php文件存在任意文件读取漏洞。攻击者可利用漏洞读取网站上的敏感文件。 二.漏洞影响 MetInfo 6.0.0 MetInfo 6.1.0 三.漏洞分析 在\MetInfo6.0.0\app\system\i…