RabbitMQ知识点

news2025/4/26 1:39:10

1.为什么需要消息队列?

 

 

 

 

 

 

 

 RabbitMQ体系结构

操作001:RabbitMQ安装

二、安装

# 拉取镜像
docker pull rabbitmq:3.13-management
​
# -d 参数:后台运行 Docker 容器
# --name 参数:设置容器名称
# -p 参数:映射端口号,格式是“宿主机端口号:容器内端口号”。5672供客户端程序访问,15672供后台管理界面访问
# -v 参数:卷映射目录
# -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3.13-management

二、验证

访问后台管理界面:http://192.168.200.100:15672

使用上面创建Docker容器时指定的默认用户名、密码登录:

三、可能的问题

1、问题现象

在使用Docker拉取RabbitMQ镜像的时候,如果遇到提示:missing signature key,那就说明Docker版本太低了,需要升级

比如我目前的Docker版本如下图所示:

2、解决办法

基于CentOS7

①卸载当前Docker

更好的办法是安装Docker前曾经给服务器拍摄了快照,此时恢复快照;

如果不曾拍摄快照,那只能执行卸载操作了

yum erase -y docker \
    docker-client \
    docker-client-latest \
    docker-common \
    docker-latest \
    docker-latest-logrotate \
    docker-logrotate \
    docker-selinux \
    docker-engine-selinux \
    docker-engine \
    docker-ce

②升级yum库

yum update -y

③安装Docker最新版

yum install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin

如果这一步看到提示:没有可用软件包 docker-ce,那就添加Docker的yum源:

yum install -y yum-utils
yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo

④设置Docker服务

systemctl start docker
systemctl enable docker

3、验证

上述操作执行完成后,再次查看Docker版本:

操作002:HelloWorld

一、目标

生产者发送消息,消费者接收消息,用最简单的方式实现

官网说明参见下面超链接:

RabbitMQ tutorial - "Hello World!" — RabbitMQ

二、具体操作

1、创建Java工程

①消息发送端(生产者)

②消息接收端(消费者)

③添加依赖

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.20.0</version>
    </dependency>
</dependencies>

2、发送消息

①Java代码

不用客气,整个代码全部复制——当然,连接信息改成你自己的:

package com.atguigu.rabbitmq.simple;  
  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
public class Producer {  
  
    public static void main(String[] args) throws Exception {  
  
        // 创建连接工厂  
        ConnectionFactory connectionFactory = new ConnectionFactory();  
  
        // 设置主机地址  
        connectionFactory.setHost("192.168.200.100");  
  
        // 设置连接端口号:默认为 5672
        connectionFactory.setPort(5672);
  
        // 虚拟主机名称:默认为 /
        connectionFactory.setVirtualHost("/");
  
        // 设置连接用户名;默认为guest  
        connectionFactory.setUsername("guest");
  
        // 设置连接密码;默认为guest  
        connectionFactory.setPassword("123456");
  
        // 创建连接  
        Connection connection = connectionFactory.newConnection();  
  
        // 创建频道  
        Channel channel = connection.createChannel();  
  
        // 声明(创建)队列  
        // queue      参数1:队列名称  
        // durable    参数2:是否定义持久化队列,当 MQ 重启之后还在  
        // exclusive  参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列  
        // autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除  
        // arguments  参数5:队列其它参数  
        channel.queueDeclare("simple_queue", true, false, false, null);  
  
        // 要发送的信息  
        String message = "你好;小兔子!";  
  
        // 参数1:交换机名称,如果没有指定则使用默认Default Exchange  
        // 参数2:路由key,简单模式可以传递队列名称  
        // 参数3:配置信息  
        // 参数4:消息内容  
        channel.basicPublish("", "simple_queue", null, message.getBytes());  
  
        System.out.println("已发送消息:" + message);  
  
        // 关闭资源  
        channel.close();  
        connection.close();  
  
    }  
  
}

②查看效果

3、接收消息

①Java代码

不用客气,整个代码全部复制——当然,连接信息改成你自己的:

package com.atguigu.rabbitmq.simple;  
  
import com.rabbitmq.client.*;  
  
import java.io.IOException;  
  
public class Consumer {  
  
