延时消息队列

news2025/1/11 8:09:21

目录

前言

一、延时队列实用场景

二、DelayQueue 

DelayQueue的实现

使用延迟队列 

 DelayQueue实现延时任务的优缺点

三、RocketMQ

原理

四、Kafka

原理

实现 

DelayMessage定义

消息发送代码 

消费者代码 

参考



前言

延时队列的内部是有序的,最重要的特性就体现在它的延时属性上,延时队列就是用来存放需要在指定时间点被处理的元素的队列

队列是存储消息的载体,延时队列存储的对象是延时消息。所谓的延时消息,是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费。


一、延时队列实用场景

  • 淘宝七天自动确认收货。在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将货款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能;

  • 订单在三十分钟之内未支付则自动取消;

  • 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒;

  • 用户注册成功后,如果三天内没有登陆则进行短信提醒

  • 用户发起退款,如果三天内没有得到处理则通知相关运营人员;

  • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

二、DelayQueue 

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
}

DelayQueue是一个无界的BlockingQueue,是线程安全的(无界指的是队列的元素数量不存在上限,队列的容量会随着元素数量的增加而扩容,阻塞队列指的是当队列内元素数量为0的时候,试图从队列内获取元素的线程将被阻塞或者抛出异常)

以上是阻塞队列的特点,而延迟队列还拥有自己如下的特点:

DelayQueue中存入的必须是实现了Delayed接口的对象(Delayed定义了一个getDelay的方法,用来判断排序后的元素是否可以从Queue中取出,并且Delayed接口还继承了Comparable用于排序),插入Queue中的数据根据compareTo方法进行排序(DelayQueue的底层存储是一个PriorityQueue,PriorityQueue是一个可排序的Queue,其中的元素必须实现Comparable接口的compareTo方法),并通过getDelay方法返回的时间确定元素是否可以出队,只有小于等于0的元素(即延迟到期的元素)才能够被取出

延迟队列不接收null元素

DelayQueue的实现

public class UserDelayTask implements Delayed {

    @Getter
    private UserRegisterMessage message;

    private long delayTime;

    public UserDelayTask(UserRegisterMessage message, long delayTime) {
        this.message = message;
        // 延迟时间加当前时间
        this.delayTime = System.currentTimeMillis() + delayTime;
    }

    // 获取任务剩余时间
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(delayTime, ((UserDelayTask) o).delayTime);
    }
}

定义延迟队列并交付容器管理 

    /**
     * 延迟队列
     */
    @Bean("userDelayQueue")
    public DelayQueue<UserDelayTask> orderDelayQueue() {
        return new DelayQueue<UserDelayTask>();
    }

使用延迟队列 

@Resource
private DelayQueue<UserDelayTask> orderDelayQueue;
 
UserDelayTask task = new UserDelayTask(message, 1000 * 60);
        orderDelayQueue.add(task);

开启线程处理延迟任务

 @Override
    public void afterPropertiesSet() throws Exception {
        new Thread(() -> {
            while (true) {
                try {
                    UserDelayTask task = orderDelayQueue.take();
                    // 当队列为null的时候,poll()方法会直接返回null, 不会抛出异常,但是take()方法会一直等待,
                    // 因此会抛出一个InterruptedException类型的异常。(当阻塞方法收到中断请求的时候就会抛出InterruptedException异常)
                    UserRegisterMessage message = task.getMessage();
                    execute(message);

                    // 执行业务
                } catch (Exception ex) {
                    log.error("afterPropertiesSet", ex);
                }
            }
        }).start();
    }

 DelayQueue实现延时任务的优缺点

使用DelayQueue实现延时任务非常简单,而且简便,全部都是标准的JDK代码实现,不用引入第三方依赖(不依赖redis实现、消息队列实现等),非常的轻量级。

它的缺点就是所有的操作都是基于应用内存的,一旦出现应用单点故障,可能会造成延时任务数据的丢失。如果订单并发量非常大,因为DelayQueue是无界的,订单量越大,队列内的对象就越多,可能造成OOM的风险。所以使用DelayQueue实现延时任务,只适用于任务量较小的情况。
 

三、RocketMQ

RocketMQ 和本身就有延迟队列的功能,但是开源版本只能支持固定延迟时间的消息,不支持任意时间精度的消息(这个好像只有阿里云版本的可以)。

