目录
一、为什么选择RocketMQ
1、为什么是为什么选择RocketMQ
2、RocketMQ、ActiveMQ和Kafka之间的比较
2.1、对比1
2.2、对比2,接着上表
二、使用案例
1、引入依赖
2、编写启动类
3、编写application.yml配置文件
4、创建rocketmq文件夹
4.1、创建生产者
4.2、创建消费者
5、控制器
一、为什么选择RocketMQ
1、为什么是为什么选择RocketMQ
在阿里孕育 RocketMQ 的雏形时期,阿里将其用于异步通信、搜索、社交网络活动流、数据管道,贸易流程中。随着阿里的贸易业务吞吐量的上升,源自阿里的消息传递集群的压力也变得紧迫。
根据研究,随着队列和虚拟主题使用的增加,ActiveMQ IO模块达到了一个瓶颈。阿里尽力通过节流、断路器或降级来解决这个问题,但效果并不理想。于是尝试了流行的消息传递解决方案Kafka。不幸的是,Kafka不能满足阿里的要求,其尤其表现在低延迟和高可靠性方面,详见这里。在这种情况下,阿里决定发明一个新的消息传递引擎来处理更广泛的消息用例,覆盖从传统的pub/sub场景到高容量的实时零误差的交易系统。
Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。
2、RocketMQ、ActiveMQ和Kafka之间的比较
2.1、对比1
消息队列 Client SDK 支持协议 有序消息 调度消息 批处理消息 广播消息 消息过滤器 ActiveMQ Java, .NET, C++ etc. Push模式, 支持 OpenWire, STOMP, AMQP, MQTT, JMS 独占使用者队列或独占队列可以确保排序 支持 不支持 支持 支持 Kafka Java, Scala etc. Pull模式, 支持TCP 确保分区内消息的顺序 不支持 支持,使用异步生产者 不支持 支持,你可以使用Kafka Streams来过滤信息 RocketMQ Java, C++, Go Pull模式, 支持 TCP, JMS, OpenMessaging 确保严格的消息顺序,并能够优雅地扩大规模 支持 支持,同步模式,以避免消息丢失 支持 支持,基于 SQL92的属性筛选表达式
2.2、对比2,接着上表
服务器触发重发
消息存储消息追溯 消息优先级 高可用性和故障转移
消息轨道 配置 管理及运作工具 不支持 使用 JDBC 和高性能日志(如 levelDB、 kahaDB)支持非常快速的持久性 支持 支持 支持,如果使用 levelDB,则需要一个 ZooKeeper 服务器,具体取决于存储
不支持 默认配置级别较低,用户需要优化配置参数 支持 不支持 高性能文件存储
支持偏移量指示 不支持 支持,需要一个 ZooKeeper 服务器
不支持 卡夫卡使用键值对格式进行配置。这些值可以通过文件提供,也可以通过编程方式提供。 支持,使用终端命令公开核心指标 支持 高性能和低延迟文件存储 支持时间戳和偏移量指示 不支持 支持,主从模型,没有其他工具包 支持 开箱即用,用户只需要注意几个配置 支持丰富的 web 和终端命令来公开核心指标
二、使用案例
1、引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
</dependencies>
2、编写启动类
前提是你安装了RocketMQ,在《Window系统安装RocketMQ》已经介绍。
/**
* RocketMQ启动mqnamesrv.cmd、mqbroker.cmd
*
* @author CeaM
* 2022/08/28 10:11
**/
@SpringBootApplication
public class RocketMQApp {
public static void main(String[] args) {
SpringApplication.run(RocketMQApp.class, args);
}
}
3、编写application.yml配置文件
server:
port: 8989
rocketmq:
name-server: localhost:9876
producer:
group: ceam_group
consumer:
group: ceam_group
我用本地的rocketmq端口,这个按照你自己环境配置
4、创建rocketmq文件夹
4.1、创建生产者
package com.ceam.rocketmq.rocketmq;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Component;
/**
* @author CeaM
* 2022/08/28 10:24
**/
@Slf4j
@Component
@AllArgsConstructor
public class MQProducer {
private final RocketMQTemplate rocketMQTemplate;
/**
* convertAndSend(…):使用此方法,交换机会马上把所有的信息都交给所有的消费者,消费者再自行处理,不会因为消费者处理慢而阻塞线程。
* convertSendAndReceive(…):可以同步消费者。使用此方法,当确认了所有的消费者都接收成功之后,才触发另一个convertSendAndReceive(…),
* 也就是才会接收下一条消息。RPC调用方式。
* **/
public void sendMessage(){
// new_topic是topic
rocketMQTemplate.convertAndSend("TopicTest","hello RocketMQ");
log.info("发送成功过了");
}
}
4.2、创建消费者
package com.ceam.rocketmq.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @author CeaM
* 2022/08/28 10:24
**/
@Slf4j
@Component
@RocketMQMessageListener(topic = "TopicTest", consumerGroup = "${rocketmq.consumer.group}")
public class MQConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("收到消息啦,消息为:{}", s);
}
}
5、控制器
简单测试一下接口调用
package com.ceam.rocketmq.controller;
import com.ceam.rocketmq.rocketmq.MQProducer;
import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author CeaM
* 2022/08/28 15:05
**/
@RestController
@AllArgsConstructor
public class TestController {
private final MQProducer mqProducer;
@GetMapping("/mq")
public void send() {
mqProducer.sendMessage();
}
}
测试结果: