【Flink实战】Flink 商品销量统计-实战Bahir Connetor实战存储 数据到Redis6.X

news2025/1/24 2:14:00

🚀 作者 :“大数据小禅”

🚀 文章简介 :Flink 商品销量统计-实战Bahir Connetor实战存储 数据到Redis6.X

🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬


目录导航

      • Flink怎么操作Redis
      • Flink 商品销量统计-转换-分组-聚合-存储自定义的Redis Sink实战

Flink怎么操作Redis

  • Flink怎么操作redis?

    • 方式一:自定义sink
    • 方式二:使用connector
  • Redis Sink 核心是RedisMapper 是一个接口,使用时要编写自己的redis操作类实现这个接口中的三个方法

    • getCommandDescription 选择对应的数据结构和key名称配置
    • getKeyFromData 获取key
    • getValueFromData 获取value
  • 使用

    • 添加依赖
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    
  • 编码

    public class MyRedisSink implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER");
        }
    
        @Override
        public String getKeyFromData(Tuple2<String, Integer> value) {
            return value.f0;
        }
    
        @Override
        public String getValueFromData(Tuple2<String, Integer> value) {
            return value.f1.toString();
        }
    }
    

Flink 商品销量统计-转换-分组-聚合-存储自定义的Redis Sink实战

  • Redis环境说明 redis6

    • 使用docker部署redis6.x 看个人主页docker相关文章

      docker run -d  -p 6379:6379 redis
      
  • 编码实战

数据源

public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {


    private volatile Boolean flag = true;

    private Random random = new Random();

    private static List<String> list = new ArrayList<>();
    static {
        list.add("spring boot2.x课程");
        list.add("微服务SpringCloud课程");
        list.add("RabbitMQ消息队列");
        list.add("Kafka课程");
        list.add("小滴课堂面试专题第一季");
        list.add("Flink流式技术课程");
        list.add("工业级微服务项目大课训练营");
        list.add("Linux课程");
    }


    /**
     * run 方法调用前 用于初始化连接
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("-----open-----");
    }

    /**
     * 用于清理之前
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        System.out.println("-----close-----");
    }


    /**
     * 产生数据的逻辑
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<VideoOrder> ctx) throws Exception {

        while (flag){
            Thread.sleep(1000);
            String id = UUID.randomUUID().toString();
            int userId = random.nextInt(10);
            int money = random.nextInt(100);
            int videoNum = random.nextInt(list.size());
            String title = list.get(videoNum);
            VideoOrder videoOrder = new VideoOrder(id,title,money,userId,new Date());

            ctx.collect(videoOrder);
        }


    }

    /**
     * 控制任务取消
     */
    @Override
    public void cancel() {

        flag = false;
    }
}

保存的格式与存取的方法

public class VideoOrderCounterSink implements RedisMapper<Tuple2<String, Integer>> {


    /***
     * 选择需要用到的命令,和key名称
     * @return
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER");
    }

    /**
     * 获取对应的key或者filed
     *
     * @param data
     * @return
     */
    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {

        System.out.println("getKeyFromData=" + data.f0);
        return data.f0;
    }

    /**
     * 获取对应的值
     *
     * @param data
     * @return
     */
    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        System.out.println("getValueFromData=" + data.f1.toString());
        return data.f1.toString();
    }
}

落地

public class Flink07RedisSinkApp {

