Flume实践

news2025/1/11 1:47:45

1 NetCat方式

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/flume_netcat.conf --name a1 -Dflume.root.logger=INFO,console

[root@master ~]# yum -y intalll telnet

发数据:

]# telnet master 44444

数据接收,是在终端上接收的,而且接收数据已经是编码后的

工作输入主要是来自文件,输出也不是终端上,这里只是测试

2 Exec方式

监控一个输入文件,一般是日记文件,日记变化,flume自动接收

运行flume-ng

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/flume_exec.conf --name a1 -Dflume.root.logger=INFO,console

发数据:

数据接收

3 输出到HDFS

master启动

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/flume.conf --name a1 -Dflume.root.logger=INFO,console

写入

[root@master flume]# echo'flume-hdfs1'>> 2.log

[root@master flume]# echo'flume-hdfs2'>> 2.log

[root@master flume]# echo'flume-hdfs3'>> 2.log

HDFS查看

[root@master ~]# hadoop fs -ls /flume/18-09-17

4 故障转移(failover)

刚开始数据接收服务器是A的,A出问题了,转移B接收数据,A恢复之后A继续B接收数据

这需要集群模式才能实现,三台机器:master、slave1、slave2

avro是网络协议,用于连接agent与agent

master配置:

[root@master agent_agent_collector_base]#pwd

/root/07/flume/apache-flume-1.6.0-bin/conf/agent_agent_collector_base

[root@master agent_agent_collector_base]#ll flume-client.properties

-rwxrwxrwx 1 root root 880 4月 22 10:47 flume-client.properties

master启动:

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/agent_agent_collector_base/flume-client.properties--name agent1 -Dflume.root.logger=INFO,console

Slave1启动:

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/agent_agent_collector_base/flume-server.properties--name a1 -Dflume.root.logger=INFO,console

slave2启动:

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/agent_agent_collector_base/flume-server.properties--name a1 -Dflume.root.logger=INFO,console

测试

写数据

Slave1接收==》正常

把slave1停掉,再写数据,发现是Slave2接收

再开启slave1,再测试

Slave1接收

5 负载均衡(loadbalance)

从服务器轮训读取信息

maste启动:

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/agent_agent_collector_base/flume-client.properties_loadbalance--name a1 -Dflume.root.logger=INFO,console

slave1启动:

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/agent_agent_collector_base/flume-server.properties--name a1 -Dflume.root.logger=INFO,console

slave2启动:

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/agent_agent_collector_base/flume-server.properties--name a1 -Dflume.root.logger=INFO,console

测试:

写数据

接收数据

Slave1

Slave2

可以看到,它并不是按数量轮流分发的,而是按批次分发的,再写for循环验证

#for i in `seq 1 100`;do echo “$i” >>2.log; sleep 1;done

6 拦截与过滤(Interceptor)

(1) TimestampInterceptor:在event的header中添加一个key叫:timestamp,value为当前的时间戳

[root@master interceptor_test]# catflume_ts_interceptor.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = http ##以http方式连接

a1.sources.r1.host = master

a1.sources.r1.port = 52020

a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.preserveExisting=false

a1.sources.r1.interceptors.i1.type =timestamp

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path =hdfs://master:9000/flume/%Y-%m-%d/%H%M #接收地址

a1.sinks.k1.hdfs.filePrefix = badou.

a1.sinks.k1.hdfs.fileType=DataStream

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

master启动

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/interceptor_test/flume_ts_interceptor.conf --name a1-Dflume.root.logger=INFO,console

http方式输入:(也是在master端)

]# curl -X POST -d'[{"headers":{"hadoop1":"hadoop1 is header"},"body":"hellobadou"}]' http://master:52020

头信息用来做路由选择

输出

(2) HostInterceptor:在event的header中添加一个key叫:host,value为当前机器的hostname或者ip

[root@master interceptor_test]# catflume_hostname_interceptor.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = syslogtcp #以syslogtcp输入

a1.sources.r1.host = master

a1.sources.r1.port = 52020

