大数据课程D5——hadoop的Sink

news2025/1/24 17:53:40

文章作者邮箱:yugongshiye@sina.cn              地址:广东惠州

 ▲ 本章节目的

⚪ 掌握Sink的HDFS Sink;

⚪ 掌握Sink的Logger Sink;

⚪ 掌握Sink的File Roll Sink;

⚪ 掌握Sink的Null Sink;

⚪ 掌握Sink的AVRO Sink;

⚪ 掌握Sink的Custom Sink;

一、HDFS Sink

1. 概述

1. HDFS Sink将收集到的数据写到HDFS中。

2. 在往HDFS上写的时候,支持三种文件类型:文本类型,序列类型以及压缩类型。如果不指定,那么默认使用使得序列类型。

3. 在往HDFS上写数据的时候,数据的存储文件会定时的滚动,如果不指定,那么每隔30s会滚动一次,生成一个文件,那么此时会生成大量的小文件。

2. 配置属性

属性

解释

type

必须是hdfs

hdfs.path

数据在HDFS上的存储路径

hdfs.rollInterval

指定文件的滚动的间隔时间

hdfs.fileType

指定文件的存储类型:DataSteam(文本),SequenceFile(序列),CompressedStream(压缩)

3. 案例

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = hadoop01

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置HDFS Sink

# 类型必须是hdfs

a1.sinks.k1.type = hdfs

# 指定数据在HDFS上的存储路径

a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flumedata

# 指定文件的存储类型

a1.sinks.k1.hdfs.fileType = DataStream

# 指定文件滚动的间隔时间

a1.sinks.k1.hdfs.rollInterval = 3600

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f hdfssink.conf -

Dflume.root.logger=INFO,console

二、Logger Sink

1. 概述

1. Logger Sink是将Flume收集到的数据打印到控制台上。

2. 在打印的时候,为了防止过多的数据将屏幕占满,所以要求body部分的数据不能超过16个字节,超过的部分不打印。

3. Logger Sink在打印的时候,对中文支持不好。

2. 配置属性

属性

解释

type

必须是logger

maxBytesToLog

指定body部分打印的字节数

三、File Roll Sink

1. 概述

1. File Roll Sink将数据写到本地磁盘上。

2. 同HDFS Sink类似,File Roll Sink在往磁盘上写的时候,也有一个滚动的间隔时间,同样是30s,因此在磁盘上同样会形成大量的小文件。

2. 配置属性

属性

解释

type

必须是file_roll

sink.directory

指定数据的存储目录

sink.rollInterval

指定文件滚动的间隔时间

3. 案例

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = hadoop01

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置File Roll Sink

# 类型必须是file_roll

a1.sinks.k1.type = file_roll

# 指定数据在磁盘上的存储目录

a1.sinks.k1.sink.directory = /home/flumedata

# 指定文件的滚动间隔时间

a1.sinks.k1.sink.rollInterval = 3600

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f filerollsink.conf -

Dflume.root.logger=INFO,console

四、Null Sink

1. 概述

1. Null Sink会抛弃所有接收到的数据。

2. 配置属性

属性

解释

type

必须是null

3. 案例

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = hadoop01

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置Null Sink

# 类型必须是null

a1.sinks.k1.type = null

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1f

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f nullsink.conf -

Dflume.root.logger=INFO,console

五、AVRO Sink

1. 概述

1. AVRO Sink会将数据利用AVRO序列化之后写出到指定的节点的指定端口。

2. AVRO Sink结合AVRO Source实现多级、扇入、扇出流动效果。

2. 配置属性

属性

解释

type

必须是avro

hostname

数据要发往的主机的主机名或者IP

port

数据要发往的主机的接收端口

3. 多级流动

1. 第一个节点:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置多级流动

# 类型必须是avro

a1.sinks.k1.type = avro

# 指定主机名或者IP

a1.sinks.k1.hostname = hadoop02

# 指定端口

a1.sinks.k1.port = 8090

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 第二个节点:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = avro

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置多级流动

# 类型必须是avro

a1.sinks.k1.type = avro

# 指定主机名或者IP

a1.sinks.k1.hostname = hadoop03

# 指定端口

a1.sinks.k1.port = 8090

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

3. 第三个节点:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = avro

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

4. 启动Flume,启动的时候,谁接收数据,就先启动谁:

../bin/flume-ng agent -n a1 -c ../conf -f duoji.conf -

Dflume.root.logger=INFO,console

4. 扇入流动

1. 第一个和第二个节点:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置多级流动

# 类型必须是avro

a1.sinks.k1.type = avro

# 指定主机名或者IP

a1.sinks.k1.hostname = hadoop03

# 指定端口

a1.sinks.k1.port = 8090

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 第三个节点:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = avro

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

3. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f shanru.conf -

Dflume.root.logger=INFO,console

