Flume(分布式数据采集系统)学习
1.Flume架构
什么是flume?
flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。
支持在日志系统中定制各类数据发送方,用于收集数据;
同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。
flume的数据流由**事件(Event)**贯穿始终。
事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把event推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。
Event的概念:
flume的核心是把数据从数据源(source)收集过来,在将收集到的数据由目的地(sink)所拉取。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume再删除自己缓存的数据。
在整个数据的传输的过程中,流动的是event,即事务保证是在event级别进行的。那么什么是event呢?—–event将传输的数据进行封装,是flume传输数据的基本单位,如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。简单理解:event信息就是flume收集到的数据
Flume 运行的核心是 Agent。Flume以agent为最小的独立运行单位。一个agent就是一个JVM。
它是一个完整的数据收集工具,含有三个核心组件,分别是source、 channel、 sink。
通过这些组件, Event 可以从一个地方流向另一个地方,如下图所示
agent本身是一个java进程,运行在日志收集节点—所谓日志收集节点就是服务器节点。
agent三大组件:source—->channel—–>sink,类似生产者、仓库、消费者的架构。
source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。
channel:source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。
sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。
- Source
Source是数据的收集端,负责将数据捕获后进行特殊的格式化,将数据封装到事件(event) 里,然后将事件推入Channel中。 Flume提供了很多内置的Source, 支持 Avro, log4j, syslog 和 http post(body为json格式)。可以让应用程序同已有的Source直接打交道,如AvroSource
如果内置的Source无法满足需要, Flume还支持自定义Source。
- Channel
Channel是连接Source和Sink的组件,大家可以将它看做一个数据的缓冲区(数据队列),它可以将事件暂存到内存中也可以持久化到本地磁盘(或支持jdbc的数据库中)上, 直到Sink处理完该事件。介绍两个较为常用的Channel, MemoryChannel和FileChannel。
- Sink
Sink从Channel中取出事件,然后将数据发到别处,可以向文件系统、数据库、 hadoop存数据, 也可以是其他agent的Source。在日志数据较少时,可以将数据存储在文件系统中,并且设定一定的时间间隔保存数据。
flume运行机制
Flume 的核心是把数据从数据源收集过来,再送到目的地。为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据
Flume 传输的数据的基本单位是 Event,如果是文本文件,通常是一行记录,这也是事务的基本单位。 Event 从 Source,流向 Channel,再到 Sink,本身为一个 byte 数组,并可携带 headers 信息。 Event 代表着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去。
值得注意的是,Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。
比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,
也就是说,多个agent可以协同工作。
Flume可靠性
Flume 使用事务性的方式保证传送Event整个过程的可靠性。 Sink 必须在Event 已经被传达到下一站agent里,又或者,已经被存入外部数据目的地之后,才能把 Event 从 Channel 中 remove 掉。这样数据流里的 event 无论是在一个 agent 里还是多个 agent 之间流转,都能保证可靠,因为以上的事务保证了 event 会被成功存储起来。比如 Flume支持在本地保存一份channel文件作为备份,而memory channel 将event存在内存 queue 里,速度快,但丢失的话无法恢复
flume的广义用法(多个agent顺序连接)
可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的
Agent 的数量,因为数据流经的路径变长了,如果不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。
下一个flume source 连接上一个flume sink
2.Flume的安装
flume的官网:(https://flume.apache.org/)
可以在Documentation->Flume User Guide中学习sink,channels,source等知识
-- 下载tar包 上传虚拟机并解压
tar -zxvf apache-flume-1.11.0-bin.tar.gz
-- 改名
mv apache-flume-1.11.0-bin flume-1.11.0
-- 修改用户
chown -R root:root flume-1.11.0/
-- 配置环境变量
vim /etc/profile
FLUME_HOME=/usr/local/soft/flume-1.11.0
export PATH=$FLUME_HOME/bin:$PATH
source /etc/profile -- 生效
-- 创建存储配置文件的文件夹
mkdir flume_confs
3.Flume使用案例
使用思想:使用Flume的过程是确定scource类型,channel类型和sink类型,编写conf文件并开启服务,在数据捕获端进行传入数据流入到目的地。
3.1从控制台打入数据,在控制台显示
#1.确定scource类型,channel类型和sink类型
#确定的使用类型分别是,netcat source, memory channel(内存), logger sink(日志).
#2.编写conf文件
#a1代表agent的名称,r1代表source的名称。c1代表channel名称,k1代表的是sink的名称
#声明各个组件 a1.sources=r1,r2...(多个的写法)
a1.sources=r1
a1.channels=c1
a1.sinks=k1
# 在官网上拿来相关source类型的example
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.220.100
a1.sources.r1.port = 12345
# 同理 channel capacity 容量
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# sink
a1.sinks.k1.type = logger
# 组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#3.开启服务,我们重新复制一个客户端进行开启服务
# 命令:注意 -n 后面跟着的是你在conf文件中定义好的,-f 后面跟着的是编写conf文件的路径
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./netcat2logger.conf -Dflume.root.logger=DEBUG,console
#4、在另一个客户端输入命令:
#注意:这里的master和12345是在conf文件中设置好的ip地址和端口
yum install -y telnet
telnet master 12345
# 在配置文件的文件夹中会生成一个日志文件 监控它
tail -F flume.log
#在输入第二个命令的窗口中输入数据,回车,在服务端就会接收到数据。
3.2从本地指定路径中打入数据到HDFS
监控文件夹中的数据到hdfs上
# 1.确定scource类型,channel类型和sink类型
#spooldir source, memory channel, hdfs sink
#2.编写conf文件
#声明各个组件
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#指定spooldir的属性
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/soft/bigdata30/flumedata1
#时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
#指定channel
a1.channels.c1.type = memory
#暂存的条数
a1.channels.c1.capacity = 10000
#每次sink取的条数
a1.channels.c1.transactionCapacity = 1000
#指定sink的类型
a1.sinks.k1.type = hdfs
#指定hdfs的集群地址和路径,路径如果没有创建会自动创建
a1.sinks.k1.hdfs.path = hdfs://master:9000/bigdata30/flumeout1/log_s/dt=%Y-%m-%d
#指定hdfs路径下生成的文件的前缀
a1.sinks.k1.hdfs.filePrefix = log_%Y-%m-%d
#手动指定hdfs最小备份
a1.sinks.k1.hdfs.minBlockReplicas=1
#设置数据传输类型
a1.sinks.k1.hdfs.fileType = DataStream
#如果参数为0,不按照条数生成文件。如果参数为n,就是按照n条生成一个文件
a1.sinks.k1.hdfs.rollCount = 100
#这个参数是hdfs下文件sink的数据size。每sink 32MB的数据,自动生成一个文件
a1.sinks.k1.hdfs.rollSize =0
#每隔n 秒 将临时文件滚动成一个目标文件。如果是0,就不按照时间进行生成目标文件。
a1.sinks.k1.hdfs.rollInterval =0
a1.sinks.k1.hdfs.idleTimeout=0
#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#3.启动hadoop 在指定目录下创建flumedata1
mkdir flumedata1
#4. 开启服务
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./sqooldir2hdfs.conf
-Dflume.root.logger=DEBUG,console
#5.将文件复制到指定的目录下
cp students.txt ./flumedata1/
#报错 java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
#解决:hadoop lib下的guava包与Flume lib下的版本不一样 删除Flume lib 下的guava包
#6.在hdfs上可以看到文件
延伸:手动打数据到hive表
# 建一个hive表
create external table flume_tb1
(
id bigint,
name string,
age int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/bigdata30/flumeout2/log_s'; // 必选,指定列分隔符
#声明各个组件
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#指定spooldir的属性
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/soft/bigdata30/flumedata2
#时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
#指定channel
a1.channels.c1.type = memory
#暂存的条数
a1.channels.c1.capacity = 10000
#每次sink取的条数
a1.channels.c1.transactionCapacity = 1000
#指定sink的类型
a1.sinks.k1.type = hdfs
#指定hdfs的集群地址和路径,路径如果没有创建会自动创建
a1.sinks.k1.hdfs.path = hdfs://master:9000/bigdata30/flumeout2/log_s
#指定hdfs路径下生成的文件的前缀
a1.sinks.k1.hdfs.filePrefix = log_%Y-%m-%d
#手动指定hdfs最小备份
a1.sinks.k1.hdfs.minBlockReplicas=1
#设置数据传输类型
a1.sinks.k1.hdfs.fileType = DataStream
#如果参数为0,不按照条数生成文件。如果参数为n,就是按照n条生成一个文件
a1.sinks.k1.hdfs.rollCount = 100
#这个参数是hdfs下文件sink的数据size。每sink 32MB的数据,自动生成一个文件
a1.sinks.k1.hdfs.rollSize =0
#每隔n 秒 将临时文件滚动成一个目标文件。如果是0,就不按照时间进行生成目标文件。
a1.sinks.k1.hdfs.rollInterval =0
a1.sinks.k1.hdfs.idleTimeout=0
#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 执行与hdfs相同的步骤后发现可以在flume_tb1表中查到数据
3.3从java代码中进行捕获打入到HDFS
添加依赖
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-clients</groupId>
<artifactId>flume-ng-log4jappender</artifactId>
<version>1.11.0</version>
</dependency>
设置log4J.properties的内容
log4j.rootLogger=INFO,stdout,flume
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.220.100
# 1000 -65535
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%m%n
java代码
import org.apache.log4j.Logger;
import java.text.SimpleDateFormat;
import java.util.Date;
public class CreateLogger {
public static void main(String[] args) throws Exception{
//创建一个logger对象
Logger logger = Logger.getLogger(CreateLogger.class.getName());
//创建一个日期格式化对象
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//写一个死循环
while (true) {
Date date = new Date();
logger.info("现在是: " + sdf.format(date));
//让线程休眠一会儿
Thread.sleep(1000);
}
}
}
编写conf文件
#定义agent名, source、channel、sink的名称
a.sources = r1
a.channels = c1
a.sinks = k1
#具体定义source
a.sources.r1.type = avro
a.sources.r1.bind = 192.168.220.100
a.sources.r1.port = 41414
#具体定义channel
a.channels.c1.type = memory
a.channels.c1.capacity = 10000
a.channels.c1.transactionCapacity = 10
#具体定义sink
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path =hdfs://master:9000/bigdata30/flumeout3/flume_hdfs_avro2
a.sinks.k1.hdfs.filePrefix = events-
a.sinks.k1.hdfs.minBlockReplicas=1
a.sinks.k1.hdfs.fileType = DataStream
#不按照条数生成文件
a.sinks.k1.hdfs.rollCount = 10
a.sinks.k1.hdfs.batchSize = 10
a.sinks.k1.hdfs.rollSize = 0
#每隔N s将临时文件滚动成一个目标文件
a.sinks.k1.hdfs.rollInterval =0
a.sinks.k1.hdfs.idleTimeout=0
#组装source、channel、sink
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
开启服务 监控flume.log
flume-ng agent -n a -c /usr/local/soft/flume-1.11.0/conf -f ./logger2hdfs.conf
-Dflume.root.logger=DEBUG,console
hdfs:
3.4监控HBase日志到Hbase表中(这里可以换成其他组件日志监控)
在hbase中建表
create 'log','cf1'
编写conf文件 exec2hbase.conf
# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1
# 给channel组件命名为c1
a.channels = c1
#指定spooldir的属性 exec 监控到命令的结果
a.sources.r1.type = exec
a.sources.r1.command = tail -F /usr/local/soft/bigdata30/test1.txt
#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 10000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
#指定sink的类型
#a.sinks.k1.type = hbase
#a.sinks.k1.table = log
#a.sinks.k1.columnFamily = cf1
a.sinks.k1.type = hbase2
a.sinks.k1.table = log
a.sinks.k1.columnFamily = cf1
a.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
# 组装
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
启动hbase
开启服务
flume-ng agent -n a -c /usr/local/soft/flume-1.11.0/conf -f ./exec2hbase.conf -Dflume.root.logger=DEBUG,console
追加写入test.txt
echo "woshidashuaige" >>test1.txt
查看hbase
2、监控自定义的文件
确保test_idoall_org表在hbase中已经存在:
hbase(main):002:0> create 'test_idoall_org','uid','name'
0 row(s) in 0.6730 seconds
=> Hbase::Table - test_idoall_org
hbase(main):003:0> put 'test_idoall_org','10086','name:idoall','idoallvalue'
0 row(s) in 0.0960 seconds
2.创建配置文件:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/soft/flumedata/data.txt
# Describe the sink
a1.sinks.k1.type = hbase
a1.sinks.k1.table = test_idoall_org
a1.sinks.k1.columnFamily = name
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3.启动flume agent:
flume-ng agent -n a1 -c ../../flume/conf -f ./file2hbase.conf -Dflume.root.logger=DEBUG, console
4.产生数据:
echo "hello idoall.org from flume" >> data.txt
3.5flume监控Http source
# 确定scource类型,channel类型和sink类型 http source, memory channel, logger sink.
#编写conf文件
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=http
a1.sources.r1.port=50000
a1.sources.r1.channels=c1
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
# 表示sink每次会从channel里取多少数据
a1.channels.c1.transactionCapacity=100
#启动服务
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./http2logger.conf -Dflume.root.logger=DEBUG,console
#模拟一个http请求
curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello bigdata"}]' http://192.168.220.100:50000
3.6多路复制
1、将flume复制到node1,node2
scp -r flume-1.11.0 node1:`pwd`
scp -r flume-1.11.0 node2:`pwd`
2、将master的环境变量复制给node1 node2
scp /etc/profile node1 :/etc/profile
scp /etc/profile node2 :/etc/profile
3.编写conf文件
#在node1中编写vim avro2logger.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = avro
a1.sources.r1.bind = node1
a1.sources.r1.port = 4141
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#在node2中编写vim avro2logger.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = avro
a1.sources.r1.bind = node2
a1.sources.r1.port = 4141
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 在master上编写netcat2avro.conf
a2.sources = r1
a2.channels = c1 c2
a2.sinks = k1 k2
a2.sources.r1.type = netcat
a2.sources.r1.bind = master
a2.sources.r1.port = 44444
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = node1
a2.sinks.k1.port = 4141
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = node2
a2.sinks.k2.port = 4141
a2.sources.r1.channels = c1 c2
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2
先启动node1和node2节点的logger服务端:
#分别在node1和node2中启动
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./avro2logger.conf -Dflume.root.logger=DEBUG,console
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./avro2logger.conf -Dflume.root.logger=DEBUG,console
启动master节点中的服务
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./netcat2avro.conf -Dflume.root.logger=DEBUG,console
用telnet发送数据
telnet master 44444
#hello
3.7故障转移
类似hadoop的HA
Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能。这里的故障,指的是Sink故障
1)通过sinkgroups里priority属性配置的权重来决定哪台的优先级高,同一时间只能有一台机器工作
2)当当前的sink挂掉后切换为standby模式(假设优先级10),并立刻切换到另一台(假设优先级9),当sink修复好重新启动后,隔段时间会恢复使用优先级为10的sink
3)遇到故障时,我们要立即修复
master:
vim guzhang.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 4444
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#将数据写到另一台Flume服务器上
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 5555
#将数据写到另一台Flume服务器上
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node2
a1.sinks.k2.port = 6666
#使用sink processor来控制channel的数据流向
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
# k2的权限高 数据会先打到k2上也就是node2上
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
node1
a3.sources = r3
a3.channels = c3
a3.sources.r3.type = avro
a3.sources.r3.channels = c3
a3.sources.r3.bind = node1
a3.sources.r3.port = 5555
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
a3.sinks = k3
a3.sinks.k3.type = logger
a3.sinks.k3.channel = c3
node2
a4.sources = r4
a4.channels = c4
a4.sources.r4.type = avro
a4.sources.r4.channels = c4
a4.sources.r4.bind = node2
a4.sources.r4.port = 6666
a4.channels.c4.type = memory
a4.channels.c4.capacity = 1000
a4.channels.c4.transactionCapacity = 100
a4.sinks = k4
a4.sinks.k4.type = logger
a4.sinks.k4.channel = c4
先启动node1,node2上的
flume-ng agent -n a3 -c /usr/local/soft/flume-1.11.0/conf -f ./guzhang.conf -Dflume.root.logger=DEBUG,console
flume-ng agent -n a4 -c /usr/local/soft/flume-1.11.0/conf -f ./guzhang.conf -Dflume.root.logger=DEBUG,console
再启动master的
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./guzhang.conf -Dflume.root.logger=DEBUG,console
master输入数据
telnet master 4444
数据会打到node2
将node2手动关闭,再输入数据,这时候数据打到node1
再将node2启动起来,再输入数据,这时候,node2继续接收
3.8负载均衡
类似故障转移 只是sink组的类型不一样
通过将sinkprocessor里的type属性来控制processor模式,分别是(负载均衡load_balance、故障转移failover)
使用负载均衡以后,channel会轮训分配任务,减少机器负荷
master上的配置文件:(随机的打到sink上)
#vim load_balance.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 4444
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 5555
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node2
a1.sinks.k2.port = 6666
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
#启动服务
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./load_balance.conf -Dflume.root.logger=DEBUG,console
#node1 node2 和故障转移中一样 直接启动
#发送数据
telnet master 4444
#数据被随机的发送到 node1或node2上
3.9聚合
在node1和node2上创建配置文件
mkdir -p /usr/local/soft/bigdata30/scrips/taillogs
cd /usr/local/soft/bigdata30/scrips/taillogs
touch /usr/local/soft/bigdata30/scrips/taillogs/access.log
touch /usr/local/soft/bigdata30/scrips/taillogs/nginx.log
touch /usr/local/soft/bigdata30/scrips/taillogs/web.log
# vim juhe.conf node1 node2 各一份
# Name the components on this agent
a1.sources = r1 r2 r3
a1.channels = c1
a1.sinks = k1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/soft/bigdata30/scrips/taillogs/access.log
# static拦截器的功能就是往采集到的数据的header中插入自己定义的key-value对 区分数据
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /usr/local/soft/bigdata30/scrips/taillogs/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx
a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /usr/local/soft/bigdata30/scrips/taillogs/web.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 41414
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1
在master上创建配置文件
#vim juhe.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 定义source
a1.sources.r1.type = avro
a1.sources.r1.bind = master
a1.sources.r1.port =41414
# 添加时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
# 定义channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
# 定义sink %{type} 将event head对应的值取出来
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://master:9000/bigdata30/flumelogs/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# 时间类型
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 生成的文件不按条数生成
a1.sinks.k1.hdfs.rollCount = 0
# 生成的文件按时间生成
a1.sinks.k1.hdfs.rollInterval = 30
# 生成的文件按大小生成
a1.sinks.k1.hdfs.rollSize = 10485760
# 批量写入hdfs的个数
a1.sinks.k1.hdfs.batchSize = 10000
# flume操作hdfs的线程数(包括新建,写入等)
a1.sinks.k1.hdfs.threadsPoolSize=10
# 操作hdfs超时时间
a1.sinks.k1.hdfs.callTimeout=30000
# 组装source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
顺序启动服务 master启动flume实现数据收集
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./juhe.conf -Dflume.root.logger=DEBUG,console
node1与node2启动flume实现数据监控
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./juhe.conf -Dflume.root.logger=DEBUG,console
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./juhe.conf -Dflume.root.logger=DEBUG,console
采集端文件生成脚本
在node1与node2上面开发shell脚本,模拟数据生成 createdata.sh
# !/bin/bash
while true
do
date >> /usr/local/soft/bigdata30/scrips/taillogs/access.log;
date >> /usr/local/soft/bigdata30/scrips/taillogs/web.log;
date >> /usr/local/soft/bigdata30/scrips/taillogs/nginx.log;
sleep 0.5;
done
运行脚本
sh createdata.sh
hdfs上有聚合后的文件
3.10自定义拦截器
ChannelSelector:
ChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)。
ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。默认是Replicating
- Multiplexing类型的ChannelSelector会根据Event中Header中的某个属性决定分发到哪个Channel。
- 每个event里的header默认是没有值的,所以,multiplexing类型的ChannelSelector一般会配合自定义拦截器使用
SinkProcessor:
SinkProcessor共有三种类型,分别是DefaultSinkProcessor、LoadBalancingSinkProcessor和FailoverSinkProcessor
DefaultSinkProcessor对应的是单个的Sink,LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group,LoadBalancingSinkProcessor可以实现负载均衡的功能,FailoverSinkProcessor可以错误恢复的功能。
自定义Interceptor
使用Flume采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class ShuJiaInterceptor implements Interceptor {
@Override
public void initialize() {
//可以初始化一些外部的链接对象 mysql redis ck ...
}
@Override
public Event intercept(Event event) {
//判断监控到的数据内容中是否有shujia这样的单词
//如果有,就在这个event的header中添加一个键值对 type:sj
//如果没有,就在这个event的header中添加一个键值对 type:nsj
//获取监控到的数据内容
String info = new String(event.getBody(), 0, event.getBody().length);
//获取event的header数据
Map<String, String> headers = event.getHeaders();
if(info.contains("shujia")){
headers.put("type","sj");
}else {
headers.put("type","nsj");
}
event.setHeaders(headers);
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
ArrayList<Event> events = new ArrayList<>();
for (Event event : list) {
events.add(intercept(event));
}
return events;
}
@Override
public void close() {
//释放在initialize创建的链接对象
}
public static class MyBuilder implements Builder{
@Override
public Interceptor build() {
return new ShuJiaInterceptor();
}
@Override
public void configure(Context context) {
//...
}
}
}
代码打包上传
将jar包放在master的flume的lib目录下。简单暴力,但是不方便管理
编写配置文件
#node2
#vim interceptor.conf
a3.sources = r1
a3.channels = c1
a3.sinks = k1
a3.sources.r1.type = avro
a3.sources.r1.bind = node2
a3.sources.r1.port = 6666
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
a3.sinks.k1.type = logger
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
#node1
#vim interceptor.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1
a2.sources.r1.type = avro
a2.sources.r1.bind = node1
a2.sources.r1.port = 5555
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100
a2.sinks.k1.type =logger
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
#master
#vim interceptor.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 4444
#将选择器类型改为multiplexing分发
a1.sources.r1.selector.type = multiplexing
#检测每个event里head的title key
a1.sources.r1.selector.header = type
#如果title的值为at,吧event发到channel c1里,如果为ot,发到channel c2里,如果都不匹配,默认发到c2里
a1.sources.r1.selector.mapping.sj = c1
a1.sources.r1.selector.mapping.nsj = c2
a1.sources.r1.selector.default=c2
#给拦截器命名i1
a1.sources.r1.interceptors = i1
#这里写自定义类的全类名
a1.sources.r1.interceptors.i1.type = interceptor.ShuJiaInterceptor$MyBuilder
# 组装channel与source
a1.sources.r1.channels = c1 c2
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 5555
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node2
a1.sinks.k2.port = 6666
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
开始服务
先启动node1和node2上的
flume-ng agent -n a2 -c /usr/local/soft/flume-1.11.0/conf -f ./interceptor.conf -Dflume.root.logger=DEBUG,console
flume-ng agent -n a3 -c /usr/local/soft/flume-1.11.0/conf -f ./interceptor.conf -Dflume.root.logger=DEBUG,console
再启动master上的服务
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./interceptor.conf -Dflume.root.logger=DEBUG,console
打入数据
telnet master 4444
# 带有shujia的会被打入node1
#没有带有shujia的数据会被打入到node2中