【数仓】通过Flume+kafka采集日志数据存储到Hadoop

news2025/1/23 10:30:58

相关文章

  • 【数仓】基本概念、知识普及、核心技术
  • 【数仓】数据分层概念以及相关逻辑
  • 【数仓】Hadoop软件安装及使用(集群配置)
  • 【数仓】Hadoop集群配置常用参数说明
  • 【数仓】zookeeper软件安装及集群配置
  • 【数仓】kafka软件安装及集群配置
  • 【数仓】flume软件安装及配置
  • 【数仓】flume常见配置总结,以及示例

一、flume有什么作用

Apache Flume是一个分布式、可靠且可用的大数据日志采集、聚合和传输系统。它主要用于将大量的日志数据从不同的数据源收集起来,然后通过通道(Channel)进行传输,最终将数据传输到指定的目的地,如HDFS、HBase等。Flume具有高度可扩展性、容错性和灵活性,可以适应各种复杂的数据采集场景。

Flume的核心组件包括Source、Channel和Sink。Source负责从数据源中读取数据,可以是文件、网络套接字、消息队列等;Channel是数据的缓冲区,用于在Source和Sink之间传输数据;Sink负责将数据写入目标存储系统,如HDFS、HBase、Kafka等。此外,Flume还支持多种类型的Source、Channel和Sink,用户可以根据实际需求进行选择和配置。

Flume的主要作用是实现大规模数据采集和传输,实现数据的实时处理和分析,从而为企业提供更好的业务决策支持。在实际应用中,Flume可以用于日志收集、事件跟踪、数据流处理等场景。通过将数据从不同的数据源采集并传输到指定的目的地,Flume可以帮助企业实现数据的集中存储和管理,为后续的数据分析和挖掘提供基础。

此外,Flume还具有可靠性机制和故障转移和恢复机制,能够保证数据传输的可靠性和安全性。同时,Flume还支持客户扩展和自定义开发,用户可以根据自己的需求进行扩展和优化,使其更加适合特定的应用场景。

总的来说,Apache Flume是一个功能强大、灵活可靠的大数据日志采集、聚合和传输系统,它在大数据处理中起到了至关重要的作用。

二、环境准备

准备1台虚拟机

  • Hadoop131:192.168.56.131

本例系统版本 CentOS-7.8,已安装jdk1.8

关闭防火墙

systemctl stop firewalld

zookeeper、kafka 已安装,且已启动

三、flume安装配置

1、配置flume agent

1)本例演示 flume 去掉kafka数据,然后存储到hdfs中
2)完整数据通道是:log文件 > flume > kafka > flume > hdfs
3)flume 安装目录是 /data/flume
4)kafka 、Hadoop在前面已经安装过

新建配置文件 /data/flume/conf/job/kafka_to_hdfs_log.conf,内容如下:

# 定义组件
# 这里定义了Flume agent的三个主要组件:source(数据源)、channel(通道)和sink(数据接收器)。
a2.sources=r2
a2.channels=c2
a2.sinks=k2

# 配置source
# 配置数据源为Kafka,指定了Kafka的相关参数,如服务器地址、主题等。
a2.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
# 每次从Kafka拉取的数据量
a2.sources.r2.batchSize=5000
# 拉取数据的间隔时间(毫秒)
a2.sources.r2.batchDurationMillis=2000
# Kafka服务器地址列表
a2.sources.r2.kafka.bootstrap.servers = hadoop131:9092,hadoop132:9092,hadoop133:9092
# 从Kafka的哪个主题拉取数据
a2.sources.r2.kafka.topics=topic_log
# 注释掉的部分是关于拦截器的配置,拦截器可以用来对数据源的数据进行预处理。
#a2.sources.r2.interceptors=i2
#a2.sources.r2.interceptors.i2.type = com.my.flume.interceptor.TimestampInterceptor

