Flume 快速入门【概述、安装、拦截器】

news2024/11/24 11:52:42

文章目录

    • 什么是 Flume?
    • Flume 组成
    • Flume 安装
    • Flume 配置任务文件
      • 应用示例
      • 启动 Flume 采集任务
    • Flume 拦截器
      • 编写 Flume 拦截器
      • 拦截器应用

什么是 Flume?

Flume 是一个开源的数据采集工具,最初由 Apache 软件基金会开发和维护。它的主要目的是帮助用户将大规模数据从各种数据源(如日志文件、网络数据源、消息队列等)采集、传输和加载到数据存储系统(如 Hadoop HDFS、Apache HBase、Apache Hive 等)。

Flume 旨在处理大规模数据流,以便进行数据分析和处理。

Flume 组成

Flume (配置)主要由以下 4 个部分组成:

1. 数据源(Source): Flume 可以从多种数据源收集数据,例如日志文件、网络流、消息队列等。

2. 通道(Channel): 采集的数据被存储在通道中,等待传输到目标数据存储系统。Flume 支持多种不同类型的通道,如内存通道、文件通道和 Kafka 通道。

3. 拦截器(Interceptor): 拦截器允许用户对采集的数据进行预处理和转换,以满足特定需求。

4. 接收器(Sink): 接收器将数据传输到目标数据存储系统,如 Hadoop HDFS、HBase、Kafka 等。

Flume 通过灵活的配置,允许用户根据其数据采集需求来定义数据流的整个流程,包括数据源、通道、拦截器和接收器。

这使得 Flume 成为处理大规模数据采集和传输任务的强有力工具,构建数据管道,将分散的数据整合到中心存储或处理系统中,用于实时或者离线数据分析和报告。

Flume 安装

官方安装包下载地址:http://archive.apache.org/dist/flume

本篇博客使用的版本为:Flume-1.10.1

1. 解压

tar -zxvf apache-flume-1.10.1-bin.tar.gz -C /opt/

2. 配置环境变量

vim /etc/profile

文件末尾添加:

#FLUME_HOME
export FLUME_HOME=/opt/flume-1.10.1
export PATH=$PATH:$FLUME_HOME/bin

刷新环境变量:source /etc/profile

其实到这里,Flume 算是安装完了,但是为了后期使用方便,这里再调整一下配置参数。

修改日志存储与输出:

cd $FLUME_HOME

vim conf/log4j2.xml

在该文件中修改日志文件的存储目录(正文第 3 行)

    <Property name="LOG_DIR">/opt/flume-1.10.1/logs</Property>

在该文件中添加日志控制台输出方式(正文末尾)

      <AppenderRef ref="Console" />

默认只有 LogFile 日志文件的输出方式。


修改堆内存大小:

cd $FLUME_HOME

vim conf/flume-env.sh

如果是本地学习或者测试环境建议调小一点:

export JAVA_OPTS="-Xms512m -Xmx2048m -Dcom.sun.management.jmxremote"

我这里调整最小为 512MB,最大 2048MB,也可以将最大和最小调整为一样的,避免进行内存交换。

Flume 配置任务文件

Flume 最主要的内容就是配置任务文件了,在文章开头提到过,主要由四部分组成:

  • 数据源(Source)

  • 通道(Channel)

  • 拦截器(Interceptor)

  • 接收器(Sink)

我们可以根据需求,进入 Flume 的官方网站,查阅各项参数如何进行配置,按照要求配置即可。

配置查阅网站:Flume 1.10.1 User Guide

在这里插入图片描述

其中给出了一个模板文件,内容如下所示:

# example.conf: A single-node Flume configuration

# Name the components on this agent  声明变量名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source  配置数据源
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink  配置接收器(存储源)
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory 配置管道参数
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel  组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

根据该模板文件就可以来快速构建一个数据采集的配置文件啦。

应用示例

将 Maxwell 发送到 Kafka 消息队列中的数据采集到 HDFS 上。

# --------- 声明变量名称 ---------
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# --------- 配置数据源 ---------
# 指定数据源类型
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# 指定连接地址
a1.sources.r1.kafka.bootstrap.servers = hadoop120:9092,hadoop121:9092,hadoop122:9092
# 指定消费者组别,防止多个消费者之间引发数据冲突
a1.sources.r1.kafka.consumer.group.id = flume1
# 指定主题名称,这里需要和 MaxWell 指定发送的主题保持一致,否则会采集不到数据
a1.sources.r1.kafka.topics = topic_db

# --------- 配置接收器 ---------
# 指定存储源类型
a1.sinks.k1.type = hdfs
# 动态规划 HDFS 写入路径
a1.sinks.k1.hdfs.path = /test/%{tableName}_inc/%Y/%m/%d/

