【SpringBoot整合RabbitMQ(上)】

news2025/1/10 16:08:08

一、简单的生产者-消费者

1.1、创建连接工具类获取信道

public class RabbitMqUtils {
    public static Channel getChannel() throws IOException, TimeoutException {
        //创建一个链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP 链接RabbitMQ的队列
        factory.setHost("192.168.116.3");
        factory.setVirtualHost("/test");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("123");
        //创建链接
        Connection connection = factory.newConnection();
        //获取链接当中的信道
        Channel channel = connection.createChannel();
        return channel;
    }
}

 1.2、创建生产者

步骤:

①生产者连接RabbitMQ服务端,获取信道(即连接)

②由信道声明一个队列(调用queueDeclare方法),其中参数表示:

1.队列名称
2.队列里面的消息是否持久化(磁盘),默认情况下消息存储在内存中(并不进行持久化)
3.该队列是否只供一个消费者进行消费 是否进行消息的共享,true可以多个消费者消费,默认为false:只能一个消费者消费
4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true自动删除 false不自动删除
5.其他参数(后续使用时说明)
③控制台输入消息内容并发送,发送调用basicPublish方法,其中参数表示:
1.发送到哪个交换机(入门采用默认交换机,后续使用时详细说明其用法)
2.路由的key值是哪个 本次是队列名称
3.其他参数信息(后续使用时说明)
4.发送消息的消息体

public class Task01 {
    //队列的名称
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.nextLine();
           channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息发送完毕:" + message);

        }
    }
}

 1.3、创建消费者

步骤:

①消费者连接RabbitMQ服务端,获取信道(即连接)

②信道调用basicConsume方法接收消息,其中参数表示:
1.消费哪个队列
2.消费成功之后是否自动应答 true代表的是自动应答 false代表的是手动应答
3.当一个消息发送过来后的回调接口,即接收到消息如何处理
4.消费者取消消费的回调

public class Worker01 {
    //队列的名称
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        /**
         * 声明 接收消息
         */
        DeliverCallback deliverCallback = (consumeTag, message) ->{
            System.out.println("工作线程2接收到的消息:" + new String(message.getBody()));
        };
        /**
         取消消费的回调
         */
        CancelCallback cancelCallback = (consumeTag) -> {
            System.out.println(consumeTag + "消费者取消消息消费接口回调逻辑");
        };
        System.out.println("T2等待接收消息......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

 二、轮训分发-生产者消费者代码

       轮训分发顾名思义就是生产者将消息发送到MQ当中时,由多个消费者来处理这些消息,消费者1一条,消费者2一条,以此类推,直到MQ当中的消息消费完。

2.1、生产者

此处采用消息持久化策略(添加MessageProperties.PERSISTENT_TEXT_PLAIN),即将队列和队列当中的信息持久化到磁盘当中。

public class Task {
    /**
     * 队列名称
     */
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        boolean durable = true;//表示开启消息持久化
        channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.nextLine();
            //设置生产者发送的消息为持久化消息(要求保存到磁盘上)
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发出消息:" + message);
        }
    }
}

2.2、消费者1

此处将消费者1设置为手动应答,自动应答则是消费者一旦接收到消息就告知队列,队列则将该消息从队列中删除,手动应答是为了确保消息确确实实被处理掉了才告知队列进行删除,以防处理时出现问题导致消息未被完全处理就丢弃的情况。

