接上文:一文说清flink从编码到部署上线
环境说明:MySQL:5.7;flink:1.14.0;hadoop:3.0.0;操作系统:CentOS 7.6;JDK:1.8.0_401。
常见文章中的kafka数据结构相对简单;本文基于实际项目数据,说明怎样读取并解析结构复杂的kafka数据,并将解析后的数据输出到控制台。
1.模拟数据
1.1 模拟数据
{
"reportFormat": "2",
"reportVersion": 1,
"reports": [
{
"filename": "1733277155032RvReport",
"c": {
"objStationInfo": {
"sStationName": "LLP入口",
"ucStationDir": 1,
"sStationID": 500001
},
"objVehicle": {
"sUUID": "fdabd178-a169-11eb-9483-b95959072a9d",
"w64Timestamp": "1733881971628",
"objRfidInfo": {
"sReaderID": "10",
"objTagData": {
"sTID": "1234567891",
"sEPC": "1234567890"
}
},
"ucReportType": "8",
"ucVehicleType": "1"
}
}
}
]
}
1.2 添加到kafka
使用kafka工具kafkatool2,具体操作如下:
连接到kafka:
连接成功:
添加数据:
添加成功:
2.代码实现
2.1 EnvUtil实现
EnvUtil用于创建flink的运行环境。
package com.zl.utils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;
/**
 * Factory helpers that build the Flink stream and table environments used by the jobs.
 */
public class EnvUtil {

    /**
     * Creates a {@link StreamExecutionEnvironment} and applies the restart strategy,
     * checkpointing, state backend and operator-chaining settings listed below.
     *
     * @param parallelism requested parallelism; any value that is not positive falls back to 1
     * @return the configured stream environment
     */
    public static StreamExecutionEnvironment setFlinkEnv(int parallelism) {
        // HADOOP_USER_NAME corresponds to the HDFS directory /user/<name>; this article uses root.
        System.setProperty("HADOOP_USER_NAME", "root");

        Configuration restConf = new Configuration();
        restConf.setInteger("rest.port", 1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(restConf);

        // Use the requested parallelism when it is positive, otherwise default to 1.
        env.setParallelism(parallelism > 0 ? parallelism : 1);

        // Restart strategy: fixed delay, up to 50 attempts with 6 minutes between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, Time.minutes(6)));

        // Checkpoint every 10 minutes (600000 ms) in exactly-once mode.
        env.enableCheckpointing(TimeUnit.MINUTES.toMillis(10), CheckpointingMode.EXACTLY_ONCE);

        // RocksDB state backend; the constructor flag turns on incremental checkpoints.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        CheckpointConfig checkpoints = env.getCheckpointConfig();
        // Only one checkpoint may be in flight at a time.
        checkpoints.setMaxConcurrentCheckpoints(1);
        // Minimum pause between two checkpoints: 1 minute (60000 ms).
        checkpoints.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(1));
        // Keep the externalized checkpoint when the job is cancelled.
        checkpoints.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Number of checkpoint failures that are tolerated.
        checkpoints.setTolerableCheckpointFailureNumber(5);
        // A checkpoint times out after 10 minutes.
        checkpoints.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));

        // Operator chaining is disabled to make back-pressure easier to locate.
        env.disableOperatorChaining();
        return env;
    }

    /**
     * Creates a {@link StreamTableEnvironment} on top of the given stream environment and
     * applies the time zone, mini-batch, aggregation-phase and idle-state settings below.
     *
     * @param env the stream environment the table environment is bound to
     * @return the configured table environment
     */
    public static StreamTableEnvironment getFlinkTenv(StreamExecutionEnvironment env) {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Local time zone: UTC+8.
        tableEnv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));

        Configuration tableConf = tableEnv.getConfig().getConfiguration();
        // Enable mini-batch processing.
        tableConf.setString("table.exec.mini-batch.enabled", "true");
        // Maximum latency before a buffered batch is emitted.
        tableConf.setString("table.exec.mini-batch.allow-latency", "5 s");
        // Cap the number of buffered records per batch to guard against OOM.
        tableConf.setString("table.exec.mini-batch.size", "20000");
        // Two-phase (local + global) aggregation.
        tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

        // Idle state retention (state TTL) of 25 hours.
        tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(25));
        return tableEnv;
    }
}
2.2 FlinkSourceUtil实现
FlinkSourceUtil用于连接kafka。(注:本节下方贴出的代码实际是RvTable类,与2.3节的内容重复;FlinkSourceUtil的实现请参见文末的完整代码仓库。)
package com.zl.kafka.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;
/**
 * Data model holding the fields of one parsed report record.
 *
 * <p>Accessors, constructors and {@code toString} are generated by the Lombok annotations
 * below. The field declaration order defines the parameter order of the all-args
 * constructor, so fields must not be reordered.
 */
