Flink之RedisSink

news2024/12/29 2:13:30

在Flink开发中经常会有将数据写入到redis的需求,但是Flink官方并没有对应的扩展包,这个时候需要我们自己编译对应的jar资源,这个时候就用到了bahir,barhirapahce的开源项目,是专门给sparkflink提供扩展包使用的,bahir官网,这篇文章就介绍下如何自己编译RedisSink扩展包.

  • 下载源码包
    通过下图进入到GitHub
    在这里插入图片描述
    选择clonedownload源码都可以,如下图
    在这里插入图片描述
  • 编译源码包
    下载好源码后,maven会自动下载对应的依赖项
    • 删除不需要的子项目
      因为我们这里需要编译redis对应的扩展包,所以其他的子项目都可以删除掉,下图中红色框标注的都可以删除
      在这里插入图片描述
    • 修改pom文件
      删除掉不需要的子项目后,在pom文件中也要删除对应的子项目配置
      <!-- 这里只保留这一个模块就可以了 -->
      <modules>
          <module>flink-connector-redis</module>
      </modules>
      
      修改完成模块配置后,还需要修改对应的flinkscala版本依赖,这个根据自己实际的开发环境进行修改
       <properties>
          <!-- 修改这里的版本就可以 -->
          <!-- Flink version -->
          <flink.version>1.15.3</flink.version>
          <scala.binary.version>2.12</scala.binary.version>
          <scala.version>2.12.11</scala.version>
      </properties>
      
      这些都完成后就可以通过maven下载对应的依赖了.
  • 编译安装
    依赖下载完成后pom文件中可能会有几处是报错的状态,如下图
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    以上几处错误无需理会,不影响扩展包的编译.
    接下来通过maveninstall将扩展包编译并安装到本地的maven资源库,如下图
    在这里插入图片描述
    编译完成后我们就可以在自己的flink项目中引入对应的扩展包了
        <!-- Redis connector -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis</artifactId>
            <version>1.2-SNAPSHOT</version>
        </dependency>
    
    上面依赖中groupId是固定的,artifactId要根据flink-connector-redis项目中的pom文件中artifactId来拿,同样version也是一样,到这里扩展包的问题就已经解决了.
  • 代码
    其实在GitHub上已经给了代码示例单机(java,scala)、集群(java,scala)的代码模板都是有的,下面就以单机redis作为示例.
    这里我们要创建一个类实现RedisMapper
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
    
    /**
     * @Author: J
     * @Version: 1.0
     * @CreateTime: 2023/8/4
     * @Description: 测试
     **/
    public class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {
        @Override
        // 这个方法是选择使用哪种命令插入数据到Redis
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
        }
    
        @Override
        // 这个方法是选择哪个作为Key
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }
    
        @Override
        // 这个方法是选择哪个作为Value
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }
    }
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.redis.RedisSink;
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
    
    /**
     * @Author: J
     * @Version: 1.0
     * @CreateTime: 2023/8/4
     * @Description: 测试
     **/
    public class FlinkRedisSink {
        public static void main(String[] args) throws Exception {
            // 构建流环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 这里使用的是自定义数据源为了方便测试
            DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());
            // 将数据转换成Tuple的形式
            SingleOutputStreamOperator<Tuple2<String, String>> tuple2Stream = customizeSource
                                 .map((MapFunction<CustomizeBean, Tuple2<String, String>>) value -> Tuple2.of(value.getAge() + "-" + value.getHobbit(), value.toString()))
                                 .returns(TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));// Tuple2是flink中提供的类型java无法自动推断,所以加上这段代码
            // 配置Redis
            FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                    .setHost("127.0.0.1") // redis服务器地址
                    .setPassword("password") // redis密码
                    .build();
            // 添加Sink
            tuple2Stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
            env.execute("Redis Sink");
        }
    }
    
    到这里代码就结束了,具体应用根据实际业务需求进行更改.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/836397.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【word技巧】如何做到,批量保存word文档图片

