RabbitMQ常用工作模式+整合springboot

news2024/11/24 5:23:11

目录

1.MQ的相关概念

1.1 什么是MQ消息中间件

1.2 为什么使用MQ

(1) 应用解耦

(2) 异步提速  

(3)削峰填谷

1.3 使用MQ的劣势

1.4 常见的MQ组件​​​​​​​

2. RabbitMQ的概述

2.1 RabbitMQ的概念

2.2 RabbitMQ的原理

2.3 安装RabbitMQ

3. RabbitMQ 的工作模式

3.1 simple (简单模式)

3.2 Work queues(工作模式)

3.3.Publish/Subscribe(发布订阅模式)

3.4.Routing(路由模式)

3.5.Topics(主题模式)

4.springboot整合RabbitMQ

4.1.生产方

4.2.消费方

4.3.通过代码创建交换机和队列


1.MQ的相关概念

1.1 什么是MQ消息中间件

MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。它是应用程序和应用程序之间的通信方法。

1.2 为什么使用MQ

在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高系统吞吐量

MQ总结为三个好处:

(1) 应用解耦

以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内容被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中间用户感受不到物流系统的故障,提升系统的可用性。

 

(2) 异步提速  

上面要完成下单需要花费的时间: 20 + 300 + 300 + 300 = 920ms 用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!

使用MQ可以解决上述问题

 用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。 提升用户体验和系统吞吐量(单位时间内处理请求的数目)

(3)削峰填谷

举个例子,如果订单系统最多能处理一千次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两千次下单操作系统是处理不了的,只能限制订单超过一千后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。 简单来说: 就是在访问量剧增的情况下,但是应用仍然不能停,比如“双十一”下单的人多,但是淘宝这个应用仍然要运行,所以就可以使用消息中间件采用队列的形式减少突然访问的压力

使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。

使用MQ后,可以提高系统稳定性

1.3 使用MQ的劣势

  • 系统可用性降低 系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?
  • 系统复杂度提高 MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
  • 一致性问题 A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

1.4 常见的MQ组件

目前业界有很多的 MQ 产品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用 Redis 充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及 MQ 产品特征

2. RabbitMQ的概述

2.1 RabbitMQ的概念

  • 2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
  • RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue 高级消息队列协议 )的开源实现,由于erlang 语言的高并发特性,性能较好,本质是个队列,FIFO 先入先出,里面存放的内容是message
  • ​ RabbitMQ是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ是一个快递站,一个快递员帮你传递快件。RabbitMQ与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。

2.2 RabbitMQ的原理

名词解释:

  • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接
  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销.
  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最终被送到这里等待 consumer 取走
  • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

2.3 安装RabbitMQ

安装详情------>虚拟机安装RabbitMQ

3. RabbitMQ 的工作模式

RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。 官网对应模式介绍:RabbitMQ Tutorials — RabbitMQ

3.1 simple (简单模式)

在上图的模型中,有以下概念:

P:生产者,也就是要发送消息的程序

C:消费者:消息的接收者,会一直等待消息到来

queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

演示:

创建工程

 加依赖

<dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.14.2</version>
        </dependency>
    </dependencies>

 生产者:

package com.wqg.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * @ fileName:SimpleProducer
 * @ description:简单模式-生产者
 * @ author:wqg
 * @ createTime:2023/7/12 16:26
 */
public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        //创建连接工厂对象并设置连接信息   -----获取连接对象(指定rabbitMQ服务端的信息)
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //rabbitMQ服务端的地址  默认localhost
        connectionFactory.setHost("192.168.75.129");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置账号   默认guest
        connectionFactory.setUsername("guest");
        //设置密码   默认guest
        connectionFactory.setPassword("guest");
        //设置虚拟主机   默认/
        connectionFactory.setVirtualHost("/");

        //获取连接对象
        Connection connection = connectionFactory.newConnection();

        //获取Channel信道对象
        Channel channel = connection.createChannel();

        //创建一个队列对象
        /**
         * String queue, 队列的名称. 如果该名称不存在 则创建  如果存在则不创建
         * boolean durable, 该对象是否持久化  当rabbitmq重启后 队列就会消失
         * boolean exclusive, 该队列是否被一个消费者独占
         * boolean autoDelete,当没有消费者时,该队列是否被自动删除
         * Map<String, Object> arguments: 额外参数的设置
         *
         */
        channel.queueDeclare("simple-queue",true,false,false,null);

        //发送信息
        /**
         * String exchange, 交换机的名称 简单模式没有交换机使用""表示采用默认交换机
         * String routingKey, 路由标识  如果是简单模式起名为队列的名称
         * BasicProperties props, 消息的属性设置。 设置为null
         * byte[] body: 消息的内容
         */
        String msg = "Hello RabbitMQ~~~";
        channel.basicPublish("","simple-queue",null,msg.getBytes());

        //关闭资源
        channel.close();
        connectionFactory.clone();
    }
}