    public static void main(String[] args) throws Exception {  
  
        // 1.创建连接工厂  
        ConnectionFactory factory = new ConnectionFactory();  
  
        // 2. 设置参数  
        factory.setHost("192.168.200.100");  
        factory.setPort(5672);  
        factory.setVirtualHost("/");  
        factory.setUsername("guest");
        factory.setPassword("123456");  
  
        // 3. 创建连接 Connection        
        Connection connection = factory.newConnection();  
  
        // 4. 创建Channel  
        Channel channel = connection.createChannel();  
  
        // 5. 创建队列  
        // 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建  
        // 参数1. queue:队列名称  
        // 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在  
        // 参数3. exclusive:是否独占。  
        // 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉  
        // 参数5. arguments:其它参数。  
        channel.queueDeclare("simple_queue",true,false,false,null);  
  
        // 接收消息  
        DefaultConsumer consumer = new DefaultConsumer(channel){  
  
            // 回调方法,当收到消息后,会自动执行该方法  
            // 参数1. consumerTag:标识  
            // 参数2. envelope:获取一些信息,交换机,路由key...  
            // 参数3. properties:配置信息  
            // 参数4. body:数据  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("consumerTag:"+consumerTag);  
                System.out.println("Exchange:"+envelope.getExchange());  
                System.out.println("RoutingKey:"+envelope.getRoutingKey());  
                System.out.println("properties:"+properties);  
                System.out.println("body:"+new String(body));  
  
            }  
  
        };  
  
        // 参数1. queue:队列名称  
        // 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息  
        // 参数3. callback:回调对象  
        // 消费者类似一个监听程序,主要是用来监听消息  
        channel.basicConsume("simple_queue",true,consumer);  
  
    }  
  
}

②控制台打印

consumerTag:amq.ctag-8EB87GaZFP52LKSMcj98UA Exchange: RoutingKey:simple_queue properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null) body:你好;小兔子!

③查看后台管理界面

因为消息被消费掉了,所以RabbitMQ服务器上没有了:

操作003:工作队列模式

Work Queues 本质上我们刚刚写的HelloWorld程序就是这种模式,只是简化到了最简单的情况:• 生产者只有一个 • 发送一个消息 • 消费者也只有一个,消息也只能被这个消费者消费所以HelloWorld也称为简单模式。 现在我们还原一下常规情况: • 生产者发送多个消息 • 由多个消费者来竞争 • 谁抢到算谁的

结论: • 多个消费者监听同一个队列,则各消费者之间对同一个消息是竞争的关系。• Work Queues工作模式适用于任务较重或任务较多的情况,多消费者分摊任务可以提高消息处理的效率。

一、生产者代码

1、封装工具类

package com.atguigu.rabbitmq.util;  
  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
public class ConnectionUtil {  
  
    public static final String HOST_ADDRESS = "192.168.200.100";  
  
    public static Connection getConnection() throws Exception {  
  
        // 定义连接工厂  
        ConnectionFactory factory = new ConnectionFactory();  
  
        // 设置服务地址  
        factory.setHost(HOST_ADDRESS);  
  
        // 端口  
        factory.setPort(5672);  
  
        //设置账号信息,用户名、密码、vhost  
        factory.setVirtualHost("/");  
        factory.setUsername("guest");  
        factory.setPassword("123456");  
  
        // 通过工程获取连接  
        Connection connection = factory.newConnection();  
  
        return connection;  
    }  
  
  
  
    public static void main(String[] args) throws Exception {  
  
        Connection con = ConnectionUtil.getConnection();  
  
        // amqp://guest@192.168.200.100:5672/  
        System.out.println(con);  
  
        con.close();  
  
    }  
  
}

2、编写代码

package com.atguigu.rabbitmq.work;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
  
public class Producer {  
  
    public static final String QUEUE_NAME = "work_queue";  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);  
  
        for (int i = 1; i <= 10; i++) {  
  
            String body = i+"hello rabbitmq~~~";  
  
            channel.basicPublish("",QUEUE_NAME,null,body.getBytes());  
  
        }  
  
        channel.close();  
  
        connection.close();  
  
    }  
  
}

3、发送消息效果

img

二、消费者代码

1、编写代码

创建Consumer1和Consumer2。Consumer2只是类名和打印提示不同,代码完全一样。

package com.atguigu.rabbitmq.work;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
  
import java.io.IOException;  
  
public class Consumer1 {  
  
    static final String QUEUE_NAME = "work_queue";  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("Consumer1 body:"+new String(body));  
  
            }  
  
        };  
  
        channel.basicConsume(QUEUE_NAME,true,consumer);  
  
    }  
  
}

注意:运行的时候先启动两个消费端程序,然后再启动生产者端程序。 如果已经运行过生产者程序,则手动把work_queue队列删掉。

2、运行效果

最终两个消费端程序竞争结果如下:

image-20231103103841644

操作004:发布订阅模式

生产者不是把消息直接发送到队列,而是发送到交换机

• 交换机接收消息,而如何处理消息取决于交换机的类型• 交换机有如下3种常见类型

• Fanout:广播,将消息发送给所有绑定到交换机的队列

• Direct:定向,把消息交给符合指定routing key的队列

