Flume的安装部署及常见问题解决

news2025/1/11 9:55:14

在这里插入图片描述

1.安装地址

(1) Flume官网地址:http://flume.apache.org/
(2)文档查看地址:http://flume.apache.org/FlumeUserGuide.html
(3)下载地址:http://archive.apache.org/dist/flume/

2.安装部署

注意:前提是配置好java环境

(1)将apache-flume-1.10.1-bin.tar.gz上传到linux的/opt/package/目录下
在这里插入图片描述
(2)解压apache-flume-1.10.1-bin.tar.gz到/opt/software/目录下

[zhangflink@9wmwtivvjuibcd2e package]$ tar -zxvf apache-flume-1.10.1-bin.tar.gz -C /opt/software/

(3)修改apache-flume-1.10.1-bin的名称为flume

[zhangflink@9wmwtivvjuibcd2e software]$ mv apache-flume-1.10.1-bin/ flume

(4)修改conf目录下的log4j2.xml配置文件,配置日志文件路径

修改日志路径

<Property name="LOG_DIR">/opt/module/flume/log</Property>

在这里插入图片描述

 <AppenderRef ref="Console" />

在这里插入图片描述

编写配置文件

官网翻译成中文的网站,可以参考这个网站进行编写配置文件:https://flume.liyifeng.org/

在这里插入图片描述

(1).Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的。
Agent 主要有三个组成部分,Source、Channel、Sink。
(2).第一步:配置各个组件,根据你采集数据的需求进行选择对应的source,channels,sinks组件(直接去参考官网对应的组件功能选择即可)。
(3).第二步:连接各个组件,把采集端(Flume Sources),中间缓存(Flume Channels)和写入端(Flume Sinks)连接到一起。
(4).第三步:启动Agent。
bin目录下的flume-ng是Flume的启动脚本,启动时需要指定Agent的名字、配置文件的目录和配置文件的名称。

bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template

-n后面就是agent的主节点,-f 后面就是配置文件的位置,其它不变。

常用案例

监听端口配置:

# example.conf: 一个单节点的 Flume 实例配置

# 配置Agent a1各个组件的名称

#Agent a1 的source有一个,叫做r1
a1.sources = r1    
#Agent a1 的sink也有一个,叫做k1
a1.sinks = k1      
#Agent a1 的channel有一个,叫做c1
a1.channels = c1   

# 配置Agent a1的source r1的属性
#使用的是NetCat TCP Source,这里配的是别名,Flume内置的一些组件都是有别名的,没有别名填全限定类名
a1.sources.r1.type = netcat       
#NetCat TCP Source监听的hostname,这个是本机
a1.sources.r1.bind = localhost    
#监听的端口
a1.sources.r1.port = 44444        

# 配置Agent a1的sink k1的属性

# sink使用的是Logger Sink,这个配的也是别名
a1.sinks.k1.type = logger         

# 配置Agent a1的channel c1的属性,channel是用来缓冲Event数据的

#channel的类型是内存channel,顾名思义这个channel是使用内存来缓冲数据
a1.channels.c1.type = memory                
#内存channel的容量大小是1000,注意这个容量不是越大越好,配置越大一旦Flume挂掉丢失的event也就越多
a1.channels.c1.capacity = 1000              
#source和sink从内存channel每次事务传输的event数量
a1.channels.c1.transactionCapacity = 100    

# 把source和sink绑定到channel上

#与source r1绑定的channel有一个,叫做c1
a1.sources.r1.channels = c1       
#与sink k1绑定的channel有一个,叫做c1
a1.sinks.k1.channel = c1         

启动agent

 bin/flume-ng agent -n a1 -c conf -f conf/example.conf

在这里插入图片描述

监听文件写入HDFS里面

# file_chanel_hdfs.conf: 一个监听文件数据写入hdfs的实例配置

# 配置Agent a1各个组件的名称

#Agent a1 的source有一个,叫做r1
a1.sources = r1    
#Agent a1 的sink也有一个,叫做k1
a1.sinks = k1      
#Agent a1 的channel有一个,叫做c1
a1.channels = c1   

#监听文件的source,这个source支持断点续传可靠性更高
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/software/flume/text_log/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/software/flume/text_log/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /opt/software/flume/text_log/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000

