Flume系列:Flume Sink使用

news2024/11/13 20:23:24

目录

Apache Hadoop生态-目录汇总-持续更新

1:HDFS Sink

HDFS小文件的处理

HDFS存入大量小文件的影响:

HDFS小文件处理:

2:logger Sink

3:写入Kafka - 可以使用kafka channel代替


Apache Hadoop生态-目录汇总-持续更新

系统环境:centos7

Java环境:Java8

Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。

Sink 组件目的地包括 hdfs、logger(常用语测试)、avro、thrift、ipc、file、HBase、solr、自定义。

 

1:HDFS Sink

# 1:定义组件
kafka_flume_hdfs.sources = r1
kafka_flume_hdfs.channels = c1
kafka_flume_hdfs.sinks = k1

# 2:定义source
这里主要介绍Channel顾这里省略,到source模块查看写法

# 3:定义channel
这里主要介绍sources顾这里省略,到channel模块查看写法

# 4:定义sink
kafka_flume_hdfs.sinks.k1.type = hdfs
kafka_flume_hdfs.sinks.k1.hdfs.path = hdfs://hadoop322ha/project/v4/log/topic_log/%Y-%m-%d
    # 上传文件的前缀
kafka_flume_hdfs.sinks.k1.hdfs.filePrefix = logs-
    # 设置是否需要滚动生成文件,比如1小时一个, 如果设置为true需要设置对应的,roundValue,roundUnit
kafka_flume_hdfs.sinks.k1.hdfs.round = false
 ## 控制生成的小文件
    # 控制多久滚动一次文件,防止凑不够rollSize卡住, 正常设置3600,这里测试为了快速写出
kafka_flume_hdfs.sinks.k1.hdfs.rollInterval = 20
    # 控制文件多大后,滚动文件,128M滚动文件
kafka_flume_hdfs.sinks.k1.hdfs.rollSize = 134217728
    # 多少个events滚动文件,一般不指定写0
kafka_flume_hdfs.sinks.k1.hdfs.rollCount = 0

 ## 配置输出类型CompressedStream(压缩流),DataStream(原样输出),与压缩
    # 压缩流
kafka_flume_hdfs.sinks.k1.hdfs.fileType = CompressedStream
    # 压缩类型
kafka_flume_hdfs.sinks.k1.hdfs.codeC = lzop

# 5:定义关联关系
kafka_flume_hdfs.sources.r1.channels = c1
kafka_flume_hdfs.sinks.k1.channel = c1

sink到hdfs注意timestamp,默认是从Flume event headers里取的,如果header里没有不配useLocalTimeStamp 会直接报错

2022-12-06 11:47:33,481 ERROR hdfs.HDFSEventSink: process failed
java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null

HDFS小文件的处理

HDFS存入大量小文件的影响:

        元数据层面:每个小文件都有一份元数据,小文件过多会占用Namenode服务器大量内存,影响Namenode性能和使用寿命。

        计算层面:默认情况下MR会对每个小文件启用一个Map任务计算(默认1G内存),非常影响计算性能。同时也影响磁盘寻址时间。

HDFS小文件处理:

通过调整:hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount 这三个参数,控制小文件的生成

①文件在达到128M时会滚动生成新文件
②文件创建超3600秒时会滚动生成新文件
hdfs.rollInterval=3600
hdfs.rollSize=134217728  #128M滚动文件
hdfs.rollCount =0

## 对hdfs进行压缩
a1.sinks.k1.hdfs.fileType = CompressedStream  # 压缩流
a1.sinks.k1.hdfs.codeC = lzop

注意:hdfs要开启对应的压缩格式

2:logger Sink

# 1:定义组件
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 2:定义source
这里主要介绍Channel顾这里省略,到source模块查看写法

# 3:定义channel
这里主要介绍sources顾这里省略,到channel模块查看写法

# 4:定义sink
a1.sinks.k1.type = logger  # 表示a1的输出目的地是控制台logger类型

# 5:定义关联关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动方式:flume-ng agent --name a1 --conf-file flume-netcat-logger.conf -Dflume.root.logger=INFO,console

3:写入Kafka - 可以使用kafka channel代替

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/511314.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

力扣sql中等篇练习(十八)

力扣sql中等篇练习(十八) 1 银行账户概要 1.1 题目内容 1.1.1 基本题目信息1 1.1.2 基本题目信息2 1.1.3 示例输入输出 1.2 示例sql语句 # Write your MySQL query statement below SELECT u.user_id,u.user_name,u.creditIFNULL(t1.c1,0) credit,case when u.creditIFNULL…

分享一个无需账号完全免费的 ChatGPT-4 的平台

AIGC从入门到精通教程 1. 访问 SteamShip2. 进入创建页面,选择新建一个 GPT-4 实例。3. 完成创建后,您便可以尽情体验 GPT-4本教程收集于: AIGC从入门到精通教程 大家都知道,ChatGPT4.0是要订阅Plus(一个月20美元)后才能体验的。 今天就给大家弄一个白嫖ChatGPT4.0的教程…

DEJA_VU3D - Cesium功能集 之 107-卫星探测效果

