Flume学习

news2024/12/25 9:08:52

Flume(分布式数据采集系统)学习

1.Flume架构

什么是flume?

flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。

支持在日志系统中定制各类数据发送方,用于收集数据;

同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。

flume的数据流由**事件(Event)**贯穿始终。

事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把event推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。

Event的概念:

flume的核心是把数据从数据源(source)收集过来,在将收集到的数据由目的地(sink)所拉取。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume再删除自己缓存的数据。
在整个数据的传输的过程中,流动的是event,即事务保证是在event级别进行的。那么什么是event呢?—–event将传输的数据进行封装,是flume传输数据的基本单位,如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。

简单理解:event信息就是flume收集到的数据

image-20220615225417865

Flume 运行的核心是 Agent。Flume以agent为最小的独立运行单位。一个agent就是一个JVM。

它是一个完整的数据收集工具,含有三个核心组件,分别是source、 channel、 sink。

通过这些组件, Event 可以从一个地方流向另一个地方,如下图所示

image-20220615225229222

agent本身是一个java进程,运行在日志收集节点—所谓日志收集节点就是服务器节点。

agent三大组件:source—->channel—–>sink,类似生产者、仓库、消费者的架构。

source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。
channel:source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。
sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。

  • Source

Source是数据的收集端,负责将数据捕获后进行特殊的格式化,将数据封装到事件(event) 里,然后将事件推入Channel中。 Flume提供了很多内置的Source, 支持 Avro, log4j, syslog 和 http post(body为json格式)。可以让应用程序同已有的Source直接打交道,如AvroSource
如果内置的Source无法满足需要, Flume还支持自定义Source

  • Channel

Channel是连接Source和Sink的组件,大家可以将它看做一个数据的缓冲区(数据队列),它可以将事件暂存到内存中也可以持久化到本地磁盘(或支持jdbc的数据库中)上, 直到Sink处理完该事件。介绍两个较为常用的Channel, MemoryChannel和FileChannel。

  • Sink

Sink从Channel中取出事件,然后将数据发到别处,可以向文件系统、数据库、 hadoop存数据, 也可以是其他agent的Source。在日志数据较少时,可以将数据存储在文件系统中,并且设定一定的时间间隔保存数据。

flume运行机制

Flume 的核心是把数据从数据源收集过来,再送到目的地。为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据

Flume 传输的数据的基本单位是 Event,如果是文本文件,通常是一行记录,这也是事务的基本单位。 Event 从 Source,流向 Channel,再到 Sink,本身为一个 byte 数组,并可携带 headers 信息。 Event 代表着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去。

值得注意的是,Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。

比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,

也就是说,多个agent可以协同工作。

Flume可靠性

Flume 使用事务性的方式保证传送Event整个过程的可靠性。 Sink 必须在Event 已经被传达到下一站agent里,又或者,已经被存入外部数据目的地之后,才能把 Event 从 Channel 中 remove 掉。这样数据流里的 event 无论是在一个 agent 里还是多个 agent 之间流转,都能保证可靠,因为以上的事务保证了 event 会被成功存储起来。比如 Flume支持在本地保存一份channel文件作为备份,而memory channel 将event存在内存 queue 里,速度快,但丢失的话无法恢复

flume的广义用法(多个agent顺序连接)

可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的
Agent 的数量,因为数据流经的路径变长了,如果不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。

下一个flume source 连接上一个flume sink

image-20220615230439884

2.Flume的安装

