RabbitMQ学习笔记(尚硅谷)

news2024/12/23 22:36:08

文章目录

  • 一. 消息队列
    • 1. 定义
    • 2. 作用
      • 2.1 流量消峰
      • 2.2 应用解耦
      • 2.3 异步处理
    • 3. 分类
    • 4. MQ的选择
    • 5. RabbitMQ
      • 5.1 概念
      • 5.2 四大概念
      • 5.3 六大模式
      • 5.4 RabbitMQ 工作原理
      • 5.5 安装
    • 6. 代码实现
  • 二. Hello World (简单模式)
    • 1. 生产者代码
    • 2. 消费者代码
  • 三. Work Queues (工作队列模式)
    • 1. 轮询分发消息
    • 2. 消息应答
      • 2.1 概念
      • 2.2 自动应答
      • 2.3 手动应答
      • 2.3 消息应答重新入队
    • 3. RabbitMQ持久化
      • 3.1 队列持久化
      • 3.2 消息持久化
      • 3.3 不公平分发
      • 3.4 预取值
  • 四. 发布确认
    • 1. 原理
    • 2. 发布确认的策略
      • 2.1 开启发布确认的方法
      • 2.2 单个确认发布
      • 2.3 批量确认发布
      • 2.4 异步确认发布 (内置了一个监听器)
      • 2.5 确认未处理的消息
      • 2.6 对比
  • 五. 交换机
    • 1. 概述
    • 2. fanout扇形交换机
    • 3. direct直接交换机
    • 4. Topic主题交换机
    • 5. 自我总结
  • 六. 死信队列
    • 1. 死信的概念
    • 2. 死信的来源
    • 3. 实战
  • 七. 延迟队列
    • 1. 概念及应用 ***
    • 2. RabbitMQ中的两种TTL
    • 3. 延时队列基础模型(基于死信队列)
    • 4. 优化(基于死信队列)
    • 5. 插件实现延迟队列
  • 八. 发布确认高级
    • 1. 基于SpringBoot的基本代码及存在问题
    • 2. 回退消息
    • 3. 备份交换机
  • 九. RabbitMQ补充知识 *****
    • 1. 幂等性
      • 1.1 概念
      • 1.2 消息重复消费
      • 1.3 消费端的幂等性保障
    • 2. 优先队列
    • 3.惰性队列
  • 十. RabbitMQ集群
    • 1. 搭建
    • 2. 镜像队列
    • 3. Haproxy + Keepalive实现高可用负载均衡
    • 4. Federation Exchange/Queue
    • 5. Shovel

一. 消息队列

1. 定义

在这里插入图片描述

2. 作用

2.1 流量消峰

在这里插入图片描述

2.2 应用解耦

在这里插入图片描述
在这里插入图片描述

2.3 异步处理

在这里插入图片描述

3. 分类

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

4. MQ的选择

在这里插入图片描述
大量数据:Kafaka;高并发:RocketMQ; 中小型数据量少:RabbitMQ

5. RabbitMQ

5.1 概念

快递站
在这里插入图片描述

5.2 四大概念

生产者,消费者,交换机,队列
交换机可以对应多个队列

在这里插入图片描述

5.3 六大模式

简单模式
工作模式
发布/订阅模式
路由模式
主题模式
发布确定模式
在这里插入图片描述

5.4 RabbitMQ 工作原理

在这里插入图片描述

5.5 安装

密码:123456
用户名:admin

6. 代码实现

有基于SpringBoot的代码 实现起来比较简单,参考资源下载。

二. Hello World (简单模式)

在这里插入图片描述

1. 生产者代码

  • 导入依赖
    在这里插入图片描述
  • 代码
    在这里插入图片描述
    密码改成自己的

2. 消费者代码

package com.chent;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static final String QUEUE_NAME = "HELLO";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory  = new ConnectionFactory();
        factory.setHost("192.168.86.130");
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        System.out.println("等待消息被消费..");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            System.out.println(message);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println("消费被中断");
        };
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

    }
}

