浅析数据采集工具Flume

news2025/1/18 7:04:06

title: Flume系列


第一章 Flume基础理论

1.1 数据收集工具产生背景

Hadoop 业务的一般整体开发流程:

在这里插入图片描述

任何完整的大数据平台,一般都会包括以下的基本处理过程:

数据采集 
数据 ETL 
数据存储 
数据计算/分析 
数据展现 

其中,数据采集是所有数据系统必不可少的,随着大数据越来越被重视,数据采集的挑战也变的尤为突出。这其中包括:

数据源多种多样 
数据量大,变化快 
如何保证数据采集的可靠性的性能 
如何避免重复数据 
如何保证数据的质量 

我们今天就来看看当前可用的一些数据采集的产品,重点关注一些它们是如何做到高可靠, 高性能和高扩展。

总结:
数据的来源大体上包括:

1、业务数据 
2、爬取的网络公开数据 
3、购买数据 
4、自行采集日志数据

1.1 Flume简介

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

​ Flume 是一个分布式、可靠、高可用的海量日志聚合系统,支持在系统中定制各类数据发送 方,用于收集数据,同时,Flume 提供对数据的简单处理,并写到各种数据接收方的能力。

1、 Apache Flume 是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统,和 Sqoop 同属于数据采集系统组件,但是 Sqoop 用来采集关系型数据库数据,而 Flume 用来采集流动型数据。

2、 Flume 名字来源于原始的近乎实时的日志数据采集工具,现在被广泛用于任何流事件数 据的采集,它支持从很多数据源聚合数据到 HDFS。

3、 一般的采集需求,通过对 flume 的简单配置即可实现。Flume 针对特殊场景也具备良好 的自定义扩展能力,因此,flume 可以适用于大部分的日常数据采集场景 。

4、 Flume 最初由 Cloudera 开发,在 2011 年贡献给了 Apache 基金会,2012 年变成了 Apache 的顶级项目。Flume OG(Original Generation)是 Flume 最初版本,后升级换代成 Flume NG(Next/New Generation)。

5、 Flume 的优势:可横向扩展、延展性、可靠性。

1.2 Flume版本

Flume 在 0.9.x and 1.x 之间有较大的架构调整:
1.x 版本之后的改称 Flume NG
0.9.x 版本称为 Flume OG,最后一个版本是 0.94,之后是由 Apache 进行了重构
N是New 和 O是Old

Flume1.7版本要求:

Flume OG  Old/Original Generation
Flume NG  New/Next	   Generation	

在这里插入图片描述

注意,上面是flume1.7的要求,其他版本要求可能会不一样!!

本文使用版本链接:http://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html

官网链接:http://flume.apache.org/

Flume1.9 版本要求:

System Requirements

Java Runtime Environment - Java 1.8 or later
Memory - Sufficient memory for configurations used by sources, channels or sinks
Disk Space - Sufficient disk space for configurations used by channels or sinks
Directory Permissions - Read/Write permissions for directories used by agent

第二章 Flume体系结构/核心组件

agent:能独立执行一个数据收集任务的JVM进程
source : agent中的一个用来跟数据源对接的服务
channel : agent内部的一个中转组件
sink : agent中的一个用来跟数据目的地对接的服务
event: 消息流转的一个载体/对象
		header  body
        

常见source的类型
	Avro source :接收网络端口中的数据
	exec source: 监听文件新增内容   tail -f
	spooldir source :监控文件夹的,如果这个文件夹里面的文件发送了变化,就可以采集
	Taildir source: 多目录多文件实时监控
	
	
常见的channel的类型
	memory : 内存中  , 快 , 但不安全
	file : 相对来说安全些,但是效率低些
	jdbc: 使用数据库进行数据的保存


常见的sink的类型
	logger   做测试使用
	HDFS	离线数据的sink 一般
	Kafka   流式数据的sink 
以上仅仅是常见的一些,官网中有完整的。

2.1 介绍

​ Flume 的数据流由事件(Event)贯穿始终。事件是 Flume 的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些 Event 由 Agent 外部的 Source 生成,当 Source 捕获事 件后会进行特定的格式化,然后 Source 会把事件推入(单个或多个)Channel 中。你可以把 Channel 看作是一个缓冲区,它将保存事件直到 Sink 处理完该事件。Sink 负责持久化日志或 者把事件推向另一个 Source。

​ Flume 以 agent 为最小的独立运行单位

​ 一个 agent 就是一个 JVM

​ 单 agent 由 Source、Sink 和 Channel 三大组件构成。

