Flume详解(2)

news2025/1/23 10:43:42

Flume

Sink

HDFS Sink

  1. 将数据写到HDFS上。数据以文件形式落地到HDFS上,默认是以FlumeData开头,可以通过hdfs.filePrefix来修改

  2. HDFS Sink默认每隔30s会滚动一次生成一个文件,因此会导致在HDFS上生成大量的小文件,实际过程中,需要通过hdfs.rollInterval来修改,一般设置为3600s或者86400s。如果设置为0,那么表示不滚动,只生成1个文件

  3. HDFS Sink默认每1024B会滚动一次生成一个文件,同样会导致产生更多的小文件,实际过程中,需要通过hdfs.rollSize来修改,一般设置为134217728B。如果设置为0,那么表示不滚动,只生成1个文件

  4. HDFS Sink默认每10条数据会滚动一次生成一个文件,同样会导致产生更多的小文件。实际过程中,需要通过hdfs.rollCount来修改。如果设置为0,那么表示不滚动,只生成1个文件

  5. HDFS Sink支持三种文件类型:SequenceFile(序列文件), DataStream(文本文件) or CompressedStream(压缩文件),默认使用的是SequenceFile。如果将文件类型设置为CompressedStream,那么还需要指定属性hdfs.codeC,支持gzip, bzip2, lzo, lzop, snappy

  6. 案例

    1. 格式文件

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      ​
      a1.sources.s1.type = netcat
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 8090
      ​
      a1.channels.c1.type = memory
      ​
      # 配置HDFS Sink
      # 类型必须是
      a1.sinks.k1.type = hdfs
      # 数据在HDFS上的存储路径
      a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flume_data
      # 文件滚动间隔时间
      a1.sinks.k1.hdfs.rollInterval = 3600
      # 文件滚动大小
      a1.sinks.k1.hdfs.rollSize = 134217728
      # 文件滚动条数
      a1.sinks.k1.hdfs.rollCount = 1000000000
      # 文件类型
      a1.sinks.k1.hdfs.fileType = DataStream
      ​
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
      2. 启动
      flume-ng agent -n a1 -c $FLUME_HOME/conf -f hdfssink.properties -Dflume.root.logger=INFO,console
      3. 在新的窗口中通过`nc`来发送数据
      nc hadoop01 8090

Logger Sink

  1. 将数据以日志写入到指定目的地,支持consolefile。实际开发过程中,使用的比较少,一般是教学阶段使用较多

  2. Logger Sink默认要求Event的body部分不能超过16个字节,可以通过maxBytesToLog来调节

  3. Logger Sink对中文支持不好

File Roll Sink

  1. 将数据以文本文件形式存储到本地的磁盘上。可以通过属性sink.serializer来修改,支持TEXTavro_event

  2. 类似于HDFS Sink,File Roll Sink默认也是每隔30s滚动一次生成一个文件,可以通过属性sink.rollInterval来修改

  3. 案例

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    ​
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = 0.0.0.0
    a1.sources.s1.port = 8090
    ​
    a1.channels.c1.type = memory
    ​
    # 配置File Roll Sink
    # 类型必须是file_roll
    a1.sinks.k1.type = file_roll
    # 数据在本地的存储路径
    a1.sinks.k1.sink.directory = /opt/flume_data
    # 文件滚动间隔时间
    a1.sinks.k1.sink.rollInterval = 3600
    ​
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1

