SpringBoot集成Apache RocketMQ详解

news2024/12/24 20:22:15

文章目录

  • 0. 前言
  • 1. Spring Boot 集成Apache RocketMQ详细步骤
    • 1.1.添加依赖
    • 1.2.配置RocketMQ
    • 1.3.创建消息生产者(Producer)
    • 1.4.创建消息消费者(Consumer)
  • 2. 测试验证
  • 3. 常见报错
  • 4. 参考文档
  • 5. 源码地址

在这里插入图片描述

0. 前言

上个章节我们学习了RocketMQ的学习环境安装,讲了两种安装方式 1. docker使用官方镜像安装,2.使用源码方式安装。安装教程如下
如果已经安装了RocketMQ 学习环境可以略过此章节《【实践篇(一)】RocketMQ入门之学习环境搭建》
本章节,我们学习Spring Boot 集成Apache RocketMQ。并验证 在SpringBoot应用中展示如何使用Apache RocketMQ的生产者(Producer)进行消息发送。
这段代码实现了以下类型的消息发送:
使用Apache RocketMQ 官方的依赖库 RocketMQTemplate,实现同步、异步等消息。

  1. 同步消息:使用syncSend方法,生产者会等待消息服务器回复确认后才会继续发送下一条消息。

  2. 异步消息:使用asyncSend方法,生产者发送消息后不等待服务器回复,直接发送下一条消息。

  3. 单向消息:使用sendOneWay方法,生产者只负责发送消息,不等待服务器回复,也不关注发送结果。

  4. 顺序消息:使用sendOrderly方法,按照消息的发送顺序进行消费(First-In-First-Out)。

  5. 延迟消息:使用sendDelayed方法,消息被发送后,不会立即被消费,等待特定的延迟时间后,才能被消费。

  6. 批量消息:使用sendBatch方法,一次发送多条消息,可以有效提高发送的吞吐量。

1. Spring Boot 集成Apache RocketMQ详细步骤

1.1.添加依赖

在SpringBoot项目的pom.xml文件中添加RocketMQ的依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.7.15</version>
		<relativePath/>
	</parent>
	<groupId>com.icepip.project</groupId>
	<artifactId>springboot-icepip-rocketMQ-example</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>springboot-icepip-rocketMQ-example</name>
	<description>Spring boot 集成rocketMQ 示例</description>
	<properties>
		<java.version>8</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-spring-boot-starter</artifactId>
			<version>2.0.4</version>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-actuator</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-thymeleaf</artifactId>
		</dependency>

	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<excludes>
						<exclude>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</exclude>
					</excludes>
				</configuration>
			</plugin>
		</plugins>
	</build>

</project>

1.2.配置RocketMQ

application.properties文件中配置RocketMQ的相关信息:

rocketmq.name-server=你的RocketMQ服务IP:9876
rocketmq.producer.group=my-producer
# 刚开始未配置 导致超时报错
rocketmq.producer.sendMessageTimeout=10000

1.3.创建消息生产者(Producer)

package com.icepip.project.mqtt.controller;

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
/**
 *  SpringBoot集成Apache RocketMQ详解
 * @author 冰点
 * @version 1.0.0
 * @date 2023/9/9 17:02
 */

@RestController
@RequestMapping("/producer")
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;


    /**
     * 同步发送消息到指定主题
     * @param message
     * @return
     */
    @GetMapping("/syncSend")
    public String syncSend(String message) {
        // 同步发送消息到指定主题
        rocketMQTemplate.syncSend("test-topic", message);
        return "Sync message: " + message + " sent";
    }
    /**
     * 异步发送消息到指定主题
     * @param message
     * @return
     */
    @GetMapping("/asyncSend")
    public String asyncSend(String message) {
        // 异步发送消息到指定主题
        rocketMQTemplate.asyncSend("test-topic", MessageBuilder.withPayload(message).build(), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("Async message sent successfully, result: " + sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                System.err.println("Failed to send async message: " + throwable.getMessage());
            }
        }, 3000, 3); // 3000 ms timeout, delay level 3

        return "Async message: " + message + " sent";
    }

    /**
     * 发送单向消息到指定主题,无需等待Broker的确认
     * @param message
     * @return
     */
    @GetMapping("/sendOneWay")
    public String sendOneWay(String message) {
        // 发送单向消息到指定主题,无需等待Broker的确认
        rocketMQTemplate.sendOneWay("test-topic", message);
        return "OneWay message: " + message + " sent";
    }

    // 发送顺序消息
    @GetMapping("/sendOrderly")
    public String sendOrderly(String message) {
        // 发送顺序消息到指定主题
        rocketMQTemplate.syncSendOrderly("test-topic", message, "order");
        return "Orderly message: " + message + " sent";
    }

    // 发送延迟消息
    @GetMapping("/sendDelayed")
    public String sendDelayed(String message) {
        // 发送延迟消息到指定主题,延迟级别为3
        rocketMQTemplate.syncSend("test-topic", MessageBuilder.withPayload(message).build(), 1000, 3);
        return "Delayed message: " + message + " sent";
    }

    // 发送批量消息
    @GetMapping("/sendBatch")
    public String sendBatch() {
        List<String> messages = new ArrayList<>();
        messages.add("message1");
        messages.add("message2");
        // 批量发送消息到指定主题
        rocketMQTemplate.syncSend("test-topic", messages);
        return "Batch messages sent";
    }
}