# 配置Agent a1的sink k1的属性

#写入HDFS的sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://10.0.3.141:8020/flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.timeZone = Asia/Shanghai


# 配置Agent a1的channel c1的属性,channel是用来缓冲Event数据的

#channel的类型是内存channel,顾名思义这个channel是使用内存来缓冲数据
a1.channels.c1.type = memory                
#内存channel的容量大小是1000,注意这个容量不是越大越好,配置越大一旦Flume挂掉丢失的event也就越多
a1.channels.c1.capacity = 1000              
#source和sink从内存channel每次事务传输的event数量
a1.channels.c1.transactionCapacity = 100    



# 把source和sink绑定到channel上

#与source r1绑定的channel有一个,叫做c1
a1.sources.r1.channels = c1       
#与sink k1绑定的channel有一个,叫做c1
a1.sinks.k1.channel = c1        

启动后可能遇到的问题及解决方法

在这里插入图片描述

原因是普通用户没有创建文件的权限,使用root权限启动即可

sudo bin/flume-ng agent -c conf -n a1 -f conf/file_chanel_hdfs.conf

在这里插入图片描述

原因是因为写入到hfds时使用到了时间戳来区分目录结构,flume的消息组件event在接受到之后在header中没有发现时间戳参数,导致该错误发生,有三种方法可以解决这个错误;
1、agent1.sources.source1.interceptors = t1
agent1.sources.source1.interceptors.t1.type = timestamp
为source添加拦截,每条event头中加入时间戳;(效率会慢一些)
2、agent1.sinks.sink1.hdfs.useLocalTimeStamp = true 为sink指定该参数为true
(如果客户端和flume集群时间不一致数据时间会不准确)
3、在向source发送event时,将时间戳参数添加到event的header中即可,header是一个map,添加时mapkey为timestamp(推荐使用)

我使用了第二种方法(如果实时链路中,一般数据中都会带有时间戳,要使用第一种方法,保证时间语义的准确性)。

在这里插入图片描述
在这里插入图片描述

遇到这个错误是sink配置语句中创建hdfs的路径报错

要和hadoop里面的core-site.xml 文件保持一致

<!-- 指定NameNode的地址 -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://flinkv1:8020</value>
</property>

在这里插入图片描述
此问题是由于操作hdfs的文件权限不足,修改hdfs文件权限即可。

[zhangflink@9wmwtivvjuibcd2e flume]$ hdfs dfs -ls /
Found 1 items
drwxr-xr-x   - zhangflink supergroup          0 2023-11-19 11:04 /flume
[zhangflink@9wmwtivvjuibcd2e flume]$ hdfs dfs -chmod 777 /flume
[zhangflink@9wmwtivvjuibcd2e flume]$ hdfs dfs -ls /
Found 1 items
drwxrwxrwx   - zhangflink supergroup          0 2023-11-19 11:04 /flume

启动成功数据写入

在这里插入图片描述
在这里插入图片描述

监听文件写入kafka里面

首先创建kafka的topic

[zhangflink@9wmwtivvjuibcd2e kafka]$ bin/kafka-topics.sh --bootstrap-server flinkv1:9092 --create --partitions 1 --replication-factor 3 --topic flumeData

编写配置文件:

# file_memory_kafka.conf: 一个监听文件数据写入hdfs的实例配置

# 配置Agent a1各个组件的名称

#Agent a1 的source有一个,叫做r1
a1.sources = r1    
#Agent a1 的sink也有一个,叫做k1
a1.sinks = k1      
#Agent a1 的channel有一个,叫做c1
a1.channels = c1   

#监听文件的source,这个source支持断点续传可靠性更高
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/software/flume/text_log/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/software/flume/text_log/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /opt/software/flume/text_log/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000

# 配置Agent a1的sink k1的属性

#写入kafka的sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = flumeData
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1


# 配置Agent a1的channel c1的属性,channel是用来缓冲Event数据的

#channel的类型是内存channel,顾名思义这个channel是使用内存来缓冲数据
a1.channels.c1.type = memory                
#内存channel的容量大小是1000,注意这个容量不是越大越好,配置越大一旦Flume挂掉丢失的event也就越多
a1.channels.c1.capacity = 1000              
#source和sink从内存channel每次事务传输的event数量
a1.channels.c1.transactionCapacity = 100    

