Tomcat长轮询原理与源码解析

news2024/10/3 21:24:23

零丶长轮询的引入#

最近在看工作使用到的diamond配置中心原理,发现大多数配置中心在推和拉模型上做的选择出奇的一致选择了基于长轮询的拉模型

  • 基于拉模型的客户端轮询的方案
    客户端通过轮询方式发现服务端的配置变更事件。轮询的频率决定了动态配置获取的实时性。

    • 优点:简单、可靠。
    • 缺点:应用增多时,较高的轮询频率给整个配置中心服务带来巨大的压力。

    另外,从配置中心的应用场景上来看,是一种写少读多的系统,客户端大多数轮询请求都是没有意义的,因此这种方案不够高效。

  • 基于推模型的客户端长轮询的方案

    基于Http长轮询模型,实现了让客户端在没有发生动态配置变更的时候减少轮询。这样减少了无意义的轮询请求量,提高了轮询的效率;也降低了系统负载,提升了整个系统的资源利用率。

一丶何为长轮询#

长轮询 本质上是原始轮询技术的一种更有效的形式。

它的出现是为了解决:向服务器发送重复请求会浪费资源,因为必须为每个新传入的请求建立连接,必须解析请求的 HTTP 头部,必须执行对新数据的查询,并且必须生成和交付响应(通常不提供新数据)然后必须关闭连接并清除所有资源。

  • 从tomcat服务器的角度就是客户端不停请求,每次都得解析报文封装成Request,Response对象,并且占用线程池中的一个线程。
  • 并且每次轮询都要进行tcp握手,挥手,网卡发起中断,操作系统处理中断从内核空间拷贝数据到用户空间,一通忙活服务端返回 配置未修改(配置中心没有修改配置,客户端缓存的配置和配置中心一致,所以是白忙活)

长轮询是一种服务器选择尽可能长的时间保持和客户端连接打开的技术仅在数据变得可用或达到超时阙值后才提供响应而不是在给到客户端的新数据可用之前,让每个客户端多次发起重复的请求

简而言之,就是服务端并不是立马写回响应,而是hold住一段时间,如果这段时间有数据需要写回(例如配置的修改,新配置需要写回)再写回,然后浏览器再发送一个新请求,从而实现及时性,节省网络开销的作用。

二丶使用等待唤醒机制写一个简单的“长轮询”(脱裤子放屁)#

package com.cuzzz.springbootlearn.longpull;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@RestController
@RequestMapping("long-pull")
public class MyController implements InitializingBean {

    /**
     * 处理任务的线程
     */
    private ThreadPoolExecutor processExecutor;
    /**
     * 等待唤醒的锁
     */
    private static final ReentrantLock lock = new ReentrantLock();
    /**
     * 当请求获取配置的时候,在此condition上等待一定时间
     * 当修改配置的时候通过这个condition 通知其他获取配置的线程
     */
    private static final Condition condition = lock.newCondition();

    @GetMapping
    public void get(HttpServletRequest request, HttpServletResponse response) throws ExecutionException, InterruptedException {
        //组转成任务
        Task<String> task = new Task<String>(request, response,
                () -> "拿配置" + System.currentTimeMillis());
        //提交到线程池
        Future<?> submit = processExecutor.submit(task);
        //tomcat线程阻塞于此
        submit.get();
    }

    /**
     * 模拟修改配置
     *
     * 唤醒其他获取配置的线程
     */
    @PostMapping
    public String post(HttpServletRequest request, HttpServletResponse response) {
        lock.lock();
        try {
            condition.signalAll();
        }finally {
            lock.unlock();
        }
        return "OK";
    }


    static class Task<T> implements Runnable {
        private HttpServletResponse response;
        /**
         * 等待时长
         */
        private final long timeout;
        private Callable<T> task;

        public Task(HttpServletRequest request, HttpServletResponse response, Callable<T> task) {
            this.response = response;

            String time = request.getHeader("time-out");
            if (time == null){
                //默认等待10秒
                this.timeout = 10;
            }else {
                this.timeout = Long.parseLong(time);
            }
            this.task = task;
        }


