SpringCloudAlibaba:消息驱动之RocketMQ学习

news2025/1/11 20:59:50

目录

一、MQ简介

(一)什么是MQ

(二)MQ的应用场景

1、异步解耦

2、流量削峰

(三)常见的MQ产品

二、RocketMQ入门

(一)RocketMQ安装部署

1、环境要求

2、下载RocketMQ

3、安装RocketMQ

4、启动RocketMQ

5、测试RocketMQ

6、关闭RocketMQ

(二)RocketMQ控制台安装与启动

下载并解压

三、springcloud集成rocketmq

(一)产品微服务-发送消息

1、pom添加依赖

2、application.yml配置

3、controller发送消息

(二)用户微服务-订阅消息

1、pom添加依赖

2、application.yml配置

3、消息接收服务

4、测试

(三)控制台

四、不同类型的消息发送与接收

(一)普通消息

1、可靠同步发送(sync)

2、可靠异步发送(async)

3、单向发送(oneway)

4、三种发送方式的对比

5、发消息代码案例

(二)顺序消息

(三)广播模式

(四)延时消息

(五)批量消息

(六)过滤消息

(七)事务消息

(八)消息消费要注意的细节


一、MQ简介

(一)什么是MQ

MQ(Message Queue <消息队列>)是一种跨进程的通信机制,用于传递消息。通俗点说,就是一个先进先出的数据结构。

(二)MQ的应用场景

1、异步解耦

最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功。传统的做法如下:

此架构下注册、邮件、短信三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。 但是对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,而后续的注册短信和邮件不是即时需要关注的步骤。


所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的消息队列 MQ 中然后马上返回用户结果,由消息队列 MQ 异步地进行这些操作。架构图如下:

异步解耦是消息队列 MQ 的主要特点,主要目的是减少请求响应时间和解耦。主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时,由于使用了消息队列MQ,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦合。

2、流量削峰

流量削峰也是消息队列 MQ 的常用场景,一般在秒杀或团队抢购(高并发)活动中使用广泛。在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增,秒杀的应用在处理如此大量的访问流量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。为解决这些问题,可在应用和下游通知系统之间加入消息队列 MQ。

秒杀处理流程如下所述:

  • 用户发起海量秒杀请求到秒杀业务处理系统。

  • 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送至消息队列 MQ。

  • 下游的通知系统订阅消息队列 MQ 的秒杀相关消息,再将秒杀成功的消息发送到相应用户。

  • 用户收到秒杀成功的通知。

(三)常见的MQ产品

ZeroMQ、RabbitMQ、ActiveMQ、RocketMQ、Kafka

二、RocketMQ入门

RocketMQ是阿里巴巴开源的分布式消息中间件,现在是Apache的一个顶级项目。在阿里内部使用非常广泛,已经经过了"双11"这种万亿级的消息流转。

(一)RocketMQ安装部署

接下来我们先在linux平台下安装一个RocketMQ的服务

1、环境要求

  • Linux 64位操作系统

  • 64bit JDK 1.8+

2、下载RocketMQ

Release Notes - Apache RocketMQ - Version 4.9.5 | RocketMQ

下载时要注意与springcloudalibaba版本匹配 版本说名链接地址

3、安装RocketMQ

  1. 上传文件到Linux系统

  2. 解压到安装目录

    [root@bogon RocketMQ]# unzip rocketmq-all-4.9.5-bin-release.zip
    [root@bogon RocketMQ]# ll
    total 32136
    drwxr-xr-x. 6 root root      103 Mar 27 14:47 rocketmq-all-4.9.5-bin-release
    -rw-r--r--. 1 root root 32906177 May 24 10:22 rocketmq-all-4.9.5-bin-release.zip
  3. 修改RocketMQ启动配置

    bin 下的 3 个配置文件不然会报insufficient memory:

    1)runserver.sh

    vi runserver.sh 
    ​
    # JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

    2)runbroker.sh

    vi runbroker.sh
    ​
    # JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

    3)tools.sh

    vi tools.sh
    ​
    # JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
  4. 开启自动动创建Topic功能 在conf/broker.conf⽂件中加⼊如下配置,开启自动动创建Topic功能。

    autoCreateTopicEnable=true

4、启动RocketMQ

  1. 启动NameServer 执行命令启动NameServer

    ## 创建日志目录 
    cd bin
    mkdir logs
    ​
    # nohup ./mqnamesrv &:属于后台以静默⽅式启动
    # ./mqnamesrv:属于终端启动,直接输出日志信息,按 ctrl+c 可直接关闭退出
    ​
    nohup ./mqnamesrv > logs/mqnamesrv.out 2>1 &

    查看启动状态,在当前目录下会有一个nohup.out的日志文件,可以打开查看。

    ## 查看日志
    tail -f logs/mqnamesrv.out
    ​
    ## 看到以下表示启动成功
    The Name Server boot success. serializeType=JSON

    解决报错

    ## 报错
    ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!
    ​
    ## 解决 配置jdk环境变量
    # 需要 export 环境变量

  2. 启动Broker

    同样进入 RocketMQ 安装目录下的 /bin目录进行操作 执行启动命令,并且常驻内存,注意ip地址要配置成为服务的ip地址,保证地址以及端口能够访问。

    # 启动命令,并且常驻内存:注意ip地址要配置成为服务的ip地址,保证地址以及端口能够访问
    # nohup ./mqbroker -n 192.168.109.149:9876 & :属于后台以静默⽅式启动
    # sh ./mqbroker -n 92.168.109.149:9876 :属于终端启动,直接输出日志信息,按 ctrl+c 可直接关闭退出
    ​
    nohup ./mqbroker -n 192.168.109.149:9876 > logs/mqbroker.out 2>1 &

    查看启动状态,启动之后同样提示将日志信息追加到了当前目录下的nohup.out文件中。

    ## 查看日志
    tail -f logs/mqbroker.out
    ​
    ## 看到以下表示启动成功
    The broker[linux1, 192.168.109.149:10911] boot success. serializeType=JSON and name server is 192.168.109.149:9876

    解决没反应:

    #删除/root/store/*
    cd /root/store
    rm -rf *
    ​
    # 重新启动broker
    nohup ./mqbroker -n 192.168.109.149:9876 > logs/mqbroker.out 2>1 &

5、测试RocketMQ

发送/接收消息之前,需要告诉客户端(Producer、Consumer)名称服务器的位置,RocketMQ 提供了多种方法来实现这一点.

编程方式,如:producer.setNamesrvAddr(“ip:port”) Java 选项,如:rocketmq.namesrv.addr 环境变量,如:NAMESRV_ADDR HTTP 端点

  1. 测试消息发送

    [root@linux1 rocketmq]# export NAMESRV_ADDR=localhost:9876
    [root@linux1 rocketmq]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
    OpenJDK 64-Bit Server VM warning: MaxNewSize (262144k) is equal to or greater than the entire heap (262144k).  A new max generation size of 261632k will be used.
    16:19:05.806 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
    RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
    RocketMQLog:WARN Please initialize the logger system properly.
    SendResult [sendStatus=SEND_OK, msgId=AC11000176396FF3C5B512F379FA0000, offsetMsgId=AC11000100002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=linux1, queueId=3], queueOffset=0]
    ......
    SendResult [sendStatus=SEND_OK, msgId=AC11000176396FF3C5B512F382B603E7, offsetMsgId=AC11000100002A9F00000000000317BF, messageQueue=MessageQueue [topic=TopicTest, brokerName=linux1, queueId=2], queueOffset=249]
    16:19:08.609 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[172.17.0.1:10911] result: true
    16:19:08.631 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
  2. 测试消息接收

    [root@linux1 rocketmq]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
    OpenJDK 64-Bit Server VM warning: MaxNewSize (262144k) is equal to or greater than the entire heap (262144k).  A new max generation size of 261632k will be used.
    16:21:15.395 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
    Consumer Started.
    ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=linux1, queueId=2, storeSize=201, queueOffset=1, sysFlag=0, bornTimestamp=1659601146477, bornHost=/192.168.0.101:48216, storeTimestamp=1659601146478, storeHost=/172.17.0.1:10911, msgId=AC11000100002A9F000000000000057F, commitLogOffset=1407, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1659601275866, UNIQ_KEY=AC11000176396FF3C5B512F37A6D0007, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55], transactionId='null'}]] 
    ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=linux1, queueId=2, storeSize=202, queueOffset=2, sysFlag=0, bornTimestamp=1659601146500, bornHost=/192.168.0.101:48216, storeTimestamp=1659601146501, storeHost=/172.17.0.1:10911, msgId=AC11000100002A9F00000000000008A4, commitLogOffset=2212, bodyCRC=2088767104, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1659601275867, UNIQ_KEY=AC11000176396FF3C5B512F37A84000B, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 49], transactionId='null'}]] 

    消息发送完毕之后就会退出,在同一窗口中可以使用消费者类来进行接收消息,消费是多线程的。

