关于lettuce的一次pipeline反向优化

news2024/9/29 5:34:37

起因是后台job对一批数据做大量的redis读写操作,为了提高job的执行速度,直接使用pipeline对一些不能批量读写的命令进行管道优化

简单介绍什么是lettuce

Spring Boot自2.0版本开始默认使用Lettuce作为Redis的客户端(注1)。Lettuce客户端基于Netty的NIO框架实现,对于大多数的Redis操作,只需要维持单一的连接即可高效支持业务端的并发请求 —— 这点与Jedis的连接池模式有很大不同。同时,Lettuce支持的特性更加全面,且其性能表现并不逊于,甚至优于Jedis

官方是这样介绍的:Lettuce 是一个可扩展的线程安全 Redis 客户端,提供同步、 异步和反应式API。如果多个线程避免阻塞和事务性操作(例如BLPOP和 MULTI/ ) ,则它们可以共享一个连接EXEC。优秀的netty NIO框架可以有效地管理多个连接。其中包括对高级 Redis 功能(例如 Sentinel、Cluster 和 Redis 数据模型)的支持。

为什么选择使用pipeline进行批处理优化

客户端与服务端通过网络连接,无论两者间的网络延迟是高还是低,数据包从客户端到服务端(请求),再从服务端返回客户端(响应)的过程总是会消耗一定的时间。我们将这段时间称为RTT(Round Trip Time)。假设在延迟非常高的网络条件下,RTT达到250ms,此时就算服务端拥有每秒处理100k请求的能力,(基于单一连接)整体的QPS也仅仅只有4。而如果借助管道模式,客户端则可以一次性发出大量(如1k)请求,并随后一次性接收大量服务端的响应,从而显著提高请求处理速度
在这里插入图片描述

还原现场

本次使用的zRem接口,zRem可以在一个网络请求中对一个key的多个values删除,但是如果删除的是多个key,则做不到,所以会考虑pipeline进行优化

简单的zRemove接口

public Boolean zRemove(String key, String value) {
        try {
            Long affected = stringRedisTemplate.opsForZSet().remove(key, value);
            return Objects.equals(affected, 1L);
        } catch (Exception e) {
            log.error("Redis Operation[zRemove] Error", e);
        }
        return null;
    }

使用pipeline后

public Boolean zRemoveByPipeline(String key, Collection<String> values) {
        if (CollectionUtils.isEmpty(values)) {
            return Boolean.TRUE;
        }

        try {
            List<Object> results = stringRedisTemplate.executePipelined(
                    (RedisCallback<Object>) connection -> {
                        for (String value : values) {
                            connection.zRem(key.getBytes(), value.getBytes());
                        }
                        return null;
                    });
            return Objects.equals(results.size(), values.size());
        } catch (Exception e) {
            log.error("Redis Operation[zRemoveByPipeline] Error", e);
        }
        return Boolean.FALSE;
    }

优化结果

不仅job运行时间变长,服务器cpu也有显著上升

原因排查

查看executePipelined方法

public List<Object> executePipelined(RedisCallback<?> action, @Nullable RedisSerializer<?> resultSerializer) {

		return execute((RedisCallback<List<Object>>) connection -> {
			connection.openPipeline();
			boolean pipelinedClosed = false;
			try {
				Object result = action.doInRedis(connection);
				if (result != null) {
					throw new InvalidDataAccessApiUsageException(
							"Callback cannot return a non-null value as it gets overwritten by the pipeline");
				}
				List<Object> closePipeline = connection.closePipeline();
				pipelinedClosed = true;
				return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);
			} finally {
				if (!pipelinedClosed) {
					connection.closePipeline();
				}
			}
		});
	}

其实就是使用pipeline后,方法首尾分别调用了connection.openPipeline();connection.closePipeline();

查看lettuce的openPipeline()实现:

public void openPipeline() {

		if (!isPipelined) {
			isPipelined = true;
			ppline = new ArrayList<>();
			flushState = this.pipeliningFlushPolicy.newPipeline();
			flushState.onOpen(this.getOrCreateDedicatedConnection());
		}
	}

先解释一下:

  1. isPipelined代表本次调用是否使用pipeline
  2. ppline 存贮封装好的LettuceResult
  3. flushState:pipeline的刷新策略,lettuce默认PipeliningFlushPolicy.flushEachCommand()每个命令一刷

第一点很好理解,第二点我们看一下ppline这个集合什么时候被添加元素:

void pipeline(LettuceResult result) {

		if (flushState != null) {
			flushState.onCommand(getOrCreateDedicatedConnection());
		}

		if (isQueueing()) {
			transaction(result);
		} else {
			ppline.add(result);
		}
	}

在执行reids命中的execute方法中,如果读取到isPipelined为true,则调用上面的pipeline方法,将LettuceResult的对象存入ppline集合中

这样就可以解释第三点,上面代码将LettuceResult的对象存入ppline集合中执行会执行:flushState.onCommand(getOrCreateDedicatedConnection());
然而在FlushEachCommand的刷新策略类中是这样实现该方法的: public void onCommand(StatefulConnection<?, ?> connection) {}
什么也不做,这就会很奇怪了,必须看看别的实现类怎么做,比如BufferedFlushing的实现:

private static class BufferedFlushing implements PipeliningFlushState {

		private final AtomicLong commands = new AtomicLong();

		private final int flushAfter;

		public BufferedFlushing(int flushAfter) {
			this.flushAfter = flushAfter;
		}

		@Override
		public void onOpen(StatefulConnection<?, ?> connection) {
			connection.setAutoFlushCommands(false);
		}

		@Override
		public void onCommand(StatefulConnection<?, ?> connection) {
			if (commands.incrementAndGet() % flushAfter == 0) {
				connection.flushCommands();
			}
		}

		@Override
		public void onClose(StatefulConnection<?, ?> connection) {
			connection.flushCommands();
			connection.setAutoFlushCommands(true);
		}
	}

很明了:

  1. onOpen:打开pipeline时,将自动刷新设置为false,该值默认是true,此时lettuce是每个命令都会立即发送到redis server
  2. onCommand:每flushAfter个命令后,调用flushCommands进行刷新
  3. onClose:刷新命令并设置自动刷新为true

看到这里,博主赶紧回头看FlushEachCommand的三个方法:果然都是什么都没做,因为默认就是自动刷新:每个命令都会立即发送到redis server

到了这里已经出现端倪:我们默认用的是FlushEachCommand,那么即便使用了pipeline,也没有做到像开头说的那样:将一批redis 命中只通过一次网络调用发往redis

FlushEachCommand什么时候被使用?
在把命令封装成LettuceResult的时候回调用dispatch方法,最终在DefaultEndpoint类中:write(RedisCommand<K, V, T> command)方法

if (autoFlushCommands) {

                if (isConnected()) {
                    writeToChannelAndFlush(command);
                } else {
                    writeToDisconnectedBuffer(command);
                }

            } else {
                writeToBuffer(command);
            }

当autoFlushCommands = true,writeToChannelAndFlush
否则:writeToBuffer()将命令装入内部缓冲区 private final Queue<RedisCommand<?, ?, ?>> commandBuffer;
在onCommand和onClose的时候调用flushCommands将缓冲区的命令发送至redis server

在本次案例中因为autoFlushCommands = false,所以将命令放入缓冲区

为什么cpu和时间都会在增加呢?

上面的代码中我们已经知道excute方法把redis command封装成LettuceResult放入了pipeline集合中,LettuceResult其实是把redis命令放入了内部缓冲区commandBuffer

而刷新策略的区别就是BufferedFlushing在onOpen的时候设置autoFlushCommands为false,onCommand.的时候选择是否flushCommands,onClose的时候flushCommands并且设置autoFlushCommands为true;而FlushEachCommand什么都没做