# 当以下值中的其中一个满足时,触发滚动操作,将数据写入到新的文件中(避免小文件过多)
# 根据运行时间判定(s),测试环境调小,开发环境30m-1h
a1.sinks.k1.hdfs.rollInterval = 10 
# 根据数据量大小判定(B),128 MB
a1.sinks.k1.hdfs.rollSize = 134217728
# 根据文件的条数判断,为 0 时表示不依据该参数
a1.sinks.k1.hdfs.rollCount = 0

# 压缩文件
# 指定文件类型为压缩流
a1.sinks.k1.hdfs.fileType = CompressedStream
# 指定数据压缩格式
a1.sinks.k1.hdfs.codeC = gzip

# --------- 配置通道 ---------
# 通道类型
a1.channels.c1.type = file
# 检查点存储路径
a1.channels.c1.checkpointDir = /opt/module/flume-1.10.1/file-channel/checkpoint1
# 用于存储日志文件的目录,多个路径用逗号分隔
a1.channels.c1.dataDirs = /opt/module/flume-1.10.1/file-channel/data1
# 指定允许等待的时间
a1.channels.c1.keep-alive = 6


# --------- 组装 ---------
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动 Flume 采集任务

cd $FLUME_HOME

./bin/flume-ng agent -c conf/ -f job_file -n a1

参数解析:

  • ./bin/flume-ngflume-ng 是 Flume 的执行脚本,它用于启动 Flume 的 agent 实例。
  1. agent:这告诉 Flume-ng 启动一个代理实例,也就是一个数据采集和传输任务的执行单元。

  2. -c conf/:指定 Flume 配置文件的目录。

  3. -f job_file:指定 Flume 任务配置文件的参数,其中配置文件包含了数据源、通道、接收器以及数据处理的详细信息。

  4. -n a1:指定代理实例的名称,与配置文件中的对应。

Flume 拦截器

当我们在配置文件中定义了动态参数时,例如上方示例中接收器的配置语句:

a1.sinks.k1.hdfs.path = /test/%{tableName}_inc/%Y/%m/%d/

我们设想的是将表名称和年月日进行动态规划,但在未设置拦截器时,这些动态参数值都会被默认为空,如果是系统预定义的参数则为系统设定值。

如下所示:


其中 tableName 是自定义的值,Flume 系统并没有对其进行预定义,所以为空,但 %Y %m %d 这三个值系统默认为当前的日期值,所以不为空。

如果想将上述值设定为希望出现的值,此时便引出了拦截器的概念。通过对拦截器的配置,将采集的数据进行预处理和转换,以满足特定需求。

编写 Flume 拦截器

在 IDEA 中编写拦截器代码,然后打包上传,使用依赖如下所示:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.work</groupId>
    <artifactId>intercepted</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>

        <!-- JSON 解析包-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.32</version>
        </dependency>

        <!-- flume 包,不打包该 Jar 包-->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.10.1</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

注意,JDK 版本与平台保持一致。

拦截器实现,用于设定表头与写入日期:

package com.work.flume.interceptor;

import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * @author Moon_coder
 * @version 1.0
 * @date 2023/10/29 17:32
 */
public class TableNameAndTimestamp implements Interceptor {

    /**
     * 初始化方法
     */
    @Override
    public void initialize() {

    }

    /***
     * 处理单条数据
     * @param event
     * @return
     */
    @Override
    public Event intercept(Event event) {
        try{
            // 1.获取头数据
            Map<String, String> headers = event.getHeaders();
            // 2.获取数据内容,将字节数据转换为字符串
            String log = new String(event.getBody(), StandardCharsets.UTF_8);
            // 3.将字符串转换为 JSON 对象
            JSONObject jsonObject = JSONObject.parseObject(log);
            // 4.获取表名
            String table = jsonObject.getString("table");
            // 5.获取时间,我的数据是经 Maxwell 采集的,Maxwell 中的数据是 10 位时间戳,不含毫秒,将数据存入 HDFS 时 *1000
            String ts = jsonObject.getString("ts") + "000";
            // 6.更新头数据信息
            headers.put("tableName",table);
            headers.put("timestamp",ts);
        }catch (JSONException e){
            // 如果不是 JSON 数据,则将该数据定义为脏数据
            return null;
        }
        return event;
    }

    /***
     * 批量数据处理方法
     * @param events
     * @return
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        // 批量处理 event 同时实现过滤功能
        events.removeIf(next -> intercept(next) == null);

        return events;
    }

    /**
     * 关闭方法
     */
    @Override
    public void close() {

    }

    // TODO 返回拦截器类
    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new TableNameAndTimestamp();
        }

        @Override
        public void configure(Context context) {

        }
    }


}

