Flume自定义拦截器 - ETL拦截器和分类拦截器

news2024/11/15 21:57:58

水善利万物而不争,处众人之所恶,故几于道💦

目录

一、拦截器(Interceptor)和选择器(Selector)

    拦截器(Interceptor)

    选择器(Selector)

二、自定义拦截器实现步骤:
三、编程实现:

    ETL拦截器

    分类拦截器


一、拦截器(Interceptor)和选择器(Selector)

拦截器(Interceptor)

  位于Source和Channel之间

在这里插入图片描述  我们这里配置的第一个拦截器是ETL拦截器,先对数据进行轻度过滤,然后是分类拦截器,将不同类型的数据向header中添加不同的Value。

  然后配置Source Selector将数据分别发送到不同的Kafka Channel中(也就是指定的Kafka topic中),因为Kafka Channel中的数据是直接存到Kafka中,省去了Sink,所以效率更高。


选择器(Selector)的类型:

  replicating:默认这种类型,会将source发过来的events发往所有的channel
  multiplexing:可以发送到不同的channel


Selector加深理解:

  在Flume中,Selectors是组件之间的接口,用于确定应该将事件从哪个组件移动到哪个组件。通常有Source Selector、Channel Selector和Sink Selector。其中,Source Selector与Event Driven Source一起使用来选择源发送给Channel事件的方式;Channel Selector与Multiple Sink等组件一起使用,根据内部策略选择将事件路由到哪个Sink组件。Sink Selector可用于多种模式,例如Load Balancing Sink Processor,它可以将事件均匀地分配到不同的Sinks,或者Backup Sink Processor,它可以实现容错处理,在主Sink失败时自动切换到备份Sink。

  在源头(Source)中使用的Selector被称为Source Selector,是用来决定哪些事件应该被发送到Channel中的。通常情况下,Source对象向其分配的所有Channel发送相同的事件流。然而,当您有多个Channel并且需要区分出仅适用于其中某一个Channel时,就需要使用Source Selector了。例如,Kafka Channel时就会用到Source Selector,以便将事件流按照特定方式路由到指定的Kafka topic

  与此不同,通道(Channel)中使用的Selectors称为Channel Selector,用于选择会被传输到单个或多个配置的下游组件的事件。在Channel Selector中,根据预定义的规则和算法确定将事件发送到哪个Sink, 来优化 Flume 的性能。


二、自定义拦截器实现步骤:

  1. 实现Interceptor接口
  2. 重写四个方法:
      ● 初始化
      ● 单event,多event
      ● 关闭
      ● 创建静态内部类 - Builder
  3. 打包上传
      将打包后的jar包上传到Flume的lib目录下

三、编程实现:

新建maven工程,向pom.xml文件添加如下依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <!--用于编译Java代码,将源代码编译成目标字节码,并生成class文件。这里使用的版本是2.3.2,指定了编译器的源版本和目标版本都是1.8。-->
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <!--用于将当前模块及其所有依赖打包成一个可执行的JAR文件,
            其中使用了descriptorRef为"jar-with-dependencies"的描述符来实现依赖包的合并,它在Maven打包期间会自动将相关的依赖项打包进去。-->
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

项目目录结构如下:

在这里插入图片描述

ETL拦截器:

  对日志进行轻度的过滤,先将event转换为String,然后利用LogUtils类中的验证逻辑进行验证。

LogETLInterceptor

public class LogETLInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        // 将event 转换为string 方便处理
        byte[] body = event.getBody();

        String log = new String(body, Charset.forName("UTF-8"));

        if(log.contains("start")){
            // 清洗启动日志
            if(LogUtils.vaildateStart(log)){
                return event;
            }

        }else{
            // 清洗事件日志
            if(LogUtils.vaildateEvent(log)){
                return event;
            }
        }

        return null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        ArrayList<Event> interceptors = new ArrayList<>();

        // 遍历event
        for (Event event : events) {
            // 调用上面的单event方法进行清洗
            Event intercept1 = intercept(event);

            if(intercept1 != null){   //因为单event返回的结果有null,所以这里要判空一下
                interceptors.add(intercept1);
            }
        }
        return interceptors;
    }

    @Override
    public void close() {

    }

    // 静态内部类    Builder 这个静态内部类的类名不要变,也就是说,这个类的定义是死的,重写方法后new一个实体返回就行了
    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            // new 一个自己
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

分类拦截器:

  是为了将启动日志和事件日志发往不同的topic,所以将拦截到的event判断是否含有start(也就是是否为启动日志)然后往headers里面添加K-V,这个K-V不是随便写的,是你flume配置文件里面interceptor那块定义的

在这里插入图片描述

LogTypeInterceptor

