【数据采集与预处理】流数据采集工具Flume

news2024/11/15 20:09:13

一、Flume简介

数据流 :数据流通常被视为一个随时间延续而无限增长的动态数据集合,是一组顺序、大量、快速、连续到达的数据序列。通过对流数据处理,可以进行卫星云图监测、股市走向分析、网络攻击判断、传感器实时信号分析。

(一)Flume定义

        Apache Flume是一种分布式、具有高可靠和高可用性的数据采集系统,可从多个不同类型、不同来源的数据流汇集到集中式数据存储系统中。Flume 基于流式架构,灵活简单。

(二)Flume作用

Flume最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS。

二、Flume组成架构

1、Agent
        Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的,是 Flume 数据传输的基本单元。Agent 主要有 3 个部分组成,Source、Channel、Sink。

2、Source
        Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。

3、Channel
        Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个 Sink的读取操作。
Flume 自带两种 Channel:Memory Channel 和 File Channel。
Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。
File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。

4、 Sink
        Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
        Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。
        Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。

5、Event
        传输单元,Flume 数据传输的基本单元,以事件的形式将数据从源头送至目的地。

Flume Agent 内部原理:

三、Flume安装配置

(一)下载Flume

到Flume官网下载Flume1.7.0安装文件,下载地址如下:

http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

下载完成后上传到虚拟机的“/usr/local/uploads”目录下。

(二)解压安装包

首先进入到“uploads”目录下。将压缩包解压到“/usr/local”目录下

[root@bigdata zhc]# cd /usr/local/uploads
[root@bigdata uploads]# tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /usr/local

将解压的文件修改名字为flume,简化操作。把/usr/local/flume目录的权限赋予当前登录Linux系统的用户。

[root@bigdata uploads]# cd /usr/local
[root@bigdata local]# mv apache-flume-1.7.0-bin flume
[root@bigdata local]# chown -R zhc:zhc ./flume

 

(三)配置环境变量

首先,修改/etc/profile配置文件:

[root@bigdata local]# vi /etc/profile

export FLUME_HOME=/usr/local/flume
export PATH=$PATH:$FLUME_HOME/bin
export FLUME_CONF_DIR=$FLUME_HOME/conf

使文件生效:

[root@bigdata local]# source /etc/profile

下面修改 flume-env.sh 配置文件:

[root@bigdata local]# cd /usr/local/flume/conf
[root@bigdata conf]# cp flume-env.sh.template flume-env.sh
[root@bigdata conf]# vi flume-env.sh

在文件中增加一行内容,用于设置JAVA_HOME变量:

export JAVA_HOME=/usr/local/servers/jdk

然后,保存flume-env.sh文件,并退出vim编辑器。

(四)查看Flume版本信息

[root@bigdata conf]# cd /usr/local/flume
[root@bigdata flume]# ./bin/flume-ng version

然后就会发现如下报错: “错误: 找不到或无法加载主类”

原因分析:
(1)jdk 冲突
(2)安装了HBase就会报着个错

解决方法:

到“/usr/local/flume/bin”目录下修改flume-ng文件。

[root@bigdata flume]# cd /usr/local/flume/bin
[root@bigdata bin]# vi flume-ng

在文件中加入以下内容:

2>/dev/null | grep hbase

再次查看flume版本信息。

四、使用Flume作为Spark Streaming数据源

        Flume是非常流行的日志采集系统,可以作为Spark Streaming的高级数据源。请把Flume Source设置为netcat类型,从终端上不断给Flume Source发送各种消息,Flume把消息汇集到Sink,这里把Sink类型设置为avro,由Sink把消息推送给Spark Streaming,由自己编写的Spark Streaming应用程序对消息进行处理。

(一)Spark准备工作

1、下载spark-streaming-flume_2.11-2.3.4.jar

首先,到官网下载spark-streaming-flume_2.11-2.3.4.jar:

https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume

上面的网址要是打不开,可以用下面的这个网址:

Central Repository: org/apache/spark/spark-streaming-flume_2.11

2、把这个jar文件放到“/usr/local/spark/jars/flume”目录下

[root@bigdata flume]# cd /usr/local/spark/jars
[root@bigdata jars]# mkdir flume
[root@bigdata jars]# cd flume
[root@bigdata flume]# cp /usr/local/uploads/spark-streaming-flume_2.11-2.3.4.jar .

注意:此处不要将“/usr/local/flume/lib”目录下的所有jar包都拷贝到“/usr/local/spark/jars/flume” 目录下,不然会使Spark和Hadoop版本与Guava库的版本不兼容,从而导致后面运行程序时会报错!

3、修改spark-env.sh文件

[root@bigdata flume]# cd /usr/local/spark/conf
[root@bigdata conf]# vi spark-env.sh

将如下内容加到文件中: 