AVRO Sink

  1. 将数据经过AVRO序列化之后来写出,结合AVRO Source来实现流动模型

  2. 多级流动

    1. 第一个节点

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      ​
      a1.sources.s1.type = exec
      a1.sources.s1.command = tail -F /opt/software/flume-1.11.0/data/a.txt
      a1.sources.s1.shell = /bin/sh -c
      ​
      a1.channels.c1.type = memory
      ​
      a1.sinks.k1.type = avro
      a1.sinks.k1.hostname = hadoop02
      a1.sinks.k1.port = 70000
      ​
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1

    2. 第二个节点

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      ​
      a1.sources.s1.type = avro
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 7000
      ​
      a1.channels.c1.type = memory
      ​
      a1.sinks.k1.type = avro
      a1.sinks.k1.hostname = hadoop03
      a1.sinks.k1.port = 7000
      ​
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1

    3. 第三个节点

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      ​
      a1.sources.s1.type = avro
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 7000
      ​
      a1.channels.c1.type = memory
      ​
      a1.sinks.k1.type = logger
      ​
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1

  3. 扇入流动

    1. 第一个节点

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      ​
      a1.sources.s1.type = exec
      a1.sources.s1.command = tail -F /opt/software/flume-1.11.0/data/a.txt
      a1.sources.s1.shell = /bin/sh -c
      ​
      a1.channels.c1.type = memory
      ​
      a1.sinks.k1.type = avro
      a1.sinks.k1.hostname = hadoop03
      a1.sinks.k1.port = 6666
      ​
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1

    2. 第二个节点

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      ​
      a1.sources.s1.type = netcat
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 8000
      ​
      a1.channels.c1.type = memory
      ​
      a1.sinks.k1.type = avro
      a1.sinks.k1.hostname = hadoop03
      a1.sinks.k1.port = 6666
      ​
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1

    3. 第三个节点

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      ​
      a1.sources.s1.type = avro
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 6666
      ​
      a1.channels.c1.type = memory
      ​
      a1.sinks.k1.type = logger
      ​
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1

  4. 扇出流动

    1. 注意:在Flume中,可以从同一个数据源采集数据,放到不同的仓库(Channel)存储,但是每一个Sink只能对应1个Channel

    2. 第一个节点

      a1.sources = s1
      a1.channels = c1 c2
      a1.sinks = k1 k2
      ​
      a1.sources.s1.type = netcat
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 8000
      ​
      a1.channels.c1.type = memory
      a1.channels.c2.type = memory
      ​
      a1.sinks.k1.type = avro
      a1.sinks.k1.hostname = hadoop02
      a1.sinks.k1.port = 7000
      ​
      a1.sinks.k2.type = avro
      a1.sinks.k2.hostname = hadoop03
      a1.sinks.k2.port = 7000
      ​
      a1.sources.s1.channels = c1 c2
      a1.sinks.k1.channel = c1
      a1.sinks.k2.channel = c2

    3. 第二个和第三个节点

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      ​
      a1.sources.s1.type = avro
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 7000
      ​
      a1.channels.c1.type = memory
      ​
      a1.sinks.k1.type = logger
      ​
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1

自定义Sink

  1. 自定义Sink的时候,需要定义一个类继承AbstractSink,实现Sink接口,最好还要实现Configurable接口来获取配置。注意,自定义Sink的过程中,需要关注事务问题

  2. 打成jar包放到lib目录下

    cd /opt/software/flume-1.11.0/lib/
    rz

  3. 编辑格式文件

    cd ../data/
    vim authsink.properties

    在文件中添加

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    ​
    a1.sources.s1.type = http
    a1.sources.s1.port = 8888
    ​
    a1.channels.c1.type = memory
    ​
    # 配置自定义Sink
    # 类型必须是类的全路径名
    a1.sinks.k1.type = com.fesco.sink.AuthSink
    # 存储路径
    a1.sinks.k1.path = /opt/flume_data
    ​
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1

  4. 启动Flume

  5. 发送HTTP请求

    curl -X POST -d '[{"headers":{"class":"big data","sinktype":"auth"},"body":"testing~~~"}]' http://hadoop01:8888

事务

  1. Source会先执行doPut操作,将数据放入PutList中,PutList本质上是一个Deque

  2. PutList会试图将数据传输给Channel,如果成功,执行doCommit操作,如果失败,那么执行doRollback

  3. Channel收到数据之后,会试图将数据推送到TakeList,然后由TakeList将数据试图推送给Sink。TakeList本质上也是一个Deque

  4. 如果TakeList成功将数据推送给Sink,那么执行doCommit操作;反之,如果失败,那么执行doRollback操作

Channel

Memory Channel

  1. Memory Channel将数据临时存储到内存队列中,队列默认容量是100,即队列默认最多能存储100条数据,如果队列被放满,那么后续的操作会被阻塞。可以通过属性capacity来调节,实际过程中一般会设置为100000~300000

  2. transactionCapacity:事务容量。每次PutList向Channel推送的数据条数或者Channel向TakeList添加的数据条数,默认是100。实际过程中,这个值一般会调节为1000~3000

  3. 需要注意的是,Memory Channel是将数据临时存储到内存中,所以读写速度相对较快,但是不可靠,因此适应于要求速度但是不要求可靠性的场景

