【自定义Source、Sink】Flink自定义Source、Sink对redis进行读写操作

news2024/12/26 23:47:00

使用ParameterTool读取配置文件

Flink读取参数的对象

  1. Commons-cli: Apache提供的,需要引入依赖
  2. ParameterTool:Flink内置

ParameterTool 比 Commons-cli 使用上简便;

ParameterTool能避免Jar包的依赖冲突

建议使用第二种

使用ParameterTool对象可以直接获取配置文件中的信息,需要如下依赖

        <!-- Flink基础依赖 【ParameterTool类 在该依赖中】 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
        </dependency>
        <!-- Flink流批处理依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        </dependency>

Java读取资源的方式

  1. Class.getResourceAsStream(Path):Path 必须以 “/”,表示从ClassPath的根路径读取资源
  2. Class.getClassLoader().getResourceAsStream(Path):Path 无须以 “/”,默认从ClassPath的根路径读取资源

推荐使用第2种,以类加载器的方式获取静态资源文件,不要通过ClassPath的相对路径查找

最基本的工具类

public class ParameterUtil {
    	// 创建 ParameterTool 对象
        public static ParameterTool getParameters() {

        // 读取 resources 文件夹下 "flink.properties" 文件
        InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);

        try {
            return ParameterTool.fromPropertiesFile(inputStream);
        } catch (Exception e) {
            throw new FlinkPropertiesException(FlinkPropertiesExceptionInfo.PROPERTIES_NULL);
        }
    }
}

image-20231209095849541

可以通过 ParameterUtil.getParameters().get("redis.port") 直接读取key对应的value值

Flink写入Redis方式

  1. 继承RichSinkFunction (Flink-Stream)
  2. 使用第3方的包 (Apache-Bahir-Flink)

Apache-Bahir-Flink 的 Redis-Connector的缺点:

  1. 使用Jedis, 没有使用Lettuce
  2. 没有对 Flink Table/SQL Api 的支持

不少基于bahir二开的例子解决了上述问题

gitee地址:https://gitee.com/jeff-zou/flink-connector-redis?_from=gitee_search

github地址:https://github.com/apache/bahir-flink

bahir 集成了许多连接器,其中就包含Redis

image-20231209103659812

Flink官网上也可以看到bahir的影子

image-20231209104014483

方便起见,接下来就基于bahir,Flink写入Redis集群

基于巴希尔(Bahir)-Flink写入Redis集群

引入connector连接器依赖

        <!-- Flink-Connector-Redis -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
        </dependency>

依赖版本定义在父模块中

image-20231209100449996

实现RedisMapper接口自定义Sink

首先实现RedisMapper接口并指定泛型——处理元素的类型

/**
 * 基于apache bachir flink的RedisSink,作用于Redis String数据类型
 */
public class RedisSinkByBahirWithString implements RedisMapper<Tuple2<String, String>> {

    /**
     * 指定Redis的命令
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        /* **********************
         *
         * 如果Redis的数据类型是 hash 或 z-Set
         * RedisCommandDescription 的构造方法必须传入 additionalKey
         * additionalKey就是Redis的键
         *
         * *********************/
        return new RedisCommandDescription(RedisCommand.SET);
    }

    /**
     * 从数据流里获取Key值
     */
    @Override
    public String getKeyFromData(Tuple2<String, String> input) {
        return input.f0;
    }

    /**
     * 从数据流里获取Value值
     */
    @Override
    public String getValueFromData(Tuple2<String, String> input) {
        return input.f1;
    }
}

写入Redis工具类

public class RedisWriteUtil {

    /* **********************
     *
     * FlinkJedisClusterConfig:集群模式
     * FlinkJedisPoolConfig:单机模式
     * FlinkJedisSentinelConfig:哨兵模式
     *
     * *********************/

    // Jedis配置
    private static final FlinkJedisClusterConfig JEDIS_CONF;

    static {
        ParameterTool parameterTool = ParameterUtil.getParameters();
        String host = parameterTool.get("redis.host");
        String port = parameterTool.get("redis.port");

        /* **********************
         *
         * InetSocketAddress 是Java的套接字
         *
         * *********************/
        InetSocketAddress inetSocketAddress = new InetSocketAddress(host, Integer.parseInt(port));

        Set<InetSocketAddress> set = new HashSet<>();
        set.add(inetSocketAddress);
        JEDIS_CONF = new FlinkJedisClusterConfig
                .Builder()
                .setNodes(set)
                .build();
    }


