【RabbitMQ】基础篇,学习纪录+笔记

news2025/1/11 0:01:15

目录

一.介绍

1.1MQ概述

1.2MQ优势和劣势

1.3常见的 MQ 产品

1.4RabbitMQ简介

1.5RabbitMQ中的相关概念

1.6RabbitMQ的安装

二.快速入门

2.1入门程序

2.2工作模式

2.2.1Work queues 工作队列模式

2.2.2Pub/Sub 订阅模式

2.2.3Routing 路由模式

2.2.4Topics 通配符模式

三.整合SpringBoot

3.1生产端

3.2消费端

3.3总结


一.介绍


1.1MQ概述


MQ全称Message Queue (消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。
MQ,消息队列,存储消息的中间件
分布式系统通信两种方式:直接远程调用和借助第三方完成间接通信
发送方称为生产者,接收方称为消费者

1.2MQ优势和劣势


优势
应用解耦
异步提速
削峰填谷

劣势
系统可用性降低
系统复杂度提高
一致性问题

1.3常见的 MQ 产品

1.4RabbitMQ简介

 AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议 的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。

RabbitMQ基础架构图

 

1.5RabbitMQ中的相关概念

Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker

Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多 个vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线 程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存 到 exchange 中的查询表中,用于 message 的分发依据

1.6RabbitMQ的安装

楼主用的是Docker进行安装

拉取镜像

docker pull rabbitmq

运行镜像

docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

参数说明:

-d:表示在后台运行容器;
-p:将容器的端口 5672(应用访问端口)和 15672 (控制台Web端口号)映射到主机中;
-e:指定环境变量:
RABBITMQ_DEFAULT_VHOST:默认虚拟机名;
RABBITMQ_DEFAULT_USER:默认的用户名;
RABBITMQ_DEFAULT_PASS:默认的用户密码;
--name rabbitmq:设置容器名称;
rabbitmq:容器使用的镜像名称;

二.快速入门

2.1入门程序

需求:使用简单模式完成消息传递

步骤:

① 创建工程(生成者、消费者),在IDEA中创建两个Maven模块

② 分别添加依赖

<dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

③ 编写生产者发送消息

public class Producer_WorkQueues {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        //5.创建队列Queue
        // String queue,队列名称
        // boolean durable,是否持久化
        // boolean exclusive,是否独占。只能有一个消费者监听队列,当connection关闭时,是否删除队列
        // boolean autoDelete,是否自动删除,当没有Consumer时,会自动删除
        // Map<String, Object> arguments
        // 如果没有一个名字叫hello_world的队列,则自动创建,有就不创建
        channel.queueDeclare("work_queues",true,false,false,null);

        //6.发送消息
        // String exchange,交换机名称,简单模式使用默认""
        // String routingKey,路由名称
        // AMQP.BasicProperties props,配置信息
        // byte[] body,发送的消息数据
        for (int i = 1; i <= 10; i++) {
            String body = i+" ---> hello rabbitmq!";
            channel.basicPublish("","work_queues",null,body.getBytes());
        }


        //7.释放资源
        channel.close();
        connection.close();

    }
}

④ 编写消费者接收消息

public class Consumer_HelloWorld {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        //5.创建队列Queue
        // String queue,队列名称
        // boolean durable,是否持久化
        // boolean exclusive,是否独占。只能有一个消费者监听队列,当connection关闭时,是否删除队列
        // boolean autoDelete,是否自动删除,当没有Consumer时,会自动删除
        // Map<String, Object> arguments
        // 如果没有一个名字叫hello_world的队列,则自动创建,没有就不创建
        channel.queueDeclare("hello_world",true,false,false,null);

        //6.接收消息
        // String queue,队列名称
        // boolean autoAck,是否自动确认
        // Consumer callback,回调对象
        Consumer consumer = new DefaultConsumer(channel){
            //回调方法
            // String consumerTag,标识
            // Envelope envelope,获取一些信息,交换机、路由key……
            // AMQP.BasicProperties properties,配置信息
            // byte[] body,消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("counsumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));

            }
        };

        channel.basicConsume("hello_world",true,consumer);

    }
}

