Doris数据库FE——SQL 接收

news2024/12/23 19:08:45

在这里插入图片描述

SQL 接收

首先看定义在fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java文件中的public class QeService类,该类is the encapsulation of the entire front-end service, including the creation of services that support the MySQL protocol是整个前端服务的封装。其中包含两个最重要的类MysqlServer和ConnectScheduler,构造MysqlServer实例需要port端口和ConnectScheduler连接调度器。然后提供start成员函数,其主要是调用mysqlServer.start()函数。

public class QeService {
    private static final Logger LOG = LogManager.getLogger(QeService.class);

    private int port; private MysqlServer mysqlServer; // MySQL protocol service
    public QeService(int port, ConnectScheduler scheduler) {
        this.port = port; this.mysqlServer = new MysqlServer(port, scheduler);
    }

    public void start() throws Exception {  // Set up help module
        try { HelpModule.getInstance().setUpModule(HelpModule.HELP_ZIP_FILE_NAME);
        } catch (Exception e) { LOG.warn("Help module failed, because:", e); throw e; }

        if (!mysqlServer.start()) { LOG.error("mysql server start failed"); System.exit(-1);  }
        LOG.info("QE service start.");
    }
}

Apache Doris 兼容 Mysql 协议,用户可以通过 Mysql 客户端和其他支持 Mysql 协议的工具向 Doris 发送查询请求。其中最核心的类就是定义在fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java文件中的public class MysqlServer(mysql protocol implementation based on nio)。由于其是基于nio的,所以可以有private XnioWorker xnioWorkerprivate ExecutorService taskService = ThreadPoolManager.newDaemonCacheThreadPool( Config.max_mysql_service_task_threads_num, "mysql-nio-pool", true)线程池成员。而acceptListener即是当新连接来时提供调用的handler回调函数。AcceptListener类(public class AcceptListener implements ChannelListener<AcceptingChannel<StreamConnection>>)为listener for accept mysql connections,其最重要的成员函数就是public void handleEvent(AcceptingChannel<StreamConnection> channel)

    public MysqlServer(int port, ConnectScheduler connectScheduler) {
        this.port = port;
        this.xnioWorker = Xnio.getInstance().createWorkerBuilder().setWorkerName("doris-mysql-nio").setWorkerIoThreads(Config.mysql_service_io_threads_num).setExternalExecutorService(taskService).build();        
        this.acceptListener = new AcceptListener(connectScheduler); // connectScheduler only used for idle check.
    }

handleEvent函数首先accept流连接,然后创建ConnectContext对象,通过调用connectScheduler.submit(context)提交到连接调度器中,启动worker执行connectScheduler.registerConnection(context)注册ConnectContext对象,然后创建ConnectProcessor类ConnectProcessor processor = new ConnectProcessor(context),最后执行context.startAcceptQuery(processor),流程交到ConnectProcessor线程中。
在这里插入图片描述
MysqlServer Listener() 负责监听客户端发送来的 Mysql 连接请求,每个连接请求都被封装成一个 ConnectContext 对象,并被提交给 ConnectScheduler。ConnectScheduler 会维护一个线程池,每个 ConnectContext 会在线程池中由一个 ConnectProcessor 线程处理。
MysqlServer start函数,调用createStreamConnectionServer创建服务端,调用server.resumeAccepts进入服务状态。

    // start MySQL protocol service. return true if success, otherwise false
    public boolean start() {
        try {
            if (FrontendOptions.isBindIPV6()) {
                server = xnioWorker.createStreamConnectionServer(new InetSocketAddress("::0", port), acceptListener, OptionMap.create(Options.TCP_NODELAY, true, Options.BACKLOG, Config.mysql_nio_backlog_num));
            } else {
                server = xnioWorker.createStreamConnectionServer(new InetSocketAddress(port), acceptListener, OptionMap.create(Options.TCP_NODELAY, true, Options.BACKLOG, Config.mysql_nio_backlog_num));
            }
            server.resumeAccepts();
            running = true;
            LOG.info("Open mysql server success on {}", port);
            return true;
        } catch (IOException e) {
            LOG.warn("Open MySQL network service failed.", e);
            return false;
        }
    }

