Flink异步IO 调用算法总是超时

news2025/1/22 16:53:02

记录一次使用Flink 异步调用IO 总是超时的bug
注:博主使用的版本就是:<flink.version>1.16.1</flink.version>

起因:

因公司业务需要,使用Flink对数据进行流式处理,具体处理流程就是,从kafka接到数据,然后连续请求十多个接口(算法)对数据进行打标;

主程序:

在这里插入图片描述

具体的异步IO代码(随便找一个展示):

package com.wenge.datagroup.storage.process;

import com.alibaba.fastjson.JSONObject;
import com.wenge.datagroup.storage.bean.ParamConfig;
import com.wenge.datagroup.storage.common.ArgsConstants;
import com.wenge.datagroup.storage.process.base.BaseETL;
import com.wenge.datagroup.storage.service.YaYiService.YaYiPolarityService;
import com.wenge.datagroup.storage.utils.ConfigUtil;
import com.wenge.datagroup.storage.utils.Funnel;
import com.wenge.datagroup.storage.utils.YaYiUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

@Slf4j
public class AnalyerAsyncIOProcessPolarity {

    public static DataStream<JSONObject> process(DataStream<JSONObject> dataStream) {

        log.error("----------------------------开始异步IO处理----------------");
        String topic = Funnel.contains(ArgsConstants.TOPIC) ? Funnel.getString(ArgsConstants.TOPIC) : "";
        String configFile = Funnel.contains(ArgsConstants.CONFIG) ? Funnel.getString(ArgsConstants.CONFIG) : "config.properties";
        int asyncNum = Funnel.contains(ArgsConstants.ASYNC_NUM) ? Funnel.getInt(ArgsConstants.ASYNC_NUM) : ConfigUtil.getInteger(ArgsConstants.ASYNC_NUM);
        int mapParallelism = Funnel.contains(ArgsConstants.MAP_PARALLELISM) ? Funnel.getInt(ArgsConstants.MAP_PARALLELISM) : ConfigUtil.getInteger(ArgsConstants.MAP_PARALLELISM);
        int filterParallelism = Funnel.contains(ArgsConstants.FILTER_PARALLELISM) ? Funnel.getInt(ArgsConstants.FILTER_PARALLELISM) : ConfigUtil.getInteger(ArgsConstants.FILTER_PARALLELISM);
        int TranslateParallelism = (Funnel.contains(ArgsConstants.Translate_MAP_PARALLELISM) ? Funnel.getInt(ArgsConstants.Translate_MAP_PARALLELISM) : ConfigUtil.getInteger(ArgsConstants.Translate_MAP_PARALLELISM));

        // 异步IO
        RichAsyncFunction richAsyncFunction = new RichAsyncFunction<JSONObject, JSONObject>() {
            private transient ExecutorService executorService;
            private ParamConfig paramConfig;
            private YaYiUtil yaYiUtil;

            @Override
            public void open(Configuration parameters) {
                // 重新加载配置文件
                log.error("重新加载配置文件");
                ConfigUtil.setConfigFile(configFile);
                ConfigUtil.setTopic(topic);
                ConfigUtil.init();
                this.executorService = Executors.newFixedThreadPool(asyncNum);
                paramConfig = new ParamConfig(ConfigUtil.getString("YaYiappKey"), ConfigUtil.getString("YaYiappSecret"));
                yaYiUtil = new YaYiUtil(paramConfig);
            }

            @Override
            public void close() throws Exception {
                // 关闭线程池
                if (executorService != null) {
                    executorService.shutdown();
                }
                log.error("----------------------------情感分析-线程池关闭----------------------");
            }

            @Override
            public void timeout(JSONObject input, ResultFuture<JSONObject> resultFuture) {
                JSONObject data = input;
                String uuid = data.getString("uuid");
                log.error("-----------------------数据超时----------------------:{}", uuid);
                //对超时数据进行处理
                resultFuture.complete(Collections.singleton(data));
            }

            @Override
            public void asyncInvoke(JSONObject json, ResultFuture<JSONObject> resultFuture) {

                CompletableFuture.supplyAsync(new Supplier<JSONObject>() {

                    @Override
                    public JSONObject get() {
                        String uuid = json.getString("uuid");
                        long start =System.currentTimeMillis();
                        try {
                            //TODO: 根据业务逻辑进行处理
                            String title = json.getString("title");
                            String content = json.getString("content");
                            String translate_title = json.getString("translate_title");
                            String translate_content = json.getString("translate_content");
                            String languageRecognition = json.getJSONObject("analysis").getString("language");
                            String dataSourceType = json.getJSONObject("platform").getString("data_source_type");

                            
                            if (StringUtils.isNotBlank(translate_content)) {
                                String polarity = new String();
                                Integer polaritySum = 0;
								//具体算法调用
                                YaYiPolarityService yaYiPolarityService = new YaYiPolarityService();
                                polarity = yaYiPolarityService.yaYiPolarity(translate_content);
                                if (StringUtils.isNotBlank(polarity)) {
                                    polaritySum = StringUtils.equals(polarity, "A") ? 0 : StringUtils.equals(polarity, "B") ? 1 : 2;
                                    JSONObject analysis = json.getJSONObject("analysis");
                                    if (Objects.nonNull(analysis)) {
                                        analysis.put("polarity", polaritySum);
                                        json.put("analysis", analysis);
                                    } else {
                                        JSONObject analysisJson = new JSONObject();
                                        analysisJson.put("polarity", polaritySum);
                                        json.put("analysis", analysisJson);
                                    }
                                }
                                log.error("uuid:{},分析后数据:{}", uuid, polarity);
                                long end =System.currentTimeMillis();
                                log.error("uuid:{},分析,耗时:{} ms", uuid,(end-start));
                            }
                            return json;
                        } catch (Exception e) {
                            log.error("--------分析异常:{},数据:{}",uuid, e);
                            return json;
                        }
                    }
                }, executorService).thenAccept((JSONObject dbResult) -> {
                    resultFuture.complete(Collections.singleton(dbResult));
                });
            }
        };
        DataStream<JSONObject> downloadStream = AsyncDataStream.unorderedWait(
                dataStream,
                richAsyncFunction,
                50000,
                TimeUnit.MILLISECONDS,
                asyncNum).name("qinggan").setParallelism(TranslateParallelism);

        return downloadStream;
    }


}

