【Flume】高级组件之Sink Processors及项目实践(Sink负载均衡和故障转移)

news2025/1/22 21:01:52

文章目录

  • 1. 组件简介
  • 2. 项目实践
    • 2.1 负载均衡
      • 2.1.1 需求
      • 2.1.2 配置
      • 2.1.3 运行
    • 2.2 故障转移
      • 2.2.1 需求
      • 2.2.2 配置
      • 2.2.3 运行

1. 组件简介

       Sink Processors类型包括这三种:Default Sink Processor、Load balancing Sink Processor和Failover Sink Processor。

  • Default Sink Processor是默认的,不用配置Sink group,就是咱们现在使用的这种最普通的形式,一个Channel后面接一个Sink的形式;
  • Load balancing Sink Processor是负载均衡处理器,一个Channle后面可以接多个Sink,这多个Sink属于一个Sink group,根据指定的算法进行轮询或者随机发送,减轻单个Sink的压力;
  • Failover Sink Processor是故障转移处理器,一个Channle后面可以接多个Sink,这多个Sink属于一个Sink group,按照Sink的优先级,默认先让优先级高的Sink来处理数据,如果这个Sink出现了故障,则用优先级低一点的Sink处理数据,可以保证数据不丢失。

2. 项目实践

2.1 负载均衡

使用Load balancing Sink Processor,即负载均衡处理器,一个Channle后面可以接多个Sink,这多个Sink属于一个Sink group,根据指定的算法进行轮询或者随机发送,减轻单个Sink的压力。其参数为:

  • processor.sinks:指定这个sink groups中有哪些sink,指定sink的名称,多个的话中间使用空格隔开即可;
  • processor.type:针对负载均衡的sink处理器,这里需要指定load_balance;
  • processor.selector:此参数的值内置支持两个,round_robin和random,round_robin表示轮询,按照sink的顺序,轮流处理数据,random表示随机。
  • processor.backoff:默认为false,设置为true后,故障的节点会列入黑名单,过一定时间才会再次发送数据,如果还失败,则等待时间是指数级增长,一直到达到最大的时间。如果不开启,故障的节点每次还会被重试发送,如果真有故障节点的话就会影响效率;
  • processor.selector.maxTimeOut:最大的黑名单时间,默认是30秒。

2.1.1 需求

采集指定端口的数据,并实现两个sink通道的负载均衡,采用轮询方式发送数据,为了展现实验效果,使用avro sink,每到一个event就写一次数据(默认是积攒接收一百个再写一次数据)。

2.1.2 配置

在这里插入图片描述
配置bigData01上的Flume Agent:

[root@bigdata01 apache-flume-1.9.0-bin]# cat conf/load-balancing.conf 
# agent的名称是a1
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 k2 
# 配置source组件 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 44444 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件,[为了方便演示效果,把batch-size设置为1] 
a1.sinks.k1.type=avro 
a1.sinks.k1.hostname=192.168.152.101 
a1.sinks.k1.port=41414 
a1.sinks.k1.batch-size = 1 
a1.sinks.k2.type=avro 
a1.sinks.k2.hostname=192.168.152.102 
a1.sinks.k2.port=41414 
a1.sinks.k2.batch-size = 1 
# 配置sink策略 
a1.sinkgroups = g1 
a1.sinkgroups.g1.sinks = k1 k2 
a1.sinkgroups.g1.processor.type = load_balance 
a1.sinkgroups.g1.processor.backoff = true 
a1.sinkgroups.g1.processor.selector = round_robin 

# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c1

配置bigData02上的Flume Agent:

[root@bigdata02 apache-flume-1.9.0-bin]# cat conf/load-balancing-101.conf 
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 
# 配置source组件 
a1.sources.r1.type = avro 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 41414 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值] 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = hdfs://192.168.152.100:9000/load_balance
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
a1.sinks.k1.hdfs.rollInterval = 3600 
a1.sinks.k1.hdfs.rollSize = 134217728 
a1.sinks.k1.hdfs.rollCount = 0 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
a1.sinks.k1.hdfs.filePrefix = data101 
a1.sinks.k1.hdfs.fileSuffix = .log 
# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

配置bigData03上的Flume Agent:

[root@bigdata03 apache-flume-1.9.0-bin]# cat conf/load-balancing-102.conf 
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 
# 配置source组件 
a1.sources.r1.type = avro 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 41414 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值] 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = hdfs://192.168.152.100:9000/load_balance
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
a1.sinks.k1.hdfs.rollInterval = 3600 
a1.sinks.k1.hdfs.rollSize = 134217728 
a1.sinks.k1.hdfs.rollCount = 0 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
a1.sinks.k1.hdfs.filePrefix = data102 
a1.sinks.k1.hdfs.fileSuffix = .log 
# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

2.1.3 运行