        @Override
        public void run() {
            lock.lock();
            try {

                //超市等待
                boolean await = condition.await(timeout, TimeUnit.SECONDS);
                //超时
                if (!await) {
                    throw new TimeoutException();
                }
                //获取配置
                T call = task.call();
                //写回
                ServletOutputStream outputStream = response.getOutputStream();
                outputStream.write(("没超时拿当前配置:" + call).getBytes(StandardCharsets.UTF_8));
            } catch (TimeoutException | InterruptedException exception) {
                //超时或者线程被中断
                try {
                    ServletOutputStream outputStream = response.getOutputStream();
                    T call = task.call();
                    outputStream.write(("超时or中断拿配置:" + call).getBytes(StandardCharsets.UTF_8));
                } catch (Exception ex) {
                    throw new RuntimeException(ex);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
        }
    }


    @Override
    public void afterPropertiesSet() {

        int cpuNums = Runtime.getRuntime().availableProcessors();

        processExecutor
                = new ThreadPoolExecutor(cpuNums, cpuNums * 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());
    }
}

使用get方法反问的请求回被提交到线程池进行await等待,使用post方法的请求回唤醒这些线程。

但是这个写法有点脱裤子放屁

为什么会出现这种情况,直接提交到线程池异步执行不可以么,加入我们删除上面submit.get方法会发现其实什么结果都不会,这是因为异步提交到线程池后,tomcat已经结束了这次请求,并没有维护这个连接,所以没有办法写回结果。

如果不删除这一行,tomcat线程阻塞住我们可以写回结果,但是其实没有达到配置使用长轮询的初衷——"解放tomcat线程,让配置中心服务端可以处理更多请求"。

所以我们现在陷入一个尴尬的境地,怎么解决昵?看下去

三丶Tomcat Servlet 3.0长轮询原理#

1.AsyncContext实现长轮询#

package com.cuzzz.springbootlearn.longpull;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

@RestController
@RequestMapping("long-pull3")
public class MyController2 {

    private static final ScheduledExecutorService procesExecutor
            = Executors.newSingleThreadScheduledExecutor();
    /**
     * 记录配置改变的map
     */
    private static final ConcurrentHashMap<String, String> configCache
            = new ConcurrentHashMap<>();
    /**
     * 记录长轮询的任务
     */
    private static final ConcurrentLinkedDeque<AsyncTask> interestQueue
            = new ConcurrentLinkedDeque<>();

    static {
        //每2秒看一下释放配置变更,或者任务超时
        procesExecutor.scheduleWithFixedDelay(() -> {
            List<AsyncTask>needRemove  = new ArrayList<>();
            for (AsyncTask asyncTask : interestQueue) {
                if (asyncTask.timeout()) {
                    asyncTask.run();
                    needRemove.add(asyncTask);
                    continue;
                }
                if (configCache.containsKey(asyncTask.configId)) {
                    needRemove.add(asyncTask);
                    asyncTask.run();
                }
            }
            interestQueue.removeAll(needRemove);
        }, 1, 2, TimeUnit.SECONDS);
    }


    static class AsyncTask implements Runnable {
        private final AsyncContext asyncContext;
        private final long timeout;
        private static long startTime;
        private String configId;

        AsyncTask(AsyncContext asyncContext) {
            this.asyncContext = asyncContext;
            HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest();
            String timeStr = request.getHeader("time-out");
            if (timeStr == null) {
                timeout = 10;
            } else {
                timeout = Long.parseLong(timeStr);
            }
        	//关注的配置key,应该getParameter的,无所谓
            this.configId = request.getHeader("config-id");
            if (this.configId == null) {
                this.configId = "default";
            }
            
            //开始时间
            startTime = System.currentTimeMillis();
        }
		
        //是否超时
        public boolean timeout() {
            return (System.currentTimeMillis() - startTime) / 1000 > timeout;
        }

        @Override
        public void run() {
		
            String result = "开始于" + System.currentTimeMillis() + "--";
            try {
                if (timeout()) {
                    result = "超时: " + result;
                } else {
                    result += configCache.get(this.configId);
                }

                result += "--结束于:" + System.currentTimeMillis();
                ServletResponse response = asyncContext.getResponse();
                response.getOutputStream().write(result.getBytes(StandardCharsets.UTF_8));
                
                //后续将交给tomcat线程池处理,将给客户端响应
                asyncContext.complete();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }

        }

    }


    @GetMapping
    public void get(HttpServletRequest request, HttpServletResponse response) {
        //打印处理的tomcate线程id
        System.out.println("线程id" + Thread.currentThread().getId());
        //添加一个获取配置的异步任务
        interestQueue.add(new AsyncTask(asyncContext));
        //开启异步
        AsyncContext asyncContext = request.startAsync();
        asyncContext.setTimeout(0);
        //监听器打印最后回调的tomcat线程id
        asyncContext.addListener(new AsyncListener() {
            @Override
            public void onComplete(AsyncEvent event) throws IOException {
                System.out.println("线程id" + Thread.currentThread().getId());
            }
            //...剩余其他方法
        });
        
        //立马就会释放tomcat线程池资源
        System.out.println("tomcat主线程释放");
    }