结果等到执行的时候总是出现大批量的调用IO算法超时:
在这里插入图片描述

分析:

因为开发,编译,以及运行时没有任何异常,所以一度考虑过网络问题,接口问题,还有Flink集群问题。
结果一一查过发现都不是,只能一点点debug代码,结果发现是这里的问题
在这里插入图片描述
每次代码运行到这里,就会卡住,然后直到过了我预定的超时时间,然后执行异步io超时方法

解决:

因为这个StringUtils 用的是:import org.apache.commons.lang.StringUtils 的方法,只需要换成import org.apache.commons.lang3.StringUtils 就可以了。
很无奈这个事实,因为时间太忙,没有时间继续深究,估计是老的StringUtils方法中使用到了加锁的方法。

注:博主使用的版本就是:<flink.version>1.16.1</flink.version>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1983398.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PageRank算法与TextRank算法

PageRank PageRank 是一种用于计算网页重要性的算法&#xff0c;其核心思想源自随机浏览模型。这个模型假设一个网络中的用户通过随机点击链接在网页之间跳转&#xff0c;并根据网页的链接结构计算每个网页的重要性。 假设三个网页按以下方式连接&#xff0c;计算每个网页的PR值…

【零基础实战】基于物联网的人工淡水湖养殖系统设计

文章目录 一、前言1.1 项目介绍1.1.1 开发背景1.1.2 项目实现的功能1.1.3 项目硬件模块组成1.1.4 ESP8266工作模式配置 1.2 系统设计方案1.2.1 关键技术与创新点1.2.2 功能需求分析1.2.3 现有技术与市场分析1.2.4 硬件架构设计1.2.5 软件架构设计1.2.6 上位机开发思路 1.3 系统…

Robot Operating System——深度解析单线程执行器(SingleThreadedExecutor)执行逻辑

大纲 创建SingleThreadedExecutor新增Nodeadd_nodetrigger_entity_recollectcollect_entities 自旋等待get_next_executablewait_for_workget_next_ready_executableTimerSubscriptionServiceClientWaitableAnyExecutable execute_any_executable 参考资料 在ROS2中&#xff0c…

ARM知识点二

一、指令 指令的生成过程 指令执行过程示例 if (a 0) {x 0; } else {x x 3; } //翻译为 cmp r0,#0 MOVEQ R1,#0 ADDGT R1,R1,#3指令获取&#xff1a;从Flash中读取 CMP R0, #0&#xff0c;控制器开始执行。 指令解码&#xff1a;解码器解析 CMP 指令&#xff0c;ALU比较R…

DAMA学习笔记(十)-数据仓库与商务智能

1.引言 数据仓库&#xff08;Data Warehouse&#xff0c;DW&#xff09;的概念始于20世纪80年代。该技术赋能组织将不同来源的数据整合到公共的数据模型中去&#xff0c;整合后的数据能为业务运营提供洞察&#xff0c;为企业决策支持和创造组织价值开辟新的可能性。与商务智能&…

浅谈线程组插件之jp@gc - Ultimate Thread Group

浅谈线程组插件之jpgc - Ultimate Thread Group jpgc - Ultimate Thread Group是JMeter的一个强大且灵活的扩展插件&#xff0c;由JMeter Plugins Project提供。它为性能测试提供了超越JMeter原生线程组的更精细的控制能力&#xff0c;允许用户根据复杂的场景设计自定义负载模…

【TFT电容屏】

TFT电容屏基础知识补课 前言一、入门知识1.1 引脚介绍1.1.1 显示部分片选指令选择写指令读操作复位并行数据接口 1.1.2 背光电源背光电源 1.1.3 触摸IIC接口外部中断接口复位NC 1.2 驱动介绍1.3 FSMC介绍 总结 前言 跟着阳桃电子的学习⇨逐个细讲触摸屏接口定义–STM32单片机…

