Flume监听多个文件目录,并根据文件名称不同,输出到kafka不同topic中

news2024/11/25 22:44:25

一、Flume监听多个文件目录

1. flume的环境搭建和基础配置参考

https://blog.csdn.net/qinqinde123/article/details/128130131

2. 修改配置文件flume-conf.properties

#定义两个是数据源source1、source2
agent.sources = source1 source2
agent.channels = channel1
agent.sinks = sink1

#数据源source1:监听/home/sxvbd/bigdata/flumeTestDir目录
agent.sources.source1.type = spooldir
agent.sources.source1.spoolDir = /home/sxvbd/bigdata/flumeTestDir
# 文件名带路径,header中key=filePath
agent.sources.source1.fileHeader = true
agent.sources.source1.fileHeaderKey = filePath
# 文件名不带路径,header中key=fileName
agent.sources.source1.basenameHeader = true
agent.sources.source1.basenameHeaderKey = fileName

#数据源source2:监听/home/sxvbd/bigdata/flumeTestDir/temp目录·
agent.sources.source2.type = spooldir
agent.sources.source2.spoolDir = /home/sxvbd/bigdata/flumeTestDir/temp
# 文件名带路径,header中key=filePaht
agent.sources.source2.fileHeader = true
agent.sources.source2.fileHeaderKey = filePath
# 文件名不带路径,header中key=fileName
agent.sources.source2.basenameHeader = true
agent.sources.source2.basenameHeaderKey = fileName

#定义一个channel
agent.channels.channel1.type = memory
agent.channels.channel1.capacity = 1000000
agent.channels.channel1.transactionCapacity = 10000
agent.channels.channel1.keep-alive = 60

#重写sink,根据文件名称不同,推送到不同topic中
agent.sinks.sink1.type = com.demo.flume.LogToDiffentKafkaTopic
agent.sinks.sink1.kafka.bootstrap.servers = node24:9092,node25:9092,node26:9092
agent.sinks.sink1.parseAsFlumeEvent = false

#定义source channel  sink的关系
agent.sources.source1.channels = channel1
agent.sources.source2.channels = channel1
agent.sinks.sink1.channel = channel1

二、重写Sink,根据文件名称不同,消息发送到不同的topic中

flume监听到有新文件出现的时候,会将文件内容推送到kakfa的topic中,但是如果文件夹中有不同类型的文件,直接推送到kafka的同一个topic中,如果根据内容无法区分不同类型的文件,那就需要根据文件名称来区分。flume本身根据配置无法实现,只能通过重写Sink,根据文件名称,将内容推送到kafka的不同topic。

在这里插入图片描述
看了一下官网的开发文档,要想自定义一个Sink也很简单,只需要继承一个抽象类 AbstractSink 和一个用于接收配置参数的接口 Configurable 即可.然后呢就需要实现两个方法一个就是public Status process() throws EventDeliveryException {}这个方法会被多次调用,反复执行,也就是通过它来实时的获取Channel流出来的数据;第二个就是public void configure(Context context) {} 这个方法主要是通过传入的这个Contex上下文对象.来个获取配置文件中的参数,一些初始化的工作可以写在这个方法里面.

1.创建springboot项目LogToDiffentKafkaTopic

2.pom.xml中引入flume相关依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.demo</groupId>
    <artifactId>flume</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!--Flume 依赖-->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-configuration</artifactId>
            <version>1.9.0</version>
        </dependency>

        <!--Kafka 依赖-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>2.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.30</version>
        </dependency>

    </dependencies>
	<!--构建-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3. 创建一个类LogToDiffentKafkaTopic.java,继承自AbstractSink

public class LogToDiffentKafkaTopic extends AbstractSink implements Configurable {

    private MessageClassifier messageClassifier;

