RabbitMQ及各种模式

news2025/1/11 8:03:44

目录

一、MQ的基本概念

1.1 MQ概述

1.2 MQ的优势和劣势

1.3 MQ的优势

1.应用解耦

2.异步提速 

3.削峰填谷

1.4 MQ的劣势

小结

 1.5 常见的 MQ 产品

1.6 RabbitMQ 简介

1.7 JMS

小结

二、RabbitMQ管控台 

 三、Hello World简单模式

​编辑

1、生产者

​编辑 2、消费者

​编辑 四、Work queues 工作队列模式

1、生产者

2、消费者

启动两个消费者 

 启动生产者

 小结

五、Pub/Sub订阅模式

 1、生产者

2、消费者1

消费者2 

小结

六、Routing 路由模式

1、生产者 

2、消费者1,2

七、Topics 通配符模式 

1、生产者

2、消费者

小结

八、工作模式总结  

1. 简单模式 HelloWorld

2. 工作队列模式 Work Queue

3. 发布订阅模式 Publish/subscribe

4. 路由模式 Routing

5. 通配符模式 Topic

一、MQ的基本概念

1.1 MQ概述

MQ全称 M essage Q ueue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。MQ,消息队列,存储消息的中间件。发送方称为生产者,接收方称为消费者
分布式系统通信两种方式:
  • 直接远程调用
  • 借助第三方完成间接通信

1.2 MQ的优势和劣势

  • 优势:                                                            劣势:
    • 应用解耦                                               系统可用性降低
    • 异步提速                                               系统复杂度提高
    • 削峰填谷                                               ⚫ 一致性问题

1.3 MQ的优势

1.应用解耦

用户点击下单,进入订单系统,订单系统通过远程调用去调用库存系统、支付系统、物流系统。这样这四个系统就会耦合在一起,可能出现第一个问题:当库存系统出现异常,订单系统链路走不通也会出问题,用户可能得到下单失败这个反馈,整个系统的容错率低;第二个问题:在下订单的过程中要增加一个X系统,就要修改订单系统然后再访问X系统,如果又要加Y系统不要X系统了,那么又要修改订单系统,整个系统的可维护性比较低。

系统的耦合性越高,容错性就越低,可维护性就越低。

用户点击下单,进入订单系统。订单系统只需要发送一条消息到MQ就可以了,可以个用户发送下单成功。库存系统、支付系统、物流系统只需要从MQ里取出消息消费就可以了。对于问题一:库存系统出现异常后,订单系统没有异常,因为订单系统和其他三个系统是隔离的,没有任何影响。库存系统异常是暂时的,修复之后再去MQ里取出消息消费,最终是正常的。系统容错性提高;对于问题二:增加X系统,与订单系统无关,不需要修改订单系统,直接增加新系统然后再去MQ里取出消息消费就可以了

使用 MQ 使得应用间解耦,提升容错性和可维护性。

2.异步提速 

远程调用是个同步的方式,订单系统先调用库存返回后再调用支付返回后再调用物流,需要同步的去完成订单的整个链路的调用,没有问题后就会返回给用户下单成功。

一个下单操作耗时:20 + 300 + 300 + 300 = 920ms
用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!

用户下订单,订单系统保存到自己的数据库花费20ms,向MQ发送消息花费5ms,这时订单系统可以直接告诉用户下单成功了。后边的操作不管成功与否,取出消息消费即可,这就是异步的方式。

用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。
提升用户体验和系统吞吐量(单位时间内处理请求的数目)。

3.削峰填谷

A系统每秒最大处理1000请求,现在10点秒杀活动,请求瞬间增多,每秒5000个请求,A系统承载不了这么大的并发,宕机系统不可用,用户的体验就太差了。

可以使用MQ削峰

5000个请求对接MQ,5000请求MQ完全可以承载,小意思,A系统再慢慢的从MQ每秒拉去1000个请求完成消费,A系统的稳定性就提高了很多

使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。
使用MQ后,可以提高系统稳定性。
小结:
  • 应用解耦:提高系统容错性和可维护性
  • 异步提速:提升用户体验和系统吞吐量
  • 削峰填谷:提高系统稳定性

