flink实现复杂kafka数据读取

news2024/12/22 20:33:47

接上文:一文说清flink从编码到部署上线
环境说明:MySQL:5.7;flink:1.14.0;hadoop:3.0.0;操作系统:CentOS 7.6;JDK:1.8.0_401。

常见的文章中,kafka数据结构相对简单,本文根据实际项目数据,说明怎样读取解析复杂kafka数据。并将解析的数据输出到控制台。

1.模拟数据

1.1 模拟数据

{
    "reportFormat": "2",
    "reportVersion": 1,
    "reports": [
        {
            "filename": "1733277155032RvReport",
            "c": {
                "objStationInfo": {
                    "sStationName": "LLP入口",
                    "ucStationDir": 1,
                    "sStationID": 500001
                },
                "objVehicle": {
                    "sUUID": "fdabd178-a169-11eb-9483-b95959072a9d",
                    "w64Timestamp": "1733881971628",
                    "objRfidInfo": {
                        "sReaderID": "10",
                        "objTagData": {
                            "sTID": "1234567891",
                            "sEPC": "1234567890"
                        }
                    },
                    "ucReportType": "8",
                    "ucVehicleType": "1"
                }
            }
        }
    ]
}

1.2 添加到kafka

使用kafka工具,kafkatool2,具体操作如下:
连接到kafka:
在这里插入图片描述
连接成功:
在这里插入图片描述
添加数据:
在这里插入图片描述
在这里插入图片描述
添加成功:
在这里插入图片描述

2.代码实现

2.1 EnvUtil实现

EnvUtil用于创建flink的运行环境。

package com.zl.utils;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;

/**
 * EnvUtil
 * @description:
 */
public class EnvUtil {
    /**
     * 设置flink执行环境
     * @param parallelism 并行度
     */
    public static StreamExecutionEnvironment setFlinkEnv(int parallelism) {
        // System.setProperty("HADOOP_USER_NAME", "用户名") 对应的是 hdfs文件系统目录下的路径:/user/用户名的文件夹名,本文为root
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        if (parallelism >0 ){
            //设置并行度
            env.setParallelism(parallelism);
        } else {
            env.setParallelism(1);// 默认1
        }

        // 添加重启机制
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, Time.minutes(6)));
        // 启动checkpoint,设置模式为精确一次 (这是默认值),10*60*1000=60000
        env.enableCheckpointing(600000, CheckpointingMode.EXACTLY_ONCE);
        //rocksdb状态后端,启用增量checkpoint
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        //设置checkpoint路径
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        // 同一时间只允许一个 checkpoint 进行(默认)
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        //最小间隔,10*60*1000=60000
        checkpointConfig.setMinPauseBetweenCheckpoints(60000);
        // 取消任务后,checkpoint仍然保存
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //checkpoint容忍失败的次数
        checkpointConfig.setTolerableCheckpointFailureNumber(5);
        //checkpoint超时时间 默认10分钟
        checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
        //禁用operator chain(方便排查反压)
        env.disableOperatorChaining();
        return env;
    }

    public static StreamTableEnvironment getFlinkTenv(StreamExecutionEnvironment env) {
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        //设置时区 东八
        tenv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));
        Configuration configuration = tenv.getConfig().getConfiguration();
        // 开启miniBatch
        configuration.setString("table.exec.mini-batch.enabled", "true");
        // 批量输出的间隔时间
        configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
        // 防止OOM设置每个批次最多缓存数据的条数,可以设为2万条
        configuration.setString("table.exec.mini-batch.size", "20000");
        // 开启LocalGlobal
        configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
        //设置TTL API指定
        tenv.getConfig().setIdleStateRetention(Duration.ofHours(25));

        return tenv;
    }

}

2.2 FlinkSourceUtil实现

FlinkSourceUtil用于连接kafka。

package com.zl.kafka.domain;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;

/**
 * @desc:
 */
