6、Flume安装部署

news2025/1/16 2:38:25

按照采集通道规划,需在hadoop102,hadoop103,hadoop104三台节点分别部署一个Flume。可参照以下步骤先在hadoop102安装,然后再进行分发。

1、Flume入门

1.1、 Flume安装部署

1.1.1、 安装地址

(1) Flume官网地址:Welcome to Apache Flume — Apache Flume

(2)文档查看地址:Flume 1.11.0 User Guide — Apache Flume

(3)下载地址:Index of /dist/flume

1.1.2、 安装部署

(1)将apache-flume-1.9.0-bin.tar.gz上传到linux的/opt/software目录下

(2)解压apache-flume-1.9.0-bin.tar.gz到/opt/module/目录下

[shuidi@hadoop102 software]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/

(3)修改apache-flume-1.9.0-bin的名称为flume

[shuidi@hadoop102 module]$ mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume

(4)将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3

[shuidi@hadoop102 module]$ rm /opt/module/flume/lib/guava-11.0.2.jar

注意:删除guava-11.0.2.jar的服务器节点,一定要配置hadoop环境变量。否则会报如下异常。

Caused by: java.lang.ClassNotFoundException: com.google.common.collect.Lists
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 1 more

(5)修改conf目录下的log4j.properties配置文件,配置日志文件路径

[shuidi@hadoop102 conf]$ vim log4j.properties

flume.log.dir=/opt/module/flume/logs

1.2、分发Flume

[shuidi@hadoop102 ~]$ xsync /opt/module/flume/

1.3、项目经验

(1)堆内存调整

Flume堆内存通常设置为4G或更高,配置方式如下:

修改/opt/module/flume/conf/flume-env.sh文件,配置如下参数(虚拟机环境暂不配置)

export JAVA_OPTS="-Xms4096m -Xmx4096m -Dcom.sun.management.jmxremote"
注:
-Xms表示JVM Heap(堆内存)最小尺寸,初始分配;
-Xmx 表示JVM Heap(堆内存)最大允许的尺寸,按需分配。

2、日志采集Flume

2.1、日志采集Flume配置概述

按照规划,需要采集的用户行为日志文件分布在hadoop102,hadoop103两台日志服务器,故需要在hadoop102,hadoop103两台节点配置日志采集Flume。日志采集Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到Kafka。

此处可选择TaildirSource和KafkaChannel,并配置日志校验拦截器。

选择TailDirSource和KafkaChannel的原因如下:

1)TailDirSource

TailDirSource相比ExecSource、SpoolingDirectorySource的优势

TailDirSource:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。

ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。

SpoolingDirectorySource监控目录,支持断点续传。

2)KafkaChannel

采用Kafka Channel,省去了Sink,提高了效率。

日志采集Flume关键配置如下:

 

3、 日志采集Flume配置实操

1)创建Flume配置文件

在hadoop102节点的Flume的job目录下创建file_to_kafka.conf

[shuidi@hadoop104 flume]$ mkdir job
[shuidi@hadoop104 flume]$ vim job/file_to_kafka.conf 

2)配置文件内容如下

#定义组件
a1.sources = r1
a1.channels = c1

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.ETLInterceptor$Builder

#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

#组装 
a1.sources.r1.channels = c1

3)编写Flume拦截器

(1)创建Maven工程flume-interceptor

(2)创建包:com.atguigu.gmall.flume.interceptor

(3)在pom.xml文件中添加如下配置

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

(4)在com.atguigu.gmall.flume.utils包下创建JSONUtil类

package com.atguigu.gmall.flume.utils;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONException;

public class JSONUtil {
/*
* 通过异常判断是否是json字符串
* 是:返回true  不是:返回false
* */
    public static boolean isJSONValidate(String log){
        try {
            JSONObject.parseObject(log);
            return true;
        }catch (JSONException e){
            return false;
        }
    }
}

(5)在com.atguigu.gmall.flume.interceptor包下创建ETLInterceptor类

package com.atguigu.gmall.flume.interceptor;

import com.atguigu.gmall.flume.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;


import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class ETLInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        
        //1、获取body当中的数据并转成字符串
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        //2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回null
        if (JSONUtil.isJSONValidate(log)) {
            return event;
        } else {
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> list) {

        Iterator<Event> iterator = list.iterator();

        while (iterator.hasNext()){
            Event next = iterator.next();
            if(intercept(next)==null){
                iterator.remove();
            }
        }

        return list;
    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }
        @Override
        public void configure(Context context) {

        }

    }

    @Override
    public void close() {

    }
}

(6)打包

 (7)需要先将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面。

4、 日志采集Flume测试

1)启动Zookeeper、Kafka集群

2)启动hadoop102的日志采集Flume

