扩展dlink-connector-phoenix使其phoenix-5.0.0支持flink1.16

news2025/1/15 21:46:58

感慨:玩大数据java必须要精通,不然遇到眼花缭乱的框架以及插件拓展的时候就会一下子傻眼了。各种框架之间版本不同有现成的插件或者方案到还可以但是没有就需要自己扩展。目前我使用的是CDH6.3.2,flink使用的是1.6,Phoenix版本的是5.0.0这有在我的博客中提到过,hbase使用的是自带的2.x。这就遇到问题了目前有支持的比较好的是dlinky这个里面的插件,我现在需要做的是将dlink-connector-phoenix这个插件编译打包上传到flink的lib目录中使用sql-client进行测试。

问题:目前dlinky支持flink1.14不支持flink1.16所以需要扩展。经过我比对flink源码中的flink-connector-jdbc的写法。结合
dlinky中dlink-connector-phoenix-1.14的版本进行扩展

如果想要已经打包好的
https://download.csdn.net/download/u012228523/87853354

1、拉取dlinky的源码,
https://gitee.com/mirrors/Dlink.git
并且切换到0.7.3分支

2、按照官网来基本环境要求来
http://www.dlink.top/docs/next/deploy_guide/compiler

特殊说明:
mvn的仓库配置的是

  <mirrors>
    <mirror>
      <id>alimaven</id>
      <mirrorOf>central</mirrorOf>
      <name>nexus</name>
      <url>https://maven.aliyun.com/repository/public/</url>
     </mirror>  
  </mirrors>

3、将dlink-connectors中的dlink-connector-phoenix-1.14拷贝一份到同级目录下面。修改名称为dlink-connector-phoenix-1.16
在这里插入图片描述
修改dlink-connectors同级目录的pom.xml文件
在这里插入图片描述
新增代码如下截图:
在这里插入图片描述
4、修改dlink-connector-phoenix-1.16的pom.xml文件
在这里插入图片描述
一定是这样并且flink-table-common必须在最上面不然会有问题。版本1.16-SNAPSHOT是我编译flink的时候打的包。你也可以写1.16.1或者1.16.2如果编译不同过可以留言邮箱,我发出来。

5、编译过程中出现兼容问题,需要修改源码PhoenixDynamicTableSource.java这个类中由于dlinky0.7.3使用了
TableSchemaUtils这个工具类中的projectSchema方法.但是这个方法在flink1.16已经给删除了。于是可以将flink1.14中TableSchemaUtils的projectSchema方法写到PhoenixDynamicTableSource.java这个类中稍后贴出源码直接覆盖类就行

6、打包

mvn clean install --settings /Users/admin/Documents/softwares/repository-zi/settings-aliyun.xml  -DskipTests=true -P aliyun,prod,scala-2.12,web,fast,flink-1.16

8、编译完成后如果是想只使用flink只需要到入一下包到flink的lib目录下
dlink-connector-phoenix-1.16-0.7.3.jar,phoenix-5.0.0-cdh6.2.0-client.jar,phoenix-core-5.0.0-cdh6.2.0.jar

注意:一定不要与flink自带的hbase-connector包放一起,会冲突

/*
 *
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

package org.apache.flink.connector.phoenix.table;

import org.apache.flink.connector.phoenix.dialect.JdbcDialect;
import org.apache.flink.connector.phoenix.internal.options.JdbcLookupOptions;
import org.apache.flink.connector.phoenix.internal.options.JdbcReadOptions;
import org.apache.flink.connector.phoenix.internal.options.PhoenixJdbcOptions;
import org.apache.flink.connector.phoenix.split.JdbcNumericBetweenParametersProvider;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.table.types.FieldsDataType;
import org.apache.flink.table.types.logical.RowType;
import java.util.Objects;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.types.utils.DataTypeUtils;
/**
 * PhoenixDynamicTableSource
 *
 * @author gy
 * @since 2022/3/17 10:40
 **/
