【Spring Cloud Alibaba】RocketMQ的基础使用,如何发送消息和消费消息

news2025/1/23 4:55:09

在现代分布式架构的开发中,消息队列扮演着至关重要的角色,用于解耦系统组件、保障可靠性以及实现异步通信。RocketMQ作为一款开源的分布式消息中间件,凭借其高性能、高可用性和良好的扩展性,成为了众多企业在构建高可靠性、高吞吐量应用系统时的首选。
对于Spring Cloud Alibaba的用户来说,集成RocketMQ并进行消息的发送与消费是常见的任务。本篇博客将深入介绍RocketMQ的基础使用方法,带大家一步步学习如何在Spring Cloud Alibaba中发送和消费消息。

在开始之前,确保已经完成了以下准备工作:

  1. 安装RocketMQ:确保已经在系统中成功安装了RocketMQ,并启动了相关服务。教程可以查看我上一篇博客
    【Spring Cloud Alibaba】Linux安装RocketMQ以及RocketMQ Dashboard可视化工具
  2. JDK:安装了JDK 1.8及以上版本,以便于运行Java应用程序。

文章目录

  • 🎺 第一步,搭建rocketmq项目环境
  • 🎺 第二步,生产者代码
    • 🎺普通消息
      • 🎺普通消息发送
        • 🎺同步发送
        • 🎺异步发送
        • 🎺单向模式发送
      • 🎺 普通消息接收
    • 🎺顺序消息
      • 🎺顺序消息发送
      • 🎺顺序消息接收
    • 🎺 延迟消息
      • 🎺 延迟消息发送
      • 🎺延时消息接收
    • 🎺批量消息
      • 🎺批量消息发送
      • 🎺批量消息接收
    • 🎺 事务消息
      • 🎺 事务消息发送
      • 在实际中遇到的问题

rocketmq官网地址:
https://rocketmq.apache.org/zh/docs/4.x/

这里我们讲的是4.x的版本

🎺 第一步,搭建rocketmq项目环境

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.1</version>
        </dependency>

我们使用的是rocketmq-spring-boot-starter2.2.1,其中的rocketmq版本为4.9.1

🎺 第二步,生产者代码

🎺普通消息

🎺普通消息发送

🎺同步发送

在这里插入图片描述

同步发送是最常用的方式,是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式,可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。

    /**
     * 普通消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/02message1#31-%E5%90%8C%E6%AD%A5%E5%8F%91%E9%80%81
     *
     * @return
     */
    @GetMapping("/syncSend")
    public SendResult syncSend(String message) {
        Message<String> stringMessage = MessageBuilder.createMessage(message, new MessageHeaders(null));
        return rocketMQTemplate.syncSend("my-topic:*", stringMessage);
    }

🎺异步发送

在这里插入图片描述
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。

    /**
     * 普通消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/02message1#32-%E5%BC%82%E6%AD%A5%E5%8F%91%E9%80%81
     *
     * @return
     */
    @GetMapping("/asyncSend")
    public String asyncSend(String message) {
        Message<String> stringMessage = MessageBuilder.createMessage(message, new MessageHeaders(null));
        rocketMQTemplate.asyncSend("my-topic:*", stringMessage, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("异步发送服务器返回信息成功");
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("异步发送服务器返回信息失败");
            }
        });
        return "异步发送成功";
    }

🎺单向模式发送

在这里插入图片描述

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

    /**
     * 普通消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/02message1#33-%E5%8D%95%E5%90%91%E6%A8%A1%E5%BC%8F%E5%8F%91%E9%80%81
     *
     * @return
     */
    @GetMapping("/sendOneway")
    public String sendOneway(String message) {
        Message<String> stringMessage = MessageBuilder.createMessage(message, new MessageHeaders(null));
        rocketMQTemplate.sendOneWay("my-topic:*", stringMessage);
        return "发送成功";
    }

🎺 普通消息接收

MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。

Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。

Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。

Apache RocketMQ既提供了Push模式也提供了Pull模式。

该博客中所有消费者都使用默认的push模式

@Component
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "my-group", consumeMode = ConsumeMode.CONCURRENTLY)
public class NormalRocketMQListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received message:"+message);
    }
}

messageModel = MessageModel.BROADCASTING即广播消息,每个消费者都会去消费
但是即使都消费了,但是trackType都会显示NOT_CONSUME_YET
在这里插入图片描述

🎺顺序消息

🎺顺序消息发送

顺序消息是一种对消息发送和消费顺序有严格要求的消息。
对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费

/**
     * 顺序消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/03message2
     *
     * @return
     */
    @GetMapping("/syncSendOrderly")
    public SendResult syncSendOrderly(String message) {
        String[] split = message.split(",");
        List<Message<String>> list = new ArrayList<>();
        for (String mes : split) {
            list.add(MessageBuilder.createMessage(mes, new MessageHeaders(null)));
        }
        return rocketMQTemplate.syncSendOrderly("order-topic:*", list, String.valueOf(System.currentTimeMillis()));
    }

🎺顺序消息接收

消费者这里要设置consumeMode = ConsumeMode.ORDERLY才能实现顺序接收

@Component
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order-topic", consumeMode = ConsumeMode.ORDERLY)
public class OrderRocketMQListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received message:"+message);
    }
}

🎺 延迟消息

🎺 延迟消息发送

延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。

    /**
     * 延迟消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/04message3
     *
     * @return
     */
    @GetMapping("/send")
    public SendResult send(String message) {
        Message<String> stringMessage = MessageBuilder.createMessage(message, new MessageHeaders(null));
        return rocketMQTemplate.syncSend("delay-topic:*", stringMessage,1000,2);
    }

🎺延时消息接收

@Component
@RocketMQMessageListener(topic = "delay-topic", consumerGroup = "delay-topic")
public class DelayRocketMQListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received message:"+message);
    }
}

🎺批量消息

🎺批量消息发送

在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。

    /**
     * 批量消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/05message4
     *
     * @return
     */
    @GetMapping("/send")
    public SendResult send(String message) {
        String[] split = message.split(",");
        List<Message<String>> list = new ArrayList<>();
        for (String mes : split) {
            list.add(MessageBuilder.createMessage(mes, new MessageHeaders(null)));
        }
        return rocketMQTemplate.syncSend("list-topic:*", list);
    }

🎺批量消息接收

@Component
@RocketMQMessageListener(topic = "list-topic", consumerGroup = "list-topic")
public class ListRocketMQListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received message:"+message);
    }
}

🎺 事务消息

🎺 事务消息发送

    /**
     * 事务消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/06message5
     *
     * @return
     */
    @GetMapping("/send")
    public SendResult send(String message) {
        Message<String> message1 = MessageBuilder.createMessage(message, new MessageHeaders(null));
        return rocketMQTemplate.sendMessageInTransaction("transA-topic:*", message1, null);
    }

在这里插入图片描述

rocketmq-springboot 提供了一个注解@RocketMQTransactionListener
使用方法:实现RocketMQLocalTransactionListener接口,并且类上加注解@RocketMQTransactionListener

@RocketMQTransactionListener
public class TopicATransactionalMessageService implements RocketMQLocalTransactionListener {

    private Map<String, RocketMQTransactionStrategy> strategyMap;


    //策略模式
    @Autowired
    public TopicATransactionalMessageService(TopicAStrategy topicAStrategy,TopicBStrategy topicBStrategy) {
        strategyMap = new HashMap<>();
        strategyMap.put("transA-topic", topicAStrategy);
        strategyMap.put("TopicB", topicBStrategy);
        // ...
    }