    /**
     * source
     * transformation
     * sink
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {

        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        //数据源 source
//        DataStream<VideoOrder> ds = env.fromElements(
//                new VideoOrder("21312","java",32,5,new Date()),
//                new VideoOrder("314","java",32,5,new Date()),
//                new VideoOrder("542","springboot",32,5,new Date()),
//                new VideoOrder("42","redis",32,5,new Date()),
//                new VideoOrder("4252","java",32,5,new Date()),
//                new VideoOrder("42","springboot",32,5,new Date()),
//                new VideoOrder("554232","flink",32,5,new Date()),
//                new VideoOrder("23323","java",32,5,new Date())
//        );
        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());



        //transformation
       DataStream<Tuple2<String,Integer>> mapDS =  ds.map(new MapFunction<VideoOrder, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
                return new Tuple2<>(value.getTitle(),1);
            }
        });



//        DataStream<Tuple2<String,Integer>> mapDS = ds.flatMap(new FlatMapFunction<VideoOrder, Tuple2<String,Integer>>() {
//            @Override
//            public void flatMap(VideoOrder value, Collector<Tuple2<String, Integer>> out) throws Exception {
//                out.collect(new Tuple2<>(value.getTitle(),1));
//            }
//        });


       //分组
        KeyedStream<Tuple2<String,Integer>,String> keyByDS = mapDS.keyBy(new KeySelector<Tuple2<String,Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        //统计每组有多少个
        DataStream<Tuple2<String,Integer>> sumDS =  keyByDS.sum(1);

        //控制台打印
        sumDS.print();

        //单机redis
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build();

        sumDS.addSink(new RedisSink<>(conf,new VideoOrderCounterSink()));


        //DataStream需要调用execute,可以取个名称
        env.execute("custom redis sink job");
    }

}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1006821.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

宿舍考勤安全系数?这个答案全国统一!

在现代教育和住宿管理中&#xff0c;确保学生或员工的宿舍考勤管理变得越来越重要。传统的考勤方法可能受到人为错误和滥用的威胁&#xff0c;同时也可能耗费大量时间和资源。 人脸识别技术已经在各个领域展现了强大的潜力。它不仅提高了安全性&#xff0c;还改善了考勤管理的效…

GP08|财务估值因子过滤实盘小市值

量化策略开发&#xff0c;高质量社群&#xff0c;交易思路分享等相关内容 大家好&#xff0c;今天我们来分享gp08策略。千呼万唤始出来&#xff0c;由于xxx原因&#xff08;不便说&#xff0c;好奇的可以私聊我&#xff09;&#xff0c;我们从9月份开始&#xff0c;后面分享的策…

深入解析顺序表:揭开数据结构的奥秘,掌握顺序表的精髓

&#x1f493; 博客主页&#xff1a;江池俊的博客⏩ 收录专栏&#xff1a;数据结构探索&#x1f449;专栏推荐&#xff1a;✅C语言初阶之路 ✅C语言进阶之路&#x1f4bb;代码仓库&#xff1a;江池俊的代码仓库&#x1f525;编译环境&#xff1a;Visual Studio 2022&#x1f38…

包装类、多线程的基本使用

包装类 1.基本数据类型对应的引用数据类型(包装类) 1.概述:所谓的包装类就是基本类型对应的类(引用类型),我们需要将基本类型转成包装类,从而让基本类型具有类的特性(说白了,就是将基本类型的数据转成包装类,就可以使用包装类中的方法来操作此数据)2.为啥要学包装类:a.将来有…

springboot上传文件到后台指定文件夹

第一步&#xff0c;在application.yml做一下配置&#xff0c;预设下载目录 files:upload:path: D:/SpringBootItem/springboot/files/ 其中有用到hutool工具依赖&#xff0c;如下在pom.xml中添加依赖&#xff0c;也可以选择不添加&#xff0c;自己修改下Controller中的代码即可…

批量采集头条号文章的工具-让我们更好地辨别信息好坏

大家好&#xff0c;今天我想和大家聊一聊一个在互联网时代备受瞩目的话题&#xff0c;那就是批量采集头条号的文章。作为一个热衷于信息获取和分享的人&#xff0c;我深知这一领域的挑战和机遇。 让我们来谈谈批量采集头条号的文章所面临的挑战。随着互联网的迅猛发展&#xff…

亚马逊API接口解析,实现获得AMAZON商品详情

要解析亚马逊API接口并实现获取亚马逊商品详情&#xff0c;你需要按照以下步骤进行操作&#xff1a; 了解亚马逊开发者中心&#xff1a;访问亚马逊开发者中心&#xff0c;并了解相关的API文档、开发者指南和规定。注册开发者账号&#xff1a;在亚马逊开发者中心上注册一个开发…

Java“牵手”京东商品详情数据,京东商品详情接口,京东API接口申请指南

京东商品详情API是京东平台提供给开发者的应用程序编程接口&#xff0c;通过API可以获取京东平台上商品详情信息。 京东商品详情API可以获取到商品的标题、价格、销量、评价、详情页等信息。开发者在京东开放平台注册开发者账号&#xff0c;并获得访问API接口的密钥后&#xf…

代码随想录算法训练营day46|139.单词拆分|多重背包基础理论| 背包总结

139.单词拆分 力扣题目链接 给定一个非空字符串 s 和一个包含非空单词的列表 wordDict&#xff0c;判定 s 是否可以被空格拆分为一个或多个在字典中出现的单词。 说明&#xff1a; 拆分时可以重复使用字典中的单词。 你可以假设字典中没有重复的单词。 示例 1&#xff1a…

关于批量安装多个apk

for %i in (apks地址/*.apk); do adb install %i https://www.cnblogs.com/lihongtaoya/p/15084378.html adb install -r apks地址/1.apk && adb install -r apks地址/2.apk install-multi-package - 暂时nok https://adbshell.com/commands 最新版本的platform-tool…

Dinky上路之旅

1、部署flink集群 1.1、flink-conf.yaml cat > flink-conf.yaml << EOF jobmanager.rpc.address: boshi-146 jobmanager.rpc.port: 6123 jobmanager.bind-host: 0.0.0.0 jobmanager.memory.process.size: 1600m taskmanager.bind-host: 0.0.0.0 # 修改为本机ip tas…

今日宜分享:科技十足主页面的高校官网颜值排行榜

科技十足主页面的高校官网颜值排行榜 全国985名单&#xff08;最新&#xff09;1. 北京&#xff08;8所&#xff09;2. 上海&#xff08;4所&#xff09;3. 湖南&#xff08;3所&#xff09;4. 陕西&#xff08;3所&#xff09;5. 湖北&#xff08;2所&#xff09;6. 山东&…

postgresql如何关闭自动提交设置

问题&#xff1a;代码运行报错&#xff1a; Caused by: org.postgresql.util.PSQLException: Cannot commit when autoCommit is enabled. 解决方法&#xff1a;修改postgresql配置文件&#xff0c;关闭自动提价设置

机器学习:PCA(Principal Component Analysis主成分)降维

参考&#xff1a;PCA降维原理 操作步骤与优缺点_TranSad的博客-CSDN博客 PCA降维算法_偶尔努力翻身的咸鱼的博客-CSDN博客 需要提前了解的数学知识&#xff1a; 一、PCA的主要思想 PCA&#xff0c;即主成分分析方法&#xff0c;是一种使用最广泛的数据降维算法。PCA的主要思想…

centerOS连不上网解决办法

1、检查路由 route -n如果你缺失第一个路由&#xff0c;是肯定无法ping通外网的。 2、添加dns # vim /etc/resolv.conf nameserver 8.8.8.83、在/etc/resolv.conf文件添加路由 route add default gw 192.168.0.14、重启网卡 service network restart

KT142C-sop16语音芯片的4个IO口如何一对一触发播放_配置文件详细说明

目录 KT142C是一个提供串口的SOP16语音芯片&#xff0c;完美的集成了MP3的硬解码。内置330KByte的空间&#xff0c;最大支持330秒的语音长度&#xff0c;支持多段语音&#xff0c;支持直驱0.5W的扬声器无需外置功放 如上图&#xff0c;芯片有4个IO口可以一对一&#xff0c;详…

服务器数据恢复-热备盘同步过程中硬盘离线的RAID5数据恢复案例

服务器数据恢复环境&#xff1a; 华为OceanStor某型号存储&#xff0c;11块硬盘组建了一组RAID5阵列&#xff0c;另外1块硬盘作为热备盘使用。基于RAID5阵列的LUN分配给linux系统使用&#xff0c;存放Oracle数据库。 服务器故障&#xff1a; RAID5阵列1块硬盘由于未知原因离线…

线程详细解析

本文重点: 目录 什么是线程? 线程共享和非共享资源 线程的优缺点 多线程 线程池 Java创建线程池 什么是线程? 线程是操作系统调度和执行的基本单位,线程和进程一样也有PCB 一个进程必定会有一个线程 在Linux内核中是不会区分进程和线程的,只在用户层面区分 线程共享…

新手询问想要成功学好嵌入式开发有什么建议吗?

今日话题&#xff0c;想要成功学好嵌入式开发有什么建议吗&#xff1f;想要学好的话选择一门合适的编程语言是关键。虽然嵌入式开发支持多种语言&#xff0c;但C和C仍然是最常用的。如果你是初学者&#xff0c;从学习C语言开始是一个不错的选择。它相对容易学习&#xff0c;而且…