【Flink metric(3)】chunjun是如何实现脏数据管理的

news2024/11/17 17:39:27

文章目录

  • 一. 基础逻辑
  • 二. DirtyManager
    • 1. 初始化
    • 2. 收集脏数据并check
    • 3. 关闭资源
  • 三. DirtyDataCollector
    • 1. 初始化
    • 2. 收集脏数据并check
    • 3. run:消费脏数据
    • 4. 释放资源
  • 四. LogDirtyDataCollector

一. 基础逻辑

脏数据管理模块的基本逻辑是:

  1. 当数据消费失败时,将脏数据拦截并保存到dirtyDataCollector中;
  2. 全局metric判断:脏数据达到设定值之后,任务报错,flink停止运行,并将脏数据输出到flink日志中、或mysql的配置中。

对于代码实现:

DirtyManager用于管理DirtyDataCollector,串起DirtyDataCollector的生命周期,DirtyDataCollector主要用于收集脏数据并输出(到日志中,mysql中),脏数据数量达到设定值之后,flink停止运行。

 
具体的DataCollector实现有:

在这里插入图片描述

分别用于输出到taskmanager的日志、(最后报错时)jobmanager日志、输出到mysql表中。

所以这里有三层代码结构:

  • DirtyManager:管理DirtyDataCollector
  • DirtyDataCollector:主要用于收集脏数据并输出,并判断脏数据是否达到临界值
  • 具体的DataCollector的实现:具体的输出实现:输出到日志,输出到mysql。

接下来我们逐个看每层的具体实现逻辑
 

二. DirtyManager

DirtyManager用于管理DirtyDataCollector,串起DirtyDataCollector的生命周期(open、run、close),主要流程如下:

  1. 设置系统配置给DirtyDataCollector
  2. 开启DirtyManager线程,主要用于DirtyDataCollector消费脏数据(收集脏数据)
  3. 关闭资源:DirtyDataCollector、DirtyManager的线程资源。

1. 初始化

初始化DirtyManager

  • 根据配置加载特定的DirtyDataCollector:用于脏数据的收集
  • 获取系统信息:jobId、jobName、operationName
  • 获取脏数据metric,用于定期合并脏数据为全局脏数据。
public DirtyManager(DirtyConfig dirtyConfig, RuntimeContext runtimeContext) {  
    //通过反射注册DirtyDataCollector
    this.consumer = DataSyncFactoryUtil.discoverDirty(dirtyConfig);  
    Map<String, String> allVariables = runtimeContext.getMetricGroup().getAllVariables();  
    this.jobId = allVariables.get(JOB_ID);  
    this.jobName = allVariables.getOrDefault(JOB_NAME, "defaultJobName");  
    this.operationName = allVariables.getOrDefault(OPERATOR_NAME, "defaultOperatorName");  
    this.errorCounter = runtimeContext.getLongCounter(Metrics.NUM_ERRORS);  
}

 

2. 收集脏数据并check

被具体的连接器调用:
具体当连接器生产数据或写数据到数据源报错时,调用此方法收集脏数据

  1. 创建线程,用于异步执行DirtyDataCollector,开始消费脏数据到日志或mysql表中
  2. 添加脏数据条数,同步到全局脏数据metric中
  3. 脏数据信息,存到队列中,等待具体的脏数据收集器消费
  4. 子流程:判断脏数据条数是否大于总脏数据条数
public void collect(Object data, Throwable cause, String field, long globalErrors) {  
    if (executor == null) {  
        execute();  
    }  
  
    DirtyDataEntry entity = new DirtyDataEntry();  
    entity.setJobId(jobId);  
    entity.setJobName(jobName);  
    entity.setOperatorName(operationName);  
    entity.setCreateTime(new Timestamp(System.currentTimeMillis()));  
    entity.setDirtyContent(toString(data));  
    entity.setFieldName(field);  
    entity.setErrorMessage(ExceptionUtil.getErrorMessage(cause));  
  
    //积累metric:errorCounter,这里直接同步到jobmanager?
    errorCounter.add(1L);  
    //将脏数据添加到队列,等待消费。
    consumer.offer(entity, globalErrors);  
}

/**  
 * 创建线程,用于异步执行DirtyDataCollector  
 */
 public void execute() {  
    if (executor == null) {  
        executor =  
                new ThreadPoolExecutor(  
                        MAX_THREAD_POOL_SIZE,  
                        MAX_THREAD_POOL_SIZE,  
                        0,  
                        TimeUnit.MILLISECONDS,  
                        new LinkedBlockingQueue<>(),  
                        new ChunJunThreadFactory(  
                                "dirty-consumer",  
                                true,  
                                (t, e) -> {  
                                    log.error(  
                                            String.format(  
                                                    "Thread [%s] consume failed.", t.getName()),  
                                            e);  
                                }),  
                        new ThreadPoolExecutor.CallerRunsPolicy());  
    }  
  
    //初始化DirtyDataCollector:比如脏数据定时发送到mysql时的线程注册  
    consumer.open();  
    //拿出一个线程执行DirtyDataCollector的execute方法  
    executor.execute(consumer);  
}

 

