Reflection: to work with big data you really have to know Java well, otherwise you will be stumped the moment you face the dazzling variety of frameworks and plugin extensions. When framework versions differ, it is fine if a ready-made plugin or solution exists, but when it does not, you have to extend one yourself. My current setup is CDH 6.3.2, Flink 1.16, and Phoenix 5.0.0 (mentioned in an earlier post on this blog), with the HBase 2.x that ships with CDH. That is exactly where the problem arises: the best existing support is the connector plugin inside dlinky, so what I need to do is compile and package dlink-connector-phoenix, upload it to Flink's lib directory, and test it with sql-client.
The problem: dlinky currently supports Flink 1.14 but not Flink 1.16, so the connector needs to be extended. I did this by studying how flink-connector-jdbc is written in the Flink source code and adapting dlinky's dlink-connector-phoenix-1.14 module accordingly.
If you just want the already-built package, it is here:
https://download.csdn.net/download/u012228523/87853354
1. Pull the dlinky source code from
https://gitee.com/mirrors/Dlink.git
and switch to the 0.7.3 branch.
2. Set up the basic environment as required by the official guide:
http://www.dlink.top/docs/next/deploy_guide/compiler
Special note: my Maven repository mirror is configured as:
<mirrors>
    <mirror>
        <id>alimaven</id>
        <mirrorOf>central</mirrorOf>
        <name>nexus</name>
        <url>https://maven.aliyun.com/repository/public/</url>
    </mirror>
</mirrors>
3. Copy dlink-connector-phoenix-1.14 under dlink-connectors to a sibling directory and rename it to dlink-connector-phoenix-1.16, then modify the pom.xml of the dlink-connectors module to register the new module. The added code was shown as a screenshot in the original post; see the sketch below.
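Since the screenshot is not reproduced here, here is a minimal sketch of the change: it amounts to registering the new module in dlink-connectors/pom.xml. Whether the entry belongs in the top-level <modules> list or inside the flink-1.16 profile used by the packaging command in step 6 depends on how your checkout of 0.7.3 is laid out, so treat the placement as an assumption to verify:

<modules>
    <!-- existing connector modules -->
    <module>dlink-connector-phoenix-1.14</module>
    <!-- newly added -->
    <module>dlink-connector-phoenix-1.16</module>
</modules>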
4. Modify the pom.xml of dlink-connector-phoenix-1.16. It must be set up exactly this way, and flink-table-common must come first among the dependencies, otherwise there will be problems; a sketch follows below. The version 1.16-SNAPSHOT is the package I built when compiling Flink myself; you can also use 1.16.1 or 1.16.2. If your build fails, leave your email in a comment and I will send the package to you.
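The original post showed this pom.xml as a screenshot. As a minimal sketch of the key parts, assuming everything else is carried over unchanged from the 1.14 module (the artifactId rename and the flink.version property are the pieces I am inferring, so verify them against your copy):

<artifactId>dlink-connector-phoenix-1.16</artifactId>

<properties>
    <!-- 1.16-SNAPSHOT is my own Flink build; 1.16.1 or 1.16.2 should work too -->
    <flink.version>1.16-SNAPSHOT</flink.version>
</properties>

<dependencies>
    <!-- must stay the first dependency, per the note above -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- the remaining dependencies are kept as in the 1.14 module -->
</dependencies>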
5. A compatibility problem appears during compilation and requires changing the source of PhoenixDynamicTableSource.java: dlinky 0.7.3 uses the projectSchema method of the TableSchemaUtils utility class, but that method was removed in Flink 1.16. The fix is to copy the projectSchema implementation from Flink 1.14's TableSchemaUtils into PhoenixDynamicTableSource.java itself. The full source is posted below; just overwrite the class with it.
6. Package:
mvn clean install --settings /Users/admin/Documents/softwares/repository-zi/settings-aliyun.xml -DskipTests=true -P aliyun,prod,scala-2.12,web,fast,flink-1.16
7. Once the build finishes, if you only want to use plain Flink, just copy the following jars into Flink's lib directory:
dlink-connector-phoenix-1.16-0.7.3.jar, phoenix-5.0.0-cdh6.2.0-client.jar, phoenix-core-5.0.0-cdh6.2.0.jar
Note: never put them alongside Flink's bundled hbase-connector jar, or they will conflict.
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.apache.flink.connector.phoenix.table;

import org.apache.flink.connector.phoenix.dialect.JdbcDialect;
import org.apache.flink.connector.phoenix.internal.options.JdbcLookupOptions;
import org.apache.flink.connector.phoenix.internal.options.JdbcReadOptions;
import org.apache.flink.connector.phoenix.internal.options.PhoenixJdbcOptions;
import org.apache.flink.connector.phoenix.split.JdbcNumericBetweenParametersProvider;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.types.FieldsDataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;

import java.util.Objects;
/**
 * PhoenixDynamicTableSource
 *
 * @author gy
 * @since 2022/3/17 10:40
 */
public class PhoenixDynamicTableSource
        implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown, SupportsLimitPushDown {

    private final PhoenixJdbcOptions options;
    private final JdbcReadOptions readOptions;
    private final JdbcLookupOptions lookupOptions;
    private TableSchema physicalSchema;
    private final String dialectName;
    private long limit = -1L;

    public PhoenixDynamicTableSource(
            PhoenixJdbcOptions options,
            JdbcReadOptions readOptions,
            JdbcLookupOptions lookupOptions,
            TableSchema physicalSchema) {
        this.options = options;
        this.readOptions = readOptions;
        this.lookupOptions = lookupOptions;
        this.physicalSchema = physicalSchema;
        this.dialectName = options.getDialect().dialectName();
    }
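    /**
     * Lookup (dimension-table join) runtime: resolves the non-nested join keys against the
     * physical schema and wraps them in a PhoenixRowDataLookupFunction.
     */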
    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
        // JDBC only supports non-nested lookup keys
        String[] keyNames = new String[context.getKeys().length];
        for (int i = 0; i < keyNames.length; i++) {
            int[] innerKeyArr = context.getKeys()[i];
            Preconditions.checkArgument(
                    innerKeyArr.length == 1, "JDBC only support non-nested look up keys");
            keyNames[i] = physicalSchema.getFieldNames()[innerKeyArr[0]];
        }
        final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
        return TableFunctionProvider.of(
                new PhoenixRowDataLookupFunction(
                        options,
                        lookupOptions,
                        physicalSchema.getFieldNames(),
                        physicalSchema.getFieldDataTypes(),
                        keyNames,
                        rowType));
    }
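    /**
     * Scan runtime: builds a PhoenixJdbcRowDataInputFormat, optionally splitting the query
     * into partitioned BETWEEN ranges and appending the pushed-down LIMIT clause.
     */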
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        PhoenixJdbcRowDataInputFormat.Builder builder = PhoenixJdbcRowDataInputFormat.builder()
                .setDrivername(this.options.getDriverName())
                .setDBUrl(this.options.getDbURL())
                .setUsername(this.options.getUsername().orElse(null))
                .setPassword(this.options.getPassword().orElse(null))
                .setAutoCommit(this.readOptions.getAutoCommit())
                // Phoenix-specific schema settings
                .setNamespaceMappingEnabled(this.options.getNamespaceMappingEnabled())
                .setMapSystemTablesToNamespace(this.options.getMapSystemTablesToNamespace());
        if (this.readOptions.getFetchSize() != 0) {
            builder.setFetchSize(this.readOptions.getFetchSize());
        }
        JdbcDialect dialect = this.options.getDialect();
        String query = dialect.getSelectFromStatement(
                this.options.getTableName(), this.physicalSchema.getFieldNames(), new String[0]);
        if (this.readOptions.getPartitionColumnName().isPresent()) {
            long lowerBound = this.readOptions.getPartitionLowerBound().get();
            long upperBound = this.readOptions.getPartitionUpperBound().get();
            int numPartitions = this.readOptions.getNumPartitions().get();
            builder.setParametersProvider(
                    new JdbcNumericBetweenParametersProvider(lowerBound, upperBound).ofBatchNum(numPartitions));
            query += " WHERE "
                    + dialect.quoteIdentifier(this.readOptions.getPartitionColumnName().get())
                    + " BETWEEN ? AND ?";
        }
        if (this.limit >= 0L) {
            query = String.format("%s %s", query, dialect.getLimitClause(this.limit));
        }
        builder.setQuery(query);
        RowType rowType = (RowType) this.physicalSchema.toRowDataType().getLogicalType();
        builder.setRowConverter(dialect.getRowConverter(rowType));
        builder.setRowDataTypeInfo(
                runtimeProviderContext.createTypeInformation(this.physicalSchema.toRowDataType()));
        return InputFormatProvider.of(builder.build());
    }
    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public boolean supportsNestedProjection() {
        return false;
    }

    @Override
    public void applyProjection(int[][] projectedFields) {
        this.physicalSchema = projectSchema(this.physicalSchema, projectedFields);
    }
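    /**
     * Helper for projectSchema below: projection pushdown is only possible when the
     * schema consists solely of physical columns.
     */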
    private boolean containsPhysicalColumnsOnly(TableSchema schema) {
        Preconditions.checkNotNull(schema);
        return schema.getTableColumns().stream().allMatch(TableColumn::isPhysical);
    }
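    /**
     * Copied from Flink 1.14's TableSchemaUtils#projectSchema, which was removed in
     * Flink 1.16 (see step 5 above).
     */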
    private TableSchema projectSchema(TableSchema tableSchema, int[][] projectedFields) {
        Preconditions.checkArgument(
                containsPhysicalColumnsOnly(tableSchema),
                "Projection is only supported for physical columns.");
        TableSchema.Builder builder = TableSchema.builder();
        FieldsDataType fields =
                (FieldsDataType) DataTypeUtils.projectRow(tableSchema.toRowDataType(), projectedFields);
        RowType topFields = (RowType) fields.getLogicalType();
        for (int i = 0; i < topFields.getFieldCount(); i++) {
            builder.field(topFields.getFieldNames().get(i), fields.getChildren().get(i));
        }
        return builder.build();
    }
    @Override
    public DynamicTableSource copy() {
        return new PhoenixDynamicTableSource(this.options, this.readOptions, this.lookupOptions, this.physicalSchema);
    }

    @Override
    public String asSummaryString() {
        return "JDBC:" + this.dialectName;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        } else if (!(o instanceof PhoenixDynamicTableSource)) {
            return false;
        } else {
            PhoenixDynamicTableSource that = (PhoenixDynamicTableSource) o;
            return Objects.equals(this.options, that.options)
                    && Objects.equals(this.physicalSchema, that.physicalSchema)
                    && Objects.equals(this.dialectName, that.dialectName)
                    && Objects.equals(this.limit, that.limit);
        }
    }

    @Override
    public int hashCode() {
        return Objects.hash(this.options, this.readOptions, this.lookupOptions, this.physicalSchema, this.dialectName, this.limit);
    }

    @Override
    public void applyLimit(long limit) {
        this.limit = limit;
    }
}