flume使用实例

news2024/12/24 3:55:22

1、监听端口a1.sources.r1.type = netcat

配置文件nc-flume-console.conf

# Name the components on this agent a1 表示jvm进程名

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = node1

a1.sources.r1.port = 44444

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000 #1000个event

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

flume-ng agent -n a1 -c conf/ -f /export/server/flume/job/nc-flume-console.conf

 参数-n 表示jvm进程名 -c表示本次启动读取的配置文件conf目录下的文件 -f 表示具体执行的文件

另开窗口输入内容后控制台会自动返回OK

2、实时监控单个追加文件

配置文件 flume-exec-logger.conf

#Agent_name

a1.sources = r1

a1.sinks = k1

a1.channels = c1

#Sources

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /export/server/hive/logs/hive.log

#Channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

#sinks

a1.sinks.k1.type = logger

#组合

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

启动flume监听

flume-ng agent -c conf/ -f job/flume-exec-logger.conf -n a1

手动追加数据到hive.log文件 并查看监控窗口

echo INFO [main] spark.HiveSparkClientFactory >> logs/hive.log

动态添加数据到hive.log

连接hive 观察flume监控变化

 beeline -u jdbc:hive2://node1:10000 -n ljr

 show databases;

  由此可见当我们操作hive的时候 hive.log 就更新,由于我们监控了hive.log文件所以当有新数据追加到hive.log的时候 就会监听到 并打印到控制台

3、实时监控单个追加文件,并将数据输出到hdfs

配置文件 flume-hivelogs-hdfs.con

# Name the components on this agent

a2.sources = r2

a2.sinks = k2

a2.channels = c2

# Describe/configure the source

a2.sources.r2.type = exec

a2.sources.r2.command = tail -F /export/server/hive/logs/hive.log

# Describe the sink

a2.sinks.k2.type = hdfs

a2.sinks.k2.hdfs.path = hdfs://node1:8020/flume/%Y%m%d/%H

#上传文件的前缀

a2.sinks.k2.hdfs.filePrefix = logs-

#是否按照时间滚动文件夹

a2.sinks.k2.hdfs.round = true

#多少时间单位创建一个新的文件夹

a2.sinks.k2.hdfs.roundValue = 1

#重新定义时间单位

a2.sinks.k2.hdfs.roundUnit = hour

#是否使用本地时间戳

a2.sinks.k2.hdfs.useLocalTimeStamp = true

#积攒多少个 Event 才 flush 到 HDFS 一次

a2.sinks.k2.hdfs.batchSize = 100

#设置文件类型,可支持压缩

a2.sinks.k2.hdfs.fileType = DataStream

#多久生成一个新的文件

a2.sinks.k2.hdfs.rollInterval = 60

#设置每个文件的滚动大小

a2.sinks.k2.hdfs.rollSize = 134217700

#文件的滚动与 Event 数量无关

a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory

a2.channels.c2.type = memory

a2.channels.c2.capacity = 1000

a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel

a2.sources.r2.channels = c2

a2.sinks.k2.channel = c2

启动flume监听,操作hive

flume-ng agent -n a2 -c conf/ -f flume-hivelogs-hdfs.con

查看hdfs,有新文件产生

使用 Flume 监听整个目录(a3.sources.r3.type = TAILDIR)

的实时追加文件,并上传至 HDFS

实现步骤:

【1】创建被监控目录

我这里监控data目录  此目录需要提前创建

mkdir data

cd data

touch file1.txt

touch file2.txt

touch log2.txt

toch log1.txt

【2】创建文件 flume-taildir-hdfs.conf

a3.sources = r3

a3.sinks = k3

a3.channels = c3

# Describe/configure the source

a3.sources.r3.type = TAILDIR

#记录最后监控文件的断点的文件,此文件位置可不改

a3.sources.r3.positionFile =  /export/server/flume/data /tail_dir.json

a3.sources.r3.filegroups = f1 f2

a3.sources.r3.filegroups.f1 = /export/server/flume/data/.*file.*

a3.sources.r3.filegroups.f2 =/export/server/flume/data/.*log.*

# Describe the sink

a3.sinks.k3.type = hdfs

