利用java8 的 CompletableFuture 优化 Flink 程序,性能提升 50%

news2024/9/20 5:39:48

你好,我是 shengjk1,多年大厂经验,努力构建 通俗易懂的、好玩的编程语言教程。 欢迎关注!你会有如下收益:

  1. 了解大厂经验
  2. 拥有和大厂相匹配的技术等

希望看什么,评论或者私信告诉我!

文章目录

  • 一、前言
  • 二、Flink 代码优化
    • 2.0 问题发现
    • 2.1 原有代码
    • 2.2 CompletableFuture 优化
  • 三、avatorscript 使用的简单介绍
    • 3.1 自定义函数
    • 3.2 从 Map 中取值
    • 3.3 使用 Java 的工具类
    • 3.4 AviatorScript 函数
  • 四、总结


一、前言

目前 Flink 利用 avatorscript 脚本语言,来做到规则的自动化更新。avatorscript将表达式直接翻译成对应的 java 字节码执行,所以在大数据量的情况下,自然而然这里就成为了瓶颈

二、Flink 代码优化

2.0 问题发现

图片.png
通过 Flink UI 发现 window 算子是瓶颈,而 window 算子的核心就是 avatorscript 表达式

2.1 原有代码

xxx
AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
xxx

经过测试平均执行时间在1毫秒以内,但经不住数据量大,所以Flink QPS一直在 11w 左右

2.2 CompletableFuture 优化

xxx
List<CompletableFuture> executeFutures=new ArrayList<>();

CompletableFuture<Object> executeFuture = CompletableFuture.supplyAsync(() -> {
                return AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
            });
executeFutures.add(executeFuture);

for (int i = 0; i < executeFutures.size(); i++) {
    executeFutures.get(i).get()
    xxxx
}

修改完上线后,Flink QPS 有原来 11W 增加到 17W 左右

三、avatorscript 使用的简单介绍

为了让你更容易理解 avatorscript,这里我们也可以先简单的介绍一下:

3.1 自定义函数

class AddFunction extends AbstractFunction {
    @Override
    public AviatorObject call(Map<String, Object> env,

                              AviatorObject arg1, AviatorObject arg2) {
        Number left = FunctionUtils.getNumberValue(arg1, env);
        Number right = FunctionUtils.getNumberValue(arg2, env);
        return new AviatorDouble(left.intValue() + right.intValue());
    }

    public String getName() {

        return "add" ;
    }
}


public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
    //注册函数
   AviatorEvaluator.addFunction(new AddFunction());
    System.out.println(AviatorEvaluator.execute( "add(2,1)" ));
}

3.2 从 Map 中取值

public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
    //注册函数
     AviatorEvaluator.addFunction(new AddFunction());
 HashMap<String, Object> stringObjectHashMap = new HashMap<>();
    stringObjectHashMap.put( "testId1" , 1);
    stringObjectHashMap.put( "testId2" , 2);
    Object execute = AviatorEvaluator.execute( "add(testId1,testId2)" , stringObjectHashMap);

3.3 使用 Java 的工具类

public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
 HashMap<String, Object> stringObjectHashMap = new HashMap<>();
    stringObjectHashMap.put( "ip" , "a1111" );
    // stringObjectHashMap.put("result", "a&B&C&d");
 stringObjectHashMap.put( "voucher_endtime" , "2022.03.02 11:32" );
    stringObjectHashMap.put( "imei2" , "v1aaaaaa1" );
    stringObjectHashMap.put( "testId" , "v1ot_service_quality_1111" );
    stringObjectHashMap.put( "testId1" , "sku" );
    stringObjectHashMap.put( "a" , "123" );
    stringObjectHashMap.put( "a1" , "null" );
    stringObjectHashMap.put( "b1" , 123);
 
    AviatorEvaluator.addStaticFunctions( "doubleStatic" , Double.class);
    AviatorEvaluator.addInstanceFunctions( "doubleInstance" , Double.class)

 execute2 = AviatorEvaluator.execute( "(doubleStatic.valueOf(sys_net_bandwidth))" , stringObjectHashMap);
    System.out.println(execute2);
    execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(sys_net_bandwidth)) " , stringObjectHashMap);
    System.out.println( "###" + execute2);
    execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(str(voucher)))" , stringObjectHashMap);

3.4 AviatorScript 函数

