Flink消费pubsub问题

news2025/1/12 1:55:24

我看网上flink消费pubsub的资料并不多,最近跑通了,大家有问题的可以给我留言。

一、基本资料

1.flink官网接入方式

Google Cloud PubSub | Apache Flink

StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();

public class PubsubRecordDeserializer implements PubSubDeserializationSchema<RecordSchema>{...}

SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder()
      .withDeserializationSchema(new PubsubRecordDeserializer())
      .withProjectName("project")
      .withSubscriptionName("subscription")
      .build();

streamExecEnv.addSource(source);

注意这里是project,不是topic。

(鉴权方式和反序列化方式后面会讲)

2.maven依赖

        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>google-cloud-pubsub</artifactId>
          <version>1.62.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-gcp-pubsub_2.12</artifactId>
          <version>1.14.2</version>
        </dependency>

版本自己定。

3.credential鉴权方法

需要先有含有鉴权信息的JSON文件(谷歌授权)

然后有两个方法选一个可以实现

1)设置环境变量 GOOGLE_APPLICATION_CREDENTIALS

值为本地的access key文件。

2)在程序中加载resource文件

PubSubSource.newBuilder()
....                

.withCredentials(ServiceAccountCredentials.fromStream(getAutheficateFile()))

ServiceAccountCredentials这个类可以接入流式文件。返回一个实现了Credentials接口的方法,

    public static InputStream getAutheficateFile() {
        String configFile = "/key.json";
        InputStream credentialsFile = PubsubSubscriber.class.getResourceAsStream(configFile);
        return credentialsFile;
    }

即可传入参数来用。

4.反序列化方法

点开 withDeserializationSchema方法,发现传入的反序列化对象,需要实现

PubSubDeserializationSchema接口

主要实现接口里的这两个方法

其中 RecordSchema类是自己定义的接收自己需要信息的case class。实现了setter和getter方法。

(核心用户数据在getdata里)

二、碰到的问题

===================================================================

我在开发中 碰到了反序列化PubsubMessage的问题,理解有点偏差,

去StackOverflow提问了下,不过后来自己理解了。

返回的形式已经是一个PubsubMessage类的对象了,指定的schema只是给解析后自己用的。

(之前理解以为要指定一个返回的消息类型的schema,来承接数据接入)

deserialization - What is the proper way to use flink-connector-gcp-pubsub - Stack Overflow

(CSDN居然吞掉StackOverflow的链接

题目是 What is the proper way to use flink-connector-gcp-pubsub

作者 Reina )

=====================================================================

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/536019.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android平台如何实现外部编码后(H.264/H.265)数据实时预览播放

技术背景 我们在对接开发者的时候&#xff0c;遇到这样的诉求&#xff1a;除了正常的RTMP、RTSP直播播放外&#xff0c;有些硬件设备输出编码后&#xff08;H.264/H.265&#xff09;的数据&#xff0c;比如无人机或类似硬件产品&#xff0c;回调出来的H.264/H.265数据&#xf…

C#中的委托是什么

https://www.cnblogs.com/deepalley/p/12150931.html 1.什么是委托&#xff1f;&#xff08;方法作另一个方法的参数&#xff09; delegate void MyDel(int value); //声明委托类型 和类一样&#xff0c;委托是用户自定义的类型&#xff0c;但是类是数据和方法的集合&#…

vue实现功能完整的购物商城,商品零食、电商通用商城

目录 一、项目结构 1.项目截图 2.项目简介 3.项目布局 二、首页 1.效果图 2.源码 三、商品详情 1.效果图 2.源码 四、分类 1.效果图 五、购物车、提交订单 1.效果图 六、个人中心 1.源码结构 2、效果图 七、总结 一、项目结构 1.项目截图 2.项目简介 项目基于vue…

海睿思分享 | 一文读懂企业数据资产目录建设的重要性

小王是某公司信息化部门负责人。 某天&#xff0c;公司领导需要获取近三年来生产部门的人员信息全面数据&#xff0c;小王费了九牛二虎之力&#xff0c;召开了各种会议&#xff0c;在各个系统里来回找数据&#xff0c;最终找到了这些数据。然而领导所需的人员职称、人员获奖信…

UOS服务器系统配置bond

一、Bond介绍 bond可以将多个网卡绑定到一起&#xff0c;可以让两个或多个接口作为一个接口&#xff0c;同时提高带宽&#xff0c;并提供网络链路的冗余&#xff0c;当有其中一块网卡故障的时候&#xff0c;不会中断服务器的业务。 二、Bond模式 1、mode0&#xff08;balanc…

老杨说运维 | 运维数智化转型正确打开方式是什么?他这样说

2023年5月9日&#xff0c;中国计算机用户协会信息科技审计分会会员大会暨金融科技风险管理与审计论坛成功于北京召开。擎创科技CEO杨辰受邀与会&#xff0c;并分享了在数智运维发展过程中对企业数智化转型建设的规划思考以及相关实践经验。 同时&#xff0c;年会上举行了“金融…

《基础知识》提示学习的基本知识

《基础知识》提示学习的基本知识 提示学习背景提示的形式和元素提示学习的输入形式提示学习的重要元素提示学习的输入形式举例基本提示任务提示学习 内容参考:打工人转型之道(二):提示工程(Prompt Engineering)进阶篇

【服务器】利用树莓派搭建 web 服务器【无需公网IP】

