Flume EmbeddedAgent

news2025/2/25 4:15:41

flume

flume 二次开发,对EmbeddedAgent的简易改造,动态控制agent,实现启动、关闭等功能。
模块结构如下所示:
在这里插入图片描述
flume-parent github地址

1、用途

1.1、本地调试

对flume不是特别熟悉的开发者,都没有办法一次开发完Source或Sink,改造完后方便本地在调试

1.2、开发ETL工具

可开发ETL工具的好处是有具备事务的Channel,不会造成数据丢失,但如果要实现多种类型,有较大的开发量,可实现ETL功能

2、模块介绍

2.1、flume-engine

flume-engine是不可运行的jar包,要是其可以独立运行,添加启动类即可,或被依赖于其他可运行包中

2.1.1、代码说明

com.softwarevax.flume.agent包下

1、embedded
    改造的EmbeddedAgent,可支持多Souece,Sink。原先的EmbeddedAgent,Source、Sink都只支持一个。若需功能如同命令行启动搬强大,需要对
EmbeddedAgentConfiguration.configure(String name, Map<String, String> props);
方法进行改造,使传入的Map<String, String>属性,解析成类似配置文件的格式返回。
    Source、Interceptor、Channel、Processor、Sink都是通过该方法的返回属性创建而来,详见:
MaterializedConfiguration conf = this.configurationProvider.get(this.name, properties);
2、entity
    通过传入实体的方式,解析成EmbeddedAgentConfiguration.configure(String name, Map<String, String> props)
方法的入参形式,使其能正常解析,该包都是些Source、Interceptor、Channel、Processor、Sink的载体。

2.1.2、操作agent

    创建一个AgentManager实体,可以提交、关闭agent

2.2、flume-client

    web应用,用来提供agent启动、关闭的接口。可考虑新增一个类似网关的模块,agent都提交通过到网关模块,网关模块配置一些策略,
决定提交到哪个flume-client中运行,如负载均衡策略。

2.2、flume-api

    含所有开发的Source、Interceptor、Sink,所有的拦截器均放在api-interceptor-flume模块,Souce和Sink都新建一个模块

3、自定义开发

Source、Interceptor、Channel、Processor、Sink暂且都称为组件

3.1、Configurable


实现了Configurable接口的组件,在调用EmbeddedAgent.configure(Map<String, String> configure)时就会回调接口中的唯一方法,
不需要等到调用EmbeddedAgent.start();

3.2、Source

    Source分为PollableSource和EventDrivenSource,关系数据库,还有消息中间件(RocketMQ、Kafka),基本都是PollableSource类型,
RabbitMQ是EventDrivenSource类型的,具体实现哪种Source,取决于获取数据的方式。PollableSource类型的process()方法,如果返回
Status.BACKOFF,经过getBackOffSleepIncrement()时间后会再次调用,如果返回Status.READY,执行完之后,就会再次进入process()方法。

3.3、Interceptor

    Interceptor是依附在Source上的,配置的拦截器,要指是哪个定Source的拦截器。实现接口Interceptor即可,若需要配置参数,再实现接口
Configurable,在拦截器上,可以做一个简单处理,比如碰到字符串为null,将他改为""。

3.4、Channel

    transactionCapacity默认为100,如果一次提交超过100条数据,则会提交失败。capacity是Channel的容量,Channel有file、menory等类型,
详见ChannelType。

3.5、Sink

Sink需要开启事务,防止数据丢失。
Transaction transaction = channel.getTransaction();
transaction.begin();
transaction.commit();
transaction.close();

4、agent启动和关闭例子

将t_user的数据,复制到t_user_copy表中

4.1、启动flume-client

4.2、调用启动接口

http://localhost:8080/start

Content-Type: application/json

{
    "channel": {
        "configuration": {
            "type": "MEMORY",
            "transactionCapacity": "1000",
            "capacity": "1000000"
        },
        "type": "MEMORY"
    },
    "name": "mysql",
    "processor": {
        "configuration": {
            "type": "DEFAULT"
        },
        "type": "DEFAULT"
    },
    "sink": {
        "sinks": [
            {
                "configuration": {
                    "type": "com.softwarevax.flume.sink.mysql.MySQLSink",
                    "driverClassName": "com.mysql.cj.jdbc.Driver",
                    "username": "root",
                    "password": "123456",
                    "url": "jdbc:mysql://localhost:3306/optimize?characterEncoding=utf-8&serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true",
                    "table": "t_user_copy",
                    "batch.size": "1000",
                    "columns": "id,name,nick_name,id_card_no,sex,phone,email,wechat"
                },
                "name": "s1"
            }
        ]
    },
    "source": {
        "sources": [
            {
                "configuration": {
                    "driverClassName": "com.mysql.cj.jdbc.Driver",
                    "username": "root",
                    "password": "123456",
                    "url": "jdbc:mysql://localhost:3306/optimize?characterEncoding=utf-8&serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true",
                    "type": "com.softwarevax.flume.source.mysql.MySQLSource",
                    "table": "t_user",
                    "fetch.size": "1000"
                },
                "interceptors": [
                    {
                        "configuration": {
                            "type": "com.softwarevax.flume.interceptor.TimeStampInterceptor$Builder"
                        },
                        "name": "interceptor_1"
                    }
                ],
                "name": "r1"
            }
        ]
    }
}

4.3、查看flume agent的启动的日志

HeadTagInterceptor是默认的拦截器,可以将名字设置为interceptor_0,覆盖默认的拦截器

mysql.channels=mysql-channel
mysql.channels.mysql-channel.capacity=1000000
mysql.channels.mysql-channel.transactionCapacity=1000
mysql.channels.mysql-channel.type=MEMORY
mysql.sinkgroups=mysql-sink-group
mysql.sinkgroups.mysql-sink-group.processor.type=DEFAULT
mysql.sinkgroups.mysql-sink-group.sinks=s1
mysql.sinks=s1
mysql.sinks.s1.batch.size=1000
mysql.sinks.s1.channel=mysql-channel
mysql.sinks.s1.columns=id,name,nick_name,id_card_no,sex,phone,email,wechat
mysql.sinks.s1.driverClassName=com.mysql.cj.jdbc.Driver
mysql.sinks.s1.password=123456
mysql.sinks.s1.table=t_user_copy
mysql.sinks.s1.type=com.softwarevax.flume.sink.mysql.MySQLSink
mysql.sinks.s1.url=jdbc:mysql://localhost:3306/optimize?characterEncoding=utf-8&serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true
mysql.sinks.s1.username=root
mysql.sources=r1
mysql.sources.r1.channels=mysql-channel
mysql.sources.r1.driverClassName=com.mysql.cj.jdbc.Driver
mysql.sources.r1.fetch.size=1000
mysql.sources.r1.interceptors=r1_interceptor_0 r1_interceptor_1
mysql.sources.r1.interceptors.r1_interceptor_0.type=com.softwarevax.flume.interceptor.HeadTagInterceptor$Builder
mysql.sources.r1.interceptors.r1_interceptor_1.type=com.softwarevax.flume.interceptor.TimeStampInterceptor$Builder
mysql.sources.r1.password=123456
mysql.sources.r1.table=t_user
mysql.sources.r1.type=com.softwarevax.flume.source.mysql.MySQLSource
mysql.sources.r1.url=jdbc:mysql://localhost:3306/optimize?characterEncoding=utf-8&serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true
mysql.sources.r1.username=root

4.4、查看flume agent的关闭打印的日志

Component type: CHANNEL, name: mysql-channel stopped
Shutdown Metric for type: CHANNEL, name: mysql-channel. channel.start.time == 1671597763413
Shutdown Metric for type: CHANNEL, name: mysql-channel. channel.stop.time == 1671597786235
Shutdown Metric for type: CHANNEL, name: mysql-channel. channel.capacity == 1000000
Shutdown Metric for type: CHANNEL, name: mysql-channel. channel.current.size == 18000
Shutdown Metric for type: CHANNEL, name: mysql-channel. channel.event.put.attempt == 137000
Shutdown Metric for type: CHANNEL, name: mysql-channel. channel.event.put.success == 137000
Shutdown Metric for type: CHANNEL, name: mysql-channel. channel.event.take.attempt == 119000
Shutdown Metric for type: CHANNEL, name: mysql-channel. channel.event.take.success == 119000
Source runner interrupted. Exiting

4.5、属性方式启动

Map<String, String> properties = new HashMap<>();
// source
properties.put("sources", "r1 r2");

