redis第九章-Redis队列Stream、Redis6多线程详解

news2025/1/24 22:30:14

Redis队列Stream

前置说明:Redis5.0 最大的新特性就是多出了一个数据结构 Stream,它是一个新的强大的支持多播的可持久化的消息队列,作者声明 Redis Stream 地借鉴了 Kafka 的设计。

在这里插入图片描述

  • Redis Stream 的结构如上图所示,每一个Stream都有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的,Redis 重启后,内容还在。
  • 每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用xadd指令追加消息时自动创建。
  • 每个 Stream 都可以挂多个消费组,每个消费组会有个游标last_delivered_id在Stream数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从 Stream 的某个消息 ID开始消费,这个ID 用来初始化last_delivered_id变量。
  • 每个消费组 (Consumer Group) 的状态都是独立的,相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到。
  • 同一个消费组 (Consumer Group) 可以挂接多个消费者 (Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_delivered_id往 前移动。每个消费者有一个组内唯一名称。
  • 消费者 (Consumer) 内部会有个状态变量 pending_ids,它记录了当前已经被客户端读取,但是还没有 ack 的消息。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为 PEL,也就是 Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。
  • 消息 ID 的形式是 timestampInMillis-sequence,例如 1527846880572-5,它表示当前的消息在毫米时间戳 1527846880572 时产生,并且是该毫秒内产生的第5 条消息。消息 ID 可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的 ID 要大于前面的消息 ID。
  • 消息内容就是键值对,形如 hash 结构的键值对

常用操作命令
生产端
xadd 追加消息
xdel 删除消息,这里的删除仅仅是设置了标志位,不会实际删除消息。
xrange 获取消息列表,会自动过滤已经删除的消息
xlen 消息长度
del 删除 Stream
例如:
在这里插入图片描述

  1. streamtest 表示这个队列的名字
  2. *表示服务器自动生成id,一般建议自动生成
  3. 后面是我们对消息存入的键值对
  4. 返回值是生成的消息 ID,它由两部分组成:时间戳-序号。时间戳时毫秒级单位,是生成消息的 Redis 服务器时间,它是个 64 位整型。序
    号是在这个毫秒时间点内的消息序号。(因为redis一条命令是纳秒级别的,所以需要时间戳加号码确定唯一 一条消息)

为了保证消息是有序的,因此 Redis 生成的 ID 是单调递增有序的。由于 ID中包含时间戳部分,为了避免服务器时间错误而带来的问题(例如服务器时间延后了),Redis 的每个 Stream 类型数据都维护一个 latest_generated_id 属性,用于记录最后一个消息的 ID。若发现当前时间戳退后(小于 latest_generated_id 所记录的),则采用时间戳不变而序号递增的方案来作为新消息 ID(这也是序号为什么使用 int64 的原因,保证有足够多的的序号),从而保证 ID 的单调递增性质。

在插入两条数据,此刻这队列一共三条数据
在这里插入图片描述

#其中-表示最小值 , + 表示最大值
xrange streamtest - +

在这里插入图片描述
或者我们可以指定消息 ID 的列表:

#复制编号,从第二条查询
xrange streamtest 1686400124425-0 + 

在这里插入图片描述

#查看消息队列长度
xlen streamtest

在这里插入图片描述

#删除消息
xdel streamtest 1686400124425-0

在这里插入图片描述
消费端

  1. 单消费者
    虽然 Stream 中有消费者组的概念,但是可以在不定义消费组的情况下进行Stream 消息的独立消费,当 Stream 没有新消息时,甚至可以阻塞等待。Redis 设计了一个单独的消费指令 xread,可以将 Stream 当成普通的消息队列 (list) 来使用。使用 xread 时,我们可以完全忽略消费组 (Consumer Group) 的存在,就好比 Stream 就是一个普通的列表 (list)。
xread count 1 streams streamtest 0-0

在这里插入图片描述

在这里插入图片描述
所以最好以阻塞的方式读取尾部最新的一条消息,直到新的消息的到来

xread block 0 count 1 streams streamtest $

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
可以看到阻塞解除了,返回了新的消息内容,而且还显示了一个等待时间,这里我们等待了 22s
一般来说客户端如果想要使用 xread 进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息 ID。下次继续调用 xread 时,将上次返回的最后一个消息 ID 作为参数传递进去,就可以继续消费后续的消息。

消费组
创建消费组
Stream 通过 xgroup create 指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化 last_delivered_id 变量。

#“streamtest”指明了要读取的队列名称,“cg1”表示消费组的名称,
#“0-0”表示从头开始消费
xgroup create streamtest cg1 0-0
#  $表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略
xgroup create streamtest cg2 $

在这里插入图片描述

# 1 2可以看到消费队列长度,9 10可以看到最后生成消息id,7 8可以看到有两个消费组
xinfo stream streamtest

在这里插入图片描述
消息消费

有了消费组,自然还需要消费者,Stream 提供了 xreadgroup 指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息 ID。
它同 xread 一样,也可以阻塞等待新消息。读到新消息后,对应的消息 ID 就会进入消费者的 PEL(正在处理的消息) 结构里,客户端处理完毕后使用 xack 指令通知服务器,本条消息已经处理完毕,该消息 ID 就会从 PEL 中移除。

#“GROUP”属于关键字,“cg1”是消费组名称,“c1”是消费者名称,“count 
#1”指明了消费数量,> 号表示从当前消费组的 last_delivered_id 后面开始
#读,每当消费者读取一条消息,last_delivered_id 变量就会前进
xreadgroup group cg1 c1 count 1 streams streamtest >

在这里插入图片描述
在一直读的时候很快就把插入的三条信息读完了

然后设置阻塞等待

xreadgroup GROUP cg1 c1 block 0 count 1 streams streamtest >

在这里插入图片描述
在这里插入图片描述

回到原来的客户端,发现阻塞解除,收到新消息在这里插入图片描述
我们来观察一下观察消费组状态
在这里插入图片描述
如果同一个消费组有多个消费者,我们还可以通过 xinfo consumers 指令观察每个消费者的状态

xinfo consumers streamtest cg1

如果我们确认一条消息,发现就还有三条没有ack确认

xack streamtest cg1 1686403439863-0

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Redis 中的线程和 IO 模型

什么是 Reactor 模式 ?
“反应”器名字中”反应“的由来:
“反应”即“倒置”,“控制逆转”,具体事件处理程序不调用反应器,而向反应器注册一个事件处理器,表示自己对某些事件感兴趣,有时间来了,具体事件处理程序通过事件处理器对某个指定的事件发生做出反应;这种控制逆转又称为“好莱坞法则”(不要调用我,让我来调用你)
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
Redis 中的线程和 IO 概述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
I/O 多路复用程序在这里插入图片描述
文件事件分派器

  • 文件事件分派器接收 I/O 多路复用程序传来的 socket, 并根据 socket 产生的事件类型, 调用相应的事件处理器。

文件事件处理器

  • 服务器会为执行不同任务的套接字关联不同的事件处理器, 这些处理器是一个个函数, 它们定义了某个事件发生时, 服务器应该执行的动作。
  • Redis 为各种文件事件需求编写了多个处理器,若客户端连接 Redis,对连接服务器的各个客户端进行应答,就需要将 socket映射到连接应答处理器写数据到 Redis,接收客户端传来的命令请求,就需要映射到命令请求处理器从 Redis读数据,向客户端返回命令的执行结果,就需要映射到命令回复处理器当主服务器和从服务器进行复制操作时, 主从服务器都需要映射到特别为复制功能编写的复制处理器。

文件事件的类型
I/O 多路复用程序可以监听多个 socket 的 ae.h/AE_READABLE 事件和
ae.h/AE_WRITABLE 事件, 这两类事件和套接字操作之间的对应关系如下:
当 socket 可读(比如客户端对 Redis 执行 write/close 操作),或有新的可应答的 socket 出现时(即客户端对 Redis 执行 connect 操作),socket 就会产生一个 AE_READABLE 事件。
当 socket 可写时(比如客户端对 Redis 执行 read 操作),socket 会产生一个AE_WRITABLE 事件。
I/O 多路复用程序可以同时监听 AE_REABLE 和 AE_WRITABLE 两种事件,要是一个 socket 同时产生这两种事件,那么文件事件分派器优先处理 AE_REABLE 事件。即一个 socket 又可读又可写时, Redis 服务器先读后写 socket。

在这里插入图片描述

Redis6 中的多线程

1. Redis6.0 之前的版本真的是单线程吗?
Redis 在处理客户端的请求时,包括获取 (socket 读)、解析、执行、内容返回 (socket 写)等都由一个顺序串行的主线程处理,这就是所谓的“单线程”。
但如果严格来讲从 Redis4.0 之后并不是单线程,除了主线程外,它也有后台线程在处理一些较为缓慢的操作,例如清理脏数据、无用连接的释放、大 key 的删除等等。

2. Redis6.0 之前为什么一直不使用多线程?
官方曾做过类似问题的回复:使用 Redis 时,几乎不存在 CPU 成为瓶颈的情况, Redis 主要受限于内存和网络。例如在一个普通的 Linux 系统上,Redis 通过使用 pipelining 每秒可以处理 100 万个请求,所以如果应用程序主要使用 O(N)或O(log(N))的命令,它几乎不会占用太多 CPU。
使用了单线程后,可维护性高。多线程模型虽然在某些方面表现优异,但是它却引入了程序执行顺序的不确定性,带来了并发读写的一系列问题,增加了系统复杂度、同时可能存在线程切换、甚至加锁解锁、死锁造成的性能损耗。Redis通过 AE 事件模型以及 IO 多路复用等技术,处理性能非常高,因此没有必要使用多线程。单线程机制使得 Redis 内部实现的复杂度大大降低,Hash 的惰性Rehash、Lpush 等等 “线程不安全” 的命令都可以无锁进行。

3. Redis6.0 为什么要引入多线程呢?
Redis 将所有数据放在内存中,内存的响应时长大约为 100 纳秒,对于小数据包,Redis 服务器可以处理 80,000 到 100,000 QPS,这也是 Redis 处理的极限了,对于 80%的公司来说,单线程的 Redis 已经足够使用了
但随着越来越复杂的业务场景,有些公司动不动就上亿的交易量,因此需要更大的 QPS。
常见的解决方案是在分布式架构中对数据进行分区并采用多个服务器,但该方案有非常大的缺点,例如要管理的 Redis 服务器太多,维护代价大;某些适用于单个 Redis 服务器的命令不适用于数据分区;数据分区无法解决热点读/写问题;数据偏斜,重新分配和放大/缩小变得更加复杂等等。
从 Redis 自身角度来说,因为读写网络的 read/write 系统调用占用了 Redis执行期间大部分 CPU 时间,瓶颈主要在于网络的 IO 消耗, 优化主要有两个方向:
• 提高网络 IO 性能,典型的实现比如使用 DPDK 来替代内核网络栈的方式
• 使用多线程充分利用多核,典型的实现比如 Memcached。协议栈优化的这种方式跟 Redis 关系不大,支持多线程是一种最有效最便捷的操作方式。所以总结起来,redis 支持多线程主要就是两个原因:
• 可以充分利用服务器 CPU 资源,目前主线程只能利用一个核
• 多线程任务可以分摊 Redis 同步 IO 读写负荷

4. Redis6.0 默认是否开启了多线程?
Redis6.0 的多线程默认是禁用的,只使用主线程。如需开启需要修改
redis.conf 配置文件:io-threads-do-reads yes

在这里插入图片描述
开启多线程后,还需要设置线程数,否则是不生效的。
关于线程数的设置,官方有一个建议:4 核的机器建议设置为 2 或 3 个线程,8 核的建议设置为 6 个线程,线程数一定要小于机器核数。还需要注意的是,线程数并不是越大越好,官方认为超过了 8 个基本就没什么意义了。

5.Redis6.0 采用多线程后,性能的提升效果如何?
Redis 作者 antirez 在 RedisConf 2019 分享时曾提到:Redis 6 引入的多线程IO 特性对性能提升至少是一倍以上。国内也有大牛曾使用 unstable 版本在阿里云 esc 进行过测试,GET/SET 命令在 4 线程 IO 时性能相比单线程是几乎是翻倍了。如果开启多线程,至少要 4 核的机器,且 Redis 实例已经占用相当大的 CPU耗时的时候才建议采用,否则使用多线程没有意义。

6.Redis6.0 多线程的实现机制?
在这里插入图片描述

7.开启多线程后,是否会存在线程并发安全问题?
从上面的实现机制可以看出,Redis 的多线程部分只是用来处理网络数据的读写和协议解析,执行命令仍然是单线程顺序执行。所以我们不需要去考虑控制key、lua、事务,LPUSH/LPOP 等等的并发及线程安全问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/632107.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【浅谈DBA 最重要的素质---读书笔记】

👈【上一篇】 💖The Begin💖点点关注,收藏不迷路💖 【下一篇】👉 🔻【来自DBA大佬的见解1】 对于一个准备进入 DBA 领域的人,我希望他勤奋、严谨、具有钻研精神及独立思考能力。…

Android应用程序进程的启动过程

Android应用程序进程的启动过程 导语 到这篇文章为止,我们已经简要地了解过了Android系统的启动流程了,其中比较重要的内容有Zygote进程的启动和SystemService以及Launcher的启动,接下来我们将要学习的是Android应用程序的启动过程&#xff…

Python爱好者的自我修养(1):简单输入与输出

Python简单输入与输出 1.输出1.1 简单输出1.2 转义字符1.2.1 定义1.2.2 常见的转义字符用法 2.输入3.温馨提示 终于…… 终于…… 我开始玩Python了 (不是C不学了哈,C还是照更~) 今天先来简单讲下输入和输出 1.输出 1.1 简单输出 输出的函…

【一篇让你学会】Web接口测试工具--Jmeter

关于Jmeter性能测试工具不再过多介绍。如果你要学习软件性能测试,那么多少应该会对它有所耳闻。 强烈建议阅读官方文档学习:http://jmeter.apache.org/index.html 还有比这个更权威更全面的介绍Jmeter工具使用的么? 不过,此处要介…

Win7批量执行Python文件

问题背景 平时都是用Pycharm跑代码,但是每次都需要在Configuration里修改Parameters,跑完一个才能重新修改跑下一个,很不方便,于是决定借助.bat文件实现批量执行。 困难一 电脑存在cmd闪退问题,之前一直逃避懒得解决…

ExtractOfficeContent: 提取Office文件中文本、表格和图像

引言 最近有空写了一下这个库,用来提取Office文件中的文本和图像内容,用作后续整理训练语料使用。最新更新请移步:Github Extract Office Content Use Installextract_office_content$ pip install extract_office_contentRun by CLI. Ext…

STM32——04-初识STM32单片机

什么是单片机? 单片机( Single-Chip Microcomputer )是一种集成电路芯片,把具有数据处理能力的中央处 理器 CPU 、随机存储器 RAM 、只读存储器 ROM 、多种 I/O 口和中断系统、定时器 / 计数器等功 能(可能还包括显示驱…

day49_mybatis

今日内容 1 引言 2 MyBatis介绍 3 环境搭建 4 入门演示(MyBatis开发步骤) 5 CRUD 6 配置细节 一、引言 1.1 复习 第一阶段(JavaSE) java基本语法流程控制面向对象常用类集合异常IO多线程JDK新特性 第二阶段(JavaWeb) 前端 htmlcssjs --> JQueryboostraplayui 页面元素内置模…

攻防世界—file_include

打开之后发现是一段php代码 可以看出这是段代码有文件包含漏洞 下面是学习部分。。。着急看题解继续往下滑。。。谢谢。。。 文件包含漏洞(File Inclusion Vulnerability)是一种Web应用程序常见的安全漏洞,也是攻击者常用的攻击手段之一。这种…

一文讲完Java常用设计模式(全23种)

文章目录 介绍设计模式的六大原则一、创建型模式1、单例模式(Singleton Pattern)1)饿汉式2)懒汉式,双检锁3)静态内部类4)枚举 2、原型模式(Prototype Pattern)3、工厂模式…

