rocketMq消息队列原生api使用以及rocketMq整合springboot

news2025/1/17 0:09:09

rocketMq消息队列

文章目录

  • rocketMq消息队列
  • 一、RocketMQ原生API使用
    • 1、测试环境搭建
    • 2、RocketMQ的编程模型
    • 3、RocketMQ的消息样例
      • 3.1 基本样例
      • 3.2 顺序消息
      • 3.3 广播消息
      • 3.4 延迟消息
      • 3.5 批量消息
      • 3.6 过滤消息
      • 3.7 事务消息
      • 3.8 ACL权限控制
  • 二、SpringBoot整合RocketMQ
    • 1、快速实战
    • 2、其他更多消息类型:
    • 3、总结:
  • 三、SpringCloudStream整合RocketMQ
    • 1、快速实战
    • 2、总结

一、RocketMQ原生API使用

使用RocketMQ的原生API开发是最简单也是目前看来最牢靠的方式。这里用SpringBoot来搭建一系列消息生产者和消息消费者,来访问之前搭建的RocketMQ集群。

1、测试环境搭建

首先创建一个基于Maven的SpringBoot工程,引入如下依赖:

<dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.7.1</version>
</dependency>

RocketMQ的官网上有很多经典的测试代码,这些代码虽然依赖的版本比较老,但是还是都可以运行的。以官网上的顺序进行学习。

2、RocketMQ的编程模型

然后RocketMQ的生产者和消费者的编程模型都是有个比较固定的步骤的,掌握这个固定的步骤,对于我们学习源码以及以后使用都是很有帮助的。

  • 消息发送者的固定步骤

    1.创建消息生产者producer,并制定生产者组名

    2.指定Nameserver地址

    3.启动producer

    4.创建消息对象,指定主题Topic、Tag和消息体

    5.发送消息

    6.关闭生产者producer

  • 消息消费者的固定步骤

    1.创建消费者Consumer,制定消费者组名

    2.指定Nameserver地址

    3.订阅主题Topic和Tag

    4.设置回调函数,处理消息

    5.启动消费者consumer

3、RocketMQ的消息样例

RocketMQ都支持类型的消息:

3.1 基本样例

同步发送:能够实时知道消息是否从生产者推送到broke

异步发送:能够异步回调的方式知道消息是否从生产者推送的broke

单向发送:消息发送出去就不管了,无法感知消息是否从生产者推送到broke的状态

基本样例部分我们使用消息生产者分别通过三种方式发送消息同步发送、异步发送以及单向发送

然后使用消费者来消费这些消息。

1、同步发送消息的样例见:org.apache.rocketmq.example.simple.Producer

2、异步发送消息的样例见:org.apache.rocketmq.example.simple.AsyncProducer

等待消息返回后再继续进行下面的操作。

这个示例有个比较有趣的地方就是引入了一个countDownLatch来保证所有消息回调方法都执行完了再关闭Producer。 所以从这里可以看出,RocketMQ的Producer也是一个服务端,在往Broker发送消息的时候也要作为服务端提供服务。

3、单向发送消息的样例:

public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);
        }
        //Wait for sending to complete
        Thread.sleep(5000);        
        producer.shutdown();
    }
}

关键点就是使用producer.sendOneWay方式来发送消息,这个方法没有返回值,也没有回调。就是只管把消息发出去就行了。

4、使用消费者消费消息
消费者消费消息有两种模式

一种是消费者主动去Broker上拉取消息的拉模式

另一种是消费者等待Broker把消息推送过来的推模式

拉模式的样例见:org.apache.rocketmq.example.simple.PullConsumer

推模式的样例见:org.apache.rocketmq.example.simple.PushConsumer

通常情况下,用推模式比较简单。

实际上RocketMQ的推模式也是由拉模式封装出来的。

4.7.1版本中DefaultMQPullConsumerImpl这个消费者类已标记为过期,但是还是可以使用的。替换的类是DefaultLitePullConsumerImpl。

3.2 顺序消息

顺序消息生产者样例见:org.apache.rocketmq.example.order.Producer

顺序消息消费者样例见:org.apache.rocketmq.example.order.Consumer

