Doris:读取Doris数据的N种方法

news2025/1/15 17:12:47

目录

1.MySQL Client

2.JDBC

3. 查询计划

4.Spark Doris Connector

5.Flink Doris Connector


1.MySQL Client

        Doris 采用 MySQL 协议,高度兼容 MySQL 语法,支持标准 SQL,用户可以通过各类客户端工具来访问 Doris。登录到doris服务器后,可使用 select语句查询数据。

mysql -uroot -P9030 -h127.0.0.1

        为了防止用户的一个查询可能因为消耗内存过大。查询进行了内存控制,一个查询任务,在单个 BE 节点上默认使用不超过 2GB 内存。用户在使用时,如果发现报 Memory limit exceeded 错误,一般是超过内存限制了。遇到内存超限时,用户应该尽量通过优化自己的 sql 语句来解决。如果确切发现2GB内存不能满足,可以手动设置内存参数。

    select 查询如果使用limit分页查询,则需要指定order by 字段,否则同一个sql返回的数据可能不一样。

2.JDBC

        由于Doris 采用 MySQL 协议,同样也支持通过JDBC方式读取数据。

package com.yichenkeji.demo.test;


import lombok.extern.slf4j.Slf4j;

import java.sql.*;
import java.util.Properties;

@Slf4j
public class DorisJDBCDemo {

    public static void main(String[] args) throws SQLException {
        String jdbc_driver = "com.mysql.cj.jdbc.Driver";
        String jdbc_url = "jdbc:mysql://192.168.179.131:9030/demo?rewriteBatchedStatements=true";
        String username = "root";
        String password = "";

        Connection conn = getConnection(username,password,jdbc_url,jdbc_driver);
        log.info("{}",conn);
        String sql = "select * from dim_area limit 10";
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(sql);
        while (rs.next()){
            log.info("id={},name={}",rs.getFloat("id"),rs.getString("name"));
        }

        closeConnection(conn);
    }

    /**
     * 获取连接
     * @param username
     * @param password
     * @param jdbcUrl
     * @param driver
     * @return
     */