flume的官网:(https://flume.apache.org/)

可以在Documentation->Flume User Guide中学习sink,channels,source等知识

-- 下载tar包  上传虚拟机并解压
tar -zxvf apache-flume-1.11.0-bin.tar.gz
-- 改名
 mv apache-flume-1.11.0-bin flume-1.11.0
-- 修改用户
chown -R root:root flume-1.11.0/
-- 配置环境变量
vim /etc/profile

FLUME_HOME=/usr/local/soft/flume-1.11.0
export PATH=$FLUME_HOME/bin:$PATH

source /etc/profile -- 生效
-- 创建存储配置文件的文件夹
mkdir flume_confs

3.Flume使用案例

使用思想:使用Flume的过程是确定scource类型,channel类型和sink类型,编写conf文件并开启服务,在数据捕获端进行传入数据流入到目的地。

3.1从控制台打入数据,在控制台显示

#1.确定scource类型,channel类型和sink类型
#确定的使用类型分别是,netcat source, memory channel(内存), logger sink(日志).

#2.编写conf文件
#a1代表agent的名称,r1代表source的名称。c1代表channel名称,k1代表的是sink的名称
#声明各个组件   a1.sources=r1,r2...(多个的写法)
a1.sources=r1
a1.channels=c1
a1.sinks=k1
# 在官网上拿来相关source类型的example
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.220.100
a1.sources.r1.port = 12345
# 同理 channel  capacity 容量
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
#  sink
a1.sinks.k1.type = logger

# 组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

#3.开启服务,我们重新复制一个客户端进行开启服务
# 命令:注意  -n 后面跟着的是你在conf文件中定义好的,-f 后面跟着的是编写conf文件的路径
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./netcat2logger.conf -Dflume.root.logger=DEBUG,console

#4、在另一个客户端输入命令:
#注意:这里的master和12345是在conf文件中设置好的ip地址和端口
yum install -y telnet
telnet master 12345
# 在配置文件的文件夹中会生成一个日志文件 监控它
tail -F flume.log 
#在输入第二个命令的窗口中输入数据,回车,在服务端就会接收到数据。

image.png

3.2从本地指定路径中打入数据到HDFS

监控文件夹中的数据到hdfs上

# 1.确定scource类型,channel类型和sink类型
#spooldir source, memory channel, hdfs sink

#2.编写conf文件
#声明各个组件 
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#指定spooldir的属性
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/soft/bigdata30/flumedata1
#时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

#指定channel
a1.channels.c1.type = memory
#暂存的条数
a1.channels.c1.capacity = 10000
#每次sink取的条数
a1.channels.c1.transactionCapacity = 1000

#指定sink的类型
a1.sinks.k1.type = hdfs
#指定hdfs的集群地址和路径,路径如果没有创建会自动创建
a1.sinks.k1.hdfs.path = hdfs://master:9000/bigdata30/flumeout1/log_s/dt=%Y-%m-%d
#指定hdfs路径下生成的文件的前缀
a1.sinks.k1.hdfs.filePrefix = log_%Y-%m-%d
#手动指定hdfs最小备份
a1.sinks.k1.hdfs.minBlockReplicas=1
#设置数据传输类型
a1.sinks.k1.hdfs.fileType = DataStream
#如果参数为0,不按照条数生成文件。如果参数为n,就是按照n条生成一个文件
a1.sinks.k1.hdfs.rollCount = 100
#这个参数是hdfs下文件sink的数据size。每sink 32MB的数据,自动生成一个文件
a1.sinks.k1.hdfs.rollSize =0
#每隔n 秒 将临时文件滚动成一个目标文件。如果是0,就不按照时间进行生成目标文件。
a1.sinks.k1.hdfs.rollInterval =0
a1.sinks.k1.hdfs.idleTimeout=0

#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

#3.启动hadoop 在指定目录下创建flumedata1
mkdir flumedata1
#4. 开启服务
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./sqooldir2hdfs.conf
 -Dflume.root.logger=DEBUG,console
#5.将文件复制到指定的目录下
cp students.txt ./flumedata1/
#报错 java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
#解决:hadoop lib下的guava包与Flume lib下的版本不一样 删除Flume lib 下的guava包 

#6.在hdfs上可以看到文件

image.png

延伸:手动打数据到hive表

# 建一个hive表
create external table flume_tb1
(
    id bigint,
    name string,
    age int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/bigdata30/flumeout2/log_s'; // 必选,指定列分隔符 
#声明各个组件 
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#指定spooldir的属性
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/soft/bigdata30/flumedata2
#时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

#指定channel
a1.channels.c1.type = memory
#暂存的条数
a1.channels.c1.capacity = 10000
#每次sink取的条数
a1.channels.c1.transactionCapacity = 1000

#指定sink的类型
a1.sinks.k1.type = hdfs
#指定hdfs的集群地址和路径,路径如果没有创建会自动创建
a1.sinks.k1.hdfs.path = hdfs://master:9000/bigdata30/flumeout2/log_s 
#指定hdfs路径下生成的文件的前缀
a1.sinks.k1.hdfs.filePrefix = log_%Y-%m-%d
#手动指定hdfs最小备份
a1.sinks.k1.hdfs.minBlockReplicas=1
#设置数据传输类型
a1.sinks.k1.hdfs.fileType = DataStream
#如果参数为0,不按照条数生成文件。如果参数为n,就是按照n条生成一个文件
a1.sinks.k1.hdfs.rollCount = 100
#这个参数是hdfs下文件sink的数据size。每sink 32MB的数据,自动生成一个文件
a1.sinks.k1.hdfs.rollSize =0
#每隔n 秒 将临时文件滚动成一个目标文件。如果是0,就不按照时间进行生成目标文件。
a1.sinks.k1.hdfs.rollInterval =0
a1.sinks.k1.hdfs.idleTimeout=0

#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# 执行与hdfs相同的步骤后发现可以在flume_tb1表中查到数据

3.3从java代码中进行捕获打入到HDFS

添加依赖

<dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>1.11.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flume.flume-ng-clients</groupId>
                <artifactId>flume-ng-log4jappender</artifactId>
                <version>1.11.0</version>
            </dependency>

设置log4J.properties的内容

log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n


log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.220.100
# 1000 -65535
log4j.appender.flume.Port = 41414      
log4j.appender.flume.UnsafeMode = true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout 
log4j.appender.flume.layout.ConversionPattern=%m%n

java代码

import org.apache.log4j.Logger;

import java.text.SimpleDateFormat;
import java.util.Date;

public class CreateLogger {
    public static void main(String[] args) throws Exception{
        //创建一个logger对象
        Logger logger = Logger.getLogger(CreateLogger.class.getName());

        //创建一个日期格式化对象
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        //写一个死循环
        while (true) {
            Date date = new Date();
            logger.info("现在是: " + sdf.format(date));
            //让线程休眠一会儿
            Thread.sleep(1000);
        }
    }
}

编写conf文件

#定义agent名, source、channel、sink的名称
a.sources = r1
a.channels = c1
a.sinks = k1

#具体定义source
a.sources.r1.type = avro
a.sources.r1.bind = 192.168.220.100
a.sources.r1.port = 41414

#具体定义channel
a.channels.c1.type = memory
a.channels.c1.capacity = 10000
a.channels.c1.transactionCapacity = 10

#具体定义sink
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path =hdfs://master:9000/bigdata30/flumeout3/flume_hdfs_avro2
a.sinks.k1.hdfs.filePrefix = events-
a.sinks.k1.hdfs.minBlockReplicas=1
a.sinks.k1.hdfs.fileType = DataStream
#不按照条数生成文件
a.sinks.k1.hdfs.rollCount = 10
a.sinks.k1.hdfs.batchSize = 10
a.sinks.k1.hdfs.rollSize = 0
#每隔N s将临时文件滚动成一个目标文件
a.sinks.k1.hdfs.rollInterval =0
a.sinks.k1.hdfs.idleTimeout=0 

#组装source、channel、sink
a.sources.r1.channels = c1
a.sinks.k1.channel = c1

开启服务 监控flume.log

flume-ng agent -n a -c /usr/local/soft/flume-1.11.0/conf -f ./logger2hdfs.conf
 -Dflume.root.logger=DEBUG,console
image.png

hdfs:

image.png

3.4监控HBase日志到Hbase表中(这里可以换成其他组件日志监控)

在hbase中建表

create 'log','cf1'

编写conf文件 exec2hbase.conf

# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1 
# 给channel组件命名为c1
a.channels = c1


#指定spooldir的属性  exec 监控到命令的结果
a.sources.r1.type = exec 
a.sources.r1.command = tail -F /usr/local/soft/bigdata30/test1.txt 

#指定channel
a.channels.c1.type = memory 
a.channels.c1.capacity = 10000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100

#指定sink的类型
#a.sinks.k1.type = hbase
#a.sinks.k1.table = log
#a.sinks.k1.columnFamily = cf1

a.sinks.k1.type = hbase2
a.sinks.k1.table = log
a.sinks.k1.columnFamily = cf1
a.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer

# 组装
a.sources.r1.channels = c1 
a.sinks.k1.channel = c1

启动hbase

开启服务

flume-ng agent -n a -c /usr/local/soft/flume-1.11.0/conf -f ./exec2hbase.conf -Dflume.root.logger=DEBUG,console

追加写入test.txt

echo "woshidashuaige" >>test1.txt

查看hbase

image.png

2、监控自定义的文件

确保test_idoall_org表在hbase中已经存在:

hbase(main):002:0> create 'test_idoall_org','uid','name'
0 row(s) in 0.6730 seconds

=> Hbase::Table - test_idoall_org
hbase(main):003:0> put 'test_idoall_org','10086','name:idoall','idoallvalue'
0 row(s) in 0.0960 seconds

2.创建配置文件:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/soft/flumedata/data.txt

# Describe the sink
a1.sinks.k1.type = hbase
a1.sinks.k1.table = test_idoall_org
a1.sinks.k1.columnFamily = name
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.启动flume agent:

flume-ng agent -n a1 -c ../../flume/conf -f ./file2hbase.conf -Dflume.root.logger=DEBUG, console

4.产生数据:

echo "hello idoall.org from flume" >> data.txt

3.5flume监控Http source

# 确定scource类型,channel类型和sink类型 http source, memory channel, logger sink.


#编写conf文件
a1.sources=r1
a1.sinks=k1
a1.channels=c1
 
a1.sources.r1.type=http
a1.sources.r1.port=50000
a1.sources.r1.channels=c1
 
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
 
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
# 表示sink每次会从channel里取多少数据
a1.channels.c1.transactionCapacity=100

#启动服务
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./http2logger.conf -Dflume.root.logger=DEBUG,console
#模拟一个http请求
curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello bigdata"}]'  http://192.168.220.100:50000

image.png

3.6多路复制

flume多路复制案例

1、将flume复制到node1,node2

scp -r flume-1.11.0 node1:`pwd`
scp -r flume-1.11.0 node2:`pwd`

2、将master的环境变量复制给node1 node2

scp /etc/profile node1 :/etc/profile
scp /etc/profile node2 :/etc/profile

3.编写conf文件

#在node1中编写vim avro2logger.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = avro
a1.sources.r1.bind = node1
a1.sources.r1.port = 4141

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


a1.sinks.k1.type = logger


a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#在node2中编写vim avro2logger.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = avro
a1.sources.r1.bind = node2
a1.sources.r1.port = 4141

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


a1.sinks.k1.type = logger


a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 在master上编写netcat2avro.conf
a2.sources = r1
a2.channels = c1 c2
a2.sinks = k1 k2


a2.sources.r1.type = netcat
a2.sources.r1.bind = master
a2.sources.r1.port = 44444

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100


a2.sinks.k1.type = avro
a2.sinks.k1.hostname = node1
a2.sinks.k1.port = 4141

a2.sinks.k2.type = avro
a2.sinks.k2.hostname = node2
a2.sinks.k2.port = 4141



a2.sources.r1.channels = c1 c2
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2

先启动node1和node2节点的logger服务端:

#分别在node1和node2中启动
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./avro2logger.conf -Dflume.root.logger=DEBUG,console

flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./avro2logger.conf -Dflume.root.logger=DEBUG,console

启动master节点中的服务

flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./netcat2avro.conf -Dflume.root.logger=DEBUG,console

用telnet发送数据

telnet master 44444

#hello

image.png

3.7故障转移

故障转移

类似hadoop的HA

Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能。这里的故障,指的是Sink故障

1)通过sinkgroups里priority属性配置的权重来决定哪台的优先级高,同一时间只能有一台机器工作

2)当当前的sink挂掉后切换为standby模式(假设优先级10),并立刻切换到另一台(假设优先级9),当sink修复好重新启动后,隔段时间会恢复使用优先级为10的sink

