大数据技术之Flume 企业开发案例——自定义 Source(9)

news2024/9/23 7:22:36

目录

自定义 Source

1)介绍

2)需求

3)分析

4)编码

5)测试


自定义 Source

1)介绍

Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型和格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy 等。虽然官方提供的 Source 类型已经很丰富,但在实际开发中可能仍不能完全满足需求。此时,可以根据实际需求来自定义 Source。

官方提供了自定义 Source 的接口:Flume Developer Guideicon-default.png?t=N7T8https://flume.apache.org/FlumeDeveloperGuide.html#source。自定义 MySource 需要继承 AbstractSource 类并实现 ConfigurablePollableSource 接口。

主要实现的方法包括:

  • getBackOffSleepIncrement() —— backoff 步长
  • getMaxBackOffSleepInterval() —— backoff 最长时间
  • configure(Context context) —— 初始化 context(读取配置文件内容)
  • process() —— 获取数据封装成 event 并写入 channel,这个方法将被循环调用。

使用场景:例如读取 MySQL 数据或其他文件系统。

2)需求

使用 Flume 接收数据,并为每条数据添加前缀,输出到控制台。前缀可以从 Flume 配置文件中配置。

3)分析

自定义 Source 需求分析:

  • 继承 AbstractSource
  • 实现 Configurable 和 PollableSource

关键方法:

  • configure(Context context):读取配置文件(如 XX.conf)中的配置信息
  • process():接收数据,将数据封装成一个个的 Event,写入 Channel。使用 for 循环模拟数据生成(例如:for(int i = 0; i < 5; i++)
  • getBackOffSleepIncrement():暂不使用
  • getMaxBackOffSleepInterval():暂不使用

4)编码

(1)导入 pom 依赖

<dependencies>
  <dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
  </dependency>
</dependencies>

(2)编写代码

package com.lzl;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.util.HashMap;

public class MySource extends AbstractSource implements Configurable, PollableSource {

  private Long delay;
  private String field;

  @Override
  public void configure(Context context) {
    delay = context.getLong("delay");
    field = context.getString("field", "Hello!");
  }

  @Override
  public Status process() throws EventDeliveryException {
    try {
      HashMap<String, String> headerMap = new HashMap<>();
      SimpleEvent event = new SimpleEvent();
      for (int i = 0; i < 5; i++) {
        event.setHeaders(headerMap);
        event.setBody((field + i).getBytes());
        getChannelProcessor().processEvent(event);
        Thread.sleep(delay);
      }
    } catch (Exception e) {
      e.printStackTrace();
      return Status.BACKOFF;
    }
    return Status.READY;
  }

  @Override
  public long getBackOffSleepIncrement() {
    return 0;
  }

  @Override
  public long getMaxBackOffSleepInterval() {
    return 0;
  }
}

5)测试

(1)打包 将写好的代码打包,并放到 Flume 的 lib 目录(例如 /opt/module/flume)下。

(2)配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.lzl.MySource
a1.sources.r1.delay = 1000
#a1.sources.r1.field = lzl

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3)开启任务