如下面官网图片
在这里插入图片描述

解释:

在这里插入图片描述

2.2 Flume三大核心组件

Event
Event 是 Flume 数据传输的基本单元。
Flume 以事件的形式将数据从源头传送到最终的目的地。
Event 由可选的 header 和载有数据的一个 byte array 构成。
载有的数据对 flume 是不透明的。
Header 是容纳了 key-value 字符串对的无序集合,key 在集合内是唯一的。
Header 可以在上下文路由中使用扩展。

Client
Client 是一个将原始 log 包装成 events 并且发送他们到一个或多个 agent 的实体
目的是从数据源系统中解耦 Flume
在 Flume 的拓扑结构中不是必须的

Agent
一个 Agent 包含 source,channel,sink 和其他组件。
它利用这些组件将 events 从一个节点传输到另一个节点或最终目的地。
Agent 是 flume 流的基础部分。
Flume为这些组件提供了配置,声明周期管理,监控支持。

Agent 之 Source
Source 负责接收 event 或通过特殊机制产生 event,并将 events 批量的放到一个或多个
包含 event 驱动和轮询两种类型
不同类型的 Source
与系统集成的 Source:Syslog,Netcat,监测目录池
自动生成事件的 Source:Exec
用于 Agent 和 Agent 之间通信的 IPC source:avro,thrift
Source 必须至少和一个 channel 关联

Agent 之 Channel
Channel 位于 Source 和 Sink 之间,用于缓存进来的 event
当 sink 成功的将 event 发送到下一个的 channel 或最终目的,event 从 channel 删除
不同的 channel 提供的持久化水平也是不一样的
Memory Channel:volatile(不稳定的)
File Channel:基于 WAL(预写式日志 Write-Ahead Logging)实现
JDBC Channel:基于嵌入式 database 实现
Channel 支持事务,提供较弱的顺序保证
可以和任何数量的 source 和 sink 工作

Agent 之 Sink
Sink 负责将 event 传输到下一级或最终目的地,成功后将 event 从 channel 移除
不同类型的 sink ,比如 HDFS,HBase

2.3 Flume经典部署方案

1、单Agent采集数据

在这里插入图片描述

​ 由一个 agent 负责把从 web server 中收集数据到 HDFS 。

2、多Agent串联

在这里插入图片描述

​ 在收集数据的过程中,可以让多个 agent 串联起来,形成一条 event 数据线,进行传输,但 是注意的是:相邻两个 agent 的前一个 agent 的 sink 类型要和后一个 agent 的 source 类型一 致。

3、多Agent合并串联

在这里插入图片描述

​ 多个 agent 串联,并联成一个复杂的 数据收集架构。反映了 flume 的部署灵活。并且针对关键节点,还可以进行高可用配置。

4、多路复用

在这里插入图片描述

​ 一份数据流,可以被复制成多份数据流,交给多个不同组件进行处理。一般用于一边永久存储一边进行计算。

第三章 Flume安装及案例

3.1 安装部署

3.1.1 Flume1.7安装部署

1、将apache-flume-1.7.0-bin.tar.gz上传到hadoop0的/software目录下,并解压

[root@hadoop0 software]# tar -zxvf apache-flume-1.7.0-bin.tar.gz

2、重命名为flume

[root@hadoop0 software]# mv apache-flume-1.7.0-bin flume

3、修改flume-env.sh文件

[root@hadoop0 conf]# mv flume-env.sh.template flume-env.sh

然后vim flume-env.sh,修改jdk路径

export JAVA_HOME=/software/jdk

3.1.2 Flume1.9安装部署

1、将apache-flume-1.9.0-bin.tar.gz上传到hadoop10的/software目录下,并解压

[root@hadoop10 software]# tar -zxvf apache-flume-1.9.0-bin.tar.gz

2、重命名为flume

[root@hadoop10 software]# mv apache-flume-1.9.0-bin flume

3、修改flume-env.sh文件

[root@hadoop10 conf]# mv flume-env.sh.template flume-env.sh

然后vim flume-env.sh,修改jdk路径

export JAVA_HOME=/software/jdk

4、看看Flume版本

[root@hadoop10 bin]# flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
[root@hadoop10 bin]# pwd
/software/flume/bin
[root@hadoop10 bin]# 

3.2 案例

3.2.1 监控端口数据(官方案例)

