【RocketMQ】RocketMQ入门
文章目录
- 【RocketMQ】RocketMQ入门
- 1. 消费模式
- 2. 发送/消费 消息
- 2.1 同步消息
- 2.2 异步消息
- 2.3 单向消息
- 2.4 延迟消息
- 2.5 批量消息
- 2.6 顺序消息
1. 消费模式
MQ的消费模式大致分为两种,一种是推Push,一种是拉pull。
Push模式:
- 优点:
- 及时性较好
- 缺点:
- 客户端没有做好流控的话容易导致客户端消息堆积甚至崩溃。
Pull模式:
- 优点:
- 客户端可以根据自己的消费能力进行消费
- 缺点:
- 拉取频率不好控制,频繁容易造成客户端压力过大,拉取间隔长容易造成消费不及时。
Push模式也是基于pull模式的,只能客户端内部封装了api,一般场景下,上游消息生产量小或者均速的时候,选择push模式。在特殊场景下,例如电商大促,抢优惠券等场景可以选择pull模式
2. 发送/消费 消息
参考文档:RocketMQ官方文档
以下代码采用的都是rocketmq的原生api
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
2.1 同步消息
同步发送是最常用的方式,是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式,可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。
生产者发送消息代码如下:
@Test
public void simpleProducer() throws Exception {
//创建一个生产者 (制定一个组名)
DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");
//连接namesrv
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
//启动
producer.start();
for (int i = 1; i <= 10; i++) {
//创建消息
Message message = new Message("testTopic", ("我是一个简单的消息" + i).getBytes());
//发送消息
SendResult sendResult = producer.send(message);
System.out.println(sendResult.getSendStatus());
}
//关闭生产者
producer.shutdown();
}
消费者消费信息代码如下:
@Test
public void simpleConsumer() throws Exception {
//创建一个消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");
//连接namesrv
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
//订阅一个主题 * 标识订阅这个主题中所有消息,后期会有消息过滤
consumer.subscribe("testTopic", "*");
//设置一个监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//这个就是消费的方法(业务处理)
System.out.println("我是消费者");
System.out.println(list.get(0).toString());
System.out.println("消息内容:" + new String(list.get(0).getBody()));
System.out.println("消费上下文" + consumeConcurrentlyContext);
//返回值 CONSUME_SUCCESS 成功,消息会从mq出队
// RECONSUME_LATER(报错/null) 失败,消息会重新回到队列,过一会重新投递出来,给当前消费者或者其他消费者消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//TimeUnit.SECONDS.sleep(100);
//挂起当前jvm
System.in.read();
}
2.2 异步消息
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。
消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息,发送方通过回调接口接收服务端响应,并处理响应结果。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
注:异步发送生产者需要实现异步发送回调接口。
生产者发送消息代码如下:
@Test
public void asyncProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");
//连接
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
//启动
producer.start();
Message message = new Message("asyncTopic", "我是一个异步消息".getBytes());
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败" + throwable.getMessage());
}
});
System.out.println("我先执行");
System.in.read();
}
消费者代码基本和同步消息的相同,不展示。
2.3 单向消息
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
生产者发送消息代码如下:
@Test
public void onewayProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
Message message = new Message("onewayTopic", "这是一条单向消息".getBytes());
producer.sendOneway(message);
System.out.println("成功");
producer.shutdown();
}
2.4 延迟消息
延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。应用场景是外卖15分钟未支付则取消订单。
RokcketMQ一共支持18个等级的延迟投递,具体时间如下:
投递等级(delay level) | 延迟时间 | 投递等级(delay level) | 延迟时间 |
---|---|---|---|
1 | 1s | 10 | 6min |
2 | 5s | 11 | 7min |
3 | 10s | 12 | 8min |
4 | 30s | 13 | 9min |
5 | 1min | 14 | 10min |
6 | 2min | 15 | 20min |
7 | 3min | 16 | 30min |
8 | 4min | 17 | 1h |
9 | 5min | 18 | 2h |
生产者发送消息代码如下:
@Test
public void msProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ms-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
Message message = new Message("orderMsTopic", "订单消息".getBytes());
//设置延迟等级
//messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
message.setDelayTimeLevel(3);//10s
producer.send(message);
System.out.println("发送事件:" + new Date());
producer.shutdown();
}
2.5 批量消息
在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。
注:批量消息大小不能超过1MIB(1024*1024),同一批的 topic 必须相同
生产者发送消息代码如下:
@Test
public void testBatchProducer() throws Exception{
// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("batch-producer-group");
// 设置nameServer地址
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 启动实例
producer.start();
List<Message> msgs = Arrays.asList(
new Message("batchTopic", "我是一组消息的A消息".getBytes()),
new Message("batchTopic", "我是一组消息的B消息".getBytes()),
new Message("batchTopic", "我是一组消息的C消息".getBytes())
);
SendResult send = producer.send(msgs);
System.out.println(send);
// 关闭实例
producer.shutdown();
}
消费者消费信息代码如下:
@Test
public void msConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("bathc-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("batchTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println("收到消息了:" + new Date());
System.out.println(list.size());
System.out.println("消息体是:" + new String(list.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
2.6 顺序消息
待续。。。