Flume实战--Flume中的选择器、自动容灾(故障转移)、负载均衡的详解与操作

news2025/1/12 5:56:13

        本文详细介绍了Apache Flume的关键特性,包括选择器、拦截器、故障转移和负载均衡。选择器负责将数据分发到多个Channel,拦截器用于修改或丢弃Event。故障转移机制能够在Sink故障时自动切换,而负载均衡则在多个Sink间分配负载。文章还提供了自定义拦截器的示例,展示了Flume在复杂数据处理中的灵活性和稳定性。

选择器

        当一个Source连接到多个Channel时,选择器决定了数据如何分发到这些Channel。Flume提供了复制选择器和多路复用选择器,允许我们根据需要选择数据分发策略。

一个Source对应多个channel的情况下,多个Channel中的数据是否相同,取决于我们使用了什么选择器,默认是复制选择器。也可以手动的使用多路选择器。

复制选择器

        复制选择器会将数据复制到所有配置的Channel,适用于需要在多个地方处理相同数据的场景。

 配置示例

编写flume脚本,需要一个source,两个channel,以及两个sink

a1.sources = r1  
a1.channels = c1 c2
a1.sinks = s1 s2
#  avro http  syslogtcp
# avro  avro-client
# http  curl
# syslogtcp  nc 
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = bigdata01
a1.sources.r1.port = 7777

#执行选择器类型为复制选择器
a1.sources.r1.selector.type=replicating


a1.channels.c1.type=memory
a1.channels.c2.type=memory


a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=/flume/%Y-%m-%d/rep
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.useLocalTimeStamp=true

a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=/flume/%Y-%m-%d/rep
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.useLocalTimeStamp=true



a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2

启动这个flume脚本:

flume-ng agent -c ./ -f syslogtcp-memory-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

向bigdata01 中的 7777 端口发送消息:

echo "hello world" | nc bigdata01 7777

如果nc命令无法识别,需要安装一下   yum install -y nc

 查看里面的数据发现都一样,说明使用的是复制选择器。

多路复用选择器

        多路复用选择器可以根据数据内容选择性地将数据发送到特定的Channel,适用于根据数据特性进行分流处理的场景。

就是每次发送消息的时候,可以指定发送消息走哪条channel,只有这条channel对应的sink才有数据,其他sink没数据。

 配置示例

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state        #以每个Event的header中的state这个属性的值作为选择channel的依据
a1.sources.r1.selector.mapping.CZ = c1       #如果state=CZ,则选择c1这个channel
a1.sources.r1.selector.mapping.US = c2 c3    #如果state=US,则选择c2 和 c3 这两个channel
a1.sources.r1.selector.default = c4          #默认使用c4这个channel

说明一个小区别:

avro
syslogtcp
http

可以指定一个hostname和端口号

不同的source,我们使用的发送数据的方式是不一样的:
avro-client
nc
curl

curl  是可以模拟发送get 或者 post 请求的。
比如: curl www.baidu.com

 编写脚本:mul.conf

a1.sources = r1  
a1.channels = c1 c2
a1.sinks = s1 s2

a1.sources.r1.type= http
a1.sources.r1.bind = bigdata01
a1.sources.r1.port = 8888

a1.sources.r1.selector.type=multiplexing
# header 跟  mapping 结合在一起,用于发送消息时,指定发送的方向
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.USER = c1
a1.sources.r1.selector.mapping.ORDER = c2
# 发送的消息找不到具体的channel,就走默认的c1
a1.sources.r1.selector.default = c1

a1.channels.c1.type=memory
a1.channels.c2.type=memory


a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=/flume/%Y-%m-%d/mul
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.useLocalTimeStamp=true


a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=/flume/%Y-%m-%d/mul
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.useLocalTimeStamp=true


a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2

 启动该脚本:

flume-ng agent -c ./ -f mul.conf -n a1 -Dflume.root.logger=INFO,console

 模拟http请求:

curl -X POST -d '[{"headers":{"state":"USER"},"body":"this my multiplex to c1"}]' http://bigdata01:8888
curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this my multiplex to c2"}]' http://bigdata01:8888

效果就是,当我发送一条指令的时候,走state=USER的路径,只生成一个文件,走另一条路才会生成另一个不同的文件。

 

自动容灾(故障转移)

        Flume的故障转移机制允许在一个Sink组中的多个Sink之间自动切换,确保数据的持续处理,即使某个Sink出现故障。

多个sink组成一个组,这个组内的sink只有一台工作,假如这一台坏了,另一台自动的工作。

为了演示这个效果,我使用了三个Agent.模型如下:

在bigdata02和bigdata03上安装flume

在集群中可以使用脚本
xsync.sh /opt/installs/flume/
xsync.sh /etc/profile
xcall.sh source /etc/profile


也可以使用长拷贝命令,例如:
scp -r /opt/installs/flume1.9.0/ root@hadoop11:/opt/installs/
# 因为 /etc/hosts 文件中没有配置映射,所以使用ip代替了
scp -r /opt/installs/flume1.9.0/ root@192.168.52.12:/opt/installs/

scp -r /etc/profile root@hadoop11:/etc
scp -r /etc/profile root@192.168.52.12:/etc

两个虚拟机需要刷新配置文件
source /etc/profile

在bigdata01上,编写flume脚本:

failover.conf

#list names
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1


# source
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = bigdata01
a1.sources.r1.port = 10086

# channel
a1.channels.c1.type = memory

# sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata02
a1.sinks.k1.port = 10087

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata03
a1.sinks.k2.port = 10088

#设置sink组
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
#  此处是设置的权重,权重越多,就工作
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 10000

启动该脚本:

flume-ng agent -c ../conf -f ./failover.conf -n a1 -Dflume.root.logger=INFO,console

在bigdata02上,编写 failover2.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


# source
a1.sources.r1.type = avro
a1.sources.r1.bind = bigdata02
a1.sources.r1.port = 10087

# channel
a1.channels.c1.type = memory

# sink
a1.sinks.k1.type = logger

启动flume脚本:

flume-ng agent  -f ./failover2.conf -n a1 -Dflume.root.logger=INFO,console

在bigdata03上,编写 failover3.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


# source
a1.sources.r1.type = avro
a1.sources.r1.bind = bigdata03
a1.sources.r1.port = 10088

# channel
a1.channels.c1.type = memory

# sink
a1.sinks.k1.type = logger

启动flume脚本:

flume-ng agent  -f ./failover3.conf -n a1 -Dflume.root.logger=INFO,console

bigdata02和03启动无异常,再启动01上的脚本:

flume-ng agent -f ./failover.conf -n a1 -Dflume.root.logger=INFO,console

在bigdata01上,发送消息:

echo "wei,wei,wei" | nc hadoop10 10086

发现bigdata02有反应,出现了消息,而bigdata03无反应。因为bigdata02权重大,需要工作

测试故障转移,将bigdata02停掉,再在bigdata01上发消息,就发现bigdata03收到消息了,故障转移了。

负载均衡

        负载均衡机制允许Flume在多个Sink之间平衡数据负载,提高数据处理的效率和可靠性。

演示一下:

bigdata01中创建balance.conf

#list names
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1


# source
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = bigdata01
a1.sources.r1.port = 10086

# channel
a1.channels.c1.type = memory

# sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata02
a1.sinks.k1.port = 10087

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata03
a1.sinks.k2.port = 10088

#设置sink组
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = random

bigdata02,bigdata03 不变,服务必须是启动的。

使用我们的nc命令发送请求,发现bigdata02和bigdata03随机的处理请求的数据。

  flume轮训是每隔一段时间轮训,而不是每秒轮训一次。所以可能多条在同一时间间隔的events都被一个输出到一个sink端。

练习

需求

编写一个Flume配置,实现以下需求:

 抽取data.json文件中的数据,只保留Warn以及Error级别的日期,并且timestamp只保留4月1日以后的数据。

数据

{"timestamp": "2023-03-01 12:00:00", "level": "INFO", "message": "This is an info message."}
{"timestamp": "2023-03-31 12:01:00", "level": "ERROR", "message": "Error occurred!"}
{"timestamp": "2023-04-01 12:02:00", "level": "WARN", "message": "Warn occurred!"}
{"timestamp": "2023-04-02 12:02:00", "level": "WARN", "message": "Warn occurred!"}
{"timestamp": "2023-04-03 12:02:00", "level": "WARN", "message": "Warn occurred!"}

代码

package com.bigdata;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

public class ETLInterceptor implements Interceptor {

    /**
     * {"timestamp": "2023-03-01 12:00:00", "level": "INFO", "message": "This is an info message."}
     * {"timestamp": "2023-03-31 12:01:00", "level": "ERROR", "message": "Error occurred!"}
     * {"timestamp": "2023-04-01 12:02:00", "level": "WARN", "message": "Warn occurred!"}
     *
     * 抽取data.json文件中的数据,只保留Warn以及Error级别的日期,并且timestamp只保留4月1日以后的数据。
     */

    @Override
    public void initialize() {

    }