• Topic:通配符,把消息交给符合routing pattern(路由模式)的队列

• 注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失! 

组件之间关系:

        • 生产者把消息发送到交换机

        • 队列直接和交换机绑定

• 工作机制:消息发送到交换机上,就会以广播的形式发送给所有已绑定队列

• 理解概念:

        • Publish:发布,这里就是把消息发送到交换机上

        • Subscribe:订阅,这里只要把队列和交换机绑定,事实上就形成了一种订阅关系

一、生产者代码

package com.atguigu.rabbitmq.fanout;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.BuiltinExchangeType;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
public class Producer {  
  
    public static void main(String[] args) throws Exception {  
  
      // 1、获取连接  
        Connection connection = ConnectionUtil.getConnection();  
  
      // 2、创建频道  
        Channel channel = connection.createChannel();  
  
        // 参数1. exchange:交换机名称  
        // 参数2. type:交换机类型  
        //     DIRECT("direct"):定向  
        //     FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。  
        //     TOPIC("topic"):通配符的方式  
        //     HEADERS("headers"):参数匹配  
        // 参数3. durable:是否持久化  
        // 参数4. autoDelete:自动删除  
        // 参数5. internal:内部使用。一般false  
        // 参数6. arguments:其它参数  
        String exchangeName = "test_fanout";  
  
        // 3、创建交换机  
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);  
  
        // 4、创建队列  
        String queue1Name = "test_fanout_queue1";  
        String queue2Name = "test_fanout_queue2";  
  
        channel.queueDeclare(queue1Name,true,false,false,null);  
        channel.queueDeclare(queue2Name,true,false,false,null);  
  
        // 5、绑定队列和交换机  
      // 参数1. queue:队列名称  
      // 参数2. exchange:交换机名称  
      // 参数3. routingKey:路由键,绑定规则  
      //     如果交换机的类型为fanout,routingKey设置为""  
        channel.queueBind(queue1Name,exchangeName,"");  
        channel.queueBind(queue2Name,exchangeName,"");  
  
        String body = "日志信息:张三调用了findAll方法...日志级别:info...";  
  
        // 6、发送消息  
        channel.basicPublish(exchangeName,"",null,body.getBytes());  
  
        // 7、释放资源  
        channel.close();  
        connection.close();  
  
    }  
  
}

二、消费者代码

1、消费者1号

package com.atguigu.rabbitmq.fanout;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  
  
public class Consumer1 {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String queue1Name = "test_fanout_queue1";  
  
        channel.queueDeclare(queue1Name,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("body:"+new String(body));  
                System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");  
  
            }  
  
        };  
  
        channel.basicConsume(queue1Name,true,consumer);  
  
    }  
  
}

2、消费者2号

package com.atguigu.rabbitmq.fanout;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  
  
public class Consumer2 {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String queue2Name = "test_fanout_queue2";  
  
        channel.queueDeclare(queue2Name,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("body:"+new String(body));  
                System.out.println("队列 2 消费者 2 将日志信息打印到控制台.....");  
  
            }  
  
        };  
  
        channel.basicConsume(queue2Name,true,consumer);  
  
    }  
  
}

三、运行效果

还是先启动消费者,然后再运行生产者程序发送消息:

img

img

四、小结

交换机和队列的绑定关系如下图所示:

images

交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

发布订阅模式与工作队列模式的区别:

  • 工作队列模式本质上是绑定默认交换机

  • 发布订阅模式绑定指定交换机

  • 监听同一个队列的消费端程序彼此之间是竞争关系

  • 绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息

操作006-路由模式

 通过『路由绑定』的方式,把交换机和队列关联起来

• 交换机和队列通过路由键进行绑定

• 生产者发送消息时不仅要指定交换机,还要指定路由键

• 交换机接收到消息会发送到路由键绑定的队列

• 在编码上与 Publish/Subscribe发布与订阅模式的区别:

        • 交换机的类型为:Direct

        • 队列绑定交换机的时候需要指定routing key。

一、生产者代码

package com.atguigu.rabbitmq.routing;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.BuiltinExchangeType;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
public class Producer {  
  
    public static void main(String[] args) throws Exception {  
  
      Connection connection = ConnectionUtil.getConnection();  
  
      Channel channel = connection.createChannel();  
  
      String exchangeName = "test_direct";  
  
      // 创建交换机  
      channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);  
  
      // 创建队列  
      String queue1Name = "test_direct_queue1";  
      String queue2Name = "test_direct_queue2";  
  
      // 声明(创建)队列  
      channel.queueDeclare(queue1Name,true,false,false,null);  
      channel.queueDeclare(queue2Name,true,false,false,null);  
  
      // 队列绑定交换机  
      // 队列1绑定error  
      channel.queueBind(queue1Name,exchangeName,"error");  
  
      // 队列2绑定info error warning  
      channel.queueBind(queue2Name,exchangeName,"info");  
      channel.queueBind(queue2Name,exchangeName,"error");  
      channel.queueBind(queue2Name,exchangeName,"warning");  
  
        String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";  
  
        // 发送消息  
        channel.basicPublish(exchangeName,"warning",null,message.getBytes());  
        System.out.println(message);  
  
      // 释放资源  
        channel.close();  
        connection.close();  
  
    }  
  
}

二、消费者代码

1、消费者1号

package com.atguigu.rabbitmq.routing;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  
  
public class Consumer1 {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String queue1Name = "test_direct_queue1";  
  
        channel.queueDeclare(queue1Name,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("body:"+new String(body));  
                System.out.println("Consumer1 将日志信息打印到控制台.....");  
  
            }  
  
        };  
  
        channel.basicConsume(queue1Name,true,consumer);  
  
    }  
  
}

2、消费者2号

package com.atguigu.rabbitmq.routing;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  
  
public class Consumer2 {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String queue2Name = "test_direct_queue2";  
  
        channel.queueDeclare(queue2Name,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("body:"+new String(body));  
                System.out.println("Consumer2 将日志信息存储到数据库.....");  
  
            }  
  
        };  
  
        channel.basicConsume(queue2Name,true,consumer);  
  
    }  
  
}

三、运行结果

1、绑定关系

img

2、消费消息

操作006:主题模式

 • Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符

• Routingkey一般都是由一个或多个单词组成,多个单词之间以“.”分割,例如:item.insert

• 通配符规则:

        • #:匹配零个或多个词

        • *:匹配一个词

一、生产者代码

package com.atguigu.rabbitmq.topic;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.BuiltinExchangeType;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
public class Producer {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String exchangeName = "test_topic";  
  
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);  
  
        String queue1Name = "test_topic_queue1";  
        String queue2Name = "test_topic_queue2";  
  
        channel.queueDeclare(queue1Name,true,false,false,null);  
        channel.queueDeclare(queue2Name,true,false,false,null);  
  
        // 绑定队列和交换机  
      // 参数1. queue:队列名称  
      // 参数2. exchange:交换机名称  
      // 参数3. routingKey:路由键,绑定规则  
      //      如果交换机的类型为fanout ,routingKey设置为""  
        // routing key 常用格式:系统的名称.日志的级别。  
        // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库  
        channel.queueBind(queue1Name,exchangeName,"#.error");  
        channel.queueBind(queue1Name,exchangeName,"order.*");  
        channel.queueBind(queue2Name,exchangeName,"*.*");  
  
        // 分别发送消息到队列:order.info、goods.info、goods.error  
        String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";  
        channel.basicPublish(exchangeName,"order.info",null,body.getBytes());  
  
        body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";  
        channel.basicPublish(exchangeName,"goods.info",null,body.getBytes());  
  
        body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";  
        channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());  
  
        channel.close();  
        connection.close();  
  
    }  
  
}

二、消费者代码

1、消费者1号

消费者1监听队列1:

package com.atguigu.rabbitmq.topic;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  
  
public class Consumer1 {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String QUEUE_NAME = "test_topic_queue1";  
  
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("body:"+new String(body));  
  
            }  
  
        };  
  
        channel.basicConsume(QUEUE_NAME,true,consumer);  
  
    }  
  
}

2、消费者2号

消费者2监听队列2:

package com.atguigu.rabbitmq.topic;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  
  
public class Consumer2 {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String QUEUE_NAME = "test_topic_queue2";  
  
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("body:"+new String(body));  
  
            }  
  
        };  
  
        channel.basicConsume(QUEUE_NAME,true,consumer);  
  
    }  
  
}

三、运行效果

队列1:

img

队列2:

操作007:整合SpringBoot

搭建环境

• 基础设定:交换机名称、队列名称、绑定关系

• 发送消息:使用RabbitTemplate

• 接收消息:使用@RabbitListener注解

1、消费者工程

①创建module

②配置POM

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.5</version>
</parent>
​
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

③YAML

增加日志打印的配置:

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
logging:
  level:
    com.atguigu.mq.listener.MyMessageListener: info

④主启动类

仿照生产者工程的主启动类,改一下类名即可

package com.atguigu.mq;
​
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
​
@SpringBootApplication
public class RabbitMQConsumerMainType {
​
    public static void main(String[] args) {
        SpringApplication.run(RabbitMQConsumerMainType.class, args);
    }
​
}

⑤监听器