3. 关闭资源

/** Close manager. */  
public void close() {  
    if (!isAlive.get()) {  
        return;  
    }  
    //先关闭datacollector的资源
    if (consumer != null) {  
        consumer.close();  
    }  
    //再关闭executor线程
    if (executor != null) {  
        executor.shutdown();  
    }  
  
    isAlive.compareAndSet(true, false);  
}

 

三. DirtyDataCollector

处于第二层的dirtyDataCollector实现了脏数据的临时保存并等待具体DataCollector的消费,
它的基本逻辑是:

  1. 当脏数据消费失败时,将脏数据拦截并保存到consumeQueue中,等待被消费
  2. 全局的metric:脏数据达到设定值之后,任务报错,flink停止运行,并将脏数据输出到flink日志中。

 

1. 初始化

在DirtyManager实例化时,注册DirtyDataCollector时的操作,

这里获取脏数据最大值,允许消费脏数据失败的条数,以及对具体DataCollector的初始化,我们下节分析。


public void initializeConsumer(DirtyConfig conf) {  
    this.maxConsumed = conf.getMaxConsumed();  
    this.maxFailedConsumed = conf.getMaxFailedConsumed();  
  
    this.init(conf);  
}

被DirtyManager调用:在开启脏数据收集器线程之前执行

初始化具体脏数据收集器:目前之后mysql脏数据收集器实现了此方法:消费线程、mysql连接

public void open() {  
}

 

2. 收集脏数据并check

offer方法被DirtyManager的collect方法调用

  • 用于存储具体脏数据并更新单个slot的脏数据条数。
  • 每添加一条脏数据,就判断脏数据是否达到了设定值,如果是则抛出异常。

其中:globalErrors是上文AccumulatorCollector定期更新的结果。


//存储脏数据具体内容,并更新单个slot的脏数据条数
public synchronized void offer(DirtyDataEntry dirty, long globalErrors) {  
    consumeQueue.offer(dirty);  
    addConsumed(1L, dirty, globalErrors);  
}


/**  
 * 添加脏数据  
 * 通过metric判断此时的脏数据条数,是否已经超过全局设置的脏数据条数  
 * @param count  
 * @param dirty  
 * @param globalErrors  
 */  
protected void addConsumed(long count, DirtyDataEntry dirty, long globalErrors) {  
    consumedCounter.add(count);  
    // 因为总体的脏数据需要tm和jm进行通讯(每tm心跳+1s),会有延迟,且当单slot运行时误差将达到最大  
    // 所以这里需要判断延迟情况  
    long max =  
            consumedCounter.getLocalValue() >= globalErrors  
                    ? consumedCounter.getLocalValue()  
                    : globalErrors;  
    // 但这里仍然有误差:此时如果所有的slot都消费了脏数据那么其他slot的脏数据就记录不到。也就是会多消费脏数据  
    // 所以这里要有取舍:是否要消费完全准确的脏数据  
    if (max >= maxConsumed) {  
        StringJoiner dirtyMessage =  
                new StringJoiner("\n")  
                        .add("\n****************Dirty Data Begin****************\n")  
                        .add(dirty.toString())  
                        .add("\n****************Dirty Data End******************\n");  
        throw new NoRestartException(  
                String.format(  
                        "The dirty consumer shutdown, due to the consumed count exceed the max-consumed [%s]",  
                        maxConsumed)  
                        + dirtyMessage);  
    }  
}

 

3. run:消费脏数据

由DirtyManager开启脏数据消费线程,

具体的DataCollector(log、mysql)消费脏数据,发送到Taskmanager日志或mysql表中。

/**  
 * 开启脏数据消费线程  
 * 定时消费脏数据,发送到执行脏数据管理器中:log、mysql等  
 */  
@Override  
public void run() {  
    while (isRunning.get()) {  
        try {  
            //指定的DataCollector消费脏数据
            DirtyDataEntry dirty = consumeQueue.take();  
            consume(dirty);  
        } catch (Exception e) {  
            //未成功将脏数据收集到脏数据管理模块中  
            addFailedConsumed(e, 1L);  
        }  
    }  
}

/**  
 *  消费脏数据用于输出到日志、mysql等  
 */
 protected abstract void consume(DirtyDataEntry dirty) throws Exception;

 

4. 释放资源

不同的DataCollector有不同的操作,下节分析

public abstract void close();

 

四. LogDirtyDataCollector