public class LogTypeInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        // 去除body数据
        byte[] body = event.getBody();
        String log = new String(body, Charset.forName("UTF-8"));

        // 取出header
        Map<String, String> headers = event.getHeaders();

        if(log.contains("start")){
            headers.put("topic","topic_start");
        }else{
            headers.put("topic","topic_event");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        ArrayList<Event> resultEvents = new ArrayList<>();

        for (Event event : events) {
            Event intercept1 = intercept(event);
            // 不用判断因为只是添加了一个标记
            resultEvents.add(intercept1);
        }
        return resultEvents;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

日志工具类:

  首先对传过来的日志进行判空,然后判断是否以大括号开头和结尾(json完整性判断);事件日志也一样,先进行判空,然后根据分隔符分割,判断长度是否为2,然后判断服务器时间是否全为数字以及json的完整性。

LogUtils

public class LogUtils {
    public static boolean vaildateStart(String log) {

        if(log == null){
            return false;
        }

        // 是否是大括号开头和结尾,不是的话就干掉
        if(!log.trim().startsWith("{") || !log.trim().endsWith("}")){
            return false;
        }

        return true;
    }

    public static boolean vaildateEvent(String log) {

        if(log == null){
            return false;
        }

        // 时间 | json
        // 切割
        String[] logConents = log.split("\\|");  //正则表达式中 \| 表示 | ,所以要以|分隔的话就转义一下 \\|

        // 判断长度
        if(logConents.length != 2){
            return false;
        }

        // 判断服务器时间  长度和都是数字,工具类,不等于13位和不全是数字就干掉
        if(logConents[0].length() != 13 || !NumberUtils.isDigits(logConents[0])){
            return false;
        }

        // 判断json完整性
        if(!logConents[1].trim().startsWith("{") || !logConents[1].trim().endsWith("}")){
            return false;
        }

        return true;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/623764.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【机器学习 | 深度学习】Colab是什么?以及如何使用它?

文章目录 一、介绍二、如何使用 Colaboratory 创建代码三、实例测试 一、介绍 Colaboratory&#xff08;简称为Colab&#xff09;是由Google开发的一种基于云端的交互式笔记本环境。它提供了免费的计算资源&#xff08;包括CPU、GPU和TPU&#xff09;&#xff0c;可让用户在浏…

本地部署gitlab学习git使用

文章目录 前言一、安装gitlab二、nginx反向代理三、本地配置hosts&#xff0c;自定义域名四、配置gitlab独立ngxin实现域名访问五、其他总结 前言 最近想学习git使用了&#xff0c;在本地部署一个gitlab社区版玩玩吧~ gitlab只能部署在liunx系统上面&#xff0c;可以使用云服务…

TLD2314EL-ASEMI代理英飞凌汽车芯片TLD2314EL

编辑&#xff1a;ll TLD2314EL-ASEMI代理英飞凌汽车芯片TLD2314EL 型号&#xff1a;TLD2314EL 品牌&#xff1a;Infineon(英飞凌) 封装&#xff1a;SSOP-14-EP-150mil 特性&#xff1a;LED驱动、汽车芯片 宽温度范围&#xff1a;-40C~150C 封装&#xff1a;SSOP-14&…

虚拟云网络系列 | 如何将 NSX NVDS 迁移到 VDS

1.NVDS 迁移到 VDS 的主要原因 在早期的 vsphere6.7 的版本上安装 NSX-T 采用的都是 NVDS&#xff0c;而随着 NSX 版本的升级&#xff0c;从 NSX 4.0 开始&#xff0c;NSX 已经不在支持在 ESXi 上部署 NVDS&#xff0c;仅能使用 vsphere7.0 上的 VDS。所以&#xff0c;对于早期…

linux系统编程-----下

linux网络编程 tcp通信 Berkeley Socket TCP/IP协议族标准只规定了网络各个层次的设计和规范&#xff0c;具体实现则需要由各个操作系统厂商完成。最出名的网络库由BSD 4.2版本最先推出&#xff0c;所以称作伯克利套接字&#xff0c;这些API随后被移植到各大操作系统中&…

Android Key Hash生成

在接入FaceBook 安卓第三方登录的时候&#xff0c;就需要获取Debug Android Hash Key。 Android Hah Key有两种&#xff0c;即开发密钥散列和发布密钥散列 获得散列值需要借助openssl工具。 下载并配置openssl 1、下载 到https://code.google.com/archive/p/openssl-for-win…

校验表格里的表单

<template><el-dialogtitle"收货地址":visible.sync"dialogFormVisible">{{ form }}<el-formref"form":model"form":rules"rules"label-width"100px"><el-form-itemlabel"活动名称&quo…

Roop:单图离线版软件包及使用方法!

你们要的“单图换脸”离线一键运行版来了。Roop发布几十个小时后&#xff0c;马不停蹄地搞了Colab在线版。其实这东西都挺好的&#xff0c;又快又方便&#xff0c;几乎没有任何硬件要求&#xff0c;点一点就可以搞定了。但是它有一个问题&#xff0c;就是没有“魔法” 就没法使…

国际电商网站APP开发-国际电商网站,跨境方案

跨境电商一种在国际贸易中进行电子商务的策略。它涉及到在线销售产品或服务给海外消费者&#xff0c;通常涉及到国际支付、物流和海外市场营销的问题。以下是一些跨境电商方案的例子&#xff1a; 跨境电商平台&#xff1a;建立自己的跨境电商平台&#xff0c;提供海外消费者便捷…

Docker安装达梦(DM)关系型数据库,DBeaver远程连接使用数据库

Docker安装达梦&#xff08;DM&#xff09;关系型数据库 首先你得去达梦数据库官网注册一个账号。 下载数据库部署包 官网&#xff1a;https://www.dameng.com/ 然后找到需要的数据库&#xff1a; 官网试用地址&#xff1a;https://eco.dameng.com/tour/?source_urlht…

ansible使用剧本操作硬盘

在一个节点添加一块20G的硬盘 通过ansible剧本判断是否存在第二块硬盘&#xff0c;且硬盘的大小大于10G 满足条件&#xff1a; 在此硬盘创建一个分区&#xff0c;大小为10G 使用此分区创建一个卷组 从此卷组中创建一个逻辑卷 将此逻辑卷格式化为xfs 将此逻辑卷挂载至/mountdir目…

上海28岁程序员失业,感叹:测试估计没戏了,想去卖点煎饼果子养家~

程序员危机&#xff0c;似乎是一个跨不过去的坎&#xff0c;最近&#xff0c;在职场论坛上看到了一位魔都程序员被裁的帖子&#xff0c;现在因为“互联网寒冬”不少程序员优化被裁。 帖子具体内容如下&#xff1a;因为疫情&#xff0c;老大哥所在部门被砍掉了&#xff0c;部门所…

科技项目验收测试报告获取有什么注意事项?作用都有哪些?

当科技项目通过测试并准备交付验收时&#xff0c;需要编写科技项目验收测试报告。科技项目验收测试报告是项目验收的重要部分&#xff0c;是对项目质量的一种客观证明。获取科技项目验收测试报告需要注意什么呢?本文从专业角度探讨这个话题&#xff0c;并介绍验收测试报告的作…

【数据分享】1929-2022年全球站点的逐日降水量(Shp\Excel\12000个站点)

气象数据是在各项研究中都经常使用的数据&#xff0c;气象指标包括气温、风速、降水、湿度等指标&#xff0c;说到常用的降水数据&#xff0c;最详细的降水数据是具体到气象监测站点的降水数据&#xff01; 有关气象指标的监测站点数据&#xff0c;之前我们分享过1929-2022年全…

069:cesium围绕一个固定点自动左右旋转

第069个 点击查看专栏目录 本示例的目的是介绍如何在vue+cesium中设置一个固定点为中心点,通过lookAtTransform来固化点,通过监听clock,来设置自动旋转。 直接复制下面的 vue+cesium源代码,操作2分钟即可运行实现效果. 文章目录 示例效果配置方式示例源代码(共79行)相关…

网络安全怎么入行?有哪些误区需要避免?

目录 一、学习网络安全容易造成的误区 二、学习网络安全的基本准备与条件 三、网络安全学习路线 第一步&#xff1a;计算机基础 第二步&#xff1a;编程能力 第三步&#xff1a;安全初体验 第四步&#xff1a;分方向 怎么入门&#xff1f; 四、明确目标&#xff0c;定…

硬核科普:“画”说业界首个算网大脑

数字经济时代 算力已经成为核心生产力 中国移动提出“算力网络”全新理念 创新构建“连接算力能力” 新型信息服务体系 作为数字中国建设的国家队、主力军 中国移动布局算力网络的先锋队 移动云依托集团运营商禀赋优势 构建“4N31X”分布式云资源布局 为推动算力一点接入…

python 第四章 字符串str

系列文章目录 第一章 初识python 第二章 变量 第三章 基础语句 文章目录 4.1认识字符串字符串特征 4.2字符串输出4.3字符串输入4.4下标4.5切片4.6常用操作方法查找修改修改大小写转换字符串对齐删除空白字符判断 4.1认识字符串 字符串是 Python 中最常用的数据类型。我们一般使…

7min 到 40s:SpringBoot 优化居然可以玩出这么多花样

背景 耗时问题排查 观察 SpringBoot 启动 run 方法 监控 Bean 注入耗时 优化方案 如何解决扫描路径过多&#xff1f; 如何解决 Bean 初始化高耗时&#xff1f; 新的问题 SpringBoot 自动化装配&#xff0c;让人防不胜防 使用 starter 机制&#xff0c;开箱即用 背景 公…

我的GIT练习TWO

目录 前言 GIT安装教程 Git作者 GIT优点 GIT缺点 为什么要使用 Git GIT练习TWO ​编辑 总结 前言 Git 是一个分布式版本控制及源代码管理工具;Git 可以为你的项目保存若干快照&#xff0c;以此来对整个项目进行版本管理 GIT安装教程 点击进入查看教程&#xff1a;点击进入…