Flume学习笔记(2)—— Flume进阶

news2024/12/27 1:48:53

Flume进阶

Flume 事务

事务处理流程如下:

Put

  • doPut:将批数据先写入临时缓冲区putList
  • doCommit:检查channel内存队列是否足够合并。
  • doRollback:channel内存队列空间不足,回滚数据

Take

  • doTake:将数据取到临时缓冲区takeList,并将数据发送到HDFS
  • doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
  • doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列

Flume Agent 内部原理

ChannelSelector

ChannelSelector 的作用就是选出 Event 将要被发往哪个 Channel

其共有两种类型,分别是 Replicating(复制)和 Multiplexing(多路复用)

  • ReplicatingSelector 会将同一个 Event 发往所有的 Channel
  • Multiplexing 会根据相应的原则,将不同的 Event 发往不同的 Channel

SinkProcessor

SinkProcessor 共有三种类型 , 分别是 DefaultSinkProcessor 、LoadBalancingSinkProcessor、FailoverSinkProcessor

  • DefaultSinkProcessor 对应的是单个的Sink
  • LoadBalancingSinkProcessor 可以实现负载均衡的功能
  • FailoverSinkProcessor 可以实现错误恢复的功能

Flume 拓扑结构

简单串联

将多个 flume 顺序连接起来,从最初的 source 开始到最终 sink 传送的目的存储系统

不建议桥接过多的 flume 数量, flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统

复制和多路复用

(单 source,多 channel、sink)

Flume 支持将事件流向一个或者多个目的地

这种模式可以将相同数据复制到多个channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的地

负载均衡和故障转移

Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能、

这里的agent1有三个sink,分别连接agent2,agent3,agent4,即使其中有的sink出现了故障,数据还是能同步到hdfs

聚合

业务中常用,比如说日志采集功能:

日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器,产生的日志处理起来也非常麻烦

可以采用聚合的方式,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志的flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析

Flume实战案例

复制和多路复用

需求:使用 Flume-1 监控文件变动

  1. Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS
  2. Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem

实现流程:
1.在job下创建文件夹group1,并在其中创建配置文件flume-file-flume.conf

配置文件中需要有1个source,2个channel,2个sink

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# 将数据流复制给所有 channel
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/apache-hive-3.1.2-bin/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

该配置文件的作用是将数据发送到两个不同的sink,再由sink发送到其他的agent进行处理

2.创建配置文件flume-flume-hdfs.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# source 端的 avro 是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

source绑定上一个agent的sink1,然后上传到hdfs

3.创建配置文件:flume-flume-dir.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /home/why/data/flumeDemo/test1

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

参数说明:

sink类型为file_roll:Flume 1.11.0 User Guide — Apache Flume

可以将events保存到本地文件系统

  • directory:本地文件系统保存数据的路径(注意,该路径必须已经存在才可以)

4.分别启动相应的flume进程:

nohup bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf &

nohup bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf &

nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf &
5.在hdfs和文件夹中都能看到相应的内容:

hdfs:

文件系统:

负载均衡和故障转移

需求:使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用FailoverSinkProcessor,实现故障转移的功能

实现流程:

1.在/opt/module/flume/job 目录下创建 group2 文件夹,创建配置文件flume-netcat-flume.conf

配置 1 个 netcat source 和 1 个 channel、1 个 sink group(2 个 sink),分别输送给flume-flume-console1 和 flume-flume-console2

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444


# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

参数说明:Flume 1.11.0 User Guide — Apache Flume

通过sink groups在一个agent中定义多个sink,并可以配置sink processor使用:Flume 1.11.0 User Guide — Apache Flume

2.创建 flume-flume-console1.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

sink输出到本地的控制台

3.创建 flume-flume-console2.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

sink输出到本地的控制台

4.执行指令:

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console

bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf

5.使用nc localhost 44444发送数据

由于console2设置的优先级高于console1,因此数据由console2接收到;

接下来将console2进程kill掉,数据就由console1接收了:

聚合

需求:

hadoop102 上的 Flume-1 监控文件/home/why/data/flumeDemo/test3/test3.log

hadoop103 上的 Flume-2 监控某一个端口的数据流

