Idea+maven+springboot项目搭建系列--1 整合Rocketmq

news2025/1/25 9:15:56

前言:本文以maven+springboot 整合Rocketmq 完成消息的发送和接收。

1 Rocketmq 介绍:

1.1 Rocketmq 特性:
Apache RocketMQ是一款快速、可靠的分布式消息传递和流处理平台,具有可扩展性和高性能。它是一个分布式的、去中心化的消息队列,具有以下特性:

  • 分布式:RocketMQ允许将消息存储在多个Broker上并支持水平扩展,可以通过增加更多的Broker来扩展存储能力和吞吐量。

  • 异步传输:RocketMQ采用异步传输方式来提高性能,它的异步传输机制利用了Linux内核底层的零拷贝技术,从而实现了高吞吐量和低延迟。

  • 可靠性:RocketMQ采用了复制和故障转移机制来保证消息的可靠性。它可以配置多副本(通常是3个副本)来存储消息,当有一个Broker宕机时,系统可以自动将消息路由到其他副本上。

  • 灵活性:RocketMQ支持多种消息模式,包括点对点模式、发布/订阅模式和事务消息模式。它还支持多种消息协议,包括JMS、OpenMessaging和MQTT等。

  • 易于使用:RocketMQ使用简单,提供了丰富的客户端API和管理工具,使得开发人员可以快速地集成和使用它。

RocketMQ是一个非常优秀的分布式消息传递平台,能够帮助开发人员实现高性能、可靠的消息传递和流处理。它在互联网公司、金融机构和其他大型企业中广泛使用。

1.2 Rocketmq 主要组件:
Rocketmq 是一种基于发布-订阅(Pub/Sub)消息范式,消息的发送者(称为发布者、生产者、Producer)会将消息直接发送给特定的接收者(称为订阅者、消费者、Comsumer)。而RocketMQ的基础消息模型就是一个简单的Pub/Sub模型。
在这里插入图片描述
RocketMQ主要由以下几个组件组成:

  • Nameserver:Nameserver是RocketMQ中的重要组件之一,它充当了命名服务和路由服务的角色。当Producer和Consumer要发送或者接收消息时,它们需要向NameServer请求获取Broker的信息,然后才能和Broker进行通信。Nameserver的作用类似于DNS服务器,用来维护RocketMQ中各个Broker的地址信息。
  • Broker:Broker是RocketMQ中的消息存储和传输核心组件。所有的消息都存储在Broker中,Producer向Broker发送消息,Consumer从Broker中订阅和接收消息。Broker的作用是接收、存储和转发消息,确保消息的可靠性和可扩展性。
  • Producer:Producer是创造和发送消息的客户端应用程序,它通过调用API将消息发送到Broker中。Producer可以按照不同的消息模式发送消息,包括点对点模式、发布/订阅模式和事务消息模式等。
  • Consumer:Consumer是接收和处理消息的客户端应用程序。它通过从Broker中订阅和消费消息来实现消息的处理。Consumer可以按照不同的消息模式消费消息,包括点对点模式、发布/订阅模式和事务消息模式等。
  • Message:Message是RocketMQ中最基本的消息单元,它包含了消息的内容和一些元数据,例如消息ID、消息主题、消息标签等。Producer将消息发送到Broker中,Consumer从Broker中订阅和接收消息。

