Flume(二)【Flume 进阶使用】

news2025/1/9 2:02:32

前言

        学数仓的时候发现 flume 落了一点,赶紧补齐。

1、Flume 事务

Source 在往 Channel 发送数据之前会开启一个 Put 事务:

  1. doPut:将批量数据写入临时缓冲区 putList(当 source 中的数据达到 batchsize 或者 超过特定的时间就会发送数据)
  2. doCommit:检查 channel 内存队列是否足够合并
  3. doRollback:如果 channel 内存队列空间不足没救回滚数据

同样 Sink 在从 Channel 主动拉取数据的时候也会开启一个 Take 事务:

  1. doTake:将数据读取到临时缓冲区 takeList,并将数据发送到 HDFS
  2. doCommit:如果数据全部发送成功,就会清除临时缓冲区 taskList
  3. dooRollback:数据发送过程如果出现异常,rollback 将临时缓冲区的数据归还给 channel 内存队列

2、Flume Agent 内部原理

  1. source 接收数据,把数据封装成 Event 
  2. 传给 channel processor 也就是 channel 处理器
  3. 把事件传给拦截器(interceptor),在拦截器这里可以对数据进行一些处理(我们在上一节中说过,当我们的路径信息中包含时间的时候,需要从 Event Header 中读取时间信息,如果没有就需要我们指定从本地读取 timestamp,所以这里我们就可以在拦截器这里给我们的 event 添加头部信息);而且,拦截器可以设置多个
  4. 经过拦截器处理的事件又返回给了 channel processor ,然后 channel processor 把事件传给 channel 选择器(channel selector 有两种类型:Replicating 和 Multiplexing ,Replicating 会把source 发送来的 events 发往所有 channel,而 multiplexing 可以配置指定发往哪些 channel)
  5. 经过 channel 选择器处理后的事件仍然返回给 channel processor
  6. channel processor 会根据 channel 选择器的结果,发送给相应的 channel(也就是这个时候才会真正的开启 put 事务,之前都是对 event 进行简单的处理)
  7. SinkProcessor 负责协调拉取 channel 中的数据,它有三种类型:DefaultSinkProcessor、LoadBalancingSinkpProcessor(负载均衡,也就是多个 Sink 轮询的方式去读取 channel 中的数据)、FailoverSinkProcessor(故障转移,每个 sink 有自己的优先级,优先级高的去读取 channel 中的事件,只有当它挂掉的时候,才会轮到下一个优先级的 sink 去读)。其中 DefaultSinkProcessor 一个 channel 只能绑定一个 Sink,所以它也就没有 sink 组的概念。

注意:一个 sink 只可以绑定一个 channel ,但是一个 channel 可以绑定多个 sink!

3、Flume 拓扑结构

3.1、简单串联

官网这段话翻译过来就是:为了将数据跨越多个代理或跃点进行传输,前一个代理的接收器(sink)和当前跃点的源(source)需要是avro类型,接收器指向源的主机名(或IP地址)和端口。

这种模式的缺点很好理解,就像串联电路,一个节点坏了会影响整个系统。

3.2、复制和多路复用

从官网翻译过来就是:上述示例显示了一个名为“foo”的代理源将流程分散到三个不同的通道。这种分散可以是复制或多路复用。在复制流程的情况下,每个事件都会发送到这三个通道。对于多路复用的情况,当事件的属性与预配置的值匹配时,事件将被发送到可用通道的子集。例如,如果事件属性名为“txnType”设置为“customer”,则应发送到channel1和channel3,如果为“vendor”,则应发送到channel2,否则发送到channel3。映射可以在代理的配置文件中设置。

这种模式相比上面的串联模式的优点无非就是可以发送过多个目的地。

3.3、负载均衡和故障转移

Flume 支持多个 Sink 逻辑上分到一个 Sink 组,sink 组配合不同的 SinkProcessor ,可以实现负载均衡和错误恢复的功能。

