二百三十三、Flume——Flume采集JSON文件到Kafka,再用Flume采集Kafka数据到HDFS中

news2025/1/23 8:00:52

一、目的

由于使用了新的Kafka协议,因为根据新的协议推送模拟数据到Kafka中,再Flume采集Kafka数据到HDFS中

二、技术选型

(一)Kettle工具

准备使用Kettle的JSON input控件和Kafka producer控件,但是搞了1天没搞定,博客上也找不到相关的资料,后面再研究研究

之前用kettle采集Kafka数据写入HDFS中(不建议使用这种方式采集Kafka数据到HDFS中

(二)Flume工具

三、实施步骤

(一)准备json文件

注意:Json数据用JSOB工具压缩一下,1个完整的JSON为1行

(二)创建Kafka主题

[root@hurys23 bin]# ./kafka-topics.sh  --create --bootstrap-server  192.168.0.23:9092 --topic topic_internal_data_queue  --partitions 1 --replication-factor 1

(三)创建文件目录

[root@gree128 userfriends]# cd  /opt/kb15tmp/

[root@gree128 kb15tmp]# mkdir -p  /opt/kb15tmp/checkpoint/queue

[root@gree128 kb15tmp]# mkdir -p  /opt/kb15tmp/checkpoint/data/queue

(四)配置Flume采集json任务文件

queue.sources=queueSource
queue.channels=queueChannel
queue.sinks=queueSink

queue.sources.queueSource.type=spooldir
queue.sources.queueSource.spoolDir=/opt/kb15tmp/flumelogfile/queue
queue.sources.queueSource.deserializer=LINE
queue.sources.queueSource.deserializer.maxLineLength=320000
queue.sources.queueSource.includePattern=queue_[0-9]{4}-[0-9]{2}-[0-9]{2}.json

queue.channels.queueChannel.type=file
queue.channels.queueChannel.checkpointDir=/opt/kb15tmp/checkpoint/queue
queue.channels.queueChannel.dataDirs=/opt/kb15tmp/checkpoint/data/queue

queue.sinks.queueSink.type=org.apache.flume.sink.kafka.KafkaSink
queue.sinks.queueSink.batchSize=640
queue.sinks.queueSink.brokerList=192.168.0.23:9092
queue.sinks.queueSink.topic=topic_internal_data_queue

queue.sources.queueSource.channels=queueChannel
queue.sinks.queueSink.channel=queueChannel

(五)修改Flume采集Kafka任务文件

## agent a1
a1.sources = s1
a1.channels = c1
a1.sinks = k1

## configure source s1
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.kafka.bootstrap.servers = 192.168.0.23:9092
a1.sources.s1.kafka.topics = topic_internal_data_queue
a1.sources.s1.kafka.consumer.group.id = queue_group
a1.sources.s1.kafka.consumer.auto.offset.reset = latest
a1.sources.s1.batchSize = 1000

## configure channel c1
## a1.channels.c1.type = memory
## a1.channels.c1.capacity = 10000
## a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/data/flumeData/checkpoint/queue
a1.channels.c1.dataDirs = /home/data/flumeData/flumedata/queue

## configure sink k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hurys23:8020/user/hive/warehouse/hurys_dc_ods.db/ods_queue/day=%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = queue
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.rollSize = 1200000000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 60
a1.sinks.k1.hdfs.minBlockReplicas = 1

a1.sinks.k1.hdfs.fileType = SequenceFile
a1.sinks.k1.hdfs.codeC = gzip

## Bind the source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
​​​​

(六)打开监控,拷贝文件

[root@hurys23 kb15tmp]# cp queue.json /opt/kb15tmp/flumelogfile/queue/queue_2024-04-19.json

(七)执行Kafka消费窗口

[root@hurys23 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.0.23:9092  --topic topic_internal_data_queue  --from-beginning

(八)​​​​​​​运行Flume采集Kafka到HDFS的任务​​​​​​​

[root@hurys23 flume190]# bin/flume-ng agent -n a1  -f /usr/local/hurys/dc_env/flume/flume190/conf/queue.properties

(九)​​​​​​​运行Flume采集json文件到Kafka的任务

[root@hurys23 flume190]# bin/flume-ng agent --name queue --conf ./conf/ --conf-file ./conf/KB15conf/queue.conf   -Dflume.root.logger=INFO,console

(十)查看HDFS文件

(十一)ODS层验证JSON格式是否正确,是否可以解析

--刷新表分区
msck repair table ods_queue;

里面的JSON字段都可以解析出来,搞定!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1615345.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《自动机理论、语言和计算导论》阅读笔记:p261-p314

《自动机理论、语言和计算导论》学习第 10 天,p261-p314总结,总计 48 页。 一、技术总结 1.generating & reachable 2.Chomsky Normal Form(CNF) 乔姆斯基范式。 3.pumping lemma 泵作用引理。引理:引理是数学中为了取得某个更好的…

基于docker搭建瀚高数据库HighGo6.0.1【图文】

基于docker搭建瀚高数据库HighGo6.0.1 拉取镜像启动验证进入容器 登录数据库查看数据库加密方式修改加密方式为sm3进入数据库修改密码重启容器 数据库验证数据库密码到期参考 docker部署 https://blog.csdn.net/weixin_44385419/article/details/127738868 拉取镜像 docker p…

【ARM Trace32(劳特巴赫) 使用介绍 12.1 -- Trace32 读写 64位地址】

请阅读【Trace32 ARM 专栏导读】 文章目录 Trace32 读写 64位地址读 64 位地址写64位地址Trace32 读写 64位地址 在使用TRACE32进行调试时,有时需要读取或操作64位的地址,特别是在处理64位的处理器或操作系统时。以下是如何在TRACE32中读取64位地址的一般方法。 读 64 位地…

数据可视化(八):Pandas时间序列——动态绘图,重采样,自相关图,偏相关图等高级操作

Tips:"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起分享这份快乐吧😊! 喜欢我的博客的话,记得…

IPD集成产品开发(二)

时间:2024年04月21日 作者:小蒋聊技术 邮箱:wei_wei10163.com 微信:wei_wei10 音频地址:IPD集成产品开发(二)https://www.ximalaya.com/sound/724309598 大家好,欢迎来到小蒋聊技…

前端三剑客 HTML+CSS+JavaScript ③ HTML标准结构

生活没有任何意义&#xff0c;这就是活着的理由&#xff0c;而且是唯一的理由 —— 24.4.22 一、HTML注释 1.特点 注释的内容会被浏览器所忽略&#xff0c;不会呈现到页面中&#xff0c;但源代码中依然可见 2.作用 对代码进行解释和说明 3.写法 <!-- xxxxx --> <html&…

【Web】2022DASCTF X SU 三月春季挑战赛 题解(全)

目录 ezpop calc upgdstore ezpop 瞪眼看链子 fin#__destruct -> what#__toString -> fin.run() -> crow#__invoke -> fin#__call -> mix.get_flag() exp <?php class crow {public $v1;public $v2;}class fin {public $f1; }class what {public $a; }…

grafana安装篇(1)

目录 grafana概念&#xff1a; 它具有以下主要特点&#xff1a; Grafana 常用于以下场景&#xff1a; 环境介绍&#xff1a; 前置环境&#xff1a; (1)保证可以连接外网 (2)防火墙和selinux已关闭 1.下载安装grafana10.0.1-1rpm包 2.启动grafana 3.浏览器访问 3.设置…

android学习笔记(五)-MVP模式

1、MVP模式demo的实现&#xff0c;效果下&#xff1a; 2、创建一个Fruit类&#xff1a; package com.example.listview; //Fruit类就是Model&#xff0c;表示应用程序中的数据对象。 public class Fruit {private int imageId;private String name;private String price;publi…

TFTLCD原理硬件介绍

介绍 TFT LCD&#xff08;薄膜晶体管液晶显示器&#xff09;是一种广泛使用的显示技术&#xff0c;它结合了薄膜晶体管&#xff08;TFT&#xff09;和液晶显示&#xff08;LCD&#xff09;技术。TFT LCD的主要特点是使用TFT矩阵来控制施加到每个像素的电压&#xff0c;从而实现…

上位机图像处理和嵌入式模块部署(树莓派4b使用pcl点云库)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 图像处理&#xff0c;大家都知道它有显著的优点和缺点。优点就是分辨率高&#xff0c;信息丰富。缺点就是&#xff0c;整个图像本身没有深度信息。…

京东商品详情数据采集API接口|附京东商品数据返回PHP多语言高并发

京东获得JD商品详情 API 返回值说明 item_get-获得JD商品详情 API测试 注册开通 jd.item_get 公共参数 名称类型必须描述keyString是调用key&#xff08;必须以GET方式拼接在URL中&#xff09;secretString是调用密钥api_nameString是API接口名称&#xff08;包括在请求地址…

扫描工具nmap

介绍 说到黑客&#xff0c;知识就是力量。您对目标系统或网络的了解越多&#xff0c;可用的选项就越多。因此&#xff0c;在进行任何利用尝试之前&#xff0c;必须进行适当的枚举。 假设我们获得了一个 IP&#xff08;或多个 IP 地址&#xff09;来执行安全审计。在我们做任何…

Linux使用Docker部署DashDot访问本地服务器面板

文章目录 1. 本地环境检查1.1 安装docker1.2 下载Dashdot镜像 2. 部署DashDot应用 本篇文章我们将使用Docker在本地部署DashDot服务器仪表盘&#xff0c;并且结合cpolar内网穿透工具可以实现公网实时监测服务器系统、处理器、内存、存储、网络、显卡等&#xff0c;并且拥有API接…

牛客NC233 加起来和为目标值的组合(四)【中等 DFS C++、Java、Go、PHP】

题目 题目链接&#xff1a; https://www.nowcoder.com/practice/7a64b6a6cf2e4e88a0a73af0a967a82b 解法 dfs参考答案C class Solution {public:/*** 代码中的类名、方法名、参数名已经指定&#xff0c;请勿修改&#xff0c;直接返回方法规定的值即可*** param nums int整型…

【从浅学到熟知Linux】进程间通信之匿名管道方式(进程间通信方式汇总、匿名管道的创建、匿名管道实现进程池详解)

&#x1f3e0;关于专栏&#xff1a;Linux的浅学到熟知专栏用于记录Linux系统编程、网络编程等内容。 &#x1f3af;每天努力一点点&#xff0c;技术变化看得见 文章目录 进程间通信介绍如何实现进程间通信进程间通信分类 管道通信方式什么是管道匿名管道pipe匿名管道读写规则管…

TQZC706开发板教程:编译zynq linux内核2019_R1

您需要下载对应版本的Linux系统文件以及IMG1.3.1镜像文件。为了方便您的操作&#xff0c;本文所使用的所有文件以及最终生成的文件&#xff0c;我都已经整理并放置在本文末尾提供的网盘链接中。您可以直接通过该链接进行下载&#xff0c;无需在其他地方单独搜索和获取。希望这能…

spring aop介绍

Spring AOP&#xff08;面向切面编程&#xff09;是一种编程范式&#xff0c;它允许开发者将横切关注点&#xff08;cross-cutting concerns&#xff09;从业务逻辑中分离出来&#xff0c;从而提高代码的模块化。在传统的对象导向编程中&#xff0c;这些横切关注点&#xff0c;…

Vue2进阶之Vue2高级用法

Vue2高级用法 mixin示例一示例二 plugin插件自定义指令vue-element-admin slot插槽filter过滤器 mixin 示例一 App.vue <template><div id"app"></div> </template><script> const mixin2{created(){console.log("mixin creat…

MySql 安装教程+简单的建表

目录 1.安装准备 1.MySQL官方网站下载 2.安装步骤 3.测试安装 4.简单的建表 1.安装准备 1.MySQL官方网站下载 下载安装包或者压缩包都可以 选择相应版本&#xff0c;点击Download开始通过网页下载到本地&#xff08;压缩包下载快一些&#xff09; 2.安装步骤 双击此.exe…