他的默认时间间隔分为 18 个级别,基本上也能满足大部分场景的需要了。

默认延迟级别:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h。

使用起来也非常的简单,直接通过setDelayTimeLevel设置延迟级别即可。

setDelayTimeLevel(level)

原理

实现原理说起来比较简单,Broker 会根据不同的延迟级别创建出多个不同级别的队列,当我们发送延迟消息的时候,根据不同的延迟级别发送到不同的队列中,同时在 Broker 内部通过一个定时器去轮询这些队列(RocketMQ 会为每个延迟级别分别创建一个定时任务),如果消息达到发送时间,那么就直接把消息发送到指 topic 队列中。

RocketMQ 这种实现方式是放在服务端去做的,同时有个好处就是相同延迟时间的消息是可以保证有序性的。

谈到这里就顺便提一下关于消息消费重试的原理,这个本质上来说其实是一样的,对于消费失败需要重试的消息实际上都会被丢到延迟队列的 topic 里,到期后再转发到真正的 topic 中。

四、Kafka

对于 Kafka 来说,原生并不支持延迟队列的功能,需要我们手动去实现,这里我根据 RocketMQ 的设计提供一个实现思路。

这个设计,我们也不支持任意时间精度的延迟消息,只支持固定级别的延迟,因为对于大部分延迟消息的场景来说足够使用了。

只创建一个 topic,但是针对该 topic 创建 18 个 partition,每个 partition 对应不同的延迟级别,这样做和 RocketMQ 一样有个好处就是能达到相同延迟时间的消息达到有序性。

原理

  • 首先创建一个单独针对延迟队列的 topic,同时创建 18 个 partition 针对不同的延迟级别
  • 发送消息的时候根据消息延迟等级发送到延迟 topic 对应的 partition,同时把原 topic 保存到 延迟消息 中。
  • 内嵌的consumer单独设置一个ConsumerGroup去消费延迟 topic 消息,消费到消息之后如果没有达到延迟时间那么就进行pause,然后seek到当前ConsumerRecordoffset位置,同时使用定时器去轮询延迟的TopicPartition,达到延迟时间之后进行resume。

       KafkaConsumer 提供了暂停和恢复的API函数,调用消费者的暂停方法后就无法再拉取到新的消息,同时长时间不消费kafka也不会认为这个消费者已经挂掉了。

  • 如果达到了延迟时间,那么就获取到延迟消息中的真实 topic ,直接转发

这里为什么要进行pauseresume呢?因为如果不这样的话,如果超时未消费达到max.poll.interval.ms 最大时间(默认300s),那么将会触发 Rebalance。

实现 

DelayMessage定义

/**
 * 延迟消息
 *
 * @author yangyanping
 * @date 2023-08-31
 */
@Getter
@Setter
@ToString
public class DelayMessage<T> implements DTO {
    /**
     * 消息级别,共18个,对应18个partition
     */
    private Integer level;

    /**
     * 业务类型,真实投递到的topic
     */
    private String topic;

    /**
     * 目标消息key
     */
    private String key;

    /**
     * 事件
     */
    private DomainEvent<T> event;
}

消息发送代码 

public void publishAsync(DelayMessage delayMessage) {
        String topic = "delay_topic";

        try {
            Integer level = delayMessage.getLevel();
            Integer delayPartition = level - 1;
            String data = JSON.toJSONString(delayMessage);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, delayPartition, "", data);

            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(producerRecord);
            future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onSuccess(SendResult<String, String> result) {
                    //发送成功后回调
                    log.info("{}-异步发送成功, result={}。", topic, result.getRecordMetadata().toString());
                }

                @Override
                public void onFailure(Throwable throwable) {
                    //发送失败回调
                    log.error("{}-异步发送失败。", topic, throwable);
                }
            });
        } catch (Exception ex) {
            log.error("{}-异步发送异常。", topic, ex);
        }
    }

消费者代码 