    public static Connection getConnection(String username,String password,String jdbcUrl,String driver) {

        Properties prop = new Properties();
        prop.put("user", username);
        prop.put("password", password);
        try {
            Class.forName(driver);

            log.info("jdbcUrl:{}",jdbcUrl);
            return DriverManager.getConnection(jdbcUrl, prop);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 关闭连接
     * @param conn
     */
    public static void closeConnection(Connection conn) {
        if(conn != null){
            try {
                if(!conn.isClosed()){
                    conn.close();
                }
            } catch (SQLException e) {
                log.error("SQLException:{}", e.getMessage());
            }
        }
    }

}

3. 查询计划

        由于jdbc查询暂时不支持流式读取,如果读取的数据量过大,一次性读取全部数据需要很大的资源,所有可以使用查询计划API接口,给定一个 SQL,获取该 SQL 对应的查询计划。通过返回的数据分区信息,分批读取数据。

package com.yichenkeji.demo.test;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.*;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.Types;
import org.apache.commons.codec.binary.Base64;
import org.apache.doris.sdk.thrift.TDorisExternalService;
import org.apache.doris.sdk.thrift.TScanBatchResult;
import org.apache.doris.sdk.thrift.TScanCloseParams;
import org.apache.doris.sdk.thrift.TScanColumnDesc;
import org.apache.doris.sdk.thrift.TScanNextBatchParams;
import org.apache.doris.sdk.thrift.TScanOpenParams;
import org.apache.doris.sdk.thrift.TScanOpenResult;
import org.apache.doris.sdk.thrift.TStatusCode;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.thrift.TConfiguration;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import java.io.ByteArrayInputStream;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
public class DorisReaderDemo{
    static String dorisUrl = "192.168.179.131:8030";
    static String username = "root";
    static String password = "";
    static String database = "demo";
    static String table = "dim_area";

    static String querySql = String.format("SELECT id, name from demo.dim_area");
    static int readRowCount = 0;
    static int readTotal = 0;


    public static void main(String[] args) throws Exception {
        String queryPlanUrl = String.format("http://%s/api/%s/%s/_query_plan",dorisUrl,database,table);
        QueryPlanResult queryPlanResult = DorisUtil.getQueryPlan(username,password,queryPlanUrl,querySql);

        if (queryPlanResult != null && queryPlanResult.getOpaquedQueryPlan() != null){

            JSONObject partitions = queryPlanResult.getPartitions();
            log.info("partitions:{}",partitions);
            for(Map.Entry<String, Object> tablet : partitions.entrySet()){
                Long tabletId = Long.parseLong(tablet.getKey());
                JSONObject value = JSONObject.parseObject(JSON.toJSONString(tablet.getValue()));
                //get first backend
                String routingsBackend = value.getJSONArray("routings").getString(0);
                String backendHost = routingsBackend.split(":")[0];
                String backendPort = routingsBackend.split(":")[1];

                //connect backend
                TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
                TTransport transport = new TSocket(new TConfiguration(), backendHost, Integer.parseInt(backendPort));
                TProtocol protocol = factory.getProtocol(transport);
                TDorisExternalService.Client client = new TDorisExternalService.Client(protocol);
                if (!transport.isOpen()) {
                    transport.open();
                }
                //build params
                TScanOpenParams params = new TScanOpenParams();
                params.cluster = "default_cluster";
                params.database = database;
                params.table = table;
                params.tablet_ids = Arrays.asList(tabletId);
                params.opaqued_query_plan = queryPlanResult.getOpaquedQueryPlan();
                // max row number of one read batch
                params.setBatchSize(50000);
                params.setQueryTimeout(3600);
                params.setMemLimit(2147483648L);
                params.setUser(username);
                params.setPasswd(password);

                //open scanner
                TScanOpenResult tScanOpenResult = client.openScanner(params);
                if (!TStatusCode.OK.equals(tScanOpenResult.getStatus().getStatusCode())) {
                    throw new RuntimeException(String.format("The status of open scanner result from %s is '%s', error message is: %s.",
                            routingsBackend, tScanOpenResult.getStatus().getStatusCode(), tScanOpenResult.getStatus().getErrorMsgs()));
                }
                List<TScanColumnDesc> selectedColumns = tScanOpenResult.getSelectedColumns();

                TScanNextBatchParams nextBatchParams = new TScanNextBatchParams();
                nextBatchParams.setContextId(tScanOpenResult.getContextId());
                boolean eos = false;
                //read data
                int offset = 0;
                while(!eos){

                    nextBatchParams.setOffset(offset);
                    TScanBatchResult nextResult = client.getNext(nextBatchParams);
                    if (!TStatusCode.OK.equals(nextResult.getStatus().getStatusCode())) {
                        throw new RuntimeException(String.format("The status of get next result from %s is '%s', error message is: %s.",
                                routingsBackend, nextResult.getStatus().getStatusCode(), nextResult.getStatus().getErrorMsgs()));
                    }
                    eos = nextResult.isEos();
                    if(!eos){
                        RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);
                        ArrowStreamReader arrowStreamReader = new ArrowStreamReader(new ByteArrayInputStream(nextResult.getRows()), rootAllocator);
                        VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();
                        List<List<Object>> results = new ArrayList<>();
                        while (arrowStreamReader.loadNextBatch()) {
                            List<FieldVector>  fieldVectors = root.getFieldVectors();

                            //total data rows
                            int rowCountInOneBatch = root.getRowCount();
                            for(int row = 0 ; row < rowCountInOneBatch ;row++){
                                List<Object> record = new ArrayList<>();

                                for (int col = 0; col < fieldVectors.size(); col++) {
                                    FieldVector fieldVector = fieldVectors.get(col);
                                    Types.MinorType minorType = fieldVector.getMinorType();
                                    Object v = DorisUtil.convertValue(row , minorType, fieldVector);
                                    record.add(v);
                                }
                                results.add(record);
                            }
                            offset += root.getRowCount();


                        }
                        log.info("total data rows:{}",results.size());
                        //处理完之后要关闭,否则容易内存溢出
                        arrowStreamReader.close();
                    }
                }

                //close
                TScanCloseParams closeParams = new TScanCloseParams();
                closeParams.setContextId(tScanOpenResult.getContextId());
                client.closeScanner(closeParams);
                if ((transport != null) && transport.isOpen()) {
                    transport.close();
                }
            }
        }
    }


    public static String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }

    /**
     * 获取查询计划
     * @param username
     * @param password
     * @param queryPlanUrl
     * @param sql
     * @return
     * @throws Exception
     */