2.2工作模式

2.2.1Work queues 工作队列模式

Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

 

生产者

public class Producer_WorkQueues {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        //5.创建队列Queue
        // String queue,队列名称
        // boolean durable,是否持久化
        // boolean exclusive,是否独占。只能有一个消费者监听队列,当connection关闭时,是否删除队列
        // boolean autoDelete,是否自动删除,当没有Consumer时,会自动删除
        // Map<String, Object> arguments
        // 如果没有一个名字叫hello_world的队列,则自动创建,有就不创建
        channel.queueDeclare("work_queues",true,false,false,null);

        //6.发送消息
        // String exchange,交换机名称,简单模式使用默认""
        // String routingKey,路由名称
        // AMQP.BasicProperties props,配置信息
        // byte[] body,发送的消息数据
        for (int i = 1; i <= 10; i++) {
            String body = i+" ---> hello rabbitmq!";
            channel.basicPublish("","work_queues",null,body.getBytes());
        }


        //7.释放资源
        channel.close();
        connection.close();

    }
}

消费者1

public class Consumer_WorkQueues {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        //5.创建队列Queue
        // String queue,队列名称
        // boolean durable,是否持久化
        // boolean exclusive,是否独占。只能有一个消费者监听队列,当connection关闭时,是否删除队列
        // boolean autoDelete,是否自动删除,当没有Consumer时,会自动删除
        // Map<String, Object> arguments
        // 如果没有一个名字叫hello_world的队列,则自动创建,没有就不创建
        channel.queueDeclare("work_queues",true,false,false,null);

        //6.接收消息
        // String queue,队列名称
        // boolean autoAck,是否自动确认
        // Consumer callback,回调对象
        Consumer consumer = new DefaultConsumer(channel){
            //回调方法
            // String consumerTag,标识
            // Envelope envelope,获取一些信息,交换机、路由key……
            // AMQP.BasicProperties properties,配置信息
            // byte[] body,消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("counsumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));

            }
        };

        channel.basicConsume("work_queues",true,consumer);

    }
}

消费者2和消费者1一样

小结

①在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。

②Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个, 只需要有一个节点成功发送即可。

2.2.2Pub/Sub 订阅模式

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

C:消费者,消息的接收者,会一直等待消息到来

Queue:消息队列,接收消息、缓存消息

Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

Exchange有常见以下3种类型:

Fanout:广播,将消息交给所有绑定到交换机的队列 

Direct:定向,把消息交给符合指定routing key 的队列  

Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列 Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合 路由规则的队列,那么消息会丢失!

生产者

public class Producer_PubSub {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        //5.创建交换机
        // String exchange,交换机名称
        // BuiltinExchangeType type,交换机的类型,4种类型
        //   DIRECT("direct"),定向
        //   FANOUT("fanout"),扇形(广播)  发送消息到每一个与之绑定的队列
        //   TOPIC("topic"),通配符的方式
        //   HEADERS("headers"),不知道

        // boolean durable,是否持久化
        // boolean autoDelete,是否自动删除
        // boolean internal,内部使用,一般是false
        // Map<String, Object> arguments,参数

        String exchangeName = "test_fanout";

        channel.exchangeDeclare(exchangeName,BuiltinExchangeType.FANOUT,true,false,false,null);

        //6.创建队列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        //7.绑定交换机和队列
        // String queue,队列名称
        // String exchange,交换机名称
        // String routingKey,路由名称,如果交换机类型为fanout,routingkey设置为空("")
        channel.queueBind(queue1Name,exchangeName,"");
        channel.queueBind(queue2Name,exchangeName,"");

        //8.发送消息
        String body = "日志信息:张三去调用了findAll方法……日志级别:info...";
        channel.basicPublish(exchangeName,"",null,body.getBytes());

        //9.释放资源
        channel.close();
        connection.close();


    }
}

消费者1