类名或 Jar 包名称都没有特别要求,自定义即可。

注意: 当我们在往头信息里面放东西时,需要与键名一一对应。

            // 6.更新头数据信息
            headers.put("tableName",table);
            headers.put("timestamp",ts);

如果是自定义的值,名称与 Flume 配置文件设定的必须对应:


如果是系统预定义的值,则需要在官方网站中查询其对应的键名。例如这里出现的 %Y %m %d 这三个值,在接收器的参数定义那里即可查询到(HDFS Sink¶),如下所示:

在这里插入图片描述

这里官方给出了提示,说 对于所有与时间相关的转义序列,事件的标头中必须存在一个关键字为 “timestamp” 的标头,所以在拦截器中对头信息的时间进行操作时,对应的键名为 timestamp

拦截器应用

将打包好的拦截器 Jar 包上传到 Flume 中的 lib 目录下,然后在 Flume 任务配置文件中添加拦截器配置,如下所示:

# --------- 拦截器 ---------
# 拦截器名称
a1.sources.r1.interceptors = i1
# 编写的拦截器全类名 + $Builder 标识符
a1.sources.r1.interceptors.i1.type = com.work.flume.interceptor.TableNameAndTimestamp$Builder

再次执行上方的示例任务,可以看到配置完拦截器后,头信息已经达到了我们预期的结果。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1147587.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于STM32景区人流检测控制系统设计

**单片机设计介绍&#xff0c;1651【毕设课设】基于STM32景区人流检测控制系统设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序程序文档 六、 文章目录 一 概要 基于STM32的景区人流检测控制系统设计是一种利用STM32微控制器开发的系统&#xff0c;用…

“赋能信创,物联未来” AntDB数据库携高可用解决方案亮相2023世界数字经济大会

10月14日&#xff0c;在2023世界数字经济大会暨京甬信创物联网产融对接会上&#xff0c;AntDB数据库技术总监北陌应邀发表《AntDB国产分布式数据库创新演进与高可用解决方案》主题演讲&#xff0c;就AntDB数据库助力客户数智化升级的高可用信创解决方案进行了详实、真挚地分享&…

前端实现埋点监控

前端实现埋点&监控 实现埋点功能的意义主要体现在以下几个方面&#xff1a; 数据采集&#xff1a;埋点是数据采集领域&#xff08;尤其是用户行为数据采集领域&#xff09;的术语&#xff0c;它针对特定用户行为或事件进行捕获、处理和发送的相关技术及其实施过程。通过埋…

nginx 内存管理(二)

共享内存 共享内存结构与接口定义nginx共享内存在操作系统上的兼容性设计互斥锁锁的结构体锁的一系列操作&#xff08;core/ngx_shmtx.c&#xff09;创建锁 原子操作nginx的上锁操作尝试加锁获取锁释放锁强迫解锁唤醒等待进程 slab共享内存块管理nginx的slab大小规格内存池结构…

ctfshow-web入门命令执行29

29 源代码给了禁用flag 使用tac、nl ?cecho nl f*; ?cecho tac f*; 30 多禁用了system和php 和上题区别不大&#xff0c;使用上一题命令就能解 ?cecho nl f*; ?cecho tac f*; 31 禁用了空格使用%09代替 ?cecho%09tac%09f*; 32 禁用了echo 使用php伪协议 ?cinclud…

不做学习的奴隶,更要注重生活

下面是国外社交软件 i n s ins ins上近 40 40 40万点赞的帖子。 “睡8小时&#xff0c;而不是6小时。 锻炼1小时&#xff0c;而不是4小时。 学习3小时&#xff0c;而不是10小时。 读书2小时&#xff0c;而不是5小时。 深度工作3小时&#xff0c;而不是12小时。 你是人&#xff…

uniapp 模仿 Android的Menu菜单栏

下面这张图就是我们要模拟的菜单功能 一、模拟的逻辑 1. 我们使用uni-popup组件&#xff08;记得要用hbuilder X导入该组件&#xff09;uni-app官网 2. 将组件内的菜单自定义样式 二、uniapp代码 写法vue3 <template><view><uni-popup ref"showMenu"…

设计师在团队协作中的关键角色与策略

作为设计师&#xff0c;团队协作也是日常工作的一部分。在设计团队中&#xff0c;设计师如何参与团队协作&#xff1f;怎样才能更好的发挥自己的价值&#xff0c;顺利推进项目呢&#xff1f; 设计师遇到的协作难题&#xff1f; 首先我们看一下设计师在日常团队协作工作中可能…

C语言实现输入一个字符串,递归将其逆序输出

