(二十)大数据实战——Flume数据采集的基本案例实战

news2025/1/17 6:03:24

前言

本节内容我们主要介绍几个Flume数据采集的基本案例,包括监控端口数据、实时监控单个追加文件、实时监控目录下多个新文件、实时监控目录下的多个追加文件等案例。完成flume数据监控的基本使用。

正文

  • 监控端口数据

①需求说明

- 使用 Flume 监听一个端口,收集该端口数据,并打印到控制台

②需求分析:

③安装netcat 工具:sudo yum install -y nc

④查看监听端口1111是否被占用:注意测试端口的范围是0-65535

⑤在flume安装目录下创建一个job目录:用与存放监听数据的配置文件

⑥在job目录下创建监听数据的配置文件:job-netcat-flume-console.conf

# Name the components on this agent
#a1:表示agent的名称,不能重复
a1.sources = r1 #r1:表示a1的Source的名称
a1.sinks = k1  #k1:表示a1的Sink的名称
a1.channels = c1 #c1:表示a1的Channel的名称
# Describe/configure the source
a1.sources.r1.type = netcat #表示a1的输入源类型为netcat端口类型
a1.sources.r1.bind = localhost #表示a1的监听的主机
a1.sources.r1.port = 1111 #表示a1的监听的端口号
# Describe the sink
a1.sinks.k1.type = logger #表示a1的输出目的地是控制台logger类型
# Use a channel which buffers events in memory
a1.channels.c1.type = memory #表示a1的channel类型是memory内存型
a1.channels.c1.capacity = 1000 #表示a1的channel总容量1000个event
a1.channels.c1.transactionCapacity = 100 #表示a1的channel传输时收集到了100条event以后再去提交事务
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 #表示将r1和c1连接起来
a1.sinks.k1.channel = c1 #表示将k1和c1连接起来

⑦开启 flume服务监听端口:

bin/flume-ng agent -c conf/ -n a1 -f job/job-netcat-flume-console.conf -Dflume.root.logger=INFO,console

⑧启动参数说明:

--conf/-c:表示配置文件存储在 conf/目录

--name/-n:表示给 agent 起名为 a1

--conf-file/-f:flume本次启动读取的配置文件是在job文件夹下的job-netcat-flume-console.conf文件

-Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、 error

⑨使用netcat 工具向本机的1111端口发送内容

  • 实时监控单个追加文件

①监控需求

- 实时监控Hive日志,并上传到HDFS

②需求分析:

③在job目录下创建监听数据的配置文件:job-file-flume-hdfs.conf

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
#hive日志的默认位置
a2.sources.r2.command = tail -F /tmp/hadoop/hive.log
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop101:8020/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到 HDFS一次
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

④启动hadoop集群

⑤启动flume监控任务

bin/flume-ng agent --conf conf/ --name a2 --conf-file job/job-file-flume-hdfs.conf -Dflume.root.logger=INFO,console

⑥启动hive

⑦查看hdfs是否有监控日志

⑧存在的问题

- tail命令不能实现断点续传监控的功能,可能会有数据丢失的情况或者数据重复的问题

- Exec source 适用于监控一个实时追加的文件,不能实现断点续传

  • 实时监控目录下多个新文件

①监控需求

- 使用 Flume 监听整个目录的文件,并上传至 HDFS

②需求分析

③在job目录下创建监听目录数据的配置文件:job-dir-flume-hdfs.conf

a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/apache-flume-1.9.0/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp 结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop101:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

④启动hadoop集群

⑤创建upload监控目录

⑥启动目录监控任务

bin/flume-ng agent -c conf/ -n a3 -f job/job-dir-flume-hdfs.conf -Dflume.root.logger=INFO,console

⑦在upload中上传文件

⑧查看hdfs中是否上传成功

⑨存在的问题

- 相同文件名的文件不能重复上传,只能上传一次,修改了也不会再次上传

- 忽略的文件和配置后缀.COMPLETED的文件不能重复上传

- Spooldir Source 适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步

  • 实时监控目录下的多个追加文件

①案例需求

- 使用Flume监听整个目录的实时追加文件,并上传至HDFS

- 使用Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传

②需求分析

③在job目录下创建监听目录数据的配置文件:job-taildir-flume-hdfs.conf