3.4、聚合

这种模式在实际开发中是经常会用到的,日常web应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用flume的这种组合方式能很好的解决这一问题,每台服务器部署一个flume采集日志,传送到一个集中收集日志的 flume,再由此flume上传到hdfshivehbase等,进行日志分析。

4、Flume 企业开发实例

4.1、复制和多路复用

注意:多路复用必须配合拦截器使用,因为需要在 Event Header 中添加一些信息

1)案例需求

使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem。

2)需求分析

  • 监控文件变动我们可以考虑使用 taildir 或者 exec 这两种 source
  • flume-1 sink 需要使用 avro sink 才能传输到下一个 flume-2 和 flume-3 的 source
  • flume-2 需要上传数据到 HDFS 所以 sink 为 hdfs
  • flume-3 需要把数据输出到本地,所以 sink 为 file_roll sink(要保存到本地目录,这个目录就必须提前创建好,它不像 HDFS Sink 会自动帮我们创建)

我们需要实现三个 flume 作业:

  1. flume-1 把监听到的新日志读取到 flume-2 和 flume-3 的 source
  2. flume-2 把日志上传到 hdfs
  3. flume-3 把日志写到本地

3)需求实现

flume-file-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# 将数据流复制给所有 channel 默认就是 replicating 所以也可以不用配置
a1.sources.r1.selector.type = replicating 
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive-3.1.2/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
# 一个 sink 只可以指定一个 channel,但是一个 channel 可以指定多个 sink
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
flume-hdfs.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# source 端的 avro 是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-dir.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

4)测试

bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-dir.conf
bin/flume-ng agent -n a1 -c conf/ -f job/group1/flume-file-flumc.conf
bin/flume-ng agent -n a2 -c conf/ -f job/group1/flume-hdfs.conf

查看结果:

注意:写入本地文件时,当一段时间没有新的日志时,它仍然会创建一个新的文件,而不像 hdfs sink 即使达到了设置的间隔时间但是没有新日志产生,那么它也不会创建一个新的文件。

这个需要注意的就是 hdfs 的端口不要写错,比如我的就不是 9870 而是 8020.

4.2、负载均衡和故障转移

1)案例需求

使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用 FailoverSinkProcessor,实现故障转移的功能。

2)需求分析

  • 开启一个端口 88888 来发送数据
  • 使用 flume-1 监听该端口,并发送到 flume-2 和 flume-3 (需要 flume-1 的 sink 为 avro sink,flume-2 和 flume-3 的 source 为 avro source),flume-2 和 flume-3 发送日志到控制台(flume-2 和 flume-3 的 sink 为 logger sink)

3)需求实现

flume-nc-flume.conf
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
flume-flume-console1.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-flume-console2.conf 
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

 4)案例测试

bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-nc-flume.conf

关闭 flume-flume-console1.conf 作业 

 我们发现,一开始我们开启三个 flume 作业,当向 netcat 输入数据时,只有 flume-flume-console1.conf 作业的控制台有日志输出,这是因为它的优先级更高,当把作业 flume-flume-console1.conf 关闭时,再次向端口 44444 发送数据,发现 flume-flume-console2.conf 作业开始输出。

如果要使用负载均衡,只需要替换上面 flume-nc-flume.conf 中:

a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

替换为:

a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.maxTimeOut = 30000

其中,backoff 代表退避,默认为 false, 如果当前 sink 没有拉到数据,那么接下来一段时间就不用这个 sink 。maxTimeOut 代表最大的退避时间,因为退避默认是指数增长的(比如一个 sink 第一次没有拉到数据,需要等 1 s,第二次还没拉到,等 2s,第三次等 4s ...),默认最大值为 30 s。

4.3、聚合

1)案例需求

  • hadoop102 上的 Flume-1 监控文件/opt/module/group.log,
  • hadoop103 上的 Flume-2 监控某一个端口的数据流,
  • Flume-1 与 Flume-2 将数据发 hadoop104 上的 Flume-3,Flume-3 将最终数据打印到控制台。