实现比较简单:拿到的数据直接打印到Taskmanager中,关闭时,设定isRunning为false

/**  
 * 没有线程,调用即输出到日志中  
 */  
@Slf4j  
public class LogDirtyDataCollector extends DirtyDataCollector {  
  
    private static final long serialVersionUID = 7366317208451727471L;  
    private Long printRate;  
  
    @Override  
    protected void init(DirtyConfig conf) {  
        this.printRate = conf.getPrintRate();  
    }  
  
    /**  
     * 输出脏数据到taskmanager  
     * @param dirty dirty-data which should be consumed.  
     */    @Override  
    protected void consume(DirtyDataEntry dirty) {  
        if (consumedCounter.getLocalValue() % printRate == 0) {  
            StringJoiner dirtyMessage =  
                    new StringJoiner("\n")  
                            .add("\n====================Dirty Data=====================")  
                            .add(dirty.toString())  
                            .add("\n===================================================");  
            log.warn(dirtyMessage.toString());  
        }  
    }  
  
    @Override  
    public void close() {  
        isRunning.compareAndSet(true, false);  
        log.info("Print consumer closed.");  
    }  
}

 

下篇分析MysqlDirtyDataCollector是如何消费数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1858921.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

适用于轨道交通专用的板卡式网管型工业以太网交换机

是网管型 CompactPCI板卡式冗余环网交换机。前面板带有6个 10/100/1000Base-T(X)M12接口。后面的CPCI接口有 8个10/100/1000Base-T (X) 以太网接口。 是特别为轨道交通行业EN50155标准要求而设计的坚固型交换机。它同时具有以下特性&#xff1a; ● 支持2线以太网距离扩展端口&…

Crypto++ 入门

一、简介 Crypto&#xff08;也称为CryptoPP、libcrypto或cryptlib&#xff09;是一个免费的开源C库&#xff0c;提供了多种加密方案。它由Wei Dai开发和维护&#xff0c;广泛应用于需要强大加密安全的各种应用程序中。该库提供了广泛的加密算法和协议的实现&#xff0c;包括&…

【通用技巧】自动获取日志存放路径,无需手动修改配置文件

我们在部署环境的时候&#xff0c;常常会手动修改一些配置文件的存放地址&#xff0c;比如日志的路径、截图的路径&#xff0c;这是因为我们的环境不一样&#xff0c;部署应用的位置也不一样导致的。如果位置写死了&#xff0c;那么就会造成通用性很差&#xff0c;所以我们经常…

明明设置允许跨域,为什么还会出现跨域请求的问题

一、问题 在微服务项目中&#xff0c;明明已经设置允许跨域访问&#xff1a; 为什么还会出现跨域请求问题&#xff1f; 二、为什么 仔细查看错误提示信息&#xff1a;When allowCredentials is true, allowedOrigins cannot contain the special value "*" since t…

NestJs连接数据库

文章目录 一、下载 MySql 数据库二、下载VsCode插件查询、插入数据 一、下载 MySql 数据库 NestJS连接数据库我选择的是MySql&#xff0c;首先先安装nestjs/typeorm 、typeorm、 mysql2 执行命令&#xff1a; pnpm install nestjs/typeorm typeorm mysql2 -S 连接数据库需要你…

C语言小例程28/100

题目&#xff1a;利用递归方法求5!。 程序分析&#xff1a;递归公式&#xff1a;fnfn_1*4! #include <stdio.h>int main() {int i;int fact(int);for(i0;i<6;i){printf("%d!%d\n",i,fact(i));} } int fact(int j) {int sum;if(j0){sum1;} else {sumj*fac…

震惊!这样制作宣传册,效果竟然如此惊人!

在当今社会&#xff0c;宣传册作为一种重要的宣传手段&#xff0c;其制作质量直接影响到宣传效果。而令人震惊的是&#xff0c;现在有些制作宣传册的方法&#xff0c;其效果竟然如此惊人&#xff01;今天&#xff0c;教大家如何制作宣传册吧&#xff01; 首先&#xff0c;我们要…

南京邮电大学计算机网络实验二(网络路由器配置RIP协议)

文章目录 一、 实验目的和要求二、 实验环境(实验设备)三、 实验步骤四、实验小结&#xff08;包括问题和解决方法、心得体会、意见与建议等&#xff09;五、报告资源 一、 实验目的和要求 掌握思科路由器的运行过程&#xff0c;掌握思科路由器的硬件连线与接口&#xff0c;掌…

【研究】国内外大模型公司进展

2022年11月&#xff0c;OpenAI推出基于GPT-3.5的ChatGPT后&#xff0c;引发全球AI大模型技术开发与投资热潮。AI大模型性能持续快速提升。以衡量LLM的常用评测标准MMLU为例&#xff0c;2021年底全球最先进大模型的MMLU 5-shot得分刚达到60%&#xff0c;2022年底超过70%&#xf…

