Flume拦截器使用-实现分表、解决零点漂移等

news2025/1/11 15:05:25

img

1.场景分析

使用flume做数据传输时,可能遇到将一个数据流中的多张表分别保存到各自位置的问题,同时由于采集时间和数据实际发生时间存在差异,因此需要根据数据实际发生时间进行分区保存。
鉴于此,需要设计flume拦截器配置conf文件实现上述功能,废话不多说,直接上代码。

2.配置文件

<dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83_noneautotype</version>
        </dependency>
    </dependencies>

3.主程序

public class test implements Interceptor  {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        //1、获取header和body的数据
        try {
            byte[] body = event.getBody();
            Map<String, String> headers = event.getHeaders();
            String log = new String(body, StandardCharsets.UTF_8);
            //2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回null
            JSONObject jsonObject = JSONObject.parseObject(log);
            //3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题和历史数据同步)
            Long ts = DateFormatUtil.toTsAddTimeZone(jsonObject.getString("@timestamp"));
            String topicHeader = headers.get("topic");
//            System.out.println("topicHeader主题名称为:"+topicHeader);
            String httpUserAgent = jsonObject.getString("topicHeader");
            //数据筛选
            if("clb-healthcheck".equals(httpUserAgent) || (StringUtils.isNotEmpty(httpUserAgent) && httpUserAgent.startsWith("kube-probe"))){
//                System.out.println("过滤的事件为:"+event);
                return null;
            }else {
                jsonObject.put("timestamp",ts);
                headers.put("timestamp", ts.toString());
                if("xxx".equals(topicHeader)){
                    headers.put("table","table1");
                }else if("xxxx".equals(topicHeader)){
                    headers.put("table","table2");
                }else{
                    headers.put("table","other");
                }

                event.setBody(jsonObject.toString().getBytes(StandardCharsets.UTF_8));
//                System.out.println("传输的事件为:"+event);
                return event;
            }

        }catch (JSONException e){
//            System.out.println("格式有问题的事件为:"+event);
            return null;
        }
    }


    @Override
    public List<Event> intercept(List<Event> list) {
        Iterator<Event> iterator = list.iterator();

        while (iterator.hasNext()){
            Event next = iterator.next();
            if(intercept(next)==null){
                iterator.remove();
            }
        }

        return list;
    }

    @Override
    public void close() {

    }
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new test();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

4.utils-时间处理程序

/**
 * 日期转换工具类
 * 注意:SimpleDateFormat在对日期进行转换的时候,存在线程安全的问题
 * 建议:使用JDK1.8之后提供的日期包下的相关类完成封装
 */
public class DateFormatUtil {
    private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private static final DateTimeFormatter dtf1 = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyyMMddHH");
    private static final DateTimeFormatter dtfFull = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final DateTimeFormatter dtfFull1 = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss SSS");
    private static final DateTimeFormatter dtfFull2 = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");

    public static Long toTs(String dtStr, boolean isFull) {

        LocalDateTime localDateTime = null;
        if (!isFull) {
            dtStr = dtStr + " 00:00:00";
        }
        localDateTime = LocalDateTime.parse(dtStr, dtfFull);

        return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
    }

    public static Long toTsAddTimeZone(String dtStr) {

        LocalDateTime localDateTime = null;
        localDateTime = LocalDateTime.parse(dtStr, dtfFull2);

        return localDateTime.toInstant(ZoneOffset.of("+0")).toEpochMilli();
    }
    public static Long toTs1(String dtStr) {

        LocalDateTime localDateTime = null;

        localDateTime = LocalDateTime.parse(dtStr, dtfFull1);

        return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
    }


    public static Long toTs(String dtStr) {
        return toTs(dtStr, false);
    }

    public static String toDate(Long ts) {
        Date dt = new Date(ts);
        LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
        return dtf.format(localDateTime);
    }

    public static String toYmdHms(Long ts) {
        Date dt = new Date(ts);
        LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
        return dtfFull.format(localDateTime);
    }

    public static String toYmd(Long ts) {
        Date dt = new Date(ts);
        LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
        return dtf1.format(localDateTime);
    }
    public static String toYmdH(Long ts) {
        Date dt = new Date(ts);
        LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
        return dtf2.format(localDateTime);
    }
    public static int toYmdHInt(Long ts) {
        Date dt = new Date(ts);
        LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
        return Integer.parseInt(dtf2.format(localDateTime));
    }

    public static void main(String[] args) {


        long t1= 1670833135997L;
        String s1 = toYmdH(t1);
        int i1 = toYmdHInt(t1);
        String s2 = toYmdH(1670833136000L);
//        long s3 = toTsAddTimeZone("2024-01-31 05:43:46");
        long s4 = toTsAddTimeZone("2024-01-31T06:11:54.000Z");

//        System.out.println(s3);
        System.out.println(s4);



    }

}

5.打包放入flume的lib目录

mv test.jar /data/flume-1.9.0/lib/

6.编写配置文件运行

#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1 k2