# 配置channel
# 配置通道为文件通道,指定了通道的相关参数,如检查点目录、数据目录等。
a2.channels.c2.type = file
# 检查点目录,用于存储通道的状态信息
a2.channels.c2.checkpointDir = /data/flume/checkpoint/behaviorl
# 数据目录,用于存储通道中的数据
a2.channels.c2.dataDirs = /data/flume/data/behaviorl
# 通道中文件的最大大小(字节)
a2.channels.c2.maxFileSize = 2146435071
# 通道的容量,即可以存储的最大事件数
a2.channels.c2.capacity = 1000000
# 通道的keepalive时间(秒)
a2.channels.c2.keepalive = 6

# 配置sink
# 配置数据接收器为HDFS,指定了HDFS的相关参数,如文件路径、文件前缀等。
a2.sinks.k2.type =hdfs
# HDFS上的文件路径,使用了时间变量来动态生成目录
a2.sinks.k2.hdfs.path = /origin_data/user/log/topic_log/%Y-%m-%d
# HDFS上的文件前缀
a2.sinks.k2.hdfs.filePrefix=log
# 是否按照时间轮转文件,这里设置为false,表示不按照时间轮转
a2.sinks.k2.hdfs.round =false
# 文件轮转的时间间隔(秒)
a2.sinks.k2.hdfs.rollInterval=10
# 文件轮转的大小阈值(字节)
a2.sinks.k2.hdfs.rollSize=134217728
# 文件轮转的事件数阈值,这里设置为0,表示不按照事件数轮转
a2.sinks.k2.hdfs.rollCount=0

# 控制输出文件类型
# 设置输出文件的类型为压缩流格式,并使用gzip压缩算法。
a2.sinks.k2.hdfs.fileType = CompressedStream
a2.sinks.k2.hdfs.codeC = gzip

# 组装
# 将数据源、通道和数据接收器组装在一起,形成一个完整的Flume agent。
a2.sources.r2.channels=c2
a2.sinks.k2.channel=c2

这个配置文件定义了一个Flume agent,它从Kafka中读取数据,通过文件通道进行缓存,并最终将数据写入到HDFS中。在写入HDFS时,使用了压缩流格式,并对输出文件进行了gzip压缩。同时,还通过一些参数对文件的轮转进行了控制。

2、启动flume

1)创建flume启动脚本f2.sh

vi /usr/bin/f2.sh
# 修改文件权限
chmod 777 /usr/bin/f2.sh

2)复制如下内容

#!/bin/bash

#1. 判断参数个数
if [ $# -lt 1 ]
then
  echo Not Enough Arguement!
  exit;
fi

case $1 in
"start")
    #遍历集群所有机器
    for host in hadoop131
    do
        echo --------------------  $host 日志收集 flume 启动 --------------------
        ssh $host "nohup /data/flume/bin/flume-ng agent -n a2 -c /data/flume/conf/ -f /data/flume/conf/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
    done
;;
"stop")
    #遍历集群所有机器
    for host in hadoop131
    do
        echo --------------------  $host 日志收集 flume 停止 --------------------
        ssh $host "ps -ef | grep kafka_to_hdfs_log | grep -v grep | awk '{print \$2}' |xargs -n1 kill 9"
    done
;;
*)
    echo "Input Args Error..."
;;
esac

3)通过集群脚本 f2.sh 操作

f2.sh start

flume启动命令说明

以下是flume启动命令的常用参数:

参数默认值说明
--name-n无默认值,必须指定指定启动的Flume Agent的名称。这个名称应该与配置文件中定义的agent的名称一致。
--conf-c无默认值,通常设置为flume配置文件的目录指定Flume配置文件的目录。这个目录下应该包含flume的配置文件。
--conf-file-f无默认值,必须指定指定具体的Flume配置文件名。这个文件应该包含了Flume Agent的配置信息。
--zkConnString-z无默认值当Flume配置使用Zookeeper进行集群管理时,指定Zookeeper的连接字符串。格式为主机名:端口号,多个节点用逗号分隔。
-Dflume.root.logger无默认值,通常设置为INFO,console设置Flume的日志级别和输出方式。例如,INFO,console表示日志级别为INFO,并输出到控制台。也可以设置为输出到日志文件。
--no-reload-conffalse如果设置为true,那么Flume将不会重新加载配置文件,即使配置文件发生了变化。
--help-h无默认值显示帮助信息,列出所有可用的启动参数。

