基于Redis实现特殊的消息队列

news2025/1/24 17:31:22

特殊场景的消息队列

消息队列使用比较多的产品kafka,在各个领域都发挥了很大的作用,但是在以下的几种场景是无法满足需求。

场景

  1. 消息重复概率比较高时,需要对重复消息进行合并处理避免浪费有限的资源,减少延迟
  2. 需要根据业务自定义的优先级进行消息处理,高优先级的消息比低优先级的消息先处理
  3. 消息需要定时消费场景,消息只有在设定的消费时间到了之后立马被消费

RMQ(Redis message queue,RMQ)

功能:

RMQ设计为一个第三方库,可以帮助用户基于Redis快速实现消息队列的功能,RMQ消息队列具有消息合并、区分优先级、支持定时消息等特性。RMQ消息队列可以用于异步解耦、削峰填谷,支持千万级别的数据堆积。

RMQ消息队列目前支持三种类型的消息,分别是:RangeMergeMessage(区间重复合并消息)、PriorityMessage(优先级消息)、FixedTimeMessage(任意定时消息)

区间重复合并消息

RangeMergeMessage支持区间重复消息合并,发送消息时需要设置时间区间,消息延迟该时间长度后被消费,在该时间区间内如果发送重复的消息,重复消息将会被合并。如果消息在Redis服务端发生堆积,重复到来的消息依然会被合并处理。该类型消息适用于消息重复率较高并且希望消息合并并处理的场景,对重复消息进行合并可以减少下游消费系统的压力,减少不必要的资源消耗,将有限的资源最大化的利用,提升消费效率。

优先级消息

PriorityMessage支持给消息设置任意等级的优先级,优先级高的消息会被优先消费,相同的优先级的消息被随机消费。如果消息在Redis服务端发生堆积,重复的消息将被合并处理,合并后的消息优先级等于最后存储的消息的优先级。该类型消息适用于希望重复消息合并处理并且需要设置优先级的场景,下游消费者资源有限时候,合并重复消息并且优先级高的消息将可以合理利用有限的资源。

任意定时消息

FixedTimeMessage支持给消息设置任意消费时间,只有消费时间到了之后消息才被消费,消费时间可以精确到秒。消息到期后没有及时被消费时,消费者将按照时间由远到近进行消费。如果消息在Redis服务端发送堆积,重复的消息将被合并处理,合并后消息的消费时间等于最后存储的消息的消费时间,该类型消息适用于希望重复消息合并处理且需要定时消费的场景,定时消息应用场景非常丰富,比如定时打标去标、活动结束后清理动作、定时超时关闭等。

并发消息控制

使用传统消息中间件进行集群消费的时候,为了避免并发处理同一元数据导致不一致的问题,通常需要对元数据加分布式锁,频繁的锁冲突会导致消费效率低下,加分布式锁的最终目的其实就是保障属于同一元数据的消息被串行消费。加分布式锁并不是最好的方案,最好的方案是从根本上解决并发问题,让属于同一元数据的消息串行消费。RMQ 消息队列具有并发消费控制能力,属于同一元数据的消息只会被分配给全局唯 一一个线程进行消费,因此属于同一元数据的消息将被串行消费。使用方如果需要 该能力,除了需要提供 Redis,还需要提供 ZooKeeper。

重试次数控制

RMQ 消息队列支持失败重试消费 16 次,业务返回消费失败后,消息会被回滚并等待重试消费,重试 16 次后消息进入死信队列,消息不再被消费,除非人工干预。

RMQ实现的原理

RMQ消息队列由三部分组成,分别是ZooKeeper、RMQ二方库、Redis

Zookeeper

Zookeeper负责维护集群worker的信息,将topic的所有slot分配给全局的woker;

Redis

Redis负责存储消息,采集Sorted Set结构存储,Store Queue是消息队列,Prepare Queue是采用二阶段消费的方式正在消费消息队列中的信息,Dead Queue是死的队列信息

RMQ

  1. RmqClient负责RMQ的启动工作,包括上报TopicDef、Worker给Zookeeper,分配给Slot的Worker扫描业务定义的MessageListerBean
  2. Producer负责根据不用消息类型将消息按照指定的方式存储到Redis中
  3. Consumer负责根据不用消息类型按照指定方式从Redis弹出的消息并调用业务的MessageListener。

消息的存储

Topic的设计原理

Topic的定义有三个部分组成,topic表示主题名称、slotAmount表示消息存储划分的槽的数量,topicType表示消息的类型