3)遇到故障时,我们要立即修复

master:

vim guzhang.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2 

a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 4444

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#将数据写到另一台Flume服务器上
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 5555

#将数据写到另一台Flume服务器上
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node2
a1.sinks.k2.port = 6666

#使用sink processor来控制channel的数据流向
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2  
a1.sinkgroups.g1.processor.type = failover
# k2的权限高  数据会先打到k2上也就是node2上
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

node1

a3.sources = r3
a3.channels = c3
a3.sources.r3.type = avro
a3.sources.r3.channels = c3
a3.sources.r3.bind = node1
a3.sources.r3.port = 5555

a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

a3.sinks = k3
a3.sinks.k3.type = logger
a3.sinks.k3.channel = c3

node2

a4.sources = r4
a4.channels = c4
a4.sources.r4.type = avro
a4.sources.r4.channels = c4
a4.sources.r4.bind = node2
a4.sources.r4.port = 6666

a4.channels.c4.type = memory
a4.channels.c4.capacity = 1000
a4.channels.c4.transactionCapacity = 100

a4.sinks = k4
a4.sinks.k4.type = logger
a4.sinks.k4.channel = c4

先启动node1,node2上的

flume-ng agent -n a3 -c /usr/local/soft/flume-1.11.0/conf -f ./guzhang.conf -Dflume.root.logger=DEBUG,console
flume-ng agent -n a4 -c /usr/local/soft/flume-1.11.0/conf -f ./guzhang.conf -Dflume.root.logger=DEBUG,console

