RabbitMQ快速入门

news2025/3/3 18:04:47

中间件&消息队列

中间件概述

中间件(Middleware)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件=平台+通信,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和实用软件区分开来。

说白了就是平台+通信,我们开发出来的应用程序遵循某些协议或者规范,去和底层操作系统打交道。而中间件的一大特征就是跨平台。屏蔽底层操作系统的复杂性,对于上层,屏蔽

中间件应用

举例:
1,RMI(Remote Method Invocations, 远程调用)
2,Load Balancing(负载均衡,将访问负荷分散到各个服务器中)
3,Transparent Fail-over(透明的故障切换)
4,Clustering(集群,用多个小的服务器代替大型机)
5,Back-end-Integration(后端集成,用现有的、新开发的系统如何去集成遗留的系统)
6,Transaction事务(全局/局部)全局事务(分布式事务)局部事务(在同一数据库联接内的事务)
7,Dynamic Redeployment(动态重新部署,在不停止原系统的情况下,部署新的系统)
8,System Management(系统管理)
9,Threading(多线程处理)
10,Message-oriented Middleware面向消息的中间件(异步的调用编程)
11,Component Life Cycle(组件的生命周期管理)
12,Resource pooling(资源池)
13,Security(安全)
14,Caching(缓存)

MQ概述

MQ全称 Message Queue (消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。也是中间件的一种。

分布式系统在通信过程中,一般分为两种:

  • 直接远程调用
    在这里插入图片描述

  • 引入中间件间接调用
    在这里插入图片描述

小结

  • MQ,消息队列,存储消息的中间件
  • 分布式系统通信两种方式:直接远程调用和借助第三方完成间接通信
  • 发送方称为生产者,接收方称为消费者

常见的MQ产品

目前业界有很多的 MQ 产品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,
也有直接使用 Redis 充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及 MQ 产品特征,综合考虑
在这里插入图片描述

MQ的优势&劣势

在这里插入图片描述

MQ的优势

应用解耦

系统开发的时候应该遵循高内聚低耦合的守则,尤其是分布式的系统,如果不是高内聚低耦合,整个系统杂糅在一起将会非常麻烦。

  • 直接系统调用
    在这里插入图片描述
  • 引入MQ实现间接调用
    在这里插入图片描述

异步提速

  • 同步调用(不引入MQ)
    在这里插入图片描述

  • 异步调用(引入MQ)
    不需要管系统的处理结果,这些消息消费的结果都交给各大系统,订单是不对其负责的,都交给MQ执行。这种设计方式从用户的角度来看,就是很短的时间就给用户交付了,感觉就很好。
    在这里插入图片描述

削峰填谷

  • 直接调用
    在这里插入图片描述
  • 间接调用

MQ承载5k请求轻轻松松,一点一点给A系统放出流量处理
在这里插入图片描述
在这里插入图片描述

小结

  • 应用解耦:提高系统容错性和可维护性
  • 异步提速:提升用户体验和系统吞吐量
  • 削峰填谷:提高系统稳定性

MQ的劣势

在这里插入图片描述

小结

在这里插入图片描述

RabbitMQ背景

AMQP协议简介

AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。
在这里插入图片描述
RabbitMQ就是基于AMQP协议所构建出来的中间件

RabbitMQ 简介

基础架构

2007年,Rabbit 技术公司基于 AMQP 标准开发的RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
RabbitMQ 基础架构如下图:

在这里插入图片描述
对上图的解释:

  • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接
  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。可以反复利用提高系统效率。
  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。中间件内部类似交换机的存在,接收生产者发布来的消息,并进行分发。将其分发到对应的Queue中,具体是怎么分发的,要取决于Binding。
  • Queue:消息最终被送到这里等待 consumer 取走
  • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存
    到 exchange 中的查询表中,用于 message 的分发依据

工作模式

  • 简单模式
  • Work queues、
  • Publish/Subscribe 发布与订阅模式
  • Routing路由模式
  • Topics 主题模式
  • RPC 远程调用模式(远程调用,不太算MQ;暂不作介绍)

补充一个Java原生的中间件 JMS
在这里插入图片描述

小结

  1. RabbitMQ 是基于 AMQP 协议使用 Erlang 语言开发的一款消息队列产品。
  2. RabbitMQ提供了6种工作模式,我们学习5种。这是今天的重点。
  3. AMQP 是协议,类比HTTP。
  4. JMS 是 API 规范接口,类比 JDBC。

RabbitMQ快速上手

RabbitMQ的安装详见官网RabbitMQ官网
安装文档:资料/软件/安装 RabbitMQ.md
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

入门程序

使用简单模式完成消息传递

在这里插入图片描述
架构
在这里插入图片描述

创建工程

创建初始工程
在这里插入图片描述

分别添加依赖

引入依赖

    <dependencies>
        <!--rabbitmq java 客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>
    </dependencies>

编写生产者发送消息

向MQ中发送消息主要有以下的步骤

创建连接工厂
设置参数
创建连接 Connection
创建Channel
创建队列Queue
发送消息

public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("localhost");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/");//虚拟机 默认值/
        factory.setUsername("guest");//用户名 默认 guest
        factory.setPassword("guest");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            1. queue:队列名称
            2. durable:是否持久化,当mq重启之后,还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列
                * 当Connection关闭时,是否删除队列
                *
            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
            5. arguments:参数。

         */
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("hello_world",true,false,false,null);
        /*
        basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
        参数:
            1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
            2. routingKey:路由名称
            3. props:配置信息
            4. body:发送消息数据

         */
        //要放入队列的信息
        String body = "hello rabbitmq~~~";

        //6. 发送消息
        channel.basicPublish("","hello_world",null,body.getBytes());


        //7.释放资源
        channel.close();
        connection.close();

    }