/**
 * 参考RocketMQ支持延迟消息设计,不支持任意时间精度的延迟消息,只支持特定级别的延迟消息,
 * 将消息延迟等级分为1s、5s、10s 、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h,共18个级别,
 * 只创建一个有18个分区的延时topic,每个分区对应不同延时等级。
 *
 * https://blog.csdn.net/weixin_40270946/article/details/121293032
 *
 * https://zhuanlan.zhihu.com/p/365802989
 *
 * @author yangyanping
 * @date 2023-08-30
 */
@Slf4j
@Component
public class DelayConsumer implements ConsumerSeekAware {

    /**
     * 锁
     */
    private final Object lock = new Object();

    /**
     * 间隔
     */
    private final int interval = 5000;

    /**
     * 消费者
     */
    @Resource(name = "kafkaConsumer")
    private KafkaConsumer<String, String> kafkaConsumer;

    /**
     * 延迟消息发布
     */
    @Resource
    private DelayMessagePublisher delayMessagePublisher;

    @PostConstruct
    public void init() {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        //当系统需要循环间隔一定时间执行某项任务的时候可以使用scheduleWithFixedDelay方法来实现
        executorService.scheduleWithFixedDelay(() -> {
            synchronized (lock) {
                resume();
                lock.notifyAll();
                log.info("DelayConsumer-notifyAll");
            }
        }, 0, interval, TimeUnit.MILLISECONDS);

    }

    /**
     * 批量消费消息
     */
    @KafkaListener(topics = "#{'${delayTopic.topic}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}")
    public void onMessage(List<ConsumerRecord<String, String>> records, Consumer consumer) {

        synchronized (lock) {
            try {
                if (CollectionUtil.isEmpty(records)) {
                    log.info("DelayConsumer-records is empty !");
                    consumer.commitSync();
                    return;
                }

                boolean delay = false;

                for (ConsumerRecord<String, String> record : records) {
                    long timestamp = record.timestamp();
                    String value = record.value();
                    JSONObject jsonObject = JSON.parseObject(value);
                    Integer level = Convert.toInt(jsonObject.get("level"));
                    String targetTopic = Convert.toStr(jsonObject.get("topic"));
                    String event = Convert.toStr(jsonObject.get("event"));

                    TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                    long delayTime = getDelayTime(timestamp, level);

                    if (delayTime <= System.currentTimeMillis()) {
                        log.info("DelayConsumer-delayTime={} <= currentTime={}", delayTime, System.currentTimeMillis());
                        // 处理消息
                        processMessage(record, consumer, topicPartition, targetTopic, event);
                    } else {
                        log.info("DelayConsumer-delayTime={} > currentTime={}", delayTime, System.currentTimeMillis());
                        // 暂停消费
                        consumer.pause(Collections.singletonList(topicPartition));
                        consumer.seek(topicPartition, record.offset());
                        delay = true;
                        break;
                    }
                }

                if (delay) {
                    lock.wait();
                }
            } catch (Exception var10) {
                log.error("{}.onMessage#error . message={}");
                throw new BizException("事件消息消费失败", var10);
            }
        }
    }

    /**
     * 消息级别,共18个
     * level-1 :30s
     * level-2 : 1m
     * level-3 : 5m
     * level-4 : 10m
     * level-5 : 20m
     * level-6 : 30m
     */
    private Long getDelayTime(long timestamp, Integer level) {
        switch (level) {
            case 1:
                return timestamp + 1 * 1000;
            case 2:
                return timestamp + 5 * 1000;
            case 3:
                return timestamp + 10 * 1000;
            case 4:
                return timestamp + 30 * 1000;
            case 5:
                return timestamp + 1 * 60 * 1000;
            case 6:
                return timestamp + 2 * 60 * 1000;
            //.........  省略
        }

        return timestamp;
    }


    /**
     * 处理消息 并提交消息
     */
    private void processMessage(ConsumerRecord<String, String> record, Consumer consumer, TopicPartition topicPartition, String targetTopic, String event) {

        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1);
        HashMap<TopicPartition, OffsetAndMetadata> metadataHashMap = new HashMap<>();
        metadataHashMap.put(topicPartition, offsetAndMetadata);
        delayMessagePublisher.sendMessage(targetTopic, event);

        log.info("DelayConsumer-records#offset={},targetTopic={},event={}", record.offset() + 1, targetTopic, event);
        consumer.commitSync(metadataHashMap);
    }

    /**
     * 重启消费
     */
    private void resume() {
        try {
            kafkaConsumer.resume(kafkaConsumer.paused());
        } catch (Exception ex) {
            log.error("DelayConsumer-resume", ex);
        }
    }
}

