编码技巧——Redis Pipeline

news2025/1/11 13:03:28

本文介绍Redis pipeline相关的知识点及代码示例,包括Redis客户端-服务端的一次完整的网络请求、pipeline与client执行多命令的区别、pipeline与Redis"事务"、pipeline的使用代码示例;

pipeline与client执行多命令的区别

Redis是一种基于客户端-服务端模型以及请求/响应的TCP服务;Redis客户端-服务端的一次完整的网络请求来回如下图;

简化一下,一次Redis请求和响应,会经历如下的步骤

  • 客户端发起一个(查询/插入)请求,并监听socket返回,通常情况都是阻塞模式等待Redis服务器的响应;
  • 服务端处理命令,并且返回处理结果给客户端;
  • 客户端接收到服务的返回结果,程序从阻塞代码处返回;

Redis客户端和服务端之间通过网络连接进行数据传输,这个连接可以很快(loopback接口)或很慢(建立了一个多次跳转的网络连接);但无论网络延如何延时,数据包总是能从客户端到达服务器,并从服务器返回数据回复客户端,这个时间被称之为RTT(Round Trip Time - 往返时间)

我们可以很容易就意识到,Redis在连续请求服务端时,即使Redis每秒能处理100k请求,但也会因为网络传输花费大量时间,导致整体性能的下降

因此如果遇到大量的批处理,我们可以考虑使用Redis的pipeline(管道);

对于pipeline技术而言,对于N个命令,就相当于将N个上图中的步骤,合并成1个,其他多余的时间开销仅作用于命令的执行,这样服务请求响应的总体时间将会大大的减少

关于pipeline与client单命令的压测结果可参考Redis精通系列——Pipeline(管道);

值得注意的是,管道技术并不是Redis特有的技术,管道技术往往需要客户端-服务器的共同配合,大部分工作任务其实是在客户端完成;Redis在较早的版本就已经支持管道技术;

如下图,多个连续的incr指令,使用pipeline(管道)后,多个连续的incr指令只会花费一次网络来回开销;这个开销会随着N数值的增大,大幅减少网络IO开销,从而提升整体服务的性能;

Redis pipeline的使用注意事项

1. pipeline一次执行的命令不宜过多

结合上面redis命令完整执行流程图,有个值得注意的点——可能出现我们经常说到的IO阻塞

  • 当write操作发生,并且发送缓冲区(send buffer)满时,就会导致write操作阻塞;
  • 当read操作发生,并且接收缓冲区(recv buffer)满时,就会导致read操作阻塞;

上述的这两个阻塞如果出现,将会导致整个请求时间变长;

因此我们操作大批量指令的时候,比如10k个指令,我们可以合理的对指令分多次批量发送,这样可以减少出现阻塞的情况,也可以避免服务器响应一个过大的response包,导致客户端内存负载过重;

即使不发生IO阻塞,pipeline每批打包的命令不能过多还有一个原因:因为 pipeline 方式打包命令再发送,那么 redis server 必须在处理完所有命令前,先缓存起所有命令的处理结果,这样就有一个缓存结果的内存的消耗

2. pipeline不保证命令执行的原子性

官方文档的一句话——

Redis::PIPELINE block is simply transmitted faster to the server, but without any guarantee of atomicity.

其实Redis的高性能设计本就不支持包含多命令的严格事务,哪怕是multi/exec操作还是Lua脚本;Redis只是提供了一些命令来一定程度实现"事务";

multi/exec操作针对命令语法错误和执行时参数错误,处理是不一样的,详情见我之前的文章《Redis——“事务“/Lua脚本》;

Lua脚本也只能一定程度保证逻辑处理和Redis命令打包的原子性,例如库存扣减;

pipeline的使用代码示例