注意:主机只能在 hadoop104 上配,因为 avro source 在 hadoop104 上,客户端(hadoop02 和 hadoop103 的 sink)可以远程连接,但是服务端(hadoop104 的 source)只能绑定自己的端口号。

2)需求实现

flume-log-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
 flume-nc-flume.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-flume-log.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

3)测试

向 group.log 文件中追加文本:

注意:hadoop103 这里不能写 nc localhost 44444 而要写 nc hadoop103 44444! 否则报错:Ncat: Connection refused.

5、自定义 Interceptor

前面我们的多路复用还没有实现,因为我们说多路复用必须配合拦截器来使用,因为我们必须知道每个 Channel 发往哪些 Sink,这需要拦截器往 Event Header 中写一些内容。

1)案例需求

使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。

2)需求分析

在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构,Multiplexing 的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值。

在该案例中,我们以端口数据模拟日志,以是否包含”lyh”模拟不同类型的日志,我们需要自定义 interceptor 区分数据中是否包含”lyh”,将其分别发往不同的分析系统(Channel)。

 3)需求实现

自定义拦截器

引入 flume 依赖

<dependency>
 <groupId>org.apache.flume</groupId>
 <artifactId>flume-ng-core</artifactId>
 <version>1.9.0</version>
</dependency>
package com.lyh.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;


public class TypeInterceptor implements Interceptor {

    // 存放事件集合
    private List<Event> events;


    @Override
    public void initialize() {
        // 初始化存放事件的集合
        events = new ArrayList<>();
    }

    // 单个事件拦截
    @Override
    public Event intercept(Event event) {

        // 1. 获取事件中的 header 信息
        Map<String, String> headers = event.getHeaders();

        // 2. 获取事件中的 body 信息
        String body = new String(event.getBody());

        // 3. 根据 body 中是否包含 'lyh' 来决定发往哪个 sink
        if (body.contains("lyh"))
            headers.put("type","first");
        else
            headers.put("type","second");

        return event;
    }

    // 批量事件拦截
    @Override
    public List<Event> intercept(List<Event> list) {

        // 1. 清空集合
        events.clear();

        // 2. 遍历 events
        for (Event event : events) {
            // 3. 给每个事件添加头信息
            events.add(intercept(event));
        }

        return events;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

打包放到 flume 安装目录的 lib 目录下:
 

flume 作业配置

hadoop102:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.lyh.interceptor.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.first = c1 # 包含 'lyh'
a1.sources.r1.selector.mapping.second = c2 # 不包含 'lyh'

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
hadoop103:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
hadoop104:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

4)需求实现

#hadoop103
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume2.conf -Dflume.root.logger=INFO,console

#hadoop104
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume3.conf -Dflume.root.logger=INFO,console

#hadoop102
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume1.conf
nc localhost 44444

hadoop102:

hadoop103:

hadoop104: 

可以看到,从 hadoop102 发送的日志中,包含 "lyh" 的都被发往 hadoop103 的 4141 端口,其它日志则被发往 hadoop104 的 4242端口。

6、自定义 Source

自定义 source 用的还是比较少的,毕竟 flume 已经提供了很多常用的了。

1)介绍

        Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence、generator、syslog、http、legacy。官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。
官方也提供了自定义 source 的接口: https://flume.apache.org/FlumeDeveloperGuide.html#source 根据官方说明自定义 MySource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。
实现相应方法:
  • getBackOffSleepIncrement() //backoff 步长,当从数据源拉取数据时,拉取不到数据的话它不会一直再去拉取,而是等待,之后每一次再=如果还拉取不到,就会比上一次多等待步长单位个时间。
  • getMaxBackOffSleepInterval()  //backoff 最长时间,如果不设置最长等待时间,它最终会无限等待,所以需要指定。
  • configure(Context context)  //初始化 context(读取配置文件内容)
  • process()  //获取数据封装成 event 并写入 channel,这个方法将被循环调用。
