大数据-玩转数据-Flink RedisSink

news2025/1/16 5:31:41

一、添加Redis Connector依赖

具体版本根据实际情况确定

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
</dependency>

二、启动redis

参见大数据-玩转数据-Redis 安装与使用

三、编写代码

package com.lyh.flink06;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class SinkRedis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);
        KeyedStream<Integer, Integer> keyedStream = dataStreamSource.keyBy(new KeySelector<Integer, Integer>() {
            @Override
            public Integer getKey(Integer key) throws Exception {
                return key.intValue();
            }
        });
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop100")
                .setPort(6379)
                .setMaxTotal(100)
                .setMaxIdle(10)
                .setMinIdle(2)
                .setTimeout(10*1000)
                .setDatabase(0)
                .setPassword("redis")
                .build();


        keyedStream.addSink(new RedisSink<>(conf, new RedisMapper<Integer>() {
            @Override
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.SET);
            }

            @Override
            public String getKeyFromData(Integer integer) {
                return integer.toString();
            }

            @Override
            public String getValueFromData(Integer integer) {
                return integer.toString();
            }
        }));
        env.execute();
    }
}

可以根据要写入的redis的不同数据类型进行调整

四、查询结果

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/869359.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JVM 基础

巩固基础&#xff0c;砥砺前行 。 只有不断重复&#xff0c;才能做到超越自己。 能坚持把简单的事情做到极致&#xff0c;也是不容易的。 JVM 类加载机制 JVM 类加载机制分为五个部分&#xff1a;加载&#xff0c;验证&#xff0c;准备&#xff0c;解析&#xff0c;初始化&am…

手把手带你跑通网站上线全流程(一个简单的HTML和Python服务端完整上线流程)

我将向你介绍如何将一个网站部署到公网&#xff0c;包含完整流程。 前端静态网站 静态网站文件 首先需要准备一个简单的网页文件用于展示页面 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name&quo…

docker 安装elasticsearch、kibana

下载es镜像 docker pull elasticsearch 启动es容器 docker run --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.typesingle-node" -e ES_JAVA_OPTS"-Xms512m -Xmx512m" -d elasticsearch 验证es界面访问 ​​​​​http://节点ip:9200/ ​…

三.SpringBoot整合Elasticsearch

SpringBoot整合Elasticsearch 前言一.java调用es的方式和工具二.java集成Elasticsearch-Rest-Client1.引入pom2.导入版本不一致问题3.编写配置类4.测试类4.1 执行前4.2 执行后 5.其他篇章 前言 我们整合es直接给es发请求就可以了&#xff0c;但是现在有很多方式去调用es的接口…

为什么需要智能指针?

为什么需要智能指针&#xff1f; 解决忘记释放内存导致内存泄漏的问题。解决异常安全问题。 #include<iostream> using namespace std;int div() {int a, b;cin >> a >> b;if (b 0)throw invalid_argument("除0错误");return a / b; } void Func(…

入门指南 | 如何系统搭建自己的营销战略学习体系成为领域专家?

独自进入一个行业&#xff0c;如果你没有几年的行业经验或者独特的营销方式&#xff0c;很难在行业里站住脚&#xff08;每个行业潜规则都很多&#xff09;。 每个行业都有周期&#xff0c;都有很多竞争对手&#xff0c;你扎进去一个具体的行业&#xff0c;对于各种资源有限的自…

Mysql 和Oracle的区别

、mysql与oracle都是关系型数据库&#xff0c;Oracle是大型数据库&#xff0c;而MySQL是中小型数据库。但是MySQL是开源的&#xff0c;但是Oracle是收费的&#xff0c;而且比较贵。 1 2 mysql默认端口&#xff1a;3306&#xff0c;默认用户&#xff1a;root oracle默认端口&…

FileZilla Server安装配置使用说明

作者&#xff1a;John 链接&#xff1a;https://www.zhihu.com/question/20577011/answer/2360828234 来源&#xff1a;知乎 第一步&#xff1a;右键点击”立即下载“ 第二步&#xff1a;服务器端点击&#xff0c;“windows平台”版本 第三步&#xff1a;这个安装最新的“Fi…

时序预测 | Matlab实现基于GRNN广义回归神经网络的电力负荷预测模型

文章目录 效果一览文章概述源码设计参考资料效果一览 文章概述 时序预测 | Matlab实现基于GRNN广义回归神经网络的电力负荷预测模型 1.Matlab实现基于GRNN广义回归神经网络的电力负荷预测模型 2.单变量时间序列预测; 3.多指标评价,评价指标包括:R2、MAE、MBE等,代码质量极高…

