MQ-rabbitMQ_基础篇

news2024/11/26 3:42:26

MQ-rabbitMQ_基础篇

  • 1.MQ
    • 1.1什么是MQ
    • 1,2应用
  • 2.常见消息中间件协议(模型)
    • 2.1JMS模型(协议)
    • 2.2AMQP协议
  • 3.RabbitMQ
    • 3.1六种工作模式
      • 3.1.1Hello Word简单模式
      • 3.1.2word queues 工作队列
        • 能者多劳
      • 3.1.3Publish/Subscribe 发布与订阅模式
        • direct-routing
        • topic
        • fanout
      • 3.1.4Routing 路由
      • 3.1.5 Topics 主题
      • 3.1.6RPC 远程调用模式(远程调用,不太算消息队列)
      • 消息应答
    • 4.springboot开发
      • 生产者
      • 消费者

1.MQ

1.1什么是MQ

	MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是

message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常

见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不

用依赖其他服务。

1,2应用

  • 异步处理
  • 流量控制
  • 日志
  • 服务解耦

2.常见消息中间件协议(模型)

详细的解释说明在常见消息中间件大 PK

两者的区别和联系:

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
  • JMS规定了两种消息模型;而AMQP的消息模型更加丰富

2.1JMS模型(协议)

JMS 全称 Java Message Service,类似于 JDBC,不同于 JDBC,JMS 是 JavaEE 的消息服务接口,JMS 主要有两个版本:1.1 和2.0,两者相比,后者主要是简化了收发消息的代码。

考虑到消息中间件是一个非常常用的工具,所以 JavaEE 为此制定了专门的规范 JMS。

不过和 JDBC 一样,JMS 作为规范,他只是一套接口,并不包含具体的实现,如果我们要使用 JMS,那么一般还需要对应的实现,这就像使用 JDBC 需要对应的驱动一样。

JMS 消息服务支持两种消息模型:

  • 点对点或队列模型
  • 发布/订阅模型

常见支持jms的mq:

  • Kafka
  • Apache ActiveMQ
  • OpenJMS

2.2AMQP协议

2006 年,AMQP 规范发布。

RabbitMQ 的版本为 3.5.7,基于 AMQP 0-9-1。

常见实现了amqp的mq:

  • Apache Qpid
  • Apache ActiveMQ
  • RabbitMQ

其实 ActiveMQ 不仅支持 JMS,也支持 AMQP

在 AMQP 协议中,消息收发涉及到如下一些概念:

  • Broker 接收和分发消息的应用,RabbitMQ服务就是Message Broker。

  • Virtual host 虚拟机,出于多租户和安全因素设计的,把 AMQP的基本组件划分到一个虚拟的分组中,可以类比mysql数据库会创建很多库,库和库之间是独立的。当多个不同的用户使用同一个RabbitMQserver 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等。

  • Connection 消息的发布方或者消息的消费方 和broker 之间的 TCP 连接。

  • Channel Channel 是在 connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method包含了channel id 帮助客户端和message broker 识别 channel,所以 channel之间是完全隔离的,减少了操作系统建立 TCP connection 的开销。

  • Exchange 交换机,message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。

  • Queue 队列,消息队列,接收消息、缓存消息,消息最终被送到这里等待 consumer 取走。

  • Binding 绑定,exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

image-20230513142750282

另外还有大家熟知的阿里出品的 RocketMQ,这个是自定义了一套协议,社区也提供了 JMS,但是不太成熟

  • ActiveMQ JMS和 AMQP
  • RabbitMQ AMQP
  • RocketMQ JMS 或者 自定义了一套协议
  • Kafka JMS或者 自定义了一套协议

img

3.RabbitMQ

官方文档https://www.rabbitmq.com/getstarted.html

RabbitMQ就是 AMQP 协议的 Erlang 的实现(当然 RabbitMQ 还支持 STOMP2、 MQTT3 等协议 )

image-20230513132006203

  • Broker 接收和分发消息的应用,RabbitMQ服务就是Message Broker。
  • Virtual host 虚拟机,出于多租户和安全因素设计的,把 AMQP的基本组件划分到一个虚拟的分组中,可以类比mysql数据库会创建很多库,库和库之间是独立的。当多个不同的用户使用同一个RabbitMQserver 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等。
  • Connection 消息的发布方或者消息的消费方 和broker 之间的 TCP 连接。
  • Channel Channel 是在 connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method包含了channel id 帮助客户端和message broker 识别 channel,所以 channel之间是完全隔离的,减少了操作系统建立 TCP connection 的开销。
  • Exchange 交换机,message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。
  • Queue 队列,消息队列,接收消息、缓存消息,消息最终被送到这里等待 consumer 取走。
  • Binding 绑定,exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

思考:在java中 把它们都封装成对象,然后去使用。

3.1六种工作模式

image-20230513201857234

image-20230513201920973

3.1.1Hello Word简单模式

简单模式:一个生产者生产消息发送到队列里面,一个消费者从队列里面拿消息,进行消费消息。一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)

image-20230513175715804

image-20230513181052720