    /**
     * 基于Bahir写入Redis,Redis的数据是String类型
     */
    public static void writeByBahirWithString(DataStream<Tuple2<String, String>> input) {
        input.addSink(new RedisSink<>(JEDIS_CONF, new RedisSinkByBahirWithString()));
    }

}

测试一下

class RedisWriteUtilTest {

    @DisplayName("测试基于Bahir写入Redis,Redis数据类型是String类型")
    @Test
    void writeByBahirWithString() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        DataStreamSource<Tuple2<String, String>> dataStream = env.fromElements(Tuple2.of("k", "v"));
        RedisWriteUtil.writeByBahirWithString(dataStream);

        env.execute();
    }
}

非常完美!写入成功

image-20231209105850707

Flink读取Redis方式

  1. 继承RichSourceFunction (实现自定义Source)
  2. 继承RichParallelSourceFunction (实现自定义Source)【可以指定并行度】
  3. 实现SourceFunction接口 (实现自定义Source)

RichParallelSourceFunction 和 RichSourceFunction区别

RichParallelSourceFunction 可以设置并行度

RichParallelSourceFunction 和 RichSourceFunction 代码是可以互相套用

RichParallelSourceFunction 默认的并行度是cpu 的 核心数(core数)

RichSourceFunction 的并行度只能是1

继承RichSourceFunction类-Flink读取Redis集群

前置准备

定义枚举类

Redis数据类型枚举类

@Getter
public enum RedisDataType {

    STRING,
    HASH,
    LIST,
    SET,
    SORTED_SET,
    ;

    RedisDataType() {
    }
}

定义Redis命令的枚举类,便于Source判断操作

@Getter
public enum RedisCommand {

    // get string
    GET(RedisDataType.STRING);

    private final RedisDataType redisDataType;

    RedisCommand(RedisDataType redisDataType) {
        this.redisDataType = redisDataType;
    }
}

Jedis配置类

bahir依赖中自带jedis依赖一般不用,自行引入jedis,jedis依赖版本要与巴希尔中jedis版本保持一致

image-20231209111800457

public class JedisConf {

    public static JedisCluster getJedisCluster() throws IOException {

        ParameterTool parameterTool =
                ParameterUtil.getParameters();
        String host = parameterTool.get("redis.host");
        String port = parameterTool.get("redis.port");

        /* **********************
         * Jedis对象
         *
         * JedisPool : 用于redis单机版
         * JedisCluster: 用于redis集群
         *
         * JedisCluster对象能够自动发现正常的redis节点
         *
         * *********************/

        HostAndPort hostAndPort = new HostAndPort(
                host,
                Integer.parseInt(port)
        );
        Set<HostAndPort> nodes = new HashSet<>();
        nodes.add(hostAndPort);

        return new JedisCluster(nodes);

    }
}

封装Jedis对象的redis方法

封装Jedis对象的redis方法,方便统一调用和维护

public class JedisBuilder {

    private JedisCluster jedis = null;

    public JedisBuilder(JedisCluster jedisCluster) {
        this.jedis = jedisCluster;
    }

    public void close() {
        if (this.jedis != null) {
            this.jedis.close();
        }
    }

    /**
     * Redis的Get方法
     */
    public String get(String key) {
        return jedis.get(key);
    }
}

自定义Source

Redis数据的映射对象

@Data
@AllArgsConstructor
@NoArgsConstructor
public class RedisPO implements Serializable {

    private String data;
    
}

Flink 自定义Redis Source读取Redis

/* **********************
 * 【富函数类】 比函数类提供了更多函数生命周期,提供了获取上下文的方法
 * 富函数类通常是抽象类
 * *********************/
public class RedisSource extends RichSourceFunction<RedisPO> {

    /**
     * Jedis对象
     */
    private JedisBuilder jedisBuilder;

    /**
     * Redis命令枚举对象
     */
    private final RedisCommand redisCommand;

    /**
     * redis key
     */
    private final String key;

    public RedisSource(RedisCommand redisCommand, String key) {
        this.redisCommand = redisCommand;
        this.key = key;
    }

    /**
     * volatile 修饰的变量,它的更新都会通知其他线程.
     */
    private volatile boolean isRunning = true;

    /**
     * Redis的连接初始化
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        JedisCluster jedisCluster = JedisConf.getJedisCluster();
        jedisBuilder = new JedisBuilder(jedisCluster);
    }

    /**
     * Redis数据的读取
     */
    @Override
    public void run(SourceContext<RedisPO> output) throws Exception {

        /* **********************
         *
         * 一直监听Redis数据的读取
         *
         * *********************/

        String data = null;
        // while (isRunning) {

        switch (redisCommand.getRedisDataType()) {
            case STRING:
                data = jedisBuilder.get(key);
        }

        output.collect(new RedisPO(data));
        // }

    }
    
