从环境部署到开发实战:消息队列 RocketMQ

news2024/9/27 7:32:12

文章目录

  • 一、消息队列简介
    • 1.1 什么是消息队列
    • 1.2 常见消息队列对比
    • 1.3 RockectMQ 核心概念
    • 1.4 RockectMQ 工作机制 (★)
  • 二、RocketMQ 部署相关
    • 2.1 服务器单机部署
    • 2.2 管控台页面
  • 三、RocketMQ 的基本使用
    • 3.1 入门案例
    • 3.2 消息发送方式
      • 3.2.1 同步消息
      • 3.2.2 异步消息
      • 3.2.3 一次性消息
    • 3.3 消息消费方式
      • 3.3.1 集群模式
      • 3.3.2 广播模式
    • 3.4 顺序消息
    • 3.5 延迟消息
    • 3.6 消息过滤
      • 3.6.1 Tag 过滤
      • 3.6.2 SQL92 过滤
  • 四、SpringBoot 集成 RocketMQ
    • 4.1 入门案例
    • 4.2 消息发送方式
      • 4.2.1 同步消息
      • 4.2.2 异步消息
      • 4.2.3 一次性消息
    • 4.3 消息消费方式
      • 4.3.1 集群模式
      • 4.3.2 广播模式
    • 4.4 顺序消息
    • 4.5 延时消息
    • 4.6 消息过滤
      • 4.6.1 Tag 过滤
      • 4.6.2 SQL92 过滤


一、消息队列简介

1.1 什么是消息队列

  消息队列(MQ)也叫消息队列中间件,其主要通过消息的发送和接受来实现程序的异步解耦、削峰填谷以及数据分发但是 MQ 真正的目的是为了通讯。他屏蔽了复杂的通讯协议,像常用的 dubbo、http 协议都是同步的。这两种协议很难实现双端通讯(即:A调用B,B也可以主动调用A),而且不支持长链接。MQ 做的就是在这些协议上构建一个简单协议——生产者、消费者模型,MQ 带给我们的不是底层的通讯协议,而是更高层次的通讯模型。他定义了两个对象:发送数据的叫做生产者,接受消息的叫做消费者,我们可以无视底层的通讯协议,并且可以自己定义生产者消费者

参考:消息队列详解


1.2 常见消息队列对比

在这里插入图片描述


1.3 RockectMQ 核心概念

生产者 Producer负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 Broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。

消费者 Consumer负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

名字服务 Name Server名称服务充当路由消息的提供者,生产者或消费者能够通过名字服务查找各主题相应的 Broker IP 列表。多个Namesrv 实例组成集群,但相互独立,没有信息交换。

代理服务器 Broker Server消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

消息主题 Topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位

消息队列 MessageQueue:对于每个 Topic 都可以设置一定数量的消息队列用来进行数据的读取。

消息内容 Message消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ 中每个消息拥有唯一的 Message ID,且可以携带具有业务标识的 Key。系统提供了通过 Message ID 和 Key 查询消息的功能。

标签 Tag为消息设置的标志,用于同一主题 Topic 下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。消费者可以根据 Tag 实现对不同子主题的不同消费逻辑,实现更好的扩展性。


1.4 RockectMQ 工作机制 (★)

  RockectMQ 有自己的注册中心,即 NameServer,连接命名服务后会拉取代理服务器的列表到本地缓存,生产者会通过负载均衡选出代理服务器的具体 IP,然后向选出的代理服务器发送消息,最终发送给消费者进行消费。如果发送的消息含有标签Tag,那么会在消费者消费时进行消息的过滤
在这里插入图片描述
  其中,每个 Topic 默认有 4 个 MessageQueue,即 4 个写和读队列。在消息中间件每个 topic 设置 4 个队列,主要是为了解决并发性能的问题。如果只有一个队列,为保证线程安全,必须得给队列进行写操作时上锁。设置 4 个队列也是由于大部分的服务器核心数都是 4 核的。

在这里插入图片描述


二、RocketMQ 部署相关

2.1 服务器单机部署

搜索资源:rocketmq-all-4.4.0-bin-release.zip

① 将压缩包上传服务器,把rocketmq-all-4.4.0-bin-release.zip 拷贝到 /usr/local/software
② 使用解压命令进行解压到 /usr/local 目录

unzip /usr/local/software/rocketmq-all-4.4.0-bin-release.zip -d /usr/local

③ 软件文件名重命名

mv  /usr/local/rocketmq-all-4.4.0-bin-release/  /usr/local/rocketmq-4.4/

④ 设置环境变量

vi /etc/profile

export JAVA_HOME=/usr/local/jdk1.8
export ROCKETMQ_HOME=/usr/local/rocketmq-4.4
export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH

修改环境变量后,需要 source /etc/profile 使配置文件生效。

