redis 消息队列方案

news2024/10/5 17:51:02

redis 消息队列方案

观察角度:消息有序,重复消息处理,消息可靠性保证

pub/sub 发布订阅机制

list集合

消息有序:lpush和rpop可以保证消息顺序的被消费

重复消息处理:list没有为消息提供唯一标识,需要生产者提供唯一标识,消费程序自行判断消息是否重复

消息可靠性保证:讲的是消息因为网络,停电,宕机等突发因素丢失,list没有为消息保存副本,这样消费者如果正在消费时有突发因素,那么就会发生消息丢失。

所以list做消息队列时,如果是对消息丢失不是很敏感的场景可以使用。

stream

优点:支持持久化,支持消费组形式的消息读取

Streams 同样能够满足消息队列的三大需求。而且,它还支持消费组形式的消息读取

Stream基本结构

Redis Stream像是一个仅追加内容的消息链表,把所有加入的消息都一个一个串起来,每个消息都有一个唯一的 ID 和内容,它还从 Kafka 借鉴了另一种概念:消费者组(Consumer Group),这让Redis Stream变得更加复杂。

Redis Stream的结构如下:

在这里插入图片描述
在这里插入图片描述

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 XADD 指令追加消息时自动创建。

  1. Consumer Group消费者组,消费者组记录了Starem的状态**,使用 XGROUP CREATE 命令手动创建,在同一个Stream内消费者组名称唯一。一个消费组可以有多个消费者(Consumer)同时进行组内消费,所有消费者共享Stream内的所有信息,但同一条消息只会有一个消费者消费到,不同的消费者会消费Stream中不同的消息,这样就可以应用在分布式的场景中来保证消息消费的唯一性。
  2. last_delivered_id游标,用来记录某个消费者组在Stream上的消费位置信息**,每个消费组会有个游标,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。创建消费者组时需要指定从Stream的哪一个消息ID(哪个位置)开始消费,该位置之前的数据会被忽略,同时还用来初始化 last_delivered_id 这个变量。这个last_delivered_id一般来说就是最新消费的消息ID。
  3. pending_ids消费者内部的状态变量,作用是维护消费者的未确认的消息ID。pending_ids记录了当前已经被客户端读取,但是还没有 ack (Acknowledge character:确认字符)的消息。 目的是为了保证客户端至少消费了消息一次,而不会在网络传输的中途丢失而没有对消息进行处理。如果客户端没有 ack,那么这个变量里面的消息ID 就会越来越多,一旦某个消息被ack,它就会对应开始减少。 这个变量也被 Redis 官方称为 PEL (Pending Entries List)。

插入消息

往名称为 mqstream 的消息队列中插入一条消息,消息的键是 repo,值是 5。*号代表自动创建全局唯一的id,也可以手动生成,但必须是增长且唯一的。

XADD mqstream * repo 5
"1599203861727-0"

读取消息

使用 XREAD 以阻塞或非阻塞方式获取消息列表 ,语法格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
  • count :数量
  • milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
  • key :队列名
  • id :消息 ID

创建消费者组 (XGROUP CREATE)

使用 XGROUP CREATE 创建消费者组,语法格式:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
  • key :队列名称,如果不存在就创建
  • groupname :组名。
  • $ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。

从头开始消费:

XGROUP CREATE mystream consumer-group-name 0-0  

从尾部开始消费:

XGROUP CREATE mystream consumer-group-name $

消费组命令

消费者组消费消息(XREADGROUP GROUP)

使用 XREADGROUP GROUP 读取消费组中的消息,语法格式:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group :消费组名
  • consumer :消费者名。
  • count : 读取数量。
  • milliseconds : 阻塞毫秒数。
  • key : 队列名。
  • ID : 消息 ID。
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >
alt

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/179948.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

利用matlab求解非线性目标函数

文章目录函数介绍设置函数参数步骤结果分析函数介绍 使用fmincon函数来进行求解,格式为 [x,y] fmincon(f,x0,A,b,Aeq,beq,lb,ub,nonlcon,options)其中,f表示所求的目标函数,x0表示初始值,A,b,Aeq,beq表示线性的约束&#xff0c…

SpringBoot06:整合JDBC、Druid、MyBatis

整合JDBC 1、创建一个新的工程,勾选JDBC API和MySQL Driver 2、导入web启动器 3、编写yaml配置文件,连接数据库 spring:datasource:username: rootpassword: 123456url: jdbc:mysql://localhost:3306/mybatis?useUnicodetrue&characterEncodingut…

力扣 2299. 强密码检验器 II

题目 如果一个密码满足以下所有条件,我们称它是一个 强 密码: 它有至少 8 个字符。 至少包含 一个小写英文 字母。 至少包含 一个大写英文 字母。 至少包含 一个数字 。 至少包含 一个特殊字符 。特殊字符为:“!#$%^&*()-” 中的一个。…

面向对象编程范式

目录 1.概述 1.1.面向对象编程的核心诉求 1.2.面向对象的世界观 2.类和对象 3.如何寻找类和对象 3.1.概述 3.2.示例 4.如何表示类 4.1.什么是UML 4.1.关系 4.2.权限 4.3.依赖 4.4.泛化(继承) 4.5.实现 4.6.关联 4.7.聚合 4.8.组合 1.概…

自定义类型之枚举和联合

该文章将详细介绍除结构体外的另外两种自定义类型--------枚举类型与联合类型。1.枚举1.1枚举类型的定义1.2枚举的优点1.3枚举的使用2.联合(共用体)2.1联合类型的定义2.2联合的特点2.3联合大小的计算1.枚举 枚举顾名思义就是------一一列举。 把所有可能…

