消息生产者通过三种方式发送消息
1.同步发送:等待消息返回后再继续进行下面的操作 同步发送保证了消息的可靠性,适用于关键业务场景。
2.异步发送:不等待消息返回直接进入后续流程.broker将结果返回后调用callback函数,并使用
CountDownLatch计数
3.单向发送:只负责发送,不管消息是否发送成功 单向发送不保证消息的送达,仅适用于对可靠性要求不高的场景。
消费者消费消息分两种:
拉模式:消费者主动去Broker上拉取消息
推模式:消费者等待Broker把消息推送过来
事实上:尽管存在“推送消费者”(DefaultMQPushConsumer
)和“拉取消费者”(DefaultMQPullConsumer
)这两种消费者类型,但实际上它们都是以“拉取”模式工作的,只不过实现方式和使用场景有所不同。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>5.3.0</version> </dependency>
客户端与服务器安装版本一致即可
演示1 同步发送模式 客户端推送模式
注意观察 broker是把消息分两次推送的 就是发多少条消息 推送多少次
生产者
package com.example.rocketmqdemo.simple;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.nio.charset.StandardCharsets;
/**
* 同步发送
* 使用场景:
* 1.可靠性要求高,消息发送需要等待确认
* 2.数据量较少的场景
* 3.实时响应,消息发送需要立即得到结果
* 小的订单系统
* @author hrui
* @date 2024/7/31 20:31
*/
public class SyncProducer {
public static void main(String[] args) {
//创建一个DefaultMQProducer实例,指定生产者组名为"group1"
DefaultMQProducer producer = new DefaultMQProducer("group1");//生产者组和消费者组是不同概念 不需要相同
//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息
producer.setNamesrvAddr("xxx.xxx.xxx:9876");
try {
//启动生产者实例
producer.start();
//发送10条消息
for (int i = 0; i < 2; i++) {
//创建消息实例,指定主题为"Topic1",标签为"Tag1",消息内容为"Hello World"加上编号
Message message = new Message("Topic1", "Tag1", ("Hello World" + i).getBytes(StandardCharsets.UTF_8));
//发送消息,并同步等待发送结果 (同步发送)
SendResult sendResult = producer.send(message);
//打印消息发送结果
System.out.println("第" + i + "条消息发送成功:返回---->" + sendResult);
}
} catch (Exception e) {
//捕获并打印异常信息
e.printStackTrace();
} finally {
//关闭生产者实例,释放资源
producer.shutdown();
}
}
}
消费者
package com.example.rocketmqdemo.simple;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 简单消费者
* @author hrui
* @date 2024/7/31 20:40
*/
public class Consumer {
public static void main(String[] args) {
//创建一个DefaultMQPushConsumer实例,指定消费者组名为"group1"
//采用长轮询机制,模拟推送效果,但本质上是主动拉取。适合低延迟、高实时性的场景。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息
consumer.setNamesrvAddr("xxx.xxx.xxx:9876");
try {
//订阅主题"Topic1",过滤标签为"*",表示接收所有消息
consumer.subscribe("Topic1", "*");
//设置消息监听器,处理接收到的消息
//可以传入两种类型的监听器:
//1. MessageListenerOrderly(顺序消费):保证消息按顺序处理
//2. MessageListenerConcurrently(并发消费):消息并发处理,不保证顺序
consumer.setMessageListener(new MessageListenerConcurrently() {
//consumeMessage方法用于处理接收到的消息列表
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// //遍历消息列表,处理每条消息
// list.forEach(messageExt -> {
// //输出消息体内容(需要根据具体的消息编码解码,这里假设为UTF-8)
// System.out.println(new String(messageExt.getBody()));
// //消息处理成功后输出确认信息
// System.out.println("消息消费成功");
// });
for (int i=0;i<list.size();i++){
System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));broker是将两条消息分别发送的
}
//返回消费状态,CONSUME_SUCCESS表示消息消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者实例,开始接收消息
consumer.start();
} catch (Exception e) {
//捕获并打印异常信息
e.printStackTrace();
}
}
}
演示2 异步发送
package com.example.rocketmqdemo.simple;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* 异步发送消息
* 并发流量高的场景下,使用异步发送消息可以提高吞吐量。
* @author hrui
* @date 2024/7/31 21:53
*/
public class AsyncProducer {
public static void main(String[] args) {
//创建一个DefaultMQProducer实例,指定生产者组名为"group2"
DefaultMQProducer producer = new DefaultMQProducer("group1");//生产者组和消费者组是不同概念 不需要相同
//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息
producer.setNamesrvAddr("xxx.xxx.xxx:9876");
//计数器,用于跟踪异步消息发送的完成情况
CountDownLatch countDownLatch = new CountDownLatch(100);
try {
// 启动生产者实例
producer.start();
//发送100条消息
for (int i = 0; i < 100; i++) {
final int index = i;
//创建消息实例,指定主题为"Topic2",标签为"Tag2",消息内容为"Hello World"加上编号
Message message = new Message("Topic1", "Tag1", ("Hello World" + i).getBytes(StandardCharsets.UTF_8));
//发送消息,异步发送。第二个参数是SendCallback回调函数
producer.send(message, new SendCallback() {
@Override
//发送成功时,Broker回调此方法
public void onSuccess(SendResult sendResult) {
//将CountDownLatch计数器减一,表示一个消息发送任务完成
countDownLatch.countDown();
System.out.println("消息发送成功_" + sendResult);
}
@Override
//发送失败时,Broker回调此方法
public void onException(Throwable throwable) {
// 将CountDownLatch计数器减一,表示一个消息发送任务完成
countDownLatch.countDown();
System.out.println("消息发送失败_" + throwable.getStackTrace());
}
});
}
//等待所有消息发送完成
//countDownLatch.await();
boolean await = countDownLatch.await(5, TimeUnit.SECONDS);
if (!await) {
System.out.println("消息发送超时");
}
} catch (Exception e) {
//捕获并打印异常信息
e.printStackTrace();
} finally {
//关闭生产者实例,释放资源
producer.shutdown();
}
}
}
演示3 单向发送
package com.example.rocketmqdemo.simple;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.nio.charset.StandardCharsets;
/**
* 单向发送
* 试用场景
* 日志收集
* @author hrui
* @date 2024/7/31 22:27
*/
public class OnewayProducer {
public static void main(String[] args) {
//创建一个DefaultMQProducer实例,指定生产者组名为"group1"
DefaultMQProducer producer = new DefaultMQProducer("group1");//生产者组和消费者组是不同概念 不需要相同
//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息
producer.setNamesrvAddr("xxx.xxx.xxx:9876");
try {
//启动生产者实例
producer.start();
//发送10条消息
for (int i = 0; i < 2; i++) {
//创建消息实例,指定主题为"Topic1",标签为"Tag1",消息内容为"Hello World"加上编号 topic要和消费者相同
Message message = new Message("Topic1", "Tag1", ("Hello World" + i).getBytes(StandardCharsets.UTF_8));
//发送消息,单向发送,不管发送成功与否
producer.sendOneway(message);
System.out.println(i+"_消息发送了");
}
} catch (Exception e) {
//捕获并打印异常信息
e.printStackTrace();
} finally {
//关闭生产者实例,释放资源
producer.shutdown();
}
}
}