文章目录
- 视频链接
- bxg代码文档
- 项目离线数据准备
- MySQL映射表
- 流数据准备
- num.txt
- makedata.log(空文件)
- start.sh
- create-log.sh
- insert-data.sh
- 维表
- 创建paimon_dim表
- mysql_to_paimon_dim任务
- 事实表
- ODS层
- ECS创建ods层kafka topic
- kafka_ods表
- mysql_to_kafka_ods数据插入
- paimon_ods表
- kafka_ods_to_paimon_ods
- 附录
- 组件安装
视频链接
第二章:基于Flink CDC数据采集
bxg代码文档
项目离线数据准备
- mysql
-- Target database: insert-data.sh REPLACEs rows into bxg.* tables.
CREATE DATABASE IF NOT EXISTS bxg CHARACTER SET utf8;
-- Staging database: insert-data.sh reads rows from bxg_1.* one `rn` at a time.
-- NOTE(review): MySQL's `utf8` is the 3-byte utf8mb3; use utf8mb4 if 4-byte characters can occur.
CREATE DATABASE IF NOT EXISTS bxg_1 CHARACTER SET utf8;
- 运行脚本bxg.sql,bxg1.sql
MySQL映射表
-- Flink catalog database that holds the MySQL source mapping tables.
CREATE DATABASE IF NOT EXISTS mysql;

-- Only this mapping is dropped before being (re)created; the other mapping
-- tables rely on IF NOT EXISTS alone.
DROP TABLE IF EXISTS mysql.mysql_oe_stu_course_order;

-- Source mapping for MySQL table bxg.oe_stu_course_order.
-- NOTE(review): connection credentials are stored in plain text.
CREATE TABLE IF NOT EXISTS mysql.mysql_oe_stu_course_order (
    `id`                INT,
    `student_course_id` INT,
    `order_id`          STRING,
    `order_detail_id`   STRING,
    `create_time`       TIMESTAMP(3),
    `update_time`       TIMESTAMP(3),
    `delete_flag`       BOOLEAN,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector'     = 'mysql',
    'hostname'      = 'rm-2ze047w59ovk9299x.rwlb.rds.aliyuncs.com',
    'port'          = '3306',
    'username'      = 'itcast',
    'password'      = 'Itcast123',
    'database-name' = 'bxg',
    'table-name'    = 'oe_stu_course_order'
);
-- Source mapping for MySQL table bxg.oe_stu_course.
-- NOTE(review): connection credentials are stored in plain text.
CREATE TABLE IF NOT EXISTS mysql.mysql_oe_stu_course (
    `id`               INT,
    `student_id`       STRING,
    `course_id`        INT,
    `status`           TINYINT,
    `contract_status`  TINYINT,
    `learn_status`     TINYINT,
    `service_days`     SMALLINT,
    `service_expires`  TIMESTAMP(3),
    `validity_days`    INT,
    `validity_expires` TIMESTAMP(3),
    `terminate_cause`  TINYINT,
    `effective_date`   TIMESTAMP(3),
    `finished_time`    TIMESTAMP(3),
    `total_progress`   DECIMAL(10, 2),
    `purchase_time`    INT,
    `create_time`      TIMESTAMP(3),
    `update_time`      TIMESTAMP(3),
    `delete_flag`      BOOLEAN,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector'     = 'mysql',
    'hostname'      = 'rm-2ze047w59ovk9299x.rwlb.rds.aliyuncs.com',
    'port'          = '3306',
    'username'      = 'itcast',
    'password'      = 'Itcast123',
    'database-name' = 'bxg',
    'table-name'    = 'oe_stu_course'
);
-- Source mapping for MySQL table bxg.oe_order (string primary key).
-- NOTE(review): connection credentials are stored in plain text.
CREATE TABLE IF NOT EXISTS mysql.mysql_oe_order (
    `id`                    STRING,
    `channel`               STRING,
    `student_id`            STRING,
    `order_no`              STRING,
    `total_amount`          DECIMAL(10, 2),
    `discount_amount`       DECIMAL(10, 2),
    `charge_against_amount` DECIMAL(10, 2),
    `payable_amount`        DECIMAL(10, 2),
    `status`                TINYINT,
    `pay_status`            TINYINT,
    `pay_time`              TIMESTAMP(3),
    `paid_amount`           DECIMAL(10, 2),
    `effective_date`        TIMESTAMP(3),
    `terminal`              TINYINT,
    `refund_status`         TINYINT,
    `refund_amount`         DECIMAL(10, 2),
    `refund_time`           TIMESTAMP(3),
    `create_time`           TIMESTAMP(3),
    `update_time`           TIMESTAMP(3),
    `delete_flag`           BOOLEAN,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector'     = 'mysql',
    'hostname'      = 'rm-2ze047w59ovk9299x.rwlb.rds.aliyuncs.com',
    'port'          = '3306',
    'username'      = 'itcast',
    'password'      = 'Itcast123',
    'database-name' = 'bxg',
    'table-name'    = 'oe_order'
);
-- Source mapping for MySQL table bxg.oe_course; feeds paimon.dim.dim_oe_course.
-- NOTE(review): connection credentials are stored in plain text.
CREATE TABLE IF NOT EXISTS mysql.mysql_oe_course (
    `id`                        INT,
    `grade_name`                STRING,
    `bigimg_path`               STRING,
    `video_url`                 STRING,
    `img_alt`                   STRING,
    `description`               STRING,
    `detailimg_path`            STRING,
    `smallimg_path`             STRING,
    `sort`                      INT,
    `status`                    STRING,
    `learnd_count`              INT,
    `learnd_count_flag`         INT,
    `original_cost`             DECIMAL(10, 2),
    `current_price`             DECIMAL(10, 2),
    `course_length`             DECIMAL(10, 2),
    `menu_id`                   INT,
    `is_free`                   BOOLEAN,
    `course_detail`             STRING,
    `course_detail_mobile`      STRING,
    `course_detail1`            STRING,
    `course_detail1_mobile`     STRING,
    `course_plan_detail`        STRING,
    `course_plan_detail_mobile` STRING,
    `course_detail2`            STRING,
    `course_detail2_mobile`     STRING,
    `course_outline`            STRING,
    `common_problem`            STRING,
    `common_problem_mobile`     STRING,
    `lecturer_id`               INT,
    `is_recommend`              INT,
    `recommend_sort`            INT,
    `qqno`                      STRING,
    `description_show`          INT,
    `rec_img_path`              STRING,
    `pv`                        INT,
    `course_type`               INT,
    `default_student_count`     INT,
    `study_status`              INT,
    `online_course`             INT,
    `course_level`              INT,
    `content_type`              INT,
    `recommend_type`            INT,
    `employment_rate`           STRING,
    `employment_salary`         STRING,
    `score`                     STRING,
    `cover_url`                 STRING,
    `offline_course_url`        STRING,
    `outline_url`               STRING,
    `project_page_url`          STRING,
    `preschool_test_flag`       BOOLEAN,
    `service_period`            INT,
    `included_validity_period`  TINYINT,
    `validity_period`           INT,
    `qualified_jobs`            STRING,
    `work_year_min`             INT,
    `work_year_max`             INT,
    `promote_flag`              BOOLEAN,
    `create_person`             STRING,
    `update_person`             STRING,
    `create_time`               TIMESTAMP(3),
    `update_time`               TIMESTAMP(3),
    `is_delete`                 BOOLEAN,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector'     = 'mysql',
    'hostname'      = 'rm-2ze047w59ovk9299x.rwlb.rds.aliyuncs.com',
    'port'          = '3306',
    'username'      = 'itcast',
    'password'      = 'Itcast123',
    'database-name' = 'bxg',
    'table-name'    = 'oe_course'
);
流数据准备
num.txt
1
makedata.log(空文件)
start.sh
#!/bin/bash
# Launch the stream-data generator in the background and follow its log.
#
# Usage: ./start.sh <step_seconds> <duration_minutes> <table>...
#   step_seconds      pause between two inserts (must be > 0)
#   duration_minutes  total run time in minutes
#   table...          oe_stu_course_order | oe_stu_course | oe_order | all
# All arguments are forwarded unchanged to create-log.sh.

# POSIX-only check so this works no matter which shell runs start.sh.
is_uint() {
    case $1 in
        ''|*[!0-9]*) return 1 ;;
        *) return 0 ;;
    esac
}

if [ $# -lt 3 ] || ! is_uint "$1" || ! is_uint "$2" || [ "$1" -eq 0 ]; then
    echo "Usage: $0 <step_seconds> <duration_minutes> <table>..." >&2
    exit 1
fi

echo "开始时间:`date '+%Y-%m-%d %H:%M:%S'`"
echo "大概时间:`expr $2 \* 60`秒"
# create-log.sh uses bash-only syntax (for ((...))), so run it with bash, not sh.
# "$@" keeps every argument intact instead of re-splitting it.
nohup bash ./create-log.sh "$@" > makedata.log 2>&1 &
tail -f makedata.log
create-log.sh
#!/bin/bash
# Replay staged rows into MySQL at a fixed pace to simulate a live stream.
#
# Usage: create-log.sh <step_seconds> <duration_minutes> <table>...
# Every <step_seconds> seconds, call insert-data.sh with the current row number
# followed by all of this script's arguments, then advance the row number.
# The current row number is persisted in num.txt next to this script.

is_uint() {
    case $1 in
        ''|*[!0-9]*) return 1 ;;
        *) return 0 ;;
    esac
}

# A step of 0 would never advance the loop below, so reject it.
if ! is_uint "$1" || ! is_uint "$2" || [ "$1" -eq 0 ]; then
    echo "Usage: $0 <step_seconds> <duration_minutes> <table>..." >&2
    exit 1
fi

# Use a single location for the counter file and the helper script.
# The original wrote /root/sql/num.txt but read ./num.txt, and those only
# matched when the script was started from /root/sql.
script_dir=$(cd "$(dirname "$0")" && pwd)
num_file="$script_dir/num.txt"

echo "执行流数据写入脚本,10s后开启流数据写入!"
sleep 10

step=$1
total=$(( $2 * 60 ))

echo "1" > "$num_file"
num=$(cat "$num_file")
for (( i = 0; i < total; i += step )); do
    bash "$script_dir/insert-data.sh" "$num" "$@"
    num=$(( num + 1 ))
    echo "$num" > "$num_file"
    sleep "$step"
done
exit 0
insert-data.sh
#!/bin/bash
# Copy staged row number <rn> from bxg_1.<table> into bxg.<table>, shifting the
# row's timestamps so that update_time becomes now().
#
# Usage: insert-data.sh <rn> <step_seconds> <duration_minutes> <table>...
#   $1       row number (matched against the `rn` column of the bxg_1 tables)
#   $2, $3   forwarded by create-log.sh; not used here
#   $4...    oe_stu_course_order | oe_stu_course | oe_order | all
# Connection settings can be overridden with MYSQL_HOST / MYSQL_PORT /
# MYSQL_USER / MYSQL_PASSWORD; the defaults are the original hard-coded values.
num=$1
oe_stu_course_order="
replace into bxg.oe_stu_course_order
select id,
student_course_id,
order_id,
order_detail_id,
timestampadd(second, timestampdiff(second, update_time, now()), create_time) as create_time,
now() as update_time,
delete_flag
from bxg_1.oe_stu_course_order
where rn = ${num}
;
"
oe_stu_course="
replace into bxg.oe_stu_course
select id,
student_id,
course_id,
status,
contract_status,
learn_status,
service_days,
timestampadd(second, timestampdiff(second, update_time, now()), service_expires) as service_expires,
validity_days,
timestampadd(second, timestampdiff(second, update_time, now()), validity_expires) as validity_expires,
terminate_cause,
timestampadd(second, timestampdiff(second, update_time, now()), effective_date) as effective_date,
timestampadd(second, timestampdiff(second, update_time, now()), finished_time) as finished_time,
total_progress,
purchase_time,
timestampadd(second, timestampdiff(second, update_time, now()), create_time) as create_time,
now() as update_time,
delete_flag
from bxg_1.oe_stu_course
where rn = ${num}
;
"
oe_order="
replace into bxg.oe_order
select id,
channel,
student_id,
order_no,
total_amount,
discount_amount,
charge_against_amount,
payable_amount,
status,
pay_status,
timestampadd(second, timestampdiff(second, update_time, now()), pay_time) as pay_time,
paid_amount,
timestampadd(second, timestampdiff(second, update_time, now()), effective_date) as effective_date,
terminal,
refund_status,
refund_amount,
timestampadd(second, timestampdiff(second, update_time, now()), refund_time) as refund_time,
timestampadd(second, timestampdiff(second, update_time, now()), create_time) as create_time,
now() as update_time,
delete_flag
from bxg_1.oe_order
where rn = ${num}
;
"

mysql_host=${MYSQL_HOST:-rm-2ze047w59ovk9299x.rwlb.rds.aliyuncs.com}
mysql_port=${MYSQL_PORT:-3306}
mysql_user=${MYSQL_USER:-itcast}
mysql_password=${MYSQL_PASSWORD:-Itcast123}

# No table names given: nothing to do (same as before).
if [ $# -lt 4 ]; then
    exit 0
fi
# Drop rn, step and minutes; what remains are the table names.
shift 3

for argument in "$@"
do
    case $argument in
        "oe_stu_course_order")
            sql=$oe_stu_course_order
            ;;
        "oe_stu_course")
            sql=$oe_stu_course
            ;;
        "oe_order")
            sql=$oe_order
            ;;
        "all")
            sql=$oe_stu_course_order$oe_stu_course$oe_order
            ;;
        *)
            # Previously an unknown name silently re-ran the previous table's SQL.
            echo "未知表名: $argument, 已跳过" >&2
            continue
            ;;
    esac
    echo "$sql"
    mysql --host="$mysql_host" --port="$mysql_port" --user="$mysql_user" --password="$mysql_password" --database=bxg -e"${sql}"
done
维表
创建paimon_dim表
- 在dev作业中建库
-- Paimon database for dimension tables (run in the dev job).
CREATE DATABASE IF NOT EXISTS paimon.dim;

-- Course dimension: same columns, in the same order, as mysql.mysql_oe_course.
-- Only included_validity_period differs (INT here, TINYINT in the MySQL mapping).
-- No WITH clause: no table options are set here.
CREATE TABLE IF NOT EXISTS paimon.dim.dim_oe_course (
    `id`                        INT,
    `grade_name`                STRING,
    `bigimg_path`               STRING,
    `video_url`                 STRING,
    `img_alt`                   STRING,
    `description`               STRING,
    `detailimg_path`            STRING,
    `smallimg_path`             STRING,
    `sort`                      INT,
    `status`                    STRING,
    `learnd_count`              INT,
    `learnd_count_flag`         INT,
    `original_cost`             DECIMAL(10, 2),
    `current_price`             DECIMAL(10, 2),
    `course_length`             DECIMAL(10, 2),
    `menu_id`                   INT,
    `is_free`                   BOOLEAN,
    `course_detail`             STRING,
    `course_detail_mobile`      STRING,
    `course_detail1`            STRING,
    `course_detail1_mobile`     STRING,
    `course_plan_detail`        STRING,
    `course_plan_detail_mobile` STRING,
    `course_detail2`            STRING,
    `course_detail2_mobile`     STRING,
    `course_outline`            STRING,
    `common_problem`            STRING,
    `common_problem_mobile`     STRING,
    `lecturer_id`               INT,
    `is_recommend`              INT,
    `recommend_sort`            INT,
    `qqno`                      STRING,
    `description_show`          INT,
    `rec_img_path`              STRING,
    `pv`                        INT,
    `course_type`               INT,
    `default_student_count`     INT,
    `study_status`              INT,
    `online_course`             INT,
    `course_level`              INT,
    `content_type`              INT,
    `recommend_type`            INT,
    `employment_rate`           STRING,
    `employment_salary`         STRING,
    `score`                     STRING,
    `cover_url`                 STRING,
    `offline_course_url`        STRING,
    `outline_url`               STRING,
    `project_page_url`          STRING,
    `preschool_test_flag`       BOOLEAN,
    `service_period`            INT,
    `included_validity_period`  INT,
    `validity_period`           INT,
    `qualified_jobs`            STRING,
    `work_year_min`             INT,
    `work_year_max`             INT,
    `promote_flag`              BOOLEAN,
    `create_person`             STRING,
    `update_person`             STRING,
    `create_time`               TIMESTAMP(3),
    `update_time`               TIMESTAMP(3),
    `is_delete`                 BOOLEAN,
    PRIMARY KEY (`id`) NOT ENFORCED
);
mysql_to_paimon_dim任务
- 在mysql_to_paimon_dim作业中配置作业,数据插入
-- Job mysql_to_paimon_dim: sync the MySQL course mapping into the Paimon dimension table.
-- SELECT * maps by position. Both tables declare the same columns in the same order.
INSERT INTO paimon.dim.dim_oe_course
SELECT *
FROM mysql.mysql_oe_course;
事实表
ODS层
Kafka启动
# Start the Kafka broker in the background (-daemon) using config/server.properties.
cd /export/server/kafka/bin/
./kafka-server-start.sh -daemon ../config/server.properties
ECS创建ods层kafka topic
# Create one ODS topic per source table on the ECS Kafka broker.
for topic in ods_oe_order ods_oe_stu_course ods_oe_stu_course_order; do
    kafka-topics.sh --create --topic "$topic" --bootstrap-server 172.16.10.106:9092
done
kafka_ods表
在dev作业中建库,建表
-- Flink catalog database for the Kafka-backed ODS tables (run in the dev job).
CREATE DATABASE IF NOT EXISTS dw;

