Flink通过Maxwell读取mysql的binlog发送到kafka再写入mysql

news2025/1/11 10:48:51

1.准备环境

JDK1.8

MySQL

Zookeeper

Kakfa

Maxweill

IDEA

2.实操

2.1开启mysql的binlog

查看binlog 状态,是否开启

show variables like 'log_%'

如果log_bin显示为ON,则代表已开启。如果是OFF 说明还没开启。

[Linux] 编辑 /etc/my.cnf 文件,在[mysqld]后面增加

server-id=1
log-bin=mysql-bin
binlog_format=row
#如果不加此参数,默认所有库开启binlog
binlog-do-db=gmall_20230424 

重启mysql 服务

service mysqld restart

再次查看binlog 状态

[Windows] 编辑 mysql安装目录 下 my.ini 文件,在[mysqld]后面增加 如上 linux 一样

2.2 Zookeeper 、 Kafka

2.2.1启动 ZK

bin/zkServer.sh start

2.2.2启动 Kakfa

#常规模式启动 

bin/kafka-server-start.sh config/server.properties 

#进程守护模式启动

nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

2.2.3创建 kafka-topic

bin/kafka-topics.sh --bootstrap-server 192.168.221.100:9092 --create --topic flink_test01 --partitions 1 --replication-factor 1

测试kafka-topic

#消费 

bin/kafka-console-consumer.sh --bootstrap-server 192.168.221.100:9092 --topic flink_test01 --from-beginning 

#生产

bin/kafka-console-producer.sh --broker-list 192.168.221.100:9092 --topic flink_t

2.3配置Maxwell

2.3.1创建Maxwell 所需要的 数据库 和 用户

1)创建数据库

CREATE DATABASE maxwell;

2)调整MySQL数据库密码级别

set global validate_password_policy=0; 

set global validate_password_length=4;

3)创建Maxwell用户并赋予其必要权限

CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell'; 

GRANT ALL ON maxwell.* TO 'maxwell'@'%'; 

GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

2.3.2配置Maxwell

在Maxwell安装包解压目录下,复制 并 编辑 config.properties.example

mv config.properties.example config.properties
vim config.properties
producer=kafka 

kafka.bootstrap.servers=192.168.221.100:9092 

#目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table} kafka_topic=flink_test01 

# mysql login info host=192.168.221.100 

user=maxwell 

password=maxwell

2.3.3 启动Maxwell

1)启动Maxwell

bin/maxwell --config config.properties

2)停止Maxwell

ps -ef | grep maxwell | grep -v grep | grep maxwell | awk '{print $2}' | xargs kill -9

2.4测试maxwell、mysql、kafka 正常使用

2.4.1查看Maxwell、kafka、zookeeper 进程

jps

2.4.2 mysql添加、修改、删除数据

查看 kafka 消费者

有消费 说明 流程是通畅 的

2.5 idea 编写程序

2.5.1 idea 创建 maven 项目

2.5.2 pom.xml 依赖

<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.21</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.21</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.13</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.13.0</version>
    </dependency>
    <!--mysql cdc -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.2.0</version>
        <!--            <scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.12.7</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.29</version>
    </dependency>

    <!--kafka-->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.1</version>
    </dependency>

    <!--本地调试flink ui-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.11</artifactId>
        <version>1.13.0</version>
        <scope>compile</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.11</artifactId>
        <version>1.12.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.11</artifactId>
        <version>1.9.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.0.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>1.10.1</version>
    </dependency>

</dependencies>

2.5.3 编写测试代码

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.qiyu.dim.KafkaUtil;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

/**
 * @Author liujian
 * @Date 2023/4/24 9:40
 * @Version 1.0
 */
public class Flink_kafka {
    public static void main(String[] args) throws Exception {
        // todo 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // todo 2.将并行度设为1, (生产环境中,kafka中 topic有几个分区 设为几)
        env.setParallelism(1);
        // todo 3.读取 maxwell kafka 数据流
        DataStream dataStreamSource=
                env.addSource(KafkaUtil.getKafkaConsumer("flink_test01","192.168.221.100:9092"));
        // todo 4.取kafka中的数据流 有效数据,获取 emp_user 表中的 新增、修改、初始化数据 。脏数据直接打印控制台,不处理
        DataStream<JSONObject> data = dataStreamSource.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String s, Collector<JSONObject> collector) throws Exception {
                try {
                    // 将 数据流中 类型 转换 String >> JsonObject
                    JSONObject json = JSON.parseObject(s);
                    //取 emp_user 表数据
                    if (json.getString("table").equals("emp_user")) {
                        //取新增、修改数据
                        if (json.getString("type").equals("insert") || json.getString("type").equals("update")) {
                            System.out.println(json.getJSONObject("data"));
                            collector.collect(json.getJSONObject("data"));
                        }
                    }
                } catch (Exception e) {
                    System.out.println("脏数据:" + s);
                }

            }
        });

        // todo 5. 将 有效数据转换 为 Row 类型 。JDBCOutputFormat只能处理Row,而Row是对prepared statement的参数的一个包装类
        DataStream<Row> map = data.map(new MapFunction<JSONObject, Row>() {
            @Override
            public Row map(JSONObject jsonObject) throws Exception {
                Row row = new Row(4);
                row.setField(0, jsonObject.getString("id"));
                row.setField(1, jsonObject.getString("name"));
                row.setField(2, jsonObject.getString("age"));
                row.setField(3, jsonObject.getString("sex"));
                return row;
            }
        });

        // todo 6.  将 数据存储 到 mysql 当中,同主键 数据 就修改, 无 就新增
        String query =
                "INSERT  INTO gmall_20230424.emp_user_copy (id,name,age,sex) " +
                        "VALUES (?, ?,?,?) " +
                        "ON DUPLICATE KEY UPDATE name = VALUES(name) , age = VALUES(age)  , sex = VALUES(sex)";
        JDBCOutputFormat finish = JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://192.168.221.100:3306/gmall_20230424?user=root&password=000000")
                .setQuery(query)
                .setBatchInterval(1)
                .finish();

        //todo 7.提交存储任务
        map.writeUsingOutputFormat(finish);

        //todo 8.提交flink 任务
        env.execute();

    }
}