public class Consumer01 {
    /**
     * 队列名称
     */
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumeTag, message) ->{
            System.out.println("C1接收到的消息:" + new String(message.getBody(),"UTF-8"));
            //手动应答
            /**
             * 1.消息的标记 tag 每一条消息都会有唯一标识 此处表示应答哪一条消息
             * 2.是否批量应答 false:不批量应答信道当中的消息(仅回答此消息)
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        //取消消费的回调
        CancelCallback cancelCallback = (consumeTag) -> {
            System.out.println("消息消费被中断");
        };
        /**
         * 采用手动应答
         */
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

2.3、消费者2

消费者2和消费者1一样采取手动应答,确保消息的正确处理。

public class Consumer02 {
    /**
     * 队列名称
     */
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumeTag, message) ->{

            System.out.println("C2接收到的消息:" + new String(message.getBody(),"UTF-8"));
            //手动应答
            /**
             * 1.消息的标记 tag 每一条消息都会有唯一标识 此处表示应答哪一条消息
             * 2.是否批量应答 false:不批量应答信道当中的消息(仅回答此消息)
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        //取消消费的回调
        CancelCallback cancelCallback = (consumeTag) -> {
            System.out.println("消息消费被中断");
        };
        /**
         * 采用手动应答
         */
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

 2.4、关于自动应答和手动应答

        自动应答可以保证高吞吐量,但是保证不了数据传输安全性。一方面,当采用自动应答时,队列发送完消息,当消息还未被消费者接收到时消费者一旦断开连接,此条消息则会丢失。另一方面,例如消费者端并没有设置传递消息的数量限制,队列持续的发消息给消费端,然而消费者端处理消息的能力低下,导致消息大量积压,最终导致内存耗尽,此时消费者线程会被操作系统杀死,那么这些消息依然没有被消费就丢失了。手动应答可以完美的规避自动应答的弊端,但是,手动应答的吞吐量并不高,因此采用哪种应答方式需要根据实际应用场景进行权衡。

消息应答的方法有两种:

Channel.basicAck(用于肯定确认,告知队列消息已被处理,可以删除了)
Channel.basicNack(用于否定确认,告知队列消息未被正常处理,需要重新发送该消息)
Channel.basicReject(用于否定确认,区别在于:告知队列不处理该消息了直接拒绝,可以将其丢弃了 )
此外,手动应答(basicAck)时的第二个参数Multiple的取值有两种,true和false:
true表示批量应答,即将信道上未被处理的消息一并应答。
false表示不采用批量应答,仅应答此条消息。

三、消息重新入队

参考二当中的代码,引入线程沉睡表示消息处理的效率,让消费者1处理一条消息仅需1s,消费者2处理一条消息需要30s,生产者发送AA\BB\CC\DD四条消息,按照上述逻辑,消费者1会打印AA\CC,消费者2对打印BB\DD,当生产者发送完DD时就将消费者2线程杀死。观察消费者1的打印台情况,可以观察到DD则由消费者1进行消费了,当MQ未收到消费者2对于DD的应答,DD这条消息依然存放在MQ当中,由于消费者2断开了连接,此时MQ就会将DD发送到消费者1的信道当中,由消费者1处理DD这条消息。

 生产者:

消费者1:

 消费者2:

 四、不公平分发

参考三当中的线程沉睡,我们类比成消费者1的处理能力高效,消费者2的处理能力低下,此处采用不公平分发,即能者多劳,让闲着的线程处理更多的消息。在消费者1和消费者2当中分别引入如下代码:

        /**
         * 设置不公平分发
         */
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);

生产者发送AA\BB\CC\DD四条消息,消费者1和消费者2打印台执行效果和三当中的效果一样,唯一区别在于我们此时并没有强制杀死消费者2线程。

五、预取值

预取值是对不公平分发的进一步优化,所谓预取值,举个例子,此时队列当中有1\2\3\4\5\6\7\8,共8条消息,设置消费者1的预取值为5,消费者2的预取值为2,表示将这8条消息当中的5条发送给消费者1的信道当中,将2条发送给消费者2的信道当中,消费者1每应答一条消息,队列才会将下一条消息发往消费者1的信道,类似的,消费者2也是如此。这样做的目的在于假设我们可以预估到两个消费者的处理能力,根据他们的能力让他们处理对标能力的消息。预取值的大小我们一般是不会准确把握的,只有通过反复的测试才会把握到一个接近处理能力的预取值。观察两个消费者的打印台输出情况:

设置消费者1预取值及其打印台:

        /**
         * 预取值
         */
        int prefetchCount = 5;
        channel.basicQos(prefetchCount);

设置消费者2预取值及其打印台:

        /**
         * 预取值
         */
        int prefetchCount = 2;
        channel.basicQos(prefetchCount);

 六、发布确认模式

发布确认模式针对生产者,生产者将信道设置为confirm模式,此模式下,生产者发送的消息将需要RabbitMQ进行确认收到,防止生产者发送的消息丢失。发布确认模式有三种策略,分别为单个确认模式、批量确认模式、异步批量确认模式。针对以上三种策略,分别实现其生产者发送消息,观察其发送效率。

将三种模式封装成三个方法,并由主函数调用:

    /**
     * 队列名称
     */
    private static final String QUEUE_NAME = "confirm";
    /**
     * 批量发送消息条数
     */
    private static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        channel.confirmSelect();//开启发布确认模式
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        /*
         * 发布确认模式
         * 1、单个确认
         * 2、批量确认
         * 3、异步批量确认
         */
        publishMessageIndividually(channel);//25432
        publishMessageBatch(channel);//391
        asynchronousConfirmMessage(channel);//22
    }

