2、Flume进阶

news2025/1/19 3:15:33

目录

1、Flume事务

1.1 Flume事务

1.2 Flume Agent内部原理

1.3 重要组件:

2、 Flume拓扑结构

2.1 简单串联

2.2 复制和多路复用

2.3 负载均衡和故障转移

2.4 聚合

3、开发案例

3.1 复制和多路复用

3.4.2 负载均衡和故障转移

3.3 聚合



1、Flume事务

1.1 Flume事务

1.2 Flume Agent内部原理

1.3 重要组件:

ChannelSelector

ChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)。

ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。

SinkProcessor

SinkProcessor共有三种类型,分别是DefaultSinkProcessorLoadBalancingSinkProcessorFailoverSinkProcessor

DefaultSinkProcessor对应的是单个的Sink,LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group,LoadBalancingSinkProcessor可以实现负载均衡的功能,FailoverSinkProcessor可以错误恢复的功能。

2、 Flume拓扑结构

2.1 简单串联

图 Flume Agent连接

这种模式是将多个flume顺序连接起来了,从最初的source开始到最终sink传送的目的存储系统。此模式不建议桥接过多的flume数量, flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点flume宕机,会影响整个传输系统。

2.2 复制和多路复用

图 单source,多channel、sink

Flume支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel中,或者将不同数据分发到不同的channel中,sink可以选择传送到不同的目的地。

2.3 负载均衡和故障转移

图 Flume负载均衡或故障转移

Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能。

2.4 聚合

图 Flume Agent聚合

这种模式是我们最常见的,也非常实用,日常web应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用flume的这种组合方式能很好的解决这一问题,每台服务器部署一个flume采集日志,传送到一个集中收集日志的flume,再由此flume上传到hdfs、hive、hbase等,进行日志分析。

3、开发案例

3.1 复制和多路复用

1)案例需求

使用Flume-1监控文件变动,Flume-1将变动内容传递给Flume-2,Flume-2负责存储到HDFS。同时Flume-1将变动内容传递给Flume-3,Flume-3负责输出到Local FileSystem。

2)需求分析:

3)实现步骤:

(1)准备工作

在/opt/module/flume/job目录下创建group1文件夹

[atguigu@hadoop102 job]$ cd group1/

在/opt/module/datas/目录下创建flume3文件夹

[atguigu@hadoop102 datas]$ mkdir flume3

(2)创建flume-file-flume.conf

配置1个接收日志文件的source和两个channel、两个sink,分别输送给flume-flume-hdfs和flume-flume-dir。

编辑配置文件

[atguigu@hadoop102 group1]$ vim flume-file-flume.conf

添加如下内容

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# 将数据流复制给所有channel
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# sink端的avro是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

(3)创建flume-flume-hdfs.conf

配置上级Flume输出的Source,输出是到HDFS的Sink。

编辑配置文件

[atguigu@hadoop102 group1]$ vim flume-flume-hdfs.conf

添加如下内容

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# source端的avro是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume2/%Y%m%d/%H

#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-

#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true

#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1

#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour

#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true

#积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100

#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream

#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 600

#设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700

#文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0


# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(4)创建flume-flume-dir.conf

配置上级Flume输出的Source,输出是到本地目录的Sink。

编辑配置文件

[atguigu@hadoop102 group1]$ vim flume-flume-dir.conf

添加如下内容

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

提示:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录。

(5)执行配置文件

分别启动对应的flume进程:flume-flume-dir,flume-flume-hdfs,flume-file-flume。

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf

(6)启动Hadoop和Hive

[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh

[atguigu@hadoop102 hive]$ bin/hive
hive (default)>

(7)检查HDFS上数据

(8)检查/opt/module/datas/flume3目录中数据

[atguigu@hadoop102 flume3]$ ll
总用量 8
-rw-rw-r--. 1 atguigu atguigu 5942 5月  22 00:09 1526918887550-3

3.4.2 负载均衡和故障转移

1)案例需求

使用Flume1监控一个端口,其sink组中的sink分别对接Flume2和Flume3,采用FailoverSinkProcessor,实现故障转移的功能。

2)需求分析:故障转移案例

3)实现步骤

(1)准备工作

在/opt/module/flume/job目录下创建group2文件夹

[atguigu@hadoop102 job]$ cd group2/

(2)创建flume-netcat-flume.conf

配置1个netcat source和1个channel、1个sink group(2个sink),分别输送给flume-flume-console1和flume-flume-console2。

编辑配置文件

[atguigu@hadoop102 group2]$ vim flume-netcat-flume.conf

添加如下内容

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

(3)创建flume-flume-console1.conf

配置上级Flume输出的Source,输出是到本地控制台。

编辑配置文件

[atguigu@hadoop102 group2]$ vim flume-flume-console1.conf

添加如下内容

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(4)创建flume-flume-console2.conf

配置上级Flume输出的Source,输出是到本地控制台。

