Idea中flume的Interceptor的编写教程

news2025/3/15 4:23:31

1.新建-项目-新建项目

注意位置是将来打包文件存放的位置,即我们打包好的文件在这/export/data个目录下寻找

2. 在maven项目中导入依赖

Pom.xml文件中写入

<dependencies>

        <dependency>

            <groupId>org.apache.flume</groupId>

            <artifactId>flume-ng-core</artifactId>

            <version>1.9.0</version>

        </dependency>

    </dependencies>

3.创建包(scr-main-java右键-新建-软件包)

4.创建Java类(右键包名-新建-java类)

5. 继承(implements)flume 的拦截器接口

//键入implements Interceptor{} 光标定位到Interceptor alt + enter键选择导入类导入flume的Interceptor即可 import org.apache.flume.interceptor.Interceptor;

    //此时会报错,点击红色灯泡,选择 实现方法 就会在下文写出需要Override的四个抽象类

6.实现方法

public class MyInterceptor implements Interceptor {

    @Override

    //初始化方法

    public void initialize() {


    }

//单个事件拦截

//需求:在event的头部信息中添加标记

 //提供给channel selector 选择发送给不同的channel



    @Override

public Event intercept(Event event)

       //Map也需要alt + enter 导入

        Map<String, String> headers = event.getHeaders();

        //输入even.getHeaders().var回车即可自行填充等号前面的变量信息

        String log = new String(event.getBody());

        //envent.getBody().var自行判断变量类型为byte,为方便使用改为String类型

        // 键入new String(envent.getBody()).var回车,然后根据需要自行修改变量名

        //判断log开头的第一个字符,字母则发到channel1,数字则发到channel2

        char c = log.charAt(0);

        //log.charAt(0).var回车即可自行填充等号前面的变量信息

        if(c >= '0' && c <= '9'){

            headers.put("type","number");

        }else if ((c >= 'A' && c<= 'Z') || (c >= 'a' && c <= 'z')){

            // 注意字符串类型要使用>=需要用单引号而不能用双引号

            headers.put("type","letter");

        }

        //因为头部信息属性是一个引用数据类型 直接修改对象即可,也可以不调用以下的set方法   

        event.setHeaders(headers);

        //返回event

        return event;

    }

    //批量事件拦截(处理多个event,系统调用这个方法)

    @Override

    public List<Event> intercept(List<Event> list) {



        for (Event event : list){

            intercept(event);

        }

        return list;

    }



//重写静态内部类Builder

    @Override

    public void close() {


}

public static class  Builder implements Interceptor.Builder{

       //创建一个拦截器对象

        @Override

        public Interceptor build() {

            return new MyInterceptor();

        }

       //配置方法

        @Override

        public void configure(Context context) {


        }

    }

}

7.打包(idea右侧菜单栏maven-生命周期-package)

打包完成在idea左侧菜单栏 target 中可以看到我们的包

8.将建好的包复制到flume家目录下的lib中即可使用

cp /export/data/flume-interceptor-demo/target/flume-interceptor-demo-1.0-SNAPSHOT.jar $FLUME_HOME/lib

9.测试

 9.1 编辑 flume 配置文件
       vim flume1.conf

# agent

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = node1

a1.sources.r1.port = 44444

# channel selector: multiplexing 多路复用 ;默认为replicating 复制

a1.sources.r1.selector.type = multiplexing

# 填写相应inerceptor的header上的key

a1.sources.r1.selector.header = type

# 分配不同value发送到的channel,number到c2,letter到 c1

a1.sources.r1.selector.mapping.number = c2

a1.sources.r1.selector.mapping.letter = c1

#如果匹配不上默认选择的channel

a1.sources.r1.selector.default = c2

#interceptor

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = com.ljr.flume.MyInterceptor$Builder

# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = node1

a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro

a1.sinks.k2.hostname = node1

a1.sinks.k2.port = 4546

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1 c2

# 接收c1中的数据

a1.sinks.k1.channel = c1

# 接收c2中的数据

a1.sinks.k2.channel = c2

   vim flume2.conf

a2.sources = r2

a2.sinks = k2

a2.channels = c2

# Describe/configure the source

a2.sources.r2.type = avro

a2.sources.r2.bind = node1

# flume1 中sink的输出端口

a2.sources.r2.port = 4545

# Describe the sink

a2.sinks.k2.type = logger

# Use a channel which buffers events in memory

a2.channels.c2.type = memory

a2.channels.c2.capacity = 1000

a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel

a2.sources.r2.channels = c2

a2.sinks.k2.channel = c2

vim flume3.conf

a3.sources = r3

a3.sinks = k3

a3.channels = c3

# Describe/configure the source

