2.nio入门和netty

news2025/1/11 7:58:34

1.nio实现网络编程
//服务端

public class NIOServer {
    public static void main(String[] args) throws Exception{

        //创建ServerSocketChannel -> ServerSocket

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //绑定一个端口6666, 在服务器端监听
        serverSocketChannel.socket().bind(new InetSocketAddress(6666));
        //设置为非阻塞
        serverSocketChannel.configureBlocking(false);

        //得到一个Selecor对象
        Selector selector = Selector.open();
        //把 所有 serverSocketChannel 注册到  selector 关心 事件为 OP_ACCEPT
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("注册后的selectionkey 数量=" + selector.keys().size()); // 1




        //循环等待客户端连接
        while (true) {
            //这里我们等待1秒,如果没有事件发生, 返回
            if(selector.select(1000) == 0) { //没有事件发生
                System.out.println("服务器等待了1秒,无连接");
                continue;
            }

            //如果返回的>0, 就获取到相关的 selectionKey集合
            //1.如果返回的>0, 表示已经获取到关注的事件
            //2. selector.selectedKeys() 返回关注事件的集合
            //   通过 selectionKeys 反向获取通道
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            System.out.println("selectionKeys 数量 = " + selectionKeys.size());

            //遍历 Set<SelectionKey>, 使用迭代器遍历
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();

            while (keyIterator.hasNext()) {
                //获取到SelectionKey
                SelectionKey key = keyIterator.next();

                //根据key 对应的通道发生的事件做相应处理
                if(key.isAcceptable()) { //如果是 OP_ACCEPT, 有新的客户端连接
                    //该该客户端生成一个 SocketChannel
                    //因为提前判断是否有连接所以很快执行
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    System.out.println("客户端连接成功 生成了一个 socketChannel " + socketChannel.hashCode());
                    //将  SocketChannel 设置为非阻塞
                    socketChannel.configureBlocking(false);
                    //将socketChannel 注册到selector, 关注事件为 OP_READ, 同时给socketChannel
                    //关联一个Buffer
                    socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));

                    System.out.println("客户端连接后 ,注册的selectionkey 数量=" + selector.keys().size()); //2,3,4..


                }
                if(key.isReadable()) {  //发生 OP_READ

                    //通过key 反向获取到对应channel
                    SocketChannel channel = (SocketChannel)key.channel();

                    //获取到该channel关联的buffer
                    ByteBuffer buffer = (ByteBuffer)key.attachment();
                    channel.read(buffer);
                    System.out.println("form 客户端 " + new String(buffer.array()));

                }
                System.out.println("keyIterator"+keyIterator);

                //手动从集合中移动当前的selectionKey, 防止重复操作
                keyIterator.remove();

            }

        }

    }
}

//客户端

public class NIOClient {
    public static void main(String[] args) throws Exception{

        //得到一个网络通道
        SocketChannel socketChannel = SocketChannel.open();
        //设置非阻塞
        socketChannel.configureBlocking(false);
        //提供服务器端的ip 和 端口
        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
        //连接服务器
        if (!socketChannel.connect(inetSocketAddress)) {
            //连接过程等待中
            while (!socketChannel.finishConnect()) {
                System.out.println("因为连接需要时间,客户端不会阻塞,可以做其它工作..");
            }
        }

        //...如果连接成功,就发送数据
        String str = "hello, 尚硅谷~";
        //Wraps a byte array into a buffer
        ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
        //发送数据,将 buffer 数据写入 channel
        socketChannel.write(buffer);
        System.in.read();

    }
}

//interestOps(int opt)改变想要静态的事件

2.ServerSockerChannel负责连接,SocketChannel负责读写操作