点一下运行,这个时候看管理的页面
在这里插入图片描述
在这里插入图片描述

编写消费者接收消息

消费者和生产者的代码还是很类似的,配置的内容基本一样,只是后面的部分改用了监听的程序

public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("localhost");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/");//虚拟机 默认值/
        factory.setUsername("guest");//用户名 默认 guest
        factory.setPassword("guest");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue,上面配置内容基本一致
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            1. queue:队列名称
            2. durable:是否持久化,当mq重启之后,还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列
                * 当Connection关闭时,是否删除队列
                *
            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
            5. arguments:参数。

         */
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("hello_world",true,false,false,null);

        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:接收后是否自动确认
            3. callback:回调对象

         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                匿名内部类的方式进行方法重写
                回调方法,当收到消息后,会自动执行该方法

                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
            }
        };
        channel.basicConsume("hello_world",true,consumer);


        //关闭资源?不要,因为是监听程序
    }

启动程序之后,监听程序就可以正常从队列中把消息拿出来了
在这里插入图片描述


小结:
在这里插入图片描述

RabbitMQ的工作模式

Work queues 工作队列模式(轮询减压)

架构图,实际上就是从原来的单一消费者模式,变成了多个消费者的模式,现在MQ要服务多个消费者了,但是某一个瞬间,C1和C2只有一个消费者取到消息。换句话说,这个时候C1和C2是竞争关系
在这里插入图片描述

Work Queues 与入门程序的简单模式的代码几乎是一样的。可以完全复制,并多复制一个消费者进行多
个消费者同时对消费消息的测试。

代码实现

所有的代码都是基于上面的例子修改的

  • 生产者
    在这里插入图片描述

  • 消费者1、消费者2
    代码同理和上面的快速入门内容差不多,但是这里的改的内容是队列的名称,要不然找不到具体的被消费队列。
    注意,如果队列中的消息被消费了,那么这个消息就不存在了。也就是说:C1如果先C2一步把所有的消息都给消费了,那么C2也就没得消费了。
    在这里插入图片描述
    在这里插入图片描述
    启动好就可以等待生产者生产好消息放入MQ了

总结

启动生产者,这个时候就可以等待消费者消费了。
消费者自动启动,C1和C2对MQ进行同步监听,一旦新的消息上来了,两个消费者将轮询着对其进行消费。

在这里插入图片描述

  • 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系,如果我想每个消费者都能消费到,那么要看Pub/Sub 订阅模式
  • Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,
    只需要有一个节点成功发送即可。

Pub/Sub 订阅模式

保证多个消费者都能有消息来拿,而不是竞争的关系。
在这里插入图片描述
注意要在创建交换机时把类型更改为FANOUT

代码实现

