Flume——进阶(agent特性+三种结构:串联,多路复用,聚合)

news2024/12/28 12:28:29

目录

  • agent特性
    • ChannelSelector
      • 描述:
    • SinkProcessor
      • 描述:
  • 串联架构
    • 结构图解
    • 定义与描述
    • 配置示例
      • Flume1(监测端node1)
      • Flume3(接收端node3)
      • 启动方式
  • 复制和多路复用
    • 结构图解
    • 定义描述
    • 配置示例
      • node1
      • node2
      • node3
      • 启动方式
  • 聚合架构
    • 结构图解
    • 定义描述
    • 示例
      • node1
      • node2
      • node3


agent特性

在这里插入图片描述

ChannelSelector

ChannelSelector是Flume中的一个关键组件,负责根据特定逻辑决定Event的流向。

名称类型描述
ReplicatingSelectorChannelSelector类型将同一个Event复制并发往所有配置的Channel
MultiplexingSelectorChannelSelector类型根据预设的规则或条件,将不同的Event分发至不同的Channel

描述:

  • ReplicatingSelector会无条件地将每个Event发送到与其关联的所有Channel中,实现事件复制。
  • MultiplexingSelector则基于某种规则(如Event中的特定字段、时间戳等)来将Event分发到不同的Channel,实现事件的多路复用。

SinkProcessor

SinkProcessor是Flume中负责处理Sink中Event的组件,它决定了Event如何被发送和处理。

名称类型描述
DefaultSinkProcessorSinkProcessor类型对应于单个Sink,直接处理并发送Event至该Sink
LoadBalancingSinkProcessorSinkProcessor类型对应于Sink Group,实现负载均衡,将Event分发至多个Sink中处理
FailoverSinkProcessorSinkProcessor类型对应于Sink Group,提供错误恢复功能,当主Sink失败时自动切换至备用Sink

描述:

  • DefaultSinkProcessor是最基础的Sink处理器,直接与单个Sink关联,负责将Event发送至该Sink。
  • LoadBalancingSinkProcessor用于处理Sink Group,能够智能地将Event分发至多个Sink中,以实现负载均衡,提高处理效率。
  • FailoverSinkProcessor同样用于处理Sink Group,但它提供了错误恢复机制。当主Sink因故障无法工作时,它会自动将Event发送至备用Sink,以确保数据的连续性和可靠性。

串联架构

结构图解

在这里插入图片描述
在这里插入图片描述
Avro Sink作为Avro客户端,向Avro服务端发送Avro事件。它允许Flume Agent将数据以Avro格式序列化后,发送到指定的Avro Source或其他Avro客户端。

定义与描述

这种模式是将多个flume顺序连接起来了,从最初的source开始到最终sink传送的目的存储系统。此模式不建议桥接过多的flume数量, flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点flume宕机,会影响整个传输系统。

配置示例

Flume1(监测端node1)

Flume1(node1),监听node1上的44444端口(source),并输出到node3的10086端口上(sink)

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = node1
# port,监听的端口
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = avro
# 指定 Avro Sink 发送数据的目标主机名和端口号
a1.sinks.k1.hostname = node3
a1.sinks.k1.port = 10086

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


Flume3(接收端node3)

Flume3(node3),监听node3上的10086端口(source)(当然source内容是来自node1的44444端口的变化情况),输出一般的控制台内容

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# 监听的来自node3上的source,source类型为avro
a1.sources.r1.type = avro
a1.sources.r1.bind = node3
# port,监听的端口
a1.sources.r1.port = 10086

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动方式

先启动node3(flume3),node3的监听是串行的最后一环,从后向前依次启动
理由:
先启动node3的监听(此时node1还未启动),再启动node1,此时可以保证没有任何内容错过


复制和多路复用

结构图解

在这里插入图片描述

在这里插入图片描述

定义描述

Flume支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel中,或者将不同数据分发到不同的channel中,sink可以选择传送到不同的目的地。详细可以参考上面的Agent ChannelSelector和SinkProcessor

配置示例

此部分示例会按照如上的结构图进行配置

node1

replicating_channel.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 这个selector是复制类型的。
# 复制selector会将接收到的每个事件复制到所有配置的channel中。
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/nginx/logs/access.log
a1.sources.r1.shell = /bin/bash -c


# Describe the sink
# avro类型的sink,发送给下一个agent
# sink k1的参数配置
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node2 
a1.sinks.k1.port = 10010

# sink k2的参数配置
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node3
a1.sinks.k2.port = 10010


# channel c1的参数配置
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# channel c2的参数配置
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100


# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

node2

接收node1,并输出到hdfs中,hdfs的参数配置:flume——hdfs