6、关闭RocketMQ

与启动顺序相反进行关闭,先关闭 broker、在关闭 nameserv

./mqshutdown broker
./mqshutdown namesrv

(二)RocketMQ控制台安装与启动

下载并解压

下载地址:Tags · apache/rocketmq-externals · GitHub

下载rocketmq-console-1.0.0:https://codeload.github.com/apache/rocketmq-externals/zip/refs/tags/rocketmq-console-1.0.0

2、将压缩包放到你平时放项目的文件夹中,解压到当前文件夹

3、点进去,选中rocketmq-console

4、进入idea,找到解压的文件夹

5、选中下面 rocketmq-console ,点击ok,打开

6、然后点击 Trust Object(可信的实体),再点击建一个新的窗口,进去之后找到配置文件:按图修改

7、运行启动类,访问访问控制台 http://127.0.0.1:8080

 

三、springcloud集成rocketmq

接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信。设计图如下:

(一)产品微服务-发送消息

1、pom添加依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

2、application.yml配置

rocketmq:
  name-server: 虚拟机ip.XXX.XXX.XXX:9876
  producer:
    group: product-server # 生产者消息分组

3、controller发送消息

@RestController
@RequestMapping("/mq")
public class RocketMQController {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send")
    public void send(){
        rocketMQTemplate.convertAndSend("product-topic", "hello world from repository!!");
    }
}

(二)用户微服务-订阅消息

1、pom添加依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

2、application.yml配置

rocketmq:
  name-server: 虚拟机ip.XXX.XXX.XXX:9876

3、消息接收服务

//@RocketMQMessageListener 注解用于配置一个消息监听器,它包含以下两个重要的参数
//topic:指定监听器要消费哪个主题的消息。一个主题可以有多个消费者组订阅。
//consumerGroup:指定监听器属于哪个消费者组。一个消费者组内的消费者会负责消费主题的一部分消息,实现负载均衡。
@Service
@RocketMQMessageListener(consumerGroup = "product-server", topic = "product-topic")
public class SmsService implements RocketMQListener<String> {
    private Logger logger = LoggerFactory.getLogger("user-server");
    @Override
    public void onMessage(String s) {
        logger.info("收到一个订单信息{},接下来发送短信"+s);
    }
}

4、测试

调用产品微服务的接口发送消息,在用户微服务的控制台查看接收到的消息即可。

 

(三)控制台

在控制台可以查看到主题,可以在控制台发送消息: 

然后idea中user模块控制台可以接收到消息。

四、不同类型的消息发送与接收

(一)普通消息

RocketMQ提供三种方式来发送普通消息:可靠同步发送、可靠异步发送和单向发送。

1、可靠同步发送(sync)

同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。 此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等可靠性要求高的的数据或者需要实时响应的数据。

