一、什么是消息队列
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。
官方首页:RocketMQ · 官方网站 | RocketMQ
官方文档:RocketMQ · 官方网站 | RocketMQ
下载链接:点击下载
可视化工具:点击下载
1.1、应用场景
- 异步处理
- 流量削峰填谷(比如:秒杀)
- 解耦微服务
1.2、常见MQ产品对比
1.3、为什么选择RocketMQ
第一点,开发语言优势。
RocketMQ 使用 Java 语言开发,比起使用 Erlang 开发的 RabbitMQ 来说,有着更容易上手的阅读体验和受众。在遇到 RocketMQ 较为底层的问题时,大部分熟悉 Java 的同学都可以深入阅读其源码,分析、排查问题,甚至可以在社区版的基础上进行二次开发
。
第二点,丰富的高级特性。
根据 RocketMQ 官方文档的列举,其高级特性达到了 12 种
,例如顺序消息、事务消息、消息过滤、定时消息等。RocketMQ 丰富的特性,能够为我们在复杂的业务场景下尽可能多地提供思路及解决方案。
第三点,良好的商业前景。
比如阿里,在其生产环境中已经部署了数百个 RocketMQ 集群,上千个节点
,如此大量地将 RocketMQ 应用于生产环境,足以说明 RocketMQ 是的确经得起残酷的生产环境考验的,并且能够针对线上环境复杂的需求场景提供相应的解决方案。
RocketMQ 不仅仅解决了阿里的内部需求,同时还被搬到了阿里云上,作为一个商业化的产品
对外提供服务。在阿里云的产品叫作“消息队列 RocketMQ 版”,自从 2016 年开始商业化到现在,RocketMQ 商业化版已经具有相当大的规模。良好的商业前景,也反向推动着 RocketMQ 在业界的普及,两者相辅相成、相得益彰。
第四点,众多大厂背书。
RocketMQ 现在被广泛应用于各个大厂的内部业务中,其诞生之地阿里自不必多说,历年来大家所熟悉的双十一促销,在阿里内部就是使用的 RocketMQ 来承载消息
,面对如此庞大的流量洪峰,RocketMQ 交出了一份令人满意的答卷。
再比如,RocketMQ 也是阿里交易链路中的核心产品。交易链路本已是核心中的核心,而这个核心又将 RocketMQ 当作核心链路中的核心,足以见对 RocketMQ 的重视程度。同时,字节跳动
内部不同的业务也都在大量地使用 RocketMQ,同样也是作为业务主流程中的核心组件,RocketMQ 在生产环境中保持着非常高的稳定性、可用性,非常良好地支撑了业务的运行与发展。
二、如何使用RocketMQ
2.1、搭建RocketMQ
2.1.1、下载安装Apache RocketMQ
本博客使用的Rocket MQ版本为:rocketmq-all-5.0.0-bin-release,如果需要可以自行下载或者找博主要
2.1.2、配置环境变量
ROCKETMQ_HOME 本地解压路径
NAMESRV_ADDR localhost:9876
2.1.3、启动mqnamesrv
powerShell 管理员身份运行,否则服务会出现闪退现象!!!
若出现以下提示则服务启动成功,可进行下一步操作
2.1.4、启动mqbroker
2.2、搭建RocketMQ控制台
下载可视化工具zip包,本地解压后使用IDEA打开
运行,浏览器访问http://localhost:8080/
控制台使用说明参考:https://github.com/eacdy/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md
三、SpringBoot整合RocketMQ
SpringBoot版本:2.7.4
3.1、pom.xml
<!--RocketMQ坐标-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
3.2、application.yml
server:
port: 8083
rocketmq:
#RocketMQ Namesrv
name-server: 127.0.0.1:9876
producer:
#生产者分组,RocketMQ必填项,字符可随意
group: test_mq
#发送消息超时时间,单位:毫秒。默认为 3000
send-message-timeout: 3000
#消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
compress-message-body-threshold: 4096
#消息体的最大允许大小。。默认为 4 * 1024 * 1024B
max-message-size: 4194304
#同步发送消息时,失败重试次数。默认为 2 次。
retry-times-when-send-failed: 2
#异步发送消息时,失败重试次数。默认为 2 次。
retry-times-when-send-async-failed: 2
#发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
retry-next-server: false
3.3、编写生产者
package com.rocket.rocketdemo.controller;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class DemoController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送普通消息
* convertAndSend(String destination, Object payload) 发送字符串比较方便
*/
@RequestMapping("/send")
public void send(){
//参数一:topic
//参数二:消息内容
rocketMQTemplate.convertAndSend("test1","test-message");
}
/**
* 发送同步消息
* Tip:该方法底层调用的是 producer.send()方法,是阻塞的,producer 一定要等到Broker进行了响应后才会返回,才能继续往下执行。如果超时,或者失败了,会触发两次默认的重试。
*/
@RequestMapping("/testSyncSend")
public void testSyncSend(){
//参数一:topic
//参数二:消息内容
SendResult sendResult = rocketMQTemplate.syncSend("test1","同步消息测试");
System.out.println(sendResult);
}
/**
* 发送异步消息
* Tip:该方法是非阻塞的,发送结果将由一个回调函数callback进行回调。它与同步发送消息的区别是它在发送消息时多传递了一个SendCallback对象,该方法一调用立马返回,而不需要等待Broker的响应返回。消息发送成功或失败后将回调SendCallback对象的对应方法。
*/
@RequestMapping("/testASyncSend")
public void testASyncSend(){
//参数一:topic
//参数二:消息内容
//参数三:回调
rocketMQTemplate.asyncSend("test1", "异步消息测试", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println("消息发送异常");
throwable.printStackTrace();
}
});
}
/**
* 发送单向消息
* Tip:它的发送是单向的,即它不需要等待Broker的响应,只管发送即可,而不论发送成功与失败。通常应用于一些消息不是那么重要,可丢失的场景。
*/
@RequestMapping("/testOneWay")
public void testOneWay(){
for (int i = 0; i <10 ; i++) {
//参数一:topic 如果想添加tag,可以使用"topic:tag"的写法
//参数二:消息内容
rocketMQTemplate.sendOneWay("test1","单向消息测试测试下"+i);
}
}
}
3.4、编写消费者
package com.rocket.rocketdemo.config;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
//consumerGroup与application.yml中rocketmq.producer.group值一致
//topic与消息生产者发送的消息topic一致
@RocketMQMessageListener(consumerGroup = "test_mq",topic = "test1")
public class RocketMQConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("消费消息:"+s);
}
}
3.5、测试
发送同步消息,用postman发送请求 http://localhost:8083/send
控制台效果:
发送异步消息,用postman发送请求 http://localhost:8083/testSyncSend
控制台效果: