StarRocks 中如何做到查询超时(QueryTimeout)

news2024/10/2 10:24:38

背景

本文基于 StarRocks 3.1.7
主要是分析以下两种超时设置的方式:

  • SESSION 级别
    SET query_timeout = 10;
    SELECT sleep(20);
  • SQL 级别
  select /*+ SET_VAR(query_timeout=10) */ sleep(20); 

通过本文的分析大致可以了解到在Starrocks的FE端是如何进行Command的交互以及数据流走向,其他的命令也是可以举一反三

分析

query_timeout 命令解析

和Spark以及hive等但是解析一样,StarRocks也是采用的Anltr4进行语法的解析,
对于StarRocks来说, 对应的语法解析文件为 StarRocks.g4文件,那么其set query_time在如下的位置

setStatement
    : SET setVar (',' setVar)*
    ;

setVar
    : (CHAR SET | CHARSET | CHARACTER SET) (identifierOrString | DEFAULT)                       #setNames
    | NAMES (charset = identifierOrString | DEFAULT)
        (COLLATE (collate = identifierOrString | DEFAULT))?                                     #setNames
    | PASSWORD '=' (string | PASSWORD '(' string ')')                                           #setPassword
    | PASSWORD FOR user '=' (string | PASSWORD '(' string ')')                                  #setPassword
    | userVariable '=' expression                                                               #setUserVar
    | varType? identifier '=' setExprOrDefault                                                  #setSystemVar
    | systemVariable '=' setExprOrDefault                                                       #setSystemVar
    | varType? TRANSACTION transaction_characteristics                                          #setTransaction
    ;

继而可以找到对应的语法解析部分为 AstBuilder.java 中

 @Override
    public ParseNode visitSetSystemVar(StarRocksParser.SetSystemVarContext context) {
        NodePosition pos = createPos(context);
        if (context.systemVariable() != null) {
            VariableExpr variableDesc = (VariableExpr) visit(context.systemVariable());
            Expr expr = (Expr) visit(context.setExprOrDefault());
            return new SystemVariable(variableDesc.getSetType(), variableDesc.getName(), expr, pos);
        } else {
            Expr expr = (Expr) visit(context.setExprOrDefault());
            String variable = ((Identifier) visit(context.identifier())).getValue();
            if (context.varType() != null) {
                return new SystemVariable(getVariableType(context.varType()), variable, expr, pos);
            } else {
                return new SystemVariable(SetType.SESSION, variable, expr, pos);
            }
        }
    }

从以上所示,SET query_timeout = 10; 就会在语法层面解析为 new SystemVariable(SetType.SESSION, variable, expr, pos)

数据流向

以上只是说到了 SET query_timeout = 10 只会被解析为SystemVariable对应的java数据结构,但是一条SQL从客户端发送过来,是怎么一个数据流呢?
我们大概的捋一下:

StarRocksFE中新建QeService对象
   ||
   \/
 new NMysqlServer(port, scheduler, sslContext)
   ||
   \/
 new AcceptListener(connectScheduler, sslContext)
   ||
   \/
 AcceptListener.handleEvent
   ||
   \/
 context.startAcceptQuery(processor)
   ||
   \/
 NMysqlChannel.startAcceptQuery
   ||
   \/
 conn.getSourceChannel().setReadListener(new ReadListener(nConnectContext, connectProcessor))
   ||
   \/
 ReadListener.handleEvent
   ||
   \/
 connectProcessor.processOnce()
   ||
   \/
 connectProcessor.dispatch
   ||
   \/
 connectProcessor.handleQuery
   ||
   \/
 stmts = com.starrocks.sql.parser.SqlParser.parse(originStmt, ctx.getSessionVariable());
   ||
   \/
 StmtExecutor.execute()
   ||
   \/
 StatementPlanner.plan(parsedStmt, context)
   ||
   \/
 StmtExecutor.handleSetStmt()
   ||
   \/
 SetExecutor.execute // 会设置到变量的keyValue到`ConnectContext`的`SystemVariable`变量中,后续会或获取对应的值

query_timeout 怎么生效

还是定位到StarRocksFE.java中:

ExecuteEnv.setup();

该方法是整个执行环境的基础设置。其中会有ConnectScheduler的初始化:

public ConnectScheduler(int maxConnections) {
        this.maxConnections = new AtomicInteger(maxConnections);
        numberConnection = new AtomicInteger(0);
        nextConnectionId = new AtomicInteger(0);
        // Use a thread to check whether connection is timeout. Because
        // 1. If use a scheduler, the task maybe a huge number when query is messy.
        //    Let timeout is 10m, and 5000 qps, then there are up to 3000000 tasks in scheduler.
        // 2. Use a thread to poll maybe lose some accurate, but is enough to us.
        ScheduledExecutorService checkTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1,
                "Connect-Scheduler-Check-Timer", true);
        checkTimer.scheduleAtFixedRate(new TimeoutChecker(), 0, 1000L, TimeUnit.MILLISECONDS);
    }

这里有个定时线程池去进行timeout的检查,间隔是一秒。具体的检查机制在TimeoutChecker类中:

private class TimeoutChecker extends TimerTask {
        @Override
        public void run() {
            try {
                long now = System.currentTimeMillis();
                synchronized (ConnectScheduler.this) {
                    //Because unregisterConnection will be callback in NMysqlChannel's close,
                    //unregisterConnection will remove connectionMap (in the same thread)
                    //This will result in a concurrentModifyException.
                    //So here we copied the connectionIds to avoid removing iterator during operate iterator
                    ArrayList<Long> connectionIds = new ArrayList<>(connectionMap.keySet());
                    for (Long connectId : connectionIds) {
                        ConnectContext connectContext = connectionMap.get(connectId);
                        connectContext.checkTimeout(now);
                    }
                }
            } catch (Throwable e) {
                //Catch Exception to avoid thread exit
                LOG.warn("Timeout checker exception, Internal error : " + e.getMessage());
            }
        }
    }

主要逻辑就是从connectionMap中获取对应的ConnectContext,从而调用ConnectContext.checkTimeout方法检查是否超时。
checkTimeout方法主要是通过sessionVariable.getQueryTimeoutS()获取设置的超时时间,如果超时,则调用StmtExecutor.cancel,继而调用Coordinator.cancel
所以现在就存在一个问题: 当前连接的ConnectContext什么时候被集成到 connectionMap中去的?
还是回到流程 AcceptListener.handleEvent 中去:

    connectScheduler.submit(context);
    ...
    if (connectScheduler.registerConnection(context)) {
            MysqlProto.sendResponsePacket(context);
            connection.setCloseListener(
                    streamConnection -> connectScheduler.unregisterConnection(context));
    } else {
    ...

这里的submit 方法会生成context的conectionId.
registerConnection方法会把当前 ConnectionContext 的id和 ConnectionContext 组成KeyValue对并放置到connectionMap

至此 SET query_timeout = 10 整体的数据流就结束了,待在同一个连接中进行select 操作的时候,就会根据执行的长短进行超时处理了。

注意:
对于 select /*+ SET_VAR(query_timeout=10) */ sleep(20); 这种情况的解析,是通过 HintCollector来解析的。
词法解析是在StarRocksLex.g4 中,

OPTIMIZER_HINT
    : '/*+' .*? '*/' -> channel(2)
    ;

对于获取hint是通过HintCollectorextractHintToRight获取的:

 private void extractHintToRight(ParserRuleContext ctx) {
        Token semi = ctx.start;
        int i = semi.getTokenIndex();
        List<Token> hintTokens = tokenStream.getHiddenTokensToRight(i, HINT_CHANNEL);
        if (hintTokens != null) {
            contextWithTokenMap.computeIfAbsent(ctx, e -> new ArrayList<>()).addAll(hintTokens);
        }
    }

对应的解析在:SqlParser.parseWithStarRocksDialect 方法中:

  HintCollector collector = new HintCollector((CommonTokenStream) parser.getTokenStream());
            collector.collect(singleStatementContexts.get(idx));

  AstBuilder astBuilder = new AstBuilder(sessionVariable.getSqlMode(), collector.getContextWithHintMap());

AstBuilder 中会存储 hint到 hintMap 变量中,而在 visitQuerySpecification方法中

        selectList.setOptHints(extractVarHints(hintMap.get(context)));

从而在StmtExecutor.execute中会调用 optHints = selectRelation.getSelectList().getOptHints();获取对应的hint,

 if (isQuery &&
          ((QueryStatement) parsedStmt).getQueryRelation() instanceof SelectRelation) {
      SelectRelation selectRelation = (SelectRelation) ((QueryStatement) parsedStmt).getQueryRelation();
      optHints = selectRelation.getSelectList().getOptHints();
  }
  if (optHints != null) {
      LOG.error("optHints: parsedStmt:" + parsedStmt.getOrigStmt() +"  "+ optHints.size());
      });
      SessionVariable sessionVariable = (SessionVariable) sessionVariableBackup.clone();
      for (String key : optHints.keySet()) {
          VariableMgr.setSystemVariable(sessionVariable,
                  new SystemVariable(key, new StringLiteral(optHints.get(key))), true);
      }
      context.setSessionVariable(sessionVariable);

这样 hint相关的变量就设置到ConnectContextSessionVariable中了,后续的流程和之前的一致。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2184210.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Ubuntu22.04之mpv播放器高频快捷键(二百七十)

简介&#xff1a; CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布&#xff1a;《Android系统多媒体进阶实战》&#x1f680; 优质专栏&#xff1a; Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a; 多媒体系统工程师系列【…

两个向量所在平面的法线,外积,叉积,行列式

偶尔在一个数学题里面看到求两向量所在平面的法线&#xff0c;常规方法可以通过法线与两向量垂直这一特点&#xff0c;列两个方程求解&#xff1b;另外一种方法可以通过求解两个向量的叉积&#xff0c;用矩阵行列式 (determinant) 的方式&#xff0c;之前还没见过&#xff0c;在…

正则表达式调试工具实战

正则表达式调试工具实战 1、新建工程QWidget工程工程名RegexTool 如果QT不会配置,请参考我的博客,QT配置 Widget.cpp 默认内容如下 2、主界面设计 三行两列,每行采用HBoxLayout作为行布局控件,内部一个Lable控件和一个TextEdit控件,采用VBoxLayout 控件包裹三个HBoxLa…

JWT | JWT 漏洞介绍

关注这个漏洞的其他相关笔记&#xff1a;JWT 漏洞 - 学习手册-CSDN博客 0x01&#xff1a;JWT 漏洞介绍 JWT&#xff08;Json Web Token&#xff09;是用于在网络应用环境间传递声明的一种基于 JSON 的开放标准。它通过使用 JSON 对象进行安全的信息传输&#xff0c;可以用于身…

ChatGPT实时语音将于本周向免费用户推出:OpenAI DevDay 2024详细解读

大家好&#xff0c;我是木易&#xff0c;一个持续关注AI领域的互联网技术产品经理&#xff0c;国内Top2本科&#xff0c;美国Top10 CS研究生&#xff0c;MBA。我坚信AI是普通人变强的“外挂”&#xff0c;专注于分享AI全维度知识&#xff0c;包括但不限于AI科普&#xff0c;AI工…

激活函数ReLU,Sigmoid,tanh,softmax性质讲解及使用matplotlib绘制

Sigmoid及tanh sigmoid及tanh的函数图像较为相似,它们的公式如下 Sigmoid公式 tanh公式 Sigmoid及tanh性质解析 Sigmoid和tanh作为激活函数都可以有效完成非线性映射的功效,其中Sigmoid经常作为2分类的神经网络的输出层的激活函数,由于其非线性映射会将输出值转换到0-1的区间…

心觉:潜意识开发为何失败?99%的人卡在理性与感性学习的误区

Hi&#xff0c;我是心觉&#xff0c;与你一起玩转潜意识、脑波音乐和吸引力法则&#xff0c;轻松掌控自己的人生&#xff01; 挑战每日一省写作187/1000天 小时候看武侠剧的时候&#xff0c;经常看到剧中有武林高手把绝世武功传给某个涉世未深的铁憨憨主角&#xff0c;比如《天…

四、Drf认证组件

四、Drf认证组件 4.1 快速使用 from django.shortcuts import render,HttpResponse from rest_framework.response import Response from rest_framework.views import APIView from rest_framework.authentication import BaseAuthentication from rest_framework.exception…

win11 升级报 0x80073713 错误

安装错误 - 0x80073713 通常是由于系统文件损坏或 Windows Update 组件异常引起的。‌ 这个问题可能阻止您的系统正常接收和安装更新&#xff0c;影响系统的稳定性和安全性。 可以尝试如下如下方法&#xff1a; 首先&#xff0c;您可以尝试使用命令提示符运行系统文件检查器…

资源《Arduino 扩展板2-矩阵按钮 》说明。

资源链接&#xff1a; Arduino 扩展板2-矩阵按钮 1.文件明细 2.文件内容说明 包含&#xff1a;AD工程、原理图、PCB。 3.内容展示 4.简述 该文件为PCB工程&#xff0c;采用AD做的。 该文件打板后配合Arduino使用&#xff0c;属于Arduino的扩展板。 该文件主要有16个按钮…

wgan的实现的伪代码,和原理,dcgan,模型坍塌 em距离 js kl散

原文地址 https://blog.csdn.net/Keep_Trying_Go/article/details/130471766 伪代码详细看这个&#xff0c; 特别注意点&#xff1a;gan(xnosize) 生成器 是 输入噪声 &#xff0c;而不是全部噪声生成的 特别注意点&#xff1a;gan(xnosize) 生成器 是 输入噪声 &#…

vue3使用Teleport 控制台报警告:Invalid Teleport target on mount: null (object)

Failed to locate Teleport target with selector “.demon”. Note the target element must exist before the component is mounted - i.e. the target cannot be rendered by the component itself, and ideally should be outside of the entire Vue component tree main.…

基于SSM+小程序的选课管理系统2(源码+sql脚本+视频导入教程+文档)

&#x1f449;文末查看项目功能视频演示获取源码sql脚本视频导入教程视频 1、项目介绍 ​ 教师的账号和密码可以注册&#xff0c;管理员的账号和密码可由系统开发者在数据库中设置&#xff0c;学生的账号和密码可以由注册获得。 1、管理员的主要功能为学生资料、教师资料的添…

如何升级OCAT

如何升级OCAT 一. 从官网下载OCAT 官网下载地址链接&#xff1a;https://github.com/ic005k/OCAuxiliaryTools/releases 目前下载下来版本为0.8.8&#xff0c;但RapidEFI显示最新版本已经为1.0.1。 为了防止OC版本过低导致进不了系统&#xff0c;需要对OCAT进行升级。![在这里…

.NET 一款支持冰蝎的免杀WebShell

01阅读须知 此文所提供的信息只为网络安全人员对自己所负责的网站、服务器等&#xff08;包括但不限于&#xff09;进行检测或维护参考&#xff0c;未经授权请勿利用文章中的技术资料对任何计算机系统进行入侵操作。利用此文所提供的信息而造成的直接或间接后果和损失&#xf…

探索 PixiJS:强大的 2D 图形渲染库

探索 PixiJS&#xff1a;强大的 2D 图形渲染库 演示地址 演示地址 源码地址 源码地址 获取更多 获取更多 随着 Web 技术的发展&#xff0c;越来越多的开发者希望在网页中实现丰富的视觉效果和动画。PixiJS 作为一个高性能的 2D 渲染库&#xff0c;凭借其强大的功能和易用性…

《NoSQL》非关系型数据库MongoDB 学习笔记!

Mongo基础&#xff1a; 使用数据库&#xff1a; 使用use 命令 后面跟着要使用的数据库名字即可&#xff0c; 例如&#xff1a;use cities, 值得注意的是&#xff0c; mongo中不像mysql&#xff0c; 还需要先创建数据库&#xff0c;后访问&#xff0c; mongo中&#xff0c;你无…

媒介坊:在数字化时代,企业如何在竞争激烈的市场中脱颖而出

在当今的数字化时代&#xff0c;企业如何在竞争激烈的市场中脱颖而出&#xff0c;成为消费者关注的焦点&#xff1f;软文投放作为一种高效的营销手段&#xff0c;正受到越来越多企业的青睐。而媒介坊&#xff0c;作为一站式软文投放平台&#xff0c;正是帮助企业实现这一目标的…

Android Camera2 与 Camera API技术探究和RAW数据采集

Android Camera2 Android Camera2 是 Android 系统中用于相机操作的一套高级应用程序接口&#xff08;API&#xff09;&#xff0c;它取代了之前的 Camera API。以下是关于 Android Camera2 的一些主要信息&#xff1a; 主要特点&#xff1a; 强大的控制能力&#xff1a;提供…

JavaWeb——Vue组件库Element(4/6):案例:基本页面布局(基本框架、页面布局、CSS样式、完善布局、效果展示,含完整代码)

目录 步骤 基本页面布局 基本框架 页面布局 CSS样式 完善布局 效果展示 完整代码 Element 的基本使用方式以及常见的组件已经了解完了&#xff0c;接下来要完成一个案例&#xff0c;通过这个案例让大家知道如何基于 Element 中的各个组件制作一个完整的页面。 案例&am…