Flume 之自定义Sink

news2025/2/22 20:14:38
1、简介

        前文我们介绍了 Flume 如何自定义 Source, 并进行案例演示,本文将接着前文,自定义Sink,在这篇文章中,将使用自定义 Source 和 自定义的 Sink 实现数据传输,让大家快速掌握Flume这门技术。

2、自定义Source

        自定义Source参考前文:https://blog.csdn.net/zwl2220943286/article/details/135633120

3、自定义Sink

        本文将Sink定义为mysql。

3.1、引入依赖
<dependency>
	<groupId>org.apache.flume</groupId>
	<artifactId>flume-ng-core</artifactId>
	<version>1.11.0</version>
</dependency>

<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>8.0.33</version>
</dependency>
3.2、自定义Sink
3.2.1、Sink代码
import com.weilong.flumeselfdefinition.util.MysqlConfig;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {
    private final static Logger log = LoggerFactory.getLogger(MySink.class);
    private String url;
    private String username;
    private String password;

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        Channel channel = getChannel();
        // channel 支持事务
        Transaction thx = channel.getTransaction();
        thx.begin();
        try {
            Event event = channel.take();
            String name = new String(event.getBody());
            int i = MysqlConfig.insertData(this.url, this.username, this.password, name);
            if (i > 0){
                log.info("==插入数据库成功==");
            }
            thx.commit();
            status = Status.READY;
        } catch (Exception ex){
            ex.printStackTrace();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        String url = context.getString("url");
        String username = context.getString("username");
        String password = context.getString("password");
        this.url = url;
        this.username = username;
        this.password = password;
    }
}
 3.2.2、数据库连接配置:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MysqlConfig {

    private MysqlConfig(){
    }

    static {
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
        }catch (Exception ex){
            ex.printStackTrace();
        }
    }

    public static Connection getConnection(String url, String username, String password) throws SQLException {
        Connection connection = DriverManager.getConnection(url, username, password);
        return connection;
    }

    public static int insertData(String url, String username,String password, String name){
        Connection connection = null;
        try{
            connection = getConnection(url, username, password);
            PreparedStatement preparedStatement = connection.prepareStatement("insert into test(`name`) values( '" + name + "')");
            boolean res = preparedStatement.execute();
            if (res){
                return 1;
            }
            return 0;
        }catch (Exception ex){
            ex.printStackTrace();
        }finally {
            if (connection != null){
                try {
                    connection.close();
                }catch (Exception ex){
                    ex.printStackTrace();
                }
            }
        }
        return 0;
    }
}
 3.3、Flume 配置文件

        vim flume-self-source-sink.conf

a1.sources = r1
a1.channels = c1
a1.sinks=k1
# source
a1.sources.r1.type = com.weilong.flumeselfdefinition.MySource 
# 自定义 Source 的全限定类名
a1.sources.r1.path = http://192.168.30.3:8088/hello 
# 自定义参数
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 自定义Sink
a1.sinks.k1.type = com.weilong.flumeselfdefinition.MySink
a1.sinks.k1.url = jdbc:mysql://192.168.30.3:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC
a1.sinks.k1.username = root
a1.sinks.k1.password = 146815
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4、将jar包放入lib目录 
 4.1、将自定义jar包放入lib目录

 

4.2、将数据库驱动jar包放入lib目录

        驱动jar包下载地址:https://mvnrepository.com/artifact/mysql/mysql-connector-java

注:mysql 驱动jar包不放进lib,会出现驱动类找不到。 

5、启动 Flume
bin/flume-ng agent -c conf/ -n a1 -f testconf/flume-self-source-sink.conf -Dflume.root.logger=INFO,console

注:启动Flume 之前,自定义 web 服务也要启动。

 6、结果

成功保存进数据库。

7、总结 

         本文结合前文完成 Flume 的 Source 和 Sink 的自定义,帮助大家能够完成各种场景下的Flume的使用。关于更高级Flume的知识,关注下面公众号。

        本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1391883.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