⑤ 修改脚本中的 JVM 相关参数和启动参数的配置

vi  /usr/local/rocketmq-4.4/bin/runbroker.sh

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
vi  /usr/local/rocketmq-4.4/bin/runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

⑥ 修改配置文件,设置 Nameserver 和 Broker-server 部署机器的IP地址。

 vi /usr/local/rocketmq-4.4/conf/broker.conf

在这里插入图片描述

注:如果是服务器本身可以不设置

⑦ 启动 NameServer

# 1.启动NameServer,& 代表后台输出
nohup sh mqnamesrv &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log

⑧ 启动 Broker

#1.启动Broker
# nohup sh mqbroker -n 部署的IP地址:9876 -c /usr/local/rocketmq-4.4/conf/broker.conf &
nohup sh mqbroker -n localhost:9876 -c /usr/local/rocketmq-4.4/conf/broker.conf &

#2.查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log 

⑨ 使用 jps命令查看是否开启成功,如果看到NamesrvStartupBrokerStartup这两个进程,则证明启动成功。

在这里插入图片描述

关闭 nameserver:sh mqshutdown namesrv
关闭 broker:sh mqshutdown broker

另外:服务器需要暂时关闭防火墙 systemctl stop firewalld,并可使用 firewall-cmd --state 查看防火墙状态。
具体可参考:Linux关闭防火墙命令


2.2 管控台页面

搜索资源:rocketmq-console-ng-1.0.1.jar

在 jar 包的文件夹下新建一个配置文件 application.properties,编辑管控台的端口和 NameServer 中心的 IP 地址,使用 java -jar rocketmq-console-ng-1.0.1.jar 启动即可。
在这里插入图片描述

在这里插入图片描述
访问 http://localhost:9999/#/管控台界面如下:
在这里插入图片描述

注:管控台要求 jdk1.8。


三、RocketMQ 的基本使用

在一个工程中创建两个模块,模拟生产者和消费者:
在这里插入图片描述

3.1 入门案例

添加 rocketmq 的 pom 依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

生产者模块生产消息:

public class Producer {
    public static void main(String[] args) throws Exception {
        // 定义一个生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("helloGroup");
        // 连接nameSever
        producer.setNamesrvAddr("192.168.63.101:9876");
        // 启动生产者
        producer.start();
        // 设置消息发送的目的地
        String topic = "helloTopic";
        // 发送消息
        for (int i = 0; i < 3; i++) {
            Message message = new Message(topic, ("RocketMQ普通消息:" + i).getBytes(Charset.defaultCharset()));
            // 发送完成之后会返回响应结果
            SendResult result = producer.send(message);
            System.out.println("发送状态:" + result.getSendStatus());
        }
        System.out.println("消息发送完毕");
        // 关闭资源
        producer.shutdown();
    }
}

在这里插入图片描述

消费者模块消费消息:

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 定义消息消费者(在同一个JVM中,消费者的组名不能重复)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");
        // 设置nameSever地址
        consumer.setNamesrvAddr("192.168.63.101:9876");
        // 设置订阅的主题
        consumer.subscribe("helloTopic", "*"); // * 消息不过滤
        // 设置消费模式,默认集群
        // consumer.setMessageModel(MessageModel.CLUSTERING);
        // 设置消息的监听器
        consumer.setMessageListener(new MessageListenerConcurrently() { // MessageListenerConcurrently 代表多线程并发消费
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    String content = new String(msg.getBody(), Charset.defaultCharset());
                    System.out.println("线程:" + Thread.currentThread() + "消息内容:" + content);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 通知MQ消费正常
                // return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 通知MQ消费失败,消费者会重新消费
            }
        });
        // 启动的消费者
        consumer.start();
    }
}

在这里插入图片描述


3.2 消息发送方式

3.2.1 同步消息

可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。上面演示的案例,就是同步消息发送方式。
在这里插入图片描述

应用程序给消息中间件发送消息的时候,需要等待消息中间件将消息存储完毕后才响应回去,业务代码才能往下执行。

发送方式:SendResult result = producer.send(msg);

上面演示的案例就是同步发送。


3.2.2 异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待 Broker 的响应。

在这里插入图片描述

应用程序发送消息,消息中间件收到这个消息之后,直接给应用程序响应(此时消息并没有完全存储到磁盘),消息中间件继续存储消息,通过回调地址通知有应用程序存储的结果(成功或失败)。

