案例演示
案例演示:Avro+Memory+Logger
Avro Source:监听一个指定的Avro端口,通过Avro端口可以获取到Avro client发送过来的文件,即只要应用程序通过Avro端口发送文件,source组件就可以获取到该文件中的内容,输出位置为Logger
编写采集方案
[root@qianfeng01 flume-1.9.0]# mkdir flumeconf
[root@qianfeng01 flume-1.9.0]# cd flumeconf
[root@qianfeng01 flumeconf]# vi avro-logger.conf
#定义各个组件的名字
a1.sources=avro-sour1
a1.channels=mem-chan1
a1.sinks=logger-sink1
#定义sources组件的相关属性
a1.sources.avro-sour1.type=avro
a1.sources.avro-sour1.bind=qianfeng01
a1.sources.avro-sour1.port=9999
#定义channels组件的相关属性
a1.channels.mem-chan1.type=memory
#定义sinks组件的相关属性
a1.sinks.logger-sink1.type=logger
a1.sinks.logger-sink1.maxBytesToLog=100
#组件之间进行绑定
a1.sources.avro-sour1.channels=mem-chan1
a1.sinks.logger-sink1.channel=mem-chan1
复制代码
启动Agent
[root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./avro-logger.conf -n a1 -Dflume.root.logger=INFO,console
复制代码
测试数据
[root@qianfeng01 ~]# mkdir flumedata
[root@qianfeng01 ~]# cd flumedata/
[root@qianfeng01 flumedata]#
[root@qianfeng01 flumedata]# date >> test.data
[root@qianfeng01 flumedata]# cat test.data
2019年 11月 21日 星期四 21:22:36 CST
[root@qianfeng01 flumedata]# ping qianfeng01 >> test.data
[root@qianfeng01 flumedata]# cat test.data
....省略....
[root@qianfeng01 flumedata]# flume-ng avro-client -c /usr/local/flume-1.9.0/conf/ -H qianfeng01 -p 9999 -F ./test.data
复制代码
实时采集(监听文件):Exec+Memory+HDFS
Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源
#常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容memory:传输数据的Channel为Memory
hdfs 是输出目标为Hdfs
配置方案
[root@qianfeng01 flumeconf]# vi exec-hdfs.conf
#定义各个组件的名字
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources=r1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /root/flumedata/test.data
a1.channels=c1
a1.channels.c1.type=memory
#通道中可以保存的最大事件数量
a1.channels.c1.capacity=1000
#通道从一个source可以获取的最大事件数量或者每个事务中给一个sink的最大事件数量
a1.channels.c1.transactionCapacity=100
a1.sinks=k1
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://qianfeng01:8020/flume/tailout/%y-%m-%d/%H%M/
#设置文件的前缀
a1.sinks.k1.hdfs.filePrefix=events-
#时间戳是否四舍五入
a1.sinks.k1.hdfs.round=true
#时间戳舍入的最高位数
a1.sinks.k1.hdfs.roundValue=10
#时间戳舍入的单位
a1.sinks.k1.hdfs.roundUnit=second
#设置滚动的条件(关闭当前文件,开启新文件)---3秒钟滚动一次
a1.sinks.k1.hdfs.rollInterval=3
#设置滚动的条件---20字节
a1.sinks.k1.hdfs.rollSize=20
#设置滚动的条件---5个事件
a1.sinks.k1.hdfs.rollCount=5
#刷新进hdfs的事件数量
a1.sinks.k1.hdfs.batchSize=100
#是否使用本地时间戳(自定义拦截器中)---true是使用本地的
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.fileType=DataStream
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
复制代码
启动Agent
[root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./exec-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
复制代码
报错解决:
报错:
(SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:459)] process failed
java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
原因:com.google.common.base.Preconditions.checkArgument 这是因为flume-1.9.0内依赖的guava-11.02.jar和hadoop内的(guava-27.0-jre.jar)版本不一致造成的。
检验方法:
查看hadoop安装目录下share/hadoop/common/lib内guava.jar版本
查看Flume安装目录下lib内guava.jar的版本
如果两者不一致,删除版本低的,并拷贝高版本过去
复制代码
测试数据
[root@qianfeng01 flumedata]# ping qianfeng01 >> test.data
复制代码
实时采集(监听文件) Exec+Memory+Logger
Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源
#常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容 ,logger为日志格式输出
配置方案
[root@qianfeng01 flumeconf]# vi exec-logger.conf
a2.sources = r1
a2.channels = c1
a2.sinks = s1
a2.sources.r1.type = exec
a2.sources.r1.command = tail -F /root/flumedata/log.01
a2.channels.c1.type=memory
a2.channels.c1.capacity=1000
a2.channels.c1.transactionCapacity=100
a2.channels.c1.keep-alive=3
#通道中的事件总容量(byteCapacity)和预估总事件容量的百分比
a2.channels.c1.byteCapacityBufferPercentage=20
a2.channels.c1.byteCapacity=800000
a2.sinks.s1.type=logger
a2.sinks.s1.maxBytesToLog=16
a2.sources.r1.channels=c1
a2.sinks.s1.channel=c1
复制代码
启动agent
[root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./exec-logger.conf -n a2 -Dflume.root.logger=INFO,console
复制代码
测试:
[root@qianfeng01 ~]# echo "nice" >> /root/flumedata/log.01