Producer (生产者)和Consumer(消费者),一个向topic 发送消息,一个向topic 读取消息,消息的基本单元由Message 承接;
一般的消息组件对于消息的存储分发都只有一个组件处理,RocketMQ 中却使用了Nameserver和Broker 两个组件,那么这两个组件的关系是什么呢:
为了方便理解,这里使用图书馆进行类比:

  • 首先图书馆里存储了海量的图书,这些图书并不是杂乱无章的进行堆叠,而是按照一定的类型完成了分类存放;比如新闻类,医学类,生物类,文学类 等等,每种不同的分类下都有海量的图书;如果把每本图书看做是具体的一个个消息,那么图书的分类就是不同的topic;
  • 对于每种分类,为了统计的方便有可能需要为其在划分小类,如生物类,可以被划分为 植物类,动物类 等等,对于每个大类如果可以看做是topic ,那么大类下划分的小类就可以看做是 不同的 tag分类;
  • 显然每一种topic/tag分类的图书并不是杂乱无章的存放,而是会被整齐的放入到一排排的书架上,一排排的书架就可以看做是分区下的队列;
  • 显然书架作为了书籍最终的存放位置,那么可以将图书馆的书架看做是Broker,用户来借书和还书,最终都要来到书架上拿书和放书;
  • 显然图书馆里的书籍不仅需要分类存放,每层的图书管理人员,还需要熟悉自己负责楼层的书籍的位置信息,以及需要对书籍的维护;如果来借书的人需要的图书不在本楼层,图书管理人员也需要为其提供书籍正确的楼层位置信息,显然每层的图书管理人员,都需要掌握每层楼的图书信息,并且必要情况下,需要有可以顶替其他楼层管理员的能力;
  • 显然在rocketmq 中 ,Nameserver 的角色就和 每层的图书管理员相似;当每个用户来到本楼层还书(生产消息),楼层管理人员,需要告知还书的用户这本书,需要被正确归还的位置(消息的路由),从而帮助用户更好的还书;
  • 当用户来借书(消费消息),楼层管理人员,需要告知用户,想要的书籍正确楼层及详细位置信息(消息的路由);
  • 图书管理员怎么知道各个图书的分类以及位置信息,就需要不时的在自己的系统里动态维护数据的信息,以便于更好的服务借书的还书的人;
  • 显然rocketmq 中 最终存放数据的broker 组件需要和Nameserver 进行不时的交互,这样Nameserver 就可以实时的知晓数据的信息,当生产者投递消息时,先向Nameserver询问自己要投递的位置信息,然后在将数据进行投递到broker;当消费者消费消息时,也先向Nameserver询问自己想要消费数据的位置信息,然后在向具体的broker 获取消息;

2 springboot 整合:

2.1 引入jar:

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

2.2 配置rocketmq:

# name-server地址
rocketmq.name-server=localhost:9876
# 配置消费组
rocketmq.producer.group=test-group
rocketmq.producer.send-message-timeout=30000
# 设置日志级别
logging.level.root=debug

2.3 生产者 消息发送工具类:



import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.stream.Collectors;

/**
 * 生产者
 */