# hdfs://node1:8020 可省略

a3.sinks.k3.hdfs.path = hdfs://node1:8020/flume/upload2/%Y%m%d/%H

#上传文件的前缀

a3.sinks.k3.hdfs.filePrefix = upload-

#是否按照时间滚动文件夹

a3.sinks.k3.hdfs.round = true

#多少时间单位创建一个新的文件夹

a3.sinks.k3.hdfs.roundValue = 1

#重新定义时间单位

a3.sinks.k3.hdfs.roundUnit = hour

#是否使用本地时间戳

a3.sinks.k3.hdfs.useLocalTimeStamp = true

#积攒多少个 Event 才 flush 到 HDFS 一次

a3.sinks.k3.hdfs.batchSize = 100

#设置文件类型,可支持压缩

a3.sinks.k3.hdfs.fileType = DataStream

#多久生成一个新的文件,单位是秒

a3.sinks.k3.hdfs.rollInterval = 3600

#设置每个文件的滚动大小大概是 128M,单位是byte

a3.sinks.k3.hdfs.rollSize = 134217700

#文件的滚动与 Event 数量无关

a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory

a3.channels.c3.type = memory

a3.channels.c3.capacity = 1000

a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel

a3.sources.r3.channels = c3

a3.sinks.k3.channel = c3

【3】启动flume监控

        bin/flume-ng agent -c conf -f datas/flume-taildir-hdfs.conf -n a3

【4】向文件中追加内容

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1687443.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

什么是固态继电器?

固态继电器是不需要使用任何机械部件的开关继电器。这通常使它们具有比普通机电继电器寿命更长的优势,然而,尽管固态继电器速度快且耐用,但仍具有某些设计规定。 固态继电器风靡全球,彻底改变了从农业自动化到航空航天等各个行业…

Pytorch梯度下降算法(Gradient Descent)

intro 其实对于我们将要学的梯度最小函数,目的就是先得到loss损失最小的值,然后根据这个最小的值去得到w。 初始点在initial guess这个位置,我们希望找到最小的权重点global cost minimum,我们到底是让这个点左移寻找还是右移寻…

Linux第三十九章

🐶博主主页:ᰔᩚ. 一怀明月ꦿ ❤️‍🔥专栏系列:线性代数,C初学者入门训练,题解C,C的使用文章,「初学」C,linux 🔥座右铭:“不要等到什么都没有了…

【全开源】JAVA同城搬家系统源码小程序APP源码

JAVA同城搬家系统源码 特色功能: 强大的数据处理能力:JAVA提供了丰富的数据结构和算法,以及强大的并发处理能力,使得系统能够快速地处理大量的货物信息、司机信息、订单信息等,满足大规模物流的需求。智能路径规划&a…

【Redis】String的介绍与应用详解

大家好,我是白晨,一个不是很能熬夜,但是也想日更的人。如果喜欢这篇文章,点个赞👍,关注一下👀白晨吧!你的支持就是我最大的动力!💪💪&#x1f4aa…

设置 sticky 不生效?会不会是你还是没懂 sticky?

官方描述 基本上可以看懂的就会知道。sticky 是相对于存在滚动条的内容的,啥意思? 就是不论你被谁包着,你只会往上找有 overflow 属性的盒子进行定位,包括:overflow:hidden; overflow:scroll; overflow:auto; overflo…

一键批量提取TXT文档前N行,高效处理海量文本数据,省时省力新方案!

大量的文本信息充斥着我们的工作与生活。无论是研究资料、项目文档还是市场报告,TXT文本文档都是我们获取和整理信息的重要来源。然而,面对成百上千个TXT文档,如何快速提取所需的关键信息,提高工作效率,成为了许多人头…

EI稳定检索--人文社科类会议(ICBAR 2024)

【ACM独立出版】第四届大数据、人工智能与风险管理国际学术会议 (ICBAR 2024) 2024 4th International Conference on Big Data, Artificial Intelligence and Risk Management 【高录用•快检索,ACM独立出版-稳定快速EI检索 | 往届均已完成EI, Scopus检索】 【见…

运行vue2项目基本过程

