8.18日学习打卡---Spring Cloud Alibaba(五)

news2025/1/10 10:44:07

8.18日学习打卡

目录:

    • 8.18日学习打卡
  • RocketMQ
    • 什么是RocketMQ
    • 生产者和消费者
    • 技术架构
  • RocketMQ安装与配置
    • 环境搭建与测试
    • RocketMQ管理命令
  • RocketMQ发送消息
    • 普通消息
    • 顺序消息之全局消息
    • 顺序消息之局部消息
    • 消费者消费消息
    • 延迟消息
    • 延迟消息代码实现
    • 单向消息
    • 批量消息
    • 过滤消息
    • 过滤消息之Tag过滤
    • 集群消费
    • 广播消息
    • 可视化配置
    • 消息存储
    • 消息查询
    • SpringBoot接入RocketMQ生产者
    • SpringBoot接入RocketMQ消费者

在这里插入图片描述

RocketMQ

什么是RocketMQ

RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。

应用场景

RocketMQ在阿里集团也被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。

所拥有的功能

  • 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点;
  • Producer、Consumer、队列都可以分布式;
  • Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合;
  • 能够保证严格的消息顺序;
  • 提供丰富的消息拉取模式;
  • 高效的订阅者水平扩展能力;
  • 实时的消息订阅机制;
  • 亿级消息堆积能力;
  • 较少的依赖。

生产者和消费者

生产者负责生产消息,一般由业务系统负责生产消息,消费者即后台系统,它负责消费消息。
在这里插入图片描述
消息模型(Message Model)

消息模型主要有队列模型和发布订阅模型,RabbitMQ采用的是队列模型,如下图所示
在这里插入图片描述
RocketMQ采用发布订阅模型,模型如图所示:
在这里插入图片描述
主题(Topic)

在这里插入图片描述
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。

队列

存储消息的物理实体。一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息。一个Topic的Queue也被称为一个Topic中消息的分区(Partition)。

技术架构

在这里插入图片描述
RocketMQ架构上主要分为四部分,如上图所示:
在这里插入图片描述
NameServer
NameServer是一个Broker与Topic路由的注册中心,支持Broker的动态注册与发现。

主要包括两个功能:

  • Broker管理:接受Broker集群的注册信息并且保存下来作为路由信息的基本数据;提供心跳检测机制,检查Broker是否还存活。
    路由信息管理:每个NameServer中都保存着Broker集群的整个路由信息和用于客户端查询的队列信息。Producer和Conumser通过* NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。

1、路由发现

2、路由剔除

3、路由注册

4、客户端选择NameServer的策略

Broker
Broker充当着消息的中转角色,负责存储消息、转发消息。Broker在RocketMQ系统中负责接收并存储从生产者发送来的消息,同时为消费者的拉取请求作准备。Broker同时也存储着消息相关的元数据,包括 消费者组消费进度偏移offset、主题、队列等。

Producer
消息生产者,负责生产消息。Producer通过MQ的负载均衡模块选择相应的Broker集群队列(先选择Broker,再选择队列)进行消息投递,投递的过程支持快速失败并且低延迟。

在这里插入图片描述
Consumer
消息消费者,负责消费消息。一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务处理。
在这里插入图片描述
RocketMQ中的消息消费者都是以消费者组(ConsumerGroup)的形式出现的。消费者组是同一类消费者的集合,这类Consumer消费的是相同Topic类型(可以是一个,可以是多个,但是要相同)、并且是相同的tag(可以是一个,可以是多个,但是要相同)(保证订阅关系的一致性)。消费者组使得在消息消费方面,容易实现

RocketMQ安装与配置

环境搭建与测试

下载安装Apache RocketMQ

修改环境变量
打开/etc/profile文件

vim /etc/profile
# 文件末尾追加改信息
export ROCKETMQ_HOME=/usr/local/rocketmq
export PATH=$PATH:$ROCKETMQ_HOME/bin
# 生效环境变量
source /etc/profile

解压文件

unzip rocketmq-all-5.1.4-bin-release.zip

启动NameServer

nohup sh mqnamesrv &

启动broker

nohup sh ./mqbroker -n localhost:9876 &

问题解决
1、java.lang.IllegalAccessError: class org.apache.rocketmq.common.UtilAll (in unnamed module @0x58acb723) cannot access class sun.nio.ch.DirectBuffer (in module java.base) because module java.base does not export sun.nio.ch to unnamed module

解决:添加启动参数

修改runbroker文件,添加红色参数 $JAVA ${JAVA_OPT} --add-exports=java.base/sun.nio.ch=ALL-UNNAMED $@

2、堆空间初始值太大也报错

修改文件runbroker.sh
在这里插入图片描述

在这里插入图片描述

通过上述修改,将初始堆内存512M,最大堆内存设置为512M,新生代(Java中用于存储创建对象的部分)设置为256M,修改完成后便可以正常启动以及查看日志。

RocketMQ管理命令

在这里插入图片描述
Topic命令
1、updateTopic
作用:修改或创建一个Topic

命令:mqadmin updateTopic -b | -c [-h] [-n ] [-o ] [-p ] [-r ] [-s ] -t [-u ] [-w ]