使用场景:读取 MySQL 数据或者其他文件系统。

2)需求

使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文
件中配置。

3)分析

4)需求实现

代码

package com.lyh.source;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;
import java.util.Map;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    // 定义配置文件将来要读取的字段
    private Long delay;
    private String field;

    @Override
    public Status process() throws EventDeliveryException {
        try {
            // 创建事件头信息
            Map<String,String> headerMap = new HashMap<>();
            // 创建事件
            SimpleEvent event = new SimpleEvent();
            // 循环封装事件
            for (int i = 0; i < 5; i++) {
                // 给事件设置头信息
                event.setHeaders(headerMap);
                // 给事件设置内容
                event.setBody((field + i).getBytes());
                // 将事件写入 channel
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return Status.READY;
    }

    // 步长
    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    // 最大间隔时间
    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    // 初始化配置信息
    @Override
    public void configure(Context context) {
        delay = context.getLong("delay");
        field = context.getString("field","Hello");
    }
}

配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.lyh.source.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = lyh

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
bin/flume-ng agent -n a1 -c conf/ -f job/custom-source.conf -Dflume.root.logger=INFO,console

运行结果: 

7、自定义 Sink

1)介绍

        Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
        Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。
        Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、 自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。
        官方也提供了自定义 sink 的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#sink 根据官方说明自定义 MySink 需要继承 AbstractSink 类并实现 Configurable 接口。实现相应方法:
  • configure(Context context)//初始化 context(读取配置文件内容)
  • process()//从 Channel 读取获取数据(event),这个方法将被循环调用。
使用场景:读取 Channel 数据写入 MySQL 或者其他文件系统。

2)需求分析

使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。
流程分析:

 3)需求实现

package com.lyh.sink;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable{

    private final static Logger LOG = LoggerFactory.getLogger(AbstractSink.class);

    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {

        // 声明返回值状态信息
        Status status;

        // 获取当前 sink 绑定的 channel
        Channel channel = getChannel();

        // 获取事务
        Transaction txn = channel.getTransaction();

        // 声明事件
        Event event;

        // 开启事务
        txn.begin();

        // 读取 channel 中的事件、直到读取事件结束循环
        while (true){
            event = channel.take();
            if (event!=null) break;
        }

        try {
            // 打印事件
            LOG.info(prefix + new String(event.getBody()) + suffix);

            // 事务提交
            txn.commit();
            status = Status.READY;
        }catch (Exception e){
            // 遇到异常回滚事务
            txn.rollback();
            status = Status.BACKOFF;
        }finally {
            // 关闭事务
            txn.close();
        }

        return null;
    }

    // 初始化配置信息
    @Override
    public void configure(Context context) {
        // 带默认值
        prefix = context.getString("prefix","hello");
        // 不带默认值
        suffix = context.getString("suffix");
    }
}

配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = com.atguigu.MySink
a1.sinks.k1.prefix = lyh:
a1.sinks.k1.suffix = :lyh

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4)测试

bin/flume-ng agent -n a1 -c conf/ -f job/custom-sink.conf -Dflume.root.logger=INFO,console

运行结果:

总结

        自此,flume 的学习基本也完了,这一篇虽然不多但也用了大概3天时间。相比较 kafka、flink,flume 这个框架还是非常简单的,比如我们自己实现一些 source、sink,都是很简单的,没有太多复杂的理解的东西。

        总之 flume 这个工具还是多看官网。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1451676.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

qt-C++笔记之捕获鼠标滚轮事件并输出滚轮角度增量

qt-C笔记之捕获鼠标滚轮事件并输出滚轮角度增量 code review! 文章目录 qt-C笔记之捕获鼠标滚轮事件并输出滚轮角度增量1.运行2.main.cpp3.main.pro 1.运行 2.main.cpp #include <QApplication> #include <QWidget> #include <QWheelEvent> #include <…

