服务器操作系统版本:Ubuntu 24.04
Java版本:21
Spring Boot版本:3.3.5
如果打算用GUI,虚拟机安装Ubuntu 24.04,见虚拟机安装Ubuntu 24.04及其常用软件(2024.7)_ubuntu24.04-CSDN博客https://blog.csdn.net/weixin_42173947/article/details/140335522如果打算用纯命令行,见
虚拟机安装Ubuntu 24.04服务器版(命令行版)-CSDN博客https://blog.csdn.net/weixin_42173947/article/details/143747375
1 Ubuntu上部署RocketMQ
这里准备两台服务器,做集群使用,一台IP是192.168.100.200,一台是192.168.100.201
1.1 安装jdk8
首先需要部署JDK8+,这里我使用了JDK8
sudo apt-get install -y openjdk-8-jdk;
1.2 下载RocketMQ
这里使用5.2.0版本
mkdir -p /home/user/softwares;
cd /home/user/softwares;
wget https://dist.apache.org/repos/dist/release/rocketmq/5.2.0/rocketmq-all-5.2.0-bin-release.zip;
1.3 解压,改文件夹名
unzip rocketmq-all-5.2.0-bin-release.zip;
mv rocketmq-all-5.2.0-bin-release rocketmq;
1.4 修改参数,减少内存消耗量
cd /home/user/softwares/rocketmq;
vim bin/runserver.sh;
-Xms4g 改为 -Xms256m,-Xmx4g 改为 -Xmx256m,-Xmn2g 改为 -Xmn128m
vim bin/runbroker.sh;
-Xmn4g 改为 -Xmn256m,-Xms8g 改为 -Xms256m, -Xmx8g 改为 -Xmx256m
1.5 启动NameServer
cd /home/user/softwares/rocketmq;
nohup sh bin/mqnamesrv &
验证namesrv是否启动成功
tail -f ~/logs/rocketmqlogs/namesrv.log;
jps -l;
1.6 启动Broker+Proxy
1.6.1 单点版
只有一个节点,最简单,最不稳定,一般用于测试
nohup sh bin/mqbroker -n <服务器IP>:9876 --enable-proxy &
1.6.2 全Master版
nohup sh bin/mqbroker -n <NameServer服务器IP1>:9876;<NameServer服务器IP2>:9876 -c /usr/local/softwares/rocketmq/conf/2m-noslave/broker-a.properties --enable-proxy &
nohup sh bin/mqbroker -n <NameServer服务器IP1>:9876;<NameServer服务器IP2>:9876 -c /usr/local/softwares/rocketmq/conf/2m-noslave/broker-b.properties --enable-proxy &
我这里IP是192.168.100.200,192.168.100.201,注意两台服务器要执行不一样的命令
nohup sh bin/mqbroker -n '192.168.100.200:9876;192.168.100.201:9876' -c /usr/local/softwares/rocketmq/conf/2m-noslave/broker-a.properties --enable-proxy &
nohup sh bin/mqbroker -n '192.168.100.100:9876;192.168.100.201:9876' -c /usr/local/softwares/rocketmq/conf/2m-noslave/broker-b.properties --enable-proxy &
验证broker是否启动成功
tail -f ~/logs/rocketmqlogs/proxy.log;
jps -l;
1.7 停止broker,namesrv
先停broker,后停namesrv
cd /home/user/softwares/rocketmq;
sh bin/mqshutdown broker;
sh bin/mqshutdown namesrv;
1.8 测试生产者,消费者
重新开启namesrv和broker,然后执行下面操作
cd /usr/local/softwares/rocketmq;
export NAMESRV_ADDR=localhost:9876;
# 生产者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer;
# 消费者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer;
1.9 防火墙放开RocketMQ端口
sudo ufw allow 8081/tcp;
sudo ufw allow 9876/tcp;
sudo ufw allow 10911/tcp;
sudo ufw allow 10909/tcp;
sudo ufw reload;
2 关于Topic的操作
2.1 创建Topic
cd /usr/local/softwares/rocketmq;
执行如下命令,创建一个叫TestTopic的Topic
nohup sh bin/mqadmin updatetopic -n <服务器IP>:9876 -t TestTopic -b <服务器IP>:10911 &
nohup sh bin/mqadmin updatetopic -n <服务器IP>:9876 -t TestTopic -b <服务器IP>:10911 &
注意修改IP
nohup sh bin/mqadmin updatetopic -n '192.168.100.200:9876;192.168.100.201:9876' -t TestTopic -b 192.168.100.200:10911 &
nohup sh bin/mqadmin updatetopic -n '192.168.100.200:9876;192.168.100.201:9876' -t TestTopic -b 192.168.100.201:10911 &
2.2 删除Topic
cd /usr/local/softwares/rocketmq;
执行如下命令
nohup sh bin/mqadmin deleteTopic -n <服务器IP>:9876 -t TestTopic -b <服务器IP>:10911 &
注意修改IP
nohup sh bin/mqadmin deleteTopic -n 192.168.100.200:9876 -t TestTopic -b 192.168.100.200:10911 &
nohup sh bin/mqadmin deleteTopic -n 192.168.100.201:9876 -t TestTopic -b 192.168.100.201:10911 &
2.3 查看Topic
cd /usr/local/softwares/rocketmq;
执行如下命令
sh bin/mqadmin topicList -n <服务器IP>:9876
注意修改IP
sh bin/mqadmin topicList -c -n 192.168.100.200:9876;
sh bin/mqadmin topicList -c -n 192.168.100.201:9876;
3 Java连接RocketMQ环境搭建
3.1 文件树结构
3.2 父节点pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sliverbullet</groupId>
<artifactId>jdk21-maven-test</artifactId>
<packaging>pom</packaging>
<version>1.0</version>
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>21</java.version>
<spring-boot.version>3.3.5</spring-boot.version>
</properties>
<modules>
<module>springboot3-test</module>
<module>rocketmq-test</module>
</modules>
<repositories>
<repository>
<id>public</id>
<name>aliyun nexus</name>
<url>https://maven.aliyun.com/repository/public</url>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>public</id>
<name>aliyun nexus</name>
<url>https://maven.aliyun.com/repository/public</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</project>
3.3 子节点xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.sliverbullet</groupId>
<artifactId>jdk21-maven-test</artifactId>
<version>1.0</version>
</parent>
<artifactId>rocketmq-test</artifactId>
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>21</java.version>
<spring-boot.version>3.3.5</spring-boot.version>
<fastjson2-version>2.0.53</fastjson2-version>
<lombok-version>1.18.34</lombok-version>
<rocketmq-spring-boot-starter-version>2.3.0</rocketmq-spring-boot-starter-version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2-version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok-version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-spring-boot-starter-version}</version>
</dependency>
</dependencies>
</project>
依赖环境搭建完成
4 RocketMQ基本配置
4.1 文件树结构
4.2 application.yml的配置
server:
port: 8002
spring:
application:
name: rocketmq-test
profiles:
active: dev
machine-no: 1
4.3 application-dev.yml的配置
test,prod,自行配置
spring:
logging:
file:
path: D:/log/SpringBoot3-Test
name: ${logging.file.path}/test.log
rocketmq:
name-server: 192.168.100.200:9876;192.168.100.201:9876
producer:
group: boot-product
send-message-timeout: 10000
4.4 RocketMQConsumer类
package com.sliverbullet.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@Component
@Slf4j
@RocketMQMessageListener(topic = "TestTopic", consumerGroup = "my-consumer-test-topic", consumeTimeout = 1000L)
public class RocketMQConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");
LocalDateTime localDateTime = LocalDateTime.now();
String formattedDate = dateTimeFormatter.format(localDateTime);
log.info("RocketMQ消费者接收时间:{}" ,formattedDate);
log.info("RocketMQ消费者接收内容:{}" ,message);
}
}
4.5 IRocketMQService接口
package com.sliverbullet.service;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import java.util.List;
import java.util.Map;
/**
* <p>
* RocektMQ生产者常用发送消息方法
* 最佳实践:https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md
* </p>
*
* @author MrWen
* @since 2022-01-06 17:10
**/
public interface IRocketMQService {
/**
* 发送同步消息(这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。)
* <p>
* (send消息方法只要不抛异常,就代表发送成功。但不保证100%可靠投递(所有消息都一样,后面不在叙述)。
* 要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。
* 解析看:https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md
* )
*
* @param destination 主题名:标签 topicName:tags
* @param msg 发送对象
* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
*/
SendResult sendMessage(String destination, Object msg);
/**
* 发送同步消息(这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。)
*
* @param topicName 主题名 topicName
* @param tags 标签 tags
* @param msg 发送对象
* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
*/
SendResult sendMessage(String topicName, String tags, Object msg);
/**
* 发送同步消息(这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。)
*
* @param topicName 主题名 topicName
* @param tags 标签 tags
* @param key 唯一标识码要设置到keys字段,方便将来定位消息丢失问题
* @param msg 发送对象
* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
*/
SendResult sendMessage(String topicName, String tags, String key, Object msg);
/**
* 发送同步消息-SQL92模式
* 需要配置RocketMQ服务器 vim conf/broker.conf ##支持sql语句过滤 enablePropertyFilter=true
* 在console控制台查看集群状态 enablePropertyFilter=true 才正常
*
* @param topicName 主题名 topicName
* @param map 自定义属性
* @param msg 发送对象
* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
*/
SendResult sendMessageBySql(String topicName, Map<String, Object> map, Object msg);
/**
* 发送同步消息-SQL92模式
* 需要配置RocketMQ服务器 vim conf/broker.conf ##支持sql语句过滤 enablePropertyFilter=true
* 在console控制台查看集群状态 enablePropertyFilter=true 才正常
*
* @param topicName 主题名 topicName
* @param map 自定义属性
* @param key 唯一标识码要设置到keys字段,方便将来定位消息丢失问题
* @param msg 发送对象
* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
*/
SendResult sendMessageBySql(String topicName, Map<String, Object> map, String key, Object msg);
/**
* 发生异步消息(异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。)
*
* @param destination 主题名:标签 topicName:tags
* @param msg 发送对象
* @param sendCallback 异步回调函数
*/
void sendAsyncMessage(String destination, Object msg, SendCallback sendCallback);
/**
* 发送单向消息(这种方式主要用在不特别关心发送结果的场景,例如日志发送。)
*
* @param destination 主题名:标签 topicName:tags
* @param msg 发送对象
*/
void sendOneway(String destination, Object msg);
/**
* 发送批量消息(发送超过1MB,做了自动分割,超时时间设置30s(默认3s)),注:默认最大是4MB,为了避免ListSplitter.calcMessageSize计算不精确及大批量数据发送超时才设置1MB
*
* @param destination 主题名:标签 topicName:tags
* @param list 批量消息
*/
void sendBatchMessage(String destination, List<?> list);
/**
* 发送批量消息(发送超过1MB,做了自动分割。),注:默认最大是4MB,为了避免ListSplitter.calcMessageSize计算不精确及大批量数据发送超时才设置1MB
*
* @param topicName 主题名 topicName
* @param tags 标签 tags
* @param timeout 超时时间,空则默认设为30s
* @param list 批量消息
*/
void sendBatchMessage(String topicName, String tags, Long timeout, List<?> list);
/**
* 发送延时消息(超时时间,设置30s(默认3s))
* 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
*
* @param destination 主题名:标签 topicName:tags
* @param msg 发送对象
* @param delayTimeLevel 延时等级(从1开始)
* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
*/
SendResult sendDelayLevel(String destination, Object msg, int delayTimeLevel);
/**
* 发送延时消息
* 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
*
* @param destination 主题名:标签 topicName:tags
* @param msg 发送对象
* @param timeout 超时时间(单位毫秒)
* @param delayTimeLevel 延时等级(从1开始)
* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
*/
SendResult sendDelayLevel(String destination, Object msg, int timeout, int delayTimeLevel);
/**
* 发送顺序消息(分区有序,多个queue参与,即相对每个queue,消息都是有序的。)
*
* @param destination 主题名:标签 topicName:tags
* @param msg 发送对象
* @param hashKey 根据其哈希值取模后确定发送到哪一个queue队列
* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
*/
SendResult sendInOrder(String destination, Object msg, String hashKey);
/**
* 发送事务消息
* 事务消息使用上的限制
* 1:事务消息不支持延时消息和批量消息。
* 2:为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。
* 3:事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
* 4:事务性消息可能不止一次被检查或消费。
* 5:提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
* 6:事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
*
* @param destination 主题名:标签 topicName:tags
* @param msg 发送对象
* @param arg arg
* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
*/
SendResult sendMessageInTransaction(String destination, Object msg, Object arg);
}
4.6 RocketMQServiceImpl接口实现类
package com.sliverbullet.service.impl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import com.sliverbullet.service.IRocketMQService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
@Slf4j
@Service
public class RocketMQServiceImpl implements IRocketMQService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public SendResult sendMessage(String topicName, Object msg) {
MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(msg);
Message<?> message = messageBuilder.build();
SendResult sendResult = rocketMQTemplate.syncSend(topicName, message);
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("【RocketMQ测试】发送同步消息成功, topicName: {}, msg: {}, sendResult: {}", topicName, msg, sendResult);
} else {
log.warn("【RocketMQ测试】发送同步消息不一定成功, topicName: {}, msg: {}, sendResult: {}", topicName, msg, sendResult);
}
return sendResult;
}
@Override
public SendResult sendMessage(String topicName, String tags, Object msg) {
MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(msg);
Message<?> message = messageBuilder.build();
SendResult sendResult = rocketMQTemplate.syncSend(topicName + ":" + tags, message);
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("【RocketMQ测试】发送同步带Tag消息成功, topicName: {}, tag:{}, msg: {}, sendResult: {}", topicName, tags, msg, sendResult);
} else {
log.warn("【RocketMQ测试】发送同步带Tag消息不一定成功, topicName: {}, tag:{}, msg: {}, sendResult: {}", topicName, tags, msg, sendResult);
}
return sendResult;
}
@Override
public SendResult sendMessage(String topicName, String tags, String key, Object msg) {
MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(msg);
if (StringUtils.isNotBlank(key)) {
messageBuilder.setHeader(MessageConst.PROPERTY_KEYS, key);
}
Message<?> message = messageBuilder.build();
SendResult sendResult = rocketMQTemplate.syncSend(topicName + ":" + tags, message);
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("【RocketMQ测试】发送同步带Tag和Key消息成功, topicName: {}, tag:{}, msg: {}, sendResult: {}", topicName, tags, msg, sendResult);
} else {
log.warn("【RocketMQ测试】发送同步带Tag和Key消息不一定成功, topicName: {}, tag:{}, msg: {}, sendResult: {}", topicName, tags, msg, sendResult);
}
return sendResult;
}
@Override
public SendResult sendMessageBySql(String topicName, Map<String, Object> map, Object msg) {
return null;
}
@Override
public SendResult sendMessageBySql(String topicName, Map<String, Object> map, String key, Object msg) {
return null;
}
@Override
public void sendAsyncMessage(String destination, Object msg, SendCallback sendCallback) {
}
@Override
public void sendOneway(String destination, Object msg) {
}
@Override
public void sendBatchMessage(String destination, List<?> list) {
}
@Override
public void sendBatchMessage(String topicName, String tags, Long timeout, List<?> list) {
}
@Override
public SendResult sendDelayLevel(String destination, Object msg, int delayTimeLevel) {
Message<?> message = MessageBuilder.withPayload(msg).build();
SendResult sendResult = rocketMQTemplate.syncSend(destination, message, 10000L, delayTimeLevel);
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("【RocketMQ测试】发送延时消息成功, destination: {}, msg: {}, sendResult: {}", destination, message, sendResult);
} else {
log.warn("【RocketMQ测试】发送延时消息不一定成功, destination: {}, msg: {}, sendResult: {}", destination, message, sendResult);
}
return sendResult;
}
@Override
public SendResult sendDelayLevel(String destination, Object msg, int timeout, int delayTimeLevel) {
return null;
}
@Override
public SendResult sendInOrder(String destination, Object msg, String hashKey) {
return null;
}
@Override
public SendResult sendMessageInTransaction(String destination, Object msg, Object arg) {
return null;
}
}
5 正式测试
5.1 基本同步消息测试
传入一个JSON,同步进入消息队列,消息队列同步消费
5.1.1 Controller层
@Value("${machine-no}")
private String machineNo;
@Resource
private IRocketMQService rocketMQService;
@RequestMapping("/sync")
public JSONObject send(@RequestBody JSONObject param) {
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");
LocalDateTime localDateTime = LocalDateTime.now();
String formattedDate = dateTimeFormatter.format(localDateTime);
System.out.println(formattedDate);
log.info("【SpringBoot3测试】-【RocketMQ测试】:{}", formattedDate);
param.put("time", formattedDate);
param.put("name", "Sliver");
param.put("machine_no", machineNo);
SendResult sendResult = rocketMQService.sendMessage("TestTopic", param);
JSONObject returnJSONObject = JSONObject.parseObject(JSONObject.toJSONString(sendResult));
return returnJSONObject;
}
5.1.2 Service接口
/**
* 发送同步消息(这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。)
* <p>
* (send消息方法只要不抛异常,就代表发送成功。但不保证100%可靠投递(所有消息都一样,后面不在叙述)。
* 要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。
* 解析看:https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md
* )
*
* @param destination 主题名:标签 topicName:tags
* @param msg 发送对象
* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
*/
SendResult sendMessage(String destination, Object msg);
5.1.3 Service接口实现类
@Override
public SendResult sendMessage(String topicName, Object msg) {
MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(msg);
Message<?> message = messageBuilder.build();
SendResult sendResult = rocketMQTemplate.syncSend(topicName, message);
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("【RocketMQ测试】发送同步消息成功, topicName: {}, msg: {}, sendResult: {}", topicName, msg, sendResult);
} else {
log.warn("【RocketMQ测试】发送同步消息不一定成功, topicName: {}, msg: {}, sendResult: {}", topicName, msg, sendResult);
}
return sendResult;
}
5.1.4 访问测试
测试结果
{
"traceOn": true,
"regionId": "DefaultRegion",
"messageQueue": {
"queueId": 1,
"topic": "TestTopic",
"brokerName": "broker-a"
},
"msgId": "0ACC4A893DF836BAF30C6BBBD62D0000",
"queueOffset": 0,
"sendStatus": "SEND_OK",
"offsetMsgId": "C0A864C800002A9F00000000000764BC",
"transactionId": "0ACC4A893DF836BAF30C6BBBD62D0000"
}
后台日志
2024-11-21T22:04:32.151+08:00 INFO 15864 --- [rocketmq-test] [nio-8002-exec-4] c.s.controller.RocketMQSyncController : 【SpringBoot3测试】-【RocketMQ测试】:2024-11-21 22:04:32.151695800
2024-11-21T22:04:32.202+08:00 INFO 15864 --- [rocketmq-test] [nio-8002-exec-4] c.s.service.impl.RocketMQServiceImpl : 【RocketMQ测试】发送同步消息成功, topicName: TestTopic, msg: {"machine_no":"1","name":"Sliver","time":"2024-11-21 22:04:32.151695800"}, sendResult: SendResult [sendStatus=SEND_OK, msgId=0ACC4A893DF836BAF30C6BBBD62D0000, offsetMsgId=C0A864C800002A9F00000000000764BC, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=1], queueOffset=0]
2024-11-21T22:04:32.205+08:00 INFO 15864 --- [rocketmq-test] [er-test-topic_1] c.s.consumer.RocketMQConsumer : RocketMQ消费者接收时间:2024-11-21 22:04:32.205206100
2024-11-21T22:04:32.206+08:00 INFO 15864 --- [rocketmq-test] [er-test-topic_1] c.s.consumer.RocketMQConsumer : RocketMQ消费者接收内容:{"machine_no":"1","name":"Sliver","time":"2024-11-21 22:04:32.151695800"}