public class Consumer_PubSub1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";

        //6.接收消息
        // String queue,队列名称
        // boolean autoAck,是否自动确认
        // Consumer callback,回调对象
        Consumer consumer = new DefaultConsumer(channel){
            //回调方法
            // String consumerTag,标识
            // Envelope envelope,获取一些信息,交换机、路由key……
            // AMQP.BasicProperties properties,配置信息
            // byte[] body,消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("counsumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息打印到控制台……");

            }
        };

        channel.basicConsume(queue1Name,true,consumer);

    }
}

消费者2即把最后一行代码,改为channel.basicConsume(queue2Name,true,consumer);

回调方法可以修改用作差别对待

小结

1. 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

2. 发布订阅模式与工作队列模式的区别:

①工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机

②发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用 默认交换机)

③发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机

2.2.3Routing 路由模式

队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)

消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey

 Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息

 

生产者

public class Producer_Routing {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        //5.创建交换机
        // String exchange,交换机名称
        // BuiltinExchangeType type,交换机的类型,4种类型
        //   DIRECT("direct"),定向
        //   FANOUT("fanout"),扇形(广播)  发送消息到每一个与之绑定的队列
        //   TOPIC("topic"),通配符的方式
        //   HEADERS("headers"),不知道

        // boolean durable,是否持久化
        // boolean autoDelete,是否自动删除
        // boolean internal,内部使用,一般是false
        // Map<String, Object> arguments,参数

        String exchangeName = "test_direct";

        channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);

        //6.创建队列
        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        //7.绑定交换机和队列
        // String queue,队列名称
        // String exchange,交换机名称
        // String routingKey,路由名称,如果交换机类型为fanout,routingkey设置为空("")

        //队列1绑定error
        channel.queueBind(queue1Name,exchangeName,"error");

        //队列2绑定info error warning
        channel.queueBind(queue2Name,exchangeName,"info");
        channel.queueBind(queue2Name,exchangeName,"error");
        channel.queueBind(queue2Name,exchangeName,"warning");


        //8.发送消息
        String body = "日志信息:张三去调用了findAll方法……日志级别:info...";
        channel.basicPublish(exchangeName,"info",null,body.getBytes());

        //9.释放资源
        channel.close();
        connection.close();


    }
}

消费者1

public class Consumer_Routing1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";

        //6.接收消息
        // String queue,队列名称
        // boolean autoAck,是否自动确认
        // Consumer callback,回调对象
        Consumer consumer = new DefaultConsumer(channel){
            //回调方法
            // String consumerTag,标识
            // Envelope envelope,获取一些信息,交换机、路由key……
            // AMQP.BasicProperties properties,配置信息
            // byte[] body,消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("counsumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息打印到控制台……");

            }
        };

        channel.basicConsume(queue1Name,true,consumer);

    }
}

消费者2

public class Consumer_Routing2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";

        //6.接收消息
        // String queue,队列名称
        // boolean autoAck,是否自动确认
        // Consumer callback,回调对象
        Consumer consumer = new DefaultConsumer(channel){
            //回调方法
            // String consumerTag,标识
            // Envelope envelope,获取一些信息,交换机、路由key……
            // AMQP.BasicProperties properties,配置信息
            // byte[] body,消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("counsumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息存储到数据库……");

            }
        };

        channel.basicConsume(queue2Name,true,consumer);

    }
}

小结

Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。

2.2.4Topics 通配符模式

Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert

 生产者

public class Producer_Topics {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        //5.创建交换机
        // String exchange,交换机名称
        // BuiltinExchangeType type,交换机的类型,4种类型
        //   DIRECT("direct"),定向
        //   FANOUT("fanout"),扇形(广播)  发送消息到每一个与之绑定的队列
        //   TOPIC("topic"),通配符的方式
        //   HEADERS("headers"),不知道

        // boolean durable,是否持久化
        // boolean autoDelete,是否自动删除
        // boolean internal,内部使用,一般是false
        // Map<String, Object> arguments,参数

