[自研开源] 数据集成之分批传输 v0.7

news2025/1/11 19:51:51

开源地址:gitee | github
详细介绍:MyData 基于 Web API 的数据集成平台
部署文档:用 Docker 部署 MyData
使用手册:MyData 使用手册
试用体验:https://demo.mydata.work
交流Q群:430089673

介绍

本篇基于 数据集成之任务流程 介绍任务分批传输的使用场景和配置操作。

使用场景

mydata使用API方式集成数据,当一次请求或响应 传输数据量较多时 可能无法完成、或容易对服务端造成影响,因此需要分为多次处理;

例如 常见的分页查询、导入大量数据时分批处理、集成对接时的全量同步等;

分批传输数据

业务系统与mydata集成时,在提供数据消费数据这两个方向上分别实现分批传输;

提供数据

由mydata调用应用的API获取数据,通过配置分批参数 实现一次任务内多次调用API获取完整数据,有以下两种基本的配置模式:

  • 配置了 固定参数size=10、递增参数current从1开始每次递增1、每次间隔1秒的任务;

在这里插入图片描述

  • 配置了 递增参数start从1开始每次递增100、递增参数end从100开始每次递增100、每次间隔1秒的任务;

在这里插入图片描述

执行过程如下代码,要点有:

  • 通过do-while结构 兼容单次和分批;

  • lastProduceData记录上一次数据,用于和本次对比数据,若重复 则结束,避免死循环(理论上很少有2次完全一样的数据);

  • 若分批有异常,则复用任务3次出错 自动结束并发送邮件通知的功能;

  • 执行完一次后,自动计算递增参数值;

// 提供数据
case MdConstant.DATA_PRODUCER:
    // 分批模式 记录上一次数据,用于对比两次数据,若重复 则结束,避免死循环
    List<Map> lastProduceData = null;
    do {
        // 若启用分批,则将分批参数加入请求参数中
        if (taskInfo.isBatch()) {
            Map<String, Object> batchParam = jobBatchService.parseToMap(taskInfo);
            Map<String, Object> reqParams = MapUtil.union(taskInfo.getReqParams(), batchParam);
            taskInfo.setReqParams(reqParams);
        }

        // 调用api 获取json
        String json = ApiUtil.read(taskInfo);

        // 将json按字段映射 解析为业务数据
        jobDataService.parseData(taskInfo, json);
        // 若没有返回数据,则结束处理
        if (CollUtil.isEmpty(taskInfo.getProduceDataList())) {
            break;
        }
        // 对比上一次数据
        if (lastProduceData != null) {
            if (CollUtil.isEqualList(lastProduceData, taskInfo.getProduceDataList())) {
                // 异常任务失败,邮件通知用户检查任务
                throw new RuntimeException("分批获取数据异常,最后两次获取的数据相同!");
            }
        }
        lastProduceData = taskInfo.getProduceDataList();

        // 根据条件过滤数据
        jobDataFilterService.doFilter(taskInfo);

        // 保存业务数据
        jobDataService.saveTaskData(taskInfo);

        // 更新环境变量
        jobVarService.saveVarValue(taskInfo, json);

        // 递增分批参数
        jobBatchService.incBatchParam(taskInfo);

        // 若启用分批,则等待间隔
        if (taskInfo.isBatch()) {
            ThreadUtil.sleep(taskInfo.getBatchInterval(), TimeUnit.SECONDS);
        }
    } while (taskInfo.isBatch());

    break;

消费数据

由mydata通过API向应用发送数据,通过配置分批参数 限制每次向API发送的数据量,从而减少数据查询量和请求处理时间;

如下图,配置了分批数量为1000的任务,分批参数为选填,mydata将按1000为限制查询符合条件的数据,通过API请求发送给应用;

在这里插入图片描述

执行过程如下代码,要点有:

  • 通过do-while结构 兼容单次和分批;
  • 自动管理分页参数,执行分页查询数据,发送给API;
  • 直到分页查询没有数据 自动结束;
