Flume实现Kafka数据持久化存储到HDFS

news2024/9/29 3:27:05

写在前面:博主是一只经过实战开发历练后投身培训事业的“小山猪”,昵称取自动画片《狮子王》中的“彭彭”,总是以乐观、积极的心态对待周边的事物。本人的技术路线从Java全栈工程师一路奔向大数据开发、数据挖掘领域,如今终有小成,愿将昔日所获与大家交流一二,希望对学习路上的你有所助益。同时,博主也想通过此次尝试打造一个完善的技术图书馆,任何与文章技术点有关的异常、错误、注意事项均会在末尾列出,欢迎大家通过各种方式提供素材。

  • 对于文章中出现的任何错误请大家批评指出,一定及时修改。
  • 有任何想要讨论和学习的问题可联系我:zhuyc@vip.163.com。
  • 发布文章的风格因专栏而异,均自成体系,不足之处请大家指正。

Flume实现Kafka数据持久化存储到HDFS

本文关键字:Flume、Kafka、HDFS、实时数据、存储

文章目录

  • Flume实现Kafka数据持久化存储到HDFS
    • 一、场景描述
      • 1. 数据输入
      • 2. 数据管道
      • 3. 数据输出
    • 二、组件介绍
      • 1. Kafka
      • 2. Hadoop
      • 3. Flume
    • 三、前置准备
      • 1. Flume下载
      • 2. Flume安装
      • 3. 数据源准备
    • 四、配置文件
      • 1. 以内存为channel
      • 2. 以文件为channel
    • 五、运行测试
      • 1. 直接运行
      • 2. 监控运行

一、场景描述

对于一些实时产生的数据,除了做实时计算以外,一般还需要归档保存,用于离线数据分析。使用Flume的配置可以实现对数据的处理,并按一定的时间频率存储,本例中将从Kafka中按天存储数据到HDFS的不同文件夹。

1. 数据输入

本场景中数据来自Kafka中某个Topic订阅,数据格式为json。

2. 数据管道

使用Flume作为数据处理管道,通过配置实现自定义存储规则。

3. 数据输出

最终数据将存储在HDFS中,每一天的数据将对应一个单独的文件夹。

二、组件介绍

1. Kafka

来自维基百科:Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,这使它作为企业级基础设施来处理流式数据非常有价值。

如果需要参考安装步骤可以点击:Kafka 3.x的解压安装 - Linux

2. Hadoop

来自维基百科:Apache Hadoop是一款支持数据密集型分布式应用程序并以Apache 2.0许可协议发布的开源软件框架,有助于使用许多计算机组成的网络来解决数据、计算密集型的问题。基于MapReduce计算模型,它为大数据的分布式存储与处理提供了一个软件框架。所有的Hadoop模块都有一个基本假设,即硬件故障是常见情况,应该由框架自动处理。

如果需要参考安装步骤可以点击:Hadoop 3.x各模式部署 - Ubuntu

3. Flume

来自维基百科:Apache Flume是一款分布式、可靠且可用的软件,用于高效地收集、聚合和移动大量日志数据。它有一个基于流数据流的简单而灵活的体系结构。它具有健壮性和容错性,具有可调的可靠性机制以及许多故障切换和恢复机制。它使用了一个简单的可扩展数据模型,允许在线分析应用程序。

Flume的运行只需要预先配置好JDK即可,安装过程只需要解压以及环境变量的配置。

三、前置准备

1. Flume下载

  • 官网地址:https://flume.apache.org/

  • 点击Download -> 选择binary中的tar.gz

  • 进入镜像地址列表,右键复制下载链接

  • 使用wget下载到Linux系统
wget https://dlcdn.apache.org/flume/1.11.0/apache-flume-1.11.0-bin.tar.gz

2. Flume安装

关于前置环境JDK的安装可以参考:Hadoop 3.x各模式部署 - Ubuntu中前置环境的部分【点击可直接跳转到指定位置】。

  • Flume解压缩
tar -zvxf apache-flume-1.11.0-bin.tar.gz
  • 环境变量配置
vi ~/.bashrc

