Flume 拦截器概念及自定义拦截器的运用

news2025/1/16 7:48:06

文章目录

    • Flume 拦截器
    • 拦截器的作用
    • 拦截器运用
      • 1.创建项目
      • 2.实现拦截器接口
      • 3.编写事件处理逻辑
      • 4.拦截器构建
      • 5.打包与上传
      • 6.编写配置文件
      • 7.测试运行

Flume 拦截器

在 Flume 中,拦截器(Interceptors)是一种可以在事件传输过程中拦截、处理和修改事件的组件。

位于 Source 与 Channel 之间,在写入Channel 之前,拦截器可以对数据进行转换、提取或删除,以满足特定的需求。每个拦截器只处理同一个 Source 接收到的事件,你也可以同时配置多个拦截器,它们会按顺序执行。

拦截器的作用

  • 数据处理和转换: 拦截器可以对事件数据进行处理和转换。例如,可以对原始日志进行解析、过滤、格式化等操作,以便后续处理或存储。

  • 数据增强: 拦截器可以为事件数据添加额外的信息或元数据。例如,可以添加时间戳、主机信息、标签等,以丰富事件数据的内容。

  • 数据过滤: 拦截器可以根据特定条件过滤掉不需要的事件数据,减少数据传输的量或过滤掉无效数据。

  • 监控和日志: 拦截器可以用于监控数据流的运行情况,记录日志信息或统计数据流中的事件数量、处理速率等指标,帮助用户进行性能分析和故障排查。

拦截器运用

1.创建项目

创建一个 Maven 工程项目,引入 Flume 依赖。

在 IDEA 中创建 Maven 项目想必大家都会,这里不再赘述。

根据集群中的 Flume 版本,引入 Flume 依赖,如下所示:

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.10.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

无需将该依赖打包进最后的 JAR 包中,故将其作用域设置为 provided

当一个依赖项的 scope 被设置为 compile 时,它将在编译和运行时都可用,并包含在最终的项目包中。而 provided 范围的依赖项仅在编译和测试阶段需要,运行时不包括。

2.实现拦截器接口

创建测试类 TestInterceptor 实现拦截器 Interceptor,注意,导包时不要导错了。

import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

public class TimestampInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        return null;
    }

    @Override
    public void close() {

    }
}

在上面的代码中,我们实现了 Flume 拦截器接口 Interceptor,并重写了其中的四个方法:

  • initialize() 方法:初始化拦截器操作,读取配置信息、建立连接等。

  • intercept(Event event) 方法:用于拦截单个事件,并对事件进行处理。接收一个事件对象作为输入,并返回一个修改后的事件对象。

  • intercept(List<Event> list) 方法:事件批处理,拦截事件列表,并对事件列表进行处理。

  • close() 方法:关闭拦截器,在这里释放资源、关闭连接等。

3.编写事件处理逻辑

在这里做个简单的事件处理,如果数据中包含字符串 hello 则进行过滤操作,这样我们可以直观感受到拦截器的存在,下面来进行逻辑设计。

import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TestInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        // 获取事件数据
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);

        // 检查事件数据中是否包含指定字符串
        if (eventData.contains("hello")) {
            // 如果包含指定字符串,则过滤掉该事件,返回 null
            return null;
        }

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // 创建一个新的列表,存储处理过后的事件
        List<Event> interceptedEvents = new ArrayList<>();

        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }

        return interceptedEvents;
    }

    @Override
    public void close() {

    }
    
}

intercept(List<Event> events) 方法用于对事件列表进行批量处理。这个方法会遍历传入的事件列表,并对每一个事件调用 intercept(Event event) 方法来进行单独处理。

注意,如果只有 intercept(Event event) 方法被重写了,而没有实现 intercept(List<Event> events) 批处理方法,那么在处理事件时会以单个事件的方式进行处理。

在不需要进行初始化和释放资源的情况下,我们可以选择不重写 initializeclose 方法。