1、在flume的目录下面创建文件夹
[root@hadoop0 flume]# mkdir job
[root@hadoop0 flume]# cd job
2、定义配置文件telnet-logger.conf
[root@hadoop0 job]# vim telnet-logger.conf
添加内容如下:

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3、先开启flume监听端口
退到flume目录
官方样例:bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
实际操作:
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger.conf -Dflume.root.logger=INFO,console

4、执行telnet localhost 44444
telnet localhost 44444
会先报找不到telnet
[root@hadoop10 flume]# telnet localhost 44444
bash: telnet: command not found...
[root@hadoop10 flume]# 
然后执行yum -y install telnet
5、发送命令测试即可

针对于上述配置telnet-logger.conf文件的内容的解释:

# example.conf: A single-node Flume configuration

# Name the components on this agent  #a1: 表示的是agent的名字
a1.sources = r1		#r1 : 表示的是a1的输入源
a1.sinks = k1  		#k1 : 表示的a1的输出目的地
a1.channels = c1  	#c1 : 表示的a1的缓冲区

# Describe/configure the source	#配置source
a1.sources.r1.type = netcat		#表示a1的输入源r1的类型是netcat类型
a1.sources.r1.bind = localhost  #表示a1监听的主机
a1.sources.r1.port = 44444      #表示a1监听的端口号

# Describe the sink		    #描述sink
a1.sinks.k1.type = logger	#表示a1的输入目的地k1的类型是logger

# Use a channel which buffers events in memory	
a1.channels.c1.type = memory		#表示a1的channel的类型是memory类型
a1.channels.c1.capacity = 1000		#表示a1的channel总容量1000个event
a1.channels.c1.transactionCapacity = 100  #表示a1的channel传输的时候收集到了100个event以后再去提交事务

# Bind the source and sink to the channel
a1.sources.r1.channels = c1  #表示将r1和c1 连接起来
a1.sinks.k1.channel = c1     #表示将k1和c1 连接起来

3、先开启flume监听端口
退到flume目录
官方样例:bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
实际操作:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger.conf -Dflume.root.logger=INFO,console

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger2.conf -Dflume.root.logger=INFO,console

参数说明:
	--conf conf : 表示配置文件在conf目录
	--name a1  :  表示给agent起名为a1 
	--conf-file job/telnet-logger.conf : flume本次启动所要读取的配置文件在job文件夹下面的telnet-logger.conf文件
	-Dflume.root.logger=INFO,console : -D 表示flume运行时候的动态修改flume.root.logger参数值,并将日志打印到控制台,级别是INFO级别。
	日志级别: log、info、warn、error 

3.2.2 监控目录中的文件到HDFS

1、创建配置文件dir-hdfs.conf
在job目录下面 vim dir-hdfs.conf
添加下面的内容:

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /software/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop10:8020/flume/upload/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 600
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0
a3.sinks.k3.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

2、启动监控目录命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/dir-hdfs.conf

针对于上述配置dir-hdfs.conf文件的内容的解释:

1、创建配置文件dir-hdfs.conf
在job目录下面 vim dir-hdfs.conf
添加下面的内容:

a3.sources = r3		#定义source为r3
a3.sinks = k3       #定义sink为k3
a3.channels = c3    #定义channel为c3

# Describe/configure the source  #配置source相关的信息
a3.sources.r3.type = spooldir    #定义source的类型是spooldir类型
a3.sources.r3.spoolDir = /software/flume/upload   #定义监控的具体的目录
a3.sources.r3.fileSuffix = .COMPLETED			  #文件上传完了之后的后缀
a3.sources.r3.fileHeader = true					  #是否有文件头
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)        #忽略以tmp结尾的文件,不进行上传

# Describe the sink			#配置sink相关的信息
a3.sinks.k3.type = hdfs		#定义sink的类型是hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop10:8020/flume/upload/%Y%m%d/%H	#文件上传到hdfs的具体的目录
a3.sinks.k3.hdfs.filePrefix = upload-		#文件上传到hdfs之后的前缀
a3.sinks.k3.hdfs.round = true				#是否按照时间滚动生成文件
a3.sinks.k3.hdfs.roundValue = 1				#多长时间单位创建一个新的文件
a3.sinks.k3.hdfs.roundUnit = hour			#时间单位
a3.sinks.k3.hdfs.useLocalTimeStamp = true   #是否使用本地时间
a3.sinks.k3.hdfs.batchSize = 100			#积累多少个event才刷写到hdfs一次
a3.sinks.k3.hdfs.fileType = DataStream		#文件类型
a3.sinks.k3.hdfs.rollInterval = 600			#多久生成新文件
a3.sinks.k3.hdfs.rollSize = 134217700		#多大生成新文件
a3.sinks.k3.hdfs.rollCount = 0				#多少event生成新文件
a3.sinks.k3.hdfs.minBlockReplicas = 1		#副本数