验证时,可以启动多个Consumer实例,观察下每一个订单的消息分配以及每个订单下多个步骤的消费顺序。
不管订单在多个Consumer实例之前是如何分配的,每个订单下的多条消息顺序都是固定从0~5的。

RocketMQ保证的是消息的局部有序,而不是全局有序。

mqs是什么(一个topic中对应的多个队列)。
再回看我们的样例,实际上,RocketMQ也只保证了每个OrderID的所有消息有序(发到了同一个queue),而并不能保证所有消息都有序。所以这就涉及到了RocketMQ消息有序的原理。要保证最终消费到的消息是有序的,需要从Producer、Broker、Consumer三个步骤都保证消息有序才行。

首先在发送者端:在默认情况下,消息发送者会采取Round Robin轮询方式把消息发送到不同的MessageQueue(分区队列),而消费者消费的时候也从多个MessageQueue上拉取消息,这种情况下消息是不能保证顺序的。而只有当一组有序的消息发送到同一个MessageQueue上时,才能利用MessageQueue先进先出的特性保证这一组消息有序。

而Broker中一个队列内的消息是可以保证有序的。

然后在消费者端:消费者会从多个消息队列上去拿消息。这时虽然每个消息队列上的消息是有序的,但是多个队列之间的消息仍然是乱序的。消费者端要保证消息有序,就需要按队列一个一个来取消息,即取完一个队列的消息后,再去取下一个队列的消息。而给consumer注入的MessageListenerOrderly对象,在RocketMQ内部就会通过锁队列的方式保证消息是一个一个队列来取的。MessageListenerConcurrently这个消息监听器则不会锁队列,每次都是从多个Message中取一批数据(默认不超过32条)。因此也无法保证消息有序。

3.3 广播消息

广播消息的消息生产者样例见:org.apache.rocketmq.example.broadcast.PushConsumer

广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费模式。在集群状态(MessageModel.CLUSTERING)下,每一条消息只会被同一个消费者组中的一个实例消费到(这跟kafka和rabbitMQ的集群模式是一样的)。而广播模式则是把消息发给了所有订阅了对应主题的消费者,而不管消费者是不是同一个消费者组。

3.4 延迟消息

延迟消息的生产者案例

public class ScheduledMessageProducer {
   
    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        // Launch producer
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
            // This message will be delivered to consumer 10 seconds later.
            message.setDelayTimeLevel(3);
            // Send the message
            producer.send(message);
        }
   
        // Shutdown producer after use.
        producer.shutdown();
    }
       
}

延迟消息实现的效果就是在调用producer.send方法后,消息并不会立即发送出去,而是会等一段时间再发送出去。这是RocketMQ特有的一个功能。

那会延迟多久呢?延迟时间的设置就是在Message消息对象上设置一个延迟级别message.setDelayTimeLevel(3);

开源版本的RocketMQ中,对延迟消息并不支持任意时间的延迟设定(商业版本中支持),而是只支持18个固定的延迟级别,1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。这从哪里看出来的?其实从rocketmq-console控制台就能看出来。而这18个延迟级别也支持自行定义,不过一般情况下最好不要自定义修改。

那这么好用的延迟消息是怎么实现的?这18个延迟级别除了在延迟消息中用,还有什么地方用到了?别急,我们会在后面部分进行详细讲解。

3.5 批量消息

批量消息是指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。

批量消息的消息生产者样例见:org.apache.rocketmq.example.batch.SimpleBatchProducer和org.apache.rocketmq.example.batch.SplitBatchProducer

相信大家在官网以及测试代码中都看到了关键的注释:如果批量消息大于1MB就不要用一个批次发送,而要拆分成多个批次消息发送。也就是说,一个批次消息的大小不要超过1MB

实际使用时,这个1MB的限制可以稍微扩大点,实际最大的限制是4194304字节,大概4MB。但是使用批量消息时,这个消息长度确实是必须考虑的一个问题。而且批量消息的使用是有一定限制的,这些消息应该有相同的Topic,相同的waitStoreMsgOK。而且不能是延迟消息、事务消息等。

3.6 过滤消息

在大多数情况下,可以使用Message的Tag属性来简单快速的过滤信息。

使用Tag过滤消息的消息生产者案例见:org.apache.rocketmq.example.filter.TagFilterProducer

