Spring AMQP消息中间件

news2025/1/16 2:02:51

SpringAMQP简单说就是一个中间件,提供了模板方便我们操作各种消息模型
上面已经学了RabbitMQ消息队列是有五种消息模型,并且我们演示了其中的基本消息队列(Hello World)。用的是官方API,来实现的基本消息队列(Hello World)。会发现官方提供的API写起来麻烦,所以我们直接用的就是写好的 'mq-demo' 项目,只是简单运行了解一下运行机制
下面我们将学习SpringAMQP,可以大大简化API的书写。也就是使用SpringAMQP来分别演示五种消息模型
根据用途和交换机类型进行如下分类,前两种是根据用途,后三种是根据交换机类型进行分类

  • 1、基本消息队列: Hello World 这个是上面使用官方API演示过一次(但是接下来也会使用SpringAMQP再演示一次),下面四个还没演示过(接下来用SpringAMQP演示)
  • 2、工作消息队列: Work Queues
  • 3、广播发布订阅(使用Fanout交换机): Publish/Subscribe
  • 4、路由发布订阅(使用Direct交换机): Routing
  • 5、主题发布订阅(使用Topic交换机): Topics

什么是AMQP ?
用于在应用程序或之间传递业务消息的开放标准。该协议与语言、平台无关,更符合微服务中独立性的要求
什么是SpringAMQP ?
SpringAMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。简单说就是一个中间件,提供了模板方便我们操作各种消息模型 SpringAMQP包含两部分内容,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。也就是spring-amqp是规范,具体由spring-rabbit来实现
SpringAmqp的官网: Spring AMQP
SpringAMQP的特点:

  • 1、提供了监听器容器,可以用异步来处理入站消息,也就是负责接收消息
  • 2、提供了RabbitTemplate,用于发送和接收消息,一般用于发送消息
  • 3、提供了RabbitAdmin,用于自动声明队列、交换和绑定。声明也就是创建的意思,会自动创建消息队列,不需要手动创建消息队列


1. HelloWorld模型的消息发送


Hello World 叫简单队列模型,官方的 HelloWorld 是基于最基础的消息队列模型来实现的,只包括如下三个角色,如图


案例: 在 '实用篇-RabbitMQ消息队列' 的 '3. Hello World 队列模型案例' ,里面写用官方API实现的基本消息队列。因为有五种模型都要演示一次,现在我们同样是先演示Hello World 队列模型,这次就不是用官方API来写了,而是使用SpringAMQP来写,代码更加的简洁。用的项目还是mq-demo。我把下载链接拿下来了
这里我们先做消息发送,下一课在做消息接收。消息发送的具体操作过程如下
第一步(已做好可跳过): 下载写好的项目文件 'mq-demo',下载到的是mq-demo.zip,解压会得到一个mq-demo文件夹

https://cowtransfer.com/s/1b30e91436984a


在D盘创建一个目录名为DockerRabbitmqCode,把mq-demo文件夹粘贴到 D:\DockerRabbitmqCode
第二步(已做好可跳过): 在idea软件,打开mq-demo,mq-demo是一个项目工程


第三步(已做好可跳过): 确保已经启动Docker、并启动Docker里面的rabbitmq容器
第四步: 打开idea,在父工程(也就是mq-demo)的pom.xml中引入spring-amqp的依赖(已做好可跳过)

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


第五步: 在publisher(也就是publisher子工程,这个子工程是负责发消息的)中,的application.yml写入如下

spring:
  rabbitmq:
    host: 192.168.200.232
    port: 5672 #端口
    username: admin #连接rabbitmq的用户名
    password: 123456 #连接rabbitmq的密码
    virtual-host: / #huanfqc对应的虚拟主机


第六步: 在publisher子工程的src/test/java目录新建cn.itcast.mq.spring.SpringAmqpTest类,写入如下

package cn.itcast.mq.test;

import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @Author:豆浆
 * @name :SpringAmqpTest
 * @Date:2024/4/12 13:02
 */

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;//在pom.xmp引入spring-boot-starter-amqp依赖之后就能注入这个模板
    @Test
    public void testSimpleQueue(){
        String QueueName="simple.queue2";//自定义队列名称
        String message="hello,simple.queue";
        rabbitTemplate.convertAndSend(QueueName,message);
    }

}


第六步: 浏览器访问 http://你的ip:15672。

例如如下。打开浏览器后手动在页面创建一个叫simple.queue2的队列,不然上面运行找不到该往哪个队列发消息


第七步: 运行publisher子工程的SpringAmqpTest类,此时就会向名为simple.queue2的队列发送一条(每运行一次就发送一条)消息


