基于华为MRS3.2.0实时Flink消费Kafka落盘至HDFS的Hive外部表的调度方案

news2024/11/25 6:48:58

文章目录

  • 1 Kafka
    • 1.1 Kerberos安全模式的认证与环境准备
    • 1.2 创建一个测试主题
    • 1.3 消费主题的接收测试
  • 2 Flink
    • 1.1 Kerberos安全模式的认证与环境准备
    • 1.2 Flink任务的开发
  • 3 HDFS与Hive
    • 3.1 Shell脚本的编写思路
    • 3.2 脚本测试方法
  • 4 DolphinScheduler


该需求为实时接收对手Topic,并进行消费落盘至Hive。

在具体的实施中,基于华为MRS 3.2.0安全模式带kerberos认证的Kafka2.4、Flink1.15、Hadoop3.3.1、Hive3.1,调度平台为开源dolphinscheduler。
在这里插入图片描述

本需求的完成全部参考华为官方MRS3.2.0开发文档,相关章节是普通版的安全模式。

华为官方文档:https://support.huaweicloud.com/cmpntguide-mrs/mrs_01_1031.html

1 Kafka

1.1 Kerberos安全模式的认证与环境准备

着手开发前,需要将FushionInsight租户加入kafkaadmin组,保证有创建主题和消费主题的权限,在得到此权限时,切勿对集群中的主题进行危险操作。

保证租户权限后,开始准备开发环境。该步骤需要安装Idea客户端在windows本地,同时安装兼容的maven版本,华为MRS需要安装至少OpenJDK 1.8.0_332的版本。

运行环境的配置则需要在FushionInsight的web管理界面下载kafka的完整客户端,包括config配置文件也需要下载。另外windows本地的hosts文件中要和FushionInsight中的集群地址有映射,可手动添加,同时应保证本地和集群能ping通。

参考文档:https://support.huaweicloud.com/devg3-mrs/mrs_07_130006.html

1.2 创建一个测试主题

在Linux环境中执行:

bin/kafka-topics.sh --create --bootstrap-server <Kafka集群IP:21007> --command-config config/client.properties --partitions 1 --replication-factor 1 --topic testTopic

创建一个测试testTopic,创建成功后,FushionInsight的web界面会报topic只有一个分区副本的警告,请忽略它。

同时也可以开启两个新的终端窗口用于测试生产者和消费者:

  1. bin/kafka-console-producer.sh --broker-list <Kafka集群IP:21007> --topic <Topic名称> --producer.config config/producer.properties
  2. bin/kafka-console-consumer.sh --topic <Topic名称> --bootstrap-server <Kafka集群IP:21007> --consumer.config config/consumer.properties

参考文档:https://support.huaweicloud.com/devg3-mrs/mrs_07_130031.html

1.3 消费主题的接收测试

通过以下网站下载华为MRS所需的样例代码:

https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.2.0

下载样例代码之后需要在华为镜像站下载代码所需依赖,华为MRS所需的组件依赖不同于apache的开源版本,需要单独配置maven的setting文件华为中央仓库进行下载,在开发时,组件相关的依赖都需要用下载华为的。

镜像地址:

https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/org/apache/

华为开源镜像站:

https://mirrors.huaweicloud.com/home

完成依赖和样例代码项目创建即可开发,在开发程序时,需要将用于安全认证的keytab文件即“user.keytab”和“krb5.conf”文件以及config所有配置文件放置在样例工程的“kafka-examples\src\main\resources”目录下。

在开发时,这些安全认证只需要生成一个jaas文件并设置相关环境变量即可。华为提供了LoginUtil相关接口来完成这些配置,样例代码中只需要配置用户自己租户名称和对应的keytab文件名称即可。

创建生产测试时,首先需要修改KafkaProperties类中的生产主题名,接下来在com.huawei.bigdata.kafka.example.Producer类中修改租户账号,keytab位置即可,运行成功后,会向主题推送100条测试数据,此时,我们在1.2小节中开启的消费者窗口就能接受到生产的数据。

在具体的测试中,需要控制消息发送的间隔和消息次数,方便后续开发Flink。一般来说,每秒发送一条,一直发送即可。

至此,Kafka的主题消费测试完成,接下来需要用Flink将主题落盘到HDFS。

如果运行代码时报和clock相关的错误,是因为本地时间和FushionInsight集群时间不一致所致,请将本地时间和服务器时间差控制在5分钟内。

参考文档:
https://support.huaweicloud.com/devg3-mrs/mrs_07_130012.html

2 Flink

1.1 Kerberos安全模式的认证与环境准备

