尚硅谷Flume(仅有基础)

news2024/12/30 3:41:51

q

1 概述

1.1 定义

        Flume 是Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构,灵活简单。 

        Flume最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS。

1.2 架构

1.2.1 Agent 

        Agent是一个JVM进程,它以事件的形式将数据从源头送至目的。 
        Agent主要有3个部分组成,Source、Channel、Sink。 

1.2.2 Source   

        Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、taildir、sequence generator、syslog、http、legacy。 

1.2.3 Sink 

        Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。 

Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。 

1.2.4 Channel 

        Channel是位于Source 和Sink 之间的缓冲区。因此,Channel允许 Source和Sink 运作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink的读取操作。 
        Flume自带两种Channel:Memory Channel和 File Channel。 

        Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。 

        File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。 

1.2.5 Event 

        传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。

        Event由Header 和Body 两部分组成,Header用来存放该 event的一些属性,为K-V 结构,Body用来存放该条数据,形式为字节数组。

2 Flume基本操作

2.1 安装部署

http://www.apache.org/dyn/closer.lua/flume/1.9.0/apache-flume-1.9.0-tar.gz

直接解压

将lib文件夹下的guava-11.0.2.jar删除以兼容 Hadoop 3.1.3

2.2 案例

2.2.1 监控端口数据官方案例

Flume 1.9.0 User Guide — Apache Flume

使用 Flume监听一个端口,收集该端口数据,并打印到控制台

(1)安装netcat工具 
[atguigu@hadoop102 software]$ sudo yum install -y nc 
(2)判断4444端口是否被占用 
[atguigu@hadoop102 flume-telnet]$ sudo netstat -nlp | grep 4444
(3)创建Flume Agent配置文件flume-netcat-logger.conf 
(4)在flume目录下创建 job文件夹并进入job文件夹。 
[atguigu@hadoop102 flume]$ mkdir job 
[atguigu@hadoop102 flume]$ cd job/ 
(5)在job文件夹下创建 Flume Agent配置文件flume-netcat-logger.conf。 
[atguigu@hadoop102 job]$ vim flume-netcat-logger.conf 
(6)在flume-netcat-logger.conf文件中添加如下内容

# name the components on this agent 
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Describe/configure the source 
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4444
 
# Describe the sink
a1.sinks.k1.type = logger
 
# Use a channel which buffers events in memory 
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

原神启动  

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
or
bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -flume.root.logger=INFO,console 

 --conf/-c:表示配置文件存储在 conf/目录 
 --name/-n:表示给 agent 起名为 a1 
 --conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf文件。 
 -Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logge参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、error。 

另一台nc启动

 nc localhost 4444

然后发消息

2023-10-25 14:03:53,633 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 30   

2.2.2 实时监控单个追加文件

实时监控 Hive 日志,并上传到HDFS中 

开启hadoop集群

start-all.sh

开启hive

/export/servers/hive/bin/hive --service metastore & nohup /export/servers/hive/bin/hive

vim flume-file-hdfs.conf

# Name the components on this agent 
a2.sources = r2
a2.sinks = k2
a2.channels = c2
 
# Describe/configure the source 
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /export/server/hive/logs/hive.log(要监控拉取的文件)

# Describe the sink 
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop1:8020/flume/%Y%m%d/%H(这里的端口要和hadoop配置里hdfs的一样!!!!!!!!!)
#上传文件的前缀 
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹 
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹 
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位 
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳 
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event 才flush 到HDFS一次 
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory 
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel 
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

  启动

bin/flume-ng agent -n a2 -c conf -f job/flume-file-hdfs.conf

ctrl+Z退出

在HDFS上查看文件。

2.2.3 实时监控目录下多个新文件 

使用Flume监听整个目录的文件,并上传至 HDFS 

         在使用 Spooling Directory Source 时,不要在监控目录中创建并持续修改文件;上传完成的文件会以.COMPLETED结尾;被监控文件夹每 500毫秒扫描一次文件变动。 

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf


a3.sources = r3
a3.sinks = k3
a3.channels = c3
 
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /export/server/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传 
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
 
# Describe the sink 
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop1:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀:
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹 
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹 
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event 才flush 到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩 
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件 
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0
 
# Use a channel which buffers events in memory 
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
 
# Bind the source and sink to the channel 
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

在/opt/module/flume 目录下创建upload文件夹 

向 upload文件夹中添加文件 

2.2.4 实时监控目录下的多个追加文件 

        Exec source适用于监控一个实时追加的文件,不能实现断点续传;Spooldir Source适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;而Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传。

a3.sources = r3 
a3.sinks = k3 
a3.channels = c3 
 
# Describe/configure the source 
a3.sources.r3.type = TAILDIR 
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json 
a3.sources.r3.filegroups = f1 f2 
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.* 
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.* 
 