使用Tag过滤消息的消息消费者案例见:org.apache.rocketmq.example.filter.TagFilterConsumer

主要是看消息消费者。consumer.subscribe(“TagFilterTest”, “TagA || TagC”); 这句只订阅TagA和TagC的消息。

TAG是RocketMQ中特有的一个消息属性。RocketMQ的最佳实践中就建议,使用RocketMQ时,一个应用可以就用一个Topic,而应用中的不同业务就用TAG来区分。

但是,这种方式有一个很大的限制,就是一个消息只能有一个TAG,这在一些比较复杂的场景就有点不足了。 这时候,可以使用SQL表达式来对消息进行过滤。

SQL过滤的消息生产者案例见:org.apache.rocketmq.example.filter.SqlFilterProducer

SQL过滤的消息消费者案例见:org.apache.rocketmq.example.filter.SqlFilterConsumer

,>=,<,<=,BETWEEN,=;**
* 字符比较,比如:=,<>,IN;
* IS NULL 或者 IS NOT NULL;
* 逻辑符号 AND,OR,NOT;

常量支持类型为:

* 数值,比如:123,3.1415;
* 字符,比如:‘abc’,必须用单引号包裹起来;
* NULL,特殊的常量
* 布尔值,TRUEFALSE

使用注意:只有推模式的消费者可以使用SQL过滤。拉模式是用不了的。

大家想一下,这个消息过滤是在Broker端进行的还是在Consumer端进行的?

3.7 事务消息

这个事务消息是RocketMQ提供的一个非常有特色的功能,需要着重理解。

首先,我们了解下什么是事务消息。官网的介绍是:事务消息是在分布式系统中保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两个操作的原子性,也就是这两个操作一起成功或者一起失败。

其次,我们来理解下事务消息的编程模型。事务消息只保证消息发送者的本地事务与发消息这两个操作的原子性,因此,事务消息的示例只涉及到消息发送者,对于消息消费者来说,并没有什么特别的。

事务消息生产者的案例见:org.apache.rocketmq.example.transaction.TransactionProducer

事务消息的关键是在TransactionMQProducer中指定了一个TransactionListener事务监听器,这个事务监听器就是事务消息的关键控制器。源码中的案例有点复杂,我这里准备了一个更清晰明了的事务监听器示例

public class TransactionListenerImpl implements TransactionListener {
  //在提交完事务消息后执行。
  //返回COMMIT_MESSAGE状态的消息会立即被消费者消费到。
  //返回ROLLBACK_MESSAGE状态的消息会被丢弃。
  //返回UNKNOWN状态的消息会由Broker过一段时间再来回查事务的状态。
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String tags = msg.getTags();
        //TagA的消息会立即被消费者消费到
        if(StringUtils.contains(tags,"TagA")){
            return LocalTransactionState.COMMIT_MESSAGE;
        //TagB的消息会被丢弃
        }else if(StringUtils.contains(tags,"TagB")){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        //其他消息会等待Broker进行事务状态回查。
        }else{
            return LocalTransactionState.UNKNOW;
        }
    }
  //在对UNKNOWN状态的消息进行状态回查时执行。返回的结果是一样的。
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    String tags = msg.getTags();
        //TagC的消息过一段时间会被消费者消费到
        if(StringUtils.contains(tags,"TagC")){
            return LocalTransactionState.COMMIT_MESSAGE;
        //TagD的消息也会在状态回查时被丢弃掉
        }else if(StringUtils.contains(tags,"TagD")){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        //剩下TagE的消息会在多次状态回查后最终丢弃
        }else{
            return LocalTransactionState.UNKNOW;
        }
    }
}

然后,我们要了解下事务消息的使用限制:

1、事务消息不支持延迟消息和批量消息。

2、为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。

回查次数是由BrokerConfig.transactionCheckMax这个参数来配置的,默认15次,可以在broker.conf中覆盖。
然后实际的检查次数会在message中保存一个用户属性MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES。这个属性值大于transactionCheckMax,就会丢弃。 这个用户属性值会按回查次数递增,也可以在Producer中自行覆盖这个属性。

​ 3、事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。

由BrokerConfig.transactionTimeOut这个参数来配置。默认6秒,可以在broker.conf中进行修改。
另外,也可以给消息配置一个MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS属性来给消息指定一个特定的消息回查时间。
msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, “10000”); 这样就是10秒。