properties.put("r1.type", "com.softwarevax.flume.source.MySource");
properties.put("r1.pre", " r1-local ");
properties.put("r1.sub", " r1-host ");
properties.put("r1.delay", "1000");
// 设置拦截器[0代表顺序, 可覆盖公用的拦截器]
properties.put("r1.interceptors[1].type", "com.softwarevax.flume.interceptor.TimeStampInterceptor$Builder");
// 设置拦截器属性
properties.put("r1.interceptors[1].name", "张三");

properties.put("r2.type", "com.softwarevax.flume.source.MySource2");
properties.put("r2.pre", " r2-local ");
properties.put("r2.sub", " r2-host ");
properties.put("r2.delay", "1000");

// memory、file(为每个任务设置相应的路径)
properties.put("channel.type", "file");
properties.put("channel.capacity", "100000");

// sink group
properties.put("sinks", "s1 s2");

// s1
properties.put("s1.type", "com.softwarevax.flume.sink.MySink");
properties.put("s1.pre", "s1-");
properties.put("s1.sub", "-s1");

// s2
properties.put("s2.type", "com.softwarevax.flume.sink.MySink2");
properties.put("s2.pre", "s2-");
properties.put("s2.sub", "-s2");

// processor负载均衡
properties.put("processor.type", "load_balance");
// type = load_balance时,可自定义selector,默认ROUND_ROBIN
properties.put("processor.selector", "round_robin");

try {
    EmbeddedAgent agent = new EmbeddedAgent("agent");
    agent.configure(properties);
    agent.start();
} catch (final Exception ex) {
}

4.6、实体方式启动

AgentEntity entity = new AgentEntity("mysql");

AgentSource agentSource = new AgentSource();
List<Source> sources = new ArrayList<>();
// s1
Source source1 = new Source();
source1.setName("r1");
Map<String, String> r1Map = new HashMap<>();
r1Map.put("type", "com.softwarevax.flume.source.MySource");
r1Map.put("pre", " r1-local ");
r1Map.put("sub", " r1-host ");
r1Map.put("delay", "1000");
AgentInterceptor interceptor = new AgentInterceptor();
interceptor.setName("interceptor_1");
Map<String, String> interceptorMap = new HashMap<>();
interceptorMap.put("type", "com.softwarevax.flume.interceptor.TimeStampInterceptor$Builder");
interceptorMap.put("tag", "vax");
interceptor.setConfiguration(interceptorMap);
source1.setConfiguration(r1Map);
List<AgentInterceptor> interceptors = new ArrayList<>();
interceptors.add(interceptor);
source1.setInterceptors(interceptors);
sources.add(source1);

Source source2 = new Source();
source2.setName("r2");
Map<String, String> r2Map = new HashMap<>();
r2Map.put("type", "com.softwarevax.flume.source.MySource2");
r2Map.put("pre", " r2-local ");
r2Map.put("sub", " r2-host ");
r2Map.put("delay", "1000");
source2.setConfiguration(r2Map);
sources.add(source2);

agentSource.setSources(sources);
entity.setSource(agentSource);

AgentChannel channel = new AgentChannel();
Map<String, String> channelMap = new HashMap<>();
channelMap.put("type", "file");
channelMap.put("capacity", "100000");
channel.setConfiguration(channelMap);
entity.setChannel(channel);

AgentProcessor processor = new AgentProcessor();
Map<String, String> processorMap = new HashMap<>();
// 多个sink时,type不能为default,一个sink时,type不能为load_balance
processorMap.put("type", "load_balance");
processorMap.put("selector", "round_robin");
processor.setConfiguration(processorMap);
entity.setProcessor(processor);

AgentSink agentSink = new AgentSink();
List<Sink> sinks = new ArrayList<>();
// sink1
Sink sink1 = new Sink();
Map<String, String> s1Map = new HashMap<>();
sink1.setName("s1");
s1Map.put("type", "com.softwarevax.flume.sink.MySink");
s1Map.put("pre", "s1-");
s1Map.put("sub", "-s1");
sink1.setConfiguration(s1Map);
sinks.add(sink1);
// sink2
Sink sink2 = new Sink();
Map<String, String> s2Map = new HashMap<>();
sink2.setName("s2");
s2Map.put("type", "com.softwarevax.flume.sink.MySink2");
s2Map.put("pre", "s2-");
s2Map.put("sub", "-s2");
sink2.setConfiguration(s2Map);
sinks.add(sink2);