发送方式:

 producer.send(message, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        ...
    }

    @Override
    public void onException(Throwable throwable) {
        ...
    }
});
public class AsynchronousProducer {
    public static void main(String[] args) throws Exception {
        // 定义一个生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("helloGroup");
        // 连接nameSever
        producer.setNamesrvAddr("192.168.63.100:9876");
        // 启动生产者
        producer.start();
        // 设置消息发送的目的地
        String topic = "helloTopic";
        // 发送消息
        Message message = new Message(topic, ("RocketMQ异步消息").getBytes(Charset.defaultCharset()));
        System.out.println("消息发送前");

        // 异步发送,需要传递异步回调消息
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("消息存储状态:" + sendResult.getSendStatus());
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("消息发送出现异常");
            }
        });
        
        System.out.println("消息发送完毕");
        TimeUnit.SECONDS.sleep(5); // 为了等回调消息,模拟程序睡眠5s后关闭资源
        // 关闭资源
        producer.shutdown();
    }
}

在这里插入图片描述


3.2.3 一次性消息

一次性消息主要用在不特别关心发送结果的场景,例如:日志发送。

在这里插入图片描述

发送方式:producer.sendOneway(message);

应用程序给消息中间件发送消息的时候,不需要知道消息是否在消息中间存储了,只管发就是了。

public class OneTimeProducer {
    public static void main(String[] args) throws Exception {
        // 定义一个生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("helloGroup");
        // 连接nameSever
        producer.setNamesrvAddr("192.168.63.100:9876");
        // 启动生产者
        producer.start();
        // 设置消息发送的目的地
        String topic = "helloTopic";
        // 发送消息
        Message message = new Message(topic, ("RocketMQ一次性消息").getBytes(Charset.defaultCharset()));
        System.out.println("消息发送前");
        
        // 一次性消息
        producer.sendOneway(message);
        
        System.out.println("消息发送完毕");
        // 关闭资源
        producer.shutdown();
    }
}

在这里插入图片描述


3.3 消息消费方式

3.3.1 集群模式

消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同。
在这里插入图片描述

由于内部会根据 index++ % queue.size() 的方式来决定消息进哪个 messageQueue,因此当多个机器做集群的时候,也可能会发生消息消费分配不均等情况。如下面 topic 中一共有 10 个消息:

在这里插入图片描述

入门案例默认就是集群的消费方式:
在这里插入图片描述


3.3.2 广播模式

消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的。

在这里插入图片描述
设置消费模式:consumer.setMessageModel(MessageModel.BROADCASTING);

在这里插入图片描述


3.4 顺序消息

从上文的消费结果来看,在集群状态下,消息的消费顺序是乱序的,但有些场景是要求消息的消费是有序的,这要怎么实现呢?我们考虑以下两个场景:
① 如果在消费者做集群的情况下,由于消息会分散在不同的队列中,因此消息不可保证顺序消费,如:第四个消息比第一个消息更早被消费。因此,可以考虑将消息全放在一个队列中

在这里插入图片描述
注:一个队列只会被一个消费者实例消费,一个消费者实例可以消费多个队列

② 我们设置消费者的监听模式的时候使用的是 MessageListenerConcurrently 即多线程并发消费的形式,那么当消息全存储在一个队列时,由于 CPU 执行权等问题,消费者实例中多线程会并发的进行消费,也不会保证顺序消费。

// 一个队列对应一个实例的多个线程
consumer.setMessageListener(new MessageListenerConcurrently() { // MessageListenerConcurrently 代表多线程并发消费
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        ...
    }
});

在这里插入图片描述

因此,可以使用 MessageListenerOrderly 让一个队列只对应一个线程

// 从什么地方开始消费,队头开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 一个队列对应一个实例的一个线程
consumer.setMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list,ConsumeOrderlyContext consumeOrderlyContext) {
        ...
    }
});

在这里插入图片描述

总结:如果想要实现顺序消费,在生产者的角度将消息存储在一个队列中,在消费者的角度就是将消息对应消费者实例里的一个线程


顺序消费案例,生产者代码如下:

public class Producer {
    public static void main(String[] args) throws Exception {
        // 定义一个生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");
        // 连接nameSever
        producer.setNamesrvAddr("192.168.63.100:9876");
        // 启动生产者
        producer.start();
        // 设置消息发送的目的地
        String topic = "orderTopic";
        List<OrderStep> orderSteps = OrderUtil.buildOrders();
        // 设置队列的选择器
        // 将需要顺序消费的消息存储到同一个队列中
        MessageQueueSelector selector = new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                System.out.println("队列的个数:" + list.size()); // 队列个数:4
                Long orderId = (Long)o; // 传入的参数
                int index = (int)(orderId % list.size());
                return list.get(index);
            }
        };
        // 发送消息
        for (OrderStep orderStep : orderSteps) {
            Message msg = new Message(topic, orderStep.toString().getBytes(Charset.defaultCharset()));
            // 指定消息选择器,传入的参数
            producer.send(msg, selector, orderStep.getOrderId()); // 将订单号传入选择器
        }

        System.out.println("消息发送完毕");
        // 关闭资源
        producer.shutdown();
    }
}

