基于Flink的流式计算可视化开发实践之配置->任务生成->任务部署过程

news2024/11/24 4:39:32

1. 引言

在我们大数据平台(XSailboat)的DataStudio模块中实现了基于Hive的业务流程开发和基于Flink的实时计算管道开发。

DataStudio是用来进行数据开发的,属于开发环境,另外还有任务运维模块,负责离线分析任务和实时计算任务在生产环境的部署和运维。在开发环境开发好的业务流程和计算管道可以提交/发布到生产环境。
DataStudio
整个大数据平台的可视化开发其实都是一种配置驱动的思想。在界面上开发编辑的都是一种配置数据,在部署运行的时候,后台会有程序将其转为可执行程序或解释执行配置。

2. Flink计算任务的配置化实现

在我的另一篇文章《Flink的DAG可视化开发实践》中表述了我们对Flink任务的一种模式抽象思路。有了这种模式抽象之后,让Flink计算任务适合于可视化开发,即把一种完全自由的代码开发模式,转变成了一种“输入”、“输出”、“状态存储器”、“个性化配置”、“基础配置”、“前置处理”、“后置处理”等阶段性、部件型的可配置项。

Flink有很多的算子,在原来的算子特性的基础之上,套用模式,并且对它的功能进行一些个性化设定,就可以形成自己的界面开发用的算子及配置。

例如“SQL增量查询”,就是我们自定义实现的一种在支持JDBC和SQL的数据库上,对一个SQL查询,可以基于一个具有增长特性的字段(例如自增长的字段或者最近编辑时间,记录型数据的创建时间等)进行定时增量查询和读取的源节点。
它的个性化配置界面如下:
在这里插入图片描述
这个节点的配置面板中的各个配置项暂且不介绍。从界面中可以看出SQL增量查询的功能配置可以转化成一个多层次结构的Bean。这个Bean是“SQL增量查询”节点实例信息的一个组成部分。
如此,一个计算管道DAG图,就可以用“计算管道(图)–>算子节点–>节点配置”这样的多层数据结构,并映射到成多张关系数据库表,并在其中存储。
这样就实现了算子的配置化及配置存储。

3. 配置转成Flink计算任务

既然是配置驱动的程序,基本都是“配置+解释执行器”的构成方式。配置是多样的,而解释执行器就一种实现。

Flink的计算任务开发,都会开发至少一个带main函数的Java类,在部署运行的时候上传jar包并指定main函数所在Java类。

一般的Java程序,也会开发一个带main函数的类,在MANIFEST.MF文件中指定或在命令行里面指定,这样JVM就知道程序的入口。Flink的计算任务也类似,只是不是用来告诉JVM的,而是用来告诉JobManager的。JobManager从这里进去,执行里面的代码,构建出计算任务图(有多阶段的图,可以不用细究)。构建出图之后,再将其拆分交给一个或多个TaskManager执行。

所以我们的main函数里面的逻辑只是构建了一个计算任务以及每一个该怎么执行的执行流图。这和反应式编程很类似,先是构建计算管路,再塞入数据执行。这和我们一般的函数主动式调用有所不同,那是调用即执行。

既然Flink的Job的main函数里面是构建计算管路,那么我们按照计算管道的DAG图及其配置,生成计算管路即可。即一手解读配置,一手按配置构建算子,组成计算任务。

我们界面上定义的每一个节点,都有专门的构建器,将节点转成相应的算子。所以配置转计算任务的过程是:

1. 提交计算任务的时候,通过参数指定运行的是那个计算管道。因为在平台里面有很多工作空间,每个工作空间里面有许多计算管道。
2. Flink的JobManager运行“执行解释器”的jar,进去其main函数。
3. 在main函数中,获取相关入参,其中就有计算管道id,然后调用其它服务提供的通过id获取计算管道及其配置信息的接口,获取计算管道的详细信息。
4. 解析计算管道详细信息,构建计算任务,将计算管道中的每个算子配置信息转换成Flink的算子。

下面贴出上面例举的SQL增量查询节点的构建器,以便更好理解我们是怎么做的。