思路:用生产者来生产一条日志消息,希望可以让两个消费者都能接收到,并且以不同的方式对消息进行处理。C1负责把消息打印,C2把数据存入数据库。每个消费者只监听自己的队列

创建连接工厂
设置参数 (如何连接MQ)
创建连接 Connection
创建Channel
创建交换机
创建队列Queue
绑定队列和交换机
发送消息
最后释放资源

  • 生产者

相比于上面的WorkQueue,多了创建交换机为每个消费者都创建队列绑定队列和交换机的内容,其余的部分基本一致。

        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("localhost");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/");//虚拟机 默认值/
        factory.setUsername("guest");//用户名 默认 guest
        factory.setPassword("guest");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
       /*
       解读一下创建交换机的参数
       exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
       参数:
        1. exchange:交换机名称
        2. type:交换机类型
            DIRECT("direct"),:定向
            FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。一般都是这个。
            TOPIC("topic"),通配符的方式
            HEADERS("headers");参数匹配

        3. durable:是否持久化
        4. autoDelete:自动删除
        5. internal:内部使用。 一般false
        6. arguments:参数
        */
        //因为这里生产者和交换机进行通信,所以要创建一个交换机,按照参数规定交换机名称,交换机类型等等
        String exchangeName = "test_fanout";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
        //为每个队列都创建相应的队列Queue
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //绑定队列和交换机
        /*
        queueBind(String queue, String exchange, String routingKey)
        参数:
            1. queue:队列名称
            2. exchange:交换机名称
            3. routingKey:路由键,绑定规则
                如果交换机的类型为fanout ,routingKey设置为""
         */
        channel.queueBind(queue1Name,exchangeName,"");
        channel.queueBind(queue2Name,exchangeName,"");


        //要放入队列的信息
        String body = "日志信息:张三调用了findAll方法...日志级别:info...";

        //发送消息,这里发布消息是通过交换机分发到不同队列中
        /*
        basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
        参数:
            1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
            2. routingKey:路由名称
            3. props:配置信息
            4. body:发送消息数据
         */
        channel.basicPublish(exchangeName,"",null,body.getBytes());


        //释放资源
        channel.close();
        connection.close();

运行,可以发现新的队列已经上来了
在这里插入图片描述

  • 消费者
    消费者基本上和WorkQueue基本一致,只需要绑定好监听队列即可,等待交换机分发消息到MQ中。绑定好队列的名称就可以消费了。
    以消费者C1为例
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("localhost");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/");//虚拟机 默认值/
        factory.setUsername("guest");//用户名 默认 guest
        factory.setPassword("guest");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue,上面配置内容基本一致
		//队列名称
        String queue2Name = "test_fanout_queue2";
        //如果没有一个名字叫test_fanout_queue2的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare(queue2Name,true,false,false,null);

        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                匿名内部类的方式进行方法重写
                回调方法,当收到消息后,会自动执行该方法
                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //不需要的暂时注释
                /*System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);*/
                System.out.println("body:"+new String(body));
                System.out.println("消费者2接收消息,输出内容存储到数据库");
            }
        };
        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:接收后是否自动确认
            3. callback:回调对象
         */
        channel.basicConsume(queue2Name,true,consumer);
        //关闭资源?不要,因为是监听程序
    }

两个队列都从队列中拿到了消息。
在这里插入图片描述

总结

1、 交换机需要与队列进行绑定,绑定之后:一个消息可以被多个消费者都收到。
2、发布订阅模式与工作队列模式的区别:

  • 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
  • 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)
  • 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机

Routing 路由模式

刚刚例子的消息,是无差别发送。但是应该是,不同的消息应该转发到该去的队列。或者说,同一条信息,由不同的人收到之后,来去做不同的事情。
在这里插入图片描述
注意要在创建交换机时把类型更改为DIRECT

代码实现

队列1只绑定 error 的RoutingKey 队列2绑定 error、warning、info
的RoutingKey,多绑定几个即可完成一个队列多个RoutingKey
综上,队列1处理error的消息,队列2处理error、warning、info的消息

  • 生产者