# 把source和sink绑定到channel上

#与source r1绑定的channel有一个,叫做c1
a1.sources.r1.channels = c1       
#与sink k1绑定的channel有一个,叫做c1
a1.sinks.k1.channel = c1     

消费对应topic测试数据是否写入

[zhangflink@9wmwtivvjuibcd2e kafka]$ bin/kafka-console-consumer.sh --bootstrap-server flinkv1:9092 --from-beginning --topic flumeData

监听成功
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1226529.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RVC从入门到......

RVC变声器官方教程&#xff1a;10分钟克隆你的声音&#xff01;一键训练&#xff0c;低配显卡用户福音&#xff01;_哔哩哔哩_bilibili配音&#xff1a;AI逍遥散人&#xff08;已授权&#xff09;关注UP主并私信"RVC"&#xff08;三个字母&#xff09;自动获取一键训…

PS 颜色取样器标尺工具 基本使用讲解

上文 PS 吸管工具基本使用方法 我们讲完了 吸管工具 那么 我们继续 打开ps先 接着 我们选择这个 颜色取样器工具 选择之后 我们鼠标在图像上随便点一下 就会出现一个标记 然后 我们可以点多几个地方 边上的信息面板就会输出 点1 和 点2 甚至 多个 点3 点4 的 颜色 RGB代码 …

Python 如何实现备忘录设计模式?什么是备忘录设计模式?Python 备忘录设计模式示例代码

什么是备忘录&#xff08;Memento&#xff09;设计模式&#xff1f; 备忘录&#xff08;Memento&#xff09;设计模式是一种行为型设计模式&#xff0c;用于捕获一个对象的内部状态&#xff0c;并在对象之外保存这个状态&#xff0c;以便在需要时恢复对象到先前的状态。这种模…

[qemu逃逸] DefconQuals2018-EC3

前言 一道简单的套壳堆题.原本题目环境为 ubu16, 我这里使用的是 ubu18 设备逆向 qemu-system-x86_64 只开了 Canary 和 NX 保护. 比较简单, 主要逻辑在 mmio_write 里面, 其实现了一个菜单堆, 具有增删改的功能: 但是在释放堆块时并没有置空, 所以这里存在 UAF. 而程序还直…

三、程序员指南:数据平面开发套件

定时器库 定时器库为DPDK执行单元提供了定时器服务&#xff0c;以便异步执行回调函数。该库的特点包括&#xff1a; 定时器可以是周期性的&#xff08;多次触发&#xff09;或单次的&#xff08;一次性触发&#xff09;。定时器可以从一个核加载并在另一个核上执行。这必须在…

IntelliJ IDEA 2023 v2023.2.5

IntelliJ IDEA 2023是一款功能强大的集成开发环境&#xff08;IDE&#xff09;&#xff0c;为开发人员提供了许多特色功能&#xff0c;以下是其特色介绍&#xff1a; 新增语言支持&#xff1a;IntelliJ IDEA 2023新增对多种编程语言的支持&#xff0c;包括Kotlin、TypeScript、…

介绍交换空间概念以及如何设置交换空间

文章目录 什么交换空间新增交换空间 什么交换空间 交换空间&#xff08;Swap space&#xff09;是计算机内存的一种补充&#xff0c;位于硬盘驱动器上。当物理内存不足时&#xff0c;系统会将不活跃的页面移到交换空间中。 交换空间可以帮助系统在以下情况下运行&#xff1a…

mysql 实现去重

个人网站 首发于公众号小肖学数据分析 1、试题描述 数据表user_test如下&#xff0c;请你查询所有投递用户user_id并且进行去重展示&#xff0c;查询结果和返回顺序如下 查询结果和返回顺序如下所示 解题思路&#xff1a; (1) 对user_id列直接去重&#xff1a; &#xff…

Kotlin学习(一)

Kotlin学习&#xff08;一&#xff09; 1.使用IDEA构建Kotlin项目 新建工程即可 我这里选择的Build System是IntelliJ&#xff0c;虽然我没用过但是这是Kotlin基础学习应该不会用到其他依赖 2.Hello World package com.simonfun main(args:Array<String>){println(&q…

Go 语言中切片的使用和理解