再启动master的

flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./guzhang.conf -Dflume.root.logger=DEBUG,console

master输入数据

telnet master 4444

image-20220616224619686

数据会打到node2

image-20220616224629191

将node2手动关闭,再输入数据,这时候数据打到node1

image-20220616224725045

image-20220616224733386

再将node2启动起来,再输入数据,这时候,node2继续接收

image-20220616224848627

image-20220616224859467

3.8负载均衡

类似故障转移 只是sink组的类型不一样

通过将sinkprocessor里的type属性来控制processor模式,分别是(负载均衡load_balance、故障转移failover)

使用负载均衡以后,channel会轮训分配任务,减少机器负荷

master上的配置文件:(随机的打到sink上)

#vim load_balance.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2 

a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 4444

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node2
a1.sinks.k2.port = 6666

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

#启动服务
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./load_balance.conf -Dflume.root.logger=DEBUG,console

#node1 node2 和故障转移中一样 直接启动

#发送数据
telnet master 4444

#数据被随机的发送到 node1或node2上

3.9聚合

聚合flume详细流程

在node1和node2上创建配置文件

mkdir -p /usr/local/soft/bigdata30/scrips/taillogs
cd /usr/local/soft/bigdata30/scrips/taillogs
touch /usr/local/soft/bigdata30/scrips/taillogs/access.log
touch /usr/local/soft/bigdata30/scrips/taillogs/nginx.log
touch /usr/local/soft/bigdata30/scrips/taillogs/web.log
# vim juhe.conf       node1 node2 各一份