... 省略
public class SI_SQLIncQuery_Builder extends StreamSourceNodeBuilder
{
	
	@Override
	public CPipeNodeType getNodeType()
	{
		return CPipeNodeType.SI_SQLIncQuery ;
	}

	@Override
	public void buildStreamFlow(JSONObject aNodeJo, IStreamFlowBuilder aStreamFlowBuilder , WorkContext aCtx) throws Exception
	{
		String nodeId = aNodeJo.optString("id") ;
		String nodeName = aNodeJo.optString("name") ;
		StreamExecutionEnvironment env = aStreamFlowBuilder.getExecutionEnvironment() ;
		JSONObject execConfJo = aNodeJo.optJSONObject("execConf") ;
		JSONObject baseConfJo = aNodeJo.optJSONObject("baseConf") ;
		String dsId = execConfJo.optString("dataSourceId") ;
		JSONObject dsJo = aNodeJo.pathJSONObject("dataSources", dsId);
		ConnInfo connInfo = JacksonUtils.asBean(dsJo.toJSONString() , ConnInfo.class) ;
		DataSource ds = new DataSource() ;
		ds.setId(dsId) ;
		ds.setName(dsJo.optString("name")) ;
		ds.setType(dsJo.optEnum("dataSourceType" , DataSourceType.class)) ;
		WorkEnv workEnv = WorkEnv.valueOf(aCtx.getWorkEnv()) ;
		if(WorkEnv.dev == workEnv)
			ds.setDevConnInfo(connInfo) ;
		else
			ds.setProdConnInfo(connInfo) ;
		
		
		// 查询密码
		WorkContext ctx = aStreamFlowBuilder.getWorkContext();
		KeyPair keyPair = RSAKeyPairMaker.getDefault().newOne().getValue();
		HttpClient client = aCtx.getGatewayClient();
		String cipherText = client.askForString(Request	.GET()
														.path(IApis_Gateway.sGET_DataSourcePassword)
														.queryParam("env", ctx.getWorkEnv())
														.queryParam("id", dsId)
														.queryParam("publicKey", RSAUtils.toString(keyPair.getPublic()))
														.queryParam("usage", "TDengine类型的下沉节点[" + nodeName + "]"));
		
		String password = RSAUtils.decrypt(keyPair.getPrivate(), cipherText);
		((ConnInfo_Pswd)connInfo).setPassword(password);
		
		int periodMs = execConfJo.optInt("periodMs") ;
		List<String> storeStateFields = execConfJo.optJSONArray("storeStateFields").toCollection(CS.arrayList() , XClassUtil.sCSN_String) ;
		DatasetDescriptor dsDesc = JacksonUtils.asBean(execConfJo.optJSONObject("dataset").toJSONString() , DatasetDescriptor.class) ;
		Dataset dataset = new Dataset() ;
        dataset.setDatasetDescriptor(dsDesc) ;
        dataset.setName(nodeName) ;
        dataset.setDataSourceId(ds.getId()) ;
        dataset.setWorkEnv(workEnv) ;
        dataset.setDataSourceType(ds.getType()) ;
		
		JSONArray outRowFieldsJa = execConfJo.optJSONArray("outRowFields") ;
		Assert.notNull(outRowFieldsJa , "没有找到outRowFields!%s" , execConfJo);
		ERowTypeInfo rowTypeInfo = JSONKit.toRowTypeInfo(outRowFieldsJa) ;
		
		// 水位线设置待实现
		WatermarkStrategy<Row> watermarkStrategy = null ;
		WaterMarkGenMethod waterMarkGenMethod = execConfJo.optEnum("waterMarkGenMethod" , WaterMarkGenMethod.class) ;
		if(waterMarkGenMethod == null)
		{
			watermarkStrategy = WatermarkStrategy.noWatermarks() ;
			waterMarkGenMethod = WaterMarkGenMethod.NoWatermarks ;
		}
		else
		{
			switch(waterMarkGenMethod)
			{
			case NoWatermarks :	
			{
				watermarkStrategy = WatermarkStrategy.noWatermarks() ;
			}
				break ;
			case MonotonousTimestamps:
				watermarkStrategy = WatermarkStrategy.forMonotonousTimestamps() ;
				break ; 
			case BoundedOutOfOrderness:
				JSONObject waterMarkGenMethodConfJo = execConfJo.getJSONObject("waterMarkGenMethodConf") ;
				TimeUnit timeUnit = TimeUnit.valueOf(waterMarkGenMethodConfJo.optString("timeUnit")) ;
				int timeLen = waterMarkGenMethodConfJo.optInt("timeLen" , 0) ;
				Assert.isTrue(timeLen>0 , "时间长度必须大于0!") ;
				Duration duration = null ;
				switch(timeUnit)
				{
				case NANOSECONDS:
					duration = Duration.ofNanos(timeLen) ;
					break ;
				case MILLISECONDS:
					duration = Duration.ofMillis(timeLen) ;
					break ;
				case SECONDS:
					duration = Duration.ofSeconds(timeLen) ;
					break ;
				case MINUTES:
					duration = Duration.ofMinutes(timeLen) ;
					break ;
				case HOURS:
					duration = Duration.ofHours(timeLen) ;
					break ;
				case DAYS:
					duration = Duration.ofDays(timeLen) ;
					break ;
				case MICROSECONDS:
					duration = Duration.of(timeLen, ChronoUnit.MICROS) ;
					break ;
				}
				watermarkStrategy = WatermarkStrategy.forBoundedOutOfOrderness(duration) ;
				break ;
			case MaxWatermarks:
				watermarkStrategy = WatermarkStrategy.forGenerator(new MaxWaterMarkGenSupplier()) ;
				break ;
			default:
				throw new IllegalStateException("未支持的水位线生成方法:"+waterMarkGenMethod) ;
			}
		}

		String timestampExpr = execConfJo.optString("timestampExpr") ;
		if(XString.isNotEmpty(timestampExpr) && waterMarkGenMethod != WaterMarkGenMethod.NoWatermarks)
		{
			watermarkStrategy = watermarkStrategy.withTimestampAssigner(new ExprTimestampAssigner(aCtx.getPipeArgs() 
					, timestampExpr, rowTypeInfo)) ;
		}
		mLogger.info("水位线生成策略是:{} , 时间表达式是:{}" , waterMarkGenMethod , timestampExpr) ;
		SQLIncQuerySourceFunction sourceFunc = new SQLIncQuerySourceFunction(storeStateFields, periodMs
				, dataset
				, ds) ;
		SingleOutputStreamOperator<Row> dss = env.addSource(sourceFunc , nodeName , rowTypeInfo)
				.assignTimestampsAndWatermarks(watermarkStrategy)		// 2023-01-08 这一句是必需的,否则不会产生水位线
				.name(nodeName)
				.uid(nodeId)
				;
		
		int parallelism = baseConfJo.optInt("parallelism", 1) ;
		if(parallelism <=0 )
			mLogger.info("指定的并发度为 {} , 小于1,将不设置,采用缺省并发度。" , parallelism) ;
		else
		{
			dss.setParallelism(parallelism) ;
		}
		
		aStreamFlowBuilder.putFlowPoint(nodeId , dss);
	}
}

3. 计算任务的部署

我们要构建的是一套可视化开发、部署平台,在我们的界面上就能完成开发、调试、部署的过程。我们的大数据平台底层基础设施有Hadoop,所以我们考虑使用Hadoop Yarn的容器部署Flink集群。要使用Yarn容器部署Flink计算任务,首先需要将程序包上传到Hadoop FS中。
在这里插入图片描述
我们这里把我们自己开发的扩展部分(ext_jars,解释执行器及其相关jar)和Flink的原生程序包(app,扩展了一些数据库驱动)分成两部分,在我们进行容器化部署的时候,会将其合并。

Flink集群在容器中以Session模式运行,一个Flink集群可以运行多个计算任务。我们给Flink集群增加了一个标签,以区分各个Flink集群。我们设定开发环境,一个工作空间只能运行一个Flink集群,用来开发调试。一个工作空间,在生产环境可以运行1个或1个以上的不同标签的集群。在生产环境部署的时候,需要通过标签指定部署到那个集群,如果标签不存在,就会部署一个新的指定标签的集群,并在上面部署计算任务。
计算管道部署

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2109402.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

解决Metasploit调用Nessus报错问题

问题描述 Error while running command nessus_scan_new: undefined method []’ for nil:NilClass 解决方法 发现报错&#xff0c;经过网上查询解决方法 在Nessus服务器执行&#xff0c;下面的版本号可能有所不同&#xff0c;更加自己的情况更改&#xff0c;需要管理员身份执…