这种方式具有内部重试机制,即在主动声明本次消息发送失败之前,内部实现将重试一定次数,默认为2次(DefaultMQProducer#getRetryTimesWhenSendFailed)。 发送的结果存在同一个消息可能被多次发送给给broker,这里需要应用的开发者自己在消费端处理幂等性问题。

2、可靠异步发送(async)

异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送方通过回调接口接收服务器响应,并对响应结果进行处理。 异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

同同步发送一样,异步模式也在内部实现了重试机制,默认次数为2次(DefaultMQProducer#getRetryTimesWhenSendAsyncFailed})。发送的结果同样存在同一个消息可能被多次发送给给broker,需要应用的开发者自己在消费端处理幂等性问题。

3、单向发送(oneway)

单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。 适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

4、三种发送方式的对比

在实际使用场景中,利用何种发送方式,可以总结如下:

  • 当发送的消息很重要,且对响应时间不敏感的时候采用sync方式;

  • 当发送的消息很重要,且对响应时间非常敏感的时候采用async方式;

  • 当发送的消息不重要,采用one-way方式,以提高吞吐量;

5、发消息代码案例

product模块

@Slf4j
@RestController
@RequestMapping("/mq")
public class RocketMQController {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // 同步消息
    @GetMapping("/testSyncSend")
    public void testSyncSend(){
        //参数一: topic, 如果想添加tag 可以使用"topic:tag"的写法
        //参数二: 消息内容
        SendResult sendResult = rocketMQTemplate.syncSend("product-topic", "这是一条同步消息");
        log.info(sendResult.toString());
    }

    // 异步消息
    @GetMapping("/testASyncSend")
    public void testASyncSend(){
        //参数一: topic, 如果想添加tag 可以使用"topic:tag"的写法
        //参数二: 消息内容
        //参数三: 回调函数, 处理返回结果
        rocketMQTemplate.asyncSend("product-topic", "这是一条异步消息", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info(sendResult.toString());
            }

            @Override
            public void onException(Throwable throwable) {
                log.info(throwable.toString());
            }
        });
        //因为是异步发送,所以让线程不要终止以便测试
        try {
            Thread.sleep(60000);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // 单向消息
    @GetMapping("/testOneWay")
    public void testOneWay(){
        rocketMQTemplate.sendOneWay("product-topic", "这是一条单向消息");
    }

}

接收异步、同步消息可以使用RocketMQ的消息监听器。通过实现RocketMQListener接口来监听指定Topic上的消息,异步处理消息时不需要等待,示例代码如下:

user模块

@Service
@RocketMQMessageListener(consumerGroup = "product-server", topic = "product-topic")
public class SmsService implements RocketMQListener<String> {

    private Logger logger = LoggerFactory.getLogger("user-server");
    @Override
    public void onMessage(String s) {
        logger.info("收到一个产品信息{},接下来发送短信"+s);
    }
}

在上面的示例中,通过使用@RocketMQMessageListener注解指定了Topic和Consumer Group,消息被接收到后会自动调用onMessage方法进行处理。

需要注意的是,在异步接收消息时,RocketMQ会启动多个消费线程来处理消息,需要确保消息处理的线程安全性。

(二)顺序消息

顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型

//同步顺序消息[异步顺序 单向顺序写法类似]

// 顺序消息
@GetMapping("/testSyncSendOrderly")
public void testSyncSendOrderly(){
    //第三个参数用于队列的选择
    rocketMQTemplate.syncSendOrderly("product-topic", "这是一条异步顺序消息", "xxxx");
}

(三)广播模式

在RocketMQ中,消息可以以广播模式发送,广播模式的消息会被所有订阅了该topic的消费者都接收到。下面是一个简单的广播模式下消息的发送和接收的示例:

  1. 发送广播模式的消息 (一个服务发,两个服务收)

    @GetMapping("/sendBroadcastMsg")
    public void sendBroadcastMsg(@RequestParam("msg") String msg){
        Message<String> message = MessageBuilder.withPayload(msg).build();
        SendResult sendResult = rocketMQTemplate.syncSend("product-topic", message);
        System.out.printf("广播消息发送结果:MsgId:%s SendStatus:%s%n", sendResult.getMsgId(), sendResult.getSendStatus());
    }

    这个消息发给一个主题(product-topic),只要是这个主题都可以收到消息,但消费者的组名得不同。

  2. 接收广播模式的消息

    order-server

    @Service                // consumerGroup  消费者消息分组  topic 主题
    @RocketMQMessageListener(consumerGroup = "order-server", topic = "product-topic")
    public class MessageListener implements RocketMQListener<String> {
        private Logger logger = LoggerFactory.getLogger("order-server");
        @Override
        public void onMessage(String message) {
            logger.info("收到一个短信:"+message);
        }
    }

    user-server

    @Service
    @RocketMQMessageListener(consumerGroup = "user-server", topic = "product-topic")
    public class MessageListener implements RocketMQListener<String> {
        private Logger logger = LoggerFactory.getLogger("user-server");
        @Override
        public void onMessage(String s) {
            logger.info("收到一个短信: "+s);
        }
    }

    生产者发送消息,两个消费者都收到消息了。

上述示例代码中,通过使用@RocketMQMessageListener注解指定了订阅的Topic和Consumer Group,实现对广播消息的接收。接收广播模式的消息时,消息广播到所有订阅该topic的消费者,每个消费者都会独立接收到广播的消息。

需要注意的是,广播消息可能会被重复消费,消费者收到广播消息后会自动提交offset,若后续有新的消费者加入,则会从消费组最早的消费offset位置重新消费。

(四)延时消息

可以使用Spring Boot提供的RocketMQTemplate来发送延时消息,具体代码如下:

当使用 syncSend 发送延迟消息时,必须注意以下两个点:

  1. 必须设置 timeout 超时时间,防止方法堵塞

  2. timeout 时间建议设置稍大于 delayLevel 对应的延迟时间

如果不设置 timeout 时间,方法会永久堵塞,直到消息被消费后才返回。

@GetMapping("/sendMsg")
    public void sendMsg(String msg) {
        // 发送延时消息,设置延时10秒(即该消息将在10秒后被消费)
        Message<String> message = MessageBuilder.withPayload(msg).build();
        // 这里设置 timeout = 10000 毫秒,即 10 秒。同时设置 delayLevel = 3,对应 10 秒的延迟。
        // 这样可以确保,即使消息在 broker 延迟 10 秒未消费,syncSend 方法也不会永久堵塞,它会在超过 10 秒后返回。
        rocketMQTemplate.syncSend("product-topic", message, 10000, 3);
    }

RocketMQTemplate提供了syncSend方法,第三个参数是超时时间,第四个参数是延时时间。以上示例中延时级别为3,即延时10秒。

如果需要在Spring Boot应用中接收延时消息,推荐使用@RocketMQMessageListener注解,具体代码如下:

@Service
@RocketMQMessageListener(consumerGroup = "user-server", topic = "product-topic")
public class MessageListener implements RocketMQListener<String> {
    private Logger logger = LoggerFactory.getLogger("user-server");
    @Override
    public void onMessage(String s) {
        logger.info("收到一个短信: "+s);
    }
}

等待十秒钟,订阅主题的消费者们会收到消息。当收到延时消息后,Spring会自动调用onMessage方法来处理消息。

RocketMQ支持通过延时级别来控制消息的延时时间,延时级别设置在消息发送时,具体的延时时间由该延时级别对应的配置参数决定。RocketMQ定义了18个延时级别,级别从1到18,延时时间从1秒到2小时不等,各级别对应的延时时间和配置参数如下:

例如,若要设置一个延时30秒的消息,可以将延时级别设为4,参数设置为delayLevel=4。

需要注意的是,这里提到的延时时间是指消息发送后到达Broker存储的时间,与消息从Broker发送到消费者端的时间无关。在实际使用过程中,考虑到网络延迟等因素,消息的最终消费时间可能会比预设的延时时间略晚。

(五)批量消息

下面是一个使用SpringBoot整合RocketMQ的示例代码,演示如何发送和接收批量消息。

发送批量消息:

// 批量消息
@GetMapping("/sendBatchMsg")
public void sendBatchMsg() {
    List<String> messages = new ArrayList<>();
    messages.add("Message 1");
    messages.add("Message 2");
    messages.add("Message 3");
    messages.add("Message 4");
    messages.add("Message 5");
    try {
        SendResult result = rocketMQTemplate.syncSend("product-topic", messages);
        System.out.println("Batch message send result: " + result);
    } catch (MessagingException e) {
        e.printStackTrace();
    }
}

在以上示例中,使用了RocketMQTemplate的syncSend()方法,来发送批量消息。syncSend()方法的第二个参数可以是一个List集合,每个元素表示一条消息。具体使用时,可以将多条消息打包成一个List集合,然后将集合作为syncSend()方法的第二个参数。最后通过获取SendResult对象,来获取消息发送的结果信息。

接收批量消息的RocketMQListener:(user-server接收代码,order-server和下面差不多,只是消费消息分组不同)

@Service
@RocketMQMessageListener(consumerGroup = "user-server", topic = "product-topic")
public class MessageListener implements RocketMQListener<List<String>> {
    private Logger logger = LoggerFactory.getLogger("user-server");
    @Override
    public void onMessage(List<String> s) {
        logger.info("收到一个短信: "+s);
    }
}

在以上示例中,通过实现RocketMQListener<List>接口,来接收List类型的批量消息。在重写的onMessage()方法中,处理接收到的批量消息。

需要特别注意的是,RocketMQListener的泛型需要根据发送的消息类型进行设置。如果发送的是String类型的批量消息,那么RocketMQListener的泛型就应该设置为List。如果发送的是其他类型的批量消息,比如自定义的对象,那么需要自定义RocketMQMessageConverter实现类,来将消息转换为指定的类型。

(六)过滤消息

下面是一个使用SpringBoot整合RocketMQ的示例代码,演示如何发送和接收过滤消息。

@GetMapping("/sendFilterMsg")
public void sendMsg(@RequestParam("msg") String msg, @RequestParam("tag") String tag) {
    rocketMQTemplate.convertAndSend("product-topic" + ":" + tag, message);
}

在以上示例中,使用Autowired注解自动注入RocketMQTemplate实例,并通过setMessageProperty()方法和setTags()方法指定消息的属性和标签。最后调用convertAndSend()方法,将消息发送到名为product-topic的主题。

接收过滤消息的RocketMQListener:

@Slf4j
@Service
@RocketMQMessageListener(
        topic = "product-topic",
        consumerGroup = "filter-user-server",
        selectorExpression = "tagA || tagB",
        selectorType = SelectorType.TAG
)
public class FilterMsgListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String msg) {
        log.info("收到一个过滤短信:"+ msg);
    }
}

