Flume第二章:企业案例

news2025/1/14 1:18:18

系列文章目录

Flume第一章:环境安装
Flume第二章:企业案例


文章目录

  • 系列文章目录
  • 前言
  • 一、复制和多路复用
    • 1.案例需求
    • 2.案例实现
    • 3.结果查看
  • 二、负载均衡和故障转移
    • 1.需求案例
    • 2.案例实现
    • 3.结果查看
  • 三、聚合
    • 1.案例需求
    • 2.案例实现
    • 3.查看结果
  • 总结


前言

这次我们记录一些相关的案例,还是以实战为主,话不多说直入主题。


一、复制和多路复用

1.案例需求

使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem。

2.案例实现

创建一个在/out/module下创建一个data目录,作为本地系统日志的输出地址。
在这里插入图片描述
在其中创建一个flume3来放置第一个案例的输出结果。

在flume/job目录下创建一个group1来存放第一次实验的conf文件
在这里插入图片描述
编写配置文件

配置 1 个 netcat source 和 1 个 channel、1 个 sink group(2 个 sink),分别输送给flume-flume-console1 和 flume-flume-console2。

vim flume-file-flume.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有 channel
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink。

vim flume-flume-hdfs.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
# source 端的 avro 是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2- 
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700 #文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

配置上级 Flume 输出的 Source,输出是到本地目录的 Sink。

vim flume-flume-dir.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

由于实验要使用HDFS集群,我们先启动hadoop。
在这里插入图片描述
开3三个终端分别运行三个flume程序。

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf

bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf

在开启一个终端,启动hive,生成被监控的日志。

3.结果查看

HDFS查看日志
在这里插入图片描述
本地文件
在这里插入图片描述

二、负载均衡和故障转移

1.需求案例

使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用FailoverSinkProcessor,实现故障转移的功能

2.案例实现

创建group2创建conf文件存放目录。
在这里插入图片描述
配置 1 个 netcat source 和 1 个 channel、1 个 sink group(2 个 sink),分别输送给flume-flume-console1 和 flume-flume-console2
vim flume-netcat-flume.conf

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

配置上级 Flume 输出的 Source,输出是到本地控制台
vim flume-flume-console1.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = logger
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

vim flume-flume-console2.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

再次开启三个终端分别运行flume程序

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console

bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf

3.结果查看

再开一个终端,向44444端口发送信息
nc localhost 44444
在这里插入图片描述
可以看到flume2终端收到的信息。
在这里插入图片描述
flume3没有信息
在这里插入图片描述
现在将flume2终止掉,然后继续向44444发送信息。
在这里插入图片描述
在这里插入图片描述
可看到现在flume3收到了信息,完成了故障转移。

三、聚合

1.案例需求

hadoop102 上的 Flume-1 监控文件/opt/module/group.log,
hadoop103 上的 Flume-2 监控某一个端口的数据流,
Flume-1 与 Flume-2 将数据发送给 hadoop104 上的 Flume-3,Flume-3 将最终数据打印到控制台

2.案例实现

由于要在集群中使用flume,所以要先分发flume
xsync /opt/module/flume/
在102创建group3
配置 Source 用于监控 hive.log 文件,配置 Sink 输出数据到下一级 Flume。
vim flume1-logger-flume.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

在103上编辑配置文件
先创建group3文件夹
配置 Source 监控端口 44444 数据流,配置 Sink 数据到下一级 Flume:
vim flume2-netcat-flume.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 44444
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

在104上编辑配置文件
还是先创建group3文件夹
配置 source 用于接收 flume1 与 flume2 发送过来的数据流,最终合并后 sink 到控制台。
vim flume3-flume-logger.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

开3个终端分别启动flume

#hadoop104
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console

#hadoop103
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume2-netcat-flume.conf

#hadoop102
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume1-logger-flume.conf

启动成功可以在hadoop104日志中看到如下结果
在这里插入图片描述

3.查看结果

hadoop102创建测试文件
/opt/module/group.log

在这里插入图片描述
在hadoop103向44444端口发送信息。
在这里插入图片描述
可以再hadoop104的日志信息中查看
在这里插入图片描述
在hadoop102中向group.log中追加数据
在这里插入图片描述
在hadoop104中依然可以查看信息
在这里插入图片描述


总结

自此实验结束,难度还是有一些的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/169562.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

用python写的代码输入助手小程序(附源码)

命令太多,很容易忘记,还有很多代码片段想保存下来用到的时候能够快速输入,提高开发效率。在网上找了很多,发现都不是自己想要的。于是就用python写了一个自己用的代码输入助手小程序,我自己已经用了很长时间了&#xf…

工业中常用流量计及其测量原理

一、流量计单位 工程上常用单位m3/h,它可分为瞬时流量(Flow Rate)和累计流量(Total Flow),瞬时流量即单位时间内过封闭管道或明渠有效截面的量,流过的物质可以是气体、液体、固体;累…

Introduction to Multi-Armed Bandits——02 Stochastic Bandits

Introduction to Multi-Armed Bandits——02 Stochastic Bandits 参考资料 Slivkins A. Introduction to multi-armed bandits[J]. Foundations and Trends in Machine Learning, 2019, 12(1-2): 1-286. 在线学习(MAB)与强化学习(RL)[2]:IID Bandit的一些算法 B…

化繁为简、性能提升 -- 在WPF程序中,使用Freetype库心得

本人使用WPF开发了一款OFD阅读器,显示字体是阅读器中最重要的功能。处理字体显示有多种方案,几易其稿,最终选用Freetype方案。本文对WPF中如何使用Freetype做简单描述。 OFD中有两种字体:嵌入字体和非嵌入字体。1) 非…