6.1、单个确认模式

单个确认模式即生产者发送一条,MQ收到后回应一条,之后生产者继续发送下一条。

    //单个确认
    public static void publishMessageIndividually(Channel channel) throws Exception{
        //开始时间
        long startTime = System.currentTimeMillis();
        //批量的发消息 单个发布确认
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
            //单个消息就立即进行发布确认
            boolean flg = channel.waitForConfirms();
            if (!flg){
                System.out.println("第" + i + "条消息发送失败~");
            }
        }
        //结束时间
        long endTime = System.currentTimeMillis();
        long spendTime = endTime - startTime;
        System.out.println("单个确认模式发布1000条消息耗时:" + spendTime + " ms");
    }

6.2、批量确认模式

批量确认模式即发送一批消息后一起确认是否收到,与上述单个确认模式相比要大大减少耗时,但是一旦某条消息未被收到,我们将很难排查到是哪条消息。

    //批量确认
    public static void publishMessageBatch(Channel channel) throws Exception{
        //开始时间
        long startTime = System.currentTimeMillis();
        //批量确认消息大小
        int confirmSize = 100;//每100条确认一次
        //批量的发送消息 批量发布确认
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
            //判断达到100条消息的时候批量的确认一次
            if (((i + 1) % confirmSize) == 0){
                boolean flg = channel.waitForConfirms();
                if (!flg){
                    System.out.println("第" + (i - 99) + "条~第" + i + "条持久化失败~" );
                }else {
                    System.out.println("第" + (i - 99) + "条~第" + i + "条持久化成功~" );
                }
            }
        }
        //结束时间
        long endTime = System.currentTimeMillis();
        long spendTime = endTime - startTime;
        System.out.println("批量确认模式发布1000条消息耗时:" + spendTime + " ms");
    }

6.3、异步批量确认模式