a4.sources = r4
a4.sinks = k4
a4.channels = c4
# Describe/configure the source
a4.sources.r4.type = TAILDIR
a4.sources.r4.positionFile = /opt/module/apache-flume-1.9.0/tail_dir.json
a4.sources.r4.filegroups = f1 f2
a4.sources.r4.filegroups.f1 = /opt/module/apache-flume-1.9.0/files/.*file.*
a4.sources.r4.filegroups.f2 = /opt/module/apache-flume-1.9.0/files2/.*log.*
# Describe the sink
a4.sinks.k4.type = hdfs
a4.sinks.k4.hdfs.path = hdfs://hadoop101:8020/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a4.sinks.k4.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a4.sinks.k4.hdfs.round = true
#多少时间单位创建一个新的文件夹
a4.sinks.k4.hdfs.roundValue = 1
#重新定义时间单位
a4.sinks.k4.hdfs.roundUnit = hour
#是否使用本地时间戳
a4.sinks.k4.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a4.sinks.k4.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a4.sinks.k4.hdfs.fileType = DataStream
#多久生成一个新的文件
a4.sinks.k4.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a4.sinks.k4.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a4.sinks.k4.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a4.channels.c4.type = memory
a4.channels.c4.capacity = 1000
a4.channels.c4.transactionCapacity = 100
# Bind the source and sink to the channel
a4.sources.r4.channels = c4
a4.sinks.k4.channel = c4

④启动hadoop集群

⑤创建监控目录文件files和files2

⑥启动flume监控

bin/flume-ng agent -c conf/ -n a4 -f job/job-taildir-flume-hdfs.conf -Dflume.root.logger=INFO,console

⑦往files和files2目录中的文件写数据

⑧在hdfs中查看数据

结语

关于Flume数据采集的基本案例实战到这里就结束了,我们下期见。。。。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/951927.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity碰撞检测

Unity碰撞检测 前言准备材料代码使用OnCollisionEnter()进行碰撞Collider状态代码 使用OnTriggerEnter()进行碰撞Collider状态代码 区别代码OnCollisionEnter()OnTriggerEnter() 碰撞显示效果OnCollisionEnter()OnTriggerEnter() 提示结语 前言 碰撞检测可以说时学习Unity中最…

时间复杂度和空间复杂度的最小单位是什么

C数据结构与算法 目录 时间复杂度&#xff1a;CPU读写一次内存算作时间复杂度的最小单位。 读内存的场景&#xff1a;获取变量的值。 例如&#xff1a; if(x < 1000) 写内存的场景&#xff1a;给变量赋值。 例如&#xff1a;x 1000 空间复杂度&#xff1a;内存占用一…

VueX 与Pinia 一篇搞懂

VueX 简介 Vue官方&#xff1a;状态管理工具 状态管理是什么 需要在多个组件中共享的状态、且是响应式的、一个变&#xff0c;全都改变。 例如一些全局要用的的状态信息&#xff1a;用户登录状态、用户名称、地理位置信息、购物车中商品、等等 这时候我们就需要这么一个工…

启莱OA treelist.aspx SQL注入

子曰&#xff1a;“为政以德&#xff0c;譬如北辰&#xff0c;居其所&#xff0c;而众星共之。” 漏洞复现 访问漏洞url&#xff1a; 使用SQLmap对参数 user 进行注入 漏洞证明&#xff1a; 文笔生疏&#xff0c;措辞浅薄&#xff0c;望各位大佬不吝赐教&#xff0c;万分感…

Java“牵手”1688淘口令转换API接口数据,1688API接口申请指南

1688平台商品淘口令接口是开放平台提供的一种API接口&#xff0c;通过调用API接口&#xff0c;开发者可以获取1688商品的标题、价格、库存、商品快递费用&#xff0c;宝贝ID&#xff0c;发货地&#xff0c;区域ID&#xff0c;快递费用&#xff0c;月销量、总销量、库存、详情描…

juicefs源码format命令阅读

之前博文中介绍过在windows下安装GO和vscode windows下安装go环境 和vscode中go扩展调试 1、获取源码 git clone https://github.com/juicedata/juicefs.git 首先观察代码架构 上图是我已经编译过得代码&#xff0c;可能和刚git下来的有些出入。 2、编译 我是在windows上进…

C++学习笔记总结练习:运算符重载两种方式

运算符重载的两种方式 1 基本概念 基础 运算符时具有特殊名字的函数&#xff1a;由关键字operator和气候定义的运算符共同组成。 可以被重载的运算符 方式 将运算符重载为类的成员函数。重载运算符函数&#xff0c;并声明为类的友元。 规则 重载后的运算符必须至少有一个…

可控硅调功电路原理

在常见的马达调速以及需要调整负载功率的场合&#xff0c;经常会用到可控硅调功电路&#xff0c;下图是常见的应用电路。 调功电路主要由阻容移相电路和可控硅触发电路构成&#xff0c;工作过程如下&#xff0c;当交流电的正半周时&#xff0c;交流电通过R5,可调电阻R3给电容C1…