File Channel

  1. File Channel将数据临时存储到磁盘上,所以读写速度相对慢一些,但是可靠,因此适应于要求可靠性但不要求速度的场景

  2. File Channel默认会将数据临时存储到~/.flume/file-channel/data目录下,可以通过属性dataDirs来修改,如果指定了多个数据目录,那么目录之间用逗号隔开

  3. File Channel支持断点续传,默认情况下,会将偏移量记录到~/.flume/file-channel/checkpoint目录下,可以通过属性checkpointDir来修改

  4. 默认File Channel能够存储1000000条数据,可以通过属性capacity来条件

  5. File Channel最多能占用2146435071B的磁盘,可以通过maxFileSize修改

  6. File Channel的transactionCapacity的默认值是10000

  7. 案例

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    ​
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = 0.0.0.0
    a1.sources.s1.port = 8090
    ​
    # 配置File Channel
    # 类型必须是file
    a1.channels.c1.type = file
    # 偏移量的存储位置
    a1.channels.c1.checkpointDir = /opt/flume_data/checkpoint
    # 数据临时存储位置
    a1.channels.c1.dataDirs = /opt/flume_data/data
    ​
    a1.sinks.k1.type = logger
    ​
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1

其他Channel

  1. JDBC Channel:将数据临时存储到数据库,但是JDBC Channel目前只支持Derby数据库。基于Derby(微型、文件型、单连接)的特性,所以实际开发过程中,不适用这个Channel

  2. Spillable Memory Channel:内存溢出Channel。内存中维系一个队列,如果队列被放满,不会阻塞,而是会将数据临时存储到磁盘上,这个Channel目前还在实验阶段,不推荐在生产场景中使用

Selector

概述

  1. Selector并不是一个单独的组件,而是附属于Source的子组件

  2. Selector支持三种模式:

    1. replicating:复制/复用模式。节点收集到数据之后,会将数据复制,然后分发给每一个节点,此时每一个节点收到的数据都是相同的

    2. load balancing:负载均衡模式。节点收集到数据之后,会平均分发到其他的节点上。此时被扇出的节点接收到的数据条数大致相等,但是数据不相同。这种模式是Flume1.10提供的,然后不稳定

    3. multiplexing:路由/分发模式。节点收集到数据之后,会根据headers中的指定键和值,将数据分发给对应的节点来处理,此时每一个节点收到的数据都是不同的

  3. 扇出结构中,如果不指定,默认使用的是replicating模式

multiplexing

  1. 实际过程中,如果需要对数据进行分类处理,那么可以考虑使用路由/分发模式

  2. 案例

    a1.sources = s1
    a1.channels = c1 c2
    a1.sinks = k1 k2
    ​
    a1.sources.s1.type = http
    a1.sources.s1.port = 8000
    # 指定Selector的类型
    a1.sources.s1.selector.type = multiplexing
    # 指定监听的字段
    a1.sources.s1.selector.header = kind
    # 根据kind字段的值分发给对应的Channel
    a1.sources.s1.selector.mapping.video = c1
    a1.sources.s1.selector.mapping.music = c2
    a1.sources.s1.selector.default = c2
    ​
    ​
    a1.channels.c1.type = memory
    a1.channels.c2.type = memory
    ​
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop02
    a1.sinks.k1.port = 7000
    ​
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = hadoop03
    a1.sinks.k2.port = 7000
    ​
    a1.sources.s1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2

    启动Flume之后,发送请求

    curl -X POST -d '[{"headers":{"kind":"video"},"body":"video log"}]' http://hadoop01:8000
    curl -X POST -d '[{"headers":{"kind":"music"},"body":"music log"}]' http://hadoop01:8000
    curl -X POST -d '[{"headers":{"kind":"txt"},"body":"txt log"}]' http://hadoop01:8000

Sink Processor

概述

  1. Sink Processor本质上就是Sink Group,是将一个或者多个Sink绑定到一个组中来使用

  2. 目前,官网支持三种模式

    1. default:默认模式。一个Sink就对应一个Sinkgroup,有几个Sink就对应了几个Sinkgroup

    2. Load Balancing:负载均衡。将多个Sink绑定到一个组中,然后将这个组接收到数据平均的发送给每一个Sink。支持round_robin(轮询)和random(随机)。同样,Flume提供的负载均衡模式并不好(能)用

    3. Failover:崩溃恢复。将多个Sink绑定到一个组中,如果现在工作的Sink宕机,同组中的其他Sink可以实现相同的功能,从而避免了单点故障