代码示例的背景是:根据appId批量查询本地缓存的App信息,未命中本地缓存的,需要从redis中拿这多个key的value,然后刷入本地缓存;在"从redis中拿这多个key的value",在key数量较多时,做个优化,使用Redis的pipeline;

    private List<AppSimpleInfoDTO> querySimpleAppWithCache(List<String> payAppIds) {
        if (CollectionUtils.isEmpty(payAppIds)) {
            return Lists.newArrayList();
        }
        List<AppSimpleInfoDTO> appList = Lists.newArrayList();
        try {
            // 从本地缓存读取APP
            final List<String> tobeQry = Lists.newArrayList();
            for (String payAppId : payAppIds) {
                AppSimpleInfoDTO appSimpleInfoDTO = null;
                appSimpleInfoDTO = appSimpleInfoCache.getIfPresent(payAppId);
                if (appSimpleInfoDTO != null) {
                    log.info("get_appSimpleInfoDTO_fr_appSimpleInfoCache_suc. [appSimpleInfoDTO={}]", JSON.toJSONString(appSimpleInfoDTO));
                    appList.add(appSimpleInfoDTO);
                } else {
                    tobeQry.add(payAppId);
                }
            }
			// 未命中本地缓存则去redis查询
            if (CollectionUtils.isNotEmpty(tobeQry)) {
                final List<AppSimpleInfoDTO> cacheAppSimpleInfoDTOs = getAndCacheAppSimpleInfoDTOs(tobeQry);
                if (CollectionUtils.isNotEmpty(cacheAppSimpleInfoDTOs)) {
                    appList.addAll(cacheAppSimpleInfoDTOs);
                }
            }

            return appList;
        } catch (Exception e) {
            log.error("querySimpleAppWithCache error.", e);
            // 异常时刷全量缓存
            return getAndCacheAppSimpleInfoDTOs(payAppIds);
        }
    }

使用pipeline做多个key的get命令:

    private List<AppSimpleInfoDTO> getAndCacheAppSimpleInfoDTOs(List<String> payAppIds) {
        final Set<String> payAppIds2Qry = new HashSet<>(payAppIds);
        final List<AppSimpleInfoDTO> result = Lists.newArrayList();
        // 先尝试从redis获取 pipeline模式
        JedisClusterPipeLine pipeline = null;
        try {
            pipeline = jedisCluster.pipelined();
            for (String payAppId : payAppIds) {
                final String appSimpleInfoKey = CacheKeyUtils.getAppSimpleInfoKey(payAppId);
				// pipeline添加get命令
                pipeline.get(appSimpleInfoKey);
            }
			// pipeline执行并获取结果
            final List<Object> allVal = pipeline.syncAndReturnAll();
            if (CollectionUtils.isNotEmpty(allVal)) {
                allVal.forEach(val -> {
                    if (val != null) {
                        final String jsonStr = String.valueOf(val);
                        if (StringUtils.isNotBlank(jsonStr)) {
                            Optional.ofNullable(JSON.parseObject(jsonStr, AppSimpleInfoDTO.class)).ifPresent(appSimpleInfoDTO -> {
                                result.add(appSimpleInfoDTO);
                                appSimpleInfoCache.put(appSimpleInfoDTO.getPayAppId(), appSimpleInfoDTO);
                                log.info("localCache_AppSimpleInfoDTO_fr_redis_suc. [simpleApp={}]", JSON.toJSONString(appSimpleInfoDTO));
                                payAppIds2Qry.remove(appSimpleInfoDTO.getPayAppId());
                            });
                        }
                    }
                });
            }
        } catch (Exception e) {
            log.error("localCache_AppSimpleInfoDTOs_fr_redis_error. [payAppIds={}]", JSON.toJSONString(payAppIds), e);
        } finally {
            if (pipeline != null) {
                pipeline.close();
            }
        }

		// redis未查到的数据走RPC查询 并异步加载到redis和localCache
        if (CollectionUtils.isNotEmpty(payAppIds2Qry)) {
            final List<String> payAppIds2QryList = new ArrayList<>(payAppIds2Qry);
            List<App> apps = Lists.newArrayList();
            int index = 0;
            while (index < payAppIds2QryList.size()) {
                int toIndex = Math.min(index + max_batch, payAppIds2QryList.size());
                List<String> payAppIds2QryTemp = payAppIds2QryList.subList(index, toIndex);
				// 实时查APP信息接口
                List<App> appsTemp = queryAppByPayAppIds(new ArrayList<>(payAppIds2QryTemp));
                apps.addAll(appsTemp);
                index += max_batch;
            }
			// 异步加载到redis和localCache
            if (CollectionUtils.isNotEmpty(apps)) {
                for (App app : apps) {
                    AppSimpleInfoDTO simpleApp = new AppSimpleInfoDTO(app.getName(), app.getPackname(), app.getCode(), app.getBigType());
                    CompletableFuture.runAsync(() -> {
                        // redis缓存1小时
                        jedisClusterTemplate.setex(CacheKeyUtils.getAppSimpleInfoKey(simpleApp.getPayAppId()), VivoConfigManager.getInteger(JointOperateConfigConstants.APP_SIMPLEINFO_REDIS_CACHE_TTL, JointOperateConfigConstants.APP_SIMPLEINFO_REDIS_CACHE_TTL_DEFAULT), JSON.toJSONString(simpleApp));
                        log.info("redisCache_AppSimpleInfoDTO_suc. [simpleApp={}]", JSON.toJSONString(simpleApp));
                        // local cache 缓存5分钟
                        appSimpleInfoCache.put(simpleApp.getPayAppId(), simpleApp);
                        log.info("localCache_AppSimpleInfoDTO_suc. [simpleApp={}]", JSON.toJSONString(simpleApp));
                    });
                    result.add(simpleApp);
                }
            }
        }

        // 重新排序
        final Map<String, AppSimpleInfoDTO> payAppIdMap = result.stream().collect(Collectors.toMap(AppSimpleInfoDTO::getPayAppId, Function.identity(), (old, newly) -> newly));
        result.clear();
        payAppIds.forEach(payAppId -> Optional.ofNullable(payAppIdMap.get(payAppId)).ifPresent(result::add));

        return result;
    }