三. Work Queues (工作队列模式)

在这里插入图片描述

1. 轮询分发消息

  • 直接用idea启动两个线程的技巧
    右上角run旁边edict configuration 然后Build and Run最后 modify option选项 选择 allow mutiple instances

在这里插入图片描述
在这里插入图片描述

2. 消息应答

2.1 概念

在这里插入图片描述

2.2 自动应答

在这里插入图片描述
不靠谱

2.3 手动应答

在这里插入图片描述

  • 参数Multiple的解释
    在这里插入图片描述
  • 代码实现
    在这里插入图片描述

2.3 消息应答重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认, RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
在这里插入图片描述

默认的自动应答会使消息丢失(自动应答就是发送消息后即认为成功发射),要想实现消息不丢失,必须采用手动应答

3. RabbitMQ持久化

3.1 队列持久化

  • 注意事项
    已有消息队列改成持久化,必须先删除

3.2 消息持久化

在这里插入图片描述

3.3 不公平分发

轮训分发-公平分发,但是不合理
在这里插入图片描述
在这里插入图片描述

3.4 预取值

限制缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。
通过使用 basic.qos 方法设置“预取计数” 值来完成的。 该值定义通道上允许的未确认消息的最大数量

四. 发布确认

1. 原理

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID( 从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了**,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。**
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

2. 发布确认的策略

2.1 开启发布确认的方法

在这里插入图片描述
**confirmSelect()**开启发布

2.2 单个确认发布

发布速度特别的慢
channel.waitForConfirms(): 确认订阅成功
在这里插入图片描述

2.3 批量确认发布

先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。
在这里插入图片描述

2.4 异步确认发布 (内置了一个监听器)

在这里插入图片描述
一个监听器+两个回调函数(一个成功,一个失败返回)
在这里插入图片描述
完整代码:channel.addConfirmListener(成功返回函数,失败返回函数)
在这里插入图片描述

2.5 确认未处理的消息

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

package com.chent;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * 发布订阅模式:异步发布确认
 */
public class Publish {
    public static void main(String[] args) throws Exception {
        Publish.publishAsync();
    }
    public static void publishAsync() throws Exception{
        ConnectionFactory factory  = new ConnectionFactory();
        factory.setHost("192.168.86.130");
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        String queueName = "publishTest";
        channel.queueDeclare(queueName,true,false,false,null);//申明队列
        channel.confirmSelect();//开启发布确认
        //异步发布的逻辑
        //线程安全有序hashmap 适用于高并发情况
        ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap<>();
        //消息成功确认回调函数
        ConfirmCallback ackCallback = (var1,var3)->{
            //2.删除确认的消息
            if(var3){//VAR3表示处理批量情况
            ConcurrentNavigableMap<Long, String> confirmMessage = map.headMap(var1);
            confirmMessage.clear();}
            else{
                map.remove(var1);
            }
            System.out.println("成功确认:"+ var1);
        };
        //消息失败确认回调函数
        ConfirmCallback nackCallback = (var1,var3)->{
            //3.打印未确认的消息
            String message = map.get(var1);
            System.out.println("未确认的消息是:"+message);
        };
        channel.addConfirmListener(ackCallback,nackCallback);
        long begin = System.currentTimeMillis();
        for(int i = 0;i<1000;i++){
            String message = "消息" + i;
            channel.basicPublish("",queueName,null,message.getBytes());
            //1.记录所有要发送的消息
            map.put(channel.getNextPublishSeqNo(),message);
        }

        long end = System.currentTimeMillis();
        System.out.println("===========================================异步耗时:" + (end-begin)+"============================");
    }
}

2.6 对比

在这里插入图片描述

五. 交换机

在上一节中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消费者(工作进程)。在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式称为 ”发布/订阅
在这里插入图片描述
在这里插入图片描述

1. 概述

  • 作用
    RabbitMQ 消息传递模型的核心思想是**: 生产者生产的消息从不会直接发送到队列**。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
    相反, 生产者只能将消息发送到交换机(exchange),**交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。**交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定
  • 交换机类型
    直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout)
  • 无名Exchange
    在这里插入图片描述
  • 临时队列
    在这里插入图片描述
  • 绑定Bind
    在这里插入图片描述