用户在提交Flink应用程序时,需要与Yarn、HDFS等之间进行通信。那么提交Flink的应用程序中需要设置安全认证,确保Flink程序能够正常运行。
在这里插入图片描述
图为Flink在华为MRS安全模式的认证体系。

对于Kafka的权限在章节1.1已经获取,另外要保证有yarn资源的使用权限,还需要对HDFS的/flink/flink-checkpoint目录获取权限,保证执行。有了相关权限之后,再下载kerberos认证凭据文件,keytab和conf。准备运行环境同Kafka类似,需要对Flink客户端进行配置,注意config文件应该在权限修改之后获取。

Flink整个系统存在三种认证方式,使用kerberos认证、使用security cookie进行认证、使用YARN内部的认证机制。在进行安全认证时,可以用flink自带的wordcount样例程序进行提交测试,根据提交结果反馈再进行适配,直到提交成功。如果报auth相关的错误,可能还是权限问题,可以尝试先将租户权限给到最大,谨慎操作,先保证代码能通。

参考文档:
https://support.huaweicloud.com/devg3-mrs/mrs_07_050010.html

1.2 Flink任务的开发

最终在yarn队列运行的flink程序是从本地idea打包,通过flink run提交的。前面安全模式已经打通,在开发时仍然是使用华为官方的flink样例代码进行修改调试。

在具体的flink程序开发中,由于是开启了kerberos认证的安全模式,需要加入判断安全模式登录的代码段在main方法,以下代码来自华为官方样例:

 if (LoginUtil.isSecurityModel()) {
            try {
                LOG.info("Securitymode start.");
                //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号
                LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);
            } catch (IOException e) {
                LOG.error("Security prepare failure.");
                LOG.error("The IOException occured : {}.", e);
                return;
            }
            LOG.info("Security prepare success.");
        }

对于具体需求的开发参照开源Flink的apache官方文档即可,只需要保证依赖是华为官方镜像站的。

在该需求中,是将消费的数据落盘到HDFS中。开发中要用到FlinkKafkaConsumer方法创建kafka消费者,拿到流数据。该方法在Flink1.17版本被弃用,但是Flink1.15仍然可以用,具体开发方法可参考Flink1.13的官方文档Apache Kafka 连接器。

FlinkKafkaConsumer方法参考文档:
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/

接收的Kafka数据,我们不需要处理,测试时直接测试主题的数据写入HDFS即可,需要用StreamingFileSink方法。该方法可以设置按照日期分桶,我们设置.withBucketAssigner为每天一个桶,保证每天消费的数据在一个文件中,同时用该方法传入日期格式参数yyyy-MM-dd,这样便于使用shell调度每日增量数据时日期变量的传递。

FileSink方法参考文档:
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/file_sink/

另外,关于Sink到HDFS的数据文件(part file) 生命周期有几种状态,其中当文件名为in-progress表示当前文件正在写入中,此时的文件是不能被Hive读到的,我们需要将该文件的状态通过checkpoint机制转变为Finished。需要配置env.enableCheckpointing(60000)开启checkpoint,该参数是60秒开启一次。

完成代码开发后无法在本地测试,只能通过maven打包到华为服务器,通过flink run提交到yarn,此时可以指定并行度及其他配置。

通过以上方法即可实现将我们测试主题中的数据存储在按照每天一个yyyy-mm-dd命名的文件夹中。

3 HDFS与Hive

HDFS与Hive的交互也可以使用FlinkSQL,但是考虑到未来对数据的加工过滤,在此需求中选择将数据落盘HDFS再通过Shell命令调度至Hive。

3.1 Shell脚本的编写思路

  1. source华为的环境,认证状态成功;

  2. 创建日期变量:c_date=$(date '+%Y-%m-%d')

  3. 在beeline -u中执行HiveSQL代码:

    • 使用beeline的变量函数--hivevar将在外部注册的c_date变量注册为hive beeline的变量;

    • 创建临时外部表,映射字段一行数据,建表语句中指定位置为Flink写入的当日日期变量的HDFS数据文件夹;

    • 将临时表中的数据解析,一般是json数据,可通过get_json_object函数解析为字段,insert into table存入贴源层正式表;

    • 删除临时表;

  4. 有需要的话,也可以添加日志路径,将执行结果追加至日志。

3.2 脚本测试方法

该脚本的执行原理是首先在刷新华为租户环境,然后创建时间变量,并且是yyyy-mm-dd格式,与flink写入在HDFS中的每日增量文件夹名相同;

然后在beeline客户端中注册beeline的变量,将linux的时间变量传入beeline;

解下来是建临时表,将HDFS中的增量数据先写入,再解析字段到下一层标准表,同时删除临时表,通过此方法即完成每天新增数据的导入。