@ToString
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RvTable {
	private String uniqueId;//flink生成的唯一键
	private long reportTime;// 过车时间
	private String dt;                           // 分区字段
	private String dh;                           // 小时

	private String reportFormat;
	private int reportVersion;
	private String filename;
	public String sStationName;    // 采集点名称
	public String ucStationDir;     // 采集点方向编号
	public String sStationID;      // 采集点编号
	private String sUUID;
	private long w64Timestamp;     //事件时间(毫秒级别)
	private String sReaderID;//射频设备(模块)代码
	private String sTIDR;
	private String sEPCR;
	private int ucReportType;//8->视频 2->射频 138,202->视频+射频
	private int ucVehicleType;

	public void parseTableColunm() {
		this.reportTime = this.w64Timestamp;
		this.uniqueId = this.sUUID;
	}
}

2.3 RvTable实现

RvTable解析数据最后存储的model。

package com.zl.kafka.domain;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;

/**
 * @desc:
 */
@ToString
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RvTable {
	private String uniqueId;//flink生成的唯一键
	private long reportTime;// 过车时间
	private String dt;                           // 分区字段
	private String dh;                           // 小时

	private String reportFormat;
	private int reportVersion;
	private String filename;
	public String sStationName;    // 采集点名称
	public String ucStationDir;     // 采集点方向编号
	public String sStationID;      // 采集点编号
	private String sUUID;
	private long w64Timestamp;     //事件时间(毫秒级别)
	private String sReaderID;//射频设备(模块)代码
	private String sTIDR;
	private String sEPCR;
	private int ucReportType;//8->视频 2->射频 138,202->视频+射频
	private int ucVehicleType;

	public void parseTableColunm() {
		this.reportTime = this.w64Timestamp;
		this.uniqueId = this.sUUID;
	}

}

2.4 核心逻辑实现

package com.zl.kafka;

import com.alibaba.fastjson.JSON;
import com.zl.kafka.domain.RvTable;
import com.zl.utils.EnvUtil;
import com.zl.utils.FlinkSourceUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;

public class KafkaExample {
    public static void main(String[] args) throws Exception {

        // 配置运行环境,并行度1
        StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);
        // 程序间隔离,每个程序单独设置
        env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/KafkaExample");

        /// 读取kafka数据
        SingleOutputStreamOperator<String> rvSourceStream = env
                .addSource(FlinkSourceUtil.getKafkaSource(
                        "rvGroup",
                        "rv-test",
                        "10.86.97.21:9092",
                        "earliest"))// earliest/latest
                .setParallelism(1).uid("getRV").name("getRV");