2. fanout扇形交换机

将接收到的所有消息广播到它知道的所有队列中

  • 消费者代码
public class ReceiveLogs01 {
	private static final String EXCHANGE_NAME = "logs";
	public static void main(String[] argv) throws Exception {
		Channel channel = RabbitUtils.getChannel();
		// 1. 声明fanout交换机
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		/**
		* 生成一个临时的队列 队列的名称是随机的
		* 当消费者断开和该队列的连接时 队列自动删除
		*/
		//2. 声明一个临时队列
		String queueName = channel.queueDeclare().getQueue();
		//把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串
		//3. 绑定交换机和队列的关系
		channel.queueBind(queueName, EXCHANGE_NAME, "");
		System.out.println("等待接收消息,把接收到的消息打印在屏幕........... ");
		DeliverCallback deliverCallback = (consumerTag, delivery) ->
		{String message = new String(delivery.getBody(), "UTF-8");
		System.out.println("控制台打印接收到的消息"+message);
		};
		channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
		}
	}
  • 生产者代码
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
	try (Channel channel = RabbitUtils.getChannel()) {
	/**
	* 声明一个 exchange
	* 1.exchange 的名称
	* 2.exchange 的类型
	*/
	channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
	Scanner sc = new Scanner(System.in);
	System.out.println("请输入信息");
	while (sc.hasNext()) {
	String message = sc.nextLine();
	channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
	System.out.println("生产者发出消息" + message);
	}
	}
	}
}

3. direct直接交换机

在上一节中,我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。在本节我们将向其中添加一些特别的功能-比方说我们只让某个消费者订阅发布的部分消息。例如我们只把严重错误消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
队列只对它绑定的交换机的消息感兴趣。绑定用参数: routingKey 来表示也可称该参数为 binding key,创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, “routingKey”);绑定之后的意义由其交换类型决定。
在这里插入图片描述

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);

4. Topic主题交换机

  • 引入
    在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的 fanout 交换机,而是使用了 direct 交换机,从而有能实现有选择性地接收日志。
    尽管使用direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候direct 就办不到了。这个时候就只能使用 topic 类型
  • 命名规范
    在这里插入图片描述
    在这里插入图片描述

5. 自我总结

交换机和队列绑定后,可以指定消息发给对应的队列
扇形交换机会给全部绑定的队列发消息,routingKey为空
直接交换机会给对应RoutingKey的队列发消息
主题交换机会给对应类型的routingKey(在直接交换机基础上改进)的队列发消息
说白了三种交换机其实就是在改变routingKey参数

channel.exchangeDeclare(交换机名称,类型);
channel.queueBind(队列名称,交换机名称, routingKey);

六. 死信队列

1. 死信的概念

  • 概念
    死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了, consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

  • 应用场景:
    为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

2. 死信的来源

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

3. 实战

  • 代码架构图

在这里插入图片描述

  • 模拟TTL过期
  • 生产者代码
    在生产者中设置过期时间
public class Producer {
	private static final String NORMAL_EXCHANGE = "normal_exchange";
		public static void main(String[] argv) throws Exception {
			try (Channel channel = RabbitMqUtils.getChannel())
			{ channel.exchangeDeclare(NORMAL_EXCHANGE,
			BuiltinExchangeType.DIRECT);
			//设置消息的 TTL 时间
			AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
			//该信息是用作演示队列个数限制
			for (int i = 1; i <11 ; i++) {
				String message="info"+i;
				channel.basicPublish(NORMAL_EXCHANGE,
				message.getBytes());
				System.out.println("生产者发送消息:"+message);
			}
		}
	}
}
  • 消费者C1代码
    声明普通/死信队列和交换机,绑定routingKey关系;
    核心部分:普通队列通过声明队列的参数 param绑定死信交换机的关系
