Flume 之自定义 Source

news2024/11/17 4:29:35
1、简介

        Flume 自带 Source 有 Avro、Thrift、Netcat、Taildir、Kafka、Http等,有些场合比如我们指定访问接口获取数据当做 Flume 的 Source,像这种定制化的 Source 需要我们自己实现,下面我将介绍如何自定义实现 Source。

2、自定义实现 Flume 的 Source 

        使用自定义 Source ,访问自身的web 服务,并且发送至 logger 的 Sink。

2.1、引入依赖
<dependencies>
	<dependency>
		<groupId>org.apache.flume</groupId>
		<artifactId>flume-ng-core</artifactId>
		<version>1.11.0</version>
	</dependency>
</dependencies>

 <build>
	<plugins>

	   <plugin>
		   <groupId>org.springframework.boot</groupId>
		   <artifactId>spring-boot-maven-plugin</artifactId>
		   <version>3.2.1</version>
		   <configuration>
			   <skip>true</skip>
		   </configuration>
	   </plugin>
	</plugins>
</build>
2.2、自定义Source
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class MySource extends AbstractSource implements Configurable, PollableSource {
     private String path;

     private final static Logger log = LoggerFactory.getLogger(MySource.class);
     
    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        try{
            URL url = new URL(this.path);
            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            connection.setRequestMethod("GET");
            connection.connect();

            InputStream is = connection.getInputStream();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is, "utf-8"));
            StringBuffer stringBuffer = new StringBuffer();

            String line = null;
            while ((line = bufferedReader.readLine()) != null){
                stringBuffer.append(line);
                stringBuffer.append("\t");
            }
            log.info("test =======================: {}" , stringBuffer.toString());
            Event event = EventBuilder.withBody(stringBuffer.toString().getBytes());
            getChannelProcessor().processEvent(event);
            status = Status.READY;
            // 2s调用一次
            Thread.sleep(2000);
        }catch (Exception ex){
            log.info("出错了!, {}", ex.getMessage());
            status = Status.BACKOFF;
        }
        return status;
    }
    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }
    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
    @Override
    public void configure(Context context) {
        // 从配置文件中读取
        String path = context.getString("path", "http://baidu.com");
        log.info("path ==========================: {}", path);
        this.path = path;
    }
}
2.3、将自定义Source 打包放到 flume 的 lib目录下

2.4、flume 的配置文件编写 

        vim flume-self-source.conf

a1.sources = r1
a1.channels = c1
a1.sinks=k1
# source
a1.sources.r1.type = com.weilong.flumeselfdefinition.MySource # 自定义 Source 的全限定类名
a1.sources.r1.path = http://192.168.30.33:8088/hello # 自定义参数
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Sink
a1.sinks.k1.type = logger
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.5、测试
2.5.1、启动web服务

2.5.2、启动flume
bin/flume-ng agent -c conf/ -n a1 -f testconf/flume-self-source.conf -Dflume.root.logger=INFO,console
2.5.3、结果 

 

3、总结

        本文详细介绍 Flume 如何实现自定义 Source,帮助大家进一步了解Flume的使用。

        本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1390611.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux中放大字体

环境&#xff1a;VMware17Pro&#xff0c;Ubuntu22.04 在显示设置外观中只看到图标放大的调整&#xff0c;没看到字体大小设置 不按照常规设置&#xff0c;点开下面的辅助功能->大号文本&#xff08;没有设置具体字号的选项&#xff0c;但是可以放大&#xff09; 效果图如下…

五、带登录窗体的demo

做了一个简单的带登录窗体的demo&#xff0c;有用户名和密码不能为空的验证&#xff0c;原理是在main.cpp的主函数入口处&#xff1a; 1、将默认的MainWindow主窗体注释。 2、新建一个formlogin登录窗体&#xff0c;在主函数中先运行登录窗体。 3、在登录窗体中引用MainWind…

Monorepo-uniapp 构建分享

Monorepo uniapp 构建灵感&#xff1a;刚好要做一个项目&#xff0c;于是想到升级一下之前自己写的一个vue3tspiniauno的模版框架&#xff0c;其实那个框架也不错&#xff1b;只是感觉还差点东西&#xff0c;我已经用那个小框架写了两三个项目&#xff1b;轻巧实用。为什么选…

线性代数——行列式按行(列)展开

目录 一、余子式&#xff1a;将行列式某元素所在行和列的元素全去掉 剩余部分所构成的行列式&#xff0c;称为该元素的余子式 二、代数余子式 三、行列式等于它的任一行&#xff08;列&#xff09;的各元素与对应代数余子式乘积之和 四、行列式某行元素&#xff08;列&…

transbigdata 笔记: 官方文档示例3:车辆轨迹数据处理

1 读取数据 轨迹数据质量分析 这一部分和 transbigdata笔记&#xff1a;data_summary 轨迹数据质量/采样间隔分析-CSDN博客 的举例是一样的 import pandas as pd import geopandas as gpd import transbigdata as tbddata pd.read_csv(Downloads/TaxiData-Sample.csv, names…

一文解析 Copycat Dex与 Bitcat Dex的区别

Copycat Dex和 Bitcat Dex都带一个 Cat 并且都是衍生品协议&#xff0c;很多人都会误认为这两个是同一个项目&#xff0c;实际不然。它们是面向两个不同赛道、不同资产类型的衍生品项目。 Copycat Dex和 Bitcat Dex都是衍生品 DEX&#xff0c;它们最本质的区别主要在于&#xf…