    public static QueryPlanResult getQueryPlan(String username, String password, String queryPlanUrl, String sql){
        try (CloseableHttpClient client = HttpClients.custom().build()) {
            HttpPost post = new HttpPost(queryPlanUrl);
            post.setHeader(HttpHeaders.EXPECT, "100-continue");
            post.setHeader(HttpHeaders.AUTHORIZATION,  basicAuthHeader(username,password));
            log.info("queryPlanUrl:{}",queryPlanUrl);
            log.info("sql:{}",sql);
            //The param is specific SQL, and the query plan is returned
            Map<String,String> params = new HashMap<>();
            params.put("sql",sql);
            StringEntity entity = new StringEntity(JSON.toJSONString(params));
            post.setEntity(entity);

            try (CloseableHttpResponse response = client.execute(post)) {
                if (response.getEntity() != null ) {
                    JSONObject queryPlanJSONObject = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
                    JSONObject dataJSONObject = queryPlanJSONObject.getJSONObject("data");
                    if (dataJSONObject.containsKey("exception")){
                        throw new RuntimeException(dataJSONObject.getString("exception"));
                    }
                    String queryPlan = dataJSONObject.getString("opaqued_query_plan");
                    JSONObject partitions = dataJSONObject.getJSONObject("partitions");
                    return new QueryPlanResult(queryPlan,partitions);
                }
            }
        }catch (Exception e){
            throw new RuntimeException(e);
        }
        return null;
    }


}

4.Spark Doris Connector

        Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。

val dorisSparkDF = spark.read.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .load()

dorisSparkDF.show(5)

5.Flink Doris Connector

        Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。本文档介绍Flink如何通过Datastream和SQL操作Doris。

DorisOptions.Builder builder = DorisOptions.builder()
        .setFenodes("FE_IP:HTTP_PORT")
        .setTableIdentifier("db.table")
        .setUsername("root")
        .setPassword("password");

DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
        .setDorisOptions(builder.build())
        .setDorisReadOptions(DorisReadOptions.builder().build())
        .setDeserializer(new SimpleListDeserializationSchema())
        .build();

env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1200375.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于SSM框架的高校试题管理系统

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…

交换机工作原理

交换机工作原理 交换机功能&#xff1a;端口扩展&#xff08;默认同一网络&#xff09;&#xff0c;如果只是两台设备进行通信&#xff0c;可以直接连接这两台设备而不用交换机&#xff0c;但如果设备较多&#xff0c;设备没有那么多接口&#xff0c;那么这个时候就需要交换机…

智慧水利整体解决方案:PPT全文43页,附下载

关键词&#xff1a;智慧水利发展前景&#xff0c;智慧水利解决方案&#xff0c;智慧水利建设方案&#xff0c;智慧水利平台系统 一、智慧水利建设背景 传统水利系统存在一些问题&#xff1a; 现有基础感知不能满足更高标准的水利管理需求&#xff1b;决策调度支撑能力亟需加强…

ztree结合hmap使用经验分享

项目背景 在建德封控拦截系统&#xff08;Vue3antd2.x&#xff09;为追求更快的地图初始化体验&#xff0c;在尝试了hmap2.5.0版本以及2.6.3版本后&#xff0c;由于这两个版本在现场电脑的初始化速度不够流畅&#xff0c;最终使用的是hmap2.1.3版本。同时由于布控选设备&#…

c语言练习第11周(1~5)

数列 1 1 2 3 5 8 13 21 ... 被称为斐波纳数列。 输入若干个正整数N&#xff0c;输出这个序列的前 N 项的和。 题干数列 1 1 2 3 5 8 13 21 ... 被称为斐波纳数列。 输入若干个正整数N&#xff0c;输出这个序列的前 N 项的和。输入样例3 5 4 1输出样例…

ftp服务器(filezilla服务端软件)下载、安装、使用

下载 通过360软件管家下载 输入filezilla&#xff0c;点击搜索&#xff0c;点击安装 修改安装路径 等待安装完成 配置服务端 启动配置 双击打开&#xff0c;点击软件中间按钮 不用输入密码&#xff0c;因为安装的时候没有设置密码 如果在安装的时候设置了密码&#xff0c;…

可以为一个servlet定义多个servlet-mapping、或url-pattern

在web描述符文件web.xml文件中&#xff0c;可以为同一个servlet定义多个servlet-mapping&#xff1b;也可以在同一个servlet-mapping中&#xff0c;定义多个url-pattern。也就是说&#xff0c;可以把多个地址&#xff08;相对于上下文路径&#xff09;映射到同一个servlet处理。…

基于MATLAB的关节型六轴机械臂轨迹规划仿真

笛卡尔空间下的轨迹规划&#xff0c;分为直线轨迹规划和圆弧轨迹规划&#xff0c;本文为笛卡尔空间下圆弧插值法的matlab仿真分析 目录 1 实验目的 2 实验内容 2.1标准D-H参数法 2.2实验中使用的Matlab函数 3 全部代码 4 仿真结果 1 实验目的 基于机器人学理论知识&…