消费者代码如下:

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 定义消息消费者(在同一个JVM中,消费者的组名不能重复)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyProducerGroup");
        // 设置nameSever地址
        consumer.setNamesrvAddr("192.168.63.101:9876");
        // 设置订阅的主题
        consumer.subscribe("orderTopic", "*");
        // 从什么地方开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 一个队列对应的一个线程
        consumer.setMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list,
                ConsumeOrderlyContext consumeOrderlyContext) {
                for (MessageExt msg : list) {
                    System.out.println("当前线程ID:" + Thread.currentThread().getId() + ",队列ID:" + msg.getQueueId() + ",消息内容:"
                        + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // 启动的消费者
        consumer.start();
    }
}

模拟数据:

@Setter
@Getter
public class OrderStep {
    private long orderId;
    private String desc;

    @Override
    public String toString() {
        return "OrderStep{" +
                "orderId=" + orderId +
                ", desc='" + desc + '\'' +
                '}';
    }
}
public class OrderUtil {

    /**
     * 生成模拟订单数据
     */
    public static List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(101L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(102L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(101L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(103L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(102L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(103L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(102L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(101L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(103L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();

        orderDemo.setOrderId(101L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
        return orderList;
    }
}

控制台打印:

在这里插入图片描述
从控制台打印效果来看,102 订单全部存储在 ID 为 2 的队列当中,并且实现了顺序消费。


3.5 延迟消息

延时消息是 RocketMQ 延时发送给消费者消费的消息,典型应用场景如:订单超时未支付等。其不支持任意时间的延时,需要设置几个固定的延时等级,从 1s 到 2h 分别对应着等级 1 到 18。

等级123456789101112131415161718
延时1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h

生产者:

public class Producer {
    public static void main(String[] args) throws Exception {
        // 定义一个生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("helloGroup");
        // 连接nameSever
        producer.setNamesrvAddr("192.168.63.101:9876");
        // 启动生产者
        producer.start();
        // 设置消息发送的目的地
        String topic = "helloTopic";
        // 发送消息
        SimpleDateFormat cusFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Message message =
            new Message(topic, ("延时消息,发送时间:" + cusFormat.format(new Date())).getBytes(Charset.defaultCharset()));
        // 设置消息延时级别
        message.setDelayTimeLevel(3);
        // 发送完成之后会返回响应结果
        SendResult result = producer.send(message);
        System.out.println("消息发送完毕");
        // 关闭资源
        producer.shutdown();
    }
}

消费者:

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 定义消息消费者(在同一个JVM中,消费者的组名不能重复)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delayConsumerGroup");
        // 设置nameSever地址
        consumer.setNamesrvAddr("192.168.63.101:9876");
        // 设置订阅的主题
        consumer.subscribe("helloTopic", "*");
        // 设置消息的监听器
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                SimpleDateFormat cusFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                for (MessageExt msg : list) {
                    System.out.println("消费时间:" + cusFormat.format(new Date()) + ",消息内容:" + new String(msg.getBody(), Charset.defaultCharset()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动的消费者
        consumer.start();
    }
}

控制台打印:
在这里插入图片描述


3.6 消息过滤

3.6.1 Tag 过滤

RocketMQ 的消息标签(Message Tag)是一种简单的路由机制,允许消费者根据标签来过滤并只消费感兴趣的消息。要实现消息标签的过滤,需要在发送消息时设置标签,并在消费者端配置标签过滤器。以下示例展示如何使用的标签过滤功能:

生产者设置消息标签并发送消息:

public class Producer {
    public static void main(String[] args) throws Exception {
        // 定义一个生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("tagProducesGroup");
        // 连接nameSever
        producer.setNamesrvAddr("192.168.63.101:9876");
        // 启动生产者
        producer.start();
        // 设置消息发送的目的地
        String topic = "tagFilterTopic";
        // 发送消息
        Message message1 = new Message(topic, "TagA", ("消息A").getBytes(Charset.defaultCharset()));
        Message message2 = new Message(topic, "TagB", ("消息B").getBytes(Charset.defaultCharset()));
        Message message3 = new Message(topic, "TagC", ("消息C").getBytes(Charset.defaultCharset()));
        producer.sendOneway(message1);
        producer.sendOneway(message2);
        producer.sendOneway(message3);
        System.out.println("消息发送完毕");
        // 关闭资源
        producer.shutdown();
    }
}

消费者端配置标签过滤器:

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 定义消息消费者(在同一个JVM中,消费者的组名不能重复)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tagFilterConsumerGroup");
        // 设置nameSever地址
        consumer.setNamesrvAddr("192.168.63.101:9876");
        // 设置订阅的主题
        consumer.subscribe("tagFilterTopic", "TagA || TagC"); // 只消费 TagA 和 TagC
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    System.out.println("消息内容:" + new String(msg.getBody(), Charset.defaultCharset()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动的消费者
        consumer.start();
    }
}

控制台打印:

在这里插入图片描述


3.6.2 SQL92 过滤

RocketMQ 的 SQL92 过滤器是一种基于消息属性的条件筛选机制,允许消费者只接收满足特定条件的消息。要使用 SQL92 过滤器,需要在消费者端设置过滤条件。以下是示例,演示如何在消费者端设置 SQL92 过滤器:

生产者设添加属性 putUserProperty(key, value)

public class Producer {
    public static void main(String[] args) throws Exception {
        // 定义一个生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("sqlProducesGroup");
        // 连接nameSever
        producer.setNamesrvAddr("192.168.63.100:9876");
        // 启动生产者
        producer.start();
        // 设置消息发送的目的地
        String topic = "sqlFilterTopic";
        // 发送消息
        Message message1 = new Message(topic, "TagA", ("消息A").getBytes(Charset.defaultCharset()));
        message1.putUserProperty("age", "22");
        message1.putUserProperty("weight", "45");

        Message message2 = new Message(topic, "TagB", ("消息B").getBytes(Charset.defaultCharset()));
        message2.putUserProperty("age", "30");
        message2.putUserProperty("weight", "50");

        Message message3 = new Message(topic, "TagC", ("消息C").getBytes(Charset.defaultCharset()));
        message3.putUserProperty("age", "15");
        message3.putUserProperty("weight", "48");

        producer.sendOneway(message1);
        producer.sendOneway(message2);
        producer.sendOneway(message3);
        System.out.println("消息发送完毕");
        // 关闭资源
        producer.shutdown();
    }
}

消费者设置过滤条件:

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 定义消息消费者(在同一个JVM中,消费者的组名不能重复)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sqlFilterConsumerGroup");
        // 设置nameSever地址
        consumer.setNamesrvAddr("192.168.63.101:9876");
        // 设置订阅的主题
        consumer.subscribe("sqlFilterTopic", MessageSelector.bySql("age<25 and weight<47"));
        // 一个队列对应的一个线程
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    System.out.println("消息内容:" + new String(msg.getBody(), Charset.defaultCharset()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动的消费者
        consumer.start();
    }
}

在这里插入图片描述

注意:
① 过滤条件支持以下形式:

数值比较,如:>,>=,<,<=,BETWEEN,=
字符比较,如:=,<>,IN
IS NULL 或者 IS NOT NULL
逻辑符号 AND,OR,NOT

常量支持类型为:
数值,如:**123,3.1415;
字符,如:‘abc’,必须用单引号包裹起来
NULL,特殊的常量
布尔值,TRUE 或 FALSE

② 在使用 SQL 过滤的时候, 需要修改配置文件 broker.conf,添加参数enablePropertyFilter=true,重启 broker 代理服务器。

vi /usr/local/rocketmq-4.4/conf/broker.conf

在这里插入图片描述


四、SpringBoot 集成 RocketMQ

4.1 入门案例

1、添加 pom 依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>

2、生产者模块添加配置

rocketmq:
  name-server: 192.168.63.101:9876
  producer:
    group: my-group

3、消费者模块添加配置

rocketmq:
  name-server: 192.168.63.101:9876

4、生产者模块生产消息

@SpringBootTest
public class RocketMQTest {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void sendMsg() {
        Message<String> msg = MessageBuilder.withPayload("发送消息").build();
        rocketMQTemplate.send("helloTopicBoot", msg);
    }
}

注意:① Message 对象是 Spring 框架提供的对象 import org.springframework.messaging.Message;
   ② rocketMQTemplate.send(destination, message) 方法是同步的发送方式。

5、消费者模块消费消息

@Component
// 消费者名字叫 helloConsumerGroup,消费的生产组叫 helloTopicBoot
@RocketMQMessageListener(consumerGroup = "helloConsumerGroup", topic = "helloTopicBoot", messageModel = MessageModel.BROADCASTING)
public class HelloTopicBootListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("收到消息:" + new String(messageExt.getBody(), Charset.defaultCharset()));
    }
}

4.2 消息发送方式

下文不阐述具体的发送细节,细节参考上文。

4.2.1 同步消息

@Test
public void sendSYNMsg() throws InterruptedException {
    Message<String> msg = MessageBuilder.withPayload("发送同步消息").build();
    rocketMQTemplate.syncSend("helloTopicBoot", msg);
}

4.2.2 异步消息

@Test
public void sendASYNMsg() throws InterruptedException {
    Message<String> msg = MessageBuilder.withPayload("发送异步消息").build();
    rocketMQTemplate.asyncSend("helloTopicBoot", msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("发送状态:" + sendResult.getSendStatus());
        }

        @Override
        public void onException(Throwable throwable) {
            System.out.println("消息发送失败");
        }
    });
    TimeUnit.SECONDS.sleep(5);
}

4.2.3 一次性消息

@Test
public void sendOnewayMsg() {
    Message<String> msg = MessageBuilder.withPayload("发送一次性消息").build();
    rocketMQTemplate.sendOneWay("helloTopicBoot", msg);
}

4.3 消息消费方式

这里模拟生产者发送一次性消息10次:

@Test
public void sendOnewayMsgLoop() {
    for (int i = 0; i < 10; i++) {
        Message<String> msg = MessageBuilder.withPayload("发送一次性消息:" + i).build();
        rocketMQTemplate.sendOneWay("helloTopicBoot", msg);
    }
}

4.3.1 集群模式

默认情况下,消费者采用负载均衡方式消费消息,即采用集群模式,也可以在 @RocketMQMessageListener 注解中设置 messageModel 属性来改变消费模式。

@Component
@RocketMQMessageListener(consumerGroup = "helloConsumerGroup", topic = "helloTopicBoot",
    messageModel = MessageModel.CLUSTERING) // 设置消费模式
public class HelloTopicBootListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("收到消息:" + new String(messageExt.getBody(), Charset.defaultCharset()));
    }
}

在这里插入图片描述

控制台打印:

在这里插入图片描述


4.3.2 广播模式

设置广播模式:messageModel = MessageModel.BROADCASTING

@Component
@RocketMQMessageListener(consumerGroup = "helloConsumerGroup", topic = "helloTopicBoot",
    messageModel = MessageModel.BROADCASTING) // 设置消费模式
public class HelloTopicBootListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        ...
    }
}

控制台打印:

在这里插入图片描述


4.4 顺序消息

生产者设置队列选择器,需要将顺序消息放在同一个队列:

@Test
public void sendOrderlyMsg() {
    // 设置队列的选择器
    // 将需要顺序消费的消息存储到同一个队列中
    rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object o) {
            String orderIdStr = (String)o; // 传入参数
            Long orderId = Long.valueOf(orderIdStr);
            int index = (int)(orderId % list.size());
            return list.get(index);
        }
    });
    List<OrderStep> orderSteps = OrderUtil.buildOrders();
    // 发送消息
    for (OrderStep step : orderSteps) {
        Message<String> msg = MessageBuilder.withPayload(step.toString()).build();
        rocketMQTemplate.sendOneWayOrderly("orderlyTopicBoot", msg, String.valueOf(step.getOrderId()));
    }
}

