本文详细介绍了Apache Flume的关键特性,包括选择器、拦截器、故障转移和负载均衡。选择器负责将数据分发到多个Channel,拦截器用于修改或丢弃Event。故障转移机制能够在Sink故障时自动切换,而负载均衡则在多个Sink间分配负载。文章还提供了自定义拦截器的示例,展示了Flume在复杂数据处理中的灵活性和稳定性。
选择器
当一个Source连接到多个Channel时,选择器决定了数据如何分发到这些Channel。Flume提供了复制选择器和多路复用选择器,允许我们根据需要选择数据分发策略。
一个Source对应多个channel的情况下,多个Channel中的数据是否相同,取决于我们使用了什么选择器,默认是复制选择器。也可以手动的使用多路选择器。
复制选择器
复制选择器会将数据复制到所有配置的Channel,适用于需要在多个地方处理相同数据的场景。
配置示例
编写flume脚本,需要一个source,两个channel,以及两个sink
a1.sources = r1
a1.channels = c1 c2
a1.sinks = s1 s2
# avro http syslogtcp
# avro avro-client
# http curl
# syslogtcp nc
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = bigdata01
a1.sources.r1.port = 7777
#执行选择器类型为复制选择器
a1.sources.r1.selector.type=replicating
a1.channels.c1.type=memory
a1.channels.c2.type=memory
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=/flume/%Y-%m-%d/rep
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=/flume/%Y-%m-%d/rep
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
启动这个flume脚本:
flume-ng agent -c ./ -f syslogtcp-memory-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
向bigdata01 中的 7777 端口发送消息:
echo "hello world" | nc bigdata01 7777
如果nc命令无法识别,需要安装一下 yum install -y nc
查看里面的数据发现都一样,说明使用的是复制选择器。
多路复用选择器
多路复用选择器可以根据数据内容选择性地将数据发送到特定的Channel,适用于根据数据特性进行分流处理的场景。
就是每次发送消息的时候,可以指定发送消息走哪条channel,只有这条channel对应的sink才有数据,其他sink没数据。
配置示例
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state #以每个Event的header中的state这个属性的值作为选择channel的依据
a1.sources.r1.selector.mapping.CZ = c1 #如果state=CZ,则选择c1这个channel
a1.sources.r1.selector.mapping.US = c2 c3 #如果state=US,则选择c2 和 c3 这两个channel
a1.sources.r1.selector.default = c4 #默认使用c4这个channel
说明一个小区别:
avro
syslogtcp
http可以指定一个hostname和端口号
不同的source,我们使用的发送数据的方式是不一样的:
avro-client
nc
curlcurl 是可以模拟发送get 或者 post 请求的。
比如: curl www.baidu.com
编写脚本:mul.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = s1 s2
a1.sources.r1.type= http
a1.sources.r1.bind = bigdata01
a1.sources.r1.port = 8888
a1.sources.r1.selector.type=multiplexing
# header 跟 mapping 结合在一起,用于发送消息时,指定发送的方向
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.USER = c1
a1.sources.r1.selector.mapping.ORDER = c2
# 发送的消息找不到具体的channel,就走默认的c1
a1.sources.r1.selector.default = c1
a1.channels.c1.type=memory
a1.channels.c2.type=memory
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=/flume/%Y-%m-%d/mul
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=/flume/%Y-%m-%d/mul
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
启动该脚本:
flume-ng agent -c ./ -f mul.conf -n a1 -Dflume.root.logger=INFO,console
模拟http请求:
curl -X POST -d '[{"headers":{"state":"USER"},"body":"this my multiplex to c1"}]' http://bigdata01:8888
curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this my multiplex to c2"}]' http://bigdata01:8888
效果就是,当我发送一条指令的时候,走state=USER的路径,只生成一个文件,走另一条路才会生成另一个不同的文件。
自动容灾(故障转移)
Flume的故障转移机制允许在一个Sink组中的多个Sink之间自动切换,确保数据的持续处理,即使某个Sink出现故障。
多个sink组成一个组,这个组内的sink只有一台工作,假如这一台坏了,另一台自动的工作。
为了演示这个效果,我使用了三个Agent.模型如下:
在bigdata02和bigdata03上安装flume
在集群中可以使用脚本
xsync.sh /opt/installs/flume/
xsync.sh /etc/profile
xcall.sh source /etc/profile
也可以使用长拷贝命令,例如:
scp -r /opt/installs/flume1.9.0/ root@hadoop11:/opt/installs/
# 因为 /etc/hosts 文件中没有配置映射,所以使用ip代替了
scp -r /opt/installs/flume1.9.0/ root@192.168.52.12:/opt/installs/
scp -r /etc/profile root@hadoop11:/etc
scp -r /etc/profile root@192.168.52.12:/etc
两个虚拟机需要刷新配置文件
source /etc/profile
在bigdata01上,编写flume脚本:
failover.conf
#list names
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
# source
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = bigdata01
a1.sources.r1.port = 10086
# channel
a1.channels.c1.type = memory
# sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata02
a1.sinks.k1.port = 10087
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata03
a1.sinks.k2.port = 10088
#设置sink组
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
# 此处是设置的权重,权重越多,就工作
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 10000
启动该脚本:
flume-ng agent -c ../conf -f ./failover.conf -n a1 -Dflume.root.logger=INFO,console
在bigdata02上,编写 failover2.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# source
a1.sources.r1.type = avro
a1.sources.r1.bind = bigdata02
a1.sources.r1.port = 10087
# channel
a1.channels.c1.type = memory
# sink
a1.sinks.k1.type = logger
启动flume脚本:
flume-ng agent -f ./failover2.conf -n a1 -Dflume.root.logger=INFO,console
在bigdata03上,编写 failover3.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# source
a1.sources.r1.type = avro
a1.sources.r1.bind = bigdata03
a1.sources.r1.port = 10088
# channel
a1.channels.c1.type = memory
# sink
a1.sinks.k1.type = logger
启动flume脚本:
flume-ng agent -f ./failover3.conf -n a1 -Dflume.root.logger=INFO,console
bigdata02和03启动无异常,再启动01上的脚本:
flume-ng agent -f ./failover.conf -n a1 -Dflume.root.logger=INFO,console
在bigdata01上,发送消息:
echo "wei,wei,wei" | nc hadoop10 10086
发现bigdata02有反应,出现了消息,而bigdata03无反应。因为bigdata02权重大,需要工作
测试故障转移,将bigdata02停掉,再在bigdata01上发消息,就发现bigdata03收到消息了,故障转移了。
负载均衡
负载均衡机制允许Flume在多个Sink之间平衡数据负载,提高数据处理的效率和可靠性。
演示一下:
bigdata01中创建balance.conf
#list names
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
# source
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = bigdata01
a1.sources.r1.port = 10086
# channel
a1.channels.c1.type = memory
# sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata02
a1.sinks.k1.port = 10087
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata03
a1.sinks.k2.port = 10088
#设置sink组
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = random
bigdata02,bigdata03 不变,服务必须是启动的。
使用我们的nc命令发送请求,发现bigdata02和bigdata03随机的处理请求的数据。
flume轮训是每隔
一段时间
轮训,而不是每秒轮训一次。所以可能多条在同一时间间隔的events都被一个输出到一个sink端。
练习
需求
编写一个Flume配置,实现以下需求:
抽取data.json文件中的数据,只保留Warn以及Error级别的日期,并且timestamp只保留4月1日以后的数据。
数据
{"timestamp": "2023-03-01 12:00:00", "level": "INFO", "message": "This is an info message."}
{"timestamp": "2023-03-31 12:01:00", "level": "ERROR", "message": "Error occurred!"}
{"timestamp": "2023-04-01 12:02:00", "level": "WARN", "message": "Warn occurred!"}
{"timestamp": "2023-04-02 12:02:00", "level": "WARN", "message": "Warn occurred!"}
{"timestamp": "2023-04-03 12:02:00", "level": "WARN", "message": "Warn occurred!"}
代码
package com.bigdata;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
public class ETLInterceptor implements Interceptor {
/**
* {"timestamp": "2023-03-01 12:00:00", "level": "INFO", "message": "This is an info message."}
* {"timestamp": "2023-03-31 12:01:00", "level": "ERROR", "message": "Error occurred!"}
* {"timestamp": "2023-04-01 12:02:00", "level": "WARN", "message": "Warn occurred!"}
*
* 抽取data.json文件中的数据,只保留Warn以及Error级别的日期,并且timestamp只保留4月1日以后的数据。
*/
@Override
public void initialize() {
}
// 这个方法是核心方法,可以处理一条,就可以循环处理多条
@Override
public Event intercept(Event event) {
// byte[] --> String
String json = new String(event.getBody());
/*JSONObject jsonObject = JSON.parseObject(json);
String timestamp = jsonObject.getString("timestamp");
String level = jsonObject.getString("level");
String message = jsonObject.getString("message");*/
Log log = JSON.parseObject(json, Log.class);
System.out.println(log);
// 日期比较 after before --> Calendar 、Date、DateLocal
// String --> Date
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
try {
Date date = dateFormat.parse(log.getTimestamp());
Date date2 = dateFormat.parse("2023-03-31");
if((log.getLevel().equals("WARN") || log.getLevel().equals("ERROR")) && date.after(date2)){
System.out.println("符合条件的数据......");
return event;
}
} catch (ParseException e) {
throw new RuntimeException(e);
}
return null;
}
@Override
public List<Event> intercept(List<Event> list) {
ArrayList<Event> events = new ArrayList<>();
// 任何一个list ,都不能边循环,边添加或者删除,否则报错!!!
// 如何过滤掉不符合条件的数据,先将其置为null,然后在循环中,将null的数据不添加到集合中,就过滤掉了
for (Event oldEvent : list) {
Event newEvent = intercept(oldEvent);
if(newEvent != null){
events.add(newEvent);
}
}
return events;
}
@Override
public void close() {
}
public static class EventBuilder implements Builder {
@Override
public Interceptor build() {
return new ETLInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
视频链接
15-flume中的选择器_哔哩哔哩_bilibili
16-flume的故障转移_哔哩哔哩_bilibili
17-flume中的负载均衡_哔哩哔哩_bilibili