        String exchangeName = "test_topic";

        channel.exchangeDeclare(exchangeName,BuiltinExchangeType.TOPIC,true,false,false,null);

        //6.创建队列
        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        //7.绑定交换机和队列
        // String queue,队列名称
        // String exchange,交换机名称
        // String routingKey,路由名称,如果交换机类型为fanout,routingkey设置为空("")

        //需求 所有error级别日志存入数据库,所有order系统的日志存入数据库
        channel.queueBind(queue1Name,exchangeName,"#.error");
        channel.queueBind(queue1Name,exchangeName,"order.*");

        channel.queueBind(queue2Name,exchangeName,"*.*");


        //8.发送消息
        String body = "日志信息:张三去调用了findAll方法……日志级别:info...";
        channel.basicPublish(exchangeName,"order.info",null,body.getBytes());

        //9.释放资源
        channel.close();
        connection.close();


    }
}

消费者1

public class Consumer_Topic1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";

        //6.接收消息
        // String queue,队列名称
        // boolean autoAck,是否自动确认
        // Consumer callback,回调对象
        Consumer consumer = new DefaultConsumer(channel){
            //回调方法
            // String consumerTag,标识
            // Envelope envelope,获取一些信息,交换机、路由key……
            // AMQP.BasicProperties properties,配置信息
            // byte[] body,消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("counsumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("把信息存数据库……");

            }
        };

        channel.basicConsume(queue1Name,true,consumer);

    }
}

消费者2

public class Consumer_Topic2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";

        //6.接收消息
        // String queue,队列名称
        // boolean autoAck,是否自动确认
        // Consumer callback,回调对象
        Consumer consumer = new DefaultConsumer(channel){
            //回调方法
            // String consumerTag,标识
            // Envelope envelope,获取一些信息,交换机、路由key……
            // AMQP.BasicProperties properties,配置信息
            // byte[] body,消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("counsumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("把日志信息打印到控制台……");

            }
        };

        channel.basicConsume(queue2Name,true,consumer);

    }
}

三.整合SpringBoot

3.1生产端

1. 创建生产者SpringBoot工程

2. 引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.itheima</groupId>
    <artifactId>producer-springboot</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <!--父工程-->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
    </parent>

    <dependencies>
        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

</project>

3. 编写yml配置,基本信息配置

#配置RabbitMQ的基本信息
spring:
  rabbitmq:
    host: ip
    username: admin
    password: admin
    port: 5672
    virtual-host: /

4. 定义交换机,队列以及绑定关系的配置类

@Configuration
public class RabbitMQConfig {
    public static final String EXCHANGE_NAME = "boot_topic_exchange";
    public static final String QUEUE_NAME = "boot_queue";

    //1.交换机
    @Bean("bootExchange")
    public Exchange bootExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    //2.队列
    @Bean("bootQueue")
    public Queue bootQueue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    //3.绑定关系
    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }

}

5. 注入RabbitTemplate,调用方法,完成消息发送

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {

    //1.注入
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSend(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","boot mq hello!");
    }
}

3.2消费端

1. 创建消费者SpringBoot工程

2. 引入依赖

与生产端一样

3. 编写yml配置,基本信息配置

与生产端一样

4. 定义监听类,使用@RabbitListener注解完成队列监听。

@Component
public class RabbitMQListener {

    @RabbitListener(queues = "boot_queue")
    public void ListenerQueue(Message message){
        System.out.println(message);
    }
}

3.3总结

SpringBoot提供了快速整合RabbitMQ的方式

基本信息在yml中配置,队列交互机以及绑定关系在配置类中使用Bean的方式配置

生产端直接注入RabbitTemplate完成消息发送

消费端直接使用@RabbitListener完成消息接收

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/163588.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vite 4.0 来了,带你手撕 create-vite 源码

通过本文你将了解到以下内容&#xff1a; 1&#xff0c;npm create 具体执行流程2&#xff0c;minimist、prompts、kolorist三个库3&#xff0c;create-vite 的源码分析 vite源码下载&#xff1a; //复制一份vite源码到自己的本地 git clone https://github.com/vitejs/vit…

