flink-sql自定义rabbitmq connector

news2024/11/17 11:36:47

flink sql 自定义 rabbitmq connector

直接上代码

github 地址:

https://github.com/liutaobigdata/flink-sql-rabbitmq-connector

  • SourceFactory 代码
   public class RabbitmqTableSourceFactory implements DynamicTableSourceFactory {

    private static final String FACTORY_IDENTIFIER = "rabbitmq";

    public static final ConfigOption<String> QUEUE = ConfigOptions.key("queue")
            .stringType()
            .noDefaultValue();


    public static final ConfigOption<String> EXCHANGE_NAME = ConfigOptions.key("exchange-name")
            .stringType()
            .noDefaultValue();

    public static final ConfigOption<Integer> PORT = ConfigOptions.key("port")
            .intType()
            .noDefaultValue();
    public static final ConfigOption<Integer> QOS = ConfigOptions.key("qos")
            .intType()
            .defaultValue(100);

    public static final ConfigOption<String> HOSTS = ConfigOptions.key("hosts")
            .stringType()
            .noDefaultValue();


    public static final ConfigOption<String> VIRTUAL_HOST = ConfigOptions.key("virtual-host")
            .stringType()
            .noDefaultValue();


    public static final ConfigOption<String> USERNAME = ConfigOptions.key("username")
            .stringType()
            .noDefaultValue();

    public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password")
            .stringType()
            .noDefaultValue();
    public static final ConfigOption<String> FORMAT = ConfigOptions.key("format")
            .stringType()
            .noDefaultValue();
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // either implement your custom validation logic here ...
        // or use the provided helper utility
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);


        // validate all options
        helper.validate();

        // get the validated options
        final ReadableConfig options = helper.getOptions();

        final int port = options.get(PORT);
        final String hosts = options.get(HOSTS);
        final String virtualHost = options.get(VIRTUAL_HOST);
        final String useName = options.get(USERNAME);
        final String password = options.get(PASSWORD);
        final String exchangeName = options.get(EXCHANGE_NAME);
        final String queue = options.get(QUEUE);
        final int qos = options.get(QOS);
        final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
                DeserializationFormatFactory.class,
                FactoryUtil.FORMAT);
        final DataType dataType =
                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
        return new RabbitmqDynamicTableSource(hosts, port, virtualHost, useName, password, queue, exchangeName, qos,decodingFormat,dataType);
    }

    @Override
    public String factoryIdentifier() {
        return FACTORY_IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOSTS);
        options.add(PORT);
        options.add(QUEUE);
        options.add(VIRTUAL_HOST);
        options.add(USERNAME);
        options.add(PASSWORD);
        options.add(EXCHANGE_NAME);
        options.add(FORMAT);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        return options;
    }
}
  • ScanTableSource 代码
public class RabbitmqDynamicTableSource implements ScanTableSource {

    private final String hostname;
    private final String virtualHost;
    private final String userName;
    private final String password;
    private final String queue;
    private final String exchangeName;
    private final int qos;
    private final DataType dataType;


    private final int port;

    private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;


    public RabbitmqDynamicTableSource(String hostname,
                                      int port,
                                      String virtualHost,
                                      String useName,
                                      String password,
                                      String queue,
                                      String exchangeName,
                                      int qos,
                                      DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
                                      DataType dataType) {
        this.hostname = hostname;
        this.port = port;
        this.virtualHost = virtualHost;
        this.userName = useName;
        this.password = password;
        this.queue = queue;
        this.exchangeName = exchangeName;
        this.qos = qos;
        this.decodingFormat = decodingFormat;
        this.dataType = dataType;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        // define that this format can produce INSERT and DELETE rows
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .build();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {

//        // create runtime classes that are shipped to the cluster
//        final SourceFunction<RowData> sourceFunction = new RabbitmqSourceFunction(
//                hostname,
//                port,
//                virtualHost,
//                userName,
//                password,
//                queue,
//                exchangeName,
//                qos);
        final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(scanContext, dataType);
        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
                .setHost(hostname)
                .setPort(port)
                .setUserName(userName)
                .setPassword(password)
                .setPrefetchCount(qos)
                .setVirtualHost(virtualHost)
                .build();


        RMQSource source = new RMQSource(
                connectionConfig,
                queue,
                false,
                deserializer);

        return SourceFunctionProvider.of(source, false);
    }

    @Override
    public DynamicTableSource copy() {
        return null;
    }

    @Override
    public String asSummaryString() {
        return "rabbitmq table source";
    }
}