5. 扇出流动

1. 第一个节点:

a1.sources = s1

a1.channels = c1 c2

a1.sinks = k1 k2

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

a1.channels.c2.type = memory

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = hadoop02

a1.sinks.k1.port = 8090

a1.sinks.k2.type = avro

a1.sinks.k2.hostname = hadoop03

a1.sinks.k2.port = 8090

a1.sources.s1.channels = c1 c2

a1.sinks.k1.channel = c1

a1.sinks.k2.channel = c2

2. 第二个和第三个节点::

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = avro

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

3. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f shanchu.conf -

Dflume.root.logger=INFO,console

六、Custom Sink

1. 概述

1. 定义一个类实现Sink接口,考虑到需要获取配置属性,所以同样需要实现Configurable接口。

2. 不同于自定义Source,自定Sink需要考虑事务问题。

2. 事务

1. Source收集到数据之后,会通过doPut操作将树放到队列PutList(本质上是一个阻塞式队列)中。

2. PutList会试图将数据推送到Channel中。如果PutList成功将数据放到了Channel中,那么执行doCommit操作;反之执行doRollback操作。

3. Channel有了数据之后,会将数据通过doTake操作推送到TakeList中。

4. TakeList会将数据推送给Sink,如果Sink写出成功,那么执行doCommit;反之执行doRollvack。

3. 自定义Sink步骤

1. 构建Maven工程,导入对应的POM依赖。

2. 定义一个类继承AbstractSink,实现Sink接口和Configurable接口,覆盖configure,start,process和stop方法。

3. 完成之后打成jar包放到Flume安装目录的lib目录下。

4. 编写格式文件:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = hadoop01

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置自定义Sink

# 类型必须是类的全路径名

a1.sinks.k1.type = cn.tedu.flume.sink.AuthSink

# 指定文件的存储路径

a1.sinks.k1.path = /home/flumedata

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

5. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f authsink.conf -

Dflume.root.logger=INFO,console

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/807579.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【前端知识】React 基础巩固(三十六)——RTK中的异步操作

React 基础巩固(三十六)——RTK中的异步操作 一、RTK中使用异步操作 引入RTK中的createAsyncThunk,在extraReducers中监听执行状态 import { createSlice, createAsyncThunk } from "reduxjs/toolkit"; import axios from "axios";export cons…

<MySQL> Centos 7环境安装MySQL

Centos 7环境安装MySQL 1.卸载不要的环境 停止MySQL服务 systemctl stop mariadb.service systemctl stop mysqld禁止MySQL服务开机自启 systemctl disable mysqld卸载MySQL软件包 yum remove mysql-server mysql-client删除MySQL数据目录 rm -rf /var/lib/mysql清理MySQ…

福特汽车在全球电动汽车市场的主导地位正在不断扩大

来源:猛兽财经 作者:猛兽财经 2023年7月27日,美国最大的汽车巨头之一福特汽车(F)公布了其2023年第二季度财报。 2023年7月6日,福特汽车宣布,第二季度美国市场的汽车销量已经较2023年第一季度增长了11.7%,令…

机器人状态估计:robot_localization 功能包高级参数详解

机器人状态估计:robot_localization 功能包高级参数详解 前言功能包简介相关参数高级参数 前言 移动机器人的状态估计需要用到很多传感器,因为对单一的传感器来讲,都存在各自的优缺点,所以需要一种多传感器融合技术,将…

微信朋友圈跟圈怎么设置?

朋友圈跟发功能对需要进行朋友圈营销或微信营销的公司和个体创业者的帮助极大。通常情况下,这些创业者或企业会管理多个微信账号来协同运营和管理客户资源,也就是俗称的“大号”和“小号”。如果没有朋友圈跟发软件,客户需要依次使用大号来发…

141. 环形链表

简单 1.9K 相关企业 给你一个链表的头节点 head ,判断链表中是否有环。 如果链表中有某个节点,可以通过连续跟踪 next 指针再次到达,则链表中存在环。 为了表示给定链表中的环,评测系统内部使用整数 pos 来表示链表尾连接到链…

十九章:利用跨图像语义挖掘进行弱监督语义分割

0.摘要 本文研究了仅使用图像级别监督进行语义分割学习的问题。目前流行的解决方案利用分类器的对象定位图作为监督信号,并努力使定位图捕捉更完整的对象内容。与之前主要关注于图像内部信息的努力不同,我们着眼于跨图像语义关系在全面对象模式挖掘中的价…

冯诺依曼体系的认识、来源、原理、组成、功能和特点

目录 一.认识冯诺依曼 二.冯诺依曼体系结构的来源 三.冯诺依曼体系结构计算机 3.1工作原理 3.2组成部件 3.3功能和特点 🎁个人主页:tq02的博客_CSDN博客-C语言,Java,Java数据结构领域博主 🎥 本文由 tq02 原创,首发于 CSDN&…

