【大数据学习 | flume】flume之常见的sink组件

news2025/1/24 2:25:21

Flume Sink取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。Flume也提供了各种sink的实现,包括HDFS sink、Logger sink、Avro sink、File Roll sink、HBase sink,。

​ Flume Sink在设置存储数据时,可以向文件系统中,数据库中,hadoop中储数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据,在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。

1. File_roll Sink

File_roll sink是将收集到的数据存放在本地文件系统中,根据指定的时间生成新的文件用来保存数据。

# file_role sink

#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444

#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100

#定义sink
a1.sinks.k1.type=file_roll
a1.sinks.k1.sink.directory=/root/file_role
a1.sinks.k1.sink.rollInterval=60
#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

rollInterval=60:每隔60s滚动生成一个文件。

创建数据输出目录

mkdir -p /root/file_role

启动flume agent a1 服务端

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./file_roll.agent -Dflume.root.logger=INFO,console

2. hdfs sink

hdfs sink是将flume收集到的数据写入到hdfs中,方便数据可靠的保存。

其中:

sink 输出到hdfs中,默认每10个event 生成一个hdfs文件,hdfs文件目录会根据hdfs.path 的配置自动创建。

sink hdfs 配置参数描述:

名称描述
hdfs.pathhdfs目录路径
hdfs.filePrefix文件前缀。默认值FlumeData
hdfs.fileSuffix文件后缀
hdfs.rollInterval多久时间后close hdfs文件。单位是秒,默认30秒。设置为0的话表示不根据时间close hdfs文件
hdfs.rollSize文件大小超过一定值后,close文件。默认值1024,单位是字节。设置为0的话表示不基于文件大小
hdfs.rollCount写入了多少个事件后close文件。默认值是10个。设置为0的话表示不基于事件个数
hdfs.fileType文件格式, 有3种格式可选择:SequenceFile(默认), DataStream(不压缩) or CompressedStream(可压缩)
hdfs.batchSize批次数,HDFS Sink每次从Channel中拿的事件个数。默认值100
hdfs.minBlockReplicasHDFS每个块最小的replicas数字,不设置的话会取hadoop中的配置
hdfs.maxOpenFiles允许最多打开的文件数,默认是5000。如果超过了这个值,越早的文件会被关闭
hdfs.callTimeoutHDFS操作允许的时间,比如hdfs文件的open,write,flush,close操作。单位是毫秒,默认值是10000
hdfs.codeC压缩编解码器。以下之一:gzip,bzip2,lzo,lzop,snappy
# hdfs sink
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444

#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100

#定义sink
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=/data/xinniu/output/%Y-%m-%d
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.filePrefix=hainiu-
a1.sinks.k1.hdfs.fileSuffix=.log
#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

3. kafka sink

将数据写入到kafka中

# kafka sink
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444

#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100

#定义sink
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.topic = hainiu
#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动kafka消费者消费hainiu topic中的数据

启动fluem agent

启动flume agent a1 服务端

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./kafkasink.agent -Dflume.root.logger=INFO,console

kafka保存flume收集到的数据,并通过kafka消费者消费到收集到的数据

4. avro sink

将flume收集到的数据通过avro sink序列化出去,通常用于数据跨服服务多级流动。

启动三台机器:

在第一台节点编写agent

a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444

a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=avro
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 55555

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

第二台节点编写agent

a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=avro
a1.sources.r1.bind=11.94.204.87
a1.sources.r1.port=55555

a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=avro
a1.sinks.k1.hostname =11.147.251.96
a1.sinks.k1.port = 55555

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

第三台节点编写agent

a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=avro
a1.sources.r1.bind=11.147.251.96
a1.sources.r1.port=55555

a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=logger

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

从后往前分别启动三台agent

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./avro.agent -Dflume.root.logger=INFO,console

测试给第一台flume发送数据,第三台节点打印数据到控制台

4.1 扇出操作

还可以通过avro sink 实现扇出操作:即第一台服务器收集数据,将数据发送到第二台和第三台服务器。

需要修改第一台服务器agent

a1.sources=r1
a1.sinks=k1 k2
a1.channels=c1 c2