@ToString
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RvTable {
// Unique key (original note: "generated by Flink"); parseTableColunm() copies sUUID into it.
private String uniqueId;
// Vehicle passing time; parseTableColunm() copies w64Timestamp into it.
private long reportTime;
// Partition field.
private String dt;
// Hour.
private String dh;
private String reportFormat;
private int reportVersion;
private String filename;
// Collection station name.
public String sStationName;
// Collection station direction number.
public String ucStationDir;
// Collection station number.
public String sStationID;
private String sUUID;
// Event time, in milliseconds.
private long w64Timestamp;
// Code of the RFID device (module).
private String sReaderID;
private String sTIDR;
private String sEPCR;
// 8 -> video, 2 -> RFID, 138 / 202 -> video + RFID.
private int ucReportType;
private int ucVehicleType;
/**
 * Fills the derived columns: copies {@code w64Timestamp} into {@code reportTime}
 * and {@code sUUID} into {@code uniqueId}.
 */
public void parseTableColunm() {
this.reportTime = this.w64Timestamp;
this.uniqueId = this.sUUID;
}
}
2.3 RvTable实现
RvTable是解析后的数据最终存放的model。
package com.zl.kafka.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;
/**
 * Data model holding the fields of one parsed report record.
 *
 * <p>Accessors, constructors and {@code toString} are generated by the Lombok annotations
 * below. The field declaration order defines the parameter order of the all-args
 * constructor, so fields must not be reordered.
 */
@ToString
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RvTable {
// Unique key (original note: "generated by Flink"); parseTableColunm() copies sUUID into it.
private String uniqueId;
// Vehicle passing time; parseTableColunm() copies w64Timestamp into it.
private long reportTime;
// Partition field.
private String dt;
// Hour.
private String dh;
private String reportFormat;
private int reportVersion;
private String filename;
// Collection station name.
public String sStationName;
// Collection station direction number.
public String ucStationDir;
// Collection station number.
public String sStationID;
private String sUUID;
// Event time, in milliseconds.
private long w64Timestamp;
// Code of the RFID device (module).
private String sReaderID;
private String sTIDR;
private String sEPCR;
// 8 -> video, 2 -> RFID, 138 / 202 -> video + RFID.
private int ucReportType;
private int ucVehicleType;
/**
 * Fills the derived columns: copies {@code w64Timestamp} into {@code reportTime}
 * and {@code sUUID} into {@code uniqueId}.
 */
public void parseTableColunm() {
this.reportTime = this.w64Timestamp;
this.uniqueId = this.sUUID;
}
}
2.4 核心逻辑实现
package com.zl.kafka;
import com.alibaba.fastjson.JSON;
import com.zl.kafka.domain.RvTable;
import com.zl.utils.EnvUtil;
import com.zl.utils.FlinkSourceUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Flink job that reads nested report JSON from Kafka, flattens every entry of the
 * {@code reports} array into an {@link RvTable}, and prints each one as a JSON string.
 */
public class KafkaExample {