a2.sources = r1
a2.sinks = k1
a2.channels = c1


# Describe/configure the source
# avro类型的source,接收来自上一个agent的sink输出
a2.sources.r1.type = avro
# 这个source来自于node2节点的10010端口
a2.sources.r1.bind = node2
a2.sources.r1.port = 10010


# 传输至hdfs中
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = /flume2/%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 2
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0


# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

node3

接收node1,并输出到日志

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = avro
a3.sources.r3.bind = node3
a3.sources.r3.port = 10010

# Describe the sink
a3.sinks.k3.type = logger

# Describe the channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100


# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

启动方式

先启动node2(flume2)、node3(flume3),在启动node1(flume1)
理由:
同上,请注意,无论何种架构,都应到先启动最末端的接收,再启动发送


聚合架构

结构图解

在这里插入图片描述

在这里插入图片描述

定义描述

最常见实用的结构模式。
日常web应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用flume的这种组合方式能很好的解决这一问题,每台服务器部署一个flume采集日志,传送到一个集中收集日志的flume,再由此flume上传到hdfs、hive、hbase等,进行日志分析。

示例

node1

发送端1,输出到node3的10000端口
没什么需要特别注明的地方,关键节点已经在前面描述了,建议直接复制代码,GPT检查

[root@node1 jobs]# vim agg1.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1


# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/nginx/logs/access.log
a1.sources.r1.shell = /bin/bash -c


# Describe the sink
# sink端的avro是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node3 
a1.sinks.k1.port = 10000


# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

node2

发送端2,输出到node3的10000端口

a2.sources = r1
a2.sinks = k1
a2.channels = c1


# Describe/configure the source
# source端的netcat是一个数据接收服务
a2.sources.r1.type = netcat
a2.sources.r1.bind = node2
a2.sources.r1.port = 10000


# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = node3
a2.sinks.k1.port = 10000




# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

node3

最末的接收端,监听10000端口即可,前面两个节点会发送内容到此端口

[root@node3 jobs]# vim agg3.conf
# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3


# Describe/configure the source
a3.sources.r3.type = avro
a3.sources.r3.bind = node3
a3.sources.r3.port = 10000


# Describe the sink
a3.sinks.k3.type = logger


# Describe the channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100


# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2257480.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

嵌入式学习(15)-stm32通用GPIO模拟串口发送数据

一、概述 在项目开发中可能会遇到串口不够用的情况这时候可以用通过GPIO来模拟串口的通信方式。 二、协议格式 按照1位起始位8位数据位1位停止位的方式去编写发送端的程序。起始位拉低一个波特率的时间;发送8位数据;拉高一个波特率的时间。 三、代码 …

【Go系列】:全面掌握 Sentinel Go —— 构建高可用微服务的流量控制、熔断、降级与系统防护体系

前言 在现代分布式系统架构中,服务的稳定性和可用性是至关重要的。随着微服务和云原生技术的发展,如何有效地进行流量控制、熔断降级以及系统保护成为了一个关键课题。Sentinel 是阿里巴巴开源的一款面向分布式服务架构的流量控制组件,它不仅…

多模态RAG:通用框架方案调研汇总

阅读原文 多模态检索增强生成是一种新兴的设计范式,允许AI模型与文本、图像、视频等存储进行交互。在介绍多模态 RAG 之前,我们先简单了解一下传统的检索增强生成 (RAG)。 标准 RAG RAG 的理念是找到与用户查询相关的核心信息,然后将该信息…

《HTML 的变革之路:从过去到未来》

一、HTML 的发展历程 图片: HTML 从诞生至今,经历了多个版本的迭代。 (一)早期版本 HTML 3.2 在 1997 年 1 月 14 日成为 W3C 推荐标准,提供了表格、文字绕排和复杂数学元素显示等新特性,但因实现复杂且缺乏浏览器…

游戏交易系统设计与实现

文末获取源码和万字论文,制作不易,感谢点赞支持。 题目:游戏交易系统设计与实现 摘 要 在如今社会上,关于信息上面的处理,没有任何一个企业或者个人会忽视,如何让信息急速传递,并且归档储存查询…

Mac mini m4本地跑大模型(ollama + llama + ComfyUI + Stable Diffusion | flux)

安装chat大模型(不推荐,本地运行的大模型只能聊废话,不如网页版使用openAI等高效) 首先下载ollama的安装包 https://ollama.com/ 点击启动访问:http://localhost:11434 Ollama is running 代表已经运行起来了&#x…

精品C++项目推荐:分布式kv存储系统