需要注意的是,Flume的启动参数可能会因版本和具体的使用场景而有所不同。上表中的参数是最常用的,但并不是所有的参数都在所有版本的Flume中都可用。在实际使用时,建议查阅对应版本的Flume官方文档或使用flume-ng agent --help命令查看可用的参数列表。

3、验证日志采集通路

1)在指定的log目录中生成日志文件

cat app.log >> /data/applog/log/app_test.log

2)打开Hadoop查看数据,http://192.168.56.131:9870/

Hadoop在前面已经安装过

在这里插入图片描述

参考

  • https://flume.apache.org/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1512703.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C++】string学习 — 手搓string类项目

手搓string项目 1 string类介绍2 功能描述3 代码实现3.0 基础框架3.1 构造函数 和 析构函数3.2 流操作符重载 和 尾插扩容3.4 运算符重载3.5 实用功能3.6 迭代器模拟 总结这里提供一下源代码:Thanks♪(・ω・)ノ谢谢阅读!…

摄像机内存卡删除的视频如何恢复?恢复指南来袭

在现代社会,摄像机已成为记录生活、工作和学习的重要设备。然而,随着使用频率的增加,误删或意外丢失视频的情况也时有发生。面对这样的情况,许多用户可能会感到无助和困惑。那么,摄像机内存卡删除的视频真的无法恢复吗…

【AnaConda/MiniConda/Linux】使用sudo python或切换root管理员conda环境被绕过解决方案

写在前面 部分机型修改环境变量存在风险,可能用于被覆盖而出现大量命令无法找到的情况 可以输入这个解决 export PATH/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/root/bin往期相关内容 探索Miniconda3:简单、灵活的Python环境和…

HTML 学习笔记(九)颜色值和长度单位

一、颜色 1.通过RGB值来设置颜色 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>table</title&…

基于YOLOv8/YOLOv7/YOLOv6/YOLOv5的癌症图像检测系统(深度学习模型+UI界面代码+训练数据集)

摘要&#xff1a;本篇博客深入介绍了如何借助深度学习技术开发癌症图像检测系统&#xff0c;以提高医疗诊断的精度和速度。系统基于先进的YOLOv8算法&#xff0c;并对比分析了YOLOv7、YOLOv6、YOLOv5的性能&#xff0c;如mAP和F1 Score。详细解释了YOLOv8的原理&#xff0c;并附…

【how2j练习题】css部分课堂练习

1.表格斑马线 <style>table {width: 500px;border-collapse: collapse;}tr#title {background-color: white;text-align: center;border-bottom: 5px solid gold;}tr#id1 {text-align: center;border-bottom: 2px solid blueviolet;}tr#id2 {text-align: center;border-b…

【C++】STL(六) list容器

7. list容器7.1 简介7.2 构造函数例子 7.3 赋值和交换例子 7.4 大小操作例子 7.5 插入和删除例子 7.6 数据存取例子 7.7 反转和排序例子 7. list容器 7.1 简介 ① 功能&#xff1a;将数据进行链式存储。 ② 链表(list)是一种物理存储单元上非连续的存储结构&#xff0c;数据…

Python环境下一维时间序列的小波尺度谱和时间平均小波谱(基于Morlet小波)

小波分析是较好的非平稳信号分析方法之一&#xff0c;它通过伸缩和平移运算对信号进行多尺度细化分析&#xff0c;能够在不同的尺度上描述信号的局部特征&#xff0c;为微弱故障特征信号的检测提供了有效的工具。小波尺度谱可看作一个有恒定相对带宽的谱图&#xff0c;能够反映…

Linux fork函数详解

文章目录 1 基本介绍2 fork实例2.1 多个fork返回值2.2 C语言 fork与输出2.3 fork &#x1f4a3; 1 基本介绍 #include <sys/types.h> #include <unistd.h>pid_t fork(void)描述 fork用于创建一个子进程&#xff0c;它与父进程的唯一区别在于其PID和PPID&#xff0…

【Linux】Linux小结

