10. 好客租房-RocketMQ快速入门[非项目必需]

news2025/1/11 21:43:12

本章节主要是学习RocketMQ, 目标快速入门, 能够回答或解决以下问题:

  1. 了解什么是RocketMQ

  1. 了解RocketMQ的核心概念

  1. 动手安装RocketMQ服务

  1. 快速入门,掌握RocketMQ的api使用

  1. 对producer、consumer进行详解

  1. 了解RocketMQ的存储特点

10.1 RocketMQ简介与安装

10.1.1 核心概念速通

RocketMQ是一个采用Java语言开发的一款高性能、高可靠、高实时的分布式消息中间件. 阿里开源,现在已成为 Apache 顶级孵化项目。

Rocketmq 解决哪些问题?

  • 发布订阅, 消息队列最基本的功能.

  • 消息优先级, 消息有序消费, 消息过滤, 消息持久化

  • 支持分布式事务

看不懂? 没关系, 我也是从网上粘贴到, 你知道RocketMQ是一个分布式消息系统就行, 支持生产者消费者模式(发布订阅). 其他的暂时不用在意.

10.1.2 常见名词介绍

生产者消费者介绍

生产者消费者, 也就是发布订阅模型, 往往有两个角色参与, "生产者, 也就是发布方", "消费者,也就是订阅方", 举个例子, 你关注了一个公众号, 那么公众号就是生产者, Ta负责发布内容, 你就是消费者, 订阅内容.

现在你知道了生产者消费者模式, 那你应该可以理解MQ就是一个推送机制, 生产者将内容推送到消息队列中, 消费者从消息队列中取出内容. 当然实际情况远比我们上面举得例子复杂.

但是基本上都可以归类为两种, 一种是只要有一个消费者消费后队列中内容消失, 另一种是只有所有的消费者都消费后才从列表中移除, 比如:

  1. 一个小组共用一个待办列表, 任何一个代办只要有一个人完成了, 那就从代办列表消失.

  1. 多人订阅一个公众号, 也就是生产一份内容, 多个消费者订阅后消费这一份.

RocketMQ的结构图介绍

生产者消费者已经介绍过了, 接下来主要介绍下调度员工人

nameServer cluster, 其实就是nameServer的集群.
nameServer 是RocketMQ的调度员, 当生产者想要发布内容的时候, 先对接调度员, 从而找到真正发布消息的工人,

生产者与nameServer和broker交互流程如下图所示:

然后MQ发布给所有订阅者的流程如下图:

nameServer与broker的关系, 大家懂得都懂, broker定时上报自己是可运行的(默认10s一次), 不然nameServer就认为他失效了(2mins未收到上报就认为他失效了)

其他你会用到的概念

Topic, 也就是一个主题, 生产者和消费者往往是多对多的关系, 就需要Topic进行中继连接, 就像百度贴吧或者微博超话, 生产者和消费者因为topic而聚集在一起.

Message, 这个简单, 就是一条消息, 里面还需要包含一个Topic的指向. 包含一个nameServer的指向.

10.1.3 使用docker安装.

我们使用docker安装体验下方便快捷的方式.

#拉取镜像
docker pull foxiswho/rocketmq:server-4.3.2
docker pull foxiswho/rocketmq:broker-4.3.2
#创建nameserver容器
docker create -p 9876:9876 --name rmqserver \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-e "JAVA_OPTS=-Duser.home=/opt" \
-v /haoke/rmq/rmqserver/logs:/opt/logs \
-v /haoke/rmq/rmqserver/store:/opt/store \
foxiswho/rocketmq:server-4.3.2
#创建broker容器
docker create -p 10911:10911 -p 10909:10909 --name rmqbroker \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /haoke/rmq/rmqbroker/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /haoke/rmq/rmqbroker/logs:/opt/logs \
-v /haoke/rmq/rmqbroker/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2
#启动容器
docker start rmqserver rmqbroker
#停止删除容器
docker stop rmqbroker rmqserver
docker rm rmqbroker rmqserver