## examples/function.av
fn add(x, y) {
  return x + y;
}
p(add(1,2))
public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
    String function = "## examples/function.av\n" +
            "\n" +
            "fn add(x, y) {\n" +
            "  return x + y;\n" +
            "}" ;
    AviatorEvaluator.defineFunction( "add" , function);
    System.out.println( "defineFunction6666================+" + AviatorEvaluator.execute( "add(1,2)" , stringObjectHashMap));
}

四、总结

本文主要介绍了 Flink 中使用 avatorscript 脚本语言的问题,以及如何通过 CompletableFuture 优化代码来提高 Flink QPS。同时,还介绍了 avatorscript 的使用方法,包括自定义函数、从 Map 中取值、使用 Java 工具类和 AviatorScript 函数。通过本文的介绍,读者可以更好地了解 Flink 中 avatorscript 的使用方法,以及如何优化代码来提高 Flink QPS。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1703743.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【吊打面试官系列】Java高并发篇 - ConcurrentHashMap 的并发度是什么?

大家好&#xff0c;我是锋哥。今天分享关于 【ConcurrentHashMap 的并发度是什么?】面试题&#xff0c;希望对大家有帮助&#xff1b; ConcurrentHashMap 的并发度是什么? ConcurrentHashMap 的并发度就是 segment 的大小&#xff0c;默认为 16&#xff0c; 这意味着最多同时…

.DFS.

DFS 全称为Depth First Search&#xff0c;中文称为深度优先搜索。 这是一种用于遍历或搜索树或图的算法&#xff0c;其思想是: 沿着每一条可能的路径一个节点一个节点地往下搜索&#xff0c; 直到路径的终点&#xff0c;然后再回溯&#xff0c;直到所有路径搜索完为止。 DFS俗…

ComfyUI简单介绍

&#x1f353;什么是ComfyUI ComfyUI是一个为Stable Diffusion专门设计的基于节点的图形用户界面&#xff0c;可以通过各种不同的节点快速搭建自己的绘图工作流程。 软件打开之后是长这个样子&#xff1a; 同时软件本身是github上的一个开源项目&#xff0c;开源地址为&#…

I.MX6ULL Linux C语言开发环境搭建(点灯实验)

系列文章目录 I.MX6ULL Linux C语言开发 I.MX6ULL Linux C语言开发 系列文章目录一、前言二、硬件原理分析三、构建步骤一、 C语言运行环境构建二、软件编写三、链接脚本 四、实验程序编写五、编译下载验证 一、前言 汇编语言编写 LED 灯实验&#xff0c;但是实际开发过程中汇…

Python实现国密GmSSL

Python实现国密GmSSL 前言开始首先安装生成公钥与私钥从用户证书中读取公钥读取公钥生成签名验证签名加密解密 遇到的大坑参考文献 前言 首先我是找得到的gmssl库&#xff0c;经过实操&#xff0c;发现公钥与密钥不能通过pem文件得到&#xff0c;就是缺少导入pem文件的api。这…

maven的下载以及配置的详细教程(附网盘下载地址)

文章目录 下载配置IDEA内部使用配置 下载 1.百度网盘下载 链接: https://pan.baidu.com/s/1LD9wOMFalLL49XUscU4qnQ?pwd1234 提取码: 1234 2.解压即可 配置 1.打开安装文件下conf下的settings.xml文件&#xff0c;我的如下 2.修改配置信息&#xff08;目的是为了修改本地…

【技术分享】Maven常用配置

一、Maven简介 &#xff08;一&#xff09;为什么使用 Maven 由于 Java 的生态非常丰富&#xff0c;无论你想实现什么功能&#xff0c;都能找到对应的工具类&#xff0c;这些工具类都是以 jar 包的形式出现的&#xff0c;例如 Spring&#xff0c;SpringMVC、MyBatis、数据库驱…

MQ本地消息事务表

纯技术方案水文特此记录 MQ本地消息事务表解决了什么问题&#xff1f; MQ本地事务表方案解决了本地事务与消息发送的原子性问题&#xff0c;即&#xff1a;事务发起方在本地事务执行成功后消息必须发出去&#xff0c;否则就丢弃消息。实现本地事务和消息发送的原子性&#xf…

系统安全扫描扫出了:可能存在 CSRF 攻击怎么办

公司的H5在软件安全测试中被检查出可能存在 CSRF 攻击&#xff0c;网上找了一堆解决方法&#xff0c;最后用这种方式解决了。 1、问题描述 CSRF 是 Cross Site Request Forgery的缩写(也缩写为也就是在用户会话下对某个 CGI 做一些 GET/POST 的事&#xff0c;RIVTSTCNNARGO一这…

