Windows系统安装Flink及实现MySQL之间数据同步

news2024/11/15 15:36:09

        Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink的设计目标是在所有常见的集群环境中运行,并以内存执行速度和任意规模来执行计算。它支持高吞吐、低延迟、高性能的流处理,并且是一个面向流处理和批处理的分布式计算框架,将批处理看作一种特殊的有界流。

Flink的主要特点包括:

  1. 事件驱动型:Flink是一个事件驱动型的应用,可以从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。
  2. 支持有状态计算:Flink提供了Extactor-once语义及checkpoint机制,支持带有事件操作的流处理和窗口处理,以及灵活的窗口处理(如时间窗口、大小窗口等)。
  3. 轻量级容错处理:Flink使用savepoint进行错误恢复,可以在出现故障时快速恢复任务。
  4. 高吞吐、低延迟、高性能:Flink的设计目标是在保证数据处理稳定性的同时,实现高吞吐、低延迟、高性能的流处理。
  5. 支持大规模集群模式:Flink支持在yarn、Mesos、k8s等大规模集群环境中运行。
  6. 支持多种编程语言:Flink对java、scala、python都提供支持,但最适合使用java进行开发。

        Flink的应用场景非常广泛,可以用于实时流数据的分析计算、实时数据与维表数据关联计算、实时数仓建设、ETL(提取-转换-加载)多存储系统之间进行数据转化和迁移等场景。同时,Flink也适用于事件驱动型应用场景,如以kafka为代表的消息队列等。

1.Winows系统安装Flink

下载地址:Downloads | Apache Flink

选择 Apache Flink 1.16.0 - 2022-10-28 (Binaries)

下载 flink-1.16.0-bin-scala_2.12.tgz

使用CMD窗口,在Flink安装路径/bin目录下启动start-cluster.bat

访问http://localhost:8081,界面如下:

2.使用Flink实现MySQL数据库之间数据同步(JAVA)

<flink.version>1.16.0</flink.version>
<flink-cdc.version>2.3.0</flink-cdc.version>

1.创建Flink流处理运行环境。

2.设置流处理并发数。

3.设置Flink存档间隔时间,单位为ms,当同步发生异常时会恢复最近的checkpoint继续同步。

4.在Flink中创建中间同步数据库。

5.在Flink中创建中间表flink_source,来源于MySQL表source,(注意connector为mysql-cdc)。

6.在Flink中创建中间表flink_sink,来源于MySQL表sink。

7.将Flink中间表来源表数据写入flink_sink表,Flink会根据MySQL binlog中source表变化,动态更新flink_sink表,同时会将flink_sink表数据写入MySQL sink表,实现MySQL数据持续同步。

package com.demo.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCdcMySql {
    public static void main(String[] args) {
        System.out.println("==========start run FlinkCdcMySql#main.");

        // 创建Flink流处理运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("127.0.0.1", 8081);
        // 设置流处理并发数
        env.setParallelism(3);
        // 设置Flink存档间隔时间,单位为ms,当同步发生异常时会恢复最近的checkpoint继续同步
        env.enableCheckpointing(5000);

        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 在Flink中创建中间同步数据库
        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS flink_test");

        // 在Flink中创建中间表flink_source,来源于MySQL表source
        // 注意connector为mysql-cdc
        tEnv.executeSql("CREATE TABLE flink_test.flink_source (\n" +
                "    id int,\n" +
                "    name varchar(255),\n" +
                "    create_time TIMESTAMP\n," + // Flink不支持datetime格式
                "    PRIMARY KEY (id) NOT ENFORCED" + //主键必须标明NOT ENFORCED
                ") WITH (\n" +
                "  'connector'  = 'mysql-cdc',\n" +
                "  'hostname'   = '127.0.0.1',\n" +
                "  'database-name'   = 'flink-source',\n" +
                "  'table-name' = 'source',\n" +
                "  'username'   = 'root',\n" +
                "  'password'   = 'root'\n" +
                ")");

        // 在Flink中创建中间表flink_sink,来源于MySQL表sink
        tEnv.executeSql("CREATE TABLE flink_test.flink_sink (\n" +
                "    id int,\n" +
                "    name varchar(255),\n" +
                "    create_time TIMESTAMP\n," +
                "    PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (\n" +
                "  'connector'  = 'jdbc',\n" +
                "  'url'        = 'jdbc:mysql://127.0.0.1:3306/flink-sink',\n" +
                "  'table-name' = 'sink',\n" +
                "  'driver'     = 'com.mysql.jdbc.Driver',\n" +
                "  'username'   = 'root',\n" +
                "  'password'   = 'root'\n" +
                ")");

//        Table transactions = tEnv.from("flink_source");
//        transactions.executeInsert("flink_sink");

        System.out.println("==========begin Mysql data cdc.");

        // 将Flink中间表来源表数据写入flink_sink表
        // Flink会根据MySQL binlog中source表变化,动态更新flink_sink表,同时会将flink_sink表数据写入MySQL sink表,实现MySQL数据持续同步
        tEnv.executeSql("INSERT INTO flink_test.flink_sink(id, name, create_time)\n" +
                "select id, name, create_time\n" +
                "from flink_test.flink_source\n");

        System.out.println("==========continue Mysql data cdc.");
    }

}