export FLUME_HOME=/path/to/apache-flume-1.11.0-bin
export PATH=$PATH:$FLUME_HOME/bin

3. 数据源准备

可以在Kafka中创建一个新的Topic用于测试,具体步骤可以参考:Kafka 3.x的解压安装 - Linux中Console测试的部分【点击可直接跳转到指定位置】。

四、配置文件

在Flume中主要需要配置3个部分,sourcechannelsink。本例中source为kafka,sink为HDFS,channel同样有多种选择。

1. 以内存为channel

  • 优缺点
    • 优点:速度较快,不会占用额外硬盘空间
    • 缺点:只依赖Kafka的偏移量记录,Flume自身不会存储偏移量信息
  • 核心配置项
    • agent.sources.kafka-source.batchSize:每一批次处理的数据量,可以根据需要修改
    • agent.sources.kafka-source.kafka.bootstrap.servers:Kafka的订阅地址,包含主机及端口号
    • agent.sources.kafka-source.kafka.topics:Kafka的Topic名称
    • agent.sinks.hdfs-sink.hdfs.path:最终数据在HDFS的保存路径,父级目录需要手动创建
  • 在Flume的conf文件夹中新建配置文件kafka-memory-hdfs.conf
# Name the components on this agent
agent.sources = kafka-source
agent.channels = memory-channel
agent.sinks = hdfs-sink

# Describe/configure the source
agent.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka-source.batchSize = 1000
agent.sources.kafka-source.kafka.bootstrap.servers = localhost:9092
agent.sources.kafka-source.kafka.topics = my-topic
agent.sources.kafka-source.kafka.consumer.group.id = flume-memory-hdfs
agent.sources.kafka-source.kafka.consumer.auto.offset.reset = earliest

# Describe/configure the channel
agent.channels.memory-channel.type = memory
agent.channels.memory-channel.capacity = 10000
agent.channels.memory-channel.transactionCapacity = 1000

# Describe the sink
agent.sinks.hdfs-sink.type = hdfs
agent.sinks.hdfs-sink.hdfs.path = /flume_data/%Y-%m-%d
agent.sinks.hdfs-sink.hdfs.fileSuffix = .jsonl
agent.sinks.hdfs-sink.hdfs.rollInterval = 3600
agent.sinks.hdfs-sink.hdfs.rollSize = 0
agent.sinks.hdfs-sink.hdfs.rollCount = 1000
agent.sinks.hdfs-sink.transactionCapacity = 1000
agent.sinks.hdfs-sink.hdfs.fileType = DataStream
agent.sinks.hdfs-sink.hdfs.writeFormat = Text

# Bind the source and sink to the channel
agent.sources.kafka-source.channels = memory-channel
agent.sinks.hdfs-sink.channel = memory-channel

2. 以文件为channel

  • 优缺点
    • 优点:可以保证数据不丢失,将数据状态保存在本地磁盘上
    • 缺点:会额外占用硬盘存储空间,读写速度相对较慢,需要合理移除历史文件
  • 核心配置项
    • agent.sources.kafka-source.batchSize:每一批次处理的数据量,可以根据需要修改
    • agent.sources.kafka-source.kafka.bootstrap.servers:Kafka的订阅地址,包含主机及端口号
    • agent.sources.kafka-source.kafka.topics:Kafka的Topic名称
    • agent.channels.file-channel.checkpointDir:本地磁盘路径,需要预先创建父级目录
    • agent.channels.file-channel.useDualCheckpoints:设置为true则开启双重机制,可额外设置一个备份路径
    • agent.channels.file-channel.maxFileSize:单位为字节,当达到文件大小时会自动滚动新建
    • agent.sinks.hdfs-sink.hdfs.path:最终数据在HDFS的保存路径,父级目录需要手动创建
  • 在Flume的conf文件夹中新建配置文件kafka-file-hdfs.conf
# Name the components on this agent
agent.sources = kafka-source
agent.channels = file-channel
agent.sinks = hdfs-sink