纯java案例


/*HelloWord模型 生产者代码*/
public class Producer {

    public static final String QUEUE_NAME = "hello"; // 队列名称

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.226.129");           // 设置MQ所在机器IP进行连接
        factory.setPort(5672);                        // 指定MQ服务端口
        factory.setVirtualHost("study");              // 指定使用的VirtualHost
        factory.setUsername("admin");                 // 指定MQ账号名
        factory.setPassword("123");                   // 指定MQ密码

        Connection connection = factory.newConnection();    // 创建连接
        Channel channel = connection.createChannel();       // 创建信道

        /*  队列设置(创建队列)
         *参数1:队列名称,名称不存在就自动创建
         *参数2:定义队列是否持久化(重启MQ后是队列否存在),true开启,false关闭
         *参数3:exclusive 是否独占队列(设置是否只能有一个消费者使用),true独占,false非独占
         *参数4:autoelete 是否在消费完成后是否自动删除队列 ,true删除,false不删除
         *参数5:额外附加参数
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        String message = "Hello RabbitMQ";            // 需要发送的消息

        /*  交换机&队列设置(指定消息使用的交换机和队列)
         * 参数1: exchange交换机名称(简单队列无交换机,这里不写)
         * 参数2: 有交换机就是路由key。没有交换机就是队列名称,意为往该队列里存放消息
         * 参数3: 传递消息的额外设置 (设置消息是否持久化)  MessageProperties.PERSISTENT_TEXT_PLAIN设置消息持久化
         * 参数4: 消息具体内容(要为 Byte类型)
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));

        /*关闭资源*/
        channel.close();
        connection.close();

        System.out.println("消息生产完毕");

    }

}

/*HelloWord模型 消费者案例*/
public class Consumer {

      public static final String QUEUE_NAME = "hello";  // 队列名称

      public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.226.129");             // 设置MQ所在机器IP进行连接
        factory.setPort(5672);                          // 指定MQ服务端口
        factory.setVirtualHost("study");                // 指定使用的VirtualHost
        factory.setUsername("admin");                   // 指定MQ账号名
        factory.setPassword("123");                     // 指定MQ密码

        Connection connection = factory.newConnection();    // 创建连接
        Channel channel = connection.createChannel();       // 创建信道

        /*消费者成功消费时的回调接口,这里为打印获取到的消息*/
        DeliverCallback deliverCallback = (consumerTag, message) -> {
              System.out.println(new String(message.getBody()));
        };

        /*消费者取消消费的回调*/
        CancelCallback callback = consumerTag -> {
              System.out.println("消息者取消消费接口回调逻辑");
        };

