flink + Atlas 任务数据血缘调通

news2025/2/10 22:03:03

据此修改 Flink 源码

版本
Flink1.13.5
Atlas1.2.0

将 atlas 配置文件打进 flink-bridge;atlas 相关的 jar 放进 flink/lib

jar uf flink-bridge-1.2.0.jar atlas-application.properties 

在这里插入图片描述
flink-conf.yaml 注册监听
在这里插入图片描述
org.apache.flink.configuration.ExecutionOptions 添加配置属性

public static final ConfigOption<List<String>> JOB_LISTENERS =
            ConfigOptions.key("execution.atlas.job-listeners")
                    .stringType()
                    .asList()
                    .noDefaultValue()
                    .withDescription("JobListenerFactories to be registered for the execution.");

一点说明:官方Flink1.12.0 版本之后支持配置execution.job-listeners,因此自己添加了个配置属性execution.atlas.job-listeners 进行区分,
org.apache.flink.configuration.DeploymentOptions
在这里插入图片描述

任务提交
flink run -m yarn-cluster -ys 1 -yjm 1024 -ytm 1024 -c com.nufront.bigdata.v2x.test.AtlasTest /opt/v2x-1.0-SNAPSHOT.jar
测试任务
public class AtlasTest {

    public static void main(String[] args) throws Exception {

        //TODO 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.disableOperatorChaining();

        // TODO kafka消费
        // 配置 kafka 输入流信息
        Properties consumerprops = new Properties();
        consumerprops.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.2.67:9092");
        consumerprops.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        consumerprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerprops.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        consumerprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumerprops.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // 添加 kafka 数据源
        DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer<>("atlas-source-topic", new SimpleStringSchema(), consumerprops));

        // 配置kafka输入流信息
        Properties producerprops = new Properties();
        producerprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.2.67:9092");
        // 配置证书信息
        dataStreamSource.addSink(new FlinkKafkaProducer<String>("atlas-sink-topic", new KeyedSerializationSchemaWrapper(new SerializationSchema<String>(){
            @Override
            public byte[] serialize(String element) {
                return element.getBytes();
            }
        }), producerprops));
        env.execute("AtlasTest");
    }

}
flink on yarn 日志输出

在这里插入图片描述

修改 json 解析方式

org.apache.atlas.utils.AtlasJson#toJson

public static String toJson(Object obj) {
		String ret;

		if (obj instanceof JsonNode && ((JsonNode) obj).isTextual()) {
			ret = ((JsonNode) obj).textValue();
		} else {
			
			// 修改 json 处理方式:fastjson,原来的ObjectMapper.writeValueAsString() 一度卡住不往下执行
			// ret = mapper.writeValueAsString(obj);
			ret = JSONObject.toJSONString(JSONObject.toJSON(obj));
			LOG.info(ret);
		}
		
        return ret;
    }
查看目标 kafka 对应topic

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/646028.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

6月第2周榜单丨飞瓜数据B站UP主排行榜(哔哩哔哩)发布!

飞瓜轻数发布2023年6月5日-6月11日飞瓜数据UP主排行榜&#xff08;B站平台&#xff09;&#xff0c;通过充电数、涨粉数、成长指数三个维度来体现UP主账号成长的情况&#xff0c;为用户提供B站号综合价值的数据参考&#xff0c;根据UP主成长情况用户能够快速找到运营能力强的B站…

51、C++ 学习笔记

1、引用类型 引用类型是C引入的新类型&#xff0c;根据汇编的知识进行理解&#xff0c;程序在汇编后&#xff0c;变量名将失去意义&#xff0c;因为汇编码将替换成用内存的(链接地址or运行地址)访问变量。在C/C语言中&#xff0c;用变量名表示变量所占的那块内存&#xff0c;为…

仓储管理小程序开发 实现不同行业不同规模的仓管需求

在电子商务快速发展的时代&#xff0c;仓库管理对于一个企业的经营发展来说至关重要。如今互联网技术深入发展&#xff0c;很多企业都开发了信息化管理系统&#xff0c;仓库管理APP小程序就是企业结合自身的运算法则开发的一款线上应用软件&#xff0c;通过智能智慧仓库内人、物…

网络安全是一个好的专业吗?高考之后怎么选择?

目录 一.始于大学 二.一路成长 三. 如何学习网络安全 学前感言 零基础入门 尾言 本人信息安全专业毕业&#xff0c;在甲方互联网大厂安全部与安全乙方大厂都工作过&#xff0c;有一些经验可以供对安全行业感兴趣的人参考。 或许是因为韩商言让更多人知道了CTF&#xff0…

linuxOPS基础_LAMP开源项目实战

LAMP概述 LAMP&#xff1a;Linux Apache MySQL PHP LAMP 架构&#xff08;组合&#xff09; LNMP&#xff1a;Linux Nginx MySQL php-fpm LNMP 架构&#xff08;组合&#xff09; LNMPA&#xff1a;Linux Nginx(80) MySQL PHP Apache Nginx 代理方式 Apache&#…

Markdown编辑器使用

这里写自定义目录标题 欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants 创建一个自定义列表如何创建一个…

这所Top3顶尖院校,专业课太简单了,比双非还要简单!