-- ODS table for oe_stu_course_order, stored in topic ods_oe_stu_course_order.
-- upsert-kafka: the primary key (`id`) is the JSON message key; the row is the JSON value.
CREATE TABLE IF NOT EXISTS dw.kafka_ods_oe_stu_course_order (
    `id`                INT,
    `student_course_id` INT,
    `order_id`          STRING,
    `order_detail_id`   STRING,
    `create_time`       TIMESTAMP(3),
    `update_time`       TIMESTAMP(3),
    `delete_flag`       BOOLEAN,
    -- Computed processing-time column; not written to Kafka.
    `proctime` AS PROCTIME(),
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector'                    = 'upsert-kafka',
    'topic'                        = 'ods_oe_stu_course_order',
    'properties.bootstrap.servers' = '172.16.10.106:9092',
    'key.format'                   = 'json',
    'value.format'                 = 'json'
);
-- ODS table for oe_stu_course, stored in topic ods_oe_stu_course.
-- The status-like columns are INT here, wider than the TINYINT/SMALLINT in the MySQL mapping.
CREATE TABLE IF NOT EXISTS dw.kafka_ods_oe_stu_course (
    `id`               INT,
    `student_id`       STRING,
    `course_id`        INT,
    `status`           INT,
    `contract_status`  INT,
    `learn_status`     INT,
    `service_days`     INT,
    `service_expires`  TIMESTAMP(3),
    `validity_days`    INT,
    `validity_expires` TIMESTAMP(3),
    `terminate_cause`  INT,
    `effective_date`   TIMESTAMP(3),
    `finished_time`    TIMESTAMP(3),
    `total_progress`   DECIMAL(10, 2),
    `purchase_time`    INT,
    `create_time`      TIMESTAMP(3),
    `update_time`      TIMESTAMP(3),
    `delete_flag`      BOOLEAN,
    -- Computed processing-time column; not written to Kafka.
    `proctime` AS PROCTIME(),
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector'                    = 'upsert-kafka',
    'topic'                        = 'ods_oe_stu_course',
    'properties.bootstrap.servers' = '172.16.10.106:9092',
    'key.format'                   = 'json',
    'value.format'                 = 'json'
);
-- ODS table for oe_order, stored in topic ods_oe_order.
-- The status-like columns are INT here, wider than the TINYINT in the MySQL mapping.
CREATE TABLE IF NOT EXISTS dw.kafka_ods_oe_order (
    `id`                    STRING,
    `channel`               STRING,
    `student_id`            STRING,
    `order_no`              STRING,
    `total_amount`          DECIMAL(10, 2),
    `discount_amount`       DECIMAL(10, 2),
    `charge_against_amount` DECIMAL(10, 2),
    `payable_amount`        DECIMAL(10, 2),
    `status`                INT,
    `pay_status`            INT,
    `pay_time`              TIMESTAMP(3),
    `paid_amount`           DECIMAL(10, 2),
    `effective_date`        TIMESTAMP(3),
    `terminal`              INT,
    `refund_status`         INT,
    `refund_amount`         DECIMAL(10, 2),
    `refund_time`           TIMESTAMP(3),
    `create_time`           TIMESTAMP(3),
    `update_time`           TIMESTAMP(3),
    `delete_flag`           BOOLEAN,
    -- Computed processing-time column; not written to Kafka.
    `proctime` AS PROCTIME(),
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector'                    = 'upsert-kafka',
    'topic'                        = 'ods_oe_order',
    'properties.bootstrap.servers' = '172.16.10.106:9092',
    'key.format'                   = 'json',
    'value.format'                 = 'json'
);
mysql_to_kafka_ods数据插入
- 创建mysql_to_kafka_ods作业草稿
-- Job mysql_to_kafka_ods: copy each MySQL mapping table into its upsert-kafka ODS table.
-- The three INSERTs are submitted together as one job through the statement set.
-- SELECT * lines up by position. Each Kafka table declares the same physical columns,
-- in the same order, as its MySQL mapping; the computed proctime column is not written.
-- Optional MySQL CDC startup hint: to read only new changes, put it right after
-- the source table name, e.g.
--   FROM mysql.mysql_oe_order /*+ OPTIONS('scan.startup.mode' ='latest-offset') */
BEGIN STATEMENT SET;

INSERT INTO dw.kafka_ods_oe_stu_course_order
SELECT * FROM mysql.mysql_oe_stu_course_order;

INSERT INTO dw.kafka_ods_oe_stu_course
SELECT * FROM mysql.mysql_oe_stu_course;

INSERT INTO dw.kafka_ods_oe_order
SELECT * FROM mysql.mysql_oe_order;

END;
查看Topic的数据
# Print every message in topic ods_oe_order from the earliest offset (Ctrl+C to stop).
kafka-console-consumer.sh \
    --topic ods_oe_order \
    --from-beginning \
    --bootstrap-server 172.16.10.106:9092
删除Topic
# Delete the three ODS topics.
for topic in ods_oe_order ods_oe_stu_course ods_oe_stu_course_order; do
    kafka-topics.sh --delete --topic "$topic" --bootstrap-server 172.16.10.106:9092