:/usr/local/spark/jars/flume/*:/usr/local/flume/lib/*

这样,Spark环境就准备好了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1360539.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Proteus仿真】【Arduino单片机】太阳能追光系统设计

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 本项目使用Proteus8仿真Arduino单片机控制器,使用LCD1602液晶、光敏传感器、ADC模块、按键模块、28BYJ48步进电机驱动模块、直流电机模块等。 主要功能: 系统运行后,L…

Jupyter Lab | 在指定文件夹的 jupyter 中使用 conda 虚拟环境

Hi,大家好,我是源于花海。本文主要了解如何在指定文件夹的 jupyter 中使用 conda 虚拟环境,即在 conda 里面创建虚拟环境、将虚拟环境添加至 jupyter lab/notebook、安装软件包。 目录 一、创建虚拟环境 二、激活并进入虚拟环境 三、安装 …

我们公司内应届生身上的6个共性问题

如题目,本文主要是根据我们公司内真实的应届生身上共同的问题,总结而来。 1. 一天会做很多工作:会跟很多人对接,会一会忙这个一会忙哪个 现象: 说实话,这种情况,我看着都替她着急。自己正在解…

IPD(集成产品开发)

一、简介IPD IPD是指应用于集成产品开发(Integrated Product Development)的一套流程。 IPD流程可分为很多小的流程,这些流程确保企业做正确的事,且正确地做事。 IPD核心的流程之一是PDP(Product Development Proce…

用C语言实现完全平方数计算【一题一策】第三期

题目:一个整数,它加上100后是一个完全平方数,再加上 168 又是一个完全平方数,请问该数是多少? 一、题目分析 首先假设该数为x,则x100y?,y为完全平方数。 然后加上168又是一个完全平方数&…

某金属加工公司的核心人才激励体系搭建项目纪实

【客户行业】金属加工行业 【问题类型】薪酬体系/激励体系 【客户背景】 某大型金属加工企业位于河北地区,成立于2000年,隶属于某大型有色金属集团,是一家集科研、开发、生产、销售于一体的国有企业,人员达到1000人。经过多年…

基于多反应堆的高并发服务器【C/C++/Reactor】(中)主线程给子线程添加任务以及如何处理该任务

在看此篇文章,建议先看我的往期文章: 基于多反应堆的高并发服务器【C/C/Reactor】(中)在EventLoop的任务队列中添加新任务-CSDN博客https://blog.csdn.net/weixin_41987016/article/details/135346492?spm1001.2014.3001.5501一…

kubesphere和k8s的使用分享

文章目录 什么是kubernetesKubernetes的部分核心概念互式可视化管理平台与kubernetes的关系市面是常见的kubernetes管理平台 什么是kubesphereKubesphere默认安装的组件Kubesphere涉及的服务组件kubesphere的安装Kubesphere相关的内容 什么是kubernetes 就在这场因“容器”而起…

backtrader框架初探,轻松跑通策略并策略分析

网上有很多backtrader的文章,并有些将其与vnpy做比较,经过安装后发现,还是backtrader教程简单。 1、前期准备 # 安装akshare免费行情源 pip install akshare -i http://mirrors.aliyun.com/pypi/simple/ --trusted-hostmirrors.aliyun.com …

kubectl 源码分析

Cobra库 k8s各组件的cli部分都使用Cobra库实现,Cobra 中文文档 - 掘金 (juejin.cn),获取方式如下: go get -u github.com/spf13/cobralatest cobra库中的Command结构体的字段,用于定义命令行工具的行为和选项。它们的作用如下&…

性能优化-OpenMP基础教程(五)-全面讲解OpenMP基本编程方法

本文主要介绍OpenMP编程的编程要素和实战,包括并行域管理详细实战、任务分担详细实战。 🎬个人简介:一个全栈工程师的升级之路! 📋个人专栏:高性能(HPC)开发基础教程 🎀C…

Linux与安全

本心、输入输出、结果 文章目录 系统设计 - 我们如何通俗的理解那些技术的运行原理 - 第八部分:Linux、安全 前言 Linux 文件系统解释应该知道的 18 个最常用的 Linux 命令HTTPS如何工作? 数据是如何加密和解密的?为什么HTTPS在数据传输过程…

IntelliJ IDEA远程查看修改Ubuntu上AOSP源码

IntelliJ IDEA远程查看修改Ubuntu上的源码 本人操作环境windows10,软件版本IntelliJ IDEA 2023.2.3,虚拟机Ubuntu 22.04.3 LTS 1、Ubuntu系统安装openssh 查看是否安装: ssh -V 如果未安装: sudo apt install openssh-server # 开机自启…

php 数组中的元素进行排列组合

需求背景:计算出数组[A,B,C,D]各种排列组合,希望得到的是数据如下图 直接上代码: private function finish_combination($array, &$groupResult [], $splite ,){$result [];$finish_result [];$this->diffArrayItems($array, $…

springboot实现ChatGPT式调用(一次调用,持续返回)

下边实现了一个持续返回100以内随机数的接口,在接口超时之前会每隔1秒返回一个随机数 GetMapping(value "/getRandomNum", produces MediaType.TEXT_EVENT_STREAM_VALUE) public SseEmitter getRandomNum() {SseEmitter emitter new SseEmitter();Th…

使用STM32的定时器和PWM实现LCD1602的背光控制

使用STM32的定时器和PWM功能来控制LCD1602的背光是一种常见的方法,它可以实现背光的亮度调节和闪烁效果。在本文中,我们将讨论如何利用STM32的定时器和PWM来实现LCD1602的背光控制,并提供相应的代码示例。 1. 硬件连接和初始化 首先&#x…

负责任的人工智能与人机环境系统智能

负责任的人工智能是指在人工智能系统的设计、开发、管理、使用和维护过程中,所有相关的角色(包括设计者、开发者、管理者、使用者、维护者等等)都承担其行为的道义、法律和社会责任。这意味着这些角色需要确保人工智能系统的设计与使用符合伦…

C++完成使用map Update数据 二进制数据

1、在LXMysql.h和LXMysql.cpp分别定义和编写关于pin语句的代码 //获取更新数据的sql语句 where语句中用户要包含where 更新std::string GetUpdatesql(XDATA kv, std::string table, std::string where); std::string LXMysql::GetUpdatesql(XDATA kv, std::string table, std…

window服务器thinkphp队列监听服务

经常使用linux的同学们应该对使用宝塔来做队列监听一定非常熟悉,但对于windows系统下,如何去做队列的监听?是一个很麻烦的事情。 本文将通过windows系统的服务来实现队列的监听。 对于thinkphp6 queue如何使用,不再赘述。其它系…