前言:本文以maven+springboot 整合Rocketmq 完成消息的发送和接收。
1 Rocketmq 介绍:
1.1 Rocketmq 特性:
Apache RocketMQ是一款快速、可靠的分布式消息传递和流处理平台,具有可扩展性和高性能。它是一个分布式的、去中心化的消息队列,具有以下特性:
-
分布式:RocketMQ允许将消息存储在多个Broker上并支持水平扩展,可以通过增加更多的Broker来扩展存储能力和吞吐量。
-
异步传输:RocketMQ采用异步传输方式来提高性能,它的异步传输机制利用了Linux内核底层的零拷贝技术,从而实现了高吞吐量和低延迟。
-
可靠性:RocketMQ采用了复制和故障转移机制来保证消息的可靠性。它可以配置多副本(通常是3个副本)来存储消息,当有一个Broker宕机时,系统可以自动将消息路由到其他副本上。
-
灵活性:RocketMQ支持多种消息模式,包括点对点模式、发布/订阅模式和事务消息模式。它还支持多种消息协议,包括JMS、OpenMessaging和MQTT等。
-
易于使用:RocketMQ使用简单,提供了丰富的客户端API和管理工具,使得开发人员可以快速地集成和使用它。
RocketMQ是一个非常优秀的分布式消息传递平台,能够帮助开发人员实现高性能、可靠的消息传递和流处理。它在互联网公司、金融机构和其他大型企业中广泛使用。
1.2 Rocketmq 主要组件:
Rocketmq 是一种基于发布-订阅(Pub/Sub)消息范式,消息的发送者(称为发布者、生产者、Producer)会将消息直接发送给特定的接收者(称为订阅者、消费者、Comsumer)。而RocketMQ的基础消息模型就是一个简单的Pub/Sub模型。
RocketMQ主要由以下几个组件组成:
- Nameserver:Nameserver是RocketMQ中的重要组件之一,它充当了命名服务和路由服务的角色。当Producer和Consumer要发送或者接收消息时,它们需要向NameServer请求获取Broker的信息,然后才能和Broker进行通信。Nameserver的作用类似于DNS服务器,用来维护RocketMQ中各个Broker的地址信息。
- Broker:Broker是RocketMQ中的消息存储和传输核心组件。所有的消息都存储在Broker中,Producer向Broker发送消息,Consumer从Broker中订阅和接收消息。Broker的作用是接收、存储和转发消息,确保消息的可靠性和可扩展性。
- Producer:Producer是创造和发送消息的客户端应用程序,它通过调用API将消息发送到Broker中。Producer可以按照不同的消息模式发送消息,包括点对点模式、发布/订阅模式和事务消息模式等。
- Consumer:Consumer是接收和处理消息的客户端应用程序。它通过从Broker中订阅和消费消息来实现消息的处理。Consumer可以按照不同的消息模式消费消息,包括点对点模式、发布/订阅模式和事务消息模式等。
- Message:Message是RocketMQ中最基本的消息单元,它包含了消息的内容和一些元数据,例如消息ID、消息主题、消息标签等。Producer将消息发送到Broker中,Consumer从Broker中订阅和接收消息。
Producer (生产者)和Consumer(消费者),一个向topic 发送消息,一个向topic 读取消息,消息的基本单元由Message 承接;
一般的消息组件对于消息的存储分发都只有一个组件处理,RocketMQ 中却使用了Nameserver和Broker 两个组件,那么这两个组件的关系是什么呢:
为了方便理解,这里使用图书馆进行类比:
- 首先图书馆里存储了海量的图书,这些图书并不是杂乱无章的进行堆叠,而是按照一定的类型完成了分类存放;比如新闻类,医学类,生物类,文学类 等等,每种不同的分类下都有海量的图书;如果把每本图书看做是具体的一个个消息,那么图书的分类就是不同的topic;
- 对于每种分类,为了统计的方便有可能需要为其在划分小类,如生物类,可以被划分为 植物类,动物类 等等,对于每个大类如果可以看做是topic ,那么大类下划分的小类就可以看做是 不同的 tag分类;
- 显然每一种topic/tag分类的图书并不是杂乱无章的存放,而是会被整齐的放入到一排排的书架上,一排排的书架就可以看做是分区下的队列;
- 显然书架作为了书籍最终的存放位置,那么可以将图书馆的书架看做是Broker,用户来借书和还书,最终都要来到书架上拿书和放书;
- 显然图书馆里的书籍不仅需要分类存放,每层的图书管理人员,还需要熟悉自己负责楼层的书籍的位置信息,以及需要对书籍的维护;如果来借书的人需要的图书不在本楼层,图书管理人员也需要为其提供书籍正确的楼层位置信息,显然每层的图书管理人员,都需要掌握每层楼的图书信息,并且必要情况下,需要有可以顶替其他楼层管理员的能力;
- 显然在rocketmq 中 ,Nameserver 的角色就和 每层的图书管理员相似;当每个用户来到本楼层还书(生产消息),楼层管理人员,需要告知还书的用户这本书,需要被正确归还的位置(消息的路由),从而帮助用户更好的还书;
- 当用户来借书(消费消息),楼层管理人员,需要告知用户,想要的书籍正确楼层及详细位置信息(消息的路由);
- 图书管理员怎么知道各个图书的分类以及位置信息,就需要不时的在自己的系统里动态维护数据的信息,以便于更好的服务借书的还书的人;
- 显然rocketmq 中 最终存放数据的broker 组件需要和Nameserver 进行不时的交互,这样Nameserver 就可以实时的知晓数据的信息,当生产者投递消息时,先向Nameserver询问自己要投递的位置信息,然后在将数据进行投递到broker;当消费者消费消息时,也先向Nameserver询问自己想要消费数据的位置信息,然后在向具体的broker 获取消息;
2 springboot 整合:
2.1 引入jar:
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
2.2 配置rocketmq:
# name-server地址
rocketmq.name-server=localhost:9876
# 配置消费组
rocketmq.producer.group=test-group
rocketmq.producer.send-message-timeout=30000
# 设置日志级别
logging.level.root=debug
2.3 生产者 消息发送工具类:
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.stream.Collectors;
/**
* 生产者
*/
@Slf4j
@Component
public class RocketMQProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Value("${rocketmq.producer.send-message-timeout}")
private Integer messageTimeOut;
/**
* 发送普通消息
*
* @param topic
* @param tag
* @param msgBody
*/
public void sendMsg(String topic, String tag, Object msgBody) {
if (StringUtils.isNotBlank(tag)) {
topic = topic.concat(":") + tag;
}
rocketMQTemplate.convertAndSend(topic, msgBody);
}
/**
* 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)
* sendResult为返回的发送结果
*/
public <T> SendResult sendMsg(String topic, T msg) {
Message<T> message = MessageBuilder.withPayload(msg).build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
return sendResult;
}
/**
* 发送异步消息
*
* @param topic
* @param tag
* @param msgBody
* @param callback
*/
public void sendAsyncMsg(String topic, String tag, Object msgBody, SendCallback callback) {
if (StringUtils.isNotBlank(tag)) {
topic = topic.concat(":") + tag;
}
rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), callback);
}
/**
* 发送异步消息
*
* @param topic 消息Topic
* @param message 消息实体
* @param sendCallback 回调函数
* @param timeout 超时时间
*/
public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) {
rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);
}
/**
* 发送延时消息
* 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*
* @param topic
* @param tag
* @param msgBody
* @param timeout
* @param delayLevel 值的有效范围1至18
*/
public void sendDelayMsg(String topic, String tag, Object msgBody, Long timeout, Integer delayLevel) {
if (StringUtils.isNotBlank(tag)) {
topic = topic.concat(":") + tag;
}
if (timeout != null) {
messageTimeOut = timeout.intValue();
}
rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), messageTimeOut, delayLevel);
}
/**
* 发送异步延迟消息
*
* @param topic 消息Topic
* @param message 消息实体
* @param sendCallback 回调函数
* @param timeout 超时时间
* @param delayLevel 延迟消息的级别
*/
public void asyncSendDelay(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);
}
/**
* 发送异步延迟消息
*
* @param topic 消息Topic
* @param message 消息实体
* @param timeout 超时时间
* @param delayLevel 延迟消息的级别
*/
public void asyncSendDelay(String topic, Message<?> message, long timeout, int delayLevel) {
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("topic:{}消息---发送MQ成功---", topic);
}
@Override
public void onException(Throwable throwable) {
log.error("topic:{}消息---发送MQ失败 ex:{}---", topic, throwable.getMessage());
}
}, timeout, delayLevel);
}
/**
* 单向消息
* 特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答
* 此方式发送消息的过程耗时非常短,一般在微秒级别
* 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
*
* @param topic 消息主题
* @param msg 消息体
* @param <T> 消息泛型
*/
public <T> void sendOneWayMsg(String topic, T msg) {
Message<T> message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.sendOneWay(topic, message);
}
/**
* 发送批量消息
*
* @param topic 消息主题
* @param msgList 消息体集合
* @param <T> 消息泛型
* @return
*/
public <T> SendResult asyncSendBatch(String topic, List<T> msgList) {
List<Message<T>> messageList = msgList.stream()
.map(msg -> MessageBuilder.withPayload(msg).build()).collect(Collectors.toList());
return rocketMQTemplate.syncSend(topic, messageList);
}
/**
* 发送顺序消息
*
* @param topic 消息主题
* @param msg 消息体
* @param hashKey 确定消息发送到哪个队列中
* @param <T> 消息泛型
*/
public <T> void syncSendOrderly(String topic, T msg, String hashKey) {
Message<T> message = MessageBuilder.withPayload(msg).build();
log.info("发送顺序消息,topic:{}, hashKey:{}", topic, hashKey);
rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
}
/**
* 发送顺序消息
*
* @param topic 消息主题
* @param msg 消息体
* @param hashKey 确定消息发送到哪个队列中
* @param timeout 超时时间
*/
public <T> void syncSendOrderly(String topic, T msg, String hashKey, long timeout) {
Message<T> message = MessageBuilder.withPayload(msg).build();
log.info("发送顺序消息,topic:{}, hashKey:{}, timeout:{}", topic, hashKey, timeout);
rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);
}
}
2.4 消费者:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "consumer_test",
topic = "test_topic",
selectorExpression = "*")
public class RocketMqConsumerTest implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
byte[] body = message.getBody();
String msg = new String(body);
log.debug("监听到消息:message:{}", msg);
}
}
2.5 测试消息发送:
import com.example.springrocket.config.RocketMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringRocketApplicationTests {
@Autowired
private RocketMQProducer rocketMQProducer;
@Test
void contextLoads() {
}
@Test
void sendMQMessage(){
SendResult sendResult = rocketMQProducer.sendMsg("test_topic","hello test 123");
System.out.println(sendResult);
}
}
消息获取:
3 整合遇到的问题参考:
3.1 提示RocketMQTemplate bean 没有被找到:
- 检查nameServer 和Broker 服务,是否正常启动;
- 检查10911,10909,10912 端口是否正常暴露;
- 检查生产者的group 分组是否配置:rocketmq.producer.group
- 如果springboot 的版本为3.x 则可以降低2.x 的版本,因为3.x 的版本不会进行rocketmq 的自动装配;
3.2 如果提示xxx.xx.xx.xx:10911 连接失败或者决绝:
- 检查broker 的启动配置文件broker.conf 的brokerIP1 是否为公网ip 如果不是,则需要修改为公网ip;
4 参考:
4.1 Apache RocketMQ