Flume的安装及使用

news2025/1/12 13:26:49

Flume的安装及使用

文章目录

  • Flume的安装及使用
      • Flume的安装
        • 1、上传至虚拟机,并解压
        • 2、重命名目录,并配置环境变量
        • 3、查看flume版本
        • 4、测试flume
        • 5、flume的使用

Flume的安装

1、上传至虚拟机,并解压
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /usr/local/soft/

在环境变量中增加如下命令,可以使用 soft 快速切换到 /usr/local/soft

alias soft=‘cd /usr/local/soft/’

2、重命名目录,并配置环境变量
mv apache-flume-1.9.0-bin/ flume-1.9.0
vim /etc/profile
source /etc/profile
3、查看flume版本
flume-ng version
[root@master soft]# flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
[root@master soft]# 
4、测试flume
  • 监控端口,将数据打印至控制台

    # a1表示 agent名称,具体定义在启动命令中 -n 参数中
    # r1表示a1中的source名称
    a1.sources = r1
    # k1表示a1的sink名称
    a1.sinks = k1
    # c1表示a1中的channel名称
    a1.channels = c1
    
    # 表示a1的输入源类型为netcat端口类型
    a1.sources.r1.type = netcat
    
    # 表示a1的监听主机,表示任意ip都可以
    a1.sources.r1.bind = 0.0.0.0
    # 表示a1的监听端口号
    a1.sources.r1.port = 6666
    
    # 表示a1的sinkc输出的目的地是控制台的logger类型
    a1.sinks.k1.type = logger
    
    # 表示channel类型是memory内存类型
    a1.channels.c1.type = memory
    
    # 表示a1的channel总容量为1000 event
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity = 100
    
    # 表示将r1和c1连接起来
    a1.sources.r1.channels = c1
    
    # 表示将K1 和 c1 连接起来
    a1.sinks.k1.channel=c1
    

启动命令

flume-ng agent -c $FLUME_HOME/conf/ -n a1 -f $FLUME_HOME/job/natcat2logger.conf -Dflume.root.logger=INFO,console

​ 参数说明:

​ -c 表示配置文件存储在conf目录中

​ -n 表示agent起名为a1

​ -f 表示flume启动读取的配置文件

​ -Dflume.root.logger=INFO,console 表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别 ,日志级别包括: log、info、warn、 error

6666端口来源:
在这里插入图片描述

在这里插入图片描述

  • 监控一个目录,将数据打印出来

    • 配置文件
    # 首先先给agent起一个名字 叫a1
    # 分别给source channel sink取名字
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    # 分别对source、channel、sink进行配置
    
    # 配置source
    # 将source的类型指定为 spooldir 用于监听一个目录下文件的变化
    # 因为每个组件可能会出现相同的属性名称,所以在对每个组件进行配置的时候 
    # 需要加上 agent的名字.sources.组件的名字.属性 = 属性值
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /root/data/
    a1.sources.r1.fileSuffix = .ok
    a1.sources.r1.fileHeader = true
    
    # 给r1这个souces配置一个拦截器并取名为 i1
    a1.sources.r1.interceptors = i1 
    # 将拦截器i1的类型设置为timestamp 会将处理数据的时间以毫秒的格式插入event的header中
    # a1.sources.r1.interceptors.i1.type = timestamp
    # 将拦截器i1的类型设置为regex_filter 会根据正则表达式过滤数据
    a1.sources.r1.interceptors.i1.type = regex_filter
    # 配置正则表达式
    # 如果匹配到那么获取整行数据 
    a1.sources.r1.interceptors.i1.regex = [0-9]+
    # excludeEvents = true 表示将匹配到的过滤,未匹配到的放行
    # a1.sources.r1.interceptors.i1.excludeEvents = true
    
    # 配置sink
    # 使用logger作为sink组件,可以将收集到数据直接打印到控制台
    a1.sinks.k1.type = logger
    
    # 配置channel
    # 将channel的类型设置为memory,表示将event缓存在内存中
    a1.channels.c1.type = memory
    
    # 组装
    # 将sources的channels属性指定为c1
    a1.sources.r1.channels = c1
    
    # 将sinks的channel属性指定为c1
    a1.sinks.k1.channel = c1
    
    • 启动agent
    flume-ng agent -n a1 -f ./spoolingtest.conf -Dflume.root.logger=DEBUG,console
    
    • 新建/root/data目录
    mkdir /root/data
    
    • 在/root/data/目录下新建文件,输入内容,观察flume进程打印的日志
    # 随意在a.txt中加入一些内容
    vim /root/data/a.txt
    