# Use a channel which buffers events in memory
a3.channels.c3.type = memory				
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

2、启动监控目录命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/dir-hdfs.conf

在执行上面的命令过程中遇到的了一点点小问题

......
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
	at org.apache.hadoop.conf.Configuration.set(Configuration.java:1357)
	at org.apache.hadoop.conf.Configuration.set(Configuration.java:1338)
	at org.apache.hadoop.conf.Configuration.setBoolean(Configuration.java:1679)
	at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:221)
	at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:572)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:412)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:748)

解决方案:将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop的版本。可以通过重命名的方式注释掉即可(实现删除的效果)。

[root@hadoop10 lib]# mv guava-11.0.2.jar guava-11.0.2.jar.backup

3.2.3 监控文件到HDFS

1、创建一个自动化文件
[root@hadoop0 job]# vim mydateauto.sh
写入:
#!/bin/bash

while true
do
	echo `date`
	sleep 1
done

然后运行测试:
[root@hadoop0 job]# sh mydateauto.sh 
Wed Aug 19 18:34:19 CST 2020
Wed Aug 19 18:34:20 CST 2020


然后修改配置,将输出的日志追加到某个文件中
#!/bin/bash

while true
do
        echo `date` >> /software/flume/mydate.txt
        sleep 1
done

再次执行[root@hadoop0 job]# sh mydateauto.sh 
就会在flume的文件夹下面生成了mydate.txt文件
通过tail -f mydate.txt 查看
再次执行sh mydateauto.sh  查看输出。

2、创建配置vim file-hdfs.conf


# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /software/flume/mydate.txt
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop10:8020/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 600
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.sinks.k2.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

3、启动
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/file-hdfs.conf

针对于上述配置file-hdfs.conf文件的内容的解释:

# Name the components on this agent
a2.sources = r2		#定义source为r2
a2.sinks = k2		#定义sink为k2
a2.channels = c2	#定义channel为c2

# Describe/configure the source
a2.sources.r2.type = exec	#定义source的类型是exec 可执行命令
a2.sources.r2.command = tail -F /software/flume/mydate.txt	#具体文件位置
a2.sources.r2.shell = /bin/bash -c  #命令开头

# Describe the sink	#sink相关配置
a2.sinks.k2.type = hdfs		#定义sink的类型是hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop10:8020/flume/%Y%m%d/%H		#具体的位置
a2.sinks.k2.hdfs.filePrefix = logs- 	
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 100
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 600   #单位是秒!!
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.sinks.k2.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

3、启动
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/file-hdfs.conf

过程中遇到的一点点小问题:

在这里插入图片描述

18 Oct 2021 14:32:24,340 INFO  [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.create:42)  - Creating instance of sink: k2, type: hdfs
18 Oct 2021 14:32:24,348 ERROR [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadSinks:469)  - Sink k2 has been removed due to an error during configuration
java.lang.InstantiationException: Incompatible sink and channel settings defined. sink's batch size is greater than the channels transaction capacity. Sink: k2, batch size = 1000, channel c2, transaction capacity = 100
	at org.apache.flume.node.AbstractConfigurationProvider.checkSinkChannelCompatibility(AbstractConfigurationProvider.java:403)
	at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:462)
	at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:106)
	at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

解决方案:

问题原因:原因其实很明了了,就是字面的意思,channel 与 sink的设置不匹配,sink的batch size大于channel的transaction capacity
解决方案:将a2.sinks.k2.hdfs.batchSize设置为小于等于100 。 或者注释掉也可以。

3.2.4 多目录多文件实时监控(Taildir Source)

与前面使用到的Source的对比

Spooldir Source 用于同步新文件,但不适合对实时追加日志的文件进行监听并同步。
Exec source 用于监控一个实时追加的文件,不能实现断点续传;
Taildir Source 用于监听多个实时追加的文件,并且能够实现断点续传。

操作案例:

1、在job下面创建 vim taildir-hdfs.conf



a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /software/flume/taildir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /software/flume/taildirtest/filedir/.*file.*
a3.sources.r3.filegroups.f2 = /software/flume/taildirtest/logdir/.*log.*

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop10:8020/flume/uploadtaildir/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 600
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0
a3.sinks.k3.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