3.NIO实现网络编程 上线下线通知其他客户端 ,发布订阅
//客户端怎么排除自己? 使用instanceof
//取消注册
key.cancel();
//服务器负责全部请求的连接和读写(可拿到所有连接数),

   public class GroupChatServer {
    //定义属性
    private Selector selector;
    private ServerSocketChannel listenChannel;
    private static final int PORT = 6667;

    //构造器
    //初始化工作
    public GroupChatServer() {

        try {

            //得到选择器
            selector = Selector.open();
            //ServerSocketChannel
            listenChannel =  ServerSocketChannel.open();
            //绑定端口
            listenChannel.socket().bind(new InetSocketAddress(PORT));
            //设置非阻塞模式
            listenChannel.configureBlocking(false);
            //将该listenChannel 注册到selector
            listenChannel.register(selector, SelectionKey.OP_ACCEPT);

        }catch (IOException e) {
            e.printStackTrace();
        }

    }

    //监听
    public void listen() {

        System.out.println("监听线程: " + Thread.currentThread().getName());
        try {

            //循环处理
            while (true) {

                int count = selector.select();
                if(count > 0) {//有事件处理
                    //遍历得到selectionKey 集合
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        //取出selectionkey
                        SelectionKey key = iterator.next();

                        //监听到accept
                        if(key.isAcceptable()) {
                            SocketChannel sc = listenChannel.accept();
                            sc.configureBlocking(false);
                            //将该 sc 注册到seletor
                            sc.register(selector, SelectionKey.OP_READ);

                            //提示
                            System.out.println(sc.getRemoteAddress() + " 上线 ");

                        }
                        if(key.isReadable()) { //通道发送read事件,即通道是可读的状态
                            //处理读 (专门写方法..)

                            readData(key);

                        }
                        //当前的key 删除,防止重复处理
                        iterator.remove();
                    }

                } else {
                    System.out.println("等待....");
                }
            }

        }catch (Exception e) {
            e.printStackTrace();

        }finally {
            //发生异常处理....

        }
    }

    //读取客户端消息
    private void readData(SelectionKey key) {

        //取到关联的channle
        SocketChannel channel = null;

        try {
           //得到channel
            channel = (SocketChannel) key.channel();
            //创建buffer
            ByteBuffer buffer = ByteBuffer.allocate(1024);

            int count = channel.read(buffer);
            //根据count的值做处理
            if(count > 0) {
                //把缓存区的数据转成字符串
                String msg = new String(buffer.array());
                //输出该消息
                System.out.println("form 客户端: " + msg);

                //向其它的客户端转发消息(去掉自己), 专门写一个方法来处理
                sendInfoToOtherClients(msg, channel);
            }

        }catch (IOException e) {
            try {
                System.out.println(channel.getRemoteAddress() + " 离线了..");
                //取消注册
                key.cancel();
                //关闭通道
                channel.close();
            }catch (IOException e2) {
                e2.printStackTrace();;
            }
        }
    }

    //转发消息给其它客户(通道) ,send to alive channels in selectors
    private void sendInfoToOtherClients(String msg, SocketChannel self) throws  IOException{
        System.out.println("服务器转发消息中...");
        System.out.println("服务器转发数据给客户端线程: " + Thread.currentThread().getName());
        //遍历 所有注册到selector 上的 SocketChannel,并排除 self
        for(SelectionKey key: selector.keys()) {

            //通过 key  取出对应的 SocketChannel
            Channel targetChannel = key.channel();

            //排除自己
            if(targetChannel instanceof  SocketChannel && targetChannel != self) {

                //转型
                SocketChannel dest = (SocketChannel)targetChannel;
                //将msg 存储到buffer
                ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                //将buffer 的数据写入 通道
                dest.write(buffer);
            }
        }

    }

    public static void main(String[] args) {

        //创建服务器对象
        GroupChatServer groupChatServer = new GroupChatServer();
        groupChatServer.listen();
    }
}

//可以写一个Handler

class MyHandler {
    public void readData() {

    }
    public void sendInfoToOtherClients(){

    }
}

//客户端负责发送消息和接收消息

public class GroupChatClient {

    //定义相关的属性
    private final String HOST = "127.0.0.1"; // 服务器的ip
    private final int PORT = 6667; //服务器端口
    private Selector selector;
    private SocketChannel socketChannel;
    private String username;

    //构造器, 完成初始化工作
    public GroupChatClient() throws IOException {

        selector = Selector.open();
        //没有初始化也可以调用静态方法
        //连接服务器
        socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1", PORT));
        //设置非阻塞
        socketChannel.configureBlocking(false);
        //将channel 注册到selector
        socketChannel.register(selector, SelectionKey.OP_READ);
        //得到username
        username = socketChannel.getLocalAddress().toString().substring(1);
        System.out.println(username + " is ok...");

    }

    //向服务器发送消息
    public void sendInfo(String info) {

        info = username + " 说:" + info;

        try {
            socketChannel.write(ByteBuffer.wrap(info.getBytes()));
        }catch (IOException e) {
            e.printStackTrace();
        }
    }