Vue2 day-01

目录 一. Vue 是什么&#xff1f; 1.1 什么是渐进式 1.2 自底向上逐层应用 二. 相关概念 2.1 为什么学习vue 2.2 库和框架区别 2.3 三大主流框架 三. vue基本使用 3.1 体验vue的使用 3.2 相关知识分析 3.3 插值表达式{{}} 3.4 vue-devtools 四. 指令 4.1 v-cloak…

深信服EasyConnect-Docker部署方式

一、项目简介 让深信服开发的非自由的 VPN 软件 EasyConnect 或 aTrust 运行在 docker 中&#xff0c;提供 socks5 和 http 代理服务和网关供宿主机连接使用。 项目地址&#xff1a; https://github.com/docker-easyconnect/docker-easyconnect 二、图形界面版 EasyConnect&…

Oceanbase 数据库审计

数据加密和访问控制可以大幅降低安全风险&#xff0c;但对于具备权限的用户&#xff0c;仍然需要记录其操作&#xff0c;以防止用户登录信息泄露&#xff0c;或者访问权限被滥用。审计功能可以加强企业对数据安全、合规等方面的要求&#xff0c;是跟踪用户行为最主要的工具。 目…

浏览器百科:网页存储篇-Session storage应用实例(九)

1.引言 在前面的文章中&#xff0c;我们详细介绍了如何在 Chrome 浏览器中打开并使用 Session storage 窗格&#xff0c;进行数据的查看、编辑和管理。作为网页存储技术的重要组成部分&#xff0c;sessionStorage在提升用户体验和数据管理能力方面发挥了重要作用。在本篇《浏览…

NAT技术-将多个内部网络设备映射到一个公共IP地址

问题&#xff1a; 今天上课的时候老师让我们在VMware填同一个子网ip 192.168.196.0&#xff0c;然后给我们的linux镜像都是同一个压缩包&#xff0c;结果我们的静态ip地址都是同一个。 192.168.196.0下面有256个ip地址&#xff0c;范围是192.168.196.0到192.168.196.255。我们…

CP-Net:用于生物细胞解析的实例感知部分分割网络|文献速递--基于深度学习的医学影像病灶分割

Title 题目 CP-Net: Instance-aware part segmentation network for biological cell parsing CP-Net&#xff1a;用于生物细胞解析的实例感知部分分割网络 01 文献速递介绍 实例分割是计算机视觉中的一个经典任务&#xff0c;用于识别图像中每个像素的对象类别&#xff0…

基于Android Studio 实现通讯录—原创

目录 一、项目演示 二、开发环境 三、项目详情 四、项目完整源码 一、项目演示 基于Android Studio 实现通讯录—原创 二、开发环境 三、项目详情 1.启动页 这段代码是一个简单的Android应用程序启动活动&#xff08;Activity&#xff09;&#xff0c;具体功能如下&#xf…

【看雪-注册安全分析报告】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 暴力破解密码&#xff0c;造成用户信息泄露短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造成亏损无底洞…

学习 SSM框架 项目总结

通过指导老师发布的学习SSM框架项目&#xff0c;这次我深刻体会到了SSM整体项目之间的紧连关系。 以下是我自己学习过程中总结出来的经验。 SSM框架 配置 导入核心 spring 组件坐标 将spring相关组件坐标&#xff0c;导入到 pom 文件中 <!--spring、springMVC--><…

集成电路学习:什么是RTOS实时操作系统

