【大数据之Flume】六、Flume进阶之自定义Source

news2025/1/15 23:05:52

(1)概述:
  Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。

  自定义source的接口:https://flume.apache.org/releases/content/1.11.0/FlumeDeveloperGuide.html
  自定义MySource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。

  实现的方法:
    getBackOffSleepIncrement() //backoff 步长
    getMaxBackOffSleepInterval()//backoff 最长时间
    configure(Context context)//初始化 context(读取配置文件内容)
    process()//获取数据封装成 event 并写入 channel,这个方法将被循环调用

  适用于:读取MySQL数据或者其它文件系统。

(2)需求:
  使用 flume 接收数据,并给每条数据添加前缀,输出到控制台,使得前缀后缀可从 flume 配置文件中配置。
在这里插入图片描述
(3)分析:
在这里插入图片描述
步骤:
(1)创建一个 maven 项目,并引入以下pom依赖。

<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>

(2)自定义MySource ,继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口,并打包,将jar包放到/opt/module/flume-1.9.0/lib目录下。

package com.study.source;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    private String perfix;
    private String subfix;
    private Long delay;

    //初始化 context(读取配置文件内容)
    @Override
    public void configure(Context context) {
        perfix = context.getString("per","per-");
        subfix = context.getString("sub");
        delay = context.getLong("delay",2000L);
    }

    //获取数据封装成 event 并写入 channel,这个方法将被循环调用。
    @Override
    public Status process() throws EventDeliveryException {

        //循环封装事件
        try {
            for (int i = 0; i < 5; i++) {
                //创建事件
                Event e = new SimpleEvent();
                //创建事件头信息
                HashMap<String, String> header = new HashMap<>();
                //给事件设置头信息
                e.setHeaders(header);
                //给事件设置内容
                e.setBody((perfix+"test: "+i+subfix).getBytes());
                //将事件写入 channel
                getChannelProcessor().processEvent(e);
            }

            Thread.sleep(delay);
            return Status.READY;
        } catch (Exception exception) {
            exception.printStackTrace();
            return Status.BACKOFF;
        }

    }

    //backoff 步长
    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    //backoff 最长时间
    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }

}

(3)在/opt/module/flume-1.9.0/job下创建文件夹group5,在该文件夹下创建配置文件flume-mysource-logger.conf。

# Name the components on this agent 
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.study.source.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.per = per
a1.sources.r1.sub = sub

# Describe the sink 
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(4)开启任务

bin/flume-ng agent -c conf/ -n a1 -f job/group5/flume-mysource-logger.conf -Dflume.root.logger=INFO,console

(5)结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/833278.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux 快速创建桌面图标

在安装 tar.gz 这类型压缩文件时&#xff0c;通常启动文件是.sh文件。文章主要记录快速添加到桌面图标。 1、解压 tar -zxvf XXX.tar.gz 2、创建桌面图标文件 touch XXX.desktop 3、文件中配置 [Desktop Entry] NameXXX CommentZZZ Exec/软件可执行文件所在目录/可执行文…

Python 一篇入门

目录 Python 的简介与特点 Python支持多种编程风格 解释运行 跨平台 可扩展强 可嵌入 丰富的库 Python版本选择 Python开发环境搭建 认识Python解释器 快速入门 变量和赋值 动态类型 变量命名规则 认识 "数字" 认识 "字符串" 认识 "…

【数据结构】移除链表元素-图文解析(单链表OJ题)

LeetCode链接&#xff1a;203. 移除链表元素 - 力扣&#xff08;LeetCode&#xff09; 本文导航 &#x1f4ad;做题思路 &#x1f3a8;画图更好理解&#xff1a; ✍️代码实现 &#x1f5c2;️分情况讨论&#xff1a; ❄️极端情况&#xff1a; &#x1f4ad;做题思路 遍历链表…

油画欣赏|桂林的《瑞云》

《瑞云》陈可之•2023年绘油画《瑞云》&#xff0c;运用巧妙的光影处理&#xff0c;描绘出桂林之山、水、云的大好自然风光&#xff0c;令人心旷神怡。此作品是陈可之先生游览桂林时所绘新作。阳光和煦&#xff0c;碧波清流&#xff0c;乘舟遥看山水&#xff0c;亦是人间一大乐…

深度学习——全维度动态卷积ODConv

ODConv(OMNI-DIMENSIONAL DYNAMIC CONVOLUTION)是一种关注了空域、输入通道、输出通道等维度上的动态性的卷积方法&#xff0c;因此被称为全维度动态卷积。 part1. 什么是动态卷积 动态卷积就是对卷积核进行线性加权 第一篇提出动态卷积的文章也是在SE之后&#xff0c;他提出…

前端如何打开钉钉(如何唤起注册表中路径与软件路径不关联的软件)

在前端唤起本地应用时&#xff0c;我查询了资料&#xff0c;在注册表中找到腾讯视频会议的注册表情况&#xff0c;如下&#xff1a; 在前端代码中加入 window.location.href"wemeet:"; 就可以直接唤起腾讯视频会议&#xff0c;但是我无法唤起钉钉 之所以会这样&…

2023华数杯数学建模A题思路分析 - 隔热材料的结构优化控制研究

# 1 赛题 A 题 隔热材料的结构优化控制研究 新型隔热材料 A 具有优良的隔热特性&#xff0c;在航天、军工、石化、建筑、交通等 高科技领域中有着广泛的应用。 目前&#xff0c;由单根隔热材料 A 纤维编织成的织物&#xff0c;其热导率可以直接测出&#xff1b;但是 单根隔热…