done
验证kafka_ods层的数据量:
-- Row count of the Kafka ODS order table; it should equal the row count of bxg.oe_order in RDS MySQL.
SELECT COUNT(*)
FROM dw.kafka_ods_oe_order;
和RDS的MySQL的bxg.oe_order表的条数一致即可。
关于Flink资源不足的处理方案:
解决方案:将Task Managers 数量调整为2或者以上,也可以将TaskManager的Cpu cores设置更大一些
Task Managers 数量原来是1,由于数据量大带来的计算量大一些,需要调大一些,这里修改为2或以上
关于ECS内存不足的处理方案:
由于免费领取的ECS服务器只有1核2G内存,运行的组件较多时容易内存不足,建议先重启ECS云服务器再开启服务
重启ECS:
1、关闭Flink作业运维中的作业
2、重启ECS:可以在控制台重启,也可以在服务器上执行 init 6
重启后的相关组件重启:
1、启动ZooKeeper
# Start ZooKeeper first; Kafka's server.properties points zookeeper.connect at it.
cd /export/server/zookeeper/bin/
./zkServer.sh start
2、启动Kafka服务
# Start the Kafka broker as a background daemon.
cd /export/server/kafka/bin/
./kafka-server-start.sh -daemon ../config/server.properties
3、查看进程jps
4、在Flink控制台的作业运维中重新启动之前停止的作业
5、重新查询对应Kafka_ods层的数据
paimon_ods表
-- Paimon database for the ODS layer.
CREATE DATABASE IF NOT EXISTS paimon.ods;

-- Paimon ODS table for oe_stu_course_order, partitioned by day string `dt`.
-- The partition column must be part of the primary key, hence (id, dt).
CREATE TABLE IF NOT EXISTS paimon.ods.paimon_ods_oe_stu_course_order (
    `id`                INT,
    `student_course_id` INT,
    `order_id`          STRING,
    `order_detail_id`   STRING,
    `create_time`       TIMESTAMP(3),
    `update_time`       TIMESTAMP(3),
    `delete_flag`       BOOLEAN,
    `proctime`          TIMESTAMP,
    `dt`                STRING COMMENT '日期',
    PRIMARY KEY (`id`, `dt`) NOT ENFORCED
) PARTITIONED BY (`dt`);
-- Paimon ODS table for oe_stu_course, partitioned by day string `dt`.
CREATE TABLE IF NOT EXISTS paimon.ods.paimon_ods_oe_stu_course (
    `id`               INT,
    `student_id`       STRING,
    `course_id`        INT,
    `status`           INT,
    `contract_status`  INT,
    `learn_status`     INT,
    `service_days`     INT,
    `service_expires`  TIMESTAMP(3),
    `validity_days`    INT,
    `validity_expires` TIMESTAMP(3),
    `terminate_cause`  INT,
    `effective_date`   TIMESTAMP(3),
    `finished_time`    TIMESTAMP(3),
    `total_progress`   DECIMAL(10, 2),
    `purchase_time`    INT,
    `create_time`      TIMESTAMP(3),
    `update_time`      TIMESTAMP(3),
    `delete_flag`      BOOLEAN,
    `proctime`         TIMESTAMP,
    `dt`               STRING COMMENT '日期',
    PRIMARY KEY (`id`, `dt`) NOT ENFORCED
) PARTITIONED BY (`dt`);
-- Paimon ODS table for oe_order, partitioned by day string `dt`.
CREATE TABLE IF NOT EXISTS paimon.ods.paimon_ods_oe_order (
    `id`                    STRING,
    `channel`               STRING,
    `student_id`            STRING,
    `order_no`              STRING,
    `total_amount`          DECIMAL(10, 2),
    `discount_amount`       DECIMAL(10, 2),
    `charge_against_amount` DECIMAL(10, 2),
    `payable_amount`        DECIMAL(10, 2),
    `status`                INT,
    `pay_status`            INT,
    `pay_time`              TIMESTAMP(3),
    `paid_amount`           DECIMAL(10, 2),
    `effective_date`        TIMESTAMP(3),
    `terminal`              INT,
    `refund_status`         INT,
    `refund_amount`         DECIMAL(10, 2),
    `refund_time`           TIMESTAMP(3),
    `create_time`           TIMESTAMP(3),
    `update_time`           TIMESTAMP(3),
    `delete_flag`           BOOLEAN,
    `proctime`              TIMESTAMP,
    `dt`                    STRING COMMENT '日期',
    PRIMARY KEY (`id`, `dt`) NOT ENFORCED
) PARTITIONED BY (`dt`);
kafka_ods_to_paimon_ods
-- Job kafka_ods_to_paimon_ods: stream each upsert-kafka ODS table into its Paimon
-- ODS table. dt is the 'yyyy-MM-dd' day of create_time and is the partition key.
-- Columns are listed explicitly rather than using SELECT *. On these Kafka tables,
-- SELECT * also expands the computed `proctime` column, so the old INSERT only
-- worked while both sides happened to keep the same column order.
-- NOTE(review): dt is part of the Paimon primary key, so a NULL create_time would
-- yield a NULL dt. Confirm that create_time is never NULL upstream.
BEGIN STATEMENT SET;