香橙派AIpro初体验,详解如何安装Home Assistant Supervised

香橙派AIpro&#xff08;OrangePi AIpro&#xff09;开发版&#xff0c;定位是一块AI开发板&#xff0c;搭载的是华为昇腾310&#xff08;Ascend310&#xff09;处理器。 没想到&#xff0c;这几年的发展&#xff0c;AI开发板也逐渐铺开&#xff0c;记得之前看到华为发布昇腾3…

挑战你的数据结构技能:复习题来袭【3】

chap3 练习1 一. 单选题 1. (单选题)栈和队列具有相同的&#xff08;&#xff09; A. 抽象数据类型B. 逻辑结构C. 存储结构D. 运算 答案: B:逻辑结构 答案分析&#xff1a;逻辑结构都属于线性结构,只是它们对数据的运算不同。 2. (单选题)栈是() A. 顺序存储的线性结构B…

深入理解python列表与字典:数据结构的选择与性能差异

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、列表与字典&#xff1a;基础数据结构的对比 二、列表&#xff1a;逐个遍历的查找方式 …

SARscape5.7已经支持3米陆探一号(LT-1)数据处理

SARsacpe5.6.2.1版本已经开始支持LT-1的数据处理&#xff0c;由于当时只获取了12米的条带模式2&#xff08;STRIP2&#xff09;例子数据&#xff0c;对3米条带模式1&#xff08;STRIP1&#xff09;数据的InSAR处理轨道误差挺大&#xff0c;可能会造成干涉图异常。 SARsacpe5.7最…

Android Display Graphics #1 整体框架介绍一

软件基础 Android的framework层提供了一系列的图像渲染API&#xff0c;可绘制2D和3D。简单理解就是上层开发APP的小伙伴提供了接口&#xff0c;开发者可以直接显示对应的自己内容。但如果掌握了Display底层逻辑再写上层app&#xff0c;会有掌控力&#xff0c;出问题可以根据lo…

vs code怎么补全路径,怎么快捷输入文件路径

安装插件&#xff1a; 链接&#xff1a;https://marketplace.visualstudio.com/items?itemNamejakob101.RelativePath 使用 按住 Ctrl Shift H&#xff0c;弹出窗口&#xff0c;输入文件补全&#xff0c;回车就可以了 排除文件 如果你的项目下文件太多&#xff0c;它会…

2000-2017年各省经济政策不确定性指数

2000-2017年各省经济政策不确定性指数 1、时间&#xff1a;2000-2017年 2、来源&#xff1a;国际能源转型学会 3、范围&#xff1a;31省 4、构建说明&#xff1a; 按照Baker等&#xff08;2016&#xff09;的方法&#xff0c;在中国省级范围内构建了经济政策不确定性&…

如果任务过多,队列积压怎么处理?

如果任务过多,队列积压怎么处理? 1、内存队列满了应该怎么办2、问题要治本——发短信导致吞吐量降低的问题不能忽略!!3、多路复用IO模型的核心组件简介1、内存队列满了应该怎么办 如图: 大家可以看到,虽然现在发短信和广告投递,彼此之间的执行效率不受彼此影响,但是请…

快速版-JS基础01书写位置

1.书写位置 2.标识符 3.变量 var&#xff1a;声明变量。 &#xff08;1&#xff09;.变量的重新赋值 &#xff08;2&#xff09;.变量的提升 打印结果&#xff1a;console.log(变量名) 第一个是你写在里面的。 第二个是实际运行的先后之分&#xff0c;变量名字在最前面。变量…

sql注入less8——布尔盲注

sql注入第八关卡是布尔盲注&#xff0c;我们将看不到一般的返回值&#xff0c;只能通过You are in......的消失与否来判断自己输入的字符是否与查询的数据的字符相同&#xff0c;相同则显示You are in......&#xff0c;相反则不显示&#xff0c;如下图所示&#xff1a; 查询语…

每天五分钟深度学习框架pytorch:tensor张量的维度转换大全

本文重点 在深度学习中比较让人头疼的一点就是矩阵的维度,我们必须构建出符合神经网络维度的矩阵,只有将符合要求的矩阵放到神经网络中才可以运行神经网络,本节课程我们将学习以下tensor中维度的变化。 view和shape View和shape,这两个方法可以完成维度的变换操作,而且使…