参数:

-n: name server地址列表
-c: cluster 名称,表示topic 建在该集群
-t: 设置topic名称
-h: 打印help信息
-o: 设置topic是否为有序的 取值:true、false(默认)
-p: 设置topic的权限

示例:
mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t testtopic

2、deleteTopic
作用:从broker和nameserver删除topic

命令:mqadmin deleteTopic -c [-h] [-n ] -t

参数:

-n: name server地址列表
-c: cluster 名称,表示topic 建在该集群
-t: 设置topic名称
-h: 打印help信息

示例:

mqadmin deleteTopic -n localhost:9876 -c DefaultCluster -t testtopic

3、topicList
作用:从nameserver列出所有topic

命令:mqadmin topicList [-c] [-h] [-n ]

参数:

-n: name server地址列表
-c: cluster 名称,表示topic 建在该集群
-h: 打印help信息

示例:

mqadmin topicList -n localhost:9876

4、topicStatus
作用:检查topic的状态信息

命令:mqadmin topicStatus [-h] [-n ] -t

参数:

-n: name server地址列表
-c: cluster 名称,表示topic 建在该集群
-t: 设置topic名称
-h: 打印help信息

示例

mqadmin topicStatus -n localhost:9876 -t testtopic

5、cleanUnusedTopic
作用:清理未使用的topic

命令:mqadmin cleanUnusedTopic [-b ] [-c ] [-h] [-n ]

参数:

-n: name server地址列表
-b: broker地址
-c: 集群名称
-h: 打印help信息

关闭namesrv和broker服务

mqshutdown namesrv
mqshutdown broker

RocketMQ发送消息

普通消息

在这里插入图片描述
创建jjy-rocketmq工程

添加依赖

在pom.xml文件中添加依赖

  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>5.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.6</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>

配置文件

spring:
  application:
  # 应用名字
   name: springboot_rocketmq_producer

测试类

package com.jjy.sendmessage;


import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Test;


import java.nio.charset.StandardCharsets;
import java.util.List;


/**
 * 1、发送普通消息
 * 普通消息,也叫并发消息,是发送效率最高、使用场景最多的一类消息。
 */


@Slf4j
public class SimpleTest {


  @Test
  public void producer() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
    producer.setNamesrvAddr("192.168.47.100:9876");
    producer.start();
    for (int i = 0; i < 10; i++) {
      Message message = new Message("TestTopic", "Tags", (i + "_syncProducer").getBytes(StandardCharsets.UTF_8));
      SendResult send = producer.send(message);
      System.out.println(i + "消息发送成功:" + send);
     }
    producer.shutdown();
   }




  @Test
  public void consumer() throws MQClientException, InterruptedException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("simpleconsmer");
    consumer.setNamesrvAddr("192.168.47.100:9876");
    consumer.subscribe("TestTopic","*");
    consumer.setMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        list.forEach( n -> {
          System.out.println("消费成功" + n);
         });
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       }
     });
    consumer.start();
    Thread.sleep(Long.MAX_VALUE);
   }


}


顺序消息之全局消息

在这里插入图片描述

什么是顺序消息
消息有序指的是,消费者端消费消息时,需按照消息的发送顺序来消费,即先发送的消息,需要先消费(FIFO)。

举个容易理解的例子:

通常创建订单后,会经历一系列的操作:【订单创建 -> 订单支付 -> 订单发货 -> 订单配送 ->
订单完成】。在创建完订单后,会发送五条消息到MQ Broker中,消费的时候要按照【订单创建 -> 订单支付 -> 订单发货 -> 订单配送
-> 订单完成】这个顺序去消费,这样的订单才是有效的。

顺序消息的原理

、

在默认的情况下,消息发送会采取Round Robin轮询方式把消息发送到不同的queue;而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序的。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

全局顺序消息
全局顺序消息的话,我们需要将所有消息都发送到同一个队列,然后消费者端也订阅同一个队列,这样就能实现顺序消费消息的功能。下面通过一个示例说明如何实现全局顺序消息。

顺序消息之局部消息

在这里插入图片描述
生产者发送消息

