五.海量数据实时分析-FlinkCDC+DorisConnector实现数据的全量增量同步

news2024/9/28 16:08:48

前言

前面四篇文字都在学习Doris的理论知识,也是比较枯燥,当然Doris的理论知识还很多,我们后面慢慢学,本篇文章我们尝试使用SpringBoot来整合Doris完成基本的CRUD。

由于 Doris 高度兼容 Mysql 协议,两者在 SQL 语法方面有着比较强的一致性,另外 Mysql 客户端也是 Doris 官方选择的客户端。因此,如需对 Mysql 进行数据分析,使用 Doris 的迁移成本较低。但是对于数据规模特别大的情况下Mysql的方式还是不太建议的,所以这里还会介绍一种 方式就是通过CDC+DorisConnector

一.整合Mybatis操作Doris

1.准备数据库

CREATE TABLE `doris_test` (
   `id` int NULL COMMENT "id",
   `price` decimal(10,2) NULL COMMENT "价格",
   `title` varchar(20)NULL COMMENT "标题"
 ) ENGINE=OLAP
 DUPLICATE KEY(`id`)
 DISTRIBUTED BY HASH(`id`) BUCKETS 1
 PROPERTIES (
 "replication_num" = "1"
 );
Query OK, 0 rows affected (0.06 sec)

2.搭建SpringBoot项目

第一步:创建SpringBoot项目,导入依赖,主要是Mybatis相关的依赖

<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.1</version>
        <relativePath/>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--如果要用传统的xml或properties配置,则需要添加此依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
        </dependency>

        <!--mybatisplus持久层依赖-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.3.1</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.70</version>
        </dependency>


        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.23</version>
        </dependency>
    </dependencies>

3.整合Mybatis

创建好启动类,实体类,服务层,持久层,SQL映射文件等,这里和我们操作Mysql没有任何区别.

启动类如下,通过 @MapperScan 来扫描Mapper接口

@SpringBootApplication
@MapperScan(basePackages = "cn.whale.mapper")
public class DorisApplication {

    public static void main(String[] args) {
        SpringApplication.run(DorisApplication.class,args);
    }
}

服务层和持久层代码如下 , 省略了部分代码

@Service
public class DorisServiceImpl implements DorisService {

    @Autowired
    DorisMapper dorisMapper;

    @Override
    public List<Order> listDoris() {
        return dorisMapper.listDoris();
    }

    @Override
    public int add(Order order) {
        return dorisMapper.add(order);
    }
}


public interface DorisMapper {

   List<Order> listDoris();

   int add(Order order);

}