# Describe/configure the source
agent.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka-source.batchSize = 1000
agent.sources.kafka-source.kafka.bootstrap.servers = localhost:9092
agent.sources.kafka-source.kafka.topics = my-topic
agent.sources.kafka-source.kafka.consumer.group.id = flume-file-hdfs
agent.sources.kafka-source.kafka.consumer.auto.offset.reset = earliest

# Describe/configure the channel
agent.channels.file-channel.type = file
agent.channels.file-channel.capacity = 10000
agent.channels.file-channel.transactionCapacity = 1000
agent.channels.file-channel.checkpointDir = /tmp/flume/checkpoint/
agent.channels.file-channel.backupCheckpointDir = /tmp/flume/backup/
agent.channels.file-channel.checkpointInterval = 300
agent.channels.file-channel.maxFileSize = 104857600
agent.channels.file-channel.useDualCheckpoints = true

# Describe the sink
agent.sinks.hdfs-sink.type = hdfs
agent.sinks.hdfs-sink.hdfs.path = /flume_data/%Y-%m-%d
agent.sinks.hdfs-sink.hdfs.fileSuffix = .jsonl
agent.sinks.hdfs-sink.hdfs.rollInterval = 3600
agent.sinks.hdfs-sink.hdfs.rollSize = 0
agent.sinks.hdfs-sink.hdfs.rollCount = 1000
agent.sinks.hdfs-sink.transactionCapacity = 1000
agent.sinks.hdfs-sink.hdfs.fileType = DataStream
agent.sinks.hdfs-sink.hdfs.writeFormat = Text

# Bind the source and sink to the channel
agent.sources.kafka-source.channels = file-channel
agent.sinks.hdfs-sink.channel = file-channel

五、运行测试

开始执行后,会按照预先配置的存储规则**%Y-%m-%d**,将每一天产生的数据存放在不同的文件夹,但是由于数据是分批到达的,所以每个文件夹中会有多个文件,但是这不影响数据的计算,如果需要可以合并整理。

1. 直接运行

Flume启动时可以通过conf -f参数指定配置文件,建议分配较多的内存,防止溢出:

nohup flume-ng agent -c conf -f ptah/to/kafka-memory-hdfs.conf -n agent -Dflume.root.logger=INFO,console -Xmx2g &

运行日志可以在FLUME_HOME/flume.log中找到,测试稳定后可以将进程挂在后台执行。

2. 监控运行

如果需要方便的进行指标监控,可以在启动时加入Prometheus,具体安装步骤可以查看可以自定义指标的监控工具 - Prometheus的安装部署。

  • jmx环境准备

下载jar包存储在合适位置:jmx_prometheus_javaagent-0.18.0.jar

  • 配置文件修改

在flume的conf配置文件中【kafka-memory-hdfs.conf/kafka-file-hdfs.conf】添加如下内容:

flume.monitoring.type = jmx
  • 添加监控规则:config.yaml

新建一个config.yaml文件,存放在合适位置。

startDelaySeconds: 0
ssl: false
lowercaseOutputName: false
lowercaseOutputLabelNames: false
whitelistObjectNames:
  - 'org.apache.flume.*:*'
blacklistObjectNames: []
  • 添加监控配置:prometheus.yml

scrape_configs配置中增加一组和flume相关的job,修改后需要重新加载配置文件或者重启Prometheus进程

scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: "prometheus"

    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.

    static_configs:
      - targets: ["localhost:9090"]

  - job_name: "flume"
    static_configs:
      - targets: ["localhost:9998"]
  • 启动命令

在启动Flume时,额外指定jar包所在路径,以及监控规则文件所在路径,设置的端口号为9998,与Prometheus中的设置保持一致。

nohup flume-ng agent -c conf -f path/to/kafka-memory-hdfs.conf -n agent -Dflume.root.logger=INFO,console -Xmx2g -javaagent:/path/to/jmx_prometheus_javaagent-0.18.0.jar=9998:/path/to/config.yaml &
  • 监控效果

部署完成后可以通过jvm_threads_state指标来查看Flume的进程状态:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/586632.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

eclipse连接mysql全网最详细教程