        // 解析转换数据格式
        SingleOutputStreamOperator<String> rvParseStream = null;
        try {
            rvParseStream = rvSourceStream.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String value, Collector<String> out) {
                    if (StringUtils.isEmpty(value)) {
                        return;
                    }
                    parseRVData(value, out);
                }
            }).setParallelism(1).uid("rvParse").name("rvParse");
        } catch (Exception e) {
            e.printStackTrace();
        }

        rvParseStream.print();

        env.execute("rvParseJob");

    }// main

    public static void parseRVData(String jsonStr, Collector<String> out) {
        try {
            if (StringUtils.isEmpty(jsonStr) || !isJSON(jsonStr)) {
                return;
            }
            JSONObject in = JSONObject.parseObject(jsonStr);
            // =====报告头信息 =====
            String reportFormat = stringDefaultIfEmpty(in.getString("reportFormat"));
            int reportVersion = intDefaultIfEmpty(in.getInteger("reportVersion"));
            JSONArray reports = in.getJSONArray("reports");

            if (reports != null) {
                for (int i = 0; i < reports.size(); i++) {
                    RvTable rvTable = new RvTable();
                    JSONObject record = reports.getJSONObject(i);
                    if (record != null) {
                        String filename = stringDefaultIfEmpty(record.getString("filename"));
                        JSONObject c = record.getJSONObject("c");
                        if (c != null) {
                            // ===== 采集点信息 =====
                            JSONObject objStationInfo = c.getJSONObject("objStationInfo");
                            if(objStationInfo != null) {
                                rvTable.setSStationID(stringDefaultIfEmpty(objStationInfo.getString("sStationID")));
                                rvTable.setSStationName(stringDefaultIfEmpty(objStationInfo.getString("sStationName")));
                                rvTable.setUcStationDir(stringDefaultIfEmpty(objStationInfo.getString("ucStationDir")));
                            }
                            JSONObject objVehicle = c.getJSONObject("objVehicle");
                            if (objVehicle != null) {
                                // ===== 车辆报告信息 =====
                                rvTable.setSUUID(stringDefaultIfEmpty(objVehicle.getString("sUUID")));
                                rvTable.setW64Timestamp(objVehicle.getLong("w64Timestamp"));
                                rvTable.setUcReportType(intDefaultIfEmpty(objVehicle.getInteger("ucReportType")));
                                rvTable.setUcVehicleType(intDefaultIfEmpty(objVehicle.getInteger("ucVehicleType")));
                                // ===== 车辆报告信息/射频车辆信息 =====
                                JSONObject objRfidInfo = objVehicle.getJSONObject("objRfidInfo");
                                if (objRfidInfo != null) {
                                    rvTable.setSReaderID(stringDefaultIfEmpty(objRfidInfo.getString("sReaderID")));
                                    JSONObject objTagData = objRfidInfo.getJSONObject("objTagData");
                                    if (objTagData != null) {
                                        rvTable.setSTIDR(stringDefaultIfEmpty(objTagData.getString("sTID")));
                                        rvTable.setSEPCR(stringDefaultIfEmpty(objTagData.getString("sEPC")));
                                    }
                                }
                                // ===== 自加特殊处理字段 =====
                                long timestamp = rvTable.getW64Timestamp();
                                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH");
                                Date date = new Date(timestamp);
                                String[] s = simpleDateFormat.format(date).split(" ");
                                rvTable.setDt(s[0]);
                                rvTable.setDh(s[1]);
                                out.collect(JSONObject.toJSONString(rvTable));
                            }// if (objVehicle != null)
                        }// if (c != null)
                    }// if (record != null)
                }// for 循环
            }
        } catch (Exception e) {
            e.printStackTrace();
            // 此处把解析后的数据存储到数据库……
        }
    }// parseRVData

    public static boolean isJSON(String str) {
        boolean result;
        try {
            JSON.parse(str);
            result = true;
        } catch (Exception e) {
            result = false;
        }
        return result;
    }

    public static int intDefaultIfEmpty(Integer num) {
        if (num == null) {
            num = 0;
            return num;
        }
        return num;
    }

    public static String stringDefaultIfEmpty(String str) {
        return StringUtils.defaultIfEmpty(str, "ENULL");
    }

    public static Long longDefaultIfEmpty(Long num) {
        if (num == null) {
            num = 0l;
            return num;
        }
        return num;
    }

    public static Double doubleDefaultIfEmpty(Double num) {
        if (num == null) {
            num = 0.0;
            return num;
        }
        return num;
    }
}

2.5 pom.xml

注意修改此处:
在这里插入图片描述

3.运行效果

3.1 运行日志

在这里插入图片描述

3.2 web UI

访问:http://IP:1000/
在这里插入图片描述
在这里插入图片描述

4.部署

相关构建、部署,参考:一文说清flink从编码到部署上线
部署脚本:

flink run-application -t yarn-application -Dparallelism.default=1 -Denv.java.opts=" -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" -Dtaskmanager.memory.process.size=1g -Dyarn.application.name="FlinkCdcKafka"  -Dtaskmanager.numberOfTaskSlots=1 -c com.zl.kafka.KafkaExample /home/FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

在这里插入图片描述

5. 完整代码

完整代码见:https://gitee.com/core815/flink-cdc-mysql

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2263881.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大模型微调---Prompt-tuning微调