    @Override
    public void cancel() {
        this.isRunning = false;
    }

}

读取Redis工具类

public class RedisReadUtil {

    public static DataStream<RedisPO> read(
            StreamExecutionEnvironment env,
            RedisCommand redisCommand,
            String key) {
        return env.addSource(new RedisSource(redisCommand, key));
    }
}

测试一下

class RedisReadUtilTest {

    @DisplayName("测试自定义Source读取Redis,Redis数据类型是String类型")
    @Test
    void testReadByCustomSourceWithString() throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<RedisPO> dataStream = RedisReadUtil.read(
                env,
                RedisCommand.GET,
                "k"
        );

        dataStream.print();
        env.execute();
    }
}

测试成功!

image-20231209113539037

Flink如何自定义Source/Sink

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1304404.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

038.Python面向对象_三大特性综合案例1

我 的 个 人 主 页&#xff1a;&#x1f449;&#x1f449; 失心疯的个人主页 &#x1f448;&#x1f448; 入 门 教 程 推 荐 &#xff1a;&#x1f449;&#x1f449; Python零基础入门教程合集 &#x1f448;&#x1f448; 虚 拟 环 境 搭 建 &#xff1a;&#x1f449;&…

书-二分查找找某个数字p155

#include<stdio.h> int main(){int a[10]{1,4,5,6,7,8,23,34,90,14567};int mid;int low0;int high9;while(low<high){mid(lowhigh)/2;//数组分成两段&#xff0c;前一段low-mid&#xff0c;后一段mid-highif (a[mid]<23)//因为已经是排序好的了&#xff0c;所以如…

Android获取Wifi网关

公司有这样一个应用场景&#xff1a;有一台球机设备&#xff0c;是Android系统的&#xff0c;它不像手机&#xff0c;它没有触摸屏幕&#xff0c;所以我们对球机的操作很不方便&#xff0c;于是我们搞这样一个设置&#xff1a;点击球机电源键5次分享出一个热点&#xff0c;然后…

高德地图已达成全球最大规模车道级导航覆盖

近日&#xff0c;高德地图宣布旗下基于北斗卫星导航系统、深度学习模型、惯性导航、泛在信号等前沿技术研发的车道级导航服务&#xff0c;截止目前已支持在国内99%以上的城市和乡镇道路使用。 这意味着即日起&#xff0c;用户在驾车导航时&#xff0c;无论是在城市还是乡镇&am…

AI大模型专题报告:AI大模型及应用加速落地,持续带动算力产业链发展

今天分享的AI系列深度研究报告&#xff1a;《AI大模型专题报告&#xff1a;AI大模型及应用加速落地&#xff0c;持续带动算力产业链发展》。 &#xff08;报告出品方&#xff1a;长城证券&#xff09; 报告共计&#xff1a;23页 1.行业观点 在 TMT 各子板块&#xff1a;电子…

算法设计复习题

一、选择 1.算法要对异常情况进行适当的处理&#xff0c;就是算法的&#xff08;&#xff09;。 A、正确性 B、可用性 C、健壮性 D、可行性 2.&#xff08; &#xff09;指的是算法中描述的操作都可以通过已经实现的基本操作运算有限次实现。 A、可靠性 B、正确性 C、有效性 …

什么是shell?

系统内核是操作系统的基本组成部分&#xff0c;它负责管理系统的硬件和软件资源&#xff0c;并提供一组基本的系统服务。内核是操作系统的核心&#xff0c;控制着计算机的所有主要功能&#xff0c;包括内存管理、进程管理、设备驱动程序、系统调用和安全防护等。内核在计算机中…

python实战教学之python版“张万森,好久不见”

前言 WINTER IS COMING 最近《一闪一闪亮星星》的电影在火热预售中&#xff0c;家人们抢到票了嘛&#xff0c;前两天小编写了一篇“张万森&#xff0c;下雪了”的文章后&#xff0c;收到了不少小伙伴的反馈&#xff1a;“代码的运行结果只有文字&#xff0c;没有雪花啊”&#…

腾讯-轻量应用服务器centos7中宝塔安装MySQL8.0出现内存不足

目录 前言 出现的问题: 解决方法&#xff1a; 编译安装&#xff1a; 极速安装 其他 我的其他博客 前言 说实话&#xff0c;本人也就是个穷学生买不起啥大的服务器啥的&#xff0c;整了个2核 2内存的服务器 用宝塔按mysql5.5是没问题的&#xff0c;一切换8.0就提醒内存不足…

【NLP】RAG 应用中的调优策略