# Describe the sink 
a3.sinks.k3.type = hdfs 
a3.sinks.k3.hdfs.path = 
hdfs://hadoop102:9820/flume/upload2/%Y%m%d/%H 
#上传文件的前缀 
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹 
a3.sinks.k3.hdfs.round = true 
#多少时间单位创建一个新的文件夹 
a3.sinks.k3.hdfs.roundValue = 1 
#重新定义时间单位 
a3.sinks.k3.hdfs.roundUnit = hour 
#是否使用本地时间戳 
a3.sinks.k3.hdfs.useLocalTimeStamp = true 
#积攒多少个Event 才flush 到HDFS一次 
a3.sinks.k3.hdfs.batchSize = 100 
#设置文件类型,可支持压缩 
a3.sinks.k3.hdfs.fileType = DataStream 
#多久生成一个新的文件 
a3.sinks.k3.hdfs.rollInterval = 60 
#设置每个文件的滚动大小大概是128M 
a3.sinks.k3.hdfs.rollSize = 134217700 
#文件的滚动与Event数量无关 
a3.sinks.k3.hdfs.rollCount = 0 
 
# Use a channel which buffers events in memory 
a3.channels.c3.type = memory 
a3.channels.c3.capacity = 1000 
a3.channels.c3.transactionCapacity = 100 
 
# Bind the source and sink to the channel 
a3.sources.r3.channels = c3 
a3.sinks.k3.channel = c3

Taildir 说明: 
  Taildir Source维护了一个 json格式的position File,其会定期的往position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File的格式如下

3 Flume 高级

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1135522.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

易点易动固定资产管理系统:高效盘点海量固定资产的得力助手

固定资产是企业重要的财务资源之一,盘点是保证固定资产准确性和完整性的关键环节。然而,对于拥有海量固定资产的企业来说,传统的手工盘点方式效率低下且容易出错。为了解决这一难题,易点易动固定资产管理系统应运而生。本文将深入…

AM@第二类换元法积分

文章目录 abstract第一类换元法第二类换元法分析定理👺证明第二类换元公式的应用 倒代换三角恒等化去根式其他使用第二换元法情形例 附加积分公式表例 附 abstract 第二类换元法(简称第二换元法)的原理和应用 第一类换元法 通过变量代换 u ϕ ( x ) u\phi(x) uϕ…

GoLong的学习之路(十)语法之函数

书接上回,上回书说到,结构体,一言之重在于体。一体之重在于经。经之重甚于骨。这张就说go的经络—函数。 文章目录 函数函数如何定义参数可变参数 返回值多返回值 函数类型与变量 高阶函数函数作为参数函数作为返回值匿名函数闭包defer语句底…

虹科 | 解决方案 | 非道路移动机械诊断方案

虹科Pico汽车示波器为卡车、拖拉机、叉车、船只、联合收割机、挖掘机开发了专用的测试附件和软件测试菜单,比如 24 V 电池、Bosch Denoxtronic、J1939 通信、发动机和液压传动系统以及部件测试等。我们为从事重型车辆和非道路移动机械的维护与诊断的朋友&#xff0c…

通用表表达式查询

1.方法: 1.1普通变量创建 with 表名(列名) as(select 内容) 语义:创建一张表 列名和内容11对应 和临时表的区别,这个类似变量,变量和常量的区别 后面可以影响前面: 1…

通天之网:卫星互联网与跨境电商的数字化未来

在当今数字化时代,互联网已经成为商业的核心。跨境电商,作为在线商业的一部分,一直在寻求新的途径来拓宽其边界。近年来,卫星互联网技术的发展已经成为这一领域的重要驱动力,不仅将互联网带到了全球各个角落&#xff0…

DSP 开发例程: led_flash

此例程实现在 EVM6678L 开发板控制 LED 闪烁. 使用了 SYS/BIOS 和 MCSDK PDK TMS320CC6678 两个组件. 例程源码可从我的 gitee.com 仓库上克隆或下载. 目录 创建工程源码编辑main.cplatform_osal.capp.cfg 编译调试使用 板载仿真器使用 外部仿真器 创建工程 点击菜单: File | N…

51单片机实验:数码管动态显示00-99

