RabbitMQ 发布确认 交换机 死信队列 延迟队列

news2024/12/29 11:08:35

RabbitMQ

  • 发布确认
    • 开启发布确认的方法
    • 单个确认发布
    • 批量消息确认发布
    • 异步确认发布
      • 如何处理异步未确认消息
  • 交换机
    • 绑定
    • Fanout交换机
      • Fannout交换机(消费者)
      • Fannout交换机(生产者)
    • Direct exchage(直接交换机)
      • 生产者
      • 消费者
    • Topic交换机
      • Topic要求
      • Topic交换机(消费者)
      • Topic交换机(生产者)
  • 死信队列
    • 死信的来源
    • 死信实战
      • 死信实战(消费者1)
      • 死信实战(生产者)
      • 死信实战(消费者2)
  • 延迟队列
    • 延迟队列使用场景
    • 整合SpringBoot
      • 队列TTL
      • 基于死信存在的问题
        • Rabbitmq插件实现延迟队列

发布确认

1.设置要求队列必须持久化
2.设置要求队列中的消息
3.发布确认
什么是发布确认?
只有当消息完完整整的发送完成发布确认之后,消息才算在磁盘上保存好了,数据再怎么服务器开关都不会丢失

开启发布确认的方法

发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在channel 上调用该方法
在这里插入图片描述

单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。
当然对于某些应用程序来说这可能已经足够了。


//单个确认
public static void publishMessageIndividually() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//队列的声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();

//大量的发消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
    String message = i + "";
    channel.basicPublish("",queueName,null,message.getBytes());
    //单个消息就马上进行发布确认
    boolean flag = channel.waitForConfirms();
    if(flag){
        System.out.println("消息发送成功");
    }
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时"+(end-begin+"ms"));
}

运行结果:
在这里插入图片描述

批量消息确认发布

上面我们讲到的单个确认发布消息特别慢,与其相比,先发布一批消息然后一起确认可以极大的提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

//批量发布确认
public static void publishMessageBatch() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//队列的声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();

//批量确认消息大小
int batchSize = 100;

//批量发送消息 批量发布确认
for (int i = 0; i < MESSAGE_COUNT; i++) {
    String message = i + "";
    channel.basicPublish("",queueName,null,message.getBytes());

    //判断达到100条消息的时候,批量确认一次
    if(i%batchSize==0){
        //发布确认
        channel.waitForConfirms();
    }
}

//结束时间
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"个批量确认消息,耗时"+(end-begin+"ms"));
}

异步确认发布

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中问件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是怎么实现的。
在这里插入图片描述

//异步发布确认
public static void publishMessageAsync() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//队列的声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();

//消息确认成功,回调函数
ConfirmCallback ackCallback = (deliveryTag,multiple)->{
    System.out.println("确认的消息:"+deliveryTag);
};
//消息确认失败,回调函数
ConfirmCallback nackCallback = (deliveryTag,multiple)->{
    System.out.println("未确认的消息:"+deliveryTag);
};
//准备消息监听器 监听哪些消息成功了 哪些消息失败了
channel.addConfirmListener(ackCallback,nackCallback);

//批量发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
    String message = "消息"+i;
    channel.basicPublish("",queueName,null,message.getBytes());
}

//结束时间
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"个确认消息,耗时"+(end-begin+"ms"));
}

如何处理异步未确认消息

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentLinkedQueue这个队列在confirm callbacks与发布线程之间进行消息的传递。

//异步发布确认
public static void publishMessageAsync() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//队列的声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认
channel.confirmSelect();
//线程安全有序的一个哈希表适用于高并发的情况下
/*
    * 1.轻松的将序号与消息进行关联
    * 2.轻忪批量删除条目只要给到序号
      3.支持高并发(多线程)*/
ConcurrentSkipListMap<Long,String> outstandingConfirms =
new ConcurrentSkipListMap<>();
//开始时间
long begin = System.currentTimeMillis();

//消息确认成功,回调函数
ConfirmCallback ackCallback = (deliveryTag,multiple)->{
    if(multiple){
        ConcurrentNavigableMap<Long,String> confirmed =
        outstandingConfirms.headMap(deliveryTag);
        confirmed.clear();
    }else {
        outstandingConfirms.remove(deliveryTag);
    }

    //2.删除到已经确认的消息 剩下的就是未确认的消息
    System.out.println("确认的消息:"+deliveryTag);
};
//消息确认失败,回调函数
ConfirmCallback nackCallback = (deliveryTag,multiple)->{
    //打印一下未确认的消息都有哪些
    String message = outstandingConfirms.get(deliveryTag);
    System.out.println("未确认的消息:"+message+"::::"+deliveryTag);
};
//准备消息监听器 监听哪些消息成功了 哪些消息失败了
channel.addConfirmListener(ackCallback,nackCallback);