ConnectScheduler为The scheduler of query requests(Now the strategy is simple, we allocate a thread for it when a request comes)。作为连接调度器,必然会维护连接数,这里的private final int maxConnections; private final AtomicInteger numberConnection; private final AtomicInteger nextConnectionId;就是该用途。由于每个连接请求都被封装成一个 ConnectContext 对象,因此在连接调度器维护了map private final Map<Integer, ConnectContext> connectionMap = Maps.newConcurrentMap()用于保存ConnectionId和ConnectContext 对象的映射关系,private final Map<String, AtomicInteger> connByUser = Maps.newConcurrentMap()则是用于维护用户与连接数量的映射关系。

    // submit one MysqlContext to this scheduler. return true, if this connection has been successfully submitted, otherwise return false. Caller should close ConnectContext if return false.
    public boolean submit(ConnectContext context) {
        if (context == null) { return false; }
        context.setConnectionId(nextConnectionId.getAndAdd(1));
        return true;
    }
    // Register one connection with its connection id.
    public boolean registerConnection(ConnectContext ctx) {
        if (numberConnection.incrementAndGet() > maxConnections) {  numberConnection.decrementAndGet(); return false;  }
        // Check user
        connByUser.putIfAbsent(ctx.getQualifiedUser(), new AtomicInteger(0));
        AtomicInteger conns = connByUser.get(ctx.getQualifiedUser());
        if (conns.incrementAndGet() > ctx.getEnv().getAuth().getMaxConn(ctx.getQualifiedUser())) {
            conns.decrementAndGet(); numberConnection.decrementAndGet(); return false;
        }
        connectionMap.put(ctx.getConnectionId(), ctx);
        return true;
    }

ConnectContext When one client connect in, we create a connect context for it. We store session information here. Meanwhile ConnectScheduler all connect with its connection id. 上述流程中调用了startAcceptQuery函数,该函数只有一行代码,即执行mysqlChannel.startAcceptQuery(this, connectProcessor)。如下所示,即注册ReadListener。而ReadListener类最重要的函数就是handleEvent,其中最重要的就是获取worker执行函数体 connectProcessor.processOnce()

    public void startAcceptQuery(ConnectContext connectContext, ConnectProcessor connectProcessor) {
        conn.getSourceChannel().setReadListener(new ReadListener(connectContext, connectProcessor));
        conn.getSourceChannel().resumeReads();
    }
    
public class ReadListener implements ChannelListener<ConduitStreamSourceChannel> {
    @Override
    public void handleEvent(ConduitStreamSourceChannel channel) {
        // suspend must be call sync in current thread (the IO-Thread notify the read event), otherwise multi handler(task thread) would be waked up by once query.
        XnioIoThread.requireCurrentThread();  ctx.suspendAcceptQuery();
        // start async query handle in task thread.
        channel.getWorker().execute(() -> {
            ctx.setThreadLocalInfo();
            try {
                connectProcessor.processOnce();
                if (!ctx.isKilled()) { ctx.resumeAcceptQuery();
                } else { ctx.stopAcceptQuery(); ctx.cleanup();
                }
            } catch (Exception e) {
                LOG.warn("Exception happened in one session(" + ctx + ").", e); ctx.setKilled(); ctx.cleanup();
            } finally {
                ConnectContext.remove();
            }
        });
    }

ConnectProcessor类用于Process one mysql connection, receive one packet, process, send one packet。其最重要的人口函数即是processOnce()。从channel中读取数据包,调用dispatch命令分发该数据包到不同的命令执行函数。

    // Process a MySQL request
    public void processOnce() throws IOException {      
        ctx.getState().reset();  executor = null; // set status of query to OK.
        // reset sequence id of MySQL protocol
        final MysqlChannel channel = ctx.getMysqlChannel(); channel.setSequenceId(0);        
        try {
            packetBuf = channel.fetchOnePacket(); // read packet from channel
            if (packetBuf == null) {
                LOG.warn("Null packet received from network. remote: {}", channel.getRemoteHostPortString());
                throw new IOException("Error happened when receiving packet.");
            }
        } catch (AsynchronousCloseException e) {
            // when this happened, timeout checker close this channel killed flag in ctx has been already set, just return
            return;
        }     
        dispatch(); // dispatch        
        finalizeCommand(); // finalize

        ctx.setCommand(MysqlCommand.COM_SLEEP);
    }

在这里插入图片描述
finalizeCommand函数在请求执行结束后,用于向客户端反馈应答数据包。