package com.atguigu.mq.listener;
​
import lombok.extern.slf4j.Slf4j;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
@Component
@Slf4j
public class MyMessageListener {
  
    public static final String EXCHANGE_DIRECT = "exchange.direct.order";  
    public static final String ROUTING_KEY = "order";  
    public static final String QUEUE_NAME  = "queue.order";  
  
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = QUEUE_NAME, durable = "true"),
            exchange = @Exchange(value = EXCHANGE_DIRECT),
            key = {ROUTING_KEY}
    ))
    public void processMessage(String dateString,
                               Message message,
                               Channel channel) {
        log.info(dateString);
    }
  
}

2、@RabbitListener注解属性对比

①bindings属性

  • 表面作用:

    • 指定交换机和队列之间的绑定关系

    • 指定当前方法要监听的队列

  • 隐藏效果:如果RabbitMQ服务器上没有这里指定的交换机和队列,那么框架底层的代码会创建它们

②queues属性

@RabbitListener(queues = {QUEUE_ATGUIGU})
  • 作用:指定当前方法要监听的队列

  • 注意:此时框架不会创建相关交换机和队列,必须提前创建好

3、生产者工程

①创建module

images

②配置POM

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.5</version>
</parent>
​
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>

③YAML

spring: 
  rabbitmq: 
    host: 192.168.200.100
    port: 5672 
    username: guest 
    password: 123456 
    virtual-host: /

④主启动类

package com.atguigu.mq;  
  
import org.springframework.boot.SpringApplication;  
import org.springframework.boot.autoconfigure.SpringBootApplication;  
  
@SpringBootApplication
public class RabbitMQProducerMainType {
​
    public static void main(String[] args) {
        SpringApplication.run(RabbitMQProducerMainType.class, args);  
    }
​
}

⑤测试程序

package com.atguigu.mq.test;
  
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
​
@SpringBootTest  
public class RabbitMQTest {  
  
    public static final String EXCHANGE_DIRECT = "exchange.direct.order";  
    public static final String ROUTING_KEY = "order";
  
    @Autowired  
    private RabbitTemplate rabbitTemplate;
  
    @Test  
    public void testSendMessage() {  
        rabbitTemplate.convertAndSend(  
                EXCHANGE_DIRECT,   
                ROUTING_KEY,   
                "Hello atguigu");  
    }  
  
}

消息可靠性投递

下单操作的正常流程

 故障情况1

消息没有发送到消息队列上

后果:消费者拿不到消息,业务功能缺失,数据错误

 故障情况2

消息成功存入消息队列,但是消息队列服务器宕机了

原本保存在内存中的消息也丢失了

即使服务器重新启动,消息也找不回来了

后果:消费者拿不到消息,业务功能缺失,数据错误

故障情况3 

消息成功存入消息队列,但是消费端出现问题,

例如:宕机、抛异常等等后果:业务功能缺失,数据错误

 对症下药

• 故障情况1:消息没有发送到消息队列

        • 解决思路A:在生产者端进行确认,具体操作中我们会分别针对交换机和队列来确认,如果没有成功发送到消息队列服务器上,那就可以尝试重新发送

        • 解决思路B:为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机

• 故障情况2:消息队列服务器宕机导致内存中消息丢失

        • 解决思路:消息持久化到硬盘上,哪怕服务器重启也不会导致消息丢失