本文参考:

Redis精通系列——Pipeline(管道)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/399637.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何挖掘专利创新点?

“无意中发现了一个巨牛的人工智能教程&#xff0c;忍不住分享一下给大家。教程不仅是零基础&#xff0c;通俗易懂&#xff0c;而且非常风趣幽默&#xff0c;像看小说一样&#xff01;觉得太牛了&#xff0c;所以分享给大家。点这里可以跳转到教程。” 对于广大的软件工程师来说…

W806|CKLINK LITE|ICE调试|HardPoint|elf模板|CSDK|Debug|学习(4):CKLINK调试W806

目录 一、硬件连接 接线方式 错误提示 二、调试前准备 正常识别状态 wm_tool.exe缺失错误​ 三、flash配置 增加W806模板 compiler选项卡 Debug选项卡 ICE设置 正常连接信息 四、调试工程 添加硬断点 断点配置 仿真调试 下载固件 参考&#xff1a; 《手把手教…

《MySQL系列-InnoDB引擎28》表-约束详细介绍

约束 1 数据完整性 关系型数据库系统和文件系统的一个不同点是&#xff0c;关系数据库本身能保证存储数据的完整性&#xff0c;不需要应用程序的控制&#xff0c;而文件系统一般需要在程序端进行控制。当前几乎所有的关系型数据库都提供约束(constraint)机制&#xff0c;该机制…

群智能优化计算中的混沌映射

经实验证明&#xff0c;采用混沌映射产生随机数的适应度函数值有明显提高&#xff0c;用混沌映射取代常规的均匀分布的随机数发生器可以得到更好的结果&#xff0c;特别是搜索空间中有许多局部解时&#xff0c;更容易搜索到全局最优解&#xff0c;利用混沌序列进行种群初始化、…

基于Qt WebEngine 的Web仪器面板GUI程控技术

随着IIoT的发展&#xff0c;很多工业仪器也具备了远程管理的GUI。与早期使用串口进行命令交互不同&#xff0c;这些GUI可以直接在远程呈现数据。 作为希望对仪器、软件进行二次开发的小公司来说&#xff0c;会遇到GUI人工操作转自动化的需求。在无法通过串口等传统接口进行自动…

nextjs开发 + vercel 部署 ssr ssg

前言 最近想实践下ssr 就打算用nextjs 做一个人博客 &#xff0c; vercel 部署 提供免费域名&#xff0c;来学习实践下ssr ssg nextjs 一个轻量级的react服务端渲染框架 vercel 由 Next.js 的创建者制作 支持nextjs 部署 免费静态网站托管 初始化项目 npx create-next-app p…

【Linux】目录结构

Linux世界里&#xff0c;一切皆文件。 /bin&#xff1a;是Binary的缩写&#xff0c;这个目录存放着最经常使用的命令。&#xff08;常用&#xff09; /sbin&#xff1a;s就是Super User的意思&#xff0c;这里存放的是系统管理员使用的系统管理程序。 /home&#xff1a;存放普…

关于Pytorch中的张量学习

关于Pytorch中的张量学习 张量的概念和创建 张量的概念 Tensor是pytorch中非常重要且常见的数据结构&#xff0c;相较于numpy数组&#xff0c;Tensor能加载到GPU中&#xff0c;从而有效地利用GPU进行加速计算。但是普通的Tensor对于构建神经网络还远远不够&#xff0c;我们需…

实力加持!RestCloud完成多方国产化适配,携手共建信创生态

近年来&#xff0c;随着数字化建设进入深水区&#xff0c;企事业单位对信息安全重视程度与日俱增&#xff0c;核心技术自主可控已成为时代呼唤&#xff0c;国产化浪潮日益汹涌澎湃。近日&#xff0c;RestCloud在国产化方面取得新进展&#xff0c;完成了全部产品线信创环境的多方…