#配置source
a1.sources.r1.type= org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 2000
a1.sources.r1.kafka.consumer.group.id= xxx
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = xxx:9092
a1.sources.r1.kafka.topics = xxx,xxxx
a1.sources.r1.kafka.consumer.auto.offset.reset = latest
a1.sources.r1.interceptors = i1
#此处需要写jar包的详细reference
a1.sources.r1.interceptors.i1.type =test$Builder



#memory channel
a1.channels.c1.type = memory
#channel的event个数
a1.channels.c1.capacity = 20000
#事务event个数
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 2147483648

#配置channel
#a1.channels.c1.type = file
#a1.channels.c1.checkpointDir =/data/xxx
#a1.channels.c1.dataDirs = /data/module/xxx
#a1.channels.c1.maxFileSize = 2147483648
#a1.channels.c1.capacity = 2000000
#a1.channels.c1.transactionCapacity=20000
#a1.channels.c1.keep-alive = 6
#a1.chhannels.c1.checkpointInterval=60000
#a1.minimumRequirdSpace=26214400

#配置sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =/hadoop/dm_dw/tmp_data/log/%{table}/%Y%m%d/%H
a1.sinks.k1.hdfs.filePrefix = log1
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 360
a1.sinks.k1.hdfs.rollSize = 1174405120
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize=3000
#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip


#配置sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path =/hadoop/dm_dw/tmp_data/log/%{table}/%Y%m%d/%H
a1.sinks.k2.hdfs.filePrefix = log2
a1.sinks.k2.hdfs.round = false
a1.sinks.k2.hdfs.rollInterval = 360
a1.sinks.k2.hdfs.rollSize = 1174405120
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.batchSize=3000
#控制输出文件类型
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.codeC = gzip


#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

#k1.batchSize+k2.batchSize < c1.capacity

启动命令

 nohup /data/module/flume-1.9.0/bin/flume-ng agent -Xms1024m -Xmx2048m -n a1 -c /data/module/flume-1.9.0/conf -f /data/module/flume-1.9.0/job/test.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=36001  >/dev/null 2>&1 &

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1443313.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java 内存区域介绍

&#xff08;1&#xff09;程序计数器 程序计数器主要有两个作用&#xff1a; 字节码解释器通过改变程序计数器来依次读取指令&#xff0c;从而实现代码的流程控制&#xff0c;如&#xff1a;顺序执行、选择、循环、异常处理。 在多线程的情况下&#xff0c;程序计数器用于记录…

【开源】JAVA+Vue.js实现计算机机房作业管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 登录注册模块2.2 课程管理模块2.3 课时管理模块2.4 学生作业模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 课程表3.2.2 课时表3.2.3 学生作业表 四、系统展示五、核心代码5.1 查询课程数据5.2 新增课时5.3 提交作…

360 安全浏览器 - 页面布局 - 常用网址