INSERT INTO paimon.ods.paimon_ods_oe_stu_course_order (
    `id`, `student_course_id`, `order_id`, `order_detail_id`,
    `create_time`, `update_time`, `delete_flag`, `proctime`, `dt`
)
SELECT
    `id`,
    `student_course_id`,
    `order_id`,
    `order_detail_id`,
    `create_time`,
    `update_time`,
    `delete_flag`,
    `proctime`,
    DATE_FORMAT(`create_time`, 'yyyy-MM-dd') AS `dt`
FROM dw.kafka_ods_oe_stu_course_order;

INSERT INTO paimon.ods.paimon_ods_oe_stu_course (
    `id`, `student_id`, `course_id`, `status`, `contract_status`, `learn_status`,
    `service_days`, `service_expires`, `validity_days`, `validity_expires`,
    `terminate_cause`, `effective_date`, `finished_time`, `total_progress`,
    `purchase_time`, `create_time`, `update_time`, `delete_flag`, `proctime`, `dt`
)
SELECT
    `id`,
    `student_id`,
    `course_id`,
    `status`,
    `contract_status`,
    `learn_status`,
    `service_days`,
    `service_expires`,
    `validity_days`,
    `validity_expires`,
    `terminate_cause`,
    `effective_date`,
    `finished_time`,
    `total_progress`,
    `purchase_time`,
    `create_time`,
    `update_time`,
    `delete_flag`,
    `proctime`,
    DATE_FORMAT(`create_time`, 'yyyy-MM-dd') AS `dt`
FROM dw.kafka_ods_oe_stu_course;

INSERT INTO paimon.ods.paimon_ods_oe_order (
    `id`, `channel`, `student_id`, `order_no`, `total_amount`, `discount_amount`,
    `charge_against_amount`, `payable_amount`, `status`, `pay_status`, `pay_time`,
    `paid_amount`, `effective_date`, `terminal`, `refund_status`, `refund_amount`,
    `refund_time`, `create_time`, `update_time`, `delete_flag`, `proctime`, `dt`
)
SELECT
    `id`,
    `channel`,
    `student_id`,
    `order_no`,
    `total_amount`,
    `discount_amount`,
    `charge_against_amount`,
    `payable_amount`,
    `status`,
    `pay_status`,
    `pay_time`,
    `paid_amount`,
    `effective_date`,
    `terminal`,
    `refund_status`,
    `refund_amount`,
    `refund_time`,
    `create_time`,
    `update_time`,
    `delete_flag`,
    `proctime`,
    DATE_FORMAT(`create_time`, 'yyyy-MM-dd') AS `dt`
FROM dw.kafka_ods_oe_order;