a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1 i2 #可以定义多个拦截器

a1.sources.r1.interceptors.i1.preserveExisting=false

a1.sources.r1.interceptors.i1.type=timestamp

a1.sources.r1.interceptors.i2.type =host

a1.sources.r1.interceptors.i2.hostHeader=hostname

a1.sources.r1.interceptors.i2.useIP =false

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path =hdfs://master:9000/flume/%Y-%m-%d/%H%M

a1.sinks.k1.hdfs.filePrefix = %{hostname}.

a1.sinks.k1.hdfs.fileType=DataStream

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

master启动

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/interceptor_test/flume_hostname_interceptor.conf --name a1-Dflume.root.logger=INFO,console

syslogtcp方式输入:

]# echo "xxxxx" | nc master 52020

输出

(3) StaticInterceptor:可以在event的header中添加自定义的key和value

[root@master interceptor_test]# catflume_static_interceptor.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = http #http方式输入

a1.sources.r1.host = master

a1.sources.r1.port = 52020

a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = static

a1.sources.r1.interceptors.i1.key = badou_flume

a1.sources.r1.interceptors.i1.value = so_easy

#设置为static之后,强制输出badou_flume和so_easy

a1.sinks.k1.type = logger

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

master启动

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/interceptor_test/flume_static_interceptor.conf --name a1 -Dflume.root.logger=INFO,console

输入:

]# curl -X POST -d'[{"headers":{"hadoop1":"hadoop1 is header"},"body":"hellobadou"}]' http://master:52020

输出

把之前强制加入的<key,value>添加进来了

作用:为后面路由选择做准备

(4) RegexFiltering Interceptor:正则过滤器,通过正则来清洗或包含匹配的events

[root@master interceptor_test]# catflume_regex_interceptor.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = http

a1.sources.r1.host = master

a1.sources.r1.port = 52020

a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type=regex_filter

a1.sources.r1.interceptors.i1.regex =^[0-9]*$

a1.sources.r1.interceptors.i1.excludeEvents=true

#数字开头并且数字结尾的过滤掉

a1.sinks.k1.type = logger

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

master启动

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/interceptor_test/flume_regex_interceptor.conf --name a1-Dflume.root.logger=INFO,console

输入1:

]# curl -X POST -d'[{"headers":{"hadoop1":"hadoop1 is header"},"body":"123"}]' http://master:52020

输出无反应

输入2:

]# curl -X POST -d'[{"headers":{"hadoop1":"hadoop1 is header"},"body":"12345a"}]' http://master:52020

输出有反应:

输入3:

]# curl -X POST -d'[{"headers":{"hadoop1":"hadoop1 is header"},"body":"abc12345"}]' http://master:52020

输出有反应:

(5) RegexExtractor Interceptor:正则筛选器,通过正则表达式来在header中添加指定的key,value则为正则匹配的部分

[root@master interceptor_test]# catflume_regex_interceptor.conf_extractor

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = http

a1.sources.r1.host = master

a1.sources.r1.port = 52020

a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type =regex_extractor

a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)

a1.sources.r1.interceptors.i1.serializers =s1 s2 s3

a1.sources.r1.interceptors.i1.serializers.s1.name= one

a1.sources.r1.interceptors.i1.serializers.s2.name= two

a1.sources.r1.interceptors.i1.serializers.s3.name= three

#\\d代表数字,输入三个单位数数字s1:s2:s3,并且分别赋予给one、two、three,one、two、three作为key输出

a1.sinks.k1.type = logger

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

master启动

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/interceptor_test/flume_regex_interceptor.conf_extractor--name a1 -Dflume.root.logger=INFO,console

输入1:

]# curl -X POST -d'[{"headers":{"hadoop1":"hadoop1 is header"},"body":"1:2:3"}]' http://master:52020

输出

输入2:

]# curl -X POST -d'[{"headers":{"hadoop1":"hadoop1 is header"},"body":"1:2:3asd"}]' http://master:52020

输出

