3. Exchange 交换机的使用

news2025/1/13 17:28:55

二八佳人体似酥,腰间仗剑斩愚夫。虽然不见人头落,暗里教君骨髓枯。

在上一节中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消
费者(工作进程)。在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式 称为 ”发布/订阅”

为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成:第一个程序将发出日志消 息,第二个程序是消费者。其中我们会启动三个消费者,其中一个消费者接收到消息后把日志存储在磁盘,另外一个消费者接收到消息后把消息打印在屏幕上,第三个假装存储在数据库中,事实上第一个程序发出的日志消息将广播给所有消费 者者

Exchange

概念

RabbitMQ消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产
者甚至都不知道这些消息传递传递到了哪些队列中。

相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来
自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消 息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

image.png

交换机类型

总共有以下类型:

直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout)

无名 exchange

在本教程的前面部分我们对 exchange 一无所知,但仍然能够将消息发送到队列。之前能实现的
原因是因为我们使用的是默认交换,我们通过空字符串(“”)进行标识。

            channel.basicPublish("", "YJLA", MessageProperties.PERSISTENT_TEXT_PLAIN, "你好啊,岳泽霖".getBytes(StandardCharsets.UTF_8));

第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实 是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话

临时队列

之前的章节我们使用的是具有特定名称的队列(还记得 hello 和 ack_queue 吗?)。
队列的名称我对们来说至关重要-我们需要指定我们的消费者去消费哪个队列的消息。

每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连 接,队列将被自动删除。

创建临时队列的方式如下:

String queueName = channel.queueDeclare().getQueue();

image.png

绑定(bindings)

什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,
它告诉我们 exchange 和那个队列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1和 Q2 进行了绑定

image.png

Fanout 扇出

Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的
所有队列中。系统中默认有些 exchange 类型

image.png

image.png

生产者

routkingKey 没有值

public class FanoutMessageProducer {

    private static String EXCHANGE_NAME = "fanout_logs";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionFactoryUtil.createConnection();
        Channel channel = connection.createChannel();
        // 创建交换机, 交换机 名称是  logs
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT,
                true, false, null);

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            // routingKey 为 空
            channel.basicPublish(EXCHANGE_NAME, "",
                    null, new String("发布广播消息,内容为:" + scanner.next()).getBytes("UTF-8"));
        }
        channel.close();
        connection.close();
    }
}

控制台消费者1

public class FanoutMessageConsumer1 {
    private static String EXCHANGE_NAME = "fanout_logs";
    private static String QUEUE_NAME = "debug_console";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionFactoryUtil.createConnection();
        final Channel channel = connection.createChannel();
        //1. 创建交换器
        channel.exchangeDeclare(EXCHANGE_NAME,
                BuiltinExchangeType.FANOUT, true, false, null);
        //2.  创建队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //7. 绑定队列与 交换机。  其中 routingKey 为 ""
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 打印到控制台
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(QUEUE_NAME + ">>> 获取到消息 :" + new String(message.getBody()));
        };

        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
        };
        channel.basicConsume(
                QUEUE_NAME, true, deliverCallback, cancelCallback);

        //输入流等待
        System.in.read();
        //关闭
        channel.close();
        connection.close();
    }
}

文件消费者2

public class FanoutMessageConsumer2 {
    private static String EXCHANGE_NAME = "fanout_logs";
    private static String QUEUE_NAME = "debug_file";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionFactoryUtil.createConnection();
        final Channel channel = connection.createChannel();
        //1. 创建交换器
        channel.exchangeDeclare(EXCHANGE_NAME,
                BuiltinExchangeType.FANOUT, true, false, null);
        //2.  创建队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //7. 绑定队列与 交换机。  其中 routingKey 为 ""
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 打印到控制台
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String textMessage = new String(message.getBody());
            // 将消息保存到 文件里面,追加形式。
            FileUtil.appendUtf8Lines(Collections.singletonList(textMessage), "D:\\rabbitMq\\debug.log");