// 消费数据
case MdConstant.DATA_CONSUMER:
    String dataCode = taskInfo.getDataCode();
    if (StrUtil.isEmpty(dataCode)) {
        break;
    }
    List<BizDataFilter> filters = taskInfo.getDataFilters();
    if (CollUtil.isNotEmpty(filters)) {
        // 解析过滤条件值中的 自定义字符串
        parseFilterValue(filters);
        // 排除值为null的条件
        filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList());
    }
    int round = 0;
    Long skip = null;
    Integer limit = taskInfo.isBatch() ? taskInfo.getBatchSize() : null;
    do {
        if (taskInfo.isBatch()) {
            skip = (long) round * taskInfo.getBatchSize();
        }
        // 根据过滤条件 查询数据
        List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters, skip, limit);
        if (CollUtil.isEmpty(dataList)) {
            break;
        }
        taskInfo.setConsumeDataList(dataList);
        // 根据字段映射转换为api参数
        jobDataService.convertData(taskInfo);
        // 调用api传输数据
        ApiUtil.write(taskInfo);

        round++;
        // 若启用分批,则等待间隔
        if (taskInfo.isBatch()) {
            ThreadUtil.sleep(taskInfo.getBatchInterval(), TimeUnit.SECONDS);
        }
    }
    while (taskInfo.isBatch());
    break;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1546222.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

STM32F4x7标准库移植LWIP

项目背景 使用GD芯片的我们&#xff0c;都会去参考ST的代码。可是呢&#xff0c;有一个很大的问题就是&#xff0c;ST早就提供HAL库了&#xff0c;而目前GD还只有标准库。在移植LWIP的时候&#xff0c;会有很多不便。 好在天无绝人之路&#xff0c;找到了一份ST的官方例程&am…

java常用IO流功能——字符流和缓冲流概述

前言&#xff1a; 整理下学习笔记&#xff0c;打好基础&#xff0c;daydayup! 之前说了下了IO流的概念&#xff0c;并整理了字节流&#xff0c;有需要的可以看这篇 java常用应用程序编程接口&#xff08;API&#xff09;——IO流概述及字节流的使用 字符流 FileReader(文件字…

基于注意力机制和损坏特征掩蔽的遮挡人脸识别

Occluded Face Recognition Based on Attention Mechanism and Damaged Feature Masking 摘要 本文提出了一种基于注意力机制&#xff08;BAM&#xff09;和掩模生成器的新型遮挡人脸识别方法。在主干网络中嵌入BAM以提取更多可区分的特征&#xff0c;同时设计掩模生成器来清理…

媒介盒子揭秘0基础写好软文的秘诀

在流量见顶的传播环境下&#xff0c;品牌获取用户注意力的主要方式就在软文&#xff0c;然而有许多品牌在写软文时往往摸不着头脑&#xff0c;不知道怎么写&#xff0c;如何写才能使品牌有效曝光&#xff0c;今天媒介盒子就来和大家聊聊&#xff1a;0基础也能写好软文的秘诀&am…

SAP BAS中Fiori开发的高阶功能(storyboard, navigation, guided development, variant)

1. 前言 在之前的几篇文章中&#xff0c;我介绍了SAP BAS的一些基本功能&#xff0c;包括账户申请&#xff0c;创建工作区&#xff0c;git的使用以及如何step-by-step去创建出你的第一个Fiori项目等等。在本篇中&#xff0c;我将进一步介绍一些在开发Fiori应用程序时会用到的高…

【数仓】DataX软件安装及配置,从mysql同步到hdfs

相关文章 【数仓】基本概念、知识普及、核心技术【数仓】数据分层概念以及相关逻辑【数仓】Hadoop软件安装及使用&#xff08;集群配置&#xff09;【数仓】Hadoop集群配置常用参数说明【数仓】zookeeper软件安装及集群配置【数仓】kafka软件安装及集群配置【数仓】flume软件安…

C语言例4-7:格式字符f的使用例子

%f&#xff0c;实型&#xff0c;小数部分为6位 代码如下&#xff1a; //格式字符f的使用例子 #include<stdio.h> int main(void) {float f 123.456;double d1, d2;d11111111111111.111111111;d22222222222222.222222222;printf("%f,%12f,%12.2f,%-12.2f,%.2f\n&qu…

关于在forEach循环中使用异步,造成forEach里面的函数还未执行完毕,外层的同步已经被执行的问题

使用 原生的 for循环替代forEach循环即可解决问题 1.实例代码&#xff1a; select_Father_comment_sql_res.forEach( (item) > {const Select_FId_children_sql util.format("Select *, \IFNULL(User.UserName,) as CommentUserName, \IFNULL(User.UserName,) as AtU…

2015年认证杯SPSSPRO杯数学建模B题(第一阶段)替换式密码全过程文档及程序

2015年认证杯SPSSPRO杯数学建模 B题 替换式密码 原题再现&#xff1a; 历史上有许多密码的编制方法。较为简单的是替换式密码&#xff0c;也就是将文中出现的字符一对一地替换成其它的符号。对拼音文字而言&#xff0c;最简单的形式是单字母替换加密&#xff0c;也就是以每个…

