Flume学习---3、自定义Interceptor、自定义Source、自定义Sink

news2024/12/23 18:06:29

1、自定义Interceptor

1、案例需求
使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。
2、需求分析
在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到 Flume 拓扑结构中的Multiplexing 结构,Multiplexing的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值。在该案例中,我们以端口数据模拟日志,以是否包含”zhm”模拟不同类型的日志,我们需要自定义 interceptor 区分数据中是否包含”zhm”,将其分别发往不同的分析系统(Channel)。
在这里插入图片描述
3、实现步骤
(1)创建一个Maven项目,并引入以下依赖

<dependency>
 <groupId>org.apache.flume</groupId>
 <artifactId>flume-ng-core</artifactId>
 <version>1.9.0</version>
</dependency>

(2)定义 CustomInterceptor 类并实现 Interceptor 接口。

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class TypeInterceptor implements Interceptor {
 //声明一个存放事件的集合
 private List<Event> addHeaderEvents;
 @Override
 public void initialize() {
 //初始化存放事件的集合
 addHeaderEvents = new ArrayList<>();
 }
 //单个事件拦截
 @Override
 public Event intercept(Event event) {
 //1.获取事件中的头信息
 Map<String, String> headers = event.getHeaders();
 //2.获取事件中的 body 信息
 String body = new String(event.getBody());
 //3.根据 body 中是否有"atguigu"来决定添加怎样的头信息
 if (body.contains("zhm")) {
 //4.添加头信息
 headers.put("type", "first");
 } else {
 //4.添加头信息
 headers.put("type", "second");
 }
 return event;
 }
 //批量事件拦截
 @Override
 public List<Event> intercept(List<Event> events) {
 //1.清空集合
 addHeaderEvents.clear();
 //2.遍历 events
 for (Event event : events) {
 //3.给每一个事件添加头信息
 addHeaderEvents.add(intercept(event));
 }
 //4.返回结果
 return addHeaderEvents;
 }
 @Override
 public void close() {
 }
 public static class Builder implements Interceptor.Builder {
 @Override
 public Interceptor build() {
 return new TypeInterceptor();
 }
 @Override
 public void configure(Context context) {
 }
 }
}

(3)打包,然后将jar包文件上传到Flume中的lib目录下
(4)编辑flume配置文件
为 hadoop102 上的 Flume1 配置 1 个 netcat source,1 个 sink group(2 个 avro sink),并配置相应的 ChannelSelector 和 interceptor。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = 
com.zhm.flume.interceptor.CustomInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.first = c1
a1.sources.r1.selector.mapping.second = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

为 hadoop103 上的 Flume4 配置一个 avro source 和一个 logger sink。

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

为 hadoop104 上的 Flume3 配置一个 avro source 和一个 logger sink。

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

(5)分别在 hadoop102,hadoop103,hadoop104 上启动 flume 进程,注意先后顺序。

## Hadoop103
bin/flume-ng agent -c conf/ -n a1 -f job/group4/flume-source-avro-sink-logger -Dflume.root.logger=INFO,console

##Hadoop104
bin/flume-ng agent -c conf/ -n a1 -f job/group4/flume-source-avro-sink-logger -Dflume.root.logger=INFO,console

##Hadoop102
bin/flume-ng agent -c conf/ -n a1 -f job/group4/flume1-source-netcat-sink-avro 

(6)在 hadoop102 使用 netcat 向 localhost:44444 发送字母和数字。
在这里插入图片描述

(7)观察 hadoop103 和 hadoop104 打印的日志。
在这里插入图片描述
在这里插入图片描述

2、自定义Source

1、介绍
Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling、directory、netcat、sequence generator、syslog、http、legacy。官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。
2、需求
使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文件中配置。
在这里插入图片描述
3、需求分析
在这里插入图片描述
4、案例实现
(1)导入 pom 依赖

<dependencies>
 <dependency>
 <groupId>org.apache.flume</groupId>
 <artifactId>flume-ng-core</artifactId>
 <version>1.9.0</version>
</dependency>

(2)编写代码

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.util.HashMap;
public class MySource extends AbstractSource implements 
Configurable, PollableSource {
 //定义配置文件将来要读取的字段
 private Long delay;
 private String field;
 //初始化配置信息
 @Override
 public void configure(Context context) {
 delay = context.getLong("delay");
 field = context.getString("field", "Hello!");
 }
 @Override
 public Status process() throws EventDeliveryException {
 try {
 //创建事件头信息
 HashMap<String, String> hearderMap = new HashMap<>();
 //创建事件
 SimpleEvent event = new SimpleEvent();
 //循环封装事件
 for (int i = 0; i < 5; i++) {
 //给事件设置头信息
 event.setHeaders(hearderMap);
 //给事件设置内容
 event.setBody((field + i).getBytes());
 //将事件写入 channel
 getChannelProcessor().processEvent(event);
 Thread.sleep(delay);
 }
 } catch (Exception e) {
 e.printStackTrace();
 return Status.BACKOFF;
 }
 return Status.READY;
 }
 @Override
 public long getBackOffSleepIncrement() {
 return 0;
 }
 @Override
 public long getMaxBackOffSleepInterval() {
 return 0;
 }
}

(3)将写好的代码打包,并放到 flume 的 lib 目录(/opt/module/flume)下。
(4)配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.zhm.MySource
a1.sources.r1.delay = 1000
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(5)开启任务

cd Flume目录下

 bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console

(6)结果展示
在这里插入图片描述

3、自定义Sink

1、介绍
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。
Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。
2、需求
使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。
在这里插入图片描述
3、编码

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySink extends AbstractSink implements Configurable 
{
 //创建 Logger 对象
 private static final Logger LOG = 
LoggerFactory.getLogger(AbstractSink.class);
 private String prefix;
 private String suffix;
 @Override
 public Status process() throws EventDeliveryException {
 //声明返回值状态信息
 Status status;
 //获取当前 Sink 绑定的 Channel
 Channel ch = getChannel();
 //获取事务
 Transaction txn = ch.getTransaction();
 //声明事件
 Event event;
 //开启事务
 txn.begin();
 //读取 Channel 中的事件,直到读取到事件结束循环
 while (true) {
 event = ch.take();
 if (event != null) {
 break;
 }
 }
 try {
 //处理事件(打印)
 LOG.info(prefix + new String(event.getBody()) + 
suffix);
 //事务提交
 txn.commit();
 status = Status.READY;
 } catch (Exception e) {
 //遇到异常,事务回滚
 txn.rollback();
 status = Status.BACKOFF;
 } finally {
 //关闭事务
 txn.close();
 }
 return status;
 }
 @Override
 public void configure(Context context) {
 //读取配置文件内容,有默认值
 prefix = context.getString("prefix", "hello:");
 //读取配置文件内容,无默认值
 suffix = context.getString("suffix");
 }
}

4、打包
将写好的代码打包,并放到 flume 的 lib 目录(/opt/module/flume)下。
5、配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
 尚硅谷大数据技术之 Flume 
—————————————————————————————
# Describe the sink
a1.sinks.k1.type = com.zhm.MySink
a1.sinks.k1.suffix = :zhm
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

7、开启任务

 bin/flume-ng agent -c conf/ -n a1 -f job/groupSink/mysink.conf  -Dflume.root.logger=INFO,console

8、结果展示
在这里插入图片描述
在这里插入图片描述

4、额外知识

1、Flume的事务机制
Flume的事务机制(类似数据库的事务机制):Flume使用两个独立的事务分别负责Source到Channel,以及从Channel到Sink的事件传递。
比如Spooling Directory Source为文件的每一行创建一个事件,一旦事务中所以事件全部传递到Channel且提交成功,那么事务将会回滚。且所以得事件都会保持到Channel中,等待重新传递。

2、Flume采集数据会丢失吗?
根据Flume的架构原理,Flume是不可能丢失数据的,其内部有完善的事务机制,Source到Channel是事务性,Channel到Sink是事务性的,因此这两个环节不会出现数据的丢失,唯一可能丢失数据的情况是Channel采用memoryChannel,agent宕机导致数据丢失,或者Channel储存数据已满,导致Source不再写入,未写入的数据丢失。