可以看到只匹配数字,并加入到header中输出

输入3:

]# curl -X POST -d'[{"headers":{"hadoop1":"hadoop1 is

header"},"body":"6:7:8:9bbb5"}]' http://master:52020

只会匹配前三位

7 复制与复用(选择器Selector)

(1) 复制(广播的形式发送给下游节点)

Master配置文件

[root@master selector_test]# catflume_client_replicating.conf

# Name the components on this agent

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2

# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 50000

a1.sources.r1.host = master

a1.sources.r1.selector.type = replicating #复制

a1.sources.r1.channels = c1 c2

# Describe the sink

a1.sinks.k1.type = avro #与下游sinks是通过svro协议连接的

a1.sinks.k1.channel = c1 #连接通道是c1

a1.sinks.k1.hostname = slave1 #服务器slave1

a1.sinks.k1.port = 50000

a1.sinks.k2.type = avro

a1.sinks.k2.channel = c2 #也可以用c1通道的

a1.sinks.k2.hostname = slave2

a1.sinks.k2.port = 50000

#还可以再加slave,就这种形式配置

# Use a channel which buffers eventsinmemory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity =100

a1.channels.c2.type = memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100

slave配置文件

[root@slave1 selector_test]# catflume_server.conf

# Name the components on this agent

a1.sources = r1 #slave1的agent-name是a1,slave2的是a2

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = slave1 #slave2配置也是一样的,只是这里更改为slave2

a1.sources.r1.port = 50000 #端口要跟master对应

# Describe the sink

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

# Use a channel which buffers eventsinmemory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

master启动

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/selector_test/flume_client_replicating.conf --name a1-Dflume.root.logger=INFO,console

Slave1启动

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/selector_test/flume_server.conf --name a1-Dflume.root.logger=INFO,console

Slave2启动

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/selector_test/flume_server.conf --name a2-Dflume.root.logger=INFO,console

输入:

输出:slave1、slave2都接收到了

(2) 复用

Master配置文件

[root@master selector_test]# catflume_client_multiplexing.conf

# Name the components on this agent

a1.sources = r1

a1.sinks = k1 k2 #有两个sink

a1.channels = c1 c2 #有两个channel

# Describe/configure the source

a1.sources.r1.type= org.apache.flume.source.http.HTTPSource

a1.sources.r1.port= 50000

a1.sources.r1.host= master

a1.sources.r1.selector.type= multiplexing

a1.sources.r1.channels= c1 c2

a1.sources.r1.selector.header= areyouok

a1.sources.r1.selector.mapping.OK = c1

a1.sources.r1.selector.mapping.NO = c2

a1.sources.r1.selector.default= c1

#定义输入策略,ok是走c1通道,on是走c2通道,c1c2对应不同的机器,有个默认通道是c1

# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = slave1

a1.sinks.k1.port = 50000

a1.sinks.k2.type = avro

a1.sinks.k2.channel = c2

a1.sinks.k2.hostname = slave2

a1.sinks.k2.port = 50000

# Use a channel which buffers eventsinmemory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity =100

a1.channels.c2.type = memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100

slave配置文件跟复制的是一样的

先启动slave

Master启动

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/selector_test/flume_client_multiplexing.conf --name a1-Dflume.root.logger=INFO,console

输入1:

]# curl -X POST -d'[{"headers":{"areyouok":"OK","hadoop1":"hadoop1is header"}, "body":"6:7:8bbb5"}]' http://master:50000

输出:slave1接收到信息

输入2:

]# curl -X POST -d '[{"headers":{"areyouok":"OK","hadoop1":"hadoop1is header"}, "body":"abcabc111111"}]'http://master:50000

输出:也一样是slave1接收到信息

输入3:

]# curl -X POST -d'[{"headers":{"areyouok":"NO","hadoop1":"hadoop1is header"}, "body":"abcabc111111"}]' http://master:50000

]# curl -X POST -d'[{"headers":{"areyouok":"NO","hadoop1":"hadoop1is header"}, "body":"abcabc111111"}]' http://master:50000