• 故障情况3:消费端宕机或抛异常导致消息没有成功被消费

        • 消费端消费消息成功,给服务器返回ACK信息,然后消息队列删除该消息

        • 消费端消费消息失败,给服务器端返回NACK信息,同时把消息恢复为待消费的状态,这样就可以再次取回消息,重试一次(当然,这就需要消费端接口支持幂等性

操作008-01-A:生产者端消息确认机制

一、创建module

二、搭建环境

1、配置POM

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.5</version>
</parent>
​
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

2、主启动类

没有特殊设定:

package com.atguigu.mq;  
  
import org.springframework.boot.SpringApplication;  
import org.springframework.boot.autoconfigure.SpringBootApplication;  
  
@SpringBootApplication
public class RabbitMQProducerMainType {
​
    public static void main(String[] args) {
        SpringApplication.run(RabbitMQProducerMainType.class, args);  
    }
​
}

3、YAML

注意:publisher-confirm-type和publisher-returns是两个必须要增加的配置,如果没有则本节功能不生效

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
    publisher-confirm-type: CORRELATED # 交换机的确认
    publisher-returns: true # 队列的确认
logging:
  level:
    com.atguigu.mq.config.MQProducerAckConfig: info

三、创建配置类

1、目标

在这里我们为什么要创建这个配置类呢?首先,我们需要声明回调函数来接收RabbitMQ服务器返回的确认信息:

方法名方法功能所属接口接口所属类
confirm()确认消息是否发送到交换机ConfirmCallbackRabbitTemplate
returnedMessage()确认消息是否发送到队列ReturnsCallbackRabbitTemplate

然后,就是对RabbitTemplate的功能进行增强,因为回调函数所在对象必须设置到RabbitTemplate对象中才能生效。

原本RabbitTemplate对象并没有生产者端消息确认的功能,要给它设置对应的组件才可以。

而设置对应的组件,需要调用RabbitTemplate对象下面两个方法:

设置组件调用的方法所需对象类型
setConfirmCallback()ConfirmCallback接口类型
setReturnCallback()ReturnCallback接口类型

2、API说明

①ConfirmCallback接口

这是RabbitTemplate内部的一个接口,源代码如下:

    /**
     * A callback for publisher confirmations.
     *
     */
    @FunctionalInterface
    public interface ConfirmCallback {
​
        /**
         * Confirmation callback.
         * @param correlationData correlation data for the callback.
         * @param ack true for ack, false for nack
         * @param cause An optional cause, for nack, when available, otherwise null.
         */
        void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
​
    }

生产者端发送消息之后,回调confirm()方法

  • ack参数值为true:表示消息成功发送到了交换机

  • ack参数值为false:表示消息没有发送到交换机

②ReturnCallback接口

同样也RabbitTemplate内部的一个接口,源代码如下:

    /**
     * A callback for returned messages.
     *
     * @since 2.3
     */
    @FunctionalInterface
    public interface ReturnsCallback {
​
        /**
         * Returned message callback.
         * @param returned the returned message and metadata.
         */
        void returnedMessage(ReturnedMessage returned);
​
    }

注意:接口中的returnedMessage()方法仅在消息没有发送到队列时调用

ReturnedMessage类中主要属性含义如下:

属性名类型含义
messageorg.springframework.amqp.core.Message消息以及消息相关数据
replyCodeint应答码,类似于HTTP响应状态码
replyTextString应答码说明
exchangeString交换机名称
routingKeyString路由键名称

3、配置类代码

①要点1

加@Component注解,加入IOC容器

②要点2

配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中。

操作封装到了一个专门的void init()方法中。

为了保证这个void init()方法在应用启动时被调用,我们使用@PostConstruct注解来修饰这个方法。

关于@PostConstruct注解大家可以参照以下说明:

@PostConstruct注解是Java中的一个标准注解,它用于指定在对象创建之后立即执行的方法。当使用依赖注入(如Spring框架)或者其他方式创建对象时,@PostConstruct注解可以确保在对象完全初始化之后,执行相应的方法。

使用@PostConstruct注解的方法必须满足以下条件:

  1. 方法不能有任何参数。

  2. 方法必须是非静态的。

  3. 方法不能返回任何值。

当容器实例化一个带有@PostConstruct注解的Bean时,它会在调用构造函数之后,并在依赖注入完成之前调用被@PostConstruct注解标记的方法。这样,我们可以在该方法中进行一些初始化操作,比如读取配置文件、建立数据库连接等。

③代码

有了以上说明,下面我们就可以展示配置类的整体代码:

package com.atguigu.mq.config;
​
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
​
@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{
​
    @Autowired
    private RabbitTemplate rabbitTemplate;
​
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }
​
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("消息发送到交换机成功!数据:" + correlationData);
        } else {
            log.info("消息发送到交换机失败!数据:" + correlationData + " 原因:" + cause);
        }
    }
​
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        //发送到交换机失败调用的方法
        log.info("消息主体: " + new String(returned.getMessage().getBody()));
        log.info("应答码: " + returned.getReplyCode());
        log.info("描述:" + returned.getReplyText());
        log.info("消息使用的交换器 exchange : " + returned.getExchange());
        log.info("消息使用的路由键 routing : " + returned.getRoutingKey());
    }
}

四、发送消息

package com.atguigu.mq.test;
  
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
​
@SpringBootTest  
public class RabbitMQTest {  
  
    public static final String EXCHANGE_DIRECT = "exchange.direct.order";
    public static final String ROUTING_KEY = "order";
  
    @Autowired  
    private RabbitTemplate rabbitTemplate;
  
    @Test  
    public void testSendMessage() {  
        rabbitTemplate.convertAndSend(  
                EXCHANGE_DIRECT,   
                ROUTING_KEY,   
                "Hello atguigu");  
    }  
  
}

通过调整代码,测试如下三种情况:

  • 交换机正确、路由键正确

  • 交换机正确、路由键不正确,无法发送到队列

  • 交换机不正确,无法发送到交换机

操作008-01-B:备份交换机

一、创建备份交换机

1、创建备份交换机