编辑配置文件

[atguigu@hadoop102 group2]$ vim flume-flume-console2.conf

添加如下内容

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

(5)执行配置文件

分别开启对应配置文件:flume-flume-console2,flume-flume-console1,flume-netcat-flume。

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf

(6)使用netcat工具向本机的44444端口发送内容

$ nc localhost 44444

(7)查看Flume2及Flume3的控制台打印日志

(8)将Flume2 kill,观察Flume3的控制台打印情况。注:使用jps -ml查看Flume进程。

3.3 聚合

1)案例需求:

hadoop102上的Flume-1监控文件/opt/module/group.log,

hadoop103上的Flume-2监控某一个端口的数据流,

Flume-1与Flume-2将数据发送给hadoop104上的Flume-3,Flume-3将最终数据打印到控制台。

2)需求分析

3)实现步骤:

(1)准备工作

分发Flume

[atguigu@hadoop102 module]$ xsync flume

在hadoop102、hadoop103以及hadoop104的/opt/module/flume/job目录下创建一个group3文件夹。

[atguigu@hadoop102 job]$ mkdir group3
[atguigu@hadoop103 job]$ mkdir group3
[atguigu@hadoop104 job]$ mkdir group3

(2)创建flume1-logger-flume.conf

配置Source用于监控hive.log文件,配置Sink输出数据到下一级Flume。

在hadoop102上编辑配置文件

[atguigu@hadoop102 group3]$ vim flume1-logger-flume.conf

添加如下内容

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3)创建flume2-netcat-flume.conf

配置Source监控端口44444数据流,配置Sink数据到下一级Flume:

在hadoop103上编辑配置文件

[atguigu@hadoop102 group3]$ vim flume2-netcat-flume.conf

添加如下内容

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(4)创建flume3-flume-logger.conf

配置source用于接收flume1与flume2发送过来的数据流,最终合并后sink到控制台。

hadoop104上编辑配置文件

[atguigu@hadoop104 group3]$ touch flume3-flume-logger.conf

[atguigu@hadoop104 group3]$ vim flume3-flume-logger.conf

添加如下内容

(5)执行配置文件

分别开启对应配置文件:flume3-flume-logger.conf,flume2-netcat-flume.conf,flume1-logger-flume.conf。