消费者默认一个队列是线程并发消费,可以通过设置 consumeMode = ConsumeMode.ORDERLY,将一个消息队列对应消费者的一个线程,以实现顺序消费:

@Component
@RocketMQMessageListener(consumerGroup = "orderlyConsumerBoot", topic = "orderlyTopicBoot",
    consumeMode = ConsumeMode.ORDERLY) // 设置一个队列对应一个线程
public class OrderlyTopicListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("当前线程:" + Thread.currentThread().getId() + ",队列ID:" + messageExt.getQueueId() + ",消息内容:"
            + new String(messageExt.getBody(), Charset.defaultCharset()));
    }
}

在这里插入图片描述

控制台打印:
在这里插入图片描述


4.5 延时消息

@Test
public void sendDelayMsg() {
    SimpleDateFormat cusFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	Message<String> msg = MessageBuilder.withPayload("发送延时消息,发送时间:" + cusFormat.format(new Date())).build();
    // 设置延时等级3,这个消息将在 10s 之后发送(详看delayTimeLevel),消息在任务队列里存储,10s 后发送
    // 3000 代表同步等待 3s,若超过 3s 消息队列都没有响应,自动断开链接
    rocketMQTemplate.syncSend("helloTopicBoot", msg, 3000, 3);
}

在这里插入图片描述


