大数据技术之Flume 企业开发案例——聚合(7)

news2024/12/26 13:13:53

目录

聚合

1)案例需求:

2)需求分析 

3)实现步骤:

准备工作

创建 flume1-logger-flume.conf

创建 flume2-netcat-flume.conf

创建 flume3-flume-logger.conf

执行配置文件


聚合

1)案例需求:

  • hadoop12 上的 Flume-1 监控文件 /opt/module/group.log
  • hadoop13 上的 Flume-2 监控某个端口的数据流,
  • Flume-1 与 Flume-2 将数据发送给 hadoop14 上的 Flume-3Flume-3 将最终数据打印到控制台。

2)需求分析 

多数据源汇总案例

 

3)实现步骤:

  1. 准备工作
    • 分发 Flume

    • [lzl@hadoop12 module]$ xsync flume
      
      
      xsync 是集群同步文件脚本,也就是在一台服务器分发文件给其他台服务器,脚本内容如下:
      #!/bin/bash
      #1. 判断参数个数
      if [ $# -lt 1 ]
      then
        echo Not Enough Arguement!
        exit;
      fi
      #2. 遍历集群所有机器
      for host in hadoop12 hadoop13 hadoop14
      do
        echo ====================  $host  ====================
        #3. 遍历所有目录,挨个发送
        for file in $@
        do
          #4 判断文件是否存在
          if [ -e $file ]
          then
            #5. 获取父目录
            pdir=$(cd -P $(dirname $file); pwd)
            #6. 获取当前文件的名称
            fname=$(basename $file)
            ssh $host "mkdir -p $pdir"
            rsync -av $pdir/$fname $host:$pdir
          else
            echo $file does not exists!
          fi
        done
      done
    • hadoop12hadoop13 以及 hadoop14/opt/module/flume/job 目录下创建一个 group3 文件夹。

      [lzl@hadoop12 job]$ mkdir group3
      [lzl@hadoop13 job]$ mkdir group3
      [lzl@hadoop14 job]$ mkdir group3
  2. 创建 flume1-logger-flume.conf
    • 配置 Source 用于监控 /opt/module/group.log 文件,配置 Sink 输出数据到下一级 Flume。

      hadoop12 上编辑配置文件

      [lzl@hadoop12 group3]$ vim flume1-logger-flume.conf

      添加如下内容

      # Name the components on this agent
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      
      # Describe/configure the source
      a1.sources.r1.type = exec
      a1.sources.r1.command = tail -F /opt/module/group.log
      a1.sources.r1.shell = /bin/bash -c
      
      # Describe the sink
      a1.sinks.k1.type = avro
      a1.sinks.k1.hostname = hadoop14
      a1.sinks.k1.port = 4141
      
      # Describe the channel
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100
      
      # Bind the source and sink to the channel
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
  3. 创建 flume2-netcat-flume.conf
    • 配置 Source 监控端口 44444 数据流,配置 Sink 数据到下一级 Flume。

       

      hadoop13 上编辑配置文件

      [lzl@hadoop12 group3]$ vim flume2-netcat-flume.conf

      添加如下内容

      # Name the components on this agent
      a2.sources = r1
      a2.sinks = k1
      a2.channels = c1
      
      # Describe/configure the source
      a2.sources.r1.type = netcat
      a2.sources.r1.bind = hadoop13
      a2.sources.r1.port = 44444
      
      # Describe the sink
      a2.sinks.k1.type = avro
      a2.sinks.k1.hostname = hadoop14
      a2.sinks.k1.port = 4141
      
      # Use a channel which buffers events in memory
      a2.channels.c1.type = memory
      a2.channels.c1.capacity = 1000
      a2.channels.c1.transactionCapacity = 100
      
      # Bind the source and sink to the channel
      a2.sources.r1.channels = c1
      a2.sinks.k1.channel = c1
  4. 创建 flume3-flume-logger.conf
    • 配置 source 用于接收 flume1flume2 发送过来的数据流,最终合并后 sink 到控制台。

      hadoop14 上编辑配置文件

      [lzl@hadoop14 group3]$ touch flume3-flume-logger.conf
      [lzl@hadoop14 group3]$ vim flume3-flume-logger.conf

      添加如下内容

      # Name the components on this agent
      a3.sources = r1
      a3.sinks = k1
      a3.channels = c1
      
      # Describe/configure the source
      a3.sources.r1.type = avro 
      a3.sources.r1.bind = hadoop14
      a3.sources.r1.port = 4141
      
      # Describe the sink
      a3.sinks.k1.type = logger
      
      # Describe the channel
      a3.channels.c1.type = memory
      a3.channels.c1.capacity = 1000
      a3.channels.c1.transactionCapacity = 100
      
      # Bind the source and sink to the channel
      a3.sources.r1.channels = c1
      a3.sinks.k1.channel = c1
  5. 执行配置文件
    • 分别开启对应配置文件:flume3-flume-logger.confflume2-netcat-flume.confflume1-logger-flume.conf

      [lzl@hadoop14 flume]$ bin/flume-ng agent --conf conf/ --name 
      a3 --conf-file job/group3/flume3-flume-logger.conf 
      -Dflume.root.logger=INFO,console
      [lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name 
      a2 --conf-file job/group3/flume1-logger-flume.conf
      [lzl@hadoop13 flume]$ bin/flume-ng agent --conf conf/ --name 
      a1 --conf-file job/group3/flume2-netcat-flume.conf
  6. hadoop13 上向 /opt/module 目录下的 group.log 追加内容

    [lzl@hadoop13 module]$ echo 'hello' >> group.log
  7. hadoop12 上向 44444 端口发送数据

    [lzl@hadoop12 flume]$ telnet hadoop13 44444
  8. 检查 hadoop14 上数据

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2079865.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

华为eNSP:路由器子接口配置

一、拓扑图 二、 路由器配置 [R1]int g0/0/0.1#进入子接口 [R1-GigabitEthernet0/0/0.1]ip add 192.168.1.254 24#配置子接口地址 [R1-GigabitEthernet0/0/0.1]dot1q termination vid 10#标记终止的vid编号 [R1-GigabitEthernet0/0/0.1]arp broadcast enable #开启子接口的arp…

Keilv5 逻辑分析仪的使用

声明:基于视频【事件驱动型编程和 QP/C 框架】所做的笔记 Keilv5逻辑分析仪是Keil MDK集成开发环境(IDE)中的一个工具,用于帮助开发人员进行嵌入式系统的调试和分析。 它的作用主要有: 监测信号:Keilv5逻…

DBSCAN算法详解

1. 算法原理 DBSCAN(Density-Based Spatial Clustering of Applications with Noise)是一种基于密度的聚类算法,主要用于发现数据中的任意形状的簇,并能够有效地识别噪声点。它的基本思想是通过密度来定义簇,即在数据…

Python -- GUI图形界面编程—GUI编程实例 博主也在持续学习中[ 持续更新中!!! 欢迎白嫖 ]

本文继上篇文章http://t.csdnimg.cn/mJlmW继续介绍GUI的图形界面编程(相关视频是哔站上的应该搜这个题目就能找到),文章还是很基础的,目前博主处于有一点基础的状态。 文章的主要介绍了依旧非常重要的结构tinkter库、常见的三种布…

Patch-Package:一款灵活的开源依赖修复工具

一、背景 在现代软件开发中,开发者通常依赖大量的开源库来加快开发进程。然而,随着时间的推移,可能会遇到一些问题: 开源包的缺陷:开源库可能存在 Bug 或者与项目不兼容的问题。开发者可以自己修复这些问题&#xff0…

QML控件: 动画输入框 LineEdit PySide6

1. 前言 本代码为扫地僧-smile原创, 废话不多说, 直接看效果图由于录制的这个GIF图掉帧严重, 实际动画效果非常细腻 2.看代码 控件模块代码如下 SmileLineEdit.qml import QtQuick import QtQuick.Controls/* __author__: 扫地僧-smile */Rectangle {// 属性property int …

这些年使用Delphi的成果

成果1: 收到了一件文化衫 成果2:被评为亚洲专家,收到了一套Delphi7 光碟找不到了。

Arista与英伟达IB网络竞争格局分析

悄然崛起的英伟达新对手 英伟达都有哪些对手? 当然首选AMD和英特尔。AMD具备AI加速卡业务,融合CPU和GPU设计能力;英特尔作为x86架构的开创者,如今也涉足AI加速卡领域。它们的产品在参数上与英伟达对标,同时在定位和售…

江西生物科技职业学院春雨宣讲团丨弘扬西柏坡精神,共绘时代新篇章

今年五月,江西生物科技职业学院春雨宣讲团获批共青团中央2024年全国大学生西柏坡精神志愿宣讲团之一。 秉承传承红色文化、弘扬西柏坡精神的崇高使命,该宣讲团成员自7月3日起至8月20日,踏上了深入江西省南昌市、九江市、景德镇市、吉安市等地…

走线特征阻抗

ns/ft中ft代表英尺 :1ft0.3048m30.48cm 对于FR-4板,1.017*根号(4.5)2.121ns/ft 也即每英寸的传输时间为2.121ns,30.48/2.12114.6cm/ns; 当差分线中以相同的驱动电压驱动时,我们称之为偶模,当以…

Eclipse 自定义字体大小

常用编程软件自定义字体大全首页 文章目录 前言具体操作1. 打开设置对话框2. 打开字体设置页面3. 找到Text Font,点击修改4. 修改字体 前言 Eclipse 自定义字体大小,统一设置为 Courier New ,大小为 三号 具体操作 【Windows】>【Perfer…

IROS 2024不容错过的4大理由

01 IROS是国际机器人顶会 关于机器人学术顶会,听IROS 2025大会主席、上海交通大学王贺升教授怎么说。 ICRA 2024专辑:上交王贺升教授聊机器人学术顶会【度量访问】 02 未来一年难得的国际交流机会 ICRA 2025 将在美国亚特兰大举办; RSS 20…

zookeeper命令 及 ACL控制

1命令 登录 zkCli.sh -server 192.168.58.81:2128 登录ip zkCli.sh 登录本机 关闭会话 close 帮助文档 help 让zk数据发生变化都是一次事务 create创建 create /aaa 创建持久节点 create -e /aaa/bbb 创建临时节点 create /aaa/bbb/ccc 不能创建成功 …

docker 拉取镜像超时

docker 拉取镜像超时报错如下: error pulling image configuration: Get xxx:443: i/o timeout [rootlocalhost docker]# docker pull nginx Using default tag: latest latest: Pulling from library/nginx e4fff0779e6d: Pulling fs layer 2a0cb278fd9f: Pulli…

GB28181 SDP协议学习笔记

GB28181 SDP协议学习笔记 (GB/T28181-2016 附录F 100页) 定义 示例 v0 o34000000002000000001 0 0 IN IP4 192.168.100.100 sDownload u34020000001310000001:3 cIN IP4 192.168.100.100 t1498173736 1498174066 mvideo 10122 TCP/RTP/AVP 96 98 97 arecvonly artpmap:96 P…

MySQL索引详解:原理、数据结构与分析和优化

在数据库管理系统中,索引是提高查询性能、优化数据存储结构的重要工具。MySQL作为广泛使用的开源关系型数据库管理系统,其索引机制对于提升数据库操作效率具有至关重要的作用。本文将围绕“MySQL索引详解:原理、数据结构与分析和优化”这一主…

仓颉语言:静态类型与垃圾收集,让编程更安全高效

在编程的世界里,安全与效率是永恒的追求。仓颉语言以其静态类型系统和先进的垃圾收集机制,为开发者提供了一个既安全又高效的编程环境。 静态类型系统:编译期的严格把关 类型安全:所有变量和表达式的类型在编译期确定,…

IS-IS路由原理详解

目录 一. 协议介绍: 优点: 二. 1) 协议基本概念: 2) 协议网络类型与网络链路关系 3) IS-IS的报文类型 4) P2P链路邻居关系的建立 ​编辑三. 综上所述 一. 协议介绍: IS-IS最初是国际标准化组织ISO(the International Organization for Standardization&am…

2024年适合初创企业的10款项目管理软件

身为初创企业的掌舵人或联合创始人,您深深了解兼顾多项事务的艰辛。适合初创企业的项目管理软件能够对团队运营效率产生明显影响。 您需要精准把握产品研发、营销推广以及团队协作,同时要严格控制预算和时间。项目管理的失策极易导致延期、超支&#xff…

Spring不是引入了三级缓存,解决了循环依赖的问题吗?

上面是典型的循环依赖问题,在很多人认识中spring引入了三级缓存,不会发生循环依赖报错,但是结果是会启动报错: 其实,在 Spring 2.6 开始,默认已经不开启对循环依赖的支持了,如果想要开启对循环依…