4、事务性消息可能不止一次被检查或消费。

5、提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。

6、事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

接下来,我们还要了解下事务消息的实现机制,参见下图:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-E53Lro88-1688484594136)(image/image_C0hrAMgJzB.png)]

事务消息机制的关键是在发送消息时,会将消息转为一个half半消息,并存入RocketMQ内部的一个 RMQ_SYS_TRANS_HALF_TOPIC 这个Topic,这样对消费者是不可见的。再经过一系列事务检查通过后,再将消息转存到目标Topic,这样对消费者就可见了。

最后,我们还需要思考下事务消息的作用。

大家想一下这个事务消息跟分布式事务有什么关系?为什么扯到了分布式事务相关的两阶段提交上了?事务消息只保证了发送者本地事务和发送消息这两个操作的原子性,但是并不保证消费者本地事务的原子性,所以,事务消息只保证了分布式事务的一半。但是即使这样,对于复杂的分布式事务,RocketMQ提供的事务消息也是目前业内最佳的降级方案

3.8 ACL权限控制

权限控制(ACL)主要为RocketMQ提供Topic资源级别的用户访问控制。用户在使用RocketMQ权限控制时,可以在Client客户端通过 RPCHook注入AccessKey和SecretKey签名;同时,将对应的权限控制属性(包括Topic访问权限、IP白名单和AccessKey和SecretKey签名等)设置在$ROCKETMQ_HOME/conf/plain_acl.yml的配置文件中。Broker端对AccessKey所拥有的权限进行校验,校验不过,抛出异常; ACL客户端可以参考:org.apache.rocketmq.example.simple包下面的AclClient代码。

注意,如果要在自己的客户端中使用RocketMQ的ACL功能,还需要引入一个单独的依赖包

<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.7.1</version>
</dependency>

​ 而Broker端具体的配置信息可以参见源码包下docs/cn/acl/user_guide.md。主要是在broker.conf中打开acl的标志:aclEnable=true。然后就可以用plain_acl.yml来进行权限配置了。并且这个配置文件是热加载的,也就是说要修改配置时,只要修改配置文件就可以了,不用重启Broker服务。我们来简单分析下源码中的plan_acl.yml的配置:

#全局白名单,不受ACL控制
#通常需要将主从架构中的所有节点加进来
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*

accounts:
#第一个账户
- accessKey: 自定义1
  secretKey: 自定义1
  whiteRemoteAddress:
  admin: false 
  defaultTopicPerm: DENY #默认Topic访问策略是拒绝
  defaultGroupPerm: SUB #默认Group访问策略是只允许订阅
  topicPerms:
  - topicA=DENY #topicA拒绝
  - topicB=PUB|SUB #topicB允许发布和订阅消息
  - topicC=SUB #topicC只允许订阅
  groupPerms:
  # the group should convert to retry topic
  - groupA=DENY
  - groupB=PUB|SUB
  - groupC=SUB
#第二个账户,只要是来自192.168.1.*的IP,就可以访问所有资源
- accessKey: 自定义2
  secretKey: 自定义2
  whiteRemoteAddress: 192.168.1.*
  # if it is admin, it could access all resources
  admin: true

二、SpringBoot整合RocketMQ

1、快速实战

这部分我们看下SpringBoot如何快速集成RocketMQ。

在使用SpringBoot的starter集成包时,要特别注意版本。因为SpringBoot集成RocketMQ的starter依赖是由Spring社区提供的,目前正在快速迭代的过程当中,不同版本之间的差距非常大,甚至基础的底层对象都会经常有改动。例如如果使用rocketmq-spring-boot-starter:2.0.4版本开发的代码,升级到目前最新的rocketmq-spring-boot-starter:2.1.1后,基本就用不了了。

我们创建一个maven工程,引入关键依赖:

<dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-webmvc</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.1.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
    </dependencies>

rocketmq-spring-boot-starter:2.1.1引入的SpringBoot包版本是2.0.5.RELEASE,这里把SpringBoot的依赖包升级了一下。

然后我们以SpringBoot的方式,快速创建一个简单的Demo

启动类:

@SpringBootApplication
public class RocketMQScApplication {