2.5.4 启动测试代码

2.5.5 测试

如 2.4.2 一样 ,在测试表 emp_user 中 进行 新增 、修改

查看 是否写入 emp_user_copy表中

INSERT into emp_user VALUES ("1","zhangsan",22,"F"); 

INSERT into emp_user VALUES ("2","lisi",22,"M"); 

INSERT into emp_user VALUES ("3","wangwu",22,"F"); 

INSERT into emp_user VALUES ("4","jia",22,"M"); 

INSERT into emp_user VALUES ("5","yi",22,"F"); 

UPDATE emp_user set age=23 where id ="4"; 

INSERT into emp_user VALUES ("6","666",22,"F");

新增 id为 4的数据时:age=22,但后面做了次 update age=23

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/460925.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

什么品牌台灯最舒服?推荐最热门的护眼灯品牌排行

台灯是人们生活中用来照明的一个电器&#xff0c;平时阅读跟学习时都会用来照明&#xff0c;补充室内不足的光线。 那么&#xff0c;台灯哪个品牌最舒服&#xff1f;分享几款好用的护眼台灯给大家。 (1) 南卡护眼台灯Pro 南卡是目前国内护眼效果方面做得最棒的一个品牌&#x…

企业本地文档如何实现规范在线管理?

随着企业数字化生产方式的不断推进&#xff0c;网络办公和在线协作越来越普遍&#xff0c;企业内部可能出现大量的文件和文档&#xff0c;这些文档多存在于不同的设备和存储介质上&#xff0c;这给企业的信息管理带来了一定程度的困难。为了提高企业的知识管理效率&#xff0c;…

SpringCloudTencent:安装北极星polaris服务

腾讯推出了自己的微服务框架&#xff0c;其中polaris&#xff08;北极星&#xff09;服务更是核心&#xff0c;除了提供服务发现和治理中心&#xff0c;除服务发现、服务注册和健康检查之外&#xff0c;还提供流量控制、故障容错和安全能力。现在我们试着快速安装一下。 1.win…

Java笔记_11(常见算法)

Java笔记_11常见算法 一、常见算法1.1、查找算法1.2、排序算法 二、Arrays三、Lambda表达式四、综合练习4.1、按照要求进行排序4.2、不死神兔4.3、猴子吃桃子4.4、爬楼梯 一、常见算法 1.1、查找算法 基本查找 package Common_algorithms.Basic_Search;import java.util.Ar…

IT运维:服务器管理

服务器是连接到其他设备以提供服务的设备。其他设备称为客户端&#xff0c;此设置称为客户端-服务器模型。 服务器提供的服务包括存储数据、托管网站、运行应用程序、中继信息和执行计算。任何向发出请求的客户端提供服务的设备都可以充当服务器&#xff0c;但大型IT组织通常具…

笔试练习Day01

目录 选择题&#xff1a; 题一&#xff1a; String 类&#xff1a; StringBuffer 类的详解&#xff1a; 关于 Vector 的详解&#xff1a; 编程题&#xff1a; 题一&#xff1a;组队竞赛 题二&#xff1a;删除公共字符串 选择题&#xff1a; 题一&#xff1a; String 类…

node(express框架)连接mysql 基础篇

文章目录 电脑安装mysql配置mysql连接mysql 创建表 创建node文件启动node node 连接数据库连接数据库 电脑安装mysql 由于我的是mac 我就安装mac版本的 mysql 如已安装跳过此步骤 mysql官网选择版本安装配置 这里注意选择下面的 next输入mysql密码 点击finish 配置mysql 打…

ROS:TF变换

