【大数据之Flume】四、Flume进阶之复制和多路复用、负载均衡和故障转移、聚合案例

news2025/1/15 13:01:35

1 复制和多路复用

(1)需求:使用 Flume-1 监控文件变动(可以用Exec Source或Taildir Source),Flume-1 将变动内容传递给 Flume-2(用Avro Sink传),(用Avro Source接)Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem。

(2)分析:
在这里插入图片描述
步骤:
(1)在/opt/module/flume-1.9.0/job 目录下创建 group1 文件夹,在/opt/module/flume-1.9.0/目录下创建 data 文件夹,在该文件夹下创建flume文件夹。

(2)在 group1创建 flume-file-flume.conf:配置 1 个接收日志文件的source 和两个 channel、两个 sink,分别输送给 flume-flume- hdfs 和 flume-flume-dir。

vim  flume-file-flume.conf

# Name the components on this agent 
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
#将数据流复制给所有 channel 
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/flume-1.9.0/logs/flume.log
a1.sources.r1.shell = /bin/bash -c

# Describethe sink
# sink 端的 avro 是一个数据发送者 
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

# sink 端的 avro 是一个数据发送者
a1.sinks.k2.type= avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe thechannel 
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type= memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel =c2

(3)在group1下创建flume-flume-hdfs.conf:配置上级 Flume 输出的 Source,输出是到 HDFS 的Sink。

vim flume-flume-hdfs.conf

# Name the components on this agent 
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source 
# source 端的 avro 是一个数据接收服务 
a2.sources.r1.type= avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink 
a2.sinks.k1.type= hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹 
a2.sinks.k1.hdfs.round= true
#多少时间单位创建一个新的文件夹 
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位 
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩 
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval= 30
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event 数量无关 
a2.sinks.k1.hdfs.rollCount = 0

# Describe thechannel 
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(4)在group1下创建flume-flume-dir.conf:配置上级 Flume 输出的 Source,输出是到本地目录的 Sink。

vim flume-flume-dir.conf

# Name the components on this agent 
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type= file_roll
a3.sinks.k1.sink.directory = /opt/module/flume-1.9.0/data/flume

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel 
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

提示:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会在本地创建新的目录。

(5)先开启HDFS,再分别启动flume-flume-hdfs、flume-flume-dir、flume-file-flume。
  服务器要先开启,再开启客户端,

myhadoop.sh start

bin/flume-ng agent -n a2 -c conf/ -f job/group1/flume-flume-hdfs.conf
bin/flume-ng agent -n a3 -c conf/ -f job/group1/flume-flume-dir.conf
bin/flume-ng agent -n a1 -c conf/ -f job/group1/flume-file-flume.conf

(6)检查HDFS上的数据
在这里插入图片描述
在这里插入图片描述

2 负载均衡和故障转移

(1)故障转移需求:使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用FailoverSinkProcessor,实现故障转移的功能。

(2)分析:
在这里插入图片描述
步骤:
(1)在/opt/module/flume-1.9.0/job 目录下创建 group2 文件夹,在该文件夹下创建flume-netcat-flume.conf、flume-flume-console1.conf、flume-flume-console2.conf。

(2)flume-netcat-flume.conf:配置 1 个 netcat source 和 1 个 channel、1 个 sink group(2 个 sink),分别输送给 flume-flume-console1.conf、flume-flume-console2.conf。

# Name the components on this agent 
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port =44444

a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel =c1

(3)创建 flume-flume-console1.conf:配置上级 Flume 输出的 Source,输出是到本地控制台。

# Name the components on this agent 
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source 
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel 
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(4)创建 flume-flume-console2.conf:配置上级 Flume 输出的 Source,输出是到本地控制台。

# Name the components on this agent 
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source 
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel 
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

(5)执行配置文件,分别开启对应配置文件: flume-flume-console2.conf , flume-flume-console1.conf , flume-netcat-flume.conf。
  同样是需要先开启服务端,再开客户端。

bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf

(6)使用 netcat 工具向本机的 44444 端口发送内容

nc localhost 44444

(7)查看Flume2 及 Flume3 的控制台打印日志。
Flume3优先级更高。
在这里插入图片描述
(8)将 Flume3 kill,观察 Flume2 的控制台打印情况。
在这里插入图片描述
在这里插入图片描述
负载均衡需求:
  使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用FailoverSinkProcessor,实现负载均衡的功能。

步骤:
  只需要修改flume-netcat-flume.conf中a1.sinkgroups.g1.processor的配置内容,把原来相关的内容都删除掉,添加以下,其余相同。

a1.sinkgroups.g1.processor.type = load_balance;
#使用退避算法轮询sink组
a1.sinkgroups.g1.processor.backoff = true;

3 聚合