//批量发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
    String message = "消息"+i;
    channel.basicPublish("",queueName,null,message.getBytes());
    //1.此处记录下所有要发送的消息 消息的总和
    outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
}

//结束时间
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"个确认消息,耗时"+(end-begin+"ms"));
}

交换机

在这里插入图片描述
在上一节中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消费者(工作进程)。在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式称为发布/订阅"。
概念
RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

交换机类型
直接(direct),主题(topic),标题(headers),扇出(fanout)
临时队列
每当我们连接到Rabbit时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。
在这里插入图片描述

绑定

什么是 bingding呢,binding其实是exchange和queue之间的桥梁,它告诉我们exchange和那个队列进行了绑定关系。比如说下面这张图告诉我们的就是×与Q1和Q2进行了绑定
在这里插入图片描述

Fanout交换机

Fanout这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。系统中默认有些exchange类型

Fannout交换机(消费者)

public class ReceiveLogs02 {

    //交换机的名称
    public static final String EXCHANGE_NAME="logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //产生一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //声明一个队列 临时队列
        //当消费者斯开与队列的连接的时候队列就自动鹏除
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机与
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("ReceiveLogs01等待接收消息,把接收到消息打印在屏幕上......");

        //接收消息
        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println("控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
        };
        //消费者取消消息时回调接口
        channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});
    }
}
public class ReceiveLogs02 {

    //交换机的名称
    public static final String EXCHANGE_NAME="logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //产生一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //声明一个队列 临时队列
        //当消费者斯开与队列的连接的时候队列就自动鹏除
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机与
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("ReceiveLogs02等待接收消息,把接收到消息打印在屏幕上......");

        //接收消息
        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println("控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
        };
        //消费者取消消息时回调接口
        channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});
    }
}

Fannout交换机(生产者)

//发消息  交换机
public class EmitLog {
    //交换机的名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //channel.exchangeDeclare(EXCHANGE_NAME,"fauout");

        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+message);
        }
    }
}

运行结果:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Direct exchage(直接交换机)

上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希望将日志消息写入磁盘的程序仅接收严重错误(erros),而不存储哪些警告(warning)或信息(info)日志消息避免浪费磁盘空间。Fanout这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的广播,在这里我们将使用direct这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routingKey队列中去。
在这里插入图片描述

生产者

public class DirectLogs {
    //交换机的名称
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //channel.exchangeDeclare(EXCHANGE_NAME,"fauout");

        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+message);
        }
    }
}

消费者


public class ReceiveLogDirect01 {
    public static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //声明一个队列
        channel.queueDeclare("console", false,false,false,null);

        channel.queueBind("console",EXCHANGE_NAME,"info");

        //接收消息
        DeliverCallback deliverCallback = (consumerTag, message)->{
            System.out.println("ReceiveLogDirect01控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
        };
        //消费者取消消息时回调接口
        channel.basicConsume("console",true,deliverCallback,consumerTag->{});
    }
}
public class ReceiveLogDirect02 {
    public static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //声明一个队列
        channel.queueDeclare("disk", false,false,false,null);

        channel.queueBind("disk",EXCHANGE_NAME,"error");

        //接收消息
        DeliverCallback deliverCallback = (consumerTag, message)->{
            System.out.println("ReceiveLogDirect02控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
        };
        //消费者取消消息时回调接口
        channel.basicConsume("disk",true,deliverCallback,consumerTag->{});
    }
}

运行结果:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Topic交换机

在这里插入图片描述
在上面这张图中,我们可以看到X绑定了两个队列,绑定类型是direct。队列Q1绑定键为orange,队列Q2绑定键有两个:一个绑定键为black,另一个绑定键为green.
在这种绑定情况下,生产者发布消息到exchange上,绑定键为orange的消息会被发布到队列Q1。绑定键为 blackgreen.和的消息会被发布到队列Q2,其他消息类型的消息将被丢弃。

Topic要求

发送到类型是topic交换机的消息的routing_key不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse” ,“nyse.xvmw”,
“quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过255个字节。
在这个规则列表中,其中有两个替换符是大家需要注意的
*(星号)可以代替一个单词
#|(井号)可以替代零个或多个单词

Topic交换机(消费者)

public class ReceiveLogsTopic01 {
    //交换机的名称
    public static final String EXCHANGE_NAME = "topic_logs";

    //接收消息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //声明队列
        String queueName = "Q1";
        channel.queueDeclare(queueName,false,false,false,null);
        channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
        System.out.println("接收消息");

        //接收消息
        DeliverCallback deliverCallback = (consumerTag, message)->{
            System.out.println("控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
        };

        channel.basicConsume(queueName,true,deliverCallback,consumeTag->{});
    }
}
public class ReceiveLogsTopic02 {
    //交换机的名称
    public static final String EXCHANGE_NAME = "topic_logs";

    //接收消息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //声明队列
        String queueName = "Q2";
        channel.queueDeclare(queueName,false,false,false,null);
        channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
        System.out.println("接收消息");

        //接收消息
        DeliverCallback deliverCallback = (consumerTag, message)->{
            System.out.println("控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
        };

        channel.basicConsume(queueName,true,deliverCallback,consumeTag->{});
    }
}

Topic交换机(生产者)

public class EmitLogTopic {
    //交换机的名称
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        Map<String,String> bindingKeyMap = new HashMap<>();
        bindingKeyMap.put("quick.orange.rabbit","1");
        bindingKeyMap.put("lazy.orange.elephant","2");
        bindingKeyMap.put("quick.orange.fox","3");
        bindingKeyMap.put("lazy.brown.fox","4");
        bindingKeyMap.put("lazy.pink.rabbit","5");
        bindingKeyMap.put("quick.brown.fox","6");
        bindingKeyMap.put("quick.orange.male.rabbit","7");
        bindingKeyMap.put("lazy.orange.male.rabbit","8");


        for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
            String routingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
            System.out.println("生产者发出消息"+message);
        }
    }
}

运行结果:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

死信队列

某些时候由于特定的原因导致queue中的某些信息无法被消费,但这样的消息如果没有后续的处理,就变成死信,有死信自然就有了死信队列
应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

死信的来源

消息TTL过期
队列达到最大长度(队列满了,无法再添加数据到mq.中)
消息被拒绝(basic.reject或 basic.nack)并且requeue=false

死信实战

死信实战(消费者1)

public class Consumer01 {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机的名称
    public static final String DEAD_EXCHANGE = "dead_exchange";
    //普通队列的名称
    public static final String NORMAL_QUEUE = "normal_queue";
    //死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明死信和普通交换机,类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
        //声明死信队列和普通队列
        Map<String ,Object> arguments = new HashMap<>();
        //过期时间
        //正常队列设置死信交换机
        //过期时间 10s-10000ms
        //arguments.put("x-message-ttl",1000000);
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信RoutineKey
        arguments.put("x-dead-letter-routing-key","lisi");

        channel.queueDeclare(NORMAL_QUEUE,false,false,false,null);
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        //绑定普通的交换机与队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        //绑定死信的交换机与队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println("Consumer01接收的消息是"+new String(message.getBody(),"UTF-8"));
        };

        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag->{});
    }
}

死信实战(生产者)

public class Producer {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //死信消息,设置TTL时间
        AMQP.BasicProperties properties = new AMQP.BasicProperties()
                .builder().expiration("10000").build();

        for(int i=1;i<11;i++){
            String message = "info"+i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());
        }
    }
}

死信实战(消费者2)

public class Consumer02 {
    //死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();


        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println("Consumer02接收的消息是"+new String(message.getBody(),"UTF-8"));
        };

        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag->{});
    }
}

延迟队列

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

延迟队列使用场景