设计一个抽奖系统

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是爱吃芝士的土豆倪&#xff0c;24届校招生Java选手&#xff0c;很高兴认识大家&#x1f4d5;系列专栏&#xff1a;Spring原理、JUC原理、Kafka原理、分布式技术原理、数据库技术&#x1f525;如果感觉博主的文章还不错的…

排序算法9----计数排序(C)

计数排序是一种非比较排序&#xff0c;不比较大小 。 1、思想 计数排序又称为鸽巢原理&#xff0c;是对哈希直接定址法的变形应用。 2、步骤 1、统计数据&#xff1a;统计每个数据出现了多少次。&#xff08;建立一个count数组&#xff0c;范围从[MIN,MAX],MAX代表arr中…

韩愈和白居易,相交不相融的两位大伽

韩愈和白居易&#xff0c;一个百代文宗一个引领诗风&#xff0c;却关系平淡原因何在&#xff1f; 一位就是韩愈&#xff08;768年&#xff0d;824年&#xff09;&#xff0c;一位是白居易&#xff08;772年&#xff0d;846年&#xff09;&#xff0c;都是唐代各有建树的文学领…

2024年华数杯国际赛赛题浅析

21号完赛&#xff0c;28号出成绩的华数杯国际赛&#xff0c;作为美赛最合适的练手赛正式开赛。为了让大家更好地比赛&#xff0c;首先为大家带来本次竞赛两道题目的浅要解析。主要分析两道题目适合的群体&#xff0c;未来大家求解过程中可能遇到的问题。方便大家快速完成选题。…

【Proteus仿真】【Arduino单片机】汽车车窗除霜系统设计

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 本项目使用Proteus8仿真Arduino单片机控制器&#xff0c;使用LCD1602显示模块、光线传感器、DS18B20温度传感器、PCF8691 ADC模块、继电器加热模块等。 主要功能&#xff1a; 系统运行后&#xff0c;LCD…

微软Power Platform使用Power Automate Desktop flow桌面流爬取京东商品信息

微软Power Platform使用Power Automate Desktop flow桌面流爬取京东商品信息 目录 微软Power Platform使用Power Automate Desktop flow桌面流爬取京东商品信息1、创建一个桌面流应用Desktop flow2、启动新的浏览器&#xff0c;跳转到我们需要的URL中3、在文本框中填充文字并点…

Android Studio 如何设置中文

Android Studio 是一个为 Adndroid 平台开发程序的集成开发环境&#xff08;IDE&#xff09;。 如何安装中文插件 在 Jetbrains 家族的插件市场上&#xff0c;是能够搜到语言包插件的&#xff0c;正常情况下安装之后只需要重启即可享受中文界面&#xff0c;可AndroidStudio 中…

基于vue+Spring Boot家政服务人员预约系统iph9d

通过对家政服务管理内容的学习研究&#xff0c;进而设计并实现一个家政服务系统。系统能实现的主要功能应包括即时通讯、通讯回复、预约订单、接单信息、服务费用管、服务评价的一些操作。还有可以正确的为用户服务&#xff0c;准确显示当前信息[5]。 开发软件有很多种可以用&…

【链路层】点对点协议 PPP

目录 1、PPP协议的特点 2、PPP协议的组成和帧格式 3、PPP协议的工作状态 目前使用得最广泛的数据链路层协议是点对点协议PPP(Point-to-Point Protocol)。 1、PPP协议的特点 我们知道&#xff0c;互联网用户通常都要连接到某个 ISP 才能接入到互联网。PPP 协议就是用户计算机…

本地一键部署grafana+prometheus

本地k8s集群内一键部署grafanaprometheus 说明&#xff1a; 此一键部署grafanaPrometheus已包含&#xff1a; victoria-metrics 存储prometheus-servergrafanaprometheus-kube-state-metricsprometheus-node-exporterblackbox-exporter grafana内已导入基础的dashboard【7个…