一.TF变换数据格式&#xff08;msg&#xff09; TransformStamped.msg&#xff08;两个坐标系之间转换&#xff09; std_msgs/Header header uint32 seq time stamp string frame_id 指明哪一个坐标系&#xff08;父坐标系&#xff09; string child_f…

汇编语言(第3版) - 学习笔记 - 实验8 分析一个奇怪的程序

实验8 分析一个奇怪的程序 题目解析顺序执行查看反汇编测试一下 题目 分析下面的程序&#xff0c;在运行前思考:这个程序可以正确返回吗? 运行后再思考:为什么是这种结果? 通过这个程序加深对相关内容的理解。 assume cs:codesg codesg segmentmov ax, 4c00h int 21h …

BUUCTF pwn1_sctf_2016

小白垃圾笔记而已&#xff0c;不建议阅读。 唉&#xff0c;因为没有在一开始创建flag文件&#xff0c;导致调试了半天也没有找到问题所在。 这道题是这样的&#xff1a; main函数调用vuln函数 其实在程序中还有一个get_flag函数&#xff1a; 我们可以将返回地址覆盖成它。 覆…

E. Number With The Given Amount Of Divisors

传送门 题意&#xff1a;求出整好有n个因子的最小整数。 思路&#xff1a; 要找到恰好有n个因子的最小整数&#xff0c;我们可以利用质因数分解的思想来求解。设该整数的质因数分解式为&#xff1a;其中p1,p2,...,pn均为不同的质数&#xff0c;a1,a2,...,an均为正整数。则该整…

【vue3】05-vue的双向绑定 — v-model

文章目录 v-mdelv-model的基本使用v-model绑定其他表单元素textareaselectcheckboxradio v-model修饰符 v-mdel v-model 是 Vue.js 中用于表单元素和组件双向数据绑定的指令。它可以将表单元素或组件的值和 Vue 实例的数据属性进行双向绑定: 即当表单元素或组件的值发生变化时…

Ajax 实例

文章目录 AJAX 实例AJAX 实例解析 AJAX 实例 为了帮助您理解 AJAX 的工作原理&#xff0c;我们创建了一个小型的 AJAX 应用程序: 实例 <!DOCTYPE html> <html> <head> <meta charset"utf-8"> <script> function loadXMLDoc() {var x…

第五期(2022-2023)传统行业云原生技术落地调研报告——金融篇

随着数字化浪潮的来临&#xff0c;云原生技术正在改变着各行各业&#xff0c;通过IT变革驱动业务创新发展&#xff0c;促进企业自身以及产业生态的转型升级。 因此&#xff0c;灵雀云联合云原生技术实践联盟&#xff08;CNBPA&#xff09;和行业内头部厂商F5&#xff0c;共同发…

如何实现一款接入chatGPT的智能音箱

现有的一些“智能音箱”如某度和某猫精灵&#xff0c;跟现在的chatGPT比显得智障。如果能有一款接入chatGPT的智能音箱&#xff0c;它的交互性就好多啦。有gpt加持的智能音箱绝对会很强&#xff0c;以下提供探讨下实现思路。 目前智能音箱在语音交互层面依然不够成熟&#xff0…

PSO算法、MATLAB代码实现以及测试效果

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 PSO算法原理进化操作算法流程图matlab代码实现main函数部分适应度函数部分PSO算法主体测试结果 (F1~F6) PSO算法原理 粒子群优化( Particle Swarm Optimization&am…

【C++关联容器】map的成员函数

目录 map 1. 构造、析构和赋值运算符重载 1.1 构造函数 1.2 析构函数 1.3 赋值运算符重载 2. 迭代器 3. 容量 4. 元素访问 5. 修改器 6. 观察者 7. 操作 8. 分配器 map map是关联容器&#xff0c;它按照特定的顺序存储由关键字值和映射值的组合形成的元素。 在一…

【Java】再看排序 —— 关于 Comparator 的用法

谈一个比较基础&#xff0c;又很常用的东西&#xff0c; Comparator 类&#xff0c;之前我写过一篇浅浅的关于这个的文章 &#xff0c;今天再复盘一下这个问题&#xff0c;把它弄熟 ps: 本文中提供的代码&#xff0c;为了提高可读性都没用 lamdam 和 函数式编程 简化书写&…

JVM-0423

运行时内存 程序计数器 作用:记录每个线程的代码执行到哪一条指令了 为了保证程序(在操作系统中理解为进程)能够连续地执行下去&#xff0c;CPU必须具有某些手段来确定下一条指令的地址。而程序计数器正是起到这种作用&#xff0c;所以通常又称为指令计数器。在程序开始执行…

论文阅读:Swin Transformer: Hierarchical Vision Transformer using Shifted Windows

使用移位窗口的分层视觉Transformer 继续阅读Transformer相关 0、摘要 本文提出了一种新的视觉Transformer&#xff0c;称为Swin Transformer&#xff0c;能够作为一个通用的骨干计算机视觉。将Transformer从语言适应到视觉的挑战来自于两个领域之间的差异&#xff0c;例如视…