FlinkCDC-MYSQL批量写入

news2024/12/23 6:09:19

一、运行环境

(1)Flink:1.17.2

(2)Scala:2.12.20

(3)Mysql:5.7.43     ##开启binlog

二、代码示例

         思路:通过滚动窗口收集一批数据推给sink消费。binlog日志对于dataStream是没有key的,那么需要给每条数据造一个key。两种方式:(1)通过UUID创建一个全局key。(2)解析数据中的时间字段作为key。

package org.example;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import lombok.val;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.example.sink.MyJdbcSinkFunction;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;

public class FLCDCToMySql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("snapshot.locking.mode", "none");

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .username("root")
                .password("root321")
                .databaseList("test")
                .tableList("test.t_class")
                .debeziumProperties(properties)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.earliest())
                .build();

        DataStreamSource<String> dataStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "Mysql Source");
        // 10s一批数据推给sink
        SingleOutputStreamOperator<List<String>> outputStream = dataStream.map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        return Tuple2.of(UUID.randomUUID().toString(), s);
                    }
                }).keyBy(x -> x.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<Tuple2<String, String>, List<String>, String, TimeWindow>() {
                    @Override
                    public void process(String s, ProcessWindowFunction<Tuple2<String, String>, List<String>, String, TimeWindow>.Context context, Iterable<Tuple2<String, String>> iterable, Collector<List<String>> collector) throws Exception {
                        List datalist = new ArrayList();
                        for(Tuple2<String, String> element : iterable){
                            datalist.add(element.f1);
                        }
                        collector.collect(datalist);
                    }
                });
//        outputStream.print();
        outputStream.addSink(new MyJdbcSinkFunction());
        env.execute("flink cdc demo ");

    }
}

        批次写入思路:使用executeBatch方式提交,url中需添加rewriteBatchedStatements=true 。

package org.example.sink;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.example.RecordData;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;

public class MyJdbcSinkFunction extends RichSinkFunction<List<String>> {

    private int batchSize = 10;
    private Connection connection = null;
    private PreparedStatement statement = null;
    private String jbcUrl = String.format("jdbc:mysql://localhost:3306/test?&useSSL=false&rewriteBatchedStatements=true");
    private String insertSql = String.format("insert into %s values (?,?,?,?,?,?)","t_demo");

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = DriverManager.getConnection(jbcUrl,"root","xxxxxx");
        connection.setAutoCommit(false);

        statement = connection.prepareStatement(insertSql);
    }

    @Override
    public void invoke(List<String> value, Context context) throws Exception {
        super.invoke(value, context);
        System.out.println("dataInfo: "+ value);
        int flag = 0;          //处理数据条数的标记
        int batchSize = 1000;  //批次commit
        int dataSize = value.size();

        String before = "";
        String after = "";
        String op = "";
        String dbname = "";
        String tablename = "";
        String ts_ms = "";

        for (int i = 0; i < dataSize; i++){
            String data = value.get(i);
            JSONObject object = JSONObject.parseObject(data);

            op = String.valueOf(object.get("op"));
            if("c".equals(op)){
                after = String.valueOf(object.get("after"));
            }else if("d".equals(op)){
                before = String.valueOf(object.get("before"));
            }else if("u".equals(op)){
                before = String.valueOf(object.get("before"));
                after = String.valueOf(object.get("after"));
            }

            JSONObject sourceObj = JSONObject.parseObject(object.get("source").toString());
            dbname = String.valueOf(sourceObj.get("db"));
            tablename = String.valueOf(sourceObj.get("table"));
            ts_ms = String.valueOf(sourceObj.get("ts_ms"));

            statement.setString(1, before);
            statement.setString(2, after);
            statement.setString(3, op);
            statement.setString(4, dbname);
            statement.setString(5, tablename);
            statement.setString(6, ts_ms);
            statement.addBatch();

            flag = flag + 1;
            if(i % batchSize == 0 ){
                statement.executeBatch();
                connection.commit();
                statement.clearBatch();
                flag = 0;  //批次提交后重置
            }
        }
        if(flag > 0){   //不满批次的提交
            statement.executeBatch();
            connection.commit();
            statement.clearBatch();
        }

    }

    @Override
    public void close() throws Exception {
        super.close();
        connection.close();
    }
}