​ 检索增强生成应用程序的调优策略 没有一种放之四海而皆准的算法能够最好地解决所有问题。 本文通过数据科学家的视角审视检索增强生成&#xff08;RAG&#xff09;管道。它讨论了您可以尝试提高 RAG 管道性能的潜在“超参数”。与深度学习中的实验类似&#xff0c;例如&am…

OpenAI 在中国申请 GPT-6、GPT-7 商标;Google 推迟发布 OpenAI 竞品 Gemini 至明年 1 月【无际Ai资讯】

天眼查 app 显示&#xff0c;近日&#xff0c;欧爱运营有限责任公司&#xff08;OPENAI OPCO&#xff0c;LLC&#xff09;申请多枚「GPT-6」「GPT-7」商标&#xff0c;国际分类为科学仪器、网站服务&#xff0c;当前商标状态均为等待实质审查。 此前 OpenAI CEO Sam Altman 在接…

若依源码分析

一.登录 1.1 生成验证码 基本思路 后端生成一个表达式,74?11 74?转成图片,传到前端进行展示 将结果11存入redis 前端代码实现: 请求后端地址:http://localhost/dev-api/captchaImage,通过反向代理解决前后端跨域问题,将请求路径变为:http://localhost:8080/captchaImag…

音频接口芯片GC8416,GC1809,GC8418的芯片描述与比较分析

GC8416 192KHZ数字音频接收芯片 工作电流在32~192之间&#xff0c;可替代CS8416/CIRRUS LOGIC GC1809&#xff0c;GC8418都是24位&#xff0c;192KHZ音频接收芯片 工作电流最高可达192 可替代MS8413

大数据驱动下的人口普查:新时代下的新变革

人口普查数据大屏&#xff0c;是指一种通过大屏幕显示人口普查数据的设备&#xff0c;可以将人口普查数据以可视化的形式呈现出来&#xff0c;为决策者提供直观、准确的人口数据。这种大屏幕的出现&#xff0c;让人口普查数据的利用变得更加高效、便捷。 如果您需要制作一张直观…

Redis7--基础篇8(集群cluster)

1. 集群&#xff08;cluster&#xff09;介绍 由于数据量过大&#xff0c;单个Master复制集难以承担&#xff0c;因此需要对多个复制集进行集群&#xff0c;形成水平扩展每个复制集只负责存储整个数据集 的一部分&#xff0c;这就是Redis的集群&#xff0c;其作用是提供在多个…

Android工程怎么调用C/C++代码(保姆级别,每一步截图+讲解)?

文章目录 背景新建工程拷贝/编写C/C代码编写CMake配置文件写Java代码加载动态/静态库java转换c&#xff0c;c转javanative层打印日志Android去调用Java层的native方法对外提供.so/.a库 jar包检查APK里面是否已经被正常包含.so/.a完成 背景 突然想起做了这么久的JNI开发&#…

(新手必看)自定义数据传输通信协议+STM32代码详解

前言 本篇博客主要学习和了解一些单片机协议的格式&#xff0c;在对传输大数据或者要求准确性的时候&#xff0c;都需要通过协议来发送接收&#xff0c;下面通过了解协议的基本构成和代码来分析和实现协议的发送和接收。本篇博客大部分是自己收集和整理&#xff0c;如有侵权请联…

图的搜索(二):贝尔曼-福特算法、狄克斯特拉算法和A*算法

图的搜索&#xff08;二&#xff09;&#xff1a;贝尔曼-福特算法、狄克斯特拉算法和A*算法 贝尔曼-福特算法 贝尔曼-福特&#xff08;Bellman-Ford&#xff09;算法是一种在图中求解最短路径问题的算法。最短路径问题就是在加权图指定了起点和终点的前提下&#xff0c;寻找从…

【教3妹学编程-算法题】下一个更大元素 IV

3妹&#xff1a;“太阳当空照&#xff0c;花儿对我笑&#xff0c;小鸟说早早早&#xff0c;你为什么背上炸药包” 2哥 :3妹&#xff0c;什么事呀这么开发。 3妹&#xff1a;2哥你看今天的天气多好啊&#xff0c;阳光明媚、万里无云、秋高气爽&#xff0c;适合秋游。 2哥&#x…

计算机网络:应用层(一)

我最近开了几个专栏&#xff0c;诚信互三&#xff01; > |||《算法专栏》&#xff1a;&#xff1a;刷题教程来自网站《代码随想录》。||| > |||《C专栏》&#xff1a;&#xff1a;记录我学习C的经历&#xff0c;看完你一定会有收获。||| > |||《Linux专栏》&#xff1…