目录 一、前言二、Prompt-tuning实战2.1、下载模型到本地2.2、加载模型与数据集2.3、处理数据2.4、Prompt-tuning微调2.5、训练参数配置2.6、开始训练 三、模型评估四、完整训练代码 一、前言 Prompt-tuning通过修改输入文本的提示&#xff08;Prompt&#xff09;来引导模型生…

如何使用 WebAssembly 扩展后端应用

1. WebAssembly 简介 随着互联网的发展&#xff0c;越来越多的应用借助 Javascript 转到了 Web 端&#xff0c;但人们也发现&#xff0c;随着移动互联网的兴起&#xff0c;需要把大量的应用迁移到手机端&#xff0c;随着手端的应用逻辑越来越复杂&#xff0c;Javascript 的解析…

python学习——洛谷P2010 [NOIP2016 普及组] 回文日期 三种方法

[NOIP2016 普及组] 回文日期 文章目录 [NOIP2016 普及组] 回文日期题目背景题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1 样例 #2样例输入 #2样例输出 #2 提示方法一方法二方法三 题目背景 NOIP2016 普及组 T2 题目描述 在日常生活中&#xff0c;通过年、月、日这…

前端yarn工具打包时网络连接问题排查与解决

最近线上前端打包时提示 “There appears to be trouble with your network connection”&#xff0c;以此文档记录下排查过程。 前端打包方式 docker启动临时容器打包&#xff0c;命令如下 docker run --rm -w /app -v pwd:/app alpine-node-common:v16.20-pro sh -c "…

BenchmarkSQL使用教程

1. TPC-C介绍 Transaction Processing Performance Council (TPC) 事务处理性能委员会&#xff0c;是一家非盈利IT组织&#xff0c;他们的目的是定义数据库基准并且向产业界推广可验证的数据库性能测试。而TPC-C最后一个C代表的是压测模型的版本&#xff0c;在这之前还有TPC-A、…

Linux网络基础--传输层Tcp协议(上) (详细版)

目录 Tcp协议报头&#xff1a; 4位首部长度&#xff1a; 源端口号和目的端口号 32位序号和确认序号 标记位 超时重传机制&#xff1a; 两个问题 连接管理机制 三次握手&#xff0c;四次挥手 建立连接&#xff0c;为什么要有三次握手&#xff1f; 先科普一个概念&…

全志H618 Android12修改doucmentsui鼠标单击图片、文件夹选中区域

背景: 由于当前的文件管理器在我们的产品定义当中,某些界面有改动的需求,所以需要在Android12 rom中进行定制以符合当前产品定义。 需求: 在进入File文件管理器后,鼠标左击整个图片、整个文件夹可以选中该类型,进行操作,故代码分析以及客制化如下: 主要涉及的代码:…

Unity命令行传递自定义参数 命令行打包

命令行参数增加位置 -executeMethod 某脚本.某方法 参数1 参数2 参数3 ... 例如执行EditorTest.GetCommandLineArgs方法 增加两个命令行参数 Version=125 CDNVersion=100 -executeMethod EditorTest.GetCommandLineArgs Version=125 CDNVersion=100 Unity测试脚本 需要放在…

如何重新设置VSCode的密钥环密码?

故障现象&#xff1a; 忘记了Vscode的这个密码&#xff1a; Enter password to unlock An application wants access to the keyring “Default ke... Password: The unlock password was incorrect Cancel Unlock 解决办法&#xff1a; 1.任意terminal下&#xff0c;输入如下…

电子发票汇总改名,批量处理电子发票问题

今天给大家推荐一个财务方面工作的软件。可以帮你解决很多财务。发票方面的问题。 电子发票汇总改名 批量处理电子发票问题 这个软件安装之后。会在桌面上分成三个小软件&#xff0c;分别是修改单位信息、自定义命名规则和电子发票汇总改名。 你可以在这个软件里提取PDF或者of…

Linux——卷