    /**
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {

        String topic = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TOPIC).toString();
        RocketMQTransactionStrategy strategy = strategyMap.get(topic);
        if (strategy == null) {
            // 如果没有对应的策略,可以抛出异常或者返回一个默认的事务状态
        }
        return strategy.executeLocalTransaction(msg, arg);

    }

    /**
     * @param msg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String topic = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TOPIC).toString();
        RocketMQTransactionStrategy strategy = strategyMap.get(topic);
        if (strategy == null) {
            // 如果没有对应的策略,可以抛出异常或者返回一个默认的事务状态
        }
        return strategy.checkLocalTransaction(msg);
    }
}

在这里插入图片描述

这里可以使用策略模式来实现对每个topic的自定义策略

每个topic处理类需要实现RocketMQTransactionStrategy接口

@Service
public class TopicAStrategy implements RocketMQTransactionStrategy {
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TOPIC);
        Object payload = msg.getPayload();
        String mes = new String((byte[]) payload);
        if (mes.equals("1")) {

            return RocketMQLocalTransactionState.COMMIT;

        } else if (mes.equals("2")) {
            return RocketMQLocalTransactionState.ROLLBACK;
        } else {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
        // ...
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        return RocketMQLocalTransactionState.COMMIT;
        // ...
    }
}

这样发送不同的topic都会有不同的处理策略

更多消息可查看官网
https://rocketmq.apache.org/zh/docs/4.x/introduction/02quickstart/

在实际中遇到的问题

  • conf/broker.conf 中配置autoCreateTopicEnable=true
    ,如果没有对应的topic,则会在生产者有消息发送到mq的时候自动创建对应的topic
    如果不配置该属性,且开始没有topic的时候生产者发送消息到topic会报错org.apache.rocketmq.client.exception.MQBrokerException: CODE: 17 DESC: topic[delay-topic] not exist, apply first please!

  • 不管有没有配置autoCreateTopicEnable=true,都会出现以下的情况:
    添加了消费者注解,如@RocketMQMessageListener(topic = "delay-topic", consumerGroup = "delay-topic"),程序会自动创建一个名称为%RETRY%delay-topic的topic
    如果没有对应的topic,则会一直报错org.apache.rocketmq.client.exception.MQClientException: CODE: 17 DESC: No topic route info in name server for the topic: delay-topic
    解决方法:
    在mq中手动添加对应的topic即可

  • 如果你修改了autoCreateTopicEnable=true没有效果,删除rocketmq存储数据的文件夹store即可,默认存放位置/root/store

博客中涉及到的仓库地址:
https://gitee.com/WangFuGui-Ma/spring-cloud-alibaba

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/880681.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

关于统一事件管理,一定有你想知道的(一)

本文部分内容来源于布博士----擎创科技资深产品专家 IT技术已经无处不在&#xff0c;各行各业都离不开它。无论是银行、券商、家庭、学校还是个人&#xff0c;都离不开IT技术。例如&#xff1a; 我们⼈与⼈之间社交的软件&#xff0c;如微信、QQ、陌陌、Facebook等。 银⾏通过…

火山引擎DataLeap的Data Catalog系统公有云实践

更多技术交流、求职机会&#xff0c;欢迎关注字节跳动数据平台微信公众号&#xff0c;回复【1】进入官方交流群 Data Catalog是一种元数据管理的服务&#xff0c;会收集技术元数据&#xff0c;并在其基础上提供更丰富的业务上下文与语义&#xff0c;通常支持元数据编目、查找、…

常见分辨率时序信息

分辨率列表 分辨率一:640x480(逐行) 分辨率二:800x600(逐行) 分辨率三:1024x768(逐行) 分辨率四:大名鼎鼎720P(逐行) 注:选择720P@30帧的,需拉长HOR TOTAL TIME 分辨率五:1280x800(逐行) 分辨率六:1280x960(逐行

MySQL和Redis如何保证数据一致性

MySQL与Redis都是常用的数据存储和缓存系统。为了提高应用程序的性能和可伸缩性&#xff0c;很多应用程序将MySQL和Redis一起使用&#xff0c;其中MySQL作为主要的持久存储&#xff0c;而Redis作为主要的缓存。在这种情况下&#xff0c;应用程序需要确保MySQL和Redis中的数据是…

Bootstrap-fileinput 插件的使用

1.bootstrap-fileinput 下载地址 https://github.com/kartik-v/bootstrap-fileinput.git 2.bootstrap-fileinput 使用 input 标签 multiple"multiple" 表示可以多选文件 <div class"container-fluid"><div class"card border-0 shadow-sm…

深度学习在MRI运动校正中的应用综述

运动是MRI中的主要挑战之一。由于MR信号是在频率空间中获取的&#xff0c;因此除了其他MR成像伪影之外&#xff0c;成像对象的任何运动都会导致重建图像中产生伪影。深度学习被提出用于重建过程的几个阶段的运动校正。广泛的MR采集序列、感兴趣的解剖结构和病理学以及运动模式&…

这四种订货系统不能选(四):不能源码交付

订货系统在现代企业管理中具备着重要的地位和作用。通过订货系统&#xff0c;企业能够更好地掌握市场需求&#xff0c;提高订单的准确性和及时性&#xff0c;优化企业的供应链管理&#xff0c;并加强与供应商之间的合作与沟通。今天我们分享最后一个不能选的、也是最重要的一点…

ArcGIS Pro发布地图服务(影像、矢量)

本文示例使用&#xff08;因为portal的授权的版本只有10.5的&#xff0c;故使用10.5进行示例&#xff09;&#xff1a; 软件:ArcGIS Pro3.0.1&#xff08;破解版&#xff09;&#xff0c; ArcGIS Portal10.5 当ArcGIS Pro和Portal不在一个机器或者版本不一样的时候&#xff0…

日常问题——git推送代码被拒绝

&#x1f61c;作 者&#xff1a;是江迪呀✒️本文关键词&#xff1a;日常BUG、BUG、问题分析☀️每日 一言 &#xff1a;存在错误说明你在进步&#xff01; 一、问题描述 Push to origin/master was rejected 提交代码时提示&#xff0c;被拒绝。 二、问题原因 …

(七)Unity VR项目升级至Vision Pro需要做的工作

Vision Pro 概述 定位为混合现实眼镜&#xff0c;对AR支持更友好 无手柄&#xff0c;支持手&#xff08;手势&#xff09;、眼&#xff08;注视&#xff09;、语音交互 支持空间音频&#xff0c;相比立体声、环绕声更有沉浸感和空间感 支持VR/AR应用&#xff0c;支持多种应用模…

零基础官网下载jdk

Oracle 官网总是隔一段时间一改版&#xff0c;时间长了博客可能不适用&#xff0c;望注意&#xff0c;但是精髓不变。 Oracle官网 官网地址百度搜索&#xff0c;其他任何官网都一个套路&#xff0c;但要识别下一些广告网站会模仿官方网站。 官网地址&#xff1a;https://www.…

k8s服务注册发现

Service 是 将运行在一个或一组pod上的网络应用程序公开为网络服务的方法。 定义service前端为service名称、ip、端口等不变的部分&#xff0c;后端为符合标签选择的pod集合 注册 通过api server提交注册service请求到DNSservice随后得到clusterIP&#xff08;虚拟ip地址&am…

UHPC的疲劳计算——兼论ModelCode2010的适用性

文章目录 0. 背景1、结论及概述2、MC10对于SN曲线的调整&#xff08;囊括NC、HPC、UHPC&#xff09;2.1 疲劳失效曲面的构建2.2 新模型的验证 3、MC10对于疲劳设计强度的调整及其背后的原因4. 结语 0. 背景 今年年初&#xff0c;有一位用UHPC做混凝土塔筒的同行告诉我&#xf…

多平台1688、淘宝、京东搜索商品聚合接口,示例返回值说明

多平台根据关键词取商品列表 API 返回值说明 item_search-根据关键词取商品列表 公共参数 名称类型必须描述keyString是调用key&#xff08;必须以GET方式拼接在URL中&#xff09;Taobao。拼多多。京东&#xff0c;1688API 接口测试secretString是调用密钥api_nameString是A…

Deep Learning With Pytorch - 最基本的感知机、贯序模型/分类、拟合

文章目录 如何利用pytorch创建一个简单的网络模型&#xff1f;Step1. 感知机&#xff0c;多层感知机&#xff08;MLP&#xff09;的基本结构Step2. 超平面 ω T ⋅ x b 0 \omega^{T}xb0 ωT⋅xb0 or ω T ⋅ x b \omega^{T}xb ωT⋅xb感知机函数 Step3. 利用感知机进行决策…

我们为什么需要API管理系统?

我们为什么需要API管理系统&#xff1f; 随着web技术的发展&#xff0c;前后端分离成为越来越多互联网公司构建应用的方式。前后端分离的优势是一套Api可被多个客户端复用&#xff0c;分工和协作被细化&#xff0c;大大提高了编码效率&#xff0c;但同时也带来一些“副作用”:…

stm32g070的PD0/PD2 PA8和PB15

目前在用STM32G070做项目&#xff0c;其中PD2TIMER3去模拟PWM&#xff0c;PD0用作按键检测&#xff0c;测试发现PD0低电平检测没有问题&#xff0c;高电平检测不到&#xff0c;电路图如下图所示&#xff1a; 用万用表测试电平&#xff0c;高电平1.0V左右&#xff0c;首先怀疑硬…

高级SQL分析函数-窗口函数

摘要&#xff1a;本文由葡萄城技术团队于CSDN原创并首发。转载请注明出处&#xff1a;葡萄城官网&#xff0c;葡萄城为开发者提供专业的开发工具、解决方案和服务&#xff0c;赋能开发者。 前言 SQL语句中&#xff0c;聚合函数在统计业务数据结果时起到了重要作用&#xff0c;…

nginx keepalived 本地二进制部署

文章目录 安装 nginx安装 keepalived卸载 nginx卸载 keepalived 安装 nginx wget http://nginx.org/download/nginx-1.24.0.tar.gz tar -xf nginx-1.24.0.tar.gz cd nginx-1.24.0/ ./configure --with-stream --prefix/usr/local/nginx make && make install修改nginx…

关于ChatGPT抽样调查:78%的人用于搜索,30%的人担心因它失业

人工智能早已不再被视为未来科技&#xff0c;而是越来越多地应用在时下人们的生活之中。根据DECO PROTESTE的调查&#xff0c;大约72%的葡萄牙人认为人工智能已经活跃于他们的日常。[1] 随着ChatGPT对各个行业的影响&#xff0c;也引发了人们关于这种人工智能模型潜力的争论&a…