a3.sources.r3.type = avro

a3.sources.r3.bind = node1

# flume1 中sink的输出端口

a3.sources.r3.port = 4546

# Describe the sink

a3.sinks.k3.type = logger

# Use a channel which buffers events in memory

a3.channels.c3.type = memory

a3.channels.c3.capacity = 1000

a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel

a3.sources.r3.channels = c3

a3.sinks.k3.channel = c3

9.2测试

       打开四个窗口,前三个分别运行flume1.conf、flume2.conf、flume3.conf 配置的进程

第四个窗口启用necat,输入内容进行测试

flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume1.conf -n a1

flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume2.conf -n a2

flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume3.conf -n a3

nc nc node1 44444  (flume1.conf中 source 填的主机名或IP地址 和端口号)

第一个窗口报错 ConnectException: 拒绝连接 可先忽略,运行二、三窗口后即可连接

在窗口4中输入数字、字母、符号

分别在窗口二看到输出字母,窗口三输出数字和符号

恭喜,Interceptor起作用!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1695262.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

渲染管线——应用阶段

知识必备——CPU和GPU 应用阶段都做了什么 应用阶段为渲染准备了什么 1.把不可见的数据剔除 2.准备好模型相关数据&#xff08;顶点、法线、切线、贴图、着色器等等&#xff09; 3.将数据加载到显存中 4.设置渲染状态&#xff08;设置网格需要使用哪个着色器、材质、光源属性等…

【NumPy】NumPy性能优化与内存管理:解锁高效编程的高级策略

&#x1f9d1; 博主简介&#xff1a;阿里巴巴嵌入式技术专家&#xff0c;深耕嵌入式人工智能领域&#xff0c;具备多年的嵌入式硬件产品研发管理经验。 &#x1f4d2; 博客介绍&#xff1a;分享嵌入式开发领域的相关知识、经验、思考和感悟&#xff0c;欢迎关注。提供嵌入式方向…

《Ai学习笔记》自然语言处理 (Natural Language Processing):常见机器阅读理解模型(上)02

Glove 词向量&#xff1a; 在机器理解中的词的表示&#xff1a; 词袋&#xff08;bow,bag of words&#xff09; one-hot 词向量 word2vec glove 目的&#xff1a;将一个词转换成一个向量 Word2vec 是一种用于生成词向量的工具包&#xff0c;由Google在2013年开源推出…

HMI设计:再谈上位机与下位机,附海量案例图

上期回顾&#xff1a;HMI界面之&#xff1a;上位机界面设计&#xff0c;一文扫盲 一、上位机负责控制和决策&#xff0c;下位机负责采集和执行 上位机和下位机是两个概念&#xff0c;通常用于描述计算机系统中不同层次的设备或组件。 上位机&#xff08;Host Computer&#x…

vue3 vite动态根据字符串加载组件

1 原理 import.meta.glob() 其实不仅能接收一个字符串&#xff0c;还可以接收一个字符串数组&#xff0c;就是匹配多个位置 let RouterModules import.meta.glob(["/src/view/*/*.vue", "/src/view/*.vue"]);这样我们就拿到了相对路劲的组件对象&#xf…

【学习笔记】Windows GDI绘图(五)图形路径GraphicsPath详解(上)

文章目录 图形路径GraphicsPath填充模式FillMode构造函数GraphicsPath()GraphicsPath(FillMode)GraphicsPath(Point[],Byte[])和GraphicsPath(PointF[], Byte[])GraphicsPath(Point[], Byte[], FillMode)和GraphicsPath(PointF[], Byte[], FillMode)PathPointType 属性FillMode…

最新版npm详解

如&#xff1a;npm中搜索 jQuery image.png image.png 接地气的描述&#xff1a;npm 类似于如下各大手机应用市场 image.png image.png 查看本地 node 和 npm 是否安装成功 image.png image.png 或 npm install -g npm image.png image.png image.png image.png image.…

Spring Boot集成Picocli快速入门Demo

1.什么是Picocli&#xff1f; Picocli是一个单文件命令行解析框架&#xff0c;它允许您创建命令行应用而几乎不需要代码。使用 Option 或 Parameters 在您的应用中注释字段&#xff0c;Picocli将分别使用命令行选项和位置参数填充这些字段。使用Picocli来编写一个功能强大的命…

16.线性回归代码实现

线性回归的实操与理解 介绍 线性回归是一种广泛应用的统计方法&#xff0c;用于建模一个或多个自变量&#xff08;特征&#xff09;与因变量&#xff08;目标&#xff09;之间的线性关系。在机器学习和数据科学中&#xff0c;线性回归是许多入门者的第一个模型&#xff0c;它…