Elasticsearch数据库操作

索引操作 新建索引 PUT /ztt {"mappings": {"properties": {"info":{"type": "text","analyzer": "ik_smart"},"email":{"type": "keyword","index": false…

简单聊聊Https的来龙去脉

简单聊聊Https的来龙去脉 Http 通信具有哪些风险Https Http SSL/TLS对称加密 和 非对称加密数字证书数字证书的申请数字证书怎么起作用 Https工作流程一定需要Https吗&#xff1f; Http 通信具有哪些风险 使用明文通信&#xff0c;通信内容可能会被监听不验证通信双方身份&a…

每天刷题五道RHCSA/6-10题(Radhat8.2)

6.创建协作目录权限 mkdir /home/managers chown :sysmgrs /home/managers chmod 2770 /home/managers 测试&#xff1a; touch /home/managers/12345 ll /home/managers/12345 7.配置NTP systemctl status chronyd #查看状态 yum -y install chrony #如果没有安装&#xff0c…

最佳实践:TiDB 业务读变慢分析处理

作者&#xff1a;李文杰 网易游戏计费 TiDB 负责人 在使用或运维管理 TiDB 的过程中&#xff0c;大家几乎都遇到过 SQL 变慢的问题&#xff0c;尤其是查询相关的读变慢问题。读变慢的问题大部分情况下都遵循一定的规律&#xff0c;通过经验的积累可以快速的定位和优化&#xff…

Java实现根据关键词搜索京东商品列表数据方法,当当API接口(jd.item_search)申请指南

要通过京东网的API获取商品列表数据&#xff0c;您可以使用京东开放平台提供的接口来实现。以下是一种使用Java编程语言实现的示例&#xff0c;展示如何通过京东开放平台API获取商品列表&#xff1a; 首先&#xff0c;确保您已注册成为当当开放平台的开发者&#xff0c;并创建…

五金轴尺寸机器视觉测量软硬件方案--康耐德智能

检测内容&#xff1a; 五金轴尺寸机器视觉测量 检测要求&#xff1a; 精度0.015mm&#xff0c;速度180~240个/分钟 视觉可行性分析&#xff1a; 对样品进行了光学实验&#xff0c;并进行图像处理&#xff0c;原则上可以使用机器视觉系统进行测试测量。 结果&#xff1a; 对…

新手小白想要做好跨境电商独立站,需要考虑哪些要素?

对于不少中小卖家而言&#xff0c;利用独立站出海已然成为下一个跨境热潮。但是采用独立站模式做出海生意前&#xff0c;卖家需要考虑哪些要素&#xff1f; 产品选择 对于国内的卖家来说&#xff0c;依托于国内强大的供应链优势&#xff0c;只要能把握住消费者心态&#xff0…

C# 获取Windows系统版本注意事项

首先通过微软官方文档&#xff1a;https://learn.microsoft.com/zh-cn/windows/win32/sysinfo/operating-system-version了解各个操作系统对应的版本号 下面介绍3种获取版本号的方式及弊端 1. Environment.OSVersion.Version OperatingSystem os Environment.OSVersion;// 判断…

Dapper入门教程

什么是Dapper Dapper是一个简单的.NET对象映射器&#xff0c;在速度方面具有"King of Micro ORM"的头衔&#xff0c;几乎与使用原始的ADO.NET数据读取器一样快。ORM是一个对象关系映射器&#xff0c;它负责数据库和编程语言之间的映射。 Dapper通过扩展IDbConnecti…

PSP - 蛋白质结构预测 OpenFold Multimer 模型训练参数与配置

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/132575709 OpenFold Multimer 是用于预测蛋白质多聚体结构的计算方法。基于OpenFold 的单体预测框架&#xff0c;利用深度学习技术&#xff0c;结…

指针与空间按钮的交互

文章目录 原理案例&#xff1a;“直线指针”和“点击按钮”的交互1、效果2、步骤 原理 指针不能直接和空间按钮交互&#xff0c;得借助一个中间层——分发器——它分发指针的进入、退出、选择事件&#xff0c;空间按钮自动监听这些事件 案例&#xff1a;“直线指针”和“点击…

SQLServer审计功能配置

一. SQL Server审计功能介绍 SQL Server审计功能&#xff08;Audit&#xff09;是SQL Server 2008之后才有的功能&#xff0c;审计(Audit)用于追踪和记录SQL Server实例&#xff0c;或者单个数据库中发生的事件(Event)&#xff0c;审计运作的机制是通过捕获事件(Event)&#x…