[shuidi@hadoop102 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console

3)启动一个Kafka的Console-Consumer

[shuidi@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log

4)生成模拟数据

[shuidi@hadoop102 ~]$ lg.sh 

5)观察Kafka消费者是否能消费到数据

5、 日志采集Flume启停脚本

1)分发日志采集Flume配置文件和拦截器

若上述测试通过,需将hadoop102节点的Flume的配置文件和拦截器jar包,向另一台日志服务器发送一份。

[shuidi@hadoop102 flume]$ scp -r job hadoop103:/opt/module/flume/
[shuidi@hadoop102 flume]$ scp lib/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar hadoop103:/opt/module/flume/lib/

2)方便起见,此处编写一个日志采集Flume进程的启停脚本

在hadoop102节点的/home/shuidi/bin目录下创建脚本f1.sh

[shuidi@hadoop102 bin]$ vim f1.sh

在脚本中填写如下内容

#!/bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " --------启动 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &"
        done
};; 
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " --------停止 $i 采集flume-------"
                ssh $i "ps -ef | grep file_to_kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
        done

};;
esac

3)增加脚本执行权限

[shuidi@hadoop102 bin]$ chmod 777 f1.sh

4)f1启动

[shuidi@hadoop102 module]$ f1.sh start

5)f1停止

[shuidi@hadoop102 module]$ f1.sh stop

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/728162.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

全网最牛,Web自动化测试Selenium八大元素定位实战(详细)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 安装Selenium和下…

BFT 最前线|OpenAI暂时下线ChatGPT”浏览“功能;Stability AI CEO:5年内,人类程序员将不复存在

原创 | 文 BFT机器人 AI视界 TECHNOLOGY NEWS 01 Open AI暂时下线ChatGPT“浏览”功能 日前OpenAI方面宣布&#xff0c;面向ChatGPT Plus用户的"浏览"功能会在某些情况下出现故障&#xff0c;因此已于7月3日暂时禁用了这一功能。该功能是为了提高ChatGPT的搜索体验…

威胁检测和取证日志分析

在网络中&#xff0c;威胁是指可能影响其平稳运行的恶意元素。因此&#xff0c;对于任何希望搁置任何财政损失或生产力下降机会的组织来说&#xff0c;威胁检测都是必要的。为了先发制人地阻止来自各种来源的任何此类攻击&#xff0c;需要有效的威胁检测情报。 威胁检测可以是…

mmap函数

参考 https://blog.csdn.net/bhniunan/article/details/104105153void *mmap(void *addr, size_t len, int prot, int flags, int fd, off_t offset);参数 addr&#xff1a;出参&#xff0c; 指定映射的起始地址&#xff0c;通常设为NULL&#xff0c;由内核来分配 len&#x…

网络编程3——TCP Socket实现的客户端服务器通信完整代码(详细注释帮你快速理解)

文章目录 前言一、理论准备Socket套接字是什么TCP协议的特点 二、TCP 流套接字提供的APIServerSocket APISocket API 三、代码实现请求响应式 客户端服务器服务器客户端疑惑解答为什么服务器进程需要手动指定端口号而客户端进程不需要为什么客户端中的服务器IP与端口号是"…

Mysql架构篇--Mysql 主从同步方案

文章目录 前言一、传统的主从复制&#xff1a;1 原理&#xff1a;2 缺点&#xff1a; 二、半同步复制&#xff08;Semi-Synchronous Replication&#xff09;&#xff1a;三、组复制&#xff1a;1 原理&#xff1a;2 实现&#xff1a;2.1 myql 实例安装&#xff1a;2.1 myql 实…

量子近似优化算法(QAOA)入门(1):从量子绝热算法(QAA)角度的直观理解

文章目录 前言&#xff1a;量子计算的本质是测量一、基于量子逻辑电路的常用算法1.NISQ&#xff1a;Noisy Intermediate-Scale Quantum&#xff08;含噪声中等规模量子&#xff09; 二、量子绝热算法&#xff08;QAA&#xff1a;Quantum Adiabatic Algorithm&#xff09;1.QAA的…

【KingFusion】用KingFusion3.6创建一个客户端工程的步骤

哈喽&#xff0c;大家好&#xff0c;我是雷工&#xff01; 今天学习用KingFusion3.6创建一个客户端工程&#xff0c;以下记录创建过程。 客户端组件作为KingFusion3.6的数据展示功能模块&#xff0c;其主要功能是通过组态组态式配置以及丰富的图表元素、动画连接等多样的展示形…

ROS:TF坐标变换

目录 一、TF坐标变换背景二、概念三、静态坐标变换3.1概念3.2实际用例3.2.1分析3.2.2流程3.2.3C实现 一、TF坐标变换背景 机器人系统上&#xff0c;有多个传感器&#xff0c;如激光雷达、摄像头等&#xff0c;有的传感器是可以感知机器人周边的物体方位(或者称之为:坐标&#…