配置信息不写了,太多了
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //创建交换机以及队列,并绑定
        String exchangeName = "routingExchange";
        //5. 创建交换机,指定好交换类型为DIRECT
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
        //6. 创建队列,一共两个队列,分别对应不同的消费者
        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";

        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        //7. 绑定队列和交换机,根据RoutingKey绑定不同的交换机
        /*
        queueBind(String queue, String exchange, String routingKey)
        参数:
            1. queue:队列名称
            2. exchange:交换机名称
            3. routingKey:路由键,绑定规则
                如果交换机的类型为fanout ,routingKey设置为""
         */
        //队列1只绑定 error
        channel.queueBind(queue1Name, exchangeName, "error");
        //队列2绑定 error、warning、info,多绑定几个即可
        channel.queueBind(queue2Name, exchangeName, "error");
        channel.queueBind(queue2Name, exchangeName, "warning");
        channel.queueBind(queue2Name, exchangeName, "info");
        //综上,队列1处理error的消息,队列2处理error、warning、info的消息

        String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error...";
        //8. 发送消息,规定RoutingKey为warning,所以只有队列2可以被路由分发该消息
        channel.basicPublish(exchangeName,"warning",null,body.getBytes());

        //9. 释放资源
        channel.close();
        connection.close();
    }

在这里插入图片描述
在这里插入图片描述

  • 消费者

消费者一共有两个,队列1处理error的消息,队列2处理error、warning、info的消息
也就是说,如果我发了一个warning的消息,和队列1的RoutingKey匹配不到,所以消费者1就没有消息可以消费了。
消费者代码基本上和WorkQueue一样,这里就不赘述了,因为RoutingKey的分发是靠交换机,而不是消费者。所以消费者只需要去关心怎么处理MQ来的消息就可以了。
启动两个消费者监听
再启动生产者,此时发现,只有符合warning条件的queue2拿到了消息,而queue1是没有消息的。
在这里插入图片描述

总结

Routing 模式要求队列在绑定交换机时要指定 routing key,交换机会把消息转发到符合 routing key 的队列,消费者对队列进行监听,有新消息就把自己队列的消息拿出来消费。

Topics 通配符模式

说白了就是带了通配符,进行RoutingKey的匹配
一般的通配符有俩用.来进行分割

  • #匹配一个或者多个次
  • *匹配一个

在这里插入图片描述
注意要在创建交换机时把类型更改为TOPIC

代码实现

  • 生产者

实际上总体思路和上面的Routing是差不多的。只不过在绑定队列和交换机的时候,用到的RoutingKey被改为了通配符版本。除此以外,再改一下路由器类型为TOPIC即可
更改后的通配符绑定规则为:

        //需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
		//队列1,只收error结尾和order开头后面跟一个单词的
        channel.queueBind(queue1Name,exchangeName,"#.error");
        channel.queueBind(queue1Name,exchangeName,"order.*");
		//队列2,全部都要
        channel.queueBind(queue2Name,exchangeName,"*.*");

在这里插入图片描述
在这里插入图片描述

  • 消费者

保证队列名字和生产者创建的队列名字一致,能找到该找的队列就可以了。其余的消费部分都差不多。基本上RoutingKey的分发全靠交换机,只是消费者单纯的消费。

路由分发规则:

//队列1
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
//队列2
channel.queueBind(queue2Name,exchangeName,"*.*");

先测试第一个RoutingKey:"a.a"
只符合队列2的规则,路由会把消息分发给对应的队列2,所以绑定队列2的消费者也就可以直接拿到消息
在这里插入图片描述
再测试第一个RoutingKey:"a.error"
这个消息两个队列的匹配规则都符合,因此两个队列都能接收到消息。
在这里插入图片描述

总结

Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key
的时候可以使用通配符,显得更加灵活。

总结工作模式

  1. 简单模式 HelloWorld
    一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。
  2. 工作队列模式 Work Queue一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。
  3. 发布订阅模式 Publish/subscribe需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。
  4. 路由模式 Routing
    需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。
  5. 通配符模式 Topic
    需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。

Spring整合RabbitMQ

代码实现

配置文件

一般来说,先引入相关的POM文件已经对应的目标。这里不做过多演示,就是Spring和MQ的依赖
在这里插入图片描述

  • properties文件,主要为配置MQ的连接,和之前的配置内容基本没差别
    在这里插入图片描述
  • xml文件内部主要配置的是交换机、队列等等内容,根据消费者和生产者的角色不同,其内容也不相同,这个在后面再详解。