消费者:

3.2 Work queues(工作模式)

 Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度

生产者:

package com.wqg.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * @ fileName:SimpleProducer
 * @ description:工作模式-生产者
 * @ author:wqg
 * @ createTime:2023/7/12 16:26
 */
public class WorkProducer {
    public static void main(String[] args) throws Exception {
        //创建连接工厂对象并设置连接信息   -----获取连接对象(指定rabbitMQ服务端的信息)
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //rabbitMQ服务端的地址  默认localhost
        connectionFactory.setHost("192.168.75.129");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置账号   默认guest
        connectionFactory.setUsername("guest");
        //设置密码   默认guest
        connectionFactory.setPassword("guest");
        //设置虚拟主机   默认/
        connectionFactory.setVirtualHost("/");

        //获取连接对象
        Connection connection = connectionFactory.newConnection();

        //获取Channel信道对象
        Channel channel = connection.createChannel();

        //创建一个队列对象
        /**
         * String queue, 队列的名称. 如果该名称不存在 则创建  如果存在则不创建
         * boolean durable, 该对象是否持久化  当rabbitmq重启后 队列就会消失
         * boolean exclusive, 该队列是否被一个消费者独占
         * boolean autoDelete,当没有消费者时,该队列是否被自动删除
         * Map<String, Object> arguments: 额外参数的设置
         *
         */
        channel.queueDeclare("Work-Queue", true, false, false, null);

        //发送信息
        /**
         * String exchange, 交换机的名称 简单模式没有交换机使用""表示采用默认交换机
         * String routingKey, 路由标识  如果是简单模式起名为队列的名称
         * BasicProperties props, 消息的属性设置。 设置为null
         * byte[] body: 消息的内容
         */
        for (int i=0; i<10; i++){
            String msg = "Hello RabbitMQ~~~工作模式";
            channel.basicPublish("", "Work-Queue", null, msg.getBytes());
        }


        //关闭资源
        channel.close();
        connectionFactory.clone();
    }
}

消费者:

package com.wqg.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ fileName:SimpleConsumer
 * @ description:工作模式-消费者
 * @ author:WQG
 * @ createTime:2023/7/12 16:25
 */
public class WorkConsumer01 {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.75.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //channel.queueDeclare("Work-queue", true, false, false, null);

        //接受队列中的信息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             *
             * @param consumerTag: 消费者的标签
             * @param envelope : 设置 拿到你的交换机 路由key等信息
             * @param properties: 消息的属性对象
             * @param body: 消息的内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受的内容===" + new String(body));
            }
        };

        /**
         * String queue, 队列名
         * boolean autoAck,是否自动确认。 当rabbitmq把消息发送给消费后,消费端自动确认消息。
         * Consumer callback:回调。 当rabbitmq队列中存在消息 则触发该回调
         */
        channel.basicConsume("Work-Queue", true, consumer);
    }
}

总结: 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。

Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

例如:短信服务部署多个,只需要有一个节点成功发送即可。

3.3.Publish/Subscribe(发布订阅模式)

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

  • C:消费者,消息的接收者,会一直等待消息到来

  • Queue:消息队列,接收消息、缓存消息

  • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

    1. Fanout:广播,将消息交给所有绑定到交换机的队列

    2. Direct:定向,把消息交给符合指定routing key 的队列

    3. Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

生产者:

package com.wqg.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * @ fileName:SimpleProducer
 * @ description:发布订阅模式-生产者
 * @ author:wqg
 * @ createTime:2023/7/12 16:26
 */