4.6 消息过滤

4.6.1 Tag 过滤

生产者生产消息,Topic 和 Tag 以 “:” 分割( “:” 前后不能有空格)

@Test
public void sendTagFilterMsg() {
    Message<String> msg1 = MessageBuilder.withPayload("消息A").build();
    rocketMQTemplate.send("tagFilterBoot:TagA", msg1);

    Message<String> msg2 = MessageBuilder.withPayload("消息B").build();
    rocketMQTemplate.send("tagFilterBoot:TagB", msg2);

    Message<String> msg3 = MessageBuilder.withPayload("消息C").build();
    rocketMQTemplate.send("tagFilterBoot:TagC", msg3);
}

消费者设置过滤条件:

@Component
@RocketMQMessageListener(consumerGroup = "tagFilterConsumerBoot", topic = "tagFilterBoot", 
		selectorExpression = "TagA || TagC") // selectorExpression 过滤条件
public class TagFilterTopicListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("消息内容:" + new String(messageExt.getBody(), Charset.defaultCharset()));
    }
}

控制台打印:
在这里插入图片描述


4.6.2 SQL92 过滤

使用 setHeader(String headerName, Object headerValue) 方法设置过滤条件:

@Test
public void sendSQL92FilterMsg() {
    Message<String> msg1 = MessageBuilder.withPayload("美女A").setHeader("age", 22).setHeader("weight", 90).build();
    rocketMQTemplate.send("SQL92FilterBoot", msg1);

    Message<String> msg2 = MessageBuilder.withPayload("美女B").setHeader("age", 20).setHeader("weight", 100).build();
    rocketMQTemplate.send("SQL92FilterBoot", msg2);

    Message<String> msg3 = MessageBuilder.withPayload("美女C").setHeader("age", 25).setHeader("weight", 120).build();
    rocketMQTemplate.send("SQL92FilterBoot", msg3);
}

