【实战-06】正确设置flink参数,彻底站起来

news2024/11/26 0:41:42

参数宝典

    • 如何正确设置参数
    • flink Table模式下的参数
    • Table 模式下参数类相关
    • DataStream 模式下怎么设置参数?
    • 总结

如何正确设置参数

很多人在应用flink DataStream 或者是Flinksql 的时候对于一些参数设置知道的不是很清晰,本文带领大家彻底搞定这一块。

  1. 在flink的配置文件中设置,这个就不多说了,缺点就是不够灵活
  2. 在代码层面设置

flink Table模式下的参数

public class SingleTableMain {
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment  tEnv = StreamTableEnvironment.create(env);
 }

    public static void setStateBackendAndCheckpoint(StreamExecutionEnvironment env, String checkpointPath) {
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));   // 设置状态后端
        //每30秒启动一个检查点
        env.enableCheckpointing(60000);
        //允許几次檢查點失敗
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

        // 设置状态后端
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(checkpointPath));

        //检查点保存模式
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        //设置最小间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(2));

        //设置超时时长
        env.getCheckpointConfig().setCheckpointTimeout(TimeUnit.MINUTES.toMillis(5));

        // 最大并发检查点数量,如果上面设置了 最小间隔,其实这个参数已经不起作用了
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        //可恢复
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //重试机制
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, org.apache.flink.api.common.time.Time.of(30, TimeUnit.SECONDS)));
    }


    public static void enableMiniBatch(StreamTableEnvironment tEnv, Duration duration, Long size) {
        //The maximum number of input records can
        // be buffered for MiniBatch.
        // MiniBatch is an optimization to buffer input records to reduce state access.
        // MiniBatch is triggered with the allowed latency interval and when the maximum number of buffered records reached.
        // NOTE: MiniBatch only works for non-windowed aggregations currently.
        // If table.exec.mini-batch.enabled is set true, its value must be positive.
        Configuration configuration = new Configuration();
        configuration.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true);
        configuration.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, duration);
        configuration.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, size);
        tEnv.getConfig().addConfiguration(configuration);

    }

    public static void setTTL(StreamTableEnvironment tEnv, Duration duration) {
        Configuration configuration = new Configuration();
        configuration.set(ExecutionConfigOptions.IDLE_STATE_RETENTION, duration);
        tEnv.getConfig().addConfiguration(configuration);
    }

    public static void enableSqlOptimizer(StreamTableEnvironment tEnv) {
        Configuration configuration = new Configuration();
        // 设置两阶段聚合
        configuration.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
        //开启 Split Distinct
        configuration.set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
        configuration.set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_BUCKET_NUM, 2048);
        tEnv.getConfig().addConfiguration(configuration);
    }

    public static void setIdleTimeout(StreamTableEnvironment tEnv, Duration duration) {
        Configuration configuration = new Configuration();
        configuration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT, duration);
        tEnv.getConfig().addConfiguration(configuration);
    }

    public static void setJobName(StreamTableEnvironment tEnv, String jobName) {
        Configuration configuration = new Configuration();
        configuration.set(PipelineOptions.NAME, jobName);
        tEnv.getConfig().addConfiguration(configuration);
    }

    public static void enableDynamicParam(StreamTableEnvironment tEnv){
        Configuration configuration = new Configuration();
        configuration.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED,true);
        tEnv.getConfig().addConfiguration(configuration);
        logger.info("[SqlUntil] 开启动态table参数");

    }


}       

Table 模式下参数类相关

上面的代码对应下方的内部类,正好对应Table SQL模式下的:执行参数,优化参数,表参数
在这里插入图片描述

DataStream 模式下怎么设置参数?

        Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,"hdfs://xxx:8020/remote-default-checkpoints/penggan/");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

上面这种只是举了个例子,感兴趣的请看下图:
在这里插入图片描述

总结

上面是最为原始的设置参数的方式,支持的参数对应的其实就是:flink官网核心参数
flink tablesql 参数

所谓的参数就是控制程序的执行行为的, 参数分为

  1. 动态传参(启动程序在cmd传入)
  2. 程序代码设置
  3. 读取默认配置文件(flink-conf.yml)
    上文讲的是代码设置, 其实参数底层解析的就是截图中的类, 可以看到根据参数的行为flink内部用不通的类去解析,这些类我们也可以直接拿来修改参数, 这就是本文的意义。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/978790.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

企业微信自建应用开发流程

开发需知 1、企业微信后台管理(不是小程序管理后台):企业微信 2、企业微信开发者文档(不是小程序文档):概述 - 接口文档 - 企业微信开发者中心 3、开发应用的类型:根据开发应用类型选择文档。…

GitHub个人访问凭证在哪看

要查看 GitHub 个人访问凭证(Personal Access Token),请按照以下步骤进行操作: 登录到你的 GitHub 帐户。点击右上角的头像,然后选择 “Settings”(设置)。在左侧导航栏中,选择 “D…

【PowerQuery】PowerQuery导入JSON数据

Json数据是目前使用的最为频繁和广泛的一种数据交换格式,JSON的全称为JavaScript Object Notation。Json 主要用于在互联网的消息的数据交换信息传递,他的格式与XML有什么区别呢?为什么不用XML,用Json有啥好处呢?我们接下来讨论下Json相比XML的优势: XML传递的数据过多服…

华为云云服务器评测|前端都会的文档预览服务

嗨大家好,我是专注前端技术,热衷知识分享的小鑫同学,近期华为云云服务器焕新上线,实付0.03元拥有了一个月的云服务器使用资格,我将利用这台服务器来演示作为前端同学如何部署一个文档预览服务,拒绝将文档解…

HashMap源码分析(JDK1.8)

概述 JDK 1.8 对 HashMap 进行了比较大的优化,底层实现由之前的 “数组链表” 改为 “数组链表红黑树”,本文就 HashMap 的几个常用的重要方法和 JDK 1.8 之前的死循环问题展开学习讨论。 JDK 1.8 的 HashMap 的数据结构如下图所示,当链表节…

从零开始学习 Java:简单易懂的入门指南之泛型及set集合(二十二)

泛型及set集合扩展 1.泛型1.1泛型概述 2.Set集合2.1Set集合概述和特点【应用】2.2Set集合的使用【应用】 3.TreeSet集合3.1TreeSet集合概述和特点【应用】3.2TreeSet集合基本使用【应用】3.3自然排序Comparable的使用【应用】3.4比较器排序Comparator的使用【应用】3.5两种比较…

【PowerQuery】Excel的PowerQuery的复制

在Excel中构建符合要求的PowerQuery连接之后,所有的PowerQuery 连接已经顺利的保存在Excel 工作簿当中,但是如何去查看已经保存的PowerQuery连接呢?图6.3 显示了查看PowerQuery连接。 Excel界面->数据页签->查询与连接 如果你的Power…

java八股文面试[数据库]——InnoDB与MyISAM的区别

InnoDB和MyISAM是使用MySQL时最常用的两种引擎类型,我们重点来看下两者区别。 事务和外键 InnoDB支持事务和外键,支持回滚,具有安全性和完整性,适合大量insert或update操作 MyISAM不支持事务和外键,它提供高速存储和…

mysql表中删除重复记录,只保留一条记录的操作

mysql表中两个字段重复记录,只保留一条记录的操作 例如有一张学生表 其中name 和 class 相同的视为重复记录,需要保留一条记录,删除重复记录, 两种操作方式如下: 方法一: group by SELECT MIN(cs.id) AS id ,cs.name…

东南亚时尚用品电商平台ZALORA,用自养号测评快速提升产品的评论和销量

关注东南亚电商市场的人都听说过Lazada、Shopee等电商巨头,但很少有人知道ZALORA。实际上,每当提到东南亚的电商平台时,ZALORA都是一个不可忽视的话题。尽管电商巨头SheIn在印尼市场的表现不如本土时尚电商ZALORA。 ZALORA是由德国的创业加速…

分布式实时仿真系统-反射内存的应用

为了使分布式实时仿真系统(一个典型代表就行飞行模拟器)达到逼真的仿真效果,在系统内部,往往不仅需要对各种数据模型进行实时解算,而且需要一个延迟时间极低的确定性网络在系统之间传递数据,这样才能让各个子系统之间协调一致地工…

一文速学-让神经网络不再神秘,一天速学神经网络基础(七)-基于误差的反向传播

前言 思索了很久到底要不要出深度学习内容,毕竟在数学建模专栏里边的机器学习内容还有一大半算法没有更新,很多坑都没有填满,而且现在深度学习的文章和学习课程都十分的多,我考虑了很久决定还是得出神经网络系列文章,…

通过 SQL 实现海量 GIS 数据的轨迹关联计算,确定不了解下?

作者:于成铭 | YMatrix 解决方案与架构总监 前言 统计与分析轨迹和地理区域的关联问题,是 GIS 主要的应用类别之一。最早的 GIS 应用以桌面应用程序为主,但随着需处理应用越来越复杂,GIS 的开发框架也在逐年演进。就像复杂的数据…

adb-linux 调试桥

这里写自定义目录标题 摘要:一、简介二、adb使用参考连接 摘要: adb 可替代网络、串口等调试手段,可以方便的进行文件传输、终端登录等 一、简介 ADB的全称为Android Debug Bridge,即调试桥,方便调试设备或调试开发…

(值得收藏)境外投资备案申请指南

随着全球化的不断深入,越来越多的企业开始寻求境外投资的机会。然而,在进行境外投资前,需要进行备案手续,以确保投资的合法性和可行性。 境外投资备案条件: 具有完全民事行为能力的法人或自然人。拥有足够的资金和实力…

CK_Label-V23(Battery)System Developer‘s Manual

Query PTL Tags Information Introduction to the API: Query all PTL tags information Basic Information: Attributes 接口信息 Status Finished URL http://localhost/wms/associate/getTagsMsg Request Method GET/POST Content-Type …

Shader变体自定义组合压缩方案

前言 本篇文章不讲什么是变体,不讲shader_feature和multi_compile的区别,也不讲如何收集变体。 关于什么是变体,如何优化变体,看这篇文章 Shader:优化破解变体的“影分身”之术 - 知乎 (zhihu.com) 关于变体的收集…

自然语言处理历史史诗:NLP的范式演变与Python全实现

目录 一、引言什么是自然语言处理?语言与人类思维自然语言的复杂性NLP的历史轨迹 二、20世纪50年代末到60年代的初创期符号学派重要的研究和突破 随机学派重要的研究和突破 三、20世纪70年代到80年代的理性主义时代基于逻辑的范式重要的研究和突破 基于规则的范式重…

亚马逊云科技数据分析为这伴科技赋能,实现“零”中断目标

当前,利用数据分析能力赋能精准营销逐渐成为全球企业的主流趋势之一,然而复杂的基础设施和运维压力也不容忽视,因此如何才能构建行之有效的数据分析平台,支撑业务营销服务,实现与客户的共创互赢? 数字营销时…

Spring MVC入门必读:注解、参数传递、返回值和页面跳转的关键步骤

目录 引言 一、常用注解 1.1.RequestMapping 1.2.RequestParam 1.3.RequestBody 1.4.RequestHeader 1.5.PathVariable 二、参数传递 2.1.基础类型String 2.2.复杂类型 2.3.RequestParam 2.4.PathVariable 2.5.RequestBody 2.6.RequestHeader 三、返回值 3.1.vo…