Flume基础知识(十一):Flume自定义接口

news2025/1/11 10:56:11

1)案例需求

使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。

2)需求分析

在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要 发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构,Multiplexing 的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予 不同的值。

在该案例中,我们以端口数据模拟日志,以是否包含”atguigu”模拟不同类型的日志, 我们需要自定义 interceptor 区分数据中是否包含”atguigu”,将其分别发往不同的分析 系统(Channel)。

3)实现步骤

(1)创建一个 maven 项目,并引入以下依赖。

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>

(2)定义 CustomInterceptor 类并实现 Interceptor 接口。

package com.atguigu.interceptor;
​
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
​
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
​
/**
 * @author:左泽林
 * @date:日期:2022-01-17-时间:14:47
 * @message:
 */
public class TypeInterceptor implements Interceptor {
​
    /*声明一个集合,用于存放拦截器处理后的事件*/
    private List<Event> addHeaderEvents ;
​
    //初始化
    public void initialize() {
        /*初始化*/
        addHeaderEvents = new ArrayList<Event>();
    }
​
    //处理单个事件
    public Event intercept(Event event) {
​
        //1.获取header & body
        Map<String, String> headers = event.getHeaders();
        String body = new String(event.getBody());
​
        //2.根据body中是否包含“atguigu”添加不同的头信息
        if (body.contains("atguigu")){
            headers.put("type","atguigu");
        }else{
            headers.put("type","other");
        }
​
        /*返回数据*/
        return event;
    }
​
    /*批处理事件*/
    public List<Event> intercept(List<Event> events) {
​
        //1. 清空集合
        addHeaderEvents.clear();
​
        /*遍历events*/
        for (Event event : events) {
            addHeaderEvents.add(intercept(event));
        }
​
        /*返回数据*/
        return events;
    }
    
    public void close() {
​
    }
    
    public static class Builder implements Interceptor.Builder{
​
        public Interceptor build() {
            return new TypeInterceptor();
        }
​
        public void configure(Context context) {
            
        }
    }
}
打包,上传到服务器Flume的lib下,Flume会在启动时调用

(3)编辑 flume 配置文件

为 hadoop100 上的 Flume1 配置 1 个 netcat source,1 个 sink group(2 个 avro sink), 并配置相应的 ChannelSelector 和 interceptor。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
#拦截器
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
#映射需要对应代码中的拦截类型,这里就是atguigu、other
a1.sources.r1.selector.mapping.atguigu = c1
a1.sources.r1.selector.mapping.other = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop101
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4242
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

为 hadoop103 上的 Flume4 配置一个 avro source 和一个 logger sink。

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

为 hadoop102 上的 Flume3 配置一个 avro source 和一个 logger sink。

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

(4)分别在 hadoop100,hadoop101,hadoop102 上启动 flume 进程,注意先后顺序。