消费者设置 selectorTypeselectorExpression

@Component
@RocketMQMessageListener(consumerGroup = "SQL92FilterConsumerBoot", topic = "SQL92FilterBoot",
        selectorType = SelectorType.SQL92, selectorExpression = "age<25 and weight>90") // 设置
public class Sql92FilterTopicListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("消息内容:" + new String(messageExt.getBody(), Charset.defaultCharset()));
    }
}

控制台打印:
在这里插入图片描述

注:在使用 SQL 过滤的时候, 需要修改配置文件 broker.conf,具体参考 3.6.2


文章参考:Java微服务商城高并发秒杀项目实战|Spring Cloud Alibaba真实项目实战+商城双11秒杀+高并发+消息+支付+分布式事物Seata

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2169343.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微服务-流量染色

1. 功能目的 通过设置请求头的方式将http请求优先打到指定的服务上&#xff0c;为微服务开发调试工作提供便利 请求报文难模拟&#xff1a;可以直接在测试环境页面上操作&#xff0c;流量直接打到本地IDEA进行debug请求链路较长&#xff1a;本地开发无需启动所有服务&#xf…

前端常见算法题集

很久没练算法了&#xff0c;打算接下来一段时间每天坚持写题和写题解 这是一篇前端常用算法题集&#xff0c;题目从从简到难&#xff0c;编程语言主要为JavaScript&#xff0c;顺便练习和熟记js的各种方法... 目录 字符串类 1.字符串相加 字符串类 下图为js中常用的字符串方…

神经网络介绍及其在Python中的应用(一)

作者简介&#xff1a;热爱数据分析&#xff0c;学习Python、Stata、SPSS等统计语言的小高同学~ 个人主页&#xff1a;小高要坚强的博客 当前专栏&#xff1a;Python之机器学习 本文内容&#xff1a;神经网络介绍及其在Python中的线性回归应用 作者“三要”格言&#xff1a;要坚…

使用python爬取豆瓣网站?如何简单的爬取豆瓣网站?

1.对python爬虫的看法 首先说说我对python的看法&#xff0c;我的专业是大数据&#xff0c;我从事的工作是java开发&#xff0c;但是在工作之余&#xff0c;我对python又很感兴趣&#xff0c;因为我觉得python是一门很好的语言&#xff0c;第一&#xff1a;它可以用来爬取数据…

fmql之字符驱动设备(2)

例行的点灯来喽。 之前是寄存器读写&#xff0c;现在要学习通过设备树点灯。 dtsled.c 寄存器写在reg 把用到的寄存器写在设备树的led节点的reg属性。 其实还是对寄存器的读写。 &#xff08;不推荐&#xff09; 头文件 #include <linux/kernel.h> #include <li…

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-09-26

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-09-26 1. LLMs Still Can’t Plan; Can LRMs? A Preliminary Evaluation of OpenAI’s o1 on PlanBench Authors: Karthik Valmeekam, Kaya Stechly, Subbarao Kambhampati LLMs仍然无法规划&#xff1b;LRMs可以…

【通俗易懂】FFT求解全过程,各参数详细解释

在进行FFT全过程讲解之前&#xff0c;小编先给大家解释一下&#xff0c;在FFT中出现的一些参数名词解释。 &#xff08;1&#xff09;采样频率 Fs Fs 1 / 采样间隔 根据奈奎斯特定理&#xff1a;Fs ≥ 最高频率分量的两倍&#xff0c;这样才能避免混叠 &#xff08;2&…

解决macOS安装redis以后不支持远程链接的问题

参考文档:https://blog.csdn.net/qq_37703224/article/details/142542179?spm1001.2014.3001.5501 安装的时候有个提示, 使用指定配置启动: /opt/homebrew/opt/redis/bin/redis-server /opt/homebrew/etc/redis.conf那么我们可以尝试修改这个配置文件: code /opt/homebrew/…

傅里叶级数在机器人中的应用(动力学参数辨识)