Failover

  1. 将多个Sink绑定到一个组中,同组的Sink需要配置优先级,数据会优先发送给优先级较高的Sink,如果高优先级的Sink宕机,那么才会发送给低优先级的Sink

  2. 案例

    a1.sources = s1
    a1.channels = c1 c2
    a1.sinks = k1 k2
    ​
    # 给Sinkgroup起名
    a1.sinkgroups = g1
    # 给Sinkgroup绑定Sink
    a1.sinkgroups.g1.sinks = k1 k2
    # 指定Sinkgroup的类型
    a1.sinkgroups.g1.processor.type = failover
    # 给Sink指定优先级
    a1.sinkgroups.g1.processor.priority.k1 = 7
    a1.sinkgroups.g1.processor.priority.k2 = 5
    # 发送超时时间
    # 默认是30000ms->30s
    a1.sinkgroups.g1.processor.maxpenalty = 10000
    ​
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = 0.0.0.0
    a1.sources.s1.port = 8000
    ​
    a1.channels.c1.type = memory
    a1.channels.c2.type = memory
    ​
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop02
    a1.sinks.k1.port = 7000
    ​
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = hadoop03
    a1.sinks.k2.port = 7000
    ​
    a1.sources.s1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1538636.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【力扣刷题日记】603.连续空余座位