public class PhoenixDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown,
        SupportsLimitPushDown {

    private final PhoenixJdbcOptions options;
    private final JdbcReadOptions readOptions;
    private final JdbcLookupOptions lookupOptions;
    private TableSchema physicalSchema;
    private final String dialectName;
    private long limit = -1L;

    public PhoenixDynamicTableSource(PhoenixJdbcOptions options, JdbcReadOptions readOptions, JdbcLookupOptions lookupOptions, TableSchema physicalSchema) {
        this.options = options;
        this.readOptions = readOptions;
        this.lookupOptions = lookupOptions;
        this.physicalSchema = physicalSchema;
        this.dialectName = options.getDialect().dialectName();

    }

    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
        // JDBC only support non-nested look up keys
        String[] keyNames = new String[context.getKeys().length];
        for (int i = 0; i < keyNames.length; i++) {
            int[] innerKeyArr = context.getKeys()[i];
            Preconditions.checkArgument(
                    innerKeyArr.length == 1, "JDBC only support non-nested look up keys");
            keyNames[i] = physicalSchema.getFieldNames()[innerKeyArr[0]];
        }
        final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();

        return TableFunctionProvider.of(
                new PhoenixRowDataLookupFunction(
                        options,
                        lookupOptions,
                        physicalSchema.getFieldNames(),
                        physicalSchema.getFieldDataTypes(),
                        keyNames,
                        rowType));
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        PhoenixJdbcRowDataInputFormat.Builder builder = PhoenixJdbcRowDataInputFormat.builder()
                .setDrivername(this.options.getDriverName())
                .setDBUrl(this.options.getDbURL())
                .setUsername((String)this.options.getUsername().orElse((String) null))
                .setPassword((String)this.options.getPassword().orElse((String) null))
                .setAutoCommit(this.readOptions.getAutoCommit())
                //setting phoenix schema
                .setNamespaceMappingEnabled(this.options.getNamespaceMappingEnabled())
                .setMapSystemTablesToNamespace(this.options.getMapSystemTablesToNamespace())
                ;

        if (this.readOptions.getFetchSize() != 0) {
            builder.setFetchSize(this.readOptions.getFetchSize());
        }

        JdbcDialect dialect = this.options.getDialect();
        String query = dialect.getSelectFromStatement(this.options.getTableName(), this.physicalSchema.getFieldNames(), new String[0]);
        if (this.readOptions.getPartitionColumnName().isPresent()) {
            long lowerBound = (Long)this.readOptions.getPartitionLowerBound().get();
            long upperBound = (Long)this.readOptions.getPartitionUpperBound().get();
            int numPartitions = (Integer)this.readOptions.getNumPartitions().get();
            builder.setParametersProvider((new JdbcNumericBetweenParametersProvider(lowerBound, upperBound)).ofBatchNum(numPartitions));
            query = query + " WHERE " + dialect.quoteIdentifier((String)this.readOptions.getPartitionColumnName().get()) + " BETWEEN ? AND ?";
        }

        if (this.limit >= 0L) {
            query = String.format("%s %s", query, dialect.getLimitClause(this.limit));
        }

        builder.setQuery(query);
        RowType rowType = (RowType)this.physicalSchema.toRowDataType().getLogicalType();
        builder.setRowConverter(dialect.getRowConverter(rowType));
        builder.setRowDataTypeInfo(runtimeProviderContext.createTypeInformation(this.physicalSchema.toRowDataType()));
        return InputFormatProvider.of(builder.build());
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public boolean supportsNestedProjection() {
        return false;
    }

    @Override
    public void applyProjection(int[][] projectedFields) {
        this.physicalSchema = projectSchema(this.physicalSchema, projectedFields);
    }

    private boolean containsPhysicalColumnsOnly(TableSchema schema) {
        Preconditions.checkNotNull(schema);
        return schema.getTableColumns().stream().allMatch(TableColumn::isPhysical);
    }

    private TableSchema projectSchema(TableSchema tableSchema, int[][] projectedFields) {
        Preconditions.checkArgument(
                containsPhysicalColumnsOnly(tableSchema),
                "Projection is only supported for physical columns.");
        TableSchema.Builder builder = TableSchema.builder();

        FieldsDataType fields =
                (FieldsDataType)
                        DataTypeUtils.projectRow(tableSchema.toRowDataType(), projectedFields);
        RowType topFields = (RowType) fields.getLogicalType();
        for (int i = 0; i < topFields.getFieldCount(); i++) {
            builder.field(topFields.getFieldNames().get(i), fields.getChildren().get(i));
        }
        return builder.build();
    }

    public DynamicTableSource copy() {
        return new PhoenixDynamicTableSource(this.options, this.readOptions, this.lookupOptions, this.physicalSchema);
    }

    public String asSummaryString() {
        return "JDBC:" + this.dialectName;
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        } else if (!(o instanceof PhoenixDynamicTableSource)) {
            return false;
        } else {
            PhoenixDynamicTableSource that = (PhoenixDynamicTableSource)o;
            return Objects.equals(this.options, that.options)
                    && Objects.equals(this.physicalSchema, that.physicalSchema)
                    && Objects.equals(this.dialectName, that.dialectName)
                    && Objects.equals(this.limit, that.limit);
        }
    }

    public int hashCode() {
        return Objects.hash(new Object[]{this.options, this.readOptions, this.lookupOptions, this.physicalSchema, this.dialectName, this.limit});
    }

    public void applyLimit(long limit) {
        this.limit = limit;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/597894.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构入门7-1(查找)

目录 注 查找的基本概念 线性表的查找 顺序查找 折半查找 分块查找 树表的查找 二叉排序树 平衡二叉树 平衡二叉树的定义 平衡二叉树的平衡调整方式 平衡二叉树的实现 B-树 B-树的定义 B-树的示意性实现 B树 注 本笔记参考&#xff1a;《数据结构&#xff08;C…

【04】数据结构与算法基础-类C语言有关操作补充 | 数组的静态、动态定义 | C、C++中内存分配 | C++中的参数传递方式-指针、数组、引用

目录 1.元素类型说明1.1顺序表类型定义1.2数组定义1.3C语言的内存动态分配1.4C的动态存储分配1.4.1创建内存1.4.2释放内存 1.5C中的参数传递1.5.1传值方式1.5.2传地址方式-指针变量1.5.3传地址方式-数组名1.5.4传地址方式-引用类型 1.元素类型说明 1.1顺序表类型定义 typedef…

HDFS 的健壮性体现在哪里?

前言 本文隶属于专栏《大数据技术体系》&#xff0c;该专栏为笔者原创&#xff0c;引用请注明来源&#xff0c;不足和错误之处请在评论区帮忙指出&#xff0c;谢谢&#xff01; 本专栏目录结构和参考文献请见大数据技术体系 正文 HDFS 的主要目标就是即使在出错的情况下也要保…

Linux进程间通信(消息队列)

可以用命令“ipcs”查看三种 IPC&#xff0c;“ipcrm”删除 IPC 对象。在 i.MX6ULL 终结者开发板终端输入“ipcs” 查看系统中存在的 IPC 信息&#xff1a; 这些 IPC 对象存在于内核空间&#xff0c;应用层使用 IPC 通信的步骤为&#xff1a; 1. 获取 key 值&#xff0c;内核…

【网络】交换机的原理和配置方法

目录 &#x1f341;交换机工作原理 &#x1f341;交换机接口的双工模式 &#x1f341;交换机命令行模式 &#x1f341;交换机常见命令 &#x1f9e7;帮助命令 &#x1f9e7;常用命令介绍 &#x1f341;交换机的基本配置 &#x1f9e7;配置接口的双工模式及速率 &#x1f990;博…

数据库误删恢复

说明 经常听说删库跑路这真的不只是一句玩笑话&#xff0c;若不小心删除了数据库&#xff0c;事情很严重。你一个不小心可能会给公司删没。建议研发不要直连生成环境&#xff0c;一般的话都会分配账号权限&#xff0c;生产环境的账号尽量是只读&#xff0c;以防你一个不经意给库…

java快速结束嵌套循环

java快速结束嵌套循环 快速结束for循环 out:for (int i 0; i < 5; i) {in:for (int j 0; j < 5; j) {if (j 2) {break out;}System.out.println("i " i " j " j);}}解释 将外层for循环起别名 o u t \color{red}{out} out,将内层for循环起别名…

Java NIO-非阻塞I/O(一)

文章目录 1. 简介2. 一个示例服务器3. 缓冲区4. 创建缓冲区5. 填充与排空6. 批量方法7. 数据转换8. 视图缓冲区9. 压缩缓冲区10. 复制缓冲区11. 分片缓冲区 1. 简介 与CPU和内存相比&#xff0c;甚至和磁盘相比&#xff0c;网络都很慢&#xff0c;但要允许CPU速度高于网络&…

YOLOV8最强操作教程.

YoloV8详细训练教程. 相信各位都知道yolov8发布了&#xff0c;也是U神大作&#xff0c;而且V8还会出论文喔&#xff01; 2023.1.17 更新 yolov8-grad-cam热力图可视化链接 2023.1.20 更新 YOLOV8改进-添加EIoU,SIoU,AlphaIoU,FocalEIoU 链接 2023.1.30 更新 如果你需要修改或者…

Vector Scope

下面以PicoScope 6403E-034为例说明 1.Scope 硬件结构介绍 前面板&#xff1a; 最多支持捕获2路CAN/CANFD/FlexRay总线&#xff0c;或者4路LIN总线 后面板&#xff1a; Scope Bus Probe 300 Mhz 2.Option Scope使用条件 前提条件1&#xff1a;购买CANoe/CANalyzer Option Sc…

03-SpringBoot3JDK9~17新特性

1、JDK9新特性---jshell交互式工具 前提是已经配置好了Jdk的环境变量。 2、JDK9新特性---模块化开发 需求&#xff1a; testA可以被模块A调用&#xff0c;testB可能是内部工具类&#xff0c;不想被模块A调用&#xff0c;又不能设置成private。 这时候我们就会用到模块化开发。…

vue3+element-plus+ts elplus table 实现表格动态列 表格列显示与隐藏的动态控制 支持传递插槽与多级表头

如题 先上效果 部分代码展示 Home页面 使用时除了名字不同其他没啥不同,但是我这个封装的函数或者属性较少,如果需要请自行增加 <script setup lang"ts"> import { IPage } from /mixins/pagination import { TableKey } from /types/enum import { useRou…

Vue3-01-Vue3 新特性及环境搭建

Vue.js是一种被广泛使用的JavaScript框架&#xff0c;用于构建用户界面和单页面应用。Vue3是其最新的主要版本&#xff0c;引入了许多新特性并做了一些改进。 一、Vue3 性能提升 1. Object.defineProperty VS Proxy Vue2 和 Vue3 在数据响应性系统的实现上采用了不同的方式&…

通则ZLT X21 CPE使用指南

目录 设备介绍应用场景案例详细配置CPE基本配置网络实现DMZ方式实现网络互通IP Passthrough方式实现网络互通 注意事项 设备介绍 ZLT X21 是一款高性能5G室内CPE&#xff0c;支持NR(SA&NSA)、TDD-LTE、FDD-LTE&#xff0c;将蜂窝网络数据转换为WIFI和有线网口数据&#xf…

【项目实战】博客系统设计与实现

一、项目概述 1.项目需求 前端&#xff1a;展示文章&#xff0c;文章分类&#xff0c;评论&#xff0c;用户登录。 后端 &#xff1a;系统管理&#xff1a;用户管理&#xff0c;菜单管理&#xff0c;角色管理。内容管理&#xff1a;文章管理&#xff0c;分类管理&#xff0c;标…

地震勘探基础(一)之地震波

地震波 纵波/P波 (Compressional Wave) &#xff1a;质点的动方向与波的传播方向一致。天然地震时&#xff0c;纵波造成地面上下颠簸震动&#xff0c;纵波先达到地表。 纵波速度与弹性参数的关系&#xff1a;纵波速度与体积模量&#xff0c;杨氏模量&#xff0c;剪切模量&…

tcpdump 抓包工具详细图文教程(下)

目录 一、tcpdump 常用参数的使用 1.1 tcpdump -i # 指定监听网络接口 1.2 tcpdump -w # 将捕获到的信息保存到文件中&#xff0c;且不分析和打印在屏幕 1.3 tcpdump -r # 从文件中读取数据 1.4 tcpdump -n # 不把 ip 转化成域名 1.5 tcpdump -t # 在每行的输出中不…

springboot自定义注解的使用++日志

1.添加切面依赖 <dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId><version>1.8.9</version> </dependency> 2.自定义注解 Target(ElementType.METHOD) Retention(RetentionPolicy.RUNTI…

Java语言---PriorityQueue与堆

目录 一.堆 1.1堆的概念 1.2堆的存储方式 1.3堆的操作 1.3.1堆的创建 1.3.2代码的实现&#xff1a; 堆的插入元素 堆的删除 二、PriorityQueue 2.1概念 2.2性质 2.3PriorityQueue的创建构造 2.4PriorityQueue的操作方法 总结 &#x1f63d;个人主页&#xff1a;t…

堆的应用:Top-K问题

朋友们、伙计们&#xff0c;我们又见面了&#xff0c;本期来给大家解读一下堆的应用--Top-K问题的相关知识点&#xff0c;如果看完之后对你有一定的启发&#xff0c;那么请留下你的三连&#xff0c;祝大家心想事成&#xff01; 数据结构与算法专栏&#xff1a;数据结构与算法 个…