    //读取从服务器端回复的消息
    public void readInfo() {

        try {

            int readChannels = selector.select();
            if(readChannels > 0) {//有可以用的通道

                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {

                    SelectionKey key = iterator.next();
                    if(key.isReadable()) {
                        //得到相关的通道
                       SocketChannel sc = (SocketChannel) key.channel();
                       //得到一个Buffer
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        //读取
                        sc.read(buffer);
                        //把读到的缓冲区的数据转成字符串
                        String msg = new String(buffer.array());
                        System.out.println(msg.trim());
                    }
                }
                iterator.remove(); //删除当前的selectionKey, 防止重复操作
            } else {
                //System.out.println("没有可以用的通道...");

            }

        }catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {

        //启动我们客户端
        GroupChatClient chatClient = new GroupChatClient();

        //启动一个线程, 每个3秒,读取从服务器发送数据
        new Thread() {
            public void run() {

                while (true) {
                    chatClient.readInfo();
                    try {
                        Thread.currentThread().sleep(3000);
                    }catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();

        //发送数据给服务器端
        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNextLine()) {
            String s = scanner.nextLine();
            chatClient.sendInfo(s);
        }
    }


}

3.NIO零拷贝(面试)(直接在内存拷贝,没有cpu拷贝) DMA直接内存拷贝access 优化网络传输和文件传输(optimize the time between memory and socket I/O)
图传统方式实现0拷贝,需要4次拷贝3次切换(在cpu和内存切换和拷贝)
在这里插入图片描述

1.mmap(小文件读写)不是真正的0拷贝,需要3次拷贝和3次切换(共享内核buffer和user buffer)
请添加图片描述

2.linux2.1的sendFile(大文件读写)函数优化成3次拷贝2次切换也不是真正的0拷贝
在这里插入图片描述

3.linux2.4 近乎0拷贝 kernel buffer到socketbuffer可能有拷贝但是数据量很少(偏移量…位置…)
2次拷贝2次切换
请添加图片描述

byteBuffer.rewind();//倒带 position-0和mark=-1作废
//NIO 的transferTo底层使用了0拷贝,windows只能传输8m要分段改第一个传输.linux可以一个函数传输
transferCount+= fileChannel.transferTo(i*8000000, 8000000, socketChannel);
//比传统的io传输速度大幅提升Socket底层使用了BIO
//服务器

public class NewIOServer {
    public static void main(String[] args) throws Exception {

        InetSocketAddress address = new InetSocketAddress(7001);

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        ServerSocket serverSocket = serverSocketChannel.socket();

        serverSocket.bind(address);

        //创建buffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(4096);

        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept();

            int readcount = 0;
            while (-1 != readcount) {
                try {

                    readcount = socketChannel.read(byteBuffer);

                }catch (Exception ex) {
                   // ex.printStackTrace();
                    break;
                }
                //
                byteBuffer.rewind(); //倒带 position = 0 mark 作废
            }
        }
    }
}
public class NewIOClient {
    public static void main(String[] args) throws Exception {

        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost", 7001));
        String filename = "deepspeech-0.9.3-models.scorer";

        //得到一个文件channel
        FileChannel fileChannel = new FileInputStream(filename).getChannel();

        //准备发送
        long startTime = System.currentTimeMillis();

        //在linux下一个transferTo 方法就可以完成传输
        //在windows 下 一次调用 transferTo 只能发送8m , 就需要分段传输文件, 而且要主要
        //传输时的位置 =》 课后思考...
        //transferTo 底层使用到零拷贝
        double ceil = Math.ceil(fileChannel.size() / 8000000f);
        int len = (int)ceil;
        System.out.println(len);
        long transferCount =0;
        for (int i=0;i<len;i++){
             transferCount+= fileChannel.transferTo(i*8000000, 8000000, socketChannel);
        } //the second param is size

            System.out.println("发送的总的字节数 =" + transferCount + " 耗时:" + (System.currentTimeMillis() - startTime));





        //关闭
        fileChannel.close();

    }
}

4.AIO(NIO2.0异步不阻塞)NIO使用Proactor AIO使用Reactor

同步(队列)阻塞(不可以做其他事情)
同步非阻塞(可以同时做其他事情)
异步非阻塞(可以先做某一步然后后来再做接下来的步骤)

5.NIO的问题

1.需要掌握selector SocketChannel ServerSocketChannel和ByteBuffer等
2.需要熟悉多线程和网络编程涉及到Reactor模式
3.bug多 客户端短连重连 网络闪断 半包读写,失败缓存 网络阻塞 异常流的处理
4.jdk1.7空轮询导致cpu占用100% epoll bug

6.netty提供了多种协议和api jboss开发的
netty支持的协议和api图
在这里插入图片描述

1.应用 互联网和大数据分布式计算领域
游戏通信行业,es dubbo框架使用netty
2.依赖少 jdk5(用3.x)jdk6(4.x 课程使用的 4.1.20 ) 5有大bug被官方废弃 吞吐量高,延迟低
3.安全 使用ssl/tls/start 支持,社区活跃

7.线程模式

  1. 传统的阻塞io模型 阻塞在read 一连接一个线程 图6.传统io模型
    在这里插入图片描述
  1. reactor模型,多个连接共用一个阻塞对象,无需等待,结合线程池,分配连接的线程,IO复用监听 图7
    在这里插入图片描述

8.reactor模型(反应器,分发,通知者) scalable io in java

1.reactor是 serviceHandler ,handlers是处理事件的实际处理者
2.netty是主从reactor(主线程处理连接,子线程进行读写) 图9 reactor主线程处理连接请求,分发给多个reactor子线程(连接对象加入队列)处理read还是send,再分发给worker进行业务处理
主线程和子线程后期没有交互,但编程难度大,memecached,nginx使用它,避免多线程和同步问题,避免多线程/进程切换开销

3.单reactor单线程(适合业务处理快的), 之前写的nio服务器客户端,不适合高并发,会阻塞,线程意外终止和死循环导致节点故障
4.单reactor多线程(线程池处理业务结果返回给handler) 图8 充分发挥多核性能,
但多线程数据更新访问复杂,reactor单线程要处理所有事件的监听和响应,只是拆分了处理数据,高并发会出现性能瓶颈
在这里插入图片描述
5.多主从reactor,多个子线程和多个worker处理业务在这里插入图片描述

9.netty原理 图10
在这里插入图片描述

  1. NioEventLoo都有一个selector监听socket通信
  2. runAlltasks是处理任务队列的任务

//
10.netty服务器客户端实战

//project structure—>moudules —>depend—>±->new lib—>from
maven—>
输入 io.netty:netty-all搜索点击docs和source–>4.1.20—>生成lib文件夹 //pipeline(是双向链表,放队列)放处理的事件handler服务器和客户端都有
//bossgroup和workergroup自己可以设置,默认cpu线程数*2个
//pipeline相互包含channel,可以相互得到 ,ctx包含全部信息包括pipeline和channel

//客户端

public class MyClient {
    public static void main(String[] args)  throws  Exception{

        EventLoopGroup group = new NioEventLoopGroup();

        try {

            Bootstrap bootstrap = new Bootstrap();

            bootstrap.group(group).channel(NioSocketChannel.class) //可以自定义各种类型的channel,http,tcp,udp....支持丰富的协议
                    .handler(new MyClientInitializer()); //自定义一个初始化类

            ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();

            channelFuture.channel().closeFuture().sync();

        }finally {
            group.shutdownGracefully();
        }
    }
}
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        ChannelPipeline pipeline = ch.pipeline();

        //加密数据
        pipeline.addLast(new MyLongToByteEncoder());

        //这时一个入站的解码器(入站handler )
        //解密数据,可以根据自己的规则
        pipeline.addLast(new MyByteToLongDecoder2());
        //加入一个自定义的handler , 处理业务
        pipeline.addLast(new MyClientHandler());


    }
}
public class MyClientHandler  extends SimpleChannelInboundHandler<Long> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {

        System.out.println("服务器的ip=" + ctx.channel().remoteAddress());
        System.out.println("收到服务器消息=" + msg);

    }

    //重写channelActive 发送数据
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("MyClientHandler 发送数据");
        //ctx.writeAndFlush(Unpooled.copiedBuffer(""))
        ctx.writeAndFlush(123456L); //发送的是一个long

        //分析
        //1. "abcdabcdabcdabcd" 是 16个字节
        //2. 该处理器的前一个handler 是  MyLongToByteEncoder
        //3. MyLongToByteEncoder 父类  MessageToByteEncoder
        //4. 父类  MessageToByteEncoder
        /*

         public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            if (acceptOutboundMessage(msg)) { //判断当前msg 是不是应该处理的类型,如果是就处理,不是就跳过encode
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                    encode(ctx, cast, buf);
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (buf.isReadable()) {
                    ctx.write(buf, promise);
                } else {
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                ctx.write(msg, promise);
            }
        }
        4. 因此我们编写 Encoder 是要注意传入的数据类型和处理的数据类型一致
        */
       // ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcd",CharsetUtil.UTF_8));

    }
}

//服务端

    public class MyServer {
    public static void main(String[] args) throws Exception{

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer()); //自定义一个初始化类

            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();//一会下断点

        //入站的handler进行解码 MyByteToLongDecoder
        //pipeline.addLast(new MyByteToLongDecoder());
        pipeline.addLast(new MyByteToLongDecoder2());
        //出站的handler进行编码
        pipeline.addLast(new MyLongToByteEncoder());
        //自定义的handler 处理业务逻辑
        pipeline.addLast(new MyServerHandler());
    }
}
public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {

        System.out.println("从客户端" + ctx.channel().remoteAddress() + " 读取到long " + msg);

        //给客户端发送一个long
        ctx.writeAndFlush(98765L);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
//对数据在客户端和服务端之间加密解密
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
    //编码方法
    @Override
    protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {

        System.out.println("encode加密"+msg);
        out.writeLong(msg+1);   //加密的数据

    }
}
public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        System.out.println("decode解密");
        //在 ReplayingDecoder 不需要判断数据是否足够读取,内部会进行处理判断
        out.add(in.readLong()-1);


    }
}