第八步: 我们还没有消费者,所以消息会一直在队列里面放着,下面我们将来学习消息接收,也就是编写消费者,去消费队列里面的信息。如下


2. HelloWorld模型的消息接收


注意: 消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能,也就是你找不到已经被消费了的消息
上面学了如何在publisher子工程,向simple.queue2队列,发送消息。下面我们就学习如何让consumer子工程去处理simple.queue2队列里面的消息
第一步: 在consumer子工程的application.yml,添加如下

spring:
  rabbitmq:
    host: 192.168.200.232
    port: 5672 #端口
    username: admin #连接rabbitmq的用户名
    password: 123456 #连接rabbitmq的密码
    virtual-host: / #huanfqc对应的虚拟主机


第二步: 在consumer子工程的src/main/java目录新建cn.itcast.mq.listener.SpringRabbitListener类,写入如下

package cn.itcast.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Author:豆浆
 * @name :SpringRabbitListener
 * @Date:2024/4/12 13:26
 */
@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue2")
    public void ListenSimpleQueue(String msg){
        System.out.println("我接收并处理了simple.queue2队列的消息:【"+msg+"】");
    }
}


第三步: 运行consumer子工程的ConsumerApplication引导类,此时consumer子工程的SpringRabbitListener类就会去消费来自simple.queue2队列的消息


3. WorkQueue模型的消息发接


1、基本消息队列: Hello World 这个是上面使用官方API演示过一次(但是接下来也会使用SpringAMQP再演示一次),下面四个还没演示过(接下来用SpringAMQP演示)
2、工作消息队列: Work Queues
3、广播发布订阅(使用Fanout交换机): Publish/Subscribe
4、路由发布订阅(使用Direct交换机): Routing
5、主题发布订阅(使用Topic交换机): Topics
Work Queue也叫工作队列,模型图如下


案例: 实现一个队列绑定多个消费者,要求如下

  • 1、在publisher子工程中定义测试方法,每秒产生50条信息,发送到simple.queue2队列
  • 2、在consumer子工程中定义两个消息监听者(也就是消息消费者),都监听simple.queue队列
  • 3、一个消费者每秒处理50条消息,另一个消费者每秒处理10条消息

具体过程如下
第一步: 由于在上面的 '1. HelloWorld模型的消息发送' 已经有一个SpringAmqpTest测试类,我们把这个测试类修改为如下

@Test
    public void testSendMessageWorkQueue() throws InterruptedException {
        String queueName = "simple.queue2";//自定义队列名称
        String message = "我正在往队列里面发送消息第 ";
        for (int i = 1; i <=50; i++) {
            rabbitTemplate.convertAndSend(queueName,message+i+" 条消息");//调用convertAndSend实现发送
            Thread.sleep(20);//每隔20毫秒发送一次,共需要1秒才能发完50条消息
        }
    }


第二步: 由于在上面的 '1. HelloWorld模型的消息发送' 已经有一个SpringRabbitListener类,我们把这个类修改为如下

    @RabbitListener(queues = "simple.queue2")
    public void ListenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1,正在处理的消息是【"+msg+"】" + "----当前时间是 "+ LocalTime.now());
        Thread.sleep(20);//每处理一条消息就休眠20毫秒,也就是1秒能处理50条消息
    }

    @RabbitListener(queues = "simple.queue2")
    public void ListenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2,正在处理的消息是【"+msg+"】" + "----当前时间是 "+ LocalTime.now());
        Thread.sleep(100);//每处理一条消息就休眠100毫秒,也就是1秒能处理10条消息
    }


第三步: 运行consumer子工程的ConsumerApplication引导类,此时consumer子工程的SpringRabbitListener类就会去消费来自simple.queue2队列的消息


第四步: 浏览器访问 http://你的ip:15672。例如如下。打开浏览器后手动在页面创建一个叫simple.queue2的队列,不然上面运行找不到该往哪个队列发消息


第五步: 运行publisher子工程的SpringAmqpTest类,此时就会向名为simple.queue2的队列发送一条(每运行一次就发送一条)消息


从上面的控制台日志信息中,我们发现消费者1处理的都是奇数消息,消费者2处理的都是偶数消息,看一下总的50条信息的分配情况,可以发现50条信息竟然是平均分配(默认是消息预期)给这两个消费者进行处理,但是我们给消费者1处理的速度设置的更快(每20毫秒就可以处理一条消息),消费者2的处理速度设置的慢(每100毫秒才能处理一条消息),那我们就希望队列给消费者1多分点消息,给消费者2少分点消息,这样50条消息就能被更快处理完。