public class OrderMQProducer {
  public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException, ExecutionException {
    // 创建DefaultMQProducer类并设定生产者名称
    DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test");
    // 设置NameServer地址,如果是集群的话,使用分号;分隔开
    mqProducer.setNamesrvAddr("10.0.90.86:9876");
    // 启动消息生产者
    mqProducer.start();
 
    List<Order> orderList = getOrderList();
 
    for (int i = 0; i < orderList.size(); i++) {
      String body = "【" + orderList.get(i) + "】订单状态变更消息";
      // 创建消息,并指定Topic(主题),Tag(标签)和消息内容
      Message msg = new Message("ORDER_STATUS_CHANGE", "", body.getBytes(RemotingHelper.DEFAULT_CHARSET));
 
      // MessageQueueSelector: 消息队列选择器,根据业务唯一标识自定义队列选择算法
      /**
       * msg:消息对象
       * selector:消息队列的选择器
       * arg:选择队列的业务标识,如本例中的orderId
       */
      SendResult sendResult = mqProducer.send(msg, new MessageQueueSelector() {
 
        /**
         * @param mqs 队列集合
         * @param msg 消息对象
         * @param arg 业务标识的参数,对应send()方法传入的第三个参数arg
         * @return
         */
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
          //根据arg(实际上是订单id)选择消息发送的队列
          long index = (Long) arg % mqs.size();
          return mqs.get((int) index);
         }
        //mqProducer.send()方法第三个参数, 会传递到select()方法的arg参数
       }, orderList.get(i).getOrderId());
 
      System.out.println(String.format("消息发送状态:%s, orderId:%s, queueId:%d, body:%s",
          sendResult.getSendStatus(),
          orderList.get(i).getOrderId(),
          sendResult.getMessageQueue().getQueueId(),
          body));
     }
    // 如果不再发送消息,关闭Producer实例
    mqProducer.shutdown();
   }
 
  /**
   * 订单状态变更流程: ORDER_CREATE(订单创建) -> ORDER_PAYED(订单已支付) -> ORDER_COMPLETE(订单完成)
   */
  public static List<Order> getOrderList() {
    List<Order> orderList = new ArrayList<>();
    Order orderDemo = new Order();
    orderDemo.setOrderId(1L);
    orderDemo.setOrderStatus("ORDER_CREATE");
    orderList.add(orderDemo);
 
    orderDemo = new Order();
    orderDemo.setOrderId(2L);
    orderDemo.setOrderStatus("ORDER_CREATE");
    orderList.add(orderDemo);
 
    orderDemo = new Order();
    orderDemo.setOrderId(1L);
    orderDemo.setOrderStatus("ORDER_PAYED");
    orderList.add(orderDemo);
 
    orderDemo = new Order();
    orderDemo.setOrderId(2L);
    orderDemo.setOrderStatus("ORDER_PAYED");
    orderList.add(orderDemo);
 
    orderDemo = new Order();
    orderDemo.setOrderId(2L);
    orderDemo.setOrderStatus("ORDER_COMPLETE");
    orderList.add(orderDemo);
 
    orderDemo = new Order();
    orderDemo.setOrderId(3L);
    orderDemo.setOrderStatus("ORDER_CREATE");
    orderList.add(orderDemo);
 
    orderDemo = new Order();
    orderDemo.setOrderId(4L);
    orderDemo.setOrderStatus("ORDER_CREATE");
    orderList.add(orderDemo);
 
    orderDemo = new Order();
    orderDemo.setOrderId(3L);
    orderDemo.setOrderStatus("ORDER_PAYED");
    orderList.add(orderDemo);
 
    orderDemo = new Order();
    orderDemo.setOrderId(1L);
    orderDemo.setOrderStatus("ORDER_COMPLETE");
    orderList.add(orderDemo);
 
    orderDemo = new Order();
    orderDemo.setOrderId(3L);
    orderDemo.setOrderStatus("ORDER_COMPLETE");
    orderList.add(orderDemo);
 
    orderDemo = new Order();
    orderDemo.setOrderId(4L);
    orderDemo.setOrderStatus("ORDER_PAYED");
    orderList.add(orderDemo);
 
    orderDemo = new Order();
    orderDemo.setOrderId(4L);
    orderDemo.setOrderStatus("ORDER_COMPLETE");
    orderList.add(orderDemo);
 
    return orderList;
   }
 
}
 
public class Order implements Serializable {
 
  /**
   * 订单ID
   */
  private Long orderId;
 
  /**
   * 订单状态
   */
  private String orderStatus;
 
  public Long getOrderId() {
    return orderId;
   }
 
  public void setOrderId(Long orderId) {
    this.orderId = orderId;
   }
 
  public String getOrderStatus() {
    return orderStatus;
   }
 
  public void setOrderStatus(String orderStatus) {
    this.orderStatus = orderStatus;
   }
 
  @Override
  public String toString() {
    return "Order{" +
        "orderId=" + orderId +
        ", orderStatus='" + orderStatus + '\'' +
        '}';
   }
}

消费者消费消息

public class OrderMQConsumer {
  public static void main(String[] args) throws MQClientException {
 
    // 创建DefaultMQPushConsumer类并设定消费者名称
    DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test");
 
    // 设置NameServer地址,如果是集群的话,使用分号;分隔开
    mqPushConsumer.setNamesrvAddr("192.168.47.100:9876");
 
    // 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
    // 如果不是第一次启动,那么按照上次消费的位置继续消费
    mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 
    // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息,如果订阅该主题下的所有tag,则使用*
    mqPushConsumer.subscribe("ORDER_STATUS_CHANGE", "*");
 
    // 注册回调实现类来处理从broker拉取回来的消息
    // 注意:顺序消息注册的是MessageListenerOrderly监听器
    mqPushConsumer.registerMessageListener(new MessageListenerOrderly() {
      @Override
      public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext consumeOrderlyContext) {
        consumeOrderlyContext.setAutoCommit(true);
 
        for (MessageExt msg : msgList) {
          // 每个queue有唯一的consume线程来消费, 订单对每个queue都是分区有序
          System.out.println("消费线程=" + Thread.currentThread().getName() +
              ", queueId=" + msg.getQueueId() + ", 消息内容:" + new String(msg.getBody()));
         }
 
        try {
          TimeUnit.SECONDS.sleep(1);
         } catch (InterruptedException e) {
          e.printStackTrace();
         }
 
        // 标记该消息已经被成功消费
        return ConsumeOrderlyStatus.SUCCESS;
       }
     });
 