    // When any request is completed, it will generally need to send a response packet to the client
    // This method is used to send a response packet to the client
    private void finalizeCommand() throws IOException {
        ByteBuffer packet;
        if (executor != null && executor.isForwardToMaster() && ctx.getState().getStateType() != QueryState.MysqlStateType.ERR) {
            ShowResultSet resultSet = executor.getShowResultSet();
            if (resultSet == null) { packet = executor.getOutputPacket();
            } else {
                executor.sendResultSet(resultSet);  packet = getResultPacket();
                if (packet == null) { LOG.debug("packet == null"); return; }
            }
        } else {
            packet = getResultPacket();
            if (packet == null) { LOG.debug("packet == null"); return;
            }
        }

        MysqlChannel channel = ctx.getMysqlChannel();
        channel.sendAndFlush(packet);
        // note(wb) we should write profile after return result to mysql client
        // because write profile maybe take too much time
        // explain query stmt do not have profile
        if (executor != null && executor.getParsedStmt() != null && !executor.getParsedStmt().isExplain()
                && (executor.getParsedStmt() instanceof QueryStmt // currently only QueryStmt and insert need profile
                || executor.getParsedStmt() instanceof LogicalPlanAdapter
                || executor.getParsedStmt() instanceof InsertStmt)) {
            executor.updateProfile(true);
            StatsErrorEstimator statsErrorEstimator = ConnectContext.get().getStatsErrorEstimator();
            if (statsErrorEstimator != null) {
                statsErrorEstimator.updateProfile(ConnectContext.get().queryId());
            }
        }
    }

本文只说 mysql 协议如何接收 SQL 语句, 如果感兴趣的同学可以看看 Apache Doris FE Web 的 Rest Api。
https://www.modb.pro/db/415009

Doris SQL解析具体包括了五个步骤:词法分析、语法分析、生成单机逻辑计划,生成分布式逻辑计划、生成物理计划。具体代码实现上包含以下五个步骤:Parse、Analyze、SinglePlan、DistributedPlan、Schedule。
在这里插入图片描述

Parse阶段

词法分析采用jflex技术,语法分析采用java cup parser技术,最后生成抽象语法树(Abstract Syntax Tree)AST,这些都是现有的、成熟的技术,在这里不进行详细介绍。

AST是一种树状结构,代表着一条SQL。不同类型的查询select, insert, show, set, alter table, create table等经过Parse阶段后生成不同的数据结构(SelectStmt, InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, CreateTableStmt等),但他们都继承自Statement,并根据自己的语法规则进行一些特定的处理。例如:对于select类型的sql, Parse之后生成了SelectStmt结构。

SelectStmt结构包含了SelectList,FromClause,WhereClause,GroupByClause,SortInfo等结构。这些结构又包含了更基础的一些数据结构,如WhereClause包含了BetweenPredicate(between表达式), BinaryPredicate(二元表达式), CompoundPredicate(and or组合表达式), InPredicate(in表达式)等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1018588.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

89 # express 构建 layer 和 route 的关系

上一节实现了实现应用和路由的分离&#xff0c;这一节来构建 layer 和 route 的关系 先看个例子如下&#xff1a;路由中间件&#xff0c;将处理的逻辑拆分成一个个的模块 const express require("express"); const app express();app.get("/",(req, re…

大语言模型之十-Byte Pair Encoding

Tokenizer 诸如GPT-3/4以及LlaMA/LlaMA2大语言模型都采用了token的作为模型的输入输出&#xff0c;其输入是文本&#xff0c;然后将文本转为token&#xff08;正整数&#xff09;&#xff0c;然后从一串token&#xff08;对应于文本&#xff09;预测下一个token。 进入OpenAI官…

六、数学建模之插值与拟合

1.概念 2.例题和matlab代码求解 一、概念 1.插值 &#xff08;1&#xff09;定义&#xff1a;插值是数学和统计学中的一种技术&#xff0c;用于估算在已知数据点之间的未知数据点的值。插值的目标是通过已知数据点之间的某种函数或方法来估计中间位置的数值。插值通常用于数…

服务器管理

腾讯云服务器相关管理 linux下安装python3 linux自带2.x&#xff0c;有时候需要2.x执行一些工具&#xff0c;开发的时候又想用p3&#xff0c;就需要同时装python2和python3 依次执行以下命令 ssh xxxxx.xx.xx.xx #进入linux服务器 su #输入密码&#xff0c;如果不知道管理员…

基于讯飞人脸算法(调用API进行人脸比对)

先看结果 必须遥遥领先 所需准备 这里我调用了&#xff1a; 人脸比对 API 文档 | 讯飞开放平台文档中心https://www.xfyun.cn/doc/face/xffaceComparisonRecg/API.html#%E6%8E%A5%E5%8F%A3%E8%AF%B4%E6%98%8E 代码里所涉及的APPID、APISecret、APIKey 皆从讯飞的控制台获取&…

ARM Linux DIY(十三)Qt5 移植

前言 板子带有屏幕&#xff0c;那当然要设计一下 GUI&#xff0c;对 Qt5 比较熟悉&#xff0c;那就移植它吧。 移植 Qt5 buildroot 使能 Qt5&#xff0c;这里我们只开启核心功能 gui module --> widgets module 编译 $ make ODIY_V3S/ qt5base编译报错&#xff1a;找不…

旅游门户/旅行社网站-pc+移动端+可小程序+app强大功能-适合运营周边游/国内游/出境游

很美观的一款旅游门户/旅行社网站-pc+移动端+强大功能-适合运营周边游/国内游/出境游/酒店/门票/签证/租车/攻略都有,看演示地址 可以封装APP 套餐一:源码+包安装=400 套餐二:全包服务 包服务器+域名+APP+免费认证小程序+H5+PC=1000 可做小程序+app,请提前联系卖家 主…

【C#】【源码】直接可用的远程桌面应用

【背景】 封闭环境无法拷贝外来的远程桌面软件&#xff0c;所以就直接自己用C#写一个。 【效果】 【说明】 本篇会给出完整的编程步骤&#xff0c;照着写就能拥有你自己的远程桌面应用&#xff0c;直接可以运行在局域网。 如果不想自己敲代码&#xff0c;也可以选择直接下载…

LeetCode 周赛上分之旅 #45 精妙的 O(lgn) 扫描算法与树上 DP 问题

⭐️ 本文已收录到 AndroidFamily&#xff0c;技术和职场问题&#xff0c;请关注公众号 [彭旭锐] 和 BaguTree Pro 知识星球提问。 学习数据结构与算法的关键在于掌握问题背后的算法思维框架&#xff0c;你的思考越抽象&#xff0c;它能覆盖的问题域就越广&#xff0c;理解难度…

python 二手车数据分析以及价格预测

二手车交易信息爬取、数据分析以及交易价格预测 引言一、数据爬取1.1 解析数据1.2 编写代码爬1.2.1 获取详细信息1.2.2 数据处理 二、数据分析2.1 统计分析2.2 可视化分析 三、价格预测3.1 价格趋势分析(特征分析)3.2 价格预测 引言 本文着眼于车辆信息&#xff0c;结合当下较…

6. 装饰器

UML 聚合(Aggregation)关系&#xff1a;大雁和雁群&#xff0c;上图中空心菱形箭头表示聚合关系组合(Composition)关系&#xff1a;大雁和翅膀 &#xff0c;实心菱形箭头表示组合(Composition)关系 测试代码 #include <iostream> #include <stdio.h> #include &l…

IDEA2023.2.1中创建第一个Tomcat的web项目

首先&#xff0c;创建一个普通的java项目。点击【file】-【new】-【project】 创建一个TomcatDemo项目 创建如下图 添加web部门。点击【file】-【project structure】 选择【modules】-选中项目“TomcatDemo” 点击项目名上的加号【】&#xff0c;添加【web】模块 我们就会发现…

【微信小程序】文章设置

设置基本字体样式&#xff1a;行高、首行缩进 font-size: 32rpx;line-height: 1.6em;text-indent: 2em;padding: 20rpx 0;border-bottom: 1px dashed var(--themColor); 两端对齐 text-align: justify; css文字两行或者几行显示省略号 css文字两行或者几行显示省略号_css…

FPGA project : dht11 温湿度传感器

没有硬件&#xff0c;过几天上板测试。 module dht11(input wire sys_clk ,input wire sys_rst_n ,input wire key ,inout wire dht11 ,output wire ds ,output wire …

72、Spring Data JPA 的 Specification 动态查询

Specification&#xff1a;规范、规格 ★ Specification查询 它也是Spring Data提供的查询——是对JPA本身 Criteria 动态查询 的包装。▲ 为何要有动态查询 页面上常常会让用户添加不同的查询条件&#xff0c;程序就需要根据用户输入的条件&#xff0c;动态地组合不同的查询…

外星人入侵游戏-(创新版)

&#x1f308;write in front&#x1f308; &#x1f9f8;大家好&#xff0c;我是Aileen&#x1f9f8;.希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流. &#x1f194;本文由Aileen_0v0&#x1f9f8; 原创 CSDN首发&#x1f412; 如…

不同类型程序的句柄研究

先做一个winform程序&#xff1b;随便放几个控件&#xff1b; 用窗口句柄查看工具看一下&#xff1b;form和上面的每个控件都有一个句柄&#xff1b; 然后看一下记事本&#xff1b;记事本一共包含三个控件&#xff0c;各自有句柄&#xff1b; 这工具的使用是把右下角图标拖到要…

服务器迁移:无缝过渡指南

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

基于SSM+Vue的高校实验室管理系统的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用Vue技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

线程的方法(未完成)

线程的方法 1、sleep(long millis) 线程休眠&#xff1a;让执行的线程暂停一段时间&#xff0c;进入计时等待状态。 static void sleep(long millis):调用此方法后&#xff0c;当前线程放弃 CPU 资源&#xff0c;在指定的时间内&#xff0c;sleep 所在的线程不会获得可运行的机…