@Slf4j
@Component
public class RocketMQProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Value("${rocketmq.producer.send-message-timeout}")
    private Integer messageTimeOut;


    /**
     * 发送普通消息
     *
     * @param topic
     * @param tag
     * @param msgBody
     */
    public void sendMsg(String topic, String tag, Object msgBody) {
        if (StringUtils.isNotBlank(tag)) {
            topic = topic.concat(":") + tag;
        }
        rocketMQTemplate.convertAndSend(topic, msgBody);
    }
    /**
     * 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)
     * sendResult为返回的发送结果
     */
    public <T> SendResult sendMsg(String topic, T msg) {
        Message<T> message = MessageBuilder.withPayload(msg).build();
        SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
        log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
        return sendResult;
    }


    /**
     * 发送异步消息
     *
     * @param topic
     * @param tag
     * @param msgBody
     * @param callback
     */
    public void sendAsyncMsg(String topic, String tag, Object msgBody, SendCallback callback) {
        if (StringUtils.isNotBlank(tag)) {
            topic = topic.concat(":") + tag;
        }
        rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), callback);
    }
    /**
     * 发送异步消息
     *
     * @param topic         消息Topic
     * @param message       消息实体
     * @param sendCallback  回调函数
     * @param timeout       超时时间
     */
    public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) {
        rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);
    }

    /**
     * 发送延时消息
     * 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     *
     * @param topic
     * @param tag
     * @param msgBody
     * @param timeout
     * @param delayLevel 值的有效范围1至18
     */
    public void sendDelayMsg(String topic, String tag, Object msgBody, Long timeout, Integer delayLevel) {
        if (StringUtils.isNotBlank(tag)) {
            topic = topic.concat(":") + tag;
        }
        if (timeout != null) {
            messageTimeOut = timeout.intValue();
        }
        rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), messageTimeOut, delayLevel);
    }

    /**
     * 发送异步延迟消息
     *
     * @param topic        消息Topic
     * @param message      消息实体
     * @param sendCallback 回调函数
     * @param timeout      超时时间
     * @param delayLevel   延迟消息的级别
     */
    public void asyncSendDelay(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
        rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);
    }


    /**
     * 发送异步延迟消息
     *
     * @param topic      消息Topic
     * @param message    消息实体
     * @param timeout    超时时间
     * @param delayLevel 延迟消息的级别
     */
    public void asyncSendDelay(String topic, Message<?> message, long timeout, int delayLevel) {
        rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("topic:{}消息---发送MQ成功---", topic);
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("topic:{}消息---发送MQ失败 ex:{}---", topic, throwable.getMessage());
            }
        }, timeout, delayLevel);
    }

    /**
     * 单向消息
     * 特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答
     * 此方式发送消息的过程耗时非常短,一般在微秒级别
     * 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
     *
     * @param topic 消息主题
     * @param msg   消息体
     * @param <T>   消息泛型
     */
    public <T> void sendOneWayMsg(String topic, T msg) {
        Message<T> message = MessageBuilder.withPayload(msg).build();
        rocketMQTemplate.sendOneWay(topic, message);
    }

    /**
     * 发送批量消息
     *
     * @param topic   消息主题
     * @param msgList 消息体集合
     * @param <T>     消息泛型
     * @return
     */
    public <T> SendResult asyncSendBatch(String topic, List<T> msgList) {
        List<Message<T>> messageList = msgList.stream()
                .map(msg -> MessageBuilder.withPayload(msg).build()).collect(Collectors.toList());
        return rocketMQTemplate.syncSend(topic, messageList);
    }
    /**
     * 发送顺序消息
     *
     * @param topic     消息主题
     * @param msg       消息体
     * @param hashKey   确定消息发送到哪个队列中
     * @param <T>       消息泛型
     */
    public <T> void syncSendOrderly(String topic, T msg, String hashKey) {
        Message<T> message = MessageBuilder.withPayload(msg).build();
        log.info("发送顺序消息,topic:{}, hashKey:{}", topic, hashKey);
        rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
    }


    /**
     * 发送顺序消息
     *
     * @param topic     消息主题
     * @param msg       消息体
     * @param hashKey   确定消息发送到哪个队列中
     * @param timeout   超时时间
     */
    public <T> void syncSendOrderly(String topic, T msg, String hashKey, long timeout) {
        Message<T> message = MessageBuilder.withPayload(msg).build();
        log.info("发送顺序消息,topic:{}, hashKey:{}, timeout:{}", topic, hashKey, timeout);
        rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);
    }

}

2.4 消费者:


import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "consumer_test",
        topic = "test_topic",
        selectorExpression = "*")
public class RocketMqConsumerTest implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        log.debug("监听到消息:message:{}", msg);
    }
}

2.5 测试消息发送:


import com.example.springrocket.config.RocketMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class SpringRocketApplicationTests {
    @Autowired
    private RocketMQProducer rocketMQProducer;

    @Test
    void contextLoads() {
    }
    @Test
    void sendMQMessage(){
        SendResult sendResult = rocketMQProducer.sendMsg("test_topic","hello test 123");
        System.out.println(sendResult);
    }


}

消息获取:
在这里插入图片描述

3 整合遇到的问题参考:

3.1 提示RocketMQTemplate bean 没有被找到:

  • 检查nameServer 和Broker 服务,是否正常启动;
  • 检查10911,10909,10912 端口是否正常暴露;
  • 检查生产者的group 分组是否配置:rocketmq.producer.group
  • 如果springboot 的版本为3.x 则可以降低2.x 的版本,因为3.x 的版本不会进行rocketmq 的自动装配;

3.2 如果提示xxx.xx.xx.xx:10911 连接失败或者决绝:

  • 检查broker 的启动配置文件broker.conf 的brokerIP1 是否为公网ip 如果不是,则需要修改为公网ip;