如何实现呢,如下
把publisher子工程的application.yml,修改为如下,主要就是添加了preFetch参数,表示最大预取数。分析:这两个消费者默认的预取值是"总消息条数/总消费者数量",也就是上面我们有50条消息,然后被两个消费者分别拿了25条,而消费者2处理的慢但是却拿了这么多消息,所以我们通过修改最大预取值,如果把它修改为1的话,那么两个消费者每次最多向队列里面拿一条消息,拿完就处理,处理之后才能继续向队列拿数据

YAML复制代码

spring:
  rabbitmq:
    host: 192.168.200.234
    port: 5672 #端口
    username: admin #连接rabbitmq的用户名
    password: 123456 #连接rabbitmq的密码
    virtual-host: / #huanfqc对应的虚拟主机
    listener:
      direct:
        prefetch: 1


然后重新运行consumer子工程的ConsumerApplication引导类,此时这两个消费者就准备就绪了,如果队列有消息就会被消费
最后运行运行publisher子工程的SpringAmqpTest类,此时就会又向名为simple.queue2的队列发送一条(每运行一次就发送一条)消息
重点看一下现在consumer子工程的两个消费者的消费情况


4. Fanout交换机的消息发接

Fanout交换机,其中也就是Publish/Subscribe,是消息订阅模式的其中一种,共三种
'发布订阅模式' 与之前案例的区别就是允许将同一个消息发送给多个消费者(之前案例也就是前两种消息队列,只能允许同一个消息发送给一个消费者)。实现方式是加入了exchange(交换机)。'发布订阅模式' 的结构图如下


如上图,交换机可以把同一个消息只发给其中一个队列,也可以把同一个消息发送给多个队列,是由交换机的类型来决定,不同的交换机类型,消息转发规则不同,相同点是都是负责转发消息的。交换机如果路由失败的话,消息就会丢失,因为交换机不负责存储消息。交换机的常见类型如下
1、Fanout: 广播
2、Direct: 路由
3、Topic: 话题
我们前面已经学了前两种消息队列,接下来学习后三种(其实就是交换机的三种类型),后三种统称为 '发布订阅模式',刚刚也简单介绍了 '发布订阅模式'
1、基本消息队列: Hello World 这个是上面使用官方API演示过一次
2、工作消息队列: Work Queues
3、广播发布订阅(使用Fanout交换机): Publish/Subscribe
4、路由发布订阅(使用Direct交换机): Routing
5、主题发布订阅(使用Topic交换机): Topics
先学后三种其中的Fanout类型交换机,也叫 '广播发布订阅',也就是Publish/Subscribe,该类型交换机的结构图如下


案例: 利用AMQP演示FanoutExchange的使用
实现思路如下:

  • 1、在consumer子工程,利用代码声明(声明就是创建的意思)队列、交换机,并将两者绑定
  • 2、在consumer子工程,编写两个消费者方法,分别监听fanout.queue1队列和fanout.queue2队列
  • 3、在publisher子工程中编写测试方法,向publisher.fanout交换机发送消息(publisher.fanout交换机接收到消息之后,会自动向队列路由消息)

案例结构图如下


SpringAMQP提供了声明交换机、队列、绑定关系的API,如下继承关系图。下面需要用到的Exchange、FanoutExchange这两个API


具体操作如下
第一步: 在consumer子工程的src/test/java/cn.itcast.mq目录,新建config.FanoutConfig类,写入如下

package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author:豆浆
 * @name :FanoutConfig
 * @Date:2024/4/12 14:34
 */
@Configuration
public class FanoutConfig {

    //声明fanoutExchange交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        //创建名称为publisher.fanout
        return new FanoutExchange("itcast.fanout");
    }

    //声明队列1
    @Bean
    public Queue fanoutQueue1(){
        //创建名称为fanout.queue1
        return  new Queue("fanout.queue1");
    }

    //绑定  队列1到交换机
    @Bean
    public Binding fanoutBindingA(Queue fanoutQueue1, FanoutExchange fanoutExchang){
        //将fanoutQueue1队列绑定到fanoutExchang队列
        return BindingBuilder.
                bind(fanoutQueue1).
                to(fanoutExchang);
    }

    //声明队列2
    @Bean
    public Queue fanoutQueue2(){
        //创建名称为fanout.queue1
        return  new Queue("fanout.queue2");
    }

    //绑定   队列2到交换机
    @Bean
    public Binding fanoutBindingB(Queue fanoutQueue2,FanoutExchange fanoutExchang){
        //将fanoutQueue1队列绑定到fanoutExchang队列
        return BindingBuilder.
                bind(fanoutQueue2).
                to(fanoutExchang);
    }
}