    public static void main(String[] args) {
        SpringApplication.run(RocketMQScApplication.class,args);
    }
}

配置文件 application.properties

#NameServer地址
rocketmq.name-server=192.168.232.128:9876
#默认的消息生产者组
rocketmq.producer.group=springBootGroup

消息生产者

package com.roy.rocket.basic;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;

/**
 * @author :
 * @date :Created in 2020/10/22
 * @description:
 **/
@Component
public class SpringProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;
  //发送普通消息的示例
    public void sendMessage(String topic,String msg){
        this.rocketMQTemplate.convertAndSend(topic,msg);
    }
  //发送事务消息的示例
    public void sendMessageInTransaction(String topic,String msg) throws InterruptedException {
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            Message<String> message = MessageBuilder.withPayload(msg).build();
            String destination =topic+":"+tags[i % tags.length];
            SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message,destination);
            System.out.printf("%s%n", sendResult);

            Thread.sleep(10);
        }
    }
}

消息消费者

package com.roy.rocket.basic;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * @author :
 * @date :Created in 2020/10/22
 * @description:
 **/
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Received message : "+ message);
    }
}

SpringBoot集成RocketMQ,消费者部分的核心就在这个@RocketMQMessageListener注解上。所有消费者的核心功能也都会集成到这个注解中。所以我们还要注意下这个注解里面的属性:

例如:消息过滤可以由里面的selectorType属性和selectorExpression来定制

消息有序消费还是并发消费则由consumeMode属性定制。

消费者是集群部署还是广播部署由messageModel属性定制。

然后关于事务消息,还需要配置一个事务消息监听器:

package com.roy.rocket.config;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.StringMessageConverter;

import java.util.concurrent.ConcurrentHashMap;

/**
 * @author :
 * @date :Created in 2020/11/5
 * @description:
 **/

@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {

    private ConcurrentHashMap<Object, String> localTrans = new ConcurrentHashMap<>();
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        Object id = msg.getHeaders().get("id");
        String destination = arg.toString();
        localTrans.put(id,destination);
        org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),"UTF-8",destination, msg);
        String tags = message.getTags();
        if(StringUtils.contains(tags,"TagA")){
            return RocketMQLocalTransactionState.COMMIT;
        }else if(StringUtils.contains(tags,"TagB")){
            return RocketMQLocalTransactionState.ROLLBACK;
        }else{
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        //SpringBoot的消息对象中,并没有transactionId这个属性。跟原生API不一样。
//        String destination = localTrans.get(msg.getTransactionId());
        return RocketMQLocalTransactionState.COMMIT;
    }
}

这样我们启动应用后,就能够通过访问 http://localhost:8080/MQTest/sendMessage?message=123 接口来发送一条简单消息。并在SpringConsumer中消费到。

也可以通过访问http://localhost:8080/MQTest/sendTransactionMessage?message=123 ,来发送一条事务消息。

这里可以看到,对事务消息,SpringBoot进行封装时,就缺少了transactionId,这在事务控制中是非常关键的。

2、其他更多消息类型:

对于其他的消息类型,文档中就不一一记录了。具体可以参见源码中的junit测试案例。

3、总结:

  • SpringBoot 引入org.apache.rocketmq:rocketmq-spring-boot-starter依赖后,就可以通过内置的RocketMQTemplate来与RocketMQ交互。相关属性都以rockemq.开头。具体所有的配置信息可以参见org.apache.rocketmq.spring.autoconfigure.RocketMQProperties这个类。
  • SpringBoot依赖中的Message对象和RocketMQ-client中的Message对象是两个不同的对象,这在使用的时候要非常容易弄错。例如RocketMQ-client中的Message里的TAG属性,在SpringBoot依赖中的Message中就没有。Tag属性被移到了发送目标中,与Topic一起,以Topic:Tag的方式指定。
  • 最后强调一次,一定要注意版本。rocketmq-spring-boot-starter的更新进度一般都会略慢于RocketMQ的版本更新,并且版本不同会引发很多奇怪的问题。apache有一个官方的rocketmq-spring示例,rocketmq-spring.git 以后如果版本更新了,可以参考下这个示例代码。

三、SpringCloudStream整合RocketMQ