4 参考:

4.1 Apache RocketMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/587966.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

docker 快速搭建elk

Docker搭建ELK步骤详解 文章目录 一.安装前须知二.安装 Docker三.Docker 安装 ElasticSearch四.Docker 安装 ElasticSearch-head&#xff08;可选&#xff09;五.Docker 安装 Kibana六.Docker 安装 LogStash七.创建springboot应用七.后记 一.安装前须知 以下步骤在 VMware 中…

清晰、明了的@Transcation事务嵌套使用

文章目录 概述Transcation注解事务实现原理 Transcation使用1、事务生效的情况&#xff1a;1. 外层有事务&#xff0c;内层无事务结论&#xff1a;外层有事务&#xff0c;内层也会有事务 2. 外层事务&#xff08;requierd&#xff09;&#xff0c;内层事务&#xff08;not_supp…

HEVC环路后处理核心介绍

介绍 为什么需要环路后处理技术 hevc采用基于快的混合编码框架&#xff0c;方块效应、振铃效应、颜色偏差、图像模糊等失真效应依旧存在&#xff0c;为了降低此类失真影响&#xff0c;需要进行环路滤波技术&#xff1b; 采用的技术 去方块滤波DF&#xff0c;为了降低块效应…

ADC和DAC的工作原理及其区别

ADC和DAC的工作原理及其区别 ADC和DAC都是用于模拟信号与数字信号之间的转换器。 ADC&#xff0c;即模数转换器&#xff0c;是将连续的模拟信号转换为数字信号的电路。其输入为模拟信号&#xff0c;输出为数字信号。ADC的主要组成部分是模拟信号采样模块、模拟信号处理模块、模…

生态系统服务(InVEST模型)土壤保持、水源涵养、氮磷输出、生态保护、生物多样性、碳固

白老师&#xff08;研究员&#xff09;&#xff1a;长期从事生态系统结构-格局-过程-功能-服务的变化与响应关系等研究工作&#xff1b;重点围绕生物多样性、生态系统服务与价值等&#xff0c;构建生物地球化学模型和评价指标体系&#xff0c;为城市、区域和自然保护区的可持续…

sqlserver中动态sql语句应用

前言 一、使用exec 1.用拼接方法 二、使用sp_executesql 1.用拼接方法 2.传参的方法 总结 前言 例如&#xff1a;列表查询条件不固定&#xff0c;根据前端传过来的参数&#xff0c;这时需要根据查询条件后台动态生成SQL语句 一、使用exec exec适用于字符串拼接的方式&#xf…

mac安装python

接上集&#xff0c;我们已经安装了Homebrew 那么在 macOS 上安装 Python 有多种方法&#xff0c;以下是其中两种常用方法&#xff1a; 1&#xff1a;使用 Homebrew 安装 Python Homebrew 是 macOS 上的包管理器&#xff0c;可以方便地安装和管理各种软件包。如果您已经安装了…

美国E8267C是德(KEYSIGHT) E8267D 20G/44G矢量信号发生器

Agilent E8267C、Keysight E8267D、 PSG 矢量信号发生器&#xff0c;高达 44 GHz ​Keysight E8267D (Agilent) PSG 矢量信号发生器是业界首款 I/Q 调制高达 44 GHz 的集成微波矢量信号发生器。它具有先进的宽带内部基带发生器&#xff0c;能够灵活地播放任意波形或生成复杂的…

在Windows11上模拟运行Linux命令的几种方式

在 Windows 上运行 Linux 命令的软件有很多&#xff0c;以下是其中几个比较常用的&#xff1a; Cygwin Cygwin 是一个为 Windows 提供类 Unix 环境的开源软件&#xff0c;它包含了大量的 Unix 工具和命令&#xff0c;可以在 Windows 上运行 Linux 命令。 安装命令 winget i…

【Java 继承】了解Java类的继承的特点,继承的关系,继承的使用,到底什么是继承?