第二步: 确保已经启动Docker、并启动Docker里面的rabbitmq容器
第三步: 运行consumer子工程的ConsumerApplication引导类


第四步: 运行publisher子工程的SpringAmqpTest类,作用仅仅是发送消息,才能触发 'consumer子工程的FanoutConfig类' 的绑定


    @Test
    public void testSendMessageFanoutQueue() throws InterruptedException {
        String exchangeName = "itcast.fanout";//自定义交换机名称
        String message = "hello ,exchange every one ";
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    }


第五步: 浏览器访问: http://你的Centos7的ip:15672


第六步: 消息接收功能。在consumer子工程的SpringRabbitListener类修改为如下

   @RabbitListener(queues = "fanout.queue1")
    public void ListenSimpleWorkQueue1(String msg) {
        System.out.println("消费者1........接收并处理了fanout.queue2队列的消息:【"+msg+"】"+ LocalTime.now());//当前时间
        //Thread.sleep(20);
    }
    @RabbitListener(queues = "fanout.queue2")
    public void ListenSimpleWorkQueue2(String msg) {
        System.err.println("消费者2.......接收并处理了fanout.queue2队列的消息:【"+msg+"】"+ LocalTime.now());//当前时间
        //Thread.sleep(200);
    }


第七步: 消息发送功能。在publisher子工程的SpringAmqpTest类修改为如下


第八步: 重新运行consumer子工程(消费者)的ConsumerApplication引导类,且保持运行,等待处理消息
第九步: 运行publisher子工程(生产者)的SpringAmqpTest类,表示向消费者发送消息
第十步: 查看consumer子工程的控制台,是否能处理消息。是可以处理消息的,并且我们发现两个队列处理的是同一条消息,也就是交换机会把同一条消息路由到两个不同的队列


5. Direct交换机的消息发接


Direct交换机,也就是Routing,是消息订阅模式的其中一种,共三种
'发布订阅模式' 与之前案例的区别就是允许将同一个消息发送给多个消费者(之前案例也就是前两种消息队列,只能允许同一个消息发送给一个消费者)。实现方式是加入了exchange(交换机)。'发布订阅模式' 的结构图如下


如上图,交换机可以把同一个消息只发给其中一个队列,也可以把同一个消息发送给多个队列,是由交换机的类型来决定,不同的交换机类型,消息转发规则不同,相同点是都是负责转发消息的。交换机如果路由失败的话,消息就会丢失,因为交换机不负责存储消息。交换机的常见类型如下
1、Fanout类型: 广播 (上面在 '4. Publish/Subscribe模型的消息发接' 学过)
2、Direct类型: 路由 (现在正在学的),也就是Routing
3、Topic类型: 话题,也就是Topics
我们前面已经学了前两种消息队列,和第一种消息订阅模式(Publish/Subscribe模型),接下来学习'发布订阅模式' 其中的后两种
1、基本消息队列: Hello World 这个是上面使用官方API演示过一次
2、工作消息队列: Work Queues
3、广播发布订阅(使用Fanout交换机): Publish/Subscribe
4、路由发布订阅(使用Direct交换机): Routing
5、主题发布订阅(使用Topic交换机): Topics
先学后两种其中的Direct类型交换机,也叫 '路由发布订阅',也就是Routing,该类型交换机的结构图如下
Direct类型交换机的特点: 不会路由到所有的队列,会根据规则路由到指定的队列


案例: 利用SpringAMQP演示DirectExchange(Direct类型的交换机)的使用。
实现思路如下:
1、利用@RabbitListener声明Exchange、Queue、RoutingKey(取消@Bean注解)
2、在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
3、在publisher中编写测试方法,向huanfqc. direct交换机发送消息
案例结构图如下


第一步: 把consumer子工程的SpringRabbitListener修改为如下

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name="direct.queue1"),
            exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}
    ))
    public void ListenDirectQueue1(String msg){
        System.out.println("我接收并处理了direct.queue1队列的消息:【"+msg+"】");

    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name="direct.queue2"),
            exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}
    ))
    public void ListenDirectQueue2(String msg){
        System.out.println("我接收并处理了direct.queue2队列的消息:【"+msg+"】");

    }


第二步: 确保已经启动Docker、并启动Docker里面的rabbitmq容器
第三步: 运行consumer子工程的ConsumerApplication引导类