    // 启动消费者实例
    mqPushConsumer.start();
   }
}

延迟消息

什么是延迟消息
延迟消息顾名思义不是用户能立即消费到的,而是等待一段特定的时间才能收到。

举例如下场景比较适合使用延时消息:

订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。

在RocketMQ中使用延迟消息
不像其他延迟消息的实现,客户端可以自定义延迟时间,而RocketMQ则不支持任意时间的延迟,它提供了18个级别(延迟时间选择)。

投递等级(delay level)延迟时间投递等级(delay level)延迟时间
11s106min
25s117min
310s128min
430s139min
51min1410min
62min1520min
73min1630min
84min171h
95min182h

在这里插入图片描述

延迟消息代码实现



/**
 *
 * 3、发送延时消息
 * 生产者发送消息后,消费者在指定时间才能消费消息,这类消息
 * 被称为延迟消息或定时消息。生产者发送延迟消息前需要设置延迟级
 * 别,目前开源版本支持18个延迟级别:Broker在接收用户发送的消息
 * 后,首先将消息保存到名为SCHEDULE_TOPIC的Topic中。此时,
 * 消费者无法消费该延迟消息。然后,由Broker端的定时投递任务定时
 * 投递给消费者。 1s  5s  10s  30s  1m 2m 3m 4m 5m 6m 7m 8m 9m
 */
public class ScheduleTest {




  @Test
  public void producer() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
    // 1、初始化生产者,配置生产者参数,启动生产者
    DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
    producer.setNamesrvAddr("192.168.47.100:9876");
    producer.start();
    for (int i = 0; i < 5; i++) {
      for (int j = 0; j < 10; j++) {
        // 2、初始化消息对象  topic名字   tags: 消息过滤词
        Message message = new Message("Schedule","Tags",(i+"_ScheduleProducer").getBytes(StandardCharsets.UTF_8));
        //1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        //message.setDelayTimeLevel(1);
        //十秒之后发送
        message.setDelayTimeMs(10000L);
        producer.send(message);
        System.out.println("消息发送成功_"+ LocalTime.now());
       }
     }
    producer.shutdown();
   }




  @Test
  public void consumer() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduleConsumer");
    consumer.setNamesrvAddr("192.168.47.100:9876");
    consumer.subscribe("Schedule","*");
    consumer.setMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (int i = 0; i < list.size(); i++) {
          System.out.println(i+"_消息消费成功_"+ LocalTime.now());
         }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       }
     });
    consumer.start();
    System.out.println("consumer started.%n");
   }
}


单向消息

什么是单向消息
单向消息的生产者只管发送过程,不管发送结果。

单向消息应用场景:

主要用于日志传输等消息允许丢失的场景。

代码示例

package com.jjy.sendmessage;


import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Test;


import java.nio.charset.StandardCharsets;


/**
 * 5、发送单向消息
 * 单向消息的生产者只管发送过程,不管发送结果。单项消息主要
 * 用于日志传输等消息允许丢失的场景
 */
public class OnewayTest {


  @Test
  public void Producer() throws MQClientException, RemotingException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
    producer.setNamesrvAddr("192.168.47.100:9876");
    producer.start();
    for (int i = 0; i < 2; i++) {
      Message message = new Message("Simple","Tags",(i+"_SyncProducer").getBytes(StandardCharsets.UTF_8));
      producer.sendOneway(message);
      System.out.printf(i+"_消息发送成功%n");
     }
    producer.shutdown();
   }
}

批量消息

什么是批量消息
Rocket MQ的批量消息可以提高消息的吞吐能力和处理效率,降低下游系统的API调用频率,是一种优化消息传输和处理的有效手段。

批量消息的特点
批量消息具有相同的topic
批量消息不支持延迟消息
批量消息的大小不超过4M(4.4版本之后要求不超过1M)

批量消息的使用场景

  • 消息的吞吐能力和处理效率:通过将多条消息打包成一批进行发送,可以减少网络传输开销和消息处理的时间,从而提高整体的消息处理效率。
  • 下游系统的API调用频率:通过将多条消息合并成一条批量消息进行发送,可以减少下游系统接收和处理消息的次数,从而降低API调用频率,减轻下游系统的负载压力。

代码示例

    DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
    producer.setNamesrvAddr("192.168.47.100:9876");
    producer.start();
    List<Message> messages = new ArrayList<>();
    for (int i = 0; i < 100000; i++) {
      Message message = new Message("BatchTest","Tags",("OrderID" + i).getBytes(StandardCharsets.UTF_8));
      messages.add(message);
     }
    producer.send(messages);
    producer.shutdown();

生产者最佳实践总结

相对消费者而言,生产者的使用更加简单,一般主要关注消息类型、消息发送方法即可正常使用RocketMQ发送消息。

常用消息类型

消息类型优点缺点备注
普通消息 (并发消息)性能最好。单机 TPS 的级别为100000消息的生产和消费都无序大部分场景适用
分区有序消息单分区中消息有序,单机发送 TPS万级别单点问题。如果 Broker宕机则会导致发送失败大部分有序消息场景适用
全局有序消息类似传统的 Queue,全部消息有序,单机发送 TPS 千级别单点问题。如果 Broker宕机则会导致发送失败极少场景使用
延迟消息RocketMO 自身支持,不需要额使用组件,支持延迟特性不能根据任意时间延迟,使用范围受限。Broker 随着延迟级别增大支持越多,CPU压力越大,延迟时间不准确非精确、延迟级别不多的场景,非常方便使用
事务消息RocketMQ 自身支持,不需要额外使用组建支持事务特性RocketMO 事务是生产者事务,只有生产者参与,如果消费者处理失败则事务失效简单事务处理可以使用

常用的发送方法

发送方法优点缺点备注
send(Message msg) 同步最可靠性能最低适用于高可靠场景
send(Message msg,MessageQueue mq)可以发送顺序消息单点故障后不可用适用于顺序消息
send(Message msg,MessageQueueSelectorselector, Object arg)队列选择方法最灵活比较低级的接口,使用有门槛特殊场景
sendOneway(Message msg)使用最方便消息有丢失风险适用于对消息丢失不敏感的场景
send(Collection msgs)效率最高发送失败后降级比较困难适用于特殊场景

过滤消息

在这里插入图片描述
你去市场买菜,阿姨给你一个二维码付款,支持某信、某付宝、某银联App等。假如你被要求设计这个二维码的功能,在一个支付请求进入Topic后,如果你只需要处理某付宝的消息,你的代码得这么写:

boolean receiveMessage(Message msg)
{
  String from = msg.getProperty("from");
  if ( "某付宝".equals(from) ){
    callApiXbao(msg);
   }else{
    log.error("请求异常" + JSON.toJsonString(msg));
   }
}

由于阿姨的生意非常好,每天有上亿的支付订单,你的应用程序要处理某信、某付宝、某银联App全部的支付消息,这会导致消费效率低下,我们有什么办法,可以只消费某付宝的支付消息呢?

解决:

RocketMQ 设计了消息过滤,来解决大量无意义流量的传输:即对于客户端不需要的消息,Broker就不会传输给客户端,以免浪费宽带。

消息过滤分类
RocketMQ 版支持Tag标签过滤和SQL属性过滤,这两种过滤方式对比如下:

对比项Tag标签过滤SQL属性过滤
过滤目标消息的Tag标签。消息的属性,包括用户自定义属性以及系统属性(Tag是一种系统属性)。
过滤能力精准匹配。SQL语法匹配。
适用场景简单过滤场景、计算逻辑简单轻量。复杂过滤场景、计算逻辑较复杂。

应用场景
在实际业务场景中,同一个主题下的消息往往会被多个不同的下游业务方处理,各下游的处理逻辑不同,只关注自身逻辑需要的消息子集。使用消息队列 RocketMQ 版的消息过滤功能,可以帮助消费者更高效地过滤自己需要的消息集合,避免大量无效消息投递给消费者,降低下游系统处理压力。

过滤消息之Tag过滤

在这里插入图片描述
什么是Tag过滤
Tag 过滤是最简单的一种过滤方法,通常 Tag 可以用作消息的业务标识。可以设置 Tag 表达式,判断消息是否包含这个 Tag。

场景示例
以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息:

  • 订单消息

  • 支付消息

  • 物流消息
    这些消息会发送到名称为Trade_Topic的Topic中,被各个不同的下游系统所订阅:

  • 支付系统:只需订阅支付消息。

  • 物流系统:只需订阅物流消息。

  • 交易成功率分析系统:需订阅订单和支付消息。

  • 实时计算系统:需要订阅所有和交易相关的消息。
    过滤效果如下图所示:

在这里插入图片描述
代码示例

/**
   * 过滤消息生产者-tag方式
   * @throws MQClientException
   * @throws MQBrokerException
   * @throws RemotingException
   * @throws InterruptedException
   */
  @Test
  public void tagproduce() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("FilterTagProducer");
    producer.setNamesrvAddr("192.168.47.100:9876");
    producer.start();
    String[] tags = new String[] {"TagA","TagB","TagC"};
    for (String tag : tags) {
      Message message = new Message("Filter",tag,(tag+"_FilterTagProducer").getBytes(StandardCharsets.UTF_8));
      SendResult sendResult = producer.send(message);
     }
    producer.shutdown();
   }




  /**
   * 过滤消息生产者-tag方式
   * @throws MQClientException
   */
  @Test
  public void tagconsumer () throws MQClientException, InterruptedException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SimpleConsumer");
    consumer.setNamesrvAddr("192.168.47.100:9876");
    consumer.subscribe("Filter","TagA || TagC");
    consumer.setMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (int i = 0; i < list.size(); i++) {
          System.out.println(i+"_消息消费成功_"+ new String(list.get(i).getBody()));
         }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       }
     });
    consumer.start();
    Thread.sleep(Integer.MAX_VALUE);
   }