Flume-1 与 Flume-2 将数据发送给 hadoop104 上的 Flume-3,Flume-3 将最终数据打印到控制台

实现流程:

1.首先在三台服务器的job文件夹先创建目录group3

2.在hadoop102上,创建配置文件flume1-logger-flume.conf,source用于监控log日志文件,sink用于输出数据到下一级的Flume

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/why/data/flumeDemo/test3/test3.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.在hadoop103上,创建配置文件flume2-netcat-flume.conf,source用于监控端口44444的数据流,sink用于将数据传输到下一级的flume

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

注意,这两个agent的sink目的地都是hadoop104这一个服务器,因此hostname和port都相同

4.在hadoop104上创建配置文件flume3-flume-logger.conf,source用于接收flume1和flume2发送来的数据流,sink用于输出数据到控制台;

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

5.分别在三台服务器上执行指令

hadoop104: bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console

hadoop102:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume1-logger-flume.conf

hadoop103:bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume2-netcat-flume.conf

6.在hadoop102上向日志文件中追加内容:

echo "hello" > /home/why/data/flumeDemo/test3/test3.log

在hadoop103中通过nc hadoop103 44444向44444端口发送数据;

然后在hadoop104中即可接收到数据:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1220035.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

zookeeper学习记录

本文Java代码地址: https://gitee.com/blackjie_1/ljUp/tree/master/zookeeperDemo 个人博客网站:什么是快乐 基于docker 安装 拉取zookeeper 3.4.10 docker pull zookeeper:3.4.10启动服务端 docker run -d -p 2181:2181 -v /root/docker/zookeepe…

电源地虚接,导致信号线发烫

音频板的信号是经过隔直电容接到音频板的。

【Linux】C文件系统详解(一)——C文件操作

文章目录 文件操作总结预备知识结论: C文件操作回顾语言方案w写入方式a写入方式r只读方式 系统方案但是这个**没有设置权限**,需要这样改: 文件操作总结 1.文件描述符,重定向,缓冲区,语言和系统关于文件的不同的视角的理解 – 都是要让我们深刻理解文件 2.文件系统 3.动静态库 …

实时音视频方案汇总

若有好的方案欢迎留言讨论,非常感谢,汇总了一些,从市面上了解的一些低时延的端到端的方案,仅供参照,若有问题,也欢迎留言更正! 方案 方案描述 时延 备注 1大华同轴高清电缆200米电缆&#xf…

Python 使用difflib库 快速实现批量对比文件差异

关注我们 本文内容仅供学习参考,不足错误之处,还请多多指正,如果喜欢我们,请关注我们,你的支持就是我们最大的动力。 需求来源 霍克是一名软件实施工程师,每次版本升级时,虽然会提供升级相关的清单,但不会详细说明具体的改动点。为了能够更有针对性地验证和测试系统,…

通过 Canal 将 MySQL 数据实时同步到 Easysearch

Canal 是阿里巴巴集团提供的一个开源产品,能够通过解析数据库的增量日志,提供增量数据的订阅和消费功能。使用 Canal 模拟成 MySQL 的 Slave,实时接收 MySQL 的增量数据 binlog,然后通过 RESTful API 将数据写入到 Easysearch 中。…

Spring Boot Actuator:自定义端点

要在Spring Boot Actuator中实现自定义端点,可以按照以下步骤进行操作: 1.创建一个自定义端点类 该类需要使用Endpoint注解进行标记,并使用Component注解将其作为Spring Bean进行管理。 package com.example.highactuator.point;import lo…

Linux项目自动化构建工具:make与Makefile的基本用法

目录 1.什么是makefile 2.快速简单了解make/Makefile的使用 3.背后的知识: 4.依赖关系 5.其他写法 1.什么是makefile 会不会写makefile,从一个侧面说明了一个人是否具备完成大型工程的能力一个工程中的源文件不计数,其按类型、功能、模块…

phpStorm Xdebug调试 加FireFox浏览器

步骤1: [Xdebug] zend_extension“D:\phpstudy_pro\Extensions\php\php5.4.45nts\ext\php_xdebug.dll” xdebug.collect_params1 xdebug.collect_return1 xdebug.remote_enableOn xdebug.remote_hostlocalhost xdebug.remote_port9001 xdebug.remote_handlerdbgp ;…