360 安全浏览器 - 页面布局 - 常用网址 自定义样式 let myStyle {https://www.baidu.com/: {color: #001483,backgroundColor: #FFF,icon: https://www.baidu.com/favicon.ico},https://blog.csdn.net/jx520: {backgroundColor: #fc5531,icon: https://g.csdnimg.cn/static/l…

离线数仓(一)【数仓概念、需求架构】

前言 今天开始学习数仓的内容&#xff0c;之前花费一年半的时间已经学完了 Hadoop、Hive、Zookeeper、Spark、HBase、Flume、Sqoop、Kafka、Flink 等基础组件。把学过的内容用到实践这是最重要的&#xff0c;相信会有很大的收获。 1、数据仓库概念 1.1、概念 数据仓库&#x…

【Linux】指令提权-sudo

Hello everybody&#xff0c;新年快乐&#xff01;哈哈&#xff01;今天打算给大家讲讲指令提权的相关知识&#xff0c;虽然内容不多&#xff0c;但有时却很有用。在我们学习过权限&#xff0c;vim后就可以学习指令提权啦&#xff0c;没看过的宝子们建议先去看一看我之前的文章…

〖大前端 - ES6篇②〗- let和const

说明&#xff1a;该文属于 大前端全栈架构白宝书专栏&#xff0c;目前阶段免费&#xff0c;如需要项目实战或者是体系化资源&#xff0c;文末名片加V&#xff01;作者&#xff1a;哈哥撩编程&#xff0c;十余年工作经验, 从事过全栈研发、产品经理等工作&#xff0c;目前在公司…

缓存穿透、缓存击穿与缓存雪崩

缓存穿透、缓存击穿与缓存雪崩 1.本质区别 缓存穿透指的是数据库不存在数据&#xff0c;导致无法缓存&#xff0c;每次查询都查数据库&#xff0c;数据库压垮 缓存击穿指的是缓存键值对key过期了&#xff0c;key过期期间&#xff0c;大量请求访问&#xff0c;不经过缓存&…

Python操作MySQL基础

除了使用图形化工具以外&#xff0c;我们也可以使用编程语言来执行SQL从而操作数据库。在Python中&#xff0c;使用第三方库: pymysql来完成对MySQL数据库的操作。 安装第三方库pymysql 使用命令行,进入cmd&#xff0c;输入命令pip install pymysql. 创建到MySQL的数据库连接…

代码随想录算法训练营day15||二叉树part02、102.二叉树的层序遍历、 226.翻转二叉树(优先掌握递归)、101. 对称二叉树 (优先掌握递归)

102.二叉树的层序遍历 题目&#xff1a;给你一个二叉树&#xff0c;请你返回其按 层序遍历 得到的节点值。 &#xff08;即逐层地&#xff0c;从左到右访问所有节点&#xff09;。 接下来我们再来介绍二叉树的另一种遍历方式&#xff1a;层序遍历。 层序遍历一个二叉树。就是…

VDB-具有动态拓扑的高分辨率稀疏体积表示方法

论文地址&#xff1a;Museth_TOG13.pdf 概述 论文提出了一个称为VDB的新颖数据结构和算法&#xff0c;它可以高效地表示三维网格上的稀疏、随时间变化的数据。 VDB的数据结构基于B树&#xff0c;包含一个动态的根节点&#xff0c;以及多个内部节点和叶节点层次&#xff0c;这…

CVE-2021-44915 漏洞复现

CVE-2021-44915 路由/admin/admin.php是后台&#xff0c;登录账号和密码默认是admin、tao&#xff0c;选择管理栏目菜单。 点击编辑&#xff0c;然后随便改点内容&#xff0c;提交时候抓包。 id是注入点。直接拿sqlmap跑就行了。

滑块验证码识别代码分享

平时我们开发爬虫会遇到各种各样的滑动验证码&#xff0c;如下图所示&#xff1a; 为了解决这个问题&#xff0c;我写了一个通用的滑块验证码识别代码&#xff0c;主要是分析图片&#xff0c;然后计算出滑块滑动的像素距离。但是像素距离大多数情况下都不会等于滑动距离&#x…

2月7日《CS2》终于放大招,玩家激情再次被点燃

2024.2.7号&#xff0c;也就是昨天&#xff0c;V社终于放了大招&#xff0c;对CS2做了高达5个多G的大更新&#xff0c;这次更新内容还是比较多的&#xff0c;说几个比较有意思的点吧。 1、新武器箱&#xff1a;千瓦武器箱&#xff01; 全新的武器箱千瓦箱&#xff0c;能开出全…

SAP-PP-01-005工作中心

前言 工作中心是用于生产产品的生产资源&#xff0c;包括机器、人和设备&#xff0c;是各种生产或能力加工单元的总称。工作中心属于能力的范畴即计划的范畴&#xff0c;而不属于固定资产或者设备管理的范畴。一个工作中心可以是一台设备、一组功能相同的设备、一条自动生产线、…

RocketMQ(二):领域模型(生产者、消费者)

1 生产者&#xff08;Producer&#xff09; 本节介绍Apache RocketMQ 中生产者的定义、模型关系、内部属性、版本兼容和使用建议。 1.1 定义 生产者是Apache RocketMQ 系统中用来构建并传输消息到服务端的运行实体。 生产者通常被集成在业务系统中&#xff0c;将业务消息按照要…

C语言KR圣经笔记 7.1标准输入和输出 7.2格式化输出-printf

第七章 输入和输出 输入和输出功能并不是 C 语言本身的一部分&#xff0c;故到目前为止&#xff0c;本书都没有对其着重说明。然而&#xff0c;程序与其环境之间交互的方式&#xff0c;比书中之前所展示的更为复杂。本章我们会详描述标准库&#xff0c;即一系列为 C 程序提供输…

EMC学习笔记(二十一)降低EMI的PCB设计指南(一)

降低EMI的PCB设计指南&#xff08;一&#xff09; 1.概述2.射频3.连接器与过孔元件4.静态引脚和动态引脚和输入5.基本回路6.差模与共模 tips&#xff1a;资料主要来自网络&#xff0c;仅供学习使用。 1.概述 印刷电路板(PCB)的一般布局准则&#xff0c;基本上都有相对的文件进…

应用层 HTTP协议(1)

回顾 前面我们说到了数据链路层,网络层IP协议,传输层的TCP/UDP协议一些知识点,现在让我们谈谈 应用层的HTTP协议的知识点. 这篇我们先从大局入手,仍然是对总体报文进行全局分析,再对细节报文进行拆解分析 版本 首先我们谈谈HTTP协议的版本 HTTP 0.9 (1991) HTTP 1.0 (1992 - 1…

阿里云游戏服务器多少钱一年?

阿里云游戏服务器租用价格表&#xff1a;4核16G服务器26元1个月、146元半年&#xff0c;游戏专业服务器8核32G配置90元一个月、271元3个月&#xff0c;阿里云服务器网aliyunfuwuqi.com分享阿里云游戏专用服务器详细配置和精准报价&#xff1a; 阿里云游戏服务器租用价格表 阿…

C++ 水仙花数

案例描述&#xff1a;水仙花数是指一个3位数&#xff0c;它的每个位上的数字的3次幂之和等于它本身例如&#xff1a; 1A35A33A3153 请利用do.…while语句&#xff0c;求出所有3位数中的水仙花数 分析思路&#xff1a; 1、将所有的三位数进行输出&#xff08;100~999&#x…