4.拦截器构建

在编写完事件处理逻辑后,我们还需要对拦截器进行构建。

在 Flume 中,拦截器的创建和配置通常是通过 Builder 模式来完成的。

在程序中,我们可以定义一个静态内部类 Builder,实现 Interceptor.Builder 接口来对拦截器进行构建,如下所示:

    public static class Builder implements Interceptor.Builder {

        @Override
        public void configure(Context context) {
            // 配置操作,可留空
        }

        @Override
        public Interceptor build() {
            // 返回构建的拦截器类
        }
        
    }

在我们这个案例中,完整的代码如下所示:

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TestInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        // 获取事件数据
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);

        // 检查事件数据中是否包含指定字符串
        if (eventData.contains("hello")) {
            // 如果包含指定字符串,则过滤掉该事件,返回 null
            return null;
        }

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<>();

        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }

        return interceptedEvents;
    }

    @Override
    public void close() {

    }

    // 拦截器构建
    public static class Builder implements Interceptor.Builder {

        @Override
        public void configure(Context context) {

        }

        @Override
        public Interceptor build() {
            return new TestInterceptor();
        }

    }

}

5.打包与上传

将写好的项目进行打包,并上传到集群中,进行测试。

注意,需要将打包好的拦截器包放在 Flume 安装目录下的 lib 文件夹中。

在这里插入图片描述

6.编写配置文件

这里为了验证拦截器的作用,通过一个 Flume 采集案例来进行体现。

如果你不知道如何编写配置文件,可以看我写的这篇文章 —— Flume 配置文件编写技巧(包会的,抄就完了)

这个配置案例是将发送到 HTTP 源中的数据采集到 HDFS 上,将本地文件作为缓冲通道,该配置文件命名为 httpToHDFS.conf

# 声明
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source 源配置
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.bind = localhost

# 拦截器配置
# 拦截器定义
a1.sources.r1.interceptors = i1
# 拦截器全类名
a1.sources.r1.interceptors.i1.type = TestInterceptor$Builder

# Sink 处理/存储配置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
	
# Channel 通道配置
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume/data

# 组装/绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

拦截器全类名配置那里需要注意,格式为 拦截器的全类名 + $Builder

在 IDEA 中获取全类名的方式:右击需要引用的类,依次选择【File——>Copy Path/Reference…——>Copy Reference】即可复制。

你可以根据你的需要对该配置文件进行修改。

7.测试运行

因为我们是将数据采集到 HDFS 上,所以需要先启动 Hadoop,然后再进行操作。

# 运行 Flume
cd $FLUME_HOME

# 注意引用路径,需要修改成你自己的
./bin/flume-ng agent -n a1 -c conf/ -f job/httpToHDFS.conf -Dflume.root.logger=INFO,console

Flume 启动完成后,如下所示:

在这里插入图片描述

我们通过其它窗口,使用 curl 命令向 HTTP 源发送两条模拟数据:

curl -X POST -d '[{"body":"hello body"}]'  http://localhost:5140

curl -X POST -d '[{"body":"HELLO FLUME"}]'  http://localhost:5140

在这里插入图片描述

数据发送完成后,Flume 会采集到该数据,并存储到 HDFS 上。

在这里插入图片描述

通过命令,查看 HDFS 中存储的内容,验证拦截器是否生效:

hdfs dfs -text /flume/events/2024-04-04/1630/00/ev* 

结果如下所示:

在这里插入图片描述

可以看到,我们在上面分别发送了两条数据 hello bodyHELLO FLUME,但最终 HDFS 中只存储了一条数据。

这是因为我们设置的拦截器生效了,它对数据中包含 hello 字符串的事件进行了过滤,故只存储了一条数据。

Flume 拦截器就是起到这样的效果,对数据进行处理、转换、删除等操作,是不是很简单呀。(同学,包会的呀)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1569554.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Qt 学习笔记】Qt的坐标体系