第四步: 运行publisher子工程的SpringAmqpTest类,作用仅仅是发送消息,才能触发 'consumer子工程的FanoutConfig类' 的绑定

    @Test
    public void testSendMessageDirectQueue() throws InterruptedException {
        String exchangeName = "itcast.direct";//自定义交换机名称
        String message = "hello ,blue";
        rabbitTemplate.convertAndSend(exchangeName,"blue",message);
    }


第五步: 浏览器访问: http://你的Centos7的ip:15672


第六步: 消息发送功能。在publisher子工程的SpringAmqpTest类修改为如下。表示发送到交换机的消息是带了一个key为blue的消息
修改完之后运行SpringAmqpTest类
第七步: 消息接收功能。重新运行consumer子工程的ConsumerApplication引导类。验证是否只有对应key的队列才能获取交换机里面带有对应key消息


第八步: 再一次验证,发一条带有key值为yellow的消息,查看控制台是否只有direct.queue2队列才能获取交换机里面带有对应key值为yellow的消息


第九步: 我们发现,direct.queue1和direct.queue2队列都带有值为red的key,那请问在生产者往交换机发送一条key值为red的消息,那么会被direct.queue1和direct.queue2队列同时接收吗。答案是肯定的


第十步: 要是生产者往交换机发送一条key值,既不匹配direct.queue1队列,也不匹配direct.queue2队列,的消息,到交换机,那么会被direct.queue1和direct.queue2队列同时接收吗。答案是否定的(但是老师按照讲的,这步的验证应该是肯定的,但是我验证出来是否定的,你们可以去试试)


6. Topics交换机的消息发接


Topics是消息订阅模式的其中一种,共三种
'发布订阅模式' 与之前案例的区别就是允许将同一个消息发送给多个消费者(之前案例也就是前两种消息队列,只能允许同一个消息发送给一个消费者)。实现方式是加入了exchange(交换机)。'发布订阅模式' 的结构图如下


如上图,交换机可以把同一个消息只发给其中一个队列,也可以把同一个消息发送给多个队列,是由交换机的类型来决定,不同的交换机类型,消息转发规则不同,相同点是都是负责转发消息的。交换机如果路由失败的话,消息就会丢失,因为交换机不负责存储消息。交换机的常见类型如下
1、Fanout类型: 广播 (上面在 '4. Publish/Subscribe模型的消息发接' 学过)
2、Direct类型: 路由 (现在正在学的),也就是Routing
3、Topic类型: 话题
我们前面已经学了前两种消息队列,和第一种消息订阅模式(Publish/Subscribe模型),接下来学习'发布订阅模式' 其中的后两种
1、基本消息队列: Hello World 这个是上面使用官方API演示过一次
2、工作消息队列: Work Queues
3、广播发布订阅(使用Fanout交换机): Publish/Subscribe
4、路由发布订阅(使用Direct交换机): Routing
5、主题发布订阅(使用Topic交换机): Topics (下面学的是这个)
Topic交换机和上一节学的Direct交换机类似,区别在于Topic交换机的key必须是多个单词的列表,并且以小数点.分隔。结构图如下


案例: 利用SpringAMQP演示TopicExchange的使用
实现思路如下:
1、利用@RabbitListener声明Exchange、Queue、RoutingKey。在上面学Direct交换机时,也是使用@RabbitListener来声明
2、在consumer子工程中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
3、在publisher子工程中编写测试方法,向huanfqc.topic发送消息
案例结构图如下


第一步: 把consumer子工程的SpringRabbitListener修改为如下

//topic.queue1队列,并根据key来绑定交换机
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),//声明(创建)队列
            //声明(创建)交换机,创建的交换机类型是topic
            exchange = @Exchange(name = "huanfqc.topic",type = ExchangeTypes.TOPIC),
            //交换机对应的key,如下就是指定topic.queue1队列、huanfqc.topic交换机的key为china.#。#通配符,表示任意个单词
            key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者接收到了topic.queue1队列的消息是【"+msg+"】");
    }

    //topic.queue2队列,并根据key来绑定交换机
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),//声明(创建)队列
            //声明(创建)交换机,创建的交换机类型是topic
            exchange = @Exchange(name = "huanfqc.topic",type = ExchangeTypes.TOPIC),
            //交换机对应的key,如下就是指定topic.queue1队列、huanfqc.topic交换机的key为#.news。#通配符,表示任意个单词
            key = "#.news"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者接收到了topic.queue2队列的消息是【"+msg+"】");
    }


第二步: 确保已经启动Docker、并启动Docker里面的rabbitmq容器
第三步: 运行consumer子工程的ConsumerApplication引导类