            System.out.println(">>>> 追加信息到 文件 里面, 内容是:" + textMessage);
        };

        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        //输入流等待
        System.in.read();
        //关闭
        channel.close();
        connection.close();
    }
}

数据库消费者3

public class FanoutMessageConsumer3 {
    private static String EXCHANGE_NAME = "fanout_logs";
    private static String QUEUE_NAME = "debug_db";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionFactoryUtil.createConnection();
        final Channel channel = connection.createChannel();
        //1. 创建交换器
        channel.exchangeDeclare(EXCHANGE_NAME,
                BuiltinExchangeType.FANOUT, true, false, null);
        //2.  创建队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //7. 绑定队列与 交换机。  其中 routingKey 为 ""
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 打印到控制台
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String textMessage = new String(message.getBody());
            // 假装存储到数据库里面
            System.out.println(">>>>> 插入到数据库中的信息是:" + textMessage);
        };

        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        //输入流等待
        System.in.read();
        //关闭
        channel.close();
        connection.close();
    }
}

验证

启动生产者 ,依次启动3个消费者

image.png

image.png

生产者依次发送3条消息

发送消息1
发送消息2
发送消息3

查询控制台消费者

debug_console>>> 获取到消息 :发布广播消息,内容为:发送消息1
debug_console>>> 获取到消息 :发布广播消息,内容为:发送消息2
debug_console>>> 获取到消息 :发布广播消息,内容为:发送消息3

查询文件消费者

>>>> 追加信息到 文件 里面, 内容是:发布广播消息,内容为:发送消息1
>>>> 追加信息到 文件 里面, 内容是:发布广播消息,内容为:发送消息2
>>>> 追加信息到 文件 里面, 内容是:发布广播消息,内容为:发送消息3

查看数据库消费者

>>>>> 插入到数据库中的信息是:发布广播消息,内容为:发送消息1
>>>>> 插入到数据库中的信息是:发布广播消息,内容为:发送消息2
>>>>> 插入到数据库中的信息是:发布广播消息,内容为:发送消息3

没有通过 routingKey 进行绑定, 三个消费者均可以收到。

Direct exchange

在上一节中,我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。
在本节我们将向其中添加一些特别的功能-比方说我们只让某个消费者订阅发布的部分消息。
例如我们只把 较严重错误消息定向存储到日志文件(以节省磁盘空间),严重错误消息发送给数据库,
同时仍然能够在控制台上打印所有日志消息。

我们再次来回顾一下什么是 bindings,绑定是交换机和队列之间的桥梁关系。也可以这么理解:
队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey 来表示也可称该参数为 binding key,
创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, “routingKey”);
绑定之后的 意义由其交换类型决定。

介绍

上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,
例如我们希望将 warn 和 error 级别的错误发送给 数据库, 将 info, warn, error 的发送给文件, 将 debug,info,warn,error 全部打印到控制台中。
Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的 广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去。

image.png

生产者

/**
 * @ClassName:work
 * @Description 消息发布者
 * @Author 岳建立
 * @Date 2020/12/22 19:55
 * @Version 1.0
 * <p>
 * direct 直连, routingKey 路由 key 一致的,才进行发送。
 * <p>
 * console 接收  debug info warn error
 * file 接收  info warn error
 * db 接收  warn error
 * routingKey 有值。
 **/
public class DirectMessageProducer {
    private static String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionFactoryUtil.createConnection();
        Channel channel = connection.createChannel();
        // 创建交换机, 交换机 名称是  logs
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,
                true, false, null);

        // 定义一个路由 routingKey 和 信息 的map, 由 map 进行处理。

        Map<String,String> messageMap = new HashMap<>();

        messageMap.put("debug","一个 debug 消息 1");
        messageMap.put("debug","一个 debug 消息 2");

        messageMap.put("info","一个 info 消息 1");
        messageMap.put("info","一个 info 消息 2");
        messageMap.put("info","一个 info 消息3 ");

        messageMap.put("warn","一个 warn 消息 1");
        messageMap.put("warn","一个 warn 消息 2");
        messageMap.put("warn","一个 warn 消息 3");

        messageMap.put("error","一个 error 消息 1");
        messageMap.put("error","一个 error 消息 2");
        messageMap.put("error","一个 error 消息 3");
        messageMap.put("error","一个 error 消息 4");

        messageMap.forEach((routingKey,message)->{
            try {
                channel.basicPublish(EXCHANGE_NAME,routingKey,
                        null,message.getBytes("UTF-8"));
            } catch (IOException e) {
                e.printStackTrace();
            }
        });

        //输入流等待
        System.in.read();

        channel.close();
        connection.close();


