Flume第一章:环境安装

news2025/1/17 3:46:25

系列文章目录

Flume第一章:环境安装


文章目录

  • 系列文章目录
  • 前言
  • 一、Flume是什么?
  • 二、环境安装
    • 1.文件下载
    • 2.环境安装
    • 3.官方案例
  • 三、几个案例
    • 1.实时监控 Hive 日志,并上传到 HDFS 中
    • 2.使用 Flume 监听整个目录的文件,并上传至 HDFS
    • 3.使用 Flume 监听整个目录的文件,并上传至 HDFS
  • 总结


前言

之前由于学校考试,写博客就中断了,后来等学校基本考完了,准备休息两天继续学习,然后就阳性了,休息了差不多一个星期了,又赶上了学校实训,这篇博客,真的是命运多舛


一、Flume是什么?

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.
这是官方的原文。
Flume是一种分布式、可靠且可用的服务,用于高效地收集、聚合和移动大量日志数据。它具有基于流数据流的简单而灵活的架构。它具有可调的可靠性机制和许多故障切换和恢复机制,具有鲁棒性和容错性。它使用一个简单的可扩展数据模型,允许在线分析应用程序。
以上是百度翻译 看看就行了

简单来说
Flume最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS。

二、环境安装

1.文件下载

官方下载连接
由于下载服务器在国外,可能网速不太稳定,尽量在网络环境较好的情况下下载。
本实验使用flume1.9版本(本来是使用最新的1.11的后来发现兼容有问题,不能正常使用)

2.环境安装

为了实验方便,请在hive环境下安装。

  1. 上传文件
    在这里插入图片描述

  2. 解压并改名

tar -xvf apache-flume-1.9.0-bin.tar.gz -C /opt/module/
mv apache-flume-1.9.0-bin/ flume

在这里插入图片描述

flume 依靠java和hadoop所以需要提前配置相关环境,这个以前做过,就不做了。

  1. 删除guava
rm -rf /opt/module/flume/lib/guava-11.0.2.jar 

到此flume的环境安装结束,此时可以抓一个快照留存。


3.官方案例

在 flume 目录下创建 job 文件夹并进入 job 文件夹,创建flume-netcat-logger.conf文件

在这里插入图片描述
并添加以下内容,此代码可以在官方文档中找到。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

代码解析
在这里插入图片描述
开启服务器端。
以下两条命令二选一

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console


bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

在这里插入图片描述
启动成功。
再开一个终端,监听44444端口。

nc localhost 44444

在这里插入图片描述
随便写点内容,确认两边可以通信。

三、几个案例

1.实时监控 Hive 日志,并上传到 HDFS 中

创建 flume-file-hdfs.conf

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs- 
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

在这里插入图片描述

由于我们要将flume监控的日志上传到HDFS目录,所以要先启动hadoop,自行启动。
然后启动flume

bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf

然后启动hive
在这里插入图片描述
现在可以查看hdfs,会发现根目录下多了一个flume文件。
在这里插入图片描述
进入即可查看日志,根据设定每30秒生成一个日志文件。
在这里插入图片描述
并且会生成tmp临时文件。
在这里插入图片描述

2.使用 Flume 监听整个目录的文件,并上传至 HDFS

创建flume-dir-hdfs.conf

a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp 结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload- 
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

在这里插入图片描述

创建测试目录
在这里插入图片描述
启动flume

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf

然后随便写点测试数据扔到监控目录中,注意先写好后在移动进去,不能在检测目录里动态修改文件。
在这里插入图片描述
出现这个标志,代表这个文件已经到达集群,速度非常快。
在这里插入图片描述在这里插入图片描述

3.使用 Flume 监听整个目录的文件,并上传至 HDFS

创建配置文件 flume-taildir-hdfs.conf

a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

在这里插入图片描述

启动flume

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf

创建测试文件。
在这里插入图片描述
随便往里边写点数据,这次可动态修改。
在这里插入图片描述
在hdfs中查看
在这里插入图片描述
继续动态追加数据。
在这里插入图片描述
hdfs中会继续增加数据
在这里插入图片描述
等待文件上传结束后,继续查看。
在这里插入图片描述
另一个文件同理,不做示例了。

总结

flume也是hadoop生态中比较重要的一个核心组件,难度不是很高,但也要学习一下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/155796.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【构造】Codeforces Round #843 (Div. 2) B Gardener and the Array

Problem - B - Codeforces题意:给定一个序列,让你判断是否存在两个子序列使得这两个子序列或起来相等思路:设两个子序列是a和b两个子序列凭空出现,那肯定考虑构造满足的条件是:a!bf(a)f(b)如果只考虑第二个条件&#x…

java系列文章之反射

文章目录一、动态语言二、反射机制概念三、反射的应用场合1. 编译时类型和运行时类型2. 编译时类型无法获取具体方法四、 反射 API五、反射使用步骤六、获取 Class 对象的 3 种方法七、创建对象的两种方法总结一、动态语言 动态语言,是指程序在运行时可以改变其结构…

读书:《5%的改变》

《5%的改变》 我们并不需要100%的改变,彻底推翻以前的旧习惯,对于绝大多数人来说,并不太现实,不如考虑一下只改变5%。 一天结束,22:00,开始为睡觉做准备,反思一下,发现今天好像什…

Pytorch LSTM实现中文单词预测(附完整训练代码)