系统重装漏洞

zzcms系统重装漏洞 一、配置zzcms环境 1. 使用小皮搭建zzcms框架 2. 安装zzcms 按照下面的操作进行,傻瓜式操作即可 3. 打开网站 二、漏洞利用 在访问install目录的默认文件后,会出现zzcms安装向导 http://www.zzcms.com/install/index.php 但是会显示 “安装向导…

MQTT协议-发布消息(客户端向服务器发送)

MQTT协议-发布消息&#xff08;客户端向服务器发送&#xff09; 发布消息报文组成&#xff1a;https://blog.csdn.net/weixin_46251230/article/details/129414158 在分析完服务器下发到客户端的报文后&#xff0c;就可以参考JSON格式的有效载荷&#xff0c;将温湿度的值改为…

Linux发行版的backport

遇到一个问题,简要记录如下: base on ubuntu18.06 4.15内核,这版内核不支持一款intel的集成网卡,追踪内核代码的提交历史才发现,这款网卡是从linux-4.20才开始支持的,系统自带的这个Kernel版本不支持。 如果不允许升级内核,面对这种问题,社区的做法是把新内核的特性cher…

顺序表【数据结构】

文章目录:star2:1. 顺序表概念:star2:2. 框架3. 基本功能3.1 头文件:star:3.2 初始化:star:3.3 扩容:star:3.4 打印:star:3.5 尾插:star:3.6 头插:star:3.7 尾删:star:3.8 头删:star:3.9 指定插入:star:3.10 指定删除:star:3.11 查找:star2:3.12 注意事项4. 顺序表的缺点&#…

云原生安全2.X 进化论系列|云原生安全2.X未来展望(4)

随着云计算技术的蓬勃发展&#xff0c;传统上云实践中的应用升级缓慢、架构臃肿、无法快速迭代等“痛点”日益明显。能够有效解决这些“痛点”的云原生技术正蓬勃发展&#xff0c;成为赋能业务创新的重要推动力&#xff0c;并已经应用到企业核心业务。然而&#xff0c;云原生技…

Git学习笔记(六)-标签管理

发布一个版本时&#xff0c;我们通常先在版本库中打一个标签&#xff08;tag&#xff09;&#xff0c;这样&#xff0c;就唯一确定了打标签时刻的版本。将来无论什么时候&#xff0c;取某个标签的版本&#xff0c;就是把那个打标签的时刻的历史版本取出来。所以&#xff0c;标签…

销售使用CRM系统集成Excel的五个技巧

销售过程中有很多情况会降低团队的效率。通过正确的实施CRM客户管理系统&#xff0c;可以帮助您的企业自动执行手动任务、减少错误并专注于完成交易。这里有5个技巧&#xff0c;可以帮助您的销售人员通过CRM集成Excel为销售流程赋能并提高他们的整体效率。 技巧1&#xff1a;将…

Python每日一练(20230309)

目录 1. 删除有序数组中的重复项 ★ 2. 二叉树的最小深度 ★★ 3. 只出现一次的数字 II ★★ &#x1f31f; 每日一练刷题专栏 C/C 每日一练 ​专栏 Python 每日一练 专栏 1. 删除有序数组中的重复项 给你一个有序数组 nums &#xff0c;请你原地删除重复出现的元素…

Xuetr杀毒工具使用实验(28)

实验目的 &#xff08;1&#xff09;学习Xuetr的基本功能&#xff1b; &#xff08;2&#xff09;掌握Xuetr的基本使用方法。预备知识 windows操作系统的基本知识如&#xff1a;进程、网络、服务和文件等的了解。 XueTr是近年推出的一款广受好评的ARK工具。ARK工具全称为Anti R…

Ubuntu20.04中Docker安装与配置

一、安装 1、卸载可能存在的旧版本 sudo apt-get remove docker docker-engine docker-ce docker.io2、更新apt包索引 sudo apt-get update显示“正在读取软件包列表… 完成” 3、安装以下包以使apt可以通过HTTPS使用存储库(repository) sudo apt-get install -y apt-tran…

java多线程(二三)并发编程:Callable、Future和FutureTask

一、Callable 与 Runnable 先说一下java.lang.Runnable吧&#xff0c;它是一个接口&#xff0c;在它里面只声明了一个run()方法&#xff1a; public interface Runnable {public abstract void run(); }由于run()方法返回值为void类型&#xff0c;所以在执行完任务之后无法返…