flink 批量压缩redis集群 sink

news2024/9/20 5:35:18

idea maven依赖

<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.8.0</version>
</dependency>


import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import redis.clients.jedis.*
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream


class RedisSink : RichSinkFunction<Data>() {
    private lateinit var jedisCluster: JedisCluster

    override fun open(parameters: Configuration) {
        super.open(parameters)
        val jedisPoolConf = JedisPoolConfig()
        jedisPoolConf.maxTotal = 128// 最大连接数
        jedisPoolConf.maxIdle = 50// 最大空闲连接数
        jedisPoolConf.testOnBorrow = true // 当调用 borrow Object方法时,是否进行有效性检查

        // 集群模式
        val nodes = HashSet<HostAndPort>()

        val hostAndPort1 = HostAndPort("h1", port)
        val hostAndPort2 = HostAndPort("h2", port)
        val hostAndPort3 = HostAndPort("h3", port)
        nodes.add(hostAndPort1)
        nodes.add(hostAndPort2)
        nodes.add(hostAndPort3)

        jedisCluster = JedisCluster(nodes, 100000, 100000, 2, "password", jedisPoolConf)
    }

    override fun invoke(value: Data, context: SinkFunction.Context<*>?) {
        val key = "data" + value.x+ value.y

 

            val outputBts = ByteArrayOutputStream()
            val gzip = GZIPOutputStream(outputBts)
            gzip.write(value.toByteArray())
            gzip.flush()
            gzip.finish()

            jedisCluster.set(key.toByteArray(), outputBts.toByteArray())
            jedisCluster.expire(key.toByteArray(),15552000)
        
    }

