04数据平台Flume

news2025/1/14 0:49:29

Flume 功能

在这里插入图片描述
Flume主要作用,就是实时读取服务器本地磁盘数据,将数据写入到 HDFS。

Flume是 Cloudera提供的高可用,高可靠性,分布式的海量日志采集、聚合和传输的系统工具。

Flume 架构

Flume组成架构如下图所示:
在这里插入图片描述

Agent

每个 Agent 代表着一个 JVM 进程,它以事件的方式将数据从源头送至目的地。
Agent 由 3 个部分组成,Source、Channel、Sink。

  • Source:Source是负责接收数据到Flume Agent 的组件,Source 组件可以处理各种类型、各种格式的日志数据。
  • Sink:Sink不断轮询Channel中的事件并且批量移除他们,并将这些数据批量写入到存储或者索引系统,或被发送到另一个Flume Agent。
  • Channel:Channel是位于Source 和Sink 之间的缓冲区。因此,Channel允许 Source和 Sink 有不同的运行速率。Channel 是线程安全的, 可以同时处理几个 Source 的写操作和多个 Sink 的读操作。
    Flume自带两种 Channel:Memory Channel 和File Channel。
    Memory Channel 是一个内存队列,Source将事件写入其尾部,Sink从其头部读取事件。 Memory Channel 将源写入的事件存储在堆上。由于它将所有数据存储在内存中,因此提供了高吞吐量。 它最适合那些不担心数据丢失的流。 它不适合涉及数据丢失的数据流。
    File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。
  • Event:Flume 传输数据的基本单元,数据以 Event的形式将数据从源头送至目的地。Event由Header 和 Body 组成,Header用来存放event 的属性,为 K-V结构,Body 用于存放数据,为字节数组结构。

安装 flume

话不多说,直接开始安装 flume。

  1. 前往 http://archive.apache.org/dist/flume/ , 选择1.9.0
  2. 将apache-flume-1.9.0-bin.tar.gz使用 sftp上传到linux的/opt/software目录下
    在这里插入图片描述
  3. 解压apache-flume-1.9.0-bin.tar.gz到/opt/module/目录下
[logan@hadoop101 hadoop]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
  1. 创建软链接
cd /opt/module
ln -snf flume-1.9.0/ flume
  1. 删除jar 包,否则会报错
cd /opt/module/flume
rm lib/guava-11.0.2.jar

注意删除guava,一定要配置 Hadoop 环境变量,否则会报错:

Caused by: java.lang.ClassNotFoundException: com.google.common.collect.Lists
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 1 more
  1. 修改log4j.properties配置文件
[logan@hadoop101 flume]$ vim conf/log4j.properties 
# 注意是修改内容
flume.log.dir=/opt/module/flume/logs
  1. 验证 flume
[logan@hadoop101 flume]$ /opt/module/flume/bin/flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9

Flume日志采集

配置解析

需要采集的日志文件分布在hadoop101,hadoop102两台日志服务器,故需要在hadoop101,hadoop102上配置日志采集 Flume。Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到 Kafka。
此处选择TailDirSource和 KafkaChannel。

  • TailDirSource优势:支持断点续传、多目录。
  • KafkaChannel优势:省去了 Sink,提高了效率。
  • 日志采集 Flume关键配置 :
    在这里插入图片描述

Flume日志采集配置

  1. 创建Flume 配置文件
[logan@hadoop101 flume]$ pwd
/opt/module/flume
[logan@hadoop101 flume]$ mkdir job
[logan@hadoop101 flume]$ vim job/file_to_kafka.conf
#定义组件
a1.sources = r1
a1.channels = c1

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.logan.gmall.flume.interceptor.ETLInterceptor$Builder

#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop101:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