在以上示例中,通过@RocketMQMessageListener注解指定了消费组、主题和选择器表达式(相当于过滤表达式)。在选择器表达式中使用了消息的属性prop1和prop2,以及消息的标签tag1来过滤消息。最后通过实现RocketMQListener接口,来处理接收到的过滤消息。

(七)事务消息

RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致。 事务消息交互流程:

两个概念:

  1. 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了RocketMQ服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。

  2. 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。

事务消息发送步骤:

  1. 发送方将半事务消息发送至RocketMQ服务端。

  2. RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。

  3. 发送方开始执行本地事务逻辑。

  4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查步骤:

  1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。

  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

(八)消息消费要注意的细节

@RocketMQMessageListener(
        consumerGroup = "shop",//消费者分组
        topic = "order-topic",//要消费的主题
        consumeMode = ConsumeMode.CONCURRENTLY, //消费模式:无序和有序
        messageModel = MessageModel.CLUSTERING, //消息模式:广播和集群,默认是集群
)
public class SmsService implements RocketMQListener<Order> {
}

RocketMQ支持两种消息模式:

  • 广播消费: 每个消费者实例都会收到消息,也就是一条消息可以被每个消费者实例处理;

  • 集群消费: 一条消息只能被一个消费者实例消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/751723.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux网络综合基础实验 (二十三)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 一、实验目的 二、实验要求 三、实验拓扑 四、实验步骤 1. DHCP 安装 2、DNS 服务器搭建 3、web服务器配置 3.1基础配置 3.2查看IP获得情况 3.3 配置本地yum源 4、…