目录 步骤1 步骤2 步骤3 补充: 解决方法: node-scss安装失败解决办法 步骤1 安装npm 步骤2 切换淘宝镜像 #最新地址 淘宝 NPM 镜像站喊你切换新域名啦! npm config set registry https://registry.npmmirror.com 步骤3 安装vue-cli npm install…

分布式中traceId链接服务间的日志

使用技术: 网关:SpringCloudGateway RPC调用:Feign 一:在网关入口处设置header:key-traceId,value-UUID import com.kw.framework.common.croe.constant.CommonConstant; import com.kw.framework.gateway…

机器学习高斯贝叶斯算法实战:判断肿瘤是良性还是恶性

概述 我们使用威斯康星乳腺肿瘤数据集,来构建一个机器学习模型,用来判断患者的肿瘤是良性还是恶性。 数据分析 威斯康星乳腺肿瘤数据集,包括569个病例的数据样本,每个样本具有30个特征值。 样本分为两类:恶性Malig…

SHA1获取

这里写目录标题 JDK获取uniapp开发Dcould获取 JDK获取 一、下载jdk 链接: http://www.oracle.com/ 二、安装直接下一步下一步 三、配置环境变量 先新增变量JAVA_HOME变量值为C:\devUtils\jdk (jdk安装路径位置)再配置Path(%JAVA_HOME%\bin) 四、创建SHA1安全证书 win r输入cmd…

常见应用流量特征分析

目录 1.sqlmap 1.常规GET请求 2.通过--os-shell写入shell 3.post请求 2.蚁剑 编码加密后 3.冰蝎 冰蝎_v4.1 冰蝎3.2.1 4.菜刀 5.哥斯拉 1.sqlmap 1.常规GET请求 使用的是sqli-labs的less7 (1)User-Agent由很明显的sqlmap的标志,展…

如何快速增加外链?

要快速增加外链并不难,相信各位都知道,难的是快速增加外链且没有风险,所以这时候GNB外链的重要性就出现了,这是一种自然的外链,何谓自然的外链,在谷歌的体系当中,自然外链指的就是其他网站资源给…

[Spring Boot]baomidou 多数据源

文章目录 简述本文涉及代码已开源 项目配置pom引入baomidouyml增加dynamic配置启动类增加注解配置结束 业务调用注解DS()TransactionalDSTransactional自定义数据源注解MySQL2 测试调用查询接口单数据源事务测试多数据源事务如果依然使用Transactional会怎样?测试正…

不同类型的区块链钱包有什么特点和适用场景?

区块链钱包是用于存储和管理加密货币的重要工具,市面上有许多不同类型的区块链钱包可供选择。以下是几种主要类型的区块链钱包及其特点和适用场景。 1.软件钱包: 特点:软件钱包是最常见的一种区块链钱包,通常作为软件应用程序提供…

docker不删除容器更改其挂载目录

场景:docker搭建的jenkins通常需要配置很多开发环境,当要更换挂载目录,每次都需要删除容器重新运行,不在挂载目录的环境通常不会保留。 先给一个参考博客docker不删除容器,修改容器挂载或其他_jenkins 修改容器挂载do…

第17讲:C语言内存函数

目录 1.memcpy使用和模拟实现2.memmove使用和模拟实现3.memset函数的使用4.memcmp函数的使用 1.memcpy使用和模拟实现 void * memcpy (void * destination, const void * source, size_t num);• 函数memcpy从source的位置开始向后复制num个字节的数据到destination指向的内存…

分析电脑上处理器的性能报告

这张图片给出了一份详细的第11代Intel(R) Core(TM) i7-1165G7 2.80GHz处理器的性能报告。 CPU型号:11th Gen Intel(R) Core(TM) i7-1165G7(这是一个低功耗的移动处理器,常用于轻薄型笔记本电脑) 基准速度:2.80 GHz&…

C语言-信号

信号 一、信号是什么东西 信号是事件发生时通知进程的一种机制,有时也称之为软件中断。 信号的到来会打断了程序执行的正常流程。 大多数情况下,无法预测信号到达的精确时间。 一个(具有合适权限的)进程能够向另一进程发送信…