(1)需求:hadoop102 上的Flume-1 监控文件/opt/module/flume-1.9.0/group.log,hadoop103 上的Flume-2 监控某一个端口的数据流,Flume-1 与 Flume-2 将数据发送给 hadoop104 上的 Flume-3,Flume-3 将最终数据打印到控制台。

(2)分析:
在这里插入图片描述
步骤:
(1)在opt/module/flume-1.9.0/job 目录下创建一个group3 文件夹;分发整个Flume给hadoop103、hadoop104。

(2)在 hadoop102 上创建配置文件flume1-logger-flume.conf:配置 Source 用于监控 hive.log 文件,配置 Sink 输出数据到下一级Flume,并在/opt/module/flume-1.9.0/data下创建空白的文件group.log。

# Name the components on this agent 
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/flume-1.9.0/data/group.log
a1.sources.r1.shell= /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3)在 hadoop103 上创建配置文件flume2-netcat-flume.conf:配置 Source 监控端口 44444 数据流,配置Sink 数据到下一级Flume。

# Name the components on this agent 
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 44444

# Describe the sink 
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory 
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 10

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(4)在 hadoop104 上创建配置文件flume3-flume-logger.conf:配置 source 用于接收 flume1 与 flume2 发送过来的数据流,最终合并后 sink 到控制台

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141

# Describe the sink# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel 
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

(5)在各主机上分别执行配置文件:flume3-flume-logger.conf,flume2-netcat-flume.conf,flume1-logger-flume.conf。