    @PostMapping
    public void post(HttpServletRequest request) {
        String c = String.valueOf(request.getParameter("config-id"));
        if (c.equals("null")){
            c = "default";
        }
        String v = String.valueOf(request.getParameter("value"));
        configCache.put(c, v);
    }
}

上面演示利用AsyncContext tomcat是如何实现长轮询

这种方式的优势在于:解放了tomcat线程,其实tomcat的线程只是运行了get方法中的代码,然后立马可以去其他请求,真正获取配置更改的是我们的单线程定时2秒去轮询。

2.实现原理#

2.1 tomcat处理一个请求的流程#

  • Connector是客户端连接到Tomcat容器的服务点,它提供协议服务来将引擎与客户端各种协议隔离开来

    在Connector组件中创建了Http11NioProtocol组件,Http11NioProtocol默认持有NioEndpoin,NioEndpoint中持有Acceptor和Poller,并且启动的时候会启动一个线程运行Acceptor

  • Acceptor服务器端监听客户端的连接,会启动线程一直执行

    每接收一个客户端连接就轮询一个Poller组件,添加到Poller组件的事件队列中。,每接收一个客户端连接就轮询一个Poller组件,添加到Poller组件的事件队列中。

  • Poller组件持有多路复用器selector,poller组件不停从自身的事件队列中将事件取出注册到自身的多路复用器上,同时多路复用器会不停的轮询检查是否有通道准备就绪,准备就绪的通道就可以扔给tomcat线程池处理了。

  • tomcat线程池处理请求

    • 这里会根据协议创建不同的Processor处理,这里创建的是Http11Processor,Http11Processor会使用CoyoteAdapter去解析报文随后交给Container去处理请求

    • CoyoteAdapter解析报文随后交给Container去处理请求

    • Container会将Filter和Servlet组装成FilterChain依次调用

    • FilterChain会依次调用Filter#doFilter,然后调用Servlet#service方法

      至此会调用到Servlete#service方法,SpringMVC中的Dispatcher会反射调用我们controller的方法

2.2 AsyncContext 如何实现异步#

2.2.1 request.startAsync() 修改异步状态机状态为Starting#

AsycContext内部持有一个AsyncStateMachine来管理异步请求的状态(有点状态模式的意思)

状态机的初始状态是AsyncState.DISPATCHED,通过setStarted将状态机的状态更新成STARTING

2.2.2 AbstractProtocol启动定时任务处理超时异步请求#

Connector启动的时候触发ProtocolHandler的start方法,如下

其中startAsyncTimeout方法会遍历waitingProcessors中每一个Processor的timeoutAsync方法,这里的Processor就是Http11Processor

那么waitProcessors中的Http11Processor是谁塞进去的昵?

tomcat线程在执行完我们的Servlet代码后,Http11NioProtocol会判断请求状态,如果为Long那么会塞到waitProcessors集合中。

如果发现请求超时,那么会调用Http11Processor#doTimeoutAsycn然后由封装的socket通道socketWrapper以TIMEOUT的事件类型重新提交到tomcat线程池中。

2.2.3 AsyncContext#complete触发OPEN_READ事件#

可以看到其实和超时一样,只不过超时是由定时任务线程轮询来判断,而AsyncContext#complete则是我们业务线程触发processSocketEvent将后续处理提交到tomcat线程池中。

四丶长轮询的优点和缺点#

本文学习了长轮询和tomcat长轮询的原理,可以看到这种方式的优点

  • 浏览器长轮询的过程中,请求并没有理解响应,而是等到超时或者有需要返回的数据(比如配置中心在这个超时事件内发送配置的变更)才返回,解决了短轮询频繁进行请求网络开销的问题,减少了读多写少业务情景下无意义请求。
  • 真是通过这种方式,减少了无意义的请求,而且释放了tomcat线程池中的线程,使得我们服务端可以支持更多的客户端(因为业务逻辑是放在其他的线程池执行的,而且对于配置中心来说,可以让多个客户端的长轮询请求由一个线程去处理,原本是一个请求一个tomcat线程处理,从而可以支持更多的请求)

当然这种方式也是有缺点的

  • hold住请求也是会消耗资源的,如果1w个请求同时到来,我们都需要hold住(封装成任务塞到队列)这写任务也是会占用内存的,而短轮询则会立马返回,从而时间资源的释放