需要注意的是,hive -e命令似乎由于认证安全设置,无法在华为集群节点机使用。

4 DolphinScheduler

通过将脚本文件挂在DS调度中,每天在Flink完成消费落盘后,即可执行该shell。DS的部署不在华为MRS集群,在客户端节点中,使用开源版本即可,DS更方便查看每天的调度执行日志。

需要注意的是,目前我的需求中每天的新增数据大约2000-10000条,可以在短时间内完成调度执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1400384.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

四个简单的bat脚本

Windows11 最大劝退点就是这个右键菜单&#xff0c;复制粘贴都变成一点点的小图标&#xff0c;最气人的是点击底部的显示更多选项才能展示全部功能。让许多本来点一次就能完成的操作变成两次。其实使用一个小命令就能修改回win10版本的菜单。四个简单的bat脚本&#xff0c;能完…

Object.prototype.toString.call个人理解

文章目录 这段代码的常见用处参考文献&#xff1a; 拆分理解1、Object.prototype.toString小问题参考文献&#xff1a; 2、call函数的作用参考文献 3、继续深入一些&#xff08;这部分内容是个人理解&#xff0c;没有明确文献支撑&#xff09; 这段代码的常见用处 Object.prot…

这可能是最全面的计算机组成原理面试八股文了

第一章 计算机系统概述 “较简单&#xff0c;不做过多赘述&#xff0c;后面会详细学到” 第一节 计算机系统层次结构 1.计算机系统的基本组成&#xff1a;硬件软件 2.计算机硬件的基本组成&#xff1a;运算器存储器控制器输入设备输出设备 3.系统软件和应用软件 系统软件…

.NET国产化改造探索(六)、银河麒麟操作系统中安装多个.NET版本

随着时代的发展以及近年来信创工作和…废话就不多说了&#xff0c;这个系列就是为.NET遇到国产化需求的一个闭坑系列。接下来&#xff0c;看操作。 上一篇文章介绍了如何在银河麒麟操作系统上&#xff0c;使用Nginx.NET程序实现自启动。本文介绍下如何在一个环境中&#xff0c;…

explain工具优化mysql需要达到什么级别?

explain工具优化mysql需要达到什么级别&#xff1f; 一、explain工具是什么&#xff1f;二、explain查询后各字段的含义三、explain查询后type字段有哪些类型&#xff1f;四、type类型需要优化到哪个阶段&#xff1f; 一、explain工具是什么&#xff1f; explain是什么&#x…

macOS 14 Sonoma(苹果电脑系统) pkg完整安装包14.2.1正式版

macOS 14 Sonoma(苹果电脑系统) pkg完整安装包14.2.1正式版 游戏模式 Game mode是macOS Sonoma独有的Mac新游戏功能&#xff0c;在 macOS 14 中启用游戏模式后&#xff0c;Mac 会优先考虑运行游戏的CPU和GPU功能&#xff0c;不仅全面提高了游戏的流畅、稳定的帧率&#xff0c;…

Mybatis原理 - 标签解析

很多开源框架之所以能够流行起来&#xff0c;是因为它们解决了领域内的一些通用问题。但在实际使用这些开源框架的时候&#xff0c;我们都是要解决通用问题中的一个特例问题&#xff0c;所以这时我们就需要使用一种方式来控制开源框架的行为&#xff0c;这就是开源框架提供各种…

Python高级列表操作:性能优化、多线程与数据处理全解析

Python高级列表操作&#xff1a;性能优化、多线程与数据处理全解析 引言Python列表的高级特性列表推导式与生成器表达式列表操作的高级技巧列表与函数式编程列表在数据处理中的应用性能优化与内存管理Python列表与多线程/异步编程结语 引言 在现代软件开发中&#xff0c;选择恰…

【JavaEE Spring】SpringBoot 日志

SpringBoot 日志 1. 日志概述2. 日志使用2.1 打印⽇志2.1.1 在程序中得到⽇志对象2.1.2 使⽤⽇志对象打印⽇志 2.2 ⽇志框架介绍2.2.1 ⻔⾯模式(外观模式)2.2.2 SLF4J 框架介绍 2.3 ⽇志格式的说明2.4 ⽇志级别2.4.1 ⽇志级别的分类2.4.2 ⽇志级别的使⽤ 2.5 ⽇志配置2.5.1 配置…

MySQL---单表查询综合练习