RTOS&#xff1a;实时操作系统 RTOS&#xff0c;全称Real Time Operating System&#xff0c;即实时操作系统&#xff0c;是一种专为满足实时控制需求而设计的操作系统。它能够在外部事件或数据产生时&#xff0c;以足够快的速度进行处理&#xff0c;并在规定的时间内控制生产过…

UE5.3 新学到的一些性能测试合计(曼巴学习笔记)

一.简单命令行 stat FPS stat unit //增加GPU渲染时间和变量 stat unitgraph //追加了图表显示 二.查看GPU的消耗。调试GPU渲染用的高级命令 可以记录这一刻各个部分的占用情况,只能看当前的 1.在编辑器下&#xff0c;ctrlShift, 。 2.输入命令行&#xff0c;pr…

硬件-经典的TL431三端稳压管

文章目录 一&#xff1a;TL431三端稳压管1.1 器件说明1.2 电路分析1.3 把TL431设计成一个可调电压源的电路1.4 常用型号1.5 阅读手册1.6 2.5V 电压基准应用电路道友&#xff1a;努力的意义&#xff0c;不在于一定会让你取得多大的成就&#xff0c;只是让你在平凡的日子里&#…

虚幻5|知识点(1)寻找查看旋转,击打敌人后朝向主角

举例说明&#xff0c;我们想让角色一直朝着摄像头&#xff0c;我们控制角色任意位置&#xff0c;都能自行旋转都能朝向摄像头 下面是敌人一直朝向角色&#xff0c;无论主角走向哪个位置&#xff0c;敌人都能朝向主角 start是获取敌人的位置向量大小&#xff0c;Target是获取主…

【复杂系统系列(中级)】复杂系统科学的层级与不确定性方程【代码模拟】

【通俗理解】复杂系统科学的层级与不确定性方程 关键词提炼 #复杂系统科学 #层级结构 #不确定性 #上行因果 #下行因果 #初值敏感 #混沌现象 第一节&#xff1a;层级与不确定性方程的类比与核心概念【尽可能通俗】 1.1 层级与不确定性方程的类比 复杂系统科学的层级与不确定…

游戏玩家新宠:高性能远程控制解决方案

如果你出差一些存在公司电脑上的数据急需用到&#xff0c;这时候有一个远程控制工具就非常方便了。如果你担心一些远程控制软件的安全问题&#xff0c;那就从正规渠道下载&#xff0c;比如向日葵远程控制官网下载就可以得到它官方的软件。我今天给你分享一些安全可靠的远程控制…

基于STELLA系统动态模拟技术及在农业、生态环境等科学领域中的实践应用

STELLA是一种用户友好的计算机软件。通过绘画出一个系统的形象图形&#xff0c;并给这个系统提供数学公式和输入数据&#xff0c;从而建立模型。依据专业兴趣&#xff0c;STELLA可以用来建立各种各样的农业、生态、环境等方面的系统动态模型&#xff0c;为科研、教学、管理服务…

基于C++实现一个房贷计算小程序(含代码)

房贷计算程序&#xff0c;主要实现以下功能&#xff1a; 用户友好的界面&#xff1a;使用文本菜单来引导用户选择功能。支持不同还款频率&#xff1a;例如每季度还款、每半年还款等。支持贷款提前还款&#xff1a;计算提前还款对总支付利息的影响。详细的还款计划表&#xff1…

(二)ASP.NET Core WebAPI项目的启动地址设置

上一篇介绍了ASP.NET Core WebAPI项目创建&#xff0c;可参考&#xff1a; 1.webAPI的访问地址 1) 启动时&#xff0c;选择CoreWebAPI(项目名称)运行项目 可以看到打开浏览器后的地址是&#xff1a;applicationUrl"\"launchUrl 2) 启动时&#xff0c;选择IIS Expre…

C++mutable

文章目录 Claude 讲解基本用法mutable的常见用途注意事项 ChatGpt 讲解1. 基本概念2. 使用示例解释&#xff1a; 3. 适用场景4. 注意事项 lambda 讲解基本语法示例捕获方式使用场景 mutable 和 labmda 一起使用代码&#xff1a;代码分析&#xff1a;输出结果&#xff1a; 在C编…