END;
附录
组件安装
安装jdk:
# List the Java packages available from yum.
yum list java
# Install OpenJDK 8 (devel package).
yum install java-1.8.0-openjdk-devel.x86_64
# Inspect the installed JDK directories; JAVA_HOME below points at /usr/lib/jvm/java.
cd /usr/lib/jvm/
环境变量配置:
# Append these lines to /etc/profile, then run `source /etc/profile` (or log in again).
# (The original lines here were garbled by a math renderer and were not valid shell.)
vim /etc/profile
export JAVA_HOME=/usr/lib/jvm/java
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
export PATH=$PATH:$JRE_HOME/bin:$JAVA_HOME/bin
安装zk:
创建目录:
# Working directories: installers, installed software, runtime data.
mkdir -p /export/software
mkdir -p /export/server
mkdir -p /export/data
# Download ZooKeeper 3.5.10, unpack it, and link it as /export/server/zookeeper.
cd /export/software
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.10/apache-zookeeper-3.5.10-bin.tar.gz
tar -zxf apache-zookeeper-3.5.10-bin.tar.gz -C /export/server/
cd /export/server
ln -s apache-zookeeper-3.5.10-bin/ zookeeper
# Data directory, referenced by dataDir in zoo.cfg.
cd /export/data
mkdir zkdata
# Start from the sample config and edit it.
cd /export/server/zookeeper/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
#Change:
dataDir=/export/data/zkdata
#Append at the end of the file: 2888 = quorum (follower-to-leader) port, 3888 = leader-election port
#NOTE(review): this IP differs from the Kafka IPs elsewhere in these notes (172.18.12.37 / 172.16.10.106); use this server's own private IP
server.1=172.26.254.71:2888:3888
#Create the myid file for this server; its content must match the N in server.N above
mkdir -p /export/data/zkdata
echo 1 >/export/data/zkdata/myid
#Start ZooKeeper, then check that the process is running (e.g. with jps)
cd /export/server/zookeeper/bin/
./zkServer.sh start
#Kafka安装
下载安装包
# Download Kafka 3.5.0 (Scala 2.12 build) into the installers directory.
# archive.apache.org keeps every release. downloads.apache.org only carries currently
# supported versions, so the old downloads.apache.org URL for 3.5.0 no longer resolves.
cd /export/software
wget https://archive.apache.org/dist/kafka/3.5.0/kafka_2.12-3.5.0.tgz
解压安装包
# Unpack Kafka into /export/server/.
cd /export/software/
tar -zxvf kafka_2.12-3.5.0.tgz -C /export/server/
创建软链接
# Version-independent path: /export/server/kafka -> /export/server/kafka_2.12-3.5.0.
cd /export/server
ln -s /export/server/kafka_2.12-3.5.0 kafka
创建/export/data/kafka-logs目录
# Kafka data directory (log.dirs in server.properties).
cd /export/data
mkdir kafka-logs
修改配置
# Edit the broker configuration.
cd /export/server/kafka/config
vim server.properties
#Broker id
broker.id=0
#Address the broker listens on (use the server's private IP)
#NOTE(review): Flink jobs in these notes use 172.16.10.106:9092; these IPs must match your own server
listeners=PLAINTEXT://172.18.12.37:9092
#Directory where Kafka stores its data
log.dirs=/export/data/kafka-logs
#ZooKeeper node (host:port)
zookeeper.connect=172.18.12.37:2181
注意:配置文件中要填入服务器内网ip,而不能填入域名,否则会导致后续flink连接不上kafka
配置环境变量
# Append these lines to /etc/profile so the kafka-*.sh scripts are on PATH.
# No leading ':' in PATH: an empty PATH entry makes the shell search the current directory.
# (The original lines here were garbled by a math renderer.)
vim /etc/profile
export KAFKA_HOME=/export/server/kafka
export PATH=$PATH:${KAFKA_HOME}/bin
刷新环境变量
# Reload /etc/profile in the current shell so the new variables take effect.
source /etc/profile
启动服务
# Start the broker as a background daemon. The config path is relative to bin/,
# so it must be ../config (two ASCII dots), not the "…" character.
cd /export/server/kafka/bin/
./kafka-server-start.sh -daemon ../config/server.properties
查看服务进程:jps
常用命令
#List all topics (replace the IP with your broker's listener address)
kafka-topics.sh --list --bootstrap-server 172.26.254.71:9092
#Create a topic
kafka-topics.sh --create --topic test --bootstrap-server 172.26.254.71:9092
#MySQL安装
# Prerequisite packages installed before the MySQL RPMs.
yum install -y perl.x86_64
yum install -y libaio.x86_64
yum install -y net-tools.x86_64
下载解压
# Download the MySQL 5.7.29 RPM bundle and unpack it into /export/server/mysql5.7.
mkdir /export/server/mysql5.7
cd /export/software/
wget https://cdn.mysql.com/archives/mysql-5.7/mysql-5.7.29-1.el7.x86_64.rpm-bundle.tar
tar -xvf /export/software/mysql-5.7.29-1.el7.x86_64.rpm-bundle.tar -C /export/server/mysql5.7
cd /export/server/mysql5.7
rpm安装mysql
# Install the common, libs, client and server RPMs in a single rpm transaction.
rpm -ivh mysql-community-common-5.7.29-1.el7.x86_64.rpm mysql-community-libs-5.7.29-1.el7.x86_64.rpm mysql-community-client-5.7.29-1.el7.x86_64.rpm mysql-community-server-5.7.29-1.el7.x86_64.rpm
初始化服务和修改文件权限
# Initialize the data directory; the temporary root password is written to /var/log/mysqld.log.
mysqld --initialize
# Hand the data directory to the mysql user (presumably because --initialize ran as root).
chown mysql:mysql /var/lib/mysql -R
启动服务并设置开机自启
# Start MySQL now and enable it at boot.
systemctl start mysqld.service
systemctl enable mysqld.service
登录Mysql修改密码及配置远程登录
Pass=$(grep ‘A temporary password’ /var/log/mysqld.log |awk '{print KaTeX parse error: Expected 'EOF', got '}' at position 3: NF}̲') mysql -uroot…Pass"
mysgl>alter user user() identified by “123456”;
use mysql;
GRANT ALL PRIVILEGES ON . TO ‘root’@‘%’ IDENTIFIED BY ‘123456’ WITH GRANT OPTION;
FLUSH PRIVILEGES;
exit;