public class Consumer01 {
	//普通交换机名称
	private static final String NORMAL_EXCHANGE = "normal_exchange";
	//死信交换机名称
	private static final String DEAD_EXCHANGE = "dead_exchange";
	public static void main(String[] argv) throws Exception {
		Channel channel = RabbitUtils.getChannel()//声明死信和普通交换机 类型为 direct
		channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
		channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
		//声明死信队列
		String deadQueue = "dead-queue";
		channel.queueDeclare(deadQueue, false, false, false, null);
		//死信队列绑定死信交换机与 routingkey
		channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
		//正常队列绑定死信队列信息
		Map<String, Object> params = new HashMap<>();
		//正常队列设置死信交换机 参数 key 是固定值
		params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
		//正常队列设置死信 routing-key 参数 key 是固定值
		params.put("x-dead-letter-routing-key", "lisi");
		String normalQueue = "normal-queue";
		channel.queueDeclare(normalQueue, false, false, false, params);
		channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
		System.out.println("等待接收消息........... ");
		DeliverCallback deliverCallback = (consumerTag, delivery) ->
		{String message = new String(delivery.getBody(), "UTF-8");
		System.out.println("Consumer01 接收到消息"+message);
		};
		channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});
	}
}
  • 消费者C2代码
    只是简单的让C2消费死信
public class Consumer02 {
	private static final String DEAD_EXCHANGE = "dead_exchange";
	public static void main(String[] argv) throws Exception {
		Channel channel = RabbitUtils.getChannel();
		channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
		String deadQueue = "dead-queue";
		channel.queueDeclare(deadQueue, false, false, false, null);
		channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
		System.out.println("等待接收死信队列消息........... ");
		DeliverCallback deliverCallback = (consumerTag, delivery) ->
		{String message = new String(delivery.getBody(), "UTF-8");
		System.out.println("Consumer02 接收死信队列的消息" + message);
		};
		channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
		});
	}
}
  • 队列达到最长长度
    如何设置队列长度:在队列声明中添加参数
    在这里插入图片描述

七. 延迟队列

1. 概念及应用 ***

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2. RabbitMQ中的两种TTL

在这里插入图片描述
在这里插入图片描述

3. 延时队列基础模型(基于死信队列)

  • 定义
    在这里插入图片描述

  • 代码见资源下载(基于SpringBoot来实现)

  • 代码架构
    在这里插入图片描述

  • 效果展示
    在这里插入图片描述

  • 存在的问题
    如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S两个时间选项,如果需要一个小时后处理,那么就需要增加TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

4. 优化(基于死信队列)

  • 架构
    删除线格式
    新建一个队列,但是不设置TTL;在生产者端设置TTL即可

  • 效果展示
    在这里插入图片描述

  • 缺点
    看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

5. 插件实现延迟队列

  • 效果展示
    在这里插入图片描述

  • 实现步骤

  • 安装插件
    在这里插入图片描述

  • 架构图及代码
    在这里插入图片描述
    在这里插入图片描述

  • 延时队列的其他选择
    当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

八. 发布确认高级

1. 基于SpringBoot的基本代码及存在问题

在这里插入图片描述

在这里插入图片描述
交换机发出了确认回调,但实际上队列没有收到消息

2. 回退消息

  • 概念:
    在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
  • 核心代码
    在这里插入图片描述
  • 效果展示
    在这里插入图片描述

3. 备份交换机

  • 概念
    备份交换机可以理解为 RabbitMQ 中交换机的“备胎” ,当我们为某一个交换机声明一个对应的备份交换机时, 就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
  • 代码框架
    在这里插入图片描述
  • 代码实现
