一.消费模式
MQ的消费模式可以大致分为两种,一种是 推Push,一种是 拉Pull。
- Push 是 服务端 (MQ) 主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
- Pull 是 客户端 需要主动到 服务端 (MQ) 取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。
Push模式也是基于Pull模式的,所以不管是Push模式还是Pull模式,都是Pull模式。一般情况下,优先选择Pull模式
二.同步消息(***)
同步消息 发送过后会有一个返回值,也就是mq服务器接收到消息后返回的一个确认,这种方式非常安全,但是性能上并没有这么高,而且在mq集群中,也是要等到所有的从机都复制了消息以后才会返回,所以针对重要的消息可以选择这种方式。
可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。
原生依赖引入:
<!-- 原生api,不是starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.0</version>
</dependency>
同步消息生产者:
public class Producer {
public static void main(String[] args) throws Exception {
/*
1. 谁来发?
2. 发给谁?
3. 怎么发?
4. 发什么?
5. 发的结果是什么?
6. 关闭连接
**/
//1.创建一个发送消息的对象Producer,并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("sync-producer-group");
//2.设定发送的命名服务器地址
producer.setNamesrvAddr("ip:9876");
producer.setSendMsgTimeout(1000000);
//3.1启动发送的服务
producer.start();
//4.创建要发送的消息对象,指定topic,指定内容body
Message msg = new Message("sync-topic", "hello-rocketmq".getBytes(StandardCharsets.UTF_8));
//3.2发送消息
SendResult result = producer.send(msg);
System.out.println("返回结果:" + result);
//5.关闭连接
producer.shutdown();
}
}
同步消息消费者:
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建一个接收消息的对象Consumer,并指定消费者组名
//两种模式:①消费者定时拉取模式 ②建立长连接让Broker推送消息(选择第二种)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sync-producer-group");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr("ip:9876");
//3.订阅一个主题,* 表示订阅这个主题的所有消息,后期会有消息过滤
consumer.subscribe("sync-topic","*");
//设置当前消费者的消费模式(默认模式:负载均衡)
consumer.setMessageModel(MessageModel.CLUSTERING);
//3.设置监听器,用于接收消息(一直监听,异步回调,异步线程)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
//消费消息
//消费上下文:consumeConcurrentlyContext
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 这个就是消费的方法 (业务处理)
System.out.println("我是消费者");
System.out.println(msgs.get(0).toString());
System.out.println("消息内容:" + new String(msgs.get(0).getBody()));
System.out.println("消费上下文:" + context);
//签收消息,消息会从mq出队
//如果返回 RECONSUME_LATER 或 null 或 产生异常 那么消息会重新 回到队列 过一会重新投递出来 ,给当前消费者或者其他消费者消费的
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//4.启动接收消息的服务
consumer.start();
System.out.println("接受消息服务已经开启!");
//5 不要关闭消费者!因为需要监听!
//挂起
System.in.read();
}
}
三.异步消息(***)
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。发送完以后会有一个异步消息通知。
例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
异步消息生产者:
public class AsyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");
producer.setNamesrvAddr("ip:9876");
producer.start();
Message message = new Message("async-topic", "我是一个异步消息".getBytes());
//没有返回值的
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}
@Override
public void onException(Throwable e) {
System.err.println("发送失败:" + e.getMessage());
}
});
System.out.println("我先执行");
//需要接收异步回调,这里需要挂起
System.in.read();
}
}
消费者无特殊变化:
public class SimpleConsumer {
public static void main(String[] args) throws Exception{
// 创建一个消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("async-producer-group");
// 连接namesrv
consumer.setNamesrvAddr("ip:9876");
// 订阅一个主题 * 标识订阅这个主题中所有的消息 后期会有消息过滤
consumer.subscribe("async-topic", "*");
// 设置一个监听器 (一直监听的, 异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 这个就是消费的方法 (业务处理)
System.out.println("我是消费者");
System.out.println(msgs.get(0).toString());
System.out.println("消息内容:" + new String(msgs.get(0).getBody()));
System.out.println("消费上下文:" + context);
// 返回值 CONSUME_SUCCESS成功,消息会从mq出队
// RECONSUME_LATER(报错/null) 失败 消息会重新回到队列 过一会重新投递出来 给当前消费者或者其他消费者消费的
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动
consumer.start();
// 挂起当前的jvm
System.in.read();
}
}
四.单向消息(*)
这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,一般用于结果不重要的场景,例如日志信息的发送
单向消息生产者:
public class SingleWayProducer {
public static void main(String[] args) throws Exception{
// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("single-way-producer-group");
// 设置nameServer地址
producer.setNamesrvAddr("ip:9876");
// 启动实例
producer.start();
Message msg = new Message("single-way-topic", ("单向消息").getBytes());
// 发送单向消息
producer.sendOneway(msg);
// 关闭实例
producer.shutdown();
}
}
日志服务的编写思路
产生日志的服务利用MQ发送单向消息,不用等回复,大大减少了发送日志的时间,由log-service统一写入日志表中。并且由于日志过于庞大,可以对日志进行冷热分离,近一个月的为热数据,近一年的为冷数据(实际情况据业务而定),存储的位置不同,时间过于久远的日志可以删掉
五.延迟消息(***)
消息放入MQ后,过一段时间,才会被监听到,然后消费
比如下订单业务,提交了一个订单就可以发送一个延时消息,15min后去检查这个订单的状态,如果还是未付款就取消订单释放库存(订单超时)。
在分布式定时调度触发、任务超时处理等场景,使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。
延迟等级
延迟消息生产者:
public class DelayProducer {
public static void main(String[] args) throws Exception{
// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("delay-producer-group");
// 设置nameServer地址
producer.setNamesrvAddr("ip:9876");
// 启动实例
producer.start();
Message msg = new Message("delay-topic", ("延迟消息").getBytes());
// 给这个消息设定一个延迟等级
// messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(3);
// 发送单向消息
producer.send(msg);
// 打印时间
System.out.println(new Date());
// 关闭实例
producer.shutdown();
}
}
延迟消息消费者(无特殊变化):
public class MSConsumer {
public static void main(String[] args) throws Exception{
// 创建一个消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay-producer-group");
// 连接namesrv
consumer.setNamesrvAddr("ip:9876");
// 订阅一个主题 * 标识订阅这个主题中所有的消息 后期会有消息过滤
consumer.subscribe("delay-topic", "*");
// 设置一个监听器 (一直监听的, 异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(msgs.get(0).toString());
System.out.println("消息内容:" + new String(msgs.get(0).getBody()));
System.out.println("收到时间:"+new Date());
// 返回值 CONSUME_SUCCESS成功,消息会从mq出队
// RECONSUME_LATER(报错/null) 失败 消息会重新回到队列 过一会重新投递出来 给当前消费者或者其他消费者消费的
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动
consumer.start();
// 挂起当前的jvm
System.in.read();
}
}
可以通过打印一下时间差来检测一下(第一次有误差很正常)
六.批量消息
Rocketmq可以一次性发送一组消息,那么这一组消息会被当做一个消息消费。
在对吞吐率有一定要求的情况下,可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。
将消息打包成 Collection<Message> msgs 传入方法中即可,需要注意的是批量消息的大小不能超过 1MiB(否则需要自行分割),其次同一批 batch 中 topic 必须相同。