Word文件中有很多图片都需要保存&#xff0c;除了一张张的进行图片另存为以外&#xff0c;我们还有其他方法可以批量一次性保存word文档中的图片嘛&#xff1f;今天分享两个方法&#xff0c;批量保存word文档图片。 方法一&#xff1a; 将文件进行另存为&#xff0c;在选择路…

Java版工程行业管理系统源码-专业的工程管理软件-em提供一站式服务

​ Java版工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离 功能清单如下&#xff1a; 首页 工作台&#xff1a;待办工作、消息通知、预警信息&#xff0c;点击可进入相应的列表 项目进度图表&#xff1a;选择&#xff08;总体或单个&#xff09;项目…

回归分析书籍推荐

回归分析在线免费书籍&#xff1a;I 1-ntroduction to Regression Methods for Public Health using R Introduction to Regression Methods for Public Health Using R 2-An Introduction to Statistical Learning https://hastie.su.domains/ISLR2/ISLRv2_website.pdf 可以…

安达发制造工业迈向智能化:APS高级计划排程助力提升生产效率

随着市场竞争的加剧&#xff0c;制造企业纷纷寻求提高生产效率和降低成本的方法。近年来&#xff0c;越来越多的制造企业开始采用APS(高级计划与排程)系统&#xff0c;以优化生产计划和排程&#xff0c;提高生产效率&#xff0c;并在竞争中取得优势。 现代制造业通常面临复杂的…

React diff 根据相对位置的 diff 算法

文章目录 diff 算法没有 key 时的diff通过 key 的 diff查找需要移动的节点移动节点添加新元素移除不存在的元素缺点 diff 算法 没有 key 时的diff 根据新旧列表的长度进行 diff 公共长度相同的部分直接patch新列表长度>旧列表长度则添加&#xff0c;否则删除 function pa…

yo!这里是STL::vector类简单模拟实现

目录 前言 重要接口模拟实现 默认成员函数 1.构造函数 2.析构函数 3.拷贝构造函数 4.赋值运算符重载 迭代器 简单接口 1.size() 2.capacity() 3.swap() 操作符重载 1.操作符[] 扩容接口 1.reserve() 2.resize() 增删查改接口 1.push_back() 2.pop_back() …

vue页面布局

布局 用element-plus自带的布局&#xff1b; 左边菜单 用他的Menu 菜单、自带收缩和展开&#xff1b;数据可以接口获取或者写死&#xff1b; 使用的如下操作、把主题和默认打开的index存到缓存中 头部&#xff1b; 简单的先分成左右&#xff1b;再简单的分成左右 1、左…

CS 144 Lab Four -- the TCP connection

CS 144 Lab Four -- the TCP connection TCPConnection 简述TCP 状态图代码实现完整流程追踪 测试 对应课程视频: 【计算机网络】 斯坦福大学CS144课程 Lab Three 对应的PDF: Lab Checkpoint 4: down the stack (the network interface) TCPConnection 简述 TCPConnection 需…

Python系统学习1-3-变量,运算符

1、变量 变量&#xff1a;关联一个对象的标识符 学习目标&#xff1a;学会画变量的内存图 命名规则:字母数字下划线&#xff0c;所有单词小写&#xff0c;单词之间下划线隔开 赋值&#xff1a;创建一个变量或改变一个变量关联的数据。 语法&#xff1a;变量名数据&#xf…

vue运行在IE浏览器空白报错SCRIPT1006: 缺少‘)‘ -【vue兼容IE篇】

其他浏览器均正常&#xff0c;但是切换ie模式&#xff0c;打开空白&#xff0c;F12打开报错缺少‘)‘ &#xff0c;如下图 在搜狗浏览器下点开报错&#xff1a;定格在crypto-js处 解决&#xff1a; 步骤一&#xff1a;使用npm安装babel-polyfill 依赖&#xff08;已安装了可忽…

Java与Kotline Funcation函数与参数函数的详解

一.介绍 在现在以IDE为开发工具的时代&#xff0c;各种开发语言都有&#xff0c;kotlin的语法势头比较强&#xff0c;今天我们将介绍在项目中出现比较多的两种函数&#xff0c;一种是参数函数&#xff0c;还有一种就是Function函数 如果你不了匿名函数请阅读以下文档&#xff…