Pytorch LSTM实现中文单词预测(附完整训练代码) 目录 Pytorch LSTM实现中文单词预测(词语预测 附完整训练代码) 1、项目介绍 2、中文单词预测方法(N-Gram 模型) 3、训练词嵌入word2vec(可选) 4、文本预处理 (1&…

Java面向对象之继承

目录继承概述、使用继承的好处总结继承的设计规范、内存运行原理总结继承的特点总结继承后:成员变量、成员方法的访问特点总结继承后:方法重写继承后:子类构造器的特点总结继承后:子类构造器访问父类有参构造器总结this、super使用…

k8s之DaemonSet

写在前面 假定现在有一个这样的需求,需要收集每个Node的运行状态信息,并进行上报,假设有4个节点,我们可以使用Deployment 来实现吗?好像是可以的,我们只需要将repliacas设置为4不就行了,但是de…

怎样让公司全员贡献结构化内容?

- 1 - 问题 一个朋友在一个生产型企业的文档团队负责产品文档,他们使用DITA来编写各类文档,比如:公司管理文档、产品介绍、产品使用说明、产品安装手册等。 DITA 是基于XML的体系结构,用于编写、制作、交付面向主题的信息类型…

【NI Multisim 14.0 操作实例——音量控制电路】

目录 序言 一、音量控制电路 🍊1.设置工作环境 🍊 2.设置原理图图纸 🍊 3.设置图纸的标题栏 🍊 4.放置元器件 🍊 5.编辑元器件属性 🍊 6. 布局元器件 序言 NI Multisim最突出的特点之一就是用户界面…

数字IC设计、验证、FPGA笔试必会 - Verilog经典习题 (一)四选一多路器

数字IC设计、验证、FPGA笔试必会 - Verilog经典习题 (一)四选一多路器 🔈声明: 😃博主主页:王_嘻嘻的CSDN博客 🧨未经作者允许,禁止转载 🔑系列专栏: &#x…

Mercurius <11.5.0 存在拒绝服务漏洞(CVE-2023-22477)

漏洞描述 Mercurius 是NPM仓库中的开源组件,用作于 Fastify Web 框架的 GraphQL 适配器。 11.5.0 之前版本的 Mercurius 开启“订阅”功能时,任何 Mercurius 用户都可以通过 WebSocket 向 /graphql 端点(如:ws://127.0.0.1:1337…

【屏幕驱动移植】点亮OLED屏幕并播放视频

写在前面 硬件软件准备: 名称备注屏幕SSD1106本文章所使用的的屏幕型号,仅仅作为驱动移植示例,其他型号的都可以按照本文的方法推广树莓派3B用于驱动屏幕,树莓派2B3B4B等型号都可以ESP32开发板用于驱动屏幕,具体是ESP32还是ESP32…

都2023年啦~用python来玩一次股票.....

人生苦短,我用python 这不是2023年已经来了吗? 总不能空着手回去吧? 这次简单用python来玩一下股票~ 本章源码更多电子书点击文末名片~ 准备工作 我们需要使用这些模块,通过pip安装即可。 后续使用的其它的模块都是Python自…

启动jeecg-boot框架(vue3版本)

jeecg-boot框架(vue3版本)一、简介二、项目启动1.前端模组:jeecgboot-vue3-master2.后端模组:jeecg-boot-master3.环境要求:4.数据库准备:5.前端启动:6.redis启动:7.后端启动&#x…

(Matlab实现)基于蒙特卡诺和拉格朗日乘子法的电动车调度【有序、无序充放电】

目录 1 概述 2 蒙特卡洛模拟方法介绍 3 拉格朗日乘子法 4 规模化电动汽车充电负荷预测计算方法 5 Matlab代码实现 1 概述 电动汽车EV(Electric Vehicle)具有清洁环保、高效节能的优点,不仅能缓解化石能源危机,而且能够有效地减少温室气体的排放。2015年10月,国…

Day 7 Spring 整合第三方框架

xml整合第三方框架有两种整合方案:不需要自定义名空间,不需要使用Spring的配置文件配置第三方框架本身内容,例如: MyBatis;需要引入第三方框架命名空间,需要使用Spring的配置文件配置第三方框架本身内容,例如: Dubbo.1 整合MyBati…

Apollo星火计划学习笔记——Control 专项讲解(PID)

文章目录1. PID算法介绍1.1 时间连续与时间离散1.2 位置式与增量式1.3 PID算法扩展2. PID调试方法3. APOLLO代码介绍3.1 PID算法3.2 积分饱和问题3.3 纵向控制代码3.3.1 构造函数3.3.2 加载各种纵向控制的配置参数3.3.3 二阶巴特沃斯低通滤波器《数字信号处理》3.3.4 插值出油门…

PMP考试是什么?适合哪些人来学呢?

PMP,根据PMI的解释,就是项目管理专业人士资格认证,全称如下图:PMP考试是由PMI发起、组织和出题,严格评估项目管理专业人士知识技能是否具有高品质的资格认证考试。PMI:美国项目管理协会(Project…

【小米路由器3】breed刷机救砖-nand flash硬改SPI flash-编程器救砖(解决ttl无法救砖问题)

大家好,我是老子姓李!(gzh:楠瘦) 本博文带来【小米路由器3】变砖,ttl无法救砖,硬改焊接一块SPI flash,使用编程器刷入小米路由器mini的breed最终成功救砖。 目录1.引言1.1 背景1.2回…

07MEMS传感器技术 讲座

把同步现象应用于传感器设计。 什么是MEMS? 1.mems芯片是什么意思 MEMS是Micro-Electro-Mechanical System的缩写,中文名称是微机电系统。MEMS芯片简而言之,就是用半导体技术在硅片上制造电子机械系统,再形象一点说就是做一个微…

Vue3——第十一章(内置组件:KeepAlive、Transition、TransitionGroup)

一、KeepAlive <KeepAlive> 是一个内置组件&#xff0c;它的功能是在多个组件间动态切换时缓存被移除的组件实例。 1、基本使用 默认情况下&#xff0c;一个组件实例在被替换掉后会被销毁。这会导致它丢失其中所有已变化的状态——当这个组件再一次被显示时&#xff0…