第一步&#xff1a;我们先做连接前的环境准备工作 1、首先在MySQL官网下载驱动&#xff1a;&#xff08;下载地址博主给大家整理好了直用&#xff09; https://dev.mysql.com/downloads/file/?id498587 下下来是这个样子 2、在eclipse中新建一个工程 3、添加驱动到eclipse里…

小主机折腾记12 HP 285G3 PRO MT

五一期间&#xff0c;无事&#xff0c;咸鱼购入HP 285G3 PRO MT折腾 HP 285 PRO G3 MT准系统 150包邮 R5 2600 250包邮 金百达3200内存 229京东包邮 直接说准系统情况&#xff1a; 1.主机有三个sata接口&#xff0c;两个硬盘接口&#xff0c;一个光驱接口&#xff08;应该是可以…

BM1684X开发环境搭建--SOC mode

环境配置-SOC模式---------------------------------------- 1&#xff1a;对于SoC模式&#xff0c;模型转换也需要在docker开发容器中完成&#xff1b;C/C程序建议在x86主机上使用交叉编译工具链编译生成可执行文件后&#xff0c;再拷贝到SoC目标平台运行&#xff1b; 2&…

macOS运行软件提示:“嘉立创EDA.app” 已损坏,无法打开。你应该将它移到废纸篓。

一、问题描述 macOS运行软件提示&#xff1a;“嘉立创EDA.app” 已损坏&#xff0c;无法打开。你应该将它移到废纸篓。 二、问题分析 macOS安全性问题导致的。 三、解决方案 1、在系统设置、隐私与安全性、安全性、允许从以下位置下载的应用程序中&#xff0c;选择“任何来…

OpenLDAP 搭建及简单使用

文章目录 1、前言LDAPOpenLDAP 2、安装&#xff08;通过 Docker 部署&#xff09;环境说明镜像说明部署客户端&验证连接示例 总结 1、前言 LDAP 轻型目录访问协议&#xff08;英文&#xff1a;Lightweight Directory Access Protocol&#xff0c;缩写&#xff1a;LDAP&am…

java.awt.datatransfer.Clipboard剪切板复制粘贴String

java.awt.datatransfer.Clipboard剪切板复制粘贴String java.awt.Toolkit 是一个可用来获取操作剪切板的工具 Toolkit toolkit Toolkit.getDefaultToolkit(); 获得 tookit Clipboard systemClipboard toolkit.getSystemClipboard(); 获得操作系统级的剪切板 Toolkit toolk…

算法基础学习笔记——⑬高斯消元\组合计数\容斥原理

✨博主&#xff1a;命运之光 ✨专栏&#xff1a;算法基础学习 目录 ✨高斯消元 ✨组合计数 &#x1f353;通过预处理逆元的方式求组合数: &#x1f353;Lucas定理: &#x1f353;分解质因数法求组合数&#xff1a; 前言&#xff1a;算法学习笔记记录日常分享&#xff0c;需…

【小可爱专属教程】服务器配置环境

【小可爱专属教程】服务器配置环境 安装CUDA和CUDNN安装CUDA安装CUDNN 安装Anaconda3安装Pytorch 安装CUDA和CUDNN 安装CUDA 文件已经上传至百度网盘 链接&#xff1a;https://pan.baidu.com/s/1LKzZXtSr1kXOnlfbO0cmEw?pwdgfbb 提取码&#xff1a;gfbbsudo sh cuda_11.3.…

【技术解决方案】(多级)缓存架构最佳实践

凌晨三点半了&#xff0c;太困了&#xff0c;还差一些&#xff0c;明天补上… 因为自己最近做的项目涉及到了缓存&#xff0c;所以水一篇缓存相关的文章&#xff0c;供大家作为参考&#xff0c;若发现文章有纰漏&#xff0c;希望大家多指正。 缓存涉及到的范围颇广&#xff0c…

spring boot整合Swagger2(2.9.2版本)

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

计算机组成原理-中央处理器-指令流水线和多处理器

目录 一、指令流水线基本概念 1.1影响流水线的因素 1.1.1结构相关(资源冲突) 1.1.2 数据相关(数据冲突) 1.1.3 控制相关(控制冲突) 1. 2 流水线分类 二、流水线的多发技术 2.1 超标量技术 2.2 超流水技术 2.3 超长指令字 三、五段式指令流水线 四、多处理器系统基本概念…