下面是sql映射文件,和操作数据库没有任何区别

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.whale.mapper.DorisMapper">

    <select id="listDoris" resultType="cn.whale.domain.Order">
        select id,price,title from orders
    </select>

    <insert id="add" parameterType="cn.whale.domain.Order">
        INSERT INTO orders(id,price,title) VALUES(#{id},#{price},#{title})
    </insert>
</mapper>

4.编写配置文件

server:
  port: 8080
spring:
  #Doris数据库连接配置
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.220.253:9030/app_db?characterEncoding=utf-8&useSSL=false
    username: root
    password: 123456
    type: com.alibaba.druid.pool.DruidDataSource
    initial-size: 500
    min-idle: 500
    max-active: 500

#mybatis的相关配置
mybatis:
  mapper-locations: classpath:mapper/*.xml
  type-aliases-package: com.wudl.doris.domain.DorisTest

5.编写测试类

注入Service,完成查询和添加的测试

package cn.whale.service;

import cn.whale.DorisApplication;
import cn.whale.domain.Order;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.math.BigDecimal;
import java.util.List;


@RunWith(SpringRunner.class)
@SpringBootTest(classes= DorisApplication.class)
public class DorisServiceTest {

    @Autowired
    private DorisService dorisService;

    @Test
    public void listDoris() {
        List<Order> orders = dorisService.listDoris();
        orders.stream().forEach(System.out::println);
    }


    @Test
    public void addDoris() throws InterruptedException {
        for (int i = 100 ; i>0 ; i--){
            Order order = new Order();
            order.setId(Long.valueOf(i));
            order.setPrice(new BigDecimal(i));
            order.setTitle(i+"元");
            dorisService.add(order);
        }
        Thread.sleep(100000);
    }
}

二.SpringBoot整合FlinkCDC+doris-connector实时同步

上面通过Mysql客户端的方式来进行数据的同步方式虽然使用起来比较简单,但是不适合大量数据的实时同步,性能不是很好。Doris官方提供了doris-connector进行Doris读写,那么我们可以通过FlinkCDC监听Mysql数据变更,然后同步到SpringBoot项目中,对数据进行处理后,然后通过doris-connector同步到Doris。虽然我们之前也介绍过直接通过FlinkCDC同步Mysql到Doris,那种方式我们没办法对数据进行处理。

1.搭建项目导入依赖

版本兼容
在这里插入图片描述

  • flink-connector-mysql-cdc :flinkCDC,实时监听Mysql增量或者全量同步
  • flink-doris-connector :用来读写Doris的驱动
<properties>
        <encoding>UTF-8</encoding>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
        <scala.version>2.12</scala.version>
        <flink.version>1.13.6</flink.version>
    </properties>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.13</version>
    </parent>


    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--mysql -cdc-->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.18</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.13.6</version>
            <scope>provided</scope>
        </dependency>


        <!-- flink doris connector -->
        <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>flink-doris-connector-1.13_2.12</artifactId>
            <version>1.0.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>1.13.6</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

2.定义消息序列化器

定义序列化器,当拿到Mysql的数据后我们可以通过虚拟化器对数据进行格式化


/**
 * @desc mysql消息读取自定义序列化
 **/
public class MysqlDeserialization implements DebeziumDeserializationSchema<String> {

    public static final String TS_MS = "ts_ms";
    public static final String BIN_FILE = "file";
    public static final String POS = "pos";
    public static final String CREATE = "CREATE";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String UPDATE = "UPDATE";

    /**
     *
     * 反序列化数据,转为变更JSON对象
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) {
    	//拿到数据
        Struct struct = (Struct) sourceRecord.value();
        //输出数据
        collector.collect(getJsonObject(struct, AFTER).toJSONString());
    }
    
    /**
     *
     * 从原数据获取出变更之前或之后的数据
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }



    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}

3.Mysql监听器

监听器的作用主要是监听Mysql的数据变更后,通过序列化器把数据进行处理后,然后同步到Doris中


/**
 * @desc mysql变更监听
 **/
@Component
public class MysqlEventListener implements ApplicationRunner {


    @Override
    public void run(ApplicationArguments args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //设置Mysql数据源
        DebeziumSourceFunction<String> dataChangeInfoMySqlSource = buildDataChangeSource();
        DataStream<String> streamSource = env
                .addSource(dataChangeInfoMySqlSource, "mysql-source")
                .setParallelism(1);
        //streamSource.addSink(dataChangeSink);


        Properties pro = new Properties();
        pro.setProperty("format", "json");
        pro.setProperty("strip_outer_array", "true");
		//配置Doris信息
        streamSource.addSink(DorisSink.sink(DorisExecutionOptions.builder()
                                .setBatchSize(3)
                                .setBatchIntervalMs(10L)
                                .setMaxRetries(3)
                                .setStreamLoadProp(pro)
                                .setEnableDelete(true)
                                .build(),
                        DorisOptions.builder()
                                .setFenodes("192.168.220.253:8030")
                                .setTableIdentifier("app_db.orders")
                                .setUsername("root")
                                .setPassword("123456").build()));
        env.execute("mysql-stream-cdc");

    }

    /**
     * 构造变更数据源
     */
    private DebeziumSourceFunction<String> buildDataChangeSource() {
        return MySqlSource.<String>builder()
                .hostname("192.168.220.253")
                .port(3307)
                .databaseList("app_db")
                .tableList("app_db.orders")
                .username("root")
                .password("123456")

                /**initial初始化快照,即全量导入后增量导入(检测更新数据写入)
                 * latest:只进行增量导入(不读取历史变化)
                 * timestamp:指定时间戳进行数据导入(大于等于指定时间读取数据)
                 */
                .startupOptions(StartupOptions.latest())
                //序列化
                .deserializer(new MysqlDeserialization())
                .serverTimeZone("GMT+8")
                .build();
    }
}

4.最后编写一个启动类

@SpringBootApplication
public class FlinkCdcApplication {

    public static void main(String[] args) {
        SpringApplication.run(FlinkCdcApplication.class, args);
    }
 
}

5.准备好数据库

我的Mysql使用的是 :app_db.orders 对应的是Doris中的 app_db.orders ,字段类型,字段名需要一致哦。然后启动项目,修改Mysql的数据,Doris会自动同步过去

三.其他同步方式

Doris还支持其他的数据读写方式,具体的可以根据官网案例去尝试地址:https://doris.apache.org/zh-CN/docs/1.2/ecosystem/spark-doris-connector

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2174027.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【滑动窗口算法】——定长滑动窗口——Python(附题)

一.定长滑动窗口是什么及其使用场景 定长滑动窗口算法的核心思想是使用两个指针&#xff0c;通常称为“左指针”和“右指针”。窗口的大小是固定的&#xff0c;右指针用于扩展窗口&#xff0c;直到达到指定大小&#xff0c;而左指针则在窗口移动时逐步向右滑动。这样可以高效地…

3款照片人物开口说话AI工具,跟真人说话一样~免费!短视频带货必备!(附教程)

大家好&#xff0c;我是画画的小强 今天给大家分享一个AI图片口播数字人讲认知思维&#xff0c;单号佣金赚5W的AI带货信息差玩法&#xff0c;许多小伙伴表示对这类AI带货玩法感兴趣。 说实话&#xff0c;现在AI照片人物对口型工具&#xff0c;越来越逼真&#xff0c;很难辨识出…

Codesys trace工具右键菜单框呼出异常的问题解决

这个问题困扰了好久&#xff0c;添加trace工具之后&#xff0c;一但在模型图位置右键后&#xff0c;整个codesys界面都无法呼出右键菜单&#xff0c;甚至会出现键盘输入失效的问题。 解决办法&#xff1a;更新trace工具 1、工具 —> CODESYS Installer 或者搜索CODESYS In…

AI新时代序幕!大模型研究报告(附AI名词详解)

AI新时代序幕&#xff01;大模型研究报告&#xff08;附AI名词详解&#xff09; 前言AI 大模型研究报告 前言 AI通过大规模的数据训练和先进的算法构建而成的&#xff0c;能够模拟人类的智能&#xff0c;处理各种复杂的任务。它可以理解我们的语言&#xff0c;回答我们的问题&…

PyQt5 statusbar 放图片并设置图片大小和左右间距

在 PyQt5 中&#xff0c;状态栏&#xff08;QStatusBar&#xff09;通常用于显示窗口的状态信息或提示。虽然 PyQt5 的 QStatusBar 没有直接提供设置图片作为状态栏项&#xff08;QStatusBarItem&#xff09;的 API&#xff0c;但你可以通过添加一个 QWidget&#xff08;如 QLa…

Java实现找色和找图功能

某天&#xff0c;张三接到一个任务需求&#xff0c;将一个Excel表格里面的员工信息&#xff0c;录入到员工系统里面&#xff0c;由于数据量非常大&#xff0c;操作起来巨慢。经过一段时间的操作和观察&#xff0c;他发现这种操作&#xff0c;非常有规律&#xff0c;基本就是一些…

html+css+js实现Pagination 分页

效果图 HTML部分 <body><div class"pagination"><button class"prev"><</button><ul><li class"active">1</li><li>2</li><li>3</li><li>4</li><li>5…

基于springboot+vue+mysql公益旧物捐赠系统(源码+参考文档+定制)

博主介绍&#xff1a; ✌我是阿龙&#xff0c;一名专注于Java技术领域的程序员&#xff0c;全网拥有10W粉丝。作为CSDN特邀作者、博客专家、新星计划导师&#xff0c;我在计算机毕业设计开发方面积累了丰富的经验。同时&#xff0c;我也是掘金、华为云、阿里云、InfoQ等平台…

3d可视化图片:通过原图和深度图实现

1、depthy 在线体验demo: https://depthy.stamina.pl/#/ 也可以docker安装上面服务: docker run --rm -t -i -p 9000:9000 ndahlquist/depthy http://localhost:90001)首先传原图 2)再传对应深度图 3)效果

云栖大会观察:云计算第三次浪潮下的暗流涌动

如果跳脱出今年云栖大会的“云启智跃产业蝶变”、“云计算第三次浪潮”这些设定好的视角&#xff0c;站在“AI有着太多的未知性”这个角度观察这次大会&#xff0c;会看到什么&#xff1f; 我们会看到&#xff0c;对于现在的阿里云而言&#xff0c;AI带来的并非都是机遇&#…

【Gitee自动化测试4】本地Git分支的增删查,本地Git分支中文件的增删查,本地文件的暂存/提交,本地分支的推送

一、流程 本地创建分支&#xff0c;设定连接什么云分支本地创建文件&#xff0c;暂存、提交–>本地分支本地分支推送所有修改–>云仓库 二、分支概念 在版本回退里&#xff0c;每次提交&#xff0c;git都把它们串成一条时间线&#xff0c;这条时间线可以理解为是一个分…

Meta Sapiens 人体AI模型

Meta 一直是开发图像和视频模型的领导者&#xff0c;现在他们又增加了一个新东西&#xff1a;Meta Sapiens。和Homo sapiens一样&#xff0c;这个模型也是关于人类的。它旨在执行与人类相关的任务&#xff0c;例如理解身体姿势、识别身体部位、预测深度&#xff0c;甚至确定皮肤…

小学三年级数学拓展填空题

用传统思维来看小学的学习是错误的。 学校考核老师主要看学生成绩&#xff0c;导致学生作业很多。 而且&#xff0c;现在的中小学生是不是太卷了&#xff1f;都开始卷远超过自己年龄阶段应该掌握的内容了&#xff1f;——这才是很不正常的现象。 如果大家都这么卷&#xff0c…

自然资源部最新Nature正刊!!!

2024年8月21日&#xff0c;国际顶级期刊《Nature》发表了自然资源部第二海洋研究所李家彪院士为通讯作者&#xff0c;张涛为第一作者的论文“超慢速扩张加克洋中脊的高变化岩浆增生”。这一成果颠覆了国际海洋学术界半个多世纪以来一直认为的超慢速扩张洋中脊岩浆供给极度贫瘠的…

9--苍穹外卖-SpringBoot项目中Redis的介绍及其使用实例 详解

目录 Redis入门 Redis简介 Redis服务启动与停止 服务启动命令 Redis数据类型 5种常用数据类型介绍 各种数据类型的特点 Redis常用命令 字符串操作命令 哈希操作命令 列表操作命令 集合操作命令 有序集合操作命令 通用命令 在java中操作Redis Redis的Java客户端 …

excel统计分析(4): 多元线性回归分析

用途&#xff1a;研究多个自变量&#xff08;也称为预测变量或解释变量&#xff09;与一个因变量&#xff08;也称为响应变量&#xff09;之间的线性关系。 多元线性回归分析模型&#xff1a;Yβ0β1X1β2X2…βkXkϵ Y 是因变量。1,X2,…,Xk 是自变量。β0 是截距项。β1,β2,…

ROSTCM6+Gephi的网络语义分析详细教程(附案例实战)

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…

经济不好,但是遍地都是赚钱的机会

现在职场越来越内卷&#xff0c;裁员风波也是不断&#xff0c;前些天看到一个帖子&#xff0c;裁员都裁到应届生头上了。 都说00后整治职场&#xff0c;在如今环境下也要掂量一下了。 大家都在抱怨环境&#xff0c;可是你有没有想过&#xff0c;有些人在闷声发着小财。 下面…

vue2 vconsole有助于移动端开发页面调试

项目场景&#xff1a; pc项目开发中&#xff0c;有浏览器自带的调试工具。但在移动端&#xff0c;就需要自己搭建调试工具了。vconsole一种非常方便的前端调试依赖库&#xff0c;有助于我们在移动端开发式进行调试&#xff0c;快速排查移动端问题。 搭建步骤 1、安装依赖库。…

让具身智能更快更强!华东师大上大提出TinyVLA:高效视觉-语言-动作模型,遥遥领先

论文链接&#xff1a;https://arxiv.org/pdf/2409.12514 项目链接&#xff1a;https://tiny-vla.github.io/ 具身智能近期发展迅速&#xff0c;拥有了大模型"大脑"的机械臂在动作上更加高效和精确&#xff0c;但现有的一个难点是&#xff1a;模型受到算力和数据的制…