第四步: 运行publisher子工程的SpringAmqpTest类,作用仅仅是发送消息,才能触发 'consumer子工程的FanoutConfig类' 的绑定

    @Test
    public void testSendMessageDirectQueue() throws InterruptedException {
        String exchangeName = "huanfqc.topic";//自定义交换机名称
        String message = "hello ,china.news";
        rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
    }


第五步: 浏览器访问: http://你的Centos7的ip:15672


第六步: 消息发送功能。在publisher子工程的SpringAmqpTest类修改为如下。表示发送到交换机的消息是带了一个key为blue的消息
修改完之后运行SpringAmqpTest类


第七步: 消息接收功能。重新运行consumer子工程的ConsumerApplication引导类。验证是否只有对应key的队列才能获取交换机里面带有对应key消息


第八步: 再次验证


7. 消息转换器的消息发送


案例: 测试发送Object类型的消息
说明: 在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送
从上面的学习中,我们知道有两种方式可以创建队列(交换机也同理),一种是通过注解(例如我们在FanoutConfig类里面写的代码),
另一种是通过代码(例如我们在SpringRabbitListener类里面写的代码)。在这个案例中,我们将使用通过注解的方式来创建队列
第一步: 创建队列。在consumer子工程的FanoutConfig类,写入如下。然后,重新运行consumer子工程的ConsumerApplication引导类

//消息转换器案例。先在这里创建一个队列
    @Bean
    public Queue xxobjectQueue(){
        //指定要创建什么名字的队列
        return new Queue("xxobject.queue");
    }


第二步: 向队列发送消息。在publisher子工程的SpringAmqpTest类,写入如下

//如何发送对象类型的消息
    @Test
    public void xxtestSendObjectQueue(){
        //创建一个Map类型的对象,key是String类型,value是Object类型
        Map<String,Object> xxmsg = new HashMap<>();
        xxmsg.put("name", "张三");
        xxmsg.put("age", 20);
        //第一个参数是往哪个队列发送消息,第二个参数是具体的消息(我们发送的是对象)
        rabbitTemplate.convertAndSend("xxobject.queue",xxmsg);
    }


第三步: 确保已经启动Docker、并启动Docker里面的rabbitmq容器
第四步: 不需要消费者,我们重点是看一下对象类型的消息能不能发送到队列。浏览器访问: http://你的Centos7的ip:15672


我们已经实现了把对象类型的消息发送给队列。下面是解决序列化问题,spring默认的序列化方式是基于jdk的,缺陷较多
如何使用基于json的序列化方式呢,具体操作过程如下

第一步: 由于consumer子工程和publisher子工程都需要使用这个依赖,所以我们直接在mq-demo父工程的pom.xml添加如下

<!--Java对象转换为JSON格式数据-->
<dependency>
	<groupId>com.fasterxml.jackson.core</groupId>
	<artifactId>jackson-databind</artifactId>
</dependency>


第二步: 在publisher子工程,声明bean去覆盖默认bean,需要新建一个配置类,或者直接在引导类写。例如把PublisherApplication引导类修改为如下

@Bean
    //这里只添加这个方法
    public MessageConverter messageConverter(){
        //使用Jackson消息转换工具,将消息转换为JSON格式数据
        return new Jackson2JsonMessageConverter();
    }


第三步: 为了不影响测试,先去浏览器清理刚刚的那条消息


第四步: 重新运行consumer子工程的ConsumerApplication引导类,并且重新运行publisher子工程的SpringAmqpTest类(运行xxtestSendObjectQueue方法)


第五步: 浏览器访问: http://你的Centos7的ip:15672


总结
SpringAMQP中消息的 序列化(消息发送) 和 反序列化(消息接收) 是怎么实现的
1、利用MessageConverter实现的,默认是JDK的序列化
2、注意发送方与接收方必须使用相同的MessageConverter(下面学消息接收的时候会才学)
上面只是学习发送对象类型的消息,下面会学如何接收对象类型的消息


8. 消息转换器的消息接收


案例: 测试接收Object类型的消息
说明: 在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送
上述 '7. 消息转换器的消息发送' 我们已经成功向队列发送了Object类型(Map类型的对象)的消息,该消息是以json序列化的形式存放在队列,我们怎么让消费者能从队列中获取该消息呢,也就是我们下面要学的
第一步(做好可跳过): 由于consumer子工程和publisher子工程都需要使用这个依赖,所以我们直接在mq-demo父工程的pom.xml添加如下

<!--Java对象转换为JSON格式数据-->
<dependency>
	<groupId>com.fasterxml.jackson.core</groupId>
	<artifactId>jackson-databind</artifactId>
</dependency>