超融合还是虚拟化?一文了解二者的区别、优缺点与传统虚拟化转型方案

超融合作为一种创新的 IT 基础架构,从最初进入市场,到当下成为主流方案,发展速度十分迅猛,且未来趋势依然向好。但对于国内不少用户而言,“超融合”仍是一个新概念,很容易与“虚拟化”相混淆:“…

如何为初创企业选择合适的 ERP 系统?

**ERP系统**是制造、分销、供应链、金融、会计、风险管理等多个行业必不可少的企业技术解决方案。不论垂直行业、企业规模或目标受众如何,将ERP作为企业管理战略的核心部分都非常重要。 对于渴望发展的小型企业和初创企业来说,更是如此。大型企业需要对…

quickapp_快应用_tabBar

tabBar 配置项中配置tabBar(版本兼容)使用tabs组件配置tabBar语法示例问题-切换tab没有反应问题-数据渲染问题解决优化 问题-tab的动态配置 第三方组件tabbar 一般首页都会显示几个tab用于进行页面切换,以下是几种tab配置方式。 配置项中配置tabBar(版本兼容) 在m…

三种快速创建SpringBoot项目的方式

文章目录 在线创建IntelliJ IDEA 创建Maven 创建基本项目结构 SpringBoot 是一个快速开发框架,通过maven依赖的继承方式,帮助我们快速整合第三方常用框架。现在是 Java 领域的绝对霸主。 今天介绍三种快速创建 SpringBoot 项目的方式。 在线创建 1、打…

Uniapp-安装HBuilder调试基座失败解决方案

无法安装原因 有时候我们测试的时候,在手机上插上了线可能因为各种原因没有点击安装或者安装后删除就无法再次安装了,会提示同步资源失败,未得到同步资源的授权,请停止运行后重新运行,而且无论怎么操作都解决不聊这个问题,这是由…

搭建自己的JRebel服务

引入 JRebel插件热部署快速入门教程 上一篇文章有提到如何使用JRebel,其中在激活JRebel那一步骤时咱们使用的激活地址实际就是放在我的个人服务器上,本篇文章咱们手把手的搭建一个个人的JRebel服务。 下载激活工具 下载地址 如下图所示,…

专注于绘画,不受限制!尝试Growly Draw for Mac的快速绘画应用

Growly Draw Mac版是Mac平台上的一款绘画应用,它提供了简单易用的画板页面和多种色彩、画笔工具,让你可以轻松地完成作画。无论你是初学者还是专业人士,都可以在这款应用中找到适合自己的绘画方式。通过使用Growly Draw Mac版,你可…

echarts 实现双y轴折线图示例

该示例有如下几个特点: ①实现tooltip自定义样式(echarts 实现tooltip提示框样式自定义-CSDN博客) ②legend左右区分展示 ③双y轴折线展示 代码如下: this.options {grid: {left: "3%",right: "3%",top: &…

3GPP TS38.201 NR; Physical layer; General description (Release 18)

TS38.201是介绍性的标准,简单介绍了RAN的信道组成和PHY层承担的功能,下图是PHY层相关标准的关系。 文章目录 结构信道类型调制方式PHY层支持的过程物理层测量其他标准TS 38.202: Physical layer services provided by the physical layerTS 38.211: Ph…

docker笔记14--docker-nerdctl-crictl-ctr使用对比

docker笔记14--docker-nerdctl-crictl-ctr使用对比 介绍工具对比dockernerdctlcrictlctr 注意事项说明 介绍 随着容器云技术的成熟,越来越多的从业者开始接触、熟悉 docker和containerd 了,很多时候需要同时在 docker 和 containerd之间切换&#xff0c…

L1 频段卫星导航射频前端低噪声放大器芯片MS2659

产品简述 MS2659 是一款具有高增益、低噪声系数的低噪声放大器 (LNA) ,支持 L1 频段多模式全球卫星定位,可以应用于 GPS 、 北斗二代、伽利略、 GLONASS 等 GNSS 导航接收机中。芯片采 用 SOT23-6 的封装形式。 主要特点 ◼ 支持北斗、 …