如果你得/etc/rocketmq/broker.conf不存在或者是一个文件夹的话, 那么使用这个内容

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable=true
brokerIP1={替换成你的公网ip}
namesrvAddr={替换成你的公网ip}:9876
brokerName=broker_haoke
enablePropertyFilter=true

如果你使用的是云服务, 那么需要放开以下三个端口, 不然本地会无法连接MQ.

9876
10911
10909

10.1.4 使用java连接测试

第一步,创建itcast-rocketmq工程

第二步,导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>itcast-rocketmq</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- java编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
package cn.itcast.rocketmq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("test-group");
        producer.setNamesrvAddr("{替换成你的公网ip}:9876");
        producer.start();

        for (int i = 0; i < 100; i++) {
            Message message = new Message("TopicTest11", "TagA",
                    ("Hello RocketMQ:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(message);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

运行测试,效果如下

10.1.5 使用UI管理

#拉取镜像
docker pull styletang/rocketmq-console-ng:1.0.0
#创建并启动容器
docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=172.16.55.185:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8082:8080 -t styletang/rocketmq-console-ng:1.0.0

然后打开

10.2 快速入门

10.2.1创建topic

package cn.itcast.rocketmq;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class TopicDemo {
    public static void main(String[] args) throws MQClientException {
        // 创建一个生产者
        DefaultMQProducer producer = new DefaultMQProducer("HAOKE_IM");
        // 生产者需要连接调度员
        producer.setNamesrvAddr("{ip}:9876");
        producer.start();
        // 生产者去创建一个topic (一般线上topic都需要审批才能创建)
        // broker_haoke是连接的broker的名字, 需要和你配置的broker名字一致
        // haoke_im_topic 是创建的topic的名字
        producer.createTopic("broker_haoke", "haoke_im_topic", 8);
        System.out.println("创建topic成功");
        
        producer.shutdown();
    }
}

运行,如果没有报错就ok.

10.2.2 生产者发消息(同步)

package cn.itcast.rocketmq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * 同步发送消息
 */
public class SyncProducer2 {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("HAOKE_IM");
        producer.setNamesrvAddr("{你的服务器公网ip}:9876");
        producer.start();
        // 构建消息内容
        String msgStr = "用户A发送消息给用户B";
        Message msg = new Message("haoke_im_topic","SEND_MSG",
                msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 发送消息
        SendResult sendResult = producer.send(msg);
        // 这里会同步等待到服务器的响应
        System.out.println("消息状态:" + sendResult.getSendStatus());
        System.out.println("消息id:" + sendResult.getMsgId());
        System.out.println("消息queue:" + sendResult.getMessageQueue());
        System.out.println("消息offset:" + sendResult.getQueueOffset());
        producer.shutdown();
    }
}
/*
打印结果:
消息状态:SEND_OK
消息id:AC1037A0307418B4AAC2374062400000
消息queue:MessageQueue [topic=haoke_im_topic, brokerName=broker_haoke_im, queueId=6]
消息offset:0
*/

简单介绍下Message对象

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;
	// 每条message必须指明自己所在的topic
    private String topic;
    // MQ为应用扩展的字段, 完全由应有自己决定 
    private int flag;
    // 发送消息可以携带一些参数, 以便于自定义化, 当然里面也有一些系统推荐用的变量, 如tags, 这个就是MQ用来实现消息过滤需求的
    // 
    private Map<String, String> properties;
    // 消息内容, 需要转为字节数组传入
    private byte[] body;
    // RocketMQ支持分布式事务, 暂时不用管这个字段, 等学到对应内容
    private String transactionId;
    // 省略所有方法
}

10.2.3 生产者发消息(异步版)

package cn.itcast.rocketmq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("HAOKE_IM");
        producer.setNamesrvAddr("{你的服务器公网ip}:9876");
        // 发送失败的重试次数, 默认为2
        producer.setRetryTimesWhenSendAsyncFailed(0);
        producer.start();

        String msgStr = "用户A发送消息给用户B";
        Message msg = new Message("haoke_im_topic","SEND_MSG",
                msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 异步发送消息
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // 接收到MQ发来的确认,会调用这里
                System.out.println("消息状态:" + sendResult.getSendStatus());
                System.out.println("消息id:" + sendResult.getMsgId());
                System.out.println("消息queue:" + sendResult.getMessageQueue());
                System.out.println("消息offset:" + sendResult.getQueueOffset());
                producer.shutdown();
            }
            @Override
            public void onException(Throwable e) {
                System.out.println("发送失败!" + e);
            }
        });
        System.out.println("发送成功!");
        // 如果这个不注释掉, 就会发送失败, 因为是异步发送, 发送还没完成就shutdown
        // producer.shutdown();
    }
}