先启动bigdata02和bigdata03上的Agent,最后启动bigdata01上的Agent:

[apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/load-balancing-101.conf -Dflume.root.logger=INFO,console
[apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/load-balancing-102.conf -Dflume.root.logger=INFO,console
apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/load-balancing.conf -Dflume.root.logger=INFO,console

向指定端口发送数据,模拟输入:

[root@bigdata01 apache-flume-1.9.0-bin]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hehe
OK
haha
OK

查看HDFS中的保存的运行结果:

[root@bigdata01 apache-flume-1.9.0-bin]# hdfs dfs -ls -R / 
-rw-r--r--   2 root supergroup        175 2023-06-22 00:08 /README.txt
drwxr-xr-x   - root supergroup          0 2023-06-22 00:47 /load_balance
-rw-r--r--   2 root supergroup          6 2023-06-22 00:47 /load_balance/data101.1687366028115.log.tmp
-rw-r--r--   2 root supergroup          6 2023-06-22 00:47 /load_balance/data102.1687366024769.log.tmp
[root@bigdata01 apache-flume-1.9.0-bin]# hdfs dfs -cat /load_balance/data101.1687366028115.log.tmp 
haha
[root@bigdata01 apache-flume-1.9.0-bin]# hdfs dfs -cat /load_balance/data102.1687366024769.log.tmp
hehe

2.2 故障转移

使用Failover Sink Processor,即故障转移处理器,一个channle后面可以接多个sink,这多个sink属于一个sink group,按照sink的优先级,默认先让优先级高的sink来处理数据,如果这个sink出现了故障,则用优先级低一点的sink处理数据,可以保证数据不丢失。其参数为:

  • processor.type:针对故障转移的sink处理器,使用failover;
  • processor.priority.:指定sink group中每一个sink组件的优先级,默认情况下channel中的数据会被优先级比较高的sink取走;
  • processor.maxpenalty:sink发生故障之后,最大等待时间。

2.2.1 需求

实现两个sink的故障转移。

2.2.2 配置

在这里插入图片描述
配置bigData01上的Flume Agent:

[root@bigdata01 conf]# cat failover.conf 
# agent的名称是a1 
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 k2 
# 配置source组件 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 44444 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件,[为了方便演示效果,把batch-size设置为1] 
a1.sinks.k1.type = avro 
a1.sinks.k1.hostname = 192.168.152.101 
a1.sinks.k1.port = 41414 
a1.sinks.k1.batch-size = 1 
a1.sinks.k2.type = avro 
a1.sinks.k2.hostname = 192.168.152.102 
a1.sinks.k2.port = 41414 
a1.sinks.k2.batch-size = 1 
# 配置sink策略 
a1.sinkgroups = g1 
a1.sinkgroups.g1.sinks = k1 k2 
a1.sinkgroups.g1.processor.type = failover 
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c1

配置bigData02上的Flume Agent:

[root@bigdata02 conf]# cat failover-101.conf 
# agent的名称是a1 
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 
# 配置source组件 
a1.sources.r1.type = avro 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 41414 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值] 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = hdfs://192.168.152.100:9000/failover 
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
a1.sinks.k1.hdfs.rollInterval = 3600 
a1.sinks.k1.hdfs.rollSize = 134217728 
a1.sinks.k1.hdfs.rollCount = 0 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
a1.sinks.k1.hdfs.filePrefix = data101 
a1.sinks.k1.hdfs.fileSuffix = .log 
# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

配置bigData03上的Flume Agent:

[root@bigdata03 conf]# cat failover-102.conf 
# agent的名称是a1 
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 
# 配置source组件 
a1.sources.r1.type = avro 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 41414 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值] 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = hdfs://192.168.152.100:9000/failover 
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
a1.sinks.k1.hdfs.rollInterval = 3600 
a1.sinks.k1.hdfs.rollSize = 134217728 
a1.sinks.k1.hdfs.rollCount = 0 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
a1.sinks.k1.hdfs.filePrefix = data102
a1.sinks.k1.hdfs.fileSuffix = .log 
# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

2.2.3 运行

  1. 先启动bigdata02和bigdata03上的Agent,最后启动bigdata01上的Agent:
bin/flume-ng agent --name a1 --conf conf --conf-file conf/failover-101.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --name a1 --conf conf --conf-file conf/failover-102.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --name a1 --conf conf --conf-file conf/failover.conf -Dflume.root.logger=INFO,console
  1. 向指定端口发送数据,模拟输入两个数据test1test2
[root@bigdata01 apache-flume-1.9.0-bin]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
test1
OK
test2
OK
  1. 查看HDFS中的保存的运行结果:

因为bigdata03的优先级高,可以看到两个数据都是由其写入。