2、创建文件文件夹,注意需要在启动之前创建监控的文件夹
[root@hadoop10 flume]# mkdir taildirtest
[root@hadoop10 flume]# cd taildirtest/
[root@hadoop10 taildirtest]# ll
total 0
[root@hadoop10 taildirtest]# mkdir filedir
[root@hadoop10 taildirtest]# mkdir logdir
[root@hadoop10 taildirtest]# ll
total 0
drwxr-xr-x. 2 root root 6 Oct 18 16:44 filedir
drwxr-xr-x. 2 root root 6 Oct 18 16:45 logdir
[root@hadoop10 taildirtest]# vim file.txt
[root@hadoop10 taildirtest]# vim log.txt
[root@hadoop10 taildirtest]# ll
total 8
drwxr-xr-x. 2 root root  6 Oct 18 16:44 filedir
-rw-r--r--. 1 root root 35 Oct 18 16:45 file.txt
drwxr-xr-x. 2 root root  6 Oct 18 16:45 logdir
-rw-r--r--. 1 root root 35 Oct 18 16:46 log.txt

3、启动监控目录命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/taildir-hdfs.conf

4、测试
[root@hadoop10 taildirtest]# cp file.txt filedir/
[root@hadoop10 taildirtest]# cp log.txt logdir/
[root@hadoop10 taildirtest]# cd filedir/
[root@hadoop10 filedir]# echo hello1 >> file.txt 
[root@hadoop10 filedir]# cd ../logdir/
[root@hadoop10 logdir]# echo hello2 >> log.txt 
[root@hadoop10 logdir]# 



声明:
        文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。


By luoyepiaoxue2014

微博地址: http://weibo.com/luoyepiaoxue2014 点击打开链接

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/41487.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Nacos注册中心和服务方式

目录 一、服务治理介绍 常见的注册中心 二、Nacos注册中心介绍 三、运用Nacos搭建环境 四、DiscoveryClient实现负载均衡 五、Ribbon实现负载均衡 六、基于Feign实现服务调用 七、Feign传参 一、服务治理介绍 通过上一章的操作,我们已经可以实现微服务之间的调…

【Android +Tensroflow Lite】实现从基于机器学习语音中识别指令讲解及实战(超详细 附源码)

需要源码和配置文件请点赞关注收藏后评论区留言~~~ 一、基于机器学习的语音推断 Tensorflow基于分层和模块化的设计思想,整个框架以C语言的编程接口为界,分为前端和后端两大部分 Tensorflow框架结构如下图 二、Tensorflow Lite简介 虽然Tensorflow是一…

WMS类图结构分析-android12

为什么要分析类图? WMS是一个复杂的模块,就像一个很大的家族,里面有各种角色,认识类图就像是认识WMS模块中的各个角色,不先把人认清楚了,怎么更好的理解他们之间的交互? 我觉得,这…

【MATLAB教程案例47】基于双目相机拍摄图像的三维重建matlab仿真

欢迎订阅《FPGA学习入门100例教程》、《MATLAB学习入门100例教程》 本课程学习成果预览: 目录 1.软件版本 2.基于双目相机拍摄图像的三维重建原理概述

GII全球创新指数2013-2020

1、数据来源:世界知识产权组织发布的《2021年全球创新指数报告》 2、时间跨度:2013-2020 3、区域范围:全球 4、指标说明: 全球创新指数(Global Innovation Index,GII)是世界知识产权组织、康…

20221127-1Spring_day01(资料来自黑马程序)

Spring_day01 今日目标 掌握Spring相关概念完成IOC/DI的入门案例编写掌握IOC的相关配置与使用掌握DI的相关配置与使用 1,课程介绍 对于一门新技术,我们需要从为什么要学、学什么以及怎么学这三个方向入手来学习。那对于Spring来说: 1.1 为什么要学? …

Reactive UI -- 反应式编程UI框架入门学习(一)

反应式编程 反应式编程是一种相对于命令式的编程范式,由函数式的组合声明来构建异步数据流。要理解这个概念,可以简单的借助Excel中的单元格函数。 上图中,A1B1C1,无论B1和C1中的数据怎么变化,A1中的值都会自动变化&a…

Kafka - 08 Kafka Broker工作流程 | 节点服役 | 节点退役