//        Scanner scanner = new Scanner(System.in);
//          // 传入的格式为  debug 一个 debug 消息 1
//        while (scanner.hasNextLine()) {
//            // routingKey 为 空
//            String inputMessage = scanner.nextLine();
//
//            String[] splitMessage = inputMessage.split("\\ ");
//
//            channel.basicPublish(EXCHANGE_NAME, splitMessage[0],
//                    null, splitMessage[1].getBytes("UTF-8"));
//        }
//        channel.close();
//        connection.close();
    }
}

消费者1 控制台

public class DirectMessageConsumer1 {
    private static String EXCHANGE_NAME = "direct_logs";
    private static String QUEUE_NAME = "log_console";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionFactoryUtil.createConnection();
        final Channel channel = connection.createChannel();
        //1. 创建交换器
        channel.exchangeDeclare(EXCHANGE_NAME,
                BuiltinExchangeType.DIRECT, true, false, null);
        //2.  创建队列
        channel.queueDeclare(QUEUE_NAME, true, false, true, null);
        //3. 绑定队列与 交换机。  其中 routingKey 为 debug,info,warn,error  ,可以绑定多个。

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "debug");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warn");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");


        // 打印到控制台
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>>  消息 :" + new String(message.getBody()));
        };

        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
        };
        channel.basicConsume(
                QUEUE_NAME, true, deliverCallback, cancelCallback);

        //输入流等待
        System.in.read();
        //关闭
        channel.close();
        connection.close();
    }
}

消费者2 文件

public class DirectMessageConsumer2 {
    private static String EXCHANGE_NAME = "direct_logs";
    private static String QUEUE_NAME = "log_file";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionFactoryUtil.createConnection();
        final Channel channel = connection.createChannel();
        //1. 创建交换器
        channel.exchangeDeclare(EXCHANGE_NAME,
                BuiltinExchangeType.DIRECT, true, false, null);
        //2.  创建队列
        channel.queueDeclare(QUEUE_NAME, true, false, true, null);
        //7. 绑定队列与 交换机。  其中 routingKey 为 info ,warn, error
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warn");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");

        // 打印到控制台
        DeliverCallback deliverCallback = (consumerTag, message) -> {

            String fileMessage = QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>>  消息 :" + new String(message.getBody());

            FileUtil.appendUtf8Lines(Collections.singletonList(fileMessage), "D:\\rabbitMq\\log.log");

            System.out.println(">>>> 追加信息到 文件 里面, 内容是:" + fileMessage);
        };

        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        //输入流等待
        System.in.read();
        //关闭
        channel.close();
        connection.close();
    }
}

消费者3 数据库

public class DirectMessageConsumer3 {
    private static String EXCHANGE_NAME = "direct_logs";
    private static String QUEUE_NAME = "log_db";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionFactoryUtil.createConnection();
        final Channel channel = connection.createChannel();
        //1. 创建交换器
        channel.exchangeDeclare(EXCHANGE_NAME,
                BuiltinExchangeType.DIRECT, true, false, null);
        //2.  创建队列
        channel.queueDeclare(QUEUE_NAME, true, false, true, null);
        //7. 绑定队列与 交换机。  其中 routingKey 为 warn error

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warn");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");

        // 打印到控制台
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String fileMessage = QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>>  消息 :" + new String(message.getBody());
            // 假装存储到数据库里面
            System.out.println(">>>>> 插入到数据库中的信息是:" + fileMessage);
        };

        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        //输入流等待
        System.in.read();
        //关闭
        channel.close();
        connection.close();
    }
}

验证

