我看网上flink消费pubsub的资料并不多,最近跑通了,大家有问题的可以给我留言。
一、基本资料
1.flink官网接入方式
Google Cloud PubSub | Apache Flink
StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
public class PubsubRecordDeserializer implements PubSubDeserializationSchema<RecordSchema>{...}
SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder()
.withDeserializationSchema(new PubsubRecordDeserializer())
.withProjectName("project")
.withSubscriptionName("subscription")
.build();
streamExecEnv.addSource(source);
注意这里是project,不是topic。
(鉴权方式和反序列化方式后面会讲)
2.maven依赖
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.62.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-gcp-pubsub_2.12</artifactId>
<version>1.14.2</version>
</dependency>
版本自己定。
3.credential鉴权方法
需要先有含有鉴权信息的JSON文件(谷歌授权)
然后有两个方法选一个可以实现
1)设置环境变量 GOOGLE_APPLICATION_CREDENTIALS
值为本地的access key文件。
2)在程序中加载resource文件
PubSubSource.newBuilder()
....
.withCredentials(ServiceAccountCredentials.fromStream(getAutheficateFile()))
ServiceAccountCredentials这个类可以接入流式文件。返回一个实现了Credentials接口的方法,
public static InputStream getAutheficateFile() {
String configFile = "/key.json";
InputStream credentialsFile = PubsubSubscriber.class.getResourceAsStream(configFile);
return credentialsFile;
}
即可传入参数来用。
4.反序列化方法
点开 withDeserializationSchema方法,发现传入的反序列化对象,需要实现
PubSubDeserializationSchema接口
主要实现接口里的这两个方法
其中 RecordSchema类是自己定义的接收自己需要信息的case class。实现了setter和getter方法。
(核心用户数据在getdata里)
二、碰到的问题
===================================================================
我在开发中 碰到了反序列化PubsubMessage的问题,理解有点偏差,
去StackOverflow提问了下,不过后来自己理解了。
返回的形式已经是一个PubsubMessage类的对象了,指定的schema只是给解析后自己用的。
(之前理解以为要指定一个返回的消息类型的schema,来承接数据接入)
deserialization - What is the proper way to use flink-connector-gcp-pubsub - Stack Overflow
(CSDN居然吞掉StackOverflow的链接
题目是 What is the proper way to use flink-connector-gcp-pubsub
作者 Reina )
=====================================================================