public class PublishProducer {
    public static void main(String[] args) throws Exception {
        //创建连接工厂对象并设置连接信息   -----获取连接对象(指定rabbitMQ服务端的信息)
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //rabbitMQ服务端的地址  默认localhost
        connectionFactory.setHost("192.168.75.129");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置账号   默认guest
        connectionFactory.setUsername("guest");
        //设置密码   默认guest
        connectionFactory.setPassword("guest");
        //设置虚拟主机   默认/
        connectionFactory.setVirtualHost("/");

        //获取连接对象
        Connection connection = connectionFactory.newConnection();

        //获取Channel信道对象
        Channel channel = connection.createChannel();

        //创建交换机
        /**
         * String exchange, 交换机的名称 如果不存在则创建 存在则不创建
         * BuiltinExchangeType type, 交换机的类型
         * boolean durable: 是否持久化。
         */
        channel.exchangeDeclare("Publish-exchange", BuiltinExchangeType.FANOUT,true);

        //创建队列
        channel.queueDeclare("Publish-Queue01", true, false, false, null);
        channel.queueDeclare("Publish-Queue02", true, false, false, null);

        //队列和交换机绑定
        /**
         * String queue,
         * String exchange,
         * String routingKey: 发布订阅模式 没有routingKey 则写为""
         */
        channel.queueBind("Publish-Queue01","Publish-exchange","");
        channel.queueBind("Publish-Queue02","Publish-exchange","");

        //发送信息
        String msg = "Hello RabbitMQ~~~发布订阅模式";
        channel.basicPublish("Publish-exchange","", null, msg.getBytes());


        //关闭资源
        channel.close();
        connectionFactory.clone();
    }
}

 消费者:

package com.wqg.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ fileName:SimpleConsumer
 * @ description:发布订阅模式-消费者
 * @ author:WQG
 * @ createTime:2023/7/12 16:25
 */
public class PublishConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.75.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //channel.queueDeclare("Work-queue", true, false, false, null);

        //接受队列中的信息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             *
             * @param consumerTag: 消费者的标签
             * @param envelope : 设置 拿到你的交换机 路由key等信息
             * @param properties: 消息的属性对象
             * @param body: 消息的内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受的内容===" + new String(body));
            }
        };

        /**
         * String queue, 队列名
         * boolean autoAck,是否自动确认。 当rabbitmq把消息发送给消费后,消费端自动确认消息。
         * Consumer callback:回调。 当rabbitmq队列中存在消息 则触发该回调
         */
        //Publish-Queue02和Publish-Queue01只有一个里面有数据
        channel.basicConsume("Publish-Queue02", true,consumer);
    }
}
  1. 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

  2. 发布订阅模式与工作队列模式的区别:

  • 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
  • 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)
  • 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机

3.4.Routing(路由模式)

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)

  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey

  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息

  • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key

  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列

  • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息

  • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息

生产者:

package com.wqg.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * @ fileName:SimpleProducer
 * @ description:路由模式-生产者
 * @ author:wqg
 * @ createTime:2023/7/12 16:26
 */
public class RouterProducer {
    public static void main(String[] args) throws Exception {
        //创建连接工厂对象并设置连接信息   -----获取连接对象(指定rabbitMQ服务端的信息)
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //rabbitMQ服务端的地址  默认localhost
        connectionFactory.setHost("192.168.75.129");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置账号   默认guest
        connectionFactory.setUsername("guest");
        //设置密码   默认guest
        connectionFactory.setPassword("guest");
        //设置虚拟主机   默认/
        connectionFactory.setVirtualHost("/");

        //获取连接对象
        Connection connection = connectionFactory.newConnection();

        //获取Channel信道对象
        Channel channel = connection.createChannel();

        //创建交换机
        channel.exchangeDeclare("Router-exchange", BuiltinExchangeType.DIRECT,true);

        //创建队列
        channel.queueDeclare("Router-queue001",true,false,false,null);
        channel.queueDeclare("Router-queue002",true,false,false,null);

        //队列和交换机绑定
        channel.queueBind("Router-queue001","Router-exchange","error");
        channel.queueBind("Router-queue002","Router-exchange","error");
        channel.queueBind("Router-queue002","Router-exchange","info");
        channel.queueBind("Router-queue002","Router-exchange","warning");
        String msg = "Hello RabbitMQ~~~路由模式";
        channel.basicPublish("Router-exchange","info",null,msg.getBytes());

        channel.close();
        connectionFactory.clone();
    }
}

消费者:

package com.wqg.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ fileName:SimpleConsumer
 * @ description:路由模式-消费者
 * @ author:WQG
 * @ createTime:2023/7/12 16:25
 */