Flume不会丢失数据,但是可能造成数据的重复,例如数据已经成功由Sink发出,但是没有接受到响应,Sink会再次发生数据,此时可能会导致·数据的重复。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/636203.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【微信公众平台对接】有关【上传图文消息内的图片获取URL】调用示例

1、微信接口说明&#xff1a; 2、调用示例 /*** 上传图文消息内的图片获取URL** param image* return*/PostMapping("uploadImg")public String uploadImg(MultipartFile image) {return wechatOpenService.uploadImg(image);}/*** 上传图文消息内的图片获取URL* htt…

css魔法:伪元素content内容竟然可以用css函数!

&#x1f33b; 前言 CSS 伪元素用于设置元素指定部分的样式。伪元素中 ::before 和 ::after 是最常用的&#xff0c;它们分别用于在dom元素前/后插入内容&#xff0c;本文内容就是关于 ::before 和 ::after 的 content 内容的一些冷门用法展开的。 一般我们在使用伪元素时&…

基于Java+jsp+servlet的养老院管理系统设计和实现《收藏版》

基于Javajspservlet的养老院管理系统设计和实现《收藏版》 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、指导等,csdn特邀作者、专注于Java技术领域 作者主页 超级帅帅吴 Java项目精品实战案例《500套》 欢迎点赞 收藏 ⭐留言 文末获取源码联系方…

大麦生成链接 大麦生成订单截图 抢票成功截图

一键生成购票链接 一键生成订单截图 下载程序&#xff1a;https://pan.baidu.com/s/16lN3gvRIZm7pqhvVMYYecQ?pwd6zw3

微服务工程搭建过程中的注意点

1、父工程pom.xml文件 1&#xff1a;父工程的maven坐标&#xff1b; 2&#xff1a;packaging使用pom&#xff1b; 原因&#xff1a;在Spring Cloud微服务工程中&#xff0c;通常会采用多模块的方式进行开发&#xff0c;父工程的pom文件中的packaging标签设置为pom&#xff0c;是…

操作系统 | 知识梳理 | 复习(上)

目录 &#x1f4da;操作系统概述 &#x1f407;操作系统中的抽象概念 &#x1f4da;准备知识 &#x1f407;中断输入输出 &#x1f407;软件中断 &#x1f407;处理器特权级 &#x1f407;操作系统的结构 &#x1f4da;程序的结构 &#x1f407;运行时视图简介 &…

SQL语句中EXISTS的详细用法大全

SQL语句中EXISTS的详细用法大全 前言一、建表1.在MySQL数据库建表语句2.在ORACLE数据库建表语句 二、在SELECT语句中使用EXISTS1.在SQL中使用EXISTS2.在SQL中使用NOT EXISTS3.在SQL中使用多个NOT EXISTS4.在SQL中使用多个EXISTS5.在SQL中使用NOT EXISTS和EXISTS 三、在DELETE语…

jmeter非gui运行,jtl生成了,但是html报告没有生成

jmeter非gui运行&#xff0c;jtl生成了&#xff0c;但是html报告没有生成&#xff0c;查看log&#xff0c;内容如下&#xff1a; 22:45:00,913 ERROR o.a.j.JMeter: Error generating dashboard: org.apache.jmeter.report.dashboard.GenerationException: Error while proces…

谷歌的passkey是什么?

谷歌的passkey是什么&#xff1f; 谷歌正在研发一种名为“Passkey”的新技术&#xff0c;它将用于用户身份验证。Passkey不同于传统的密码&#xff0c;它采用了硬件加密密钥&#xff08;如安全密钥或生物识别方式&#xff09;以及双因素身份验证等技术&#xff0c;可以更好地保…

微信:把元宇宙装进小程序

作为月活13.09亿的国民级应用&#xff0c;微信的每次小升级都很容易形成现象级。2023开年&#xff0c;微信放大招&#xff0c;试图把元宇宙装进小程序。 微信小程序 XR-FRAME 不久前&#xff0c;微信官方在开放社区贴出了“XR-FRAME”开发指南&#xff0c;这是一套为小程序定制…