股票回购不积极,遭分析师看空,汽车之家财务前景黯淡

来源:猛兽财经 作者:猛兽财经 第一季度财报后股价表现不佳 汽车之家(ATHM)于2023年5月11日公布了2023年第一季度业财报绩。 猛兽财经通过查询财报得知,汽车之家第一季度的实际营收为2.21亿美元,正常每股收…

java可变字符序列:StringBuffer、StringBuilder

文章目录 StringBuffer与StringBuilder的理解StringBuilder、StringBuffer的API StringBuffer与StringBuilder的理解 因为String对象是不可变对象,虽然可以共享常量对象,但是对于频繁字符串的修改和拼接操作,效率极低,空间消耗也…

【算法训练营】Fibonacci数列+合法括号序列判断+两种排序方法

7.29 Fibonacci数列题目解析代码 合法括号序列判断题目题解代码 两种排序方法题目:题解代码 Fibonacci数列 题目 题目链接: 点击跳转 解析 【题目解析】: 本题是对于Fibonacci数列的一个考察,Fibonacci数列的性质是第一项和第二项都为1&am…

Segmentation fault 利用 core.xxx文件帮助你debug

在没有get到本文介绍的技能之前的时候,以前遇到程序发生了 Segmentation fault 时,也是一筹莫展,看到伴随程序崩溃而生成的 core.xxxx 文件时(有时会生成,有时不会生成,留着下面介绍)&#xff0…

SpringBoot2.2.0.RELEASE整合Elasticsearch6.8.3

SpringBoot2.2.0.RELEASE整合Elasticsearch6.8.3 SpringBoot是2.2.0.RELEASE&#xff0c;elasticsearch是6.8.3 使用依赖spring-boot-starter-data-elasticsearch 使用ElasticSearchRepository操作 1、导入依赖 <?xml version"1.0" encoding"UTF-8&quo…

24考研数据结构-数组和特殊矩阵

目录 数据结构&#xff1a;数组与特殊矩阵数组数组的特点数组的用途 特殊矩阵对角矩阵上三角矩阵和下三角矩阵稀疏矩阵特殊矩阵的用途 结论 3.4 数组和特殊矩阵3.4.1数组的存储结构3.4.2普通矩阵的存储3.4.3特殊矩阵的存储1. 对称矩阵(方阵)2. 三角矩阵(方阵)3. 三对角矩阵(方阵…

Meta-Transformer 多模态学习的统一框架

Meta-Transformer是一个用于多模态学习的新框架&#xff0c;用来处理和关联来自多种模态的信息&#xff0c;如自然语言、图像、点云、音频、视频、时间序列和表格数据&#xff0c;虽然各种数据之间存在固有的差距&#xff0c;但是Meta-Transformer利用冻结编码器从共享标记空间…

【嵌入式学习笔记】嵌入式基础11——STM32常用轮子(SYSTEM)

1.deley文件夹介绍 1.1.delay文件夹介绍 函数名函数功能OSdelay_osschedlockus级延时时,关闭任务调度(防止打断us级延迟)OSdelay_osschedunlockus级延时时,恢复任务调度OSdelay_ostimedlyus级延时时,恢复任务调度OSSysTick_Handlersystick中断服务函数OSdelay_init初始化延迟…

MySQL服务无法启动,服务没有报告任何错误

MySQL服务无法启动&#xff0c;服务没有报告任何错误 昨天mysql服务还好好的&#xff0c;今天怎么都打不开。my.ini配置和端口都没有问题&#xff0c;只能备份一下data的数据&#xff0c;删除data文件夹&#xff0c;初始化mysqld。 一定要备份data数据&#xff01;&#xff01;…

【算法和数据结构】257、LeetCode二叉树的所有路径

文章目录 一、题目二、解法三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、解法 思路分析&#xff1a;首先看这道题的输出结果&#xff0c;是前序遍历。然后需要找到从根节点到叶子节点的所有路径&#xff…

苍穹外卖-day06

苍穹外卖-day06 本项目学自黑马程序员的《苍穹外卖》项目&#xff0c;是瑞吉外卖的Plus版本 功能更多&#xff0c;更加丰富。 结合资料&#xff0c;和自己对学习过程中的一些看法和问题解决情况上传课件笔记 视频&#xff1a;https://www.bilibili.com/video/BV1TP411v7v6/?sp…

什么是 HTTP 长轮询?

什么是 HTTP 长轮询&#xff1f; Web 应用程序最初是围绕客户端/服务器模型开发的&#xff0c;其中 Web 客户端始终是事务的发起者&#xff0c;向服务器请求数据。因此&#xff0c;没有任何机制可以让服务器在没有客户端先发出请求的情况下独立地向客户端发送或推送数据。 为…