        /*  消费消息
         * 参数1 : 消费队列的名称
         * 参数2 : 消息的自动确认机制(一获得消息就通知 MQ 消息已被消费)  true打开,false关闭 (接收到消息并消费后也不通知 MQ ,常用)
         * 参数3 : 消费者成功消费时的回调接口
         * 参数4 : 消费者取消消费的回调
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, callback);

        System.out.println("消费者执行完毕");

      }

}

3.1.2word queues 工作队列

它是轮训分发消息

image-20230513181240665

/*WorkQueue模型 生产者代码*/
public class WorkProvider {

    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException {

        Connection connection = RabbitUtils.getConnection();            //创建连接

        Channel channel = connection.createChannel();                   //创建信道

        /*  队列设置(创建队列)
         *参数1:队列名称,名称不存在就自动创建
         *参数2:定义队列是否持久化(重启MQ后是队列否存在),true开启,false关闭
         *参数3:exclusive 是否独占队列(设置是否只能有一个消费者使用),true独占,false非独占
         *参数4:autoelete 是否在消费完成后是否自动删除队列 ,true删除,false不删除
         *参数5:额外附加参数
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);


        Scanner scanner = new Scanner(System.in);       //从控制台输入消息内容
        while (scanner.hasNext()){
            String message = scanner.next();

            /*  交换机&队列设置(指定消息使用的交换机和队列)
             * 参数1: exchange交换机名称(简单队列无交换机,这里不写)
             * 参数2: 有交换机就是路由key。没有交换机就是队列名称,意为往该队列里存放消息
             * 参数3: 传递消息的额外设置 (设置消息是否持久化)  MessageProperties.PERSISTENT_TEXT_PLAIN设置消息持久化
             * 参数4: 消息具体内容(要为 Byte类型)
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息发送完成:" + message);
        }


    }



}

/*WorkQueue模型 消费者代码*/
public class Consumer1 {


  private static final String QUEUE_NAME = "hello";             //队列名称

  public static void main(String[] args) throws IOException {

      Connection connection = RabbitUtils.getConnection();      //创建连接

      Channel channel = connection.createChannel();             //创建信道


      /*消费者成功消费时的回调接口,这里为打印获取到的消息*/
      DeliverCallback deliverCallback = (consumerTag, message) -> {
          System.out.println("接收到的消息: "+ new String(message.getBody()) );
      };

      /*消费者取消消费的回调*/
      CancelCallback callback = consumerTag -> {
          System.out.println(consumerTag+"消息者取消消费接口回调逻辑");
      };

      System.out.println("消费者B等待接收消息......");

      /*  消费消息
       * 参数1 : 消费队列的名称
       * 参数2 : 消息的自动确认机制(一获得消息就通知 MQ 消息已被消费)  true打开,false关闭 (接收到消息并消费后也不通知 MQ ,常用)
       * 参数3 : 消费者成功消费时的回调接口
       * 参数4 : 消费者取消消费的回调
       */
      channel.basicConsume(QUEUE_NAME,true,deliverCallback,callback);       //消费消息



  }


}

/*WorkQueue模型 消费者代码*/
public class Consumer2 {
  private static final String QUEUE_NAME = "hello";             //队列名称

  public static void main(String[] args) throws IOException {

      Connection connection = RabbitUtils.getConnection();      //创建连接

      Channel channel = connection.createChannel();             //创建信道


      /*消费者成功消费时的回调接口,这里为打印获取到的消息*/
      DeliverCallback deliverCallback = (consumerTag, message) -> {
          System.out.println("接收到的消息: "+ new String(message.getBody()) );
      };

      /*消费者取消消费的回调*/
      CancelCallback callback = consumerTag -> {
          System.out.println(consumerTag+"消息者取消消费接口回调逻辑");
      };

      System.out.println("消费者C等待接收消息......");

      /*  消费消息
       * 参数1 : 消费队列的名称
       * 参数2 : 消息的自动确认机制(一获得消息就通知 MQ 消息已被消费)  true打开,false关闭 (接收到消息并消费后也不通知 MQ ,常用)
       * 参数3 : 消费者成功消费时的回调接口
       * 参数4 : 消费者取消消费的回调
       */
      channel.basicConsume(QUEUE_NAME,true,deliverCallback,callback);       //消费消息
  }
}

能者多劳

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

3.1.3Publish/Subscribe 发布与订阅模式

(direct、topic、fanout 、 headers 【不常用】)

image-20230513193201946

一个生产者生产消息发送到交换机里面,由交换机处理消息,队列与交换机的任意绑定,将消息指派给某个队列,一个或者多个消费者从队列里面拿消息,进行消费消息。需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。

Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列
  • Direct:定向,把消息交给符合指定routing key 的队列
  • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

生产者发送给交换机和路由key就可以了,消费者声明队列和绑定队列

direct-routing

消费者:channel.queueBind(queue,EXCHANGE_NAME,NormalKey);

/*Direct模式 生产者代码*/
public class Provider {

    private static final String EXCHANGE_NAME = "DirectExchange";          //交换机名称
    private static final String VipKey = "Vip";                 //普通VIP
    private static final String NormalKey = "Normal";           //普通用户
    private static final String SuperVipKey = "SuperVip";       //超级VIP

    public static void main(String[] args) throws Exception {

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);            //声明交换机

        /*模拟不同会员等级接收到的不同消息内容*/
        for (int i = 0; i < 9; i++) {
            String message = "当前是待发送的消息,序号:"+i;
            if(i%3==0){
                channel.basicPublish(EXCHANGE_NAME,SuperVipKey,null,message.getBytes(StandardCharsets.UTF_8));  //发送超级VIP消息
            }
            if(i%3==1){
                channel.basicPublish(EXCHANGE_NAME,VipKey,null,message.getBytes(StandardCharsets.UTF_8));      //发送VIP消息
            }
            if(i%3==2){
                channel.basicPublish(EXCHANGE_NAME,NormalKey,null,message.getBytes(StandardCharsets.UTF_8));   //发送通用消息
            }
            System.out.println("消息发送: " + message);
        }


    }


}
/*Direct模式 消费者代码*/
public class VipConsumer {

    private static final String EXCHANGE_NAME = "DirectExchange";          //交换机名称
    private static final String NormalKey = "Normal";           //普通用户
    private static final String VipKey = "Vip";                 //普通VIP

    public static void main(String[] args) throws Exception {

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);            //声明交换机,由于生产者已经创建该交换机,如果生产者先执行,这行实际可以省略不写
        String queue = channel.queueDeclare().getQueue();                //创建临时队列
        channel.queueBind(queue,EXCHANGE_NAME,VipKey);                   //绑定队列和交换机
        channel.queueBind(queue,EXCHANGE_NAME,NormalKey);                //绑定队列和交换机

        /*消费者成功消费回调逻辑*/
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Vip用户接收到的信息为:"+new String(message.getBody()));
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);     //手动消息应答
        };

        /*消费者取消消费回调逻辑*/
        CancelCallback cancelCallback = a->{
            System.out.println("Vip用户进行取消消费操作!");
        };

        channel.basicConsume(queue,false,deliverCallback,cancelCallback);

    }
}
/*Direct模式 消费者代码*/
public class NormalConsumer {

    private static final String EXCHANGE_NAME = "DirectExchange";          //交换机名称
    private static final String NormalKey = "Normal";           //普通用户