[root@bigdata01 hadoop-3.3.5]# hdfs dfs -ls -R /
-rw-r--r--   2 root supergroup        175 2023-06-22 00:08 /README.txt
drwxr-xr-x   - root supergroup          0 2023-06-22 09:51 /failover
-rw-r--r--   2 root supergroup          7 2023-06-22 09:51 /failover/data102.1687398676525.log.tmp
drwxr-xr-x   - root supergroup          0 2023-06-22 00:52 /load_balance
-rw-r--r--   2 root supergroup          6 2023-06-22 00:52 /load_balance/data101.1687366028115.log
-rw-r--r--   2 root supergroup          6 2023-06-22 00:52 /load_balance/data102.1687366024769.log
[root@bigdata01 hadoop-3.3.5]# hdfs dfs -cat /failover/data102.1687398676525.log.tmp
test1
test2
  1. 关闭bigdata03,再输入测试数据test3
[root@bigdata01 apache-flume-1.9.0-bin]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
test1
OK
test2
OK
test3
OK
  1. 查看HDFS中的保存的运行结果:

关闭bigdata03后,数据就由优先度较低的bigdata02写入,保证数据不丢失,达到故障转移的目的,此时若再次开启bigdata03,则数据就又会由优限度更高的bigdata03传输。

[root@bigdata01 hadoop-3.3.5]# hdfs dfs -ls -R /
-rw-r--r--   2 root supergroup        175 2023-06-22 00:08 /README.txt
drwxr-xr-x   - root supergroup          0 2023-06-22 09:54 /failover
-rw-r--r--   2 root supergroup          7 2023-06-22 09:54 /failover/data101.1687398846336.log.tmp
-rw-r--r--   2 root supergroup         14 2023-06-22 09:53 /failover/data102.1687398676525.log
drwxr-xr-x   - root supergroup          0 2023-06-22 00:52 /load_balance
-rw-r--r--   2 root supergroup          6 2023-06-22 00:52 /load_balance/data101.1687366028115.log
-rw-r--r--   2 root supergroup          6 2023-06-22 00:52 /load_balance/data102.1687366024769.log
[root@bigdata01 hadoop-3.3.5]# hdfs dfs -cat /failover/data101.1687398846336.log.tmp
test3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/673488.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AIGC连续内容生成几种方案

背景 从AI可以画图到现在各家都在功课的连续性内容的输出,正在AI画图进入到大众圈其实也不过1年左右时间。对于单图的研究已经逐渐完善,单图理论上讲,只要你能按要求做promt的设计出图率应该是比较高的。但是对于要生成连续的图或者要生成连…

推荐 5 个 火火火 的 GitHub 项目