异步批量确认模式完美的规避了上两种确认模式的弊端,异步批量确认模式中生产者只负责发送消息,而无需关心消息是否发送到队列当中并进行了持久化,由MQ当中的broker负责告知消息的发送结果,生产者只需要在代码当中准备一个监听器,监听broker的消息,broker告知监听器哪些消息发送成功了,哪些消息发送失败了,我们准备一个线程安全的数据容器即可,将发送的消息放到这个容器当中。监听器当中的参数ackCallback和nackCallback会分别对容器当中的数据进行处理。ackCallback有两手准备,multiple为true时表示批量应答,此时将容器当中Key小于该deliverTag的移到临时的容器当中,反之为false时,表示只应答一条,则从容器中移除该数据。

    //异步确认
    public static void asynchronousConfirmMessage(Channel channel) throws Exception{
        /**
         * 线程安全有序的哈希表 适用于高并发的情况下
         * 1.轻松的将序号和消息进行关联
         * 2.可以轻松的批量删除条目 只要给到序号
         * 3.支持高并发(多线程)
         */
        ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();
        //消息确认成功回调函数
        /**
         * 1、消息的标记
         * 2、是否为批量确认
         */
        ConfirmCallback ackCallback = (deliveryTag,multiple) -> {
            //删除已经确认的消息 剩下的就是未确认的消息
            if (multiple){
                ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
                confirmed.clear();
            }else {
                outstandingConfirms.remove(deliveryTag);
            }
        };
        //消息确认失败回调函数
        /**
         * 1、消息的标记
         * 2、是否为批量确认
         */
        ConfirmCallback nackCallback = (deliveryTag,multiple) -> {
            String message = outstandingConfirms.get(deliveryTag);
            //3--->打印未确认的消息都有哪些
            System.out.println("未确认的消息:序号->" + deliveryTag + " 消息体:" + message);
        };
        //准备消息的监听器 监听哪些消息成功了 哪些消息失败了
        /**
         * 1、监听哪些消息成功了
         * 2、监听哪些消息失败了
         */
        channel.addConfirmListener(ackCallback,nackCallback);//异步通知
        //开始时间
        long startTime = System.currentTimeMillis();
        //批量发送消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
                        //此处记录所有要发送的消息
            outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
       channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));

        }
        //结束时间
        long endTime = System.currentTimeMillis();
        long spendTime = endTime - startTime;
        System.out.println("异步确认模式发布1000条消息耗时:" + spendTime + " ms");
    }

七、交换机

7.1、Exchanges概念

RabbitMQ 消息传递模型的核心思想是 : 生产者生产的消息从不会直接发送到队列 。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。相反,生产者只能将消息发送到交换机 (exchange) ,交换机工作的内容非常简单,一方面它接收来 自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

7.2、发布/订阅模式(fanout类型)

假设有这么一个场景,在网购系统当中,某家店铺需要将新产品进行推广,该产品面向中老年人退出,因此就需要将推广消息发送给该群体用户。中年群众和老年群众作为两种消费者群体,如何做到只发布一次消息就能做到这两种消费者都能收到该消息?此时,MQ就为我们提供了发布/订阅模式。

消息发送方:

public class EmitLog {
    /**
     * 交换机名称
     */
    private static final String EXCHANGE_NAME = "LOGS";
    /**
     * 交换机类型
     */
    private static final String EXCHANGE_TYPE = "fanout";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //声明一个交换机,交换机名称、交换机类型
        channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.nextLine();
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送方发送消息:" + message);
        }
    }
}

消费者1:

public class ReceviceLogs01 {
    /**
     * 交换机名称
     */
    private static final String EXCHANGE_NAME = "LOGS";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //声明一个队列 临时队列
        /**
         * 生成一个临时队列 队列名是随机的
         * 当消费者与队列断开连接时  队列自动删除
         */
        String queueName = channel.queueDeclare().getQueue();
        /**
         * 绑定交换机与队列
         */
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("接收方01等待接收消息......");
        /**
         * 声明 接收消息
         */
        DeliverCallback deliverCallback = (consumeTag, message) ->{
            System.out.println(new String(message.getBody()));
        };
        //取消消费的回调
        CancelCallback cancelCallback = (consumeTag) -> {
            System.out.println("消息消费被中断");
        };
        channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
    }
}

消费者2:

public class ReceviceLogs02 {
    /**
     * 交换机名称
     */
    private static final String EXCHANGE_NAME = "LOGS";
    /**
     * 交换机类型
     */
    private static final String EXCHANGE_TYPE = "fanout";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //声明一个队列 临时队列
        /**
         * 生成一个临时队列 队列名是随机的
         * 当消费者与队列断开连接时  队列自动删除
         */
        String queueName = channel.queueDeclare().getQueue();
        /**
         * 绑定交换机与队列
         */
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("接收方02等待接收消息......");
        /**
         * 声明 接收消息
         */
        DeliverCallback deliverCallback = (consumeTag, message) ->{
            System.out.println(new String(message.getBody()));
        };
        //取消消费的回调
        CancelCallback cancelCallback = (consumeTag) -> {
            System.out.println("消息消费被中断");
        };
        channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
    }
}

 执行结果:

 fanout交换机说明:此类型的交换机可以将接收到的消息发送给所有与它绑定的队列。参考我们上面所说的轮训分发,轮训分发是将队列当中的消息发送给消费者,一条消息仅能由一个消费者进行消费,如果使用默认的交换机,则需要发送多次消息,由队列再分发给每个消费者。

总结:只要该队列和此交换机绑定,无论routing-key是什么,都会接收到生产者发送的消息。

7.3、直接交换机(Direct类型)

假设有这么一个场景,同样是网购系统,在下单时我们需要将交易金额1W以上的订单交给某支付机构A处理,5000-1W的交给支付机构B处理,5000元以下的也交给支付机构A处理。怎么做?支付机构A和支付机构B分别对应消费者1和消费者2,订单为生产者,生产者(网购系统)将订单信息发送至交换机,交换机根据生产者提供的交易金额(routing-key)决定由哪个消费者(支付机构)去处理。这就是直接交换机的用处,可以指定哪个消费者去消费此条消息。

生产者:

public class DirectLogs {
    /**
     * 交换机名称
     */
    private static final String EXCHANGE_NAME = "direct_logs";
    /**
     * 交换机类型
     */
    public static final String EXCHANGE_TYPE = "direct";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.nextLine();
            channel.basicPublish(EXCHANGE_NAME,"warning",null,message.getBytes(StandardCharsets.UTF_8));
        }
    }
}

消费者1:

public class ReceiveLogsDirect01 {
    /**
     * 交换机名称
     */
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //声明一个队列
        channel.queueDeclare("console",false,false,false,null);
        /**
         * 路由绑定
         */
        channel.queueBind("console",EXCHANGE_NAME,"info");
        channel.queueBind("console",EXCHANGE_NAME,"warning");
        /**
         * 声明 接收消息
         */
        DeliverCallback deliverCallback = (consumeTag, message) ->{
            System.out.println(new String(message.getBody()));
        };
        //取消消费的回调
        CancelCallback cancelCallback = (consumeTag) -> {
            System.out.println("消息消费被中断");
        };
        channel.basicConsume("console",true,deliverCallback,cancelCallback);
    }
}

消费者2:

public class ReceiveLogsDirect02 {
    /**
     * 交换机名称
     */
    private static final String EXCHANGE_NAME = "direct_logs";


    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //声明一个队列
        channel.queueDeclare("disk",false,false,false,null);
        /**
         * 路由绑定
         */
        channel.queueBind("disk",EXCHANGE_NAME,"error");
        /**
         * 声明 接收消息
         */
        DeliverCallback deliverCallback = (consumeTag, message) ->{
            System.out.println(new String(message.getBody()));
        };
        //取消消费的回调
        CancelCallback cancelCallback = (consumeTag) -> {
            System.out.println("消息消费被中断");
        };
        channel.basicConsume("disk",true,deliverCallback,cancelCallback);
    }
}

 避坑:队列一旦与direct类型的交换机绑定,则生产者发送消息时则不能使用队列名称来指定发送到哪个队列,只能使用routing-key来进行队列选择。

7.4、Topic交换机

Topic交换机的应用场景是非广泛,fanout交换机和direct交换机的功能都可以使用Topic交换机实现。发送到topic交换机的消息不能具有任意的  routing_key —— 它必须是由点分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的一些功能。binding key也必须采用相同的形式。topic交换机背后的逻辑 类似于direct交换机——使用特定 routing key 发送的消息将被传递到与匹配binding key绑定的所有队列。但是binding key有两个重要的特殊情况:

1.*( 星号 ) 可以代替一个单词
2.#( 井号 ) 可以替代零个或多个单词

7.4.1、实用案例:

生产者:

public class TopicExchangePro {
    /**
     * 交换机名称
     */
    private static final String EXCHANGE_NAME = "topic_logs";
    /**
     * 交换机类型
     */
    private static final String EXCHANGE_TYPE = "topic";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        /**
         * 声明一个交换机
         */
        channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String rountingKey = scanner.next();
            System.out.println("输入的rountingKey为:" + rountingKey);
            String message = scanner.next();
            System.out.println("输入消息体的为:" + message);
            channel.basicPublish(EXCHANGE_NAME,rountingKey,null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息体:" + message + "已发送给rountKey为:" + rountingKey + "的队列");
        }
    }
}

消费者1:

public class TopicExchangeCon01 {
    /**
     * 交换机名称
     */
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        /**
         * 声明一个队列
         */
        channel.queueDeclare("receive1",false,false,false,null);
        /**
         * 路由绑定
         */
        channel.queueBind("receive1",EXCHANGE_NAME,"*.*.pink");
        channel.queueBind("receive1",EXCHANGE_NAME,"green.#");
        /**
         * 声明 接收消息
         */
        DeliverCallback deliverCallback = (consumeTag, message) ->{
            String rountingKey = message.getEnvelope().getRoutingKey();
            System.out.println("receive1接收消息体为:" + new String(message.getBody()) + " 其RountingKey为:" + rountingKey);
        };
        //取消消费的回调
        CancelCallback cancelCallback = (consumeTag) -> {
            System.out.println("消息消费被中断");
        };
        channel.basicConsume("receive1",true,deliverCallback,cancelCallback);
    }
}

消费者2:

public class TopicExchangeCon02 {
    /**
     * 交换机名称
     */
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        /**
         * 声明一个队列
         */
        channel.queueDeclare("receive2",false,false,false,null);
        /**
         * 路由绑定
         */
        channel.queueBind("receive2",EXCHANGE_NAME,"*.red.*");
        /**
         * 声明 接收消息
         */
        DeliverCallback deliverCallback = (consumeTag, message) ->{
            String rountingKey = message.getEnvelope().getRoutingKey();
            System.out.println("receive2接收消息体为:" + new String(message.getBody()) + " 其RountingKey为:" + rountingKey);
        };
        //取消消费的回调
        CancelCallback cancelCallback = (consumeTag) -> {
            System.out.println("消息消费被中断");
        };
        channel.basicConsume("receive2",true,deliverCallback,cancelCallback);
    }
}

7.4.2、结果分析及测试:

消费者1routing-key:

*.*.pink
green.#

 两种匹配规则,路由key为(什么什么pink)可以发往该队列,路由key为(green什么什么什么...)也可以发往该队列。

消费者2routing-key:

*.red.*

匹配规则:路由key是(什么red什么)可以发往该队列。

测试用例:

dog.is.pink 消费者1
green.like.pink 消费者1
green.red.dog 消费者1和消费者2
green.cat 消费者1
house.red.pink 消费者1和消费者2
we.red.family 消费者2

测试结果:

 生产者:

 消费者1:

消费者2:

 避坑:当路由key匹配到同一个队列时(即路由key匹配到同一队列的多个routing-key时),只会给队列发送一次消息。

值得注意的是:当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了。如果队列绑定键当中没有#*出现,那么该队列绑定类型就是 direct

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/530010.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

google_breakpad库的基本使用

参考链接&#xff1a; windows下捕获dump之Google breakpad_client的理解Google Breakpad&#xff1a;基本介绍和操作方法Breakpad 入门linux下用QT捕获程序异常 简介 github 地址 三大组件 client:读取当前线程的状态、加载的可执行文件、共享库等信息&#xff0c;写入到…

