Flume
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data.
It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant, with tunable reliability mechanisms and many failover and recovery mechanisms.
It uses a simple, extensible data model that allows for online analytic applications.
System Requirements
- Java Runtime Environment - Java 1.8 or later
- Memory - sufficient memory for the configurations used by sources, channels, or sinks
- Disk Space - sufficient disk space for the configurations used by channels or sinks
- Directory Permissions - read/write permissions for the directories used by the agent
Data Flow Model
An event flows from an external source into a Source, is buffered in one or more Channels, and is taken out by a Sink, which writes it to an external store such as HDFS or forwards it to the next agent.
Create the software installation directory
mkdir -p /opt/soft
cd /opt/soft
Download the installation package
wget https://www.apache.org/dyn/closer.lua/flume/1.11.0/apache-flume-1.11.0-bin.tar.gz
Extract the archive and rename the resulting directory
tar -zxvf apache-flume-1.11.0-bin.tar.gz
mv apache-flume-1.11.0-bin flume
Configure system environment variables
vim /etc/profile
export FLUME_HOME=/opt/soft/flume
export PATH=$PATH:$FLUME_HOME/bin
source /etc/profile
printenv
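To confirm that the environment variables took effect and that the flume-ng launcher is on the PATH, print the installed version:
flume-ng version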
Create a directory for the configuration files
mkdir -p /opt/soft/flume-conf
cd /opt/soft/flume-conf
Collect network data
Write the configuration file
vim netcat.conf
# netcat.conf: a single-node Flume configuration
# Name the components of this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe and configure the source r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
# Describe and configure the sink k1
# The logger sink prints events to the console
a1.sinks.k1.type = logger
# Describe and configure the channel c1, which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agent. The long-form and short-form options below are equivalent; run from /opt/soft/flume-conf when passing a relative path to the configuration file, and add -Dflume.root.logger=INFO,console to also print the agent log to the console.
flume-ng agent --name a1 --conf conf --conf-file netcat.conf
flume-ng agent -n a1 -c conf -f netcat.conf
flume-ng agent --conf conf --conf-file /opt/soft/flume-conf/netcat.conf --name a1 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f /opt/soft/flume-conf/netcat.conf -n a1 -Dflume.root.logger=INFO,console
Install the netcat network tool
yum -y install nc
Send socket data with netcat
nc spark03 44444
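With the agent running in one terminal, each line typed into the nc session becomes one Flume event: the netcat source acknowledges each accepted line with OK, and the logger sink prints the event (its headers plus a hex/text dump of the body) on the agent console. A sample session, assuming the agent runs on spark03:
nc spark03 44444
hello flume
OK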
file-to-hdfs
Create the HDFS directory
hdfs dfs -mkdir -p /region
hdfs dfs -ls /
Write the configuration file
vim file2hdfs.conf
# Name the components of this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# spooldir source: ingest files dropped into the spooling directory
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/region
# hdfs sink: write events to HDFS as plain text
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://lihaozhe/region
a1.sinks.k1.hdfs.filePrefix = region-
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# roll files hourly or at 128 MB, never by event count
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
# file channel: durable on-disk buffering
a1.channels.c1.type = file
a1.channels.c1.dataDirs = /opt/cache/data/region/data
a1.channels.c1.checkpointDir = /opt/cache/data/region/checkpoint
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Create the spooling directory and the file-channel directories
mkdir -p /root/region
mkdir -p /opt/cache/data/region/data
mkdir -p /opt/cache/data/region/checkpoint
Start the agent
flume-ng agent -n a1 -c conf -f file2hdfs.conf
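To test the pipeline, drop a data file into the spooling directory; region.csv below is a hypothetical file name standing in for whatever data file you have. The spooldir source renames fully ingested files with a .COMPLETED suffix, and the output should appear under /region on HDFS:
cp /root/region.csv /root/region/
ls /root/region
hdfs dfs -ls /region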
exec-to-hdfs
Run a web application that generates log output in real time (the loop below can stand in for one)
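If no such application is at hand, a minimal stand-in is a shell loop that appends one timestamped line per second to the log file path assumed by the configuration below:
mkdir -p /root/logs
while true; do
  echo "$(date '+%Y-%m-%d %H:%M:%S') INFO request handled" >> /root/logs/info.log
  sleep 1
done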
Write the configuration file
vim exec2hdfs.conf
# Name the components of this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# exec source: tail the application log
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/info.log
# hdfs sink: write events into date- and minute-bucketed directories
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://lihaozhe/logs/%Y-%m-%d/%H%M/
# use the agent's local time to resolve the escape sequences in the path
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = info-
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# roll files hourly or at 128 MB, never by event count
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agent
flume-ng agent -n a1 -c conf -f exec2hdfs.conf
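Once lines are being appended to /root/logs/info.log, the sink should create date- and minute-bucketed directories under /logs, which can be checked with:
hdfs dfs -ls -R /logs/$(date +%Y-%m-%d)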
exec-to-avro-to-hdfs
flume-avro-01.conf  ->  spark01 (exec source, avro sink)
flume-avro-02.conf  ->  spark02 (exec source, avro sink)
avro-to-hdfs.conf   ->  spark03 (avro source, hdfs sink)
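The two upstream agents tail a local log file and fan in to a single collector over Avro RPC; the collector writes the merged stream to HDFS:
spark01: exec -> memory -> avro \
                                 -> spark03:44444: avro -> memory -> hdfs -> HDFS
spark02: exec -> memory -> avro /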
flume-avro-01.conf (on spark01)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# exec source: tail the local application log
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/info.log
# avro sink: forward events to the collector on spark03
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = spark03
a1.sinks.k1.port = 44444
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume-avro-02.conf (on spark02; identical to flume-avro-01.conf)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/info.log
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = spark03
a1.sinks.k1.port = 44444
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
avro-to-hdfs.conf (on spark03)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# avro source: accept events from the upstream agents
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
# hdfs sink: write the merged stream into hourly directories
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://lihaozhe/logs/%Y-%m-%d/%H
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = info-
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# roll files hourly or at 128 MB, never by event count
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agents, collector first, so that the avro sinks have an endpoint to connect to
flume-ng agent -n a1 -c conf -f avro-to-hdfs.conf    # on spark03
flume-ng agent -n a1 -c conf -f flume-avro-01.conf   # on spark01
flume-ng agent -n a1 -c conf -f flume-avro-02.conf   # on spark02
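With all three agents up, appending lines to /root/logs/info.log on spark01 or spark02 (for example with the generator loop shown earlier) should produce hourly directories on HDFS:
hdfs dfs -ls -R /logs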