    /**
     * Builds and runs the job: Kafka source -> flatMap(parseRVData) -> print.
     *
     * @param args unused
     * @throws Exception if the job graph cannot be built or the job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Execution environment with parallelism 1.
        StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);
        // Checkpoint storage is set per program so that programs stay isolated from each other.
        env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/KafkaExample");

        // Read from Kafka.
        // NOTE(review): argument meaning presumably (group id, topic, bootstrap servers,
        // offset reset) - confirm against FlinkSourceUtil.getKafkaSource.
        SingleOutputStreamOperator<String> rvSourceStream = env
                .addSource(FlinkSourceUtil.getKafkaSource(
                        "rvGroup",
                        "rv-test",
                        "10.86.97.21:9092",
                        "earliest")) // earliest/latest
                .setParallelism(1).uid("getRV").name("getRV");

        // Parse and flatten. No try/catch here: a failure while wiring the graph must abort
        // start-up instead of leaving the stream null and failing later with an NPE.
        SingleOutputStreamOperator<String> rvParseStream = rvSourceStream
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String value, Collector<String> out) {
                        // parseRVData ignores empty and non-JSON input itself.
                        parseRVData(value, out);
                    }
                })
                .setParallelism(1).uid("rvParse").name("rvParse");

        rvParseStream.print();
        env.execute("rvParseJob");
    }

    /**
     * Parses one Kafka message and emits one JSON-serialized {@link RvTable} per entry of
     * its {@code reports} array that contains {@code c.objVehicle}.
     *
     * <p>Empty or non-JSON input is ignored. A report that fails to parse is logged and
     * skipped, so the remaining reports of the same message are still emitted.
     *
     * @param jsonStr raw message payload; may be null or empty
     * @param out     collector receiving the flattened records as JSON strings
     */
    public static void parseRVData(String jsonStr, Collector<String> out) {
        if (StringUtils.isEmpty(jsonStr) || !isJSON(jsonStr)) {
            return;
        }
        try {
            JSONObject in = JSONObject.parseObject(jsonStr);
            if (in == null) {
                return;
            }
            // ===== report header =====
            String reportFormat = stringDefaultIfEmpty(in.getString("reportFormat"));
            int reportVersion = intDefaultIfEmpty(in.getInteger("reportVersion"));
            JSONArray reports = in.getJSONArray("reports");
            if (reports == null) {
                return;
            }
            // SimpleDateFormat is not thread-safe, so it stays local to this call,
            // but one instance is enough for all reports of the message.
            SimpleDateFormat dayHourFormat = new SimpleDateFormat("yyyy-MM-dd HH");
            for (int i = 0; i < reports.size(); i++) {
                try {
                    RvTable rvTable = toRvTable(reports.getJSONObject(i), reportFormat, reportVersion, dayHourFormat);
                    if (rvTable != null) {
                        out.collect(JSONObject.toJSONString(rvTable));
                    }
                } catch (Exception e) {
                    // Isolate the failure to this report; later reports are still processed.
                    e.printStackTrace();
                    // Original TODO: persist the data that failed to parse (e.g. to a database).
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            // Original TODO: persist the data that failed to parse (e.g. to a database).
        }
    }

    /**
     * Flattens a single element of the {@code reports} array.
     *
     * @return the populated record, or null when {@code record}, {@code c} or
     *         {@code c.objVehicle} is absent (nothing is emitted in that case)
     */
    private static RvTable toRvTable(JSONObject record, String reportFormat, int reportVersion,
                                     SimpleDateFormat dayHourFormat) {
        if (record == null) {
            return null;
        }
        JSONObject c = record.getJSONObject("c");
        if (c == null) {
            return null;
        }
        JSONObject objVehicle = c.getJSONObject("objVehicle");
        if (objVehicle == null) {
            return null;
        }

        RvTable rvTable = new RvTable();
        // Header fields were parsed but never stored on the record before.
        rvTable.setReportFormat(reportFormat);
        rvTable.setReportVersion(reportVersion);
        rvTable.setFilename(stringDefaultIfEmpty(record.getString("filename")));

        // ===== collection station =====
        JSONObject objStationInfo = c.getJSONObject("objStationInfo");
        if (objStationInfo != null) {
            rvTable.setSStationID(stringDefaultIfEmpty(objStationInfo.getString("sStationID")));
            rvTable.setSStationName(stringDefaultIfEmpty(objStationInfo.getString("sStationName")));
            rvTable.setUcStationDir(stringDefaultIfEmpty(objStationInfo.getString("ucStationDir")));
        }

        // ===== vehicle report =====
        rvTable.setSUUID(stringDefaultIfEmpty(objVehicle.getString("sUUID")));
        // getLong returns null for a missing key; default it instead of unboxing null.
        rvTable.setW64Timestamp(longDefaultIfEmpty(objVehicle.getLong("w64Timestamp")));
        rvTable.setUcReportType(intDefaultIfEmpty(objVehicle.getInteger("ucReportType")));
        rvTable.setUcVehicleType(intDefaultIfEmpty(objVehicle.getInteger("ucVehicleType")));

        // ===== vehicle report / RFID info =====
        JSONObject objRfidInfo = objVehicle.getJSONObject("objRfidInfo");
        if (objRfidInfo != null) {
            rvTable.setSReaderID(stringDefaultIfEmpty(objRfidInfo.getString("sReaderID")));
            JSONObject objTagData = objRfidInfo.getJSONObject("objTagData");
            if (objTagData != null) {
                rvTable.setSTIDR(stringDefaultIfEmpty(objTagData.getString("sTID")));
                rvTable.setSEPCR(stringDefaultIfEmpty(objTagData.getString("sEPC")));
            }
        }

        // ===== derived fields: dt = yyyy-MM-dd, dh = HH (formatter's default time zone) =====
        // NOTE(review): RvTable.parseTableColunm() is not invoked, so uniqueId/reportTime
        // keep their defaults, as in the original - confirm whether that is intended.
        String[] dayAndHour = dayHourFormat.format(new Date(rvTable.getW64Timestamp())).split(" ");
        rvTable.setDt(dayAndHour[0]);
        rvTable.setDh(dayAndHour[1]);
        return rvTable;
    }

    /**
     * Tells whether the string can be parsed by {@code JSON.parse}.
     *
     * @param str candidate text
     * @return true when parsing succeeds, false when it throws
     */
    public static boolean isJSON(String str) {
        try {
            JSON.parse(str);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * @param num possibly-null value
     * @return {@code num}, or 0 when it is null
     */
    public static int intDefaultIfEmpty(Integer num) {
        if (num == null) {
            return 0;
        }
        return num;
    }

    /**
     * @param str possibly-null or empty value
     * @return {@code str}, or the placeholder "ENULL" when it is null or empty
     */
    public static String stringDefaultIfEmpty(String str) {
        return StringUtils.defaultIfEmpty(str, "ENULL");
    }

    /**
     * @param num possibly-null value
     * @return {@code num}, or 0 when it is null
     */
    public static Long longDefaultIfEmpty(Long num) {
        if (num == null) {
            return 0L;
        }
        return num;
    }

    /**
     * @param num possibly-null value
     * @return {@code num}, or 0.0 when it is null
     */
    public static Double doubleDefaultIfEmpty(Double num) {
        if (num == null) {
            return 0.0;
        }
        return num;
    }
}
2.5 pom.xml
注意修改此处:
3.运行效果
3.1 运行日志
3.2 web UI
访问:http://IP:1000/
4.部署
相关构建、部署,参考:一文说清flink从编码到部署上线
部署脚本:
flink run-application -t yarn-application -Dparallelism.default=1 -Denv.java.opts=" -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" -Dtaskmanager.memory.process.size=1g -Dyarn.application.name="FlinkCdcKafka" -Dtaskmanager.numberOfTaskSlots=1 -c com.zl.kafka.KafkaExample /home/FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar
5. 完整代码
完整代码见:https://gitee.com/core815/flink-cdc-mysql