[root@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume3.conf -Dflume.root.logger=INFO,console
[root@hadoop101 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume2.conf -Dflume.root.logger=INFO,console
[root@hadoop100 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume1.conf

(5)在 hadoop100 使用 netcat 向 localhost:44444 发送字母和数字。

(6)观察 hadoop101 和 hadoop102 打印的日志。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1363774.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在群晖NAS上搭建私有部署笔记软件——Blossom

一、Blossom 简介 Blossom 是一个需要私有部署的笔记软件&#xff0c;虽然本身定位是一个云端软件&#xff0c;但你仍然可以在本地部署&#xff0c;数据和图片都将保存在本地&#xff0c;不依赖任何的图床或者对象存储。 Blossom | Blossom (wangyunf.com)https://www.wangyun…

Zoho SalesIQ:提高品牌在社交媒体上参与度的实用指南

在当今快节奏的数字世界中&#xff0c;品牌参与度变得比以往任何时候都更加重要。社交媒体在企业与客户互动方面发挥着至关重要的作用&#xff0c;了解如何很好地利用社交媒体来增强品牌参与度至关重要。 正如我们在之前的博客中所了解到的&#xff0c;品牌参与是指在品牌与其…

企业数据库安全管理规范

1.目的 为规范数据库系统安全使用活动&#xff0c;降低因使用不当而带来的安全风险&#xff0c;保障数据库系统及相关应用系统的安全&#xff0c;特制定本数据库安全管理规范。 2.适用范围 本规范中所定义的数据管理内容&#xff0c;特指存放在信息系统数据库中的数据。 本…

原文件名自动编号方法:简单易懂的文件重命名规则

当处理大量文件时&#xff0c;例如图片、文档或其他类型的文件&#xff0c;经常要给它们重新命名&#xff0c;以便于管理和查找。一种常见的方法是使用自动编号&#xff0c;这样可以从001开始&#xff0c;一直编号到最后的文件。这种方法既简单又高效&#xff0c;且易于理解。下…

【读书】《白帽子讲web安全》个人笔记Ⅱ-1

目录 第二篇 客户端脚本安全 第2章 浏览器安全 2.1同源策略 2.2浏览器沙箱 2.3恶意网址拦截 2.4高速发展的浏览器安全 第二篇 客户端脚本安全 第2章 浏览器安全 近年来随着互联网的发展&#xff0c;人们发现浏览器才是互联网最大的入口&#xff0c;绝大多数用户使用互联…

UV机-理光G5六彩一白一光油配置

UV机-理光G5六彩一白一光油配置

【Navigation】global_planner 源码解析

全局规划器 global_planner 功能包 文章目录 global_planner 功能包结构1、plan_node.cpp2、planner_core.cpp3、astar.cpp4、dijkstra.cpp5、quadratic_calculator.cpp6、grid_path.cpp7、gradient_path.cpp8、orientation_filter.cpp全局规划大都基于静态地图进行规划,产生路…

加密的手机号如何模糊查询?

1 一次加载到内存 实现这个功能&#xff0c;我们第一个想到的办法可能是&#xff1a;把个人隐私数据一次性加载到内存中缓存起来&#xff0c;然后在内存中先解密&#xff0c;然后在代码中实现模糊搜索的功能。 这样做的好处是&#xff1a;实现起来比较简单&#xff0c;成本非常…

约数个数和约数之和算法总结

知识概览 约数个数 由算数基本定理 可得对于N的任何一个约数d&#xff0c;有 因为N的每一个约数和~的一种选法是一一对应的&#xff0c;根据乘法原理可得&#xff0c; 一个数的约数个数为 约数之和 一个数的约数之和公式为 多项式乘积的每一项为 正好对应的是一个数的每一个约…

计算机Java项目|Springboot疫情网课管理系统

项目编号&#xff1a;L-BS-ZXBS-07 一&#xff0c;环境介绍 语言环境&#xff1a;Java: jdk1.8 数据库&#xff1a;Mysql: mysql5.7 应用服务器&#xff1a;Tomcat: tomcat8.5.31 开发工具&#xff1a;IDEA或eclipse 二&#xff0c;项目简介 疫情网课也都将通过计算机…

jmeter线程组

特点&#xff1a;模拟用户&#xff0c;支持多用户操作&#xff1b;可以串行也可以并行 分类&#xff1a; setup线程组&#xff1a;初始化 类似于 unittest中的setupclass 普通线程组&#xff1a;字面意思 teardown线程组&#xff1a;环境恢复&#xff0c;后置处理

2.3 A VECTOR ADDITION KERNEL

我们现在使用矢量加法来说明CUDA C程序结构。矢量加法可以说是最简单的数据并行计算&#xff0c;是顺序编程中Hello World的并行等价物。在我们展示向量加法的内核代码之前&#xff0c;首先回顾传统的向量加法&#xff08;主机代码&#xff09;函数的工作原理是有帮助的。图2.5…

阿里云服务器Centos安装宝塔面板

阿里云服务器Centos安装宝塔面板 1 背景1.1 aliyun1.2 Linux 2 安装步骤2.0 环境配置2.1 安装前准备2.2 宝塔安装2.3 建站 3 centos常用命令3.1 防火墙相关 1 背景 1.1 aliyun 阿里云服务器是阿里云提供的一项云计算服务&#xff0c;它能够帮助用户快速搭建网站、应用和服务&…

专栏序言-GDB高级调试技巧实战

工欲善其事必先利其器&#xff01; GDB耍的溜&#xff0c;BUG找的快&#xff01; 在软件开发的世界里&#xff0c;调试是不可避免的。对于C/C开发者来说&#xff0c;GDB就是那把调试利器&#xff01;如果你只知道单步调试&#xff0c;那定位问题的速度肯定是龟速&#xff1b…

FSMC—扩展外部SRAM

一、SRAM控制原理 STM32控制器芯片内部有一定大小的SRAM及FLASH作为内存和程序存储空间&#xff0c;但当程序较大&#xff0c;内存和程序空间不足时&#xff0c;就需要在STM32芯片的外部扩展存储器了。STM32F103ZE系列芯片可以扩展外部SRAM用作内存。 给STM32芯片扩展内存与给…

Arduion Modbus通讯示例

实现了Arduion和Qt上位机利用Modbus协议采集DHT11数据&#xff0c;以及开关LED灯 软件界面&#xff1a; 实物界面&#xff1a; arduion下位机代码&#xff1a; #include <ModbusRtu.h> #include <DHT.h>#define DHTPIN 2 // DHT11连接到Arduino的数字引…

强化学习5——动态规划在强化学习中的应用

动态规划在强化学习中的应用 基于动态规划的算法优良 &#xff1a;策略迭代和价值迭代。 策略迭代分为策略评估和策略提升&#xff0c;使用贝尔曼期望方程得到一个策略的状态价值函数&#xff1b;价值迭代直接使用贝尔曼最优方程进行动态规划&#xff0c;得到最终的最优状态价…

【设计模式】迭代器模式

一起学习设计模式 目录 前言 一、概述 二、结构 三、案例实现 四、优缺点 五、使用场景 六、JDK源码解析 总结 前言 【设计模式】迭代器模式——行为型模式。 一、概述 定义&#xff1a; 提供一个对象来顺序访问聚合对象中的一系列数据&#xff0c;而不暴露聚合对象…

前端-基础 常用标签-超链接标签( 锚点链接 )

锚点链接 &#xff1a; 点击链接&#xff0c;可以快速定位到 页面中的某个位置 如果不好理解&#xff0c;讲一个例子&#xff0c;您就马上明白了 >>> 这个是 刘德华的百度百科 &#xff0c;可以看到&#xff0c;页面里面有很多内容&#xff0c;那就得有个目录了 …

贝锐蒲公英云智慧组网解读:实现工业设备远程调试、异地PLC互联

这个时候&#xff0c;使用异地组网是非常有效的解决方案。在12月28日贝锐官方的直播中&#xff0c;请到了贝锐蒲公英的技术研发经理&#xff0c;为大家分享了贝锐蒲公英云智慧组网解决方案&#xff0c;以及蒲公英二层组网相关的技术和应用。 搜索“贝锐”官方视频号&#xff0c…