1.4 MQ的劣势

系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?
系统复杂度提高
MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
一致性问题
A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

小结

既然 MQ 有优势也有劣势,那么使用 MQ 需要满足什么条件呢?
  • ① 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
  • ② 容许短暂的不一致性。
  • ③ 确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。

 1.5 常见的 MQ 产品

目前业界有很多的 MQ 产品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等, 也有直接使用 Redis 充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及 MQ 产品特征,综合考虑。

1.6 RabbitMQ 简介

AMQP Advanced Message Queuing Protocol (高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。 类比HTTP

生产者发布消息到exchange,exchange通过不同的规则,把消息路由到不同的队列去存储,consumer监听从队列中拿走对应的消息消费

2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
RabbitMQ 基础架构如下图:

RabbitMQ 中的相关概念:
  • Broker接收和分发消息的应用,RabbitMQ Server就是 Message Broker
  • Virtual host出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  • Connectionpublisher/consumer 和 broker 之间的 TCP 连接
  • Channel如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchangemessage 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue消息最终被送到这里等待 consumer 取走
  • Bindingexchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
RabbitMQ 提供了 6 种工作模式 简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ)。

RabbitMQ Tutorials — RabbitMQ

1.7 JMS

  • JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件的API
  • JMS 是 JavaEE 规范中的一种,类比JDBC
  • 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供 JMS 的实现包,但是开源社区有

小结

  • 1. RabbitMQ 是基于 AMQP 协议使用 Erlang 语言开发的一款消息队列产品。
  • 2. RabbitMQ提供了6种工作模式,这边讲解5种。这是重点。
  • 3. AMQP 是协议,类比HTTP。
  • 4. JMS 是 API 规范接口,类比 JDBC。

二、RabbitMQ管控台 

 结果:


 三、Hello World简单模式

在上图的模型中,有以下概念:
  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接收者,会一直等待消息到来
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

需求:使用简单模式完成消息传递

步骤:     ① 创建工程(生成者、消费者)
                ② 分别添加依赖
                ③ 编写生产者发送消息
                ④ 编写消费者接收消息

添加依赖:rabbitmq客户端,编译版本插件

 <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

1、生产者

1.创建连接工厂---2. 设置参数---3. 创建连接 Connection---4. 创建Channel---5. 创建队列Queue---6. 发送消息---7.释放资源

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 发送消息
 * 
 */
public class Producer_HelloWorld {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("43.143.246.208");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/itcast");//虚拟机 默认值 /
        factory.setUsername("root");//用户名 默认 guest
        factory.setPassword("root");//密码 默认值 guest
        factory.setConnectionTimeout(5000);//针对连接超时,延长我们的连接时间
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            1. queue:队列名称
            2. durable:是否持久化,当mq重启之后,还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列
                * 当Connection关闭时,是否删除队列
                * 一般设为false
            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
            5. arguments:参数。配置一些怎么删的参数

         */
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("hello_world",true,false,false,null);
        /*
        basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
        参数:
            1. exchange:交换机名称。简单模式下交换机会使用默认的空串 ""
            2. routingKey:路由名称,使用默认的交换机路由名称要和队列名称一致
            3. props:配置信息
            4. body:字节数组,真实发送的消息数据

         */

        String body = "hello rabbitmq~~~";

        //6. 发送消息
        channel.basicPublish("","hello_world",null,body.getBytes());


        //7.释放资源
       // channel.close();
       // connection.close();

    }

}

连接不关闭,不释放资源

 2、消费者

1.创建连接工厂---2. 设置参数---3. 创建连接 Connection---4. 创建Channel---5. 创建队列Queue---6. 接收消息

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_HelloWorld {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("43.143.246.208");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/itcast");//虚拟机 默认值/
        factory.setUsername("root");//用户名 默认 guest
        factory.setPassword("root");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            1. queue:队列名称
            2. durable:是否持久化,当mq重启之后,还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列
                * 当Connection关闭时,是否删除队列
                *
            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
            5. arguments:参数。

         */
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("hello_world",true,false,false,null);

        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象

         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后,会自动执行该方法

                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
            }
        };
        channel.basicConsume("hello_world",true,consumer);


        //关闭资源?不要

    }
}

 四、Work queues 工作队列模式

