SpringBoot 集成 RocketMQ

news2025/1/12 15:59:58

项目地址

前面我们介绍了怎么使用 docker 安装 rocketMQ,现在我们就来试试使用 SpringBoot 集成之后,怎么发送消息和消费消息。

集成步骤

工程结构

工程结构图

第一步:引入相关依赖

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
</dependency>

<!--   rocketMQ 核心依赖包     -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

<dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2</artifactId>
    <version>2.0.23</version>
</dependency>

第二步:增加配置文件

rocketmq:
  name-server: 192.168.152.130:9876	 # 这里需要换成自己的 rocketMq 的地址
  producer:
    group: SpringBoot_Group
    send-message-timeout: 3000
    retry-times-when-send-failed: 3
    retry-times-when-send-async-failed: 3
  consumer:
    group: SpringBoot_Group

第三步:增加消息的发送者

发送消息其实也比较好理解,就是通过 RocketMQTemplate 来操作,由于 Spring 中封装了一层,所以我们操作起来就比较简单,具体的代码向下看就好。

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SimpleProducer {

    @Autowired
    RocketMQTemplate rocketMQTemplate;

    /**
     * 发送同步消息
     *
     * @param topic 主题
     * @param msg   消息体
     */
    public void sendSyncMsg(String topic, String msg) {
        rocketMQTemplate.convertAndSend(topic, msg);
    }

    /**
     * 异步消息
     *
     * @param topic 主题
     * @param msg   消息体
     */
    public void sendAsyncMsg(String topic, String msg) {
        rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
            //消息发送成功的回调
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }

            //消息发送失败的回调
            @Override
            public void onException(Throwable throwable) {
                System.out.println(throwable.getMessage());
            }
        });
    }

    /**
     * 发送异步消息
     *
     * @param topic        主题
     * @param msg          消息体
     * @param sendCallback 回调方式
     */
    public void sendAsyncMsg(String topic, String msg, SendCallback sendCallback) {
        rocketMQTemplate.asyncSend(topic, msg, sendCallback);
    }
}

第四步:增加消息的消费者

我们使用注解 @RocketMQMessageListener 来作为监听指定的 topic 以及 consumerGroup 的消息,另外我们需要实现 RocketMQListener 来处理回调消息,还是比较简单的,具体的代码如下:

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 *
 * 实现 RocketMQListener 监听器是为了接受到发送过来的消息,泛型是消息的类型
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "springboot-mq", consumerGroup = "${rocketmq.consumer.group}")
public class SimpleConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("Receive message:" + message);	// 打印传递的消息
    }
}

到这里呢,其实已经算是搞完了,接下来我们来测试下消息。

消息测试

启动程序服务

程序启动

编写测试类

import com.demo.mq.rocketmq.producer.SimpleProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * @author wuq
 * @Time 2023-5-5 13:42
 * @Description
 */
@SpringBootTest
public class ProducerMsgTest {

    @Autowired
    SimpleProducer simpleProducer;

    @Test
    public void testSync(){
        simpleProducer.sendSyncMsg("springboot-mq", "发送同步消息");
    }

