Tomcat NIO 实现

news2024/11/15 13:27:31

1. tomcat网络整体架构

在这里插入图片描述

来自 https://www.cnblogs.com/cuzzz/p/17499364.html

上图是tomcat整个网络请求模型

  1. Acceptor线程作为监听线程,会通过通过 accept 方法 获取连接,该线程没有使用selector进行多路复用,使用了阻塞式的accept
  2. 有请求连接后,就会把该连接设置为非阻塞的socket,并且加入到队列中交给poller线程进行处理
  3. poller线程有selector,可以进行多路复用,从队列中获取 需要注册事件的连接
  4. 对于读事件来说,会先取消关心的读事件(因为JDK 的NIO是水平触发模,如果不进行取消,只要该socket缓冲区的数据没有读取完成,会一直触发读事件发生),然后交给业务线程池进行http协议解析,以及servlet业务执行

2. Tomcat读Socket数据原理

tomcat 连接建立时,监听read事件,然后使用fill(boolean block) 方法进行socket数据读取。参数block可以是阻塞模式,也可以是非阻塞模式

  • 阻塞模式:如果socket.read()返回0,再次注册读事件,第二次读,如果有数据返回,没有数据,object.wait,如果超时会抛出超时异常
  • 非阻塞模式:不管是否读取到数据,直接返回

http1.1协议数据解析

  • 解析请求行 :非阻塞模式读取

  • 解析请求头 :非阻塞模式读取

  • 请求行和请求头使用非阻塞模式读,会带来的问题,比如请求头只发送了一半,请求行只有一半,然后非阻塞读的时候没有读取到新数据,tomcat的处理是直接让该连接变成长连接,并重新注册读事件监听,这样就可以不占用线程资源

  • 请求体(HttpServletRequest中请求体Stream流方式读取):阻塞,没有读完就注册读事件,直到读到数据,会占用线程资源(因为会使得该线程阻塞)

怎么判断请求体是否读取完整?
Http1.1协议中使用content-length 或者 transfer-encoding字段进行判断

  • content-length:整个请求内容字节数
  • transfer-encoding:比如:transfer-encoding:chunk,格式是chunk长度\r\n ,结束标志是0\r\n\r\n