Work Queues: 与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景 :对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。比如队列里有1000条消息,C1只能处理500条消息,增加队友C2一起处理,理论上可以处理1000条消息了
Work Queues 与入门程序的简单模式的代码几乎是一样的。可以完全复制,并多复制一个消费者进行多
个消费者同时对消费消息的测试。

1、生产者

队列修改为work_queues
//修改为循环输出10条语句
for (int i = 1; i <= 10; i++) {
    String body = i+"hello rabbitmq~~~";

    //6. 发送消息
    channel.basicPublish("","work_queues",null,body.getBytes());
}

2、消费者

增加为两个消费者
Consumer_WorkQueues1
Consumer_WorkQueues2
队列修改为work_queues

启动两个消费者 

 启动生产者

消费者1 消费13579  ,消费者2 消费246810  ,两个是循环交替消费的

 小结

1. 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
2. Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,
只需要有一个节点成功发送即可。

五、Pub/Sub订阅模式

 

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • C:消费者,消息的接收者,会一直等待消息到来
  • Queue:消息队列,接收消息、缓存消息
  • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
                ➢ Fanout:广播,将消息交给所有绑定到交换机的队列
                ➢ Direct:定向,把消息交给符合指定routing key 的队列
                ➢ Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合
路由规则的队列,那么消息会丢失!

 1、生产者

1.创建连接工厂---2. 设置参数---3. 创建连接 Connection---4. 创建Channel---5. 创建交换机---

6. 创建队列Queue---7. 绑定队列和交换机---8. 发送消息---9. 释放资源

public class Producer_PubSub {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("43.143.246.208");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/itcast");//虚拟机 默认值 /
        factory.setUsername("root");//用户名 默认 guest
        factory.setPassword("root");//密码 默认值 guest
        factory.setConnectionTimeout(5000);//针对连接超时,延长我们的连接时间
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        /*

       exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
       参数:
        1. exchange:交换机名称
        2. type:交换机类型
            DIRECT("direct"),:定向
            FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
            TOPIC("topic"),通配符的方式
            HEADERS("headers");参数匹配,用的比较少

        3. durable:是否持久化
        4. autoDelete:自动删除
        5. internal:内部使用。 一般false
        6. arguments:参数
        */

        String exchangeName = "test_fanout";
        //5. 创建交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
        //6. 创建队列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7. 绑定队列和交换机
        /*
        queueBind(String queue, String exchange, String routingKey)
        参数:
            1. queue:队列名称
            2. exchange:交换机名称
            3. routingKey:路由键,绑定规则
                如果交换机的类型为fanout ,routingKey设置为""
         */
        channel.queueBind(queue1Name,exchangeName,"");
        channel.queueBind(queue2Name,exchangeName,"");

        String body = "日志信息:张三调用了findAll方法...日志级别:info...";
        //8. 发送消息
        channel.basicPublish(exchangeName,"",null,body.getBytes());

        //9. 释放资源
        channel.close();
        connection.close();

    }

}

2、消费者1

public class Consumer_PubSub1 {
    public static void main(String[] args) throws IOException, TimeoutException {


                //1.创建连接工厂
                ConnectionFactory factory = new ConnectionFactory();
                //2. 设置参数
                factory.setHost("43.143.246.208");//ip  默认值 localhost
                factory.setPort(5672); //端口  默认值 5672
                factory.setVirtualHost("/itcast");//虚拟机 默认值/
                factory.setUsername("root");//用户名 默认 guest
                factory.setPassword("root");//密码 默认值 guest
                //3. 创建连接 Connection
                Connection connection = factory.newConnection();
                //4. 创建Channel
                Channel channel = connection.createChannel();
                String queue1Name = "test_fanout_queue1";
                String queue2Name = "test_fanout_queue2";


        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象

         */
                // 接收消息
                Consumer consumer = new DefaultConsumer(channel){
                    /*
                        回调方法,当收到消息后,会自动执行该方法

                        1. consumerTag:标识
                        2. envelope:获取一些信息,交换机,路由key...
                        3. properties:配置信息
                        4. body:数据

                     */
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("body:"+new String(body));//字节数组转成字符串
                        System.out.println("将日志信息打印到控制台.....");
                    }
                };
                channel.basicConsume(queue1Name,true,consumer);


                //消费者关闭资源?不要!

            }
}

消费者2 

public class Consumer_PubSub2 {
    public static void main(String[] args) throws IOException, TimeoutException {


                //1.创建连接工厂
                ConnectionFactory factory = new ConnectionFactory();
                //2. 设置参数
                factory.setHost("43.143.246.208");//ip  默认值 localhost
                factory.setPort(5672); //端口  默认值 5672
                factory.setVirtualHost("/itcast");//虚拟机 默认值/
                factory.setUsername("root");//用户名 默认 guest
                factory.setPassword("root");//密码 默认值 guest
                //3. 创建连接 Connection
                Connection connection = factory.newConnection();
                //4. 创建Channel
                Channel channel = connection.createChannel();
                String queue1Name = "test_fanout_queue1";
                String queue2Name = "test_fanout_queue2";


        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象

         */
                // 接收消息
                Consumer consumer = new DefaultConsumer(channel){
                    /*
                        回调方法,当收到消息后,会自动执行该方法

                        1. consumerTag:标识
                        2. envelope:获取一些信息,交换机,路由key...
                        3. properties:配置信息
                        4. body:数据

                     */
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("body:"+new String(body));//字节数组转成字符串
                        System.out.println("将日志信息保存到数据库.....");
                    }
                };
                channel.basicConsume(queue2Name,true,consumer);


                //消费者关闭资源?不要!

            }
}

小结

1. 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到,可以进行不同的处理
2. 发布订阅模式与工作队列模式的区别:
  • 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
  • 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)
  • 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机

六、Routing 路由模式

 

模式说明:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息
图解:
  • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
  • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息

1、生产者 

修改内容:exchangeName = "test_direct"
BuiltinExchangeType.DIRECT
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
//队列1绑定 error
channel.queueBind(queue1Name,exchangeName,"error");
//队列2绑定 info error warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
//8. 发送消息
channel.basicPublish(exchangeName,"info",null,body.getBytes());//2收到
//channel.basicPublish(exchangeName,"error",null,body.getBytes());//1,2都收到
//channel.basicPublish(exchangeName,"warning",null,body.getBytes());//2收到

2、消费者1,2

修改内容
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";

 

 Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。

七、Topics 通配符模式 

Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用 通配符
  •  Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
  • 通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert 而 item.* 只能匹配 item.insert
图解:
  • 红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
  • 黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

1、生产者

修改内容:
exchangeName = "test_topic" 
BuiltinExchangeType.TOPIC 
String queue1Name = "test_topic_queue1"; 
String queue2Name = "test_topic_queue2";
//队列1绑定 error
channel.queueBind(queue1Name,exchangeName,"*.orange.*");
//队列2绑定 info error warning
channel.queueBind(queue2Name,exchangeName,"*.*.rabbite");
channel.queueBind(queue2Name,exchangeName,"lazy.#");
//8. 发送消息
channel.basicPublish(exchangeName,"lazy.orange.ra",null,body.getBytes());//1,2都有
//channel.basicPublish(exchangeName,"lazy.orange",null,body.getBytes());//1,2都没有

2、消费者

 修改内容:

String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";

 

 

小结

Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。

八、工作模式总结  

1. 简单模式 HelloWorld

一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。

2. 工作队列模式 Work Queue

一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。

3. 发布订阅模式 Publish/subscribe

需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。

4. 路由模式 Routing

需要设置类型为 direct 的交换机交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。

5. 通配符模式 Topic

需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1013802.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PV操作-同步与互斥

浅记学习PV操作的部分题目。 消费者与生产者 单生产者与单消费者 理解PV操作可以从消费者与生产者之间的关系入手。 一个生产者与消费者的情况 消费者想要获取一份商品&#xff0c;需要检查市场中该商品是否有余量&#xff1a; 如果剩余商品足够&#xff0c;则获取该商品。如…

Vue.js和TypeScript:如何完美结合

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

msvcp120.dll怎么修复?msvcp120.dll丢失的解决方法

在当今这个信息化的时代&#xff0c;电脑已经成为我们生活和工作中不可或缺的一部分。然而&#xff0c;随着电脑技术的不断发展&#xff0c;我们也会遇到各种各样的问题。其中&#xff0c;msvcp120.dll丢失是一个常见的问题。一、msvcp120.dll 文件介绍 1 msvcp120.dll 文件的定…

windows7远程连接linux可视化界面——vnc使用教程(华为云服务器实测通过)

** CentOS系统 ** 首先&#xff0c;我们将安装VNC服务系统所需的库文件&#xff0c;这里我们将使用系统自带的yum包管理器进行安装。 yum install xorg-x11-xauth xterm libXi libXp libXtst libXtst-devel libXext libXext-devel -y接下来&#xff0c;我们将安装xvfb服务&…

采用Linux 6.5 内核的Manjaro Linux 23.0正式上线

据了解&#xff0c;当前Manjaro Linux 23.0已正式发布&#xff0c;代号“Uranos”&#xff0c;该版本采用的是Linux 6.5 内核。 依据相关资料可知&#xff0c;Manjaro Linux是一款“快速、用户友好、面向桌面、基于 Arch Linux”的发行版&#xff0c;并且还拥有诸多的显著特征…

python爬虫大作业爬取豆豆影评

python爬虫大作业爬取豆豆影评 一、系统介绍二、效果展示三、其他系统实现四、获取源码 一、系统介绍 1)数据描述 数据来源&#xff1a;豆豆最受欢迎的影评 数据获取&#xff1a;豆豆最受欢迎的影评并将获取的这些信息&#xff08;评论链接、电影名、电影详细地址、评论标题以…

基于YOLOv8模型的深海鱼目标检测系统(PyTorch+Pyside6+YOLOv8模型)

摘要&#xff1a;基于YOLOv8模型和BDD数据集的自动驾驶目标检测系统可用于日常生活与海洋中检测与定位深海鱼目标&#xff0c;利用深度学习算法可实现图片、视频、摄像头等方式的目标检测&#xff0c;另外本系统还支持图片、视频等格式的结果可视化与结果导出。本系统采用YOLOv…

高德地图实现-微信小程序地图导航

效果图&#xff1a; 一、准备阶段 1、在高德开放平台注册成为开发者2、申请开发者密钥&#xff08;key&#xff09;。3、下载并解压高德地图微信小程序SDK 高德开放平台&#xff1a; 注册账号(https://lbs.amap.com/)) 申请小程序应用的 key 应用管理(https://console.ama…

【unocss】apply聚合语法,unocss配置

前言 最近在使用unocss时&#xff0c;我感觉原子化CSS把这些类名堆在一个标签里&#xff0c;实在谈不上精致美观&#xff0c;那我们有没有办法将这些样式类名搬到style里呢&#xff1f;有的&#xff0c;unocss、tailwindCSS都给出了一种语法 #apply 操作方法 这个不可以直接…

狮子鱼社区团购小程序v18.1独立全开源版+小程序前端

狮子鱼社区团购商城系统小程序V18.1独立开源版&#xff0c;该系统本身就非常完善也没更新的必要&#xff0c;此系统拿来即用非常方便&#xff0c;同一版一样人类小徐特别优化很多细节首页美化了下&#xff0c;如小程序端授权窗口美化了下&#xff0c;该版本用户授权接口正常。功…

数据分享|R语言逻辑回归、线性判别分析LDA、GAM、MARS、KNN、QDA、决策树、随机森林、SVM分类葡萄酒交叉验证ROC...

全文链接:http://tecdat.cn/?p27384 在本文中&#xff0c;数据包含有关葡萄牙“Vinho Verde”葡萄酒的信息&#xff08;点击文末“阅读原文”获取完整代码数据&#xff09;。 介绍 该数据集&#xff08;查看文末了解数据获取方式&#xff09;有1599个观测值和12个变量&#xf…

ubuntu中如何用docker下载华为opengauss数据库(超简单)

ubuntu中如何下载华为opengauss数据库 前言一、安装docker1.方法一&#xff1a;2.方法二 二、拉取openguass镜像三、创建容器四、连接数据库 ,切换到omm用户 &#xff0c;用gsql连接到数据库五.最后用DateGrip远程连接测试(1&#xff09;选择数据源(2&#xff09;查看虚拟机ip地…

ITIL 4指导、计划和改进—评估和计划

第3章 评估和计划 当规划改进或其他倡议时&#xff0c;了解当前状态至关重要。这使组织能够&#xff1a; ● 比较当前状态与期望的未来状态&#xff1b; ● 找出两个状态之间的差距&#xff1b; ● 开发符合逻辑的计划以弥补这些差距。 3.1 评估的基础 评估用于测量、分析…

Slim-neck by GSConv:自动驾驶车辆检测器架构的更好设计范式(文末附代码)

Slim-neck by GSConv:自动驾驶车辆检测器架构的更好设计范式 摘要引言相关工作本文方法GSConv的优势在于轻量级检测器&#xff0c;这些检测器通过添加DSC层和Shuffle来增加非线形表达能力。但是&#xff0c;如果GSConv在模型的所有阶段都使用&#xff0c;模型的网络层会变得更深…

Django系列:Django开发环境配置与第一个Django项目

Django系列 Django开发环境配置与第一个Django项目 作者&#xff1a;李俊才 &#xff08;jcLee95&#xff09;&#xff1a;https://blog.csdn.net/qq_28550263 邮箱 &#xff1a;291148484163.com 本文地址&#xff1a;https://blog.csdn.net/qq_28550263/article/details/1328…

三维模型3DTile格式轻量化压缩处理的数据质量提升方法分析

三维模型3DTile格式轻量化压缩处理的数据质量提升方法分析 在处理三维模型3DTile格式的轻量化压缩时&#xff0c;如何在减少数据量的同时&#xff0c;保证或提升数据质量是一大挑战。以下为一些提升数据质量的方法分析&#xff1a; 改进几何简化算法&#xff1a;在进行几何简化…

精品SpringCloud的B2C模式在线学习网微服务分布式

《[含文档PPT源码等]精品基于SpringCloud实现的B2C模式在线学习网站-微服务-分布式》该项目含有源码、文档、PPT、配套开发软件、软件安装教程、项目发布教程等 软件开发环境及开发工具&#xff1a; 开发语言&#xff1a;Java 框架&#xff1a;springcloud JDK版本&#xf…

基于matlab中点放炮各类地震波时距曲线程序

完整程序&#xff1a; clear all dx50;x-500:dx:500;%炮检距 h100;V11500; theta25*pi/180; V2V1/sin(theta); t1sqrt(x.*x4*h*h)/V1;%反射波时距曲线 t2abs(x)./V1;%直达波时距曲线 %折射波时距曲线 xm2*h*tan(theta);%求盲区 k1; for i1:length(x) if x(i)<-xm …

Python提取JSON数据中的键值对并保存为.csv文件

本文介绍基于Python&#xff0c;读取JSON文件数据&#xff0c;并将JSON文件中指定的键值对数据转换为.csv格式文件的方法。 在之前的文章Python提取JSON文件中的指定数据并保存在CSV或Excel表格文件内&#xff08;https://blog.csdn.net/zhebushibiaoshifu/article/details/132…

Mac电脑安装Zulu Open JDK 8 使用 spring-kafka 消费不到Kafka Partition中的消息

一、现象描述 使用Mac电脑本地启动spring-kakfa消费不到Kafka的消息&#xff0c;监控消费组的消息偏移量发现存在Lag的消息&#xff0c;但是本地客户端就是拉取不到&#xff0c;通过部署到公司k8s容器上消息却能正常消费&#xff01; 本地启动的服务消费组监控 公司k8s容器服…