博客主页&#xff1a;Duck Bro 博客主页系列专栏&#xff1a;Qt 专栏关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ Qt的坐标体系 文章编号&#xff1a;Qt 学习笔记 / 11 文章目录 Qt的坐…

ML.NET(二) 使用机器学习预测表情分析

这个例子使用模型进行表情分析&#xff1a; 准备数据&#xff1a; happy,sad 等&#xff1b; using Common; using ConsoleApp2; using Microsoft.ML; using Microsoft.ML.Data; using System.Diagnostics; using static Microsoft.ML.Transforms.ValueToKeyMappingEstimator;…

[C#]OpenCvSharp实现直方图均衡化全局直方图局部直方图自适应直方图

【什么是直方图均衡化】 直方图均衡化是一种简单而有效的图像处理技术&#xff0c;它旨在改善图像的视觉效果&#xff0c;使图像变得更加清晰和对比度更高。其核心原理是将原始图像的灰度直方图从可能较为集中的某个灰度区间转变为在全部灰度范围内的均匀分布。通过这种方法&a…

【接口】HTTP(1)|请求|响应

1、概念 Hyper Text Transfer Protocol&#xff08;超文本传输协议&#xff09;用于从万维网&#xff08;就是www&#xff09;服务器传输超文本到本地浏览器的传送协议。 HTTP协议是基于TCP的应用层协议&#xff0c;它不关心数据传输的细节&#xff0c;主要是用来规定客户端和…

【Linux】第二个小程序--简易shell

请看上面的shell&#xff0c;其本质就是一个字符串&#xff0c;我们知道bash本质上就是一个进程&#xff0c;只不过命令行就是一个输出的字符串&#xff0c; 我们输入的命令“ls -a -l”实际上是我们在输入行输入的字符串&#xff0c;所以&#xff0c;如果我们想要做一个简易的…

vscode开发ESP32问题记录

vscode 开发ESP32问题记录 1. 解决vscode中的波浪线警告 1. 解决vscode中的波浪线警告 参考链接&#xff1a;https://blog.csdn.net/fucingman/article/details/134404485 首先可以通过vscode 中的IDF插件生成模板工程&#xff0c;这样会自动创建.vscode文件夹中的一些json配…

Jackson @JsonUnwrapped注解扁平化 序列化反序列化数据

参考资料 Jackson 2.x 系列【7】注解大全篇三JsonUnwrapped 以扁平的数据结构序列化/反序列化属性Jackson扁平化处理对象 目录 一. 前期准备1.1 前端1.2 实体类1.3 Controller层 二. 扁平化序列反序列化数据2.1 序列化数据2.2 反序列化数据 三. 前缀后缀处理属性同名四. Map数…

RabbitMQ3.7.8集群分区(脑裂现象)模拟及恢复处置全场景测试

测试环境准备: MQ服务器集群地址&#xff0c;版本号为3.7.8&#xff1a; 管理控制台地址:http://173.101.4.6:15672/#/queues 集群状态 rabbitmqctl cluster_status 集群操作相关命令: 创建一个RabbitMQ集群涉及到如下步骤&#xff1a; 安装RabbitMQ&#xff1a; 在每台要在集…

【Linux】Ubuntu 文件权限管理

Linux 系统对文件的权限有着严格的控制&#xff0c;用于如果相对某个文件执行某种操作&#xff0c;必须具有对应的权限方可执行成功&#xff0c;这也是Linux有别于Windows的机制&#xff0c;也是基于这个权限机制&#xff0c;Linux可以有效防止病毒自我运行。因为运行的条件是必…

软件架构复用

1.软件架构复用的定义及分类 软件产品线是指一组软件密集型系统&#xff0c;它们共享一个公共的、可管理的特性集&#xff0c;满足某个特定市场或任务的具体需要&#xff0c;是以规定的方式用公共的核心资产集成开发出来的。即围绕核心资产库进行管理、复用、集成新的系统。核心…

【随笔】Git 高级篇 -- 相对引用2(十三)

&#x1f48c; 所属专栏&#xff1a;【Git】 &#x1f600; 作  者&#xff1a;我是夜阑的狗&#x1f436; &#x1f680; 个人简介&#xff1a;一个正在努力学技术的CV工程师&#xff0c;专注基础和实战分享 &#xff0c;欢迎咨询&#xff01; &#x1f496; 欢迎大…

HTML:框架

案例&#xff1a; <frameset cols"5%,*" ><frame src"left_frame.html"><frame src"right_frame.html"> </frameset> 一、<frameset>标签 <frameset>标签&#xff1a;称为框架标记&#xff0c;将一个HTML…

Linux 学习之路 - 进程篇 - PCB介绍1-标识符

目录 一、基础的命令 <1> ps axj 命令 <2> top 命令 <3> proc 目录 二、进程的标识符 <1>范围 <2>如何获取标识符 <3>bash进程 三、创建进程 一、基础的命令 前面介绍了那么多&#xff0c;但是我们没有观察到进程相关状态&#x…

什么是智慧公厕?智慧旅游下的智慧公厕功能和特点

智慧旅游下的智慧公厕功能和特点&#xff1f;智慧旅游是景区、公园、游乐场、文化场馆等领域的一种信息化解决方案&#xff0c;智慧公厕是智慧旅游极为重要的一部分&#xff0c;能大大提升游客满意度。智慧公厕采用物联网、互联网、大数据、云计算等技术&#xff0c;实现旅游景…

深入浅出 -- 系统架构之微服务架构选型参考图

技术选型架构图 是一个用于展示项目中所采用的各种技术和组件之间关系的图表。 它通常包括以下几个部分&#xff1a; 1. 项目名称和描述&#xff1a;简要介绍项目的背景和目标。 2. 技术栈&#xff1a;列出项目中使用的主要技术和工具&#xff0c;如编程语言、框架、数据库…

Unity开发一个FPS游戏之三

在前面的两篇博客中&#xff0c;我已实现了一个FPS游戏的大部分功能&#xff0c;包括了第一人称的主角运动控制&#xff0c;武器射击以及敌人的智能行为。这里我将继续完善这个游戏&#xff0c;包括以下几个方面&#xff1a; 增加一个真实的游戏场景&#xff0c;模拟一个废弃的…

[C#]OpenCvSharp利用MatchTemplate实现多目标匹配

【效果展示】 原图 模板图 匹配结果&#xff1a; 【实现部分代码】 using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Threading.Tasks; using…

Flutter仿Boss-4.短信验证码界面

效果 简述 在移动应用开发中&#xff0c;处理短信验证码是确保用户身份验证和安全性的重要步骤。本文将介绍如何使用Flutter构建一个短信验证码界面&#xff0c;让用户输入通过短信发送到他们手机的四位验证码。 依赖项 在这个项目中&#xff0c;我们将使用以下依赖项&#…

关于Tomcat双击startup.bat 闪退的解决⽅法

详解Tomcat双击startup.bat 闪退的解决⽅法 作为⼀个刚学习Tomcat的程序猿来说&#xff0c;这是会经常出现的错误。 1.环境变量问题 1.1 ⾸先需要确认java环境是否配置正确&#xff0c;jdk是否安装正确 winR打开cmd&#xff0c;输⼊java 或者 javac 出现下图所⽰就说明jdk配置正…

单元测试 mockito(二)

1.返回指定值 2.void返回值指定插桩 3.插桩的两种方式 when(obj.someMethod()).thenXxx():其中obj可以是mock对象 doXxx().wien(obj).someMethod():其中obj可以是mock/spy对象 spy对象在没有插桩时是调用真实方法的,写在when中会导致先执行一次原方法,达不到mock的目的&#x…