#组装 
a1.sources.r1.channels = c1
  1. 编写拦截器
    • 创建Maven工程flume-interceptor
    • 创建包:com.logan.gmall.flume.interceptor
    • 在pom.xml文件中添加如下配置
    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>
    
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    
    • 在com.logan.gmall.flume.utils包下创建JSONUtil类
    package com.logan.gmall.flume.utils;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONException;
    
    public class JSONUtil {
        /**
         * 通过异常判定是否是JSON字符串
         */
        public static boolean isJSONValidate(String input){
            try {
                JSON.parseObject(input);
                return true;
            } catch (JSONException e) {
                return false;
            }
        }
    }
    
    • 在com.logan.gmall.flume.interceptor包下创建ETLInterceptor类
    package com.logan.gmall.flume.interceptor;
    
    import com.logan.gmall.flume.utils.JSONUtil;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.Iterator;
    import java.util.List;
    
    public class ETLInterceptor implements Interceptor {
        @Override
        public void initialize() {
    
        }
    
        @Override
        public Event intercept(Event event) {
    
            //1、获取body当中的数据并转成字符串
            byte[] body = event.getBody();
            String log = new String(body, StandardCharsets.UTF_8);
            //2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回null
            if (JSONUtil.isJSONValidate(log)) {
                return event;
            } else {
                return null;
            }
        }
    
    
        @Override
        public List<Event> intercept(List<Event> list) {
    
            Iterator<Event> iterator = list.iterator();
    
            while (iterator.hasNext()){
                Event next = iterator.next();
                if(intercept(next)==null){
                    iterator.remove();
                }
            }
    
            return list;
        }
    
        public static class Builder implements Interceptor.Builder{
    
            @Override
            public Interceptor build() {
                return new ETLInterceptor();
            }
    
            @Override
            public void configure(Context context) {
    
            }
        }
    
    
        @Override
        public void close() {
    
        }
    }
    
    • 打包
      在这里插入图片描述
    • 将打包文件放到hadoop101 的/opt/moduie/flume/lib文件夹下

采集测试

  1. 启动Zookeeper、Kafka集群
  2. 启动 hadoop101上的日志采集Flume