启动生产者, 依次启动 三个消费者

消费者1 控制台打印:

image.png

消费者2 文件打印:

image.png

消费者3 数据库打印

image.png

Topics

在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的 fanout 交换机,而是
使用了 direct 交换机,从而有能实现有选择性地接收日志。
尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有
info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。
这个时候 就只能使用 topic 类型

要求

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单
词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。
当然这个单词列表最多不能超过 255 个字节。
在这个规则列表中,其中有两个替换符是大家需要注意的

*(星号)可以代替一个单词

#(井号)可以替代零个或多个单词

匹配案例

下图绑定关系如下

Q1–>绑定的是 中间带 orange 带 3 个单词的字符串(.orange.)

Q2–>绑定的是 最后一个单词是 rabbit 的 3 个单词(..rabbit) 第一个单词是 lazy 的多个单词(lazy.#)

image.png
上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的

quick.orange.rabbit 被队列 Q1Q2接收到

lazy.orange.elephant 被队列 Q1Q2接收到

quick.orange.fox 被队列 Q1接收到

lazy.brown.fox 被队列 Q2接收到

lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次

quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃

quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃

lazy.orange.male.rabbit 是四个单词但匹配 Q2

当队列绑定关系是下列这种情况时需要引起注意

当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了

如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了

生产者

public class TopicMessageProducer {
    private static String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionFactoryUtil.createConnection();
        Channel channel = connection.createChannel();
        // 创建交换机, 交换机 名称是  logs
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC,
                true, false, null);

        // 定义一个路由 routingKey 和 信息 的map, 由 map 进行处理。

        Map<String, String> messageMap = new HashMap<>();

        messageMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
        messageMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");

        messageMap.put("quick.orange.fox", "被队列 Q1 接收到");
        messageMap.put("lazy.brown.fox", "被队列 Q2 接收到");
        messageMap.put("info", "一个 info 消息3 ");

        messageMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
        messageMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
        messageMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");

        messageMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");

        messageMap.forEach((routingKey, message) -> {
            try {
                channel.basicPublish(EXCHANGE_NAME, routingKey,
                        null, message.getBytes("UTF-8"));
            } catch (IOException e) {
                e.printStackTrace();
            }
        });

        //输入流等待
        System.in.read();

        channel.close();
        connection.close();


//        Scanner scanner = new Scanner(System.in);
//        while(scanner.hasNextLine()) {
//            // routingKey 为 空
//            String inputMessage = scanner.nextLine();
//
//            String[] splitMessage = inputMessage.split("\\ ");
//
//            channel.basicPublish(EXCHANGE_NAME,splitMessage[0],
//                    null,splitMessage[1].getBytes("UTF-8"));
//        }
//        channel.close();
//        connection.close();
    }
}

消费者1

public class TopicMessageConsumer1 {
    private static String EXCHANGE_NAME = "topic_logs";
    private static String QUEUE_NAME = "topic_log_console";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionFactoryUtil.createConnection();
        final Channel channel = connection.createChannel();
        //1. 创建交换器
        channel.exchangeDeclare(EXCHANGE_NAME,
                BuiltinExchangeType.TOPIC, true, false, null);
        //2.  创建队列
        channel.queueDeclare(QUEUE_NAME, true, false, true, null);
        //3. 绑定队列与 交换机。  其中 routingKey 为 debug,info,warn,error  ,可以绑定多个。

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");


        // 打印到控制台
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>>  消息 :" + new String(message.getBody()));
        };

        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
        };
        channel.basicConsume(
                QUEUE_NAME, true, deliverCallback, cancelCallback);

        //输入流等待
        System.in.read();
        //关闭
        channel.close();
        connection.close();
    }
}

消费者2

public class TopicMessageConsumer2 {
    private static String EXCHANGE_NAME = "topic_logs";
    private static String QUEUE_NAME = "topic_log_file";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionFactoryUtil.createConnection();
        final Channel channel = connection.createChannel();
        //1. 创建交换器
        channel.exchangeDeclare(EXCHANGE_NAME,
                BuiltinExchangeType.TOPIC, true, false, null);
        //2.  创建队列
        channel.queueDeclare(QUEUE_NAME, true, false, true, null);
        //7. 绑定队列与 交换机。  其中 routingKey 为 info ,warn, error
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");