由浅入深学习Tapable

文章目录 由浅入深学习TapableTapable是什么Tapable的Hook分类同步和异步的 使用Sync*同步类型钩子基本使用bailLoopWaterfall Async*异步类型钩子ParallelSeries 由浅入深学习Tapable webpack有两个非常重要的类&#xff1a;Compiler和Compilation。他们通过注入插件的方式&a…

机器学习笔记 - 基于PyTorch + 类似ResNet的单目标检测

一、获取并了解数据 我们将处理年龄相关性黄斑变性 (AMD) 患者的眼部图像。 数据集下载地址,从下面的地址中,找到iChallenge-AMD,然后下载。 Baidu Research Open-Access Dataset - DownloadDownload Baidu Research Open-Access Datasethttps://ai.baidu.com/bro…

2000-2021年全国31省经济高质量发展综合指数(熵值法)(含stata do文档计算代码)

2000-2021年全国31省经济高质量发展综合指数熵值法(包括数据和计算代码) 1、时间&#xff1a;2000-2021年 3、范围&#xff1a;31省 4、来源&#xff1a;整理自中经网和统计NJ 5、指标&#xff1a;GDP增长率、研发投入强度、投资效率、技术交易活跃度、需求结构、城乡结构、…

vue3+ts+vite手把手教你创建Vue3项目

概述 简介&#xff1a; 图文详解&#xff0c;带你使用 Vite 从零到一搭建 Vue3 项目&#xff0c;快速完成项目基建 技术栈&#xff1a; Vue3 TypeScirpt Vite Element-plus 内容&#xff1a; husky代码提交校验、router安装及模块化路由、动态路由siadebar封装、less/scss使…

GrapeCity Documents for Excel, .NET Crack

GrapeCity Documents for Excel, .NET 增加了对双面打印的支持。 GcExcel.NET支持PrintOutOptions类中的Duplex枚举&#xff0c;以启用/禁用页面上的双面打印。 枚举中有四个选项&#xff0c;用户可以相应地使用它们来打印工作簿&#xff1a; 双面打印。Default表示打印机的默认…

ElasticSearch安装与启动

ElasticSearch安装与启动 【服务端安装】 1.1、下载ES压缩包 目前ElasticSearch最新的版本是7.6.2&#xff08;截止2020.4.1&#xff09;&#xff0c;我们选择6.8.1版本&#xff0c;建议使用JDK1.8及以上。 ElasticSearch分为Linux和Window版本&#xff0c;基于我们主要学习…

EMQX物联网竟然用这个?(一)——简介

一、前言 我们这些年&#xff0c;“物联网”这个名称越来越被大家所知道了。 物联网 &#xff08;Internet of things&#xff09;&#xff0c;简称 IoT&#xff0c;这个概念在1991年就被漂亮国提出来了&#xff0c;解释一下就是万物可以通过互联网连接起来&#xff0c;可以进…

uniapp 官方扩展组件 uni-combox 实现:只能选择不能手写(输入中支持过滤显示下拉列表)

uniapp 官方扩展组件 uni-combox 实现&#xff1a;只能选择不能手写&#xff08;输入中支持过滤显示下拉列表&#xff09; uni-comboxuni-combox 原本支持&#xff1a;问题&#xff1a; 改造源码参考资料 uni-combox uni-combox 原本支持&#xff1a; 下拉选择。输入关键字&am…

MySQL 数据库文件的导入导出

目录 数据库的导出 导出整个数据库 导出数据库中的数据表 导出数据库结构 导出数据库中表的表结构 导出多个数据库 导出所有数据库 数据库的导入 数据库的导出 mysqldump -h IP地址 -P 端口 -u 用户名 -p 数据库名 > 导出的文件名 用管理员权限打开cmd进入MySQL的bi…

Java StringBuffer和StringBuilder类

由于String的不可更改特性&#xff0c;为了方便字符串的修改&#xff0c;Java中又提供StringBuilder和StringBuffer类。与String不同的是&#xff0c;StringBuffer和StringBuilder是对字符串本身进行修改&#xff0c;并且不产生新的对象&#xff0c;而String是产生新的字符串进…

中科亿海微浮点数转换定点数

引言 浮点数转换定点数是一种常见的数值转换技术&#xff0c;用于将浮点数表示转换为定点数表示。浮点数表示采用指数和尾数的形式&#xff0c;可以表示较大范围的数值&#xff0c;但存在精度有限的问题。而定点数表示则采用固定小数点位置的形式&#xff0c;具有固定的精度和范…