Azure深层防御

深层防御的目的是保护信息&#xff0c;防止未经授权访问信息的人员窃取信息。 深层防御策略使用一系列机制来减缓攻击进度&#xff0c;这些攻击旨在获取对数据的未经授权的访问权限。 深层防御层 可以将深度防御可视化为一组层&#xff0c;并将要保护的数据放在中心&#xf…

一篇文章搞定ftp、dns服务器

一篇文章搞定ftp、dns服务器 1、ftp 安装ftp 挂载centos镜像cd /media/CentOS_6.8_Final/Packages安装命令&#xff1a;[rootlocalhost Packages]# rpm -ivh vsftpd-2.2.2-21.el6.x86_64.rpm Vsftpd配置目录为/etc/vsftpd&#xff0c;其中包含下面几个文件 /var/ftp/&#xf…

awk命令编辑

awk工作原理 逐行读取文本&#xff0c;默认以空格或tab键分隔符进行分隔&#xff0c;将分隔所得的各个字段保存到内建变量中&#xff0c;并按模式或者条件执行编辑命令。 sed命令常用于一整行的处理&#xff0c;而awk比较倾向于将一行分成多个“字段”然后再进行处理。awk信息…

做网工10年,没人在30岁前和我讲这些(一)

晚上好&#xff0c;我是老杨。 23年才刚过几天&#xff0c;我就感觉自己又上了点年纪&#xff0c;时常面对年纪比较小的粉丝&#xff0c;无意识的面露慈爱的笑容。 还是每次小冬提醒我&#xff0c;我才发现我的表情不对劲。 我对年轻人的包容度是很强的&#xff0c;尤其是一…

VMware、CentOS、XShell、Xftp的安装

第 1 章 VMware 1.1 VMware 安装 一台电脑本身是可以装多个操作系统的&#xff0c;但是做不到多个操作系统切换自如&#xff0c;所以我们 需要一款软件帮助我们达到这个目的&#xff0c;不然数仓项目搭建不起来。 推荐的软件为 VMware&#xff0c;VMware 可以使用户在一台计…

DNS正反向解析

正向解析 1.准备工作 关闭Selinux服务和firewalld服务 [rootserver ~]# setenforce 0 [rootserver ~]# systemctl stop firewalld 修改服务器与客户端的IP为静态IP地址 [rootserver ~]# nmcli connection modify ens160 ipv4.method manual ipv4.address …

剑指offer 19. 正则表达式匹配

文章目录 1. 题目描述2. 解题思想3. 设置dp初始值4.代码实现 1. 题目描述 2. 解题思想 定义dp数组 dp[i][j]&#xff1a;表示当字符串长度i&#xff0c;j是&#xff0c;s与p是否匹配 确定递推公式 核心是s[i]要与p[j]进行比较&#xff0c;比较的结果来确定 dp数组的值&#xf…

STM32-ADC多通道输入实验

之前已经介绍了几个ADC的笔记和实验了&#xff0c;链接如下&#xff1a; 关于ADC的笔记1_Mr_rustylake的博客-CSDN博客 STM32-ADC单通道采集实验_Mr_rustylake的博客-CSDN博客 STM32-单通道ADC采集&#xff08;DMA读取&#xff09;实验_Mr_rustylake的博客-CSDN博客 接下来…

NodeJs基础之NRM与NPM

nrm nrm can help you easy and fast switch between different npm registries, now include: npm, cnpm, taobao, nj(nodejitsu). 译文&#xff1a;nrm可以帮助您在不同的npm注册表之间轻松快速地切换&#xff0c;现在包括&#xff1a;npm、cnpm、taobao、nj&#xff08;no…

编译安装及yum安装

一、编译安装 源码包&#xff1a;是由程序员按照特定格式和语法编写的包 二进制包:源码包经过成功编译之后产生的包 1.tar -xf httpd-2.4.29.tar.bz #解压源码包 2.安装依赖环境 3.配置安装路径 4.编译make并安装 5.关闭防火墙&#xff0c;和安全机制 6.开启服务器 7.…