主题名称时一个Topic的唯一标示,相同主题名称的Topic的SlotAmount和topicType一定是一样的消息存储采用Redis的Sorted Set结构表示,为了支持大量的消息堆积的情况,需要把消息分散存储到很多个槽中,SlotAmount表示该Topic消息存储共同使用的槽数量,槽数量一定需要是2的n次方的幂。在消息存储的时候,采用制定数据或者消息体的哈希求余数得到槽的位置

SoreQueue的设计原理

上图中的topic划分了8个槽为,编号是0--7,如果发送指定了消息的slotBasis,则计算slotBasis的CRC32的值,CRC32值对槽数量进行取模得到对应的槽序号,SlotKey设计为#{topic}_#{index}也就是Redis的键,其中#{}表示占位符的处理。

发送方需要保证相同内容的消息的SlotBasis相同,如果没有制定SlotBasis则采用内容计算SlotKey,这样内容相同的消息就会落在同一个Sorted Set里面,所以内容相同的消息会进行合并的处理。

Redis的Sorted Set中的数据按照分数排序,实现不同类型的消息的关键就在于如何利用分数,如何增加消息到Sorted Set、如何从Sorted Set中获取数据消息。

  1. 优先级消息将优先级作为分数,消费时每次弹出分数最大的消息;
  2. 任意定时消息将时间戳作为分数,消费时每次弹出分数大于当前时间戳的一个 消息;
  1. 区间重复合并消息将时间戳作为分数,添加消息时将(当前时间戳+时间区间)作为分数,消费时每次弹出分数大于当前时间戳的一个消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/46560.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

正大国际期货:投资外盘期货如何用布林线判断走势?

外盘期货的投资者越来越多,每个投资者判断大盘走势的方式也不尽相同,那么今天正大IxxxuanI就来简单的和大家说说外盘期货投资如何用布林线判断走势吧。 1、当布林线多条轨道向上或是向下运行时,这能非常有效的说明走势强劲程度。投资者应牢牢…

接触非线性分析不收敛? 写给ABAQUS初学者的N个经验

接触,在仿真分析中,绝对是个看似青铜实则王者级别的难题。一些通用的解决办法,在帮助文件的Interaction → Contact Difficulties and Diagnostics中找到,例如初始接触状况、穿透、突然分离造成的局部不稳定等等。 但是确实没有一…

slam定位学习笔记(七)-g2o学习

主要学习的是这篇文章,但大佬并没有在文章里面仔细的讲g2o,所以我在网上找了这几篇介绍g2o的文章,讲的十分详细,对入门十分友好:文章一、文章二、文章三,这三篇都是一个作者写的,主要是针对编程…

第五届“强网”拟态防御国际精英挑战赛——特邀战队篇

第五届“强网”拟态防御国际精英挑战赛即将在南京隆重开赛!本届大赛面向全球顶尖CTF战队,在创新应用场景与技术的基础上,拓展升级赛道,全面覆盖典型网络设备。大赛汇集国内外60支精英战队,参赛阵容、数量再创新高。 本…

35岁了,月薪还不足2W,辞职又怕找不到工作,该何去何从?

今天看到网上有人在吐槽:“马上就35岁了,月薪还不到2W,公司发展缓慢,想离职又怕找不到工作,不知道怎么办?” 单看月薪两万,好像也不少,不过收入跟行业和地域也有很大关系。薪资&…

JWT详解

1、什么是token,解决了什么问题? token 就是常说的 “令牌”,本质上是全局唯一的字符串,用来唯一识别一个客户端,解决了session依赖单个web服务器的问题。单体应用时,用户的会话信息保存在session中&#…

如何在视频中加水印?分享这些实用的加水印方法给你

视频要怎么添加水印呢?在我们的日常生活中,短视频已经离不开我们的视野了,我们经常通过短视频来放松、查找资料或者是丰富知识。同样的,我们也可以通过自己的剪辑并发布一些视频到各个平台上获取流量。那么在这个过程中&#xff0…

web前端-javascript-基本数据类型和引用数据类型(对象和基本数据类型保存到栈内存,对象保存在堆内存,比较两个基本数据类型或引用数据类型)

基本数据类型和引用数据类型 var a 123; var b a; a;/* console.log("a "a); console.log("b "b); */var obj new Object(); obj.new "孙悟空";var obj2 obj;//修改obj的name属性 obj.name "猪八戒";/* console.log(obj.name…

京东低代码平台:浅谈水滴拖拽画布的设计与实现

水滴低代码平台简介 京东水滴平台面向企业内部后台管理系统场景,提供可视化搭建等低代码配置、构建及部署能力。 水滴画布作为水滴低代码的核心能力之一,具备灵活、易用的特点,用户可以通过简单拖拉拽的方式,在不需要具备前端知…