如何学习VBA_3.2.19:利用Shell函数运行可执行程序

我给VBA的定义&#xff1a;VBA是个人小型自动化处理的有效工具。利用好了&#xff0c;可以大大提高自己的劳动效率&#xff0c;而且可以提高数据处理的准确度。我推出的VBA系列教程共九套和一部VBA汉英手册&#xff0c;现在已经全部完成&#xff0c;希望大家利用、学习。 如果…

Git 分布式版本控制系统基本概念和操作命令

目录 Git 基本概念 功能特点 工作流程 操作命令 新建代码库 配置 增删文件 代码提交 分支 标签 查看信息 远程同步 撤销 其他 小结 Git Git 是一个开源的分布式版本控制系统&#xff0c;用于跟踪文件的变更历史。它最初由 Linux Torvalds 设计&#xff0c;用于…

JDK,Nginx,Redis安装

创建develop目录 mkdir /usr/local/develop/ cd /usr/local/develop 下载 wget http://nginx.org/download/nginx-1.17.4.tar.gz yum install git git clone https://github.com/arut/nginx-rtmp-module.git 解压文件 tar zxmf nginx-1.17.4.tar.gz 进入解压目录 cd ng…

STL和泛型编程

STL和泛型编程 一.STL六大部件"前开后闭"区间 二.容器(1)顺序容器1.array源码剖析 2.vector源码剖析vector的迭代器 3.list源码剖析迭代器的设计规则关于重载操作符关于重载->和*操作符 4.forward_list源码剖析 5.deque源码剖析底层数据结构操作实现deque的设计de…

力扣面试150 Pow(x, n) 快速幂 负指数

Problem: 50. Pow(x, n) 解题方法 &#x1f468;‍&#x1f3eb; 参考题解 复杂度 时间复杂度: O ( l o g 2 n ) O(log_{2}n) O(log2​n) 空间复杂度: O ( 1 ) O(1) O(1) Code class Solution {public double myPow(double x, int n){if (x 0.0f)return 0.0d;long b…

centos7 装 docker-ce

安装必要的系统工具&#xff1a; sudo yum install -y yum-utils device-mapper-persistent-data lvm2 sudo yum install -y yum-utils device-mapper-persistent-data lvm2 命令会以超级用户的身份安装三个软件包&#xff1a;yum-utils&#xff0c;device-mapper-persistent-…

2024.3.2-玄子Share-Mybatis 八股文面试题(共计:48 道 9000 字)

2024.3.2-玄子Share-Mybatis 八股文面试题&#xff08;共计&#xff1a;48 道 9000 字&#xff09; 前言&#xff1a; 本文部分面试题来源于网络仅供学习使用&#xff0c;请支持原作部分面试题有修改润色&#xff0c;部分面试题由我&#xff08;玄子&#xff09;自写面试题根据…

牛客NC12 重建二叉树【中等 dfs Java,Go,PHP】

题目 题目链接&#xff1a; https://www.nowcoder.com/practice/8a19cbe657394eeaac2f6ea9b0f6fcf6 思路 先序遍历第一个值就是根节点&#xff0c;根据这个值可以在中序中划分左右子树&#xff0c; 我们这里已经将一个数划分成两颗子树&#xff0c;那么在递归的使用刚刚的分析…

大模型时代的向量数据库:原理解析和应用案例

大家好&#xff0c;在人工智能领域&#xff0c;数据处理和加工的需求愈发增加。随着人们深入探索AI高级的应用&#xff0c;如图像识别、语音搜索和推荐引擎等&#xff0c;数据的复杂性也在不断地增加。此时传统的数据库存储方式已不能完全满足需求&#xff0c;向量数据库应运而…

大模型知识点汇总——分布式训练

PS&#xff1a;本篇只在宏观上介绍相关概念和技术&#xff0c;不做数学推导和过于细节介绍&#xff0c;旨在快速有一个宏观认知&#xff0c;不拘泥在细节上&#xff0c;导致很混乱。 涉及技术名词 分布式框架等涉及的技术名词很多&#xff0c;很容易让人眼花缭乱&#xff0c;…

vdat文件分段了怎么合并成MP4?批量导入一键合并!

一些监控摄像头、视频录像软件或其他专用设备可能会生成vdat文件作为其录制的视频数据文件。一些浏览器比如夸克下载的视频也会出现vdat格式&#xff0c;因为流媒体播放采用的是分段加载&#xff0c;在网络不好的时候&#xff0c;重新加载对文件整体性损坏比较小&#xff0c;所…