Flume基础知识(九):Flume 企业开发案例之复制和多路复用

news2025/1/12 16:07:11

1)案例需求

使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储 到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem。

2)需求分析:

3)实现步骤:

(1)准备工作

在/opt/module/flume/job 目录下创建 group1 文件夹

[root@hadoop102 job]$ cd group1/

在/opt/module/datas/目录下创建 flume3 文件夹

[root@hadoop102 datas]$ mkdir flume3

(2)创建 flume-file-flume.conf

配置 1 个接收日志文件的 source 和两个 channel、两个 sink,分别输送给 flume-flume-hdfs 和 flume-flume-dir。 编辑配置文件

 [root@hadoop102 group1]$ vim flume-file-flume.conf 

添加如下内容

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有 channel
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop100
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop100
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

(3)创建 flume-flume-hdfs.conf

配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink。

编辑配置文件

[root@hadoop102 group1]$ vim flume-flume-hdfs.conf

添加如下内容

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
# source 端的 avro 是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop100
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop100:8020/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(4)创建 flume-flume-dir.conf

配置上级 Flume 输出的 Source,输出是到本地目录的 Sink。

编辑配置文件

[root@hadoop102 group1]$ vim flume-flume-dir.conf 

添加如下内容

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop100
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

提示:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目 录。

(5)执行配置文件

分别启动对应的 flume 进程:flume-flume-dir,flume-flume-hdfs,flume-file-flume。

