Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合

news2025/1/11 0:44:18

1、Flume进阶

1.1 Flume事务

在这里插入图片描述

1.2 Flume Agent内存原理

在这里插入图片描述
1、ChannelSelector
ChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)。
ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。
2、SinkProcessor
SinkProcessor共有三种类型,分别是DefaultSinkProcessor、LoadBalancingProcessor和FailoverSinkProcessor。
DefaultSinkProcessor对应的是单个的Sink,LoadBalancingSinkProcessor和FailoverProcessor对应的是Sink,Group,LoadBalancingSinkProcessor可以实现负载均衡的功能,FailoverSinkProcessor可以错误恢复功能。

1.3 拓扑结构

1.3.1 简单串联

在这里插入图片描述
这种模式是将多个 flume 顺序连接起来了,从最初的 source 开始到最终 sink 传送的目的存储系统。此模式不建议桥接过多的 flume 数量, flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统。

1.3.2 复制和多路复用

在这里插入图片描述
Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的地。

1.3.3 负载均衡和故障转移

在这里插入图片描述
Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能。

1.3.4 聚合在这里插入图片描述

这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用 flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志的flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析。

1.4 案例实现

前提说明:在Flume之间传输数据要用avro,并且Source用的是avro的FlumeAgent是服务端,在开启时要先开启服务端!!!!

1.4.1 复制和多路复用

1、案例需求
使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem。
2、需求分析
在这里插入图片描述
3、实现步骤
(1)准备工作
在/opt/module/flume/job 目录下创建 group1 文件夹
在/opt/module/datas/目录下创建 flume3 文件夹
(2)创建 flume-file-flume.conf(group1文件夹下)

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有 channel
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

(3)创建 flume-flume-hdfs.conf(group1文件夹下)
作用:配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink。

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
# source 端的 avro 是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(4)创建 flume-flume-dir.conf(group1文件夹下)
作用:配置上级 Flume 输出的 Source,输出是到本地目录的 Sink。

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

(5)执行配置文件

 bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf

 bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf

 bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf

(6)启动hadoop和Hive

start-dfs.sh
start-yarn.sh

 bin/hive

(7)检查HDFS上数据
在这里插入图片描述
(8)检查/opt/module/datas/flume3 目录中数据

1.4.2 负载均衡和故障转移

1、案例需求
使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用FailoverSinkProcessor,实现故障转移的功能。
2、需求分析
在这里插入图片描述
3、实现步骤
(1)准备工作
在/opt/module/flume/job 目录下创建 group2 文件夹
(2)创建 flume-netcat-flume.conf(group2 文件夹下)
配置 1 个 netcat source 和 1 个 channel、1 个 sink group(2 个 sink),分别输送给flume-flume-console1 和 flume-flume-console2

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

(3)创建 flume-flume-console1.conf
配置上级 Flume 输出的 Source,输出是到本地控制台。

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = logger
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(4)创建 flume-flume-console2.conf
配置上级 Flume 输出的 Source,输出是到本地控制台。

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

(5)执行配置文件

 bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console

 bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console

 bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf

(6)使用 netcat 工具向本机的 44444 端口发送内容

 nc localhost 44444

(7)查看Flume2及Flume3的控制台打印日志
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
(8)将Flume2 kill ,利用nc发送数据,观察 Flume3 的控制台打印情况
在这里插入图片描述

1.4.3 聚合

1、案例需求
hadoop102 上的 Flume-1 监控文件/opt/module/group.log,hadoop103 上的 Flume-2 监控某一个端口的数据流,Flume-1 与 Flume-2 将数据发送给 hadoop104 上的 Flume-3,Flume-3 将最终数据打印到控制台。
2、需求分析
在这里插入图片描述
3、案例实现
(1)准备工作
分发 Flume

 xsync flume  ##xsync是要自己写的分发脚本

在 hadoop102、hadoop103 以及 hadoop104 的/opt/module/flume/job 目录下创建一个group3 文件夹。
(2)在 hadoop102 上编辑配置文件

 vim flume1-logger-flume.conf
 # Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3)在 hadoop103 上编辑配置文件
配置 Source 监控端口 44444 数据流,配置 Sink 数据到下一级 Flume

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 44444
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(4)在 hadoop104 上编辑配置文件
配置 source 用于接收 flume1 与 flume2 发送过来的数据流,最终合并后 sink 到控制台

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141
# Describe the sink
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