【应用回归分析】CH4 假设检验与预测1——一般线性假设

目录 前言 引例 1.【例1】 2.【例2】 一、假设检验的基本思想 二、定理【4.1.1】 1.定理内容 2.定理证明 前言 在上一章,我们讨论了回归参数的几种估计方法,依据这些方法得到回归系数的估计,就可以建立经验回归方程。但是,…

python+django汽车租赁系统pycharm项目

目录 1 绪论 1 1.1课题背景 1 1.2课题研究现状 1 1.3初步设计方法与实施方案 2 1.4本文研究内容 2 4 2.3 B/S结构简介 4 2.4MySQL数据库 5 3 系统分析 6 3.1系统可行性分析 6 3.1.1经济可行性 6 3.1.2技术可行性 6 3.1.3运行可行性 6 3.2系统现状分析 6 3.3功能需求分析 7 …

Transformer for CV

文章目录Transformer 的基础结构NLP StructureVITSWINDERTTransformer 常用terms分块的batch-size自动计算Batch normLayer normMultihead Self AttentionGELU/ELU/RELUTransformer Vs CNN每个模型的详细笔记Vit图片分割自己的思考计算过程Segmenter运行 TrainTrain 里的结构DE…

命名空间提示“http://schemas.microsoft.com/xaml/behaviors”不存在Interation的解决办法

以下面的部分wpf程序为例&#xff1a; <Button Grid.Column"3" Margin"5" Content"<" FontSize"18" Background"Transparent" Foreground"LightGray"><b:Interaction.Triggers><b:EventTrigge…

static应用知识:单例设计模式

1、什么是设计模式&#xff08;Design pattern&#xff09; 开发中经常遇到一些问题&#xff0c;一个问题通常有n种解法的&#xff0c;但其中肯定有一种解法是最优的&#xff0c;这个最优的解法被人总结出来了&#xff0c;称之为设计模式。 设计模式有20多种&#xff0c;对应2…

轻松上手 | 使用国内资源安装 K3s 全攻略

作者&#xff1a; 王海龙&#xff0c;SUSE Rancher 中国社区技术经理&#xff0c;Linux Foundation APAC Evangelist&#xff0c;负责 Rancher 中国技术社区的维护和运营。拥有 8 年的云计算领域经验&#xff0c;经历了 OpenStack 到 Kubernetes 的技术变革&#xff0c;无论底层…

3D帧间匹配-----剔除动态障碍物

0. 简介 作为SLAMer在建图时最怕的就是大量的动态障碍物存在&#xff0c;这会导致建图的不精确&#xff0c;而本文主要围绕着如何剔除动态障碍物开始讲起&#xff0c;并提供一种快速的过滤障碍物的方法。 1. 主要方法 在调研的过程中主要存在有两种方法&#xff0c;第一种如…

安全标准汇总

文章目录资源导航法律法规0x01常见标准代号0x02 2022年新发布0x03 按体系分类一般性法律规定规范和惩罚信息网络犯罪的法律直接针对信息安全的特别规定具体规范信息安全技术、信息安全管理0x04 安全等级保护0x05 数据安全声明资源导航 国家标准全文公开系统&#xff1a;国家标…

代码随想录训练营第35天|LeetCode 860.柠檬水找零、406.根据身高重建队列、452. 用最少数量的箭引爆气球

参考 代码随想录 题目一&#xff1a;LeetCode 860.柠檬水找零 这个题在做的时候有误解&#xff0c;第一不能对数组bills排序&#xff0c;只能按照给定的顺序处理&#xff1b;第二&#xff0c;只能从头开始处理&#xff0c;不能中间的某个点开始。 其实这个题很简单&#xff…

synchronized锁升级过程

【一些面试真题】&#xff1a; 阿里P9——0x80的执行过程。 【 重温CAS过程 】&#xff1a; 【硬件】&#xff1a; Lock指令在执行后面指令的时候锁定一个北桥信号&#xff08;不采用锁总线的方式&#xff09;。 【用户态 与 内核态】&#xff1a; 作为操作系统来说&#x…

WebRTC学习笔记二 基础概念

一、WebRTC与架构 简单来说&#xff0c;WebRTC 是一个可以在 Web 应用程序中实现音频&#xff0c;视频和数据的实时通信的开源项目。在实时通信中&#xff0c;音视频的采集和处理是一个很复杂的过程。比如音视频流的编解码、降噪和回声消除等&#xff0c;但是在 WebRTC 中&…