    public static void main(String[] args) throws Exception {

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);            //声明交换机,由于生产者已经创建该交换机,如果生产者先执行,这行实际可以省略不写
        String queue = channel.queueDeclare().getQueue();                //创建临时队列
        channel.queueBind(queue,EXCHANGE_NAME,NormalKey);                //绑定队列和交换机

        /*消费者成功消费回调逻辑*/
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("普通用户接收到的信息为:"+new String(message.getBody()));
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);     //手动消息应答
        };

        /*消费者取消消费回调逻辑*/
        CancelCallback cancelCallback = a->{
            System.out.println("普通用户进行取消消费操作!");
        };

        channel.basicConsume(queue,false,deliverCallback,cancelCallback);

    }
}

topic

  • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

fanout

  • Fanout:广播,将消息交给所有绑定到交换机的队列

3.1.4Routing 路由

image-20230513204705318

3.1.5 Topics 主题

image-20230514121209000

3.1.6RPC 远程调用模式(远程调用,不太算消息队列)

消息应答

类型 代码 说明

自动应答
basicConsume中 autoAck 设置为 true
如果应答后的代码出现异常导致回滚,该消息由于已被消费无法找回

手动应答
Channel.basicAck
确认处理消息

手动应答
Channel.basicNack
否认处理消息(支持批量)

手动应答
Channel.basicReject
否认处理消息(不支持批量)如果队列配置了死信交换机将会发送到死信队列中,未配置则进行丢弃操作

/*消费者成功消费回调逻辑*/
DeliverCallback deliverCallback = (consumerTag, message) -> {

    System.out.println("消费者A对消息进行消费!");
    try {
        TimeUnit.SECONDS.sleep(1);  //模拟实际业务操作
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("消费者A接收到的信息为:"+new String(message.getBody()));

    /*
    * 参数一:  消息标记tag
    * 参数二:  是否批量消费消息(true为应答该队列中所有的消息,false为只应答接收到的消息)
    * */
    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);     //手动消息应答


};

/*消费者成功消费回调逻辑*/
DeliverCallback deliverCallback = (consumerTag, message) -> {

   System.out.println("消费者A对消息进行消费!");
   try {
     	TimeUnit.SECONDS.sleep(1);  //模拟实际业务操作
   } catch (InterruptedException e) {
       	e.printStackTrace();
   }
   System.out.println("消费者A接收到的信息为:"+new String(message.getBody()));

   /*
    * 参数一:  消息标记tag
    * 参数二:  是否批量消费消息(true为应答该队列中所有的消息,false为只应答接收到的消息)
    * 参数三:  是否重回队列
    * */
    channel.basicNack(message.getEnvelope().getDeliveryTag(),false, false);  //拒绝消息应答方法1

   /*
    * 参数一:  消息标记tag
    * 参数二:  是否重回队列
    * */
    channel.basicReject(message.getEnvelope().getDeliveryTag(),false);       //拒绝消息应答方法2

};

4.springboot开发

生产者

image-20230514191936045

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
spring.rabbitmq.host=192.168.226.132
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.virtual-host=springboot

image-20230514195025905

image-20230514201822811

另一种详细的:

/* RabbitMQ配置文件 */
@Configuration
public class RabbitConfig {


    @Autowired
    private RabbitProperties properties;

