JedisCluster 通过 Pipeline 实现两套数据轮换更新

news2024/11/16 9:38:55

其他系列文章导航

Java基础合集
数据结构与算法合集

设计模式合集

多线程合集

分布式合集

ES合集


文章目录

其他系列文章导航

文章目录

前言

一、整体流程

1.1 大致流程

1.2 流程代码解释

二、从数据库里查数据

2.1 SQL语句

三、更新当前前缀

3.1 设置前缀常量

3.2 初始化 currentPrefixIndex

3.3 获取当日前缀 

 3.4 更新 currentPrefixIndex

四、往redis集群更新数据

4.1 大致流程

五、JedisCluster 实现 Pipeline 操作

5.1 实现过程


前言

本文实现了通过定时任务来调用接口,使两套数据轮换更新。

因为要区分两套数据,所以 key 要设置前缀。

例如:一天数据一换,今天查的 A 开头的 key ,明天查 B 开头的 key ,后天又查 A 开头的 key 。今天查完后,明天更新 B 开头的 key ,但是 A 开头的 key 暂时不动,后天再查的时候,A开头的 key 要进行更新,先删再更新。


一、整体流程

1.1 大致流程

  1. 从数据库里查数据。
  2. 更新当前前缀。
  3. 往redis集群更新数据。

1.2 流程代码解释

    @Override
    public R<String> updateCampToJedis() {
        R<String> r = new R<>();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMM");
        String currentMonth = dateFormat.format(new Date());
        //1. 从数据库里查数据
        List<UserWideInfo> UserWideInfoList = UserWideInfoMapper.selectFromTable(currentMonth);
        if (UserWideInfoList.size() == 0) {
            r.setCode(R.ERROR_CODE);
            r.setMsg("没有数据");
            return r;
        }
        //2. 更新当前前缀
        updateCurrentPrefixIndex();
        r.setCode(R.SUCCESS_CODE);
        //3. 往redis集群存入数据
        insertToJedis(ZhmsUserWideInfoList);
        return r;
    }

二、从数据库里查数据

2.1 SQL语句

这里因为每个月查询的是不同月份的表,所有用到动态 sql 。

    <select  id="selectFromTable" resultType="com.hopedata.zhmscloud.camp.entity.po.ZhmsUserWideInfo">
        SELECT * FROM USER_WIDE_INFO_M_${SysMonth}
    </select>

三、更新当前前缀

要做到更新当前前缀,需要有两套前缀不同的 key ,还需要一个能区分前缀的前缀索引 currentPrefixIndex 。

3.1 设置前缀常量

用 A 和 B 来区分两组 key 。

代码如下: 

    private static final String PREFIX_A = "A";
    private static final String PREFIX_B = "B";

3.2 初始化 currentPrefixIndex

向 redis集群中存入初始的 currentPrefixIndex 。

代码如下: 

    @GetMapping("/init")
    public String init() {
        return jedisCluster.set("currentPrefixIndex", "0");
    }

3.3 获取当日前缀 

先取出当日的前缀索引 currentPrefixIndex ,与 2 取余数 ,来获取当日的前缀。

代码如下: 

     //获取当日前缀
    private String getKeyPrefix() {
        int currentPrefixIndex = Integer.parseInt(jedisCluster.get("currentPrefixIndex"));
        if (currentPrefixIndex % 2 == 0) {
            return PREFIX_A;
        } else {
            return PREFIX_B;
        }
    }

 3.4 更新 currentPrefixIndex

每天需要更新前缀索引 currentPrefixIndex ,让 currentPrefixIndex + 1 , 使区分读的数据。

代码如下: 

    // 重新设置currentPrefixIndex
    private void updateCurrentPrefixIndex() {
        String currentValue = jedisCluster.get("currentPrefixIndex");
        int newValue = Integer.parseInt(currentValue) + 1;
        jedisCluster.set("currentPrefixIndex", String.valueOf(newValue));
    }

四、往redis集群更新数据

这其实是最重要的一步,因为同时存入大量的数据,所以要使用到 Pipeline 来实现。

4.1 大致流程

  1. 获取到当前前缀,查出相关的 key ,更新数据之前把旧数据删除。
  2. 把新数据解析后更新到 redis 集群。

注意:因为数据量大,为了减少网络性能消耗,删除和更新都要用 Pipeline 来操作。

代码如下:

    private void insertToJedis(List<UserWideInfo> UserWideInfoList) {
        String keyPrefix = getKeyPrefix();
        List<String> keys = new ArrayList<>();
        Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
        for (JedisPool node : clusterNodes.values()) {
            try (Jedis jedis = node.getResource()) {
                Set<String> nodeKeys = jedis.keys(keyPrefix + "*");
                keys.addAll(nodeKeys);
            }
        }
        Map<JedisPool, List<String>> delKey = assignKey(keys, jedisCluster);
        //先删旧的
        for (JedisPool jedisPool : delKey.keySet()) {
            try (Jedis jedis = jedisPool.getResource()){
                Pipeline pipelined = jedis.pipelined();
                List<String> keysList = delKey.get(jedisPool);
                for (String key : keysList) {
                    pipelined.del(key);
                }
                pipelined.sync();
            }
        }
        List<String> keyList =new ArrayList<>();
        HashMap<String, String> map = new HashMap<>();
        //填充keyList和value
        for (UserWideInfo UserWideInfo : UserWideInfoList) {
            String key = keyPrefix + "_" + UserWideInfo.getBillNo();
            keyList.add(key);
            //构建value
            ...
            ...
            map.put(key, value);
        }
        Map<JedisPool, List<String>> result = assignKey(keyList, jedisCluster);
        for (JedisPool jedisPool : result.keySet()) {
            try (Jedis jedis = jedisPool.getResource()){
                Pipeline pipelined = jedis.pipelined();
                // 获取当前JedisPool对应的键列表
                List<String> keysList = result.get(jedisPool); 
                // 将命令添加到Pipeline中
                for (String key : keysList) {
                    String value = map.get(key);
                    pipelined.set(key, value);
                }
                // 执行Pipeline中的所有命令
                pipelined.sync();
            }
        }
    }

五、JedisCluster 实现 Pipeline 操作

5.1 实现过程

因为 JedisCluster 不支持 Pipeline 操作,所以需要自己来实现。

代码如下:

@Slf4j
public class JedisPipelineUtil {

    /**
     * jedis集群下使用pipeline之前先将key分配管道
     * Map<String, List<String>> 键值为节点ip和端口号 192.168.1.1:6397 value为redis存入的key
     *
     * @param list         存redis的key
     * @param jedisCluster
     * @return
     */
    public static Map<String, List<String>> assignSlot(List<String> list, JedisCluster jedisCluster) {
        Map<String, List<String>> hostPhoneMap = new HashMap<>();
        Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
        Map.Entry<String, JedisPool> next = clusterNodes.entrySet().iterator().next();
        JedisPool jedisPool = next.getValue();
        Jedis jedis = jedisPool.getResource();
        Map<Integer, String> slots = discoverClusterSlots(jedis);
        for (String s : list) {
            String hostAndPort = slots.get(JedisClusterCRC16.getSlot(s));
            if (hostPhoneMap.containsKey(hostAndPort)) {
                hostPhoneMap.get(hostAndPort).add(s);
            } else {
                List<String> newList = new ArrayList<>();
                newList.add(s);
                hostPhoneMap.put(hostAndPort, newList);
            }
        }
        jedis.close();
        return hostPhoneMap;
    }

    /**
     * jedis集群下使用pipeline之前先将key分配管道
     * Map<JedisPool, List<String>> 键值为节JedisPool value为redis存入的key
     *
     * @param list         存redis的key
     * @param jedisCluster
     * @return
     */
    public static Map<JedisPool, List<String>> assignKey(List<String> list, JedisCluster jedisCluster) {
        Map<JedisPool, List<String>> map = new HashMap<>();
        Map<String, List<String>> var1 = assignSlot(list, jedisCluster);
        Iterator<Map.Entry<String, List<String>>> iterator = var1.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, List<String>> next = iterator.next();
            JedisPool jedisPool = jedisCluster.getClusterNodes().get(next.getKey());
            map.put(jedisPool, next.getValue());
        }
        return map;

    }

    private static Map<Integer, String> discoverClusterSlots(Jedis jedis) {
        Map<Integer, String> slotsMap = new HashMap<>();
        List<Object> slots = jedis.clusterSlots();
        Iterator var3 = slots.iterator();
        while (var3.hasNext()) {
            Object slotInfoObj = var3.next();
            List<Object> slotInfo = (List) slotInfoObj;
            if (slotInfo.size() > 2) {
                List<Integer> slotNums = getAssignedSlotArray(slotInfo);
                List<Object> hostInfos = (List) slotInfo.get(2);
                if (!hostInfos.isEmpty()) {
                    String targetNode = generateHostAndPort(hostInfos);
                    Iterator<Integer> var4 = slotNums.iterator();
                    while (var4.hasNext()) {
                        Integer slot = var4.next();
                        slotsMap.put(slot, targetNode);
                    }
                }
            }
        }
        return slotsMap;
    }

    private static List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
        List<Integer> slotNums = new ArrayList<>();

        for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1)).intValue(); ++slot) {
            slotNums.add(slot);
        }

        return slotNums;
    }

    private static String generateHostAndPort(List<Object> hostInfos) {
        String host = SafeEncoder.encode((byte[]) hostInfos.get(0));
        int port = ((Long) hostInfos.get(1)).intValue();
        return host + ":" + port;
    }
}

使用 assignKey 方法就可以分配管道。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1314649.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

金智融门户(统一身份认证)同步数据至钉钉通讯录

前言:因全面使用金智融门户和数据资产平台,二十几个信息系统已实现统一身份认证和数据同步,目前单位使用的钉钉尚未同步组织机构和用户信息,职工入职、离职、调岗时都需要手工在钉钉后台操作,一是操作繁琐,二是钉钉通讯录更新不及时或经常遗漏,带来管理问题。通过金智融…

【网络安全】网络防护之旅 - 非对称密钥体制的解密挑战

&#x1f308;个人主页&#xff1a;Sarapines Programmer&#x1f525; 系列专栏&#xff1a;《网络安全之道 | 数字征程》⏰墨香寄清辞&#xff1a;千里传信如电光&#xff0c;密码奥妙似仙方。 挑战黑暗剑拔弩张&#xff0c;网络战场誓守长。 目录 &#x1f608;1. 初识网络安…

Python将已标注的两张图片进行上下拼接并修改、合并其对应的Labelme标注文件(v2.0)

Python将已标注的两张图片进行上下拼接并修改、合并其对应的Labelme标注文件&#xff08;v2.0&#xff09; 前言前提条件相关介绍实验环境上下拼接图片并修改、合并其对应的Labelme标注文件代码实现输出结果 前言 此版代码&#xff0c;相较于Python将已标注的两张图片进行上下拼…

Centos开机进入grub命令行模式进入不了操作系统

环境&#xff1a;没有linux命令&#xff0c;没有initrd命令&#xff0c;没有init6命令 由于删除了/boot/efi/EFI/centos/grub.cfg &#xff0c;重启服务器后&#xff0c;无法进入原来正常的系统&#xff0c;进入了grub命令行界面 备注&#xff1a;对于centos7/8/openEuler: 如果…

VR云游打造沉浸式文旅新体验,延伸智慧文旅新业态

从“跃然纸上”到“映入眼帘”&#xff0c;随着国家数字化战略的深入实施&#xff0c;文旅产业的数字化转型正在不断加快&#xff0c;“沉浸式”逐渐成为了文旅消费新热点。VR技术与文旅产业相融合&#xff0c;新产品、新模式、新业态不断涌现&#xff0c;文旅资源逐渐“活”起…

巴贝拉葡萄酒是单一品种还是混合品种制成的?

大多数巴贝拉葡萄酒都是由单一的巴贝拉葡萄品种制成的&#xff0c;许多意大利葡萄酒商开始尝试在巴贝拉葡萄酒中加入其它葡萄品种&#xff0c;其中两个最受欢迎的意大利品种是皮埃蒙特的巴贝拉德阿尔巴和达斯蒂。和朋友在一家意大利餐厅吃饭&#xff0c;被酒单吓到了&#xff1…

nodejs+vue+微信小程序+python+PHP全国天气可视化分析系统-计算机毕业设计推荐

3.2.1前台用户功能 前台用户可分为未注册用户需求和以注册用户需求。 未注册用户的功能如下&#xff1a; 注册账号&#xff1a;用户填写个人信息&#xff0c;并验证手机号码。 浏览天气资讯&#xff1a;用户可以浏览天气资讯信息详情。 已注册用户的功能如下&#xff1a; 登录&…

例如,用一个DatabaseRow类型表示一个数据库行(容器),用泛型Column<T>作为它的键

以下是一个简单的示例&#xff0c;演示如何使用泛型的Column<T>作为DatabaseRow的键&#xff0c;表示一个数据库行&#xff08;容器&#xff09;&#xff1a; // 列定义 class Column<T> {private String columnName;private T value;public Column(String column…

ARM KEIL 安装