[logan@hadoop101 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console
  1. 启动一个Kafka的Console-Consumer
[logan@hadoop101 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop11:9092 --topic topic_log
  1. 生成模拟数据
[logan@hadoop101 module]$ mkdir -p /opt/module/applog/log/
[logan@hadoop101 module]$ vim /opt/module/applog/log/app.2023-12-02.log
{"common":{"ar":"500000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs","mid":"mid_552171","os":"iOS 13.3.1","uid":"919","vc":"v2.1.134"},"displays":[{"display_type":"activity","item":"1","item_type":"activity_id","order":1,"pos_id":5},{"display_type":"activity","item":"2","item_type":"activity_id","order":2,"pos_id":5},{"display_type":"query","item":"19","item_type":"sku_id","order":3,"pos_id":4},{"display_type":"query","item":"3","item_type":"sku_id","order":4,"pos_id":2},{"display_type":"query","item":"5","item_type":"sku_id","order":5,"pos_id":2},{"display_type":"promotion","item":"19","item_type":"sku_id","order":6,"pos_id":4},{"display_type":"query","item":"14","item_type":"sku_id","order":7,"pos_id":2},{"display_type":"query","item":"9","item_type":"sku_id","order":8,"pos_id":2},{"display_type":"promotion","item":"35","item_type":"sku_id","order":9,"pos_id":1}],"page":{"during_time":9853,"page_id":"home"},"ts":1672512476000}
{"actions":[{"action_id":"favor_add","item":"9","item_type":"sku_id","ts":1672512480386},{"action_id":"get_coupon","item":"2","item_type":"coupon_id","ts":1672512483772}],"common":{"ar":"500000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs","mid":"mid_552171","os":"iOS 13.3.1","uid":"919","vc":"v2.1.134"},"displays":[{"display_type":"promotion","item":"19","item_type":"sku_id","order":1,"pos_id":4},{"display_type":"promotion","item":"14","item_type":"sku_id","order":2,"pos_id":5},{"display_type":"query","item":"21","item_type":"sku_id","order":3,"pos_id":1},{"display_type":"query","item":"11","item_type":"sku_id","order":4,"pos_id":2},{"display_type":"promotion","item":"28","item_type":"sku_id","order":5,"pos_id":1}],"page":{"during_time":10158,"item":"9","item_type":"sku_id","last_page_id":"home","page_id":"good_detail","source_type":"promotion"},"ts":1672512477000}
  1. 观察kafka是否有消费到数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1294496.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

重要功能:妙手ERP正式接入北俄海外仓,助力跨境卖家出海俄罗斯市场!

近日&#xff0c;妙手ERP正式接入北俄海外仓&#xff0c;并支持Ozon、速卖通AliExpress、Joom平台卖家同步推送商品、查看库存清单、高效处理订单&#xff0c;助力卖家掘金俄罗斯市场。 关于北俄海外仓 北俄海外仓为北俄国际旗下项目&#xff0c;北俄国际成立于2008年&#xf…

【Java用法】Lombok中@SneakyThrows注解的使用方法和作用

Lombok中SneakyThrows注解的使用方法和作用 一、SneakyThrows的作用二、SneakyThrows注解原理 一、SneakyThrows的作用 普通Exception类,也就是我们常说的受检异常或者Checked Exception会强制要求抛出它的方法声明throws&#xff0c;调用者必须显示的去处理这个异常。设计的目…

企业集团采购系统(供应商、询价、招投标)-源码

一、业务需求 企业招标询价供应商管理系统是一种专业的采购管理系统&#xff0c;旨在帮助企业实现供应商关系的管理和采购成本的控制。该系统涵盖了企业采购管理的各个方面&#xff0c;包括采购预算、供应商管理、产品管理、采购计划、询价、竞价、招标、采购订单、采购合同执…

Docker构建自定义镜像

创建一个docker-demo的文件夹,放入需要构建的文件 主要是配置Dockerfile文件 第一种配置方法 # 指定基础镜像 FROM ubuntu:16.04 # 配置环境变量&#xff0c;JDK的安装目录 ENV JAVA_DIR/usr/local# 拷贝jdk和java项目的包 COPY ./jdk8.tar.gz $JAVA_DIR/ COPY ./docker-demo…

前端vue3实现本地及在线文件预览(含pdf/txt/mp3/mp4/docx/xlsx/pptx)

一、仅需实现在线预览&#xff0c;且文件地址公网可访问 &#xff08;一&#xff09;微软office免费预览&#xff08;推荐&#xff09; 支持doc/docx/xls/xlsx/ppt/pptx等多种office文件格式的免费预览 //示例代码//​在https://view.officeapps.live.com/op/view.aspx?src…

使用autodl服务器,两个3090显卡上运行, Yi-34B-Chat-int4模型,并使用vllm优化加速,显存占用42G,速度23 words/s

1&#xff0c;演示视频地址 https://www.bilibili.com/video/BV1Hu4y1L7BH/ 使用autodl服务器&#xff0c;两个3090显卡上运行&#xff0c; Yi-34B-Chat-int4模型&#xff0c;用vllm优化&#xff0c;增加 --num-gpu 2&#xff0c;速度23 words/s 2&#xff0c;使用3090显卡 和…

JavaScript中的构造函数是什么,如何使用ES6中的类来进行构造函数的封装和继承?

目录 学习目标&#xff1a; 学习内容&#xff1a; 学习时间&#xff1a; 学习讲解&#xff1a; 深入对象 创建对象三种方式 构造函数 练习 实例化执行过程 说明&#xff1a;1. 创建新对象2. 构造函数this指向新对象3. 执行构造函数代码&#xff0c;修改this&#xff…

微信小程序调用相机拍摄或手机相册

wx.chooseMedia(Object object) 功能描述 拍摄或从手机相册中选择图片或视频。

Docker网络原理

Docker网络概述 1.桥接模式介绍 bridge模式是docker的默认网络模式。 桥接模式是一种用于连接两个不同网络段的设备&#xff0c;使它们能够共享通信的一种方式。 桥接设备工作在OSI模型的第二层&#xff0c;即数据链路层&#xff0c;通常基于MAC地址进行帧转发。 物理层连接…

VUE+THREE.JS 点击模型相机缓入查看模型相关信息

点击模型相机缓入查看模型相关信息 1.引入2.初始化CSS3DRenderer3.animate 加入一直执行渲染4.点击事件4.1 初始化renderer时加入监听事件4.2 触发点击事件 5. 关键代码分析5.1 移除模型5.2 创建模型上方的弹框5.3 相机缓入动画5.4 动画执行 1.引入 引入模型所要呈现的3DSprite…

Python 案例实训教学,课程展示及结课存档优化|ModelWhale 版本更新

大雪时节&#xff0c;仲冬如约而至&#xff0c;我们也迎来了 ModelWhale 新一轮的版本更新。 本次更新中&#xff0c;ModelWhale 主要进行了以下功能迭代&#xff1a; 优化 课程大纲展示&#xff08;团队版✓&#xff09;优化 作业批量导出存档&#xff08;团队版✓&#xff…

Windows本地如何添加域名映射?(修改hosts文件)

1. DNS(域名系统) Domain Name System(域名系统)&#xff1a;为了加快定位IP地址的速度, 将域名映射进行层层缓存的系统. 目的&#xff1a;互联网通过IP&#xff08;10.223.146.45&#xff09;定位浏览器建立连接&#xff0c;但是我们不易区别IP&#xff0c;为了方便用户辨识I…

基于SpringBoot+maven+Mybatis+html慢性病报销系统(源码+数据库)

一、项目简介 本项目是一套基于SpringBootmavenMybatishtml慢性病报销系统&#xff0c;主要针对计算机相关专业的正在做bishe的学生和需要项目实战练习的Java学习者。 包含&#xff1a;项目源码、数据库脚本等&#xff0c;该项目可以直接作为bishe使用。 项目都经过严格调试&a…

C程序设计—输入一行字符,分别统计出其中的英文字母、空格、数字和其它字符的个数。

#include <stdio.h> #include <ctype.h> // 包含ctype.h头文件&#xff0c;用于判断字符类型 int main() { char str[100]; // 定义字符数组&#xff0c;用于存储输入的字符串 int i, letters 0, space 0, digit 0, others 0; // 定义变量&#xff0c…

护眼灯有效果吗?考研必备护眼台灯推荐

据统计&#xff0c;中国人口的近视率约为10%至20%。 国家卫健委发布的中国首份眼健康白皮书显示&#xff0c;我国小学生近视率为47.2%&#xff0c;初中生近视率为75.8%&#xff0c;大学生近视率超过90%。据世界卫生组织统计数据显示&#xff0c;目前全球约有14亿近视人口&#…

反射加载SDK完成统一调用

文章目录 1、需求背景2、接口抽象类具体实现类3、疑问4、存在的问题5、通过反射加载SDK并完成调用5、补充&#xff1a;关于业务网关7、补充&#xff1a;关于SDK的开发 关键点&#xff1a; 接口抽象类&#xff08;半抽象半实现&#xff09;具体实现类业务网关反射加载SDK&#…

C++新经典模板与泛型编程:策略技术中的算法策略

策略技术中的算法策略 在之前博客中funcsum()函数模板中&#xff0c;实现了对数组元素的求和运算。求和在这里可以看作一种算法&#xff0c;扩展一下思路&#xff0c;对数组元素求差、求乘积、求最大值和最小值等&#xff0c;都可以看作算法。而当前的funcsum()函数模板中&…

H3.3K27M弥漫性中线胶质瘤的反义寡核苷酸治疗

今天给同学们分享一篇实验文章“Antisense oligonucleotide therapy for H3.3K27M diffuse midline glioma”&#xff0c;这篇文章发表在Sci Transl Med期刊上&#xff0c;影响因子为17.1。 结果解读&#xff1a; CRISPR-Cas9消耗H3.3K27M恢复了H3K27三甲基化&#xff0c;并延…

PHPstorm可选择版本的链接

PHPstorm可选择版本的链接 链接&#xff1a;https://www.jetbrains.com/phpstorm/download/other.html

class060 拓扑排序的扩展技巧【算法】

class060 拓扑排序的扩展技巧【算法】 算法讲解060【必备】拓扑排序的扩展技巧 2023-12-7 22:23:02 code1 P4017 最大食物链计数 // 最大食物链计数 // a -> b&#xff0c;代表a在食物链中被b捕食 // 给定一个有向无环图&#xff0c;返回 // 这个图中从最初级动物到最顶…