文章目录1. Kafka Broker 工作流程2. Kafka 节点服役1. 增加一个Kafka节点2. 执行负载均衡操作3. Kafka 节点退役1. Kafka Broker 工作流程 Kafka上下线时Zookeeper中的数据变化: [zk: localhost:2181(CONNECTED) 9] ls / [zookeeper, kafka_cluster][zk: localhost…

使用nw.js快速开发一个基于浏览器的小型桌面端(适用于高校学生实验作业)

首先讲下退坑事项,节约读者时间 生成的exe会依赖SDK文件夹下的一些dll,所以不能简单的交付这个exe,需要使用额外的软件进行打包,如Enigma Virtual Box、inno setup自定义可执行文件的icon也要额外软件,如Resource Hac…

SCDM 实例教程:基本几何建模

作者 | 张杨 ANSYS SpaceClaim Direct Modeler(简称 SCDM)是基于直接建模思想的新一代3D建模和几何处理软件。SCDM可以显著地缩短产品设计周期,大幅提升CAE分析的模型处理质量和效率,为用户带来全新的产品设计体验。 本文将主要…

常用 CMD 命令

前言 作为一个程序员,可能更多的是在 Linux 中使用命令来操作。但在日常使用 Windows 的过程中,或多或少会使用到命令提示符窗口,也就是 Windows 中的 CMD。这个时候,掌握一些常用的命令就尤为重要了,一方面方便自己使…

排序-指标解读

一、ROC ROC曲线全称是(receiver operating characteristic cure)受试者工作特征曲线。 首先大家看到这里肯定会好奇,为啥名字这么奇怪,来一波背景介绍先。 “ROC起先应用于军事领域,据说在第二次世界大战期间,ROC 曲线最先是由…

几分钟快速学会Linux开启启动服务

背景 最近在银行遇到一个部署问题,uat、prod 两个环境的ECS中的服务要求制作好基础镜像,上环境的时候只需要在对应的ECS中选择更换系统即可,不允许传统连接SSH上去安装,这就要求我们就得提前把需要运行的服务内置到系统中&#x…

债券数据集:绿色债券数据集、历时新发、发行债券、DCM定价估值四大指标数据

1、绿色债券数据集 1、数据来源:wind 2、时间跨度:2016.01-2021.11年 3、区域范围:全国 4、指标说明: 部分指标如下: 数据截图如下: 2、历史新发债券数据库 1、数据来源:wind 2、时间跨度…

领悟《信号与系统》之 傅立叶变换的性质与应用

傅立叶变换的性质与应用一、傅里叶变换性质表二、傅里叶性质详细1. 线性性质2. 尺度变换特性3. 时移特性4. 频移特性5. 时域微分特性6. 频域微分特性7. 时域积分特性8. 频域积分特性9. 卷积定理1. 时域卷积定理2. 频域卷积定理10. 对称性11. 帕塞瓦尔定理依据傅里叶变换对概念&…

xxl-job 快速使用

xxl-job 快速使用xxl-job 简单使用注意事项执行xxl-job 简单使用 xxl-job 官方使用说明 XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。 注意事项 详细可…

【Canvas】js用Canvas绘制阴阳太极图动画效果

学习JavaScript是否兴趣缺缺,那就需要来一个兴趣学习,问一下有没有兴趣用Canvas画图呢,可以画很多有趣的事物,自由发挥想象,收获多多哦,这里有一个例子,如何用canvas画阴阳太极图动图效果&#…

王道考研——操作系统(第二章 进程管理)(死锁)

一、死锁的概念 什么是死锁 死锁、饥饿、死循环的区别 死锁产生的必要条件 什么时候会发生死锁 死锁的处理策略 知识回顾与重要考点 二、死锁的处理策略——预防死锁 知识总览 破坏互斥条件 破坏不剥夺条件 破坏请求和保持条件 破坏循环等待条件 知识回顾与重要考点 与前面哲…

分省/市/县最低工资标准(2012-2021年)和 全国/省/市/县GDP数据(1949-2020年)

一、最低工资数据 1、数据来源:各省\市\县政府公布资料 2、时间跨度:2012-2021年 3、区域范围:全国各省\市\县 4、指标说明: 部分数据如下: 二、各省市县人均GDP 1、数据来源:地方统计局 2、时间跨度…

常用PC,移动浏览器User-Agent大全

常用PC,移动浏览器User-Agent大全,提供PC浏览器user-agent,Android手机浏览器user-agent大全 PC端User-Agent IE 9.0 IE 8.0 IE 7.0 IE 6.0 Firefox 4.0.1–MAC Firefox 4.0.1–Windows Opera 11.11–MAC Opera 11.11–Windows Chrome 17.0–MAC safari 5…