完整代码&#xff1a; // 输入一个字符串&#xff0c;递归将其逆序输出。如输入 LIGHT&#xff0c;则输出 THGIL #include<stdio.h> #include<stdlib.h> //字符串的最大长度 #define N 20//逆序输出字符串 void func(char *str){if (*str\0){//结尾时直接退出递归…

常见网络攻击及防御方法总结(XSS、SQL注入、CSRF攻击)

网络攻击无时无刻不存在&#xff0c;其中XSS攻击和SQL注入攻击是网站应用攻击的最主要的两种手段&#xff0c;全球大约70%的网站应用攻击都来自XSS攻击和SQL注入攻击。此外&#xff0c;常用的网站应用攻击还包括CSRF、Session劫持等。 1、 XSS攻击 XSS攻击即跨站点脚本攻击&am…

VBA宏查找替换目录下所有Word文档中指定字符串

原来搞质量管理&#xff0c;要替换质量文件里面所有特定名称或者某一错误时&#xff0c;需要逐一打开所有文件&#xff0c;非常麻烦&#xff0c;所以写了个VBA程序。过了这么多年&#xff0c;突然又要做同样的事情&#xff0c;发现新版本Word不支持其中的Application.FileSearc…

python自动化测试(五):按键模拟输入:全选、复制、清空、粘贴、完成

前置条件&#xff1a; 本地部署&#xff1a;ECShop的版本是3.0.0、Google版本是 Google Chrome65.0.3325.162 (正式版本) &#xff08;32 位&#xff09; Google驱动的selenium版本是3.11.0 目录 一、配置代码 二、键盘组合输入 2.1 全选&#xff1a;ctrl a 2.2 复制…

2023上半年系统集成项目管理工程师下午真题

文章目录 一&#xff1a;第5章 项目立项管理。第7章 项目范围管理&#xff0c;需求文件二&#xff1a;第9章 项目成本管理。第8章 项目进度管理&#xff0c;压缩工期三&#xff1a;第15章 信息&#xff08;文档&#xff09;和配置管理四&#xff1a;第18章 项目风险管理&#x…

ELASTICO-A Secure Sharding Protocol For Open Blockchains

INTRO 在中本聪共识中&#xff0c;通过POW机制来公平的选举leader&#xff0c;不仅非常消耗power&#xff0c;并且拓展性也不好。现在比特币中是7 TPS&#xff0c;和其他的支付系统相比效率相差甚远。 当前的许多拜占庭共识协议&#xff0c;并不支持在一个开放的环境中使用&a…

Linux 音频驱动实验

目录 音频接口简介为何需要音频编解码芯片&#xff1f;WM8960 简介I2S 总线接口I.MX6ULL SAI 简介 硬件原理图分析音频驱动使能修改设备树使能内核的WM8960 驱动alsa-lib 移植alsa-utils 移植 声卡设置与测试amixer 使用方法音乐播放测试MIC 录音测试LINE IN 录音测试 开机自动…

探索低代码PaaS平台的优势与选择原因

PaaS是一种云产品&#xff0c;它为应用程序的开发和部署提供基础结构。它提供中间件、开发工具和人工智能来创建功能强大的应用程序&#xff0c;大多数PaaS服务都与存储和网络基础架构捆绑在一起&#xff0c;就像基础架构即服务&#xff08;IaaS&#xff09;一样&#xff0c;可…

在Spring boot中 使用JWT和过滤器实现登录认证

在Spring boot中 使用JWT和过滤器实现登录认证 一、登录获得JWT 在navicat中运行如下sql,准备一张user表 -- ---------------------------- -- Table structure for t_user -- ---------------------------- DROP TABLE IF EXISTS t_user; CREATE TABLE t_user (id int(11) …

css文字竖向排列

div { writing-mode: vertical-rl;text-orientation: upright;font-size: .25rem; //文字大小letter-spacing: 0.1em; //文字间距}

Ubuntu安装ddns-go使用阿里ddns解析ipv6

Ubuntu安装ddns-go 1.何为ddns-go2.安装环境3.获取ddns-go安装包4.解压ddns-go5.安装ddns-go6.配置ddns-go 1.何为ddns-go DDNS-GO是简单好用的DDNS&#xff0c;它可以帮助你自动更新域名解析到公网IP。比如你希望在本地部署网站&#xff0c;但是因为公网IP是动态的&#xff0…

Transformer模型 | Python实现Attention-Transformer时间序列预测

时间序列预测 | Python实现Attention-Transformer时间序列预测 目录 时间序列预测 | Python实现Attention-Transformer时间序列预测基本介绍模型结构程序设计学习总结基本介绍 Python实现Attention-Transformer时间序列预测(TSAT model) 模型结构 main.py :含训练集合测试集的…