稀奇古怪的解压视频都哪里找的?6个古怪稀奇解压素材网站分享

在这个信息泛滥的时代&#xff0c;解压视频已经成为我们日常生活中的调味剂。特别是那些奇特而有趣的视频&#xff0c;它们能够立刻抓住我们的眼球&#xff0c;带来独一无二的视觉享受和心理上的放松。但你可能会好奇&#xff0c;这些引人注目的解压视频都可以在哪里找到呢&…

华为eNSP模拟器下载地址

一、依赖程序 VirtualBox&#xff1a;https://cloud.rsecc.cn/softlink/VirtualBox-5.2.26-128414-Win.exe WinPcap&#xff1a;https://cloud.rsecc.cn/softlink/WinPcap_4_1_3.exe Wireshark&#xff1a;https://cloud.rsecc.cn/softlink/Wireshark-win64-3.0.6.exe 需要…

代码随想录-Day38

509. 斐波那契数 斐波那契数 &#xff08;通常用 F(n) 表示&#xff09;形成的序列称为 斐波那契数列 。该数列由 0 和 1 开始&#xff0c;后面的每一项数字都是前面两项数字的和。也就是&#xff1a; F(0) 0&#xff0c;F(1) 1 F(n) F(n - 1) F(n - 2)&#xff0c;其中 …

【Android】我的手机在...自己下载...那个(浅析Intent基础运用)

【Android】我的手机在…自己下载…那个&#xff08;浅析Intent基础运用&#xff09; 在Android开发中&#xff0c;Intent&#xff08;意图&#xff09;是一个非常重要的概念。它不仅仅是用于在应用程序的各个组件之间进行通信的工具&#xff0c;也是启动新的Activity、Servic…

FydeOS导入VMware虚拟机之后,如何扩展系统硬盘大小?

前言​ 最近查询FydeOS系统的小伙伴不在少数啊&#xff01;可见这个系统是相当nice的&#xff0c;小伙伴们都是尝试尝试。 看到有不少小伙伴通过VMware虚拟机使用FydeOS&#xff0c;那么你就肯定知道官方包导入VMware之后&#xff0c;硬盘只显示分区了20GB。 如果这时候使用Fy…

【Java核心技术13】Java中的构造器与析构器:深入解析与代码示例

引言 所有文章均为原创验证&#xff0c;您随手的 关注、点赞、收藏 是我创作最大的动力。 示例代码地址&#xff1a;https://gitee.com/code-in-java/csdn-blog.git 在面向对象编程语言中&#xff0c;构造器和析构器是类生命周期管理的关键部分。构造器负责初始化新创建的对象&…

Java NIO(一) 概述

NIO主要用于以少量线程来管理多个网络连接&#xff0c;处理其上的读写等事件。在大量连接情况下&#xff0c;不管是效率还是空间占用都要优于传统的BIO。 Java NIO 由以下几个核心部分组成&#xff1a; Channel Buffer Selector Selector 如果你的应用打开了多个连接&#x…

Hack The Box-Axlle【更新中】

总体思路 XLL-EXEC->hta反弹shell->重置用户密码->重写二进制文件 信息收集&端口利用 nmap -sSVC axlle.htb开放了一大堆端口&#xff0c;这里先挑重点的80端口和445端口查看 80端口主页只有一个邮箱账号&#xff0c;对其目录扫描和子域名扫描 dirsearch -u h…

Python统计实战:3D散点图绘制

为了解决特定问题而进行的学习是提高效率的最佳途径。这种方法能够使我们专注于最相关的知识和技能&#xff0c;从而更快地掌握解决问题所需的能力。 &#xff08;以下练习题来源于《统计学—基于Python》。联系获取完整数据和Python源代码文件。&#xff09; 练习题 用以下数…

【力扣】从前序与中序遍历序列构造二叉树

&#x1f525;博客主页&#xff1a; 我要成为C领域大神 &#x1f3a5;系列专栏&#xff1a;【C核心编程】 【计算机网络】 【Linux编程】 【操作系统】 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 本博客致力于分享知识&#xff0c;欢迎大家共同学习和交流。 给定两个整数数…

测评:【ONLYOFFICE】版本更迭与AI加持下的最新ONLYOFFICE桌面编辑器8.1

你是否还在为没有一款合适的在线桌面编辑器而苦恼&#xff1f;你是否还在因为办公软件的选择过少而只能使用WPS或者office&#xff1f;随着办公需求的不断变化和发展&#xff0c;办公软件也在不断更新和改进。ONLYOFFICE 作为一款全功能办公软件&#xff0c;一直致力于为用户提…