Flume的常用组件包括Event和Agent。Agent又包含了Source、Channel以及Sink,本片文章将从官方说明文档入手,详细描述各组件以及组件的属性和功能。
文章目录
- 1 常用的Source类型描述
- 1.1 Netcat Source
- 1.2 Exec Source
- 1.3 Spooling Directory Source
- 1.4 Taildir Source
- 2 常用的Channel类型描述
- 2.1 Memory Channel
- 2.2 File Channel
- 3常用的Sink类型描述
- 3.1 HDFS Sink
- 3.2 File Roll Sink
- 3.3 Avro Sink
1 常用的Source类型描述
Source顾名思义,使用此组件可以将外部数据读入存储到Event(Event作为Flume传送数据的基本单位),并且将数据写入到Channel。
1.1 Netcat Source
官方文档:
- 类型简介
- 它打开一个指定的端口并监听数据。期望提供的数据是换行符分隔的文本,每一行文本都转换为一个Flume事件,并通过连接的通道发送。
- 属性介绍
- channels:绑定Channel (相当于缓冲区),数据从Source送入到Channel,从Channel再送往Sink。------》Source是主动推送,Sink是主动拉取数据
- type:Source类型
- bind:主机名称
- port:获取主机的那个端口信息
使用样例:
# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 44444
1.2 Exec Source
官方文档
-
简介
- Exec源程序在启动时运行给定的Unix命令,并期望该进程在标准输出时持续生成数据。如果进程因任何原因退出,源也将退出,并且不再产生其他数据。
-
属性说明:
- command:需要执行的命令
- shell:用于运行该命令的shell调用。例如/bin/sh -c。只需要依赖于shell特性的命令,如通配符、反勾号、管道等。
使用样例:
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive-3.1.2/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
1.3 Spooling Directory Source
官方文档:
- 类型描述
- 这个源允许您通过将要被摄取的文件放入磁盘上的“假脱机”目录来摄取数据。该源将监视指定目录中的新文件,并在新文件出现时解析事件。
- 属性
- spoolDir:监听的文件目录
- fileSuffix:后缀附加到完全摄入的文件
- fileHeader:是否添加存储绝对路径filename的头文件。
使用样例
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume-1.9.0/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
1.4 Taildir Source
官方描述:
-
类型简介
- 监视指定的文件,一旦检测到每个文件追加了新行,就几乎实时地跟踪它们。如果正在写入新行,此源将重试读取它们,等待写入完成。
- 该源是可靠的,即使在跟踪文件旋转时也不会丢失数据。它定期将每个文件的最后一次读取位置写入JSON格式的给定位置文件。如果Flume因某种原因停止或关闭,它可以从写入现有位置文件的位置开始重新启动。-----------支持实时监听和断点续传,实现可靠传输。
-
新加属性描述
- filegroups:以空格分隔的文件组列表,每个文件组表示要跟踪的一组文件。
- filegroups.< filegroupName >:文件组的绝对路径。
使用样例:
# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume-1.9.0/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume-1.9.0/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume-1.9.0/files2/.*log.*
2 常用的Channel类型描述
2.1 Memory Channel
官方文档:
- Channel类型简介
- 事件存储在具有可配置最大大小的内存队列中,可以实现高速的数据吞吐,Flume出现故障时,数据会丢失。
- 属性
- capacity:Channel队列的最大容量
- transactionCapacity:从Source到Channel的最大传输量-----》一定要小于capacity
使用样例
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
2.2 File Channel
官方文档
- 类型描述
- 此类缓冲区会将数据存储在磁盘中,可以持久化所有的Event,Flume出现故障时数据不会丢失。
- 属性
- checkpointDir:文件保存路径
使用样例
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
3常用的Sink类型描述
3.1 HDFS Sink
官方文档
- Sink描述:
- 该接收器将事件写入Hadoop分布式文件系统(HDFS)。它目前支持创建文本和序列文件。它支持两种文件类型的压缩。
- 属性:
- hdfs.path:保存的路径
使用样例
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
3.2 File Roll Sink
官方文档
-
Sink描述
- 将数据存储到本地文件系统,多用作数据收集
-
属性
- sink.directory:保存到本地文件的目录
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3
3.3 Avro Sink
官方文档
- Sink描述:
- 发送到该接收器的Flume事件被转换为Avro事件并发送到配置的主机名/端口对。事件以配置的批处理大小的批量从配置的Channel中获取。-------》可以作为传输多个Agent的中间人
使用样例:
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141