agentSink.setSinks(sinks);
entity.setSink(agentSink);

AgentManager manager = new AgentManager();
Map<String, String> props = manager.configure(entity);
try {
    EmbeddedAgent agent = new EmbeddedAgent("agent");
    agent.configure(props);
    agent.start();
} catch (final Exception ex) {
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/105763.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

消息队列mq

1. 为什么使用消息队列&#xff1f; 其实就是问问你消息队列都有哪些使用场景&#xff0c;然后你项目里具体是什么场景&#xff0c;说说你在这个场景里用消息队列是什么&#xff1f; 解耦、异步、削峰 2. 消息队列优缺点 2.1.优点 优点上面已经说了&#xff0c;就是在特殊…

并查集的原理及实现

Ⅰ. 并查集原理 在一些应用问题中&#xff0c;需要将 n 个不同的元素划分成一些不相交的集合。开始时&#xff0c;每个元素自成一个单元素集合&#xff0c;然后按一定的规律将归于同一组元素的集合合并。在此过程中要反复用到查询某一个元素归属于那个集合的运算。适合于描述这…

前端基础_线型Line styles

线型Line styles 线型包括如下属性。 lineWidth value lineCap type lineJoin type miterLimit value 通过这些属性来设置线的样式。下面将结合实例来讲解一下各属性的应用及应用后的效果。 lineWidth属性 该属性设置当前绘线的粗细&#xff0c;属性值必须…

ArcGIS编辑绘制图斑又慢又难?这些高效的处理技巧你值得拥有!

GIS画图是不是画得很慢! 图斑修改是不是无从下手! 图纸矢量化是不是琐碎繁杂、工作量大! 其实,强大的ArcGIS有很多高效的图斑编辑技巧,掌握这些技巧,无论是绘制图斑、还是修改图斑,还是图纸矢量化,绝对让你事半功倍! NO.1—自动完成面 当你要绘制一个图斑的相邻图…

华为云桌面,企业云上办公为何都偏好它?

在众多云上办公产品中&#xff0c;华为云桌面基于华为云的三十年投入的技术强、资源多、创新快和更可靠的优势&#xff0c;在众多云上办公产品中脱颖而出&#xff0c;成为众多企业数字化转型道路上不二选择&#xff0c;类似于三一重工、中泰模具、小飞侠等企业都选择了华为云桌…

非递归前序、中序遍历代码推演出后序遍历代码(极其透彻)

一、前言 众所周知&#xff0c;二叉树的遍历方式有三种&#xff1a;前序遍历、中序遍历和后序遍历。 &#x1f34c; 前序遍历&#xff1a;首先访问根节点&#xff0c;然后递归遍历左子树&#xff0c;最后递归遍历右子树。 &#x1f34c; 中序遍历&#xff1a;首先递归遍历左…

pypower的简单应用1

目录 一、背景描述 二、如何打开IEEE30节点并进行潮流计算 三、如何修改已有模型参数 四、完整代码 五、注意事项 pypower与matpower非常类似&#xff0c;可以利用matpower学习pypower&#xff0c;当然也有一些不同之处。下面记录一下应用pypower解决的问题。 一、背景描述…

Java优先队列的代码实现过程详解

1.优先队列定义 普通的队列是一种先进先出的数据结构&#xff0c;元素在队列尾追加&#xff0c;而从队列头删除。在某些情况下&#xff0c;我们可能需要找出队列中的最大值或者最小值&#xff0c;例如使用一个队列保存计算机的任务&#xff0c;一般情况下计算机的任务都是有优先…

《Python程序开发》期末作业

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 一、题目&#xff1a; 1 .选修课信息(1).xlsx&#xff0c;信息如下&#xff1a; 2 .学生选课信息表.xls&#xff0c;信息如下 3.任务 二、将文件中的信息导入数据库 …

脱水蔬菜开启蔬菜产业发展新道路 国内市场正不断扩大

根据观研报告网发布的《中国脱水蔬菜市场发展现状研究与投资前景预测报告&#xff08;2022-2029年&#xff09;》显示&#xff0c;脱水蔬菜又称复水菜&#xff0c;是将新鲜蔬菜经过洗涤、烘干等加工制作&#xff0c;脱去蔬菜中大部分水分后而制成的一种干菜&#xff0c;食用时只…

Netty实战与源码剖析(一)——浅谈NIO编程

1 前言 很久之前就想写与Netty相关的博客了&#xff0c;但由于个人时间安排的问题一直拖到了现在&#xff0c;借助这个机会&#xff0c;重新温习Java高级编程的同时&#xff0c;也把Netty实战以及源码剖析分享给各位读者。 2 Netty是什么&#xff1f; Netty is a NIO client …

Spring—Spring IOC

文章目录Spring IOC容器1. 什么是IOC2.IOC的核心原理IOC如何充当对象容器&#xff1f;具体什么作为对象容器&#xff1f;IOC的核心原理图3. IOC容器的底层原理IOC的实现&#xff0c;依赖于以下3门技术上边提到的三种技术如何实现IOC的呢&#xff1f;4.IOC(接口)————————…

官宣!CATCTF不日开赛!!

各位极客请注意&#xff01; 2022.12.31 10:00—2023.01.01 17:00 攻防世界 x Nepnep x CATCTF 即将开赛 请做好参战准备&#xff01; 本场赛事由攻防世界提供技术与平台支撑 部分赛题募集自各位爱猫人士 其他题目则由Nepnep的师傅们承包 赛事运维人员也将由志愿者师傅们…

一起Talk Android吧(第四百四十八回:UI控件之Switch)

文章目录概念介绍使用方法内容总结各位看官们大家好&#xff0c;上一回中咱们说的例子是"UI控件之TimePickerDialog",这一回中说的例是" UI控件之Switch"。闲话休提&#xff0c;言归正转&#xff0c;让我们一起Talk Android吧&#xff01;概念介绍 我们在…

设计模式~简单工厂模式

简单工厂模式是由一个工厂对象决定创建出哪一种产品类的实例。 工厂模式专门负责将大量有共同接口的类实例化。 工厂模式的几种形态&#xff1a; 简单工厂模式(Simple Factory)&#xff1a;又称静态工厂方法模式工厂方法模式&#xff08;Factory Method&#xff09;:又称多态…

云卷云舒:2022 数据库总结从Gartner到IDC

2022年尾已至&#xff0c;行业总结纷纷而至。Gartner 于12月13日发布了其 “2022 云数据库管理系统魔力象限”IDC于12月15日发布了 “2022年上半年中国关系型数据库软件市场跟踪报告”Gartner 的魔力象限&#xff0c;聚焦在 "Cloud Database"&#xff0c;不再进行本地…

《Redis实战篇》三、优惠券秒杀

文章目录3.1 全局唯一ID3.2 Redis实现全局唯一Id3.3 添加优惠卷3.4 实现秒杀下单3.5 库存超卖问题分析3.6 乐观锁解决超卖问题3.7 优惠券秒杀-一人一单3.8 集群环境下的并发问题3.1 全局唯一ID 每个店铺都可以发布优惠券&#xff1a; 当用户抢购时&#xff0c;就会生成订单并保…

nginx+rtmp+OBS搭建音视频直播服务

文章目录OBSNginx-rtmpdocker方式野生方式推流hls单码流rtmp多码流拉流OBS 下载地址&#xff1a; http://www.obsproject.com.cn/download/https://obsproject.com/zh-cn/download 傻瓜式一路按照提示安装即可。 Nginx-rtmp docker方式 有很多个镜像可供选择&#xff0c;我…

3.0、Hibernate-延迟加载 1

3.0、Hibernate-延迟加载 1 Hibernate 延迟加载 也叫 惰性加载、懒加载&#xff1b; 使用延迟加载可以提高程序运行效率&#xff0c;Java 程序 与 数据库交互的频次越低&#xff0c;程序运行的效率就越高&#xff0c;所以我们应该尽量减少 Java 程序 与 数据库的交互次数&#…

“ 总有个人会捡起 七零八落的你 “

把我所有最好的那些东西 给从来不曾抛弃我的人 所幸音频&#xff1a;00:0003:50 | 01 | 我想象过漂洋过海的冒险 向往过孤身一人的江湖 也憧憬过无拘无束的高飞 但是 越长大我越发现 家人在我生命中占据着不可或缺 | 02 | “怎么都不要我” 贺子秋说出来这句话的时候我…