参考

RabbitMQ、RocketMQ、Kafka延迟队列实现-腾讯云开发者社区-腾讯云

延迟消息队列设计-腾讯云开发者社区-腾讯云

用Kafka实现延迟消息_kafka延迟消费_alvin.yao的博客-CSDN博客

怎么设计一个合适的延时队列?

基于kafka实现延迟队列 - 知乎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/978913.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

stm32之31.iic

iic双线制。一根是SCL&#xff0c;作为时钟同步线;一根是SDA&#xff0c;作为数据传输线 SDN #include "iic.h"#define SCL PBout(8)#define SDA_W PBout(9) #define SDA_R PBin(9)void IIC_GPIOInit(void) {GPIO_InitTypeDef GPIO_InitStructure;//使能时钟GR…

开始投简历了

歇了好长时间&#xff0c;也该开始找点事情折腾了。 第一周基本上是没有什么太多的消息&#xff0c;大部分情况就是收到回复的邮件说你很优秀&#xff0c;希望下次合作这种礼节性的拒绝邮件。 给人有点感觉都是在忽悠&#xff0c;有点感觉现在的公司一边到处拒绝&#xff0c;…

短信软件平台搭建最新客户端|移讯云短信系统

根据客户 和市场需要 增加了新的客户端 新的客户端客户登录后发送短信时可自行选择用哪个通道来进行发送短信。每个通道的充值数量不一样。 通过后台给客户分配可使用的通道&#xff0c;只有在后台给客户分配可使用的通道后客户在登录客户端发送短信时才可进行选择。 关于客…

NoSQL之 Redis介绍与配置

目录 一、关系数据库和非关系数据库概述 1、关系型数据库 2、非关系型数据库 二、关系数据库和非关系数据库的区别 1、数据存储格式不同 2、扩展方式不同 3、对事务的支持不同 三、非关系数据库产生背景 1、总结 四、Redis简介 1、 Redis的单线程模式 2、Redis优点…

ChatGPT AIGC 完成多维分析雷达图

我们先让ChatGPT来帮我们总结一下多维分析雷达图的功能与作用。 同样ChatGPT AIGC完成的动态雷达图效果如下; 这样的一个多维分析动态雷达图是用HTML,JS,Echarts 来完成的。 将完整代码复制如下: <!DOCTYPE html> <html style="height: 100%"><h…

服务性能监控:USE 方法(The USE Method)

USE Method: Rosetta Stone of Performance Checklists USE Method: Rosetta Stone of Performance Checklists USE 方法基于 31 模型&#xff08;三种指标类型一种策略&#xff09;&#xff0c;来切入一个复杂的系统。我发现它仅仅发挥了 5% 的力量&#xff0c;就解决了大概…

基于视觉重定位的室内AR导航项目思路(2):改进的建图和定位分离的项目思路

文章目录 一、建图二、定位首先是第一种方法&#xff1a;几何方法其次是第二种方法&#xff1a;图像检索方法最后是第三种方法&#xff1a;深度学习方法 前情提要&#xff1a; 是第一次做项目的小白&#xff0c;文章内的资料介绍如有错误&#xff0c;请多包含&#xff01; 一、…

Unity RawImage

文章目录 1. Image2. RawImage2.1 UV Rect 3. RawImage 应用 1. Image Image 控件在我的这篇博客中有详细解释&#xff1a; https://blog.csdn.net/weixin_45136016/article/details/125655214 2. RawImage RawImage 组件是一个用来显示纹理的组件&#xff0c;常常跟Render …

【OpenCV入门】第九部分——模板匹配

文章结构 模板匹配方法单模板匹配单目标匹配多目标匹配 多模板匹配 模板匹配方法 模板是被查找的图像。模板匹配是指查找模板在原始图像中的哪个位置的过程。 result cv2.matchTemplate(image, templ, method, mask)image&#xff1a; 原始图像templ&#xff1a; 模板图像&a…

基于SSM的医院住院管理系统

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用Vue技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

第二证券:放量涨停,牛股狂揽七连板,海外机构扎堆调研股出炉

近10个交易日&#xff08;8月23日至9月5日&#xff09;&#xff0c;海外组织对343家上市公司进行调研。 昨日&#xff0c;家居人气股我乐家居再获涨停&#xff0c;成功揽获七连板。其全天成交额近4.98亿元&#xff0c;较上一日大幅放量&#xff0c;最新市值为46.25亿元。 昨日…

Spring+MyBatis使用collection标签的两种使用方法

目录 项目场景&#xff1a; 实战操作&#xff1a; 1.创建菜单表 2.创建实体 3.创建Mapper 4.创建xml 属性描述&#xff1a; 效率比较&#xff1a; 项目场景&#xff1a; 本文说明了Spring BootMyBatis使用collection标签的两种使用方法 1. 方法一: 关联查询 2. 方法…

fontforge将.woff文件转换为.ttf文件,查看字体对应关系

一、fontforge下载 阿里云盘&#xff1a;https://www.aliyundrive.com/s/tHBoGpYSgWh 提取码: 6c5m 百度网盘&#xff1a;https://pan.baidu.com/s/1ccWkGgarq3Vh_QJ8mNbn8g 提取码&#xff1a;ulr5 二、.woff文件转换为.ttf文件 1、用fontforge打开.woff文件&#xff0c…

ElasticSearch的安装部署-----图文介绍

文章目录 背景什么是ElasticSearch使用场景 ElasticSearch的在linux环境下的安装部署前期准备分配权限启动ElasticSearch创建用户组创建用户&#xff0c;并设置密码用户添加到elasticsearch用户组指定用户操作目录的一个操作权限切换用户 解压elasticsearch修改es的配置文件修改…

Mybatis-Pagehelper参数supportMethodsArguments引起的血案

0x00 背景 一个历史悠久的项目&#xff0c;使用的技术栈主要是 spring cloud 体系&#xff0c;属于 service 范畴&#xff0c;不给外部提供接口&#xff0c;但是集成了 myabtis-pagehelper&#xff0c;具体的版本如下&#xff1a; <dependency><groupId>com.gith…

【100天精通Python】Day55:Python 数据分析_Pandas数据选取和常用操作

目录 Pandas数据选择和操作 1 选择列和行 2 过滤数据 3 添加、删除和修改数据 4 数据排序 Pandas数据选择和操作 Pandas是一个Python库&#xff0c;用于数据分析和操作&#xff0c;提供了丰富的功能来选择、过滤、添加、删除和修改数据。 1 选择列和行 Pandas 提供了多种…

VS2022+CMAKE+OPENCV+QT+PCL安装及环境搭建

VS2022安装&#xff1a; Visual Studio 2022安装教程&#xff08;千字图文详解&#xff09;&#xff0c;手把手带你安装运行VS2022以及背景图设置_vs安装教程_我不是大叔丶的博客-CSDN博客 CMAKE配置&#xff1a; win11下配置vscodecmake_心儿痒痒的博客-CSDN博客 OPENCV配…

网络安全行业岗位缺口有多大?看看美国有多少岗位空缺

网络安全行业岗位缺口一直很大&#xff0c;在各类统计中其实并不能完全客观的反应这个缺口&#xff0c;不过都可以作为一个参考。同时&#xff0c;网络安全行业岗位的人员能力参差不齐&#xff0c;不仅仅在数量上有所欠缺&#xff0c;同时从质量上更加加剧了对人才的需求。我们…

高效开发工具:提升 REST API 开发效率

本文将介绍如何使用 Apifox 开发 REST API&#xff0c;并展示 Apifox 的一些关键功能。 我们可以先了解下&#xff1a;REST API 简介 - RESTful Web 服务 步骤 1&#xff1a;创建一个 Apifox 账户 首先&#xff0c;你需要在 Apifox 上创建一个账户。 步骤 2&#xff1a;创建…

React 18 使用 Context 深层传递参数

参考文章 使用 Context 深层传递参数 通常来说&#xff0c;会通过 props 将信息从父组件传递到子组件。但是&#xff0c;如果必须通过许多中间组件向下传递 props&#xff0c;或是在应用中的许多组件需要相同的信息&#xff0c;传递 props 会变的十分冗长和不便。Context 允许…