关于Flume-Kafka-Flume的模式进行数据采集操作

news2024/10/6 8:25:27


       测试是否连接成功:

        在主节点flume目录下输入命令:

bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console
# 这个file_to_kafka.conf文件就是我们的配置文件

 

        然后在另一台节点输入命令进行消费数据:

 kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log

        然后再开一个主节点终端,在这个主节点上面在对应生成数据的文件追加数据

         

        这样就可以看见第一个主节点的终端和消费节点上面有数据变化了! 

 


         下面这个是配置拦截器,把json格式的内容进行消费,其他的进行拦截

        Flume采集数据到kafka的配置conf文件内容:

#定义组件

#1、定义source、channel、agent名称
a1.sources = r1
a1.channels = c1
#配置source

#2、描述source
a1.sources.r1.type = TAILDIR

#指定监控的组名
a1.sources.r1.filegroups = f1

#指定f1组监控的路径
a1.sources.r1.filegroups.f1 = /opt/software/applog/log/app.*

#指定断点续传的文件
a1.sources.r1.positionFile = /opt/software/flume/taildir_position.json
# 配置拦截器
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.ETLInterceptor$Builder
#配置channel

#3、描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

#指定kafka集群
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092

#指定数据写到kafka哪个topic
a1.channels.c1.kafka.topic = topic_log

#是否以Event对象的形式写入kafka
a1.channels.c1.parseAsFlumeEvent = false
#组装 

#4、关联source->channel
a1.sources.r1.channels = c1

         如果一开始测试我们flume和kafka是否能成功采集数据的时候,我们应该先把拦截器的两行配置先删除,后面再根据我们需要的内容进行拦截对应的内容。就比如:我们期望我们采集到数据是json格式的,如果不是json格式的话,我们就放弃这个数据。

     具体操作:

(1)创建Maven工程flume-interceptor

(2)创建包:com.gugu.gmall.flume.interceptor

(3)在pom.xml文件中添加如下配置

        

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

 在com.gugu.gmall.flume.utils包下创建JSONUtil类

package com.gugu.gmall.flume.utils;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONException;

public class JSONUtil {
/*
* 通过异常判断是否是json字符串
* 是:返回true  不是:返回false
* */
    public static boolean isJSONValidate(String log){
        try {
            JSONObject.parseObject(log);
            return true;
        }catch (JSONException e){
            return false;
        }
    }
}

在com.gugu.gmall.flume.interceptor包下创建ETLInterceptor类 

package com.gugu.gmall.flume.interceptor;

import com.atguigu.gmall.flume.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;


import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class ETLInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
		
		//1、获取body当中的数据并转成字符串
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
		//2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回null
        if (JSONUtil.isJSONValidate(log)) {
            return event;
        } else {
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> list) {

        Iterator<Event> iterator = list.iterator();

        while (iterator.hasNext()){
            Event next = iterator.next();
            if(intercept(next)==null){
                iterator.remove();
            }
        }

        return list;
    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }
        @Override
        public void configure(Context context) {

        }

    }

    @Override
    public void close() {

    }
}

        然后进行打包,复制到我们flume下的lib目录下就可以了!

        然后再和上面测试一样进行测试连接,是否成功把非json格式的数据拦截成功!


感谢各位的观看,创作不易,能不能给哥们来一个点赞呢!!!

好了,今天的分享就这么多了,有什么不清楚或者我写错的地方,请多多指教!

私信,评论我呗!!!!!!

关注我下一篇不迷路哦!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1208997.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java Stream 的使用

Java Stream 的使用 开始中间操作forEach 遍历map 映射flatMap 平铺filter 过滤limit 限制sorted 排序distinct 去重 结束操作collect 收集toList、toSet 和 toMapCollectors.groupingByCollectors.collectingAndThen metch 匹配find 查询findFirst 与 findAny 的使用Optional …

下载免费商用字体,就上这6个网站。

我不允许还有人不知道&#xff0c;这些可以免费下载商用字体的网站&#xff0c;必须收藏好了&#xff0c;有了这6个网站&#xff0c;再也不用担心字体侵权了。 1、字体搬运工 https://font.sucai999.com/ 一个免费可商用字体搬运工&#xff0c;实时跟新市面上免费商用的字体。…

No204.精选前端面试题,享受每天的挑战和学习

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云课上架的前后端实战课程《Vue.js 和 Egg.js 开发企业级健康管理项目》、《带你从入…

java程序中为什么经常使用tomcat

该疑问的产生场景&#xff1a; 原来接触的ssm项目需要在项目配置中设置tomcat&#xff0c;至于为什么要设置tomcat不清楚&#xff0c;只了解需要配置tomcat后项目才能启动。接触的springboot在项目配置中不需要配置tomcat&#xff0c;原因是springboot框架内置了tomcat&#xf…

1、 图像和像素

像素我们不陌生,图像我们更不陌生。 学习计算机视觉,我觉得第一步就是要了解我们要处理的对象,就像上一篇说到的,计算机视觉任务中,图像(像素)是原材料,算法是菜谱。 了解了图像的特征,才可以更好的完成更多图像处理任务,比如对一张图片进行分类,或者对一张图片画…

CNN进展:AlexNet、VGGNet、ResNet 和 Inception

一、说明 对于初学者来说&#xff0c;神经网络进展的历程有无概念&#xff1f;该文综合叙述了深度神经网络的革命性突破&#xff0c;从AlexNet开始&#xff0c;然后深度VGG的改进&#xff0c;然后是残差网络ResNet和 Inception&#xff0c;如果能讲出各种特色改进点的和改进理由…

【Python】【应用】Python应用之一行命令搭建http、ftp服务器

&#x1f41a;作者简介&#xff1a;花神庙码农&#xff08;专注于Linux、WLAN、TCP/IP、Python等技术方向&#xff09;&#x1f433;博客主页&#xff1a;花神庙码农 &#xff0c;地址&#xff1a;https://blog.csdn.net/qxhgd&#x1f310;系列专栏&#xff1a;Python应用&…

MQ四大消费问题一锅端:消息不丢失 + 消息积压 + 重复消费 + 消费顺序性

RabbitMQ-如何保证消息不丢失 生产者把消息发送到 RabbitMQ Server 的过程中丢失 从生产者发送消息的角度来说&#xff0c;RabbitMQ 提供了一个 Confirm&#xff08;消息确认&#xff09;机制&#xff0c;生产者发送消息到 Server 端以后&#xff0c;如果消息处理成功&#xff…

【python 生成器 面试必备】yield关键字,协程必知必会系列文章--自己控制程序调度,体验做上帝的感觉 2

这篇文章要解决的问题&#xff1a;How to Pass Value to Generators Using the “yield” Expression in Python ref:https://python.plainenglish.io/yield-python-part-ii-e93abb619a16 1.如何传值 yield 是一个表达式&#xff01;&#xff01;&#xff01;&#xff01; yi…

WebGl-Blender:建模 / 想象成形 / 初识 Blender

一、理解Blender 欢迎来到Blender&#xff01;Blender是一款免费开源的3D创作套件。 使用Blender&#xff0c;您可以创建3D可视化效果&#xff0c;例如建模、静态图像&#xff0c;3D动画&#xff0c;VFX&#xff08;视觉特效&#xff09;快照和视频编辑。它非常适合那些受益于…

【Python图像超分】Real-ESRGAN图像超分模型(超分辨率重建)详细安装和使用教程

1 前言 图像超分是一种图像处理技术&#xff0c;旨在提高图像的分辨率&#xff0c;使其具有更高的清晰度和细节。这一技术通常用于图像重建、图像恢复、图像增强等领域&#xff0c;可以帮助我们更好地理解和利用图像信息。图像超分技术可以通过多种方法实现&#xff0c;包括插值…