[lzl@hadoop12 flume]$ pwd
/opt/module/flume
[lzl@hadoop12 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console

(4)结果展示 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2081557.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Spring】Spring循环依赖的解决方案

【Spring】Spring循环依赖的解决方案 开篇词&#xff1a;干货篇&#xff1a;1.PostConstruct方法&#xff1a;2.构造函数注入和Setter方法注入&#xff1a;3.配置文件注明【允许存在循环引用】4.Lazy5.SpringUtil的getBean 我是杰叔叔&#xff0c;一名沪漂的码农&#xff0c;下…

Swift concurrency 3 — 三种异步方式(@escaping closure, Combine, async/await)

直到现在为止&#xff0c;如果我们想要异步请求数据&#xff0c;应该说至少有三种方式&#xff1a; 传统的通过闭包&#xff08;escaping closure&#xff09;方式回调处理。通过Combine的发布者订阅者机制。通过async/await组合的方式。 采用哪种方式&#xff0c;还得因项目…

基于微信小程序音乐分享与交流平台---附源码95587

摘 要 随着移动互联网的普及&#xff0c;微信小程序作为一种轻量级的应用程序&#xff0c;正逐渐成为人们获取信息和服务的便捷渠道。为了给用户提供便捷、多样化的音乐分享和交流渠道&#xff0c;本文提出了—种基于微信小程序的音乐分享与交流平台的设计与实现方案。通过该平…

SqlServer还原数据库后,数据库显示受限制用户解决方法

SqlServer还原数据库后&#xff0c;数据库显示受限制用户解决方法: 1.打开SSMS。 2.选中连接的数据库&#xff0c;右击鼠标右键&#xff0c;点击属性 3.在属性对话框中选择选项 4.在该对话框的右边&#xff0c;找到【限制访问】,并且将其改为&#xff1a;【MULTI_USER】 5.点击…

1个月2万粉的AI职业头像号,1分钟速成超详细教程

大家好 今天要拆解的是一个**小红书的AI职业头像号博主&#xff0c;**博主使用了手绘的二次元风格头像&#xff0c;不到1个月&#xff0c;现在的粉丝数已经1.9万了&#xff0c;收藏点赞9万。 有图有真相&#xff1a; 一、账号作品分析 这个博主的账号都是图文笔记。使用AI制…

【Google Maps JavaScript API】Simple Click Events 详解

文章目录 一、Simple Click Events 简介1. 什么是 Simple Click Events&#xff1f;2. 为什么使用 Simple Click Events&#xff1f; 二、Simple Click Events 的实现1. 基本代码结构2. 设置地图样式3. 初始化地图 三、处理点击事件1. 为标记添加点击事件2. 中心改变事件 四、完…

【JAVA基础】抽象类

抽象类 引言抽象类 抽象方法 引言 ​ 在面向对象的概念中&#xff0c;所有的对象都是通过类来描绘的&#xff0c;但是反过来&#xff0c;并不是所有的类都是用来描绘对象的&#xff0c;如果一个类中没有包含足够的信息来描绘一个具体的对象&#xff0c;这样的类就是抽象类。 …

【Google Maps JavaScript API】Geolocation功能实现用户位置定位

文章目录 一、什么是Geolocation&#xff1f;二、Geolocation的应用场景三、如何使用Geolocation功能1. 初始化地图2. 编写初始化地图的JavaScript代码3. 代码解析初始化地图创建定位按钮获取用户位置处理定位错误 4. 样式设置5. 运行示例 四、注意事项五、总结 Google Maps Ja…

【GIT】Idea中的git命令使用-全网最新详细(包括现象含义)

原文网址&#xff1a;【GIT】Idea中的git命令使用-全网最新详细&#xff08;包括现象含义&#xff09; 文章目录 **命令1&#xff1a;查看当前所处分支&#xff1a;****命令2&#xff1a;拉取最新代码&#xff1a;****命令3&#xff1a;切换分支&#xff1a;****命令4&#xff…

AIOps探索 | 运维应急的六个阶段

当下&#xff0c;金融科技快速发展的时代&#xff0c;银行和金融机构的IT系统日益复杂&#xff0c;业务量呈指数式增长。面对这一挑战&#xff0c;运维应急已成为确保金融服务稳定性和可靠性的关键因素。 智能运维应急即在IT系统出现异常或故障时&#xff0c;快速发现问题、准…

【达梦数据库】数据库频繁崩溃记录-非dump分析

这里写目录标题 背景查找原因问题解决补充 背景 用户反映系统CentOS上的数据库频繁崩溃&#xff0c;系统没有崩溃过&#xff0c;希望帮忙分析下 查找原因 查看数据库运行日志&#xff1a;无任何报错 查看数据库错误日志&#xff1a;无报错 查看OS运行日志&#xff0c;/var…

EPLAN中如何制作标题页和封页?

EPLAN中如何制作标题页和封页? 如下图所示,我们打开EPLAN,新建一个项目,

开学季有什么必买的好物?提升学习效率的好物来啦!学生党必看!

对于即将开学的学生们来说&#xff0c;选择一款性价比高的电容笔是非常重要的。它不仅能够提升学习效率&#xff0c;还能带来更加流畅舒适的书写与绘画体验。接下来&#xff0c;我将推荐一款非常适合学生使用的电容笔&#xff0c;它不仅性能可靠&#xff0c;而且价格亲民&#…

大语言模型向量检索技术综述:背景知识、数据效率、泛化能力、多任务学习、未来趋势

预训练语言模型如BERT和T5&#xff0c;是向量检索(后续文中使用密集检索)的关键后端编码器。然而&#xff0c;这些模型通常表现出有限的泛化能力&#xff0c;并在提高领域内准确性方面面临挑战。最近的研究探索了使用大型语言模型&#xff08;LLMs&#xff09;作为检索器&#…

苹果机器人计划:能否成为智能家居的破局者?

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

ADB 获取屏幕坐标,并模拟滑动和点击屏幕

本文声明:本文是参考https://blog.csdn.net/beyond702/article/details/69258932编制。同时,补充了在windows系统模式下,详细的获取屏幕坐标的步骤。 1.判断设备与windows电脑USB连接是否正常 在CMD窗口输入命令:ADB devices,按ENTER键,输出如下结果,则表示连接正常。 …

LLM大模型微调心得:全面经验总结与技巧分享

导读 模型越大对显卡的要求越高&#xff0c;目前主流对大模型进行微调方法有三种&#xff1a;Freeze方法、P-Tuning方法和Lora方法。本文总结了作者在ChatGLM-6B模型微调的经验&#xff0c;并汇总了目前各类开源项目&数据。 写在前面 大型语言模型横行&#xff0c;之前非…

Android经典实战之Kotlin的delay函数和Java中的Thread.sleep有什么不同?

本文首发于公众号“AntDream”&#xff0c;欢迎微信搜索“AntDream”或扫描文章底部二维码关注&#xff0c;和我一起每天进步一点点 Kotlin 中的 delay 函数和 Java 中的 Thread.sleep 都用于暂停代码执行&#xff0c;但是它们用于不同的场景&#xff0c;并且有不同的实现和影响…

饭馆扫码点餐小程序什么方法进行开发

扫码点餐模式出现的时间已经比较久&#xff0c;其主要作用便是节约客商时间&#xff0c;客户自己点餐&#xff0c;商家响应餐品跟进&#xff0c;降低服务员长时间沟通成本&#xff0c;当然客户饭馆消费也不能只依靠工具&#xff0c;还是需要与服务结合&#xff0c;打造有温度的…

企业网络安全“九九八十一难”,且看XDR的黑!神!话!

近期&#xff0c;游戏《黑神话悟空》的热度攀升&#xff0c;不仅汇聚了全球玩家的目光&#xff0c;也悄然成为黑客及网络不法分子的目标。 游戏预热阶段&#xff0c;其剧情视频意外泄露&#xff0c;迅速在网络上引发热烈讨论与业内关注。随后&#xff0c;有黑客组织公开宣称将…