1.订单在十分钟之内未支付则自动取消
⒉.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
3.用户注册成功后,如果三天内没有登{陆则进行短信提醒
4.用户发起退款,如果三天内没有得到处理则通知相关运营人员
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

整合SpringBoot

第一步:导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.10</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <groupId>com.yc.springbootrabbitmq</groupId>
  <artifactId>demo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>demo</name>
  <description>Demo project for Spring Boot</description>
  <properties>
    <java.version>1.8</java.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!--RabbitMQ依赖-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.47</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>

    <!--swagger-->
    <dependency>
      <groupId>io.springfox</groupId>
      <artifactId>springfox-swagger2</artifactId>
      <version>2.9.2</version>
    </dependency>
    <dependency>
      <groupId>io.springfox</groupId>
      <artifactId>springfox-swagger-ui</artifactId>
      <version>2.9.2</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-rabbit-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

</project>

第二步:导入Swagger配置类

@Configuration
@EnableSwagger2
public class SwaggerConfig {
    @Bean
    public Docket webApiConfig(){
        return new Docket(DocumentationType.SWAGGER_2)
        .groupName("webApi")
        .apiInfo(webApiInfo())
        .select()
        .build();
    }

    private ApiInfo webApiInfo(){
        return new ApiInfoBuilder()
        .title("rabbitmq接口文档")
        .description("文档描述了rabbitmq微服务接口定义")
        .version("1.0")
        .contact(new Contact("enjoy6288","http://atguigu.com","2439317465@qq.com"))
        .build();
    }
}

队列TTL

第一步:配置文件类代码

//Ttl队列 配置文件类代码
@Configuration
public class TtlQueueConfig {
    //普通交换机的名称
    public static final String X_EXCHANGE = "X";
    //死信交换机的名称
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //普通队列的名称
    public static final String QUEUE_A="QA";
    public static final String QUEUE_B="QB";
    //死信队列的名称
    public static final String DEAD_LETTER_QUEUE="QD";
    //普通队列的名称
    public static final String QUEUE_C="QC";

    @Bean("queueC")
    public Queue queueC(){
        Map<String,Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信RoutineKey
        arguments.put("x-dead-letter-routing-key","YD");
        return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
    }

    //声明xExchange   别名
    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }

    //声明yExchange   别名
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //声明普通队列 TTL 为 10s
    @Bean("queueA")
    public Queue queueA(){
        Map<String,Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","YD");
        //设置TTL
        arguments.put("x-message-ttl",10000);

        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }

    //声明普通队列 TTL 为 10s
    @Bean("queueB")
    public Queue queueB(){
        Map<String,Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","YD");
        //设置TTL
        arguments.put("x-message-ttl",40000);

        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }

    //死信队列
    @Bean("queueD")
    public Queue queueD(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    //绑定
    @Bean
    public Binding queueABingdingX(@Qualifier("queueA") Queue queueA,
                                   @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    //绑定
    @Bean
    public Binding queueBBingdingY(@Qualifier("queueB") Queue queueB,
                                   @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    //绑定
    @Bean
    public Binding queueDBingdingY(@Qualifier("queueD") Queue queueD,
                                   @Qualifier("yExchange") DirectExchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }

    @Bean
    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }
}

第二步:配置生产者

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //开始发消息
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);

        rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);
        rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);
    }

    //开始发消息 消息  TTL
    @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
        log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列QC:{}",new Date().toString(),ttlTime,message);

        rabbitTemplate.convertAndSend("X","XC",message,msg->{
            //发送消息的时候   延迟时长
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }
}

第三步:配置消费者

@Slf4j
@Component
public class DeadLetterQueueConsumer {

    //接收消息
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws Exception{
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);
    }
}

基于死信存在的问题

一旦发出两条以上消息,看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置TTL的方式,消息可能并不会按时“死亡“,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

Rabbitmq插件实现延迟队列

安装延时队列插件
在官网上下载https://www.rabbitmg.com/community-plugins.html,下载
rabbitmq_delayed_message_exchange插件,然后解压放置到 RabbitMQ的插件目录。进入RabbitMQ.的安装目录下的plgins.目录,执行下面命令让该插件生效,然后重启RabbitMQ
/usr/lib/rabbitmq/lib/rabbitmq _server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
之前:

在这里插入图片描述
现在:
在这里插入图片描述
示例代码:

第一步:创建配置类

@Configuration
public class DelayedQueueConfig {

    //交换机
    public static final String DELAYED_EXCHANGE_NAME="delayed.exchange";
    //队列
    public static final String DELAYED_QUEUE_NAME="delayed.queue";
    //routingKey
    public static final String DELAYED_ROUTING_KEY="delayed.routingkey";

    @Bean
    public Queue delayedQueue(){
        return new Queue(DELAYED_QUEUE_NAME);
    }

    //声明交换机
    @Bean
    public CustomExchange delayedExchange(){
        Map<String,Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type","direct");

        return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false);
    }

    //绑定
    public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue
                                                      ,@Qualifier("delayedExchange") CustomExchange delayedExchange){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

第二步:创建生产者

//开始发消息 基于插件的   消息  及   延迟的时间
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
    log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列QC:{}", new Date().toString(), delayTime, message);
    rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {
        //发送消息的时候   延迟时长
        msg.getMessageProperties().setDelay(delayTime);
        return msg;
    });
}