前言 练习sql语句,所有题目来自于力扣(https://leetcode.cn/problemset/database/)的免费数据库练习题。 今日题目: 603.连续空余座位 表:Cinema 列名类型seat_idintfreebool Seat_id 是该表的自动递增主键列。 在…

数学建模(层次分析法 python代码 案例)

目录 介绍: 模板: 例题:从景色、花费、饮食,男女比例四个方面去选取目的地 准则重要性矩阵: 每个准则的方案矩阵:​ 一致性检验: 特征值法求权值: 完整代码: 运行结…

2024最新版pycharm激活码【资源共享】一键复制

好用的话 家人们记得点个赞喔 VPXI6TDKOQ-eyJsaWNlbnNlSWQiOiJWUFhJNlRES09RIiwibGljZW5zZWVOYW1lIjoi5rC45LmF5rA5rS7IHd3d8K3YWppaHVvwrdjb20iLCJsaWNlbnNlZVR5cGUiOiJQRVJTT05BTCIsImFzc2lnbmVlTmFtZSI6IiIsImFzc2lnbmVlRW1haWwiOiIiLCJsaWNlbnNlUmVzdHJpY3Rpb24iOiIiLCJjaG…

STM32不使用中断实现定时器微秒级精确延时

我们在写代码的时候避免不了要使用延时函数,很多延时函数都是使用中断或者tick来实现的,tick的方式最大到毫秒ms级别,通过中断方式的通用定时器来实现,如果实现1us的延时那么每1us就来一次中断,很影响cpu的效率。 本文…

haproxy 高可用

一 haproxy HAProxy简介 HAProxy提供高可用、负载均衡以及基于TCP和HTTP的应用代理,适合处理高负载站点的七层数据请求。类似的代理服务可以屏蔽内部真实服务器,防止内部服务器遭受攻击。 HAProxy特点和优点: 1.支持原声SSL,同时支持客户端和…

【Android 源码】Android源码下载指南

文章目录 前言安装Repo初始化Repo选择分支没有梯子替换为清华源 有梯子 下载源码下载开始参考 前言 这是关于Android源码下载的过程记录。 环境:Windows上通过VMware安装的Ubuntu系统 安装Repo 创建Repo文件目录 mkdir ~/bin PATH~/bin:$PATH下载Repo工具&#…

火哥Windows内核第五期

主要讲解windows的保护模式,系统调试,异常发现及处理等等。事件等待、异常、软件调试、内存管理、消息机制、调试器开发项目、内核工具开发项目,发展方向:CC驱动开发,软件逆向,破解,游戏保护,反…

huggingface的transformers训练bert

目录 理论 实践 理论 https://arxiv.org/abs/1810.04805 BERT(Bidirectional Encoder Representations from Transformers)是一种自然语言处理(NLP)模型,由Google在2018年提出。它是基于Transformer模型的预训练方法…

力扣HOT100 - 128. 最长连续序列

解题思路: 注意: 1.Set不能直接排序,必须要转换成ArrayList或者LinkedList后用Collections.sort()方法进行排序。 (Queue也不能直接排序,排序方法同Set) 2.连续的序列不能只找第一个,因为不…

dbscan算法实现鸢尾花聚类(python实现)

DBscan算法原理 : dbscan算法-CSDN博客 法一(调库) : 直接调库 : import numpy as np import matplotlib.pyplot as plt from sklearn import datasets from sklearn.cluster import DBSCAN from sklearn.decomposition import PCA from sklearn.discriminant_analysis …

【数据结构刷题专题】——二分查找

二分查找 二分查找模板题&#xff1a;704. 二分查找 二分查找前提&#xff1a; 有序数组数组中无重复元素 左闭右闭&#xff1a; class Solution { public:int search(vector<int>& nums, int target) {int left 0;int right nums.size() - 1;while (left <…

重新配置node.js,npm,环境变量

起因是检查最近收到的一些朋友分享给我的各种资料&#xff0c;什么前端&#xff0c;后端&#xff0c;java,go,python等语言&#xff0c;想着将一个模拟QQ音乐的一个源代码进行跑通&#xff0c;看看有什么特别之处。如下图 出现了node环境路径问题&#xff0c;参考链接 https:/…

回收站的数据删了可以找回来吗?方法已备好

在数字化时代&#xff0c;数据的安全性与恢复问题逐渐受到大家的关注。回收站&#xff0c;作为电脑中存储已删除文件的地方&#xff0c;常常被视为数据恢复的“救命稻草”。然而&#xff0c;当回收站中的数据也被删除时&#xff0c;许多人可能会感到无助和困惑。本文旨在探讨回…

nuclei使用方法

nuclei使用方法 查看帮助 nuclei -h 列出所有模板 nuclei -tl 查找某种cms的相关漏洞模板&#xff0c;wordpress为例 nuclei -tl -tc "contains(name,wordpress)"便会列出内容里含有wordpress关键字的漏洞检测模板 使用与某cms相关的所有漏洞模板进行扫描&#…

每日一题 --- 209. 长度最小的子数组[力扣][Go]

长度最小子数组 题目&#xff1a; 给定一个含有 n 个正整数的数组和一个正整数 target 。 找出该数组中满足其总和大于等于 target 的长度最小的 连续 子数组 [numsl, numsl1, ..., numsr-1, numsr] &#xff0c;并返回其长度**。**如果不存在符合条件的子数组&#xff0c…

web学习笔记(四十三)ajax

目录 1.相关基础概念 1.1客户端与服务器 1.2URL地址 1.3 客户端和服务器端通信的过程 1.4 一个URL地址放入浏览器&#xff0c;到页面渲染发生了什么事情 1.5 数据 1.6资源的请求方式 2.Ajax 2.1什么是Ajax 2.2 jQuery 中的Ajax 2.2.1 $.get()的语法 2.2.2$.post()…

Linux:http协议初步认识

文章目录 OSI七层模型http协议域名路径信息请求和响应 编写一个httpserver OSI七层模型 在结束了前面对于序列化反序列化等内容的学习后&#xff0c;重新回到对于OSI模型的部分 如上所示的是对于OSI接口的示意图&#xff0c;在这当中可以看到会话层的概念&#xff0c;会话层的…

简介:KMeans聚类算法

在机器学习中&#xff0c;无监督学习一直是我们追求的方向&#xff0c;而其中的聚类算法更是发现隐藏数据结构与知识的有效手段。聚类是一种包括数据点分组的机器学习技术。给定一组数据点&#xff0c;我们可以用聚类算法将每个数据点分到特定的组中。 理论上&#xff0c;属于同…

SQL Server 2008R2 日志文件大小设置及查询

SQL Server 2008R2 建立数据库存在日志无限增长问题&#xff0c;造成磁盘内存不足。本文解决这个问题&#xff0c;如下&#xff1a; 1.设置日志文件的最大大小 USE master; GO ALTER DATABASE [D_total] MODIFY FILE (NAME D_total_log, -- 日志文件的逻辑名称MAXSIZE 200…

LeetCode Python - 69. x 的平方根

目录 题目描述解法运行结果 题目描述 给你一个非负整数 x &#xff0c;计算并返回 x 的 算术平方根 。 由于返回类型是整数&#xff0c;结果只保留 整数部分 &#xff0c;小数部分将被 舍去 。 注意&#xff1a;不允许使用任何内置指数函数和算符&#xff0c;例如 pow(x, 0.…