JDK 7 ConcurrentHashMap

目录 概述 构造器分析 put 流程 get 流程 size 计算流程 概述 JDK1.7中的ConcurrentHashMap间接地实现了Map&#xff0c;并将每一个元素称为分段锁segment&#xff0c;每个segment都是一个HashEntry<K,V>数组&#xff0c;称为table&#xff0c;table的每个元素都是一…

【雕爷学编程】Arduino动手做(149)---MAX9814咪头传感器模块2

37款传感器与执行器的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止这37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&am…

MyBatis PostgreSQL实现数组类型的操作

我的GitHub&#xff1a;Powerveil GitHub 我的Gitee&#xff1a;Powercs12 (powercs12) - Gitee.com 皮卡丘每天学Java 最近在学习数据库PostgreSQL&#xff0c;遇到如何实现对数组类型的数据操作&#xff0c;试着自己尝试学习实现。 话不多说&#xff0c;直接撸代码。 建表…

linux下一个iic驱动(按键+点灯)-互斥

一、前提&#xff1a; 硬件部分&#xff1a; 1. rk3399开发板&#xff0c;其中的某一路iic&#xff0c;这个作为总线的主控制器 2. gd32单片机&#xff0c;其中的某一路iic&#xff0c;从设备。主要是按键上报和灯的亮灭控制。&#xff08;按键大约30个&#xff0c;灯在键的…