本例的优化手段只是使用了pipeline,并没有操作autoFlushCommands,所以本质上还是每个命令都会直接被发送到redis server,所以网络请求并不会减少,至于CPU占用上升,
则是因为pipeline使用了异步请求,增加了CPU的占用

附closePipeline方法核心片段:

flushState.onClose(this.getOrCreateDedicatedConnection());
		flushState = null;
		isPipelined = false;
		List<io.lettuce.core.protocol.RedisCommand<?, ?, ?>> futures = new ArrayList<>(ppline.size());
		for (LettuceResult<?, ?> result : ppline) {
			futures.add(result.getResultHolder());
		}

		try {
			boolean done = LettuceFutures.awaitAll(timeout, TimeUnit.MILLISECONDS,
					futures.toArray(new RedisFuture[futures.size()]));

先调用flushState.onClose方法,无论哪个实现类的最终目的都是将缓冲区的命令刷新到redis server,再通过 LettuceFutures.awaitAll方法获取最终结果;

怎么改进

1、聪明的你肯定已经知道了:那就使用BufferedFlushing

没问题,但是如果你了解过lettuce,应该已经知道了,lettuce是单连接的且线程安全,考虑一个问题:为了一些批处理的功能,我们设置了例如每5个命令刷新一次管道,如果此时一些C端的服务要读这些缓存,也要每5个命令才能刷新一次,虽然在高并发的情况下5个命令很快就打满了,影响微乎其微,但终究不让人放心,是否能读“写分离呢”?

再次优化,读和写的操作分开成两个服务或者读和写分开成连个连接,一个设置成每5个命令刷新一次用来写,另外一个服务或者连接保持默认用来读那就ok了

2、如果flushAfter这个参数你不好把握,想认为自己控制什么自动刷新的开闭功能,那么刷新策略就不要设置成BufferedFlushing,默认为FlushEachCommand,然后自己在各自封装好的代码中进行人为控制,比如开始写之前,关闭自动刷新,写结束,打开自动刷新;但是这样1中的问题也会存在,且打开和关闭的时机也不好把握,不如第一条稳妥;

3、官方如何推荐?
首先要明白lettuce为什么单个连接也能应对高并发的读写请求,参考
深度解析lettuce,为什么单连接也可以处理高并发redis请求

简单理解Lettuce 是一个非阻塞和异步客户端。它提供了一个同步 API,以在每个线程的基础上实现阻塞行为,以创建等待(同步)命令响应。阻塞本身不会影响其他线程。Lettuce 被设计为以pipeline方式运行。多个线程可以共享一个连接。当一个线程可以处理一个命令时,另一个线程可以发送一个新命令。第一个请求返回后,第一个线程的程序流将继续,而第二个请求由 Redis 处理并在某个时间点返回。

关于刷新:状态AutoFlushCommands是针对每个连接设置的,因此对于使用共享连接的所有线程都可见。如果您想忽略此影响,请使用专用连接。

同时还有警告:请勿在跨线程共享连接时使用,至少在没有正确同步的情况下不要使用setAutoFlushCommands(…)。根据许多问题和(无效的)错误报告,setAutoFlushCommands(…)在多线程场景中使用会导致大量复杂性开销,并且很可能会导致您这边出现问题。setAutoFlushCommands(…)只能在批量加载等场景中可靠地用于单线程连接使用。

也就是官方并不推荐自己setAutoFlushCommands,尤其是在跨线程的时候,但如果真的需要使用,还是建议分批次刷新,根据性能测试推荐配置为50-1000

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2066683.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

WEB渗透免杀篇-绕过

360白名单 需要足够的权限 360的扫描日志和设置白名单日志位置在&#xff1a;C:\Users[username]\AppData\Roaming\360Safe\360ScanLog 查看扫描日志内容可以查询到白名单文件 日志文件记录的是添加或移除白名单的时间、文件名、hash等信息&#xff0c;otc1为添加白名单&#…

SadTalker翻译与代码调试

文章目录 SadTalker原文翻译SadTalker&#xff1a;学习风格化音频驱动单幅图像说话人脸动画的真实 3D 运动系数Abstract1. Introduction2. Related Work3. Method3.1. Preliminary of 3D Face Model3.2. Motion Coefficients Generation through Audio3.3. 3D-aware Face Rende…

操作系统简介:设备管理

设备管理 1. 设备管理概述2. 设备管理技术通道技术DMA技术缓冲技术Spooling技术 3. 磁盘调度 设备管理是操作系统中最繁杂而且与硬件紧密相关的部分&#xff0c;不但要管理实际 I/O 操作的设备(如磁盘机、扫描仪、打印机、键盘和鼠标)&#xff0c;还要管理诸如设备控制器、DMA…

Linux信号机制探析--信号的处理

&#x1f351;个人主页&#xff1a;Jupiter. &#x1f680; 所属专栏&#xff1a;Linux从入门到进阶 欢迎大家点赞收藏评论&#x1f60a; 目录 &#x1f351;信号处理信号处理常见方式概览 &#x1f352;内核如何实现信号的捕捉 &#x1f34e;内核态与用户态操作系统是如何正常…

下载cmake操作步骤

cmake官网链接 cmake-3.30.2.tar.gz源代码官网下载链接

中国的人形机器人都有哪些出色的产品?

8月21日&#xff0c;2024世界机器人大会在北京亦庄正式开幕。本次大会共有169家企业集中展出了600余件机器人创新产品&#xff0c;人形机器人占比最大&#xff0c;大会还开设人形机器人专区&#xff0c;共亮相27款整机。 展会中多数人形机器人产品都偏向服务型&#xff0c;主要…

乾坤大挪移!将脚趾移到手指上,江山邦尔骨科医院成功完成一例断指再植手术

2024年6月中旬&#xff0c;家住江山贺村的何阿姨经历一次不小的意外。 那天天气晴朗&#xff0c;何阿姨准备把院子修缮修缮。操作切割工具时&#xff0c;何阿姨没有握稳&#xff0c;让工具一下子飞了出去——飞出去的瞬间&#xff0c;工具切掉了她的左手拇指&#xff0c;血流不…

网络安全大考,攻防演练驱动企业常态化安全运营升级!

当前&#xff0c;网络安全形势日益严峻&#xff0c;恶意软件、勒索软件肆虐&#xff0c;钓鱼攻击手段层出不穷&#xff0c;不断威胁企业数据安全与业务连续性。随着云计算、大数据、物联网等新兴技术的广泛应用&#xff0c;网络边界模糊化&#xff0c;攻击面急剧扩大&#xff0…

Qt (10)【Qt窗口 —— 如何在窗口中创建浮动窗口和状态栏】

阅读导航 引言一、如何在窗口中创建浮动窗口1. 浮动窗口的创建2. 设置停靠的位置 二、如何在窗口中创建状态栏1. 状态栏的创建2. 在状态栏中显示实时消息3. 在状态栏中显示永久消息4. 调整显示消息的位置&#xff0c;并加上进度条 引言 在上一篇文章中&#xff0c;我们一同探索…

数据结构(6_3_1)——图的广度优先遍历

树和图的广度优先遍历区别 树的广度优先遍历&#xff1a; 图的广度优先遍历&#xff1a; 代码&#xff1a; 注:以下代码只适合连通图 #include <stdio.h> #include <stdbool.h>#define MAX_VERTEX_NUM 100typedef struct ArcNode {int adjvex; // 该边所指向的顶…

慧灵夹爪:工业智能的创新先锋

慧灵作为一个知名老品牌&#xff0c;其机器人产品在众多场景中广为人知。随着智能化、自动化技术的不断提升&#xff0c;智能工业飞速发展&#xff0c;慧灵夹爪在其中发挥的作用也越来越多。 在工业自动化生产中&#xff0c;精准与灵活是衡量设备性能的重要标尺。慧灵夹爪以其卓…

Criteria 是干什么用的?

我 | 在这里 ⭐ 全栈开发攻城狮、全网10W粉丝、2022博客之星后端领域Top1、专家博主。 &#x1f393;擅长 指导毕设 | 论文指导 | 系统开发 | 毕业答辩 | 系统讲解等。已指导60位同学顺利毕业 ✈️个人公众号&#xff1a;热爱技术的小郑。回复 Java全套视频教程 或 前端全套视频…

简易电压表设计验证

前言 电压表是测量电压的一种仪器。由永磁体、线圈等构成。电压表是个相当大的电阻器&#xff0c;理想的认为是断路。初中阶段实验室常用的电压表量程为0~3V和0~15V。 传统的指针式电压表包括一个灵敏电流计&#xff0c;在灵敏电流计里面有一个永磁体&#xff0c;在电流计的两个…

GenAI 的产品:快速行动,但失败

2022 年秋季&#xff0c;我正在做一个很酷的项目。是的&#xff0c;你猜对了——使用公司特定的数据对预先训练的 LLM&#xff08;Bert&#xff09;进行微调。 然而&#xff0c;很快 ChatGPT 就发布了&#xff0c;并席卷了全世界。既然已经有一门非常强大的 LLM 了&#xff0c…

支持AI智能搜索的知识库管理系统有哪些?分享4个软件

引言 在数字化时代&#xff0c;知识的获取、管理和利用已成为企业竞争力的重要组成部分。随着信息量的爆炸性增长&#xff0c;如何快速、准确地从海量数据中检索出有价值的知识&#xff0c;成为企业面临的一大挑战。支持AI智能搜索的知识库管理系统能够快速准确地检索信息&…

【前端】vue监视属性和计算属性对比

首先分开讲解各个属性的作用。 1.计算属性 作用&#xff1a;用来计算出来一个值&#xff0c;这个值调用的时候不需要加括号&#xff0c;会根据依赖进行缓存&#xff0c;依赖不变&#xff0c;computed的值不会重新计算。 const vm new Vue({el:#root,data:{lastName:张,firstNa…

严重腰椎滑脱、无法走路,江山邦尔骨科医院机器人辅助手术为患者完美复位

8月8日上午&#xff0c;53岁的李清&#xff08;化名&#xff09;扶着腰、跛脚走进江山邦尔骨科医院。接诊他的&#xff0c;是江山邦尔骨科医院脊柱科的林科院长。 李清和林院长说&#xff0c;自己已有长达两年的腰痛史&#xff0c;最近还伴随右腿麻木及跛行的症状&#xff0c;严…

深度解析上海我店 三年突破一百亿销售额!

在当今数字化时代的大潮中&#xff0c;消费模式正经历着翻天覆地的变革。上海我店网络科技有限公司&#xff08;简称“我店”&#xff09;&#xff0c;凭借其创新的“互联网实体终端”融合商业模式与独特的绿色积分体系&#xff0c;在消费市场中异军突起&#xff0c;成为引领行…

ClkLog常见问题-埋点集成篇Sec. 1

本篇主要解答ClkLog使用过程中【埋点集成】阶段的常见问题。 1.【指标项数据统计】 问&#xff1a;数据概览无法看到数据。 答&#xff1a;如果数据概览所有指标项都没有数据&#xff0c;则需要先检查埋点数据是否接收成功&#xff1b;如果只是会话相关数据&#xff08;访问次数…

缺陷检测之Anomalib

缺陷检测的现状 工业缺陷数据有一个比较显著的特征&#xff1a;样本不平衡。绝大部分采集得到的工业数据都是没有缺陷的&#xff0c;这样一来&#xff0c;正样本的数据在模型训练中根本没有起到作用&#xff0c;负样本又太少&#xff0c;很难训练得到有效的模型。使用有监督学…