@Configuration
public class ConfirmConfig {
	public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
	public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
	public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
	public static final String BACKUP_QUEUE_NAME = "backup.queue";
	public static final String WARNING_QUEUE_NAME = "warning.queue";
	// 声明确认队列
	@Bean("confirmQueue")
	public Queue confirmQueue(){
	return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
	}
	//声明确认队列绑定关系
	@Bean
	public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
	@Qualifier("confirmExchange") DirectExchange exchange){
	return BindingBuilder.bind(queue).to(exchange).with("key1");
	}
	//声明备份 Exchange
	@Bean("backupExchange")
	public FanoutExchange backupExchange(){
	return new FanoutExchange(BACKUP_EXCHANGE_NAME);
	}
	//声明确认 Exchange 交换机的备份交换机
	@Bean("confirmExchange")
	public DirectExchange
	confirmExchange(){ExchangeBuilder
	exchangeBuilder =
	ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
	.durable(true)
	//设置该交换机的备份交换机
	.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
	return (DirectExchange)exchangeBuilder.build();
	}
	// 声明警告队列
	@Bean("warningQueue")
	public Queue warningQueue(){
	return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
	}
	// 声明报警队列绑定关系
	@Bean
	public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
	@Qualifier("backupExchange") FanoutExchange
	backupExchange){
	return BindingBuilder.bind(queue).to(backupExchange);
	}
	// 声明备份队列
	@Bean("backQueue")
	public Queue backQueue(){
	return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
	}
	// 声明备份队列绑定关系
	@Bean
	public Binding backupBinding(@Qualifier("backQueue") Queue queue,
	@Qualifier("backupExchange") FanoutExchange backupExchange){
	return BindingBuilder.bind(queue).to(backupExchange);
	}
}

九. RabbitMQ补充知识 *****

1. 幂等性

1.1 概念

用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。

1.2 消息重复消费

  • 问题描述
    消费者在消费 MQ 中的消息时, MQ 已把消息发送给消费者,消费者在给MQ 返回 ack 时网络中断,故 MQ 未收到确认信息该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。
  • 解决方案
    MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消费者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过。

1.3 消费端的幂等性保障

在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性。

  • 方案一:唯一ID+指纹码机制
    指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。
  • 方案二:Redis原子性
    利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费

2. 优先队列

  • 应用场景
    在这里插入图片描述
  • 实现原理
    在这里插入图片描述
  • 代码实现
public class Producer {
	private static final String QUEUE_NAME="hello";
	public static void main(String[] args) throws Exception {
		try (Channel channel = RabbitMqUtils.getChannel();) {
		//给消息赋予一个 priority 属性
		AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();		
		for (int i = 1; i <11; i++){
			String message = "info"+i;
			if(i==5){
			channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
			}else{
			channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
			}
			System.out.println("发送消息完成:" + message);
			}
		}
	}
}
public class Consumer {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception
{Channel channel = RabbitMqUtils.getChannel();
//设置队列的最大优先级 最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPU
Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
System.out.println("消费者启动等待消费..............");
DeliverCallback deliverCallback=(consumerTag, delivery)-
>{ String receivedMessage = new
String(delivery.getBody());System.out.println("接收到消
息:"+receivedMessage);
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)-
>{System.out.println("消费者无法消费消息时调用,如队列被删除");
});
}

3.惰性队列

  • 定义
    惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。
  • 应用场景
    当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
    在这里插入图片描述
  • 两种设置模式

在这里插入图片描述