抗击洪涝灾害,河道水雨情动态在线监测解决方案

一、项目背景我国是个多山的国家且位于东南季风区&#xff0c;降雨分布广泛还分布不均匀&#xff0c;这样一来使汛期高度集中。导致很多沿海城市以及临近河道的地区面临着河道决堤的威胁如何实时监测河道雨水情动态成了让人头疼的问题。在2022年1月&#xff0c;发改委、水利部在…

基于jsp+mysql+Spring的SpringBoot招聘网站项目(完整源码+sql)

基于jspmysqlSpring的SpringBoot招聘网站项目&#xff08;完整源码sql&#xff09;主要实现了管理员登录,简历管理,问答管理,职位管理,用户管理,职位申请进度更新,查看简历 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀…

Exchanges

文章目录ExchangesExchanges的类型FanoutDirectTopicExchanges &#xff08;交换机&#xff09;RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。 Exchanges的类型 直接(direct)&#xff0c;主题(topic),标题(headers)&#xff0c;扇出(fanout)&…

硅烷聚乙二醇硅烷,Silane-PEG-Silane同官能团科研试剂,化学结构式

产品名称&#xff1a;硅烷聚乙二醇硅烷&#xff0c;双硅烷聚乙二醇 中文别名&#xff1a;硅烷PEG硅烷&#xff0c;双硅烷聚乙二醇 英文名称&#xff1a;Silane-PEG-Silane 分子量&#xff1a;1k&#xff0c;2k&#xff0c;3.4k&#xff0c;5k&#xff0c;10k&#xff0c;20k…

面对一堆烂代码,重构,还是重新开发?

hello&#xff0c;大家好&#xff0c;我是张张&#xff0c;「架构精进之路」公号作者。 1、烂代码的形成 写烂代码很容易&#xff0c;但代码写成一坨屎&#xff0c;还能正常运行&#xff0c;那就要有点水平才行。 尤其是一些经验不足的新手&#xff0c;根本不在乎代码质量的重要…

小年 —— 送日历福利啦!(acwing)

acwing每日一题集日历除夕夜瓜分10000ac币啦&#xff01; 手慢就没了┗|&#xff40;O′|┛ 嗷~~ 上次在acwing上面留言送日历&#xff0c;结果送着送着&#xff0c;连老本都给送没了&#xff0c;这波集齐了把其他的也给发出来了 AcWing【集日历瓜分10000AC币活动】赠送1月日历…

基于Java SSM springboot健身管理系统设计和实现

基于Java SSM springboot健身管理系统设计和实现 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 超级帅帅吴 Java毕设项目精品实战案例《500套》 欢迎点赞 收藏 ⭐留言 文末获取源码联…

Uniswap v3 详解(四):交易手续费

以普通用户的视角来看&#xff0c;对比 Uniswap v2&#xff0c;Uniswap v3 在手续费方面做了如下改动&#xff1a; 添加流动性时&#xff0c;手续费可以有 3个级别供选择&#xff1a;0.05%, 0.3% 和 1%&#xff0c;未来可以通过治理加入更多可选的手续费率Uniswap v2 中手续费…

《啊哈算法》第三章--枚举很暴力

从无到有学算法&#xff08;看漫画学算法&#xff09; (๑•̀ㅂ•́)و✧ 爱要坦荡荡 - 萧潇 - 单曲 - 网易云音乐 一&#xff0c;坑爹的奥数 枚举算法又叫穷举算法&#xff0c;非常的暴力&#xff0c;它的基本思想是“有序地去尝试每一种可能” 题目1 □3 x 6528 3□ x …

【JavaEE】网络初识之网络通信基础

