Flume学习--1、Flume概述、Flume入门、

news2025/1/16 16:45:33

1、Flume概述

1.1 Flume定义

Flume是Cloudera提供的一个高可用,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式结构,灵活简单。

在这里插入图片描述
Flume最主要的作用就是实时读取服务器本地磁盘的数据,将数据写入到HDFS。

1.2 Flume基础架构

在这里插入图片描述

1.2.1 Agent

Agent是一个JVM进程,它以事件的形式将数据从源头送至目的。
Agent主要有三个部分组成,Source、Channel、Sink。

1.2.2 Source

Source是负责接受数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、taildir、sequence、generator、syslog、http、legacy。

1.2.3 Sink

Sink不断轮询Channel中的事件并且批量地移除它们,并将这些事件批量写入到储存或索引系统、或者被发送到另一个Flume Agent。
Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义

1.2.4 Channel

Channel是位于Source和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。
Flume自带两种Channel:Memory Channel和File Channel。
Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。
File Channel将所有事件都写到磁盘。因为在程序关闭或机器宕机的情况下不会丢失数据。

1.2.5 Event

传输单元,Flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。Event由Header和Body两部分组成,Header用来存放event的一些属性,为K-V结构,Body用来存放该条数据,形式为字节数组。

在这里插入图片描述

2、Flume入门

2.1 Flume安装部署

2.1.1 安装地址

1、Flume 官网地址:http://flume.apache.org/
2、文档查看地址:http://flume.apache.org/FlumeUserGuide.html
3、下载地址:http://archive.apache.org/dist/flume/

2.1.2 安装部署

1、将 apache-flume-1.9.0-bin.tar.gz 上传到 linux 的/opt/software 目录下
2、解压 apache-flume-1.9.0-bin.tar.gz 到/opt/module/目录下

tar -zxf /opt/software/apacheflume-1.9.0-bin.tar.gz -C /opt/module/

3、修改 apache-flume-1.9.0-bin 的名称为 flume-1.9.0

mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume-1.9.0

4、将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3

2.2 Flume入门案例

2.2.1 监控端口数据

1、案例需求
使用 Flume 监听一个端口,收集该端口数据,并打印到控制台。
2、需求分析

在这里插入图片描述

3、案例实现
1、安装netcat工具

 sudo yum install -y nc

2、判断44444端口是否被占用

sudo netstat -nlp | grep 44444

3、在 flume 目录下创建 job 文件夹并进入 job 文件夹。
4、在 job 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf。
5、在 flume-netcat-logger.conf 文件中添加如下内容。

添加内容如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

配置文件解析

在这里插入图片描述
6、先开启Flume监听端口
第一种写法:

 bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

第二种写法:

 bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

参数说明:

--conf/-c:表示配置文件储存在conf/目录下
--name/-n:表示给agent取名为a1
--conf-file/-f:flume本次启动读取的配置文件是在job文件夹下的flume-netcat-logger.conf文件
-Dflume.root.logger=INFO,console:-D表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括log、info、warn、error。

在这里插入图片描述

7、使用netcat根据向本级的44444端口发送内容

在这里插入图片描述
8、在flume监听页面观察接收数据情况

在这里插入图片描述

2.2.2 实时监控单个追加文件

1、案例需求:实时监控Hive日志文件,并且上传到HDFS
2、需求分析:

在这里插入图片描述
3、案例实现:
(1)Flume 要想将数据输出到 HDFS,依赖 Hadoop 相关 jar 包
检查/etc/profile.d/my_env.sh 文件,确认 Hadoop 和 Java 环境变量配置正确
(2)创建 flume-file-hdfs.conf 文件

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9820/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

注:(1)要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于 Hive
日志在 Linux 系统中所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行
Linux 命令来读取文件。
(2)对于所有与时间相关的转义序列,Event Header 中必须存在以 “timestamp”的
key(除非 hdfs.useLocalTimeStamp 设置为 true,此方法会使用 TimestampInterceptor 自
动添加 timestamp)。

参数分析:

在这里插入图片描述
注意:a2.sinks.k2.hdfs.path = hdfs://hadoop102:9820/flume/%Y%m%d/%H
这个参数的填写中主机名和端口号要写NameNode主机名和内部通信的端口号
(3)运行flume

bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf

(4)开启 Hadoop 和 Hive 并操作 Hive 产生日志
(5)在HDFS上查看文件

在这里插入图片描述

2.2.3 实时监控目录下多个文件

1、案例需求:使用 Flume 监听整个目录的文件,并上传至 HDFS
2、需求分析

在这里插入图片描述

3、案例实现
(1)创建配置文件flume-dir-hdfs.conf

a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp 结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = 
hdfs://hadoop102:9820/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

参数解析

在这里插入图片描述
(2)启动监控文件夹命令

 bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf

注意:在使用 Spooling Directory Source 时,不要在监控目录中创建并持续修改文
件;上传完成的文件会以.COMPLETED 结尾;被监控文件夹每 500 毫秒扫描一次文件变动。
(3)向 upload 文件夹中添加文件

  • 在/opt/module/flume 目录下创建 upload 文件夹(这个操作要在开启监控之前,要不然监控不了,会报错)

  • 向 upload 文件夹中添加文件

  • touch atguigu.txt

  • touch atguigu.tmp

  • touch atguigu.log

(4)查看HDFS上的数据

在这里插入图片描述

2.2.4 实时监控目录下的多个追加文件

Exec Source适用于监控一个实时追加的文件,不能实现断电续传;Spooldir Source适合用于同步新文件,但是不适合对实时追加的日志文件进行监听和同步;而Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传。
1、案例需求
使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS
2、需求分析

在这里插入图片描述
3、案例实现
(1)创建配置文件 flume-taildir-hdfs.conf

a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = 
hdfs://hadoop102:9820/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

参数解析

在这里插入图片描述
(2)启动监控文件夹命令

 bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf

(3)向files 文件夹中追加内容

  • 在/opt/module/flume 目录下创建 files 文件夹
  • 向 upload 文件夹中添加文件
 echo hello >> file1.txt
  echo atguigu >> file2.txt

(4)查看HDFS上的数据

在这里插入图片描述
Taildir说明:
Taildir Source维护了一个json格式的position File,其会定期的往position File中更新每个文件读取到的最新位置,因此能够实现断点续传。Position File格式如下:

{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}
{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}

注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来识别文件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/626532.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C#winform多国语言应用实例

我们在开发项目中,一般需要软件支持多种语言,供不同客户使用。本文实例讲解实现办法。 1 窗体项目创建 添加控件MenuStrip、comboBox及Button,并修改对应显示文本,combobox编辑项输入英语 确定窗体的Localizable属性为true,自动创建Form1.resx,为False时,没有Form1.r…

基站机房:保障通信网络稳定,如何解决安全隐患?

基站机房作为无线通信网络的关键组成部分,承载着大量的网络设备和通信设施,对于运营商和通信服务提供商来说具有重要意义。 无论是大型运营商还是通信服务提供商,动环监控系统都将成为他们成功运营和管理通信网络的关键工具。 客户案例 案例…

vue使用高德地图--附带移动获取当前城市信息

高德地图 1.使用准备申请密钥vue使用 2.移动地图获取城市案例(注意事项)3.总结 1.使用准备 申请密钥 登录注册高德开放平台进入控制台 创建应用 申请key–生成key和安全密钥(2021之后key需要配合安全密钥使用) 注意:安全密钥需要在key之前 vue使用 首先在pubil…

一款功能强大的报表引擎-VeryReport报表引擎

在企业管理中,数据分析和决策制定是非常重要的环节。而报表则是这个过程中最常用的工具之一。但是,传统的报表设计与展现方式已经无法满足企业对于数据分析和报表展示的需求。为了解决这些问题,我们向大家推荐一款新一代Web报表软件——VeryR…

越是大型企业越需要企业内部知识库?

随着信息时代的到来,越来越多的企业开始注重知识管理。知识管理是一种通过有效地捕捉、共享和利用企业内部的知识资源,促进企业创新和发展的方法。而企业内部知识库作为知识管理的一种重要方式,对于大型企业来说尤为重要。 一、大型企业内部…

苹果相关网站和服务器状态

https://www.apple.com.cn/cn/support/systemstatus/

googlecloud谷歌云的初学体会(1)

googlecloud谷歌云入门(1) 一、纯小白自述二、云是个什么云三、装一个软件(资源、服务)四、服务器(爷爷提供服务的电脑)五、PGSQL的安装六、总结 一、纯小白自述 自己是个小白,仅仅懂得几句sql…

华为OD机试真题 Java 实现【寻找密码】【2023Q1 100分】,附详细解题思路

一、题目描述 小王在进行游戏大闯关,有一个关卡需要输入一个密码才能通过,密码获得的条件如下: 在一个密码本中,每一页都有一个由 26 个小写字母组成的若干位密码,从它的末尾开始依次去掉一位得到的新密码也在密码本中存在。 请输出符合要求的密码,如果由多个符合要求…

爬虫如何选择工具和编程语言

爬虫选择工具和编程语言需要根据具体的需求和技术水平来决定。以下是一些常用的工具和编程语言: 工具: Scrapy:一个基于Python的高级爬虫框架,可用于快速开发和部署爬虫。Beautiful Soup:一个Python库,用…

基于“三维六类”干扰分析模型进行FDD900干扰规避优化指导

1.概述 随着网络发展,鉴于900M覆盖上的优势,为增强深度覆盖及竞对提升,当前FDD 900M已在加快部署,但随之也带来了干扰问题。当前,干扰排查成为FDD 900M部署过程中大量存在的难题。由于干扰排查难度大,且排…

线程池和使用

tip: 作为程序员一定学习编程之道,一定要对代码的编写有追求,不能实现就完事了。我们应该让自己写的代码更加优雅,即使这会费时费力。 推荐:体系化学习Java(Java面试专题) 文章目录 线程池的目的线程池的参…

360,可真小看你了:耍流氓耍到日本人身上,凌晨2点笑得我胃疼

天下苦流氓软件久矣 大厂的软件,都有点牛皮癣特性:捆绑安装广告推广,简直无所不用其极,身为用户着实无可奈何。 此处点名四大全家桶家族——360、鲁大师、金山毒霸、2345。 说来好笑,之前发的文章不是有关于金山的嘛…

Cefsharp-Winform-113.3.50(chromium5672)最新版体验兼容性测试

一、下载nupkg包(4个)提示:(不支持H264,支持MP3,WEBGL,WEBGL2等)支持H264最新版本109.*自行搜索 winform包地址(依赖包下载地址如下):NuGet Gallery | CefSharp.WinForms 113.3.50 https://globalcdn.nuget.org/packages/cefsharp.winforms.113.3.50.nupkg https://…

仓库拣货标签10代—电子料架

CK_Label_v10 无线电子标签拣货系统特点与效益 无线通信,极简快速部署 超低功耗,墨水屏显示 多彩指示灯,支持24V外接供电 按键及三色高亮LED指示灯指示 3位0.8寸高亮LED数码管显示 提升作业速度与品质 实现无纸化标准化作业 缩短操…

怎么快速掌握Python爬虫技术?

Python总的来说是一门比较容易入门的编程语言,因为它的语法简洁易懂,而且有很多优秀的教程和资源可供学习。相比其他编程语言,Python 的学习曲线较为平缓,初学者可以很快上手,但要想深入掌握 Python,还需要…

chatgpt赋能python:用Python实现ping命令:掌握网络连接的艺术

用Python实现ping命令:掌握网络连接的艺术 当我们需要测试网络连接的时候,ping命令是最经典的选择之一。然而,在一些情况下,使用命令行并不是很方便。那么,有没有可能用Python编写一个类似ping的功能呢? …

ISO21434 概念阶段网络安全(六)

目录 一、概述 二、目标 三、项目定义 3.1 输入 3.1.1 先决条件 3.1.2 进一步支持信息 3.2 要求和建议 3.3 输出 四、网络安全目标 4.1 输入 4.1.1 先决条件 4.1.2 进一步支持信息 4.2 要求和建议 4.3 输出 五、网络安全概念 5.1 输入 5.1.1 先决条件 5.1.2 …

phpMyAdmin连接MySQL,出现服务器拒绝连接解决方法

当你登录mysql的时候出现下面情况时 把config.inc.php删除就可以,或者修改config.inc.php里的 $cfg[Servers][$i][controluser] ; $cfg[Servers][$i][controlpass] ; 注释掉就会弹出来要求登陆。 例如我的文件位置是在C:\wamp\apps\phpmyadmin4.1.14&#xff…

dreamer-cms docker复现

dreamer-cms docker复现 前言一,赛题复现二,人生第一个jar包1 ubuntu本地复现(1)创建文件夹(2)解压资源(3)安装并导入数据库(4)使用idea自动化部署&#xff0…

软件锁步冗余执行等安全机制是什么?

软件锁步冗余执行等安全机制是一种用于提高软件系统的功能安全性和可靠性的技术。它的基本思想是让两个或多个软件副本执行相同的功能,然后比较它们的输出,以检测和处理可能的故障。不同的安全机制有不同的特点和优缺点,例如: 锁…