Spring Boot整合RocketMQ实现延迟消息消费

news2025/2/24 3:42:24
导包
     <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.3</version>
    </dependency>
添加配置信息
application配置文件
# rocketMq地址
rocketmq.name-server=106.52.60.215:9876
# 生产者分组
rocketmq.producer.group=myGroup
rocketmq.producer.topics=topic1
# 消费者分组
rocketmq.consumer.group=myGroup
# topic
rocketmq.consumer.topics=topic1
# 表示顺序消费模式
rocketmq.consumer.consume-mode=ORDERLY
# 消费者的最大线程数,即消费消息的线程池大小。默认值为20,如果不需要处理大量的消息,可以将其调小。
rocketmq.consumer.consume-thread-max=1
# 表示每次消费消息的最大数量,即一次性消费的最大消息数。默认值为1,即每次只消费一条消息。如果需要批量消费消息,可以将其调大。但是需要注意的是,批量消费消息可能会影响消费的效率和消息的顺序性。
rocketmq.consumer.consume-message-batch-max-size=1
yml配置文件
rocketmq:
  consumer:
    consume-message-batch-max-size: 1
    consume-mode: ORDERLY
    consume-thread-max: 1
    group: myGroup
    topics: topic1
  name-server: 106.52.60.215:9876
  producer:
    group: myGroup
    topics: topic1
生产者发送消息
同步发现消息

在Spring Boot中,可以使用RocketMQTemplate来发送消息。设置消息的延迟级别,可以使用RocketMQTemplatesend(Message message, long timeout, int delayLevel)方法,其中delayLevel为延迟级别,单位为秒

RocketMQ支持18个级别的延迟时间,分别为1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h

import com.songzixain.Blog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * <p>
 * Description: 消息生产者
 * </p>
 *
 * @author 
 * @version 
 * @create 
 * @see 
 */
@Slf4j
@RestController
public class MyProducer1 {

    @Value("${rocketmq.producer.topics}")
    private String topic;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * description 同步发送延迟消息
     *
     * @param:  []
     * @return
     * @Date   2023/3/11
     */
    @GetMapping("syncSendTest")
    public void sendDelayMsg() {
        Blog blog = new Blog();
        blog.setBlogName("余十步");
        blog.setUrl("yushibu");

        // delayTimeLevel代表延迟级别  messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        int delayTimeLevel = 3;
        Message<Blog> message = MessageBuilder.withPayload(blog)
                .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel)
                .build();

        SendResult sendResult = rocketMQTemplate.syncSend(topic, message, 3000, delayTimeLevel);
        log.info("消息发送成功,时间:{} 发送内容:{}", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());
        log.info("发送结果:{}", sendResult);
    }
}
异步发送消息(推荐)
import com.songzixain.Blog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * <p>
 * Description: 消息生产者
 * </p>
 *
 * @author 
 * @version 
 * @create 
 * @see 
 */
@Slf4j
@RestController
public class DemoProducers {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * description 延迟消息发送
     *
     * @param:  [user]
     * @return
     * @Date   2023/3/11
    */
    @RequestMapping("/asyncSendTest")
     public  String asyncSendTest(){
        Blog blog = new Blog();
        blog.setBlogName("余十步");
        blog.setUrl("yushibu");
        // 构建消息体
        Message<Blog> msg = MessageBuilder.withPayload(blog).build();
        rocketMQTemplate.asyncSend("topic1", msg, new SendCallback() {
            // 发送成功
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("消息发送成功,时间:{} 发送内容:{}",  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());
            }
            // 发送失败
            @Override
            public void onException(Throwable throwable) {
                log.info("消息发送失败,时间:{} 发送内容:{}",  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());
            }
            // ps:3 代表第三个延迟10s   延迟级别:"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
        },3000,3);

        return "发送成功";
    }
}
消息消费者
import com.songzixain.Blog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * <p>
 * Description: 消息生产者
 * </p>
 *
 * @author 
 * @version 
 * @create 
 * @see 
@Slf4j
@RestController
public class MyProducer2 {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Value("${rocketmq.producer.topics}")
    private String topic;