    @Test
    public void testAsync(){
        simpleProducer.sendAsyncMsg("springboot-mq", "发送异步消息");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

测试发送消息

测试方法来发送消息:
发送消息
消费消息结果如下:
消费消息
也可以通过管理后台来重新消费消息,不过这个操作就是做测试使用的哈,正式环境应该不会这样子干,毕竟消息在列表上面都不知道哪条是哪条:
管理后台
重新消费消息:
重新消费消息
重新消费消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/492025.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【新星计划-2023】什么是OSI七层模型?一文带你了解

一、什么是OSI七层模型 OSI七层模型又叫做“OSI参考模型”&#xff0c;它的全称为“开放系统互连参考模型”&#xff0c;它是一个七层的、抽象的模型体&#xff0c;不仅包括一系列抽象的术语或概念&#xff0c;也包括具体的协议。那么为什么会出现OSI参考模型&#xff1f; 原…

第 6 章 整合 Phoenix

6.1 Phoenix 简介 6.1.1 Phoenix 定义 Phoenix 是 HBase 的开源 SQL 皮肤。可以使用标准 JDBC API 代替 HBase 客户端 API 来创建表&#xff0c;插入数据和查询 HBase 数据。 6.1.2 为什么使用 Phoenix 官方给的解释为&#xff1a;在 Client 和 HBase 之间放一个 Phoenix 中…

WatchGuard 防火墙策略、配置和日志分析器

获取 Internet 活动见解并及时了解安全事件是一项具有挑战性的任务&#xff0c;因为安全设备会生成大量的安全和流量日志。Firewall Analyzer 针对 WatchGuard 防火墙设备的报告功能具有一系列功能&#xff0c;使您能够增强网络安全。WatchGuard 日志分析器软件&#xff0c;可让…

【推荐】网络安全10本入门必看书籍

前言 对于初学者来说&#xff0c;了解网络安全的入门知识是非常重要的。以下是我推荐的10本入门网络安全必看的书籍 1.《黑客攻防技术宝典》 作者&#xff1a;余洪涛&#xff0c;出版社&#xff1a;清华大学出版社 这本书是网络安全初学者入门的好选择。书中讲解了黑客攻击和…

尚硅谷大数据技术Spark教程-笔记06【SparkCore(案例实操,电商网站)】

视频地址&#xff1a;尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01【SparkCore&#xff08;概述、快速上手、运行环境、运行架构&#xff09;】尚硅谷大数据技术Spark教程-笔记02【SparkCore&#xff08;核心编程&#xff0c;RDD-核…

速看,关于Python的17个学习网站,从基础到机器学习【建议收藏】

目录 一、基础学习网站Python官方教程Python官方安装包地址PyCharm下载地址anaconda3清华开源下载地址 二、爬虫学习网站requests官方学习网站BeautifulSoup文档网站selenium官方学习网站scrapy中文学习网站 三、数据分析学习网站numpy官方文档网站pandas官方文档网站sklearn官…

Spring-Rest- url 请求风格和SpringMVC 映射请求数据

目录 Rest- url 请求风格 Rest-基本介绍 ● 说明 实例 说明&#xff1a; ● REST 的核心过滤器 代码说明Rest 风格的 url-完成增删改 需求说明 ​编辑修改 web.xml 添加 HiddenHttpMethodFilter 修改 springDispatcherServlet-servlet.xml 作用 创建rest.jsp 解读…

大数据技术之Kettle

目录 第1章 Kettle概述 1.1 ETL简介 1.2 Kettle简介1.2.1 Kettle是什么 1.2.2 Kettle的两种设计 1.2.3 Kettle的核心组件 1.2.4 Kettle特点 第2章 Kettle安装部署 2.1 Kettle下载 2.1.1 下载地址 2.1.2 Kettle目录说明 2.1.3 Kettle文件说明 2.2 Kettle安装部署 2…

openai账号创建教程-openai注册问题大全

openai注册页面打不开 遇到openai注册页面打不开&#xff0c;可以用以下解决方法&#xff1a; 检查网络连接。如果您的网络连接不稳定或者有问题&#xff0c;可能会导致访问网站异常。请尝试使用其他设备或连接其他网络&#xff0c;看是否能够打开OpenAI注册页面。 清除浏览器…

为什么需要使用Docker

简介与概述 1.介绍 Docker是一个开源的应用容器引擎&#xff0c;基于Go语言开发的&#xff0c;并且遵从Apache2.0协议开源。 Docker可以让开发者打包他们的应用以及依赖&#xff0c;打包到轻量级、可移植的容器中&#xff0c;然后发布到任何一个流行的Linux服务器上&#xff…

新库上线 | CnOpenData中国标准数据

中国标准数据 一、数据简介 按照《中华人民共和国标准化法》的定义&#xff0c;标准是指农业、工业、服务业以及社会事业等领域需要统一的技术要求。标准作为一种通用性的规范语言&#xff0c;在合理利用国家资源、保障产品质量、提高市场信任度、促进商品流通、维护公平竞争、…

使用JPA自动生成代码(轻松上手看了就会版)

目录 背景&#xff1a;方案概念&#xff1a;JPA 的主要作用 jpa简单使用&#xff08;Springboot项目&#xff09;jpa进阶使用总结 背景&#xff1a; 项目需要自动生成sql代码&#xff0c;不需要写sql语句&#xff0c;能够自动进行查询&#xff0c;我想到了JPA。 方案 概念&a…

Jetpack Compose 不止是一个UI框架~

Jetpack Compose是用于构建原生Android UI的现代工具包。 Jetpack Compose使用更少的代码&#xff0c;强大的工具和直观的Kotlin API&#xff0c;简化并加速了Android上的UI开发。这是Android Developers 官网对它的描述。 本文不是教你Jetpack Compose 的一些基本使用方法&am…

【通过xib自定义Cell Objective-C语言】

一、我们怎么样来自定义单元格呢, 1.我们先来分析一下, 我们这里虽然有很多行,但是每一行,长的都是一样的, 这里有一个Label、那里有一个Label, 每一行每一行,长的都是一样的,唯独只有数据不一样吧, 所以说,遇到这种情况,我们就可以考虑用一个xib,描述一个单元…

倒计时24天!接棒香港展,CTIS2023观众预登记全面启动

4月22日,环球资源春季香港展在亚洲国际展览馆落下帷幕。两期展会,十馆全开,共历时8天。汇聚来自中国大陆、香港、台湾地区、韩国、越南、印度等地逾4,000家优质供应商超过30万件产品。吸引近10万人次的专业观众参观。环球资源为买卖双方打造优质贸易平台,让买家第一时间把握消费…

【Python零基础学习入门篇⑤】——第五节:Python中的函数

⬇️⬇️⬇️⬇️⬇️⬇️ ⭐⭐⭐Hello&#xff0c;大家好呀我是陈童学哦&#xff0c;一个普通大一在校生&#xff0c;请大家多多关照呀嘿嘿&#x1f601;&#x1f60a;&#x1f618; &#x1f31f;&#x1f31f;&#x1f31f;技术这条路固然很艰辛&#xff0c;但既已选择&…

【RDC2022纪念板】RT-Smart D1s上手

目录 环境准备开发板硬件介绍开发环境搭建烧录 环境准备 windows电脑&#xff08;用于烧录固件和串口日志查看&#xff09;Ubuntu虚拟机&#xff08;用于编译生成固件&#xff09;RDC2022纪念板TypeC数据线 开发板硬件介绍 开发板使用了全志科技的D1s芯片&#xff0c;全志RIS…

PAVC100R4222 PARKER轴向柱塞泵

PAVC100R4222 PARKER轴向柱塞泵特点&#xff1a; 1、壳体为高强度铸铁 2、两段设计便于维护 3、全密封的轴用轴承 4、内置增压器***高转速性能&#xff0c;可达3000 RPM( PAVC100为2600 RPM) 5、控制器为插装形式&#xff0c;易于现场更换 6、配流盘为可替换的青铜复合 10、过滤…

R实践——paleobioDB详解(paleobiology database)

paleobioDB详解&#xff08;paleobiology database&#xff09; PBDB初步认识paleobioDB一个简单的例子 所有函数详解1. pbdb_collection描述用法参数细节值例子 2. pbdb_collections描述用法参数值例子 3. pbdb_collections_geo描述用法参数值例子 4. pbdb_interval描述用法参…

成为数据分析师,需要具备哪些技能?

随着互联网的发展&#xff0c;数据分析师的特点越来越明显&#xff0c;对数据分析师综合素质的要求也较高。 1、较强的数据挖掘、信息整理、和逻辑分析能力 数据分析&#xff0c;也是数据分析师的一个方向。 制作日常性的经营报表&#xff0c;对公司或者行业KPI指标进行拆解…