注意:备份交换机一定要选择fanout类型,因为原交换机转入备份交换机时并不会指定路由键

image-20231203231926247

image-20231202183701454

2、创建备份交换机要绑定的队列

①创建队列

image-20231202183906676

image-20231202183949674

②绑定交换机

注意:这里是要和备份交换机绑定

image-20231203232801504

3、针对备份队列创建消费端监听器

    public static final String EXCHANGE_DIRECT_BACKUP = "exchange.direct.order.backup";
    public static final String QUEUE_NAME_BACKUP  = "queue.order.backup";
​
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = QUEUE_NAME_BACKUP, durable = "true"),
            exchange = @Exchange(value = EXCHANGE_DIRECT_BACKUP),
            key = {""}
    ))
    public void processMessageBackup(String dateString,
                                     Message message,
                                     Channel channel) {
        log.info("BackUp: " + dateString);
    }

二、设定备份关系

1、原交换机删除

·

image-20231202184840124

2、重新创建原交换机

image-20231202185211633

image-20231202185342087

3、原交换机重新绑定原队列

image-20231202190111581

image-20231202185955138

image-20231202190036520

三、测试

  • 启动消费者端

  • 发送消息,但是路由键不对,于是转入备份交换机

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2310937.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2025-03-06 学习记录--C/C++-PTA 习题6-6 使用函数输出一个整数的逆序数

合抱之木&#xff0c;生于毫末&#xff1b;九层之台&#xff0c;起于累土&#xff1b;千里之行&#xff0c;始于足下。&#x1f4aa;&#x1f3fb; 一、题目描述 ⭐️ 二、代码&#xff08;C语言&#xff09;⭐️ #include <stdio.h>int reverse( int number );int main…

力扣132. 分割回文串 II

力扣132. 分割回文串 II 题目 题目解析及思路 题目要求返回将s切割成若干回文串的最少切割次数 对于子串s[j...i]&#xff0c;若为回文串&#xff0c;则问题变为求s[0...j]的最少切割次数 因此可以考虑动态规划 对于问题s[j...i]是否为回文串&#xff0c;若s[i] s[j]则问…

调研:如何实现智能分析助手(Agent)(AutoCoder、FastGPT、AutoGen、DataCopilot)

文章目录 调研&#xff1a;如何实现智能分析助手&#xff08;Agent&#xff09;&#xff08;AutoCoder、FastGPT、AutoGen、DataCopilot&#xff09;一、交互流程二、数据流程三、架构分类四、开源产品4.1 AutoCoder&#xff08;知识库变体&#xff09;4.2 FastGPT&#xff08;…

学习使用ESP8266进行MQTT通信并在网页上可视化显示

目录 一、工具 二、 流程 三、代码实现 设置MQTT服务器地址 设置服务器和端口号 连接MQTT服务器并订阅话题 回调处理函数 发布数据到话题 四、调试软件使用 打开MQTTx 添加话题 五、网页使用 一、工具 arduino ide esp8266/32单片机 lot物联网网页 MQTTx软件或者m…

mysql进阶(三)

MySQL架构和存储引擎 1. MySQL架构 MySQL8.0服务器是由连接池、服务管理⼯具和公共组件、NoSQL接⼝、SQL接⼝、解析器、优化 器、缓存、存储引擎、⽂件系统组成。MySQL还为各种编程语⾔提供了⼀套⽤于外部程序访问服务器 的连接器。整体架构图如下所⽰&#xff1a; 2. 连接层 …

【MYSQL数据库异常处理】执行SQL语句报超时异常

MYSQL执行SQL语句异常&#xff1a;The last packet successfully received from the server was 100,107 milliseconds ago. The last packet sent successfully to the server was 100,101 milliseconds ago. 这个错误表明 MySQL 服务器与 JDBC 连接之间的通信超时了。通常由…

深入理解三色标记、CMS、G1垃圾回收器

三色标记算法 简介 三色标记算法是一种常见的垃圾收集的标记算法&#xff0c;属于根可达算法的一个分支&#xff0c;垃圾收集器CMS&#xff0c;G1在标记垃圾过程中就使用该算法 三色标记法&#xff08;Tri-color Marking&#xff09;是垃圾回收中用于并发标记存活对象的核心算…

60页PDF | 四川电信数据湖及数据中台实施方案!(附下载)

一、前言 这份报告是关于四川电信数据湖与数据中台实施方案的详细规划。报告从数据驱动、事件管理、数据湖构建、数据资产管理和数据治理等多个方面展开&#xff0c;介绍了如何通过数据湖与数据中台的建设&#xff0c;实现数据的高效采集、存储、分析与共享&#xff0c;提升数…

短分享-Flink图构建