项目代码直接开源到Github:https://github.com/youngyangyang04/KVstorageBaseRaft-cpp 欢迎去star,fork! 项目背景相关 背景 在当今大规模分布式系统的背景下,需要可靠、高可用性的分布式数据存储系统。 传统的集中式数据库在…

Milvus中如何实现全文检索(Full Text Seach)?

在前两篇文章中(Milvus python库 pymilvus 常用操作详解之Collection(上) 和 Milvus python库 pymilvus 常用操作详解之Collection(下)),我们了解了Milvus基于dense vector和sparse vector实现的…

unity打包web,如何减小文件体积,特别是 Build.wasm.gz

unity打包WebGL,使用的是wasw,最终生成的Build.wasm.gz体积很大,有6.5M,有几个方法可以稍微减小这个文件的大小 1. 裁剪引擎代码: 此步可将大小从6.5减小到 6.2(此项默认开启,只是改了裁剪等级…

STM32 CubeMx HAL库 独立看门狗IWDG配置使用

看门狗这里我就不多介绍了,能搜到这篇文章说明你了解 总之就是一个单片机重启程序,设定好超时时间,在超时时间内没有喂狗,单片机就会复位 主要应用在单片机异常重启方面,比如程序跑飞(注意程序跑飞时你就…

Selenium:强大的 Web 自动化测试工具

Selenium:强大的 Web 自动化测试工具 在当今的软件开发和测试领域,自动化工具的重要性日益凸显。Selenium 就是一款备受欢迎的 Web 自动化测试工具,它为开发者和测试人员提供了强大的功能和便利。本文将详细介绍 Selenium 是什么&#xff0c…

幼儿园学校养老院供电安全解决方案

一、 电气火灾每年以30%的比例高居各类火灾原因。以50%到80%的比例高居重特大火灾。已成为业界重点关注的对象并为此进行着孜孜不倦的努力。2021年“119”消防日,国家应急管理部消防救援局公布了2021年1至10月份全国火灾形势报告。数据显示,从火灾种类来…

UnityShaderLab-实现沿y轴溶解效果

实现思路: 实现思路同UnityShaderLab-实现溶解效果-CSDN博客 ShaderGraph实现: ShaderLab实现: 效果: 未完待续。。。

5G Multi-TRP R16~R18演进历程

提升小区边缘用户的性能,在覆盖范围内提供更为均衡的服务质量,NR中引入了多TRP协作传输的方案。多TRP协作传输通过多个TRP之间进行非相干联合传输(Non Coherent-Joint Transmission,NC-JT)、重复传输/接收或…

deepin 搭建 hadoop singlenode

deepin 搭建 hadoop singlenode 一、准备 1、升级软件 sudo apt updatesudo apt -y dist-upgrade2、安装常用软件 sudo apt -y install gcc make openssl libssl-dev libpcre3 libpcre3-dev libgd-dev \rsync openssh-server vim man zip unzip net-tools tcpdump lrzsz ta…

计算机毕业设计Python中华古诗词知识图谱可视化 古诗词智能问答系统 古诗词数据分析 古诗词情感分析模型 自然语言处理NLP 机器学习 深度学习

温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…

pythonbug修复案例----修复 Python数据分析程序中的持续 Bug

在 Python 编程的世界里,Bug 就像隐藏在暗处的幽灵,时不时地跳出来捣乱。而持续出现的 Bug,则更是让人头疼不已。今天,就让我们一同踏上修复一个 Python 持续 Bug 的征程。 假设我们正在开发一个简单的数据分析程序,其…

YOLOv8改进,YOLOv8引入U-Netv2分割网络中SDI信息融合模块+GSConv卷积,助力小目标

理论介绍 完成本篇需要参考以下两篇文章,并已添加到YOLOv8代码中 YOLOv8改进,YOLOv8引入GSConv卷积+Slim-neck,助力小目标检测,二次创新C2f结构YOLOv8改进,YOLOv8引入U-Netv2分割网络中SDI信息融合模块,助力小目标检测下文都是手把手教程,跟着操作即可添加成功 目录 理…

双指针解题

双指针的使用范围 对于暴力解法的时间复杂度来说,双指针一般可以将暴力解法的时间复杂度降低一个量级. 常⻅的双指针有两种形式,⼀种是对撞指针,⼀种是左右指针. 快慢指针 ⼜称为⻳兔赛跑算法,其基本思想就是使⽤两个移动速度…

Linux安装Python2.7.5(centos自带同款)

卸载已安装的python,防止版本兼容问题 rpm -qa|grep python|xargs rpm -ev --allmatches --nodeps 删除残余文件 whereis python |xargs rm -frv 安装前提是已安装gcc和g gcc --version g --version 下载安装python2.7.5 https://www.python.org/downloads/release/pyt…