ADAS HiL系统测试方案

1、什么是ADAS ADAS(Advanced Driving Assistance System)也就是高级驾驶辅助系统,是无人驾驶的过渡。 ADAS利用安装在车上的各式各样传感器(毫米波雷达、激光雷达、单\双目摄像头以及卫星导航),在汽车行驶…

[前端笔记——CSS] 12.处理不同方向文本

[前端笔记——CSS] 12.盒模型背景与边框1.书写模式2.书写模式、块级布局和内敛布局3.逻辑属性和逻辑值1.书写模式 CSS 中的书写模式是指文本的排列方向是横向还是纵向的。writing-mode 属性使我们从一种模式切换到另一种模式。例如,我们使用writing-mode: vertical…

stack、queue、priority_queue

容器适配器 适配器是一种设计模式(设计模式是一套被反复使用的、多数人知晓的、经过分类编目的、代码设计经验的总结),该种模式是将一个类的接口转换成客户希望的另外一个接口。 其中stack和queue都是容器适配器,其中stack可以封装vector、list以及我们…

ffmpeg无损裁剪、合并视频

我用的版本是 ffmpeg version git-2020-06-23-ce297b4 官方文档 https://ffmpeg.org/ffmpeg-utils.html#time-duration-syntax 时间格式 [-][HH:]MM:SS[.m...] 或 [-]S[.m...][s|ms|us]裁剪视频 假设需要裁剪视频aaa.mp4,第5秒到第15秒 ffmpeg -ss 5 -to 15 -i…

使用gazebo对scara机械臂进行仿真

本文主要介绍如何仿真一个scara机械臂,以及在网上看了一些项目以后,有了一些感想,不想看的可以直接跳到机械臂部分。 目录感想(自己的理解,不一定对。)Scara机械臂的开发运动学计算如何使用机械臂工作图一个例子: 在start_pose抓起…

【Hadoop】MapReduce分布式计算实践(统计文本单词数量)

文章目录1. 前言2. Mapper代码3. Reducer代码4. Main代码5. 项目打包6. Hadoop运行7. 运行结果查看7.1 输出文件查看7.2 日志查看1. 前言 在博客【Hadoop】MapReduce原理剖析(Map,Shuffle,Reduce三阶段)中已经分析了MapReduce的运…

ASP.NET Core+Element+SQL Server开发校园图书管理系统(二)

随着技术的进步,跨平台开发已经成为了标配,在此大背景下,ASP.NET Core也应运而生。本文主要基于ASP.NET CoreElementSql Server开发一个校园图书管理系统为例,简述基于MVC三层架构开发的常见知识点,前一篇文章&#xf…

Linux C编程一站式学习笔记6

Linux C编程一站式学习笔记 chap6 循环结构 文章目录Linux C编程一站式学习笔记 chap6 循环结构一.while语句递归 VS 循环函数式编程(Functional Programming) & 命令式编程(Imperative Programming)无限递归 & 无限循环习…

光流估计(二) FlowNet 系列文章解读

在上篇文章中,我们学习并解了光流(Optical Flow)的一些基本概念和基本操作,但是传统的光流估计方法计算比较复杂、成本较高。近些年来随着CNN卷积神经网络的不断发展和成熟,其在各种计算机视觉任务中取得了巨大成功&am…

docker-基础实战第六课镜像挂载

镜像挂载: docker run --namemynginx -d --restartalways -p 8088:80 -v /usr/local/docker/data/html:/usr/share/nginx/html:ro nginx访问403 原因: /usr/local/docker/data/html 没有创建index.html 需要创建目录并且创建index.html docker命令补充: 如果有一…

向QAbstractItemView子类如:QTreeView、QTableView等子项单元格插入窗体小部件的功能实现(第1种方法)

1.前言 工作中经常会遇到这样的需求:向QAbstractItemView子类如QTreeView、QTableView单元格插入窗体小部件,如:进度条、按钮、单行编辑框等。下面链接的系列博文就是讲解如何实现该功能的。《向QAbstractItemView子类如:QTreeView、QTableVi…

Android音频播放有杂音?原来是这个JAVA API接口惹的祸

最近在调试一个基于十年前Android版本的多媒体应用软件时,遇到了音频播放的问题,这里记录问题的发现、分析和处理过程。 有人可能会好奇,十年前的Android版本是什么版本?大家可以去Google网站上查查,就是目前Android网…

Android深入系统完全讲解(40)

调试 C 代码 15.1 改成 C 写法 这个没啥必要,但是对 C 比 C 情谊深的我,把它修改了。下面是修改的一部分代码, 把 C 的写法,改成 C 的,同时修改引入头文件。 jstring Java_hellojni_codegg_com_hellojni_MainActivity_…

Java基础41 面向对象(高级)

面向对象(高级)一、类变量和类方法1.1、static (类变量)1.1.1 关于static的存放位置1.1.2 类变量使用细节及注意事项1.2、类方法1.2.1、类方法使用细节及注意事项二、main方法2.1、深入理解main方法三、代码块3.1、代码块的基本介…

19.6、Javaweb_案例旅游路线收藏功能

旅游线路收藏功能 分析 判断当前登录用户是否收藏过该线路 当页面加载完成后,发送ajax请求,获取用户是否收藏的标记 根据标记,展示不同的按钮样式 编写代码 后台代码 RouteServlet: package cn.itcast.travel.web.servlet;…