    /**
     * description 延迟消息发送
     * 在上面的代码中,我们使用了RocketMQTemplate的syncSend方法来发送消息。
     * 其中,第一个参数是消息的主题,第二个参数是消息内容,第三个参数是延迟时间(单位为毫秒)
     * ,第四个参数是发送消息的重试次数。
     * @param:  [user]
     * @return
     * @Date   2023/3/11
    */
    @RequestMapping("/asyncSendTest")
     public  String asyncSendTest(){
        Blog blog = new Blog();
        blog.setBlogName("余十步");
        blog.setUrl("yushibu");
        // 构建消息体
        Message<Blog> msg = MessageBuilder.withPayload(blog).build();
        rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
            // 发送成功
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("消息发送成功,时间:{} 发送内容:{}",  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());
            }
            // 发送失败
            @Override
            public void onException(Throwable throwable) {
                log.info("消息发送失败,时间:{} 发送内容:{}",  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());
            }
            // ps:3 代表第三个延迟10s   延迟级别:"messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
        },3000,4);

        return "发送成功";
    }
}
启动测试

启动请求:http://localhost:8081/asyncSendTest

控制台打印

可以看到,消息生产者设置的延迟级别是3,对应延迟了10秒钟

延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

需要注意的是,顺序消费模式下,同一个消费者组内的消费者只会有一个线程消费同一个队列中的消息,这样才能保证消息的顺序性。

通过以上步骤,就可以使用RocketMQ实现消息延迟功能了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1866395.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

open()函数——打开文件并返回文件对象

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 open()函数用于打开文件&#xff0c;返回一个文件读写对象&#xff0c;然后可以对文件进行相应读写操作。 语法参考 open()函数的语法格式如下&…

Day 34:2368. 受限条件下可到达节点的数目

Leetcode 2368. 受限条件下可到达节点的数目 现有一棵由 n 个节点组成的无向树&#xff0c;节点编号从 0 到 n - 1 &#xff0c;共有 n - 1 条边。 给你一个二维整数数组 edges &#xff0c;长度为 n - 1 &#xff0c;其中 edges[i] [ai, bi] 表示树中节点 ai 和 bi 之间存在一…

深度学习31-33

1.负采样方案 &#xff08;1&#xff09;为0是负样本&#xff0c;负样本是认为构造出来的。正样本是有上下文关系 负采样的target是1&#xff0c;说明output word 在input word之后。 2.简介与安装 &#xff08;1&#xff09;caffe:比较经常用于图像识别&#xff0c;有卷积网…

可以一键生成热点营销视频的工具,建议收藏

在当今的商业环境中&#xff0c;热点营销已经成为了一种非常重要的营销策略。那么&#xff0c;什么是热点营销呢&#xff1f;又怎么做热点营销视频呢&#xff1f; 最近高考成绩慢慢公布了&#xff0c;领导让结合“高考成绩公布”这个热点&#xff0c;做一个关于企业或产品的营销…

如何确保消息不被重复消费

一、重复消费问题出现的原因 导致重复消费的原因可能出现在生产者&#xff0c;也可能出现在 MQ 或 消费者。这里说的重复消费问题是指同一个数据被执行了两次&#xff0c;不单单指 MQ 中一条消息被消费了两次&#xff0c;也可能是 MQ 中存在两条一模一样的消费。 生产者&…

【新闻】全球热钱,正在流入新加坡 这个夏天有点猛,油价看涨? 普华永道已丢了六成“A股大客户”

新加坡成为全球投资焦点&#xff0c;吸引大量并购活动。预计经济增长2.4%&#xff0c;股指上涨8%。未来可期待更多国际投资涌入。 近期&#xff0c;新加坡成为全球投资者的焦点&#xff0c;吸引了大量的并购和投资活动。 据报道&#xff0c;2024年第二季度&#xff0c;新加坡…

【活动】搜维尔科技携Xsens邀您出席世界人工智能大会

展会介绍 由外交部、国家发展改革委、教育部、科技部、工业和信息化部、国家网信办、中国科学院、中国科协和上海市政府共同主办的世界人工智能大会&#xff08;WAIC&#xff09;&#xff0c;将于7月4日-7日在上海举行。围绕“以共商促共享 以善治促善智”主题&#xff0c;打造…

ElementUI搭建

概述 Element&#xff0c;一套为开发者、设计师和产品经理准备的基于 Vue 2.0 的桌面端组 件库. 安装 ElementUI npm 安装 推荐使用 npm 的方式安装&#xff0c;它能更好地和 webpack 打包工具配合使用。 npm i element-ui -S 在控制台输入此命令来安装ElementUI 在 main.j…

调用京灵平台接口,很详细

调用京灵平台接口&#xff0c;很详细 一、准备1、开发资源2、申请环境 二、测试接口调用1、查看接口文档2、查看示例代码3、引入对应依赖4、改造后需要的依赖5、测试调用 三、工具类1、配置dto2、公共参数dto3、请求参数dto4、响应参数dto4、调用工具类&#xff08;重要&#x…

【目标检测】Yolov8 完整教程 | 检测 | 计算机视觉

学习资源&#xff1a;https://www.youtube.com/watch?vZ-65nqxUdl4 努力的小巴掌 记录计算机视觉学习道路上的所思所得。 1、准备图片images 收集数据网站&#xff1a;OPEN IMAGES 2、准备标签labels 网站&#xff1a;CVAT 有点是&#xff1a;支持直接导出yolo格式的标…

Stirling-PDF 安装和使用教程

PDF (便携式文档格式) 目前已经成为了文档交换和存储的标准。然而&#xff0c;找到一个功能全面、安全可靠、且完全本地化的 PDF 处理工具并不容易。很多在线 PDF 工具存在隐私和安全风险&#xff0c;而桌面软件往往价格昂贵或功能有限。那么&#xff0c;有没有一种解决方案能够…

Java知识点整理 12 — 前端 Ant Design Pro 初始化模板使用

一. 项目初始化 Ant Design Pro 是基于 Ant Design 和 umi 封装的一整套企业级中后台前端设计框架&#xff0c;致力于在设计规范和基本组件的基础上&#xff0c;继续向上构建&#xff0c;提炼出典型模板或配套设计资源&#xff0c;进一步提升企业级中后台产品设计研发过程中的…

【学习笔记】Hive

Hive 作为数仓应用工具&#xff0c;对比 RDBMS&#xff08;关系型数据库&#xff09; 有3个“不能”&#xff1a; 不能像 RDBMS 一般实时响应&#xff0c;Hive 查询延时大&#xff1b; 不能像 RDBMS 做事务型查询&#xff0c;Hive 没有事务机制&#xff1b; 不能像 RDBMS 做行…

理解论文笔记:基于AHP和模糊综合评价的无线传感器网络可维护性评估方法

作为一个研0的娃,这是我认真读的第一篇论文,想着笔记让自己能看懂。如有侵权,请联系删除。 I. INTRODUCTION 介绍 主要介绍了无线传感器网络可维护性研究的重要性和必要性,并对下面的各章进行了总结。 翻译:第二部分简要介绍了无线传感器网络的维护,并对影响系统的因素…

【C++】final关键字 | 避免派生、重写

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞 关注支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; &#x1f525;c系列专栏&#xff1a;C/C零基础到精通 &#x1f525; 给大…

【ARM Trace32(劳特巴赫) 使用介绍 2.7 -- bat 脚本传参数给 trace32 cmm 脚本】

请阅读【Trace32 ARM 专栏导读】 文章目录 bat 脚本传参数给 trace32脚本可变参数传入CMM 脚本接收参数运行BAT脚本bat 脚本传参数给 trace32脚本 在使用 Trace32 的过程中,如果每次都是通过GUI 界面来操作,是习惯使用命令行工作的人所不能忍受的!!!,那么能不同通过脚本…

打破数据分析壁垒:SPSS复习必备(十一)

一、方差分析 方差分析的应用条件如下&#xff1a; &#xff08;1&#xff09;独立&#xff0c;各组数据相互独立&#xff0c;互不相关&#xff1b; &#xff08;2&#xff09;正态&#xff1a;即各组数据符合正态分布&#xff1b; &#xff08;3&#xff09;方差齐性&…

【第十八课】区域经济分析——探索性空间数据分析软件实操

一、前言 ArcGIS有专门处理探索性空间数据分析方法的工具,即地统计分析模块。 该模块主要由三个功能模块组成:探索性数据分析(Explore)、地统计分析向导(Geostatistical Wizard),以及生成数据子集(Create Subsets)。其中利用 这些基本功能模块,可以方便完成多种地统…

RPC架构基本结构和核心技术

当你在构建一个分布式系统时&#xff0c;势必需要考虑的一个问题是&#xff1a;如何实现服务与服务之间高效调用&#xff1f;当然&#xff0c;你可以使用Dubbo或Spring Cloud等分布式服务框架来完成这个目标&#xff0c;这些框架帮助我们封装了技术实现的复杂性。那么&#xff…

Spring Boot 整合 JSP

Spring Boot 是一个开源的 Java 框架&#xff0c;用于创建独立、生产级的基于 Spring 框架的应用程序。它简化了基于 Spring 的应用程序的创建和部署过程。JSP&#xff08;JavaServer Pages&#xff09;是一种动态网页技术&#xff0c;允许开发者在 HTML 中嵌入 Java 代码。将 …