新手杯—easy_base

0x00 前言 CTF 加解密合集&#xff1a;CTF 加解密合集 0x01 题目 0XezFWZfNXafRjNlNXYit3dvh2cmR3Y0x02 Write Up 先倒序 然后base64解码 以上

Self-Attention Cross-Attention

transformer的细节到底是怎么样的&#xff1f;Transformer 连环18问&#xff01; 4.1 从功能角度&#xff0c;Transformer Encoder的核心作用是提取特征&#xff0c;也有使用Transformer Decoder来提取特征。例如&#xff0c;一个人学习跳舞&#xff0c;Encoder是看别人是如何…

智能网卡在分布式 SDN 网络的应用与实践 | 龙蜥技术

编者按&#xff1a;当前智能网卡能够加速数据处理和传输&#xff0c;并能实现网络、存储和安全等功能卸载&#xff0c;在云计算领域得到广泛的应用。今天&#xff0c;浪潮数据云计算网络架构师王培辉带大家了解智能网卡加速原理和以及在浪潮分布式 SDN 网络加速的应用&#xff…

我连夜咨询了30个老同学,学IT上培训班到底有用么?

文章目录 一、背景二、学习IT上培训班的益处2.1 IT行业本身还不错2.2 获取到系统的专业知识2.3 获取到实战经验2.4 获取到网络资源和支持2.5 获取到职业发展指导2.6 建立初步的职业圈子人脉 三、学习IT上培训班的风险3.1 质量风险3.2 课程更新速度风险3.2 缺乏互动与实践机会风…

积分微分电路

积分微分电路 通过写出时域的推导&#xff0c;再到频域&#xff0c;详细介绍了积分微分的频率响应的推导&#xff0c;手绘了bode图&#xff0c;并仿真电路得到对应的结果。积分的频率响应&#xff1a;频率增加10倍&#xff0c;增益下降20db。输出相位超前输入相位90度。微分的…

GPT-4 最强竞争对手,Claude 杀疯了!