IT 运营分析 (ITOA)

IT 运营 &#xff08;ITOps&#xff09; 是指向组织实施、管理、交付和支持 IT 服务&#xff0c;ITOps 可帮助组织维护和运行所需的所有技术工具&#xff0c;以保持业务活动以最高质量正常运行&#xff0c;同时降低成本。 一些常见的 ITOps 过程是&#xff1a; 问题整改&…

el-table 去掉边框(修改颜色)

原始&#xff1a; 去掉表格的border属性&#xff0c;每一行下面还会有一条线&#xff0c;并且不能再拖拽表头 为了满足在隐藏表格边框的情况下还能拖动表头&#xff0c;修改相关css即可&#xff0c;如下代码 <style lang"less"> .table {//避免单元格之间出现白…

Clickhouse 优势与部署

一、clickhouse简介 1.1 clickhouse介绍 ClickHouse的背后研发团队是俄罗斯的Yandex公司&#xff0c;2011年在纳斯达克上市&#xff0c;它的核心产品是搜索引擎。我们知道&#xff0c;做搜索引擎的公司营收非常依赖流量和在线广告&#xff0c;所以做搜索引擎的公司一般会并行推…

【LeetCode-简单】剑指 Offer 52. 两个链表的第一个公共节点

题目 输入两个链表&#xff0c;找出它们的第一个公共节点。 如下面的两个链表&#xff1a; 在节点 c1 开始相交。 输入&#xff1a;intersectVal 8, listA [4,1,8,4,5], listB [5,0,1,8,4,5], skipA 2, skipB 3 输出&#xff1a;Reference of the node with value 8 输…

想参加华为杯竞赛、高教社杯和数学建模国赛的小伙伴看过来

本文目录 ⭐ 赛事介绍⭐ 辅导比赛 ⭐ 赛事介绍 ⭐ 参赛好处 ⭐ 辅导比赛 ⭐ 写在最后 ⭐ 赛事介绍 华为杯全国研究生数学建模竞赛是由华为公司主办的一项面向全国研究生的数学建模竞赛。该竞赛旨在通过实际问题的建模和解决&#xff0c;培养研究生的创新能力和团队合作精神&a…

【ASP.NET MVC】使用动软(四)(12)

一、筛选器类和Cookie实现路由 需解决的问题&#xff1a; 网站登录往往需要用户名密码验证&#xff0c;为避免重复验证&#xff0c;一般采用Cookie 、Session等技术来保持用户的登录状态&#xff1a; Session是在服务端保存的一个数据结构&#xff0c;用来跟踪用户的状态&…

EtherCAT转MODBUS RTU/RS485/232总线协议网关

产品功能 JM-ECT-RTU是一款EtherCAT从站功能的通讯网关。该产品主要功能是将EtherCAT网络和MODBUS-RTU网络连接起来。 JM-ECT-RTU网关连接到EtherCAT总线中作为从站使用&#xff0c;连接到MODBUS-RTU总线中作为主站或从站使用。 本网关产品将基于MODBUS 的设备或串行RS-232/…

10分钟理解React生命周期

前言 学习React&#xff0c;生命周期很重要&#xff0c;我们了解完生命周期的各个组件&#xff0c;对写高性能组件会有很大的帮助。 一、简介 React /riˈkt/ 组件的生命周期指的是组件从创建到销毁过程中所经历的一系列方法调用。这些方法可以让我们在不同的时刻执行特定的…

科班应届生,我选择来黑马提升技能!

不论是因为对未来的迷茫和焦虑&#xff0c;还是对生活的现状不满意&#xff0c;又或者是想完善自己的专业知识&#xff0c;亦或是跨界迎接新的挑战&#xff0c;都可以来黑马…… 学科 | JavaEE 校区 | 武汉 薪资 | 10k&#xff08;应届生&#xff09; 黑马程序员的学弟、学妹…