集群消费

什么是集群消费

同一 Topic 下的一条消息只会被同一消费组中的一个消费者消费。也就是说,消息被负载均衡到了同一个消费组的多个消费者实例上。

代码示例

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SimpleConsumer");
    consumer.setNamesrvAddr("192.168.47.100:9876");
    consumer.subscribe("SimpleMessageTest","*");
    consumer.setMessageModel(MessageModel.CLUSTERING);
    consumer.setMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (int i = 0; i < list.size(); i++) {
          System.out.println(i+"_消息消费成功_"+ new String(list.get(i).getBody()));
         }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       }
     });
    consumer.start();
    Thread.sleep(Integer.MAX_VALUE);

广播消息

在这里插入图片描述

什么是广播消费
当使用广播消费模式时,每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。

示例代码

 @Test
  public void consumer() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SimpleConsumer");
    consumer.setNamesrvAddr("192.168.47.100:9876");
    consumer.subscribe("Simple","*");
    consumer.setMessageModel(MessageModel.BROADCASTING);
    consumer.setMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (int i = 0; i < list.size(); i++) {
          System.out.println(i+"_消息消费成功_"+ new String(list.get(i).getBody()));
         }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       }
     });
    consumer.start();
    System.out.println("consumer started.%n");
   }

可视化配置

在这里插入图片描述

下载地址
https://www.redisant.cn/rocketmq?spm=a2c6h.12873639.article-detail.7.b9ee4f500GjzZr

登录
在这里插入图片描述
实时查看您的 RocketMQ 健康指标
在这里插入图片描述
支持丰富的数据格式
RocketMQ Assistant 会自动识别并格式化不同的数据格式,包括Text、JSON、XML、YAML、HEX、MessagePack,以及各种整数、浮点类型。
快速查看与发布消息

可以从主题的最开始、指定时间戳或指定偏移处开始消费消息、过滤消息;发布消息时配合数据模板一次发送数千条消息进行性能测试,以了解系统如何处理负载
在这里插入图片描述
实时查看主题的消息情况
使用 RocketMQ Assistant,您可以快速查看并更新Topic配置;管理消费者组,重置偏移量,或者查看它们订阅的主题与分区。
在这里插入图片描述
查看消费者组

查看组内的每个消费者订阅的主题与队列,以及当前消费位置和延迟;支持跳过消息堆积、根据时间戳重置偏移量。
在这里插入图片描述
消息轨迹

根据消息ID或消息Key追踪消息,了解消息从生产、存储到消费的详细过程。
在这里插入图片描述

消息存储

关系型数据库
ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障。

文件
目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘损坏,否则一般是不会出现无法持久化的故障问题。
在这里插入图片描述
性能对比
文件系统 > 关系型数据库DB

消息查询

在实际开发中,经常需要查看MQ中消息的内容来排查问题。RocketMQ提供了三种消息查询的方式,分别是按Message ID、Message Key以及Unique Key查询。

//返回结果
SendResult [
sendStatus=SEND_OK, 
msgId=C0A801030D4B18B4AAC247DE4A0D0000,
offsetMsgId=C0A8010300002A9F000000000007BEE9,
messageQueue=MessageQueue [topic=TopicA, brokerName=broker-a, queueId=0], 
queueOffset=0]
  • 按MessageId查询消息

Message Id 是消息发送后,在Broker端生成的,其包含了Broker的地址、偏移信息,并且会把Message Id作为结果的一部分返回。Message Id中属于精确匹配,代表唯一一条消息,查询效率更高。

  • 按照Message Key查询消息

消息的key是开发人员在发送消息之前自行指定的,通常把具有业务含义,区分度高的字段作为消息的key,如用户id,订单id等。

  • 按照Unique Key查询消息

除了开发人员指定的消息key,生产者在发送发送消息之前,会自动生成一个UNIQ_KEY,设置到消息的属性中,从逻辑上唯一代表一条消息。

消息在消息队列RocketMQ中存储的时间默认为3天(不建议修改),即只能查询从消息发送时间算起3天内的消息,三种查询方式的特点和对比如下表所述:

查询方式查询条件查询类别说明
按Message IDMessage ID精确查询根据Message ID可以精确定位任意一条消息,获取消息的属性。
按Message KeyTopic+Message Key模糊查询根据Topic和Message Key可以匹配到包含指定Key的最近64条消息。注意 建议消息生产方为每条消息设置尽可能唯一的Key,以确保相同的Key的消息不会超过64条,否则消息会漏查。
按Unique KeyUnique Key精确查询与Message Id类似,匹配唯一一条消息,如果生产端重复发送消息,则有可能匹配多条,但它们的unique key是唯一的。

推荐查询过程
在这里插入图片描述

SpringBoot接入RocketMQ生产者

添加maven依赖

<dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-spring-boot-starter</artifactId>
      <version>2.2.3</version>
</dependency>

添加生产者spring配置

rocketmq:
 # nameserver地址
  name-server: 192.168.47.100:9876
  producer:
  # 生产组
   group: my-group1
  # 发送消息超时时间
   send-message-timeout: 300000