Yan-英杰的主页 悟已往之不谏 知来者之可追 C程序员&#xff0c;2024届电子信息研究生 目录 概述 使用 Raspberry Pi Imager 安装 Raspberry Pi OS 设置 Apache Web 服务器 测试 web 站点 安装静态样例站点 将web站点发布到公网 安装 Cpolar内网穿透 cpolar进行tok…

基于 FPGA 的彩色图像灰度化的设计实现(image_stitche_x)

文章目录 前言一、图像合并模块的设计二、仿真文件 前言 image_stitche_x 模块&#xff1a;将串口接收的尺寸为 400480 大小的彩色图像与灰度化处理后的 400480 大小的图像数据以左右形式合并成一张 800*480 的图像。 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面…

AI再度升级,IT业一片哀鸿遍野:程序员真的要失业了吗?

IT人员真的要失业了吗&#xff1f; 随着各个大厂已经相继传来裁员&#xff0c;降薪&#xff0c;减招的消息和ChatGPT等大型AI模型可以定制化写参考代码&#xff0c;甚至通过外接API直接帮助操作&#xff0c;IT人员似乎越来越不吃香了。 其实&#xff0c;ChatGPT有用的不是取代…

Diango学习-用户管理系统(简单部门管理、用户管理)

目录 1、创建项目和app 1.创建项目 2.创建app 2种创建方式 注册app 2、表结构的创建 Django中的模型字段有很多种&#xff0c;包括但不限于&#xff1a; 设计表结构&#xff08;Django&#xff09; 在models.py文件中创建表&#xff1a;部门表和员工表 加入性别列&…

FL Studio21.0.3.3517完整试用版

系统要求 FL STUDIO 可以运行在任何计算机上: 支持 WINDOWS: 7, 8, 10 或者更高版本 支持 MacOS: 10.11 或更高版本 不低于 4GB 的可用硬盘空间 建议最低 4GB 内存或 更高 当然CPU 越强大&#xff0c;也就意味着你运行的音源和效果器越多! FL Studio是一个非常受欢迎的数…

轻松实现远程访问本地wamp服务器,无公网IP也不怕,「内网穿透」

目录 前言 1.Wamp服务器搭建 1.1 Wamp下载和安装 1.2 Wamp网页测试 2. Cpolar内网穿透的安装和注册 2.1 本地网页发布 2.2 Cpolar云端设置 2.3 Cpolar本地设置 3. 公网访问测试 4. 结语 转载自cpolar极点云的文章&#xff1a;无公网IP&#xff1f;教你在外远程访问本地…

新零售发展现状剖析

“新零售”的商业生态将涵盖网络页面、实体店面、支付终端、数据系统、物流平台和营销路径等诸多方面。 企业通过使用大数据和人工智能等先进技术升级产品的生产、流通和销售流程&#xff0c;重塑商业结构与生态圈&#xff0c;深度融合线上服务、线下体验和现代物流。将物流、…

Devexpress GridControl 内部调用外面实现的FocusedRowChanged

个人需求是网格自带的条件发生改变时&#xff08;网格显示的内容会发生改变&#xff09;&#xff0c;同时需要刷新另一个网格的数据源&#xff0c;而另一个网格的数据源是走的这个网格的行焦点改变事件去刷新&#xff0c;自带的条件发生改变时并不会触发行焦点的改变 当前情况…

【HTTP协议】

&#x1f389;&#x1f389;&#x1f389;点进来你就是我的人了博主主页&#xff1a;&#x1f648;&#x1f648;&#x1f648;戳一戳,欢迎大佬指点! 欢迎志同道合的朋友一起加油喔&#x1f93a;&#x1f93a;&#x1f93a; 目录 1. HTTP协议概述 2. HTTP协议的工作过程 3.…

LabVIEWCompactRIO 开发指南21 使用TCP/IP时处理孤立套接字

LabVIEWCompactRIO 开发指南21 使用TCP/IP时处理孤立套接字 无法重新建立侦听TCP套接字是设计基于TCP/IP的应用程序时最常见的挑战之一。此症状是由终止客户端或服务器应用程序后发生的孤立套接字引起的。如果按照本节中所述的技术设计代码&#xff0c;则可以避免此问题。本节…

大数据拥抱云原生 HashData助力资管数字化转型

5月16日&#xff0c;2023国际资管科技创业者与投资者大会“资管数据处理&#xff08;大模型&#xff09;技术”专场在上海举行。本次大会以“资产管理 数智技术”为主题&#xff0c;邀请企业、高校、投资机构等各方开展产业交流与讨论&#xff0c;共享共创行业机遇。 酷克数据…

GPT-5: 超越人类语言的模型,你还不了解一下?

目录 一、GPT-5时代引领者 二、技术特性 1&#xff0c;音频和视频处理 — 更强大的多模态处理能力 2&#xff0c;GPT-5颠覆影视制作&#xff1a;重写媒体消费时代 3&#xff0c;为机器人提供智慧大脑 4&#xff0c;更强的垂直行业应用 三、回顾一下GPT5被紧急叫停&…

设计模式之【命令模式】,方法调用的花式玩法

文章目录 一、什么是命令模式1、命令模式使用场景2、命令模式的主要角色3、命令模式优缺点4、命令模式注意事项及细节 二、使用示例1、命令模式的一般写法2、播放器功能案例3、遥控器案例 三、源码中的命令模式1、Thread 一、什么是命令模式 命令模式&#xff08;Command Patt…