第二步: 在consumer子工程,声明bean去覆盖默认bean,需要新建一个配置类,或者直接在引导类写。例如把ConsumerApplication引导类修改为如下

    @Bean
    public MessageConverter messageConverter(){
        //使用Jackson消息转换工具,将消息转换为JSON格式数据
        return new Jackson2JsonMessageConverter();
    }


第三步: 把consumer子工程的SpringRabbitListener修改为如下

Java复制代码

@RabbitListener(queues = "xxobject.queue")//从哪个队列获取消息
    //由于我们要接收的消息是Map<String,Object>类型的对象,所以下面形参要写Map<String,Object>类型
    public void xxlistenObjectQueue(Map<String,Object> xxmsg){
        System.out.println("我已接收到Map对象类型的消息: "+ xxmsg);
    }


第四步: 测试。先运行publisher子工程的SpringAmqpTest表示向队列发送消息,再运行consumer子工程的ConsumerApplication引导类
查看子工程是否能接收到消息


总结:
SpringAMQP中消息的 序列化(消息发送) 和 反序列化(消息接收) 是怎么实现的
1、利用MessageConverter实现的,默认是JDK的序列化
2、注意发送方与接收方必须使用相同的MessageConverter

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1588555.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【ROS2笔记四】ROS2话题通信

4.ROS2话题通信 文章目录 4.ROS2话题通信4.1订阅发布模型4.2ROS2话题工具4.3rclcpp实现话题4.3.1编写发布者4.4编写订阅者 Reference 话题是ROS2中常用的通信方式之一&#xff0c;话题通信采取的是订阅发布模型&#xff0c;一个节点的数据会发布到某个话题之上&#xff0c;然后…

C#基础--之数据类型

C#基础–之数据类型 在第一章我们了解了C#的输入、输出语句后&#xff0c;我这一节主要是介绍C#的基础知识&#xff0c;本节的内容也是后续章节的基础&#xff0c;好的开端等于成功的一半。在你阅读完本章后&#xff0c;你就有足够的C#知识编写简单的程序了。但还不能使用封装、…

一些优雅的算法(c++)

求最大公约数&#xff1a;辗转相除法 int gcd(int a,int b){return b0?a:gcd(b,a%b); }求最小公倍数&#xff1a;两整数之积除以最大公约数 int lcm(int a, int b){return a*b / gcd(a, b); }十进制转n进制&#xff1a; char get(int x){if(x<9){return x0;}else{return…

java 邮件发送表格

邮件发送表格 问题导入效果图 实现方案1. 拼接HTML文件&#xff08;不推荐&#xff09;2. excel 转HTML使用工具类来转化依赖工具类代码示例 使用已工具包 如 aspose-cells依赖代码示例 3.使用模板生成流程准备模板工具类代码示例 问题导入 在一些定时任务中&#xff0c;经常会…

SpringBoot 集成Swagger3

说明&#xff1a; 1&#xff09;、本文使用Spring2 集成Swagger3&#xff0c; 本想使用Springboot3 jdk 17 集成Swagger3, 但是搜了一些资料&#xff0c;Spring 想引用swagger3 需要依赖降级使用Spring2 版本&#xff0c; 或者使用Spring3 springdoc 实现swagger的功能&…

数据结构—顺序表(如果想知道顺序表的全部基础知识点,那么只看这一篇就足够了!)

前言&#xff1a;学习完了C语言的基础知识点之后&#xff0c;我们就需要使用我们所学的知识来进一步对存储在内存中的数据进行操作&#xff0c;这时候我们就需要学习数据结构。而这篇文章为数据结构中顺序表的讲解。 ✨✨✨这里是秋刀鱼不做梦的BLOG ✨✨✨想要了解更多内容可以…

JavaEE初阶——多线程(一)

T04BF &#x1f44b;专栏: 算法|JAVA|MySQL|C语言 &#x1faf5; 小比特 大梦想 此篇文章与大家分享多线程的第一部分:引入线程以及创建多线程的几种方式 此文章是建立在前一篇文章进程的基础上的 如果有不足的或者错误的请您指出! 1.认识线程 我们知道现代的cpu大多都是多核心…

Flutter学习13 - Widget

1、Flutter中常用 Widget 2、StatelessWidget 和 StateFulWidget Flutter 中的 widget 有很多&#xff0c;但主要分两种&#xff1a; StatelessWidget无状态的 widget如果一个 widget 是最终的或不可变的&#xff0c;那么它就是无状态的StatefulWidget有状态的 widget如果一个…

SpringCloud Alibaba Sentinel 简介和安装

