🎯 导读:该文档介绍了Apache RocketMQ消息队列的基础应用,包括消息发送与接收的基本流程。首先通过创建生产者实例,并指定名称服务器地址,启动后即可发送消息至指定主题。然后创建消费者实例订阅相应主题,并设置监听器处理接收到的消息。文档中还提供了代码示例,展示了如何实现简单的生产和消费逻辑。此外,文档解释了消息队列在不同场景下的分发策略,如负载均衡与广播模式,并强调了队列数量与消费者数量之间的关系以确保消息的合理分配。
文章目录
- 消息发送和监听的流程
- 消息生产者
- 消息消费者
- 搭建RocketMQ入门案例
- 创建项目
- 加入依赖
- 编写生产者
- 编写消费者
- 说明
- 一个消费者组消费一个topic
- 两个消费者组消费一个topic
- 生产者的消息发送给主题的哪个队列
- 消费者如何从队列中拉取消息
- 只有一个消费者,要拉取所有队列的消息
- 两个消费者,每个消费者要负责两个队列
- 三个消费者(要求尽量平衡)
- 四个消费者,一人一个
- 五个消费者,第五个消费者永远不接收消息
RocketMQ提供了发送多种发送消息的模式,例如同步消息,异步消息,顺序消息,延迟消息,事务消息等
消息发送和监听的流程
消息生产者
1、创建消息生产者 producer ,并指定生产者组名
2、指定 Nameserver 地址
3、启动 producer
4、创建消息对象,指定主题 Topic、Tag 和消息体等
5、发送消息
6、关闭 producer
消息消费者
1、创建消费者 consumer ,指定消费者组名
2、指定 Nameserver 地址
3、创建监听订阅主题 Topic和Tag 等
4、处理消息
5、启动消费者 consumer
搭建RocketMQ入门案例
创建项目
加入依赖
引入原生API,先不用spring-boot-starter版本
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
<!--docker的用下面这个版本-->
<version>4.4.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>
编写生产者
/**
* 测试生产者
*
* @throws Exception
*/
@Test
public void testProducer() throws Exception {
// 创建默认的生产者(指定生产者组名)
DefaultMQProducer producer = new DefaultMQProducer("test-group");
// 设置nameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动实例
producer.start();
for (int i = 0; i < 1; i++) {
// 创建消息
// 第一个参数:主题的名字
// 第二个参数:消息内容(要转化为字节数组)
Message msg = new Message("TopicTest", ("Hello RocketMQ " + i).getBytes());
// 发送结果
SendResult send = producer.send(msg);
// 打印发送状态
System.out.println(send.getSendStatus());
}
// 关闭实例
producer.shutdown();
}
为了连接方便,可以使用一个常量NAME_SRV_ADDR
来存储localhost:9876
【运行】
在控制台中可以看到创建了一个主题 testTopic
点击状态,一个主题默认4个队列
点击路由,可以查看 broker 的 ip 地址
在CONSUMER管理中,可以查看消费者
编写消费者
@Test
public void simpleConsumer() throws Exception {
// 创建一个消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");
// 连接 namesrv
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 订阅一个主题 * 表示订阅这个主题中所有的消息,后面会有消息过滤的教程
consumer.subscribe("testTopic", "*");
// 设置一个监听器 (一直监听的,异步回调方式,消费者线程和监听线程不是一个线程)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 这个就是消费的方法 (业务处理)
System.out.println("我是消费者");
// msgs 虽然是List,但是只有一条消息,所以get(0)就行
System.out.println(msgs.get(0).toString());
// 消息内容从字节数组转化为String
System.out.println("消息内容:" + new String(msgs.get(0).getBody()));
System.out.println("消费上下文:" + context);
// 返回值 CONSUME_SUCCESS成功,消息会从mq出队
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// RECONSUME_LATER(报错/null)失败 消息会重新回到队列 过一会重新投递出来 给当前消费者或者其他消费者消费的
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
// 启动
consumer.start();
// 挂起当前的jvm,让监听一直存在
System.in.read();
}
【运行】
说明
- 一个生产者组可以投递到多个主题
- 一个消费者组只能订阅一个主题
一个消费者组消费一个topic
【负载均衡模式】消息1给 C1 消费,消息2给 C2 消费,以此类推
【广播模式】同一条消息既给 C1 消费,又给 C2 消费
两个消费者组消费一个topic
同一消息,两个消费者组都获取到,但是组内要分配给哪个消费者,就看是负载【均衡模式】还是【广播模式】了
生产者的消息发送给主题的哪个队列
生产者会将消息轮询发送到主题的4个队列
消费者如何从队列中拉取消息
只有一个消费者,要拉取所有队列的消息
- 代理者:MQ
- 消费者:我们的程序
测试,生产者生产12个消息
差值:代理者位点-消费者位点。如果差值太大,说明消息堆积
两个消费者,每个消费者要负责两个队列
三个消费者(要求尽量平衡)
四个消费者,一人一个
五个消费者,第五个消费者永远不接收消息
队列数量最好大于等于消费者组内的消费者数量!!!