03数据仓库Flume

news2024/11/16 23:38:51

Flume 功能

在这里插入图片描述
Flume主要作用,就是实时读取服务器本地磁盘数据,将数据写入到 HDFS。

Flume是 Cloudera提供的高可用,高可靠性,分布式的海量日志采集、聚合和传输的系统工具。

Flume 架构

Flume组成架构如下图所示:
在这里插入图片描述

Agent

每个 Agent 代表着一个 JVM 进程,它以事件的方式将数据从源头送至目的地。
Agent 由 3 个部分组成,Source、Channel、Sink。

  • Source:Source是负责接收数据到Flume Agent 的组件,Source 组件可以处理各种类型、各种格式的日志数据。
  • Sink:Sink不断轮询Channel中的事件并且批量移除他们,并将这些数据批量写入到存储或者索引系统,或被发送到另一个Flume Agent。
  • Channel:Channel是位于Source 和Sink 之间的缓冲区。因此,Channel允许 Source和 Sink 有不同的运行速率。Channel 是线程安全的, 可以同时处理几个 Source 的写操作和多个 Sink 的读操作。
    Flume自带两种 Channel:Memory Channel 和File Channel。
    Memory Channel 是一个内存队列,Source将事件写入其尾部,Sink从其头部读取事件。 Memory Channel 将源写入的事件存储在堆上。由于它将所有数据存储在内存中,因此提供了高吞吐量。 它最适合那些不担心数据丢失的流。 它不适合涉及数据丢失的数据流。
    File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。
  • Event:Flume 传输数据的基本单元,数据以 Event的形式将数据从源头送至目的地。Event由Header 和 Body 组成,Header用来存放event 的属性,为 K-V结构,Body 用于存放数据,为字节数组结构。

安装 flume

话不多说,直接开始安装 flume。

  1. 前往 http://archive.apache.org/dist/flume/ , 选择1.9.0
  2. 将apache-flume-1.9.0-bin.tar.gz使用 sftp上传到linux的/opt/software目录下
    在这里插入图片描述
  3. 解压apache-flume-1.9.0-bin.tar.gz到/opt/module/目录下
[logan@hadoop101 hadoop]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
  1. 创建软链接
cd /opt/module
ln -snf flume-1.9.0/ flume
  1. 删除jar 包,否则会报错
cd /opt/module/flume
rm lib/guava-11.0.2.jar

注意删除guava,一定要配置 Hadoop 环境变量,否则会报错:

Caused by: java.lang.ClassNotFoundException: com.google.common.collect.Lists
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 1 more
  1. 修改log4j.properties配置文件
[logan@hadoop101 flume]$ vim conf/log4j.properties 
# 注意是修改内容
flume.log.dir=/opt/module/flume/logs
  1. 验证 flume
[logan@hadoop101 flume]$ /opt/module/flume/bin/flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9

Flume日志采集

配置解析

需要采集的日志文件分布在hadoop101,hadoop102两台日志服务器,故需要在hadoop101,hadoop102上配置日志采集 Flume。Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到 Kafka。
此处选择TailDirSource和 KafkaChannel。

  • TailDirSource优势:支持断点续传、多目录。
  • KafkaChannel优势:省去了 Sink,提高了效率。
  • 日志采集 Flume关键配置 :
    在这里插入图片描述

Flume日志采集配置

  1. 创建Flume 配置文件
[logan@hadoop101 flume]$ pwd
/opt/module/flume
[logan@hadoop101 flume]$ mkdir job
[logan@hadoop101 flume]$ vim job/file_to_kafka.conf
#定义组件
a1.sources = r1
a1.channels = c1

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.logan.gmall.flume.interceptor.ETLInterceptor$Builder

#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop101:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