SpringCloudStream是Spring社区提供的一个统一的消息驱动框架,目的是想要以一个统一的编程模型来对接所有的MQ消息中间件产品。我们还是来看看SpringCloudStream如何来集成RocketMQ。

1、快速实战

创建Maven工程,引入依赖:

<dependencies>
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.7.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-acl</artifactId>
      <version>4.7.1</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
      <version>2.2.3.RELEASE</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-client</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-acl</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
      <version>2.3.3.RELEASE</version>
    </dependency>
  </dependencies>

应用启动类:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;

/**
 * @author :
 * @date :Created in 2020/10/22
 * @description:
 **/
@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class ScRocketMQApplication {

    public static void main(String[] args) {
        SpringApplication.run(ScRocketMQApplication.class,args);
    }
}

注意这个@EnableBinding({Source.class, Sink.class})注解,这是SpringCloudStream引入的Binder配置。

然后增加配置文件application.properties

#ScStream通用的配置以spring.cloud.stream开头
spring.cloud.stream.bindings.input.destination=TestTopic
spring.cloud.stream.bindings.input.group=scGroup
spring.cloud.stream.bindings.output.destination=TestTopic
#rocketMQ的个性化配置以spring.cloud.stream.rocketmq开头
#spring.cloud.stream.rocketmq.binder.name-server=192.168.232.128:9876;192.168.232.129:9876;192.168.232.130:9876
spring.cloud.stream.rocketmq.binder.name-server=192.168.232.128:9876

SpringCloudStream中,一个binding对应一个消息通道。这其中配置的input,是在Sink.class中定义的,对应一个消息消费者。而output,是在Source.class中定义的,对应一个消息生产者。

然后就可以增加消息消费者:

package com.roy.scrocket.basic;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

/**
 * @author :
 * @date :Created in 2020/10/22
 * @description:
 **/
@Component
public class ScConsumer {

    @StreamListener(Sink.INPUT)
    public void onMessage(String messsage){
        System.out.println("received message:"+messsage+" from binding:"+ Sink.INPUT);
    }
}

消息生产者:

package com.roy.scrocket.basic;

import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * @author :
 * @date :Created in 2020/10/22
 * @description:
 **/
@Component
public class ScProducer {

    @Resource
    private Source source;

    public void sendMessage(String msg){
        Map<String, Object> headers = new HashMap<>();
        headers.put(MessageConst.PROPERTY_TAGS, "testTag");
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        Message<String> message = MessageBuilder.createMessage(msg, messageHeaders);
        this.source.output().send(message);
    }
}

最后增加一个Controller类用于测试:

package com.roy.scrocket.controller;

import com.roy.scrocket.basic.ScProducer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @author :
 * @date :Created in 2020/10/27
 * @description:
 **/
@RestController
@RequestMapping("/MQTest")
public class MQTestController {

    @Resource
    private ScProducer producer;
    @RequestMapping("/sendMessage")
    public String sendMessage(String message){
        producer.sendMessage(message);
        return "消息发送完成";
    }
}

启动应用后,就可以访问http://localhost:8080/MQTest/sendMessage?message=123,给RocketMQ发送一条消息到TestTopic,并在ScConsumer中消费到了。

2、总结

  • 关于SpringCloudStream。这是一套几乎通用的消息中间件编程框架,例如从对接RocketMQ换到对接Kafka,业务代码几乎不需要动,只需要更换pom依赖并且修改配置文件就行了。但是,由于各个MQ产品都有自己的业务模型,差距非常大,所以使用使用SpringCloudStream时要注意业务模型转换。并且在实际使用中,要非常注意各个MQ的个性化配置属性。例如RocketMQ的个性化属性都是以spring.cloud.stream.rocketmq开头,只有通过这些属性才能用上RocketMQ的延迟消息、排序消息、事务消息等个性化功能。
  • SpringCloudStream是Spring社区提供的一套统一框架,但是官方目前只封装了kafka、kafka Stream、RabbitMQ的具体依赖。而RocketMQ的依赖是交由厂商自己维护的, 也就是由阿里巴巴自己来维护。这个维护力度显然是有不小差距的。所以一方面可以看到之前在使用SpringBoot时着重强调的版本问题,在使用SpringCloudStream中被放大了很多。spring-cloud-starter-stream-rocketmq目前最新的2.2.3.RELEASE版本中包含的rocketmq-client版本还是4.4.0。这个差距就非常大了。另一方面,RocketMQ这帮大神不屑于写文档的问题也特别严重,SpringCloudStream中关于RocketMQ的个性化配置几乎很难找到完整的文档。
  • 总之,对于RocketMQ来说SpringCloudStream目前来说还并不是一个非常好的集成方案。这方面跟kafka和Rabbit还没法比。所以使用时要慎重

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/719795.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PLC工程师到C#上位机开发:成功转型的故事