# Name the components on this agent 
a1.sources = r1 r2 r3 
a1.channels = c1 
a1.sinks = k1 

# Describe/configure the source 
a1.sources.r1.type = exec 
a1.sources.r1.command = tail -F /usr/local/soft/bigdata30/scrips/taillogs/access.log 
# static拦截器的功能就是往采集到的数据的header中插入自己定义的key-value对  区分数据 
a1.sources.r1.interceptors = i1 
a1.sources.r1.interceptors.i1.type = static 
a1.sources.r1.interceptors.i1.key = type 
a1.sources.r1.interceptors.i1.value = access 

a1.sources.r2.type = exec 
a1.sources.r2.command = tail -F /usr/local/soft/bigdata30/scrips/taillogs/nginx.log 
a1.sources.r2.interceptors = i2 
a1.sources.r2.interceptors.i2.type = static 
a1.sources.r2.interceptors.i2.key = type 
a1.sources.r2.interceptors.i2.value = nginx 

a1.sources.r3.type = exec 
a1.sources.r3.command = tail -F /usr/local/soft/bigdata30/scrips/taillogs/web.log 
a1.sources.r3.interceptors = i3 
a1.sources.r3.interceptors.i3.type = static 
a1.sources.r3.interceptors.i3.key = type 
a1.sources.r3.interceptors.i3.value = web 

# Describe the sink 
a1.sinks.k1.type = avro 
a1.sinks.k1.hostname = master 
a1.sinks.k1.port = 41414 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 20000 
a1.channels.c1.transactionCapacity = 10000 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sources.r2.channels = c1 
a1.sources.r3.channels = c1 
a1.sinks.k1.channel = c1

在master上创建配置文件

#vim juhe.conf

a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# 定义source 
a1.sources.r1.type = avro 
a1.sources.r1.bind = master 
a1.sources.r1.port =41414 
# 添加时间拦截器 
a1.sources.r1.interceptors = i1 
a1.sources.r1.interceptors.i1.type = timestamp

# 定义channels 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 20000 
a1.channels.c1.transactionCapacity = 10000 

# 定义sink    %{type} 将event head对应的值取出来
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path=hdfs://master:9000/bigdata30/flumelogs/%{type}/%Y%m%d 
a1.sinks.k1.hdfs.filePrefix = events 
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
# 时间类型 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
# 生成的文件不按条数生成 
a1.sinks.k1.hdfs.rollCount = 0 
# 生成的文件按时间生成 
a1.sinks.k1.hdfs.rollInterval = 30 
# 生成的文件按大小生成 
a1.sinks.k1.hdfs.rollSize = 10485760 
# 批量写入hdfs的个数 
a1.sinks.k1.hdfs.batchSize = 10000 
# flume操作hdfs的线程数(包括新建,写入等) 
a1.sinks.k1.hdfs.threadsPoolSize=10 
# 操作hdfs超时时间
a1.sinks.k1.hdfs.callTimeout=30000 