2024年华数杯国际赛A题赛题

问题A&#xff1a;来自日本的放射性废水 背景 2011年3月&#xff0c;日本东海岸发生的地震引发了福岛第一核电站的事故。一场大规模海啸摧毁了该核电站的冷却系统&#xff0c;导致三个核反应堆熔毁&#xff0c;核燃料碎片熔化。为了冷却熔化的核燃料&#xff0c;海水不断地注入…

数据仓库面试题

1 思维导图&数仓常见面试题 2 题目 1. 数据仓库是什么&#xff1f; 数据仓库是一个面向主题的&#xff08;订单、支付、退单等&#xff09;、集成的&#xff08;整合多个信息源的大量数据&#xff09;、非易失的&#xff08;一般不会进行删除和修改操作&#xff09;且随时…

使用Java Stream API获取子部门的所有父级部门

使用Java Stream API获取子部门的所有父级部门 在数据库表中&#xff0c;经常会有包含层次结构的部门信息表。本博客将使用Java语言和Stream API演示如何获取给定子部门的所有父级部门&#xff0c;并在代码中加入详细注释。 1. 创建 Department 类 首先&#xff0c;我们需要定…

经典目标检测YOLO系列(二)YOLOV2的复现(1)总体网络架构及前向推理过程

经典目标检测YOLO系列(二)YOLOV2的复现(1)总体网络架构及前向推理过程 和之前实现的YOLOv1一样&#xff0c;根据《YOLO目标检测》(ISBN:9787115627094)一书&#xff0c;在不脱离YOLOv2的大部分核心理念的前提下&#xff0c;重构一款较新的YOLOv2检测器&#xff0c;来对YOLOV2有…

ADA-YOLO:YOLOv8+注意力+Adaptive Head,mAP提升3%

生物医学图像分析中的目标检测和定位至关重要&#xff0c;尤其是在血液学领域&#xff0c;检测和识别血细胞对于诊断和治疗决策至关重要。虽然基于注意力的方法在各个领域中目标检测方面取得了显著的进展&#xff0c;但由于医学影像数据集的独特挑战&#xff0c;其在医学目标检…

cookie和session的工作过程和作用:弥补http无状态的不足

cookie是客户端浏览器保存服务端数据的一种机制。当通过浏览器去访问服务端时&#xff0c;服务端可以把状态数据以key-value的形式写入到cookie中&#xff0c;存储到浏览器。浏览器下次去服务服务端时&#xff0c;就可以把这些状态数据携带给服务器端&#xff0c;服务器端可以根…

C++I/O流——(3)文件输入/输出(第一节)

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd; 含泪播种的人一定能含笑收获&#xff…

HBase学习二:RegionServer详解

1、内部结构 RegionServer是HBase系统中最核心的组件&#xff0c;主要负责用户数据写入、读取等基础操作。RegionServer组件实际上是一个综合体系&#xff0c;包含多个各司其职的核心模块&#xff1a;HLog、MemStore、HFile以及BlockCache。 2、HLog 3、MemStore 4、HFile…

08- OpenCV:形态学操作(膨胀与腐蚀 、提取水平与垂直线)

目录 前言 一、膨胀&#xff08;Dilation&#xff09;与 腐蚀&#xff08;Erosion&#xff09; 二、形态学操作 1、开操作&#xff08;Opening&#xff09; 2、闭操作&#xff08;Closing&#xff09; 3、形态学梯度&#xff08;Morphological Gradient&#xff09; 4、…

【数学建模】美赛备战笔记 01 美赛指南与竞赛全流程

美赛指南 整篇论文需要在25页内。 六道赛题特点&#xff1a; A、B题涉及到微分方程和物理概念较多&#xff0c;需要一定的专业知识&#xff1b; C题常常涉及到时间序列、机器学习&#xff1b; D题一般是运筹学/网络科学&#xff0c;图论、优化问题&#xff0c;涉及到的概念多…