大数据技术——Flume实战案例

news2024/11/23 0:56:33

实战案例目录

  • 1. 复制和多路复用
    • 1.1 案例需求
    • 1.2 需求分析
    • 1.3 实现操作
  • 2. 负载均衡和故障转移
    • 2.1 案例需求
    • 2.2 需求分析
    • 2.3 实现操作
  • 3. 聚合操作
    • 3.1 案例需求
    • 3.2 需求分析
    • 3.3 实现操作

1. 复制和多路复用

1.1 案例需求

    使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem。

1.2 需求分析

    通过使用exec source实时监控Hive日志,将日志以avro为中转站发送给Flume-2,3分别存储到不同的地方,需要注意:保存到本地的目录必须存在,如下图所示:
在这里插入图片描述

1.3 实现操作

首先在虚拟机对应目录下创建文件:mkdir flume3
创建配置信息文件vim flume-file-flume.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有 channel
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

vim flume-flume-hdfs.conf

 # Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
# source 端的 avro 是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

vim flume-flume-dir.conf

 # Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

结果查看:
在这里插入图片描述
总用量 8
-rw-rw-r–. 1 lcl lcl 5942 5 月 22 00:09 1526918887550-3

2. 负载均衡和故障转移

2.1 案例需求

使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用
FailoverSinkProcessor,实现故障转移的功能。

2.2 需求分析

通过netcat source监听4444端口,由于这里只有一个channel所以在这里使用了Sink组的形式接收同一个source(这个是可以一个channel对应多个sink,但是不能一个sink对应多个channel),然后通过kill 命令把Flume2破坏,查看Flume3在这里插入图片描述

2.3 实现操作

编辑以下三个配置文件
vim flume-netcat-flume.conf

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

vim flume-flume-console1.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = logger
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

vim flume-flume-console2.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

执行命令,启动配置文件:【最后启动属于服务器地一端】

bin/flume-ng agent -n a3 -c conf/  -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -n a3 -c conf/  -f job/group2/flume-flume-console1.conf -
Dflume.root.logger=INFO,console
bin/flume-ng agent -n a3 -c conf/  -f  job/group2/flume-netcat-flume.conf

使用netcat工具向本机的 44444 端口发送内容: nc localhost 44444

查看两个控制台打印日志情况,之后把Flume2 kill掉

查看Flume3打印日志情况

3. 聚合操作

3.1 案例需求

  • hadoop102 上的 Flume-1 监控文件/opt/module/group.log,
  • hadoop103 上的 Flume-2 监控某一个端口的数据流,
  • Flume-1 与 Flume-2 将数据发送给 hadoop104 上的 Flume-3,Flume-3 将最终数据打印到控制台。

3.2 需求分析

在这里插入图片描述
由于需要多个虚拟机工作完成任务,所以这里需要分发flume

3.3 实现操作

创建工作目录:mkdir /opt/module/flume/job/group3

hadoop102: vim flume1-logger-flume.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

hadoop 103: vim flume2-netcat-flume.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 44444
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

hadoop104: vim flume3-flume-logger.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

执行命令,启动配置文件

 bin/flume-ng agent -n a3 -c conf/ -f job/group3/flume3-flume-logger.conf -
Dflume.root.logger=INFO,console
 bin/flume-ng agent -n a3 -c conf/ -f job/group3/flume1-logger-flume.conf
 bin/flume-ng agent -n a3 -c conf/ -f job/group3/flume2-netcat-flume.conf

使用以下命令在hadoop102上向/opt/module 目录下的 group.log 追加内容

 echo 'hello' > group.log

在 hadoop103 上向 44444 端口发送数据

 telnet hadoop103 44444

之后在hadoop104上查看接受的数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/115851.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

图文详解二维差分

目录 前言 一、 二维差分的定义 二、二维差分的使用 三、计算二维差分 四、ACWing 798. 差分矩阵 前言 一维二维前缀和详解 图文详解一维差分 一、 二维差分的定义 对于一个给定的二维数组 arr,它的二维差分数组 d 中 d[i][j] 可以用如下公式计算:…