从自动化PLC工程师转变为C#上位机开发工程师&#xff01;这是一个很大的转变&#xff0c;但是您的自动化背景将为您提供宝贵的经验和技能。刚好&#xff0c;我这里有上位机入门&#xff0c;学习线路图&#xff0c;各种项目&#xff0c;需要留个6。 在成功转变的过程中&#xf…

7.3.3 【Linux】磁盘格式化(创建文件系统)

XFS 文件系统 mkfs.xfs 我们常听到的“格式化”其实应该称为“创建文件系统 &#xff08;make filesystem&#xff09;”&#xff0c;使用的是mkfs&#xff0c;创建的是xfs文件系统&#xff0c;使用的是mkfs.xfs这个指令。这个指令这样用&#xff1a; 使用默认的xfs文件系统参…

速通协程,一步到位!

前言 协程的概念最核心的点就是一段程序能够被挂起&#xff0c;稍后在挂起的位置恢复&#xff0c;挂起和恢复是由使用者控制的。 数学模型 在一个线程的视角中&#xff0c;我们的程序是按照顺序来执行的&#xff0c;假设我们使用??????来描述一段程序的所有指令。那么…

elk中logstash的使用

1.前言 logstash是一个相对较重的日志收集器&#xff0c;可以通过多种方式获取到日志数据&#xff0c;如tcp、日志文件、kafka、redis、rabbitmq等方式&#xff0c;还可以使用filter去过滤日志、转换日志为json格式&#xff0c;所以logstash是一个功能强大的日志收集器&#x…

MFC项目添加外部头文件和源文件后编译出现C1010错误

出现这个问题的主要原因是如果使用VC向生成工程的话&#xff0c;默认使用预编译头文件“stdafx.h”&#xff0c;这样做的目的是为了加快编译速度。 如果加入第三方c/cpp文件没有#include “stdafx.h” &#xff0c;就会报此错误。 在<解决方案管理器中>(就是可以看到工程…

NLLloss,KLDivLoss,CrossEntropyLoss三类损失函数比对

前置知识 这三个函数在深度学习模型中十分常见&#xff0c;尤其是在知识蒸馏领域&#xff0c;经常会将这三个函数进行比较 1、Softmax函数 softmax函数通常作为多分类以及归一化函数使用&#xff0c;其公式如下&#xff1a; s o f t m a x ( x ) e x i ∑ i 1 e x i soft…

Drools概述和基本原理

目录 ​编辑 一、Drools是什么&#xff1f; 二、Drools使用场景 三、Drool架构内容 3.1 总体架构 3.2 构成内容说明 3.2.1 Rules 3.2.2 Production memory 3.2.3 Facts 3.2.4 Working memory 3.2.5 Pattern matcher 3.2.6 Agenda 四、为什么要用规则引擎&#xff1f; 4.1 声明…

el-descriptions的使用

el-descriptions的使用 解释&#xff1a; 我们页面有很多无序的列表展示&#xff0c;为了高效得去开发我们得页面&#xff0c;可以借助于这个组件进行适应。图片&#xff1a; 代码&#xff1a; template部分 <el-descriptions class"margin-top" :column"…

IP数据云揭示高风险IP的来源地

在全球网络安全日临近之际&#xff0c;IP数据云揭示高风险IP的来源地。这些高风险IP以其潜在威胁和犯罪活动而闻名&#xff0c;已引起了全球范围内的关注。 根据IP数据云介绍&#xff0c;高风险IP的主要来源地是位于亚洲和东欧的国家其中包括俄罗斯、朝鲜和乌克兰等地。这些地区…

论文阅读:Segment Anything之阅读笔记

