Flume系列:Flume 自定义Interceptor拦截器

news2024/11/27 6:43:05

目录

Apache Hadoop生态-目录汇总-持续更新

1:Interceptor拦截器的使用场景

 2:Interceptor拦截器在采集流程中的位置

3:自定义Interceptor拦截器

pom.xml

拦截器java代码

打包上传

4:使用自定义的拦截器

方式一:把jar包放到flume-1.9.0/lib下

方式二:把jar包放到项目下


Apache Hadoop生态-目录汇总-持续更新

系统环境:centos7

Java环境:Java8

官方文档:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-interceptors

1:Interceptor拦截器的使用场景

        1)对source端的数据进行(简单)清洗,去除无用,不符合要求的数据

        2) 对source端的数据校正,比如使用数据时间 代替 采集时间

        3) 对source端的数据分类,不同类型的日志可能发送到不同的分析系统。

 2:Interceptor拦截器在采集流程中的位置

 

 

 Multiplexing 结构根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel,需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值

3:自定义Interceptor拦截器

pom.xml

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <flume.version>1.9.0</flume.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>${flume.version}</version>
        <scope>provided</scope>   <!--provided 不打入jar包, 服务器上有-->
    </dependency>

    <!--fastjson用于解析json的-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

拦截器java代码

实现功能:ETL数据清洗,判断数据是否完整, 解决零点漂移的问题
package com.wester.flume.interceptor2;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.wester.flume.interceptor.JSONUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * ETL数据清洗,判断数据是否完整, 解决零点漂移的问题
 * @author wester
 * @create 2022-12-05 14:26
 */
public class ETLInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    /**
     * 处理flume单条数据
     * 取数据,进行校验,是否是json, 是否缺少数据
     * @param event
     * @return
     */
    @Override
    public Event intercept(Event event) {
        // 获取头信息
        Map<String, String> headers = event.getHeaders();

        // 获取内容信息
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);

        // 判断内容是否是标准json
        if (JSONUtils.isJSONValidate(log)){
            // 解决零点漂移的问题,用数据里的时间覆盖headers里的timestamp, 主要是sink hdfs
            JSONObject jsonObject = JSON.parseObject(log);
            String ts = jsonObject.getString("ts");
            // 将ts付给headers里的timestamp
            headers.put("timestamp", ts);
            return event;
        }
        return null;
    }

    // 处理flume多条数据
    @Override
    public List<Event> intercept(List<Event> list) {
        // 循环每一条,如果发现不合格的数据就删掉
        Iterator<Event> iterator = list.iterator();
        while (iterator.hasNext()){
            Event next = iterator.next();
            if (intercept(next) == null){
                // 发现不合格的数据就删掉
                iterator.remove();
            }
        }
        return list;
    }

    @Override
    public void close() {}

    public static class Builder implements Interceptor.Builder{
        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

打包上传

打包得到jar包:project_v4_flume.jar

4:使用自定义的拦截器

方式一:把jar包放到flume-1.9.0/lib下

1)打包放到/usr/local/flume-1.9.0/lib目录下
2)复制拦截器全类名
com.wester.flume.interceptor2.ETLInterceptor

3)定义flume采集job时直接使用
# 定义source
file_flume_kafka.sources.r1.type = TAILDIR
....
file_flume_kafka.sources.r1.interceptors = i1
  # 配置拦截器,全类名+$Builder
file_flume_kafka.sources.r1.interceptors.i1.type = com.wester.flume.interceptor2.ETLInterceptor$Builder

4) 启动flume-ng采集
因为把jar包上传到flume的lib下,所以flume-ng启动时不需要指定
flume-ng agent --name file_flume_kafka --conf-file /flume2.conf -Dflume.root.logger=INFO,console

方式二:把jar包放到项目下

1)打包放到项目flume_job/jar目录下
2)复制拦截器全类名
com.wester.flume.interceptor2.ETLInterceptor

3)定义flume采集job时直接使用
# 定义source
file_flume_kafka.sources.r1.type = TAILDIR
....
file_flume_kafka.sources.r1.interceptors = i1
  # 配置拦截器,全类名+$Builder
file_flume_kafka.sources.r1.interceptors.i1.type = com.wester.flume.interceptor2.ETLInterceptor$Builder