B站首发&#xff01;草履虫都能看懂的【傅里叶变换】讲解&#xff0c;清华大学李永乐老师教你如何理解傅里叶变换&#xff0c;辨清美颜和变声原理&#xff0c;&#xff01;&#xff01;_哔哩哔哩_bilibiliB站首发&#xff01;草履虫都能看懂的【傅里叶变换】讲解&#xff0c;清…

AI 智能体 | 手捏素材选题库 Coze Bot,帮你实现无限输出

做自媒体的同学经常遇到的一个痛点就是无限输出&#xff0c;那怎么才能有源源不断的选题呢&#xff1f;那就是搭建一个选题素材库。 下面就为大家介绍一下基于 Coze Bot 快速搭建素材选题库&#xff0c;希望能让大家才思泉涌。 一、流程拆解 日常素材库积累的过程可以描述为…

eslint-plugin-react的使用中,所出现的react版本警告

记一次使用eslint-plugin-react的警告 Warning: React version not specified in eslint-plugin-react settings. See https://github.com/jsx-eslint/eslint-plugin-react#configuration . 背景 我们在工程化项目中&#xff0c;常常会通过eslint来约束我们代码的一些统一格…

汽车总线之----J1939总线

instruction SAE J1939 是由美国汽车工程协会制定的一种总线通信协议标准&#xff0c;广泛应用于商用车&#xff0c;船舶&#xff0c;农林机械领域中&#xff0c;J1939协议是基于CAN的高层协议&#xff0c;我们来看一下两者之间的关系。在J1939 中&#xff0c;物理层和数据链路…

第13讲 实践:设计SLAM系统

设计一个视觉里程计&#xff0c;理解SLAM软件框架如何搭建&#xff0c;理解视觉里程计设计容易出现的问题以及解决方法。 目录 1、工程目标 2、工程框架 3、实现 附录 1、工程目标 实现一个精简版的双目视觉里程计。由一个光流追踪的前端和一个局部BA的后端组成。 2、工程…

asp.net mvc core 路由约束,数据标记DataTokens

》从0自己搭建MVC 》用 asp.net Core web 应用 空web 应用程序 需要配置 mvc服务 、mvc路由 新建 Controller 、Models、Views 》》》core 6 之前版本 vs2022 asp.net Core Web 应用&#xff08;模型-视图-控制器&#xff09; 不需要配置 就是mvc框架 asp.net Core web 应…

从Elasticsearch到RedisSearch:探索更快的搜索引擎解决方案

文章目录 RedisSearch 的关键功能与 ElasticSearch 对比性能对比产品对比 如何使用 Docker 安装 RedisSearch1. 获取 RedisSearch Docker 镜像2. 启动 RedisSearch 容器3. 验证安装 RedisSearch 使用示例1. 连接到 RedisSearch2. 创建索引3. 添加文档4. 执行搜索搜索所有包含 &…

【Geoserver使用】2.26.0版本发布主要内容

文章目录 前言一、GeoServer 2.26.0 版本二、主要内容1.Java17支持2.Docker更新3.搜索改进4.广泛的 MapML 改进4.重写演示请求页面5.栅格属性表扩展6.GeoCSS 改进7.地球静止卫星 AUTO 代码8.labelPoint 功能改进9.改进的矢量图块生成10.GeoPackage QGIS 兼容性改进11.新的图像马…

深度学习—神经网络基本概念

一&#xff0c;神经元 1.生物神经元与人工神经元 1.1神经元是人脑的基本结构和功能单位之一。人脑中有数1000亿个神经元&#xff0c;其功能是接受&#xff08;树突&#xff09;&#xff0c;整合&#xff08;细胞体&#xff09;&#xff0c;传导&#xff08;轴突&#xff09;和…

MySQL --用户管理

文章目录 1.用户1.1用户信息1.2创建用户1.3删除用户1.4修改用户密码 2.数据库的权限2.1给用户授权2.2回收权限 如果我们只能使用root用户&#xff0c;这样存在安全隐患。这时&#xff0c;就需要使用MySQL的用户管理。 1.用户 1.1用户信息 MySQL中的用户&#xff0c;都存储在系…

Spring Cloud 教程(二) | 搭建SpringCloudAlibaba

Spring Cloud 教程&#xff08;二&#xff09; | 搭建SpringCloudAlibaba 前言一、SpringBoot 与 SpringCloud 版本对应关系&#xff1a;二、SpringCloud 与 SpringCloudAlibaba版本对应关系&#xff1a;三、SpringCloudAlibaba版本 与 组件版本 对应关系&#xff1a;四、搭建S…

Django项目配置日志

需求 在Django项目中实现控制台输出到日志文件&#xff0c;并且设置固定的大小以及当超过指定大小后覆盖最早的信息。 系统日志 使用Django自带的配置&#xff0c;可以自动记录Django的系统日志。 可以使用logging模块来配置。下面是一个完整的示例代码&#xff0c;展示了如…