【Flume】高级组件之Channel Selectors及项目实践

news2024/10/2 8:38:28

文章目录

  • 1. 组件简介
  • 2. 项目实践
    • 2.1 Replicating Channel Selector实践
      • 2.1.1 需求
      • 2.1.2 配置
      • 2.1.3 运行
    • 2.2 Multiplexing Channel Selector实践
      • 2.2.1 需求
      • 2.2.2 配置
      • 2.2.3 运行

1. 组件简介

       通俗来讲,Channel Selectors组件控制Source采集到的数据分别流向哪些Channels。组件包括Replicating Channel Selector、Load Balancing Channel Selector和Multiplexing Channel Selector,其中Replicating Channel Selector是默认的Channel选择器,它会将Source采集过来的Event发往所有Channel;Load Balancing Channel Selector是一种负载均衡机制,将数据按照随机或者轮换的策略分发到不同的Channel,以缓解数据传输量大时的Channel阻塞;Multiplexing Channel Selector可以根据Event中header里面的值将其发往不同的Channel;。

  • Replicating Channel Selector:该组件有两个配置项,分别是selector.typeselector.optionalselector.type必须配置为replicating,表示使用Replicating Channel Selectors;selector.optional是一个可选配置,若配置a1.sources.r1.selector.optional = c1,则表示c1是可选Channel,对c1的写入失败将被忽略。
  • Load Balancing Channel Selector:该组件有两个配置项,分别是selector.typeselector.policyselector.type必须配置为load_balancing,表示使用Load Balancing Channel Selector;selector.policy为数据分发方式选择,有两个机制可供选择:轮换(round_robin)和随机(random)。
  • Multiplexing Channel Selector:该组件有四个配置项,分别是selector.typeselector.headerselector.defaultselector.mapping.*selector.type必须配置为multiplexing,表示使用multiplexing Channel Selector;selector.header配置标注了Event依据header中的哪一个键值对分发;selector.defaultselector.mapping.*用于定义分发规则,详见后续实践。

2. 项目实践

2.1 Replicating Channel Selector实践

2.1.1 需求

       将Source采集到的数据重复发送给两个Channle,最后每个Channel后面接一个Sink,负责把数据存储到不同存储介质中,方便后期使用。在实际工作中这种需求还是比较常见的,就是希望把一份数据采集过来以后,分别存储到不同的存储介质中,不同存储介质的特点和应用场景是不一样的,典型的就是HDFS Sink和Kafka Sink,通过HDFS Sink实现离线数据落盘存储,方便后面进行离线数据计算;通过Kafka Sink实现实时数据存储,方便后面进行实时计算,由于我还没学Kafka,所以在这里先使用Logger Sink代理。

2.1.2 配置

在这里插入图片描述
       默认其实就采用Replicating Channel Selector将相同的数据发往链接的多个Channels,因此也可以不配置这个选择器。这个实例项目主要实践了多Channel的配置,两个Channel分别发往两个Sink,一个以Log方式打印在控制台,一个发往HDFS。

# agent的名称是a1
# 指定source组件、channel组件和Sink组件的名称
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# 配置source组件
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# 配置channle选择器[默认就是replicating,所以可以省略]
a1.sources.r1.selector.type = replicating

# 配置channel组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# 配置sink组件
a1.sinks.k1.type = logger

a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://192.168.182.100:9000/replicating
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text
a1.sinks.k2.hdfs.rollInterval = 3600
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.filePrefix = data
a1.sinks.k2.hdfs.fileSuffix = .log

# 把组件连接起来
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

2.1.3 运行

运行如下命令运行Agent a1:

bin/flume-ng agent --name a1 --conf conf --conf-file conf/replicatingChannelSelectors.conf -Dflume.root.logger=INFO,console

使用telnet命令连接44444端口,并发送数据:hello

[root@bigData01 apache-flume-1.9.0-bin]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hello
OK

Sink k1在控制台输出数据hello

2023-02-10 23:09:10,315 ...... Event: { headers:{} body: 68 65 6C 6C 6F 0D                               hello. }

Sink k2在HDFS写入数据hello

[root@bigData01 apache-flume-1.9.0-bin]# hdfs dfs -ls /replicating
Found 1 items
-rw-r--r--   2 root supergroup          7 2023-02-10 23:09 /replicating/data.1676041750272.log.tmp
[root@bigData01 apache-flume-1.9.0-bin]# hdfs dfs -cat /replicating/data.1676041750272.log.tmp
hello

2.2 Multiplexing Channel Selector实践

2.2.1 需求

       根据数据内容不同,把数据分别发往不同的存储介质,把所处城市为bj的居民数据以日志方式发往控制台,其余城市的居民数据存储在HDFS。居民数据如下:

{"name":"jack","age":19,"city":"bj"}
{"name":"tom","age":26,"city":"sh"}

2.2.2 配置