输出:slave2接收到信息

不做标记输入4:

]# curl -X POST -d'[{"headers":{"areyouok":"IDONEKNOW","hadoop1":"hadoop1is header"}, "body":"abcabc111111"}]'http://master:50000

默认设置slave1输出

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/578735.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[创业之路-70] :聊天的最高境界因场景不同而不同

一、聊天的最高境界因场景不同而不同。 销售式聊天&#xff1a; 聊天的最高境界不是真相&#xff0c;而是拨动对方的心弦&#xff0c;让对方心理爽&#xff0c;让对方舒心。聊天的结果是你买单。 消费式聊天&#xff1a; 聊天的最高境界不是真相&#xff0c;而是让自己心理爽…

Java面试集锦

1. html与jsp区别&#xff1f; 答&#xff1a;HTML是文本标记语言&#xff0c;它是静态页面&#xff1b;JSP页面是有JSP容器执行该页面的Java代码部分然后实时生成动态页面&#xff0c;可动态更新页面上的内容。 在jsp中用<%%>就可以写Java代码了&#xff0c;而html没有…

数组的存储和压缩

数组 定义 一维数组是有限个相同类型的数据元素构成的序列&#xff0c;逻辑关系是相邻关系 推广&#xff1a;一个二维数组可以看作相同类型的一维数组的一维数组&#xff1b;n维数组可以看作以n-1维数组作为元素的线性表 性质 数组中的数据元素数目…

提前布局

深圳最近的天气是美&#xff0c;有风&#xff0c;天空可以看得见飘来飘去的透明的云&#xff0c;傍晚时候也可以看见城市被霞光笼罩下那种很安静要睡去的样子&#xff0c;随处可以见到的青草绿叶和大树&#xff0c;从公司楼下穿过的一个小桥洞前看得见的红绿路牌也安静和谐。 今…

将 Nacos 转变为 Windows 系统服务,实现开机自启

文章目录 前言下载 WinSW配置 WinSW安装和启动 Nacos 服务联系我 前言 本文将为您介绍如何使用 WinSW 工具将 Nacos 打包成 Windows 系统服务&#xff0c;并实现开机自启动的便利功能。通过将 Nacos 安装为系统服务&#xff0c;您将摆脱每次手动启动的麻烦&#xff0c;从而提高…

论文阅读_增强语言模型综述

论文信息 name_en: Augmented Language Models: a Survey name_ch: 增强语言模型综述 paper_addr: http://arxiv.org/abs/2302.07842 date_read: 2023-05-20 date_publish: 2023-02-15 tags: [‘深度学习’,‘自然语言处理’,‘大模型’] author: Grgoire Mialon&#xff0c;M…

jQuery样式操作和效果操作

1. css方法 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-width,…

day8--链表倒数第k个结点

链表中倒数最后k个结点 双指针法 是定义两个指针p和q&#xff0c;先让p指向链表的头结点&#xff0c;然后让q指向第k个结点。 接着&#xff0c;同时移动p和q&#xff0c;直到q指向链表的尾结点。此时&#xff0c;p指向的结点就是倒数第k个结点 struct ListNode {int val;Li…

git commit规范

目录 一、代码提交风格&#xff1a; 二、代码提交验证: 一、代码提交风格&#xff1a; 通常我们的git commit会按照统一的风格来提交&#xff0c;这样可以快速定位每次提交的内容&#xff0c;方便之后对版本进行控制。 但是如果每次手动来编写这些是比较麻烦的事情&#xff0…

【Flutter】widgets (1) 组件概述 widget tree 常见的widgets

文章目录 一、前言二、如何理解 widgets三、widgets 和 Div 布局很像四、常见的组件五、总结一、前言 在 Flutter 中,所有的 UI 元素都被称为 widgets,包括整个应用程序本身。一个 Flutter 应用通常由多个小的 widgets 组合而成,这些 widgets 可以是文本,按钮,图片,甚至…