生产者

  • xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- 定义rabbitmq connectionFactory,读取properties的内容 -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>
    <!--定义管理交换机、队列-->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机
    默认交换机类型为direct,名字为:"",路由键为队列的名称
    -->
    <!--
        id:bean的名称
        name:queue的名称
        auto-declare:自动创建
        auto-delete:自动删除。 最后一个消费者和该队列断开连接后,自动删除队列
        exclusive:是否独占
        durable:是否持久化
    -->

    <rabbit:queue id="spring_queue" name="spring_queue"  auto-declare="true"/>

    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播(Pub/Sub 订阅模式);所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>

    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>

    <!--定义广播类型交换机;并绑定上述两个队列-->
    <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange"  auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding  queue="spring_fanout_queue_1"  />
            <rabbit:binding queue="spring_fanout_queue_2"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!--<rabbit:direct-exchange name="aa" >
        <rabbit:bindings>
            &lt;!&ndash;direct 类型的交换机绑定队列  key :路由key  queue:队列名称&ndash;&gt;
            <rabbit:binding queue="spring_queue" key="xxx"></rabbit:binding>
        </rabbit:bindings>

    </rabbit:direct-exchange>-->

    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符(Topic模式);*匹配一个单词,#匹配多个单词 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star"  auto-declare="true"/>
    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
    <!--定义广播交换机中的持久化队列,不存在则自动创建(auto-declare)-->
    <rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>

    <rabbit:topic-exchange id="spring_topic_exchange"  name="spring_topic_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding pattern="heima.*"  queue="spring_topic_queue_star"/>
            <rabbit:binding pattern="heima.#" queue="spring_topic_queue_well"/>
            <rabbit:binding pattern="itcast.#" queue="spring_topic_queue_well2"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>

在代码中主要是注入一个RabbitTemplate,通过这个RabbitTemplate来进行消息的发送,主要是convertAndSend这个API,该API下重载有许多方法,通过不同的参数。实现不同队列,不同工作模式的收发。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {

    //1.注入 RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 普通的轮询RoutingKey模式
     */
    @Test
    public void testHelloWorld(){
        //2.发送消息

        rabbitTemplate.convertAndSend("spring_queue","hello world spring....");
    }


    /**
     * 发送fanout消息,也就是无差别发送,每个消费者都有消息,单一交换机保证信息发送
     */
    @Test
    public void testFanout(){
        //2.发送消息

        rabbitTemplate.convertAndSend("spring_fanout_exchange","","spring fanout....");
    }


    /**
     * 发送topic消息,也就是RoutingKey改为通配符的形式
     */
    @Test
    public void testTopics(){
        //2.发送消息

        rabbitTemplate.convertAndSend("spring_topic_exchange","heima.hehe.haha","spring topic....");
    }
}

消费者

主要是实现一个监听器MessageListener接口,实现消息,来完成对于队列的监听。具体监听哪个队列在XML队列里配置即可

  • xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>
    <!--配置对于不同队列的监听类,自己配置的监听类-->
    <bean id="springQueueListener" class="com.itheima.rabbitmq.listener.SpringQueueListener"/>
    <!--这些还没写,所以注释掉了
    <bean id="fanoutListener1" class="com.itheima.rabbitmq.listener.FanoutListener1"/>
    <bean id="fanoutListener2" class="com.itheima.rabbitmq.listener.FanoutListener2"/>
    <bean id="topicListenerStar" class="com.itheima.rabbitmq.listener.TopicListenerStar"/>
    <bean id="topicListenerWell" class="com.itheima.rabbitmq.listener.TopicListenerWell"/>
    <bean id="topicListenerWell2" class="com.itheima.rabbitmq.listener.TopicListenerWell2"/>
    -->
    <!--配置具体类来对应监听哪一个队列-->
    <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
        <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
       <!-- <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>
        <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>
        <rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>
        <rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>
        <rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>-->
    </rabbit:listener-container>
</beans>

监听类:
这个监听类每次只运行一次,但是对于队列的监听肯定是要求持续监听的,所以我们这里再去写一个测试的循环,保证这个监听类能一直检测,保持监听状态。

public class SpringQueueListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        //打印消息
        System.out.println(new String(message.getBody()));
    }
}

Junit测试里面写的循环类,持续循环

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {

    @Test
    public void test1(){
        boolean flag = true;
        while (true){

        }
    }
}

运行测试
在这里插入图片描述

SpringBoot整合RabbitMQ

主要是引入MQ的starter即可

生产者

生产端基本步骤如下:

  1. 创建生产者SpringBoot工程
  2. 引入start,依赖坐标
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 编写yml配置,基本信息配置
  2. 定义交换机,队列以及绑定关系的配置类
  3. 注入RabbitTemplate,调用方法,完成消息发送
  • 创建SpringBoot工程,引入RabbitMQ的坐标
    在这里插入图片描述
  • 编写yml配置信息
# 配置RabbitMQ的基本信息  ip 端口 username  password..
spring:
  rabbitmq:
    host: localhost # ip
    port: 5672
    username: guest
    password: guest
    virtual-host: /

在这里插入图片描述

  • 在配置类内部定义交换机,队列以及绑定关系的配置类
@Configuration
public class RabbitMQConfig {

    //常量定义 交换机 和 队列 的名字
    public static final String EXCHANGE_NAME = "boot_topic_exchange";
    public static final String QUEUE_NAME = "boot_queue";

    /**
     * 定义交换机
     */
    @Bean("bootExchange")
    public Exchange bootExchange(){
        /*
        通过调用ExchangeBuilder的方法来配置交换机
        分别为:ExchangeBuilder.topic模式的交换机(交换机名字).是否持久化(是).构建()
        当然,这个语句只是一小部分,其他的配置内容需要自己调用
         */
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    /**
     * 定义队列
     */
    @Bean("bootQueue")
    public Queue bootQueue(){
        /*
        通过调用QueueBuilder的方法来配置队列
        分别为:QueueBuilder.durable持久化队列(队列名字)..构建()
        当然,这个语句只是一小部分,其他的配置内容需要自己调用
         */
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    /**
     队列和交互机绑定关系 Binding
        这里参数列表(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange")
        是因为这里以后会绑定很多交换机和队列,所以一定要用@Qualifier来区分一下
            1. 知道哪个队列
            2. 知道哪个交换机
            3. routing key
     */
    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
        //这里通过调用绑定器BindingBuilder来绑定队列和交换机、并且因为交换机是topic模式,要配好通配符
        //BindingBuilder.绑定(参数列表的队列).和哪个交换机绑定(参数列表的交换机).绑定规则(RoutingKey通配符).没有参数()
        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }

}

  • 注入RabbitTemplate,调用方法,完成消息发送
    还是之前的那个API,和Spring的写法基本一样,写一个测试类测试一下,将消息发送到队列中。
@SpringBootTest
class RabbitMqProducerApplicationTests {

	//注入 RabbitTemplate
	@Autowired
	private RabbitTemplate rabbitTemplate;

	@Test
	public void sendMessage(){
		//发送消息,RoutingKey格式为:boot.#
		//convertAndSend(要发送的交换机名称,RoutingKey,消息)
		rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.abc","boot mq hello~~~");
	}
}

运行一下,将服务主启动类启动,再启动测试类
看一下MQ的管理页面,新的队列和交换机已经就位了
在这里插入图片描述
在这里插入图片描述
现在等待消费者配置好消费就可以了

消费者

消费端基本步骤如下:

  1. 创建消费者SpringBoot工程
  2. 引入start,依赖坐标
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 编写yml配置,基本信息配置
  2. 定义监听类,使用@RabbitListener注解完成队列监听。
  • 新建SpringBoot工程,引入RabbitMQ依赖就可以了,这里不赘述了
  • 编写yml根据实际情况对各项配置修改即可这里也不赘述了
  • 创建一个MQ监听类,主要是通过注解@RabbitListener(queues = "队列名称") 来确定具体对于某个队列的监听。然后和方法里的参数进行绑定即可。方法内部通过Message 对象 进行操作,即可拿到消息。
@Component
public class RabbimtMQListener {
    //绑定队列名称,监听boot_queue的队列
    @RabbitListener(queues = "boot_queue")
    public void ListenerQueue(Message message){
        //System.out.println(message);
        System.out.println(new String(message.getBody()));
    }
}

总结

  • SpringBoot提供了快速整合RabbitMQ的方式
  • 基本信息再yml中配置,队列交互机以及绑定关系在配置类中使用Bean的方式配置
  • 生产端直接注入RabbitTemplate完成消息发送
  • 消费端直接使用@RabbitListener完成消息接收

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/38650.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

你不能错过的【Python爬虫】测试3(爬取所有内容 + 完整源代码 + 架构 + 结果)

目录 一、主要工具包 以及 版本二、架构展示三、各部分code3.1 yjs.py (重要)3.2 items.py3.3 middlewares.py3.4 pipelines.py3.5 settings.py3.6 start.py四、结果展示一、主要工具包 以及 版本 scrapy:2.7.1版本(这里主要用到的工具包) 二、架构展示 三、各部分code 3…

MySQL体系-日志与MVCC(源码层面)

MySQL 本身具备生产binlog日志的功能&#xff0c;在InnoDB存储引擎中&#xff0c;为了持久性有了redo log,为了原子性和隔离性有了undo log&#xff0c;最终通过redo log undo log 保证了一致性&#xff1b; 我先画一个InnoDB操作流程&#xff0c;先简单的了解下它们的工作机制…

基于S2SH的保险业务管理系统【数据库设计、源码、开题报告】

数据库脚本下载地址&#xff1a; https://download.csdn.net/download/itrjxxs_com/86467452 主要使用技术 SpringStruts2HibernateJSPJSCSSMysql 功能介绍 本系统旨在为当今的保险行业提供一套综合性的管理系统业务&#xff0c;系统的主要用户为保险的购买者以及系统的管理…

10.实用调试技巧

一、调试 1.调试的定义 调试&#xff08;英语&#xff1a;Debugging / Debug&#xff09;&#xff0c;又称除错&#xff0c;是发现和减少计算机程序或电子仪器设备中程序 错误的一个过程。 2.调试的基本步骤 发现程序错误的存在 以隔离、消除等方式对错误进行定位 确定错误产…

Linux系统编程(三)——Linux下的进程

第一篇中总结了系统的环境搭建&#xff0c;第二篇中学习了系统的一些IO函数&#xff0c;接下来就深入到了Linux下的进程线程的实现。 目录 0x01 进程概述 一、进程的信息 二、程序与进程 三、并行与并发 四、进程控制块PCB 0x02 进程状态转换 一、进程的状态 二、进程相…

【C++】哈希-bitset位图与模拟

目录 1.位图 1.1什么是位图 1.2位图的作用 2.bitset应用 2.1bitset构造 2.2bitset成员函数与使用 3.bitset模拟实现 构造函数 set reset test flip count size none,any 1.位图 在前文中我们介绍了哈希的一些内容&#xff0c;接下来我们介绍一个新奇的玩意&am…

回归问题原理

回归问题是一种常见的监督机器学习任务&#xff0c;在很多领域均有广泛应用。其典型应用包括销量预测、库存预测、股票价格预测、天气预测等。本问将讨论线性回归&#xff0c;包括线性回归模型的目标函数&#xff08;损失函数和正则函数&#xff09;、线性回归模型的优化求解、…

【一包通刷】晶晨S905L3A/B_完美AI语音线刷包_打开ADB_ROOT权限

【9.0一包通刷】晶晨S905L3A/B_完美AI语音线刷包_默认打开ADB ROOT权限支持游戏启动_万物互联启动动画 适用型号&#xff1a;M401A、CM311-1a、CM311-1sa、B863AV3.1-M2、B863AV3.2-M、UNT403A、M411A等等&#xff1b; 系统版本&#xff1a;Android9 系统桌面&#xff1a;超…

【机器学习项目实战10例】(五):基于随机森林的假新闻检测项目

💥 项目专栏:【机器学习项目实战10例】 文章目录 一、基于随机森林的假新闻检测项目二、数据集介绍三、导包四、加载数据集五、划分训练集、测试集六、构建模型七、精度测试八、网格搜索一、基于随机森林的假新闻检测项目 在当今时代,传播错误信息已经成为一个真正的问题,…

初试hashlib加密模块

文章目录 一、加密解密基础二、使用hashlib模块实现数据加密(一)加密数据1、编写程序,实现功能2、运行程序,查看结果(二)登录加密校验1、编写程序,实现功能2、运行程序,查看结果一、加密解密基础 二、使用hashlib模块实现数据加密 (一)加密数据 1、编写程序,实现功…

【DeepLearning 8】Self-Attention自注意力神经网络

&#x1f34a;本文主要介绍了Self-Attention产生的背景以及解析了具体的网络模型 一、Introduction 很多时候&#xff0c;我们需要输入的数据非常的复杂&#xff0c;难以用统一、固定长度的向量来表示。比如NLP中长短不一的句子。此外&#xff0c;我们需要输出的数据有时候也会…

Spring Security中文文档

Spring Security中文文档 来源&#xff1a;https://www.springcloud.cc/spring-security.html#overall-architecture 作者 Ben Alex&#xff0c;Luke Taylor&#xff0c;Rob Winch&#xff0c;Gunnar Hillert&#xff0c;Joe Grandja&#xff0c;Jay Bryant5.1.2.RELEASE…

使用Redis查询数据库数据增加访问速度小案例

黑马B栈网课案例 文章目录案例需求&#xff1a;SQL建表所需jar包项目结构代码展示index.html首页面domainProcince.javadaoProvinceDaoProvinceDaoImplserviceProvinceServiceProvinceServiceImplservletJDBC工具类配置文件jedis.propertiesdruid.properties案例需求&#xff1…

JSP学习日记

JSP简述 Java Sever Pages----->Java服务器界面 用于前后端结合 jsp为什么淘汰&#xff1f; 由于JSP的前后端耦合性极高&#xff0c;编写代码非常臃肿。前后端的代码放在一起&#xff0c;所以JSP可以看成是已经被淘汰的技术。 为什么还要学jsp&#xff1f; 由于一些公司…

大数据:HDFS的Shell常用命令操作

文章目录一 HDFS的Shell介绍二 HDFS常用命令操作01 创建目录&#xff08;1&#xff09;创建单层目录&#xff08;3&#xff09;创建多层目录02 查看目录03 上传本地文件到HDFS04 查看文件内容05 下载HDFS文件到本地06 删除HDFS文件07 删除HDFS目录08 移动目录或文件09 文件合并…

数据库大咖,带你深入高性能MySQL架构系统,值得一看

MySQL 作为互联网中非常热门的数据库&#xff0c;在高并发业务场景下&#xff0c;一条好的 MySQL 语句能为企业节省大量的运作时间和成本&#xff0c;这也是为何互联网大厂面试官最爱考察数据库底层和性能调优的原因。因此&#xff0c;了解其底层原理和架构的设计非常重要&…

Go-Excelize API源码阅读(三十五)——SetSheetCol

Go-Excelize API源码阅读&#xff08;三十五&#xff09;——SetSheetCol 开源摘星计划&#xff08;WeOpen Star&#xff09; 是由腾源会 2022 年推出的全新项目&#xff0c;旨在为开源人提供成长激励&#xff0c;为开源项目提供成长支持&#xff0c;助力开发者更好地了解开源…

【C++数据结构】程序性能分析

程序性能分析 2.1 什么是程序性能 程序性能&#xff1a;所谓程序性能&#xff08;performance of a program&#xff09;是指运行这个程序所需要的内存和时间的多少。 性能分析&#xff1a;在性能分析&#xff08;performance analysis&#xff09;时&#xff0c;采用分析方…

ceph命令应用

记录&#xff1a;337 场景&#xff1a;在CentOS 7.9操作系统上&#xff0c;在ceph集群中&#xff0c;使用ceph命令查看ceph集群信息&#xff0c;以及mon、mgr、mds、osd、rgw等组件信息。 版本&#xff1a; 操作系统&#xff1a;CentOS 7.9 ceph版本&#xff1a;ceph-13.2.…

JavaSE从基础到入门:抽象类和接口

1.抽象类 1.抽象类的概念 在面向对象的概念中&#xff0c;所有的对象都是通过类来描绘的&#xff0c;但是反过来&#xff0c;并不是所有的类都是用来描绘对象的&#xff0c;如果一个类中没有包含足够的信息来描绘一个具体的对象&#xff0c;这样的类就是抽象类。 比如&#x…