三、插入目标表结果

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2230408.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

群控系统服务端开发模式-应用开发-上传配置功能开发

下面直接进入上传配置功能开发&#xff0c;废话不多说。 一、创建表 1、语句 CREATE TABLE cluster_control.nc_param_upload (id int(11) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 编号,upload_type tinyint(1) UNSIGNED NOT NULL COMMENT 上传类型 1&#xff1a;本站 2&a…

【大语言模型】ACL2024论文-03 MAGE: 现实环境下机器生成文本检测

【大语言模型】ACL2024论文-03 MAGE: 现实环境下机器生成文本检测 目录 文章目录 【大语言模型】ACL2024论文-03 MAGE: 现实环境下机器生成文本检测目录摘要研究背景问题与挑战如何解决核心创新点算法模型实验效果&#xff08;包含重要数据与结论&#xff09;主要参考工作后续优…

C++设计模式结构型模式———组合模式

文章目录 一、引言二、组合模式三、总结 一、引言 组合模式是一种结构型设计模式&#xff0c; 可以使用它将对象组合成树状结构&#xff0c; 并且能像使用独立对象一样使用它们。代码实现中涉及了递归调用。组合模式与传统上的“类与类之间的组合关系”没有关联&#xff0c;不…

【C/C++】qsort函数的学习与使用

零.导言 在之前的文章中&#xff0c;我介绍了冒泡排序&#xff0c;即按ASCII码值把元素从小到大排序&#xff08;文章链接我放在了第五部分&#xff0c;有兴趣的小伙伴可以求看看&#xff09;。而今天我将继续介绍qsort函数&#xff0c;这个函数可以起到和冒泡排序一样的作用&a…

前段(vue)

目录 跨域是什么&#xff1f; SprinBoot跨域的三种解决方法 JavaScript 有 8 种数据类型&#xff0c; 金额的用什么类型。 前段 区别 JQuery使用$.ajax()实现异步请求 Vue 父子组件间的三种通信方式 Vue2 和 Vue3 存在多方面的区别。 跨域是什么&#xff1f; 跨域是指…

【elkb】索引生命周期管理

索引生命周期管理 Index lifecycle management(索引生命周期管理)是elasticsearch提供的一种用于自动管理索引的生命周期的功能。允许使用者定义索引的各个阶段&#xff0c;从创建至删除。并允许使用者在每个阶段定义索引需要执行的特定动作。这些动作包含索引创建&#xff0c…

基于SSM志愿者招募系统的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;志愿组织管理&#xff0c;组织信息管理&#xff0c;组织申请管理&#xff0c;志愿活动管理活动报名管理 用户账号功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;…

msys2更换国内源(多个文件(不是3个文件的版本!))

msys2更换国内源 起因排查答案如下mirrorlist.mingw64mirrorlist.ucrt64mirrorlist.mingw32mirrorlist.mingwmirrorlist.clang64mirrorlist.clang32mirrorlist.msys 不想看经过的直接跳到答案 起因 查了很多个教程大部分都是【打开MSYS2软件内的\etc\pacman.d\ 中3个文件&…

Spring Boot框架下的信息学科平台系统架构设计

3系统分析 3.1可行性分析 通过对本基于保密信息学科平台系统实行的目的初步调查和分析&#xff0c;提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本基于保密信息学科平台系统采用Spring Boot框架&a…

基于Python可视化的热门微博数据分析系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、SSM项目源码 系统展示 【2025最新】基于pythondjangovueMySQL的热…

ffplay 实现视频流中音频的延迟

ffplay -rtsp_transport tcp -i rtsp://admin:1234qwer192.168.1.64:554/Streaming/Channels/101 -vn -af "adelay5000|5000"在这个命令中&#xff1a; -vn 参数表示只播放音频。 -af "adelay5000|5000" 参数表示将音频延迟5000毫秒&#xff08;即5秒&…