  • SPI 配置
    在这里插入图片描述
  • pom 文件 内容
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-sql-rabbitmq</artifactId>
    <version>6.0</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <packaging>jar</packaging>

    <name>flink-sql-rabbitmq-connector</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>1.15.3</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.16.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.15.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-rabbitmq</artifactId>
            <version>1.15.3</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

    </dependencies>
</project>

  • 上传到 阿里云 flink connector 中 flink sql 代码格式
CREATE TEMPORARY TABLE rabbitmq_source (
  colunn1 STRING,
  colunn2 STRING,
  colunn3 STRING
) WITH (
  'connector' = 'rabbitmq',
  'queue' = '',
  'hosts' = '',
  'port' = '',
  'virtual-host' = '',
  'username' = '',
  'password' = '',
  'exchange-name'='',
  'format'='json'
);

测试运行结果 符合预期

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/700416.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

进阶实战,接口自动化测试——requests文件上传/下载实战代码

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 如果需要发送文件…

record-4.网络

4、网络 技术栈&#xff1a; 1、计算机网络体系结构 OSI分层 &#xff08;7层&#xff09;&#xff1a;物理层、数据链路层&#xff08;网桥&#xff0c;交换机&#xff09;、网络层&#xff08;IP&#xff0c;ICMP&#xff0c;ARP&#xff09;、传输层&#xff08;TCP&…

springboot房屋管理系统

房屋管理系统 springboot房屋管理系统 java房屋管理系统 技术&#xff1a; 基于springboothtml房屋管理系统的设计与实现 运行环境&#xff1a; JAVA版本&#xff1a;JDK1.8 IDE类型&#xff1a;IDEA、Eclipse都可运行 数据库类型&#xff1a;MySql&#xff08;8.x版本都可…

微服务-Gradle的入门和使用

对于一个新的工程拉下来&#xff0c;如果该工程用了gradle。需要学习一下gradle项目管理工具。我在本机macbook M1的环境下操作。 一、配置安装 下载Gradle&#xff1a; https://services.gradle.org/distributions/ 我下载了6.9版本的Gradle 下载好了以后&#xff0c;放到了…

elasticsearch学习篇:初识ES

一、什么是ES 1、基础概念 是一款非常强大的开源搜索引擎&#xff0c;具备非常多强大功能&#xff0c;可以帮助我们从海量数据中快速找到需要的内容es是elastic stack(ELK)的核心&#xff0c;负责存储、搜索、分析数据。 ELK包括以下内容&#xff1a; ELK被广泛应用在日志数据…

用python实现调用百度图片搜索的API

前言&#xff1a;这段代码是一个简单的图片爬虫程序它可以通过输入关键词&#xff0c;在百度图片中搜索相关图片&#xff0c;并返回一张随机的图片。代码使用Flask框架搭建了一个简单的Web应用&#xff0c;将用户输入的关键词作为参数传递给爬虫程序&#xff0c;然后从百度图片…

观察者模式(Observer)

别名 事件订阅者者&#xff08;Event-Subscriber&#xff09;监听者&#xff08;Listener&#xff09; 定义 观察者是一种行为设计模式&#xff0c;允许你定义一种订阅机制&#xff0c;可在对象事件发生时通知多个“观察”该对象的其他对象。 前言 1. 问题 假如你有两种类…

Linux vfs各种operation操作介绍

1.ext4文件系统定义的各种操作 //普通文件操作 const struct file_operations ext4_file_operations {.llseek ext4_llseek,.read_iter generic_file_read_iter,.write_iter ext4_file_write_iter,.unlocked_ioctl ext4_ioctl, #ifdef CONFIG_COMPAT.compat_ioctl …

【Python基础】- break和continue语句(文末送书4本)

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…

6 图像处理实现螺纹识别案例(matlab程序)

学习目的&#xff1a;学习识别案例掌握识别方法 2.代码 clear;clc;close all Iimread(luowen1.bmp); %读取螺纹图片 try Irgb2gray(I); %如果是RGB图&#xff0c;则转换成灰度图 catch end figure imshow(I) title(原图&#xff08;半边螺纹&#xff09;) f…

基于pyqt和卷积网络CNN的中文汉字识别

直接上效果演示图&#xff1a; 通过点击按钮可以实现在画板上写汉字识别和加载图片识别两个功能。 视频演示和demo仓库地址在b站视频001期&#xff1a; 到此一游7758258的个人空间-到此一游7758258个人主页-哔哩哔哩视频 所有代码展示&#xff1a; 十分的简洁&#xff0c;主…

【从零开始学习JAVA | 第二十六篇】泛型补充知识

目录 前言&#xff1a; 泛型的更多应用&#xff1a; 泛型类&#xff1a; 泛型方法&#xff1a; 泛型方法&#xff1a; 总结&#xff1a; 前言&#xff1a; 在上一篇文章中我们介绍了泛型的基础知识&#xff0c;也就是在创建集合的时候用到泛型&#xff0c;此时的泛型更多…

关于内存颗粒的地址映射

即便从软件角度&#xff0c;抛开地址译码器讨论内存颗粒中指定位置处的地址&#xff08;DDR中的指定位置的电容&#xff09;也是没有意义的。晶体管没有绝对地址&#xff0c;就如同地理测量中测定位置前需要确定坐标系一样&#xff0c;同一个位置在不同的坐标系中的地址描述可以…

【CVRP测评篇】 算法性能如何?来测!

我跨越了2100015秒的距离&#xff0c;为你送上更全面的算法性能评测。 目录 往期优质资源1 CVRP数据集2 实验准备2.1 计算机配置2.2 调参方法2.3 参数设定2.4 实验方法 3 实验结果3.1 最优解统计3.1.1各数据集上的算法性能对比3.1.2 求解结果汇总3.1.3小结一下3.1.4 还有话说 3…

使用 RedisTemplate 对象的 opsForValue() 方法获取 Redis 中的值获取不到

问题 使用 RedisTemplate 对象的 opsForValue() 方法获取 Redis 中的值获取不到 详细问题 笔者代码如下 1 使用 ValueOperations 对象的 set() 方法将一个键值对存储到 Redis 中 valueOperations.set("order:" user.getId() ":" goods.getId(), sec…

【Redis】2、Redis应用之【根据 Session 和 Redis 进行登录校验和发送短信验证码】

目录 一、基于 Session 实现登录(1) 发送短信验证码① 手机号格式后端校验② 生成短信验证码 (2) 短信验证码登录、注册(3) 登录验证① 通过 SpringMVC 定义拦截器② ThreadLocal (4) 集群 Session 不共享问题 二、基于 Redis 实现共享 session 登录(1) 登录之后&#xff0c;缓…

23款迈巴赫S480升级原厂10°后轮转向系统,减少转弯半径

就是低速的情况下&#xff0c;有更强的机动性&#xff0c;前后车轮的不同转动方向使得车辆可以凭借更更小转弯半径实现转向&#xff0c;在特定的狭窄路段或者停车时&#xff0c;车辆的操控性大大提升&#xff0c;而内轮差也缩小也增大了转向的安全性。 高速的情况下&#xff0…

C. Road Optimization(dp)

Problem - 1625C - Codeforces 火星政府不仅对优化太空飞行感兴趣&#xff0c;还希望改进该行星的道路系统。 火星上最重要的高速公路之一连接着奥林匹克城和西多尼亚的首都Kstolop。在这个问题中&#xff0c;我们只考虑从Kstolop到奥林匹克城的路线&#xff0c;而不考虑相反的…

技术创举!比亚迪-汉上的实景三维导航...

实景三维技术的发展日新月异&#xff0c;但在应用中却一直深陷内存占用、渲染缓慢、加载卡顿和模型塌陷等问题。对此&#xff0c;大势智慧率先推出海量数据轻量化技术&#xff0c;在业内首次实现实景三维模型在车机系统的直接浏览&#xff0c;展示了轻量化技术赋能实景三维应用…

面试Dubbo ,却问我和Springcloud有什么区别?

Dubbo 、Springcloud? 这两有关系&#xff1f; 前言一、RPC 框架的概念1. 什么是RPC框架2. RPC 和 普通通信 的区别 二、常用 RPC 框架1. Dubbo2. gRPC3. Thrift4. Feign 三、dubbo 与 Springcloud1. Dubbo 的模型2. Springcloud3. dubbo 与 Springcloud 的区别 前言 提到Dub…