在这里插入图片描述
       先用Regex Extractor Interceptor把城市数据写入Event的header部分,并使用Multiplexing Channel Selector读取header内容并分别发往不同的Channel,最终发往不同的Sink。

# agent的名称是a1

# 指定source组件、channel组件和Sink组件的名称
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# 配置source组件
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# 配置source拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = "city":"(\\w+)"
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.name = city

# 配置channle选择器
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = city
a1.sources.r1.selector.mapping.bj = c1
a1.sources.r1.selector.default = c2

# 配置channel组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# 配置sink组件
a1.sinks.k1.type = logger

a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://192.168.150.100:9000/multiplexing
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text
a1.sinks.k2.hdfs.rollInterval = 3600
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.filePrefix = data
a1.sinks.k2.hdfs.fileSuffix = .log


# 把组件连接起来
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

2.2.3 运行

如下命令运行Agent a1:

bin/flume-ng agent --name a1 --conf conf --conf-file conf/multiplexingChannelSelectors.conf -Dflume.root.logger=INFO,console

使用telnet命令连接44444端口,并发送数据:

[root@bigData01 apache-flume-1.9.0-bin]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
{"name":"jack","age":19,"city":"bj"}
OK
{"name":"tom","age":26,"city":"sh"}
OK

Sink k1在控制台打印居住在bj的居民信息:

2023-02-10 23:19:58,494 ...... {"name":"jack"," }

Sink k2向HDFS中发送居住在其他城市的居民信息:

[root@bigData01 apache-flume-1.9.0-bin]# hdfs dfs -cat /multiplexing/data.1676042412667.log.tmp
{"name":"tom","age":26,"city":"sh"}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/337978.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Idea超好用的管理工具ToolBox(附带idea工具)

文章目录为什么要用ToolBox总结idea管理安装、更新、卸载寻找ide配置、根路径idea使用准备工作配置为什么要用ToolBox 快速轻松地更新,轻松管理您的 JetBrains 工具 安装自动更新同时更新插件和 IDE回滚和降级通过下载补丁或一组补丁而不是整个包,节省维护 IDE 的…

snakeyaml数字字符串显示单引号的问题

如题所示&#xff0c;一般yaml结构中&#xff0c;字符串直接显示没有单引号的字符串&#xff0c;如果字符串由数字组成&#xff0c;为了区别真正的数字&#xff0c;这个字符串会使用单引号包围起来。 数据结构如下&#xff1a; Map<String,Object> map new LinkedHashM…

COCO物体检测评测方法简介

本文从ap计算到map计算&#xff0c;最后到coco[0.5:0.95:0.05] map的计算&#xff0c;一步一步拆解物体检测指标map的计算方式。 一、ap计算方法 一个数据集有多个类别&#xff0c;对于该数据库有5个gt&#xff0c;算法检测出来10个bbox&#xff0c;对于人这个类别来说检测有…

类和对象实操之【日期类】

✨个人主页&#xff1a; Yohifo &#x1f389;所属专栏&#xff1a; C修行之路 &#x1f38a;每篇一句&#xff1a; 图片来源 The pessimist complains about the wind; the optimist expects it to change; the realist adjusts the sails. 悲观主义者抱怨风;乐观主义者期望它…

两数相加-力扣2-java高效方案

一、题目描述给你两个 非空 的链表&#xff0c;表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的&#xff0c;并且每个节点只能存储 一位 数字。请你将两个数相加&#xff0c;并以相同形式返回一个表示和的链表。你可以假设除了数字 0 之外&#xff0c;这两个数都不…

论文阅读 | Rethinking Coarse-to-Fine Approach in Single Image Deblurring

前言&#xff1a;ICCV2021图像单帧运动去糊论文 论文地址&#xff1a;【here】 代码地址&#xff1a;【here】 Rethinking Coarse-to-Fine Approach in Single Image Deblurring 引言 图像去糊来自与物体或相机的运动。现有的deblur领域的深度学习方法大多都是coarse-to-fin…

RiProV2主题美化增加支付页底部提示语ritheme主题美化

美化背景 默认的RiProV2主题在支付提示页,是没有这一行提示的 希望增加根据用户类别,未登录用户购买时提示:当前为游客模式购买。或者其他提示,提示用户未登录购买不保存购买记录等。 索引关键字:ritheme主题美化之增加支付页底部提示语,RiProV2主题美化增加支付页底部提…

git必会的知识点

注&#xff1a;本文参考https://www.liaoxuefeng.com/wiki/896043488029600 原文非常值得一读&#xff0c;作者学识渊博&#xff0c;补充了很多有意思的知识。我仅仅是拾人牙慧。 git是最先进的分布式版本控制系统。 版本控制系统——自动记录系统中文件的改动情况&#xff0…

多核异构核间通信-mailbox/RPMsg 介绍及实验

1. 多核异构核间通信 由于MP157是一款多核异构的芯片&#xff0c;其中既包含的高性能的A7核及实时性强的M4内核&#xff0c;那么这两种处理器在工作时&#xff0c;怎么互相协调配合呢&#xff1f; 这就涉及到了核间通信的概念了。 IPCC (inter-processor communication contr…

Maven_第四章 使用Maven:IDEA环境

目录第一节 创建父工程第二节 配置Maven信息第三节 创建Java模块工程第四节 创建Web模块工程1、创建模块2、修改打包方式3、Web 设定4、借助IDEA生成web.xml5、设置 Web 资源的根目录6、测试6.1 创建文件6.2 配置tomcat第五节 其他操作1、在IDEA中执行Maven命令①直接执行②手动…

代码随想录算法训练营第24天25天|● 77. 组合● 216.组合总和III ● 17.电话号码的字母组合

77组合 看完题后的思路 void f&#xff08;数组&#xff0c;startIndex&#xff09;递归终止 if&#xff08;startIndex数组长度||path.sizek&#xff09;{ if(path.sizek){ 加入} }递归 for&#xff08;&#xff1b;startIndex<num.size&#xff1b;startIndex&#xff0…

为什么分库分表

系列文章目录 文章目录系列文章目录前言一、什么是分库分表二、分库分表的原因分库分表三、如何分库分表3.1 垂直拆分1.垂直分库2、垂直分表3.2 水平拆分水平分库水平分表水平分库分表的策略hash取模算法range范围rangehash取模混合地理位置分片预定义算法四、分库分表的问题分…

华为动态二进制翻译工具(ExaGear)

你还在为liunix x86程序移植到ARM环境而烦恼吗&#xff1f;你们现在您的福利来了&#xff0c;ExaGear可以解决您的烦恼&#xff0c;让您试下零代码迁移运行Linux x86程序。ExaGear是华为自研动态二进制翻译工具&#xff0c;通过在运行时&#xff0c;将x86应用指令翻译为ARM64指…

备战金三银四,软件测试面试题(全)

1.B/S架构和C/S架构区别 B/S 只需要有操作系统和浏览器就行&#xff0c;可以实现跨平台&#xff0c;客户端零维护&#xff0c;维护成本低&#xff0c;但是个性化能力低&#xff0c;响应速度较慢 C/S响应速度快&#xff0c;安全性强&#xff0c;一般应用于局域网中&#xff0c;因…

leetcode: Two Sum

leetcode: Two Sum1. 题目1.1 题目描述2. 解答2.1 baseline2.2 基于baseline的思考2.3 优化思路的实施2.3.1 C中的hashmap2.3.2 实施2.3.3 再思考2.3.4 最终实施3. 总结1. 题目 1.1 题目描述 Given an array of integers nums and an integer target, return indices of the …

Fluent Python 笔记 第 4 章 文本和字节序列

Python 3 明确区分了人类可读的文本字符串和原始的字节序列。隐式地把字节序列转换成 Unicode 文本已成过去。本章将要讨论 Unicode 字符串、二进制序列&#xff0c;以及在二者之间转 换时使用的编码。 没啥可看的&#xff0c;就一句话&#xff0c;一定不能依赖默认编码&#x…

DP优化 - 斜率优化

假设当前的 DP 方程为 fimin⁡0≤j<i{−K(i)X(j)Y(j)}F(i)f_i\min\limits_{0\leq j< i}\{-K(i)X(j)Y(j)\} F(i)fi​0≤j<imin​{−K(i)X(j)Y(j)}F(i) 或 fimax⁡0≤j<i{−K(i)X(j)Y(j)}F(i)f_i\max\limits_{0\leq j< i}\{-K(i)X(j)Y(j)\} F(i)fi​0≤j<im…

Node.js笔记-Express(基于Node.js的web开发框架)

目录 Express概述 Express安装 基本使用 创建服务器 编写请求接口 接收请求参数 获取路径参数(/login/2) 静态资源托管-express.static&#xff08;内置中间件&#xff09; 什么是静态资源托管&#xff1f; express.static() 应用举例 托管多个静态资源 挂载路径前缀…

车厢调度(train)(栈)

目录 题目描述 解题思路&#xff1a; 代码部分&#xff1a; 题目描述 有一个火车站&#xff0c;铁路如图所示&#xff0c;每辆火车从A驶入&#xff0c;再从B方向驶出&#xff0c;同时它的车厢可以重新组合。假设从A方向驶来的火车有n节&#xff08;n≤1000&#xff09;&…

Revit中关于屋顶编辑线移动的问题

一、Revit中关于屋顶编辑线移动的问题 在绘制屋顶的时候&#xff0c;如果出现有稍微偏差的时候&#xff0c;个别习惯移动编辑线&#xff0c;这种方法是不可取的&#xff0c;接下来为大家介绍一下这种方法的问题所在。 首先我们绘制几面这样的墙体&#xff0c;主要做测试用的&am…