深入浅出消息队列----【延迟消息的实现原理】

news2025/1/8 19:19:34

深入浅出消息队列----【延迟消息的实现原理】

  • 粗说 RocketMQ 的设计
  • 细说 RocketMQ 的设计
  • 这样实现是否有什么问题?

本文仅是文章笔记,整理了原文章中重要的知识点、记录了个人的看法
文章来源:编程导航-鱼皮【yes哥深入浅出消息队列专栏】

粗说 RocketMQ 的设计

RocketMQ 约定了一些延迟时间,即生产者无法灵活的自定义延迟时间,而是固定的几个延迟时间来供生产者选择。

请添加图片描述
这样延迟消息就有了统一归类和约束,便于管理和调配。

虽说归类了延迟消息,但是同一个延迟 level 的延迟消息共用一个闹钟也是无法满足需求的。

所以变成专门雇一个“人”,每个“人”管一个 level 的延迟消息,定时查看是否有到期的消息,如果到了立马让消息给消费者消费。

至于复用 commitlog 这一套的问题,专门搞个存放延迟消息的 Topic,延迟消息先发往这个 Topic,消费者并不会订阅这个 Topic,因此此时消费者无法消费到这个消息。

等到延迟消息到达时间后,Broker 将这个延迟消息发往原 Topic,此时消费者就能从原 Topic 消费到这条消息!

也就是说 Broker 自己建立一个专门 Topic 用来存放延迟消息,此时延迟消息的存储能复用 commitlog 这一套模型,消息也会被分发到 consumerQueue。

不同的延迟 level 的消息回存放到这个 Topic 不同的队列中,也就是说这个 Topic 一个有 18 个队列对应 18 个 level。

请添加图片描述

然后会有一个定时线程去每个队列按序检查消息是否都到时间了,如果到了就发到消息原先的 Topic 中。

请添加图片描述

细说 RocketMQ 的设计

延迟消息的发送很简单,仅需设置一个 delayTimeLevel 即可:

Message message = new Message("TestTopic",("Hello scheduled message" + i).getBytes());
message.setDelayTimeLevel(3);
producer.send(message);

Broker 收到这个消息后,一看 delayTimeLevel 设置了值,那么就知道它是一个延迟消息,于是乎直接来个偷梁换柱!

把消息的原 Topic 和对应队列 ID 保存在消息扩展属性里面。

然后把这条消息的 Topic 设置成 SCHEDULE_TOPIC_XXXX,没错 Topic 的名字就是 SCHEDULE_TOPIC_XXXX哈,后面就是 XXXX

并且根据消息的 Level 选择 SCHEDULE_TOPIC_XXXX 下对应的队列。

请添加图片描述

这样一来延迟消息就存储好了。

然后 Broker 起了一个定时线程池,里面一共有 18 个核心线程,这个线程池的任务就是定时调度 SCHEDULE_TOPIC_XXXX 下的每个队列的消息,一旦有到期的消息,就分发到原 Topic 供消费者消费。

具体的做法是在初始时,每个队列都会对应被创建一个任务扔到线程池中,这些任务的内容就是根据传入的队列 ID,得到对应的 consumeQueue,当然还有对应的 offset。

请添加图片描述

Broker 会定时保存 SCHEDULE_TOPIC_XXXX 下 consumeQueue 的消费 offset。

得到 consumeQueue 和 offset,对应的就能获取延时消息,这时候将延迟时间跟当前时间对比,就能判断是否到期。

如果到期了,就从消息扩展属性里面获取原 Topic 和对应队列 ID,然后投递到原队列中。

上面的图表就是这个意思,这里再贴一下:

请添加图片描述

然后再代码上的实现是立马新建一个任务扔到线程池中,延迟时间是 1000ms,任务的入参会塞入更新后的 offset,这样线程就会继续消费后面的消息,如此往复循环。

当然,如果拿到的对应延迟消息还未到时间,那么 offset 不变,也立马新建一个任务塞入到线程池中,这样 1000,s 后又会来看这个消息是否到期。

可以看到,整个延迟消息设计就加了一个线程池,很巧妙地复用了正常消息的 commitlog 和 consumeQueue 的存储机制,且利用发布订阅的特性,改变了消息的 Topic 来使得消费者无法消费到未到时间的消息。

到时间了又投递回原 Topic 使得消费者可以消费到到期的消息,非常 nice!

这样实现是否有什么问题?

从实现层面来看,大大减少了延迟消息开发的复杂度,但是这样的实现对延迟时间来说是不准的

首先,同一个延迟 level 的消息都是入同一个队列,然后上一个延迟消息处理完之后继续处理下一个,如果同一时刻有大量的同一个 level 的延迟消息产生,那么它们都堆积在一个队列里面,一个一个处理,这样一来即使后面的消息到时间了也得排队等着。

这样的机制就做不到非常实时。

并且从 SHEDULE_TOPIC_XXXX 分发至原 Topic 之后,假设原 Topic 本身就已经有很多消息堆积了,那么等消费者消费到这条消息的时候,时间也有大大的延迟。

请添加图片描述

当然,本身在大流量下对时间的把控是无法做到很准确的,不论是什么方法,都会有延迟,无非是延迟精度多少的问题。

有一种比较好的定时结构就是时间轮了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1962693.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

四步教你快速解决UE5文件迁移失败❗️

本期作者:尼克 易知微3D引擎技术负责人 不知道大家在用UE5迁移文件时,有没有发现这个问题:如果文件输出的路径选择了非项目路径,那么UE会提示无法迁移。在UE4中,这样做是不存在问题的,只要选择「忽略」就可…

OS—文件系统

目录 一. 文件系统结构I/O 控制层基本文件系统文件组织模块逻辑文件系统 二. 文件系统布局文件系统在磁盘中的结构主引导记录(MasterBoot Record,MBR)引导块(boot block)超级块(super block)文件系统中空闲块的信息 文件系统在内存中的结构 三. 外存空间管理空闲表法空闲链表法…

关于CDN

CDN 代表内容分发网络(Content Delivery Network)它是一种通过将内容复制到多个地理位置分散的服务器上,从而加速网络内容传输的技术。CDN 的主要目的是提高用户访问速度、减少延迟和提升网站的可靠性。 具体来说,CDN 通过以下方…

飞创直线模组桁架机械手优势及应用领域

随着工业自动化和智能制造的发展,直线模组桁架机械手极大地减轻了人类的体力劳动负担,在危险性、重复性高的作业环境中展现出了非凡的替代能力,引领着工业生产向自动化、智能化方向迈进。 一、飞创直线模组桁架机械手优势 飞创直线模组桁架…

爬虫问题---ChromeDriver的安装和使用

一、安装 1.查看chrome的版本 在浏览器里面输入 chrome://version/ 回车查看浏览器版本 Chrome的版本要和ChromeDriver的版本对应,否则会出现版本问题。 2.ChromeDriver的版本选择 114之前的版本:https://chromedriver.storage.googleapis.com/index.ht…

mmdetection:用于目标检测、实例分割、全景分割和半监督目标检测的工具包

MMDetection 是一个基于 PyTorch 的开源目标检测工具箱,是 OpenMMLab 项目的一部分。该工具箱采用模块化设计,使用户能够通过组合不同组件轻松构建自定义的目标检测框架。 MMDetection 支持多种检测任务,包括目标检测、实例分割、全景分割和…

【ROS 最简单教程 002/300】ROS 集成开发环境安装: Noetic

💗 有遇到安装问题可以留言呀 ~ 当时踩了挺多坑,能帮忙解决的我会尽力 ! 1. 安装操作系统环境 Linux ❄️ VM / VirtualBox Ubuntu20.04 如果已有 linux 环境 (如双系统等),可跳过步骤 1 ~ 👉 保姆级图文安装教程指路…

Python_Flask学习笔记

1.配置 查询字符串的形式传参 app.route(/book/list) def book_list():page request.args.get(page,default1,typeint)return f"您获取的是{page}的图书列表!"if __name__ __main__:app.run()3.HTML模版渲染 from flask import Flask,render_templa…

大厂的堡垒机到底是啥?为什么需要它?