推荐 5 个开源项目,前两个是 AI 相关,后面 3 个为逛逛GitHub 的读者推荐,如果你挖掘到了很棒的开源项目,可以给老逛投稿。 本期推荐开源项目目录: 1. SuperAGI(AI) 2. 一键换脸(AI&a…

macOS Monterey 12.6.7 (21G651) Boot ISO 原版可引导镜像

macOS Monterey 12.6.7 (21G651) Boot ISO 原版可引导镜像 本站下载的 macOS 软件包,既可以拖拽到 Applications(应用程序)下直接安装,也可以制作启动 U 盘安装,或者在虚拟机中启动安装。另外也支持在 Windows 和 Lin…

计算机实习自我鉴定范文5篇

精选计算机实习鉴定 (一) 时间过的真快,在这过去一个3个月时间里,我在**科技有限公司实习从事运维工作。 在公司实习的这段时间,我主要和其他的实习生一起负责公司刚开发的**系统的部署、更新以及维护。 这三月的时间,在同事和项目…

【并发知识点】CAS的实现原理及应用

系列文章目录 AQS的实现原理及应用 CAS的实现原理及应用 文章目录 系列文章目录前言1、CAS的概念2、CAS的实现原理3、单JVM内锁CAS实现3.1、效果 4、模拟赛龙舟比赛 前言 本章节介绍CAS概念、实现原理,并通过java代码应用,最终模拟赛龙舟比赛。 1、CA…

设计模式之适配器模式笔记

设计模式之适配器模式笔记 说明Adapter(适配器)目录类适配器模式示例类图适配者类的接口适配者类目标接口具体的SD卡类计算机类适配器类测试类 对象适配器模式适配者类的接口适配者类目标接口具体的SD卡类计算机类适配器类测试类 说明 记录下学习设计模式-适配器模式的写法。J…

力扣高频SQL50题(基础版)——第十天

力扣高频SQL50题(基础版)——第十天 1 只出现过一次的最大数字 1.1 题目内容 1.1.1 基本题目信息 1.1.2 示例输入输出1 1.1.3 示例输入输出2 1.2 示例sql语句 # 查不到时的结果自然就为Null SELECT MAX(t.num) num FROM (SELECT numFROM MyNumbersGROUP By numHAVING count…

窥探系列之Mybatis-plus BaseMapper实现

我们知道,mybatisplus的BaseMapper接口中提供了一些如updateById的方法,框架本身已经实现了这些CRUD功能,基本的CRUD我们就没必要写sql,直接使用java语法就能对数据进行操控,很方便。那么这些功能是如何被实现的呢&…

【并发知识点】AQS的实现原理及应用

系列文章目录 AQS的实现原理及应用 CAS的实现原理及应用 文章目录 系列文章目录前言一、AQS是什么?1、应用场景2、优缺点 二、案例应用1.使用AQS来实现一个简单的互斥锁2.模拟赛龙舟程序 总结 前言 在Java技术方面,AQS指的是AbstractQueuedSynchronize…

2023最新高薪岗位大爆料,大模型算法工程师!凭什么人均月薪50K

大模型算法工程师工资收入一般多少钱一个月? 最多人拿50K以上占 53.7%,2023年较2022年增长了10%。 按学历统计,本科工资¥41.9K。 按经验,1-3年工资¥40.0K。 一起来看华为招聘的大模型工程师的工资水准 岗位…

[补充]机器学习实战|第二周|第2章:监督学习|课后习题

目录 第二章 监督学习 2. 使用不同的超参数,如kernel"linear"和kernel“rbf”,尝试一个支持向量机回归器。并思考最好的SVR预测器是如何工作的? [代码]3. 为MNIST数据集构建一个分类器,并在测试集上达成超过97%的精度…

关于Java中单例模式(饿汉模式和懒汉模式)的简析

目录 一.什么是单例模式 二.饿汉模式和懒汉模式 饿汉模式 代码 懒汉模式 代码 关于多线程安全的问题 如何解决懒汉模式多线程安全问题 双if判断 一.什么是单例模式 简单来说,就是我们在程序中通过代码进行限制,在该程序中 只能创建一个对象 二.饿汉模式和懒汉模式 …

【2023,学点儿新Java-17】变量与运算符:Java中的关键字及类型划分(附: 官网) | 保留字 | 字面量 | 附:Java部分关键字介绍

前情回顾: 【2023,学点儿新Java-16】编程语言的学习方法总结 | 编程的本质和架构 | 如何深度理解编程知识和技能 | 如何成为优秀的软件开发工程师 | 附:Java初学者的困惑!【2023,学点儿新Java-15】案例分享&#xff1…

机器视觉初步7:模板匹配专题

今天端午,祝各位端午安康! 今天来说说模板匹配这个专题。 模板匹配(Template Matching)是一种图像处理技术,用于在一幅图像上查找与另一幅模板图像相同的区域。模板图像和待匹配图像的大小相同。模板匹配的目的是在待…

【MongoDB大作业】MongoDB服务器的部署

【MongoDB大作业】MongoDB服务器的部署 作业要求作业步骤一、在VMware Workstations安装Linux操作系统(最小安装即可)二、安装完成后登录系统三、将ip地址设置为固定ip地址192.168.80.134四、设置虚拟网络编辑器五、使用 CRT 工具远程连接虚拟机六、下载…

《项目实战》构建SpringCloud alibaba项目(一、构建父工程、公共库、网关))

系列文章目录 构建SpringCloud alibaba项目(一、构建父工程、公共库、网关) 构建SpringCloud alibaba项目(二、构建微服务鉴权子工程store-authority-service) 文章目录 系列文章目录1、概要2、整体架构流程2.1、技术结构组成部分…

非监督学习

聚类Clustering 查看大量数据点,自动找到彼此相关或相似的数据点 K-means算法 原理 1.随机选择点,找聚类的中心位置。将点分配给簇质心 2.移动簇质心 不断重复这两个步骤 优化目标 成本函数失真函数distortion 在每次迭代中,失真成本…

极致呈现系列之:Echarts旭日图的绚丽奇观

目录 什么是旭日图旭日图的特性及应用场景旭日图的特性应用场景 旭日图常用的配置项创建基本的旭日图自定义旭日图样式样式旭日图的高级应用 什么是旭日图 旭日图是一种可视化图表,用于展示层级结构和层级之间的关系。它以一个圆形为基础,由多层的环形图…

【从零开始学习JAVA | 第七篇】API 简介

目录 前言 API介绍: 总结: 前言 这篇章为前导性文章,主要向大家介绍了什么是API,不要求掌握,感兴趣的小伙伴们可以看一看。 API介绍: API(Application Programming Interface)是指…

webpack原理之开发第一个loader

一. 搭建项目结构 整体项目结构如图: 1. 初始化包管理器package.json npm init -y 2. 打包入口文件src/main.js 3. 单页面入口public/index.html 4. 配置webpack.config.js const path require(path) const HtmlWebpackPlugin require("html-webpack-plu…