基于Java+SpringBoot+Vue前后端分离教学资源共享平台系统

博主介绍:✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专…

10:00面试,10:08就出来了 ,问的实在是太...

从外包出来,没想到算法死在另一家厂子 自从加入这家公司,每天都在加班,钱倒是给的不少,所以也就忍了。没想到8月一纸通知,所有人不许加班,薪资直降30%,顿时有吃不起饭的赶脚。 好在有个兄弟内…

如何利用google的protobuf设计、实现自己的RPC框架

一、前言 这篇文章我们就来聊一聊 RPC 的相关内容,来看一下如何利用 Google 的开源序列化工具 protobuf,来实现一个我们自己的 RPC 框架,内容有点长,请耐心看完。 序列化[1]:将结构数据或对象转换成能够被存储和传输&…

有趣的机器人工具坐标系

当机器人旋转轴上的夹爪中心不在旋转轴中心时,如何让旋转轴围绕夹爪中心旋转,这就是工具坐标系,怎末实现呢?机器人都是建立工具坐标系实现,这是什么原理?我们来探索一下: 世界上的有些特例让人…

【AIOT】手势捕捉调研

title: Data Glove Record date: 2020-06-06 20:40:13 author: liudongdong1 img: https://gitee.com/github-25970295/blogImage/raw/master/img/gloves-1268930__340.webp reprintPolicy: cc_by cover: false categories: AIOT tags: Sense 动作捕捉(Motion capture)&#x…

