使用FlinkCDC从mysql同步数据到ES,并实现数据检索

news2024/9/20 5:00:27

文章目录

  • 一、背景
  • 一、环境准备
    • 1、创建ES索引
    • 2、创建mysql数据表
  • 二、使用FlinkCDC同步数据
    • 1、导包
    • 2、demo
    • 3、es工具类
  • 三、测试
    • 1、先创建几条数据
    • 2、启动cdc
    • 3、查询es
    • 4、增删改几条数据进行测验

一、背景

随着公司的业务量越来越大,查询需求越来越复杂,mysql已经不支持变化多样的复杂查询了。

于是,使用cdc捕获MySQL的数据变化,同步到ES中,进行数据的检索。

一、环境准备

1、创建ES索引

// 创建索引并指定映射
PUT /course
{
	"mappings": {
		"properties": {
			"id": {
				"type": "keyword"
			},
			"name": {
				"type": "text"
			},
			"label": {
				"type": "text"
			},
			"content": {
			  "type": "text"
			}
		}
	}
}

// 查询course下所有数据(备用)
GET /course/_search
// 删除索引及数据(备用)
DELETE /course

2、创建mysql数据表

CREATE TABLE `course` (
  `id` varchar(32) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  `label` varchar(255) DEFAULT NULL,
  `content` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

二、使用FlinkCDC同步数据

1、导包

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.18.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.18.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.18.0</version>
</dependency>

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>3.0.0</version>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>1.18.0</version>
</dependency>



2、demo

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;


/**
 * cdc
 */
public class CDCTest {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.56.10")
                .port(3306)
                .databaseList("mytest")
                .tableList("mytest.course")
                .username("root")
                .password("root")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 开启检查点
        env.enableCheckpointing(3000);

        env
            .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
            // 1个并行任务
            .setParallelism(1)
            .addSink(new RichSinkFunction<String>() {
                private final static ElasticSearchUtil es = new ElasticSearchUtil("192.168.56.10");
                @Override
                public void invoke(String value, Context context) throws Exception {
                    super.invoke(value, context);
                    JSONObject jsonObject = JSON.parseObject(value);
                    DataInfo dataInfo = new DataInfo();
                    dataInfo.setOp(jsonObject.getString("op"));
                    dataInfo.setBefore(jsonObject.getJSONObject("before"));
                    dataInfo.setAfter(jsonObject.getJSONObject("after"));
                    dataInfo.setDb(jsonObject.getJSONObject("source").getString("db"));
                    dataInfo.setTable(jsonObject.getJSONObject("source").getString("table"));

                    if (dataInfo.getDb().equals("mytest") && dataInfo.getTable().equals("course")) {

                        String id = dataInfo.getAfter().get("id").toString();
                        if(dataInfo.getOp().equals("d")) {
                            es.deleteById("course", id);
                        } else {
                            es.put(dataInfo.getAfter(), "course", id);
                        }
                    }
                }
            })

            .setParallelism(1); // 对接收器使用并行性1来保持消息顺序

        env.execute("Print MySQL Snapshot + Binlog");
    }
}

