一.消费模式
 
MQ的消费模式可以大致分为两种,一种是 推Push,一种是 拉Pull。
- Push 是 服务端 (MQ) 主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
 - Pull 是 客户端 需要主动到 服务端 (MQ) 取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。

 
Push模式也是基于Pull模式的,所以不管是Push模式还是Pull模式,都是Pull模式。一般情况下,优先选择Pull模式
二.同步消息(***)
同步消息 发送过后会有一个返回值,也就是mq服务器接收到消息后返回的一个确认,这种方式非常安全,但是性能上并没有这么高,而且在mq集群中,也是要等到所有的从机都复制了消息以后才会返回,所以针对重要的消息可以选择这种方式。
可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。

原生依赖引入:
        <!--  原生api,不是starter      -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.0</version>
        </dependency> 
同步消息生产者:
public class Producer {
    public static void main(String[] args) throws Exception {
            /*
             1. 谁来发?
             2. 发给谁?
             3. 怎么发?
             4. 发什么?
             5. 发的结果是什么?
             6. 关闭连接
             **/
            //1.创建一个发送消息的对象Producer,并指定生产者组名
            DefaultMQProducer producer = new DefaultMQProducer("sync-producer-group");
            //2.设定发送的命名服务器地址
            producer.setNamesrvAddr("ip:9876");
            producer.setSendMsgTimeout(1000000);
            //3.1启动发送的服务
            producer.start();
            //4.创建要发送的消息对象,指定topic,指定内容body
            Message msg = new Message("sync-topic", "hello-rocketmq".getBytes(StandardCharsets.UTF_8));
            //3.2发送消息
            SendResult result = producer.send(msg);
            System.out.println("返回结果:" + result);
            //5.关闭连接
            producer.shutdown();
    }
}
 
同步消息消费者:
public class Consumer {
    public static void main(String[] args) throws Exception {
        //1.创建一个接收消息的对象Consumer,并指定消费者组名
        //两种模式:①消费者定时拉取模式  ②建立长连接让Broker推送消息(选择第二种)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sync-producer-group");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("ip:9876");
        //3.订阅一个主题,* 表示订阅这个主题的所有消息,后期会有消息过滤
        consumer.subscribe("sync-topic","*");
        //设置当前消费者的消费模式(默认模式:负载均衡)
        consumer.setMessageModel(MessageModel.CLUSTERING);
        //3.设置监听器,用于接收消息(一直监听,异步回调,异步线程)
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            //消费消息
            //消费上下文:consumeConcurrentlyContext
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // 这个就是消费的方法 (业务处理)
                System.out.println("我是消费者");
                System.out.println(msgs.get(0).toString());
                System.out.println("消息内容:" + new String(msgs.get(0).getBody()));
                System.out.println("消费上下文:" + context);
                //签收消息,消息会从mq出队
                //如果返回 RECONSUME_LATER 或 null 或 产生异常 那么消息会重新 回到队列 过一会重新投递出来 ,给当前消费者或者其他消费者消费的
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //4.启动接收消息的服务
        consumer.start();
        System.out.println("接受消息服务已经开启!");
        //5 不要关闭消费者!因为需要监听!
        //挂起
        System.in.read();
    }
}
 
三.异步消息(***)
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。发送完以后会有一个异步消息通知。
例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

异步消息生产者:
public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");
        producer.setNamesrvAddr("ip:9876");
        producer.start();
        Message message = new Message("async-topic", "我是一个异步消息".getBytes());
        //没有返回值的
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送成功");
            }
            @Override
            public void onException(Throwable e) {
                System.err.println("发送失败:" + e.getMessage());
            }
        });
        System.out.println("我先执行");
        //需要接收异步回调,这里需要挂起
        System.in.read();
    }
}
 