博主&#xff1a;_LJaXi Or 東方幻想郷 专栏&#xff1a; Java | 从入门到入坟 Java 继承 继承的特点 \ 介绍 ❓特点 ♊ 继承的使用方式 &#x1f51e;避免重复方法 子类访问父类的成员变量 &#x1f232;子类访问父类的成员变量&#xff08;直接访问&#xff09;访问父类与子类…

MapReduce实战案例(3)

案例三: MR实战之TOPN(自定义GroupingComparator) 项目准备 需求测试数据 有如下订单数据 订单id商品id成交金额Order_0000001Pdt_01222.8Order_0000001Pdt_0525.8Order_0000002Pdt_03522.8Order_0000002Pdt_04122.4Order_0000002Pdt_05722.4Order_0000003Pdt_01222.8 现在…

6 具有 OCR 功能的顶级 PDF 图像转 Word 转换器

如果您在 PDF 图像中找到一些有用的信息并想转换为 Word 格式以供进一步使用&#xff0c;您将需要一个具有OCR 功能的 PDF 图像转 Word 转换器&#xff0c;该转换器旨在识别 PDF 图像中的文本并将其制作出来可编辑。 将 PDF 图像转换为 Word 并不容易&#xff0c;因为我们需要…

高压放大器工作原理(高压放大器怎么用的)

高压放大器是一种能够将低电平信号放大到足够高的电平&#xff0c;以便用于驱动大功率负载或处理高电压信号的电子设备。它广泛应用于各种电子设备中&#xff0c;例如音频放大器、射频放大器、电力电子设备等。下面我们将详细介绍高压放大器的工作原理以及使用方法。 高压放大器…

一分钟:GTP鼓谱导出转换MIDI格式教程

const loadPromise self.osmd.load("/resource/test");loadPromise.then(function () {self.osmd.render();});作为一名鼓手&#xff0c;我深知鼓谱转换MIDI格式的重要性&#xff0c;但是找了好久&#xff0c;一直没有找到一个好用的工具。 直到我发现了GTP鼓谱转换…

下载YouTube视频的一种方法

文章目录 工具名称下载方法使用方法1.只下载音频2.下载音频转换成mp3&#xff08;加上-x –audio-format参数&#xff09;3.下载视频&#xff08;带音频&#xff09;ID&#xff1a;22 | EXT&#xff1a;mp4 | 1280*720 下载的数据集&#xff1a;YouCook2 工具名称 yt-dlp 下载…

doxygen使用: 跨平台方式让markdown文件包含另一个文件

文章目录 1. 目的和问题2. 解决思路2.1 FILTER_PATTERNS 选项2.2 基于 Python 的 FILTER_PATTERNS 选项2.3 sledcpp.py 脚本 3. 完整工程3.1 目录结构3.2 hello.h 文件内容3.3 CHANGELOG.md 文件内容3.4 generate_doxyfile.py 文件内容3.5 docs/root.md3.6 docs/changelog.md3.…

Redis 事务详细介绍

事务 注意&#xff1a;Redis单条命令是保证原子性的&#xff1b;但是事务不保证原子性&#xff01; Redis事务没有隔离级别的概念&#xff0c;所有的命令在事务中&#xff0c;并没有直接被执行&#xff0c;只有发起执行命令时才执行 Redis事务本质&#xff1a;一组命令的集合&…

API接口对接的流程和注意的事项

API接口对接是将两个应用程序或系统连接并进行数据交换的过程。在进行API接口对接时&#xff0c;需要确保两个系统具有相同的协议和格式&#xff0c;并且数据传输过程中不会出现错误或数据丢失。下面是API接口对接的流程和注意事项&#xff1a; 流程&#xff1a; 1.确认数据格…

【多目标优化算法】多目标蚱蜢优化算法(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

Linux——进程退出

目录 一.进程退出时有三种选择&#xff1a; 1.1 echo $?命令&#xff1a; 功能&#xff1a; 打印距离现在最近一次执行某进程的退出码 例2代码&#xff1a; 例3&#xff1a; 例4代码&#xff1a; 1.3 进程运行过程中可能会出现的错误种类&#xff1a; 二.总结&#xff…