# 组装source、channel、sink 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

顺序启动服务 master启动flume实现数据收集

flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./juhe.conf -Dflume.root.logger=DEBUG,console

node1与node2启动flume实现数据监控

flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./juhe.conf -Dflume.root.logger=DEBUG,console

flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./juhe.conf -Dflume.root.logger=DEBUG,console

采集端文件生成脚本
在node1与node2上面开发shell脚本,模拟数据生成 createdata.sh

# !/bin/bash 

while true 
	do
	date >> /usr/local/soft/bigdata30/scrips/taillogs/access.log; 
	date >> /usr/local/soft/bigdata30/scrips/taillogs/web.log; 
	date >> /usr/local/soft/bigdata30/scrips/taillogs/nginx.log; 
	sleep 0.5; 
done

运行脚本

sh createdata.sh

hdfs上有聚合后的文件

image.png

3.10自定义拦截器

ChannelSelector:

ChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)。

ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。默认是Replicating

  1. Multiplexing类型的ChannelSelector会根据Event中Header中的某个属性决定分发到哪个Channel。
  2. 每个event里的header默认是没有值的,所以,multiplexing类型的ChannelSelector一般会配合自定义拦截器使用

SinkProcessor:

SinkProcessor共有三种类型,分别是DefaultSinkProcessor、LoadBalancingSinkProcessor和FailoverSinkProcessor

DefaultSinkProcessor对应的是单个的Sink,LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group,LoadBalancingSinkProcessor可以实现负载均衡的功能,FailoverSinkProcessor可以错误恢复的功能。

自定义Interceptor

使用Flume采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ShuJiaInterceptor implements Interceptor {
    @Override
    public void initialize() {
        //可以初始化一些外部的链接对象 mysql redis ck ...
    }

    @Override
    public Event intercept(Event event) {
        //判断监控到的数据内容中是否有shujia这样的单词
        //如果有,就在这个event的header中添加一个键值对 type:sj
        //如果没有,就在这个event的header中添加一个键值对 type:nsj
        //获取监控到的数据内容
        String info = new String(event.getBody(), 0, event.getBody().length);

        //获取event的header数据
        Map<String, String> headers = event.getHeaders();
        if(info.contains("shujia")){
            headers.put("type","sj");
        }else {
            headers.put("type","nsj");
        }

        event.setHeaders(headers);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        ArrayList<Event> events = new ArrayList<>();
        for (Event event : list) {
            events.add(intercept(event));
        }
        return events;
    }

    @Override
    public void close() {
      //释放在initialize创建的链接对象
    }

    public static class MyBuilder implements Builder{

        @Override
        public Interceptor build() {
            return new ShuJiaInterceptor();
        }

        @Override
        public void configure(Context context) {
            //...
        }
    }
}

代码打包上传

将jar包放在master的flume的lib目录下。简单暴力,但是不方便管理

编写配置文件

#node2
#vim interceptor.conf

a3.sources = r1
a3.channels = c1
a3.sinks = k1 

a3.sources.r1.type = avro
a3.sources.r1.bind = node2
a3.sources.r1.port = 6666

a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

a3.sinks.k1.type = logger

a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1 


#node1
#vim interceptor.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1

a2.sources.r1.type = avro
a2.sources.r1.bind = node1
a2.sources.r1.port = 5555

a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

a2.sinks.k1.type =logger

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

#master
#vim interceptor.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2 

a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 4444

#将选择器类型改为multiplexing分发
a1.sources.r1.selector.type = multiplexing
#检测每个event里head的title key
a1.sources.r1.selector.header = type
#如果title的值为at,吧event发到channel c1里,如果为ot,发到channel c2里,如果都不匹配,默认发到c2里
a1.sources.r1.selector.mapping.sj = c1
a1.sources.r1.selector.mapping.nsj = c2
a1.sources.r1.selector.default=c2
#给拦截器命名i1
a1.sources.r1.interceptors = i1
#这里写自定义类的全类名
a1.sources.r1.interceptors.i1.type = interceptor.ShuJiaInterceptor$MyBuilder
# 组装channel与source
a1.sources.r1.channels = c1 c2 



a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node2
a1.sinks.k2.port = 6666


a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

开始服务

先启动node1和node2上的