[atguigu@hadoop104 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume1-logger-flume.conf

[atguigu@hadoop103 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume2-netcat-flume.conf

(6)在hadoop103上向/opt/module目录下的group.log追加内容

[atguigu@hadoop103 module]$ echo 'hello' > group.log

(7)在hadoop102上向44444端口发送数据

[atguigu@hadoop102 flume]$ telnet hadoop102 44444

(8)检查hadoop104上数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/528060.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在数据化知识经济的时代,你该学会如何经营好自己的知识管理

在当今的数据化知识经济时代,知识管理已经越来越成为了一个必备的技能。在这个竞争激烈的时代,拥有良好的知识管理能力,可以帮助我们更好地应对各种挑战和机遇。 如何经营好自己的知识管理 一、认识知识管理的重要性 知识管理是指通过系统…

【计算机视觉】如何利用 CLIP 做简单的图像分类任务?(含源代码)

要使用 CLIP 模型进行预测,您可以按照以下步骤进行操作: 一、安装 安装依赖:首先,您需要安装相应的依赖项。您可以使用 Python 包管理器(如 pip )安装 OpenAI 的 CLIP 库。 pip install githttps://gith…

2023年Android开发者路线-第1部分

2023年Android开发者路线-第1部分 2023年Android开发者路线-第2部分 2023年Android开发者路线-第3部分 2023年Android开发者路线-第4部分 2023年Android开发者路线-第1部分 Android 生态系统处于不断发展的状态:每天都会引入新的库和资料,旨在加快开…

linux常见指令以及权限理解

1.linux下基本指令: ls指令: 查看文件的属性 ls-l:文件的属性 ls-la:显示所有文件的属性 ls *: linux任何一个目录下面都有两个隐藏文件: ..:表示当前路径的上级路径,可以原路返回 .&…

分布式锁-01(单节点解决方案)

分布式锁概述 为什么需要分布式锁 在单机部署的系统中,使用线程锁来解决高并发的问题,多线程访问共享变量的问题达到数据一致性,如使用synchornized、 ReentrantLock等。 但是在后端集群部署的系统中,程序在不同的JVM虚拟机中运行…

可调整界面输出的桌面万年历设计

可调整界面输出的桌面万年历设计 本文主要介绍月历和生辰八字五行的界面输出方法。一个有趣的方法是可调整界面输出格式,显示几种屏幕排版的布局。本文示例了四个式样。算法的精髓是用一种简单的算法来设置调节屏幕打印输出。分三个显示内容,即月历、大字…

Docker入门实战---修改Docker镜像源

前言 现在大部分互联网公司在实施项目时几乎都会以微服务架构进行落地,那么微服务一旦多了之后就会面临一个如何友好的治理的问题,本人不会重点介绍治理的问题,而是会简单就治理的其中一个环节服务部署运维的问题进行介绍,服务部…

排序算法之桶排序

一、桶排序(BucketSort) 桶排序(Bucket sort)或所谓的箱排序,是一个排序算法,工作的原理是将数组分到有限数量的桶里。每个桶再个别排序(有可能再使用别的排序算法或是以递归方式继续使用桶排序…

[论文阅读] (28)李沐老师视频学习——1.研究的艺术·跟读者建立联系

《娜璋带你读论文》系列主要是督促自己阅读优秀论文及听取学术讲座,并分享给大家,希望您喜欢。由于作者的英文水平和学术能力不高,需要不断提升,所以还请大家批评指正,非常欢迎大家给我留言评论,学术路上期…

如何成功申请计算机软件著作权【流程完整记录】

致谢 :此博文的编写包括软著的申请,均借鉴了大佬【万里守约】的博客https://blog.csdn.net/qq_45625499/article/details/123463407 提示:此博文仅适合个人申请,因为我是自己一个人的项目,自己一个人申请软著 文章目录…

2023 Android开发者路线-第一部分

2023 Android开发者路线-第一部分 Android 生态系统处于不断发展的状态:每天都会引入新的库和资料,旨在加快开发速度并让我们作为开发人员的生活更轻松。 在这个由多个部分组成的系列中,您将按照我们的2023 年 Android 开发者路线图了解有关…

pyhton GUI编程之Tkinter美化皮肤ttkbootstrap

文章目录 pyhton GUI编程之Tkinter美化皮肤ttkbootstrap介绍 pyhton GUI编程之Tkinter美化皮肤ttkbootstrap介绍 tkinter 相对简单,学习入门很快,但是做出来的GUI界面不够美观,各个组件的外观都很老土,所谓 " 爱美之心&#…

发现一个国产BI软件,做财务数据分析效果绝了

如果是一般的财务数据分析,BI软件们都能做,但如果真要深入了解财务痛点,逐个击破财务数据分析难点,实现多维立体自助式的财务数据分析,那就难。就目前而言,财务数据分析做得好的国产BI软件也就一个奥威BI软…

使用docker构建ElasticSearch集群

目录 一、准备工作 二、编写docker-compose.yml 三、编写ElasticSearch和kibana的配置文件 四、执行构建ElasticSearch集群 五、验证结果: 六、可视化工具 ElasticSearch可视化工具介绍(elasticsearch-head、kibana、elasticHD) 一、e…

CTF权威指南 笔记 -第四章Linux安全机制-4.1-Stack Canaries

目录 Stack Canaries 简介 我们进行简单的例子 64 32 checksec Stack Canaries 是对抗栈溢出攻击的技术 SSP安全机制 Canary 的值 栈上的一个随机数 在程序启动时 随机生成并且保存在比返回地址更低值 栈溢出是从低地址向高地址进行溢出 如果攻击者要攻击 就一定要覆…

电动力学专题:圆柱形导体中趋肤效应

电动力学分析 金属导体内的电流密度方程 由Maxwell方程组导出Helmhltz方程 对于良导体,有\sigma/(\omega \eprsilon),因此有 圆柱形导线中电流密度分布 设电流沿Z轴方向流动,均匀导体,可简化为 通解: 安培环路定理 定态电磁波的Maxwell方程组 贝塞尔函数性质&…

【SQL】作为前端,应该了解的SQL知识(第三弹)

📑视图 使用表时,会将数据保存在存储设备(硬盘上) 而使用视图时,并不会将数据保存在存储设备上,也不会将数据保存在任何地方。 视图里面保存的是 从表中取出数据所使用的SELECT语句(视图中的…

zhangrelay博客置顶三篇点击量分析

230515只有三篇置顶,如下: 分别为: 20.03.13 : 901522.01.12 :1372923.04.15 :18836 熟悉zhangrelay博客风格的AI都清楚,他的博客内容都是筛选和设计过的。 置顶三篇阅读量差值为&#xff1…

C++--AVL树的插入,详解四种旋转规则(结尾附源代码链接)

AVL树的插入 前言左单旋右单旋左右双旋右左双旋检查是否这颗树是否是AVL树 前言 AVL树可以说是对二叉搜索树的优化,我们来看二叉树搜索树的下一面一种特殊情况: 当我们插入的数是上面的情况时,二叉树搜索树的特点就形同虚设了,这…

ChatGpt 2步制作流程图与思维导图,你确定不来看一下吗?

什么?你还不会使用ChatGpt。推荐下面这篇文章 ChatGPT保姆级教程,一分钟学会使用ChatGPT! - 掘金 (juejin.cn) 如果没有谷歌账号推荐直接买一个,因为你在中国注册谷歌账号,被谷歌查到,也是使用不了ChatGp…