3. 实现简易的Tomcat NIO网络模型

 public static void main(String[] args) throws IOException {
        MyTomcat tomcat= new MyTomcat ();
        tomcat.start();
    }


    void start() {
        Poller poller = new Poller();
        Acceptor acceptor = new Acceptor();
        acceptor.poller = poller;
        acceptor.start();
        poller.start();

        Thread thread = new Thread(acceptor);
        thread.setName("acceptor");
        thread.start();

        Thread pollerThread = new Thread(poller);
        pollerThread.setName("poller");
        pollerThread.start();

    }

    class Acceptor implements Runnable {
        ServerSocketChannel serverSocketChannel = null;

        Poller poller = null;

        public void start() {
            try {
                serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.configureBlocking(true);
                serverSocketChannel.bind(new InetSocketAddress("0.0.0.0", 9898));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void run() {
            while (true) {
                try {
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);

                    SocketWrapper socketWrapper = new SocketWrapper();
                    socketWrapper.socketChannel = socketChannel;
                    socketWrapper.poller = poller;
                    poller.register(socketWrapper);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Poller implements Runnable {
        Selector selector = null;

        Executor executor = Executors.newFixedThreadPool(50);

        LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>();

        public void register(SocketWrapper socketWrapper) {
            Event event = new Event();
            event.socketWrapper = socketWrapper;
            addEvent(event);
        }

        public void addEvent(Event event) {
            queue.add(event);
            selector.wakeup();
        }

        public void start() {
            try {
                selector = Selector.open();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void run() {
            int keyCount = 0;
            while (true) {
                try {
                    keyCount = selector.select();
                    Event event = null;
                    while ((event = queue.poll()) != null) {
                        SocketWrapper socketWrapper = event.socketWrapper;
                        SocketChannel socketChannel = socketWrapper.socketChannel;
                        if (event.ops == event.REGISTER) {
                            socketChannel.register(selector, SelectionKey.OP_READ, socketWrapper);
                        } else if (socketWrapper.readBlocking) {
                            synchronized (socketWrapper.object) {
                                socketWrapper.readBlocking = false;
                                socketWrapper.object.notify();
                            }
                        } else {
                            SelectionKey selectionKey = socketChannel.keyFor(selector);
                            if (selectionKey == null) {
                                socketChannel.close();
                            } else {
                                SocketWrapper attachment = ((SocketWrapper) selectionKey.attachment());
                                if (attachment != null) {
                                    try {
                                        int ops = event.ops | selectionKey.interestOps();
                                        selectionKey.interestOps(ops);
                                    } catch (CancelledKeyException ckx) {
                                        selectionKey.cancel();
                                    }
                                } else {
                                    selectionKey.cancel();
                                }
                            }
                        }
                    }
                    if (keyCount > 0) {
                        Set<SelectionKey> selectionKeys = selector.selectedKeys();
                        System.out.println(keyCount);
                        Iterator<SelectionKey> iterator = selectionKeys.iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectionKey = iterator.next();
                            iterator.remove();
                            if (selectionKey.isReadable()) {
                                final SocketWrapper socketWrapper = (SocketWrapper) selectionKey.attachment();

                                selectionKey.interestOps(selectionKey.interestOps() & (~selectionKey.readyOps()));
                                executor.execute(() -> {
                                    Http11Processor http11Processor = new Http11Processor();
                                    http11Processor.process(socketWrapper);
                                });
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    class SocketWrapper {
        private SocketChannel socketChannel;
        private Poller poller;
        private final Object object = new Object();
        private volatile boolean readBlocking = false;
        private long timeout = TimeUnit.SECONDS.toMillis(20);


        private int read(boolean isBlock, ByteBuffer byteBuffer) {
            int len = 0;
            try {
                if (isBlock) {
                    long startNanos = 0;
                    do {
                        if (startNanos > 0) {
                            long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                            if (elapsedMillis == 0) {
                                elapsedMillis = 1;
                            }
                            timeout -= elapsedMillis;
                            if (timeout <= 0) {
                                throw new RuntimeException("读取超时");
                            }
                        }

                        len = socketChannel.read(byteBuffer);
                        if (len == -1) {
                            throw new RuntimeException("客户端断开连接");
                        } else if (len == 0) {
                            if (!readBlocking) {
                                readBlocking = true;
                                poller.register(this);
                            }
                            synchronized (object) {
                                if (readBlocking) {
                                    if (timeout > 0) {
                                        startNanos = System.nanoTime();
                                        object.wait(timeout);
                                    } else {
                                        object.wait();
                                    }
                                }
                            }
                        }
                    } while (len == 0);
                } else {
                    len = socketChannel.read(byteBuffer);
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
            return len;
        }

    }

    class Event {
        private final int REGISTER = 0x100;
        private int ops = REGISTER;
        private SocketWrapper socketWrapper;
    }

    class Http11Processor {
        void process(SocketWrapper socketWrapper) {
            ByteBuffer byteBuffer = ByteBuffer.allocate(4 * 1024);

            Request request = new Request();

            //解析请求行
            parseRequestLine(request, socketWrapper, byteBuffer);

            //解析请求头
            parseRequestHeaders(request, socketWrapper, byteBuffer);
            System.out.println(request.method);
            System.out.println(request.uri);
            System.out.println(request.protocol);
            System.out.println(request.headers);

            //写
            Response response = new Response();
            response.socketWrapper = socketWrapper;

            response.write(("<h1>hello world</h1>").getBytes(StandardCharsets.UTF_8));
        }

        private void parseRequestHeaders(Request request, SocketWrapper socketWrapper, ByteBuffer byteBuffer) {
            boolean isParseHeaders = false;
            StringBuilder stringBuilder = new StringBuilder();
            String headerName = null;
            while (!isParseHeaders) {
                if (byteBuffer.position() >= byteBuffer.limit()) {
                    byteBuffer.clear();
                    socketWrapper.read(true, byteBuffer);
                }
                byte b = byteBuffer.get();
                if (headerName == null && b == ':') {
                    headerName = stringBuilder.toString();
                    stringBuilder = new StringBuilder();
                } else if (b == '\r') {
                } else if (b == '\n' && headerName == null) {
                    isParseHeaders = true;
                } else if (b == '\n') {
                    request.headers.put(headerName, stringBuilder.toString());
                    headerName = null;
                    stringBuilder = new StringBuilder();
                } else {
                    stringBuilder.append((char) b);
                }
            }
        }

        private void parseRequestLine(Request request, SocketWrapper socketWrapper, ByteBuffer byteBuffer) {
            //读 解析
            socketWrapper.read(true, byteBuffer);
            byteBuffer.flip();
            StringBuilder stringBuilder = new StringBuilder();
            boolean space = false;
            while (!space) {
                if (byteBuffer.position() >= byteBuffer.limit()) {
                    byteBuffer.clear();
                    socketWrapper.read(true, byteBuffer);
                }
                char c = (char) byteBuffer.get();
                if (c == ' ') {
                    request.method = stringBuilder.toString();
                    space = true;
                } else if (c == '\r') {
                } else if (c == '\n') {
                    space = true;
                } else {
                    stringBuilder.append(c);
                }
            }
            space = false;
            stringBuilder = new StringBuilder();
            while (!space) {
                if (byteBuffer.position() >= byteBuffer.limit()) {
                    byteBuffer.clear();
                    socketWrapper.read(true, byteBuffer);
                }
                char c = (char) byteBuffer.get();
                if (c == ' ') {
                    request.uri = stringBuilder.toString();
                    space = true;
                } else if (c == '\r') {
                } else if (c == '\n') {
                    space = true;
                } else {
                    stringBuilder.append(c);
                }
            }

            space = false;
            stringBuilder = new StringBuilder();
            while (!space) {
                if (byteBuffer.position() >= byteBuffer.limit()) {
                    byteBuffer.clear();
                    socketWrapper.read(true, byteBuffer);
                }
                char c = (char) byteBuffer.get();
                if (c == '\r') {
                } else if (c == '\n') {
                    request.protocol = stringBuilder.toString();
                    space = true;
                } else {
                    stringBuilder.append(c);
                }
            }
        }
    }

    class Request {
        private String method;
        private String uri;
        private String protocol;
        private int contentLength;
        private final Map<String, Object> headers = new HashMap<>();

    }

    class Response {
        SocketWrapper socketWrapper = null;
        private final int maxHeaderSize = 1024 * 2;
        private ByteBuffer byteBuffer = null;

        public void write(byte[] bytes) {
            System.out.println("write enter");
            byteBuffer = ByteBuffer.allocate(maxHeaderSize);
            //写入响应行
            byteBuffer.put("HTTP/1.1 200 OK\r\n".getBytes(StandardCharsets.UTF_8));
            //写入响应头
            byteBuffer.put("Content-Type: text/html;charset=utf-8\r\n".getBytes(StandardCharsets.UTF_8));
            //content-length
            byteBuffer.put(("Content-Length: " + bytes.length + "\r\n").getBytes(StandardCharsets.UTF_8));

            //写入响应空行
            byteBuffer.put("\r\n".getBytes(StandardCharsets.UTF_8));
            //写入响应体
            try {
                byteBuffer.flip();
                SocketChannel socketChannel = socketWrapper.socketChannel;

                int write = socketChannel.write(byteBuffer);
                System.out.println("http header write:" + write);

                ByteBuffer picByteBuffer = ByteBuffer.wrap(bytes);
                while (picByteBuffer.hasRemaining()) {
                    int write1 = socketChannel.write(picByteBuffer);
                    System.out.println("picByteBuffer position:" + picByteBuffer.position() + " limit:" + picByteBuffer.limit() + " leave:" + (picByteBuffer.limit() - picByteBuffer.position()) + " http body write:" + write1 + " content-length:" + bytes.length);
                }


                Event event = new Event();
                event.ops = SelectionKey.OP_READ;
                event.socketWrapper = socketWrapper;
                socketWrapper.poller.addEvent(event);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/738741.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

UE5《Electric Dreams》项目PCG技术解析 之 PCGCustomNodes详解(二)Look At

继续解析《Electric Dreams》项目中的自定义节点和子图 文章目录 前导文章Look AtExecute with ContextPoint Loop Body使用范例Get Actor Data节点的设置LookAt节点的设置Add节点的设置 小结 前导文章 《虚幻引擎程序化资源生成框架PCG 之 UPCGBlueprintElement源码笔记&…

阿里云轻量应用服务器使用教程(从0到1网站上线)

阿里云轻量应用服务器怎么使用&#xff1f;阿里云百科分享轻量应用服务器从选配、配置建站环境、轻量服务器应用服务器远程连接、开端口到网站上线全流程&#xff1a; 阿里云轻量应用服务器使用教程 轻量应用服务器很火爆因为成本足够低&#xff0c;阿里云2核2G3M带宽轻量服务…

第一批用ChatGPT坐牢的人,都玩的是哪些套路?

通过GPT在短时间内生成完整诈骗话术&#xff0c;套路啊套路 “虚拟角色”可以虚拟客服&#xff0c;还可以虚拟恋人玩杀猪盘 让受害人以为自己“坠入爱河” 套路还是从前的套路 但骗子用上了新的工具 又换上了很多马甲 防不胜防 你以为OpenAI不知道骗子会用这个工具来做坏…

数据库之MySQL数据操作练习

目录 练习内容 worker表要求 创建的表的表结构 表中的数据内容 对数据的操作 1.显示所有职工的基本信息 2.查询所有职工所属部门的部门号&#xff0c;不显示重复的部门号 3.求出所有职工的人数 4.列出最高工和最低工资 5.列出职工的平均工资和总工资 6.创建一个只有职…

深度学习之权重初始化

在深度学习中&#xff0c;神经网络的权重初始化方法( w e i g h t weight weight i n i t i a l i z a t i o n initialization initialization)对模型的收敛速度和性能有着至关重要的影响。说白了&#xff0c;神经网络其实就是对权重参数 w w w的不停迭代更新&#xff0c;以达…

基于Javaweb实现ATM机系统开发实战(五)新增用户功能实现

新增用户非常简单&#xff0c;前端拿到数据传递给后端然后往数据库里一存就完事了~ 首先我们看一下新增用户的页面&#xff1a;add.jsp&#xff0c; 可以看到提交的页面&#xff1a;insert 和方式post&#xff0c;但是少了密码的添加&#xff0c;所以我们手动给他加上&#xf…

阿里云——网站建设:部署与发布(知识点)

作者简介&#xff1a;一名云计算网络运维人员、每天分享网络与运维的技术与干货。 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a;网络豆的主页​​​​​ 目录 前言 学习目标 1.建站&#xff1a; &#xff08;1&#xff09;建站基本步骤 2.域…

导入Excel数据【EasyPoi实战系列】- 第480篇

历史文章&#xff08;文章累计480&#xff09; 《国内最全的Spring Boot系列之一》 《国内最全的Spring Boot系列之二》 《国内最全的Spring Boot系列之三》 《国内最全的Spring Boot系列之四》 《国内最全的Spring Boot系列之五》 《国内最全的Spring Boot系列之六》 【…

Docker操作镜像相关命令

[rootdocker02 ~]# docker image ##查看docker命令 docker image search 搜索镜像, 优先选官方,stars数量多&#xff08;尽量不选择第三方镜像&#xff09; 第三方镜像不支持搜索&#xff0c;特点是特别长。 官方仓库地址&#xff1a;https://hub.docker.com/ 官方仓库 官方镜像…

v-for遍历数组或者对象,渲染结构、v-bind动态设置class和style

&#xff08;一&#xff09;v-for数组变化&#xff08;增加一项&#xff0c;删除一项&#xff09;, 会更新页面吗?那么数组改变后&#xff0c;是如何更新的呢&#xff1f; 1. v-for 作用&#xff1a;可以遍历数组或者对象&#xff0c;用于渲染结构。遍历数组语法&#xff1a…

robotframework之python扩展库编写

通过对robot自动化框架的使用&#xff0c;大大方便了我们做自动化测试。但很多时候官网提供的扩展库并不能满足我们的测试要求&#xff0c;此时就需要我们编写自己的扩展库。这里以ubuntu下使用python编写自定义扩展库举例说明。 最近笔者在做物联网方面的自动化测试&#xff…

FASTADMIN联合查询 搜索

控制器中添加relationSearch开关 控制器里面添加联合查询 MODEL里面添加 js里面添加即可 可以查看数据json 搜索开启searchFileds就可以了

❤️创意网页:生日快乐!穿越奇妙时光的温暖庆祝(HTML+CSS+JS)简单好用

✨博主&#xff1a;命运之光 &#x1f338;专栏&#xff1a;Python星辰秘典 &#x1f433;专栏&#xff1a;web开发&#xff08;简单好用又好看&#xff09; ❤️专栏&#xff1a;Java经典程序设计 ☀️博主的其他文章&#xff1a;点击进入博主的主页 前言&#xff1a;欢迎踏入…

【Linux】使用git 连接到远程gitee仓库

一.创建gitee仓库 登陆到自己的gitee&#xff0c;点击新建仓库 输入仓库名称&#xff0c;注意gitee初次创建仓库时只能设置成私有的 创建后的页面是这样的&#xff0c;复制HTTPS 二.git三板斧 1.安装git 打开xshell&#xff0c;输入命令&#xff1a; yum install git 可以输入…

SpringBoot中集成Flyway实现数据库sql版本管理入门以及遇到的那些坑

场景 Git/ SVN 是代码界的版本控制工具&#xff0c;那么&#xff0c;Flyway 就是一款数据库界的版本控制工具&#xff0c; 它可以记录数据库的变化记录。可能很多公司都是通过人工去维护、同步数据库脚本&#xff0c; 但经常会遇到疏忽而遗漏的情况&#xff0c;举个简单的例…

glob找不到文件而os.listdir可以

前几天遇到了一个问题&#xff0c;就是glob找不到文件而os.listdir可以 /mnt/data/datasets/abc[123]&#xff1a; ├── img_0001.jpg ├──img_0002.jpg代码 import os import glob path /mnt/data/datasets/abc[123] os.listdir(path) glob.glob(os.path.join(path, *.…

ES6: Proxy概念及用法

Proxy如其名&#xff0c; 它的作用是在对象和和对象的属性值之间设置一个代理&#xff0c;获取该对象的值或者设置该对象的值&#xff0c; 以及实例化等等多种操作&#xff0c; 都会被拦截住&#xff0c; 经过这一层我们可以统一处理&#xff0c;我们可以认为它就是“代理器” …

使用 @Autowired 为什么会被 IDEA 警告,应该怎么修改最佳?

# 问题原因 关于这个问题&#xff0c;其实答案相对统一&#xff0c;实际上用大白话说起来也容易理解。 1.初始化问题 先看一下Java初始化类的顺序&#xff1a;父类的静态字段 > 父类静态代码块 > 子类静态字段 > 子类静态代码块 > 父类成员变量 > 父类构造代码块…

前端学习——Web API (Day3)

事件流 事件流和两个阶段说明 事件捕获 事件冒泡 阻止冒泡 解绑事件 事件委托 案例 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta http-equiv"X-UA-Compatible" content"IEedge" …

day1-搜索插入位置

搜索插入位置 给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c;并返回其索引。如果目标值不存在于数组中&#xff0c;返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 示例 1: 输入: nums [1,3,5,6], target 5 输出: …