创建emp表 CREATE TABLE emp( empno INT(4) NOT NULL COMMENT 员工编号, ename VARCHAR(10) COMMENT 员工名字, job VARCHAR(10) COMMENT 职位, mgr INT(4) COMMENT 上司, hiredate DATE COMMENT 入职时间, sal INT(7) COMMENT 基本工资, comm INT(7) COMMENT 补贴, deptno INT…

C语言练习day8

变种水仙花 变种水仙花_牛客题霸_牛客网 题目&#xff1a; 思路&#xff1a;我们拿到题目的第一步可以先看一看题目给的例子&#xff0c;1461这个数被从中间拆成了两部分&#xff1a;1和461&#xff0c;14和61&#xff0c;146和1&#xff0c;不知道看到这大家有没有觉得很熟…

前端框架学习 Vue (1) 概念,常用指令

Vue是什么 概念: Vue是一个用于 构建用户界面 的 渐进式 框架 1.构建用户界面:基于数据动态渲染页面 2.渐进式:循序渐进的学习(学一点就能用一点) (1)Vue核心包开发 场景:局部模块改造 (2)Vue核心包&Vue插件 工程化开发 场景:整站开发 3.框架:一套完整的项目…

遇到Access violation at address xxx in module ‘LoadDXF.dll‘.的解决方法

今天在设计PCB的时候&#xff0c;需要导入一个AutoCAD生成的DWG文件&#xff0c;结果导入出错&#xff0c;之前从来没有遇到过。也不清楚原因。错误的内容&#xff0c;如标题所示&#xff1a;Access violation at address xxx in module LoadDXF.dll. 对于我们既搞编程又设计…

PaddleDetection学习1——使用Paddle-Lite在 Android 上实现实时的目标检测功能

在 Android 上使用Paddle-Lite实现实时的目标检测功能 1 环境准备1.1 安装Android Studio1.1.1 安装JAVA JDK1.1.2 Android Studio 安装步骤1.1.3 Android Studio 配置NDK 1.2 Android 手机 2 部署步骤2.1 下载Paddle-Lite-Demo2.2 打开 yolo_detection_demo项目2.2.1 修改buil…

【Spring 篇】MyBatis中的CRUD魔法:数据之美的四重奏

MyBatis&#xff0c;这个数据持久化的魔法师&#xff0c;以其优雅的SQL映射和简洁的配置文件&#xff0c;为我们呈现出一场CRUD&#xff08;Create, Read, Update, Delete&#xff09;的奇妙之旅。在这篇博客中&#xff0c;我们将深入探讨MyBatis中的增、删、改、查操作&#x…

一键搭建你的知识库

效果 说明 由于安装包安装需要glibc>2.7 我就不尝试了 因为glib升级是一个繁琐的过程 没有升级的意义 只是为了体验知识库 没必要浪费时间 1.1docker compose部署trilium 1.1.创建目录 mkdir -p /opt/triliumcd /opt/trilium 1.2.编写docker-comppose.yml文件 vim dock…

非线性失真放大电路设计

文章目录 一、设置要求二、系统组成三、仿真设计3.1 放大电路总体设计3.2 仿真结果3.2.1 正常波形3.2.2 双向失真3.2.3 顶部失真(截至失真)3.2.4 底部失真(饱和失真)3.2.5 交越失真3.2.6 50Khz\2mv放大 四、原理图与PCB设计4.1 放大电路部分原理图4.2 控制电路部分4.3 PCB设计 …

EasyRecovery2024专业免费的数据恢复软件,支持从硬盘、光盘、U盘、移动硬盘、等所有类型的介质上恢复数据。

Ontrack EasyRecovery Home是一款企业级的数据恢复软件&#xff0c;支持从硬盘、光盘、U盘、移动硬盘、硬件RAID及软件RAID等所有类型的介质上恢复数据。支持恢复误删除、磁盘格式化、磁盘重新分区、磁盘逻辑坏道等原因而丢失的数据。支持RAID重建&#xff01;Ontrack EasyReco…

如何正确地为Python项目安装依赖

a、创建Python项目&#xff0c;其结构如下&#xff1a; b、激活虚拟环境 启动DOS窗口—>进入“Scripts”目录&#xff0c;这里为D:\workspace\prj_python_1\venv\Scripts—>执行activate激活虚拟环境&#xff0c;如下所示&#xff1a; Microsoft Windows [版本 10.0.18…

李宏毅 Generative Adversarial Network(GAN)生成对抗网络

(延申)GAN Lecture 1 (2018)- Introduction_哔哩哔哩_bilibili Basic Idea of GAN 附课程提到的各式各样的GAN&#xff1a;https://github.com/hindupuravinash/the-gan-zoo 想要让机器做到的是生成东西。->训练出来一个generator。 假设要做图像生成&#xff0c;要做的是…