#组装 
a1.sources.r1.channels = c1
  1. 编写拦截器
    • 创建Maven工程flume-interceptor
    • 创建包:com.logan.gmall.flume.interceptor
    • 在pom.xml文件中添加如下配置
    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>
    
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    
    • 在com.logan.gmall.flume.utils包下创建JSONUtil类
    package com.logan.gmall.flume.utils;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONException;
    
    public class JSONUtil {
        /**
         * 通过异常判定是否是JSON字符串
         */
        public static boolean isJSONValidate(String input){
            try {
                JSON.parseObject(input);
                return true;
            } catch (JSONException e) {
                return false;
            }
        }
    }
    
    • 在com.logan.gmall.flume.interceptor包下创建ETLInterceptor类
    package com.logan.gmall.flume.interceptor;
    
    import com.logan.gmall.flume.utils.JSONUtil;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.Iterator;
    import java.util.List;
    
    public class ETLInterceptor implements Interceptor {
        @Override
        public void initialize() {
    
        }
    
        @Override
        public Event intercept(Event event) {
    
            //1、获取body当中的数据并转成字符串
            byte[] body = event.getBody();
            String log = new String(body, StandardCharsets.UTF_8);
            //2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回null
            if (JSONUtil.isJSONValidate(log)) {
                return event;
            } else {
                return null;
            }
        }
    
    
        @Override
        public List<Event> intercept(List<Event> list) {
    
            Iterator<Event> iterator = list.iterator();
    
            while (iterator.hasNext()){
                Event next = iterator.next();
                if(intercept(next)==null){
                    iterator.remove();
                }
            }
    
            return list;
        }
    
        public static class Builder implements Interceptor.Builder{
    
            @Override
            public Interceptor build() {
                return new ETLInterceptor();
            }
    
            @Override
            public void configure(Context context) {
    
            }
        }
    
    
        @Override
        public void close() {
    
        }
    }
    
    • 打包
      在这里插入图片描述
    • 将打包文件放到hadoop101 的/opt/moduie/flume/lib文件夹下

采集测试

  1. 启动Zookeeper、Kafka集群
  2. 启动 hadoop101上的日志采集Flume
[logan@hadoop101 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console
  1. 启动一个Kafka的Console-Consumer
[logan@hadoop101 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop11:9092 --topic topic_log
  1. 生成模拟数据
[logan@hadoop101 module]$ mkdir -p /opt/module/applog/log/
[logan@hadoop101 module]$ vim /opt/module/applog/log/app.2023-12-02.log
{"common":{"ar":"500000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs","mid":"mid_552171","os":"iOS 13.3.1","uid":"919","vc":"v2.1.134"},"displays":[{"display_type":"activity","item":"1","item_type":"activity_id","order":1,"pos_id":5},{"display_type":"activity","item":"2","item_type":"activity_id","order":2,"pos_id":5},{"display_type":"query","item":"19","item_type":"sku_id","order":3,"pos_id":4},{"display_type":"query","item":"3","item_type":"sku_id","order":4,"pos_id":2},{"display_type":"query","item":"5","item_type":"sku_id","order":5,"pos_id":2},{"display_type":"promotion","item":"19","item_type":"sku_id","order":6,"pos_id":4},{"display_type":"query","item":"14","item_type":"sku_id","order":7,"pos_id":2},{"display_type":"query","item":"9","item_type":"sku_id","order":8,"pos_id":2},{"display_type":"promotion","item":"35","item_type":"sku_id","order":9,"pos_id":1}],"page":{"during_time":9853,"page_id":"home"},"ts":1672512476000}
{"actions":[{"action_id":"favor_add","item":"9","item_type":"sku_id","ts":1672512480386},{"action_id":"get_coupon","item":"2","item_type":"coupon_id","ts":1672512483772}],"common":{"ar":"500000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs","mid":"mid_552171","os":"iOS 13.3.1","uid":"919","vc":"v2.1.134"},"displays":[{"display_type":"promotion","item":"19","item_type":"sku_id","order":1,"pos_id":4},{"display_type":"promotion","item":"14","item_type":"sku_id","order":2,"pos_id":5},{"display_type":"query","item":"21","item_type":"sku_id","order":3,"pos_id":1},{"display_type":"query","item":"11","item_type":"sku_id","order":4,"pos_id":2},{"display_type":"promotion","item":"28","item_type":"sku_id","order":5,"pos_id":1}],"page":{"during_time":10158,"item":"9","item_type":"sku_id","last_page_id":"home","page_id":"good_detail","source_type":"promotion"},"ts":1672512477000}
  1. 观察kafka是否有消费到数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1276865.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

jQuery选择器、操作DOM、事件处理机制、动画、ADJX操作知识点梳理

jQuery 核心理念就是写的更少&#xff0c;做的更多实现的代码更加简洁有效的提高开发效率 jQuery跟JavaScript的用法是不一样的 跟jQuery相继诞生的JavaScript库还有很多&#xff0c;不包括node.js 关于代码$("li").get(0)&#xff0c;获取DOM对象 jQuery对象声…

MMdetection3.0 问题:DETR验证集上AP值全为0.0000

MMdetection3.0 问题&#xff1a;DETR验证集上AP值全为0.0000 条件&#xff1a; 1、选择的是NWPU-VHR-10数据集&#xff0c;其中训练集455张&#xff0c;验证、测试相同均为195张。 2、在Faster-rcnn、YOLOv3模型上均能训练成功&#xff0c;但是在DETR训练时&#xff0c;出现验…

前端学习系列之CSS

目录 CSS 简介 发展史 优势 基本语法 引用方式 内部样式 行内样式 外部样式 选择器 id选择器 class选择器 标签选择器 子代选择器 后代选择器 相邻兄弟选择器 后续兄弟选择器 交集选择器 并集选择器 通配符选择器 伪类选择器 属性选择器 CSS基本属性 优…

pycharm中绘制一个3D曲线

import numpy as np import matplotlib.pyplot as plt # 中文的设置 import matplotlib as mp1 from mpl_toolkits.mplot3d import Axes3D mp1.rcParams["font.sans-serif"] ["kaiti"] mp1.rcParams["axes.unicode_minus"] False # 数据创建 X…

Python过滤掉特定区域内的矩形框

Python过滤掉特定区域内的矩形框 前言前提条件相关介绍实验环境过滤掉特定区域内的矩形框方法一&#xff1a;直接法&#xff08;for循环遍历&#xff09;代码实现输出结果 方法二&#xff1a;列表推导式代码实现输出结果 前言 由于本人水平有限&#xff0c;难免出现错漏&#x…

51k+ Star!动画图解、一键运行的数据结构与算法教程!

大家好&#xff0c;我是 Java陈序员。 我们都知道&#xff0c;《数据结构与算法》 —— 是程序员的必修课。 无论是使用什么编程语音&#xff0c;亦或者是前后端开发&#xff0c;都需要修好《数据结构与算法》这门课&#xff01; 在各个互联网大产的面试中&#xff0c;对数据…

RocketMQ事务消息源码解析

RocketMQ提供了事务消息的功能&#xff0c;采用2PC(两阶段协议)补偿机制&#xff08;事务回查&#xff09;的分布式事务功能&#xff0c;通过这种方式能达到分布式事务的最终一致。 一. 概述 半事务消息&#xff1a;指的是发送至broker但是还没被commit的消息&#xff0c;在半…

【Linux】24、文件系统、磁盘 IO

文章目录 一、文件系统1.1 索引节点和目录项1.2 虚拟文件系统 VFS1.3 文件系统 I/O1.5 性能观测1.5.1 容量1.5.2 缓存1.5.3 find 命令的缓存 二、磁盘 I/O2.1 通用块层2.2 I/O 栈2.3 磁盘性能指标2.3.1 磁盘 I/O 观测2.3.2 进程 I/O 观测 2.4 案例&#xff1a;找到打大量日志的…

UiPath学习笔记

文章目录 前言RPA介绍UiPath下载安装组件内容 前言 最近有一个项目的采集调研涉及到了客户端的采集&#xff0c;就取了解了一下RPA和UIPATH&#xff0c;记录一下 RPA介绍 RPA&#xff08;Robotic Process Automation&#xff1a;机器人处理自动化&#xff09;&#xff0c;是…

聊聊什么是IO流

目录 Java IOIO 基础Java IO 流了解吗&#xff1f; IO 设计模式1、装饰器模式2、适配器模式适配器模式和装饰器模式有什么区别呢&#xff1f;3、工厂模式4、观察者模式 IO 模型有哪些常见的 IO 模型&#xff1f;BIO(Blocking I/O)NIO (Non-blocking/New I/O)AIO (Asynchronous …

Java包(package)

1、概念 为了更好的组织类&#xff0c;用于区别类名的命名空间&#xff0c;其实就是基于工程的一个文件路径&#xff0c;如&#xff1a; 2、作用 三个作用&#xff1a; 1&#xff09;区分相同名称的类。 2&#xff09;能够较好地管理大量的类。 3&#xff09;控制访问范围。 在…

网站实现验证码功能

一、验证码 一般来说&#xff0c;网站在登录的时候会生成一个验证码来验证是否是人类还是爬虫&#xff0c;还有一个好处是防止恶意人士对密码进行爆破。 二、流程图 三、详细说明 3.1 后端生成验证码 Override public Result<Map<String, String>> getVerifica…

国内哪个超声波清洗机品牌比较好?质量好超声波清洗机总结

近年来超声波清洗机可以说是非常火爆&#xff0c;可以清洗化妆刷、眼镜、牙套等等一些小物件&#xff0c;大物件物品可以入手大型超声波清洗机&#xff0c;总之现在超声波清洗机已经衍生到可以在家使用&#xff0c;不再是在眼镜店看到它的身影或者是一些工业领域上&#xff0c;…

第二节:服务拆分(案例)

一、服务拆分注意事项 1.1 拆分原则 每个微服务&#xff0c;不要重复开发相同业务&#xff08;例如在单体项目中用到了一个查询&#xff0c;这个查询功能能够查询出订单信息、商品信息、用户信息&#xff0c;那么在拆分微服务时就不要将其写在一起了&#xff0c;订单的微服务只…

1、RocketMQ源码分析(一)

RocketMQ简单介绍 RabbitMQ的底层是使用erlang语言编写的&#xff0c;不便分析其底层&#xff0c;RocketMQ作为原阿里下经历阿里双十一严格考验的中间件&#xff0c;同时也是使用我们熟悉的java语言编写&#xff0c;我们先把入门的基础必备了解透&#xff0c;然后在去分析源码…

基于WebSocket实现客户聊天室

目录 一、实现聊天室原理 二、聊天室前端代码 三、聊天室后端代码&#xff08;重点&#xff09; 四、聊天室实现效果展示 一、实现聊天室原理 1.1 介绍websocket协议 websocket是一种通信协议&#xff0c;再通过websocket实现弹幕聊天室时候&#xff0c;实现原理是客户端首…

使用K-means把人群分类

1.前言 K-mean 是无监督的聚类算法 算法分类&#xff1a; 2.实现步骤 1.数据加工&#xff1a;把数据转为全数字&#xff08;比如性别男女&#xff0c;转换为0 和 1&#xff09; 2.模型训练 fit 3.预测 3.代码 原数据类似这样(source&#xff1a;http:img-blog.csdnimg.cn…

vmware 安装 AlmaLinux OS 8.6

选择系统镜像 选择镜像 选择安装位置和修改名称 可以自定义硬件&#xff0c;也可以不选择&#xff0c;后面可以再设置 自定义硬件可以设置内存和cpu等信息 安装虚拟机系统 密码如果简单的话需要点击两次done 才能保存

集成开发环境PyCharm的使用【侯小啾python领航计划系列(三)】

集成开发环境 PyCharm 的使用【侯小啾python领航计划系列(三)】 大家好,我是博主侯小啾, 🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ�…

Stable Diffusion AI绘画系列【10】:AI眼中的美丽清晨

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…