JetBrains的C和C++集成开发环境CLion 2023版本在Linux系统的下载与安装配置教程

目录 前言一、CLion安装二、使用配置总结 前言 CLion是一款为C和C语言开发人员设计的集成开发环境&#xff08;IDE&#xff09;。它提供了丰富的功能和工具&#xff0c;可以帮助开发人员更高效地编写、调试和部署C和C应用程序。注&#xff1a;已在CentOS7.9和Ubuntu20.04安装测…

《数据库》期末考试复习手写笔记-第10章 数据库恢复技术(日志文件+检查点)【10分】

知识点:事务+日志文件+检查点 考题一:日志记录 考题二:数据库恢复 如果一个数据库恢复系统采用检查点机制,且其日志文件如表4所示

深入了解平均精度(mAP):通过精确率-召回率曲线评估目标检测性能

平均精度&#xff08;Average Precision&#xff0c;mAP&#xff09;是一种常用的用于评估目标检测模型性能的指标。在目标检测任务中&#xff0c;模型需要识别图像中的不同目标&#xff0c;并返回它们的边界框&#xff08;bounding box&#xff09;和类别。mAP用于综合考虑模型…

开源情报搜集系统的核心技术

随着科技快速发展&#xff0c;科研方向的开源情报搜集系统的应用越来越广泛。为了满足科研工作者的需求&#xff0c;开发人员大力研发了许多功能强大的科研开源情报系统。这些系统不仅可以帮助科研人员更加高效地获取、管理和利用科研信息资源&#xff0c;还能为他们提供全方位…

【Android工具】免费好用无广告安卓手机解压缩软件工具:ZArchiver

微信关注公众号 “DLGG创客DIY” 设为“星标”&#xff0c;重磅干货&#xff0c;第一时间送达。 前言 压缩工具在日常工作和生活中很常用&#xff0c;不光可以减小文件大小&#xff0c;还可以将多个文件进行打包&#xff0c;方便管理。 当然还有一些其他的特殊功能&#xff0c;…

奇舞周刊第493期:Hook 革命!浅谈 React 新 Hook 的未来与思想

关注前端生态发展&#xff0c;了解行业动向。 下面先一起看下本期周刊 摘要 吧~ 奇舞推荐 ■ ■ ■ Hook 革命&#xff01;浅谈 React 新 Hook 的未来与思想 作者阳羡曾写文章对 React 新 Hook use 的设计理念和限制进行了深入分析&#xff0c;并提供了一个可能的实现来帮助读者…

学习测试用例

✏️作者&#xff1a;银河罐头 &#x1f4cb;系列专栏&#xff1a;JavaEE &#x1f332;“种一棵树最好的时间是十年前&#xff0c;其次是现在” 目录 测试用例好处测试用例的设计方法基于需求进行测试用例的设计等价类边界值判定表正交表法案例 场景设计法错误猜测法 面试题 测…

分布式简要说明

1.分布式简要说明 《分布式系统原理与范型》定义&#xff1a; 分布式系统是若干独立计算机的集合&#xff0c;这些计算机对于用户来说就像单个相关系统。 分布式系统 (distributed system) 是建立在网络之上的软件系统。 随着互联网的发展&#xff0c;网站应用的规模不断扩…

RabbitMQ学习-死信队列

死信队列 背&#xff1a;就是三种情况导致消息无法消费就是死信&#xff0c;然后就会转到死信交换机中&#xff0c;死信交换机发送到死信队列中&#xff0c;然后创建个消费者消费死信队列中的东西,再没什么哈哈 死信&#xff0c;顾名思义就是无法被消费的信息&#xff0c;字面…

springboot接口返回的json字符串如何不显示null值字段

springboot接口返回的json字符串如何不显示null值字段 POSTMAN 测试接口时&#xff0c;默认字段值即使是null也显示出来&#xff0c;如何去掉更加简洁&#xff1f;这个跟POSTMAN无关&#xff0c;POSTMAN仅仅是展示response的body而已 思考&#xff1a;为什么要去掉null值的字…