1. Prepare the environment
JDK 1.8
MySQL
Zookeeper
Kafka
Maxwell
IDEA
2. Hands-on walkthrough
2.1 Enable the MySQL binlog
Check whether the binlog is already enabled:
show variables like 'log_%';
If log_bin shows ON, the binlog is already enabled; if it shows OFF, it is not enabled yet.
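For example, when the binlog is already enabled, the relevant row of the output looks roughly like this:
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+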
[Linux] Edit /etc/my.cnf and add the following under [mysqld]:
server-id=1
log-bin=mysql-bin
binlog_format=row
# Without this parameter, the binlog is enabled for all databases by default
binlog-do-db=gmall_20230424
Restart the MySQL service:
service mysqld restart
Check the binlog status again, for example:
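show variables like 'log_bin';
log_bin should now show ON.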
[Windows] Edit my.ini in the MySQL installation directory and add the same entries under [mysqld] as on Linux.
2.2 Zookeeper and Kafka
2.2.1 Start Zookeeper
bin/zkServer.sh start
2.2.2 Start Kafka
# Start in the foreground
bin/kafka-server-start.sh config/server.properties
# Start in the background, detached from the terminal
nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
2.2.3 Create a Kafka topic
bin/kafka-topics.sh --bootstrap-server 192.168.221.100:9092 --create --topic flink_test01 --partitions 1 --replication-factor 1
Test the Kafka topic:
# Consume
bin/kafka-console-consumer.sh --bootstrap-server 192.168.221.100:9092 --topic flink_test01 --from-beginning
# Produce
bin/kafka-console-producer.sh --broker-list 192.168.221.100:9092 --topic flink_test01
2.3 Configure Maxwell
2.3.1 Create the database and user required by Maxwell
1) Create the database
CREATE DATABASE maxwell;
2) Lower the MySQL password validation policy (so that a simple password such as 'maxwell' is accepted)
set global validate_password_policy=0;
set global validate_password_length=4;
3) Create the Maxwell user and grant it the required privileges
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
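Optionally, verify the new account by logging in with it against the same MySQL instance (the host below is the one used throughout this guide):
mysql -h192.168.221.100 -umaxwell -pmaxwell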
2.3.2 Configure Maxwell
In the directory where the Maxwell package was extracted, rename config.properties.example to config.properties and edit it:
mv config.properties.example config.properties
vim config.properties
producer=kafka
kafka.bootstrap.servers=192.168.221.100:9092
# Target Kafka topic; it can be static (e.g. maxwell) or dynamic (e.g. %{database}_%{table})
kafka_topic=flink_test01
# mysql login info
host=192.168.221.100
user=maxwell
password=maxwell
2.3.3 Start Maxwell
1) Start Maxwell
bin/maxwell --config config.properties
2) Stop Maxwell
ps -ef | grep maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
2.4 Verify that Maxwell, MySQL, and Kafka work together
2.4.1 Check the Maxwell, Kafka, and Zookeeper processes
jps
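If everything is up, the output should contain entries similar to the following (process ids will differ):
1234 QuorumPeerMain
2345 Kafka
3456 Maxwell
4567 Jps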
2.4.2 Insert, update, and delete data in MySQL
Watch the Kafka console consumer.
If the changes show up in the consumer, the pipeline is working end to end.
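For reference, each Maxwell message is a JSON document. An INSERT into emp_user produces a message similar to the following (ts and the data values are illustrative); the Flink program in 2.5.3 reads the table, type, and data fields of this format:
{"database":"gmall_20230424","table":"emp_user","type":"insert","ts":1682300000,"data":{"id":"1","name":"zhangsan","age":22,"sex":"F"}}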
2.5 Write the program in IDEA
2.5.1 Create a Maven project in IDEA
2.5.2 pom.xml dependencies
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.13</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.13.0</version>
</dependency>
<!--mysql cdc -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.2.0</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.29</version>
</dependency>
<!--kafka-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>
<!--本地调试flink ui-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.11</artifactId>
<version>1.13.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.10.1</version>
</dependency>
</dependencies>
2.5.3 Write the test code
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.qiyu.dim.KafkaUtil;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
/**
* @Author liujian
* @Date 2023/4/24 9:40
* @Version 1.0
*/
public class Flink_kafka {
public static void main(String[] args) throws Exception {
// todo 1. Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// todo 2. Set the parallelism to 1 (in production, match the number of partitions of the Kafka topic)
env.setParallelism(1);
// todo 3. Read the Maxwell change stream from Kafka
DataStream<String> dataStreamSource =
env.addSource(KafkaUtil.getKafkaConsumer("flink_test01","192.168.221.100:9092"));
// todo 4. Extract the valid records from the Kafka stream: insert and update events for the emp_user table; dirty data is printed to the console and skipped
DataStream<JSONObject> data = dataStreamSource.flatMap(new FlatMapFunction<String, JSONObject>() {
@Override
public void flatMap(String s, Collector<JSONObject> collector) throws Exception {
try {
// Convert the record from String to JSONObject
JSONObject json = JSON.parseObject(s);
// Keep only records of the emp_user table
if (json.getString("table").equals("emp_user")) {
// Keep only insert and update events
if (json.getString("type").equals("insert") || json.getString("type").equals("update")) {
System.out.println(json.getJSONObject("data"));
collector.collect(json.getJSONObject("data"));
}
}
} catch (Exception e) {
System.out.println("脏数据:" + s);
}
}
});
// todo 5. Convert valid records to Row. JDBCOutputFormat can only handle Row, which is a wrapper around the prepared-statement parameters
DataStream<Row> map = data.map(new MapFunction<JSONObject, Row>() {
@Override
public Row map(JSONObject jsonObject) throws Exception {
Row row = new Row(4);
row.setField(0, jsonObject.getString("id"));
row.setField(1, jsonObject.getString("name"));
row.setField(2, jsonObject.getString("age"));
row.setField(3, jsonObject.getString("sex"));
return row;
}
});
// todo 6. Write the data into MySQL: rows with an existing primary key are updated, new keys are inserted
String query =
"INSERT INTO gmall_20230424.emp_user_copy (id,name,age,sex) " +
"VALUES (?, ?,?,?) " +
"ON DUPLICATE KEY UPDATE name = VALUES(name) , age = VALUES(age) , sex = VALUES(sex)";
JDBCOutputFormat finish = JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://192.168.221.100:3306/gmall_20230424?user=root&password=000000")
.setQuery(query)
.setBatchInterval(1)
.finish();
// todo 7. Write the stream using the JDBC output format
map.writeUsingOutputFormat(finish);
// todo 8. Submit the Flink job
env.execute();
}
}
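The program imports com.qiyu.dim.KafkaUtil, which is not shown above. Below is a minimal sketch of what getKafkaConsumer could look like, assuming it simply wraps FlinkKafkaConsumer from the Kafka connector dependency; the group.id value is an arbitrary example:
package com.qiyu.dim;

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaUtil {
    // Builds a Flink Kafka source that reads raw String messages from the given topic.
    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // consumer group id; example value, adjust to your setup
        props.setProperty("group.id", "flink_kafka_demo");
        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
    }
}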
2.5.4 Run the test code
2.5.5 Test
As in 2.4.2, insert and update rows in the test table emp_user,
then check whether the changes are written to the emp_user_copy table.
INSERT into emp_user VALUES ("1","zhangsan",22,"F");
INSERT into emp_user VALUES ("2","lisi",22,"M");
INSERT into emp_user VALUES ("3","wangwu",22,"F");
INSERT into emp_user VALUES ("4","jia",22,"M");
INSERT into emp_user VALUES ("5","yi",22,"F");
UPDATE emp_user set age=23 where id ="4";
INSERT into emp_user VALUES ("6","666",22,"F");
When the row with id 4 was first inserted, age was 22, but a later UPDATE set age=23, so emp_user_copy should show age=23 for id 4.
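To confirm the upsert behaviour, query the target table; the row with id 4 should show age 23 (adjust the database name if yours differs):
SELECT * FROM gmall_20230424.emp_user_copy WHERE id = '4';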