【vue2】vue生命周期的理解

🥳博 主:初映CY的前说(前端领域) 🌞个人信条:想要变成得到,中间还有做到! 🤘本文核心:vue生命周期的介绍、vue生命周期钩子函数详解,vue生命周期的执行顺序 目录 …

使用管控平台管理redis集群

1 添加redis集群 在数据库资源中添加redis集群,配置参数并将URL中cluster调整为true。 2 验证配置资源是否正常 3 操作redis数据库中的数据 可以通过使用图形化界面或者命令窗口进行Redis数据库的CRUD 3.1 图形化界面操作 操作Redis字符串列表 3.1.1 新增 右…

Apache Iceberg 背后的设计

原文地址: 阿帕奇冰山:幕后的建筑外观 |德雷米奥 (dremio.com)绝对的精品文章!!!机器翻译和自我调整组成了这篇文章,供大家学习。介绍数据湖的构建希望是实现数据民主化,以允许越来越多的人员、工具和应用程序使用越来越多的数据。实现这一目…

十五天学会Autodesk Inventor,看完这一系列就够了(八),图框自定义

所周知,Autocad是一款用于二维绘图、详细绘制、设计文档和基本三维设计,现已经成为国际上广为流行的绘图工具。Autodesk Inventor软件也是美国AutoDesk公司推出的三维可视化实体模拟软件。因为很多人都熟悉Autocad,所以再学习Inventor&#x…

【数据库数据恢复】华为云mysql数据库数据被delete的数据恢复案例

数据库数据恢复环境: 华为云ECS,linux操作系统; mysql数据库,实例内数据表默认存储引擎为innodb。 数据库故障: 在执行数据库版本更新测试时,用户误将本应在测试库测试的sql脚本执行在生产库中&#xff0c…

拉伯证券|芯片半导体迎来“行业底部出清”,大资金进场迹象明显

近期关于小芯片的利好不断,英特尔近期就发布了根据小芯片技能的处理器,而近期长电科技也在小芯片范畴获得突破。据长电科技在互动平台表明,公司现已完成4nm工艺制程手机芯片的封装,最大封装体面积约为1500平方毫米的体系级封装。长…

人工智能所需高等数学知识大全(收藏版)

来源:投稿 作者:愤怒的可乐 编辑:学姐 不懂数学是学不好人工智能的,本系列文章就汇总了人工智能所需的数学知识。本文是高等数学篇。 另有线代篇和概率论篇 函数与极限 函数 yf(x) ,x是函数f的自变量,y是因变量 函…

数据结构(4)线段树、延迟标记、扫描线

活动 - AcWing 参考-《算法竞赛进阶指南》 一、延迟标记(懒标记) 在线段树的区间查询命令中,每当遇到被查询区间[l,r]完全覆盖节点时,可以直接把节点上的答案作为备选答案返回。我们已经证明,这样操作的复杂度为O(4…

01-React(脚手架+MVC/MVVM+JSX)

使用 create-react-app 构建React工程化项目 安装 create-react-app $ npm i create-react-app -g 「mac需要加sudo」 基于脚手架创建项目「项目名称需要符合npm包规范」 $ create-react-app xxx 目录结构: |- node_modules 包含安装的模块 |- public 页面模板…

79.循环神经网络的从零开始实现

从头开始基于循环神经网络实现字符级语言模型。 这样的模型将在H.G.Wells的时光机器数据集上训练。 和之前一样, 我们先读取数据集。 %matplotlib inline import math import torch from torch import nn from torch.nn import functional as F from d2l import to…

Rockchip开发系列 - 9.watchdog看门狗

By: fulinux E-mail: fulinux@sina.com Blog: https://blog.csdn.net/fulinus 喜欢的盆友欢迎点赞和订阅! 你的喜欢就是我写作的动力! 目录 dts中的watchdog节点watchdog驱动文件TRM watchdog:WDT框图功能描述计数器中断系统复位复位脉冲长度操作流程图寄存器描述寄存器设置…

Nessus Host Discovery

系列文章 Nessus介绍与安装 Nessus Host Discovery 1.启动nessus cd nessus sh qd_nessus.sh2.进入nessus网站 https://192.168.3.47:8834/3.点击【New Scan】 4.选择【Host Discovery】 5.输入name【主机发现】,Description【主机发现】,Targets【…

Android 蓝牙开发——服务启动流程(二)

首先我们要知道,主要系统服务都是在 SystemServer 启动的,蓝牙也是如此: 1、SystemServer 源码路径:/frameworks/base/services/java/com/android/server/SystemServer.java private void startOtherServices(NonNull TimingsT…

labelme(2)json文件转类别灰度图

首先感谢大佬:https://blog.csdn.net/tzwsg/article/details/114653071一、上代码,json2gray.py:#!/usr/bin/python # -*- coding: UTF-8 -*- # !H:\Anaconda3\envs\new_labelme\python.exe import argparse import json import os import os…

go语言中变量和常量的注意点

1、类型转换:大类型可以转换成小类型但是精度丢失;小类型不能转换成大类型。例如: package mainimport "fmt"//golang中使用" var "关键字来定义变量 //定义变量的语法:1、var var_name1[,var_name2,...] va…

day16|654.最大二叉树、617.合并二叉树、700.二叉搜索树中的搜索、98.验证二叉搜索树

654.最大二叉树 给定一个不重复的整数数组 nums 。 最大二叉树 可以用下面的算法从 nums 递归地构建: 创建一个根节点,其值为 nums 中的最大值。递归地在最大值 左边 的 子数组前缀上 构建左子树。递归地在最大值 右边 的 子数组后缀上 构建右子树。 返回 nums 构…