nodejs中的path.json和path.resolve的区别

nodejs中的path.json和path.resolve的区别 我们有多少次在 Node.js 项目中遇到过path.join()和path.resolve()却没有真正理解它们之间的区别&#xff1f;本文就讲解一下这两者的区别。 重要术语 首先我们先来看看几个术语&#xff0c;便于后续我们掌握这两者的差异。 字符串…

HCIP实验练习题

实验拓扑如下&#xff1a; 实验题目要求如下&#xff1a; 【1】R2为ISP&#xff0c;只能配置IP地址 【2】R1~R2之间为HDLC封装 【3】R2~R3之间为ppp封装&#xff0c;pap认证&#xff0c;R2为主认证方 【4】R2~R4之间为ppp封装&#xff0c;chap认证&#xff0c;R2为主认证方…

2023年华数杯数学建模C题思路 - 母亲身心健康对婴儿成长的影响

# 1 赛题 C 题 母亲身心健康对婴儿成长的影响 母亲是婴儿生命中最重要的人之一&#xff0c;她不仅为婴儿提供营养物质和身体保护&#xff0c; 还为婴儿提供情感支持和安全感。母亲心理健康状态的不良状况&#xff0c;如抑郁、焦虑、 压力等&#xff0c;可能会对婴儿的认知、情…

Java包装类(自动拆装箱)

包装类 为什么要有包装类&#xff1f; 在面向对象中&#xff0c;“一切皆为对象”&#xff0c;但是基本数据类型不符合这一理念&#xff0c;为了让基本类型也称为对象 便于类型之间的转化&#xff0c;数据类型之间的基本操作 转换方式&#xff1a; int ——> Integer ne…

软件第三方测评机构简析,软件检测证明材料的作用和意义

软件检测证明材料是由信息产业部授权或省级软件产业主管部门认可的软件第三方测评机构出具的检测证明材料&#xff0c;对于用户来说非常重要&#xff0c;是从软件的开发过程、测试结果和用户评价等方面来判断软件的质量和安全性&#xff0c;它可以帮助用户判断软件的质量和安全…

【c++】rand()随机函数的应用(二)——舒尔特方格数字的生成

目录 一、舒尔特方格简介 二、如何生成舒尔特方格 &#xff08;一&#xff09;线性同余法 1、利用线性同余法生成随机数序列的规律 (1) 当a和c选取合适的数时&#xff0c;可以生成周期为m的随机数序列 (2) 种子seed取值也是有周期的 2、利用线性同余法生成5阶舒尔特方格…

IIC子系统-实现si7006温湿度传感器采集温湿度功能

1.将IIC核心层和总线驱动层配置进内核 *********************配置核心层*************************1.找到核心层代码目录&#xff1a;内核顶层目录/drivers/i2c2. 内核顶层目录执行make menuconfig3. > Device Drivers > I2C support ->-*-I2C support4.保存退出***…

Java进阶——数据结构与算法之哈希表与树的入门小结(四)

文章大纲 引言一、哈希表1、哈希表概述2、哈希表的基本设计思想3、JDK中的哈希表的设计思想概述 二、树1、树的概述2、树的特点3、树的相关术语4、树的存储结构4.1、双亲表示法4.2、孩子兄弟表示法&#xff1a;4.3、孩子表示法&#xff1a;4.4、双亲孩子表示法 三、二叉树1、二…

ThreadPoolExecutor详解(上)

为什么会有线程池&#xff1f; 如果客户端发一个请求&#xff0c;服务端就创建一个线程接收请求&#xff0c;线程资源是有限的&#xff0c;而且创建一个线程和执行结束之后都要调用操作系统资源销毁线程&#xff0c;这样频繁操作肯定非常占用cpu和内存资源&#xff0c;线程池的…

性能测试 —— “问题分析”

性能测试大致分以下几个步骤&#xff1a; 需求分析 脚本准备 测试执行 结果整理 问题分析 今天要说的是最后一个步骤——“问题分析”&#xff1b; 需求描述 有一个服务&#xff0c;启动时会加载一个1G的词表文件到内存&#xff0c;请求来了之后&#xff0c;会把请求词去…

构建稳健的PostgreSQL数据库:备份、恢复与灾难恢复策略

在当今数字化时代&#xff0c;数据成为企业最宝贵的资产之一。而数据库是存储、管理和保护这些数据的核心。PostgreSQL&#xff0c;作为一个强大的开源关系型数据库管理系统&#xff0c;被广泛用于各种企业和应用场景。然而&#xff0c;即使使用了最强大的数据库系统&#xff0…

LeetCode 25题:K个一组翻转链表

题目&#xff1a; 给你链表的头节点 head &#xff0c;每 k 个节点一组进行翻转&#xff0c;请你返回修改后的链表。 k 是一个正整数&#xff0c;它的值小于或等于链表的长度。如果节点总数不是 k 的整数倍&#xff0c;那么请将最后剩余的节点保持原有顺序。 你不能只是单纯…

嵌入式:C高级 Day2

一、递归实现&#xff0c;输入一个数&#xff0c;输出这个数的每一位 二、递归实现&#xff0c;输入一个数字&#xff0c;输出这个数的二进制 三、写一个脚本&#xff0c;包含以下内容 1.显示/etc/group文件中第五行的内容 2.创建目录/home/ubuntu/copy 3.切换工作路径到此目录…