添加生产者代码
生产者rocketMQTemplate会根据配置的Namesrv地址自动生成一个bean注入spring容器中,我们在使用的时候直接添加@Resource 或者@Autowired 注解即可。当前的版本支持直接发送一个对象或字符串,RocketMQ 使用 JSON 作为序列化方式进行传输。

package com.jjy.produce;


import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;


@Service
public class MessageProduce {


  @Autowired
  private RocketMQTemplate rocketMQTemplate; // 直接注入生产者


  @Value("${demo.rocketmq.topic}")
  private String topic;


  /**
   * 发送消息
   * @param topic 主题
   * @param message 消息
   * @return
   */
  public SendResult sendMessage( String message){
    return rocketMQTemplate.syncSend(topic, message);
   }
}

RocketMQ SpringBoot 3.0不兼容解决方案

报错信息

*************************** APPLICATION FAILED TO START *************************** 
Description: 
Field rocketMQTemplate in com.dayuwebtech.dayupay.common.rocketmq.impl.SendPayReNotifyMessageImpl required a bean of type 'org.apache.rocketmq.spring.core.RocketMQTemplate' that could not be found. 
The injection point has the following annotations:  
- @org.springframework.beans.factory.annotation.Autowired(required=true)  
Action:  Consider defining a bean of type 'org.apache.rocketmq.spring.core.RocketMQTemplate' in your configuration.

在这里插入图片描述

我们要在resources文件夹中,新建META-INF/spring文件夹,在里面新建一个叫org.springframework.boot.autoconfigure.AutoConfiguration.imports的文件里面填入 org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

SpringBoot接入RocketMQ消费者

消费代码
消费者主要使用RocketMQMessageListener接口进行监听配置。

package com.jjy.consumer;


import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;


@Service
@RocketMQMessageListener(topic = "${demo.rocketmq.topic}",
    consumerGroup = "${demo.rocketmq.consumer}",
    // 过滤方式,默认为Tag过滤。
    selectorType = SelectorType.TAG,
    //消费模式,有顺序消费、并发消费。
    consumeMode = ConsumeMode.ORDERLY,
    // 消息模式,有集群消费、广播消费
    messageModel = MessageModel.CLUSTERING,
    //  default″*:过滤值,默认 为全部消费,不过滤。
    selectorExpression = "a"
)
public class MessagConsumer implements RocketMQListener<String> {




  @Override
  public void onMessage(String s) {
    System.out.println(s);
   }
}

如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2051745.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【HarmonyOS】云开发-用户自动认证

背景 华为云服务提供了统一认证的云服务&#xff0c;支持手机、邮箱等自定义登录服务&#xff0c;并且提供了免费使用的额度&#xff0c;这样子方便中小企业或者项目快速的开发工作。下面是支持的认证方式&#xff1a; 操作步骤 1.AGC(AppGallery Connect)创建项目 在AGC界…

C++ | Leetcode C++题解之第342题4的幂

题目&#xff1a; 题解&#xff1a; class Solution { public:bool isPowerOfFour(int n) {return n > 0 && (n & (n - 1)) 0 && n % 3 1;} };

zabbix监控进程、日志、主从状态和主从延迟

zabbix监控进程、日志、主从状态和主从延迟 监控进程1、下载服务2、编写脚本3、编写zabbix_agentd.conf4、新建监控项配置触发器5、查看邮件 监控日志1、上传log.py的2、编写zabbix_agentd.conf3、新建监控项配置触发器 监控数据库主从状态1、编写/etc/hosts&#xff08;master…

IOS 09 R.swift框架和使用方法

R.swift框架主要是实现通过类字段访问字符串&#xff0c;图片&#xff0c;等资源&#xff1b;类似Android那边通过R类访问&#xff0c;好处是有提示&#xff0c;如果缺少资源&#xff0c;直接就是编译错误&#xff1b;OC类似的功能叫R.objc。 添加依赖 添加依赖 #将资源&…

第八周:机器学习笔记

第八周机器学习笔记 摘要Abstract机器学习1. 鱼和熊掌和可兼得的机器学习1.1 Deep network v.s. Fat network 2. 为什么用来验证集结果还是不好&#xff1f; Pytorch学习1. 卷积层代码实战2. 最大池化层代码实战3. 非线性激活层代码实战 总结 摘要 本周学习对李宏毅机器学习视…

AI学习记录 - Word2Vec 超详细解析

创作不易&#xff0c;点个赞 我们有一堆文本&#xff0c;词汇拆分 sentences ["jack like dog", "jack like cat", "jack like animal","dog cat animal", "banana apple cat dog like", "dog fish milk like"…

URP平面阴影合批处理 shadow

闲谈 相信大家在日常工作中发现了一个问题 &#xff0c; urp下虽然可以做到3个Pass 去写我们想要的效果&#xff0c;但是&#xff0c;不能合批&#xff08;不能合批&#xff0c;那不是我们CPU要干冒烟~&#xff01;&#xff09; 好家伙&#xff0c;熊猫老师的偏方来了 &#x…

【数值方法-Python实现】Crout分解+追赶法实现