public class RouterConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.75.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //channel.queueDeclare("Work-queue", true, false, false, null);

        //接受队列中的信息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             *
             * @param consumerTag: 消费者的标签
             * @param envelope : 设置 拿到你的交换机 路由key等信息
             * @param properties: 消息的属性对象
             * @param body: 消息的内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受的内容===" + new String(body));
            }
        };

        /**
         * String queue, 队列名
         * boolean autoAck,是否自动确认。 当rabbitmq把消息发送给消费后,消费端自动确认消息。
         * Consumer callback:回调。 当rabbitmq队列中存在消息 则触发该回调
         */
        channel.basicConsume("Router-queue002", true,consumer);
    }
}

总结:

Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。

3.5.Topics(主题模式)

  • Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定Routing key 的时候使用通配符!
  • Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.” 分割,例如: item.insert
  • 通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词, 例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert

生产者:  

package com.wqg.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * @ fileName:SimpleProducer
 * @ description:主题模式-生产者
 * @ author:wqg
 * @ createTime:2023/7/12 16:26
 */
public class TopicsProducer {
    public static void main(String[] args) throws Exception {
        //创建连接工厂对象并设置连接信息   -----获取连接对象(指定rabbitMQ服务端的信息)
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //rabbitMQ服务端的地址  默认localhost
        connectionFactory.setHost("192.168.75.129");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置账号   默认guest
        connectionFactory.setUsername("guest");
        //设置密码   默认guest
        connectionFactory.setPassword("guest");
        //设置虚拟主机   默认/
        connectionFactory.setVirtualHost("/");

        //获取连接对象
        Connection connection = connectionFactory.newConnection();

        //获取Channel信道对象
        Channel channel = connection.createChannel();

        //创建交换机
        channel.exchangeDeclare("Topics-exchange", BuiltinExchangeType.TOPIC,true);

        //创建队列
        channel.queueDeclare("Topics-queue001",true,false,false,null);
        channel.queueDeclare("Topics-queue002",true,false,false,null);

        //队列和交换机绑定
        channel.queueBind("Topics-queue001","Topics-exchange","*.orange.*");
        channel.queueBind("Topics-queue002","Topics-exchange","*.*.rabbit");
        channel.queueBind("Topics-queue002","Topics-exchange","lazy.#");
        String msg = "Hello RabbitMQ~~~主题模式";
        channel.basicPublish("Topics-exchange","lazy.rabbit.orange",null,msg.getBytes());

        channel.close();
        connectionFactory.clone();
    }
}

消费者:

package com.wqg.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ fileName:SimpleConsumer
 * @ description:路由模式-消费者
 * @ author:WQG
 * @ createTime:2023/7/12 16:25
 */
public class TopicsConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.75.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //channel.queueDeclare("Work-queue", true, false, false, null);

        //接受队列中的信息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             *
             * @param consumerTag: 消费者的标签
             * @param envelope : 设置 拿到你的交换机 路由key等信息
             * @param properties: 消息的属性对象
             * @param body: 消息的内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受的内容===" + new String(body));
            }
        };

        /**
         * String queue, 队列名
         * boolean autoAck,是否自动确认。 当rabbitmq把消息发送给消费后,消费端自动确认消息。
         * Consumer callback:回调。 当rabbitmq队列中存在消息 则触发该回调
         */
        channel.basicConsume("Topics-queue002", true,consumer);
    }
}

Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,

只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。

4.springboot整合RabbitMQ

4.1.生产方

创建springboot项目----生产者
(1) 引入 rabbitmq 整合的依赖
 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
(2) 再配置文件中添加 rabbit 服务的信息
#rabbitmq的配置
spring.rabbitmq.host=192.168.75.129
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
(3) 调用 RabbitTemplate 中发送消息的方法
@SpringBootTest
class RabbitmqSpringbootApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend("Topics-exchange","lazy.aaa","Hello RabbitMQ...");
    }

}

4.2.消费方

创建springboot项目----消费者

(1)引入rabbitmq整合的依赖------同上

(2)再配置文件中添加rabbit服务的信息------同上

(3) 创建类 --- 创建监听方法即可 @RabbitListener
@Component
public class MyListener {

    //queues:表示你监听的队列名
    @RabbitListener(queues = {"Topics-queue002"})
    public void h(Message message){
        //把监听到的消息封装到Message类对象中
        byte[] body = message.getBody();
        System.out.println("消息内容==="+new String(body));
    }
}

4.3.通过代码创建交换机和队列

package com.wqg.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ fileName:RabbitConfig
 * @ description:
 * @ author:wqg
 * @ createTime:2023/7/12 19:41
 */