《LORA: LOW-RANK ADAPTATION OF LARGE LANGUAGE MODELS》论文笔记

引言 全量参数微调在LLM背景下由“不方便”演变为“不可行|高昂成本”&#xff0c;基于“收敛的模型参数可以压缩到低维空间”的假设&#xff1a; the learned over-parametrized models in fact reside on a low intrinsic dimension. 作者提出LORA&#xff08;Low Rank Adap…

远程关闭或重新启动计算机

远程关机只是从远程位置关闭计算机的过程。主要领域是组织在没有知识的情况下失去收入将是电力费用。员工倾向于在周末打开他们的系统。不必要的电力消耗也会影响我们的环境。在这种情况下&#xff0c;系统管理员可以在周末和非工作时间安排自动系统关闭&#xff0c;或者在必要…

Valve 签约开源 Linux 图形驱动开发者

导读据外媒 phoronix 报道&#xff0c;Valve 最近聘用了著名开源 Linux 图形驱动开发者 Alyssa Rosenzweig&#xff0c;以改进开源 Linux 图形驱动程序堆栈&#xff0c;增强 Linux 游戏生态系统。 Alyssa Rosenzweig 多年来在 Panfrost 开源、逆向工程 Arm Mali 图形驱动程序方…

【自动化测试基础知识】什么是自动化测试?

什么是自动化测试? 自动化测试是一种软件工具的应用&#xff0c;用于自动化由人驱动的检查和验证软件产品的手工过程。大多数现代敏捷和DevOps软件项目现在都包括从一开始就进行自动化测试。然而&#xff0c;为了充分理解自动化测试的价值&#xff0c;先学习下在它被广泛采用…

优化|一阶方法:求解不具有凸性和lipschitz连续性的复合问题

论文解读者&#xff1a;陈康明&#xff0c;赵田田&#xff0c;李朋 编者按&#xff1a;​ 对于大多数一阶算法&#xff0c;我们会在收敛性分析时假设函数是凸的&#xff0c;且梯度满足全局 Lipschitz 条件。而本文中&#xff0c;对于某一类特殊函数。我们不仅不要求函数是凸的…

基于信号博弈模型的区块链赋能下中小企业融资问题

​ 我国的金融体系是银行主导性&#xff0c;银行信贷是企业融资的首要来源。然而银企之间存在着严重的信息不对称&#xff0c;根据经典的微观银行理论&#xff0c;银行与企业之间的信息不对称会引发道德风险和逆向选择问题。因此在银行信贷市场中&#xff0c;当中小企业需要融资…

MySQL实现数据炸裂拆分(类似Hive的explode函数的拆分数组功能)

MySQL实现数据炸裂拆分(类似Hive的"explode"函数的拆分数组功能) 需求背景 背景描述 ​ 在Hive中&#xff0c;"explode"函数用于将数组类型的列拆分为多行&#xff0c;以便对数组中的每个元素进行处理。然而&#xff0c;在MySQL中&#xff0c;并没有直接…

前置微小信号放大器怎么用

前置微小信号放大器是一种用于将微弱信号从传感器转换成足够强度的信号以便更好地进行检测和处理的设备。它主要应用于各种传感器领域&#xff0c;例如温度传感器、压力传感器、光学传感器和生物传感器等。前置微小信号放大器的作用是提高信号的信噪比&#xff0c;减小噪声干扰…

天津热门大数据培训班 大数据选课技巧

大数据开发技术的应用时时刻刻都会影响我们的生活&#xff0c;所以很多想转行做大数据开发&#xff0c;大数据开发技术不断更新和发展&#xff0c;很多企业在开发过程中需要的大数据开发技术不断提高要求&#xff0c;因此市面上缺少的是要全面技能的大数据开发人员。 什么是大…

使用 Docker Desktop 安装 Centos 系统

一、前言 由于 Docker 是一个容器&#xff0c;它支持在一个服务器进行多服务部署&#xff0c;并且还能保持服务的独立性&#xff0c;那么&#xff0c;在Docker 上的运用时 我们也是可以 独立部署多个系统来做不同是其他&#xff0c;这样环境独立的情况下&#xff0c;也就不会造…

投票评选活动小程序v2-用户报名图片上传

投票评选活动小程序v2-用户自行报名收集材料页面 主要收集项目或者作品图片及其描述&#xff0c;可以在后台进行统一录入&#xff0c;也可以是在用户界面&#xff0c;让用户自行报名上传。 这里开发了一个“我要报名”页面&#xff0c;在首页点击“我要报名”按钮跳转过来。 …