什么是堡垒机 堡垒机,即在一个特定的网络环境下,为了保障网络和数据不受来自外部和内部用户的入侵和破坏,而运用各种技术手段监控和记录运维人员对网络内的服务器、网络设备、安全设备、数据库等设备的操作行为,以便集中报警、及…

使用 Elastic Observability 中的 OpenTelemetry 进行基础设施监控

作者:来自 Elastic ISHLEEN KAUR 将 OpenTelemetry 与 Elastic Observability 相结合,形成应用程序和基础设施监控解决方案。 在 Elastic,我们最近决定全面采用 OpenTelemetry 作为首要的数据收集框架。作为一名可观察性工程师,我…

YOLOv9最新最全代码复现(论文复现)

YOLOv9最新最全代码复现(论文复现) 本文所涉及所有资源均在传知代码平台可获取 文章目录 YOLOv9最新最全代码复现(论文复现)引言YOLOv9模型概述模型框架图环境搭建及训练推理环境配置数据集准备训练过程测试和评估实践应用 报错修…

【CVPR2024】Efficient LoFTR: 高效的 LoFTR:具有类似稀疏的速度的半密集局部特征匹配

Efficient LoFTR: 高效的 LoFTR:具有类似稀疏的速度的半密集局部特征匹配 Efficient LoFTR realtime_demo 0.摘要 \qquad 我们提出了一种新的方法来有效地产生跨图像的半密集匹配。以往的无探测器匹配器LoFTR在处理大视点变化和纹理差的场景下表现出了出色的匹配能力…

【零基础必看的前端教程】——JavaScript(八)函数

欢迎大家打开前端的新篇章——JavaScript,JavaScript与HTML、CSS合称为前端三大件,JavaScript是前端的重中之重,小洪将继续以零基础视角,带你循序渐进学习前端知识,一看就懂,小白也能转行做前端&#xff01…

创建个人公私钥对

Windows电脑 本地电脑打开命令输入框,如windows WINR–cmd打开cmd窗口输入ssh-keygen -t rsa -C “Remote dev” ,按三次回车,即可看到本地生成的公私钥进入用户目录,如windows为C:\Users\xxx(个人域账号).ssh,可看到…

在C++程序中新建并使用库

创建一个新的cpp文件后,定义一个函数 我们希望这个函数是可以被多个程序调用的,而不是直接输入在程序中进行编译。在C程序中,不是所有代码都会被编译成可执行文件,只有main函数所在的程序才可以生成可执行文件。而这个库是通过上一…

(第三期)书生大模型实战营——书生大模型全链路开源开放体系

任务及教程来自书生大模型实战营https://github.com/InternLM/Tutorial

负载均衡、软件平滑升级

安装nginx 1.26.1 平滑升级、负载均衡 安装依赖 gcc gcc-c pcre-devel openssl-devel 七层负载均衡配置: [rootf ~]# vim /usr/local/nginx/conf/nginx.conf 43 location / {44 # root html;45 # index index.html index…

在docker中安装MongoDB 5.0+

文章目录 1、查看物理机是否支持avx指令集:安装资料中的cpu-z_2.10-cn.exe,并打开2、查看虚拟机是否支持avx指令集:3、创建目录4、使用Docker来运行一个MongoDB数据库实例5、进入容器6、查看当前db版本7、查看当前db的链接机器地址8、帮助指令…

《昇思25天学习打卡营第24天》

接续上一天的学习任务,我们要继续进行下一步的操作 构造网络 当处理完数据后,就可以来进行网络的搭建了。按照DCGAN论文中的描述,所有模型权重均应从mean为0,sigma为0.02的正态分布中随机初始化。 接下来了解一下其他内容 生成…

程序一调用这个接口就会崩溃, 因为他的静态库添加是放在release文件下,而我用的debug模式

程序一调用这个接口就会崩溃 因为他的静态库添加是放在release文件下 而我用的debug模式 DESTDIR ../x64/ReleaseINCLUDEPATH ./../3rdparty/ZZDecode/include LIBS -lopengl32 \-lglu32 \-luser32 \./../3rdparty/ZZDecode/x64/release/ZZDecodeInterface.lib