切片与数组类似&#xff0c;但更强大和灵活。与数组一样&#xff0c;切片也用于在单个变量中存储相同类型的多个值。然而&#xff0c;与数组不同的是&#xff0c;切片的长度可以根据需要增长和缩小。在 Go 中&#xff0c;有几种创建切片的方法&#xff1a; 使用[]datatype{val…

使用 C 语言快速排序将字符串按照 ASCII 码升序排列

示例代码&#xff1a; #include <stdio.h> #include <string.h> #include <stdlib.h>static Comp(const void *a, const void *b) {char *pa (char *)a;char *pb (char *)b;return strcmp(a, b); }int main(void) {char strs[3][10] { "bd", &q…

C++ Qt 学习(十):Qt 其他技巧

1. 带参数启动外部进程 QProcess 用于启动外部进程int QProcess::execute(const QString &program, const QStringList &arguments);QObject *parent; ... QString program "./path/to/Qt/examples/widgets/analogclock"; QStringList arguments; argument…

卷积、卷积图像操作和卷积神经网络

好多内容直接看书确实很难坚持&#xff0c;就比如这个卷积&#xff0c;书上的一大堆公式和图表直接把人劝退&#xff0c;我觉得一般的学习流程应该是自顶向下&#xff0c;先整体后局部&#xff0c;先把握大概再推敲细节的&#xff0c;上来就事无巨细地展示对初学者来说很痛苦。…

泉盛UV-K5/K6全功能中文固件

https://github.com/wu58430/uv-k5-firmware-chinese/releases 主要功能&#xff1a; 中文菜单 许多来自 OneOfEleven 的模块&#xff1a; AM 修复&#xff0c;显著提高接收质量长按按钮执行 F 操作的功能复制快速扫描菜单中的频道名称编辑频道名称 频率显示选项扫描列表分配…

文本转语音

免费工具 音视频转译 通义听悟 | https://tingwu.aliyun.com/u/wg57n33kml5nkr3p 音色迁移 speechify | https://speechify.com/voice-cloning/ 视频生成 lalamu | http://lalamu.studio/demo/ 画质增强 topazlabs video AI | https://www.topazlabs.com 付费工具 rask | htt…

重生奇迹mu转职任务详解

重生奇迹mu神骑士怎么转 神骑士是一种转职类型&#xff0c;需要你的角色达到一定等级以及完成相应任务方可转职。以下是神骑士转职的具体步骤&#xff1a; 1.等级要求&#xff1a;首先&#xff0c;你的角色需要达到150级才能进行神骑士转职任务。 2.神骑士转职任务&#xff…

hyperledger fabric2.4测试网络添加组织数量

!!!修改内容比较繁琐,预期未来提供模板修改 修改初始配置文件,初始添加3个组织 organizations文件夹 /cryptogen文件夹下创建文件crypto-config-org3.yaml,内容如下: PeerOrgs:# ---------------------------------------------------------------------------# Org3# ----…

获取每个部门中当前员工薪水最高的相关信息

个人网站 首发于公众号小肖学数据分析 描述 有一个员工表dept_emp简况如下: 有一个薪水表salaries简况如下: 获取每个部门中当前员工薪水最高的相关信息&#xff0c;给出dept_no, emp_no以及其对应的salary&#xff0c;按照部门编号dept_no升序排列&#xff0c;以上例子输出…

ESP32 MicroPython 蜂鸣器及传感器的使用⑦

ESP32 MicroPython 蜂鸣器及传感器的使用⑦ 1、蜂鸣器奏乐2、实验目的3、实验内容5、实验结果6、小车传感器应用7、实验目的8、实验内容9、参考代码10、实验结果 1、蜂鸣器奏乐 我们小车底板配置有蜂鸣器&#xff0c;下面我们来学习如何去利用蜂鸣器演奏乐曲 2、实验目的 学…

ESP32 Arduino实战协议篇-搭建独立的 Web 服务器

在此项目中,您将创建一个带有 ESP32 的独立 Web 服务器,该服务器使用 Arduino IDE 编程环境控制输出(两个 LED)。Web 服务器是移动响应的,可以使用本地网络上的任何浏览器设备进行访问。我们将向您展示如何创建 Web 服务器以及代码如何逐步工作。 项目概况 在直接进入项目…