Flink SQL custom RabbitMQ connector
Straight to the code.
GitHub repository:
https://github.com/liutaobigdata/flink-sql-rabbitmq-connector
- SourceFactory code
public class RabbitmqTableSourceFactory implements DynamicTableSourceFactory {

    private static final String FACTORY_IDENTIFIER = "rabbitmq";

    public static final ConfigOption<String> QUEUE = ConfigOptions.key("queue")
            .stringType()
            .noDefaultValue();
    public static final ConfigOption<String> EXCHANGE_NAME = ConfigOptions.key("exchange-name")
            .stringType()
            .noDefaultValue();
    public static final ConfigOption<Integer> PORT = ConfigOptions.key("port")
            .intType()
            .noDefaultValue();
    public static final ConfigOption<Integer> QOS = ConfigOptions.key("qos")
            .intType()
            .defaultValue(100);
    public static final ConfigOption<String> HOSTS = ConfigOptions.key("hosts")
            .stringType()
            .noDefaultValue();
    public static final ConfigOption<String> VIRTUAL_HOST = ConfigOptions.key("virtual-host")
            .stringType()
            .noDefaultValue();
    public static final ConfigOption<String> USERNAME = ConfigOptions.key("username")
            .stringType()
            .noDefaultValue();
    public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password")
            .stringType()
            .noDefaultValue();
    public static final ConfigOption<String> FORMAT = ConfigOptions.key("format")
            .stringType()
            .noDefaultValue();

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

        // Discover the decoding format (e.g. 'format' = 'json') before validating,
        // so that format-specific options are recognized as consumed.
        final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
                DeserializationFormatFactory.class,
                FactoryUtil.FORMAT);

        // Validate all options (custom validation logic could be added here as well).
        helper.validate();

        // Read the validated options.
        final ReadableConfig options = helper.getOptions();
        final int port = options.get(PORT);
        final String hosts = options.get(HOSTS);
        final String virtualHost = options.get(VIRTUAL_HOST);
        final String userName = options.get(USERNAME);
        final String password = options.get(PASSWORD);
        final String exchangeName = options.get(EXCHANGE_NAME);
        final String queue = options.get(QUEUE);
        final int qos = options.get(QOS);

        // Physical row type derived from the resolved table schema.
        final DataType dataType =
                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

        return new RabbitmqDynamicTableSource(hosts, port, virtualHost, userName, password,
                queue, exchangeName, qos, decodingFormat, dataType);
    }

    @Override
    public String factoryIdentifier() {
        return FACTORY_IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOSTS);
        options.add(PORT);
        options.add(QUEUE);
        options.add(VIRTUAL_HOST);
        options.add(USERNAME);
        options.add(PASSWORD);
        options.add(EXCHANGE_NAME);
        options.add(FORMAT);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        // Register QOS so that setting 'qos' in the DDL passes validation.
        options.add(QOS);
        return options;
    }
}
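The string returned by factoryIdentifier(), "rabbitmq", is what the 'connector' option in the DDL at the end of this post has to match. Because hosts, port, queue, virtual-host, username, password, exchange-name and format are all declared as required options, every one of them must appear in the WITH clause.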
- ScanTableSource code
public class RabbitmqDynamicTableSource implements ScanTableSource {

    private final String hostname;
    private final int port;
    private final String virtualHost;
    private final String userName;
    private final String password;
    private final String queue;
    private final String exchangeName;
    private final int qos;
    private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
    private final DataType dataType;

    public RabbitmqDynamicTableSource(String hostname,
                                      int port,
                                      String virtualHost,
                                      String userName,
                                      String password,
                                      String queue,
                                      String exchangeName,
                                      int qos,
                                      DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
                                      DataType dataType) {
        this.hostname = hostname;
        this.port = port;
        this.virtualHost = virtualHost;
        this.userName = userName;
        this.password = password;
        this.queue = queue;
        this.exchangeName = exchangeName;
        this.qos = qos;
        this.decodingFormat = decodingFormat;
        this.dataType = dataType;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        // This source only produces INSERT rows.
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .build();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
        // Create the runtime decoder for the configured format.
        final DeserializationSchema<RowData> deserializer =
                decodingFormat.createRuntimeDecoder(scanContext, dataType);

        // Build the RabbitMQ connection config; 'qos' is mapped to the prefetch count.
        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
                .setHost(hostname)
                .setPort(port)
                .setUserName(userName)
                .setPassword(password)
                .setPrefetchCount(qos)
                .setVirtualHost(virtualHost)
                .build();

        // Delegate to Flink's built-in RMQSource (no correlation-id deduplication).
        RMQSource<RowData> source = new RMQSource<>(
                connectionConfig,
                queue,
                false,
                deserializer);

        return SourceFunctionProvider.of(source, false);
    }

    @Override
    public DynamicTableSource copy() {
        return new RabbitmqDynamicTableSource(hostname, port, virtualHost, userName, password,
                queue, exchangeName, qos, decodingFormat, dataType);
    }

    @Override
    public String asSummaryString() {
        return "rabbitmq table source";
    }
}
- SPI configuration
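For Flink to discover the factory at runtime, a Java SPI service file has to be added under src/main/resources. A minimal sketch, assuming the factory class sits in a hypothetical package com.example.rabbitmq (use the actual package that RabbitmqTableSourceFactory is declared in):

src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory

com.example.rabbitmq.RabbitmqTableSourceFactory

The file name must be exactly org.apache.flink.table.factories.Factory, and its single line is the fully qualified class name of the factory.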
- pom.xml contents
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-sql-rabbitmq</artifactId>
    <version>6.0</version>
    <packaging>jar</packaging>
    <name>flink-sql-rabbitmq-connector</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>1.15.3</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.16.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.15.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-rabbitmq</artifactId>
            <version>1.15.3</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <!-- Merge META-INF/services files so the SPI entry survives shading. -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
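Running mvn clean package produces the shaded jar under target/. The ServicesResourceTransformer configured above merges the META-INF/services entries, so the SPI file from the previous section ends up in the final jar; that jar is what gets uploaded to Alibaba Cloud Flink as a custom connector.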
- Flink SQL DDL format after uploading the jar to Alibaba Cloud Flink as a custom connector
CREATE TEMPORARY TABLE rabbitmq_source (
  column1 STRING,
  column2 STRING,
  column3 STRING
) WITH (
  'connector' = 'rabbitmq',
  'queue' = '',
  'hosts' = '',
  'port' = '',
  'virtual-host' = '',
  'username' = '',
  'password' = '',
  'exchange-name' = '',
  'format' = 'json'
);
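Once the table is declared it can be queried like any other source. A minimal sketch, assuming a sink table named print_sink backed by Flink's built-in print connector (the sink name and columns here are placeholders, not part of the connector itself):

CREATE TEMPORARY TABLE print_sink (
  column1 STRING,
  column2 STRING,
  column3 STRING
) WITH (
  'connector' = 'print'
);

INSERT INTO print_sink
SELECT column1, column2, column3
FROM rabbitmq_source;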