I.MX RT1170加密启动详解(1):Encrypted Boot image组成

使用RT1170芯片构建的所有平台一般都是高端场合&#xff0c;我们需要考虑软件的安全需求。该芯片集成了一系列安全功能。这些特性中的大多数提供针对特定类型攻击的保护&#xff0c;并且可以根据所需的保护程度配置为不同的级别。这些特性可以协同工作&#xff0c;也可以独立工…

chatgpt赋能python:Python中的区间:什么是区间(Interval),如何使用区间

Python 中的区间&#xff1a;什么是 区间&#xff08;Interval&#xff09;&#xff0c;如何使用区间 在Python中&#xff0c;区间&#xff08;Interval&#xff09;是一个广泛使用的数据结构&#xff0c;用于表示一段连续的数据范围。使用区间可以更方便地处理各种数据类型&a…

本地搭建CFimagehost私人图床【公网远程访问】

文章目录 1.前言2. CFImagehost网站搭建2.1 CFImagehost下载和安装2.2 CFImagehost网页测试2.3 cpolar的安装和注册 3.本地网页发布3.1 Cpolar临时数据隧道3.2 Cpolar稳定隧道&#xff08;云端设置&#xff09;3.3.Cpolar稳定隧道&#xff08;本地设置&#xff09; 4.公网访问测…

代码随想录算法训练营第四十一天 | 力扣 343. 整数拆分, 96.不同的二叉搜索树

343. 整数拆分 题目 343. 整数拆分 给定一个正整数 n &#xff0c;将其拆分为 k 个 正整数 的和&#xff08; k > 2 &#xff09;&#xff0c;并使这些整数的乘积最大化。 返回 你可以获得的最大乘积 。 解析 1.确定dp数组&#xff08;dp table&#xff09;以及下标的…

史上最详细的使用Claude和接入Claude-api教程

是什么&#xff08;What&#xff09; Claude 是最近新开放的一款 AI 聊天机器人&#xff0c;是世界上最大的语言模型之一&#xff0c;比之前的一些模型如 GPT-3 要强大得多&#xff0c;因此 Claude 被认为是 ChatGPT 最有力的竞争对手。Claude 的研发公司是专注人工智能安全和研…

Java制作520表白代码——爱一个人需要理由吗?

✨博主&#xff1a;命运之光 ✨专栏&#xff1a;Java经典程序设计 520表白日&#xff0c;每个人都期待着浪漫的表白&#xff0c;而作为一名热爱编程的程序员&#xff0c;我决定用程序员的方式来向你表达我的爱意。 在2023年5月20日这个特殊的日子里&#xff0c;我要用一段特别的…

BM1684X-onnx模型转化为bmodel

1&#xff1a;在tpu-mlir目录下进入docker docker run --privileged --name tpu-mlir -v $PWD:/workspace -it sophgo/tpuc_dev:v2.2 原因&#xff1a;该镜像已创建&#xff0c;要么重新创建一个新进程&#xff0c;要么杀死老进程&#xff1b; 解决办法如下&#xff1a; 2:接着…

夜深人静学32系列17——OLED

夜深人静学32系列17——OLED OLED简介接口定义OLED驱动原理驱动函数OLED.COLED.HCubeMX配置 实战部分效果展示驱动代码 OLED简介 LED&#xff0c;即有机发光二极管&#xff08;Organic Light-Emitting Diode&#xff09;&#xff0c;又称为有机激光显示&#xff08;Organic El…

基于YOLOV5的道路损伤(GRDDC‘2020)检测

1. GRDDC2020 数据集介绍 GRDDC2020 数据集是从印度、日本和捷克收集的道路图像。包括三个部分&#xff1a;Train, Test1, Test2。训练集包括带有 PASCAL VOC 格式 XML 文件标注的道路图像。 缺陷类型&#xff1a;D00、D01、D11、D10、D20、D40、D43、D44、D50、D0w0…