git代码地址:

flink-cdc-MySQL: FlinkCDC实现MySQL之间数据同步

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1436759.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Leetcode】292. Nim 游戏

文章目录 题目思路代码结果 题目 题目链接 你和你的朋友&#xff0c;两个人一起玩 Nim 游戏&#xff1a; 桌子上有一堆石头。你们轮流进行自己的回合&#xff0c; 你作为先手 。每一回合&#xff0c;轮到的人拿掉 1 - 3 块石头。拿掉最后一块石头的人就是获胜者。 假设你们每…

C++ 动态规划 状态压缩DP 最短Hamilton路径

给定一张 n 个点的带权无向图&#xff0c;点从 0∼n−1 标号&#xff0c;求起点 0 到终点 n−1 的最短 Hamilton 路径。 Hamilton 路径的定义是从 0 到 n−1 不重不漏地经过每个点恰好一次。 输入格式 第一行输入整数 n 。 接下来 n 行每行 n 个整数&#xff0c;其中第 i 行…

【MySQL】学习如何使用DCL进行用户管理

&#x1f308;个人主页: Aileen_0v0 &#x1f525;热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法 ​&#x1f4ab;个人格言:“没有罗马,那就自己创造罗马~” #mermaid-svg-JwFD16F1Kh0fle0X {font-family:"trebuchet ms",verdana,arial,sans-serif;font-siz…

百面嵌入式专栏(面试题)进程管理相关面试题1.0

沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇我们将介绍进程管理相关面试题 。 一、进程管理相关面试题 进程是什么?操作系统如何描述和抽象一个进程?进程是否有生命周期?如何标识一个进程?进程与进程之间的关系如何?Linux操作系统的进程0是什么?Linux操…

OJ_浮点数加法(高精度运算)

题干 C实现 #define _CRT_SECURE_NO_WARNINGS #include<stdio.h> #include<map> #include<string> using namespace std;string GetInteger(string a) {return a.substr(0, a.find(.)); }string GetFraction(string a) {return a.substr(a.find(.) 1 ,a.siz…

记一次VulnStack渗透

信息收集 netdiscover的主机发现部分不再详解&#xff0c;通过访问端口得知20001-2003端口都为web端口&#xff0c;所以优先考虑从此方向下手 外网渗透 GetShell Struct漏洞 访问2001端口后&#xff0c;插件Wappalyzer爬取得知这是一个基于Struct的web站点&#xff0c;直接…

浅谈bypass Etw

文章目录 c#ExecuteAssemblybypass etw c# loader 一种是通过反射找到指定空间的类中method进行Invoke 另一种是通过EntryPoint.Invoke加载 反射加载 Assembly.Load()是从String或AssemblyName类型加载程序集&#xff0c;可以读取字符串形式的程序集 Assembly.LoadFrom()从指定…

记录下Flybirds移动端ui自动化框架的搭建

一、参考文档 1.官方文档&#xff1a;携程机票跨端跨框架 BDD UI 自动化测试方案Flybirds — flybirds v0.1.5 文档 2.Flybirds运行环境&#xff1a;Flybirds运行环境 - 简书 3.Windows系统连接IOS安装tidevice&#xff1a;iOS自动化之tidevice-CSDN博客 二、Windows系统演…

Nacos注册中心和服务发现

Nacos注册中心 01 认识和安装Nacos Nacos比Eureka功能更为丰富&#xff0c;是SpringCloud中的一个组件&#xff0c;Nacos是阿里巴巴的产品&#xff0c;在国内更流行。 NACOS功能&#xff1a;服务发现&#xff08;对标Eureka)、配置管理、服务管理 下载见&#xff1a;D:\zwx\…

【EI会议征稿通知】第三届智能控制与应用技术国际学术会议(AICAT 2024)