4) 启动flume-ng采集
jar不在flume的lib下,所以flume-ng启动时需要手动指定
flume-ng agent --name file_flume_kafka --conf-file /flume2.conf -C flume_jobs/jar/project_v4_flume.jar -Dflume.root.logger=INFO,console

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/573380.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Springboot +spring security,自定义认证和授权异常处理器

一.简介 在Spring Security中异常分为两种&#xff1a; AuthenticationException 认证异常AccessDeniedException 权限异常 我们先给大家演示下如何自定义异常处理器&#xff0c;然后再结合源码帮助大家进行分析 二.创建项目 如何创建一个SpringSecurity项目&#xff0c;前…

分布式锁和事务关系的细节

使用redssion在redis上以及结合自定义注解利用spring的环绕切面来实现分布式锁功能 代码示例 controller、service层 RequestMapping("insertNumber/{number}/{id}") public boolean insertNumber(PathVariable Long number,PathVariable Long id){return testSer…

rust 中protobuf生成与使用

首先创建一个项目proto 进入到这个文件夹中 创建我们的proto文件 初始化的项目结构是这个样子的 新建一个hello.proto文件内容如下 syntax "proto3";package hello;service Greeter {rpc SayHello (HelloRequest) returns (HelloReply) {} }message HelloRequest …

干货 | 师兄手把手教你如何踏上科研道路

Hello&#xff0c;大家好&#xff01; 这里是壹脑云科研圈&#xff0c;我是喵君姐姐&#xff5e; 今天&#xff0c;邀请到鲁小白&#xff0c;给大家分享一下他踏上科研道路的心路历程。 大家好&#xff0c;我是鲁小白&#xff0c;我真正进入科研的时间&#xff0c;研究生3年再…

【C++】类和对象——类的引入、类的访问限定符、类的作用域、类的实例化、类的储存、this指针的引出和特性

文章目录 1.类的引入2.类的访问限定符3.类的作用域4.类的实例化5.类的储存6.this指针6.1this指针的引出6.2this指针的特性 1.类的引入 C是在C的基础上加以扩展。 在C语言中&#xff0c;我们想要让一个类型含有多种成员变量&#xff0c;我们使用结构体&#xff1b;而在C中我们可…

Doris节点扩容及数据表

扩容和缩容 上篇文章简单讲了doris的安装&#xff0c;本章分享的是doris中fe和be节点的扩容缩容以及doris的数据表1、FE 扩容和缩容 使用 MySQL 登录客户端后&#xff0c;可以使用 sql 命令查看 FE 状态&#xff0c;目前就一台 FE mysql -h linux -P 9030 -uroot -p mysql&…

python+django乡村居民数据的可视化平台

本论文主要论述了如何使用Django框架开发一个乡村振兴数据的可视化平台 &#xff0c;本系统将严格按照软件开发流程进行各个阶段的工作&#xff0c;采用B/S架构&#xff0c;面向对象编程思想进行项目开发。在引言中&#xff0c;作者将论述乡村振兴数据的可视化平台的当前背景以…

拼多多二面,原来是我对自动化测试的理解太浅了

如果你入职一家新的公司&#xff0c;领导让你开展自动化测试&#xff0c;作为一个新人&#xff0c;你肯定会手忙脚乱&#xff0c;你会如何落地自动化测试呢&#xff1f; 01 什么是自动化 有很多人做了很长时间的自动化但却连自动化的概念都不清楚&#xff0c;这样的人也是很悲…

Android之 MVC到MVVM架构发展和封装

一 简介 1.1 软件架构发展趋势是解耦&#xff0c;即分离数据层和视图层&#xff0c;使得数据层专注于业务的数据和逻辑处理。从而提高代码的可读可编辑效率&#xff0c;提高团队协作能力&#xff0c;项目的生产能力&#xff0c;降低后期维护成本。 1.2 Android架构发展MVC -…

计算机组成原理实验四 微程序控制器实验报告

我班算是几乎最后一个做实验的班级了&#xff0c;报告参考了一些朋友提供的数据加上一些自己的主观拙见&#xff0c;本人水平有限加之制作仓促难免有错误&#xff0c;望大家批评指正。 4.1 微程序控制器实验 一、实验目的 (1) 掌握微程序控制器的组成原理。 (2) 掌握微程序的…

【蓝桥杯计算思维题】少儿编程 蓝桥杯青少组计算思维真题及详细解析第5套

少儿编程 蓝桥杯青少组计算思维真题及详细解析第5套 1、北京冬奥会经历 17( ),中国体育代表团收获的金牌数和奖牌数均创历史新高 A、年 B、月 C、天 D、小时 答案:C 考点分析:主要考查小朋友们对时事的了解,北京冬奥会总共经历了17天,所以答案C 2、下面图形的周长是…

Python系列模块之标准库json详解

感谢点赞和关注 &#xff0c;每天进步一点点&#xff01;加油&#xff01; 目录 一、Json介绍 二、JSON 函数 2.1 json.dumps 2.2 json.loads 2.3 实战案例&#xff1a;钉钉消息发送 一、Json介绍 JSON(JavaScript Object Notation)是一种轻量级的数据交换格式。它使得人们…

2023年21个最佳的Ruby测试框架

作者 | Veethee Dixit 测试人员总是在寻找最好的自动化测试框架&#xff0c;它能提供丰富的功能&#xff0c;并且语法简单、兼容性好、执行速度快。如果你选择将Ruby与Selenium结合起来进行web测试&#xff0c;那么可能需要搜索基于Ruby的测试框架进行web应用程序测试。 Ruby…

【Python】函数式编程第二弹

知识目录 一、写在前面✨二、最小公倍数三、移除数字四、总结撒花&#x1f60a; 一、写在前面✨ 大家好&#xff01;我是初心&#xff0c;希望我们一路走来能坚守初心&#xff01; 今天跟大家分享的文章是 Python函数式编程第二弹&#xff0c;再次以两个简单的例子带大家更好…

selenium UI自动化中文件上传的两种方式

前言 文件上传是自动化中很常见的一个功能&#xff0c;那么对于文件上传你又有多少了解呢&#xff1f;请往下看 1、被测产品中文件上传的功能非常普遍&#xff0c;一般情况下需要将准备好的文件放在预定的路径下&#xff0c;然后在自动化测试的脚本中&#xff0c;去预置的路径…

国内可以免费使用的GPT

一、wetab新标签页 教程&#xff1a;https://diwlwltzssn.feishu.cn/docx/MnHhdvxATomBnMxfas2cm8wWnVd 装GPT界面&#xff1a;https://microsoftedge.microsoft.com/addons/detail/wetab%E5%85%8D%E8%B4%B9chatgpt%E6%96%B0%E6%A0%87%E7%AD%BE%E9%A1%B5/bpelnogcookhocnaokfp…

TeX Live和TeX studio安装

最近想要研究一下Letex怎么写论文&#xff0c;然后就查阅资料了解了一下&#xff0c;先安装上两个软件&#xff0c;怎么用在研究研究&#xff0c;这里记录一下软件安装过程&#xff0c;方便以后查阅。 TeX Live和TeX studio安装 Latex介绍TexLive安装下载TexLive的安装包安装Te…

C++知识第三篇之继承

C继承 继承是面向对象编程的重要特征&#xff0c;是对类设计层次的复用 文章目录 C继承一.介绍1.继承定义2.继承方式3.class与struct 二.作用域1.成员变量2.成员函数 三.赋值转换1.给基类对象赋值2.给基类对象指针赋值 四.派生类的默认函数五. 其他1.友元2.静态 六.继承1.单继承…

Android车载学习笔记1——车载整体系统简介

一、汽车操作系统 汽车操作系统包括安全车载操作系统、智能驾驶操作系统和智能座舱操作系统。 1. 安全车载操作系统 安全车载操作系统主要面向经典车辆控制领域&#xff0c;如动力系统、底盘系统和车身系统等&#xff0c;该类操作系统对实时性和安全性要求极高&#xff0c;生态…

VCSA 和ESXi 6.7.0版本升级

1. VCSA升级步骤 1&#xff09;指定升级包的位置 software-packages stage --iso (如果是从vmware下载补丁&#xff0c;使用CD/DVD来映射ISO映像) 或 software-packages stage --url https://vapp-updates.vmware.com/vai-catalog/valm/vmw/8d167796-34d5-4899-be0a-6daade400…