        // 打印到控制台
        DeliverCallback deliverCallback = (consumerTag, message) -> {

            String fileMessage = QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>>  消息 :" + new String(message.getBody());

            FileUtil.appendUtf8Lines(Collections.singletonList(fileMessage), "D:\\rabbitMq\\log.log");

            System.out.println(">>>> 追加信息到 文件 里面, 内容是:" + fileMessage);
        };

        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        //输入流等待
        System.in.read();
        //关闭
        channel.close();
        connection.close();
    }
}

验证

消费者1:

image.png

消费者2:

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/59968.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

孪生神经网络

孪生神经网络 孪生神经网络&#xff08;Siamese network&#xff09;主要用途是比较两图片的相似程度&#xff0c;其核心思想就是权值共享。 卷积神将网络是通过卷积运算提取图像的特征进行训练的&#xff0c;如果想比较两个图像的相似程度&#xff0c;也要对两个图像分别进行…

毛里智慧小学宿舍楼工程量清单编制

目 录 摘 要 I 第1章 前言 1 第2章 招标控制价编制 3 2.1招标控制价 3 2.2建设项目招标控制价汇总表 4 2.3单项工程招标控制价汇总表 5 2.4单项工程招标控制价汇总表 14 2.5分部分项工程和单价措施项目清单与计价表 24 2.6总价措施项目清单与计价表 27 2.7综合单价分析表 28 2.…

C语言学习之路(基础篇)—— 文件操作(上)

说明&#xff1a;该篇博客是博主一字一码编写的&#xff0c;实属不易&#xff0c;请尊重原创&#xff0c;谢谢大家&#xff01; 概述 1) 磁盘文件和设备文件 磁盘文件 指一组相关数据的有序集合,通常存储在外部介质(如磁盘)上&#xff0c;使用时才调入内存。 设备文件 在操作…

亚马逊云科技re:Invent:Serverless是所有构想的核心

12月2日&#xff0c;2022亚马逊云科技re:Invent全球大会上&#xff0c;Amazon.com副总裁兼首席技术官Werner Vogels博士向开发者们展示了另一种可能。在一系列Serverless工具的帮助下&#xff0c;一些代码可以少写&#xff0c;因为未来你可能再也不需要写它们了。这恐怕是自云原…

包装类-Wrapper

包装类的分类 针对八种基本数据类型相应的引用类型-包装类有了类的特点&#xff0c;就可以调用对应的类中的方法 装箱和拆箱 Java是一种面向对象的编程语言&#xff0c;学习Java时就被明确灌输了一个概念&#xff1a;OOP&#xff0c;即面向对象编程。一切皆对象。但是基本…

[附源码]JAVA毕业设计框架的电脑测评系统(系统+LW)

[附源码]JAVA毕业设计框架的电脑测评系统&#xff08;系统LW&#xff09; 目运行 环境项配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技…

Win11右键菜单反应慢有延迟解决方法分享

Win11右键菜单反应慢有延迟解决方法分享。有用户发现电脑鼠标点击右键菜单的时候&#xff0c;会出现一些延迟&#xff0c;导致自己在使用的过程中非常难受。那么这个问题如何自己去进行解决呢&#xff1f;我们一起来看看详细的解决方法分享吧。 解决方法&#xff1a; 注意&…

物联网IoT体系结构及核心技术

物联网&#xff0c;英文名为Internet of things&#xff08;IoT&#xff09;&#xff0c;顾名思义&#xff0c;物联网就是物物相连的互联网。 这有两层意思&#xff1a; 1、物联网的核心和基础仍然是互联网&#xff0c;是在互联网基础上的延伸和扩展的网络&#xff1b; 2、从…

超级详细 的 Redis 安装教程