flume-ng agent -n a2 -c /usr/local/soft/flume-1.11.0/conf -f ./interceptor.conf -Dflume.root.logger=DEBUG,console

flume-ng agent -n a3 -c /usr/local/soft/flume-1.11.0/conf -f ./interceptor.conf -Dflume.root.logger=DEBUG,console

再启动master上的服务

flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./interceptor.conf -Dflume.root.logger=DEBUG,console

打入数据

telnet master 4444
# 带有shujia的会被打入node1
#没有带有shujia的数据会被打入到node2中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1870440.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

华为昇腾310B1芯片DVPP模块VENC视频编码接口调用流程以及视频编码代码梳理

目录 1 接口调用流程 2 代码流程梳理 1 接口调用流程 在CANN 8.0.RC1 AscendCL应用软件开发指南 (C&C, 推理) 01.pdf 文档中有接口调用流程 2 代码流程梳理 代码在samples: CANN Samples - Gitee.com 然后我把这个代码完整的看了一遍&#xff0c;然后梳理了详细的代码…

web学习笔记(七十二)

目录 1.vue2通过$parent实现组件传值——父传子 2.vue2 通过$children实现组件传值——子传父 3. provide和inject传值&#xff08;依赖注入&#xff09; 4.vue2如何操作dom 5.vue2如何拿到最新的dom 6.filters过滤器 7.vue2的生命周期 8.vuex的用法 1.vue2通过$parent…

【SCI索引,Fellow主讲】2024年可持续发展与能源资源国际学术会议(SDER 2024,8月9-11)

2024年可持续发展与能源资源国际学术会议&#xff08;SDER 2024&#xff09;将在2024年8月9-11日于中国重庆召开。 大会旨在为从事可持续发展与能源资源方面的专家学者、工程技术人员、技术研发人员提供一个共享科研成果和前沿技术&#xff0c;了解学术发展趋势&#xff0c;拓…

2.4G特技翻斗车方案定制

遥控翻斗车不仅能够提供基本的前进、后退、左转和右转功能&#xff0c;还设计有多种特技动作和互动模式&#xff0c;以增加娱乐性和互动性。 1、无线遥控&#xff1a;玩具翻斗车一般通过2.4G无线遥控器进行控制&#xff0c;允许操作者在一定距离内远程操控车辆。 2、炫彩灯光…

Java程序员接单的十条“野路子”,分分钟收入20K!

Java程序员除了主业工作外&#xff0c;也要适当扩展兼职接单这条路。毕竟Java接单可以说是Java程序员进行技术变现的最佳方式之一。 因为Java程序员兼职接单的难度相对更低&#xff0c;单量也比较可观&#xff0c;最重要的是性价比也很顶&#xff0c;且听我一一道来&#xff1a…

Nature推荐的三种ChatGPT论文写作指令(含PDF下载)

1. 润色学术论文 ChatGPT学术润色指令&#xff1a; “I’m writing a paper on [topic]for a leading [discipline] academic journal. WhatItried to say in the following section is [specific point]. Please rephrase itfor clarity, coherence and conciseness, ensuri…

Charles抓包工具系列文章(五)-- DNS spoofing (DNS域名伪装)

一、背景 DNS域名是依赖DNS域名服务器&#xff0c;特别是内部域名&#xff0c;最后寻址到后端服务地址。 当我们无法修改客户端的域名&#xff0c;而想让其指向到我们期望地址时&#xff0c;可以采用charles的DNS spoofing。 何谓DNS 欺骗&#xff1a;将自己的主机名指定给远…

电商平台数据功能封装API需要注意些什么?如何调用封装后的API?

一、引言 随着电商行业的蓬勃发展&#xff0c;电商平台的数据功能愈发复杂多样&#xff0c;如何高效、安全地管理和使用这些数据成为了电商平台开发者面临的重要问题。API&#xff08;Application Programming Interface&#xff09;作为不同软件之间进行通信的桥梁&#xff0…

Win32消息机制原理及消息运转

一.消息机制原理 1.消息类型&#xff1a; WIndows定义的一系列WM_XXX开头的&#xff0c;用来表示键盘按键&#xff0c;鼠标点击&#xff0c;窗口变化&#xff0c;用户自定义等各种消息; 2.消息队列&#xff1a; Windows为每一个正在运行的程序维护一个消息队列应用程序的消…

Pycharm 文件标头设置