[lyx@hadoop104 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
[lyx@hadoop103 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/group3/flume2-netcat-flume.conf
[lyx@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/group3/flume1-logger-flume.conf

(6)在 hadoop102 上向/opt/module/flume-1.9.0/data 目录下的 group.log 追加内容

[lyx@hadoop102 data]$ echo 'hello' > group.log

(7)在 hadoop103 上向 44444 端口发送数据

nc hadoop103 44444

(8)查看hadoop104上的数据。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/822632.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ES6基础知识九:你是怎么理解ES6中Module的?使用场景?

一、介绍 模块,(Module),是能够单独命名并独立地完成一定功能的程序语句的集合(即程序代码和数据结构的集合体)。 两个基本的特征:外部特征和内部特征 外部特征是指模块跟外部环境联系的接口…

iMacros WebBrowser Component for .NET

iMacros WebBrowser Component for .NET 在几分钟内实现应用程序自动化 快速轻松地将iMacro集成到您的应用程序中。不需要单独的安装程序。 无缝集成 iMacros与您的.NET应用程序无缝集成,作为Microsoft WebBrowser控件的替代品。它甚至可以用作每个.NET应用程序中的…

C++报错 XX does not name a type;field `XX’ has incomplete type解决方案

C报错 XX does not name a type;field XX’ has incomplete type解决方案 两个C编译错误及解决办法–does not name a type和field XX’ has incomplete type 编译错误一:XX does not name a type 编译错误二:field XX’ has incomplete t…

【云原生】Serverless 技术架构分析

一、什么是Serverless? 1、Serverless技术简介 ​ Serverless(无服务器架构)指的是由开发者实现的服务端逻辑运行在无状态的计算容器中,它由事件触发, 完全被第三方管理,其业务层面的状态则被开发者使用的数据库和存…

【BASH】回顾与知识点梳理(五)

【BASH】回顾与知识点梳理 五 五. 数据流重导向5.1 什么是数据流重导向standard output 与 standard error output/dev/null 垃圾桶黑洞装置与特殊写法standard input &#xff1a; < 与 << 5.2 命令执行的判断依据&#xff1a; ; , &&, ||cmd ; cmd (不考虑指…

Android 从LibVLC-android到自编译ijkplayer播放H265 RTSP

概述 ijkplayer: Android/iOS video player based on FFmpeg n3.4, with MediaCodec, VideoToolbox support. 官方的描述就这么简单的一句话&#xff0c;但丝毫都不影响它的强大。 从LibVLC 到 ijkplayer 截止到2023.7.20 LibVLC-Android 最大的问题在与OOM&#xff0c;测试了…

多线程(JavaEE初阶系列6)

目录 前言&#xff1a; 1.什么是线程池 2.标准库中的线程池 3.实现线程池 结束语&#xff1a; 前言&#xff1a; 在上一节中小编带着大家了解了一下Java标准库中的定时器的使用方式并给大家实现了一下&#xff0c;那么这节中小编将分享一下多线程中的线程池。给大家讲解一…

威胁分析风险评估(TARA)影响和攻击可行性评估参考

在威胁分析风险评估&#xff08;TARA)过程中&#xff0c;风险等级由对资产安全属性侵害造成后果的影响等级和威胁的可能性两方面综合评估。 备注&#xff1a;以上内容的评估皆是建立在由信息安全问题引起并导致的前提下。 影响等级评估 影响等级说明&#xff0c;影响从安全&a…

k8s-服务发现service和ingress

回到目录 service用于集群内部应用的网络调用&#xff0c;处理东西流量 ingress用于集群外部用户访问内部服务&#xff0c;处理南北流量 一 kube-proxy三种代理模式 kubernetes集群中有三层网络&#xff0c;一类是真实存在的&#xff0c;例如Node Network、Pod Network,提供真…

css position: sticky;实现上下粘性布局,中间区域滚动

sticky主要解决的问题 1、使用absolute和fixed中间区域需要定义高度2、使用absolute和fixed底部需要写padding-bottom 避免列表被遮挡住一部分&#xff08;底部是浮窗的时候&#xff0c;需要动态的现实隐藏&#xff09; <!DOCTYPE html> <html lang"en"&…

从0-1实现简易Raft分布式共识算法

一、Raft前置简介 Raft目前是最著名的分布式共识性算法&#xff0c;被广泛的应用在各种分布式框架、组件中&#xff0c;如Redis、RocketMq、Kafka、Nacos&#xff08;CP&#xff09;等 根据Raft论文&#xff0c;可将Raft拆分为如下4个功能模块&#xff1a; 领导者选举日志同…

蓝桥云课ROS机器人旧版实验报告-04三维建模与仿真

项目名称 实验四 3D建模与仿真 成绩 内容&#xff1a;自定义机器人3D模型&#xff0c;创建一个URDF文件、xacro文件、ROS2[Kinetic/Melodic/Noetic]仿真 实验记录&#xff08;70分&#xff09; 从头开始构建使用 URDF 的可视化机器人模型&#xff1a; 先尝试两个案例&a…

合合信息上会在即:“排队”耗时近两年,能否交出IPO答卷?

撰稿|行星 来源|贝多财经 近日&#xff0c;上海合合信息科技股份有限公司&#xff08;下称“合合信息”&#xff09;在上海证券交易所科创板递交招股书&#xff08;上会稿&#xff09;。据贝多财经了解&#xff0c;合合信息于2021年9月27日递交招股书&#xff0c;将于2023年8…

今日头条面试真题及答案,软件测试工程师面试秘籍

试题1&#xff0e;在浏览器地址栏里输入一个网址&#xff0c;接下来会发生什么&#xff1f; 答案&#xff1a;发生的操作如下。 &#xff08;1&#xff09;浏览器查找该网址的IP地址。 &#xff08;2&#xff09;浏览器根据解析得到的IP地址向Web服务器发送一个HTTP请求。 &am…

CFI技术新探索,struct_san今日登场

一、背景 C/C开发的应用程序&#xff0c;长久以来存在内存破坏类的安全问题。当攻击者掌握了目标程序的漏洞后&#xff0c;就可以开发漏洞利用程序劫持目标程序的控制流。早期的漏洞利用是采用代码注入的方式&#xff0c;通过在缓冲区置入一段代码&#xff08;shellcode&#…

在 Tinkercad 中加快设计的 22 个技巧

在 Tinkercad 中加快设计的 22 个技巧 原文 Everyone knows that Tinkercad is the easiest way to get started in 3D design. Once you get the hang of it, you realize that it’s one of the fastest design tools available. With no software to launch or complex me…

Pytest学习教程_测试报告生成pytest-html(三)

前言 pytest-html 是一个用于生成漂亮的 HTML 测试报告的 pytest 插件。它可以方便地将 pytest 运行的测试结果转换为易于阅读和理解的 HTML 报告&#xff0c;提供了丰富的测试结果展示功能和交互性。 一、安装 # 版本查看命令 pytest版本&#xff1a; pytest --version pyte…

PHP代码审计--理论

提供资料&#xff1a; php 基础 : https://www.runoob.com/php/php-tutorial.html php是什么&#xff1f; PHP 是服务器端脚本语言。 首先在学习PHP前需要对HTML 和CSS有一定的认识 PHP 能做什么&#xff1f; PHP 可以生成动态页面内容PHP 可以创建、打开、读取、写入、关…

InnoDB引擎底层逻辑讲解——架构之磁盘架构

1. System Tablespaces区域 系统表空间是change buffer&#xff08;更改缓冲区&#xff09;的存放区域&#xff0c;这是在8.0之后重新规划的&#xff0c;在5.x版本的时候&#xff0c;系统表空间还会存放innodb的数据字典undolog日志等信息&#xff0c;在8.0之后主要主要存放更…

【程序猿周末如何才能获得充分的休息】

工作以后常常容易感到疲于奔命&#xff0c;即使在周末也没有得到高质量的休息。打工人/学生党如何过周末&#xff1f;你有哪些延长周末和下班时间的好方法吗&#xff1f;你可以选择从以下几个方向谈谈你的想法和观点。 一&#xff1a;周末的时间规划 周末双休 二&#xff1a;提…