一百七十二、Flume——Flume采集Kafka数据写入HDFS中(亲测有效、附截图)

news2025/1/13 9:40:48

一、目的

作为日志采集工具Flume,它在项目中最常见的就是采集Kafka中的数据然后写入HDFS或者HBase中,这里就是用flume采集Kafka的数据导入HDFS中

二、各工具版本

(一)Kafka

kafka_2.13-3.0.0.tgz

(二)Hadoop(HDFS)

hadoop-3.1.3.tar.gz

(三)Flume

apache-flume-1.9.0-bin.tar.gz

三、实施步骤

(一)到flume的conf的目录下

# cd  /home/hurys/dc_env/flume190/conf

(二)创建配置文件evaluation.properties

# vi  evaluation.properties

### Name agent, source, channels and sink alias
a1.sources = s1
a1.channels = c1
a1.sinks = k1

### define kafka source
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource

# Maximum number of messages written to Channel in one batch
a1.sources.s1.batchSize = 5000

# Maximum time (in ms) before a batch will be written to Channel The batch will be written whenever the first of size and time will be reached.
a1.sources.s1.batchDurationMillis = 2000

# set kafka broker address
a1.sources.s1.kafka.bootstrap.servers = 192.168.0.27:9092

# set kafka consumer group Id and offset consume
# 官网推荐1.9.0版本只设置了topic,但测试后不能正常消费,需要添加消费组id(自己写一个),并定义偏移量消费方式
a1.sources.s1.kafka.consumer.group.id = evaluation_group
a1.sources.s1.kafka.consumer.auto.offset.reset = earliest

# set kafka topic
a1.sources.s1.kafka.topics = topic_b_evaluation


### defind hdfs sink
a1.sinks.k1.type = hdfs
# set store hdfs path
a1.sinks.k1.hdfs.path = hdfs://hurys22:8020/rtp/evaluation/evaluation_%Y-%m-%d
# set file size to trigger roll
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.threadsPoolSize = 30
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text


### define channel from kafka source to hdfs sink
# memoryChannel:快速,但是当设备断电,数据会丢失
# FileChannel:速度较慢,即使设备断电,数据也不会丢失
a1.channels.c1.type = file
# 这里不单独设置checkpointDir和dataDirs文件位置,参考官网不设置会有默认位置
# channel store size
a1.channels.c1.capacity = 100000
# transaction size
a1.channels.c1.transactionCapacity = 10000


### 绑定source、channel和sink
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

(三)配置文件创建好后启动flume服务

# cd /home/hurys/dc_env/flume190/

# ./bin/flume-ng agent -n a1  -f /home/hurys/dc_env/flume190/conf/evaluation.properties

(四)到HDFS文件里验证一下

HDFS中生成evaluation_2023-09-07 文件夹,里面有很多小文件

(五)注意:小文件里的数据是JSON格式,即使我设置文件后缀名为csv也没用(可能配置文件中的文件类型设置需要优化

a1.sinks.k1.hdfs.writeFormat=Text

(六)jps查看Flume的服务

[root@hurys22 conf]# jps
16801 ResourceManager
4131 Application
18055 AlertServer
16204 DataNode
22828 Application
17999 LoggerServer
2543 launcher.jar
22224 Application
17393 QuorumPeerMain
16980 NodeManager
17942 WorkerServer
16503 SecondaryNameNode
11384 Application
32669 Application
17886 MasterServer
10590 Jps
16031 NameNode
18111 ApiApplicationServer

注意:Application就是Flume运行的任务

(七)关闭Flume服务

如果想要关闭Flume服务,直接杀死服务就好了

# kill -9 32669

(八)checkpointDir和dataDirs默认的文件位置

默认的文件位置:/root/.flume/file-channel/

总之,Flume这个工具的用法还需进一步研究优化,当然kettle也可以,所以这个项目目前还是用kettle吧!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/986298.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Netty编程面试题

1.Netty 是什么? Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。Netty是基于nio的,它封装了jdk的nio,让我们使用起来更加方法灵活。 2.Netty 的特点是什么? 高并发&a…

React【组件生命周期 、组件生命周期_挂载、 组件生命周期_更新 、组件生命周期_卸载、表单_受控组件、表单_受控组件处理多个输入】(三)

文章目录 组件生命周期 组件生命周期_挂载 组件生命周期_更新 组件生命周期_卸载 表单_受控组件 表单_受控组件处理多个输入 组件生命周期 每个组件都有自己的生命周期,从“生”到”死“。 在这个过程当中,它会有不同的状态,针对不同的状态…

深入探究数据结构与算法:构建强大编程基础

文章目录 1. 为什么学习数据结构与算法?1.1 提高编程技能1.2 解决复杂问题1.3 面试准备1.4 提高代码效率 2. 学习资源2.1 经典教材2.2 在线学习平台2.3 学习编程社区 3. 数据结构与算法的实际应用3.1 排序算法3.2 图算法3.3 字符串匹配算法 4. 结论 🎉欢…

【前端】WebWorker 在前端SPA框架的应用

一、什么是WebWorker 概念: Web Worker是一种在Web浏览器中运行的JavaScript脚本,它可以在后台线程中运行,而不会阻塞主线程。这意味着Web Worker可以在后台执行复杂的计算任务,而不会影响用户界面的响应性能 除了标准的JavaScri…

C++之生成key-value键值三种方式(一百九十)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 人生格言: 人生…

vue前后端端口不一致解决方案

在config index.js文件中 引入如下代码即可 const path require(path) const devEnv require(./dev.env) module.exports {dev: {// PathsassetsSubDirectory: static,assetsPublicPath: /,proxyTable: devEnv.OPEN_PROXY false ? {} : {/api: {target: http://localhos…

swiper删除虚拟slide问题

在存在缓存的情况下,删除较前的slide,会出现当前slide与后一个slide重复出现的情况 假设当前存在5个slide,且这5个slide已缓存,则删除slide2后,仍为5个slide,且slide2的内容变为slide3的内容,此…

Linux入门之多线程|线程的同步|生产消费模型

文章目录 一、多线程的同步 1.概念 2.条件变量 2.1条件变量概念 2.2条件变量接口 1.条件变量初始化 2.等待条件满足 3.唤醒等待 3.销毁条件变量 2.3条件变量demo 二、生产消费模型 1.生产消费模型 2.基于BlockQueue的生产者消费者模型 3.基于C用条件变量和互斥锁实…

RecyclerView+Flexbox实现流式布局

之前使用 FlexboxLayout 实现流式布局,但是选中和反选效果不好实现,就改用RecyclerViewFlexboxLayoutManager 实现流式布局: 说明:如果是直接展示标签,没有其他选中效果时,建议直接使用 FlexboxLayout实现…

Scrum认证高级Scrum Master (A-CSM) 认证培训课程

课程简介 高级ScrumMaster (Advanced Certified ScrumMaster, A-CSM) 认证课程是国际Scrum联盟推出的进阶级Scrum认证课程,是Scrum Master通往专业级敏捷教练必经的学习路径。 在ScrumMaster(CSM)认证课程中,您学习到了Scrum的价…

Redis核心数据结构与高性能原理

Redis的单线程和高性能 Redis是单线程吗? Redis 的单线程主要是指 Redis 的网络 IO 和键值对读写是由一个线程来完成的,这也是 Redis 对外提供键值存储服务的主要流程。但 Redis 的其他功能,比如持久化、异步删除、集群数据同步等&#xff…

Mybatis复杂查询及动态SQL

文章目录 一. 较复杂的查询操作1. 参数占位符#{}和${}2. SQL注入3. like查询4. resultType与resultMap5. 多表查询5.1. 一对一表映射5.2. 一对多表映射 二. 动态SQL1. if标签2. trim标签3. where标签4. set标签5. foreach标签 本篇中使用的数据表即基础映射类都是基于上一篇博客…

什么是SpringMVC以及SpringMVC框架的优点

它是基于MVC开发模式的框架,用来优化控制器.它是Spring家族的一员.它也具备IOC和AOP. 什么是MVC? 它是一种开发模式,它是模型视图控制器的简称.所有的web应用都是基于MVC开发. M:模型层,包含实体类,业务逻辑层,数据访问层 模型 模型(Model):就是业务流程/状态…

每日刷题-2

目录 一、选择题 二、编程题 1、倒置字符串 2、排序子序列 3、字符串中找出连续最长的数字串 4、数组中出现次数超过一半的数字 一、选择题 1、 题目解析: 二维数组初始化的一般形式是: 数据类型 数组名[常量表达式1][常量表达式2] {初始化数据}; 其…

使用SpringCloud Eureka 搭建EurekaServer 集群- 实现负载均衡故障容错【上】

😀前言 本篇博文是关于使用SpringCloud Eureka 搭建EurekaServer 集群- 实现负载均衡&故障容错,希望你能够喜欢 🏠个人主页:晨犀主页 🧑个人简介:大家好,我是晨犀,希望我的文章可…

使用共享 MVI 架构实现高效的 Kotlin Multiplatform Mobile (KMM) 开发

使用共享 MVI 架构实现高效的 Kotlin Multiplatform Mobile (KMM) 开发 文章中探讨了 Google 提供的应用架构指南在多平台上的实现。通过共享视图模型(View Models)和共享 UI 状态(UI States),我们可以专注于在原生端…

RHCSA-VMware Workstation Pro-Linux基础配置命令

1.代码命令 1.查看本机IP地址&#xff1a; ip addr 或者 ip a [foxbogon ~]$ ip addre [foxbogon ~]$ ip a 1&#xff1a;<Loopback,U,LOWER-UP> 为环回2网卡 2: ens160: <BROADCAST,MULTICAST,UP,LOWER_UP>为虚拟机自身网卡 2.测试网络联通性&#xff1a; [f…

【0907作业】写一个shell脚本,将以下内容放到脚本中

在家目录下创建目录文件&#xff0c;dir在dir下创建dir1和dir2把当前目录下的所有文件拷贝到dir1中&#xff0c;把当前目录下的所有脚本文件拷贝到dir2中把dir2打包并压缩为dir2.tar.xz再把dir2.tar.xz移动到dir1中解压dir1中的压缩包使用tree工具&#xff0c;查看dir下的文件 …

vue3:5、组合式API-reactive和ref函数

<script setup> /* reactive接收一个对象类型的数据&#xff0c;返回一个响应式的对象 *//*** ref:接收简单类型或复杂类型&#xff0c;返回一个响应式对象* 本质&#xff1a;是在原有传入数据的基础上&#xff0c;外层报了一层对象&#xff0c;包成了复杂类型* 底层&…

宇凡微YE09合封芯片,集成高性能32位mcu和2.4G芯片

合封芯片是指将主控芯片和外部器件合并封装的芯片&#xff0c;能大幅降低开发成本、采购成本、减少pcb面积等等。宇凡微YE09合封芯片&#xff0c;将技术领域推向新的高度。这款高度创新性的芯片融合了32位MCU和2.4G芯片&#xff0c;为各种应用场景提供卓越的功能和性能。 32位M…