Iceoryx2:高性能进程间通信框架(中间件)

文章目录 0. 引言1. 主要改进2. Iceoryx2 的架构3. C示例代码3.1 发布者示例&#xff08;publisher.cpp&#xff09;3.2 订阅者示例&#xff08;subscriber.cpp&#xff09; 4. 机制比较5. 架构比较6. Iceoryx vs Iceoryx2参考资料 0. 引言 Iceoryx2 是一个基于 Rust 实现的开…

HTML+javaScript+CSS

文章目录 HTMLjavaScriptCSS属性区块表单层叠样式表选择器常用属性盒子模型相关属性浮动float定位&#xff08;position&#xff09; JS操作节点事件点击事件onclick()聚焦事件、失焦事件鼠标移入移出事件 定时任务延迟定时任务重复定时任务 判断哪个单选框被选中设置按钮失效冒…

跟着红队笔记学习 tmux:渗透测试中的多终端利器

内容预览 ≧∀≦ゞ 跟着红队笔记学习 tmux&#xff1a;渗透测试中的多终端利器进入 tmux 前的准备tmux 概念简介tmux 基础操作会话管理命令会话管理快捷键会话内和会话外命令的区别 tmux 窗口和面板管理新建和管理窗口分割窗口为面板切换面板面板放大与恢复调整面板大小关闭面板…

服务器数据恢复—DELL EqualLogic PS6100系列存储简介及如何收集故障信息?

DELL EqualLogic PS6100系列存储采用虚拟ISCSI SAN阵列&#xff0c;支持VMware、Solaris、Linux、Mac、HP-UX、AIX操作系统&#xff0c;提供全套企业级数据保护和管理功能&#xff0c;具有可扩展性和容错功能。DELL EqualLogic PS6100系列存储介绍&#xff1a; 1、上层应用基础…

【力扣】Go语言回溯算法详细实现与方法论提炼

文章目录 一、引言二、回溯算法的核心概念三、组合问题1. LeetCode 77. 组合2. LeetCode 216. 组合总和III3. LeetCode 17. 电话号码的字母组合4. LeetCode 39. 组合总和5. LeetCode 40. 组合总和 II小结 四、分割问题6. LeetCode 131. 分割回文串7. LeetCode 93. 复原IP地址小…

【深度学习】实验 — 动手实现 GPT【三】:LLM架构、LayerNorm、GELU激活函数

【深度学习】实验 — 动手实现 GPT【三】&#xff1a;LLM架构、LayerNorm、GELU激活函数 模型定义编码一个大型语言模型&#xff08;LLM&#xff09;架构 使用层归一化对激活值进行归一化LayerNorm代码实现scale和shift 实现带有 GELU 激活的前馈网络测试 模型定义 编码一个大…

基于springboot+vue车辆充电桩管理系统

基于springbootvue车辆充电桩管理系统 摘 要 随着信息化时代的到来&#xff0c;管理系统都趋向于智能化、系统化&#xff0c;车辆充电桩管理系统也不例外&#xff0c;但目前国内仍都使用人工管理&#xff0c;市场规模越来越大&#xff0c;同时信息量也越来越庞大&#xff0c;…

WordPress网站添加嵌入B站视频,自适应屏幕大小,取消自动播放

结合bv号 改成以下嵌入式代码&#xff08;自适应屏幕大小,取消自动播放&#xff09; <iframe style"width: 100%; aspect-ratio: 16/9;" src"//player.bilibili.com/player.html?isOutsidetrue&bvidBV13CSVYREpr&p1&autoplay0" scrolling…

BLG与T1谁会赢?python制作预测程序,结果显示,BLG将打败T1

决赛预测 2024英雄联盟全球总决赛 2024年英雄联盟全球总决赛&#xff0c;今天晚上&#xff08;2024年11月2日22点&#xff09;就要开始了&#xff01;今年的总决赛的队伍是BLG与T1。当然一些老的lol玩家&#xff0c;现在可能对于lol关注不多&#xff0c;并不清楚这两个队伍。…