5、flume的使用
  • spoolingToHDFS.conf

    • 配置文件
    # a表示给agent命名为a
    # 给source组件命名为r1
    a.sources = r1
    # 给sink组件命名为k1
    a.sinks = k1 
    # 给channel组件命名为c1
    a.channels = c1
    #指定spooldir的属性
    a.sources.r1.type = spooldir 
    a.sources.r1.spoolDir = /root/data 
    a.sources.r1.fileHeader = true 
    a.sources.r1.interceptors = i1 
    a.sources.r1.interceptors.i1.type = timestamp
    #指定sink的类型
    a.sinks.k1.type = hdfs
    a.sinks.k1.hdfs.path = /flume/data/dir1
    # 指定文件名前缀
    a.sinks.k1.hdfs.filePrefix = student
    # 指定达到多少数据量写一次文件 单位:bytes
    a.sinks.k1.hdfs.rollSize = 102400
    # 指定多少条写一次文件
    a.sinks.k1.hdfs.rollCount = 1000
    # 指定文件类型为 流 来什么输出什么
    a.sinks.k1.hdfs.fileType = DataStream
    # 指定文件输出格式 为text
    a.sinks.k1.hdfs.writeFormat = text
    # 指定文件名后缀
    a.sinks.k1.hdfs.fileSuffix = .txt
    
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 1000
    # 表示sink每次会从channel里取多少数据
    a.channels.c1.transactionCapacity = 100
    # 组装
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
    
    • 在 /root/data/目录下准备数据
    The Zen of Python, by Tim Peters
    
    Beautiful is better than ugly.
    Explicit is better than implicit.
    Simple is better than complex.
    Complex is better than complicated.
    Flat is better than nested.
    Sparse is better than dense.
    Readability counts.
    Special cases aren't special enough to break the rules.
    Although practicality beats purity.
    Errors should never pass silently.
    Unless explicitly silenced.
    In the face of ambiguity, refuse the temptation to guess.
    There should be one-- and preferably only one --obvious way to do it.
    Although that way may not be obvious at first unless you're Dutch.
    Now is better than never.
    Although never is often better than *right* now.
    If the implementation is hard to explain, it's a bad idea.
    If the implementation is easy to explain, it may be a good idea.
    Namespaces are one honking great idea -- let's do more of those!
    
    • 启动agent
    flume-ng agent -n a -f ./spoolingToHDFS.conf -Dflume.root.logger=DEBUG,console
    

    注意如果Hadoop是3.x版本,那么需要将Hdfs中的lib目录下的guava包复制到Flume

    执行如下:

    mv $FLUME_HOME/lib/guava-11.0.2.jar $FLUME_HOME/lib/guava-11.0.2.jar.bak

    cd /usr/local/soft/hadoop-3.1.3/share/hadoop/hdfs/lib

    cp guava-27.0-jre.jar $FLUME_HOME/lib/

  • httpToLogger

    • 配置文件
    # a表示给agent命名为a
    # 给source组件命名为r1
    a.sources = r1
    # 给sink组件命名为k1
    a.sinks = k1 
    # 给channel组件命名为c1
    a.channels = c1
    #指定http的属性
    a.sources.r1.type = http
    a.sources.r1.port = 6666 
    
    #指定sink的类型
    a.sinks.k1.type = logger
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 1000
    # 表示sink每次会从channel里取多少数据
    a.channels.c1.transactionCapacity = 100
    # 组装
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
    
    • 启动

      • 先启动agent
      flume-ng agent -n a -f ./httpToLogger.conf -Dflume.root.logger=DEBUG,console
      
      • 再使用curl发起一个http请求
      curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"},{ "headers" :{"a2" : "a11","b2" : "b11"},"body" : "hello~http~flume2~"}]' http://master:6666
      
  • exec

    • 配置文件

      # Name the components on this agent
      a2.sources = r2
      a2.sinks = k2
      a2.channels = c2
      
      # Describe/configure the source
      a2.sources.r2.type = exec
      a2.sources.r2.command = tail -F /usr/local/soft/hive-3.1.2/log/hive.log
      
      # Describe the sink
      a2.sinks.k2.type = hdfs
      a2.sinks.k2.hdfs.path = /flume/hivelog/%Y%m%d/%H
      #上传文件的前缀
      a2.sinks.k2.hdfs.filePrefix = logs-
      #是否按照时间滚动文件夹
      a2.sinks.k2.hdfs.round = true
      #多少时间单位创建一个新的文件夹
      a2.sinks.k2.hdfs.roundValue = 1
      #重新定义时间单位
      a2.sinks.k2.hdfs.roundUnit = hour
      #是否使用本地时间戳
      a2.sinks.k2.hdfs.useLocalTimeStamp = true
      #积攒多少个Event才flush到HDFS一次
      a2.sinks.k2.hdfs.batchSize = 100
      #设置文件类型,可支持压缩
      a2.sinks.k2.hdfs.fileType = DataStream
      #多久生成一个新的文件
      a2.sinks.k2.hdfs.rollInterval = 60
      #设置每个文件的滚动大小
      a2.sinks.k2.hdfs.rollSize = 134217700
      #文件的滚动与Event数量无关
      a2.sinks.k2.hdfs.rollCount = 0
      
      a2.channels.c2.type = memory
      a2.channels.c2.capacity = 1000
      a2.channels.c2.transactionCapacity = 100
      
      
      a2.sources.r2.channels = c2
      a2.sinks.k2.channel = c2 
      
      
    • 启动

      启动flume

      bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
      

      启动hive,之后查看HDFS

  • 单数据源多输出

    ​ Flume1监控文件内容变动,将监控到的内容分别给到flume2和flume3,flume2将内容写到HDFS, Flume3将数据写到本地文件系统

    • 配置文件

      flume1.conf 
      
      #Named
      a1.sources = r1
      a1.channels = c1 c2
      a1.sinks = k1 k2
      
      #Source
      a1.sources.r1.type = TAILDIR
      a1.sources.r1.filegroups = f1
      a1.sources.r1.filegroups.f1 = /usr/local/soft/flume-1.9.0/job/taildir/.*\.txt
      # 对于监听的文件监听位置信息需要保存到该json文件中
      a1.sources.r1.positionFile = /usr/local/soft/flume-1.9.0/job/position/position.json
      
      # channel selector
      a1.sources.r1.selector.type = replicating
      
      #Channel
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 10000
      a1.channels.c1.transactionCapacity = 100
      
      a1.channels.c2.type = memory
      a1.channels.c2.capacity = 10000
      a1.channels.c2.transactionCapacity = 100
      
      #Sink
      a1.sinks.k1.type = avro
      a1.sinks.k1.hostname = localhost
      a1.sinks.k1.port = 7777
      
      a1.sinks.k2.type = avro
      a1.sinks.k2.hostname = localhost
      a1.sinks.k2.port = 8888
      
      
      #Bind
      a1.sources.r1.channels = c1 c2  
      a1.sinks.k1.channel = c1 
      a1.sinks.k2.channel = c2 
      
      
      
      
      flume2.conf
      
      a2.sources = r1
      a2.channels = c1
      a2.sinks = k1 
      
      #Source
      a2.sources.r1.type = avro
      a2.sources.r1.bind = localhost
      a2.sources.r1.port = 7777
      
      #Channel
      a2.channels.c1.type = memory
      a2.channels.c1.capacity = 10000
      a2.channels.c1.transactionCapacity = 100
      
      #Sink
      a2.sinks.k1.type = hdfs
      a2.sinks.k1.hdfs.path = /flume/replicating/%Y%m%d/%H
      a2.sinks.k1.hdfs.filePrefix = logs-
      a2.sinks.k1.hdfs.round = true
      a2.sinks.k1.hdfs.roundValue = 1
      a2.sinks.k1.hdfs.roundUnit = hour
      a2.sinks.k1.hdfs.useLocalTimeStamp = true
      a2.sinks.k1.hdfs.batchSize = 100
      a2.sinks.k1.hdfs.fileType = DataStream
      a2.sinks.k1.hdfs.rollInterval = 60
      a2.sinks.k1.hdfs.rollSize = 134217700
      a2.sinks.k1.hdfs.rollCount = 0
      
      #Bind
      a2.sources.r1.channels = c1 
      a2.sinks.k1.channel = c1 
      
      
      flume3.conf 
      
      #Named
      a3.sources = r1
      a3.channels = c1
      a3.sinks = k1 
      
      #Source
      a3.sources.r1.type = avro
      a3.sources.r1.bind = localhost
      a3.sources.r1.port = 8888
      
      #Channel
      a3.channels.c1.type = memory
      a3.channels.c1.capacity = 10000
      a3.channels.c1.transactionCapacity = 100
      
      #Sink
      a3.sinks.k1.type = file_roll
      # 将数据保存到本地路径中 不断滚动创建,如果某个时间段没有数据,那么会创建一个空的文件 
      a3.sinks.k1.sink.directory = /usr/local/soft/flume-1.9.0/job/fileroll
      
      #Bind
      a3.sources.r1.channels = c1 
      

    a3.sinks.k1.channel = c1

    
    * 启动
    
    ```shell
    flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/Replicating/flume3.conf -n a3 -Dflume.root.logger=INFO,console
    
    flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/Replicating/flume2.conf -n a2 -Dflume.root.logger=INFO,console
    
    flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/Replicating/flume1.conf -n a1 -Dflume.root.logger=INFO,console
    
    
    
    
    
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1618556.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Datawhale ChatGPT基础科普

根据课程GitHub - datawhalechina/hugging-llm: HuggingLLM, Hugging Future. 摘写自己不懂得一些地方,具体可以再到以上项目地址 LM:这是ChatGPT的基石的基石。 Transformer:这是ChatGPT的基石,准确来说它的一部分是基石。 G…

UDS诊断故障码DTC

在汽车的各种零部件ECU设计中,开发人员会罗列出他能想到的这些零部件可能发生的所有故障。并且会为每一个故障分配一个代码、代号或数字。这个分配的数据就是DTC,DTC就是每个故障码的身份证号。 五位故障码属于OBD诊断协议。 五位故障码实际上只占用2个…

如何启用启用WordPress调试模式

最近我们的WordPress网站在访问时,经常出现打不开的现象,我们向主机提供商Hostease咨询后,他们提到这是由于WordPress的某个插件导致的问题,我们在将插件版本升级至最新后,这个问题就消失了。为了方便后续的检查&#…

【SpringBoot】-MyBatis详解+单表操作

作者:学Java的冬瓜 博客主页:☀冬瓜的主页🌙 专栏:【Framework】 主要内容:什么是MyBatis框架?MyBatis框架有什么用?MyBatis实现查询步骤详解。MyBatis实现单表的增删查改。MyBatis模糊查询&…

【智能算法】蜉蝣算法(MA)原理及实现

目录 1.背景2.算法原理2.1算法思想2.2算法过程 3.结果展示4.参考文献 1.背景 2020年,K Zervoudakis等人受到自然界蜉蝣交配繁殖行为启发,提出了蜉蝣算法(Mayfly Algorithm, MA)。 2.算法原理 2.1算法思想 MA灵感来自蜉蝣交配…

简化图卷积 笔记

1 Title Simplifying Graph Convolutional Networks(Felix Wu、Tianyi Zhang、Amauri Holanda de、 Souza Jr、Christopher Fifty、Tao Yu、Kilian Q. Weinberger)【ICML 2019】 2 Conclusion This paper proposes a simplified graph convolutional m…

《HCIP-openEuler实验指导手册》1.4 Apache MPM工作模式调整

MPM介绍 二、配置步骤 查看MPM当前工作模式 方法一: httpd -M | grep mpm方法二: 浏览器访问:http://IP:端口/server-status 方法三: cat /etc/httpd/conf.modules.d/00-mpm.conf查看 LoadModule mpm_event_module modules/mo…

嵌入式学习59-ARM8(中断,ADC,内核定时器和传感器)

什么是中断顶半部和底半部 ? (部分记忆)背 上半部: …

探索亚马逊云科技「生成式 AI 精英速成计划」

目录 前言「生成式 AI 精英速成计划」技术开发课程学习课程学习 总结 前言 亚马逊云科技(Amazon Web Services,简称AWS)作为全球领先的云计算服务提供商,一直以来在推动人工智能(AI)领域的发展中扮演着重要…

鸿蒙OpenHarmony【小型系统运行案例】 (基于Hi3516开发板)

运行 启动系统 在完成Hi3516DV300的烧录后,还需要设置BootLoader引导程序,才能运行OpenHarmony系统。 在Hi3516DV300任务中,单击Configure bootloader(Boot OS)进行配置即可。 说明: DevEco Device Tool…

python爬虫之爬取微博评论(4)

一、获取单页评论 随机选取一个微博,例如下面这个 【#出操死亡女生家属... - 冷暖视频的微博 - 微博 (weibo.com) 1、fnf12,然后点击网络,搜索评论内容,然后预览,就可以查看到网页内容里面还有评论内容 2、编写代码…

【无线通信】OQPSK

调制 sps 8; RolloffFactor 0.2; FilterSpanInSymbols 10;bits randi([0, 1], 224*8, 1); % 1792symbols bits*2 - 1; % 1792 re -symbols(2:2:end); % 896 im -symbols(1:2:end); % 896pFilterTx comm.RaisedCosineTransmitFilter(...Shape, Square root, ...Rollo…

【全网首发】Mogdb 5.0.6新特性:CM双网卡生产落地方案

在写这篇文章的时候,刚刚加班结束,顺手写了这篇文章。 前言 某大型全国性行业核心系统数据库需要A、B两个物理隔离的双网卡架构方案,已成为行业标准。而最新发布的MogDB 5.0.6的CM新增支持流复制双网段部署,用于网卡级高可用容灾(…

LlamaIndex 加 Ollama 实现 Agent

AI Agent 是 AIGC 落地实现的场景之一,与 RAG 不同,RAG 是对数据的扩充,是模型可以学习到新数据或者本地私有数据。AI Agent 是自己推理,自己做,例如你对 AI Agent 说我要知道今天上海的天气怎么样,由于 AI…

WAF防范原理

目录 一、什么是WAF 二、纵深安全防御 WAF的组网模式 WAF配置全景 WAF端 服务器 攻击端 拦截SQL注入,XSS攻击,木马文件上传 要求: 使用WAF,通过配置策略要求能防御常见的web漏洞攻击(要求至少能够防御SQL、XSS、文…

阿赵UE学习笔记——30、HUD简单介绍

阿赵UE学习笔记目录 大家好,我是阿赵。   继续学习虚幻引擎,这次来学习一下HUD的基础使用。 一、 什么是HUD HUD(Head-Up Display),也就是俗称的抬头显示。很多其他领域里面有用到这个术语,比如开车的朋友可能会接触过&#xf…

实现Spring底层机制(三)

文章目录 阶段4—实现BeanPostProcessor机制1.文件目录2.初始化方法实现1.编写初始化接口InitializingBean.java2.MonsterService.java实现初始化接口3.容器中的createBean方法增加初始化逻辑,判断对象类型是否是InitializingBean的子类型,如果是&#x…

BI项目规划第二讲:BI项目实施方案这样设计就够了

在上一篇文章中,我们对BI项目规划中的确定项目范围、组建项目团队两方面进行了详细的解读。 随着项目团队的建立和角色分配的明确,接下来的工作将转向具体的项目实施策略,这是确保项目顺利进行的另一个关键环节。 BI项目实施方案是在项目开…

<前端>Electron-builder为公证后的app打更新信息latest.yml

MacOS下,Electron-builder可以很方便的为测试包app打更新信息(latest-mac.yml)。 但是,正式发布的时候,不可能用测试包app,因为还没有进行公证。如何为公证的app打latest-mac.yml呢。 其实观察latest-mac.y…

真实世界的密码学(四)

原文:annas-archive.org/md5/655c944001312f47533514408a1a919a 译者:飞龙 协议:CC BY-NC-SA 4.0 第十六章:加密何时何地失败 本章涵盖 使用加密时可能遇到的一般问题 遵循烘烤良好的加密的要点 加密从业者的危险和责任 问候…