一、学校及专业介绍 复旦大学&#xff08;FDU&#xff0c;简称旦旦&#xff09;&#xff0c;除清北之外的顶尖学府&#xff0c;想必不用我过多介绍&#xff0c;Top3之一&#xff08;众所周知&#xff0c;Top3有好多所图片&#xff0c;但我心目中的Top3永远是上海交大图片&#…

element-ui中表头添加自定义按钮以及其他自定义展示

可以使用&#xff1a;render-header方法即可 添加一个按钮如下&#xff1a; renderHeader (h) { return ( <div> <span>操作</span> <el-button type"primary" style"margin-left:90px" size"small" icon"el-icon-pl…

在测试外包干了4年,我废了...

外包公司值不值得去&#xff0c;是很多同行关心的话题。在职场一直流传着“外包不被当人看”“外包没有归属感”的言论。 客观来看&#xff0c;外包岗位确实存在一些缺点&#xff0c;比如&#xff1a;公积金&#xff0c;社保缴纳基数低&#xff0c;没有稳定的涨薪通道&#xff…

登录时token的存储

1.token是什么&#xff1f; 是一种身份的标识,比如我们入住一家酒店,他会给我们一张房卡,房卡的期限是有时间限制的,只有持有房卡的人才能入住酒店。 2.jsCookie 使用的方法 下包: npm i jscookie 导入: import Cookiejs from "js-cookie"; 使用: Cookie.js.set…

object类clone、finalize

2 什么是API API&#xff08;Application Programming Interface&#xff0c;应用程序接口&#xff09;是一些预先定义的函数。目的是提供应用程序与开发人员基于某软件可以访问的一些功能集&#xff0c;但又无需访问源码或理解内部工作机制的细节. API是一种通用功能集,有时公…

HTB-OnlyForYou

HTB-OnlyForYou 信息收集立足johnjohn -> root 信息收集 Designed by BootstrapMade. 在他们的TEAM的常见问答里面发现了一个beta产品。 网站首页可以下载疑似源码的文件。 右上角还有两个功能。 一个是上传图片并调整大小。 上传了文件后会跳转到list&#xff0c;选择…

【CV大模型SAM(Segment-Anything)】如何一键分割图片中所有对象?并对不同分割对象进行保存?

之前的文章【CV大模型SAM&#xff08;Segment-Anything&#xff09;】真是太强大了&#xff0c;分割一切的SAM大模型使用方法:可通过不同的提示得到想要的分割目标,中详细介绍了大模型SAM&#xff08;Segment-Anything&#xff09;根据不同的提示方式得到不同的目标分割结果。 …

11. 100ASK-V853-PRO开发板 RGB屏测试指南

100ASK-V853-PRO开发板 RGB屏测试指南 硬件要求&#xff1a; 100ASK-V853-PRO开发板七寸RGB屏 软件要求&#xff1a; 固件下载地址&#xff1a;链接&#xff1a;百度网盘 提取码&#xff1a;sp6a 固件位于资料光盘中的10_测试镜像/1.测试七寸RGB屏/v853_linux_100ask_uart0.…

echarts中国地图使用整理

一、echarts中国地图使用案例 1.准备地图数据china.json ; 需要的添加微信&#xff1a;tianma104&#xff0c;我发你 2.引入jquery&#xff0c;引入eachars 库 <script src"http://xx/ajax/libs/jquery/3.5.1/jquery.min.js"></script> <script s…

Unity入门5——Camera

一、参数面板 二、参数介绍 1. Clear Flags&#xff1a;清除背景 Skybox&#xff1a;天空盒背景&#xff08;通常用来做 3D 游戏&#xff09; Solid Color&#xff1a;使用 Background 颜色填充&#xff08;通常设置为全黑或全白&#xff0c;2D 使用&#xff09; Depth Only&am…

【ARM AMBA ATB 入门 1 - ATB 总线简介】

文章目录 背景1.1 ATB BUS1.2.1 全局信号1.2.2 数据信号1.2.3 流控信号1.2.4 Trace ID1.2.5 Buffer Flusing 背景 在 AMBA3 中&#xff0c;增加了 Advanced Trace Bus (ATB) 总线作为片上调试的总线接口&#xff0c;为 Debug 和 Trace 提供一种解决方案。第3代总线是2003年发布…

存储快速入门——【1】网络存储主要技术(NAS、SAN、SCSI、CIFS、zone)

存储快速入门——【1】网络存储主要技术 1 NAS简介&#xff08;网络&#xff09; 在20世纪80年代初&#xff0c;英国纽卡斯尔大学布赖恩.兰德尔教授 ( Brian Randell)和同事通过“纽卡斯尔连接”成功示范和开发了在整套UNIX机器上的远程文件访问。继“纽卡斯尔连接”之后&…

Springboot集成Redis常见的报错和解决方案

Springboot集成Redis后运行时常见的报错信息和其解决方案 1. io.lettuce.core.protocol.CommandHandler : null Unexpected exception during request: java.io.IOException: 远程主机强迫关闭了一个现有的连接。报错信息原因分析解决方案 2. io.netty.util.internal.OutOfDire…

Python自动化测试 史上最全的进阶教程

Python自动化测试就是把以前人为测试转化为机器测试的一种过程。自动化测试是一种比手工测试更快获得故障反馈的方法。 随着时代的变革&#xff0c;也许在未来测试这个职位的需求会越来越少甚至消失&#xff0c;但是每一个组织&#xff0c;每一个客户对软件质量的要求是永远不…