.NET Core MongoDB数据仓储和工作单元模式封装

前言 上一章我们把系统所需要的MongoDB集合设计好了&#xff0c;这一章我们的主要任务是使用.NET Core应用程序连接MongoDB并且封装MongoDB数据仓储和工作单元模式&#xff0c;因为本章内容涵盖的有点多关于仓储和工作单元的使用就放到下一章节中讲解了。仓储模式&#xff08;R…

java的面向对象编程(oop)——认识枚举

前言 打好基础&#xff0c;daydayup! 枚举 1&#xff0c;认识枚举&#xff1a; 枚举是一种特殊类&#xff0c;用enum语句修饰。与普通类不同的是&#xff1a;枚举类的第一行只能写一些合法的标识符&#xff08;名称&#xff09;&#xff0c;多个名称用逗号隔开。这些标识符&a…

相机图像质量研究(16)常见问题总结:光学结构对成像的影响--IRCUT

系列文章目录 相机图像质量研究(1)Camera成像流程介绍 相机图像质量研究(2)ISP专用平台调优介绍 相机图像质量研究(3)图像质量测试介绍 相机图像质量研究(4)常见问题总结&#xff1a;光学结构对成像的影响--焦距 相机图像质量研究(5)常见问题总结&#xff1a;光学结构对成…

原型模式-Prototype Pattern

原文地址:https://jaune162.blog/design-pattern/prototype-pattern/ 引言 在Java中如果我们想要拷贝一个对象应该怎么做?第一种方法是使用 getter和setter方法一个字段一个字段设置。或者使用 BeanUtils.copyProperties() 方法。这种方式不仅能实现相同类型之间对象的拷贝,…

第三百四十九回

文章目录 1. 概念介绍2. 原理与方法2.1 知识对比2.2 使用方法 3. 示例代码4. 内容总结 我们在上一章回中介绍了"加密包crypto"相关的内容&#xff0c;本章回中将介绍characters包.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1. 概念介绍 在项目中会遇到获取字…

解决vscode报错,在赋值前使用了变量“XXX“

问题&#xff1a;如图所示 解决方法&#xff1a; 法一&#xff1a; 补全函数使其完整 法二&#xff1a; 使用断言

Python算法探索:从经典到现代

目录 引言 一、经典算法&#xff1a;快速排序 示例代码&#xff1a; 二、经典算法&#xff1a;二分查找 示例代码&#xff1a; 三、现代算法&#xff1a;支持向量机&#xff08;SVM&#xff09; 示例代码&#xff1a; 四、现代算法&#xff1a;神经网络 示例代码&…

从汇编分析C语言可变参数的原理,并实现一个简单的sprintf函数

C语言可变参数 使用printf等函数的时候函数原型是printf(const char* fmt, ...), 这一类参数的个数不限的函数是可变参数 使用 使用一个头文件stdarg.h, 主要使用以下的宏 typedef char * va_list;// 把 n 圆整到 sizeof(int) 的倍数 #define _INTSIZEOF(n) ( (sizeo…

STM32 USART串口通信

目录 USART串口 串口发送 串口发送接收 串口收发HEX数据包 串口收发文本数据包 USART串口 串口发送 Serial.c #include "stm32f10x.h" // Device header #include "stdio.h" #include "stdarg.h"/*** brief 初始化串口以…

leetcode:96.不同的二叉搜索树

解题思路&#xff1a; 输入n3 n 0 1个 n 1 1个 n 2 2个 头1头2头3 头1 左子树0节点&#xff08;个数&#xff09;x右子树2个节点&#xff08;个数&#xff09; 头2 左子树1节点&#xff08;个数&#xff09;x右子树1个节点&#xff08;个数&#xff09; 头3 左子…

