本文根据阿里云 RocketMQ产品文档整理,地址:https://help.aliyun.com/document_detail/29532.html?userCode=qtldtin2
RocketMQ是由阿里捐赠给Apache的一款低延迟、高并发、高可用、高可靠的分布式消息中间件。经历了淘宝双十一的洗礼。RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。下面我们来搭建RocketMQ
先去官网下载RocketMQ:
下载 | RocketMQ
选择最新版本下载,下载后解压:
设置环境变量:
然后启动服务端:
已经正常启动。
接着启动broker
已经正常启动!
application.yaml文件配置mq生产者和消费者:
server:
port: 8083
spring:
application:
name: springboot-rocketmq
rocketmq:
# nameserver地址
consumer:
group: ${spring.application.name}-consumer-group
# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
pull-batch-size: 10
name-server: 192.168.1.104:9876
producer:
#指定发送者组名
group: ${spring.application.name}
send-message-timeout: 300000
compress-message-body-threshold: 4096
max-message-size: 4194304
retry-times-when-send-async-failed: 0
retry-next-server: true
retry-times-when-send-failed: 2
pom文件添加依赖:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.3</version> </dependency>
接着我们进行生产消息:
@RestController
@RequestMapping("/test")
public class TestController {
@Autowired
private MessageSender messageSender;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/syncSendBatch/{nums}")
public String syncSendBatch(@PathVariable("nums") Integer nums) {
messageSender.syncSend(nums);
return "发送成功";
}
}
写上消息发送处理,这里通过接受的数量,进行延迟发送接收到的次数个消息:
@Component
public class MessageSender {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 延时消息
public void syncSend(Integer nums){
/**
* 发送可靠同步消息 ,可以拿到SendResult 返回数据
* 同步发送是指消息发送出去后,会在收到mq发出响应之后才会发送下一个数据包的通讯方式。
* 这种方式应用场景非常广泛,例如重要的右键通知、报名短信通知、营销短信等。
*
* 参数1: topic:tag
* 参数2: 消息体 可以为一个对象
* 参数3: 超时时间 毫秒
*/
for (int i = 0; i < nums; i++) {
SendResult result= rocketMQTemplate.syncSend("test-send","测试同步消息:"+i,3000);
// System.out.println(result.getMessageQueue());
System.out.println(result);
}
}
}
接下来使用监听来消费消息:
@Component
@RocketMQMessageListener(topic = "test-send", consumerGroup = "${spring.application.name}-consumer-group",
messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
public class MessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("-------接收到rocketmq消息:" + message);
}
}
接着我通过接口请求来生产消息:
测试1000000万的消息很快,实际中需要处理复杂的业务,会有事务处理,这个时候我们需要进行分布式部署,削峰,保证系统的高可靠性。
我们可以通过控制台来观察消息的收发情况:
去官网下载:
https://github.com/apache/rocketmq-externals/tree/develop/dev
下载后修改服务器地址即可: