在上一章中我们对Rocket的基础知识、特性以及四大核心组件进行了详细的介绍,本章带着大家一起去在项目中具体的进行应用,并设计将其作为一个工具包只提供消息的分发服务和业务模块进行解耦
在进行本章的学习之前,需要确保你的可以正常启动和访问RocketMq服务,还未安装的可以移步至此:《docker安装rocketMq》
1.添加maven依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
2.项目结构
3.配置管理
rocketmq:
name-server: 127.0.0.1:9876
# 生产者
producer:
group: boot_group_1
# 消息发送超时时间
send-message-timeout: 3000
# 消息最大长度4M
max-message-size: 4096
# 消息发送失败重试次数
retry-times-when-send-failed: 3
# 异步消息发送失败重试次数
retry-times-when-send-async-failed: 2
# 消费者
consumer:
group: boot_group_1
# 每次提取的最大消息数
pull-batch-size: 5
上面的配置如果是在分布式环境下也可以配置在Apollo或nacos等配置中心里进行动态配置
4.配置类
在配置类中主要定义两个Bean的加载,即RocketMQTemplate和DefaultMQProducer,主要是提供消息发送的能力,即生产消息;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author ninesun
* @ClassName RocketMqConfig
* @description: 消息中间件配置类
* @date 2024年05月19日
* @version: 1.0
*/
@Configuration
public class RocketMqConfig {
@Value("${rocketmq.name-server}")
private String nameServer;
@Value("${rocketmq.producer.group}")
private String producerGroup;
@Value("${rocketmq.producer.send-message-timeout}")
private Integer sendMsgTimeout;
@Value("${rocketmq.producer.max-message-size}")
private Integer maxMessageSize;
@Value("${rocketmq.producer.retry-times-when-send-failed}")
private Integer retryTimesWhenSendFailed;
@Value("${rocketmq.producer.retry-times-when-send-async-failed}")
private Integer retryTimesWhenSendAsyncFailed;
@Bean
public RocketMQTemplate rocketMqTemplate() {
RocketMQTemplate rocketMqTemplate = new RocketMQTemplate();
rocketMqTemplate.setProducer(defaultMqProducer());
return rocketMqTemplate;
}
@Bean
public DefaultMQProducer defaultMqProducer() {
DefaultMQProducer producer = new DefaultMQProducer();
producer.setNamesrvAddr(this.nameServer);
producer.setProducerGroup(this.producerGroup);
producer.setSendMsgTimeout(this.sendMsgTimeout);
producer.setMaxMessageSize(this.maxMessageSize);
producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
producer.setRetryTimesWhenSendAsyncFailed(this.retryTimesWhenSendAsyncFailed);
return producer;
}
}
5.基础用法
5.1 消息生产
编写一个生产者接口类,分别使用RocketMQTemplate和DefaultMQProducer实现消息发送的功能,然后可以通过Dashboard控制面板查看消息详情
- 编写Controller进行消息的发送
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
@RestController
@Slf4j
public class TestController01 {
@Resource
private RocketMQTemplate rocketMqTemplate;
@Resource
private DefaultMQProducer defaultMqProducer;
/**
* 利用rocketMqTemplate发送消息
*
* @return
*/
@GetMapping("/send/msg1")
public String sendMsg1() {
try {
// 构建消息主体
Map<String, String> msgBody = new HashMap<>();
msgBody.put("data", "利用rocketMqTemplate发送消息");
// 发送消息
rocketMqTemplate.convertAndSend("boot-mq-topic", JSON.toJSONString(msgBody));
} catch (Exception e) {
e.printStackTrace();
}
return "OK";
}
/**
* 利用DefaultMQProducer发送消息
* @return
*/
@GetMapping("/send/msg2")
public String sendMsg2() {
try {
// 构建消息主体,此处可以用对象代替,为了方便演示,使用map
Map<String, String> msgBody = new HashMap<>();
msgBody.put("data", "利用DefaultMQProducer发送消息");
// 构建消息对象
Message message = new Message();
message.setTopic("boot-mq-topic");
message.setTags("boot-mq-tag");
message.setKeys("boot-mq-key");
message.setBody(JSON.toJSONString(msgBody).getBytes());
// 发送消息,打印日志
SendResult sendResult = defaultMqProducer.send(message);
log.info("msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());
} catch (Exception e) {
e.printStackTrace();
}
return "OK";
}
}
自己自行测试,访问这两个接口后,我们可以在Dashboard控制面板查看到:
5.2 消息消费者
接下来,我们创建一个消息消费者。在Spring Boot项目中,我们可以使用@RocketMQMessageListener注解来定义一个消息消费者。
import com.alibaba.fastjson.JSON;
import com.example.demo.po.MessageData;
import com.example.demo.po.User;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* @author hb24795
* @ClassName BootMqConsumer
* @description: 消费者
* @date 2024年05月26日
* @version: 1.0
*/
@Service
@RocketMQMessageListener(topic = "boot-mq-topic", consumerGroup = "boot_group_1")
public class BootMqConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.printf("------- StringConsumer received:");
System.out.println(message);
}
}
当然你可以设置更多的消费前置条件:
@Component
@RocketMQMessageListener(topic = "your_topic_name", consumerGroup = "your_consumer_group_name",selectorExpression = "your_tag", selectorType = ExpressionType.TAG)
public class MyConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// 处理消息的逻辑
System.out.println("Received message: " + message);
}
}
注意:此处的consumerGroup要和我们配置的group对应
上面我们已经完成了一个基本的从消息发送到消息消费的逻辑,大家可以在自己的项目中结合设计模式,aop等来定制化自己的消息中间件,但是消息的类型远不止这几个,在实现之前我们先把消息的发送代码和我们的业务代码进行解耦
@Component
public class MessageProduct {
@Resource
private RocketMQTemplate rocketMqTemplate;
@Resource
private DefaultMQProducer defaultMqProducer;
public SendResult SendMessage(String topic, Object data, List<String> keys, String tags) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
Message message = new Message();
if (StringUtils.isBlank(topic)) {
return null;
} else {
message.setTopic(topic);
}
if (data != null) {
message.setBody(JSON.toJSONString(data).getBytes());
}
if (!CollectionUtils.isEmpty(keys)) {
message.setKeys(keys);
}
if (StringUtils.isBlank(tags)) {
message.setTags(tags);
}
message.setBody(JSON.toJSONString(data).getBytes());
// 发送消息,打印日志
return defaultMqProducer.send(message);
}
}
5.3 延迟消息
RocketMQ 支持延迟消息发送,但并非任意时间,而是有特定的延迟等级。在较旧的版本中,延迟等级从1到18,每个等级对应一个固定的延迟时间范围。然而,随着RocketMQ的更新发展,其对于延迟消息的支持变得更加灵活。
根据最新的资料,RocketMQ 5.x版本支持的最大延迟时间可以非常长,可达一年之久,这意味着在一定意义上,它能够满足大部分应用场景对于延迟消息的需要,尽管这并不等同于可以任意指定任意时间精度的延迟。
我们先看看必须通过指定的等级进行延迟的实现:
RocketMQ不支持任意时间的延迟,只有18个等级的延迟时间,默认是:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。从头到尾共18个等级时间,s、m、h、d,分别表示秒、分、时、天。
默认的18个等级对应的时间可以修改,在broker.conf中增加如下配置,根据自身需求修改时间,然后重启broker。
1、延迟消息的原理
如果让你来设计RocketMQ的延迟消息,你会怎么设计?
1.延迟消息也是个消息,只是多了延迟时间,既然是消息,不管是不是要立刻处理,先找个临时Topic存储起来。
2.Topic里面实际上是一个个队列,那所有的延迟消息要存在一个队列里吗?不要放在同一个队列里,因为消息各自都有不同的延迟时间,如果放在一个队列里,会牵扯到其余问题:比如排序、比如记录消费位置等。所以是按延迟时间分开存。
3.消息已经存起来了,那怎么处理呢?既然涉及到了延迟时间,那自然启动线程去定时获取消息,判断消息的延迟时间是否已经到达,到达之后则取出来投放到目的Topic。
讲到这里,延迟消息的架构图基本浮现出来了:
实际上RocketMQ在设计延迟消息时,跟上面的思路基本类似,不在赘述,额外补充几点:
1.消息进入Broker后,会被存储在TopicSCHEDULE_TOPIC_XXXX中,只是在Dashboard中看不到。
TopicSCHEDULE_TOPIC_XXXX中有18个消息队列,分别存储18个延迟等级对应的消息。
2.RocketMQ 在启动时,会从broker.conf中获取18个等级对应的时间,延迟等级和时间的关系会存在放到DelayLevelTable中。
3.RocketMQ会开启18个定时任务每隔100ms,从TopicSCHEDULE_TOPIC_XXXX判断18个队列里的第一个消息是否可以被投放,如果可以投放,则在投放到原本的目的Topic。
判断逻辑:存入时间+延迟时间 > 当前时间。
说到这里,估计你也能猜到,为什么不支持自定义延迟时间了,核心原因还是性能问题。
试想一下,如果设计成任意时间,那么就不可能使用18个队列了,更不可能使用无限个队列了,只可能使用单个队列。
但是如果使用单个队列,按照先进先出的存放的话,那出现需要后进先出的消息怎么办?那只能对整个队列进行排序,如果消息量很大,每次有消息进来都需要排序,那CPU肯定会被玩爆。
而且队列里的消息被消费后,都会记录偏移量,如果每次有消息进来都要排序,那偏移量则失去意义,增加了消息丢失的风险。
所以,RocketMQ的这种18个延迟时间等级的设计,虽然在延迟时间的自由度上作出了妥协,但是基本满足业务,性能也很优秀。
2.具体实现
我们对上面的通用发送进行修改:
public SendResult SendMessage(String topic, Object data, List<String> keys, String tags, Integer delayLevel) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
Message message = new Message();
if (StringUtils.isBlank(topic)) {
return null;
} else {
message.setTopic(topic);
}
if (data != null) {
message.setBody(JSON.toJSONString(data).getBytes());
}
if (!CollectionUtils.isEmpty(keys)) {
message.setKeys(keys);
}
if (StringUtils.isBlank(tags)) {
message.setTags(tags);
}
if (delayLevel != null) {
message.setDelayTimeLevel(delayLevel);
}
message.setBody(JSON.toJSONString(data).getBytes());
// 发送消息,打印日志
return defaultMqProducer.send(message);
}
此处设置的等级5,可能对应1分钟的延迟,但具体等级和时间的映射关系可以根据RocketMQ服务器的配置有所不同
我们测试一下:
@GetMapping("/send/msg2")
public String sendMsg2() {
try {
// 构建消息主体,此处可以用对象代替,为了方便演示,使用map![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/3799dce071c44aec923eb35d5a418f2e.png#pic_center)
User user = User.builder()
.id(1)
.name("ninesun")
.build();
SendResult sendResult = messageProduct.SendMessage("boot-mq-topic", user, Collections.singletonList(user.getId().toString()), null, null);
log.info("msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());
} catch (Exception e) {
e.printStackTrace();
}
return "OK";
}
如果我们想实现任意时间的延迟,可以利用上面的延迟等级进行实现
我只需要根据自定义的延迟时间获取延迟等级
首先自定义一个消息体,用于存储必要的信息
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class DelayMessageDTO<T> {
private T data;
private Integer delayTime;
private String topic;
private List<String> keys;
private String tags;
}
设计一个简单的算法用于获取延迟等级
private static final Integer[] delayTimes = {1, 5, 10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
private Integer getDelayLevel(Integer delayTime) {
if (delayTime == null || delayTime == 0) {
return null;
}
for (int i = 0; i < delayTimes.length; i++) {
int level = i + 1;
if (delayTime.equals(delayTimes[i])) {
return level;
}
if (delayTime > delayTimes[i] && delayTime < delayTimes[i + 1]) {
return level;
}
}
return null;
}
新增一个消费者专门用于处理该消息的延迟
@Service
@RocketMQMessageListener(topic = "delay-consumer-topic", consumerGroup = "boot_group_1")
@Slf4j
public class DelayMqConsumer implements RocketMQListener<String> {
@Resource
MessageProduct messageProduct;
@Override
public void onMessage(String message) {
log.info("收到延迟消费消息,消息:{}", message);
System.out.println(message);
DelayMessageDTO delayMessageDTO = JSON.parseObject(message, DelayMessageDTO.class);
log.info("剩余:{}s", delayMessageDTO.getDelayTime());
try {
messageProduct.SendDelayMessage(delayMessageDTO);
} catch (MQBrokerException e) {
throw new RuntimeException(e);
} catch (RemotingException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (MQClientException e) {
throw new RuntimeException(e);
}
}
}
消息的发送
public SendResult SendDelayMessage(DelayMessageDTO data) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
// 发送消息,打印日志
if (data == null) {
return null;
}
Integer delayLevel = this.getDelayLevel(data.getDelayTime());
if (delayLevel == null) {
return this.SendMessage(data.getTopic(), data, data.getKeys(), data.getTags(), delayLevel);
}
Integer delayTime = data.getDelayTime() - delayTimes[delayLevel - 1];
data.setDelayTime(delayTime);
return this.SendMessage("delay-consumer-topic", data, data.getKeys(), data.getTags(), delayLevel);
}
至此我们就可以简单的实现一个消息的任意时间延迟,但是实际的延迟实现中,我们并不推荐该延迟方式,因为这种延迟完全依赖于mq的性能,如果遇到消息的积压等,我们的延迟将变得十分不可靠,很多开源社区中推荐:
- 外部存储与调度:将消息和期望的发送时间存储到数据库或缓存中(如Redis),然后使用一个定时任务(如Quartz、Spring Scheduler)定期检查这些存储的消息,当达到预定时间时,再通过DefaultMQProducer发送出去。
- 利用死信队列与TTL:虽然这不是直接实现任意时间延迟的方式,但可以通过设置消息的TTL(生存时间)和死信队列机制间接实现。消息到期后成为死信,触发特定逻辑进行处理或重定向到另一个队列进行实际发送。
在RocketMQ5.0版本,支持任意时段的延迟消息。在Github中最新的版本中已经解决了这个问题,[ISSUE #6203] Allow to publish delay message with arbitrary timestamp #6204
我们可以直接使用以下方式去实现:
@Resource
private RocketMQTemplate rocketMqTemplate;
public SendResult SendMessage(DelayMessageDTO data) {
return rocketMqTemplate.syncSendDelayTimeSeconds(data.getTopic(), JSON.toJSONString(data), data.getDelayTime());
}