10.2.3 消费消息

package cn.itcast.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;

public class ConsumerDemo {
    public static void main(String[] args) throws Exception {
        // 创建消费者与创建生产者代码基本一致
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("HAOKE_IM");
        consumer.setNamesrvAddr("{你的服务器公网ip}:9876");
        // 订阅topic,接收此Topic下的所有消息
        consumer.subscribe("haoke_im_topic", "*");
        // 也可以写成下面这种写法, 只匹配tags中带有SEND_MSG 或者SEND_MSG1的消息, 其他消息都不接收
        // consumer.subscribe("haoke_im_topic", "SEND_MSG || SEND_MSG1");
        
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    try {
                        System.out.println(new String(msg.getBody(), "UTF-8"));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("收到消息->" + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

10.2.4 消费消息(自定义过滤版)

先写一个生产者

package cn.itcast.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducerFilter {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("HAOKE_IM");
        producer.setNamesrvAddr("{你的服务器公网ip}:9876");
        producer.start();
        String msgStr = "美女001";
        Message msg = new Message("haoke_meinv_topic","SEND_MSG",
                msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 更改这里的age和sex, 可以让不同的消费者获取
        msg.putUserProperty("age", "21");
        msg.putUserProperty("sex", "女");
        // 发送消息
        SendResult sendResult = producer.send(msg);
        System.out.println("消息状态:" + sendResult.getSendStatus());
        System.out.println("消息id:" + sendResult.getMsgId());
        System.out.println("消息queue:" + sendResult.getMessageQueue());
        System.out.println("消息offset:" + sendResult.getQueueOffset());
        System.out.println(sendResult);
        producer.shutdown();
    }
}
package cn.itcast.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class ConsumerFilterDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("HAOKE_IM");
        consumer.setNamesrvAddr("{你的服务器公网ip}:9876");
        // 订阅topic,接收此Topic下的所有消息, 过滤要求:>20岁的女性
        consumer.subscribe("haoke_meinv_topic", MessageSelector.bySql("age>=20 AND sex='女'"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    try {
                        System.out.println(new String(msg.getBody(), "UTF-8"));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("收到消息->" + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

10.3 进阶知识点(部分)

10.3.1 顺序发送与消费消息

在某些业务中,consumer在消费消息时,是需要按照生产者发送消息的顺序进行消费的,比如在电商系统中,订

单的消息,会有创建订单、订单支付、订单完成,如果消息的顺序发生改变,那么这样的消息就没有意义了。

那么现在有一个简单的思考题,

思考题: 已知我们一个topic下有多台机器, 我们能保证接到的顺序, 怎么保证顺序性呢?
很简单, 只要保证生产者和消费者的一次请求都始终绑定某个机器就可以了. 当然实际情况上更灵活, 我们只需要保证一个订单的所有状态都交给同一个机器来处理就可以了,然后消费者需要串行的消费.

接下来一个demo展示具体思路

package cn.itcast.rocketmq.order;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 *
 */
public class OrderProducer {
    public static void main(String[] args) throws Exception{
        // 创建生产者与之前的代码一致
        DefaultMQProducer producer = new DefaultMQProducer("HAOKE_ORDER_PRODUCER");
        producer.setNamesrvAddr("{你的服务器公网ip}:9876");
        producer.start();

        // 生成100条消息
        for (int i = 0; i < 100; i++) {
            String msgStr = "order --> " + i;
            int orderId = i % 10; // 模拟生成订单id
            Message message = new Message("haoke_order_topic","ORDER_MSG",
                    msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
            // send的三个参数:
            // 1. message, 消息本体
            // 2. MessageQueueSelector selector, 这是选择队列的选择器,MQ会根据你的选择器选用合适的队列
            // 3. 参数,这个是你为了给 selector 提供的参数.
            SendResult sendResult = producer.send(message, (mqs, msg, arg) -> {
                // 队列选择器, 根据 orderId 选择处理机器, 从而保证消息的有序性
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                return mqs.get(index);
            }, orderId);
            // 打印结果
            System.out.println(sendResult);
        }
        producer.shutdown();
    }
}

消费者:

package cn.itcast.rocketmq.order;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class OrderConsumer {
    public static void main(String[] args) throws Exception{
        // 创建消费者
        DefaultMQPushConsumer consumer = new
                DefaultMQPushConsumer("HAOKE_ORDER_CONSUMER");
        consumer.setNamesrvAddr("{你的服务器公网ip}:9876");
        consumer.subscribe("haoke_order_topic", "*");
        // 注意,这里使用的监听器不再是之前用的 MessageListenerConcurrently(并发的), 而是 MessageListenerOrderly(有序的)
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                       ConsumeOrderlyContext context) {
                // orderly监听器 为了保证消费者的有序性, 这里会使用一个消费者线程会顺序消费queue
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}

10.3.2 事务消息

往往我们需要在事务开始前就告知消息系统准备一条消息, 当事务完成后将消息发送给消费者. 如果事务失败那么取消发送.

针对这种MQ接收到但是需要等待生产者通知后才能发送的消息, 我们称其为Half(Prepare) Message(半消息).

如果一条半消息长时间没收到成功/失败的通知, 那么会发送一条回查,确定是发送还是取消.

接下来我用一个demo演示下.

package cn.itcast.rocketmq.tx;

import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;

public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        // 这里我们创建一个支持事务的生产者
        TransactionMQProducer producer = new
                TransactionMQProducer("transaction_producer");
        producer.setNamesrvAddr("{你的Ip}:9876");
        // 设置我们自定义的事务监听器, 他会根据这个监听器进行
        producer.setTransactionListener(new TransactionListenerImpl());
        producer.start();
        // 发送消息
        Message message = new Message("pay_topic",
                "用户A给用户B转账500元".getBytes("UTF-8"));
        // 使用事务发消息
        producer.sendMessageInTransaction(message, null);

        // 模拟生产者其他操作, 如果关掉会影响回查
        Thread.sleep(999999);
        producer.shutdown();
    }
}

自定义监听器

package cn.itcast.rocketmq.tx;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.HashMap;
import java.util.Map;

// 发送二次确认和接收回查都在这里实现.
public class TransactionListenerImpl implements TransactionListener {
    private static Map<String, LocalTransactionState> STATE_MAP = new HashMap<>();

    /**
     * 执行具体的业务逻辑
     *
     * @param msg 发送的消息对象
     * @param arg
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            System.out.println("用户A账户减500元.");
            Thread.sleep(500);  //模拟调用服务

            // 模拟抛出异常, 中断事务
            // System.out.println(1/0);

            System.out.println("用户B账户加500元.");
            Thread.sleep(800);  //模拟调用服务
            // 存log日志到数据库中
            STATE_MAP.put(msg.getTransactionId(),
                    LocalTransactionState.COMMIT_MESSAGE);
            // 二次提交确认
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 存log日志到数据库中
        STATE_MAP.put(msg.getTransactionId(),
                LocalTransactionState.ROLLBACK_MESSAGE);
        // 回滚
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }

    /**
     * 消息回查
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 去数据库查log日志, 发现已经成功了
        return STATE_MAP.get(msg.getTransactionId());
    }
}

消费者代码如下:

package cn.itcast.rocketmq.tx;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;

// 事务对消费者来说应该是无感知的
public class TransactionConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new
                DefaultMQPushConsumer("HAOKE_CONSUMER");
        consumer.setNamesrvAddr("{ip}:9876");
        // 订阅topic,接收此Topic下的所有消息
        consumer.subscribe("pay_topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    try {
                        System.out.println(new String(msg.getBody(), "UTF-8"));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

10.3.3 重试策略

在消息的发送和消费过程中,都有可能出现错误,如网络异常等,出现了错误就需要进行错误重试,这种消息的重

试需要分2种,分别是producer端重试和consumer端重试。

10.3.3.1 producer端重试

生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败。

package cn.itcast.rocketmq.day2;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("HAOKE_IM");
        producer.setNamesrvAddr("{ip}:9876");
        // TODO : 消息发送失败时,重试3次
        producer.setRetryTimesWhenSendFailed(3);
        producer.start();
        String msgStr = "用户A发送消息给用户B";
        Message msg = new Message("haoke_im_topic", "SEND_MSG",
                msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 发送消息,并且指定超时时间, 一旦超时会自动重试3次.
        SendResult sendResult = producer.send(msg, 1000);
        System.out.println("消息状态:" + sendResult.getSendStatus());
        System.out.println("消息id:" + sendResult.getMsgId());
        System.out.println("消息queue:" + sendResult.getMessageQueue());
        System.out.println("消息offset:" + sendResult.getQueueOffset());
        System.out.println(sendResult);
        producer.shutdown();
    }
}

10.3.3.2 队列timeout

比如由于网络原因导致消息压根就没有从MQ到消费者上,那么在RocketMQ内部会不断的尝试发送这条消息,直

至发送成功为止!

也就是说,服务端没有接收到消息的反馈,既不是成功也不是失败,这个时候定义为超时。

10.3.3.3 consumer上报exception

消息正常的到了消费者,结果消费者发生异常,处理失败了。例如反序列化失败,消息数据本身无法处理(例如话

费充值,当前消息的手机号被注销,无法充值)等。

如果消息消费失败,那么消息将会在1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h后重试,一直到2h后不再重试

有些时候并不需要重试这么多次,一般重试3~5次即可。这个时候就可以通过msg.getReconsumeTimes()获取重试次数进行控制。

package cn.itcast.rocketmq.day2;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class ConsumerDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("HAOKE_IM");
        consumer.setNamesrvAddr("134.175.110.184:9876");
        // 订阅topic,接收此Topic下的所有消息
        consumer.subscribe("my-test-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    try {
                        System.out.println(new String(msg.getBody(), "UTF-8"));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("收到消息->" + msgs);
                if (msgs.get(0).getReconsumeTimes() >= 3) {
                    // 重试3次后,不再进行重试, 也就是会收到四次消息后才会消费掉.
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        consumer.start();
    }
}

10.4 接入SpringBoot

依赖如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.0.RELEASE</version>
    </parent>
    <groupId>org.example</groupId>
    <artifactId>itcast-rocketmq</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--RocketMQ相关-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- java编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

application.properties 的内容如下

# Spring boot application
spring.application.name = itcast-rocketmq

rocketmq.name-server={你的ip}:9876
rocketmq.producer.group=my-group

编写启动类

package cn.itcast.rocketmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MyApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
}

编写生产者代码:

package cn.itcast.rocketmq.spring;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SpringProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送消息
     *
     * @param topic
     * @param msg
     */
    public void sendMsg(String topic, String msg) {
        this.rocketMQTemplate.convertAndSend(topic, msg);
    }
}

消费者代码

package cn.itcast.rocketmq.spring;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "haoke-consumer", selectorExpression = "*")
public class SpringConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String msg) {
        System.out.println("接收到消息 -> " + msg);
    }
}

再test目录下编写测试类

package cn.itcast.rocketmq.spring;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class TestSpringRocketMQ {

    @Autowired
    private SpringProducer springProducer;
    // 运行第一次只会发送消息, 然后junit就结束了, 之后才能让消费者接收到消息.
    @Test
    public void testSendMsg(){
        this.springProducer.sendMsg("my-topic", "第一个Spring消息");
    }
}

10.5 IM系统接入RocketMQ

pom中新增依赖:

        <!--RocketMQ相关依赖-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

application.properties中新增mq相关

rocketmq.name-server={ip}:9876
rocketmq.producer.group=my-group

修改后的MessageHandler, 既可以作为发送者,也是消费者.

package cn.itcast.haoke.im.websocket;

import cn.itcast.haoke.im.dao.MessageDAO;
import cn.itcast.haoke.im.pojo.Message;
import cn.itcast.haoke.im.pojo.UserData;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.bson.types.ObjectId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.util.HashMap;
import java.util.Map;

/**
 * webSocket 消息处理器
 *
 * @author 过道
 */
@Component
// 订阅topic, 把自己设置到group中
// 这里我们选择messageModel为广播模式, 以便所有机器都可以收到消息, 然后判断自己是否是目标用户,是的话则处理掉.
@RocketMQMessageListener(topic = "haoke-im-send-message-topic",
        consumerGroup = "haoke-im-consumer",
        messageModel = MessageModel.BROADCASTING,
        selectorExpression = "SEND_MSG")
public class MessageHandler extends TextWebSocketHandler implements RocketMQListener<String> {

    @Autowired
    private MessageDAO messageDAO;
    // TODO 引入rocket, 如果有红色下划线不用管哦
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    private static final ObjectMapper MAPPER = new ObjectMapper();
    // 记录所有在线的终端, 并配置唯一标识.
    private static final Map<Long, WebSocketSession> SESSIONS = new HashMap<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        // 将当前用户的session放置到map中,后面会使用相应的session通信
        Long uid = (Long) session.getAttributes().get("uid");
        SESSIONS.put(uid, session);
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage
            textMessage) throws Exception {
        // 解析消息中的发送方, 接收方, 消息内容.
        Long uid = (Long) session.getAttributes().get("uid");
        JsonNode jsonNode = MAPPER.readTree(textMessage.getPayload());
        Long toId = jsonNode.get("toId").asLong();
        String msg = jsonNode.get("msg").asText();
        Message message = Message.builder()
                // 假装发送用户和接受用户都是从数据库中查出来的
                .from(UserData.USER_MAP.get(uid))
                .to(UserData.USER_MAP.get(toId))
                .msg(msg)
                .build();
        // 将消息保存到MongoDB
        message = this.messageDAO.saveMessage(message);
        //  序列化保存整个message
        String msgStr = MAPPER.writeValueAsString(message);
        // 判断to用户是否在线
        WebSocketSession toSession = SESSIONS.get(toId);
        if (toSession != null && toSession.isOpen()) {
            // 具体格式需要和前端对接
            toSession.sendMessage(new TextMessage(msgStr));
            // 更新消息状态为已读
            this.messageDAO.updateMessageState(message.getId(), 2);
        } else {
            // TODO 用户不在线,或者不在当前的jvm中,发送消息到RocketMQ, 让MQ去寻找对应的消费者
            org.springframework.messaging.Message mqMessage = MessageBuilder
                    .withPayload(msgStr)
                    .build();
            // topic:tags 设置主题和标签
            this.rocketMQTemplate.send("haoke-im-send-message-topic:SEND_MSG",
                    mqMessage);
        }
    }

    /**
     * 收到消息后,
     * @param msg
     */
    @Override
    public void onMessage(String msg) {
        // System.out.println("接收到消息 -> " + msg);
        try {
            JsonNode jsonNode = MAPPER.readTree(msg);
            Long toId = jsonNode.get("to").get("id").longValue();
            // 判断to用户是否在线
            WebSocketSession toSession = SESSIONS.get(toId);
            if (toSession != null && toSession.isOpen()) {
                toSession.sendMessage(new TextMessage(msg));
                // 更新消息状态为已读
                this.messageDAO.updateMessageState(new
                        ObjectId(jsonNode.get("id").asText()), 2);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/179368.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

分享143个ASP源码,总有一款适合您

ASP源码 分享143个ASP源码&#xff0c;总有一款适合您 下面是文件的名字&#xff0c;我放了一些图片&#xff0c;文章里不是所有的图主要是放不下...&#xff0c; 143个ASP源码下载链接&#xff1a;https://pan.baidu.com/s/1Fd3_qaHDj2_BuslyFT8YVQ?pwdrjmi 提取码&#x…

数据库02_函数依赖,范式---软考高级系统架构师008

1.首先我们来看这个,给定一个X,能确定一个Y那么就说,X确定Y,或者Y依赖x,那么 比如y = x * x 就是x确定y,或者y依赖于x 2.然后再来看图,那么左边的部分函数依赖,就是,通过A和B能决定C,那么如果A只用给就能决定C,那么就是部分函数依赖. 3.然后再来看,可以看到,A可以决定B,那么…

文件(2)

1)指定一个目录&#xff0c;扫描这个目录&#xff0c;找到当前目录下面所有文件的文件名是否包含了指定字符的文件&#xff0c;并提示这个用户是否要删除这个文件&#xff0c;根据用户的输入来决定是否删除&#xff1b; 1.1)需要进行输入一个目录&#xff0c;还需要进行输入一个…

概论_第7章_参数估计

参数估计的形式有两种&#xff1a; 点估计和区间估计 1 点估计 设x1,x2,...xnx_1, x_2, ... x_nx1​,x2​,...xn​是来自总体的一个样本&#xff0c; 我们用一个统计量 θ^\hat\thetaθ^ θ^(x1,x2,...,xn)\hat\theta(x_1, x_2, ..., x_n)θ^(x1​,x2​,...,xn​)的取值作为…

[ 云原生 | 容器 ] 虚拟化技术之容器与 Docker 概述

在云计算中&#xff0c;虚拟化技术一般可以被分为两类&#xff0c;分别是虚拟机&#xff08;VM&#xff0c;Virtual Machine&#xff09;技术以及容器&#xff08;Container&#xff09;技术&#xff0c;这里我们只讲云原生中 Docker 虚拟化技术。 文章目录一、应用部署方式的变…

Kubernetes:基于命名行终端/ Web 控制台的管理工具 kubebox

写在前面 kubebox 是一个轻量的 k8s 管理工具&#xff0c;可以基于命令行终端或 Web 端博文内容涉及&#xff1a;kubebox 不同方式的安装下载&#xff0c;简单使用。如果希望轻量一点&#xff0c;个人很推荐这个工具&#xff0c;轻量&#xff0c;而且使用简单。理解不足小伙伴帮…

Pointofix安装与设置为中文

Pointofix用来桌面绘图&#xff0c;还可以放大桌面一、下载官网下载地址&#xff1a;https://www.pointofix.de/&#xff0c;点击箭头所指跳转页面点击下载安装包pointofix180de-20180511-setup.zip&#xff0c;语言包pointofix-translation-20220120.zip二、安装解压pointofix…

【学习笔记】[AGC022F] Checkers

首先不考虑算重&#xff0c;因为这题坑点在于当n≥5n\ge 5n≥5时不同结构的树可能生成相同的结果。 那么我们考虑生成不同的系数序列AAA&#xff0c;然后用可重集算一下方案数。考虑将−1-1−1的边缩去后所形成的树&#xff0c;第iii层的点表示的是2i2^i2i&#xff0c;那么如何…

基于微信小程序的新生自助报到系统小程序

文末联系获取源码 开发语言&#xff1a;Java 框架&#xff1a;ssm JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7/8.0 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.3.9 浏览器…

蓝桥杯-刷题-补基础(加强版)

&#x1f33c;feels good&#x1f603;串烧 - 许天昱/陈旭辉-nn/单子玹/蒋笛含 - 单曲 - 网易云音乐 &#x1f33c;10道入门题 --- 明显比上篇博客难了一点&#xff0c;要慢慢做了 目录 一&#xff0c;第k个素数 二&#xff0c;最大公约数 三&#xff0c;最小公倍数 四…

Mybatis-Plus 多记录操作与逻辑删除

目录 多记录操作 逻辑删除 问题引入 所以对于删除操作业务问题来说有: 实现步骤 逻辑删除&#xff0c;对查询有没有影响呢? 如果还是想把已经删除的数据都查询出来该如何实现? 多记录操作 程序设计出来一个个删除的话还是比较慢和费事的&#xff0c;所以一般会给用户一…

OpenMP Parallel Construct 实现原理与源码分析

OpenMP Parallel Construct 实现原理与源码分析 前言 在本篇文章当中我们将主要分析 OpenMP 当中的 parallel construct 具体时如何实现的&#xff0c;以及这个 construct 调用了哪些运行时库函数&#xff0c;并且详细分析这期间的参数传递&#xff01; Parallel 分析——编…

逆序遍历List集合

1 问题手写一个程序&#xff0c;完成List集合对象的逆序遍历2 方法创建List接口的多态对象向创建好list集合添加元素使用hasPrevious方法。import java.util.ArrayList;import java.util.List;import java.util.ListIterator;public class TestList { public static void ma…

如何好好说话第11章 攀登抽象之梯

在心里面放一把抽象之梯我们要时刻去概括。从更宏观的角度去理解我们当前所处的事情。抽上之梯的这个概念&#xff0c;在写作教材中常常出现。他指引我们写作的时候&#xff0c;不要站在梯子的中间。要么具体详实&#xff0c;要么抽象而精简短小。为什么不要站在梯子中间呢&…

蓝桥杯 stm32 MCP4017

本文代码使用 HAL 库。 文章目录前言一、MCP4017 的重要特性二、MCP4017 计算 RBW 阻值三、MCP4017 地址四、MCP4017 读写函数五、CubeMX 创建工程 &#xff08;利用 ADC 测量 MCP4017 电压&#xff09;、对应代码&#xff1a;总结前言 一、MCP4017 的重要特性 蓝桥杯 板子上…

冯诺依曼体系结构及操作系统(OS)的简单认识

文章目录冯诺依曼体系结构操作系统&#xff08;Operator System&#xff09;冯诺依曼体系结构 冯诺依曼结构也称普林斯顿结构&#xff0c;是一种将程序指令存储器和数据存储器合并在一起的存储结构。数学家冯诺依曼提出了计算机制造的三个基本原则&#xff0c;即采用二进制逻辑…

虚拟机的介绍及安装

文章目录虚拟机介绍VMware WorkStation安装在VMware上安装Linux远程连接Linux系统虚拟机介绍 通过虚拟化技术&#xff0c;在电脑内&#xff0c;虚拟出计算机硬件&#xff0c;并给虚拟的硬件安装操作系统&#xff0c;即可得到一台虚拟的电脑&#xff0c;称之为虚拟机。 VMware…

Ubuntu安装wordpress

这里写自定义目录标题开始环境安装打开参考链接开始 环境 这里安装的php是7.4版本&#xff0c; apt install apache2 php mariadb-server apt install php7.4-mysql php-dev记得需要单独安装php7.4-mysql&#xff0c;不然可能会报错连接数据库出错&#xff0c;中电是wp_chec…

(机械师T90外接显卡GTX-1080)Win10笔记本通过M.2接口外接独立显卡+解决错误代码43

文章目录前言一、硬件清单二、硬件安装1.插入转接卡2.显卡安装3.接入电源4.连接显示器三、驱动安装&#xff08;重点&#xff09;1.禁用笔记本独显2.卸载驱动3.安装新驱动4.解决错误代码43外接显卡使用体验前言 据外接显卡成功也快有一个月了&#xff0c;期间畅玩了刺客信条奥…

【C进阶】程序环境和预处理

⭐博客主页&#xff1a;️CS semi主页 ⭐欢迎关注&#xff1a;点赞收藏留言 ⭐系列专栏&#xff1a;C语言进阶 ⭐代码仓库&#xff1a;C Advanced 家人们更新不易&#xff0c;你们的点赞和关注对我而言十分重要&#xff0c;友友们麻烦多多点赞&#xff0b;关注&#xff0c;你们…