1、实验要求 利用STC89C52RC单片机开发板实现:使用2位数码管循环显示00-99,每次间隔1s,并且当计数到20时,则蜂鸣器鸣响1次。 2、实验分析 程序实现分析: 1、定义数码管位选引脚(P2.4、P2.5、P2.6、…

C++ BinarySercahTree for version

搜索二叉树定义 搜索二叉树模拟实现 首先写一个模版&#xff0c;然后写一个搜索二叉树的类 BSTree&#xff0c;类里面给 BSTe进行重命名为&#xff1a;Node。 template<class K> class BSTree {tyepdef BSTree<K> Node; private:Node* root nullptr; };再写一个…

Qt中的枚举变量,Q_ENUM,Q_FLAG以及Qt中自定义结构体、枚举型做信号参数传递

Qt中的枚举变量,Q_ENUM,Q_FLAG,Q_NAMESPACE,Q_ENUM_NS,Q_FLAG_NS以及其他 理论基础&#xff1a;一、Q_ENUM二、QMetaEnum三、Q_FLAG四、示例 Chapter1 Qt中的枚举变量,Q_ENUM,Q_FLAG,Q_NAMESPACE,Q_ENUM_NS,Q_FLAG_NS以及其他前言Q_ENUM的使用Q_FLAG的引入解决什么问题&#xf…

怎么在Python爬虫中使用IP代理以避免反爬虫机制?

在进行网络爬虫的过程中&#xff0c;尤其是在大规模批量抓取数据时&#xff0c;需要应对各种反爬虫技术&#xff0c;其中最常用的就是IP封锁。为了避免IP被封锁&#xff0c;我们可以使用IP代理来隐藏自己的真实IP地址&#xff0c;从而让爬虫活动看起来更像正常的浏览器行为。 I…

Table-GPT:让大语言模型理解表格数据

llm对文本指令非常有用&#xff0c;但是如果我们尝试向模型提供某种文本格式的表格数据和该表格上的问题&#xff0c;LLM更有可能产生不准确的响应。 在这篇文章中&#xff0c;我们将介绍微软发表的一篇研究论文&#xff0c;“Table-GPT: Table- tuning GPT for Diverse Table…

虹科 | 解决方案 | 汽车示波器 远程诊断方案

车厂总部专家实时指导你修车 当一线汽修技师遇到疑难问题无从下手时&#xff0c;可以准备好pico汽车示波器套装&#xff0c;并戴上我们的M400智能AR眼镜&#xff0c;通过语音操作&#xff0c;呼叫主机厂的技术支持老师&#xff1b;老师通过AR眼镜上的摄像头老师可以实时看到现…

Python数据挖掘:入门、进阶与实用案例分析——基于非侵入式负荷检测与分解的电力数据挖掘

文章目录 摘要01 案例背景02 分析目标03 分析过程04 数据准备05 属性构造06 模型训练07 性能度量08 推荐阅读赠书活动 摘要 本案例将根据已收集到的电力数据&#xff0c;深度挖掘各电力设备的电流、电压和功率等情况&#xff0c;分析各电力设备的实际用电量&#xff0c;进而为电…

财务RPA机器人真的能提高效率吗?

财务部门作为一个公司的管理职能部门承担着一个公司在商业活动中各个方面的重要职责。理论上来说&#xff0c;一个公司的财务部门的实际工作包含但不限于对企业的盈亏情况进行评估、对风险进行预测、通过数据分析把握好公司的财务状况、税务管理等。 然而&#xff0c;实际上在…

分析主题帆软决策报表控件实现点击查询按钮后才能查询

点击「我的分析」即进入分析主题的管理界面&#xff0c;如下图所示&#xff1a; 2.1.1 分析列表区域 「分析列表」存放着当前用户创建的和其他用户协作给该用户的所有分析主题。 可以对分析列表和分析主题进行管理&#xff0c;详情参见&#xff1a;管理我的分析、管理分析主…

oa系统是什么?有哪些功能和作用?

本文将为大家讲解&#xff1a;1、oa系统是什么&#xff1f;有哪些功能和作用&#xff1f; 一、什么是OA系统 OA系统全称为Office Automation&#xff0c;即办公自动化系统。它是一种专门为企业和机构的日常办公工作提供服务的综合性软件平台&#xff0c;具有信息管理、流程管…

微信小程序实现文章内容详情

方案一、使用微信小程序官方提供的webview 前提已经在微信公众平台开发管理配置好了安全域名即&#xff1a; 方案二、把网页转成pdf直接展示 前提已经在微信公众平台开发管理配置好了安全域名即&#xff1a; 实现思路是发起网络请求拿到pdf下载地址&#xff0c;然后wx.download…

Linux (KDE) 中使用Network Settings设置静态ip

在 Linux (KDE) 中使用 Network Settings 设置s5静态IP详细教程 。 首先&#xff0c;打开 KDE 的设置面板。可以通过点击桌面上的设置图标&#xff0c;或者在开始菜单中搜索 “Settings” 并打开。 在设置面板中&#xff0c;点击 “Network” 选项。 接下来&#xff0c;你会看…

AD20生产Gerber文件

1、打开所要导出文件的PCB文件。并选择制造输出Gerber文件。 2、选择单位和格式 3、设置需要输出的图层 点击确定后&#xff0c;就可以生成Gerber文件。如下图所示。 4、生成钻孔文件 5&#xff0c;选择PCB窗口&#xff0c;跟第一步一样&#xff1b; 6&#xff0c;点击菜单 文…