(5)执行配置文件

 bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console

 bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume2-logger-flume.conf


 bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume1-netcat-flume.conf

(6)在 hadoop102 上向/opt/module 目录下的 group.log 追加内容
在这里插入图片描述

(7)在 hadoop103 上向 44444 端口发送数据
在这里插入图片描述

(8)检查 hadoop104 上数据
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/630039.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Django-入门

文章目录 一、Django框架介绍二、后台管理第一步:项目的创建与运行第二步:应用的创建和使用第三步: 项目的数据库模型第四步: 启用后台Admin站点管理 三、前台管理第一步: URLconf 路由管理第二步: 视图函数处理业务逻辑第三步: 模板管理实现好看的HTML页面(可参考菜…

一文带你读懂:TCP连接的三次握手和四次挥手(下篇)

天下没有不散的宴席,对于 TCP 连接也是这样, TCP 断开连接是通过四次挥手方式。下面我们通过实操,来彻底理解四次挥手。 对TCP连接建立三次握手感兴趣的同学,可以看我上一篇文章:一文带你读懂:TCP连接的三次…

[环境配置]让sd自动翻译提示词插件sd-webui-prompt-all-in-one安装

安装方式 方式一(使用git克隆): 此方法需要你的电脑上安装了 git,如果没有安装,可参考 git 官方文档 进行安装。 打开终端,进入到你的 stable-diffusion-webui 目录下。 使用 git 克隆 sd-webui-prompt…

RobinKarp(字符串哈希)---分析与实现(C++)

1. 简述 给定字符串pattern和串text。求串pattern在串text中出现的位置。 暴力比较是逐个字符比较来确定两个串是否相等,若当前比较失败 则回到开始字符对应字符的后一个字符重复过程。 哈希就是一个大范围到小范围的映射 字符串哈希则是通过比较两个串的哈希值相…

Leetcode 剑指 Offer II 030. 插入、删除和随机访问都是 O(1) 的容器

题目难度: 中等 原题链接 今天继续更新 Leetcode 的剑指 Offer(专项突击版)系列, 大家在公众号 算法精选 里回复 剑指offer2 就能看到该系列当前连载的所有文章了, 记得关注哦~ 题目描述 设计一个支持在平均 时间复杂度 O(1) 下,执行以下操作…

使用NLPAUG 进行文本数据的扩充增强

在机器学习中,训练数据集的质量在很大程度上决定了模型的有效性。我们往往没有足够的多样化数据,这影响了模型的准确性。这时数据增强技术就派上了用场。 数据增强可以通过添加对现有数据进行略微修改的副本或从现有数据中新创建的合成数据来增加数据量…

代码随想录训练营Day53|1143.最长公共子序列;1035.不相交的栈;53.最大子序和

1143.最长公共子序列 class Solution {public int longestCommonSubsequence(String text1, String text2) {int[][] dp new int[text1.length()1][text2.length()1];for(int i1;i<text1.length();i){for(int j1;j<text2.length();j){if(text1.charAt(i-1)text2.charAt(…

12性能提升:如何提升gRPC系统性能

这篇文章我们来一起学习下如何提升gRPC系统服务的性能。 gRPC 是一个高性能、开源的 RPC 框架,设计目标是支持多种编程语言和多种平台。它基于 Google 发布的 Protobuf(Protocol Buffers)序列化协议,可以在不同的应用程序之间传输数据。gRPC 具有高效率和可扩展性的特点,…

python3 爬虫相关学习7:使用 BeautifulSoup下载网页图片到本地文件夹

目录 1 一个爬图片pic的代码的例子 1.1 学习的原文章 1.2 原始代码的问题总结 问题1 问题2 问题3 其他问题 1.3 原始代码 2 直接在cmd里 python运行报错 和 处理 2.1 运行报错 2.2 报错原因&#xff1a; 没有提前安装这个bs4 模块 2.3 如何提前知道我的python环境…

【微信小程序】wxml、wxss、js、json文件介绍

&#x1f609;博主&#xff1a;初映CY的前说(前端领域) ,&#x1f4d2;本文核心&#xff1a;微信小程序的入门介绍 【前言】书接上回&#xff0c;我们知道了一个小程序的构成结构&#xff0c;接下来我们来进一步学习小程序的目录结构中的.wxml、.wxss、.js、.json。 目录 ⭐ 一…

一学就会-----链表中倒数第K个节点

文章目录 题目描述思路一代码示例思路二代码示例 题目描述 输入一个链表&#xff0c;输出该链表中倒数第k个结点。 图片示例&#xff1a; 思路一 由于这道题目并没有要求时间复杂度&#xff0c;我们完全可以先遍历一遍链表&#xff0c;得到链表的结点总数&#xff08;count&am…

利用Zookeeper实现集群选举

什么是Zookeeper 分布式开源协调系统&#xff0c;数据模型简单&#xff0c;可以实现同步&#xff0c;配置管理&#xff0c;分组管理&#xff0c;分命名空间管理等。 技术本质 一个原子消息传递系统&#xff0c;它使所有服务器保持同步 FLP(3个科学家名字命名) 理论角度&…

【Spring Security】的RememberMe功能流程与源码详解,基础-进阶-升级-扩展,你学会了吗?

文章目录 前言原理 基础版搭建初始化sql依赖引入配置类验证 源码分析 进阶版集成源码分析疑问1疑问2 鉴权 升级版集成初始化sql配置类验证 源码分析鉴权流程 扩展版 前言 之前我已经写过好几篇权限认证相关的文章了&#xff0c;有想复习的同学可以查看【身份权限认证合集】。今…

OpenAI官方提示词课(三)如何总结文章

现在是信息爆炸时代&#xff0c;打开手机&#xff0c;各种文章扑面而来。我们的精力是有限的。如果有人帮忙把文章总结好给我们&#xff0c;这不就节省了很多时间嘛&#xff01;我们也就可以阅读更多的文章了。 恰好大语言模型在总结文章方面非常有天赋。 下面来看看示例。 …

数学基础第二天

介绍 对于Hissian矩阵是正定的&#xff0c;在这一点是整个范围内的最小值&#xff0c;y在各个方向的二阶导数都是>0的 对于Hissian矩阵是负定的&#xff0c;在这一点是整个范围内的最大值&#xff0c;y在各个方向的二阶导数都是<0的, 对于Hissian矩阵是不定的&#xff…

有了这个工具,支付宝商家多个账号下的账单管理更方便了

大家好&#xff0c;我是小悟 为方便拥有多个支付宝账号的商家获取自身业务、资金数据及下载对账单的能力&#xff0c;为商家提供了商家账单产品&#xff0c;商家可以通过该产品系统化接入账单数据&#xff0c;实现支付宝商家多个账号账单管理的功能。 为拥有多个支付宝账号的…

华为OD机试真题 JavaScript 实现【求符合要求的结对方式】【2023Q1 100分】,附详细解题思路

一、题目描述 用一个数组A代表程序员的工作能力&#xff0c;公司想通过结对编程的方式提高员工的能力&#xff0c;假设结对后的能力为两个员工的能力之和&#xff0c;求一共有多少种结对方式使结对后能力为N。 二、输入描述 6 2 3 3 4 5 1 6 第一行为员工的总人数&#xff…

centos 7 安装git并配置ssh

一、安装 1、查看是否安装git <span style"color:#333333"><span style"background-color:#ffffff"><code class"language-perl">rpm -qa|<span style"color:#0000ff">grep</span> git </code>…

【白嫖系列】永久免费域名申请教程 eu.org

&#x1f951; Welcome to Aedream同学 s blog! &#x1f951; 文章目录 eu.org注册激活注册域名解析 eu.org eu.org 一个从1996开始提供免费域名的组织, 其官网地址是 https://nic.eu.org/ 他帮助学生、爱好者或者非营利组织不用花费购买域名就可能拥有自己的免费域名&#x…

2023.6.9小记——ARM的工作模式与状态

今天打算学一点就写一点&#xff0c;不然全部堆积到晚上压力太大了&#xff0c;有些东西写不完就要睡觉了&#x1f4a4; 1. 什么是numpy&#xff1f; 1.1 numpy简介 是Python中的用于科学计算的库&#xff0c;提供高性能的多维数组对象和对应的操作函数&#xff0c;用于处理大…