    // 这个方法是核心方法,可以处理一条,就可以循环处理多条
    @Override
    public Event intercept(Event event) {
        // byte[] --> String
        String json = new String(event.getBody());

        /*JSONObject jsonObject = JSON.parseObject(json);
        String timestamp = jsonObject.getString("timestamp");
        String level = jsonObject.getString("level");
        String message = jsonObject.getString("message");*/
        Log log = JSON.parseObject(json, Log.class);

        System.out.println(log);

        // 日期比较  after  before   --> Calendar 、Date、DateLocal

        // String --> Date
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
        try {
            Date date = dateFormat.parse(log.getTimestamp());

            Date date2 = dateFormat.parse("2023-03-31");


            if((log.getLevel().equals("WARN") || log.getLevel().equals("ERROR"))  &&  date.after(date2)){
                System.out.println("符合条件的数据......");
                return event;
            }

        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> list) {

        ArrayList<Event> events = new ArrayList<>();
        // 任何一个list ,都不能边循环,边添加或者删除,否则报错!!!
        // 如何过滤掉不符合条件的数据,先将其置为null,然后在循环中,将null的数据不添加到集合中,就过滤掉了
        for (Event oldEvent : list) {
            Event newEvent = intercept(oldEvent);
            if(newEvent != null){
                events.add(newEvent);
            }

        }
        return events;
    }

    @Override
    public void close() {

    }

    public static class EventBuilder implements Builder {

        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

 视频链接

15-flume中的选择器_哔哩哔哩_bilibili

16-flume的故障转移_哔哩哔哩_bilibili

17-flume中的负载均衡_哔哩哔哩_bilibili

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2179048.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【零基础入门产品经理】学习准备篇 | 需要学一些什么呢?

前言&#xff1a; 零实习转行产品经理经验分享01-学习准备篇_哔哩哔哩_bilibili 该篇内容主要是对bilibili这个视频的观后笔记~谢谢美丽滴up主友情分享。 全文摘要&#xff1a;如何在0实习且没有任何产品相关经验下&#xff0c;如何上岸产品经理~ 目录 一、想清楚为什么…

Redis 基础数据改造

优质博文&#xff1a;IT-BLOG-CN 一、服务背景 基础数据查询服务&#xff1a;提供航司、机场、票台、城市等基础数据信息。 痛点一&#xff1a;因为基础数据不属于频繁更新的数据&#xff0c;所以每个应用都有自己和缓存&#xff0c;当基础数据更新后&#xff0c;各个应用缓存…

webGL入门(五)绘制多边形

代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title><scri…

ARM 服务器上安装 OpenEuler (欧拉)

系统介绍 在 2019 年 7 月 19 日&#xff0c;华为宣布要在年底正式开源 openEuler 操作系统&#xff1b;在半年后的 12 月 31 日&#xff0c;华为正式开源了 openEuler 操作系统&#xff0c;邀请社区开发者共同来贡献。 一年后&#xff0c;截止到 2020 年12 月 25日&#xff…

计算机毕业设计 Java教务管理系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…

[Cocoa]_[初级]_[使用NSNotificationCenter作为目标观察者实现时需要注意的事项]

场景 在开发Cocoa程序时&#xff0c;由于界面是用Objective-C写的。无法使用C的目标观察者[1]类。如果是使用第二种方案2[2],那么也需要增加一个代理类。那么有没有更省事的办法&#xff1f; 说明 开发界面的时候&#xff0c;经常是需要在子界面里传递数据给主界面&#xff0…

PIKACHU | PIKACHU 靶场 XSS 后台配置

关注这个靶场的其他相关笔记&#xff1a;PIKACHU —— 靶场笔记合集-CSDN博客 PIKACHU 自带了一个 XSS 平台&#xff0c;可以辅助我们完成 XSS 攻击&#xff0c;但是该后台需要配置数据库以后才能使用。本教程&#xff0c;就是教大家如何配置 PIKACHU XSS 平台的。 PIKACHU XS…

vulhub weblogic 靶场攻略

一&#xff1a;WebLogic 后台弱⼝令GetShell&#xff08;weak_password &#xff09; 漏洞描述 通过弱⼝令进⼊后台界⾯ , 上传部署war包 , getshell 影响范围 全版本&#xff08;前提后台存在弱⼝令&#xff09; 环境搭建 cd vulhub-master/weblogic/weak_password doc…

【STM32开发环境搭建】-4-在STM32CubeMX中新增Keil(MDK-ARM) 5的工程目录(包含指定路径的C和H文件)

案例背景&#xff1a; 由于Keil(MDK-ARM)5工程&#xff1a;DEMO_STM32F030C8T6.uvprojx是由STM32CubeMX工具生成的&#xff0c;如果我们在Keil工程中手动添加了一些c文件和h文件的Include Path包含路径&#xff0c;会在STM32CubeMX下一次生成uvprojx文件时&#xff0c;被删除&…

纯软件小白 学习DDR5

问题 1.你知道当你打开游戏加载存档时候计算机是在做什么吗&#xff1f; 由于你的CPU只有在数据被加载到DRAM的时候才可以工作&#xff0c;所以当你需要用数据的时候&#xff0c;数据会从SSD复制到DRAM这一过程需要时间&#xff0c;所以会有加载&#xff08;所有3D模型、纹理…

【从零开始实现stm32无刷电机FOC】【实践】【7.1/7 硬件设计】

目录 stm32电路磁编码器电路电机驱动电路电流采样电路电机选择本文示例硬件说明 为了承载和验证本文的FOC代码工程&#xff0c;本节设计了一个简易的三相无刷电机 硬件套件&#xff0c;主控采用非常常用的stm32f103c8t6单片机&#xff0c;电机编码器采用MT6701&#xff0c;电机…

电源的带载能力怎么判断?Namisoft为您介绍测试方法

确保电源在各种负载条件下都能稳定工作&#xff0c;是电源设计者面临的重要挑战。本文将详细介绍如何通过带载测试来评估电源的负载能力。 电源带载测试介绍 带载能力指电源在其规定条件下&#xff0c;所能承受的最大负载能力。电源带载测试就是对电源模块的负载能力进行测试&a…

调试分析:[跳数度量]更改为[距离度量]后的 routing_bellmanford 算法

回顾复习2023年8月的《★修改Exata6.2源码&#xff1a;〔修改Bellmanford最短路径路由的衡量标准从【路由跳数】改为【“路由器节点间的物理距离”】&#xff0c;并动画演示〕》&#xff0c;VS2015调试Exata&#xff0c;跟踪调试修改后的[ routing_bellmanford.cpp ]源码&#…

AgentScope : 与CodeAct智能体对话

参考&#xff1a; 非一般程序猿第六季Agent入门实战篇(三)–CodeActAgent篇 Conversation with CodeAct Agent 0&#xff0c;简介 CodeAct Agent是一个Agent,它不仅可以聊天,还可以为你编写和执行Python代码。在本示例中,将介绍另一种赋予Agent调用工具能力的方法,特别是通过…

Mac 卸载 IDEA 流程

1、现在应用程序中删除Idea 2、进入Library目录 cd /Users/zhengzhaoxiang/Library 3、删除IntelliJIdea2023.3&#xff08;根据自己的版本而定&#xff09;记得进去看下是否删除干净了 rm -rf Logs/JetBrains/IntelliJIdea2023.3 rm -rf Preferences/com.jetbrains.intel…

项目学习笔记

Downloads – Oracle VirtualBoxhttps://www.virtualbox.org/wiki/Downloads

启动hadoop集群出现there is no HDFS_NAMENODE_USER defined.Aborting operation

解决方案 在hadoop-env.sh中添加 export HDFS_DATANODE_USERroot export HDFS_NAMENODE_USERroot export HDFS_SECONDARYNAMENODE_USERroot export YARN_RESOURCEMANAGER_USERroot export YARN_NODEMANAGER_USERroot 再次运行即可。

Candance仿真电流镜OTA

1.电路图搭建 图1 上面那层不能直接一横直接连过来&#xff0c;图2只能这样连。但是&#xff0c;图2的M1和M0的电压已经超过了VDD的1.8V。是不行的&#xff0c;需要调整&#xff0c;主要增大M1和M0的宽长比以减小电压。 图2 candance电流镜OTA电路实现 下面这篇文章讲了电流镜…

ROS与无人驾驶学习笔记(一)——ROS基本操作

文章目录 ※ 安装ubuntu 下载 创建虚拟机 安装系统 安装vmware tool 更新源 安装常用软件 ※ 安装ROS 设置软件更新 使用清华源安装 ros测试 认识ROS ROS特点 ROS系统实现 ROS安装 工作需要&#xff0c;转行做码农了。。。 大概是无人驾驶相关的&#xff0c;啥都不会。。。 看成…

JS设计模式之状态模式:优雅地管理应用中产生的不同状态

一. 前言 在过去&#xff0c;我们经常使用条件语句&#xff08;if-else 语句&#xff09;来处理应用程序中的不同状态。然而&#xff0c;这种方式往往会让代码变得冗长、难以维护&#xff0c;并可能引入潜在的 bug。而状态模式则提供了一种更加结构化和可扩展的方法来处理状态…