1.4.创建消息消费者(Consumer)

package com.icepip.project.mqtt.handler;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * 定义一个消费者,监听test-topic主题的消息
 * @author 冰点
 * @version 1.0.0
 * @date 2023/9/9 16:29
 */

@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-consumer_test-topic")
public class MyConsumer implements RocketMQListener<String>{

    // 当收到消息时,该方法将被调用
    @Override
    public void onMessage(String message) {
        System.out.println("Received message: "+ message);
    }
}

2. 测试验证

在这里插入图片描述
在这里插入图片描述

3. 常见报错

  1. See http://rocketmq.apache.org/docs/faq/ for further details.; nested exception is org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [6386]ms, Topic: test-topic, BrokersSent: [698f11314447, 698f11314447, 698f11314447]
    See http://rocketmq.apache.org/docs/faq/ for further details.] with root cause

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.0.8:10911> failed
解决办法,修改Broker的IP为宿主机IP
进容器修改配置文件,修改完启动服务 。启动之前先kill 掉容器里原来的Broker。
nohup sh mqbroker -c /home/rocketmq/rocketmq-4.9.2/broker.conf &
在这里插入图片描述

4. 参考文档

  1. 官方文档链接:https://rocketmq.apache.org/docs/

  2. GitHub链接:https://github.com/apache/rocketmq-spring

5. 源码地址

我的github https://github.com/wangshuai67/icepip-springboot-action-examples

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/995427.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

下载Ubantu镜像文件、创建虚拟机以及ubantu安装详细教程

目录 前言 Ubantu是什么&#xff1f;它有什么作用&#xff1f; 一、Ubantu镜像文件下载步骤 1.第一步安装VMware Workstation 2.第二步下载Ubuntu的镜像文件 镜像文件下载官网网址入下&#xff1a; 二、创建虚拟机和安装Ubantu的步骤 1.打开VMware Workstation并点击创…

反射与注解

【今日】 人生只有一次 不妨大胆一点 目录 一 反射 1.访问构造方法 1.Constructor的使用 2.反射一个类中的所有构造方法 3.用Constructor创建并返回实列对象 2.访问成员变量 1Field的使用方法 2.反射一个类中的所有成员变量 3.获取修改成员变量的值 3.访问成员方…

javaee springMVC model的使用

项目结构图 pom依赖 <?xml version"1.0" encoding"UTF-8"?><project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org…

【LeetCode75】第五十一题 最大子序列的分数

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 题目给我们两个长度一样的数组&#xff0c;让我们再num1中找出一个长度为k的子序列&#xff0c;然后把这个子序列累加的和乘上在nums2中对…

74 QML ProgressBar显示进度数字

1 引言 由于目前使用的是qt.5.14版本&#xff0c;Qt Quick Controls 已经从1.0版本 变为2.0版本了&#xff0c;如果继续使用的Qt Quick Controls 1 的style:方式&#xff0c;改变进度条的样式已经不行了&#xff0c;其会报错&#xff1a;Invalid property name "style&quo…

Langchain的一些问题和替代选择

Langchain因其简化大型语言模型(llm)的交互方面的到关注。凭借其高级的API可以简化将llm集成到各种应用程序中的过程。 但是Langchain乍一看似乎是一个方便的工具&#xff0c;但是它有时候否更像是一个语言迷宫&#xff0c;而不是一个直截了当的解决方案。在本文中&#xff0c…

mysql 增量备份与恢复使用详解

目录 一、前言 二、数据备份策略 2.1 全备 2.2 增量备份 2.3 差异备份 三、mysql 增量备份概述 3.1 增量备份实现原理 3.1.1 基于日志的增量备份 3.1.2 基于时间戳的增量备份 3.2 增量备份常用实现方式 3.2.1 基于mysqldump增量备份 3.2.2 基于第三方备份工具进行增…

安装 Gin 框架

首先需要在目录下初始化一下 go 项目 go init可以看到生成了一个go.mod文件&#xff0c;然后使用以下命令安装 gin 框架 go get -u github.com/gin-gonic/gin养成一个好习惯&#xff0c;在写项目之前先初始化项目 go mod init go mod tidy如果不初始化项目的话没有第三方库补…

Tampermonkey实践:安装引导及开发一个网页背景色更改插件

&#x1f3c6;作者简介&#xff0c;黑夜开发者&#xff0c;CSDN领军人物&#xff0c;全栈领域优质创作者✌&#xff0c;CSDN博客专家&#xff0c;阿里云社区专家博主&#xff0c;2023年6月csdn上海赛道top4。 &#x1f3c6;数年电商行业从业经验&#xff0c;历任核心研发工程师…

基于SpringBoot的火车订票管理系统

基于SpringBootVue的火车订票管理系统&#xff0c;前后端分离 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringBoot、Vue、Mybaits Plus、ELementUI工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 【主要功能】 角色&#xff1a;管理员、会员 会员&…

java_日期时间API

文章目录 一、JDK8之前的日期时间API1.1 System类的currentTimeMillis()1.2 两个Date类1.2.1 java.util.Date包下的1.2.2 java.sql.Date包下的 一、JDK8之前的日期时间API 1.1 System类的currentTimeMillis() 获取当前时间对应的毫秒数&#xff0c;long类型 当前时间与1970年1…

vscode各种配置的方法

一. vscode配置 vscode 是微软公司提供的一个 代码编辑器。是做C/C常用的编辑器。 在安装后&#xff0c;可以根据自己需要自行安装常用的配置插件。同时&#xff0c;也可以在设置栏设置自己需要的功能&#xff0c;以方便使用。 下面学习 vscode的几种常见的设置。 二. vsco…

vue中 字体图标引入 - iconfont阿里字体图标库

官网&#xff1a;iconfont-阿里巴巴矢量图标库 代码应用中&#xff0c;有许多方法&#xff0c;如何使用该图标库。如&#xff0c;icon单个使用、unicode引用、或 font-class引用&#xff08;推&#xff09;、symbol&#xff08;svg合集&#xff09;。本文主讲 font-class 方法…

C#,《小白学程序》第十六课:随机数(Random)第三,正态分布的随机数的计算方法与代码

1 随机数的问题 用 C# Random 类生成的随机数是平均分布的。也就是各数据段的出现的次数差不多。彩票号码属于这种随机数。 而很多很多常见的随机数&#xff0c;比如&#xff1a;成绩&#xff0c;却是符合正态分布的。 因而很多时候需要生成符合正态分布规律的随机数。 2 文…

JavaFx之Hello, World!

当使用 JavaFX 进行应用程序开发时&#xff0c;Application 类是一个关键组件。它是 JavaFX 应用程序的入口点&#xff0c;负责启动应用程序并设置应用程序的主舞台&#xff08;Stage&#xff09;和场景&#xff08;Scene&#xff09;。下面是一个详细介绍 Application 类并带有…

MySQL——锁

简介 多线程访问共享资源的时候&#xff0c;避免不了资源竞争而导致数据错乱的问题&#xff0c;所以我们通常为了解决这一问题&#xff0c;都会在访问共享资源之前加锁。 锁的分类 Mysql中的锁机制基本上都是采用的悲观锁来实现的。 行锁 行锁就是一锁锁一行或者多行记录&a…

【Spatial-Temporal Action Localization(一)】认识时空动作定位

文章目录 任务定义任务难点数据集任务现状评估指标可以思考的创新的角度 不错的博客&#xff0c;还有框架推荐 南京大学开源MultiSports&#xff1a;面向体育运动场景的细粒度多人时空动作检测数据集… 论文阅读推荐、Video Understanding&#xff08;3&#xff09;Spatio-Te…

d3dcompiler_47.dll缺失怎么修复,这个方法电脑小白也能学会

在计算机领域&#xff0c;d3dcompiler_47.dll文件是DirectX的一部分&#xff0c;用于执行硬件加速的图形渲染。当遇到“找不到d3dcompiler_47.dll丢失”的问题时&#xff0c;通常表示系统缺少此文件或其路径设置不正确。本文将介绍一些详细解决方法&#xff0c;帮助您解决这个问…

外观数列问题

给定一个正整数 n &#xff0c;输出外观数列的第 n 项。 「外观数列」是一个整数序列&#xff0c;从数字 1 开始&#xff0c;序列中的每一项都是对前一项的描述。 你可以将其视作是由递归公式定义的数字字符串序列&#xff1a; countAndSay(1) "1" countAndSay(n…

Docker的架构描述与安装部署

概述 Docker是一个开放的容器化平台&#xff0c;其提供能力轻松地支撑业务应用的开发、打包、装载、分发以及运行&#xff0c;在DevOps领域中&#xff0c;docker能高效地应对业务应用的持续集成以及持续发布&#xff08;CI/CD&#xff09;&#xff0c;其架构如下所示&#xff…