    override fun close() {
        super.close()
        jedisCluster.close()
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2148070.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

由于安全风险,安全领导者考虑禁止人工智能编码

安全团队与开发团队之间的紧张关系 83% 的安全领导者表示&#xff0c;他们的开发人员目前使用人工智能来生成代码&#xff0c;57% 的人表示这已成为一种常见做法。 然而&#xff0c;72% 的人认为他们别无选择&#xff0c;只能允许开发人员使用人工智能来保持竞争力&#xff0…

【Unity】对象池 - 未更新完

自定义泛型对象池 文章目录 自定义泛型对象池封装泛型类例子 使用Unity自带对象池 封装泛型类 public abstract class MyPool<T> : MonoBehaviour where T :Component {[SerializeField] protected T prefab; // 生成的预制体[SerializeField] protected int defaultNum…

Delphi5利用DLL实现窗体的重用

文章目录 效果图参考利用DLL实现窗体的重用步骤1 设计出理想窗体步骤2 编写一个用户输出的函数或过程&#xff0c;在其中对窗体进行创建使它实例化步骤3 对工程文件进行相应的修改以适应DLL格式的需要步骤4 编译工程文件生成DLL文件步骤5 在需要该窗体的其他应用程序中重用该窗…

8.sklearn-模型保存

文章目录 环境配置&#xff08;必看&#xff09;头文件引用1.保存模型代码工程运行结果生成文件 2.加载模型代码工程运行结果 环境配置&#xff08;必看&#xff09; Anaconda-创建虚拟环境的手把手教程相关环境配置看此篇文章&#xff0c;本专栏深度学习相关的版本和配置&…

HTML基础和常用标签

“合抱之木&#xff0c;生于毫末&#xff1b;九层之台&#xff0c;起于累土&#xff1b;千里之行&#xff0c;始于足下。” 文章目录 前言文章有误敬请斧正 不胜感恩&#xff01;1. HTML的基本结构解释&#xff1a; 2. 常见标签的介绍2.1 标题和文本2.2 链接和图片2.3 列表2.4 …

云安全 | AWS S3存储桶安全设计缺陷分析

什么是AWS S3&#xff1f; 默认情况下&#xff0c;Amazon S3 是安全的。创建后&#xff0c;只有资源所有者才能访问他们创建的 Amazon S3 资源。 Amazon S3 支持用户身份验证来控制对数据的访问。您可以使用存储桶策略和访问控制列表 (ACL)等访问控制机制来有选择地向用户和用…

solidwork直线画圆弧的操作

效果如下&#xff1a; 踩过好多坑了。 首先选择直线 先点一下这个点拉出来再回到这个点&#xff08;这个过程点一次就可以了&#xff09;&#xff0c;注意注意一定要这么做&#xff01;否则没有圆弧

prime1靶机渗透 (信息收集 内核提权)

靶机信息 vulnhub靶机 prime1 主机发现 -sn 是scan and no port hack 只用于主机发现 ┌──(kali㉿kali)-[~] └─$ sudo nmap -sn 192.168.50.0/24 Starting Nmap 7.94SVN ( https://nmap.org ) at 2024-09-09 02:25 EDT Nmap scan report for 192.168.50.1 Host is up …

web学习——day1

1.web标准 2.html和css 此时&#xff0c;学完这一部分&#xff0c;你web的具体的结构已经有了 但是呢&#xff0c;这还是太单调了&#xff0c;我们应该加点儿样式&#xff0c;这就该用到CSS了 CSS引入方式 样式1&#xff1a;颜色 样式2&#xff1a;哪怕对于同一类事物&#xf…

消息队列-Kafka(概念篇)

1 为什么需要消息队列&#xff1f; 消息队列是一种基于消息的异步通信机制&#xff0c;用于在分布式系统中不同组件或服务之间传递数据和通知。实际上可以将消息队列看作为存放消息的容器&#xff0c;参与消息传递的分别称为生产者&#xff08;发送消息&#xff09;和消费者&am…

【macOS】【zsh报错】zsh: command not found: python

【macOS】【zsh Error】zsh: command not found: python 本地已经安装了Python&#xff0c;且能在Pycharm中编译Python程序并运行。 但是&#xff0c;在macOS终端&#xff0c;运行Python&#xff0c;报错。 首先要确认你在macOS系统下&#xff0c;是否安装了Python。 如果安…

打不开Qtcreator(This application fail to start...........)

目录 今天突然打不开Qtcreator,报错如下 解决方案 1.检查环境变量配置(我就是通过这个解决好的) 2.如果也弹出跟我一样的AMD窗口,可以更新AMD驱动试试 3.重装qtcreator 4.检查 qtcreator下的bin\plugins\platforms是否缺少提示的相关.dll文件 总结 今天突然打不开Qtcreat…

马来西亚交通标志检测系统源码分享

马来西亚交通标志检测检测系统源码分享 [一条龙教学YOLOV8标注好的数据集一键训练_70全套改进创新点发刊_Web前端展示] 1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 项目来源AACV Association for the Advancement of Comp…

VSCode语法提示的配置

ctrlshiftP打开Command Palette,运行C/Cpp: Edit configurations...生成c_cpp_properties.json c_cpp_properties.json是什么&#xff1f; 这个文件主要是用于VSCode语法提示的配置&#xff0c;例如&#xff1a;指定 include 路径&#xff0c;问题匹配类型等。CtrlShiftP打开C…

nvm list available报错Could not retrieve https://nodejs.org/dist/index.json.

今天在查看nvm list available时出现如下错误&#xff1a; 首先找到nvm所在文件夹settings.txt 打开此文件后&#xff0c;加入两段代码&#xff0c;如果有就替换掉 node_mirror: https://npmmirror.com/mirrors/node/ npm_mirror: https://npmmirror.com/mirrors/npm/ 再次运行…

Android轻量级RTSP服务使用场景分析和设计探讨

技术背景 好多开发者&#xff0c;对我们Android平台轻量级RTSP服务模块有些陌生&#xff0c;不知道这个模块具体适用于怎样的场景&#xff0c;有什么优缺点&#xff0c;实际上&#xff0c;我们的Android平台轻量级RTSP服务模块更适用于内网环境下、对并发要求不高的场景&#…

golang操作mysql利器-gorm

1、傻瓜示例 GORM通过将数据库表中的数据映射到面向对象的模型中&#xff0c;简化了数据库操作&#xff0c;使得开发者可以很方便的使用代码来操作数据库&#xff0c;而无需编写SQL语句。 目前有个mysql表&#xff1a;miniprogram_orders&#xff0c;其存储了所有用户对应的订…

PyCharm和VS Code 安装通义灵码,可本地安装包安装,解决插件安装不上问题

PyCharm和VS Code 安装通义灵码&#xff0c;可本地安装包安装&#xff0c;解决插件安装不上问题 PyCharm、VS Code 安装通义灵码介绍主要应用场景支持编程语言安装指南JetBrains IDEs 中安装指南步骤 1&#xff1a;准备工作步骤 2&#xff1a;在 JetBrains IDEs 中安装通义灵码…

实验3 Hadoop集群运行环境搭建和使用

实验3 Hadoop集群运行环境搭建和使用 一、实验介绍 本节实验旨在引导学生通过实际操作搭建一个基本的Hadoop集群,并进行基本的使用验证。实验包括在集群节点上添加域名映射以实现节点间的相互识别,配置免密SSH登录以便无密码访问各节点,安装和配置JDK以满足Hadoop的运行需求…

Flink1.18.1 Standalone模式集群搭建

Flink1.18.1 Standalone模式集群搭建 Flink1.18.1 Standalone模式集群搭建1. 环境准备1.1 Flink下载地址1.2 集群角色分配 2. Flink 集群安装步骤2.1 下载并解压 Flink2.2 解压安装包2.3 配置环境变量2.4 配置 SSH 免密登录 3. 配置 Flink 集群3.1 修改 flink-conf.yaml 配置文…