    /*RabbitMQ连接池,从配置文件读取参数*/
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost(properties.getHost());
        cachingConnectionFactory.setPort(properties.getPort());
        cachingConnectionFactory.setUsername(properties.getUsername());
        cachingConnectionFactory.setPassword(properties.getPassword());
        cachingConnectionFactory.setVirtualHost(properties.getVirtualHost());
        return cachingConnectionFactory;
    }

    /* RabbitTemplate配置 */
    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); //让RabbitTemplate使用连接池
        return rabbitTemplate;
    }


    /*创建交换机*/
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("TemplateDirectEx",false,false);
    }

    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("TemplateFanoutEx",false,false);
    }

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("TemplateTopicEx",false,false);
    }


    /*创建队列*/
    @Bean
    public Queue directQueue1(){
        return new Queue("directQueue1",true);
    }

    @Bean
    public Queue directQueue2(){
        return new Queue("directQueue2",true);
    }

    @Bean
    public Queue topicQueue1(){
        return QueueBuilder.durable("topicQueue1").build();
    }

    @Bean
    public Queue topicQueue2(){
        return QueueBuilder.durable("topicQueue2").build();
    }


    /*创建绑定关系方法一*/
    @Bean
    public Binding directBind1(){
        return new Binding("directQueue1", Binding.DestinationType.QUEUE,
                "TemplateDirectEx","WeiXin",null);
    }

    @Bean
    public Binding directBind2(){
        return BindingBuilder.bind(new Queue("directQueue2",false))
                .to(new DirectExchange("TemplateDirectEx"))
                .with("WeiXin");
    } 

    /*创建绑定关系方法二
    *   将Bean方法名称作为参数代入*/
    @Bean
    public Binding topicBind1(@Qualifier("topicQueue1") Queue queue,
                              @Qualifier("topicExchange") TopicExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("user.#");
    }

    @Bean
    public Binding topicBind2(@Qualifier("topicQueue2") Queue queue,
                              @Qualifier("topicExchange") TopicExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("vip.*");
    }


    /* 消息容器SimpleMessageListenerContainer 配置*/
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(CachingConnectionFactory cachingConnectionFactory){

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);        //设置连接池
        container.setQueues(topicQueue1(),topicQueue2(),directQueue1(),directQueue2());        //设置队列
        container.setConcurrentConsumers(1);                    //消费者数量
        container.setMaxConcurrentConsumers(10);                //最大消费者
        container.setDefaultRequeueRejected(false);             //是否设置重回队列,一般都为false,相当于 channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);     //消息应答方式,自动/手动/拒绝
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });     //消费端的标签策略,每个消费端都有独立的标签,可在控制台的 channel > consumer 中查看 对应tag


        /*  消息监听器方法一 实际用消息适配器
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                System.out.println("消费者的消息"+new String(message.getBody()  ));
            }
        });  */


        /*消息监听器方法二 使用消息适配器 方案一,通用适配模式
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumerMessage");    //自定义消息处理方法名称
        adapter.setMessageConverter(new MyMessageConverter());  //添加消息转换器
        container.setMessageListener(adapter);
        */


        /*消息监听器方法二 使用消息适配器 方案二,指定不同的队列使用不同的监听方法
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setMessageConverter(new MyMessageConverter());      //添加消息转换器
        adapter.setDefaultListenerMethod("consumerMessage");        //消息适配器默认监听方法名称
        Map<String,String> queueOrTagToMethodName = new HashMap<>();
        queueOrTagToMethodName.put("directQueue1","method1");
        queueOrTagToMethodName.put("directQueue2","method2");
        adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);  //队列标识与方法名称组成的集合
        container.setMessageListener(adapter);
        */


        /*使用默认的JSON格式转换器,消息需要使用Map进行接收
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumerMessage");        //消息适配器默认监听方法名称
        Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
        adapter.setMessageConverter(jsonMessageConverter);
        container.setMessageListener(adapter);
        */


        /*使用默认的JSON格式转换器,消息转换为具体的Java对象,需要使用对象进行接收
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumerMessage");

        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
        javaTypeMapper.addTrustedPackages("*");             //允许使用所有包进行转换,默认会使用 java核心类进行转换
        jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);

        adapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(adapter);
        */


        /*使用默认的JSON格式转换器,消息转换为具体的Java对象,需要使用对象进行接收,支持多映射
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumerMessage");

        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();

        Map<String,Class<?>> idClassMap = new HashMap<>();  //创建Map进行多映射指定,KEY为名称,value为类全路径
        idClassMap.put("student",com.dmbjz.entity.Student.class);
        idClassMap.put("packaged",com.dmbjz.entity.Packaged.class);
        javaTypeMapper.setIdClassMapping(idClassMap);
        javaTypeMapper.addTrustedPackages("*");             //允许使用所有包进行转换,默认会使用 java核心类进行转换

        jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
        adapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(adapter);
        */


        /*多类型消息转换器,不同消息类型使用不同类型转换器进行转换*/
        MessageListenerAdapter adapter =new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("extComsumeMessage");

        ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter();      //复杂消息转换器

        TextMessageConverter textConvert = new TextMessageConverter();  //文本转换器
        converter.addDelegate("text",textConvert);
        converter.addDelegate("html/text",textConvert);
        converter.addDelegate("xml/text",textConvert);
        converter.addDelegate("text/plain",textConvert);

        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();    //JSON转换器
        converter.addDelegate("json",jsonConverter);
        converter.addDelegate("application/json",jsonConverter);

        ImageMessageConverter imageConverter = new ImageMessageConverter();     //图片转换器
        converter.addDelegate("image/png",imageConverter);
        converter.addDelegate("image",imageConverter);

        PDFMessageConverter pdfConverter = new PDFMessageConverter();           //PDF转换器
        converter.addDelegate("application/pdf",pdfConverter);

        adapter.setMessageConverter(converter);
        container.setMessageListener(adapter);

        return container;

    }


}
@SpringBootTest
class RabbitMqBootRabbitTemplateApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /*RabbitTemplate的API案例*/
    @Test
    public void testTemplate(){

      /*创建消息,可以指定消息具体参数*/
      MessageProperties messageProperties = new MessageProperties();
      messageProperties.getHeaders().put("desc","请求头desc参数信息描述");
      messageProperties.getHeaders().put("type","请求头type参数信息描述");
      messageProperties.setContentType("application/json");       //发送格式
      messageProperties.setContentEncoding("UTF-8");              //UTF-8格式化

      /*封装消息
      * 参数一:消息内容
      * 参数二:消息配置
      */
      Message message = new Message("这是RabbitTemplate消息".getBytes(StandardCharsets.UTF_8),messageProperties);

      /* MessagePostProcessor:发送消息前的消息拦截器
       *    可以对消息参数进行修改,例如设置优先级、请求头等
       */
      rabbitTemplate.convertAndSend("TemplateDirectEx", "WeiXin", message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
              System.out.println("拦截需要发送的消息并进行二次设置");
              message.getMessageProperties().getHeaders().put("desc","请求头desc参数信息修改");
              message.getMessageProperties().getHeaders().put("attr","请求头额外新加attr参数");
              return message;
            }
      });


      /*创建消息,使用链式调用*/
      Message message2 = MessageBuilder.withBody("这是Template消息2".getBytes(StandardCharsets.UTF_8))
              .setContentType(MessageProperties.CONTENT_TYPE_JSON)
              .setMessageId("消息ID:"+ UUID.randomUUID())
              .setContentEncoding("UTF-8")
              .setHeader("desc","额外修改的信息描述")
              .setHeader("info","请求头参数2")
              .build();

      rabbitTemplate.convertAndSend("TemplateTopicEx", "user.student", message2);


      /*最简单的调用方式*/
      //rabbitTemplate.convertAndSend("TemplateTopicEx", "vip.student", "我是最简单的消息!");
      rabbitTemplate.send("TemplateTopicEx", "user.teacher.aa", message2);


    }


    /*发送消息使用JSON格式转换器*/
    @Test
    public void testSendJsonMessage() throws JsonProcessingException {

        Student student = new Student().setId("001").setName("小明").setContent("一年级一班学生");
        /*使用ObjectMapper将消息转换为JSON数据,换成FastJSON也可以*/
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(student);
        System.out.println("需要发送的消息内容:" + json);

        MessageProperties properties = new MessageProperties();
        properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        Message message = new Message(json.getBytes(StandardCharsets.UTF_8),properties);
        rabbitTemplate.convertAndSend("TemplateTopicEx","vip.man",message);

    }


    /*发送消息使用JSON格式转换器 转换为 Java实体类*/
    @Test
    public void testSendJavaMessage() throws JsonProcessingException {

        Student student = new Student().setId("001").setName("小明").setContent("一年级一班学生");
        /*使用ObjectMapper将消息转换为JSON数据,换成FastJSON也可以*/
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(student);
        System.out.println("需要发送的消息内容:" + json);

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.getHeaders().put("__TypeId__","com.dmbjz.entity.Student");  //key为固定值,value为需要转换对象的全路径

        Message message = new Message(json.getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("TemplateTopicEx","vip.man",message);

    }


    /*发送消息使用JSON格式转换器 转换为 Java实体类 多映射*/
    @Test
    public void testSendJavaMessage2() throws JsonProcessingException {

        /*实体类一发送消息*/
        Student student = new Student().setId("001").setName("小明").setContent("一年级一班学生");
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(student);
        System.out.println("需要发送的Student消息内容:" + json);

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.getHeaders().put("__TypeId__","student");  //key为固定值,value为多映射配置的实体类名称

        Message message = new Message(json.getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("TemplateTopicEx","vip.man",message);

        /*实体类二发送消息*/
        Packaged packaged = new Packaged().setIds("002").setPname("dmbjz").setPdesc("打包内容");
        String json2 = mapper.writeValueAsString(packaged);
        System.out.println("需要发送的Package消息内容:" + json2);
        messageProperties.getHeaders().put("__TypeId__","packaged");  //key为固定值,value为 idClassMap 的 Key
        Message message2 = new Message(json2.getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("TemplateTopicEx","user.man",message2);

    }


    /*发送消息使用多类型消息转换器*/
    @Test
    public void testSendExtConverterMessage() throws Exception {

        /*发送图片文件*/
        byte[] body = Files.readAllBytes(Paths.get("E:/图片/头像", "ludashi.png"));
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("image/png");
        messageProperties.getHeaders().put("extName", "png");
        Message message = new Message(body, messageProperties);
        rabbitTemplate.convertAndSend("TemplateDirectEx", "WeiXin", message);


        /*发送PDF文件*/
        byte[] body2 = Files.readAllBytes(Paths.get("E:/", "rabbitmq.pdf"));
        MessageProperties messageProperties2 = new MessageProperties();
        messageProperties2.setContentType("application/pdf");
        Message message2 = new Message(body2, messageProperties2);
        rabbitTemplate.convertAndSend("TemplateDirectEx", "WeiXin", message2);

    }

`
}

消费者

image-20230514203338389

image-20230514203731027

另一种详细的:

@Configuration
public class RabbitMQConfig {

    @Autowired
    private RabbitProperties properties;

    public static final String EXCHANGE_NAME = "CustomListener-Exchange";
    public static final String QUEUE_NAME = "CustomListener-Queue";
    public static final String Routing_Key = "CustomListener-Key";

    /*RabbitMQ连接池,从配置文件读取参数*/
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost(properties.getHost());
        cachingConnectionFactory.setPort(properties.getPort());
        cachingConnectionFactory.setUsername(properties.getUsername());
        cachingConnectionFactory.setPassword(properties.getPassword());
        cachingConnectionFactory.setVirtualHost(properties.getVirtualHost());
        cachingConnectionFactory.setPublisherReturns(properties.isPublisherReturns());      //开启连接池的ReturnCallBack支持
        return cachingConnectionFactory;
    }




    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(EXCHANGE_NAME,false,false,null);
    }


    @Bean
    public Queue queue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }


    @Bean
    public Binding binding(@Qualifier("queue") Queue queue,
                           @Qualifier("directExchange") DirectExchange directExchange
    ){
        return BindingBuilder.bind(queue).to(directExchange).with(Routing_Key);
    }



    /* 设置自定义监听方法 */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,
                                                                         OrderMessageService orderMessageService){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue());
        container.setExposeListenerChannel(true);       //是否将监听器交给 ChannelAwareMessageListener,低版本SpringBoot需要手动开启
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(orderMessageService);          //对队列使用自定义监听器方法进行处理
        return container;
    }



}
/*  自定义消息监听,可以编写公共消息应答方法 */
@Slf4j
public abstract class AbstractMessageListener implements ChannelAwareMessageListener {


    public abstract void receviceMessage(Message message);

    /* 获取到的队列消息执行抽象方法,抽象方法实际落地就是 OrderMessageService 的 receviceMessage */
    @Override
    public void onMessage(Message message, Channel channel) throws IOException, InterruptedException {

        MessageProperties messageProperties = message.getMessageProperties();
        long deliveryTag = messageProperties.getDeliveryTag();

        log.info("收到消息{}: ", message);

        try{
            receviceMessage(message);
            channel.basicAck(deliveryTag , false);      //同意应答
        }catch (Exception e){
            log.error(e.getMessage(), e);
            /* 这里可以根据消息具体的参数判断是否应该拒绝重回队列 */
            if (message.getBody().length>100){
                channel.basicReject(deliveryTag, false);
            } else {
                channel.basicNack(deliveryTag, false, true);
            }
        }
    }


}
/* 自定义消息监听器的消息处理方法,一般为具体的业务处理逻辑 */
@Slf4j
@Service
public class OrderMessageService extends AbstractMessageListener {

    @Override
    public void receviceMessage(Message message) {
        log.info("OrderMessageService获取到消息:{}", new String(message.getBody()));
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/525516.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在SwissTargetsPrediction数据库中预测成分靶点

1.对筛选的多肽成分进行靶点预测&#xff1a; ①用Uniport中的蛋白进行一系列操作&#xff08;水解&#xff0c;挑选2~8短肽&#xff0c;活性预测&#xff0c;毒性&#xff0c;过敏性预测&#xff0c;胃肠吸收度&#xff0c;半衰期和苦味的预测、生物活性功能预测&#xff09;…

mybatis连接池源码分析

文章目录 前言一、PooledDataSourceFactory二、获取连接三、归还连接 前言 其实大部分连接池的代码都大同小异&#xff0c;总体获取连接&#xff0c;归还连接逻辑大都相同。希望通过阅读本文章&#xff0c;能给你带来帮助。 测试用例 public void testMybatis()throws Excepti…

深入篇【C++】类与对象:运算符重载详解

深入篇【C】类与对象&#xff1a;运算符重载详解 ⏰一.运算符重载&#x1f553;1.<运算符重载&#x1f550;2.>运算符重载&#x1f552;3.运算符重载&#x1f551;4.运算符重载①.格式1.改进12.改进2 ②.默认成员函数1.功能2.不足 &#x1f553;5.<运算符重载&#x1…

学内核之十九:Linux文件系统结构大蓝图

目录 一&#xff1a;参考资料 二&#xff1a;整理的原因及基本原则 三&#xff1a;Linux文件系统大蓝图 四&#xff1a;补充说明 一&#xff1a;参考资料 博主梳理的关于文件系统的基础知识&#xff1a; 7.5 文件系统_定义_龙赤子的博客-CSDN博客 博主转载的关于page cac…

深入理解深度学习——正则化(Regularization):参数范数惩罚

分类目录&#xff1a;《深入理解深度学习》总目录 正则化在深度学习的出现前就已经被使用了数十年。线性模型&#xff0c;如线性回归和逻辑回归可以使用简单、直接、有效的正则化策略。许多正则化方法通过对目标函数 J J J添加一个参数范数惩罚 Ω ( θ ) \Omega(\theta) Ω(θ…

三、Neo4j 源码研究系列 - 持久化

version: v-2023051401 author: 路__ 说到数据库&#xff0c;那么离不开的模块就是持久化&#xff08;Persistence&#xff09;&#xff0c;数据持久化是数据库不可缺少的重要组成模块之一。可以说一个数据库少了持久化功能&#xff0c;可以说这个数据库就不足以称为数据库。…

并查集:解密算法面试中的常客

文章目录 1. 并查集原理&#x1f351; 举例说明&#x1f351; 并查集的应用 2. 并查集实现&#x1f351; 接口总览&#x1f351; 构造函数&#x1f351; 查询操作&#x1f345; 代码实现 &#x1f351; 合并操作&#x1f345; 动图演示&#x1f345; 代码实现 &#x1f351; 判…

Linux文件打开函数open()

#include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <stdio.h> int main(void) {int fd -1; /*这个整数用来存放文件描述符*/char filename[] "good.txt"; /*打开的文件名&#xff0c;是一个字符数组…

String类 [下]

目录 一、拷贝构造和赋值重载的传统写法和现代写法 0x01 拷贝构造的传统写法 0x02 拷贝构造的现代写法 0x03 赋值重载的传统写法 0x04 赋值重载的现代写法 0x05 总结 二、 增删改查之后的string 0x01 成员函数swap: 0x02 reserve&#xff1a;改变容量 0x03 push_back: 尾…

带你深入理解Java异常

&#x1f495;“人生就像一盘棋&#xff0c;有时候你慢一步&#xff0c;就输掉了一局。但只要你不停止思考和行动&#xff0c;就永远有机会翻盘。”&#x1f495; &#x1f43c;作者&#xff1a;不能再留遗憾了&#x1f43c; &#x1f386;专栏&#xff1a;Java学习&#x1f38…

《计算机网络——自顶向下方法》精炼——3.4.1-3.4.3

聪明出于勤奋,天才在于积累。——华罗庚 文章目录 对协议的进一步改进rdt2.1rdt2.2rdt3.0&#xff1a;含有比特差错和丢包的可靠数据传输协议 流水线协议回退n步&#xff08;GBN&#xff09; 对协议的进一步改进 rdt2.1 在上一篇文章中&#xff0c;我们讲到对于产生比特差错的…

A2-RIDE Long-tailed recognition by routing diverse distribution-aware experts

文章目录 0. Abstract1. Introduction2. Related Works3. RIDE&#xff1a;ROUTING DIVERSE DISTRIBUTION-AWARE EXPERTS4. Experiments5. Summary论文总结长尾数据分布 (Long-tailed Data Distribution)RIDE方法及模型1. **Multi-expert framework**2. **Routing diversified …

RabbitMQ如何保证顺序消费

目录标题 生产者有序的情况下如何保证顺序生产单个消费者多个消费者 生产者无序的情况下消息返回队列消息不返回队列 生产者有序的情况下 如何保证顺序生产 单一生产者&#xff1a;消息生产的顺序性仅支持单一生产者。 串行发送&#xff1a;如果生产者采用多线程并行发送&…

借助国内ChatGPT平替+markmap/Xmind飞速生成思维导图

系列文章目录 借助国内ChatGPT平替MindShow&#xff0c;飞速制作PPT 文章目录 系列文章目录前言一、科大讯飞“星火”认知大模型二、使用步骤1.借助讯飞星火生成思维导图的文案2.选择markmap绘制思维导图3.选择Xmind绘制思维导图 总结 前言 随着人工智能技术的不断发展&#x…

自动操作魔法师4.9.0.0

产品下载 (won-soft.com) 如下图所示&#xff1a; 彻底远离枯燥乏味的工作 在日常办公中&#xff0c;开发票&#xff0c;更新客户资料&#xff0c;打印报表&#xff0c;录入数据等等工作是极为重要&#xff0c;但大部分时候这些工作是相当枯燥的。你不得得一遍又一遍的进行重复…

第二章: Mybatis-Plus 快速入门

目录 1. 准备工作 数据库准备: 创建Maven 父模块 2. Mybatis 整合 Mybatis-Plus 创建子模块: 准备 log4j.properties 日志文件 3. Mybatis 原生写法实现查询User 编写mybatis-config.xml文件&#xff1a; 编写User实体对象&#xff1a;&#xff08;这里使用lombok进行了…

Hyper-V搭建免费桌面云

Hyper-V 是 Microsoft 的硬件虚拟化产品。 它用于创建并运行计算机的软件版本&#xff0c;称为“虚拟机”。 每个虚拟机都像一台完整的计算机一样运行操作系统和程序。 如果需要计算资源&#xff0c;虚拟机可提供更大的灵活性、帮助节省时间和金钱&#xff0c;并且与在物理硬件…

【AI面试】RoI Pooling 和 RoI Align 辨析

RoI Pooling和RoI Align是两种常用的目标检测中的RoI特征提取方法。它们的主要区别在于&#xff1a;如何将不同大小的RoI对齐到固定大小的特征图上&#xff0c;并在这个过程中保留更多的空间信息。 一、RoI Pooling RoI Pooling最早是在Fast R-CNN中提出的&#xff0c;它的基…

MySQL MHA

概述 什么是 MHA MHA&#xff08;Master High Availability&#xff09;是一套优秀的MySQL高可用环境下故障切换和主从复制的软件。 MHA 的出现就是解决MySQL 单点的问题。 MySQL故障切换过程中&#xff0c;MHA能做到0-30秒内自动完成故障切换操作。 MHA能在故障切换的过程中…

JAVA语言-比较器Comparator(java中Comparable和Comparator的区别)

文章目录 一、什么是Comparator二、Java compare方法和compareTo方法三、java中Comparable和Comparator的区别 Comparator的例子三、demo&#xff1a;java8使用Lambda表达式比较器Comparator给List对象排序&#xff0c;按时间、数字、字典排序 一、什么是Comparator Comparato…