第三步:创建消费者

//基于插件的延迟消息
@Slf4j
@Component
public class DelayQueueConsumer {
    //监听消息
    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayQueue(Message message){
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到延迟队列的信息:{}",new Date().toString(),msg);
    }
}

运行结果:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/431224.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

遥感影像变化检测新方法:MLDANets

来源&#xff1a;投稿 作者&#xff1a;xin 编辑&#xff1a;学姐 论文标题&#xff1a; Multilevel Deformable Attention-Aggregated Networks for Change Detection in Bitemporal Remote Sensing Imagery Motivation 本文指出: &#xff08;1&#xff09;当前基于自注意…

Spring AOP通知类型

我们之前的文章介绍了AOP通知描述了抽取的共性功能&#xff0c;根据共性功能抽取的位置不同&#xff0c;最终运行代码时要将其加入到合理的位置。 先来认识一下五种通知的具体类型&#xff1a; 前置通知后置通知环绕通知(重点)返回后通知(了解)抛出异常后通知(了解) &#x1f…

(4)(4.6) 强制性硬件配置

文章目录 前言 1 ArduPilot操作的简单概述 2 框架类和类型配置 3 电机编号和方向 4 无线电控制校准 5 加速度计校准 6 罗盘校准 7 遥控发射器飞行模式配置 8 电子调速器(ESC)校准 9 配置电机范围(可选) 10 失控保护机制 11 飞行模式 前言 作为首次安装的一部分&am…

深度卷积神经网络DCNN简介

1. 背景 卷积神经网络CNN&#xff08;Convolutional Neural Network&#xff0c;又称ConvNet&#xff09;保留了空间信息&#xff0c;因此可以更好地用于图像分类。 卷积操作基于仔细甄选的局部感受野&#xff0c;在多个特征平面共享权值&#xff1b;之后全连接层基于传统的多层…

C++之 多态(Polymorphism)

目录 一、基本概念 多态的使用&#xff1a; 案例一——计算机类 多态的优点&#xff1a; 二、纯虚函数与抽象类 特点&#xff1a; ①无法实例化对象 ②子类必须重写父类中的纯虚函数&#xff0c;否则也属于抽象类 案例二——制作饮品 三、虚析构与纯虚析构 因为父类指…

【C++】结构体嵌套结构体

目录 1、缘起 2、结构体嵌套结构体 3、总结 1、缘起 结构体嵌套结构体 是一种数据组织方式&#xff0c;就像 俄罗斯套娃 一样&#xff0c;一个数据结构可以包含另一个数据结构。这种嵌套结构使得程序可以更加灵活地处理数据&#xff0c;从而更好地满足复杂的需求。类比生活中…

Java之~ Aop自定义注解日志

大纲步骤&#xff1a; 一&#xff0c;创建需要记录的日志表&#xff0c;创建基础方法。&#xff08;省略&#xff09; 二&#xff0c;在需要加记录日志的方法上加Aop注解1&#xff0c;创建一个注解类&#xff0c;Aop中定义一个注解import java.lang.annotation.*; /*** http 请…

银行数字化转型导师坚鹏:商业银行零售业务数字化风控

商业银行零售业务数字化风控课程背景&#xff1a; 数字化背景下&#xff0c;很多银行存在以下问题&#xff1a; 不清楚商业银行数字化风控发展现状&#xff1f; 不清楚对公业务数字化风控工作如何开展&#xff1f; 不知道零售业务数字化风控工作如何开展&#xff1f; 课程特…

十三、RNN循环神经网络实战

因为我本人主要课题方向是处理图像的&#xff0c;RNN是基本的序列处理模型&#xff0c;主要应用于自然语言处理&#xff0c;故这里就简单的学习一下&#xff0c;了解为主 一、问题引入 已知以前的天气数据信息&#xff0c;进行预测当天(4-9)是否下雨 日期温度气压是否下雨4-…

拦截器 JWT SpringBoot 多环境开发 本地文件上传 阿里云OSS存储 异常处理

Springboot&#xff1a; 是&#xff1a;由pivotal团队提供的全新框架&#xff0c;其设计目的是用来简化spring应用的初始搭建以及开发过程 作用&#xff1a;简化Spring的环境搭建和代码开发 使用原理&#xff1a;就是boot提前写好了一些maven的工程和jar包&#xff0c;程序员在…