公众号关注 “GitHubDaily” 设为 “星标”&#xff0c;每天带你逛 GitHub&#xff01; 在今年早些时候&#xff0c;ChatGPT、Bard、Claude 等大语言模型&#xff0c;在 AI 领域呈三权鼎立之势&#xff0c;无人能出其右&#xff0c;被视为是能力表现最为卓越的 3 款 AI 聊天机器…

阿里云无影云电脑具体价格_云桌面不同配置1元报价

阿里云无影云电脑配置费用&#xff0c;4核8G企业办公型云电脑可以免费使用3个月&#xff0c;无影云电脑地域不同费用不同&#xff0c;无影云电脑是由云桌面配置、云盘、互联网访问带宽、AD Connector、桌面组共用桌面session等费用组成&#xff0c;阿里云百科分享阿里云无影云电…

大模型的“第一性原理”:技术创新与社会价值的接轨

随着时间来到2023年第三季度&#xff0c;国产大模型已经达到100多个&#xff0c;“百模大战”正式开启。 大模型&#xff0c;我们有了很多选择&#xff0c;也开始呈现出某种同质化。除了拼参数、比背景、看榜单&#xff0c;有没有其他方法&#xff0c;让我们更好地判断一个大模…

解决Gson解析json字符串,Integer变为Double类型的问题

直接上代码记录下。我代码里没有Gson包&#xff0c;用的是nacos对Gson的封装&#xff0c;只是包不同&#xff0c;方法都一样 import com.alibaba.nacos.shaded.com.google.common.reflect.TypeToken; import com.alibaba.nacos.shaded.com.google.gson.*;import java.util.Map;…

经典CNN(一):ResNet-50算法实战与解析

&#x1f368; 本文为&#x1f517;365天深度学习训练营中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊|接辅导、项目定制 1 ResNet理论 深度残差网络ResNet(deep residual network)在2015年由何凯明等提出&#xff0c;因为它简单与实用并存&#xff0c;随后很多研究…

Hutool工具类 -集常用工具类为一体 - 工具类之大成

文章目录 说在前面的话简介gitee介绍项目介绍 网址gtiee 网址github 网址 安装pom依赖引入 &#xff1a;下载jar 文档中文文档中文备用文档参考API视频介绍 部分截图首页包含组件(总)IO流相关部分工具类(Util)集合类HTTP客户端 功能不再一一赘述和截图&#xff0c;具体请查看官…

详解TCP协议

TCP协议段格式 序号和确认序号&#xff1a;在真实服务器和客服端通信过程中请求是并行执行的&#xff0c;这会导致到达是乱序的&#xff0c;所以才会有序号这个东西&#xff0c;确认序号是对方应答时返回的&#xff0c;例如序号发送到1&#xff0c;确认序号会返回2&#xff0c;…

计算机网络 day6 arp病毒 - ICMP协议 - ping命令 - Linux手工配置IP地址

目录 arp协议 arp病毒\欺骗 arp病毒的运行原理 arp病毒产生的后果&#xff1a; 解决方法&#xff1a; ICMP协议 ICMP用在哪里&#xff1f; ICMP协议数据的封装过程 ​编辑 为什么icmp协议封装好数据后&#xff0c;还要加一个ip包头&#xff0c;再使用ip协议再次进…

springboot农机电招平台

本系统为了数据库结构的灵活性所以打算采用MySQL来设计数据库&#xff0c;而java技术&#xff0c;B/S架构则保证了较高的平台适应性。本文主要介绍了本系统的开发背景&#xff0c;所要完成的功能和开发的过程&#xff0c;主要说明了系统设计的重点、设计思想。 本系统主要是设…

关于java垃圾回收的小结

一、为什么要有垃圾回收 我们每次创建对象都需要在栈上开辟空间&#xff0c;堆上使用内存&#xff0c;如果我们只是开辟了这个空间&#xff0c;而不去释放他&#xff0c;那么再大的内存和空间也会有满的一天&#xff0c;所以我们在Java中引入了GC&#xff08;垃圾回收机制&…