✨哈喽&#xff0c;进来的小伙伴们&#xff0c;你们好耶&#xff01;✨ &#x1f6f0;️&#x1f6f0;️系列专栏:【JavaEE】 ✈️✈️本篇内容:网络初识之网络通信基础。 &#x1f680;&#x1f680;代码存放仓库gitee&#xff1a;JavaEE初阶代码存放&#xff01; ⛵⛵作者简介…

Uniswap v3 详解(二):创建交易对/提供流动性

前文已经说过 Uniswap v3 的代码架构。一般来说&#xff0c;用户的操作都是从 uniswap-v3-periphery 中的合约开始。 创建交易对 创建交易对的调用流程如下&#xff1a; 用户首先调用 NonfungiblePositionManager 合约的 createAndInitializePoolIfNecessary 方法创建交易对&…

【软件测试】软件测试分类

1. 按照测试对象划分 界面测试 界面测试&#xff08;简称UI测试)&#xff0c;测试用户界面的功能模块的布局是否合理、整体风格是否一致、各个控件的放置位置是 否符合客户使用习惯&#xff0c;此外还要测试界面操作便捷性、导航简单易懂性&#xff0c;页面元素的可用性&…

U3751频谱分析仪

18320918653 U3751 频谱分析仪爱德万U3751特点&#xff1a; 频率范围&#xff1a;9kHz&#xff5e;8GHz 大输入电平&#xff1a;30dBm RBW&#xff1a;300Hz&#xff5e;3MHz 体积小&#xff0c;重量轻(5.6公斤)&#xff0c;测量速度快 户外量测&#xff1a;W-CDMA&#xff…

unity日记10(无头盔开发vr XR Device Simulator操作说明| 模之屋模型导入unity )

目录 XR Device Simulator配置参考视频 XR Device Simulator操作方法参考视频 模之屋模型导入unity参考视频 XR Device Simulator操作方法&#xff08;个人心得&#xff09; 1.摄像机 1.摄像机左右移动 右键移动鼠标 2.摄像机前后移动 右键滚动滚轮 3.摄像…

Vulnhub之HACKABLE: II

1.信息收集 使用arp-scan扫描存活网段 使用nmap对192.168.239.126进行端口扫描&#xff0c;发现存在21(可匿名登录)、22、80端口 2.漏洞发现 使用ftp 192.168.239.126进行匿名登录&#xff0c;注意&#xff1a;anonymous都要小写。执行dir命令发现CALL.html 执行get CALL…

mybatis 的mapper接口没有实现类,那么他是如何工作的

一、mybatis使用动态代理要实现的功能。 mybatis 的底层实际上运行的还是ibatis&#xff0c;即需要把接口和xml映射翻译成 ibatis 需要的这种格式。 二、mapper接口的动态代理 当使用 sqlSession.getMapper 获取一个Mapper 的时候一般是使用 sqlSession 的 DefaultSqlSession…

K_A11_006 基于STM32等单片机采集雨水模块 串口与OLED0.96双显示

K_A11_006 基于STM32等单片机采集雨水模块 串口与OLED0.96双显示一、资源说明二、基本参数参数引脚说明三、驱动说明IIC地址/采集通道选择/时序对应程序:四、部分代码说明1、接线说明1.1、STC89C52RC雨水模块1.2、STM32F103C8T6雨水模块五、基础知识学习与相关资料下载六、视频…

电脑开机找不到启动设备怎么办?

如果你的电脑弹出错误消息并提示“找不到启动的设备”&#xff0c;不用担心&#xff0c;本文将告诉你5种不同的方法&#xff0c;可以轻松修复无可引导的设备的问题&#xff01;“找不到启动设备”是什么意思&#xff1f;可引导设备&#xff08;又称启动设备&#xff09;是一种存…

Vue.js学习笔记

vue.js学习笔记 Vue.js 是一款流行的 JavaScript 前端框架&#xff0c;Vue 所关注的核心是 MVC 模式中的视图层&#xff0c;它也方便地获取数据更新&#xff0c;实现视图与模型的交互。 1.创建代码片段 声明式渲染&#xff1a;Vue.js 的核心是一个允许采用简洁的模板语法来声…