Android 13.0 SystemUI下拉状态栏定制二 锁屏页面横竖屏解锁图标置顶显示功能实现

1.前言 在13.0的系统rom定制化开发中,在关于systemui的锁屏页面功能定制中,由于在平板横屏锁屏功能中,时钟显示的很大,并且是在左旁边居中显示的, 由于需要和竖屏显示一样,所以就需要用到小时钟显示,然后同样需要居中,所以就来分析下相关的源码,来实现具体的功能 如图…

租赁香港服务器多少钱一个月?24元

阿里云香港服务器2核1G、30M带宽、40GB ESSD系统盘优惠价格24元/月&#xff0c;288元一年&#xff0c;每月流量1024GB&#xff0c;多配置可选&#xff0c;官方优惠活动入口 https://t.aliyun.com/U/bLynLC 阿里云服务器网aliyunfuwuqi.com分享阿里云香港服务器优惠活动、详细配…

VMware Workstation 17.0 虚拟机的安装、配置、创建运行DOS、Windows、Linux和VMware ESX(详细图文搭建系统教程)

VMware Workstation 17.0 虚拟机的安装、配置、创建运行DOS、Windows、Linux和VMware ESX&#xff08;图文教程&#xff09; 一、VMware Workstation是什么&#xff1f;1.1 VMware Workstation 17.0简介1.2 VMware Workstation 17.0新特性1.3 VMware Workstation 17.0下载地址1…

内容安全审核系统的设计思路

今年负责的APP产品涉及到内容的审核&#xff0c;并且针对性的做了一套内容审核系统和账号安全体系。因此总结了一些经验。 内容审核基础逻辑&#xff1a; 内容类型&#xff1a;文本、图片、视频、音频 审核类型&#xff1a;涉黄、暴恐、涉政、广告、垃圾违禁、辱骂自定义&…

【Git】.gitignore 的匹配规则

每行一个规则&#xff1a;每行只能包含一个规则&#xff0c;多个规则需要分别写在不同的行上。 示例&#xff1a; # 忽略日志文件 logs/ # 忽略临时文件 temp.txt种类匹配&#xff1a; 文件&#xff1a;在规则的开头指定文件名或路径&#xff0c;如 file.txt。 示例&#xff1a…

leetcode hot100不同路径

本题可以采用动态规划来解决。还是按照五部曲来做 确定dp数组&#xff1a;dp[i][j]表示走到&#xff08;i&#xff0c;j&#xff09;有多少种路径 确定递推公式&#xff1a;我们这里&#xff0c;只有两个移动方向&#xff0c;比如说我移动到&#xff08;i&#xff0c;j&#x…

第7章 Page446~449 7.8.9智能指针 std::unique_ptr

“unique_ptr”是“独占式智能指针” 名字透露身份&#xff0c;“unique_ptr”是“独占式智能指针”。使用它管理前面的O类指针&#xff1a; 演示1&#xff1a; 例中 p 是一个智能指针。其中的“<O>”指明它所指向的数据类型是“O”。除了创建方法不太一样&#xff0c;…

Flutter 动画(显式动画、隐式动画、Hero动画、页面转场动画、交错动画)

前言 当前案例 Flutter SDK版本&#xff1a;3.13.2 显式动画 Tween({this.begin,this.end}) 两个构造参数&#xff0c;分别是 开始值 和 结束值&#xff0c;根据这两个值&#xff0c;提供了控制动画的方法&#xff0c;以下是常用的&#xff1b; controller.forward() : 向前…

什么是自编码器Auto-Encoder?

来源&#xff1a;https://www.bilibili.com/video/BV1Vx411j78H/?spm_id_from333.1007.0.0&vd_sourcef66cebc7ed6819c67fca9b4fa3785d39 为什么要压缩呢&#xff1f; 让神经网络直接从上千万个神经元中学习是一件很吃力的事情&#xff0c;因此通过压缩提取出原图片中最具代…