根据设备类型安装开发工具及环境 Arm,Cortex ----> MDK-Arm 8051 ----> C51 80251 ----> C251 C166,XC166,XC2000 MCU设备 ----> C155 填写信息提交后下载 点击MDK539.EXE下载 : MDK539.EXE 双击MDK539安装 点击Next 默认安装路径,点击Ne…

[Knowledge Distillation]论文分析:Distilling the Knowledge in a Neural Network

文章目录 一、完整代码二、论文解读2.1 介绍2.2 Distillation2.3 结果 三、整体总结 论文&#xff1a;Distilling the Knowledge in a Neural Network 作者&#xff1a;Geoffrey Hinton, Oriol Vinyals, Jeff Dean 时间&#xff1a;2015 一、完整代码 这里我们使用python代码进…

CLIP 对比学习 源码理解快速学习

最快的学习方法&#xff0c;理清思路&#xff0c;找视频讲解&#xff0c;看源码逻辑&#xff1a; CLIP 源码讲解 唐宇 输入&#xff1a; 图像-文本成对配对的数据 训练模型的过程&#xff08;自己理解&#xff09;&#xff1a; 怎么做的&#xff1f;&#xff1a;利用数据内部…

使用Python实现对word的批量操作

Python在平时写写小工具真是方便快捷&#xff0c;Pyhon大法好。以下所有代码都是找了好多网上的大佬分享的代码按照自己的需求改的。 调用的库为Python-docx、win32com、PyPDF2、xlwings&#xff08;操作excel&#xff09;。 因为公司的任务要对上千个word文件进行批量操作&a…

node.js学习(简单聊天室)

在掘金查看该文章 1. TCP服务搭建 1.1 socket 先来粗略了解下socket 套接字&#xff08;socket&#xff09;是一个抽象层&#xff0c;应用程序可以通过它发送或接收数据&#xff0c;可对其进行像对文件一样的打开、读写和关闭等操作。套接字允许应用程序将I/O插入到网络中&am…

uniGUI学习之Cookie

UniApplication.Cookies.SetCookie( const ACookieName: string, const AValue: string, AExpires: TDateTime 0, ASecure: Boolean False, AHTTPOnly: Boolean False, const APath: string / )

Module ‘app‘: platform ‘android-33‘ not found.

目录 一、报错信息 二、解决方法 一、报错信息 Module app: platform android-33 not found. 检查你的应用程序的build.gradle文件中的targetSdkVersion和compileSdkVersion是否正确设置为已安装的Android SDK版本。 确保你的Android Studio已正确安装并配置了所需的Android …

Axure动态面板的使用以及示例分享

目录 一. 什么是动态面板 二. 动态面板教程——以轮播图为例 2.1 创建动态面板 2.2 动态面板自适应大小 2.3 重复状态&#xff0c;将图片导入 2.4 添加交互事件——图片切换 2.5 效果展示 三. 多方式登录示例展示 四. 后台主界面左侧菜单栏示例展示 一. 什么是动态面板…

如何用Python实现量化交易?

Python是一种广泛使用的编程语言&#xff0c;它的语法简洁易学&#xff0c;而且有着丰富的库和工具&#xff0c;使得Python成为一种非常适合初学者和开发人员使用的语言。 Python量化是指使用Python编程语言进行量化投资研究和分析的过程。量化投资是一种基于数据和统计模型的…

scipy.signal.hilbert和scipy.fftpack.hilbert的区别

提示&#xff1a;分析scipy.signal.hilbert和scipy.fftpack.hilbert在应用的区别 一、代码 import matplotlib import matplotlib.pyplot as plt import numpy as np from pyhht import EMD from scipy.signal import hilbert import tftb.processing from scipy import signa…

DevOps云原生创建devops流水线(微服务项目上传git,打包镜像,部署k8s)

开发和运维人员的解决方案 一、中间件的部署&#xff08;Sentinel/MongoDB/MySQL&#xff09; 二、创建DevOps工程 邀请成员 三、创建流水线 四、编辑流水线 ①、拉取代码&#xff08;若失败&#xff0c;则将制定容器改为maven&#xff09; 若失败&#xff0c;则将命令改…

在 Spring Boot 中发送邮件简单实现

Spring Boot 对于发送邮件这种常用功能也提供了开箱即用的 Starter&#xff1a;spring-boot-starter-mail。 通过这个 starter&#xff0c;只需要简单的几行配置就可以在 Spring Boot 中实现邮件发送&#xff0c;可用于发送验证码、账户激活等等业务场景。 本文将通过实际的案…