全电发票时代,记账凭证不用再打印了!

—政策通告— 为进一步推进电子发票应用和推广实施工作&#xff0c;助力国家数字经济发展&#xff0c;国家档案局会同财政部、商务部、国家税务总局总结三批增值税电子发票电子化报销、入账、归档试点经验&#xff0c;依据国家相关法律法规和标准规范&#xff0c;编制形成了《…

KMP匹配算法

目录 一、暴力匹配法动画演示代码实现 二、KMP算法的概念三、KMP算法的应用题目代码实现 一、暴力匹配法 动画演示 时间复杂度为&#xff1a; O ( m ∗ n ) O(m * n) O(m∗n) 代码实现 #define _CRT_SECURE_NO_WARNINGS #include <iostream> using namespace std;int…

Revit API:ErrorHandling

前言 本文介绍 Revit 的错误处理机制。 内容 程序员对错误处理的定义和理解 程序的错误处理机制可以分为两种类型&#xff1a;错误返回码和异常捕捉。 错误返回码是指在程序中遇到错误时&#xff0c;通过函数返回值来表明错误的类型和信息。错误返回码可以在程序中被预测和…

分段存储管理方式

目录 一、分段存储管理方式的引入的需求: 1.方便编程 2.信息共享 3.信息保护 4.动态增长 5.动态链接 二、分段系统的基本原理 1.分段 2.段表 3.地址变换机构 4.分页与分段的主要区别 三、信息共享 四、段页式存储管理方式 1.基本原理 2.地址变换过程 分段与分页…

Spring实现IOC和DI入门案例(XML版)

文章目录 1 IOC入门案例(XML版)1.1 思路分析1.2 代码实现步骤1:创建Maven项目步骤2:添加Spring的依赖jar包步骤3:添加案例中需要的类步骤4:添加spring配置文件步骤5:在配置文件中完成bean的配置步骤6:获取IOC容器步骤7:从容器中获取对象进行方法调用步骤8:运行程序 2 DI入门案例…

AltTab for Mac 像Windows一样的窗口快速切换工具

AltTab for Mac AltTab for Mac 是一款非常好用的窗口快速切换工具&#xff0c;AltTab将Windows的 “Alt-Tab” 窗口切换器的功能引入到了macOS。可以让您更快的在各个程序之间自由切换&#xff0c;大大提高您的工作效率。 AltTab for Mac下载 AltTab for Mac AltTab for Ma…

哈工大软件过程与工具作业2

云原生技术云原生技术 哈尔滨工业大学 计算机科学与技术学院/国家示范性软件学院 2022年秋季学期 《软件过程与工具》课程 作业报告 作业 2&#xff1a;需求分析UML建模 姓名 学号 联系方式 石卓凡 120L021011 944613709qq.com/18974330318 目 录 1 需求概述...........…

Vue3——简易版个人空间(上半部分)

创建项目 使用vue 的图形化界面创建一个新的vue3项目如下图所示 装两个新的插件——router和vuex插件 该过程的可能有点久&#xff0c;需要耐心等待。 再装一些需要的依赖 需要用到的依赖: boostrap和poperjs/core&#xff08;bootstrap是提供给不会做美工的程序员的一个新的…

物联网|uart串口相关寄存器|波特率设置及计算|发送处理代码|串口接收中断处理函数|物联网之蓝牙4.0 BLE基础-学习笔记(7)

文章目录 13 uart串口基础开发基本电路图&#xff1a;实验相关寄存器波特率设置及计算计算过程&#xff1a;设置中断发送处理代码串口接收中断处理函数main.c 13 uart串口基础开发 基本电路图&#xff1a; 实验相关寄存器 相关寄存器UxCSR、UxCSR、UxGCR、UxBUF、UxBAUD、CLK…