命令执行-无字母数字webshell

命令执行-无字母数字webshell 我们看如下代码&#xff1a; <?php if(!preg_match(/[a-z0-9]/is,$_GET[shell])) {eval($_GET[shell]); }在命令执行中&#xff0c;我们经常会碰到过滤了字母和数字的情况&#xff0c;那如何才能绕过呢&#xff1f; 我的想法&#xff1a;通…

【数据结构】ArrayList的简单使用

文章目录ArrayList一些ArrayList常用的方法杨辉三角打扑克时的洗牌与摸牌ArrayList 上一次我们自己模拟实现了一下数据结构中的顺序表&#xff0c;当然在我们日常使用时不需要每次使用都自己模拟实现一遍&#xff0c;Java中提供了ArrayList类&#xff0c;我们直接导包就可以使…

如何使用 Delphi / Lazarus / C++ Builder 从 FastReport VCL 创建 Code 11 条码?

Fastreport是目前世界上主流的图表控件&#xff0c;具有超高性价比&#xff0c;以更具成本优势的价格&#xff0c;便能提供功能齐全的报表解决方案&#xff0c;连续三年蝉联全球文档创建组件和库的“ Top 50 Publishers”奖。 FastReport.VCL官方版下载https://www.evget.com/…

黑客动态播报 | 这种勒索方式,让付赎金毫无用处

入侵→加密→要赎金 黑客凭这套商业模式横行多年 受害者之所以前赴后继付赎金 是因为他们相信 给钱就能如愿拿到密钥 尽快恢复业务 可有的时候 自系统被加密的那一刻起 数据就拿不回来了 今年10月,网上出现了一种名为Cryptonite的开源勒索软件包。它使用Python编码,利…

SpringBoot 整合 Shiro 实现动态权限加载更新+ Session 共享 + 单点登录

一.说明 二.项目环境 二.编写项目基础类 三.编写Shiro核心类 四.实现权限控制 五.POSTMAN测试 六.项目源码 一.说明 Shiro是一个安全框架,项目中主要用它做认证,授权,加密,以及用户的会话管理,虽然Shiro没有SpringSecurity功能更丰富,但是它轻量,简单,在项目中通常业务…

报表设计-FineReport 配置MySQL5外接数据库

1. 概述 1.1 版本 报表服务器版本 功能变更 11.0 - 11.0.3 1&#xff09;首次配置外接数据库时&#xff0c;支持自行选择是否「迁移数据至要启用的数据库」 2&#xff09;迁移外接数据库的过程提示细化&#xff0c;方便用户了解迁移进度 1.2 功能简介 报表系统配置外接数…

推荐系统遇上深度学习(一四一)-[快手]移动端实时短视频推荐

今天给大家带来CIKM2022应用研究方向最佳论文-来自于快手团队的《Real-time Short Video Recommendation on Mobile Devices》&#xff0c;主要研究在移动端如何做到更好的短视频实时推荐&#xff0c;是一篇不错的落地经验分享的论文&#xff0c;一起来看一下。1、背景近几年来…

LeetCode 323周赛

2500. 删除每行中的最大值 给你一个 m x n 大小的矩阵 grid &#xff0c;由若干正整数组成。 执行下述操作&#xff0c;直到 grid 变为空矩阵&#xff1a; 从每一行删除值最大的元素。如果存在多个这样的值&#xff0c;删除其中任何一个。将删除元素中的最大值与答案相加。 …

【Leetcode】101. 对称二叉树、104. 二叉树的最大深度、226. 翻转二叉树

作者&#xff1a;一个喜欢猫咪的的程序员 专栏&#xff1a;《Leetcode》 喜欢的话&#xff1a;世间因为少年的挺身而出&#xff0c;而更加瑰丽。 ——《人民日报》 目录 101. 对称二叉树 104. 二叉树的最大深度 226. 翻转二叉树 101. 对称二…

DJ11 8086系列处理器(第一节课)