第三届智能控制与应用技术国际学术会议&#xff08;AICAT 2024&#xff09; 2024 3rd International Symposium on Artificial Intelligence Control and Application Technology 2024年第三届智能控制与应用技术国际学术会议&#xff08;AICAT 2024&#xff09;定于2024年5月…

jquery写表格,通过后端传值,并合并单元格

<!DOCTYPE html> <html> <head><title>Table Using jQuery</title><style>#tableWrapper {width: 100%;height: 200px; /* 设置表格容器的高度 */overflow: auto; /* 添加滚动条 */margin-top: -10px; /* 负的外边距值&#xff0c;根据实际…

BT656视频传输标准

前言 凡是做模拟信号采集的&#xff0c;很少不涉及BT.656标准的&#xff0c;因为常见的模拟视频信号采集芯片都支持输出BT.656的数字信号&#xff0c;那么&#xff0c;BT.656到底是何种格式呢&#xff1f; 本文将主要介绍 标准的 8bit BT656&#xff08;4:2:2&#xff09;YCbC…

时间序列预测 —— DeepAR 模型

时间序列预测 —— DeepAR 模型 DeepAR 模型是一种专门用于处理时间序列概率预测的深度学习模型&#xff0c;它可以自动学习数据中的复杂模式&#xff0c;提高预测的准确性。本文将介绍 DeepAR 模型的理论基础、优缺点&#xff0c;并通过 Python 实现单步预测和多步预测的完整…

git整合分支的两种方法——合并(Merge)、变基(Rebase)

问题描述&#xff1a; 初次向git上传本地代码或者更新代码时&#xff0c;总是会遇到以下两个选项。有时候&#xff0c;只是想更新一下代码&#xff0c;没想到&#xff0c;直接更新了最新的代码&#xff0c;但是自己本地的代码并没有和git上的代码融合&#xff0c;反而被覆盖了…

坚持刷题 | 二叉树的直径

文章目录 题目考察点代码实现实现总结方便用迭代的方式实现吗&#xff1f;迭代实现迭代实现总结 Hello&#xff0c;大家好&#xff0c;我是阿月。坚持话题&#xff0c;老年痴呆追不上我&#xff0c;今天还有时间&#xff0c;那就再来一题吧&#xff1a;二叉树的直径 题目 543.…

子集枚举介绍

集合枚举的意思是从一个集合中找出它的所有子集。集合中每个元素都可以被选或不选&#xff0c;含有n个元素的集合总共有个子集&#xff08;包括全集和空集&#xff09; 例如考虑集合和它的4个子集、、、&#xff0c;按照某个顺序&#xff0c;把全集A中的每个元素在每个子集中的…

Visual Studio 2010+C#实现信源和信息熵

1. 设计要求 以图形界面的方式设计一套程序&#xff0c;该程序可以实现以下功能&#xff1a; 从输入框输入单个或多个概率&#xff0c;然后使用者可以通过相关按钮的点击求解相应的对数&#xff0c;自信息以及信息熵程序要能够实现马尔可夫信源转移概率矩阵的输入并且可以计算…

计算机网络-差错控制(纠错编码 海明码 纠错方法)

文章目录 纠错编码-海明码海明距离1.确定校验码位数r2.确定校验码和数据的位置3.求出校验码的值4.检错并纠错纠错方法1纠错方法2 小结 纠错编码-海明码 奇偶校验码&#xff1a;只能发现错误不能找到错误位置和纠正错误 海明距离 如果找到码距为1&#xff0c;那肯定为1了&…

鸿蒙开发-UI-组件导航-Tabs

鸿蒙开发-UI-组件 鸿蒙开发-UI-组件2 鸿蒙开发-UI-组件3 鸿蒙开发-UI-气泡/菜单 鸿蒙开发-UI-页面路由 鸿蒙开发-UI-组件导航-Navigation 文章目录 一、基本概念 二、导航 1.底部导航 2.顶部导航 3.侧边导航 4.导航栏限制滑动 三、导航栏 1.固定导航栏 2.滚动导航栏 3…

莉莉与神奇花朵的冒险

现在&#xff0c;我将根据这些步骤编写一个对话形式的童话故事。 在很久很久以前的一个小村庄里&#xff0c;有一个勤劳善良的小女孩叫莉莉。她住在一间小茅屋里&#xff0c;和她的奶奶一起生活。奶奶年纪大了&#xff0c;行动不便&#xff0c;所以莉莉每天都要照顾她。 一天&a…