【每日一题】阈值距离内邻居最少的城市

文章目录 Tag题目来源题目解读解题思路方法一&#xff1a;多源最短路 写在最后 Tag 【多源最短路】【数组】【2023-11-14】 题目来源 1334. 阈值距离内邻居最少的城市 题目解读 题目翻译过来是这样的&#xff1a;一共 n 个城市&#xff0c;统计在每个城市 dt 距离范围内所有…

【java学习—十四】反射获取类的父类、接口、构造方法、方法(3)

文章目录 1. 通过反射获取一个类的父类和接口2. 反射获取一个类的构造方法3. 反射获取全部构造器4. 通过反射创建一个对象5. 反射机制获取类的方法 1. 通过反射获取一个类的父类和接口 使用反射可以取得&#xff1a; 实现的全部接口 public Class<?>[] getInterfaces(…

Python实现扫雷游戏,代码示例,边玩边学+回忆童年!

文章目录 前言实现总结关于Python技术储备一、Python所有方向的学习路线二、Python基础学习视频三、精品Python学习书籍四、Python工具包项目源码合集①Python工具包②Python实战案例③Python小游戏源码五、面试资料六、Python兼职渠道 前言 扫雷是一款益智类小游戏&#xff0…

香港科技大学广州|智能制造学域机器人与自主系统学域博士招生宣讲会—中国科学技术大学专场

&#x1f3e0;地点&#xff1a;中国科学技术大学西区学生活动中心&#xff08;一楼&#xff09;报告厅 【宣讲会专场1】让制造更高效、更智能、更可持续—智能制造学域 &#x1f559;时间&#xff1a;2023年11月16日&#xff08;星期四&#xff09;18:00 报名链接&#xff1a…

百望云携手华为发布金融信创与数电乐企联合方案 创新金融合规变革

10月27日&#xff0c;北京发布《关于开展全面数字化的电子发票试点工作的公告》&#xff0c;自2023年11月01日起开展数电票试点。千呼万唤始出来&#xff0c;拉开了北京地区企业开展数电票试点的序幕。 百望云作为数电票行业翘楚&#xff0c;电子发票服务平台供应商&#xff0c…

【C/C++底层】内存分配:栈区(Stack)与堆区(Heap)

/*** poject * author jUicE_g2R(qq:3406291309)* file 底层内存分配&#xff1a;栈区(Stack)与堆区(Heap)* * language C/C* EDA Base on MVS2022* editor Obsidian&#xff08;黑曜石笔记软件&#xff09;* * copyright 2023* COPYRIGHT …

论文阅读:Robust High-Resolution Video Matting with Temporal Guidance

发表时间&#xff1a;2021年8月25日 项目地址&#xff1a;https://peterl1n.github.io/RobustVideoMatting/ 论文地址&#xff1a;https://arxiv.org/pdf/2108.11515.pdf 我们介绍了一种鲁棒的&#xff0c;实时的&#xff0c;高分辨率的人体视频匹配方法&#xff0c;以实现了新…

【C++】:STL——标准模板库介绍 || string类

&#x1f4da;1.什么是STL STL(standard template libaray-标准模板库)&#xff1a;是C标准库的重要组成部分&#xff0c;不仅是一个可复用的组件库&#xff0c;而且 是一个包罗数据结构与算法的软件框架 &#x1f4da;2.STL的版本 原始版本 Alexander Stepanov、Meng Lee 在…

得帆信息携手深信服,联合打造高安全PaaS超融合一体化解决方案

上海得帆信息技术有限公司&#xff08;以下简称“得帆”&#xff09;和深信服科技股份有限公司&#xff08;以下简称“深信服”&#xff09;携手推出融合安全性、稳定性、高效性于一体的全新PaaS超融合解决方案。 用户痛点分析 全面推进企业数字化与信息化的趋势下&#xff0c;…