a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444

a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100

a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=100

a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-1
a1.sinks.k1.port = 55555

a1.sinks.k2.type=avro
a1.sinks.k2.hostname = worke-2
a1.sinks.k2.port = 55555

a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2

第二台和第三台agent编写如下:

a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=avro
a1.sources.r1.bind=11.147.251.96
a1.sources.r1.port=55555

a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=logger

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

从后往前分别启动三台agent

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./avro.agent -Dflume.root.logger=INFO,console

测试给第一台flume发送数据,第二台和第三台节点打印数据到控制台

4.2 扇入操作

还可以通过avro sink 实现扇入操作:即第一台和第二台手机数据,将数据发送到第三台服务器。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2242311.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数学分组求偶数和

问题描述 小M面对一组从 1 到 9 的数字,这些数字被分成多个小组,并从每个小组中选择一个数字组成一个新的数。目标是使得这个新数的各位数字之和为偶数。任务是计算出有多少种不同的分组和选择方法可以达到这一目标。 numbers: 一个由多个整数字符串组…

构建安全护盾:HarmonyOS 应用的数据安全审计与日志管理实战

文章目录 前言数据安全审计与日志管理的重要性什么是数据安全审计?为什么需要日志管理? 数据安全审计与日志管理的基本原则实现数据安全审计与日志管理的技术方案1. 数据安全审计的实现2. 日志管理的实现 ArkUI 与 ArkTS 的代码示例1. 审计日志记录2. 实…

ReactPress与WordPress:两大开源发布平台的对比与选择

ReactPress与WordPress:两大开源发布平台的对比与选择 在当今数字化时代,内容管理系统(CMS)已成为各类网站和应用的核心组成部分。两款备受欢迎的开源发布平台——ReactPress和WordPress,各自拥有独特的优势和特点&am…

HarmonyOS 开发环境搭建

HarmonyOS(鸿蒙操作系统)作为一种面向全场景多设备的智能操作系统,正逐渐在市场上崭露头角。为了进入HarmonyOS生态,开发者需要搭建一个高效的开发环境。本文将详细介绍如何搭建HarmonyOS开发环境,特别是如何安装和配置…

基于VUE实现语音通话:边录边转发送语言消息、 播放pcm 音频

文章目录 引言I 音频协议音频格式:音频协议:II 实现协议创建ws对象初始化边录边转发送语言消息 setupPCM按下通话按钮时开始讲话,松开后停止讲话播放pcm 音频III 第三库recorderplayer调试引言 需求:电台通讯网(电台远程遥控软件-超短波)该系统通过网络、超短波终端等无线…

【提高篇】3.3 GPIO(三,工作模式详解 上)

目录 一,工作模式介绍 二,输入浮空 2.1 输入浮空简介 2.2 输入浮空特点 2.3 按键检测示例 2.4 高阻态 三,输入上拉 3.1 输入上拉简介 3.2 输入上拉的特点 3.3 按键检测示例 四,输入下拉 4.1 输入下拉简介 4.2 输入下拉特点 4.3 按键检测示例 一,工作模式介绍…

微服务瞎写

1.微服务解决的问题 1、如何发现新节点以及检查各节点的运行状态? 2、如何发现服务及负载均衡如何实现? 3、服务间如何进行消息通信? 4、如何对使用者暴露服务API? 5、如何集中管理各节点配置文件? 6、如何收集各…

Python Tornado框架教程:高性能Web框架的全面解析

Python Tornado框架教程:高性能Web框架的全面解析 引言 在现代Web开发中,选择合适的框架至关重要。Python的Tornado框架因其高性能和非阻塞I/O特性而备受青睐。它特别适合处理大量并发连接的应用,比如聊天应用、实时数据处理和WebSocket服务…

基于图像分类的对抗攻击算法研究

图像分类与对抗攻击 图像分类是计算机视觉的基础任务,旨在 将不同类别的图像准确归类 。随着深度学习发展,模型在大规模数据集上的表现已超越人类。然而,这一进步也引发了新的安全挑战—— 对抗攻击 。 对抗攻击通过向原始图像添加精心设计的…