  • 内存开销对比
    在这里插入图片描述

十. RabbitMQ集群

1. 搭建

2. 镜像队列

在这里插入图片描述

引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。

3. Haproxy + Keepalive实现高可用负载均衡

在这里插入图片描述
Haproxy 实现负载均衡
Keepalived 实现双机(主备)热备

4. Federation Exchange/Queue

在这里插入图片描述

在这里插入图片描述

5. Shovel

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/681445.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OUC编译原理实验报告 实验5:用Yacc设计语法分析器1 实验6:用Yacc设计语法分析器2

编译原理实验报告 实验5&#xff1a;用Yacc设计语法分析器1 实验6&#xff1a;用Yacc设计语法分析器2 中国海洋大学编译原理实验2023春 仅供同学参考思路 请勿直接抄袭 否则可能喜提0分 目录 文章目录 编译原理实验报告目录一.实验目的二.实验内容实验5实验6 三.实验要求实…

RISCV Reader笔记_2 RV32I

RV32I 完整的RV32I指令集可以用下面的式子中出现单词的首字母表示&#xff1a; 比如这一条&#xff1a; set less than {immediate} {unsigned} 也就是slt slti sltu sltiu这4个指令。 RISCV指令格式如下。R 寄存器操作&#xff0c;I 立即数或load访存&#xff0c;S store访…

C51单片机期末复习第八章单片机接口技术

一 总线&#xff1a; 传送同类信息的连线 三总线&#xff1a; 地址总线AB&#xff0c;数据总线DB,控制总线CB 目录(ppt给的没啥用&#xff0c;乱还不全)&#xff1a; 8.1 单片机的系统总线 8.2 简单并行I/O口扩展 8.3 可编程并行I/O口扩展 8.4 D/A转换与DAC0832应用 8…

稀疏表:最大公约数

问题描述 给定一个数组, 每次操作可以选择数组中任意两个相邻的元素 x , y x, y x,y 并将其 中的一个元素替换为 gcd ⁡ ( x , y ) \operatorname{gcd}(x, y) gcd(x,y), 其中 gcd ⁡ ( x , y ) \operatorname{gcd}(x, y) gcd(x,y) 表示 x x x 和 y y y 的最大公约数。 请…

MIT 6.S081 教材第五章内容 -- 中断与设备驱动--下

MIT 6.S081 教材第五章内容 -- 中断与设备驱动--下 引言关于RISC-V特权级架构说明RISC-V特权模式OpenSBI介绍RISC-V启动过程RISC-V中的异常M模式下的异常1. 硬件中断的处理&#xff08;以时钟中断为例&#xff09;2. M模式下的异常相关寄存器3. 同步异常的处理 S模式下的异常1.…

chatgpt赋能python:Python散点图介绍:如何用Python绘制散点图?

Python散点图介绍&#xff1a;如何用Python绘制散点图&#xff1f; Python是一门流行的编程语言&#xff0c;用于解决各种问题和编写各种应用程序。其中&#xff0c;数据可视化是Python应用程序中非常重要的组成部分。散点图是最常用的数据可视化图形之一&#xff0c;它能够清…

拓扑排序:神经网络

题目链接 神经网络 题目大意 在兰兰的模型中&#xff0c;神经网络就是一张有向图&#xff0c;图中的节点称为神经元&#xff0c;而且两个神经 元之间至多有一条边相连&#xff0c;下图是一个神经元的例子&#xff1a; 图中&#xff0c; X 1 — X 3 X_1—X_3 X1​—X3​是信…

从类加载到双亲委派:深入解析类加载机制与 ClassLoader

目录 前言Class 文件介绍如何生成 class 文件观察 Bytecode 方法class 文件到底是什么样的呢&#xff1f; Class 加载、链接、初始化加载、类加载器双亲委派Launcher 核心类ClassLoader 相关源码ClassLoader 相关问题自定义简单 ClassLoader自定义加密 ClassLoader打破双亲委派…

动态ip与静态ip的概念、区别、应用场景

动态ip与静态ip的区别 前言一、介绍IP地址的概念和作用1.1、IP地址的定义1.2、IP地址的作用 二、动态IP和静态IP的区别2.1、动态IP和静态IP的定义2.2、动态IP和静态IP的特点2.3、动态IP和静态IP的优缺点比较 三、动态IP和静态IP的应用场景3.1. 动态IP的应用场景3.2. 静态IP的应…

利用numpy解决解方程组的基本问题

1 问题 进入大学&#xff0c;我们接触了线性代数&#xff0c;利用线性代数解方程组比高中慢慢计算会好了许多&#xff0c;快捷许多&#xff0c;我们作为编程人员&#xff0c;有没有用python解决解方程组的办法呢&#xff1f; 2 方法 我们提出使用python的numpy解方程。 找到用于…

11- C程序的组成结构 (C语言)

一、C程序的基本组成结构 1、源文件: 后缀为.c 的文件2、头文件&#xff1a;后缀为.h的文件 注意&#xff1a; 源文件 功能&#xff1a;实现程序功能头文件 功能&#xff1a;函数的声明、全局变量的声明、宏定义、类型的声明一个由C语言所组成的项目中 只允许有一个main函数 …

离散数学大作业任务书

目 录 实际的练习题目、系统的总功能和各子模块的功能………………………………………………………………………………1 1.1题目及问题描述………………………………………………………………1 1.2功能概述………………………………………………………………………1 1.3技…

02 | 日志系统:一条SQL更新语句是如何执行的?

以下内容出自《MySQL 实战 45 讲》 02 | 日志系统&#xff1a;一条SQL更新语句是如何执行的&#xff1f; 查询语句的那套流程&#xff0c;更新语句也会走一遍。 更新流程中和查询不一样的是&#xff0c;更新流程中涉及了两个重要的日志模块。redo log (重做日志) 和 binglog&a…

如何编写用于Neo-Hookean材料的Abaqus VUMAT Fortran子例程

引言 大家好&#xff0c;我是一个热爱编程、研究有限元分析的普通程序员。我非常感谢你们能够抽出宝贵的时间来阅读我的文章&#xff0c;你们的支持是我前行的动力。今天&#xff0c;我们将讨论一个非常专业的话题&#xff0c;即如何编写用于Neo-Hookean材料的Abaqus VUMAT Fo…

Unreal 5 实现UI制作

这一篇讲解一下unreal engine里面的内置ui插件UMG&#xff0c;虚幻示意图形界面设计器&#xff08;Unreal Motion Graphics UI Designer&#xff09;(UMG) 是虚幻引擎内置的一套ui制作工具&#xff0c;通过它我们能够实现平面ui&#xff0c;场景hud内容 实现背景图片填充整个…

【MySQL数据管理】:插入、修改、删除操作

前言 ✨欢迎来到小K的MySQL专栏&#xff0c;本节将为大家带来MySQL数据插入、修改、删除的讲解✨ 目录 前言一、插入数据二、修改数据三、删除数据四、总结 一、插入数据 使用INSERT INTO语句来向表中插入数据 ✨语法&#xff1a; 给指定字段添加数据 INSERT INTO 表名 (字段…

ctfshow web入门 php特性web98-102

1.web98 get会被post方式覆盖&#xff0c;传入的参数需要等于flag&#xff0c;才能读取到flag值,如果直接传http_flagflag,返回的结果会是一个空数组&#xff0c;因为get变量被覆盖了&#xff0c;而post没有传参 payload: get 11 post HTTP_FLAGflag 2.web99 array_push在数组…

机器视觉初步8:特征提取专题

文章目录 1.角点检测2.纹理特征提取3.特征描述符匹配3.1 Harris角点描述符3.2 SIFT&#xff08;尺度不变特征变换&#xff09;描述符3.3 SURF&#xff08;加速稳健特征&#xff09;描述符 4.基于深度学习的特征提取 在机器视觉中&#xff0c;特征提取是从目标图像中提取有用的视…

C语言:打印菱形(输入菱形上半部分行数)

题目&#xff1a; 用C语言在屏幕上输入以下图案&#xff1a; 思路&#xff1a; 总体思路&#xff1a; &#xff08;一&#xff09;. 输入菱形上半部分行数 -- scanf()函数 &#xff08;二&#xff09;. 使用 for循环 进行 菱形上半部分三角形 的打印&#xff0c; 菱形上半部分…

基于5G网络的视频远程操控应用实践——低延迟视频技术及应用

本次分享将分为三个部分&#xff1a;第一部分介绍低延迟视频所涉及到的关键技术&#xff0c;包括低延迟视频编解码、视频传输、视频处理低延时框架、视频采集和显示&#xff1b;第二部分重点介绍5G环境下低延迟视频对抗弱网提出的要求&#xff0c;包括&#xff1a;弱网状态的探…