前言 编写这个专栏主要目的是对工作之中基于Cesium实现过的功能进行整合,有自己琢磨实现的,也有参考其他大神后整理实现的,初步算了算现在有差不多实现小140个左右的功能,后续也会不断的追加,所以暂时打算一周2-3更的样子来更新本专栏(每篇博文都会奉上完整demo的源代码,…

c++STL之常用的算法

目录 常用的遍历算法 for_each() transform() for_each()和transform()算法比较 常用的查找算法 adjacent_find() binary_search count() count_if() find() 常用的排序算法 merge() sort() random_shuffle() reverse() 常用的拷…

基于`IRIS`,动态解析`HL7`消息

文章目录 基于IRIS,动态解析HL7消息什么是HL7HL7 版本HL7 消息结构段(Segment)字段(Field) HL7 数据类型在IRIS中查看HL7数据结构传统方式拼写HL7消息结构动态对象解析HL7消息结构。 基于IRIS,动态解析HL7消…

SpringCloud:微服务保护之隔离和降级

1.FeignClient整合Sentinel SpringCloud中,微服务调用都是通过Feign来实现的,因此做客户端保护必须整合Feign和Sentinel。 1.1.修改配置,开启sentinel功能 修改OrderService的application.yml文件,开启Feign的Sentinel功能&…

详细版易学版TypeScript - 类

一、类 - 类的属性和方法 class MyPreson {// 类的属性// 属性需要在类里先定义并确定类型,才可以在constructor里面用this访问name: stringage: numberconstructor(name: string, age: number) {this.name name;this.age age;}// 类的方法sendStr(str: string) {…

工业4.0时代来临,POWERLINK协议在千兆网卡下的性能

“工业 4.0”的高歌猛进, “智能制造”,“智慧工厂”的呼声越来越响亮。这些需求使得数据传输量越来越大,实时性越来越高,因此我们将 POWERLINK 从 100Mbps 升级到1000Mbps。测试下POWERLINK这种工业总线协议的性能,最…

jest基础指示

describ(类似java中的class,在这里定义的变量可以在所有it中使用) 针对某一方面的测试,一个描述性的东西,针对某一个方面的测试,或者说是一个作用域 一组测试用例的集合。 有两个参数,参数1 &…

Vue 3.0 学习笔记

Vue 3 学习笔记 文章目录 Vue 3 学习笔记[toc]一、初识vue3二、 常用Composition API(组合式API)**1. setup函数****2. ref函数****3. reactive函数****4. Vue3.0中的响应式原理****Vue2.x的响应式****Vue3.x的响应式** **5. reactivce对比ref****6. set…

为什么我在大厂待了三个月就选择离开?我聊聊应届生该选择大厂还是小公司

我在互联网大厂只待了3个月就离开了,主要原因不是大厂的福利或者薪资不够好,只是因为我发现在大厂里每天都有开不完的会,忙碌到没有自己的生活。当时我每天10点上班,晚上要工作到11甚至是12点,甚至半夜两三点都接到过工…

LangChain-Agents 入门指南

LangChain-Agents 入门指南 LangChain-Agents 入门指南注册 Serpapi运行高级 Agents API 测试运行 Google Search其它 Here’s the table of contents: LangChain-Agents 入门指南 LangChain是一个使用LLMs构建应用程序的工具箱,包含Models、Prompts、Indexes、Mem…

Vue3-黑马(二)

目录: (1)vue3-ref与reactive (2)vue3-基础-属性绑定与事件绑定 (3)vue3-基础-表单绑定 (1)vue3-ref与reactive ref函数可以把普通的数据变成响应式的数据&#xff0…

Firewall Testing Checklist 分析

不管是在服务器、云技术、嵌入式、车载等场景,防火墙的作用尤为重要,下面从信息收集、管理审核流程、操作系统安全、已实现规则和检查配置,这五个方面来进行分析firewall的具体作用和能力,然后提供一些实用的网络firewall工具给大…

【Spring框架全系列】SpringBoot配置文件相关操作

🌇哈喽,大家好,我是小浪。上篇博客我们已经学习了如何创建一个Spring项目,那么创建Spirng项目还可以直接通过在Spring官网的方式来创建,做法也非常的简单,感兴趣的小伙伴可以在C站搜个教程尝试一下&#xf…

22年广东河南省赛-隐藏信息探索

任务十: 1、访问服务器的FTP服务,下载图片QR,从图片中获取flag,并将flag提交; QR.png的内容如下,可以看到,找到二维码的前三块都被反转了,需要反转回来,把这个二维码做成一个正方形,使用截图工具分隔、配合画图工具拼接+旋转。平均分为4块,分出4个方块。即可。 2、…

django 基本使用

django 下载 pip install -i https://pypi.tuna.tsinghua.edu.cn/simple django查看版本 django-admin --version 4.2.1创建项目 django-admin startproject project创建一个 app python manage.py startapp app注册 app project/settings # app名称.apps.app名称Config…

2023河南省赛vp题解

目录 A题&#xff1a; B题 C题 D题 E题 F题 G题 H题 I题 J题 K题 L题 A题&#xff1a; 1.思路&#xff1a;考虑暴力枚举和双hash&#xff0c;可以在O(n)做完。 2.代码实现&#xff1a; #include<bits/stdc.h> #define sz(x) (int) x.size() #define rep(i,z,…

头歌计算机算法设计与分析:随机化算法

第1关&#xff1a;硬币实验 任务描述 相关知识随机数 编程要求 测试说明任务描述 本关任务&#xff1a;计算机产生的伪随机数来模拟抛硬币试验。 相关知识 为了完成本关任务&#xff0c;你需要掌握&#xff1a;1.如何获取数组的长度&#xff0c;2.如何遍历数组。 随机数 随机…

基于 SpringBoot+WebSocket 无DB实现在线聊天室

0 项目说明 0.1 样例展示 0.2 源码地址 GitHub&#xff1a;https://github.com/ShiJieCloud/web-chat Gitee&#xff1a;https://gitee.com/suitbaby/web-chat GitCode&#xff1a;I’m Jie / web-chat GitCode 1 WebSocket 简介 1.1 HTTP 常用的 HTTP 协议是一种无状态…