涉及Crout分解、追赶法的线性方程组求解方法的Python实现。 原文链接&#xff1a;https://www.cnblogs.com/aksoam/p/18366119 Codes def CroutLU(A:np.ndarray)->Tuple[np.ndarray,np.ndarray]:"""Crout LU分解算法,ALUinput:A: (n,n) np.ndarray,方阵out…

DrissionPage自动化获取城市数据内容

一、获取页面内容 二、最终结果 上海市 约收录140个指标 查看98075次 人均GDP 153299元 公交车 17899辆 户籍人口 1469.3万人 三、代码 from DrissionPage._pages.chromium_page import ChromiumPage import time page ChromiumPage() page.get(https://www.swguancha.com/…

【Delphi】中多显示器操作基本知识点

提要&#xff1a; 目前随着计算机的发展&#xff0c;4K显示器已经逐步在普及&#xff0c;笔记本的显示器分辨率也都已经超过2K&#xff0c;多显示器更是普及速度很快。本文介绍下Delphi中操作多显示器的基本知识点&#xff08;Windows系统&#xff09;&#xff0c;这些知识点在…

UniFab 是一款由人工智慧驅動的視訊增強器+ crack

UniFab 是一款功能强大的视频处理工具,包括 10 个基于 AI 的功能。使用 UniFab,您可以提高视频和音频质量、将视频转换为不同的格式、根据自己的喜好编辑视频等等。以下是适用于 Windows 的 UniFab 程序的简要说明: 视频转换器。UniFab 支持 1000 多种视频格式的转换,包括 …

构建自己的图数据集

代码&#xff1a; import warnings warnings.filterwarnings("ignore") import torch from torch_geometric.data import Datax torch.tensor([[2,1],[5,6],[3,7],[12,0]],dtypetorch.float) y torch.tensor([0,1,0,1],dtypetorch.float)#定义边 edge_index torc…

⌈ 传知代码 ⌋ DETR[端到端目标检测]

&#x1f49b;前情提要&#x1f49b; 本文是传知代码平台中的相关前沿知识与技术的分享~ 接下来我们即将进入一个全新的空间&#xff0c;对技术有一个全新的视角~ 本文所涉及所有资源均在传知代码平台可获取 以下的内容一定会让你对AI 赋能时代有一个颠覆性的认识哦&#x…

Leetcode3232. 判断是否可以赢得数字游戏

Every day a Leetcode 题目来源&#xff1a;3232. 判断是否可以赢得数字游戏 解法1&#xff1a;3232. 判断是否可以赢得数字游戏 用一个 sum1 统计个位数的和&#xff0c;sum2 统计十位数的和。 只要 sum1 和 sum2 不相等&#xff0c;Alice 拿大的就能赢得这场游戏。 代码…

【论文阅读】HuatuoGPT-II, One-stage Training for Medical Adaption of LLMs

总体概要 本文深入探讨了一款专为医疗领域设计的大规模语言模型——HuatuoGPT-II的创新、性能与应用。HuatuoGPT-II采用统一的单阶段训练流程&#xff0c;将传统的继续预训练和监督微调整合&#xff0c;有效解决了医疗数据的异质性问题&#xff0c;包括语言、体裁和格式差异&a…

【STM32单片机_(HAL库)】3-2-1【中断EXTI】【电动车报警器项目】继电器定时开闭

1.硬件 STM32单片机最小系统继电器模块 2.软件 继电器模块alarm驱动文件添加GPIO常用函数main.c程序 #include "sys.h" #include "delay.h" #include "led.h" #include "alarm.h"int main(void) {HAL_Init(); …

硬件面试经典 100 题(71~90 题)

71、请问下图电路的作用是什么&#xff1f; 该电路实现 IIC 信号的电平转换&#xff08;3.3V 和 5V 电平转换&#xff09;&#xff0c;并且是双向通信的。 上下两路是一样的&#xff0c;只分析 SDA 一路&#xff1a; 1&#xff09; 从左到右通信&#xff08;SDA2 为输入状态&…

同一台电脑同时连接使用Gitee(码云)和Github

1、添加对应的密钥 ssh-keygen -t rsa -C "your_emailexample.com" -f ~/.ssh/github_id-rsa //生成github秘钥 ssh-keygen -t rsa -C "your_emailexample.com" -f ~/.ssh/gitee_id-rsa //生成码云秘钥 2、在 ~/.ssh 文件里会生成对应的文件 文件夹里会…

[k8s源码]12.远程调试dlv

在Windows/Mac宿主机上&#xff0c;使用GoLand的IDE进行开发&#xff0c;但是如何将这些代码直接运行在k8s集群中并看到运行效果呢&#xff0c;这里有一个远程调试工具dlv。 图中展示了dlv的工作方式。GoLand IDE中包含Editor(编辑器)和Debugger(调试器)组件&#xff0c;其中De…

深度学习基础之前馈神经网络

目录 基本结构和工作原理 神经元和权重 激活函数 深度前馈网络 应用场景 优缺点 深度前馈神经网络与卷积神经网络&#xff08;CNN&#xff09;和循环神经网络&#xff08;RNN&#xff09;的具体区别和联系是什么&#xff1f; 具体区别 联系 如何有效解决前馈神经网络…