超级详细 的 Redis 安装教程 Windows 版本的 Redis 是 Microsoft 的开源部门提供的 Redis. 这个版本的 Redis 适合开发人员学习使用&#xff0c;生产环境中使用 Linux 系统上的 Redis, 这里讲解了这两种的安装和下载。按照你们需要的liunx 或window步骤来 就可以了&#xff08;…

智能优化算法:法医调查优化算法 - 附代码

智能优化算法&#xff1a;法医调查优化算法 摘要&#xff1a;法医调查优化算法( Forensic-based investigation algorithm, FBI), 是由 Jui-Sheng Chou 等于2020 年提出的一种群体智能优化算法。其灵感来源于警官调查嫌疑人的过程。 1.法医调查优化算法 警察的大规模案件调查…

Java并发编程—线程详解

文章目录线程简介什么是线程多线程的使用什么时候需要使用多线程&#xff1f;写多少个线程比较合适&#xff1f;线程优先级靠谱的让出CPU的方法&#xff1f;线程的状态线程的状态有哪几种&#xff1f;线程的状态转换Daemon线程启动和终止线程构造线程启动线程理解中断如何安全的…

[附源码]计算机毕业设计基于Springboot的物品交换平台

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

逻辑回归模型和Python代码实现

文章目录逻辑回归原理sigmoid函数优化建模代码实现自编代码sklearn代码代码测试原理测试交叉验证逻辑回归原理 此前介绍的线性回归基本模型和增加了正则项的优化模型都只能用来预测连续值&#xff08;标签值是多少&#xff09;&#xff0c;如果想要应用于分类问题&#xff08;…

回归预测 | MATLAB实现GRU(门控循环单元)多输入单输出

回归预测 | MATLAB实现GRU(门控循环单元)多输入单输出 文章目录 回归预测 | MATLAB实现GRU(门控循环单元)多输入单输出预测效果基本介绍模型结构程序设计参考资料致谢预测效果 基本介绍 GRU神经网络是LSTM神经网络的一种变体,LSTM 神经网 络是在RNN的基础上发展起来的。RNN是一…

Python学习----网络编程

网络&#xff1a;网络就是实现资源共享和信息传递的虚拟平台&#xff0c;我们可以编写基于网络通信的程序。比如socket编程&#xff0c;web开发 Socket编程 Socket是程序之间通信的一个工具&#xff0c;好比显示生活中的电话&#xff0c;你知道了对方的电话号码之后&#xff…

RabbitMQ进阶

可以结合着狂神的RabbitMQ的笔记来进行学习 狂神说RabbitMQ笔记 RabbitMQ高级特性 消息可靠性投递 保证我发出的消息可以到达中间件&#xff0c;避免在传输的过程中发生丢失的情况。 这两个可靠性传输方式分别是负责不同的阶段&#xff0c;confirm是负责保证从生产者到队列…

[附源码]Python计算机毕业设计Django抗疫医疗用品销售平台

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

2023年江苏省职业院校技能大赛中职赛项规程样题

2023年江苏省职业院校技能大赛中职赛项规程 一、赛项名称 赛项编号&#xff1a;JSZ202335 赛项名称&#xff1a;网络安全 赛项组别&#xff1a;学生组、教师组 赛项归属专业大类&#xff1a;信息技术类 竞赛目的 贯彻落实《国家职业教育改革实施方案》《关于推动现代职业教…

设计模式——观察者模式

动机&#xff08;Motivation&#xff09; 在软件构建过程中&#xff0c;我们需要为某些对象建立一种“通知依赖关系” ——一个对象&#xff08;目标对象&#xff09;的状态发生改变&#xff0c;所有的依赖对象&#xff08;观察者对象&#xff09;都将得到通知。如果这样的依赖…

非零基础自学Golang 2 开发环境 2.1 Go 的安装

非零基础自学Golang 学习文档地址&#xff1a;https://www.topgoer.cn/ 本文仅用于学习记录&#xff0c;不存在任何商业用途&#xff0c;如侵删【已联系过文档作者】 文章目录非零基础自学Golang2 开发环境2.1 Go 的安装2.1.1 下载地址2.1.2 Go 的安装2.1.3 安装检查2 开发环境…