11.异步任务(耗时的时间)
taskQueue自定义任务 :在handler的ctx.channel.eventLoop().execute(new Runnable(){}) //执行异步任务,后面的代码可先执行,多个任务,需要队列,一个一个休眠( is a Queue)
用户自定义定时任务: scheduleTaskQueue ctx.channel.eventLoop().schedule(,5000); //区别在于,是同一时间休眠的会同一时间唤醒,指定延时时间 (no a Queue)
//只需要在处理器添加即可
//客户端

public class NettyClient {
    public static void main(String[] args) throws Exception {

        //客户端需要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();


        try {
            //创建客户端启动对象
            //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap();

            //设置相关参数
            bootstrap.group(group) //设置线程组
                    .channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
                        }
                    });

            System.out.println("客户端 ok..");

            //启动客户端去连接服务器端
            //关于 ChannelFuture 要分析,涉及到netty的异步模型
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            //给关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        }finally {

            group.shutdownGracefully();

        }
    }
}
//客户端处理器
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    //当通道就绪就会触发该方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client " + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));
    }

    //当通道有读取事件时,会触发
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器的地址: "+ ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

//服务端

public class NettyServer {
    public static void main(String[] args) throws Exception {


        //创建BossGroup 和 WorkerGroup
        //说明
        //1. 创建两个线程组 bossGroup 和 workerGroup
        //2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成
        //3. 两个都是无限循环
        //4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
        //   默认实际 cpu核数 * 2
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8



        try {
            //创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();

            //使用链式编程来进行设置
            bootstrap.group(bossGroup, workerGroup) //设置两个线程组
                    .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
                    .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
//                    .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
                    .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
                        //给pipeline 设置处理器
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            System.out.println("客户socketchannel hashcode=" + ch.hashCode()); //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器

            System.out.println(".....服务器 is ready...");

            //绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
            //启动服务器(并绑定端口)
            ChannelFuture cf = bootstrap.bind(6668).sync();

            //给cf 注册监听器,监控我们关心的事件
            //这个是监听服务器发生的事件
            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess()) {
                        System.out.println("监听端口 6668 成功");
                    } else {
                        System.out.println("监听端口 6668 失败");
                    }
                }
            });


            //对关闭通道进行监听
            cf.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

}
//核心代码,服务器处理器
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    //读取数据实际(这里我们可以读取客户端发送的消息)
    /*
    1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
    2. Object msg: 就是客户端发送的数据 默认Object
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {



        //比如这里我们有一个非常耗时长的业务-> 异步执行 -> 提交该channel 对应的
        //NIOEventLoop 的 taskQueue中,

        //解决方案1 用户程序自定义的普通任务

        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {

                try {
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));
                    System.out.println("channel code=" + ctx.channel().hashCode());
                } catch (Exception ex) {
                    System.out.println("发生异常" + ex.getMessage());
                }
            }
        });

//        ctx.channel().eventLoop().execute(new Runnable() {
//            @Override
//            public void run() {
//
//                try {
//                    Thread.sleep(5 * 1000);
//                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵3", CharsetUtil.UTF_8));
//                    System.out.println("channel code=" + ctx.channel().hashCode());
//                } catch (Exception ex) {
//                    System.out.println("发生异常" + ex.getMessage());
//                }
//            }
//        });

        //解决方案2 : 用户自定义定时任务 -》 该任务是提交到 scheduleTaskQueue中


        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {

                try {
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵4", CharsetUtil.UTF_8));
                    System.out.println("channel code=" + ctx.channel().hashCode());
                } catch (Exception ex) {
                    System.out.println("发生异常" + ex.getMessage());
                }
            }
        }, 5, TimeUnit.SECONDS);



        System.out.println("go on ...");


        System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel());
        System.out.println("server ctx =" + ctx);
        System.out.println("看看channel 和 pipeline的关系");
        Channel channel = ctx.channel();
        ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站


        //将 msg 转成一个 ByteBuf
        //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址:" + channel.remoteAddress());
    }

    //数据读取完毕
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        //writeAndFlush 是 write + flush
        //将数据写入到缓存,并刷新
        //一般讲,我们对这个发送的数据进行编码
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
    }

    //处理异常, 一般是需要关闭通道

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

12.异步模型ChannelFuture(不能立即得到结果,一直监听对象得到结果)

11netty链式操作, 可以添加监听器对象的方法
请添加图片描述

13.netty做http开发(服务器浏览器)

  1. coderc (coder + decoder)
  2. 做到图标不能请求服务器,访问接口可以访问(过滤) 自定义处理类
    HttpRequest req=(HttpRequest) msg;
    URI uri=new URI(req.uri);
    if(“/aa.ico”.equals(uri.getPath)){return;}
    //pipline和handler的hashcode不一样
    //http不是长连接的,断开产生新的pipline和handler
    //handler()使用在bossgroup的处理器,而childOption使用在连接的channel(客户端)使用
    sync()得到异步完成
    //不同的协议可能对应不同的channel对象

14.!!!Pipeline包含ChannelPipeline,server<—>入站事件(head->tail传递)和出站事件(tail->head传递) <–>client
图12channel包含pineline是一个双向链表相互包含
在这里插入图片描述

15.ChannelHandler处理数据或者拦截IO操作,转发到ChannelPipeline

public class TestServer {
    public static void main(String[] args) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new TestServerInitializer());

            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            
            channelFuture.channel().closeFuture().sync();

        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        //向管道加入处理器

        //得到管道
        ChannelPipeline pipeline = ch.pipeline();

        //加入一个netty 提供的httpServerCodec codec =>[coder - decoder]
        //HttpServerCodec 说明
        //1. HttpServerCodec 是netty 提供的处理http的 编-解码器
        pipeline.addLast("MyHttpServerCodec",new HttpServerCodec());
        //2. 增加一个自定义的handler
        pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());

        System.out.println("ok~~~~");

    }
}

//处理类设置头
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {


    //channelRead0 读取客户端数据
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {

        //回复信息给浏览器 [http协议]

        System.out.println("对应的channel=" + ctx.channel() + " pipeline=" + ctx
        .pipeline() + " 通过pipeline获取channel" + ctx.pipeline().channel());
        System.out.println("当前ctx的handler=" + ctx.handler());
        //判断 msg 是不是 httprequest请求
        if(msg instanceof HttpRequest) {
            //获取到
            HttpRequest httpRequest = (HttpRequest) msg;
            //获取uri, 过滤指定的资源,uri(uniform resource identifier,you can define anything),but url is only one type :   protocol://localtion,such as http://baidu.com/aa
            URI uri = new URI(httpRequest.uri());
            if("/favicon.ico".equals(uri.getPath())) {
                System.out.println("请求了 favicon.ico, 不做响应");
                return;
            }


            System.out.println("ctx 类型="+ctx.getClass());

            System.out.println("pipeline hashcode" + ctx.pipeline().hashCode() + " TestHttpServerHandler hash=" + this.hashCode());

            System.out.println("msg 类型=" + msg.getClass());
            System.out.println("客户端地址" + ctx.channel().remoteAddress());



            ByteBuf content = Unpooled.copiedBuffer("hello, 我是服务器", CharsetUtil.UTF_8);

            //构造一个http的相应,即 httpresponse
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);

            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());

            //将构建好 response返回
            ctx.writeAndFlush(response);

        }
    }



}

16.Uppooled类
//不用flip进行反转,ByteBuf封装了数组和写的方法,因为分为3个去 0—read 已经读取的区域
readindex----writeindex 可读区 writeindex----capacity可写区
//可以api得到,通过getByte不改变可读长度


ByteBuf buf=  Uppooled.copiedBuffer("xxx",Charset.forName("utf-8"));
   if(buf.hasArray()){
             byte [] content=bufBuf.array();
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/815886.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue 使用vue-json-viewer 展示 JSON 格式的数据

npm install vue-json-viewer --save<el-button type"primary" click"previewClick">预览</el-button><el-dialog title"预览" :visible.sync"previewVisible" width"70%"><viewer ref"viewer&qu…

回顾.NET系列:Framework、Net Core、Net 过往

一、个人最近工作变化 我经历了可能很多技术人都会经历的过程&#xff0c;从一名纯粹的技术人员转型成为了一名技术管理我已经不再单纯了&#xff0c;经历了从做好自己的事&#xff0c;搞定一个复杂模块和系统&#xff0c;到带领一个小团队&#xff0c;攻克一个个复杂项目&…

迷你系统天花板 英特尔蝰蛇峡谷NUC12 评测

1.全新设计的NUC12 蝰蛇峡谷&#xff08;Serpent Canyon&#xff09; i7 12700HArc A770M 16GB版本开箱 近年来&#xff0c;ITX主机和小型化系统变得越来越受欢迎。英特尔的NUC受到许多玩家们的关注。作为mini主机的代表NUC小巧设计和灵活性使它成为很多玩家和科技爱好者的选择…

Openlayers实战:绘制图形,导出KML文件

KML 文件使用 XML 通过存储位置、图像叠加、视频链接以及线条、形状、3D 图像和点等建模信息来表达地理注释和可视化。在OPenlayers的交互中,经常性的我们要导出一些数据,在这个实战中,演示的是导出KML文件。 安装依赖 npm install file-saver --save 效果图 导出的文件 &l…

Python数据可视化工具——Seaborn

1 简介 Seaborn基于matplotlib&#xff0c;它在matplotlib的基础上进行了更高级的API封装&#xff0c;便于用户可以更加简便地做出各种有吸引力的统计图表。它还能够高度兼容numpy与pandas数据结构以及scipy与statsmodel等统计模式。用更简单的调用呈现更多图表 seaborn官网&a…

【二开】JeecgBoot-vue3二次开发 前端 扩展online表单js增强等-初始化列表之后执行

【二开】JeecgBoot-vue3二次开发 前端 扩展online表单js增强等-初始化列表之后执行 二开位置 OnlineAutoList.js.initAutoList 定义方法 /*** 初始化列表之后执行* js增强* param tableColumns* returns {Promise<void>|*}*/onlineTableContext["afterInitAutoList…

2023 ISSE观察:智能遮阳窗帘行业蓬勃发展,AI设计引热议

7月31日&#xff0c;上海国际智能遮阳与建筑节能展览会落下帷幕。作为智能遮阳的行业展会&#xff0c;展会三天&#xff0c;现场热闹非凡&#xff0c;参展商和观展者络绎不绝。 作为一大行业盛事&#xff0c;2023 ISSE展会方打造了五大展区&#xff0c;除了提供系统门窗装修方案…

微信如何提高回复信息速度?

规范流程话术有什么用&#xff1f;为了提高回复客户的效率和质量&#xff0c;可以事先设计好的一套标准化的对话模板。它通常包括多个环节和问题&#xff0c;帮助客服人员或销售人员在与客户沟通时&#xff0c;按照标准化的流程进行&#xff0c;以提高工作效率和客户满意度。 如…

vue echart3个饼图

概览&#xff1a;根据UI设计需要做3个饼图且之间有关联&#xff0c;并且处理后端返回的数据。 参考链接&#xff1a; echart 官网的一个案例&#xff0c;3个饼图 实现思路&#xff1a; 根据案例&#xff0c;把数据处理成对应的。 参考代码&#xff1a; 1.处理后端数据&am…

LangChain+ChatGLM大模型应用落地实践(一)

LLMs的落地框架&#xff08;LangChain&#xff09;&#xff0c;给LLMs套上一层盔甲&#xff0c;快速构建自己的新一代人工智能产品。 一、简介二、Lanchain源码三、租用云服务器实例四、部署实例 一、简介 LangChain是一个近期非常活跃的开源代码库&#xff0c;目前也还在快速…

SOLIDWORKS Utilities应用

在实际的生产设计制造中&#xff0c;经常会遇到同一个零件多个版本&#xff0c;有可能再次调用零件的时间已经是很长时间之后&#xff0c;对于版本之间的区别就不会那么清楚&#xff0c;碰到简单明显的零件还可以轻松的找到区别&#xff0c;但是复杂的零件区别的查找可能会造成…

Jenkins 节点该如何管理?

Jenkins 拥有分布式构建(在 Jenkins 的配置中叫做节点)&#xff0c;分布式构建能够让同一套代码在不同的环境(如&#xff1a;Windows 和 Linux 系统)中编译、测试等 Jenkins 的任务可以分布在不同的节点上运行 节点上需要配置 Java 运行时环境&#xff0c;JDK 版本大于 1.5 节…

Echarts常见图表展示

一、折线图 1.1 堆叠折线图 const option {title: {text: 折线图,},tooltip: {trigger: axis},legend: {data: [张三, 李四, 王五],bottom: 10,},grid: {left: 3%,right: 4%,bottom: 10%,containLabel: true},xAxis: {type: category,boundaryGap: false,data: [Mon, Tue, We…

layui框架学习(35:数据表格_列参数设置)

Layui中的table数据表格模块支持对表格及列进行基础参数设置以提高数据的可视化及可操作性&#xff0c;本文学习并记录与列相关的主要基础参数的用法及效果。   基础参数field设置待显示到列中的数据的字段名&#xff0c;主要针对数据表格url属性中返回的数据集合或data属性设…

如何使用fiddler进行抓包

首先需要下载fiddler&#xff0c;推荐使用bing搜索引擎搜索&#xff08;百度搜狗一般搜这种工具展示的前几个全都是广告&#xff09;&#xff0c;直接搜索fiddler&#xff0c;搜出来第一个fiddler官网 然后直接点击download下载 进入下载页面后&#xff0c;正确填写一个邮箱&a…

【LeetCode|编译原理】剑指 Offer 20. 表示数值的字符串

文章目录 题目链接标签步骤实现代码&#xff08;C&#xff09; 题目链接 剑指 Offer 20. 表示数值的字符串 标签 有限状态自动机(FA) 步骤 Step1. 去除字符串左、右空格&#xff1b; string strip(string str) {int start -1;for (int i 0; i < str.length(); i) {i…

一个CSS渐变下划线效果的实用技巧

下划线它只用到了CSS的渐变背景、背景大小调整、位置设置、鼠标hover 过渡等基本属性和技巧。 分析与实现 我们先看一下基本的结构。 <h2 class"title"><span>你好啊&#xff0c;嘴巴嘟嘟</span></h2>然后我们给span 元素添加一个线性渐变…

如何在Windows11中备份指定文件夹?

在现代的信息化时代&#xff0c;无论工作场所还是日常生活&#xff0c;文件的使用频率已经越来越高。对于那些经常操作特定文件夹的用户&#xff0c;他们可能会经常进行文件的修改、新增以及删除操作。为了防止文件丢失导致的困扰&#xff0c;定期备份指定文件夹显得尤为关键。…

java医院电子病历系统源码:云端SaaS服务 前后端分离模式开发和部署

电子病历系统是什么&#xff1f; 电子病历是指医务人员在医疗活动过程中,使用医疗机构信息系统生成的文字、符号、图表、图形、数据、影像等数字化信息,并能实现存储、管理、传输和重现的医疗记录,是病历的一种记录形式。 医院通过电子病历以电子化方式记录患者就诊的信息&…

晋级榜单揭晓!华秋第九届硬创大赛-华南分赛区路演成功举办

7月21日&#xff0c;第十五届深创赛福田预选赛区暨华秋第九届硬创大赛华南分赛区决赛路演活动在深圳华强科创广场成功举办。活动由深圳华秋电子有限公司&#xff08;以下简称 华秋 &#xff09;、深圳市福田区新一代信息技术产业链党委、深圳新一代产业园、微纳研究院、华强科创…