程序员凡尔赛,工作三年晒出5月工资条,直言加班太累了

最近有工作3年的程序员晒出自己9月份的工资条,并直言加班太累了。 从工资条上可以看到,这个收入确实不算低,才3年时间,月工资就已经到了二万五了,这个工资已经可以击败绝大多数行业了。 不过二万五只是税前工资&…

【Python opencv 】零基础也能轻松掌握的学习路线与参考资料

Python opencv 是一种强大的计算机视觉库,它为计算机视觉和图像处理任务提供了必要的工具和技术实现。接下来,将介绍Python opencv的学习路线、重点和优秀实践。 一、学习路线 Python基础 在学习Python opencv之前,需要掌握Python的基础知…

STM32F105RBT6 使用定时器TIM3输出PWM波

1. TIM3的GPIO口,查阅STM32F105RBT6 数据手册,TIM3的4通道用的是PB1 2. 初始化GPIO口和定时器TIM3 2.1 相关函数 RCC_APB1PeriphClockCmd、GPIO_Init、TIM_TimeBaseInit、TIM_OC4Init、TIM_OC4PreloadConfig、NVIC_Init、TIM_ITConfig、TIM_Cmd、 voi…

中国范围逐月夜间灯光数据(2012-2021年)

人类以其文明和智慧创造了城市,城市最美的一面就是夜晚的灯光。对于经济学家来说,城市灯光的背后隐藏着巨大的社会经济秘密。一个城市夜晚的灯光不仅可以反映这个城市人口的数量、预测房价的趋势还能反映这个城市的经济发达程度。 根据灯光的亮度我们还可…

(七)CSharp-CSharp图解教程版-事件

一、发布者和订阅者 发布者/订阅者模式(publish/subscriber pattern): 很多程序都有一个共同的需求,即当一个特定的程序事件发生时,程序的其他部分可以得到该事件已经发生的通知。 发布者: 发布者类定义…