目录 一、8086/8088微处理器 二、8086/8088CPU的特点 1. 指令流水线 2. 内存分段管理 3. 支持多处理器系统 三、8088 CPU外部引脚及功能 1. 最小模式下的引脚 2. 最大模式下的引脚 四、8088/8086 CPU 的工作时序 1. 基本概念 2. 总线周期 一、8086/8088微处理器 二、…

软考中级系统集成项目管理工程师怎么自学备考

1、考试内容是什么&#xff1f; 2、备考前要准备什么&#xff1f; 3、如何高效备考&#xff1f; 一、考试内容是什么&#xff1f; 本考试设置的科目包括&#xff1a; &#xff08;1&#xff09;系统集成项目管理基础知识&#xff0c;考试时间为150分钟&#xff0c;笔试&am…

IB体育评估哪些内容?

"IB体育"这个词的内涵太广了&#xff0c;覆盖的课程也很多&#xff01;这个IB体育是一般体育课还是某个具体的IB科目呢&#xff1f;是MYP阶段的体育还是DP阶段的呢&#xff1f;其实很多人都是很懵&#xff0c;通过收集资料&#xff0c;可以分享一下&#xff0c;仅供参…

2022年虚拟电厂行业研究报

第一章 行业概况 虚拟电厂&#xff08;VPP, Virtual Power Plant&#xff09;本质上是将分布式电源&#xff08;发电&#xff09;、可控负荷&#xff08;用电&#xff09;、储能等利用计算机通信网络技术将其聚合成一个虚拟的集中式电厂&#xff0c;来为电网提供需求侧响应的“…

4个封神的电脑工具,颠覆你对免费白嫖的认知,干货奉上

闲话少说&#xff0c;直上干货。 1、TinyWow TinyWow虽说是国外网站工具&#xff0c;但不得不承认真的无敌好用&#xff0c;收纳工具超200个&#xff0c;完全免费&#xff0c;无任何弹屏广告&#xff0c;更为良心的是&#xff0c;不需要注册登录&#xff0c;随用随走&#xff0…

专业的方案公司阐述智能硬件产品开发的全过程

现在市场上的方案公司太多&#xff0c;让人应接不暇&#xff0c;当我们要开发一款智能硬件产品的时候&#xff0c;我们要如何去选择方案公司呢&#xff1f;又怎样判断方案公司是否则专业呢&#xff1f;下面沐渥带大家一起来了解下智能硬件产品开发的全过程&#xff0c;大家就知…

Ubuntu 18.0.4 SonarQube-7.1.x 安装教程 以及错误总结

Ubuntu 18.0.4 SonarQube-7.1.x 安装教程 docker安装未成功 zip安装 1. 下载地址 sonarQube最新版下载地址&#xff1a;&#xff08;最新版不支持mysql&#xff09;https://www.sonarqube.org/downloads/7.1版本下载地址&#xff1a;​ ​https://binaries.sonarsource.com…

【UE4 第一人称射击游戏】10-添加冲刺功能

上一篇&#xff1a; 【UE4 第一人称射击游戏】09-添加蹲伏功能 本篇效果&#xff1a; 步骤&#xff1a; 1.在“Character”文件夹内添加一个混合空间 骨架选择“Swat_Skeleton” 命名为“Sprint_BS” 双击打开“Sprint_BS”&#xff0c;将水平和垂直坐标名称分别设为“Direct…

【java】HashMap底层原理实现原理及面试题

目录一.哈希表(散列)1.什么是哈希表2.什么是哈希冲突(面试题)3.解决哈希冲突的方法(面试题)(1) 开放地址法① 线性探查②二次探查③随机探查(2) 再哈希法(3) 链地址法(4)建立公共溢出区二.HashMap1.HashMap的hash()算法(面试)(1)为什么不是h key.hashCode()直接返回&#xff0…

绘制菜单符号的技法

在上一篇文章中&#xff0c;我们了解了如何绘制主题化的和原始未主题化的单选按钮&#xff0c;我曾提到&#xff0c;绘制菜单符号会更加复杂一些。复杂之处在于&#xff0c;这些符号是通过单色位图实现的&#xff0c;而不是漂亮的全彩色位图。 首先&#xff0c;我们将通过一种错…