【AI大模型】ELMo模型介绍:深度理解语言模型的嵌入艺术

学习目标 了解什么是ELMo.掌握ELMo的架构.掌握ELMo的预训练任务.了解ELMo的效果和成绩.了解ELMo的优缺点. 目录 🍔 ELMo简介 🍔 ELMo的架构 2.1 总体架构 2.2 Embedding模块 2.3 两部分的双层LSTM模块 2.4 词向量表征模块 🍔 ELMo的预…

Linux Android 正点原子RK3568替换开机Logo完整教程

0.这CSDN是有BUG吗?大家注意:表示路径的2个点号全都变成3个点号啦! 接下来的后文中,应该是2个点都被CSDN变成了3个点: 1.将这两个 bmp 图片文件720x1280_8bit拷贝到内核源码目录下,替换内核源码中默认的 logo 图片。注意:此时还缺少电量显示图片 2.编译内核 make d…

【EasyExcel】复杂导出操作-自定义颜色样式等(版本3.1.x)

文章目录 前言一、自定义拦截器二、自定义操作1.自定义颜色2.合并单元格 三、复杂操作示例1.实体(使用了注解式样式):2.自定义拦截器3.代码4.最终效果 前言 本文简单介绍阿里的EasyExcel的复杂导出操作,包括自定义样式,根据数据合并单元格等。…

Linux(CentOS)安装达梦数据库 dm8

CentOS版本:CentOS 7 达梦数据库版本:dm8 一、获取 dm8 安装文件 1、下载安装文件 打开达梦官网:https://www.dameng.com/ 下载的文件 解压后的文件 2、上传安装文件到 CentOS 使用FinalShell远程登录工具,并且使用 root 用户…

fastapi 调用ollama之下的sqlcoder模式进行对话操作数据库

from fastapi import FastAPI, HTTPException, Request from pydantic import BaseModel import ollama import mysql.connector from mysql.connector.cursor import MySQLCursor import jsonapp FastAPI()# 数据库连接配置 DB_CONFIG {"database": "web&quo…

如何监控Kafka消费者的性能指标?

要监控 Kafka 消费者性能指标,可以遵循以下最佳实践和策略: 关键性能指标监控: 消息吞吐量:监控消费者和生产者的吞吐量,以评估数据处理和消费的效率。延迟:监控端到端的延迟,例如通过比较消息产…

【LINUX相关】

一、Linux怎么进行查看日志? 首先得问问开发项目日志存放在哪里,可以使用多种命令来查看日志。常用的命令包括tail、cat、less和grep等。例如:1、使用tail命令可以实时查看日志文件的最新内容:tail -f log_file, 2、使用cat命令可…

IT运维的365天--019 用php做一个简单的文件上传工具

前情提要:朋友的工作室,有几个网站分布在不同的服务器上,要经常进行更新,之前是手动复制压缩包到各个服务器去更新(有写了自动更新的Shell脚本)。但还是觉得太麻烦,每次还要手动传输压缩包到各个…

计算机网络 (4)计算机网络体系结构

前言 计算机网络体系结构是指计算机网络层次结构模型,它是各层的协议以及层次之间的端口的集合。这一体系结构为计算机网络及其部件应完成的功能提供了精确定义,并规定了这些功能应由何种硬件或软件来实现。 一、主流模型 计算机网络体系结构存在多种模型…

C++- 基于多设计模式下的同步异步日志系统

第一个项目:13万字,带源代码和详细步骤 目录 第一个项目:13万字,带源代码和详细步骤 1. 项目介绍 2. 核心技术 3. 日志系统介绍 3.1 为什么需要⽇志系统 3.2 ⽇志系统技术实现 3.2.1 同步写⽇志 3.2.2 异步写⽇志 4.知识点和单词补充 4.1单词补充 4.2知识点补充…

小程序租赁系统打造便捷租赁体验助力共享经济发展

内容概要 小程序租赁系统是一个极具创新性的解决方案,它通过简化租赁过程,让物品的共享变得便捷流畅。对于那些有闲置物品的用户来说,他们可以轻松发布自己的物品,让其他需要的人快速找到并租借。而对于找东西的人来说&#xff0…