@Configuration
public class RabbitConfig {
    //定义了一个名为EXCHANGE_NAME的常量,用于表示交换器的名称
    public static final String EXCHANGE_NAME = "Topics-queue002";

    @Bean
    public Exchange exchange() {
        // 创建一个Topic类型的Exchange,名称为EXCHANGE_NAME,持久化
        Exchange topic_exchange02 = ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
        return topic_exchange02;
    }

    @Bean
    public Queue queue() {
        // 创建一个持久化的队列,名称为"Topics-queue003"
        Queue topic_queue03 = QueueBuilder.durable("Topics-queue003").build();
        return topic_queue03;
    }

    @Bean
    public Binding binding() {
        // 创建一个绑定关系,将队列绑定到Exchange上,并指定routing key为"qy165.#",不使用任何参数
        Binding noargs = BindingBuilder.bind(queue()).to(exchange()).with("qy165.#").noargs();
        return noargs;
    }

    //如果交换机要绑定多个队列 需要再写一个bind方法

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/745775.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Swift与OC的混编

一些场面话 在一位前辈的博客里看到了关于iOS开发的各种语言的混编&#xff0c;浅浅学习一下怎么使用。不得不说语言混编的开发者是真的&#x1f42e;&#x1f37a; Swift中用OC混编 新建一个Swift文件 创建一个OC的类 选择language为OC 继续往下走&#xff0c;会跳出这个界…

学无止境·MySQL⑤(存储函数、存储过程)

存储函数和存储过程试题 试题一1、创建一个可以统计表格内记录条数的存储函数 &#xff0c;函数名为count_sch()2、创建一个存储过程avg_sai&#xff0c;有3个参数&#xff0c;分别是deptno&#xff0c;job&#xff0c;接收平均工资&#xff0c;功能查询emp表dept为30&#xff…

C/C++用socket实现简单的TCP文件传输

C/C:用socket实现简单的TCP文件传输 网络中进程之间如何进行通信socket是什么socket的基本操作socket()函数bind()函数listen()、connect()函数accept()函数recv()/send()函数close()函数 TCP的“三次握手”“三次握手”的作用 TCP的“四次挥手”四次挥手的一些注意事项 代码实…

【附3.7安装包】python安装包下载及安装(超详细)

python3.7链接&#xff1a;https://pan.baidu.com/s/1Ett3XBMjWhkVOxkOU8NRqw?pwdqz3l 提取码&#xff1a;qz3l 今日资源&#xff1a;Python 适用系统&#xff1a;WINDOWS ​ Python 3.7.0 软件介绍&#xff1a; Python是一款通用型的计算机程序设计语言&#xff0c;Pytho…

【Leetcode】19. 删除链表的倒数第N个节点

给你一个链表&#xff0c;删除链表的倒数第 n 个结点&#xff0c;并且返回链表的头结点。 进阶&#xff1a;你能尝试使用一趟扫描实现吗&#xff1f; 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5], n 2 输出&#xff1a;[1,2,3,5] 示例 2&#xff1a; 输入&…

有没有线上兼职的副业,在家可以做什么赚钱

线上兼职副业啊&#xff0c;确实是一个不错的副业选择&#xff01;不过&#xff0c;我得提醒你&#xff0c;如果想要那种点一下鼠标就会下金币的神仙项目&#xff0c;这样的期待的话&#xff0c;那我只能告诉你&#xff0c;你可能走错地方了&#xff0c;在这帮不了你。 在线上赚…

LeetCode[470]用Rand7()实现Rand10()

难度&#xff1a;Medium 题目&#xff1a; 给定方法 rand7 可生成 [1,7] 范围内的均匀随机整数&#xff0c;试写一个方法 rand10 生成 [1,10] 范围内的均匀随机整数。 你只能调用 rand7() 且不能调用其他方法。请不要使用系统的 Math.random() 方法。 每个测试用例将有一个内部…

Python案例分析|基于模块的库存管理系统

本案例是通过一个多模块的库存管理系统案例&#xff0c;帮助大家深入了解基于模块的Python应用程序的开发流程。 01、库存管理系统API设计 本文实现一个简单的基于模块的库存管理系统。系统采用JSON文件来保存数据。产品信息设计为字典&#xff0c;键为sku_id&#xff08;产品…

软件工程——第13章软件项目管理知识点整理(完结)

本专栏是博主个人笔记&#xff0c;主要目的是利用碎片化的时间来记忆软工知识点&#xff0c;特此声明&#xff01; 文章目录 1.管理的定义&#xff1f; 2.软件项目管理地位&#xff1f;&#xff08;重要性&#xff09; 3.软件项目管理过程从一组项目计划活动开始&#xff0c…

怎么制作查询成绩的网页?这个不用代码的方法你用过没?

作为一名老师&#xff0c;与家长沟通交流是日常工作中重要的一部分。特别是每次考完试后&#xff0c;家长都急切地想了解孩子的成绩&#xff0c;以便能及时了解孩子的学习情况并给予适当的支持和指导。然而&#xff0c;为了保护学生的隐私&#xff0c;大部分学校不公开张榜学生…

字符设备驱动led灯实验

应用程序代码 #include<stdio.h> #include<string.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <unistd.h> #include <string.h>int main (int argc, const char *argv[]) {char buf[128] &quo…

马上就要到2023年秋招的黄金时期了,计算机专业面试究竟需要注意些什么?

说说自己&#xff0c;不算很突出。本人 17 年就读于一所普通的本科学校&#xff0c;20 年 6 月在三年经验的时候顺利通过校招实习面试进入大厂&#xff0c;现就职于某大厂安全实验室。 所以我还是比较有话语权的吧。 话不多说直接进入主题。 1.理论知识准备&#xff1a;复习计算…

驱动 day9 作业

要求&#xff1a; 现象 按下key1 led1灯的状态取反&#xff0c;number的值取反&#xff0c;然后应用程序打印从内核读到的number的值 应用程序test.c #include <stdio.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #incl…

Bash 有效电话号码

193 有效电话号码 给定一个包含电话号码列表&#xff08;一行一个电话号码&#xff09;的文本文件 file.txt&#xff0c;写一个单行 bash 脚本输出所有有效的电话号码。 你可以假设一个有效的电话号码必须满足以下两种格式&#xff1a; (xxx) xxx-xxxx 或 xxx-xxx-xxxx。&…

三维重建以及神经渲染中的学习(三)

三维重建以及神经渲染中的学习 公众号AI知识物语 本文内容为参加过去一次暑期课程学习时的笔记&#xff0c;浅浅记录下。 三维图形可控生成&#xff1a; 1&#xff1a;学习一个图形生成模型 2&#xff1a;具有可控三维变量&#xff1a;1物体形状&#xff1b;2物体位置&…

ScannerException: while scanning for the next token found character ‘@‘ 问题解决

问题描述 后端项目启动的时候报ScannerException: while scanning for the next token found character ‘‘ 异常&#xff0c;自己有些疑问&#xff0c;项目前一会都还可以&#xff0c;到报错的过程中&#xff0c;项目都没有动过。 解决办法 重新刷新项目就解决了。

RxSwift 使用方式

背景 最近项目业务&#xff0c;所有模块已经支持Swift混编开发&#xff0c;正在逐步使用Swift 方式进行开发新业务&#xff0c;以及逐步替换老业务方式进行发展&#xff0c;所以使用一些较为成熟的Swift 的三方库&#xff0c;成为必要性&#xff0c;经过调研发现RxSwift 在使用…

HarmonyOS/OpenHarmony应用开发-程序包多HAP机制(上)

一、多HAP机制设计目标 方便开发者模块化的管理应用&#xff0c;好的应用一般都是模块化管理&#xff0c;模块之间属于松耦合关系。多HAP方便了开发者将业务划分成多个模块&#xff0c;每个模块放到独立的HAP中。例如支付类应用&#xff0c;有统一的主界面&#xff0c;主界面管…

命令注入(Command Injection)安全漏洞(SQL注入、LDAP注入、OS命令注入、XPath注入、JavaScript注入)

文章目录 命令注入&#xff08;Command Injection&#xff09;发生场景示例防范手段其他类型命令注入漏洞1. SQL注入&#xff08;SQL Injection&#xff09;2. LDAP注入&#xff08;LDAP Injection&#xff09;3. OS命令注入&#xff08;OS Command Injection&#xff09;4. XP…

VectorCAST对外部函数打桩和查看覆盖率

一、对外部函数打桩 在单元测试中&#xff0c;如果要调用到外部函数调用的时候&#xff0c;就要对外部函数进行打桩。 对外部函数进行打桩的目的&#xff0c;一方面是为了验证外部函数接口的正确性&#xff0c;另一方面是对外部函数打桩 之后就可以自定义外部函数返回值。 对…