  • 请求先后顺序无法保证,比如轮询第五个客户端的请求的时候,出现了配置的变更,这时候第五个请求会被提交到tomcat线程池中,从而早于前面四个请求得到响应,这对于需要严格有序的业务场景是有影响的

  • 多台实例监听配置中心实例,出现不一致的情况

    比如配置中心四台实例监听配置变更,前三台可能响应了得到V1的配置,但是轮询到第四台实例的请求的时候又发生了变更可能就得到了v2的配置,这时候这四台配置不一致了。需要保证这种一致性需要我们采取其他的策略,比如配置中心服务端主动udp推,或者加上版本号保证这四台配置一致。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/422036.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

win10家庭版 WSL Centos7

win10家庭版 WSL Centos7虚拟机WSLCentos7虚拟机 以前在win系统上跑liunx系统一般是先安装个虚拟机软件&#xff0c;再在虚拟机上运行对于linux的镜像。 直到后来win推出了WSL。 WSL Windows Subsystem for Linux&#xff08;简称WSL&#xff09;是一个在Windows 10\11上能够…

力扣sql中等篇练习(二)

力扣sql中等篇练习(二) 1 第N高的薪水 1.1 题目内容 1.1.1 基本题目信息 1.1.2 示例输入输出 1.2 示例sql语句 -- 注意:limit函数是不能使用变量-1这种作为条件的,如N-1是不被允许的 CREATE FUNCTION getNthHighestSalary(N INT) RETURNS INT BEGINdeclare m INT;SET mN-1;…

大厂敲门砖!!!让你轻松扫除通往大厂的障碍,一步到岸!!2023年究极最全(Java面试手册)!!!

又到了一年一度的大厂招牌环节&#xff0c;多少程序员在为了金三银四在前几个月疯狂卷面试题&#xff0c;却还是惨遭淘汰&#xff0c;今天给大家整理了一份最最最核心的java面试题&#xff0c;希望能对大家在进击大厂的道路上添一份力&#xff01; 总纲&#xff1a; 由于文章篇…

Canal(2):Canal 部署与配置

1 mysql数据库配置 1.1新建数据库 1.2 新建测试表 CREATE TABLE user_info(id VARCHAR(255),name VARCHAR(255),sex VARCHAR(255) ); 1.3 修改配置文件开启 Binlog vim /etc/my.cnf server-id4 log-binmysql-bin binlog_formatrow binlog-do-dbcaneltestdb 1.4 重启MySQL使配…

Vue3 + TypeScript 实现自动打字、打字机动画、动画打字工具(TypeIt)

一、介绍 &#x1f475; &#x1f475; TypeIt是一个JavaScript库&#xff0c;用于创建简单而流畅的打字效果和动画效果。它可以用于网页开发中的很多场景&#xff0c;例如创建动态文字效果、制作页面过渡动画、增强用户体验等。 我们还可以利用它进行一些后端日志的回显&…

想让 ChatGPT 帮忙进行数据分析?你还需要做......

近年&#xff0c;火出圈的 ChatGPT 掀起了久违的人工智能的热潮&#xff0c;如何更好地让人工智能真正为企业所用&#xff0c;也成了近期的热门话题。大数据和人工智能两者相辅相成&#xff0c;人工智能的训练以大量数据作为基础&#xff0c;而数据的价值则需要人工智能的充分挖…

nacos和eureka的区别

nacos和eureka的区别 Eureka是什么 Eureka详解Nacos是什么 Nacos详解Nacos和Eureka的区别 CAP理论连接方式服务异常剔除操作实例方式自我保护机制 Eureka是什么 Eureka 是Spring Cloud 微服务框架默认的也是推荐的服务注册中心,由Netflix公司与2012将其开源出来,Eureka基于RE…

《剪花布条》:从花布条中尽可能剪出几块小饰条

目录 一、题目 二、思路 1、代码中要使用的String类中的方法 &#xff08;1&#xff09;判断 s 中是否有 t &#xff08;2&#xff09;将 s 分割 2、递归判断 三、代码 详细注释版本 简化注释版本 一、题目 题目&#xff1a;剪花布条 题目链接&#xf…

8、DRF实战总结:分页(Pagination)及DRF提供的分页类详解(附源码)

在前面的DRF系列教程中&#xff0c;以博客为例介绍了序列化器, 使用基于类的视图APIView和ModelViewSet开发了针对文章资源进行增删查改的完整API接口&#xff0c;并详细对权限和认证(含jwt认证)进行了总结与演示。在本篇文章中将向演示如何在Django REST Framework中使用分页。…

【MySQL】进阶查询-聚合查询和联合查询

文章目录1. 前言2. 表的设计2.1 一对一2.2 一对多2.3 多对多3.将查询结果放到另一个表中4. 聚合查询4.1 聚合函数4.2 GROUP BY4.3 HAVING5. 联合查询(多表查询)5.1 内连接5.2 外连接5.3 自连接5.4 子查询5.5 合并查询6. 总结1. 前言 文章主要围绕着以下三个问题: group by的作…

zookeeper介绍和搭建

zookeeper介绍和搭建一、zookeeper简介二、zookeeper工作机制三、zookeeper集群部署3.1 实验准备3.2 修改配置文件3.3 创建目录以及拷贝文件3.4 配置Zookeeper启动脚本3.5 查看启动状态一、zookeeper简介 1、zookeeper概念 ZooKeeper是一种为分布式应用所设计的高可用、高性能…

尚融宝20-实现用户注册和用户认证

目录 一、需求 二、前端整合发送验证码 三、实现用户注册 1、创建VO对象 2、定义常量 3、引入MD5工具类 4、Controller 5、Service 6、前端整合 四、实现用户登录 1、后端整合JWT 2、前端整合 五、校验用户登录 1、后端 2、前端 一、需求 二、前端整合发送验证码…

数据结构———一万字手撕八大排序算法

常见的排序算法1.排序算法的作用1.1列如我们在购物时1.2玩游戏时英雄战力的排行&#xff0c;都得用到排序算法2.常见排序算法的实现2.1冒泡排序时间复杂度计算&#xff1a;2.2直接插入排序时间复杂度计算&#xff1a;2.3选择排序时间复杂度计算&#xff1a;2.4希尔排序⭐时间复…

Qt音视频开发28-ffmpeg解码本地摄像头(yuv422转yuv420)

一、前言 一开始用ffmpeg做的是视频流的解析,后面增加了本地视频文件的支持,到后面发现ffmpeg也是支持本地摄像头设备的,只要是原则上打通的比如win系统上相机程序、linux上茄子程序可以正常打开就表示打通,整个解码显示过程完全一样,就是打开的时候要传入设备信息,而且…

Prophet学习(五)季节性、假日效应和回归因子

​ 编辑目录 假期和特殊事件建模&#xff08;Modeling Holidays and Special Events&#xff09; 内置国家假日&#xff08;Built-in Country Holidays&#xff09; 季节性的傅里叶级数&#xff08;Fourier Order for Seasonalities&#xff09; 指定自定义季节&#xff08…

启动neo4j备忘录

做个备忘录 Neo4j下载、安装、环境配置 优秀教程&#xff1a;https://blog.csdn.net/zeroheitao/article/details/122925845 Neo4j环境变量配置 1、安装JDK 由于Neo4j是基于Java的图形数据库&#xff0c;运行Neo4j需要启动JVM进程&#xff0c;因此必须安装JAVA SE的JDK。配置…

【数据结构】反射

文章目录&#x1f337; 1 定义&#x1f337; 2 用途(了解)&#x1f337; 3 反射基本信息&#x1f337; 4 反射相关的类&#x1f333; 4.1 Class类(反射机制的起源)Class类中的相关方法&#x1f333; 4.2 反射示例4.2.1 获得Class对象的三种方式4.2.2 反射的使用⭐️反射使用1&a…

3个宝藏级软件,每一个都超级好用,少装一个跟你急

分区助手 下载地址&#xff1a;https://www.disktool.cn/ 很多新手小白使用电脑都不懂&#xff0c;把所有软件都安装到了C盘&#xff0c;时间久了储存的东西变多&#xff0c;C盘空间着实不够用&#xff0c;这个免费的工具可以帮你重新分区&#xff0c;无损数据地执行。除了无损…

腾讯云8核16G18M轻量服务器CPU带宽流量性能测评

腾讯云轻量应用服务器8核16G18M带宽&#xff0c;18M公网带宽下载速度峰值可达2304KB/秒&#xff0c;相当于2.25M/s&#xff0c;系统盘为270GB SSD盘&#xff0c;3500GB月流量&#xff0c;折合每天116GB流量。腾讯云百科分享腾讯云轻量服务器8核16G18M配置、CPU型号、公网带宽月…

ChatGPT资讯—2023.4.3

一、 最新资讯 1. UC伯克利开源大语言模型Vicuna又来了 Vicuna-13b只需要花费300美刀&#xff08;比Alpaca的600美元便宜一半&#xff09;就能搞出来接近ChatGPT的水平。如何用小资源大模型让个人普通者与中小微企业也能用上高科技一直是开源社区孜孜追求的目标 Vicuna开源代…