科普文:JUC系列之ForkJoinPool源码解读ForkJoinWorkerThread

科普文&#xff1a;JUC系列之ForkJoinPool基本使用及原理解读-CSDN博客 科普文&#xff1a;JUC系列之ForkJoinPool源码解读概叙-CSDN博客 科普文&#xff1a;JUC系列之ForkJoinPool源码解读WorkQueue-CSDN博客 科普文&#xff1a;JUC系列之ForkJoinPool源码解读ForkJoinTask…

复现sql注入漏洞

Less-1 字符型注入 页面如下&#xff1a; 我们先输入“?id1”看看结果&#xff1a; 页面显示错误信息中显示提交到sql中的“1”在通过sql语句构造后形成“1" LIMIT 0, 1”&#xff0c;其中多了一个“”&#xff0c;那么&#xff0c;我们的任务就是——逃脱出单引号的控制…

petalinux安装成功后登录Linux出现密码账号不正确

安装完Linux系统后发现登陆开发板上的Linux系统登陆一直错误&#xff0c;但你输入的账号和密码确确实实是“root”&#xff0c;但仍然一直在重复登陆。 这个时候就会怀疑自己是不是把密码改了&#xff0c;导致错误&#xff0c;然后又重新创建petalinux工程。 其实这个时候不需…

2024年第二季度HDD出货量和容量分析

概述 根据Trendfocus, Inc.发布的《SDAS: HDD Information Service CQ2 24 Quarterly Update – Executive Summary》报告&#xff0c;2024年第二季度硬盘驱动器(HDD)出货量和容量均出现了显著增长。总体来看&#xff0c;HDD出货量较上一季度增长2%&#xff0c;达到3028万块&a…

MySQLDM笔记-查询库中是否存在列出的表名及查询库中列出的不存在的表名

如下表名&#xff1a; aaa,bb,cc,ccs,dds,csdf,csdfs,sdfa,werwe,csdfsd 在MySQL库中&#xff0c;查询哪些表名在数据库中 SELECT table_name FROM information_schema.tables WHERE table_schema your_database_name_here AND table_name IN (aaa, bb, cc, ccs, dds, csdf…

硬件电路学习记录(七)——全面概述MOS管

目录 1.NMOS&#xff1a; 工作原理 特性 应用 2.PMOS&#xff1a; PMOS的结构与工作原理 结构 工作原理 增强型PMOS与耗尽型PMOS 增强型PMOS&#xff08;Enhancement Mode PMOS&#xff09; 耗尽型PMOS&#xff08;Depletion Mode PMOS&#xff09; 应用 PMOS的工…

不同角色路由权限配置(六)

一、启用方式 配置开启config/config.ts。同时需要 src/access.ts 提供权限配置 export default {access: {},// access 插件依赖 initial State 所以需要同时开启initialState: {}, };这里以扩展的路由配置为例&#xff0c;配置只有admin权限才能查看的页面 1、在src/acces…

新华三H3CNE网络工程师认证—路由基础

我们的一个个网络其实是由不同的广播域构成的&#xff0c;而路由器的作用就是用来连接不同的广播域。那么不同广播域之间是如何通信的呢&#xff1f;比如有三个网段&#xff0c;1.0、2.0和3.0。网段1.0和网段2.0通信需要构造数据包&#xff0c;源是1.1&#xff0c;目标去往2.1。…

3.6 上下文菜单

上下文菜单 上下文菜单就是常见的右键菜单(弹出式菜单)。 显示上下文菜单&#xff0c;阻塞函数 BOOL TrackPopupMenu(HMENU hMenu, //菜单句柄UINT uFlags, //显示方式int x, //水平位置&#xff0c;屏幕坐标系int y, //垂直位置&#xff0c;屏幕坐标系UINT nReserved, //…

Cartopy简介和安装

Cartopy 是一个开源免费的第三方 Python 扩展包&#xff0c;由英国气象办公室的科学家们开发&#xff0c;支持 Python 2.7 和 Python 3&#xff0c;致力于使用最简单直观的方式生成地图&#xff0c;并提供对 matplotlib 友好的协作接口。初学Cartopy&#xff0c;欢迎指正&#…

Leetcode—186. 反转字符串中的单词 II【中等】Plus

2024每日刷题&#xff08;152&#xff09; Leetcode—186. 反转字符串中的单词 II 实现代码 class Solution { public:void reverseW(vector<char>& s, int n) {int i 0;int j 0;while(i < n) {while(i < j || i < n && s[i] ) {i;}while(j &…

Spring自动装配的局限

Spring自动装配的局限 1. 覆盖风险2. 类型限制3. 精确性挑战4. 维护难度 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; Spring的自动装配功能虽然为开发者带来了极大的便利&#xff0c;但在实际应用中也存在一些不容忽视的局限。 1. 覆盖…

大数据-65 Kafka 高级特性 分区 Broker自动再平衡 ISR 副本 宕机恢复再重平衡 实测

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff08;已更完&#xff09;HDFS&#xff08;已更完&#xff09;MapReduce&#xff08;已更完&am…