FPGA时序分析与时序约束(四)——时序例外约束

目录 一、时序例外约束 1.1 为什么需要时序例外约束 1.2 时序例外约束分类 二、多周期约束 2.1 多周期约束语法 2.2 同频同相时钟的多周期约束 2.3 同频异相时钟的多周期约束 2.4 慢时钟域到快时钟域的多周期约束 2.5 快时钟域到慢时钟域的多周期约束 三、虚假路径约…

亚马逊店飞飞ERP系统,跟卖+铺货+物流发货模式综合一体的ERP系统

跨境电商亚马逊&#xff0c;目前为止电商行业比较靠前的电商平台&#xff01;那么有人做电商&#xff0c;就会有人出单&#xff0c;有人出单就会有中转仓需求&#xff0c;代打包&#xff0c;代贴单&#xff01;那么这一切都是有一套逻辑完善的ERP来完成&#xff01;前端通过授权…

人工智能培训靠谱吗

靠谱的&#xff0c;因为人工智能是未来的发展趋势&#xff0c;因此&#xff0c;人工智能工程师也将成为就业爆款。人工智能工程师负责创建和开发自动化系统、算法和机器学习模型&#xff0c;以实现自主决策和任务执行。由于人工智能在可穿戴设备、家庭自动化、智能城市和自动驾…

信息技术安全评估准则新版标准的变化

文章目录 前言一、GB/T 18336 标准在我国的应用情况&#xff08;一&#xff09;以GB/T 18336 标准制定的信息技术产品国家标准&#xff08;二&#xff09;GB/T 18336 标准提升了国家关键信息基础设施的整体网络安全保障水平 二、新版 GB/T 18336 标准的变化及应用展望三、标准支…

C#,入门教程(66)——枚举Enum的高等用法

前言&#xff1a;国内码农与国外优秀程序员的最大区别是&#xff0c;我们的专家、教授喜欢唾沫横飞地&#xff0c;夸夸其谈语言特性、框架、性能&#xff0c;唯一目的是带私货&#xff08;书籍或教程&#xff09;&#xff0c;很少能写出真有用的程序。差距在哪呢&#xff1f;基…

MFC CAsyncSocket类作为客户端示例

之前写过CAsyncSocket类使用的博客;进一步看一下; VS新建一个MFC 对话框工程; 添加一个类,从CAsyncSocket继承,起个自己的名字; 对话框添加几个编辑框,按钮,静态控件; 为自己的CxxxAsyncSocket类添加重写的虚函数,OnConnect、OnReceive、OnSend; 自己的CAsyncSoc…

Python数据结构——列表

目录 一、认识Python数据结构 二、列表概述 三、列表切片 &#xff08;一&#xff09;概述 &#xff08;二&#xff09;常见形式 &#xff08;三&#xff09;特别说明 四、列表的基本操作 &#xff08;一&#xff09;创建列表 &#xff08;二&#xff09;列表元素增加…

宿舍管理系统的设计与实现:基于Spring Boot、Java、Vue.js和MySQL的完整解决方案

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

GMP模型学习笔记:概念、流程概述、设计机制及部分场景

前言 Go是并发而生的语言&#xff0c;其中则通过GMP模型来进行协程的分配与调度。本篇将记录自己学习GMP模型的笔记。 进程、线程、协程分配流程概述 计算机发展之初&#xff0c;是只有进程的。那时候是单进程时代&#xff0c;多个进程顺序执行&#xff0c;计算机也没有并发…

sentinel熔断与限流

文章目录 一、sentinel简介Sentinel 是什么&#xff1f;Sentinel安装 二、sentinel整合工程新建cloudalibaba-sentinel-service8401微服务引入依赖yml配置主启动类添加EnableDiscoveryClient业务类测试 三、sentinel流控规则基本介绍流控模式直接&#xff08;默认&#xff09;关…

在pycharm远程连接树莓派遇到的No files or folders found to process处理办法

在PyCharm中解决"No files or folders found to process"错误的另一个方法是通过Deployment中的Configuration选项。在PyCharm中&#xff0c;找到Tool并选择Deployment&#xff0c;然后点击Configuration。 在设置路径的过程中需要注意目标目录是相对的 在中 会识…

DNS从入门到精通

DNS从入门到精通 Dns从入门到精通 DNS从入门到精通一、DNS原理二、企业高速缓存dns的搭建三、DNS相关名词解释四、权威DNS搭建编辑子配置文件&#xff08;主要写我们维护的域zone)开始解析 五、权威dns中的数据记录种类及应用编辑子配置文件&#xff08;主要写我们维护的域zone…

微信小程序canvas画布图片保存到相册官方授权、自定义授权、保存

关键步骤介绍 wx.getSetting可以获取授权信息。 wx.authorize首次授权时会打开弹框让用户授权&#xff0c;若用户已选择同意或拒绝&#xff0c;后续不会再显示授权弹框。 如果授权信息显示未进行相册授权&#xff0c;则打开自定义弹框&#xff08;show_auth: true&#xff0…

自定义C#类库(.dll文件)

环境配置 操作系统&#xff1a;Windows 10 开发工具&#xff1a;Visual Studio 2022 .Net桌面开发环境&#xff1a; 开发步骤 &#xff08;一&#xff09;创建C#类库项目 &#xff08;二&#xff09;配置项目名称和项目路径 &#xff08;三&#xff09;选择所使用的框架&a…