RocketMQ 快速入门教程,手把手教教你干代码

目录 RocketMQ定义为什么要用消息中间件&#xff1f;应用解耦流量削峰数据分发 RocketMQ各部分角色介绍NameServer主机(Broker)生产者(Producer)消费者(Consumer)消息(Message) 使用RocketMQ的核心概念主题(Topic)消息队列(Message Queue)分组(Group)标签(Tag)偏移量(Offset) 普…

企业级信息系统开发讲课笔记4.11 Spring Boot中Spring MVC的整合支持

文章目录 零、学习目标一、Spring MVC 自动配置&#xff08;一&#xff09;自动配置概述&#xff08;二&#xff09;Spring Boot整合Spring MVC 的自动化配置功能特性 二、Spring MVC 功能拓展实现&#xff08;一&#xff09;创建Spring Boot项目 - SpringMvcDemo2021&#xff…

老胡的周刊(第094期)

老胡的信息周刊[1]&#xff0c;记录这周我看到的有价值的信息&#xff0c;主要针对计算机领域&#xff0c;内容主题极大程度被我个人喜好主导。这个项目核心目的在于记录让自己有印象的信息做一个留存以及共享。 &#x1f3af; 项目 qrbtf[2] 艺术二维码生成器&#xff1a; qrb…

某学院校园网站的设计与实现(论文+源码)_kaic

摘 要 使用旧方法对冀中工程技师学院网站的信息进行系统化管理已经不再让人们信赖了&#xff0c;把现在的网络信息技术运用在冀中工程技师学院网站的管理上面可以解决许多信息管理上面的难题&#xff0c;比如处理数据时间很长&#xff0c;数据存在错误不能及时纠正等问题。这次…

ajax--XML、AJAX简介、express框架使用、AJAX操作的基本步骤

一、XML&#xff08;可扩展标记语言&#xff09; XML与HTML类似&#xff0c;不同的是HTML中都是预定义标签&#xff0c;而XML中没有预定义标签&#xff0c;全都是自定义标签&#xff0c;用来表示一些数据。 比如有一个学生数据&#xff1a;name“孙悟空”;age18;gender“男”&a…

数字图像处理期末复习习题 SCUEC part3 形态学图像处理专项

1.关于膨胀&#xff0c;腐蚀&#xff0c;开操作&#xff0c;闭操作的证明题 2.腐蚀和膨胀的定义 3.开操作与闭操作的定义 4.击中击不中变换

ubuntu驱动重装

卸载 进入命令行模式 sudo NVIDIA-Linux-x86_64-495.46.run --uninstall安装 进入命令行模式 sudo ./NVIDIA-Linux-x86_64-460.67.run –no-opengl-files –no-x-check –no-nouveau-check选continue installation。 ② 选 NO。 选NO。

源氏木语获得多少个奖项?答案 2023年天猫618淘宝大赢家今日答案与618天猫超级红包怎么领取?

2023年6月12日天猫618淘宝大赢家今日答案 问题&#xff1a;源氏木语获得多少个奖项&#xff1f; 答案&#xff1a;15 2023年淘宝天猫618超级红包怎么领取&#xff1f; 从2023年5月29日开始持续到6月20日&#xff0c;每天都可以打开手机淘宝或天猫&#xff0c;在首页搜索框内…

Open Inventor 2023.1.1 Crack 2022-06-08

Open Inventor 是一组高性能的三维软件开发包&#xff08;SDK&#xff09;&#xff0c;用于医学、计算机辅助设计与工程、石油、天然气和采矿业这些领域中的专业应用。 其面向对象的应用程序编程接口、可拓展架构以及一整套先进庞大的组件为软件开发者提供一个完美的高级平台&…

教育系统和功能设计

慧享教育系统和功能设计 要求&#xff1a; 1. 在需求分析的基础上&#xff0c;确定项目详细功能&#xff1b; 2. 确定每个功能模块的子功能及详细内容并描述&#xff1b; 3. 完成事务设计和应用设计。 操作步骤&#xff1a; 1.系统结构设计及子系统划分 划分系…