LVS、Nginx、HAproxy的区别 LVS、Nginx和HAproxy都是常见的负载均衡器&#xff0c;用于将网络负载分散到多个服务器上&#xff0c;以提高系统的可用性和性能 功能不同&#xff1a; LVS是一个Linux内核模块&#xff0c;在网络层&#xff08;第四层&#xff09;运行的。 Nginx和…

Java错误:微服务报错Cannot execute request on any known serve

&#x1f414;问题内容 报Cannot execute request on any known server 这个错&#xff1a;连接Eureka服务端地址不对。 &#x1f414;解决方式 检查.yml文件或者.properties文件配置 下划线下划线后面的小写字母等同于去掉下划线大写下划线后面的字母&#xff08;驼峰原则&am…

一道题学会如何使用哈希表

给你一个整数数组 nums 和一个整数 k &#xff0c;请你统计并返回 该数组中和为 k 的子数组的个数 。 子数组是数组中元素的连续非空序列。 示例 1&#xff1a; 输入&#xff1a;nums [1,1,1], k 2 输出&#xff1a;2示例 2&#xff1a; 输入&#xff1a;nums [1,2,3], …

【任务计划程序】打卡签到(自用)

文章目录 前言效果如下&#xff1a; 任务计划程序其他签到代码&#xff08;更新如下&#xff09; 前言 【github actionpython】完成定时任务并推送&#xff08;学会自制&#xff09;&#xff08;这里的github上这个glados签到不能用了&#xff0c;glados上的部分功能也变了&a…

学完Efficient c++ (39-40)

条款39&#xff1a;明智而审慎地使用private继承 private继承的特点&#xff1a; 如果类之间是private继承关系&#xff0c;那么编译器不会自动将一个派生类对象转换为一个基类对象。由private继承来的所有成员&#xff0c;在派生类中都会变为private属性&#xff0c;换句话说…

大白话ChatGPT技术

▼最近直播超级多&#xff0c;预约保你有收获 今晚直播&#xff1a;《ChatGPT架构设计与应用案例实践》 —1— ChatGPT 架构设计剖析 ChatGPT 总体架构由三大部分构成&#xff1a;预训练&#xff08;Pre-training&#xff09;架构、微调&#xff08;Fine-tuning&#xff09;架…

sqllab第五关通关笔记

知识点&#xff1a; 报错注入函数语法&#xff08;详见第二关笔记&#xff09;报错注入打印位数最多32位对于大于32位的数据最好使用截取函数进行控制&#xff1b;以保证输出完整mysql表中的重点数据库 information_schema &#xff08;mysql 5.0以上&#xff09; schemata …

揭秘 Kubernetes Secret:安全存储敏感信息的秘密武器

Kubernetes Secret Secret 是 Kubernetes 中用于存储敏感信息的资源&#xff0c;例如密码、API 密钥和 SSH 密钥。Secret 可以被 Pod 和其他 Kubernetes 资源使用&#xff0c;而无需将敏感信息暴露在配置文件或环境变量中。 1. Secret 类型 Kubernetes 支持多种类型的 Secre…

智能家居涉及到的12个物联网传感器!

智能家居领域涉及到的物联网传感器种类繁多&#xff0c;下面列举一些常见的物联网传感器&#xff1a; 温度传感器&#xff1a;用于检测室内温度&#xff0c;可以实现智能温控和节能控制。湿度传感器&#xff1a;用于检测室内湿度&#xff0c;可以实现智能湿度控制和防潮功能。…

Python Excel 文本编辑库之xlsxwriter使用详解

概要 在现代数据处理和报表生成中,Excel 文件是一个非常常见的格式。Python XlsxWriter 库是一个强大的工具,可以帮助开发者轻松创建和编辑 Excel 文件,并且具有高度的灵活性和可定制性。本文将全面介绍 XlsxWriter 库的原理、功能、用法,并通过丰富的示例代码来展示其强大…

Linux 安装 Gitblit

1.下载Gitblit 官网地址&#xff1a;Gitblit&#xff0c;目前最新的是1.9.3 2.上传到服务器 ①在服务器上新建目录&#xff1a;/usr/local/gitblit ②将下载的文件上传到服务器&#xff1a;/usr/local/gitblit/gitblit-1.9.3.tar.gz ③解压文件&#xff1a; cd /usr/local…