```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import java.util.Map;

/**
 * 收集的数据类型
 * @author cuixiangfei
 * @since 20234-03-20
 */
public class DataInfo {

    // 操作 c是create;u是update;d是delete;r是read
    private String op;

    private String db;

    private String table;

    private Map<String, Object> before;

    private Map<String, Object> after;


    public String getOp() {
        return op;
    }

    public void setOp(String op) {
        this.op = op;
    }

    public String getDb() {
        return db;
    }

    public void setDb(String db) {
        this.db = db;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public Map<String, Object> getBefore() {
        return before;
    }

    public void setBefore(Map<String, Object> before) {
        this.before = before;
    }

    public Map<String, Object> getAfter() {
        return after;
    }

    public void setAfter(Map<String, Object> after) {
        this.after = after;
    }

    public boolean checkOpt() {
        if (this.op.equals("r")) {
            return false;
        }
        return true;
    }

    @Override
    public String toString() {
        return "DataInfo{" +
                "op='" + op + '\'' +
                ", db='" + db + '\'' +
                ", table='" + table + '\'' +
                ", before=" + before +
                ", after=" + after +
                '}';
    }

    public static void main(String[] args) {
        String value = "{\"before\":{\"id\":\"333\",\"name\":\"333\",\"label\":\"333\",\"content\":\"3333\"},\"after\":{\"id\":\"333\",\"name\":\"33322\",\"label\":\"333\",\"content\":\"3333\"},\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1710923957000,\"snapshot\":\"false\",\"db\":\"mytest\",\"sequence\":null,\"table\":\"course\",\"server_id\":1,\"gtid\":null,\"file\":\"mysql-bin.000008\",\"pos\":1318,\"row\":0,\"thread\":9,\"query\":null},\"op\":\"u\",\"ts_ms\":1710923957825,\"transaction\":null}";
        JSONObject jsonObject = JSON.parseObject(value);

        System.out.println(jsonObject.get("op"));
        System.out.println(jsonObject.get("before"));
        System.out.println(jsonObject.get("after"));
        System.out.println(jsonObject.getJSONObject("source").get("db"));
        System.out.println(jsonObject.getJSONObject("source").get("table"));
    }
}

3、es工具类

springboot集成elasticSearch(附带工具类)

三、测试

1、先创建几条数据

INSERT INTO `mytest`.`course`(`id`, `name`, `label`, `content`) VALUES ('1', '11', '111', '1111');
INSERT INTO `mytest`.`course`(`id`, `name`, `label`, `content`) VALUES ('2', '22 33', '222 333', '2222 3333');
INSERT INTO `mytest`.`course`(`id`, `name`, `label`, `content`) VALUES ('3', '33 44', '33 444', '3333 4444');

2、启动cdc

3、查询es

在这里插入图片描述

4、增删改几条数据进行测验

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1535717.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【异质集成】高k复杂氧化物栅介质在GaN HEMTs上的异质集成

COMMUNICATIONS ENGINEERING | (2024) 3:15论文阅读。 文章讨论了高k复杂氧化物栅介质在宽带隙高电子迁移率晶体管&#xff08;HEMTs&#xff09;上的异质集成。 其核心内容的总结如下&#xff1a; 研究背景&#xff1a; 异质集成不同晶体材料因其在高性能多功能电子和光子…

linux之shell脚本基础

1.构建基础脚本 1.1 创建shell脚本 1.1.1 第一行需要指定使用的shell # 用作注释行.shell并不会处理脚本中的注释行,但是第一行的注释,会告诉shell使用哪个shell来运行脚本. #!/bin/bash 1.1.2 让shell找到你的脚本 直接运行脚本会提示-bash: a.sh: command not found.因…

C#,图片分层(Layer Bitmap)绘制,反色、高斯模糊及凹凸贴图等处理的高速算法与源程序

1 图像反色Invert 对图像处理的过程中会遇到一些场景需要将图片反色,反色就是取像素的互补色,比如当前像素是0X00FFFF,对其取反色就是0XFFFFFF – 0X00FFFF = 0XFF0000,依次对图像中的每个像素这样做,最后得到的就是原始2 图像的反色。 2 高斯模糊(Gauss Blur)算法 …

opengl 学习(六)-----坐标系统与摄像机

坐标系统与摄像机 分类引言坐标系统摄像机教程在CMake中使用全局定义预编译宏,来控制是否开启错误检查补充 分类 opengl c 引言 OpenGL希望在每次顶点着色器运行后&#xff0c;我们可见的所有顶点都为标准化设备坐标(Normalized Device Coordinate, NDC)。也就是说&#xff…

电脑数字键盘充当上下左右键怎么解决

搜索鼠标键把下面这个按钮关掉就好了

垃圾回收-垃圾回收中的相关概念

目录 System.gc()的理解 内存泄漏&#xff08;Memory Leak&#xff09; 内存溢出&#xff08;OOM&#xff09; Stop The World 垃圾回收的串行、并行与并发 安全点与安全区域 强、软、弱、虚引用 强、软、弱、虚引用 终结器引用 System.gc()的理解 在默认情况下&#…

【项目】基于YOLOv8和RotNet实现圆形滑块验证码(拼图)自动识别(通过识别中间圆形的角度实现)

TOC 一、引言 1.1 实现目标 要达到的效果是使用算法预测中间圆形的角度&#xff0c;返回给服务器&#xff0c;实现自动完成验证码的问题。要实现的内容如下图所示。 1.2 实现思路 思路1&#xff08;效果较差&#xff09;&#xff1a;以RotNet要实现的验证码识别为灵感&…

【技术栈】Redis 的理解与数据存储格式

SueWakeup 个人主页&#xff1a;SueWakeup 系列专栏&#xff1a;学习技术栈 个性签名&#xff1a;保留赤子之心也许是种幸运吧 本文封面由 凯楠&#x1f4f8; 友情提供 目录 相关传送门 1. NOSQL和关系型数据库比较 2. 主流的NOSQL产品 3. Redis的理解 4. redis数据存储格式…

Golang标准库fmt深入解析与应用技巧

Golang标准库fmt深入解析与应用技巧 前言fmt包的基本使用打印与格式化输出函数Print系列函数格式化字符串 格式化输入函数小结 字符串格式化基本类型的格式化输出自定义类型的格式化输出控制格式化输出的宽度和精度小结 错误处理与fmt使用fmt.Errorf生成错误信息fmt包与错误处理…

vue/vite添加地图

最简单的方式&#xff0c;不论vue2、vue3、vite均适用&#xff0c;例如以高德为例&#xff1a; index.html 引入 <scriptsrc"https://webapi.amap.com/maps?v1.4.15&key您的key&pluginAMap.ToolBar,AMap.MouseTool,AMap.DistrictSearch,AMap.ControlBar&quo…

filezilla客户端的应用以及ftplftpwget的用法

filezilla的应用 用户的配置查看上一篇文章FTP3种用户的配置 进入filezilla软件测试 用yy用户登录发现可以上传下载创建删除 再用cc用户登录发现不能上传不能删除不能创建只能下载 ftp&lftp&wget客户端的应用 以命令行的方式连接ftp&#xff0c;一般只会用到上…

MAC废纸篓删掉还能复原吗 MAC废纸篓倾倒掉的文件怎么恢复 删除的东西在哪里可以找回来 怎么找回已删除的文件

MAC系统中的废纸篓&#xff08;Trash&#xff09;通常指用来临时存放用户即将丢弃的文件的地方。MAC系统的废纸篓功能相当于Windows系统的垃圾回收站&#xff0c;通过废纸篓删除的文件&#xff0c;一般是无法从系统中操作还原。那么&#xff0c;MAC废纸篓删掉还能复原吗&#x…

Linux系统(四)- 进程初识 | 环境变量 | 进程地址空间

~~~~ 前言冯诺依曼体系结构&#xff08;重要&#xff09;总览CPU工作方式什么是指令集&#xff1f;CPU为什么只和内存打交道&#xff08;数据交换&#xff09;&#xff1f;木桶效应&#xff1a;在数据层面的结论程序运行为什么要加载到内存&#xff1f; 进一步理解计算机体系结…

应用APM-如何配置Prometheus + Grafana监控springboot应用

文章目录 概述在Spring Boot应用中集成Micrometerspringboot配置修改 Docker安装Prometheus和Grafanaprometheus配置grafana配置启动Prometheus和Grafana在Grafana中配置数据源创建Grafana仪表盘配置Grafana告警&#xff08;可选&#xff09;监控和分析 概述 配置Prometheus和…

NASA数据集——2017 年来自 Arctic-CAP 的大气中 CO、CO2 和 CH4 浓度剖面图

简介 ABoVE: Atmospheric Profiles of CO, CO2 and CH4 Concentrations from Arctic-CAP, 2017 文件修订日期&#xff1a;2019-05-01 数据集版本: 1 数据集摘要 本数据集提供了 2017 年 4 月至 11 月北极碳飞机剖面&#xff08;Arctic-CAP&#xff09;月度采样活动期间阿拉…

MIT的研究人员最近开发了一种名为“FeatUp”的新算法,这一突破性技术为计算机视觉领域带来了高分辨率的洞察力

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

【冥想X理工科思维】场景10:长期项目的焦虑和压力

冥想音频合集&#xff1a;职场解压冥想音频 压力场景&#xff1a; 在长期项目中&#xff0c;如何定期冥想帮我保持耐心和持久性&#xff1f; 点击看大图&#xff1a; 详细说明&#xff1a;通过这个冥想流程&#xff0c;你可以帮助自己在长期项目中保持耐心、坚持和放松的状态。…

SAP前台处理:物料主数据创建<MM01>之会计视图

一、背景&#xff1a; 终于来到了物料主数据&#xff0c;我觉得物料账是SAP最重要的一项发明&#xff0c;也一直是SAP的一项重要优势&#xff0c;物料账记录了一个个物料的生生不息&#xff1b; 本章主要讲解物料主数据和财务相关的主要内容&#xff1a;这里特别提示由于作者…

类和对象-4

文章目录 前言const成员函数取地址及const取地址操作符重载构造函数续explicit static成员友元内部类匿名对象 前言 在前面的文章中&#xff0c;我们了解到了类的四个默认成员函数&#xff1a;构造、析构、拷贝构造和赋值重载。接下来我们会继续学习剩下的两个默认成员函数以及…

CAD建筑版2024 安装教程

CAD建筑版是一种专门用于建筑设计和绘图的CAD软件版本。它提供了专业的建筑设计工具和功能&#xff0c;帮助建筑师、设计师和工程师在建筑领域进行快速、准确和高效的设计工作。 CAD建筑版具备建筑相关的库和元素&#xff0c;用户可以方便地使用预定义的建筑符号和元素进行建筑…