一、背景 通过简单的书写map、union、keyby等代码&#xff0c;Flink便能构建起一个庞大的分布式计算任务&#xff0c;Flink如何实现的这个酷炫功能呢&#xff1f;我们本次分享Flink做的第一步&#xff0c;将代码解析构建成图 源码基于Flink 2.10&#xff0c;书籍参考《Flink核…

java常见的几种并发安全问题及解决方案

项目场景&#xff1a; 并发的应用场景&#xff0c;在开发过程会经常遇到。 例如&#xff1a;服务应用启动后&#xff0c;需要简单统计接口的总访问量&#xff1b;实时更新订单状态&#xff0c;成交总额。 问题描述&#xff1a; 比如统计接口访问次数&#xff0c;如下的实现&a…

【mysql系】mysql启动异常Can‘t create test file localhost.lower-test

1.查看通过下面命令获取对应mysql配置文件 whereis my.cnf 2.查看日志文件 下面这里是对应的错误日志 2025-03-03T06:33:56.402057Z 0 [Warning] TIMESTAMP with implicit DEFAULT value is deprecated. Please use --explicit_defaults_for_timestamp server option (see …

如何使用 LLM 生成的术语自动在搜索应用程序上构建 autocomplete 功能

作者&#xff1a;来自 Elastic Michael Supangkat 了解如何在 Elastic Cloud 中&#xff0c;通过使用 LLM 生成的词汇&#xff0c;为搜索应用增强自动补全功能&#xff0c;实现更智能、更动态的搜索建议。 自动补全是搜索应用中的一项关键功能&#xff0c;它通过在用户输入时实…

vscode离线配置远程服务器

目录 一、前提 二、方法 2.1 查看vscode的commit_id 2.2 下载linux服务器安装包 2.3 安装包上传到远程服务器&#xff0c;并进行文件解压缩 三、常见错误 Failed to set up socket for dynamic port forward to remote port&#xff08;vscode报错解决方法&#xff09;-C…

MinIO 容器化快速部署指南

MinIO 容器化快速部署指南 一、快速开始 # 创建网络&#xff08;需提前执行&#xff09; docker network create srebro#创建工作目录 mkdir -p /home/application/Middleware/minio/# 启动服务 docker-compose up -d二、配置说明 docker-compose.yaml 结构 services:minio…

K8S学习之基础十四:k8s中Deployment控制器概述

Deployment控制器概述&#xff1a; Deployment控制器是k8s中最常用的资源对象&#xff0c;为Replicaset和Pod创建提供了一种声明式的定义方法&#xff0c;在Deployment对象中描述一个期望的状态&#xff0c;Deployment控制器就会按照一定的控制速率把实际状态改成期望状态&…

记录Linux安装mysql8

1.mysql8安装 ​​​​​​​yum安装mysql8.0版本_yum 安装mysql8-CSDN博客文章浏览阅读833次&#xff0c;点赞10次&#xff0c;收藏9次。yum安装mysql8.0版本,如果系统中已经安装了旧版本的 MySQL 或者 mariadb&#xff0c;需要先卸载._yum 安装mysql8https://blog.csdn.net/…

CodeBlocks个性化竞赛配置

文章目录 1. 主题设置2. 设置默认代码3. 比赛时的使用 1. 主题设置 参考博客 --> codeblocks更改主题颜色及调试 跟随当前教程配置主题 博主使用的主题为son of obsidian&#xff0c;此主题为黑色背景&#xff0c;按照上边参考博文修改一下光标颜色即可。 效果图&#xff1…

如何用单机版deepseek编写示例

以下是一个简单的 DeepSeek 编程示例&#xff0c;通过 API 调用实现智能对话功能&#xff1a;文末有链接 一、环境准备 pip install requests # 安装 HTTP 请求库‌:ml-citation{ref"2" data"citationList"} 二、代码实现 import requests def chat_wi…

python实现的可爱卸载动画

在逛掘金时&#xff0c;掘金用户在B站看到的灵感进行的一个卸载窗口的动画效果的实用案例。人类是一种不断在学习的动物&#xff0c;并且是一种模仿能力学习能里比较强的动物。我这里是第三波的学习实践者咯&#xff01; 相对VUE构建动画效果窗口&#xff0c;我更加喜欢用pytho…

钣金加工行业数字化转型MES方案

一、 行业痛点&#xff1a;钣金加工行业普遍面临以下挑战&#xff1a; 订单多样化、小批量、定制化需求增多&#xff1a;传统生产模式难以适应快速变化的市场需求。 生产流程复杂、工序繁多&#xff1a;涉及切割、折弯、焊接、表面处理等多个环节&#xff0c;协同效率低。 生产…