目录 引言整体结构介绍论文问答代码仓库中&#xff0c;模型哪部分转换为了ONNX格式&#xff1f;以及如何转的&#xff1f;Mask decoder部分 Transformer decoder block?如何整合image_embedding&#xff0c;image_pe, sparse_prompt_embedding和dense_prompt_embedding的&…

Android Framework基础面试必问习题~

AMS 下面是一些可能会被问到的 Android Framework 中 Activity Manager Service (AMS) 相关的面试题&#xff1a; 1.什么是 AMS&#xff1f; AMS 是 Android framework 中的一个系统进程&#xff0c;它负责管理应用程序生命周期&#xff0c;处理应用程序间的交互和协调不同组…

用Electron将Vue项目打包成桌面版软件

创建Electron项目这里是直接通过官方教程创建的 要检查 Node.js 是否正确安装&#xff0c;请在您的终端输入以下命令&#xff1a; node -vnpm -v这两个命令应输出了 Node.js 和 npm 的版本信息。 创建Electron应用程序 使用脚手架创建 Electron 应用程序遵循与其他 Node.js…

【Squid 代理服务器应用】

目录 一、Squid 代理服务器1、代理的工作机制2、代理服务器的概念及其作用3、Squid 代理的类型 二、安装 Squid 服务1&#xff0e;编译安装 Squid2&#xff0e;修改 Squid 的配置文件3&#xff0e;Squid 的运行控制1、检查配置文件语法是否正确2、启动 Squid&#xff0c;第一次…

在SpringBoot中对es集群的查询操作

在进行查询之前要先给ll索引中插入数据: POST /ll/product/1 {"id":1,"title": "小米手机Mix","category": "手机","brand": "小米","price": 2899.00,"images": "http://ima…

【数据结构】——常见排序算法(演示图+代码+算法分析)

目录 1. 常见排序算法 1.2 稳定性 2. 常见排序算法的实现 2.1 插入排序 2.1.1基本思想 2.1.2代码 2.1.4算法分析 2.2 希尔排序 2.2.1基本思想 2.2.2代码 2.2.3演示图 2.2.4算法分析 2.3 选择排序 2.3.1基本思想 2.3.2代码 2.3.3演示图 2.3.4算法分析 2.4 堆排…

[Visual Studio 报错] error 找不到指定的 SDK“Microsoft

[Visual Studio 2022 报错] error : 找不到指定的 SDK“Microsoft.NET.Sdk.Web” 问题描述&#xff1a; 在新电脑上安装了VS2022&#xff0c;打开现有项目的解决方案后的时候报了这个错&#xff0c;所有projet文件都加载失败,如图所示&#xff1a; 报错分析及解决 打开项目配…

黑客是怎样炼成的?

前言 首先我谈下对黑客&网络安全的认知&#xff0c;其实最重要的是兴趣热爱&#xff0c;不同于网络安全工程师&#xff0c;他们大都是培训机构培训出来的&#xff0c;具备的基本都是防御和白帽子技能&#xff0c;他们绝大多数的人看的是工资&#xff0c;他们是为了就业而学…

MFC扩展库BCGControlBar Pro v33.5新版亮点 - 控件、脚本管理增强

BCGControlBar库拥有500多个经过全面设计、测试和充分记录的MFC扩展类。 我们的组件可以轻松地集成到您的应用程序中&#xff0c;并为您节省数百个开发和调试时间。 BCGControlBar专业版 v33.5已正式发布了&#xff0c;此版本包含了Ribbon&#xff08;功能区&#xff09;自定义…

CSS文本样式

CSS文本样式 1、字体 友情提醒&#xff1a; 字体有没有版权&#xff1f; 省略写法 语法&#xff1a; [ [ <‘font-style’> || || <‘font-weight’> || <‘font-stretch’> ]? <‘font-size’> [ / <‘line-height’> ]? <‘font-fam…

IDEA2023.1.3自带插件禁用,减少内存占用

前言 前两个星期安装了idea2023.1.3&#xff08;之前用的一直是idea2020.3版本&#xff09;&#xff0c;我发现新版界面确实更好看一些&#xff0c;而且启动速度也非常快&#xff0c;打开多个项目也一样很快&#xff0c;都是秒开。但是吧&#xff0c;它的内存占用比idea2020.3…