Linux——卷 介绍 最近做的项目&#xff0c;涉及到对系统的一些维护&#xff0c;有些盘没有使用&#xff0c;需要创建逻辑盘并挂载到指定目录下。有些软件需要依赖空的逻辑盘&#xff08;LVM&#xff09;。 先简单介绍一下卷的一些概念&#xff0c;有分区、物理存储介质、物…

MySQL通用语法 -DDL、DML、DQL、DCL

SQL 全称 Structured Query Language&#xff0c;结构化查询语言。操作关系型数据库的编程语言&#xff0c;定义了 一套操作关系型数据库统一标准 。 SQL通用语法 MySQL语言的通用语法。 SQL语句可以单行或多行书写&#xff0c;以分号结尾。SQL语句可以使用空格/缩进来增强…

利用DnslogSqlinj工具DNSlog注入

工具下载链接 https://github.com/adooo/dnslogsqlinj 配置 将域名和API进行一个更改 之后再安装两个python2的库就可以使用dnslog进行自动化注入了 python2安装pip2 curl https://bootstrap.pypa.io/2.7/get-pip.py -o get-pip.py python2 get-pip.py库 pip2 install geven…

QT网络(一):主机信息查询

网络简介 在QT中进行网络通信可以使用QT提供的Qt Network模块&#xff0c;该模块提供了用于编写TCP/IP网络应用程序的各种类&#xff0c;如用于TCP通信的QTcpSocket和 QTcpServer&#xff0c;用于 UDP 通信的 QUdpSocket&#xff0c;还有用于网络承载管理的类&#xff0c;以及…

STM32-笔记5-按键点灯(中断方法)

1、复制03-流水灯项目&#xff0c;重命名06-按键点灯&#xff08;中断法&#xff09; 在\Drivers\BSP目录下创建一个文件夹exti&#xff0c;在该文件夹下&#xff0c;创建两个文件exti.c和exti.h文件&#xff0c;并且把这两个文件加载到项目中&#xff0c;打开项目工程文件 加载…

重塑医院挂号体验:SSM 与 Vue 搭建的预约系统设计与实现

4系统概要设计 4.1概述 本系统采用B/S结构(Browser/Server,浏览器/服务器结构)和基于Web服务两种模式&#xff0c;是一个适用于Internet环境下的模型结构。只要用户能连上Internet,便可以在任何时间、任何地点使用。系统工作原理图如图4-1所示&#xff1a; 图4-1系统工作原理…

mysql的事务控制和数据库的备份和恢复

事务控制语句 行锁和死锁 行锁 两个客户端同时对同一索引行进行操作 客户端1正常运行 客户端2想修改&#xff0c;被锁行 除非将事务提交才能继续运行 死锁 客户端1删除第5行 客户端2设置第1行为排他锁 客户端1删除行1被锁 客户端2更新行5被锁 如何避免死锁 mysql的备份和还…

C# OpenCV机器视觉:尺寸测量

转眼就是星期一了&#xff0c;又到了阿强该工作的时候了&#xff01;阿强走进了他作为机器视觉工程师的办公室&#xff0c;准备迎接新一天的挑战。随着周末的结束&#xff0c;他心中暗想&#xff1a;“如果我能让机器像我一样聪明&#xff0c;那就太好了&#xff01;” 正当他…

四川托普信息技术职业学院教案1

四川托普信息技术职业学院教案 【计科系】 周次 第 1周&#xff0c;第1次课 备 注 章节名称 第1章 XML语言简介 引言 1.1 HTML与标记语言 1.2 XML的来源 1.3 XML的制定目标 1.4 XML概述 1.5 有了HTML了&#xff0c;为什么还要发展XML 1.5.1 HTML的缺点 1.5.2 XML的特点 1.6 X…

网络安全防范

实践内容 学习总结 PDR&#xff0c;$$P^2$$DR安全模型。 防火墙&#xff08;Firewall&#xff09;&#xff1a; 网络访问控制机制&#xff0c;布置在网际间通信的唯一通道上。 不足&#xff1a;无法防护内部威胁&#xff0c;无法阻止非网络传播形式的病毒&#xff0c;安全策略…