蓝桥杯Web开发【大学组:省赛】2022年真题

1.水果拼盘 目前 CSS3 中新增的 Flex 弹性布局已经成为前端页面布局的首选方案&#xff0c;本题可以使用 Flex 属性快速完成布局。 1.1 题目问题 建议使用 flex 相关属性完成 css/style.css 中的 TODO 部分。 禁止修改圆盘的位置和图片的大小。相同颜色的水果放在相同颜色的…

根据Depth Quality Tool的z轴误差值确认相机是否需要进行相机内参校准

下载Depth Quality Tool深度质量验证工具 网盘链接【RealSense SDK v2.55.1】 链接&#xff1a;https://pan.baidu.com/s/1NrlbwNDBUL8wpWfVwbpMwA?pwd2jl0 提取码&#xff1a;2jl0 打开Depth Quality Tool深度质量验证工具 找一面墙作为目标&#xff0c;将摄像头水平对准墙…

ISCC 2024|Misc

FunZip ISCC{xoMjL8NuYRRb} Number_is_the_key ISCC{Sanoyq6qGIPF} 精装四合一 四张图片尾部都存在多余数据&#xff0c;把多余数据分别提取出来保存成文件&#xff0c;未发现规律。根据提示&#xff0c;预计需要将四部分多余数据进行合并。提取四个部分前16个字节&#xff0…

Golang并发编程-协程goroutine的信道(channel)

文章目录 前言一、信道的定义与使用信道的声明信道的使用 二、信道的容量与长度三、缓冲信道与无缓冲信道缓冲信道无缓冲信道 四、信道的初体验信道关闭的广播机制 总结 前言 Goroutine的开发&#xff0c;当遇到生产者消费者场景的时候&#xff0c;离不开 channel&#xff08;…

C语言 | Leetcode C语言题解之第97题交错字符串

题目&#xff1a; 题解&#xff1a; bool isInterleave(char* s1, char* s2, char* s3) {int n strlen(s1), m strlen(s2), t strlen(s3);int f[m 1];memset(f, 0, sizeof(f));if (n m ! t) {return false;}f[0] true;for (int i 0; i < n; i) {for (int j 0; j &l…

Java进阶学习笔记12——final、常量

final关键字&#xff1a; final是最终的意思。可以修饰类、方法、变量。 修饰类&#xff1a;该类就被称为最终类&#xff0c;特点是不能被继承了。 修饰方法&#xff1a;该方法是最终方法&#xff0c;特点是不能被重写了。 修饰变量&#xff1a;该变量只能被赋值一次。 有些…

mybatis-plus 优雅的写service接口中方法(3)

多表联查 上文讲过了自定义sql &#xff0c;和wrapper的使用&#xff0c;但是我们可以发现 我们查询的都是数据库中的一张表&#xff0c;那么怎么进行多表联查呢&#xff0c;当然也是用自定义sql来进行实现 比如说 查询 id 为 1 2 4 的用户 并且 地址在北京 的 用户名称 普…

告诉老板,AI大模型应该这样部署!

导语 随着大语言模型创新的快速步伐&#xff0c;企业正在积极探索用例并将其第一个生成式人工智能应用程序部署到生产中。 随着今年LLM或LLMOps的部署正式开始&#xff0c;企业根据自己的人才、工具和资本投资结合了四种类型的LLM部署方法。请记住&#xff0c;随着新的 LLM 优…

第199题|关于函数的周期性问题|函数强化训练(六)|武忠祥老师每日一题 5月24日

解题思路&#xff1a;解这道题我们要用到下面这个结论 f(x)连续&#xff0c;以T为周期时&#xff0c;原函数以T为周期的充分必要条件是&#xff1a; (A) sin x显然是以π为周期的&#xff0c;我们可以看到并不等于0,根据结论&#xff0c;A的原函数显然不是周期函数。 (B) 的…

Linux|如何在 awk 中使用流控制语句

引言 当您从 Awk 系列一开始回顾我们迄今为止介绍的所有 Awk 示例时&#xff0c;您会注意到各个示例中的所有命令都是按顺序执行的&#xff0c;即一个接一个。但在某些情况下&#xff0c;我们可能希望根据某些条件运行一些文本过滤操作&#xff0c;这就是流程控制语句的方法。 …

Windows VS2022 C语言使用 sqlite3.dll 访问 SQLite数据库

今天接到一个学生C语言访问SQLite数据库的的需求: 第一步,SQLite Download Page下载 sqlite3.dll 库 下载解压,发现只有两个文件: 于是使用x64 Native Tools Command Prompt 终端 生成 sqlite3.lib 和 sqlite3.exp文件 LIB -def:sqlite3.def -out:sqlite3.lib -machin…