最全面的软考架构师复习资料(历时2年整理)

一、面向服务的架构 1.请分别用200字以内文字说明什么是面向服务架构&#xff08;SOA&#xff09;以及ESB在SOA的作用与特点 面向服务的体系架构&#xff08;SOA&#xff09;是一种粗粒度、松耦合的服务架构&#xff0c;服务之间通过简单、精确定义接口进行通信。他可以根据需求…

【操作系统】4.2 文件系统

&#x1f4e2;&#xff1a;如果你也对机器人、人工智能感兴趣&#xff0c;看来我们志同道合✨ &#x1f4e2;&#xff1a;不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 &#x1f4e2;&#xff1a;文章若有幸对你有帮助&#xff0c;可点赞 &#x1f44d;…

[CISCN 2023 西南]do_you_like_read

打开题目&#xff0c;大概是一个购买书籍的网站&#xff0c;有登陆的功能 我们可以先分析下给的源码 在admin.php中会验证是否为admin用户 我们尝试爆破下密码&#xff0c;爆出来为admin123 登陆后发现存在文件上传漏洞 我们分析下源码 存在文件后缀检测&#xff0c;如果为p…

【第四章】软件设计师 之 计算机网络

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 1、七层模型 2、网络技术标准与协议 中介 别…

信捷 XDH 输出点流水灯

本文以XDH 为例&#xff0c;实现输出点流水灯&#xff0c;测试输出点是否正常。 用到了FOR NEXT循环和偏移量实现。 程序下载链接如下&#xff1a; https://download.csdn.net/download/weixin_39926429/88527971

Python的基础语句大全

以下是Python的基础语句大全&#xff1a; 变量定义语句&#xff1a; var_name var_value输出语句&#xff1a; print(var_name)输入语句&#xff1a; var_name input()条件语句&#xff1a; if condition:// do something if condition is True elif condition:// do somethi…

手机地磁传感器与常见问题

在手机中&#xff0c;存在不少传感器&#xff0c;例如光距感&#xff0c;陀螺仪&#xff0c;重力加速度&#xff0c;地磁等。关于各传感器&#xff0c;虽功能作用大家都有所了解&#xff0c;但是在研发设计debug过程中&#xff0c;却总是会遇到很多头疼的问题。关于传感器&…

链表的实现(文末附完整代码)

链表的概念及结构 链表是一种物理存储结构上非连续、非顺序的存储结构&#xff0c;数据元素的逻辑顺序是通过链表中的指针链接次序实现的 我们在上一篇文章所学习的顺序表是连续存储的 例如&#xff1a; 顺序表就好比火车上的一排座位&#xff0c;是连续的 而链表就好比是火车…

字节面试:请说一下DDD的流程,用电商系统为场景

说在前面 在40岁老架构师 尼恩的读者交流群(50)中&#xff0c;最近有小伙伴拿到了一线互联网企业字节、如阿里、滴滴、极兔、有赞、希音、百度、网易、美团的面试资格&#xff0c;遇到很多很重要的面试题&#xff1a; 谈谈你的DDD落地经验&#xff1f; 谈谈你对DDD的理解&…

matlab GUI界面实现ZieglerNicholas调节PID参数

1、内容简介 略 11-可以交流、咨询、答疑 ZieglerNicholas、PID、GUI 2、内容说明 GUI界面实现ZieglerNicholas调节PID参数 通过ZieglerNicholas调节PID参数&#xff0c;设计了GUI 3、仿真分析 略 4、参考论文 略 链接&#xff1a;https://pan.baidu.com/s/1yQ1yDfk-_…

vue+mongodb+nodejs实现表单增删改查

ExpressMongodbVue实现增删改查 效果图 前言 最近一直想学下node,毕竟会node的前端更有市场。但是光看不练&#xff0c;感觉还是少了点什么&#xff0c;就去github上看别人写的项目&#xff0c;收获颇丰&#xff0c;于是准备自己照葫芦画瓢写一个。 作为程序员&#xff0c;一…

电容的作用

文章目录 总结1.降压2.滤波3.延时4.耦合5.旁路电容 总结 1.降压 问题&#xff1a; 直接连接灯泡会烧掉 解决方案 进一步为了防止电容放电&#xff0c;伤人&#xff0c;加入一个大电阻 2.滤波 直流的情况 交流的情况 频率与容抗的关系 3.延时 4.耦合 滤除直流成分&#xf…