一、设置模板步骤&#xff1a; “文件File--设置Settings--编辑器Editor--File and Code Templates- Python Script” 里面设置模板 官方预设变量表 变量名 含义 ${DATE} 当前系统日期 ${DAY} 当前月的第几日 ${DAY_NAME_SHORT} 当前星期几的单词缩写&#xff08…

计算机网络之数据通信原理(下)

上一讲内容&#xff1a;数据传输方式、数据传输形式、传输差错处理、常用差错检测方法 数据通信过程中&#xff0c;一个很重要的问题就是如何控制数据的传输&#xff0c;就涉及到了传输控制规程&#xff08;协议&#xff09; 下面介绍两种&#xff1a; ①BSC&#xff1a;面向…

java基于ssm+jsp 弹幕视频网站

1前台首页功能模块 弹幕视频网站&#xff0c;在弹幕视频网站可以查看首页、视频信息、商品信息、论坛信息、我的、跳转到后台、购物车、客服等内容&#xff0c;如图1所示。 图1前台首页界面图 登录&#xff0c;通过登录填写账号、密码等信息进行登录操作&#xff0c;如图2所示…

高性能并行计算课程论文:并行网络爬虫的设计与实现

目录 1.绪论 1.1 研究背景 1.2 研究意义 ​​​​​​​1.3 文章结构 2. 网络爬虫相关理论 ​​​​​​​2.1 URL地址格式 ​​​​​​​2.2 网页爬取策略 2.2.1 深度优先策略 2.2.2 广度优先策略 2.2.3 最佳优先策略 ​​​​​​​2.3 网页分析算法 ​​​​​​​2.3.1 正…

three.js - matcap材质(MeshMatcapMaterial)

说一下matcap纹理 先总结&#xff1a;MeshMatcapMaterial材质&#xff0c;通过采样含有光照信息的贴图来模拟光照效果。这种材质特别适用于模拟静态光源下的光照&#xff0c;并且&#xff0c;因其简单性和快速性而被广泛应用于各种场景。但是&#xff0c;由于其性能考虑&#x…

Zynq7000系列FPGA中的DMA控制器——PL外设请求接口

图9-4中展示了PL外设请求接口主要由两部分组成&#xff1a;PL外设请求总线和DMAC确认总线。这两部分分别使用特定的前缀进行标识&#xff0c;具体如下&#xff1a; PL外设请求总线&#xff08;PL Peripheral Request Bus&#xff09;&#xff1a; 前缀&#xff1a;DR功能&…

13个行业数据分析指标体系如何建设100问

提供针对13个行业的数据分析指标体系的全面指南&#xff0c;涵盖各行业的关键指标和分析维度&#xff0c;帮助读者深入了解和构建有效的指标体系。以下是文章的主要内容&#xff1a; 电商行业数据指标体系&#xff1a;包括客户价值、商品、网站流量、整体运营、市场营销活动、市…

Vue2 - 首页登录实现随机验证码组件的封装与实现详解(详细的注释及常见问题汇总)

在网站首页等登录时,随机验证码在现代网络应用中扮演着重要的安全角色。为了帮助开发者轻松集成和使用随机验证码功能,本文将介绍如何利用 Vue.js 2 封装一个简单而功能强大的随机验证码组件。让你能够快速理解并应用这一组件到你的项目中。 一、解决方案 本文提供了完美便捷…

事件sigma代数(两分钟学会~)

在概率论的时候&#xff0c;还没开始进入正题&#xff0c;上来一个事件sigma代数&#xff0c;人就直接懵逼了&#xff0c;这啥东西啊&#xff0c;那今天咱就结合一个例子来解释一下事件sigma代数。 首先我们看一下定义&#xff1a; 这是南开大学杨振明第二版书里面的一个定义&a…

纯血鸿蒙Beta版本发布,中国华为,站起来了!

2024年6月21日至23日&#xff0c;华为开发者大会2024&#xff08;HDC 2024&#xff09;于东莞盛大举行。 此次大会不仅在会场设置了包括鸿蒙原生应用、统一生态统一互联等在内的11个展区&#xff0c;以供展示HarmonyOS NEXT的强大实力&#xff0c;还对外宣布了HarmonyOS的最新进…

如何在浏览器中查看网页的HTML源代码?

如何在浏览器中查看网页的HTML源代码&#xff1f; 浏览html网页&#xff0c;查看其源代码&#xff0c;可以帮助我们了解该版网页的信息以及架构&#xff0c;每个浏览器都是允许用户查看他们访问的任何网页的HTML源代码的。以下编程狮小师妹就介绍几个常见浏览器的查看网页 HTM…