    @Override
    public Status process() throws EventDeliveryException {
        System.out.println("========>process");
        Status status = null;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try{
            Event event = channel.take();
            if (event == null){
                transaction.rollback();
                status = Status.BACKOFF;
                return status;
            }
            System.out.println("========>event:" + event.toString());
            //根据配置文件中定义的agent.sources.source1.basenameHeader = true和agent.sources.source1.basenameHeaderKey = fileName获取文件名称
            String fileName = event.getHeaders().get("fileName");
            byte[] body = event.getBody();
            final String msg = new String(body);
            System.out.println("========>msg:" + msg.toString());
            status = messageClassifier.startClassifier(msg, fileName) ;
            // 提交事务
            transaction.commit();
        }catch (Exception e){
            transaction.rollback();
            e.printStackTrace();
            status = Status.BACKOFF;
        }finally {
            transaction.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        ImmutableMap<String, String> parameters = context.getParameters();
        //启动的时候,从配置文件flume-conf.properties中读取的配置信息
        System.out.println("========>parameters: " + parameters.toString());
        Properties properties = new Properties();
        properties.put("bootstrap.servers", context.getString("kafka.bootstrap.servers", "localhost:9092"));
        properties.put("acks", context.getString("acks", "all"));
        properties.put("retries", Integer.parseInt(context.getString("retries", "0")));
        properties.put("batch.size", Integer.parseInt(context.getString("batch.size", "16384")));
        properties.put("linger.ms", Integer.parseInt(context.getString("linger.ms", "1")));
        properties.put("buffer.memory", Integer.parseInt(context.getString("buffer.memory", "33554432")));
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        messageClassifier = new MessageClassifier(properties);
    }

4. 创建一个类MessageClassifier.java,继承自AbstractSink

public class MessageClassifier {

    /*文件名称中包含_CDSS_,则消息推送到data-ncm-hljk-cdss-topic*/
    private static final String HJSJ_SSSJ_CDSS = ".*_CDSS_.*";
    private static final String HJSJ_SSSJ_CDSS_TOPIC = "data-ncm-hljk-cdss-topic";

     /*文件名称中包含_FZSS_,则消息推送到data-ncm-hljk-fzss-topic*/
    private static final String HJSJ_SSSJ_FZSS = ".*_FZSS_.*";
    private static final String HJSJ_SSSJ_FZSS_TOPIC = "data-ncm-hljk-fzss-topic";

    private final KafkaProducer<String, String> producer;

    public MessageClassifier(Properties kafkaConf) {
        producer = new KafkaProducer<>(kafkaConf);
    }

    public Sink.Status startClassifier(String msg, String fileName) {
        System.out.println("===========>msg: " + msg);
        System.out.println("===========>fileName: " + fileName);
        try {
            if (Pattern.matches(HJSJ_SSSJ_CDSS, fileName)) {
                System.out.println("===========>HJSJ_SSSJ_CDSS");
                producer.send(new ProducerRecord<>(HJSJ_SSSJ_CDSS_TOPIC, msg));
            } else if (Pattern.matches(HJSJ_SSSJ_FZSS, fileName)) {
                System.out.println("===========>HJSJ_SSSJ_FZSS");
                producer.send(new ProducerRecord<>(HJSJ_SSSJ_FZSS_TOPIC, msg));
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("===========>exception: " + e.getMessage());
            return Sink.Status.BACKOFF;
        }
        return Sink.Status.READY;
    }
}

5. 打jar包: flume-1.0.jar

mvn clean install -DskipTests

6. 在flume的安装目录下创建plugins.d目录

mkdir -p /home/sxvbd/bigdata/flume-1.9.0/plugins.d

7. 在plugins.d目录下创建一个目录(名字任意,例如demo)

mkdir -p /home/sxvbd/bigdata/flume-1.9.0/plugins.d/demo

8. 在demo目录下创建两个目录:lib和libext

mkdir -p /home/sxvbd/bigdata/flume-1.9.0/plugins.d/demo/lib
mkdir -p /home/sxvbd/bigdata/flume-1.9.0/plugins.d/demo/libext

9. 将jar包上传到lib目录下(libext不用管)

10. 在配置文件flume-conf.properties中配置自定义sink

#Each channel's type is defined.
agent.sinks.sink1.type = com.demo.flume.LogToDiffentKafkaTopic
agent.sinks.sink1.kafka.bootstrap.servers = node24:9092,node25:9092,node26:9092
agent.sinks.sink1.parseAsFlumeEvent = false

11.启动

nohup ../bin/flume-ng agent --conf conf -f /home/sxvbd/bigdata/flume-1.9.0/conf/flume-conf.properties -n agent -Dflume.root.logger=INFO,console > flume.log 2>&1 &

12.在对应的目录下拖入文件

目录/home/sxvbd/bigdata/flumeTestDir/和目录/home/sxvbd/bigdata/flumeTestDir/temp

13.监听kafka的topic

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/52227.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

B. Password(KMP)

Problem - 126B - Codeforces Asterix、Obelix和他们的临时伙伴Suffix和Prefix终于找到了和谐寺。然而&#xff0c;它的门被牢牢地锁住了&#xff0c;即使是Obelix也没能打开它们。 过了一会儿&#xff0c;他们发现在寺庙大门下面的一块岩石上刻着一个字符串。亚力认为那是打开…

realme手机配什么蓝牙耳机?realme蓝牙耳机推荐

蓝牙耳机作为人手必备的单品&#xff0c;不同厂商的产品更是多种多样&#xff0c;用户可以有更多的选择&#xff0c;选购蓝牙耳机的时候&#xff0c;除了看重佩戴舒适度、发声单元人们更加追求最新研发的技术。realme是为年轻人而来的科技潮牌。秉持“敢越级”品牌理念&#xf…

iOS MD5基础知识

MD5信息摘要算法&#xff08;英语&#xff1a;MD5 Message-Digest Algorithm&#xff09;&#xff0c;一种被广泛使用的密码散列函数&#xff0c;可以产生出一个128位&#xff08;16字节&#xff09;的散列值&#xff08;hash value&#xff09;&#xff0c;用于确保信息传输完…

实现了Spring的Aware接口的自定义类什么时候执行的?

在之前的内容中 Spring的Aware接口有什么用&#xff1f;_轻尘的博客-CSDN博客_aware接口的作用 了解到用户可以通过实现相应的Aware接口来获取spring框架提供的能力&#xff0c;俗称“攀亲戚” 以如下代码为例&#xff0c;自定义类MyAware实现了BeanFactroryAware&#xff0…

数据库、计算机网络,操作系统刷题笔记5

数据库、计算机网络&#xff0c;操作系统刷题笔记5 2022找工作是学历、能力和运气的超强结合体&#xff0c;遇到寒冬&#xff0c;大厂不招人&#xff0c;可能很多算法学生都得去找开发&#xff0c;测开 测开的话&#xff0c;你就得学数据库&#xff0c;sql&#xff0c;oracle&…

【MySQL基础】MySQL常用的图形化管理工具有那些?

目录 一、为什么要使用MySQL图形化管理工具 原因 / 目的 / 作用 二、什么是DOS窗口? 三、常见的MySQL图形化管理工具有那些&#xff1f; 四、 常见几个MySQL图形工具的介绍 Navicat SQLyog MySQL Workbench DataGrip 五、Navicat图形工具的安装与使用 第一步&#x…

python带你制作随机点名系统,超级简单

前言 嗨喽~大家好呀&#xff0c;这里是魔王呐 ❤ ~! 在某些难以抉择得时候&#xff0c;我们经常要用外力来帮助我们做出选择 比如&#xff0c;课堂随机点名或面对活动需要人上台表演时等等场景 这个时候&#xff0c;有一个随机点名系统就非常好啦&#xff0c;毕竟运气得事~ …

QT之 给控件添加右键菜单

一、效果预览 二、代码 cpp文件 //listView右键菜单 void MainWindow::Rightclicklistview() {//初始化一级菜单TotalRightclick new QMenu(this);AddDevice new QMenu(this);upDevice new QAction(this);DownDevice new QAction(this);Delete new QAction(this);EditDev…

压缩包里的文件名可以这样隐藏起来

我们知道&#xff0c;压缩后的文件如果有保密需要&#xff0c;可以给压缩包设置打开密码。 设置密码后&#xff0c;还是可以打开压缩包&#xff0c;也可以看到压缩包里面的文件名称&#xff0c;当你点击里面的文件&#xff0c;才会提示需要输入密码后才能打开文件。 如果希望加…

数据运算——逻辑运算

数据运算——逻辑运算一、逻辑运算1.通过例题掌握位模式层次上的逻辑运算2.位模式层次上的逻辑运算的应用1.**与运算使指定位复位**2.**或运算使指定位置位**3.**异或运算使指定位取反**二、移位运算1.逻辑移位2.循环移位3.算术移位算术右移算术左移举例1>.(算术右移)2>.…

KepServer EX6模拟仿真PlC数据以及点表的复制跟项目的迁移

一.模拟plc数据绑定标点 1.新建通道选择“Simulator” 右击 “连接性”》新建通道选择Simulator 填写通道名称&#xff08;自定义&#xff09; 然后一直默认设置点击下一页知道完成!!! 添加展示 2.给通道添加设备 右击通道》添加设备 设备名自定义 然后一直默认进行下一步…

[附源码]计算机毕业设计springboot人事管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

使用WPS生成二维码,手机扫码访问主机的资源

问题描述 如果我们想要使用二维码&#xff0c;包装一个链接&#xff0c;访问目标资源。 在淘宝上可以看到&#xff0c;一些网店提供制作二维码服务。其实我们自己也可以做。 原理是&#xff1a;我们把资源发送给商家&#xff0c;商家拿到后&#xff0c;将资源部署到服务器上…

ARM cortex-M4核中断实验 中断和串口

要求&#xff1a;按键触发时&#xff0c;LED灯状态取反&#xff0c;并且在串口工具打印一句话。 KEY1按键按下&#xff0c;LED1状态取反&#xff0c;串口工具打印key1 down!!!! GPIO模块&#xff1a; UART模块&#xff1a; 主函数&#xff1a; 实验现象&#xff1a…

2022世界杯漫谈与猜想,谁是你心目中的第一

文章目录0、 我与足球1、卡塔尔世界杯2、亚洲球队水平3、中国足球4、展望0、 我与足球 1、第一次意义上的踢足球还是初中&#xff0c;记得是五四青年节说全校搞一场足球比赛&#xff0c;我们班莫名其妙的组了一个队&#xff0c;然后在放学后提提足球&#xff0c;那时候规则都不…

JSON端口操作实例

JSON 端口可直接实现在 JSON 和 XML 之间进行转换。端口会自动检测输入文件是 JSON 还是 XML&#xff0c;然后将文件在两种格式间相互转换。 该端口较多的是运用在API接口调用集成方案的项目当中&#xff0c;我们以百思买项目为例&#xff0c;知行之桥将接收到的百思买的EDI报…

针对海洋数据的管理三维gis软件系统有何优势

海洋地理信息系统是以深海、水质、海表层、空气及海岸带人类活动为研究对象,经过综合利用地理信息系统的室内空间海洋数据处理方法、GIS和绘图系统集成化、三维算法设计、海洋数据信息仿真模拟和界面显示等功能,为多种来源的数据信息给予协调性坐标、储存和集成化信息内容等专用…

springboot整合jett导出数据(2)

一 操作案例 1.1 pom文件 <dependency><groupId>net.sf.jett</groupId><artifactId>jett-core</artifactId><version>0.11.0</version></dependency> 1.2 代码 /*** author liujianfu* description 导出 环保指标查询…

基于java+springmvc+mybatis+vue+mysql的婚纱影楼

项目介绍 婚姻是每个人人生中都非常重要的一个组成部分&#xff0c;它是一个新家庭的开始也是爱情的见证&#xff0c;所以很多人在结婚之前都会拍一套美美的婚纱照来纪念这一美好的时刻&#xff0c;但是很多时候人们在拍婚纱照的时候都是到当地的影楼去拍摄&#xff0c;这种影…

React基础

文章目录1.简介1.1 react与vue1.1.1 相同点1.1.2 不同点1.1.3 函数式组件的特点&#xff08;什么是函数式组件&#xff09;a.幂等b.无副作用用&#xff1a;1.1.4 虚拟dom的作用1.1.5 vue当中template与render的关系&#xff1a;1.2 MVC、MVVM、MVP模式1.2.1 MVC1.2.2 MVVM1.2.3…