[root@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf
[root@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf
[root@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf

(6)启动 Hadoop 和 Hive

[root@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh 
[root@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh 
[root@hadoop102 hive]$ bin/hive hive (default)>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1361585.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【十】【C语言\动态规划】376. 摆动序列、673. 最长递增子序列的个数、646. 最长数对链,三道题目深度解析

动态规划 动态规划就像是解决问题的一种策略,它可以帮助我们更高效地找到问题的解决方案。这个策略的核心思想就是将问题分解为一系列的小问题,并将每个小问题的解保存起来。这样,当我们需要解决原始问题的时候,我们就可以直接利…

用Redis实现全局唯一ID

全局唯一ID 如果使用数据库自增ID就存在一些问题: id的规律性太明显受表数据量的限制 全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性: 唯一性高可用递增性安全性高性能 为了增加ID的安全性…

Linux第15步_安装FTP客户端

安装完FTP服务器后,还需要安装FTP客户端,才可以实现Ubuntu系统和Windows系统进行文件互传。 1、在STM32MP157开发板A盘基础资料\03软件中,找到“FileZilla_3.51.0_win64-setup.exe”,双击它,就可以安装。 2、点击“I …

How can I be sure that I am pulling a trusted image from docker?

1、Error response from daemon: manifest for jenkins:latest not found: manifest unknown: manifest unknown 2、Error response from daemon: pull access denied for nacos, repository does not exist or may require ‘docker login’: denied: requested access to th…

云服务器ECS搭建个人项目

一、登录云服务器ECS 在ECS实例的操作列中点击远程连接云服务器ECS,点击实例最右侧的远程连接按钮,并立即登录后会跳转至Workbench的登录页面。但是第一次进去不知道密码?可以重置密码 登录后可以看到如下页面,说明已经成功登录到…

开源项目 | 完整部署流程、一款开源人人可用的开源数据可视化分析工具

📚 项目介绍 在互联网数据大爆炸的这几年,各类数据处理、数据可视化的需求使得 GitHub 上诞生了一大批高质量的 BI 工具。 借助这些 BI 工具,我们能够大幅提升数据分析效率、生成更高质量的项目报告,让用户通过直观的数据看到结…

Spring Boot依赖版本声明

链接 官网 Spring Boot文档官网:​​​​​​https://docs.spring.io/spring-boot/docs/https://docs.spring.io/spring-boot/docs/ Spring Boot 2.0.7.RELEASE Spring Boot 2.0.7.RELEASE reference相关:https://docs.spring.io/spring-boot/docs/2.…

大学生搜题软件,未来可期吗?

作为一家专注于软件开发的公司《智创有术》,我们致力于为客户提供创新、高效和可靠的解决方案。通过多年的经验和专业知识,我们已经在行业内建立了良好的声誉,并赢得了客户的信任和支持。 支持各种源码,网站搭建,APP&a…

为什么说UUID是唯一的?

在数字时代,我们需要一种能够唯一标识各种实体的方法。通用唯一标识符(UUID)正是为满足这一需求而诞生的。本文将从多个方面介绍UUID,探讨它为何成为通用唯一标识符,以及为什么说UUID是唯一的。 UUID/GUID生成器 | 一…

基于多反应堆的高并发服务器【C/C++/Reactor】(中)在EventLoop中处理被激活的文件描述符的事件

文件描述符处理与回调函数 一、主要概念 反应堆模型:一种处理系统事件或网络事件的模型,当文件描述符被激活时,可以检测到文件描述符:在操作系统中,用于标识打开的文件、套接字等的一种数据类型 处理激活的文件描述符…

RK3568驱动指南|第九篇 设备模型-第111章 platform总线注册驱动流程实例分析实验

瑞芯微RK3568芯片是一款定位中高端的通用型SOC,采用22nm制程工艺,搭载一颗四核Cortex-A55处理器和Mali G52 2EE 图形处理器。RK3568 支持4K 解码和 1080P 编码,支持SATA/PCIE/USB3.0 外围接口。RK3568内置独立NPU,可用于轻量级人工…

Vue2 实现内容拖拽或添加 HTML 到 Tinymce 富文本编辑器的高级功能详解

在 Web 开发中,Tinymce 被广泛应用作为富文本编辑器。除了基础的文本编辑功能,Tinymce 还提供了一系列高级功能,使得文本编辑更加灵活和便捷。本文将介绍如何在 Tinymce 中实现一些高级功能,并深入了解每个工具的使用。 Tinymce …

走向云原生 破局数字化

近年来,随着云计算概念和技术的普及,云原生一词也越来越热门,云原生成为云计算领域的新变量。行业内,华为、阿里巴巴、字节跳动等各个大厂都在“抢滩”云原生市场。行业外,云原生也逐渐出圈,出现在大众视野…

分布式(6)

目录 26.雪花算法如何实现的? 27.雪花算法有什么问题?有哪些解决思路? 28.有哪些方案实现分布式锁? 29.基于数据库如何实现分布式锁?有什么缺陷? 30.基于Redis如何实现分布式锁?有什么缺陷&…

二刷Laravel 教程(用户注册)总结Ⅳ

一、显示用户信息 1)resource Route::resource(users, UsersController); 相当于下面这7个路由 我们先用 Artisan 命令查看目前应用的路由: php artisan route:list 2) compact 方法 //我们将用户对象 $user 通过 compact 方法转化为一个关联…

thingsboard规则节点功能记录(自用)

本文是对【ThingsBoard源码级分析规则节点使用第一季】 https://www.bilibili.com/video/BV1CT411e7vt/?p4&share_sourcecopy_web&vd_source9a5ca7ed3cff97385fdab4b6188e485c 学习的一些记录,加深自己的理解,在此声明。 asset profile switch…

Zookeeper之Java客户端实战

ZooKeeper应用的开发主要通过Java客户端API去连接和操作ZooKeeper集群。可供选择的Java客户端API有: ZooKeeper官方的Java客户端API。第三方的Java客户端API,比如Curator。 接下来我们将逐一学习一下这两个java客户端是如何操作zookeeper的。 1. ZooKe…

redis复习笔记03(小滴课堂)

Redis6常见数据结构概览 0代表存在,1代表不存在。 1表示删除成功,0表示失败。 查看类型,默认string类型。 也可以设置set类型。 list类型。 查看key的过期时间: Redis6数据结构之String类型介绍和应用场景 批量设置: …

studio3T mongodb 根据查询条件更新字段 或 删除数据

1. mongodb 等于、不等于$ne、不包含 $nin 以及批量更新数据的使用。 业务场景: 在集合中,根据查询条件,更新数据状态。 实现代码: 1. 部门名称为XXX、状态不等于“完好”的、并且不包含这些编码的数据先查询出来2. 再把状态更…

GUI设计基础

层次结构 要学GUI,大概先知道它的层次结构,如下图所示,我们要设计的就是下面这个几个东西。 菜单uimenu 建立一级菜单项的函数调用格式: hmuimenu(h_parent,PropertyNamel,valuel,propertyName2,value2,...); hm 是…