一、前言 接下来是开展一系列的 SpringCloud 的学习之旅&#xff0c;从传统的模块之间调用&#xff0c;一步步的升级为 SpringCloud 模块之间的调用&#xff0c;此篇文章为第十三篇&#xff0c;即介绍 SpringCloud Alibaba Sentinel 简介和安装。 二、Sentinel 简介 2.1 Sent…

2024Mathorcup(妈妈杯)数学建模C题python代码+数据教学

2024Mathorcup数学建模挑战赛&#xff08;妈妈杯&#xff09;C题保姆级分析完整思路代码数据教学 C题题目&#xff1a;物流网络分拣中心货量预测及人员排班 因为一些不可抗力&#xff0c;下面仅展示部分代码&#xff08;很少部分部分&#xff09;和部分分析过程&#xff0c;其…

(Java)数据结构——图(第七节)Folyd实现多源最短路径

前言 本博客是博主用于复习数据结构以及算法的博客&#xff0c;如果疏忽出现错误&#xff0c;还望各位指正。 Folyd实现原理 中心点的概念 感觉像是充当一个桥梁的作用 还是这个图 我们常在一些讲解视频中看到&#xff0c;就比如dist&#xff08;-1&#xff09;&#xff0…

bayes_opt引用失败,解决方案

bayes_opt引用失败&#xff0c;如图&#xff1a; 1.pip install bayesian-optimization报错&#xff0c;如图&#xff1a; 2.【解决方案】pip install -i https://pypi.tuna.tsinghua.edu.cn/simple bayesian-optimization

【opencv】示例-detect_blob.cpp

// 导入所需的OpenCV头文件 #include <opencv2/core.hpp> #include <opencv2/imgproc.hpp> #include <opencv2/highgui.hpp> #include <opencv2/features2d.hpp> // 导入向量和映射容器 #include <vector> #include <map> // 导入输入输出…

一文读懂传统服务器与云服务器的区别

传统服务器 传统服务器一般指的是物理服务器。物理服务器是独立存在的&#xff0c;无需与其他用户共享资源&#xff0c;拥有完全管理员权限和独立IP地址&#xff0c;安全稳定性高&#xff0c;性能优越。物理服务器与通用的计算机架构类似&#xff0c;由CPU、主板、内存条、硬…

区块链安全-----区块链基础

区块链是一种全新的信息网络架构 &#xff0c;是新一代信息基础设施 &#xff0c;是新型的价值交换方式、 分布式协 同生产机制以及新型的算法经济模式的基础。 区块链技术可以集成到多个领域。 区块链的主要用途 是作为加密货币的分布式总帐。 它在银行 &#xff0c;金融 &…

oracle数据库怎么查看当前登录的用户?

方法如下&#xff1a; 输入select * from dba_users; 即可。 常用语句&#xff1a; 一&#xff0c;查看数据库里面所有用户&#xff1a; select * from dba_users; 前提是你是有dba权限的帐号&#xff0c;如sys,system。 二&#xff0c;查看你能管理的所有用户&#xff1…

react17中配置webpack:使用@代表src目录

在vue的项目中可以使用表示src目录&#xff0c;使用该符号表示绝对路径&#xff0c;那么在react中想要使用怎么办呢&#xff1f; 在react中使用表示src目录是需要在webpack中配置的&#xff0c;在核心模块node_modules-》react-scripts-》config-》webpack.config.js中搜索找到…

python——列表(list)

概念 列表一般使用在一次性存储多个数据 语法 lst[数据1&#xff0c;数据2&#xff0c;.....]方法 #mermaid-svg-flVxgVdpSqFaZyrF {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-flVxgVdpSqFaZyrF .error-icon{…

【2024最新博客美化教程重置版】一分钟教会你在博客页面中加入javascript点击出弹出文字效果!

&#x1f680; 个人主页 极客小俊 ✍&#x1f3fb; 作者简介&#xff1a;程序猿、设计师、技术分享 &#x1f40b; 希望大家多多支持, 我们一起学习和进步&#xff01; &#x1f3c5; 欢迎评论 ❤️点赞&#x1f4ac;评论 &#x1f4c2;收藏 &#x1f4c2;加关注 我们可以在博客…

利用正射影像对斜射图像进行反向投影

在图像投影和映射领域,有两种类型的投影:正向投影和反向投影。正向投影涉及使用内部方向(即校准相机参数)将 3D 点(例如地面上的物体)投影到 2D 图像平面上。另一方面,向后投影是指根据 2D 图像确定地面物体的 3D 坐标的过程。 为了匹配倾斜图像和正射影像并确定相机位置…