Android屏幕适配dp、px两套解决办法

最新最全文章(2018-08-25)&#xff1a;Android dp方式的屏幕适配-原理(后期补充完整讲解)_手机dp输出是横屏还是竖屏_android阿杜的博客-CSDN博客 “又是屏幕适配&#xff0c;这类文章网上不是很多了吗&#xff1f;” 我也很遗憾&#xff0c;确实又是老问题。但本文重点对网上…

ChatGPT来势凶猛,公有云格局会不会大变?

【引言】&#xff1a; AI风暴来袭&#xff0c;全球无人幸免。 但公有云与ChatGPT到底啥关系&#xff1f; 1) 公有云与ChatGPT&#xff0c;到底谁会“吃”掉谁&#xff1f; 【科技明说 &#xff5c; 热点关注】在看到公有云厂商纷纷开始大模型发布&#xff0c;开始GPT的融入之…

虹科教您 | 基于Linux系统的虹科RELY-TSN-KIT套件操作指南(2)——操作演示

RELY-TSN-KIT是首款针对TSN的开箱即用的解决方案&#xff0c;它可以无缝实施确定性以太网网络&#xff0c;并从这些技术复杂性中抽象出用户设备和应用。该套件可评估基于IEEE 802.1AS同步的时间常识的重要性&#xff0c;并借助时间感知整形器来确定性地交付实时流量&#xff0c…

EJBCA搭建

EJBCA搭建 前言&#xff1a; 本次EJBCA搭建使用操作系统为centos 7&#xff0c;shell用户为root。 1.下载jdk环境&#xff1a; // 下载jdk yum install -y java-1.8.0-openjdk-devel // 查看java版本 java -version 本人jdk环境&#xff1a; openjdk version "1.8.…

计算机自动化有哪些SCI期刊推荐? - 易智编译EaseEditing

以下是计算机自动化领域的一些知名SCI期刊&#xff1a; IEEE Transactions on Automation Science and Engineering&#xff1a; 该期刊由IEEE&#xff08;电气和电子工程师协会&#xff09;出版&#xff0c;涵盖了自动化科学和工程领域的研究&#xff0c;包括自动控制、人工…

算法记录 | Day35 贪心算法

860.柠檬水找零 思路&#xff1a; 只需要维护三种金额的数量&#xff0c;5&#xff0c;10和20。 有如下三种情况&#xff1a; 情况一&#xff1a;账单是5&#xff0c;直接收下。情况二&#xff1a;账单是10&#xff0c;消耗一个5&#xff0c;增加一个10情况三&#xff1a;账…

九、Locust运行与配置

1. 配置 1.1 环境变量 也可以通过环境变量设置选项。它们通常与命令行参数相同&#xff0c;但大写并带有前缀LOCUST_&#xff1a; 在 Linux/macOS 上&#xff1a; $ LOCUST_LOCUSTFILEcustom_locustfile.py locust在 Windows 上&#xff1a; > set LOCUST_LOCUSTFILEcu…

MySQL的安装与卸载(Centos7.9环境下,全篇图文手把手安装教程)

前言 在安装MySQL之前&#xff0c;我们先来看看MySQL如何卸载。如果从未安装过MySQL的任何版本&#xff0c;可以直接跳过这部分。 如果已经安装过MySQL&#xff0c;一定要确保自己的环境中的MySQL相关文件删除干净&#xff0c;否则重新安装时可能会出现一些错误 tips&#xf…

问界M9全剧透:华为的「科技豪华」样板间

作者 | 德新 编辑 | 王博 今天在nova 11的发布会上&#xff0c;曝了一个意外——问界M9提前剧透。实际上这车要今年四季度才上市。「本来没打算这么早发出来&#xff0c;前阵子网上有一个很老的设计图在传&#xff08;实在太丑&#xff09;。没办法&#xff0c;干脆先讲讲」…

DriveGPT、车企订单背后,为什么毫末每年都能搞出新东西?

作者 | 祥威 编辑 | 德新 4月11日&#xff0c;毫末智行正式发布自动驾驶生成式大模型 DriveGPT&#xff0c;中文名 雪湖海若&#xff0c;可以提升自动驾驶认知能力&#xff0c;最终提升规控效率。 雪湖海若的核心&#xff0c;是将各种驾驶场景作为Token输入到模型中&…