消费者无特殊变化:
public class SimpleConsumer {
    public static void main(String[] args) throws Exception{
        // 创建一个消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("async-producer-group");
        // 连接namesrv
        consumer.setNamesrvAddr("ip:9876");
        // 订阅一个主题  * 标识订阅这个主题中所有的消息  后期会有消息过滤
        consumer.subscribe("async-topic", "*");
        // 设置一个监听器 (一直监听的, 异步回调方式)
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // 这个就是消费的方法 (业务处理)
                System.out.println("我是消费者");
                System.out.println(msgs.get(0).toString());
                System.out.println("消息内容:" + new String(msgs.get(0).getBody()));
                System.out.println("消费上下文:" + context);
                // 返回值  CONSUME_SUCCESS成功,消息会从mq出队
                // RECONSUME_LATER(报错/null) 失败 消息会重新回到队列 过一会重新投递出来 给当前消费者或者其他消费者消费的
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动
        consumer.start();
        // 挂起当前的jvm
        System.in.read();
    }
}
 
四.单向消息(*)
这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,一般用于结果不重要的场景,例如日志信息的发送
 
单向消息生产者:
public class SingleWayProducer {
    public static void main(String[] args) throws Exception{
        // 创建默认的生产者
        DefaultMQProducer producer = new DefaultMQProducer("single-way-producer-group");
        // 设置nameServer地址
        producer.setNamesrvAddr("ip:9876");
        // 启动实例
        producer.start();
        Message msg = new Message("single-way-topic", ("单向消息").getBytes());
        // 发送单向消息
        producer.sendOneway(msg);
        // 关闭实例
        producer.shutdown();
    }
}
 
日志服务的编写思路
产生日志的服务利用MQ发送单向消息,不用等回复,大大减少了发送日志的时间,由log-service统一写入日志表中。并且由于日志过于庞大,可以对日志进行冷热分离,近一个月的为热数据,近一年的为冷数据(实际情况据业务而定),存储的位置不同,时间过于久远的日志可以删掉

五.延迟消息(***)
消息放入MQ后,过一段时间,才会被监听到,然后消费
比如下订单业务,提交了一个订单就可以发送一个延时消息,15min后去检查这个订单的状态,如果还是未付款就取消订单释放库存(订单超时)。
在分布式定时调度触发、任务超时处理等场景,使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。

延迟等级

延迟消息生产者:
public class DelayProducer {
    public static void main(String[] args) throws Exception{
        // 创建默认的生产者
        DefaultMQProducer producer = new DefaultMQProducer("delay-producer-group");
        // 设置nameServer地址
        producer.setNamesrvAddr("ip:9876");
        // 启动实例
        producer.start();
        Message msg = new Message("delay-topic", ("延迟消息").getBytes());
        // 给这个消息设定一个延迟等级
        // messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        msg.setDelayTimeLevel(3);
        // 发送单向消息
        producer.send(msg);
        // 打印时间
        System.out.println(new Date());
        // 关闭实例
        producer.shutdown();
    }
}
 
延迟消息消费者(无特殊变化):
public class MSConsumer {
    public static void main(String[] args) throws Exception{
        // 创建一个消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay-producer-group");
        // 连接namesrv
        consumer.setNamesrvAddr("ip:9876");
        // 订阅一个主题  * 标识订阅这个主题中所有的消息  后期会有消息过滤
        consumer.subscribe("delay-topic", "*");
        // 设置一个监听器 (一直监听的, 异步回调方式)
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(msgs.get(0).toString());
                System.out.println("消息内容:" + new String(msgs.get(0).getBody()));
                System.out.println("收到时间:"+new Date());
                // 返回值  CONSUME_SUCCESS成功,消息会从mq出队
                // RECONSUME_LATER(报错/null) 失败 消息会重新回到队列 过一会重新投递出来 给当前消费者或者其他消费者消费的
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动
        consumer.start();
        // 挂起当前的jvm
        System.in.read();
    }
}
 
可以通过打印一下时间差来检测一下(第一次有误差很正常)
六.批量消息
Rocketmq可以一次性发送一组消息,那么这一组消息会被当做一个消息消费。
在对吞吐率有一定要求的情况下,可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。
将消息打包成 Collection<Message> msgs 传入方法中即可,需要注意的是批量消息的大小不能超过 1MiB(否则需要自行分割),其次同一批 batch 中 topic 必须相同。



















